Skip to main content

iroh_services/
client.rs

1use std::{
2    collections::BTreeMap,
3    str::FromStr,
4    sync::{Arc, RwLock},
5};
6
7use anyhow::{Result, anyhow, ensure};
8use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
9use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder};
10use irpc_iroh::IrohLazyRemoteConnection;
11use n0_error::StackResultExt;
12use n0_future::{task::AbortOnDropHandle, time::Duration};
13use rcan::Rcan;
14use tokio::sync::oneshot;
15use tracing::{debug, trace, warn};
16use uuid::Uuid;
17
18use crate::{
19    api_secret::ApiSecret,
20    caps::Caps,
21    net_diagnostics::{DiagnosticsReport, checks::run_diagnostics},
22    protocol::{
23        ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutMetrics,
24        PutNetworkDiagnostics, RemoteError, SetAttributes, SetGroup,
25    },
26};
27
28/// Client is the main handle for interacting with iroh-services. It communicates with
29/// iroh-services entirely through an iroh endpoint, and is configured through a builder.
30/// Client requires either an Ssh Key or [`ApiSecret`]
31///
32/// ```no_run
33/// use iroh::{Endpoint, endpoint::presets};
34/// use iroh_services::Client;
35///
36/// async fn build_client() -> anyhow::Result<()> {
37///     let endpoint = Endpoint::bind(presets::N0).await?;
38///
39///     // needs IROH_SERVICES_API_SECRET set to an environment variable
40///     // client will now push endpoint metrics to iroh-services.
41///     let client = Client::builder(&endpoint)
42///         .api_secret_from_str("MY_API_SECRET")?
43///         .build()
44///         .await;
45///
46///     Ok(())
47/// }
48/// ```
49///
50/// [`ApiSecret`]: crate::api_secret::ApiSecret
51#[derive(Debug, Clone)]
52pub struct Client {
53    // owned clone of the endpoint for diagnostics, and for connection restarts on actor close
54    #[allow(dead_code)]
55    endpoint: Endpoint,
56    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
57    _actor_task: Arc<AbortOnDropHandle<()>>,
58}
59
60/// ClientBuilder provides configures and builds a iroh-services client, typically
61/// created with [`Client::builder`]
62pub struct ClientBuilder {
63    #[allow(dead_code)]
64    cap_expiry: Duration,
65    cap: Option<Rcan<Caps>>,
66    endpoint: Endpoint,
67    name: Option<String>,
68    group: Option<String>,
69    attributes: Option<BTreeMap<String, String>>,
70    metrics_interval: Option<Duration>,
71    remote: Option<EndpointAddr>,
72    registry: Registry,
73}
74
75const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24 * 30); // 1 month
76pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET";
77
78impl ClientBuilder {
79    pub fn new(endpoint: &Endpoint) -> Self {
80        let mut registry = Registry::default();
81        registry.register_all(endpoint.metrics());
82
83        Self {
84            cap: None,
85            cap_expiry: DEFAULT_CAP_EXPIRY,
86            endpoint: endpoint.clone(),
87            name: None,
88            group: None,
89            attributes: None,
90            metrics_interval: Some(Duration::from_secs(60)),
91            remote: None,
92            registry,
93        }
94    }
95
96    /// Register a metrics group to forward to iroh-services
97    ///
98    /// The default registered metrics uses only the endpoint
99    pub fn register_metrics_group(mut self, metrics_group: Arc<dyn MetricsGroup>) -> Self {
100        self.registry.register(metrics_group);
101        self
102    }
103
104    /// Set the metrics collection interval
105    ///
106    /// Defaults to enabled, every 60 seconds.
107    pub fn metrics_interval(mut self, interval: Duration) -> Self {
108        self.metrics_interval = Some(interval);
109        self
110    }
111
112    /// Disable metrics collection.
113    pub fn disable_metrics_interval(mut self) -> Self {
114        self.metrics_interval = None;
115        self
116    }
117
118    /// Set an optional human-readable name for the endpoint the client is
119    /// constructed with, making metrics from this endpoint easier to identify.
120    /// This is often used for associating with other services in your app,
121    /// like a database user id, machine name, permanent username, etc.
122    ///
123    /// When this builder method is called, the provided name is sent after the
124    /// client initially authenticates the endpoint server-side.
125    /// Errors will not interrupt client construction, instead producing a
126    /// warn-level log. For explicit error handling, use [`Client::set_name`].
127    ///
128    /// names can be any UTF-8 string, with a min length of 2 bytes, and
129    /// maximum length of 128 bytes. **name uniqueness is not enforced
130    /// server-side**, which means using the same name for different endpoints
131    /// will not produce an error
132    pub fn name(mut self, name: impl Into<String>) -> Result<Self> {
133        let name = name.into();
134        validate_name(&name).map_err(BuildError::InvalidName)?;
135        self.name = Some(name);
136        Ok(self)
137    }
138
139    /// Attach the endpoint to a single named group when the client first
140    /// authenticates. Group names follow the same rules as endpoint names
141    /// (2–128 bytes UTF-8). Errors during startup propagate as warn-level
142    /// logs; for explicit error handling use [`Client::set_group`].
143    pub fn group(mut self, group: impl Into<String>) -> Result<Self> {
144        let group = group.into();
145        validate_name(&group).map_err(BuildError::InvalidGroup)?;
146        self.group = Some(group);
147        Ok(self)
148    }
149
150    /// Attach arbitrary key-value attributes to the endpoint when the client
151    /// first authenticates. Accepts any iterable of `(key, value)` pairs:
152    ///
153    /// ```no_run
154    /// # use iroh::{Endpoint, endpoint::presets};
155    /// # use iroh_services::Client;
156    /// # async fn example(endpoint: &Endpoint) -> anyhow::Result<()> {
157    /// let _ = Client::builder(endpoint).attributes([("env", "prod"), ("region", "us-west")])?;
158    /// # Ok(()) }
159    /// ```
160    ///
161    /// Keys follow the same length rules as endpoint names (2–128 bytes);
162    /// values may be empty and are capped at 128 bytes; the map is limited
163    /// to 128 entries. Errors during startup propagate as warn-level logs;
164    /// for explicit error handling use [`Client::set_attributes`].
165    pub fn attributes<I, K, V>(mut self, attrs: I) -> Result<Self>
166    where
167        I: IntoIterator<Item = (K, V)>,
168        K: Into<String>,
169        V: Into<String>,
170    {
171        let collected: BTreeMap<String, String> = attrs
172            .into_iter()
173            .map(|(k, v)| (k.into(), v.into()))
174            .collect();
175        validate_attributes(&collected).map_err(BuildError::InvalidAttributes)?;
176        self.attributes = Some(collected);
177        Ok(self)
178    }
179
180    /// Check IROH_SERVICES_API_SECRET environment variable for a valid API secret
181    pub fn api_secret_from_env(self) -> Result<Self> {
182        let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
183        self.api_secret(ticket)
184    }
185
186    /// set client API secret from an encoded string
187    pub fn api_secret_from_str(self, secret_key: &str) -> Result<Self> {
188        let key = ApiSecret::from_str(secret_key).context("invalid iroh services api secret")?;
189        self.api_secret(key)
190    }
191
192    /// Use a shared secret & remote iroh-services endpoint ID contained within a ticket
193    /// to construct a iroh-services client. The resulting client will have "Client"
194    /// capabilities.
195    ///
196    /// API secrets include remote details within them, and will set both the
197    /// remote and rcan values on the builder
198    pub fn api_secret(mut self, ticket: ApiSecret) -> Result<Self> {
199        let local_id = self.endpoint.id();
200        let rcan = crate::caps::create_api_token_from_secret_key(
201            ticket.secret,
202            local_id,
203            self.cap_expiry,
204            Caps::for_shared_secret(),
205        )?;
206
207        self.remote = Some(ticket.remote);
208        self.rcan(rcan)
209    }
210
211    /// Loads the private ssh key from the given path, and creates the needed capability.
212    ///
213    /// The file must contain an unencrypted PEM-encoded OpenSSH ed25519 private key.
214    #[cfg(not(target_arch = "wasm32"))]
215    pub async fn ssh_key_from_file<P: AsRef<std::path::Path>>(self, path: P) -> Result<Self> {
216        let file_content = tokio::fs::read_to_string(path).await?;
217        self.ssh_key(&file_content)
218    }
219
220    /// Creates the capability from the provided PEM-encoded OpenSSH ed25519 private key.
221    #[cfg(not(target_arch = "wasm32"))]
222    pub fn ssh_key(mut self, pem: &str) -> Result<Self> {
223        let local_id = self.endpoint.id();
224        let rcan = crate::caps::create_api_token_from_openssh_pem(
225            pem,
226            local_id,
227            self.cap_expiry,
228            Caps::all(),
229        )?;
230        self.cap.replace(rcan);
231
232        Ok(self)
233    }
234
235    /// Sets the rcan directly.
236    pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
237        ensure!(
238            EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
239            "invalid audience"
240        );
241        self.cap.replace(cap);
242        Ok(self)
243    }
244
245    /// Sets the remote to dial, must be provided either directly by calling
246    /// this method, or through calling the api_secret builder methods.
247    pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
248        self.remote = Some(remote.into());
249        self
250    }
251
252    /// Create a new client, connected to the provide service node
253    #[must_use = "dropping the client will silently cancel all client tasks"]
254    pub async fn build(self) -> Result<Client, BuildError> {
255        debug!("starting iroh-services client");
256        let remote = self.remote.ok_or(BuildError::MissingRemote)?;
257        let capabilities = self.cap.ok_or(BuildError::MissingCapability)?;
258
259        let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
260        let irpc_client = IrohServicesClient::boxed(conn);
261
262        let (tx, rx) = tokio::sync::mpsc::channel(1);
263        let actor_task = AbortOnDropHandle::new(n0_future::task::spawn(
264            ClientActor {
265                capabilities,
266                client: irpc_client,
267                name: self.name.clone(),
268                group: self.group.clone(),
269                attributes: self.attributes.clone().unwrap_or_default(),
270                session_id: Uuid::new_v4(),
271                authorized: false,
272            }
273            .run(
274                self.name,
275                self.group,
276                self.attributes,
277                self.registry,
278                self.metrics_interval,
279                rx,
280            ),
281        ));
282
283        Ok(Client {
284            endpoint: self.endpoint,
285            message_channel: tx,
286            _actor_task: Arc::new(actor_task),
287        })
288    }
289}
290
291#[derive(thiserror::Error, Debug)]
292pub enum BuildError {
293    #[error("Missing remote endpoint to dial")]
294    MissingRemote,
295    #[error("Missing capability")]
296    MissingCapability,
297    #[error("Unauthorized")]
298    Unauthorized,
299    #[error("Remote error: {0}")]
300    Remote(#[from] RemoteError),
301    #[error("Rpc connection error: {0}")]
302    Rpc(irpc::Error),
303    #[error("Connection error: {0}")]
304    Connect(ConnectError),
305    #[error("Invalid endpoint name: {0}")]
306    InvalidName(#[from] ValidateNameError),
307    #[error("Invalid endpoint group: {0}")]
308    InvalidGroup(ValidateNameError),
309    #[error("Invalid endpoint attributes: {0}")]
310    InvalidAttributes(#[from] ValidateAttributesError),
311}
312
313impl From<irpc::Error> for BuildError {
314    fn from(value: irpc::Error) -> Self {
315        match value {
316            irpc::Error::Request {
317                source:
318                    irpc::RequestError::Connection {
319                        source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
320                        ..
321                    },
322                ..
323            } if frame.error_code == 401u32.into() => Self::Unauthorized,
324            value => Self::Rpc(value),
325        }
326    }
327}
328
329/// Minimum length in bytes for an endpoint name.
330pub const CLIENT_NAME_MIN_LENGTH: usize = 2;
331/// Maximum length in bytes for an endpoint name.
332pub const CLIENT_NAME_MAX_LENGTH: usize = 128;
333
334/// Error returned when an endpoint name fails validation.
335#[derive(Debug, thiserror::Error)]
336pub enum ValidateNameError {
337    #[error("Name is too long (must be no more than {CLIENT_NAME_MAX_LENGTH} characters).")]
338    TooLong,
339    #[error("Name is too short (must be at least {CLIENT_NAME_MIN_LENGTH} characters).")]
340    TooShort,
341}
342
343fn validate_name(name: &str) -> Result<(), ValidateNameError> {
344    if name.len() < CLIENT_NAME_MIN_LENGTH {
345        Err(ValidateNameError::TooShort)
346    } else if name.len() > CLIENT_NAME_MAX_LENGTH {
347        Err(ValidateNameError::TooLong)
348    } else {
349        Ok(())
350    }
351}
352
353/// Maximum length in bytes for an attribute value. Values may be empty.
354pub const CLIENT_ATTRIBUTE_VALUE_MAX_LENGTH: usize = 128;
355/// Maximum number of entries allowed in the attributes map.
356pub const CLIENT_ATTRIBUTES_MAX_COUNT: usize = 128;
357
358/// Error returned when an attributes map fails validation.
359#[derive(Debug, thiserror::Error)]
360pub enum ValidateAttributesError {
361    #[error("Too many attributes (must be no more than {CLIENT_ATTRIBUTES_MAX_COUNT}).")]
362    TooManyEntries,
363    #[error("Invalid attribute key: {0}")]
364    InvalidKey(#[from] ValidateNameError),
365    #[error(
366        "Attribute value too long (must be no more than {CLIENT_ATTRIBUTE_VALUE_MAX_LENGTH} bytes)."
367    )]
368    ValueTooLong,
369}
370
371fn validate_attributes(attrs: &BTreeMap<String, String>) -> Result<(), ValidateAttributesError> {
372    if attrs.len() > CLIENT_ATTRIBUTES_MAX_COUNT {
373        return Err(ValidateAttributesError::TooManyEntries);
374    }
375    for (k, v) in attrs {
376        validate_name(k)?;
377        if v.len() > CLIENT_ATTRIBUTE_VALUE_MAX_LENGTH {
378            return Err(ValidateAttributesError::ValueTooLong);
379        }
380    }
381    Ok(())
382}
383
384#[derive(thiserror::Error, Debug)]
385pub enum Error {
386    #[error("Invalid endpoint name: {0}")]
387    InvalidName(#[from] ValidateNameError),
388    #[error("Invalid endpoint group: {0}")]
389    InvalidGroup(ValidateNameError),
390    #[error("Invalid endpoint attributes: {0}")]
391    InvalidAttributes(#[from] ValidateAttributesError),
392    #[error("Remote error: {0}")]
393    Remote(#[from] RemoteError),
394    #[error("Connection error: {0}")]
395    Rpc(#[from] irpc::Error),
396    #[error(transparent)]
397    Other(#[from] anyhow::Error),
398}
399
400impl Client {
401    pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
402        ClientBuilder::new(endpoint)
403    }
404
405    /// Read the current endpoint name from the local client.
406    pub async fn name(&self) -> Result<Option<String>, Error> {
407        let (tx, rx) = oneshot::channel();
408        self.message_channel
409            .send(ClientActorMessage::ReadName { done: tx })
410            .await
411            .map_err(|_| Error::Other(anyhow!("sending name read request")))?;
412
413        rx.await
414            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))
415    }
416
417    /// Name the active endpoint cloud-side.
418    ///
419    /// names can be any UTF-8 string, with a min length of 2 bytes, and
420    /// maximum length of 128 bytes. **name uniqueness is not enforced.**
421    pub async fn set_name(&self, name: impl Into<String>) -> Result<(), Error> {
422        set_name_inner(self.message_channel.clone(), name.into()).await
423    }
424
425    /// Attach the active endpoint to a single named group cloud-side.
426    ///
427    /// Group names follow the same rules as endpoint names: any UTF-8 string,
428    /// minimum 2 bytes, maximum 128 bytes. **group uniqueness is not enforced.**
429    pub async fn set_group(&self, group: impl Into<String>) -> Result<(), Error> {
430        set_group_inner(self.message_channel.clone(), group.into()).await
431    }
432
433    /// Replace the arbitrary key-value attributes on the active endpoint cloud-side.
434    ///
435    /// Accepts any iterable of `(key, value)` pairs (arrays of tuples, `Vec`s,
436    /// `HashMap`s, `BTreeMap`s, etc.), so most calls fit on a single line:
437    ///
438    /// ```no_run
439    /// # use iroh_services::Client;
440    /// # async fn example(client: Client) -> anyhow::Result<()> {
441    /// client
442    ///     .set_attributes([("env", "prod"), ("region", "us-west")])
443    ///     .await?;
444    /// # Ok(()) }
445    /// ```
446    ///
447    /// Keys follow the same rules as endpoint names (2–128 bytes). Values may
448    /// be empty and are limited to 128 bytes. The map is limited to 128
449    /// entries. Each call fully replaces the prior set; passing an empty
450    /// iterator clears all attributes.
451    pub async fn set_attributes<I, K, V>(&self, attrs: I) -> Result<(), Error>
452    where
453        I: IntoIterator<Item = (K, V)>,
454        K: Into<String>,
455        V: Into<String>,
456    {
457        let collected: BTreeMap<String, String> = attrs
458            .into_iter()
459            .map(|(k, v)| (k.into(), v.into()))
460            .collect();
461        set_attributes_inner(self.message_channel.clone(), collected).await
462    }
463
464    /// Pings the remote node.
465    pub async fn ping(&self) -> Result<Pong, Error> {
466        let (tx, rx) = oneshot::channel();
467        self.message_channel
468            .send(ClientActorMessage::Ping { done: tx })
469            .await
470            .map_err(|_| Error::Other(anyhow!("sending ping request")))?;
471
472        rx.await
473            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
474            .map_err(Error::Remote)
475    }
476
477    /// immediately send a single dump of metrics to iroh-services. It's not necessary
478    /// to call this function if you're using a non-zero metrics interval,
479    /// which will automatically propagate metrics on the set interval for you
480    pub async fn push_metrics(&self) -> Result<(), Error> {
481        let (tx, rx) = oneshot::channel();
482        self.message_channel
483            .send(ClientActorMessage::SendMetrics { done: tx })
484            .await
485            .map_err(|_| Error::Other(anyhow!("sending metrics")))?;
486
487        rx.await
488            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
489            .map_err(Error::Remote)
490    }
491
492    /// Grant capabilities to a remote endpoint. Creates a signed RCAN token
493    /// and sends it to iroh-services for storage. The remote can then use this token
494    /// when dialing back to authorize its requests.
495    pub async fn grant_capability(
496        &self,
497        remote_id: EndpointId,
498        caps: impl IntoIterator<Item = impl Into<crate::caps::Cap>>,
499    ) -> Result<(), Error> {
500        let cap = crate::caps::create_grant_token(
501            self.endpoint.secret_key().clone(),
502            remote_id,
503            DEFAULT_CAP_EXPIRY,
504            Caps::new(caps),
505        )
506        .map_err(Error::Other)?;
507
508        let (tx, rx) = oneshot::channel();
509        self.message_channel
510            .send(ClientActorMessage::GrantCap {
511                cap: Box::new(cap),
512                done: tx,
513            })
514            .await
515            .map_err(|_| Error::Other(anyhow!("granting capability")))?;
516
517        rx.await
518            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
519    }
520
521    /// run local network status diagnostics, optionally uploading the results
522    pub async fn net_diagnostics(&self, send: bool) -> Result<DiagnosticsReport, Error> {
523        let report = run_diagnostics(&self.endpoint).await?;
524        if send {
525            let (tx, rx) = oneshot::channel();
526            self.message_channel
527                .send(ClientActorMessage::PutNetworkDiagnostics {
528                    done: tx,
529                    report: Box::new(report.clone()),
530                })
531                .await
532                .map_err(|_| Error::Other(anyhow!("sending network diagnostics report")))?;
533
534            let _ = rx
535                .await
536                .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
537        }
538
539        Ok(report)
540    }
541}
542
543enum ClientActorMessage {
544    SendMetrics {
545        done: oneshot::Sender<Result<(), RemoteError>>,
546    },
547    Ping {
548        done: oneshot::Sender<Result<Pong, RemoteError>>,
549    },
550    // GrantCap is used by the `client_host` feature flag
551    #[allow(dead_code)]
552    GrantCap {
553        // boxed to avoid large enum variants
554        cap: Box<Rcan<Caps>>,
555        done: oneshot::Sender<Result<(), Error>>,
556    },
557    PutNetworkDiagnostics {
558        report: Box<DiagnosticsReport>,
559        done: oneshot::Sender<Result<(), Error>>,
560    },
561    ReadName {
562        done: oneshot::Sender<Option<String>>,
563    },
564    NameEndpoint {
565        name: String,
566        done: oneshot::Sender<Result<(), RemoteError>>,
567    },
568    SetGroup {
569        group: String,
570        done: oneshot::Sender<Result<(), RemoteError>>,
571    },
572    SetAttributes {
573        attributes: BTreeMap<String, String>,
574        done: oneshot::Sender<Result<(), RemoteError>>,
575    },
576}
577
578struct ClientActor {
579    capabilities: Rcan<Caps>,
580    client: IrohServicesClient,
581    name: Option<String>,
582    group: Option<String>,
583    attributes: BTreeMap<String, String>,
584    session_id: Uuid,
585    authorized: bool,
586}
587
588impl ClientActor {
589    async fn run(
590        mut self,
591        initial_name: Option<String>,
592        initial_group: Option<String>,
593        initial_attributes: Option<BTreeMap<String, String>>,
594        registry: Registry,
595        interval: Option<Duration>,
596        mut inbox: tokio::sync::mpsc::Receiver<ClientActorMessage>,
597    ) {
598        let registry = Arc::new(RwLock::new(registry));
599        let mut encoder = Encoder::new(registry);
600        let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
601        trace!("starting client actor");
602
603        if let Some(name) = initial_name
604            && let Err(err) = self.send_name_endpoint(name).await
605        {
606            warn!(err = %err, "failed setting endpoint name on startup");
607        }
608
609        if let Some(group) = initial_group
610            && let Err(err) = self.send_set_group(group).await
611        {
612            warn!(err = %err, "failed setting endpoint group on startup");
613        }
614
615        if let Some(attributes) = initial_attributes
616            && let Err(err) = self.send_set_attributes(attributes).await
617        {
618            warn!(err = %err, "failed setting endpoint attributes on startup");
619        }
620
621        loop {
622            trace!("client actor tick");
623            tokio::select! {
624                biased;
625                Some(msg) = inbox.recv() => {
626                    match msg {
627                        ClientActorMessage::Ping{ done } => {
628                            let res = self.send_ping().await;
629                            if let Err(err) = done.send(res) {
630                                debug!("failed to send ping: {:#?}", err);
631                                self.authorized = false;
632                            }
633                        },
634                        ClientActorMessage::SendMetrics{ done } => {
635                            trace!("sending metrics manually triggered");
636                            let res = self.send_metrics(&mut encoder).await;
637                            if let Err(err) = done.send(res) {
638                                debug!("failed to push metrics: {:#?}", err);
639                                self.authorized = false;
640                            }
641                        }
642                        ClientActorMessage::GrantCap{ cap, done } => {
643                            let res = self.grant_cap(*cap).await;
644                            if let Err(err) = done.send(res) {
645                                warn!("failed to grant capability: {:#?}", err);
646                            }
647                        }
648                        ClientActorMessage::ReadName{ done } => {
649                            if let Err(err) = done.send(self.name.clone()) {
650                                warn!("sending name value: {:#?}", err);
651                            }
652                        }
653                        ClientActorMessage::NameEndpoint{ name, done } => {
654                            let res = self.send_name_endpoint(name).await;
655                            if let Err(err) = done.send(res) {
656                                warn!("failed to name endpoint: {:#?}", err);
657                            }
658                        }
659                        ClientActorMessage::SetGroup{ group, done } => {
660                            let res = self.send_set_group(group).await;
661                            if let Err(err) = done.send(res) {
662                                warn!("failed to set group: {:#?}", err);
663                            }
664                        }
665                        ClientActorMessage::SetAttributes{ attributes, done } => {
666                            let res = self.send_set_attributes(attributes).await;
667                            if let Err(err) = done.send(res) {
668                                warn!("failed to set attributes: {:#?}", err);
669                            }
670                        }
671                        ClientActorMessage::PutNetworkDiagnostics{ report, done } => {
672                            let res = self.put_network_diagnostics(*report).await;
673                            if let Err(err) = done.send(res) {
674                                warn!("failed to publish network diagnostics: {:#?}", err);
675                            }
676                        }
677                    }
678                }
679                _ = async {
680                    if let Some(ref mut timer) = metrics_timer {
681                        timer.tick().await;
682                    } else {
683                        std::future::pending::<()>().await;
684                    }
685                } => {
686                    trace!("metrics send tick");
687                    if let Err(err) = self.send_metrics(&mut encoder).await {
688                        debug!("failed to push metrics: {:#?}", err);
689                        self.authorized = false;
690                    }
691                },
692            }
693        }
694    }
695
696    // sends an authorization request to the server
697    async fn auth(&mut self) -> Result<(), RemoteError> {
698        if self.authorized {
699            return Ok(());
700        }
701        trace!("client authorizing");
702        self.client
703            .rpc(Auth {
704                caps: self.capabilities.clone(),
705            })
706            .await
707            .inspect_err(|e| debug!("authorization failed: {:?}", e))
708            .map_err(|e| RemoteError::AuthError(e.to_string()))?;
709        self.authorized = true;
710        Ok(())
711    }
712
713    async fn send_ping(&mut self) -> Result<Pong, RemoteError> {
714        trace!("client actor send ping");
715        self.auth().await?;
716
717        let req = rand::random();
718        self.client
719            .rpc(Ping { req_id: req })
720            .await
721            .inspect_err(|e| warn!("rpc ping error: {e}"))
722            .map_err(|_| RemoteError::InternalServerError)
723    }
724
725    async fn send_name_endpoint(&mut self, name: String) -> Result<(), RemoteError> {
726        trace!("client sending name endpoint request");
727        self.auth().await?;
728
729        self.client
730            .rpc(NameEndpoint { name: name.clone() })
731            .await
732            .inspect_err(|e| debug!("name endpoint error: {e}"))
733            .map_err(|_| RemoteError::InternalServerError)??;
734        self.name = Some(name);
735        Ok(())
736    }
737
738    async fn send_set_group(&mut self, group: String) -> Result<(), RemoteError> {
739        trace!("client sending set group request");
740        self.auth().await?;
741
742        self.client
743            .rpc(SetGroup {
744                group: group.clone(),
745            })
746            .await
747            .inspect_err(|e| debug!("set group error: {e}"))
748            .map_err(|_| RemoteError::InternalServerError)??;
749        self.group = Some(group);
750        Ok(())
751    }
752
753    async fn send_set_attributes(
754        &mut self,
755        attributes: BTreeMap<String, String>,
756    ) -> Result<(), RemoteError> {
757        trace!("client sending set attributes request");
758        self.auth().await?;
759
760        self.client
761            .rpc(SetAttributes {
762                attributes: attributes.clone(),
763            })
764            .await
765            .inspect_err(|e| debug!("set attributes error: {e}"))
766            .map_err(|_| RemoteError::InternalServerError)??;
767        self.attributes = attributes;
768        Ok(())
769    }
770
771    async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> {
772        trace!("client actor send metrics");
773        self.auth().await?;
774
775        let update = encoder.export();
776        // let delta = update_delta(&self.latest_ackd_update, &update);
777        let req = PutMetrics {
778            session_id: self.session_id,
779            update,
780        };
781
782        self.client
783            .rpc(req)
784            .await
785            .map_err(|_| RemoteError::InternalServerError)??;
786
787        Ok(())
788    }
789
790    async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
791        trace!("client actor grant capability");
792        self.auth().await?;
793
794        self.client
795            .rpc(crate::protocol::GrantCap { cap })
796            .await
797            .map_err(|_| RemoteError::InternalServerError)??;
798
799        Ok(())
800    }
801
802    async fn put_network_diagnostics(
803        &mut self,
804        report: crate::net_diagnostics::DiagnosticsReport,
805    ) -> Result<(), Error> {
806        trace!("client actor publish network diagnostics");
807        self.auth().await?;
808
809        let req = PutNetworkDiagnostics { report };
810
811        self.client
812            .rpc(req)
813            .await
814            .map_err(|_| RemoteError::InternalServerError)??;
815
816        Ok(())
817    }
818}
819
820async fn set_name_inner(
821    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
822    name: String,
823) -> Result<(), Error> {
824    validate_name(&name)?;
825    debug!(name_len = name.len(), "calling set name");
826    let (tx, rx) = oneshot::channel();
827    message_channel
828        .send(ClientActorMessage::NameEndpoint { name, done: tx })
829        .await
830        .map_err(|_| Error::Other(anyhow!("sending name endpoint request")))?;
831    rx.await
832        .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
833        .map_err(Error::Remote)
834}
835
836async fn set_group_inner(
837    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
838    group: String,
839) -> Result<(), Error> {
840    validate_name(&group).map_err(Error::InvalidGroup)?;
841    debug!(group_len = group.len(), "calling set group");
842    let (tx, rx) = oneshot::channel();
843    message_channel
844        .send(ClientActorMessage::SetGroup { group, done: tx })
845        .await
846        .map_err(|_| Error::Other(anyhow!("sending set group request")))?;
847    rx.await
848        .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
849        .map_err(Error::Remote)
850}
851
852async fn set_attributes_inner(
853    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
854    attributes: BTreeMap<String, String>,
855) -> Result<(), Error> {
856    validate_attributes(&attributes)?;
857    debug!(attr_count = attributes.len(), "calling set attributes");
858    let (tx, rx) = oneshot::channel();
859    message_channel
860        .send(ClientActorMessage::SetAttributes {
861            attributes,
862            done: tx,
863        })
864        .await
865        .map_err(|_| Error::Other(anyhow!("sending set attributes request")))?;
866    rx.await
867        .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
868        .map_err(Error::Remote)
869}
870
871#[cfg(test)]
872mod tests {
873    use std::collections::HashMap;
874
875    use iroh::{Endpoint, EndpointAddr, SecretKey, endpoint::presets};
876    use rand::{RngExt, SeedableRng};
877    use temp_env_vars::temp_env_vars;
878
879    use crate::{
880        Client,
881        api_secret::ApiSecret,
882        caps::{Cap, Caps},
883        client::{
884            API_SECRET_ENV_VAR_NAME, BuildError, CLIENT_ATTRIBUTES_MAX_COUNT,
885            ValidateAttributesError, ValidateNameError,
886        },
887    };
888
889    #[tokio::test]
890    #[temp_env_vars]
891    async fn test_api_key_from_env() {
892        // construct
893        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
894        let shared_secret = SecretKey::from_bytes(&rng.random());
895        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
896        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
897        unsafe {
898            std::env::set_var(API_SECRET_ENV_VAR_NAME, api_secret.to_string());
899        };
900
901        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();
902
903        let builder = Client::builder(&endpoint).api_secret_from_env().unwrap();
904
905        let fake_endpoint_addr: EndpointAddr = fake_endpoint_id.into();
906        assert_eq!(builder.remote, Some(fake_endpoint_addr));
907
908        // Compare capability fields individually to avoid flaky timestamp
909        // mismatches between the builder's rcan and a freshly-created one.
910        let cap = builder.cap.as_ref().expect("expected capability to be set");
911        assert_eq!(cap.capability(), &Caps::new([Cap::Client]));
912        assert_eq!(cap.audience(), &endpoint.id().as_verifying_key());
913        assert_eq!(cap.issuer(), &shared_secret.public().as_verifying_key());
914    }
915
916    /// Assert that disabling metrics interval can manually send metrics without
917    /// panicking. Metrics sending itself is expected to fail.
918    #[tokio::test]
919    async fn test_no_metrics_interval() {
920        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(1);
921        let shared_secret = SecretKey::from_bytes(&rng.random());
922        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
923        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
924
925        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();
926
927        let client = Client::builder(&endpoint)
928            .disable_metrics_interval()
929            .api_secret(api_secret)
930            .unwrap()
931            .build()
932            .await
933            .unwrap();
934
935        let err = client.push_metrics().await;
936        assert!(err.is_err());
937    }
938
939    #[tokio::test]
940    async fn test_name() {
941        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
942        let shared_secret = SecretKey::from_bytes(&rng.random());
943        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
944        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
945
946        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();
947
948        let builder = Client::builder(&endpoint)
949            .name("my-node 👋")
950            .unwrap()
951            .api_secret(api_secret)
952            .unwrap();
953
954        assert_eq!(builder.name, Some("my-node 👋".to_string()));
955
956        let Err(err) = Client::builder(&endpoint).name("a") else {
957            panic!("name should fail for strings under 2 bytes");
958        };
959        assert!(matches!(
960            err.downcast_ref::<BuildError>(),
961            Some(BuildError::InvalidName(ValidateNameError::TooShort))
962        ));
963
964        let too_long_name = "👋".repeat(129);
965        let Err(err) = Client::builder(&endpoint).name(&too_long_name) else {
966            panic!("name should fail for strings over 128 bytes");
967        };
968        assert!(matches!(
969            err.downcast_ref::<BuildError>(),
970            Some(BuildError::InvalidName(ValidateNameError::TooLong))
971        ));
972    }
973
974    #[tokio::test]
975    async fn test_group() {
976        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
977        let shared_secret = SecretKey::from_bytes(&rng.random());
978        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
979        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
980
981        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();
982
983        let builder = Client::builder(&endpoint)
984            .group("staging")
985            .unwrap()
986            .api_secret(api_secret)
987            .unwrap();
988
989        assert_eq!(builder.group, Some("staging".to_string()));
990
991        let Err(err) = Client::builder(&endpoint).group("a") else {
992            panic!("group should fail for strings under 2 bytes");
993        };
994        assert!(matches!(
995            err.downcast_ref::<BuildError>(),
996            Some(BuildError::InvalidGroup(ValidateNameError::TooShort))
997        ));
998
999        let too_long_group = "👋".repeat(129);
1000        let Err(err) = Client::builder(&endpoint).group(&too_long_group) else {
1001            panic!("group should fail for strings over 128 bytes");
1002        };
1003        assert!(matches!(
1004            err.downcast_ref::<BuildError>(),
1005            Some(BuildError::InvalidGroup(ValidateNameError::TooLong))
1006        ));
1007    }
1008
1009    #[tokio::test]
1010    async fn test_attributes() {
1011        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();
1012
1013        // empty iterator is accepted (clears attributes server-side)
1014        let builder = Client::builder(&endpoint)
1015            .attributes(std::iter::empty::<(String, String)>())
1016            .unwrap();
1017        assert_eq!(builder.attributes.as_ref().map(|m| m.len()), Some(0));
1018
1019        // array literal of `&str` tuples — the one-liner ergonomics
1020        let builder = Client::builder(&endpoint)
1021            .attributes([("env", "prod"), ("region", "us-west")])
1022            .unwrap();
1023        let attrs = builder.attributes.as_ref().expect("attributes set");
1024        assert_eq!(attrs.get("env").map(String::as_str), Some("prod"));
1025        assert_eq!(attrs.get("region").map(String::as_str), Some("us-west"));
1026
1027        // HashMap<String, String> also works
1028        let mut map: HashMap<String, String> = HashMap::new();
1029        map.insert("k1".into(), "v1".into());
1030        map.insert("k2".into(), "".into()); // empty value is allowed
1031        let builder = Client::builder(&endpoint).attributes(map).unwrap();
1032        let attrs = builder.attributes.as_ref().expect("attributes set");
1033        assert_eq!(attrs.get("k2").map(String::as_str), Some(""));
1034
1035        // value over 128 bytes errors
1036        let too_long_value = "x".repeat(129);
1037        let Err(err) = Client::builder(&endpoint).attributes([("ok", too_long_value.as_str())])
1038        else {
1039            panic!("attributes should fail for value over 128 bytes");
1040        };
1041        assert!(matches!(
1042            err.downcast_ref::<BuildError>(),
1043            Some(BuildError::InvalidAttributes(
1044                ValidateAttributesError::ValueTooLong
1045            ))
1046        ));
1047
1048        // key under 2 bytes errors
1049        let Err(err) = Client::builder(&endpoint).attributes([("a", "v")]) else {
1050            panic!("attributes should fail for key under 2 bytes");
1051        };
1052        assert!(matches!(
1053            err.downcast_ref::<BuildError>(),
1054            Some(BuildError::InvalidAttributes(
1055                ValidateAttributesError::InvalidKey(ValidateNameError::TooShort)
1056            ))
1057        ));
1058
1059        // more than 128 entries errors
1060        let big: Vec<(String, String)> = (0..(CLIENT_ATTRIBUTES_MAX_COUNT + 1))
1061            .map(|i| (format!("key_{i:04}"), format!("val_{i}")))
1062            .collect();
1063        let Err(err) = Client::builder(&endpoint).attributes(big) else {
1064            panic!("attributes should fail for more than 128 entries");
1065        };
1066        assert!(matches!(
1067            err.downcast_ref::<BuildError>(),
1068            Some(BuildError::InvalidAttributes(
1069                ValidateAttributesError::TooManyEntries
1070            ))
1071        ));
1072    }
1073}