Skip to main content

iroh_services/
client.rs

1use std::{
2    str::FromStr,
3    sync::{Arc, RwLock},
4};
5
6use anyhow::{Result, anyhow, ensure};
7use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
8use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder};
9use irpc_iroh::IrohLazyRemoteConnection;
10use n0_error::StackResultExt;
11use n0_future::{task::AbortOnDropHandle, time::Duration};
12use rcan::Rcan;
13use tokio::sync::oneshot;
14use tracing::{debug, trace, warn};
15use uuid::Uuid;
16
17#[cfg(feature = "net_diagnostics")]
18use crate::net_diagnostics::{DiagnosticsReport, checks::run_diagnostics};
19#[cfg(feature = "net_diagnostics")]
20use crate::protocol::PutNetworkDiagnostics;
21use crate::{
22    alerts::LogMonitor,
23    api_secret::ApiSecret,
24    caps::Caps,
25    protocol::{
26        ALPN, AlertInfo, Auth, IrohServicesClient, Ping, Pong, PutMetrics, RemoteError, SendAlert,
27    },
28};
29
30/// Client is the main handle for interacting with iroh-services. It communicates with
31/// iroh-services entirely through an iroh endpoint, and is configured through a builder.
32/// Client requires either an Ssh Key or [`ApiSecret`]
33///
34/// ```no_run
35/// use iroh::{Endpoint, endpoint::presets};
36/// use iroh_services::Client;
37///
38/// async fn build_client() -> anyhow::Result<()> {
39///     let endpoint = Endpoint::bind(presets::N0).await?;
40///
41///     // needs IROH_SERVICES_API_SECRET set to an environment variable
42///     // client will now push endpoint metrics to iroh-services.
43///     let client = Client::builder(&endpoint)
44///         .api_secret_from_str("MY_API_SECRET")?
45///         .build()
46///         .await;
47///
48///     Ok(())
49/// }
50/// ```
51///
52/// [`ApiSecret`]: crate::api_secret::ApiSecret
53#[derive(Debug, Clone)]
54pub struct Client {
55    // owned clone of the endpoint for diagnostics, and for connection restarts on actor close
56    #[allow(dead_code)]
57    endpoint: Endpoint,
58    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
59    _actor_task: Arc<AbortOnDropHandle<()>>,
60}
61
62/// ClientBuilder provides configures and builds a iroh-services client, typically
63/// created with [`Client::builder`]
64pub struct ClientBuilder {
65    #[allow(dead_code)]
66    cap_expiry: Duration,
67    cap: Option<Rcan<Caps>>,
68    endpoint: Endpoint,
69    metrics_interval: Option<Duration>,
70    remote: Option<EndpointAddr>,
71    registry: Registry,
72}
73
/// Expiry used for capabilities created by the client: 30 days, expressed in seconds.
const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(30 * 24 * 60 * 60);
/// Name of the environment variable read by [`ClientBuilder::api_secret_from_env`].
pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET";
76
77impl ClientBuilder {
78    pub fn new(endpoint: &Endpoint) -> Self {
79        let mut registry = Registry::default();
80        registry.register_all(endpoint.metrics());
81
82        Self {
83            cap: None,
84            cap_expiry: DEFAULT_CAP_EXPIRY,
85            endpoint: endpoint.clone(),
86            metrics_interval: Some(Duration::from_secs(60)),
87            remote: None,
88            registry,
89        }
90    }
91
92    /// Register a metrics group to forward to iroh-services
93    ///
94    /// The default registered metrics uses only the endpoint
95    pub fn register_metrics_group(mut self, metrics_group: Arc<dyn MetricsGroup>) -> Self {
96        self.registry.register(metrics_group);
97        self
98    }
99
100    /// Set the metrics collection interval
101    ///
102    /// Defaults to enabled, every 60 seconds.
103    pub fn metrics_interval(mut self, interval: Duration) -> Self {
104        self.metrics_interval = Some(interval);
105        self
106    }
107
108    /// Disable metrics collection.
109    pub fn disable_metrics_interval(mut self) -> Self {
110        self.metrics_interval = None;
111        self
112    }
113
114    /// Check IROH_SERVICES_API_SECRET environment variable for a valid API secret
115    pub fn api_secret_from_env(self) -> Result<Self> {
116        let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
117        self.api_secret(ticket)
118    }
119
120    /// set client API secret from an encoded string
121    pub fn api_secret_from_str(self, secret_key: &str) -> Result<Self> {
122        let key = ApiSecret::from_str(secret_key).context("invalid iroh services api secret")?;
123        self.api_secret(key)
124    }
125
126    /// Use a shared secret & remote iroh-services endpoint ID contained within a ticket
127    /// to construct a iroh-services client. The resulting client will have "Client"
128    /// capabilities.
129    ///
130    /// API secrets include remote details within them, and will set both the
131    /// remote and rcan values on the builder
132    pub fn api_secret(mut self, ticket: ApiSecret) -> Result<Self> {
133        let local_id = self.endpoint.id();
134        let rcan = crate::caps::create_api_token_from_secret_key(
135            ticket.secret,
136            local_id,
137            self.cap_expiry,
138            Caps::for_shared_secret(),
139        )?;
140
141        self.remote = Some(ticket.remote);
142        self.rcan(rcan)
143    }
144
145    /// Loads the private ssh key from the given path, and creates the needed capability.
146    #[cfg(feature = "ssh-key")]
147    pub async fn ssh_key_from_file<P: AsRef<std::path::Path>>(self, path: P) -> Result<Self> {
148        let file_content = tokio::fs::read_to_string(path).await?;
149        let private_key = ssh_key::PrivateKey::from_openssh(&file_content)?;
150
151        self.ssh_key(&private_key)
152    }
153
154    /// Creates the capability from the provided private ssh key.
155    #[cfg(feature = "ssh-key")]
156    pub fn ssh_key(mut self, key: &ssh_key::PrivateKey) -> Result<Self> {
157        let local_id = self.endpoint.id();
158        let rcan = crate::caps::create_api_token_from_ssh_key(
159            key,
160            local_id,
161            self.cap_expiry,
162            Caps::all(),
163        )?;
164        self.cap.replace(rcan);
165
166        Ok(self)
167    }
168
169    /// Sets the rcan directly.
170    pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
171        ensure!(
172            EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
173            "invalid audience"
174        );
175        self.cap.replace(cap);
176        Ok(self)
177    }
178
179    /// Sets the remote to dial, must be provided either directly by calling
180    /// this method, or through calling the api_secret builder methods.
181    pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
182        self.remote = Some(remote.into());
183        self
184    }
185
186    /// Create a new client, connected to the provide service node
187    #[must_use = "dropping the client will silently cancel all client tasks"]
188    pub async fn build(self) -> Result<Client, BuildError> {
189        debug!("starting iroh-services client");
190        let remote = self.remote.ok_or(BuildError::MissingRemote)?;
191        let capabilities = self.cap.ok_or(BuildError::MissingCapability)?;
192
193        let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
194        let client = IrohServicesClient::boxed(conn);
195
196        let (tx, rx) = tokio::sync::mpsc::channel(1);
197        let metrics_task = AbortOnDropHandle::new(n0_future::task::spawn(
198            ClientActor {
199                capabilities,
200                client,
201                session_id: Uuid::new_v4(),
202                authorized: false,
203            }
204            .run(self.registry, self.metrics_interval, rx),
205        ));
206
207        Ok(Client {
208            endpoint: self.endpoint,
209            message_channel: tx,
210            _actor_task: Arc::new(metrics_task),
211        })
212    }
213}
214
215#[derive(thiserror::Error, Debug)]
216pub enum BuildError {
217    #[error("Missing remote endpoint to dial")]
218    MissingRemote,
219    #[error("Missing capability")]
220    MissingCapability,
221    #[error("Unauthorized")]
222    Unauthorized,
223    #[error("Remote error: {0}")]
224    Remote(#[from] RemoteError),
225    #[error("Rpc connection error: {0}")]
226    Rpc(irpc::Error),
227    #[error("Connection error: {0}")]
228    Connect(ConnectError),
229}
230
231impl From<irpc::Error> for BuildError {
232    fn from(value: irpc::Error) -> Self {
233        match value {
234            irpc::Error::Request {
235                source:
236                    irpc::RequestError::Connection {
237                        source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
238                        ..
239                    },
240                ..
241            } if frame.error_code == 401u32.into() => Self::Unauthorized,
242            value => Self::Rpc(value),
243        }
244    }
245}
246
247#[derive(thiserror::Error, Debug)]
248pub enum Error {
249    #[error("Remote error: {0}")]
250    Remote(#[from] RemoteError),
251    #[error("Connection error: {0}")]
252    Rpc(#[from] irpc::Error),
253    #[error(transparent)]
254    Other(#[from] anyhow::Error),
255}
256
257impl Client {
258    pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
259        ClientBuilder::new(endpoint)
260    }
261
262    /// Pings the remote node.
263    pub async fn ping(&self) -> Result<Pong, Error> {
264        let (tx, rx) = oneshot::channel();
265        self.message_channel
266            .send(ClientActorMessage::Ping { done: tx })
267            .await
268            .map_err(|_| Error::Other(anyhow!("sending ping request")))?;
269
270        rx.await
271            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
272            .map_err(Error::Remote)
273    }
274
275    /// immediately send a single dump of metrics to iroh-services. It's not necessary
276    /// to call this function if you're using a non-zero metrics interval,
277    /// which will automatically propagate metrics on the set interval for you
278    pub async fn push_metrics(&self) -> Result<(), Error> {
279        let (tx, rx) = oneshot::channel();
280        self.message_channel
281            .send(ClientActorMessage::SendMetrics { done: tx })
282            .await
283            .map_err(|_| Error::Other(anyhow!("sending metrics")))?;
284
285        rx.await
286            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
287            .map_err(Error::Remote)
288    }
289
290    /// Enable alert forwarding. Returns a [`LogMonitor`] tracing layer that
291    /// captures ERROR-level log events from the `iroh` crate and forwards
292    /// them to n0des. The caller must install the returned layer into their
293    /// tracing subscriber stack.
294    ///
295    /// ```no_run
296    /// use tracing_subscriber::prelude::*;
297    ///
298    /// # async fn example(client: &iroh_n0des::Client) -> anyhow::Result<()> {
299    /// let alert_layer = client.enable_alerts().await?;
300    /// tracing_subscriber::registry()
301    ///     .with(alert_layer)
302    ///     .with(tracing_subscriber::fmt::layer())
303    ///     .init();
304    /// # Ok(())
305    /// # }
306    /// ```
307    pub async fn enable_alerts(&self) -> Result<LogMonitor, Error> {
308        let (tx, rx) = tokio::sync::mpsc::channel(64);
309        let (done_tx, done_rx) = oneshot::channel();
310        self.message_channel
311            .send(ClientActorMessage::EnableAlerts { rx, done: done_tx })
312            .await
313            .map_err(|_| Error::Other(anyhow!("enabling alerts")))?;
314        done_rx
315            .await
316            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
317        Ok(LogMonitor::new(tx))
318    }
319
320    /// Grant capabilities to a remote endpoint. Creates a signed RCAN token
321    /// and sends it to iroh-services for storage. The remote can then use this token
322    /// when dialing back to authorize its requests.
323    #[cfg(feature = "client_host")]
324    pub async fn grant_capability(
325        &self,
326        remote_id: EndpointId,
327        caps: impl IntoIterator<Item = impl Into<crate::caps::Cap>>,
328    ) -> Result<(), Error> {
329        let cap = crate::caps::create_grant_token(
330            self.endpoint.secret_key().clone(),
331            remote_id,
332            DEFAULT_CAP_EXPIRY,
333            Caps::new(caps),
334        )
335        .map_err(Error::Other)?;
336
337        let (tx, rx) = oneshot::channel();
338        self.message_channel
339            .send(ClientActorMessage::GrantCap {
340                cap: Box::new(cap),
341                done: tx,
342            })
343            .await
344            .map_err(|_| Error::Other(anyhow!("granting capability")))?;
345
346        rx.await
347            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
348    }
349
350    /// run local network status diagnostics, optionally uploading the results
351    #[cfg(feature = "net_diagnostics")]
352    pub async fn net_diagnostics(&self, send: bool) -> Result<DiagnosticsReport, Error> {
353        let report = run_diagnostics(&self.endpoint).await?;
354        if send {
355            let (tx, rx) = oneshot::channel();
356            self.message_channel
357                .send(ClientActorMessage::PutNetworkDiagnostics {
358                    done: tx,
359                    report: Box::new(report.clone()),
360                })
361                .await
362                .map_err(|_| Error::Other(anyhow!("sending network diagnostics report")))?;
363
364            let _ = rx
365                .await
366                .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
367        }
368
369        Ok(report)
370    }
371}
372
373enum ClientActorMessage {
374    SendMetrics {
375        done: oneshot::Sender<Result<(), RemoteError>>,
376    },
377    Ping {
378        done: oneshot::Sender<Result<Pong, RemoteError>>,
379    },
380    EnableAlerts {
381        rx: tokio::sync::mpsc::Receiver<AlertInfo>,
382        done: oneshot::Sender<()>,
383    },
384    // GrantCap is used by the `client_host` feature flag
385    #[allow(dead_code)]
386    GrantCap {
387        // boxed to avoid large enum variants
388        cap: Box<Rcan<Caps>>,
389        done: oneshot::Sender<Result<(), Error>>,
390    },
391    #[cfg(feature = "net_diagnostics")]
392    PutNetworkDiagnostics {
393        report: Box<DiagnosticsReport>,
394        done: oneshot::Sender<Result<(), Error>>,
395    },
396}
397
398struct ClientActor {
399    capabilities: Rcan<Caps>,
400    client: IrohServicesClient,
401    session_id: Uuid,
402    authorized: bool,
403}
404
405impl ClientActor {
406    async fn run(
407        mut self,
408        registry: Registry,
409        interval: Option<Duration>,
410        mut inbox: tokio::sync::mpsc::Receiver<ClientActorMessage>,
411    ) {
412        let registry = Arc::new(RwLock::new(registry));
413        let mut encoder = Encoder::new(registry);
414        let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
415        let mut alert_rx: Option<tokio::sync::mpsc::Receiver<AlertInfo>> = None;
416        trace!("starting client actor");
417        loop {
418            trace!("client actor tick");
419            tokio::select! {
420                biased;
421                Some(msg) = inbox.recv() => {
422                    match msg {
423                        ClientActorMessage::Ping{ done } => {
424                            let res = self.send_ping().await;
425                            if let Err(err) = done.send(res) {
426                                debug!("failed to send ping: {:#?}", err);
427                                self.authorized = false;
428                            }
429                        },
430                        ClientActorMessage::EnableAlerts{ rx, done } => {
431                            alert_rx = Some(rx);
432                            let _ = done.send(());
433                        }
434                        ClientActorMessage::SendMetrics{ done } => {
435                            trace!("sending metrics manually triggered");
436                            let res = self.send_metrics(&mut encoder).await;
437                            if let Err(err) = done.send(res) {
438                                debug!("failed to push metrics: {:#?}", err);
439                                self.authorized = false;
440                            }
441                        }
442                        ClientActorMessage::GrantCap{ cap, done } => {
443                            let res = self.grant_cap(*cap).await;
444                            if let Err(err) = done.send(res) {
445                                warn!("failed to grant capability: {:#?}", err);
446                            }
447                        }
448                        #[cfg(feature = "net_diagnostics")]
449                        ClientActorMessage::PutNetworkDiagnostics{ report, done } => {
450                            let res = self.put_network_diagnostics(*report).await;
451                            if let Err(err) = done.send(res) {
452                                warn!("failed to publish network diagnostics: {:#?}", err);
453                            }
454                        }
455                    }
456                }
457                _ = async {
458                    if let Some(ref mut timer) = metrics_timer {
459                        timer.tick().await;
460                    } else {
461                        std::future::pending::<()>().await;
462                    }
463                } => {
464                    trace!("metrics send tick");
465                    if let Err(err) = self.send_metrics(&mut encoder).await {
466                        debug!("failed to push metrics: {:#?}", err);
467                        self.authorized = false;
468                    }
469                },
470                Some(alert) = async {
471                    if let Some(ref mut rx) = alert_rx {
472                        rx.recv().await
473                    } else {
474                        std::future::pending::<Option<AlertInfo>>().await
475                    }
476                } => {
477                    if let Err(err) = self.send_alert(alert).await {
478                        debug!("failed to send alert: {:#?}", err);
479                        self.authorized = false;
480                    }
481                },
482            }
483        }
484    }
485
486    // sends an authorization request to the server
487    async fn auth(&mut self) -> Result<(), RemoteError> {
488        if self.authorized {
489            return Ok(());
490        }
491        trace!("client authorizing");
492        self.client
493            .rpc(Auth {
494                caps: self.capabilities.clone(),
495            })
496            .await
497            .inspect_err(|e| debug!("authorization failed: {:?}", e))
498            .map_err(|e| RemoteError::AuthError(e.to_string()))?;
499        self.authorized = true;
500        Ok(())
501    }
502
503    async fn send_ping(&mut self) -> Result<Pong, RemoteError> {
504        trace!("client actor send ping");
505        self.auth().await?;
506
507        let req = rand::random();
508        self.client
509            .rpc(Ping { req_id: req })
510            .await
511            .inspect_err(|e| warn!("rpc ping error: {e}"))
512            .map_err(|_| RemoteError::InternalServerError)
513    }
514
515    async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> {
516        trace!("client actor send metrics");
517        self.auth().await?;
518
519        let update = encoder.export();
520        // let delta = update_delta(&self.latest_ackd_update, &update);
521        let req = PutMetrics {
522            session_id: self.session_id,
523            update,
524        };
525
526        self.client
527            .rpc(req)
528            .await
529            .map_err(|_| RemoteError::InternalServerError)??;
530
531        Ok(())
532    }
533
534    async fn send_alert(&mut self, alert: AlertInfo) -> Result<(), RemoteError> {
535        trace!("client actor send alert");
536        self.auth().await?;
537
538        let req = SendAlert {
539            session_id: self.session_id,
540            alert,
541        };
542
543        self.client
544            .rpc(req)
545            .await
546            .map_err(|_| RemoteError::InternalServerError)??;
547
548        Ok(())
549    }
550
551    async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
552        trace!("client actor grant capability");
553        self.auth().await?;
554
555        self.client
556            .rpc(crate::protocol::GrantCap { cap })
557            .await
558            .map_err(|_| RemoteError::InternalServerError)??;
559
560        Ok(())
561    }
562
563    #[cfg(feature = "net_diagnostics")]
564    async fn put_network_diagnostics(
565        &mut self,
566        report: crate::net_diagnostics::DiagnosticsReport,
567    ) -> Result<(), Error> {
568        trace!("client actor publish network diagnostics");
569        self.auth().await?;
570
571        let req = PutNetworkDiagnostics { report };
572
573        self.client
574            .rpc(req)
575            .await
576            .map_err(|_| RemoteError::InternalServerError)??;
577
578        Ok(())
579    }
580}
581
#[cfg(test)]
mod tests {
    use iroh::{Endpoint, EndpointAddr, SecretKey};
    use temp_env_vars::temp_env_vars;

    use crate::{
        Client,
        api_secret::ApiSecret,
        caps::{Cap, Caps},
        client::API_SECRET_ENV_VAR_NAME,
    };

    /// An API secret placed in the environment variable must configure both
    /// the remote and the capability on the builder.
    #[tokio::test]
    #[temp_env_vars]
    async fn test_api_key_from_env() {
        // a fresh shared secret plus a made-up remote endpoint id
        let mut rng = rand::rng();
        let shared_secret = SecretKey::generate(&mut rng);
        let remote_id = SecretKey::generate(&mut rng).public();
        let encoded_secret = ApiSecret::new(shared_secret.clone(), remote_id).to_string();
        // SAFETY: NOTE(review): mutating the process environment is only sound
        // while no other thread accesses it; this presumably relies on
        // `#[temp_env_vars]` to isolate and restore the environment — confirm.
        unsafe {
            std::env::set_var(API_SECRET_ENV_VAR_NAME, encoded_secret);
        };

        let endpoint = Endpoint::empty_builder().bind().await.unwrap();

        let builder = Client::builder(&endpoint).api_secret_from_env().unwrap();

        // the remote comes straight out of the secret
        let expected_remote: EndpointAddr = remote_id.into();
        assert_eq!(builder.remote, Some(expected_remote));

        // Check the capability field by field: comparing against a second,
        // freshly created token would be flaky because of its timestamps.
        let token = builder.cap.as_ref().expect("expected capability to be set");
        assert_eq!(token.capability(), &Caps::new([Cap::Client]));
        assert_eq!(token.audience(), &endpoint.id().as_verifying_key());
        assert_eq!(token.issuer(), &shared_secret.public().as_verifying_key());
    }

    /// With the metrics interval disabled, a manual metrics push must return
    /// an error instead of panicking. The push is expected to fail because
    /// the remote is a made-up endpoint id.
    #[tokio::test]
    async fn test_no_metrics_interval() {
        let mut rng = rand::rng();
        let shared_secret = SecretKey::generate(&mut rng);
        let remote_id = SecretKey::generate(&mut rng).public();
        let secret = ApiSecret::new(shared_secret.clone(), remote_id);

        let endpoint = Endpoint::empty_builder().bind().await.unwrap();

        let builder = Client::builder(&endpoint)
            .disable_metrics_interval()
            .api_secret(secret)
            .unwrap();
        let client = builder.build().await.unwrap();

        let outcome = client.push_metrics().await;
        assert!(outcome.is_err());
    }
}