Skip to main content

iroh_services/
client.rs

1use std::{
2    str::FromStr,
3    sync::{Arc, RwLock},
4};
5
6use anyhow::{Result, anyhow, ensure};
7use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
8use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder};
9use irpc_iroh::IrohLazyRemoteConnection;
10use n0_error::StackResultExt;
11use n0_future::{task::AbortOnDropHandle, time::Duration};
12use rcan::Rcan;
13use tokio::sync::oneshot;
14use tracing::{debug, trace, warn};
15use uuid::Uuid;
16
17use crate::{
18    api_secret::ApiSecret,
19    caps::Caps,
20    logs::LogCollector,
21    net_diagnostics::{DiagnosticsReport, checks::run_diagnostics},
22    protocol::{
23        ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutLogs, PutMetrics,
24        PutNetworkDiagnostics, RemoteError,
25    },
26};
27
/// Client is the main handle for interacting with iroh-services. It communicates with
/// iroh-services entirely through an iroh endpoint, and is configured through a
/// [`ClientBuilder`] obtained from [`Client::builder`].
///
/// Building a client requires a capability, supplied either as an SSH key or as an
/// [`ApiSecret`].
///
/// Clones of a `Client` share the same actor mailbox and the same background task
/// handles.
///
/// ```no_run
/// use iroh::{Endpoint, endpoint::presets};
/// use iroh_services::Client;
///
/// async fn build_client() -> anyhow::Result<()> {
///     let endpoint = Endpoint::bind(presets::N0).await?;
///
///     // replace "MY_API_SECRET" with your encoded API secret, or call
///     // `api_secret_from_env` to read it from IROH_SERVICES_API_SECRET instead.
///     // once built, the client pushes endpoint metrics to iroh-services.
///     let client = Client::builder(&endpoint)
///         .api_secret_from_str("MY_API_SECRET")?
///         .build()
///         .await;
///
///     Ok(())
/// }
/// ```
///
/// [`ApiSecret`]: crate::api_secret::ApiSecret
#[derive(Debug, Clone)]
pub struct Client {
    // owned clone of the endpoint for diagnostics, and for connection restarts on actor close
    #[allow(dead_code)]
    endpoint: Endpoint,
    // mailbox of the background actor; every public method sends a request here
    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
    // handle of the actor task, shared between clones of the client
    _actor_task: Arc<AbortOnDropHandle<()>>,
    // handle of the periodic log flush task; `None` when log collection is not enabled
    _log_flush_task: Option<Arc<AbortOnDropHandle<()>>>,
}
60
/// ClientBuilder configures and builds an iroh-services client. It is typically
/// created with [`Client::builder`].
pub struct ClientBuilder {
    // lifetime passed to the token constructors when a capability is created
    // from an API secret or an ssh key
    #[allow(dead_code)]
    cap_expiry: Duration,
    // capability presented when authenticating; required by `build`
    cap: Option<Rcan<Caps>>,
    // endpoint the client dials from
    endpoint: Endpoint,
    // optional name sent to iroh-services when the actor starts
    name: Option<String>,
    // period of the automatic metrics push; `None` disables it
    metrics_interval: Option<Duration>,
    // address of the iroh-services endpoint to dial; required by `build`
    remote: Option<EndpointAddr>,
    // metrics registry whose contents are exported on each push
    registry: Registry,
    // source of buffered log lines; `None` means no log flush task is spawned
    log_collector: Option<LogCollector>,
    // period between log flushes
    log_flush_interval: Duration,
    // maximum number of lines drained per flush
    log_max_batch: usize,
}
76
/// Lifetime given to capability tokens created by this module: 30 days, in seconds.
const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24 * 30); // 1 month
/// Name of the environment variable read by [`ClientBuilder::api_secret_from_env`].
pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET";

/// Default interval between log batch flushes when log collection is enabled.
pub const DEFAULT_LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(1);
/// Default maximum batch size pushed in a single PutLogs request.
pub const DEFAULT_LOG_MAX_BATCH: usize = 200;
84
85impl ClientBuilder {
86    pub fn new(endpoint: &Endpoint) -> Self {
87        let mut registry = Registry::default();
88        registry.register_all(endpoint.metrics());
89
90        Self {
91            cap: None,
92            cap_expiry: DEFAULT_CAP_EXPIRY,
93            endpoint: endpoint.clone(),
94            name: None,
95            metrics_interval: Some(Duration::from_secs(60)),
96            remote: None,
97            registry,
98            log_collector: None,
99            log_flush_interval: DEFAULT_LOG_FLUSH_INTERVAL,
100            log_max_batch: DEFAULT_LOG_MAX_BATCH,
101        }
102    }
103
104    /// Enables periodic shipment of buffered log lines to iroh-services.
105    ///
106    /// The collector is shared with [`crate::client_host::ClientHost`] when
107    /// runtime log-level overrides are needed; clone it before passing so both
108    /// sides hold a handle.
109    pub fn with_log_collection(mut self, collector: LogCollector) -> Self {
110        self.log_collector = Some(collector);
111        self
112    }
113
114    /// Override the log batch flush interval. Defaults to one second.
115    pub fn log_flush_interval(mut self, interval: Duration) -> Self {
116        self.log_flush_interval = interval;
117        self
118    }
119
120    /// Override the maximum number of lines included in a single PutLogs
121    /// request. Defaults to [`DEFAULT_LOG_MAX_BATCH`].
122    pub fn log_max_batch(mut self, max: usize) -> Self {
123        self.log_max_batch = max;
124        self
125    }
126
127    /// Register a metrics group to forward to iroh-services
128    ///
129    /// The default registered metrics uses only the endpoint
130    pub fn register_metrics_group(mut self, metrics_group: Arc<dyn MetricsGroup>) -> Self {
131        self.registry.register(metrics_group);
132        self
133    }
134
135    /// Set the metrics collection interval
136    ///
137    /// Defaults to enabled, every 60 seconds.
138    pub fn metrics_interval(mut self, interval: Duration) -> Self {
139        self.metrics_interval = Some(interval);
140        self
141    }
142
143    /// Disable metrics collection.
144    pub fn disable_metrics_interval(mut self) -> Self {
145        self.metrics_interval = None;
146        self
147    }
148
149    /// Set an optional human-readable name for the endpoint the client is
150    /// constructed with, making metrics from this endpoint easier to identify.
151    /// This is often used for associating with other services in your app,
152    /// like a database user id, machine name, permanent username, etc.
153    ///
154    /// When this builder method is called, the provided name is sent after the
155    /// client initially authenticates the endpoint server-side.
156    /// Errors will not interrupt client construction, instead producing a
157    /// warn-level log. For explicit error handling, use [`Client::set_name`].
158    ///
159    /// names can be any UTF-8 string, with a min length of 2 bytes, and
160    /// maximum length of 128 bytes. **name uniqueness is not enforced
161    /// server-side**, which means using the same name for different endpoints
162    /// will not produce an error
163    pub fn name(mut self, name: impl Into<String>) -> Result<Self> {
164        let name = name.into();
165        validate_name(&name).map_err(BuildError::InvalidName)?;
166        self.name = Some(name);
167        Ok(self)
168    }
169
170    /// Check IROH_SERVICES_API_SECRET environment variable for a valid API secret
171    pub fn api_secret_from_env(self) -> Result<Self> {
172        let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
173        self.api_secret(ticket)
174    }
175
176    /// set client API secret from an encoded string
177    pub fn api_secret_from_str(self, secret_key: &str) -> Result<Self> {
178        let key = ApiSecret::from_str(secret_key).context("invalid iroh services api secret")?;
179        self.api_secret(key)
180    }
181
182    /// Use a shared secret & remote iroh-services endpoint ID contained within a ticket
183    /// to construct a iroh-services client. The resulting client will have "Client"
184    /// capabilities.
185    ///
186    /// API secrets include remote details within them, and will set both the
187    /// remote and rcan values on the builder
188    pub fn api_secret(mut self, ticket: ApiSecret) -> Result<Self> {
189        let local_id = self.endpoint.id();
190        let rcan = crate::caps::create_api_token_from_secret_key(
191            ticket.secret,
192            local_id,
193            self.cap_expiry,
194            Caps::for_shared_secret(),
195        )?;
196
197        self.remote = Some(ticket.remote);
198        self.rcan(rcan)
199    }
200
201    /// Loads the private ssh key from the given path, and creates the needed capability.
202    #[cfg(not(target_arch = "wasm32"))]
203    pub async fn ssh_key_from_file<P: AsRef<std::path::Path>>(self, path: P) -> Result<Self> {
204        let file_content = tokio::fs::read_to_string(path).await?;
205        let private_key = ssh_key::PrivateKey::from_openssh(&file_content)?;
206
207        self.ssh_key(&private_key)
208    }
209
210    /// Creates the capability from the provided private ssh key.
211    #[cfg(not(target_arch = "wasm32"))]
212    pub fn ssh_key(mut self, key: &ssh_key::PrivateKey) -> Result<Self> {
213        let local_id = self.endpoint.id();
214        let rcan = crate::caps::create_api_token_from_ssh_key(
215            key,
216            local_id,
217            self.cap_expiry,
218            Caps::all(),
219        )?;
220        self.cap.replace(rcan);
221
222        Ok(self)
223    }
224
225    /// Sets the rcan directly.
226    pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
227        ensure!(
228            EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
229            "invalid audience"
230        );
231        self.cap.replace(cap);
232        Ok(self)
233    }
234
235    /// Sets the remote to dial, must be provided either directly by calling
236    /// this method, or through calling the api_secret builder methods.
237    pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
238        self.remote = Some(remote.into());
239        self
240    }
241
242    /// Create a new client, connected to the provide service node
243    #[must_use = "dropping the client will silently cancel all client tasks"]
244    pub async fn build(self) -> Result<Client, BuildError> {
245        debug!("starting iroh-services client");
246        let remote = self.remote.ok_or(BuildError::MissingRemote)?;
247        let capabilities = self.cap.ok_or(BuildError::MissingCapability)?;
248
249        let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
250        let irpc_client = IrohServicesClient::boxed(conn);
251
252        let session_id = Uuid::new_v4();
253        // The actor mailbox is only used for control-plane messages (auth,
254        // ping, name, grant_cap) plus the periodic metrics + log flush. A
255        // small buffer is enough but `1` head-of-line-blocks log flushes
256        // behind metrics ticks, so leave a little room.
257        let (tx, rx) = tokio::sync::mpsc::channel(8);
258        let actor_task = AbortOnDropHandle::new(n0_future::task::spawn(
259            ClientActor {
260                capabilities,
261                client: irpc_client,
262                name: self.name.clone(),
263                session_id,
264                authorized: false,
265            }
266            .run(self.name, self.registry, self.metrics_interval, rx),
267        ));
268
269        let log_flush_task = self.log_collector.map(|collector| {
270            let message_channel = tx.clone();
271            let interval = self.log_flush_interval;
272            let max_batch = self.log_max_batch;
273            Arc::new(AbortOnDropHandle::new(n0_future::task::spawn(
274                run_log_flush(message_channel, collector, interval, max_batch, session_id),
275            )))
276        });
277
278        Ok(Client {
279            endpoint: self.endpoint,
280            message_channel: tx,
281            _actor_task: Arc::new(actor_task),
282            _log_flush_task: log_flush_task,
283        })
284    }
285}
286
/// Errors reported while configuring or building a [`Client`].
#[derive(thiserror::Error, Debug)]
pub enum BuildError {
    /// `build` was called without a remote having been configured.
    #[error("Missing remote endpoint to dial")]
    MissingRemote,
    /// `build` was called without a capability having been configured.
    #[error("Missing capability")]
    MissingCapability,
    /// The remote closed the connection with application error code 401.
    #[error("Unauthorized")]
    Unauthorized,
    /// An error reported by the remote service.
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// Any rpc failure other than the 401 close mapped to `Unauthorized`.
    #[error("Rpc connection error: {0}")]
    Rpc(irpc::Error),
    /// Failure while connecting to the remote endpoint.
    #[error("Connection error: {0}")]
    Connect(ConnectError),
    /// The name passed to the builder failed validation.
    #[error("Invalid endpoint name: {0}")]
    InvalidName(#[from] ValidateNameError),
}
304
305impl From<irpc::Error> for BuildError {
306    fn from(value: irpc::Error) -> Self {
307        match value {
308            irpc::Error::Request {
309                source:
310                    irpc::RequestError::Connection {
311                        source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
312                        ..
313                    },
314                ..
315            } if frame.error_code == 401u32.into() => Self::Unauthorized,
316            value => Self::Rpc(value),
317        }
318    }
319}
320
/// Minimum length in bytes for an endpoint name (inclusive).
pub const CLIENT_NAME_MIN_LENGTH: usize = 2;
/// Maximum length in bytes for an endpoint name (inclusive).
pub const CLIENT_NAME_MAX_LENGTH: usize = 128;
325
326/// Error returned when an endpoint name fails validation.
327#[derive(Debug, thiserror::Error)]
328pub enum ValidateNameError {
329    #[error("Name is too long (must be no more than {CLIENT_NAME_MAX_LENGTH} characters).")]
330    TooLong,
331    #[error("Name is too short (must be at least {CLIENT_NAME_MIN_LENGTH} characters).")]
332    TooShort,
333}
334
335fn validate_name(name: &str) -> Result<(), ValidateNameError> {
336    if name.len() < CLIENT_NAME_MIN_LENGTH {
337        Err(ValidateNameError::TooShort)
338    } else if name.len() > CLIENT_NAME_MAX_LENGTH {
339        Err(ValidateNameError::TooLong)
340    } else {
341        Ok(())
342    }
343}
344
/// Errors returned by the request methods on [`Client`].
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The supplied endpoint name failed local validation.
    #[error("Invalid endpoint name: {0}")]
    InvalidName(#[from] ValidateNameError),
    /// The request failed with a [`RemoteError`].
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// An rpc-level failure.
    #[error("Connection error: {0}")]
    Rpc(#[from] irpc::Error),
    /// Any other failure, including problems talking to the client's
    /// internal actor over its channels.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
356
357impl Client {
    /// Starts a [`ClientBuilder`] for the given endpoint; shorthand for
    /// [`ClientBuilder::new`].
    pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
        ClientBuilder::new(endpoint)
    }
361
362    /// Read the current endpoint name from the local client.
363    pub async fn name(&self) -> Result<Option<String>, Error> {
364        let (tx, rx) = oneshot::channel();
365        self.message_channel
366            .send(ClientActorMessage::ReadName { done: tx })
367            .await
368            .map_err(|_| Error::Other(anyhow!("sending name read request")))?;
369
370        rx.await
371            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))
372    }
373
374    /// Name the active endpoint cloud-side.
375    ///
376    /// names can be any UTF-8 string, with a min length of 2 bytes, and
377    /// maximum length of 128 bytes. **name uniqueness is not enforced.**
378    pub async fn set_name(&self, name: impl Into<String>) -> Result<(), Error> {
379        set_name_inner(self.message_channel.clone(), name.into()).await
380    }
381
382    /// Pings the remote node.
383    pub async fn ping(&self) -> Result<Pong, Error> {
384        let (tx, rx) = oneshot::channel();
385        self.message_channel
386            .send(ClientActorMessage::Ping { done: tx })
387            .await
388            .map_err(|_| Error::Other(anyhow!("sending ping request")))?;
389
390        rx.await
391            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
392            .map_err(Error::Remote)
393    }
394
395    /// immediately send a single dump of metrics to iroh-services. It's not necessary
396    /// to call this function if you're using a non-zero metrics interval,
397    /// which will automatically propagate metrics on the set interval for you
398    pub async fn push_metrics(&self) -> Result<(), Error> {
399        let (tx, rx) = oneshot::channel();
400        self.message_channel
401            .send(ClientActorMessage::SendMetrics { done: tx })
402            .await
403            .map_err(|_| Error::Other(anyhow!("sending metrics")))?;
404
405        rx.await
406            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
407            .map_err(Error::Remote)
408    }
409
410    /// Grant capabilities to a remote endpoint. Creates a signed RCAN token
411    /// and sends it to iroh-services for storage. The remote can then use this token
412    /// when dialing back to authorize its requests.
413    pub async fn grant_capability(
414        &self,
415        remote_id: EndpointId,
416        caps: impl IntoIterator<Item = impl Into<crate::caps::Cap>>,
417    ) -> Result<(), Error> {
418        let cap = crate::caps::create_grant_token(
419            self.endpoint.secret_key().clone(),
420            remote_id,
421            DEFAULT_CAP_EXPIRY,
422            Caps::new(caps),
423        )
424        .map_err(Error::Other)?;
425
426        let (tx, rx) = oneshot::channel();
427        self.message_channel
428            .send(ClientActorMessage::GrantCap {
429                cap: Box::new(cap),
430                done: tx,
431            })
432            .await
433            .map_err(|_| Error::Other(anyhow!("granting capability")))?;
434
435        rx.await
436            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
437    }
438
439    /// run local network status diagnostics, optionally uploading the results
440    pub async fn net_diagnostics(&self, send: bool) -> Result<DiagnosticsReport, Error> {
441        let report = run_diagnostics(&self.endpoint).await?;
442        if send {
443            let (tx, rx) = oneshot::channel();
444            self.message_channel
445                .send(ClientActorMessage::PutNetworkDiagnostics {
446                    done: tx,
447                    report: Box::new(report.clone()),
448                })
449                .await
450                .map_err(|_| Error::Other(anyhow!("sending network diagnostics report")))?;
451
452            let _ = rx
453                .await
454                .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
455        }
456
457        Ok(report)
458    }
459}
460
/// Requests sent from [`Client`] handles (and the log flush task) to the
/// client actor. Every variant carries a oneshot `done` sender on which the
/// actor reports the outcome.
enum ClientActorMessage {
    /// Push the current metrics immediately.
    SendMetrics {
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
    /// Ping the remote and return its reply.
    Ping {
        done: oneshot::Sender<Result<Pong, RemoteError>>,
    },
    /// Send a granted capability token to the remote.
    // GrantCap is used by the `client_host` feature flag
    #[allow(dead_code)]
    GrantCap {
        // boxed to avoid large enum variants
        cap: Box<Rcan<Caps>>,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Upload a network diagnostics report.
    PutNetworkDiagnostics {
        // boxed, presumably for the same size reason as `GrantCap::cap`
        report: Box<DiagnosticsReport>,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Upload a batch of log lines.
    PutLogs {
        request: PutLogs,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Read the name currently stored in the actor; no request is sent to the remote.
    ReadName {
        done: oneshot::Sender<Option<String>>,
    },
    /// Send a new endpoint name to the remote.
    NameEndpoint {
        name: String,
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
}
491
/// State owned by the background task that performs all rpc calls on behalf
/// of [`Client`] handles.
struct ClientActor {
    // capability sent in the `Auth` request
    capabilities: Rcan<Caps>,
    // rpc client used for every request
    client: IrohServicesClient,
    // endpoint name reported by `ReadName`; overwritten after a successful naming request
    name: Option<String>,
    // identifier attached to every metrics push
    session_id: Uuid,
    // whether an `Auth` request has succeeded and not been invalidated since
    authorized: bool,
}
499
500impl ClientActor {
501    async fn run(
502        mut self,
503        initial_name: Option<String>,
504        registry: Registry,
505        interval: Option<Duration>,
506        mut inbox: tokio::sync::mpsc::Receiver<ClientActorMessage>,
507    ) {
508        let registry = Arc::new(RwLock::new(registry));
509        let mut encoder = Encoder::new(registry);
510        let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
511        trace!("starting client actor");
512
513        if let Some(name) = initial_name
514            && let Err(err) = self.send_name_endpoint(name).await
515        {
516            warn!(err = %err, "failed setting endpoint name on startup");
517        }
518
519        loop {
520            trace!("client actor tick");
521            tokio::select! {
522                biased;
523                Some(msg) = inbox.recv() => {
524                    match msg {
525                        ClientActorMessage::Ping{ done } => {
526                            let res = self.send_ping().await;
527                            if let Err(err) = done.send(res) {
528                                debug!("failed to send ping: {:#?}", err);
529                                self.authorized = false;
530                            }
531                        },
532                        ClientActorMessage::SendMetrics{ done } => {
533                            trace!("sending metrics manually triggered");
534                            let res = self.send_metrics(&mut encoder).await;
535                            if let Err(err) = done.send(res) {
536                                debug!("failed to push metrics: {:#?}", err);
537                                self.authorized = false;
538                            }
539                        }
540                        ClientActorMessage::GrantCap{ cap, done } => {
541                            let res = self.grant_cap(*cap).await;
542                            if let Err(err) = done.send(res) {
543                                warn!("failed to grant capability: {:#?}", err);
544                            }
545                        }
546                        ClientActorMessage::ReadName{ done } => {
547                            if let Err(err) = done.send(self.name.clone()) {
548                                warn!("sending name value: {:#?}", err);
549                            }
550                        }
551                        ClientActorMessage::NameEndpoint{ name, done } => {
552                            let res = self.send_name_endpoint(name).await;
553                            if let Err(err) = done.send(res) {
554                                warn!("failed to name endpoint: {:#?}", err);
555                            }
556                        }
557                        ClientActorMessage::PutNetworkDiagnostics{ report, done } => {
558                            let res = self.put_network_diagnostics(*report).await;
559                            if let Err(err) = done.send(res) {
560                                warn!("failed to publish network diagnostics: {:#?}", err);
561                            }
562                        }
563                        ClientActorMessage::PutLogs{ request, done } => {
564                            let res = self.put_logs(request).await;
565                            if let Err(err) = done.send(res) {
566                                debug!("failed to publish logs: {:#?}", err);
567                                self.authorized = false;
568                            }
569                        }
570                    }
571                }
572                _ = async {
573                    if let Some(ref mut timer) = metrics_timer {
574                        timer.tick().await;
575                    } else {
576                        std::future::pending::<()>().await;
577                    }
578                } => {
579                    trace!("metrics send tick");
580                    if let Err(err) = self.send_metrics(&mut encoder).await {
581                        debug!("failed to push metrics: {:#?}", err);
582                        self.authorized = false;
583                    }
584                },
585            }
586        }
587    }
588
589    // sends an authorization request to the server
590    async fn auth(&mut self) -> Result<(), RemoteError> {
591        if self.authorized {
592            return Ok(());
593        }
594        trace!("client authorizing");
595        self.client
596            .rpc(Auth {
597                caps: self.capabilities.clone(),
598            })
599            .await
600            .inspect_err(|e| debug!("authorization failed: {:?}", e))
601            .map_err(|e| RemoteError::AuthError(e.to_string()))?;
602        self.authorized = true;
603        Ok(())
604    }
605
606    async fn send_ping(&mut self) -> Result<Pong, RemoteError> {
607        trace!("client actor send ping");
608        self.auth().await?;
609
610        let req = rand::random();
611        self.client
612            .rpc(Ping { req_id: req })
613            .await
614            .inspect_err(|e| warn!("rpc ping error: {e}"))
615            .map_err(|_| RemoteError::InternalServerError)
616    }
617
618    async fn send_name_endpoint(&mut self, name: String) -> Result<(), RemoteError> {
619        trace!("client sending name endpoint request");
620        self.auth().await?;
621
622        self.client
623            .rpc(NameEndpoint { name: name.clone() })
624            .await
625            .inspect_err(|e| debug!("name endpoint error: {e}"))
626            .map_err(|_| RemoteError::InternalServerError)??;
627        self.name = Some(name);
628        Ok(())
629    }
630
631    async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> {
632        trace!("client actor send metrics");
633        self.auth().await?;
634
635        let update = encoder.export();
636        // let delta = update_delta(&self.latest_ackd_update, &update);
637        let req = PutMetrics {
638            session_id: self.session_id,
639            update,
640        };
641
642        self.client
643            .rpc(req)
644            .await
645            .map_err(|_| RemoteError::InternalServerError)??;
646
647        Ok(())
648    }
649
650    async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
651        trace!("client actor grant capability");
652        self.auth().await?;
653
654        self.client
655            .rpc(crate::protocol::GrantCap { cap })
656            .await
657            .map_err(|_| RemoteError::InternalServerError)??;
658
659        Ok(())
660    }
661
662    async fn put_network_diagnostics(
663        &mut self,
664        report: crate::net_diagnostics::DiagnosticsReport,
665    ) -> Result<(), Error> {
666        trace!("client actor publish network diagnostics");
667        self.auth().await?;
668
669        let req = PutNetworkDiagnostics { report };
670
671        self.client
672            .rpc(req)
673            .await
674            .map_err(|_| RemoteError::InternalServerError)??;
675
676        Ok(())
677    }
678
679    async fn put_logs(&mut self, request: PutLogs) -> Result<(), Error> {
680        trace!(
681            lines = request.lines.len(),
682            dropped = request.dropped,
683            "client actor put logs"
684        );
685        self.auth().await?;
686
687        self.client
688            .rpc(request)
689            .await
690            .map_err(|_| RemoteError::InternalServerError)??;
691
692        Ok(())
693    }
694}
695
696async fn run_log_flush(
697    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
698    collector: LogCollector,
699    interval: Duration,
700    max_batch: usize,
701    session_id: Uuid,
702) {
703    const INITIAL_BACKOFF: Duration = Duration::from_millis(500);
704    const MAX_BACKOFF: Duration = Duration::from_secs(30);
705
706    let mut ticker = n0_future::time::interval(interval);
707    // After a slow RPC the default `Burst` behavior would fire several
708    // ticks back-to-back; `Delay` waits a full interval from the previous
709    // completed tick.
710    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
711    let mut backoff = INITIAL_BACKOFF;
712    loop {
713        ticker.tick().await;
714        let (lines, dropped) = collector.drain(max_batch);
715        if lines.is_empty() && dropped == 0 {
716            backoff = INITIAL_BACKOFF;
717            continue;
718        }
719        let request = PutLogs {
720            session_id,
721            lines,
722            dropped,
723        };
724        let (tx, rx) = oneshot::channel();
725        if message_channel
726            .send(ClientActorMessage::PutLogs { request, done: tx })
727            .await
728            .is_err()
729        {
730            // Mailbox closed only when the actor task has terminated; that
731            // means the entire client is gone and there is nothing to do.
732            debug!("log flush stopped: client actor channel closed");
733            return;
734        }
735        match rx.await {
736            Ok(Ok(())) => {
737                backoff = INITIAL_BACKOFF;
738            }
739            // Either the RPC failed (Ok(Err)) or the actor dropped the
740            // response sender mid-handoff (Err(_)). Both are transient: keep
741            // ticking and back off so the next attempt happens later.
742            other => {
743                debug!(?other, ?backoff, "log flush attempt failed; backing off");
744                n0_future::time::sleep(backoff).await;
745                backoff = (backoff * 2).min(MAX_BACKOFF);
746            }
747        }
748    }
749}
750
751async fn set_name_inner(
752    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
753    name: String,
754) -> Result<(), Error> {
755    validate_name(&name)?;
756    debug!(name_len = name.len(), "calling set name");
757    let (tx, rx) = oneshot::channel();
758    message_channel
759        .send(ClientActorMessage::NameEndpoint { name, done: tx })
760        .await
761        .map_err(|_| Error::Other(anyhow!("sending name endpoint request")))?;
762    rx.await
763        .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
764        .map_err(Error::Remote)
765}
766
767#[cfg(test)]
768mod tests {
769    use iroh::{Endpoint, EndpointAddr, SecretKey, endpoint::presets};
770    use rand::{RngExt, SeedableRng};
771    use temp_env_vars::temp_env_vars;
772
773    use crate::{
774        Client,
775        api_secret::ApiSecret,
776        caps::{Cap, Caps},
777        client::{API_SECRET_ENV_VAR_NAME, BuildError, ValidateNameError},
778    };
779
    /// Building from the environment variable must set both the remote and a
    /// capability issued by the shared secret for the local endpoint.
    #[tokio::test]
    #[temp_env_vars]
    async fn test_api_key_from_env() {
        // construct
        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
        let shared_secret = SecretKey::from_bytes(&rng.random());
        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
        // NOTE(review): mutating the process environment is unsafe; presumably
        // `#[temp_env_vars]` isolates and restores it for this test — confirm.
        unsafe {
            std::env::set_var(API_SECRET_ENV_VAR_NAME, api_secret.to_string());
        };

        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();

        let builder = Client::builder(&endpoint).api_secret_from_env().unwrap();

        // The remote comes from the secret, not from an explicit `remote` call.
        let fake_endpoint_addr: EndpointAddr = fake_endpoint_id.into();
        assert_eq!(builder.remote, Some(fake_endpoint_addr));

        // Compare capability fields individually to avoid flaky timestamp
        // mismatches between the builder's rcan and a freshly-created one.
        let cap = builder.cap.as_ref().expect("expected capability to be set");
        assert_eq!(cap.capability(), &Caps::new([Cap::Client]));
        assert_eq!(cap.audience(), &endpoint.id().as_verifying_key());
        assert_eq!(cap.issuer(), &shared_secret.public().as_verifying_key());
    }
806
    /// Assert that disabling metrics interval can manually send metrics without
    /// panicking. Metrics sending itself is expected to fail.
    #[tokio::test]
    async fn test_no_metrics_interval() {
        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(1);
        let shared_secret = SecretKey::from_bytes(&rng.random());
        // The remote id is derived from a throwaway key generated just for this test.
        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);

        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();

        let client = Client::builder(&endpoint)
            .disable_metrics_interval()
            .api_secret(api_secret)
            .unwrap()
            .build()
            .await
            .unwrap();

        // Only the error result is asserted; the point is that the call returns.
        let err = client.push_metrics().await;
        assert!(err.is_err());
    }
829
    /// Names are accepted or rejected by the builder according to their byte
    /// length.
    #[tokio::test]
    async fn test_name() {
        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
        let shared_secret = SecretKey::from_bytes(&rng.random());
        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);

        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();

        // A name containing a multi-byte character within the limits is accepted.
        let builder = Client::builder(&endpoint)
            .name("my-node 👋")
            .unwrap()
            .api_secret(api_secret)
            .unwrap();

        assert_eq!(builder.name, Some("my-node 👋".to_string()));

        // One byte is below the two-byte minimum.
        let Err(err) = Client::builder(&endpoint).name("a") else {
            panic!("name should fail for strings under 2 bytes");
        };
        assert!(matches!(
            err.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidName(ValidateNameError::TooShort))
        ));

        // 129 four-byte characters are 516 bytes, well over the 128-byte maximum.
        let too_long_name = "👋".repeat(129);
        let Err(err) = Client::builder(&endpoint).name(&too_long_name) else {
            panic!("name should fail for strings over 128 bytes");
        };
        assert!(matches!(
            err.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidName(ValidateNameError::TooLong))
        ));
    }
864}