Skip to main content

iroh_services/
client.rs

1use std::{
2    str::FromStr,
3    sync::{Arc, RwLock},
4};
5
6use anyhow::{Result, anyhow, ensure};
7use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
8use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder};
9use irpc_iroh::IrohLazyRemoteConnection;
10use n0_error::StackResultExt;
11use n0_future::{task::AbortOnDropHandle, time::Duration};
12use rcan::Rcan;
13use tokio::sync::oneshot;
14use tracing::{debug, trace, warn};
15use uuid::Uuid;
16
17#[cfg(feature = "net_diagnostics")]
18use crate::net_diagnostics::{DiagnosticsReport, checks::run_diagnostics};
19#[cfg(feature = "net_diagnostics")]
20use crate::protocol::PutNetworkDiagnostics;
21use crate::{
22    api_secret::ApiSecret,
23    caps::Caps,
24    protocol::{ALPN, Auth, IrohServicesClient, Ping, Pong, PutMetrics, RemoteError},
25};
26
27/// Client is the main handle for interacting with iroh-services. It communicates with
28/// iroh-services entirely through an iroh endpoint, and is configured through a builder.
29/// Client requires either an Ssh Key or [`ApiSecret`]
30///
31/// ```no_run
32/// use iroh_services::Client;
33///
34/// async fn build_client() -> anyhow::Result<()> {
35///     let endpoint = iroh::Endpoint::bind().await?;
36///
37///     // needs IROH_SERVICES_API_SECRET set to an environment variable
38///     // client will now push endpoint metrics to iroh-services.
39///     let client = Client::builder(&endpoint)
40///         .api_secret_from_str("MY_API_SECRET")?
41///         .build()
42///         .await;
43///
44///     Ok(())
45/// }
46/// ```
47///
48/// [`ApiSecret`]: crate::api_secret::ApiSecret
49#[derive(Debug, Clone)]
50pub struct Client {
51    // owned clone of the endpoint for diagnostics, and for connection restarts on actor close
52    #[allow(dead_code)]
53    endpoint: Endpoint,
54    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
55    _actor_task: Arc<AbortOnDropHandle<()>>,
56}
57
58/// ClientBuilder provides configures and builds a iroh-services client, typically
59/// created with [`Client::builder`]
60pub struct ClientBuilder {
61    #[allow(dead_code)]
62    cap_expiry: Duration,
63    cap: Option<Rcan<Caps>>,
64    endpoint: Endpoint,
65    metrics_interval: Option<Duration>,
66    remote: Option<EndpointAddr>,
67    registry: Registry,
68}
69
/// Lifetime given to capability tokens created by the client: 30 days.
const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(30 * 24 * 60 * 60);
/// Name of the environment variable consulted for an encoded API secret.
pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET";
72
73impl ClientBuilder {
74    pub fn new(endpoint: &Endpoint) -> Self {
75        let mut registry = Registry::default();
76        registry.register_all(endpoint.metrics());
77
78        Self {
79            cap: None,
80            cap_expiry: DEFAULT_CAP_EXPIRY,
81            endpoint: endpoint.clone(),
82            metrics_interval: Some(Duration::from_secs(60)),
83            remote: None,
84            registry,
85        }
86    }
87
88    /// Register a metrics group to forward to iroh-services
89    ///
90    /// The default registered metrics uses only the endpoint
91    pub fn register_metrics_group(mut self, metrics_group: Arc<dyn MetricsGroup>) -> Self {
92        self.registry.register(metrics_group);
93        self
94    }
95
96    /// Set the metrics collection interval
97    ///
98    /// Defaults to enabled, every 60 seconds.
99    pub fn metrics_interval(mut self, interval: Duration) -> Self {
100        self.metrics_interval = Some(interval);
101        self
102    }
103
104    /// Disable metrics collection.
105    pub fn disable_metrics_interval(mut self) -> Self {
106        self.metrics_interval = None;
107        self
108    }
109
110    /// Check IROH_SERVICES_API_SECRET environment variable for a valid API secret
111    pub fn api_secret_from_env(self) -> Result<Self> {
112        let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
113        self.api_secret(ticket)
114    }
115
116    /// set client API secret from an encoded string
117    pub fn api_secret_from_str(self, secret_key: &str) -> Result<Self> {
118        let key = ApiSecret::from_str(secret_key).context("invalid iroh services api secret")?;
119        self.api_secret(key)
120    }
121
122    /// Use a shared secret & remote iroh-services endpoint ID contained within a ticket
123    /// to construct a iroh-services client. The resulting client will have "Client"
124    /// capabilities.
125    ///
126    /// API secrets include remote details within them, and will set both the
127    /// remote and rcan values on the builder
128    pub fn api_secret(mut self, ticket: ApiSecret) -> Result<Self> {
129        let local_id = self.endpoint.id();
130        let rcan = crate::caps::create_api_token_from_secret_key(
131            ticket.secret,
132            local_id,
133            self.cap_expiry,
134            Caps::for_shared_secret(),
135        )?;
136
137        self.remote = Some(ticket.remote);
138        self.rcan(rcan)
139    }
140
141    /// Loads the private ssh key from the given path, and creates the needed capability.
142    #[cfg(feature = "ssh-key")]
143    pub async fn ssh_key_from_file<P: AsRef<std::path::Path>>(self, path: P) -> Result<Self> {
144        let file_content = tokio::fs::read_to_string(path).await?;
145        let private_key = ssh_key::PrivateKey::from_openssh(&file_content)?;
146
147        self.ssh_key(&private_key)
148    }
149
150    /// Creates the capability from the provided private ssh key.
151    #[cfg(feature = "ssh-key")]
152    pub fn ssh_key(mut self, key: &ssh_key::PrivateKey) -> Result<Self> {
153        let local_id = self.endpoint.id();
154        let rcan = crate::caps::create_api_token_from_ssh_key(
155            key,
156            local_id,
157            self.cap_expiry,
158            Caps::all(),
159        )?;
160        self.cap.replace(rcan);
161
162        Ok(self)
163    }
164
165    /// Sets the rcan directly.
166    pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
167        ensure!(
168            EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
169            "invalid audience"
170        );
171        self.cap.replace(cap);
172        Ok(self)
173    }
174
175    /// Sets the remote to dial, must be provided either directly by calling
176    /// this method, or through calling the api_secret builder methods.
177    pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
178        self.remote = Some(remote.into());
179        self
180    }
181
182    /// Create a new client, connected to the provide service node
183    #[must_use = "dropping the client will silently cancel all client tasks"]
184    pub async fn build(self) -> Result<Client, BuildError> {
185        debug!("starting iroh-services client");
186        let remote = self.remote.ok_or(BuildError::MissingRemote)?;
187        let capabilities = self.cap.ok_or(BuildError::MissingCapability)?;
188
189        let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
190        let client = IrohServicesClient::boxed(conn);
191
192        let (tx, rx) = tokio::sync::mpsc::channel(1);
193        let metrics_task = AbortOnDropHandle::new(n0_future::task::spawn(
194            ClientActor {
195                capabilities,
196                client,
197                session_id: Uuid::new_v4(),
198                authorized: false,
199            }
200            .run(self.registry, self.metrics_interval, rx),
201        ));
202
203        Ok(Client {
204            endpoint: self.endpoint,
205            message_channel: tx,
206            _actor_task: Arc::new(metrics_task),
207        })
208    }
209}
210
211#[derive(thiserror::Error, Debug)]
212pub enum BuildError {
213    #[error("Missing remote endpoint to dial")]
214    MissingRemote,
215    #[error("Missing capability")]
216    MissingCapability,
217    #[error("Unauthorized")]
218    Unauthorized,
219    #[error("Remote error: {0}")]
220    Remote(#[from] RemoteError),
221    #[error("Rpc connection error: {0}")]
222    Rpc(irpc::Error),
223    #[error("Connection error: {0}")]
224    Connect(ConnectError),
225}
226
227impl From<irpc::Error> for BuildError {
228    fn from(value: irpc::Error) -> Self {
229        match value {
230            irpc::Error::Request {
231                source:
232                    irpc::RequestError::Connection {
233                        source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
234                        ..
235                    },
236                ..
237            } if frame.error_code == 401u32.into() => Self::Unauthorized,
238            value => Self::Rpc(value),
239        }
240    }
241}
242
243#[derive(thiserror::Error, Debug)]
244pub enum Error {
245    #[error("Remote error: {0}")]
246    Remote(#[from] RemoteError),
247    #[error("Connection error: {0}")]
248    Rpc(#[from] irpc::Error),
249    #[error(transparent)]
250    Other(#[from] anyhow::Error),
251}
252
253impl Client {
254    pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
255        ClientBuilder::new(endpoint)
256    }
257
258    /// Pings the remote node.
259    pub async fn ping(&self) -> Result<Pong, Error> {
260        let (tx, rx) = oneshot::channel();
261        self.message_channel
262            .send(ClientActorMessage::Ping { done: tx })
263            .await
264            .map_err(|_| Error::Other(anyhow!("sending ping request")))?;
265
266        rx.await
267            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
268            .map_err(Error::Remote)
269    }
270
271    /// immediately send a single dump of metrics to iroh-services. It's not necessary
272    /// to call this function if you're using a non-zero metrics interval,
273    /// which will automatically propagate metrics on the set interval for you
274    pub async fn push_metrics(&self) -> Result<(), Error> {
275        let (tx, rx) = oneshot::channel();
276        self.message_channel
277            .send(ClientActorMessage::SendMetrics { done: tx })
278            .await
279            .map_err(|_| Error::Other(anyhow!("sending metrics")))?;
280
281        rx.await
282            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
283            .map_err(Error::Remote)
284    }
285
286    /// Grant capabilities to a remote endpoint. Creates a signed RCAN token
287    /// and sends it to iroh-services for storage. The remote can then use this token
288    /// when dialing back to authorize its requests.
289    #[cfg(feature = "client_host")]
290    pub async fn grant_capability(
291        &self,
292        remote_id: EndpointId,
293        caps: impl IntoIterator<Item = impl Into<crate::caps::Cap>>,
294    ) -> Result<(), Error> {
295        let cap = crate::caps::create_grant_token(
296            self.endpoint.secret_key().clone(),
297            remote_id,
298            DEFAULT_CAP_EXPIRY,
299            Caps::new(caps),
300        )
301        .map_err(Error::Other)?;
302
303        let (tx, rx) = oneshot::channel();
304        self.message_channel
305            .send(ClientActorMessage::GrantCap {
306                cap: Box::new(cap),
307                done: tx,
308            })
309            .await
310            .map_err(|_| Error::Other(anyhow!("granting capability")))?;
311
312        rx.await
313            .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
314    }
315
316    /// run local network status diagnostics, optionally uploading the results
317    #[cfg(feature = "net_diagnostics")]
318    pub async fn net_diagnostics(&self, send: bool) -> Result<DiagnosticsReport, Error> {
319        let report = run_diagnostics(&self.endpoint).await?;
320        if send {
321            let (tx, rx) = oneshot::channel();
322            self.message_channel
323                .send(ClientActorMessage::PutNetworkDiagnostics {
324                    done: tx,
325                    report: Box::new(report.clone()),
326                })
327                .await
328                .map_err(|_| Error::Other(anyhow!("sending network diagnostics report")))?;
329
330            let _ = rx
331                .await
332                .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
333        }
334
335        Ok(report)
336    }
337}
338
339enum ClientActorMessage {
340    SendMetrics {
341        done: oneshot::Sender<Result<(), RemoteError>>,
342    },
343    Ping {
344        done: oneshot::Sender<Result<Pong, RemoteError>>,
345    },
346    // GrantCap is used by the `client_host` feature flag
347    #[allow(dead_code)]
348    GrantCap {
349        // boxed to avoid large enum variants
350        cap: Box<Rcan<Caps>>,
351        done: oneshot::Sender<Result<(), Error>>,
352    },
353    #[cfg(feature = "net_diagnostics")]
354    PutNetworkDiagnostics {
355        report: Box<DiagnosticsReport>,
356        done: oneshot::Sender<Result<(), Error>>,
357    },
358}
359
360struct ClientActor {
361    capabilities: Rcan<Caps>,
362    client: IrohServicesClient,
363    session_id: Uuid,
364    authorized: bool,
365}
366
367impl ClientActor {
368    async fn run(
369        mut self,
370        registry: Registry,
371        interval: Option<Duration>,
372        mut inbox: tokio::sync::mpsc::Receiver<ClientActorMessage>,
373    ) {
374        let registry = Arc::new(RwLock::new(registry));
375        let mut encoder = Encoder::new(registry);
376        let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
377        trace!("starting client actor");
378        loop {
379            trace!("client actor tick");
380            tokio::select! {
381                biased;
382                Some(msg) = inbox.recv() => {
383                    match msg {
384                        ClientActorMessage::Ping{ done } => {
385                            let res = self.send_ping().await;
386                            if let Err(err) = done.send(res) {
387                                debug!("failed to send ping: {:#?}", err);
388                                self.authorized = false;
389                            }
390                        },
391                        ClientActorMessage::SendMetrics{ done } => {
392                            trace!("sending metrics manually triggered");
393                            let res = self.send_metrics(&mut encoder).await;
394                            if let Err(err) = done.send(res) {
395                                debug!("failed to push metrics: {:#?}", err);
396                                self.authorized = false;
397                            }
398                        }
399                        ClientActorMessage::GrantCap{ cap, done } => {
400                            let res = self.grant_cap(*cap).await;
401                            if let Err(err) = done.send(res) {
402                                warn!("failed to grant capability: {:#?}", err);
403                            }
404                        }
405                        #[cfg(feature = "net_diagnostics")]
406                        ClientActorMessage::PutNetworkDiagnostics{ report, done } => {
407                            let res = self.put_network_diagnostics(*report).await;
408                            if let Err(err) = done.send(res) {
409                                warn!("failed to publish network diagnostics: {:#?}", err);
410                            }
411                        }
412                    }
413                }
414                _ = async {
415                    if let Some(ref mut timer) = metrics_timer {
416                        timer.tick().await;
417                    } else {
418                        std::future::pending::<()>().await;
419                    }
420                } => {
421                    trace!("metrics send tick");
422                    if let Err(err) = self.send_metrics(&mut encoder).await {
423                        debug!("failed to push metrics: {:#?}", err);
424                        self.authorized = false;
425                    }
426                },
427            }
428        }
429    }
430
431    // sends an authorization request to the server
432    async fn auth(&mut self) -> Result<(), RemoteError> {
433        if self.authorized {
434            return Ok(());
435        }
436        trace!("client authorizing");
437        self.client
438            .rpc(Auth {
439                caps: self.capabilities.clone(),
440            })
441            .await
442            .inspect_err(|e| debug!("authorization failed: {:?}", e))
443            .map_err(|e| RemoteError::AuthError(e.to_string()))?;
444        self.authorized = true;
445        Ok(())
446    }
447
448    async fn send_ping(&mut self) -> Result<Pong, RemoteError> {
449        trace!("client actor send ping");
450        self.auth().await?;
451
452        let req = rand::random();
453        self.client
454            .rpc(Ping { req_id: req })
455            .await
456            .inspect_err(|e| warn!("rpc ping error: {e}"))
457            .map_err(|_| RemoteError::InternalServerError)
458    }
459
460    async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> {
461        trace!("client actor send metrics");
462        self.auth().await?;
463
464        let update = encoder.export();
465        // let delta = update_delta(&self.latest_ackd_update, &update);
466        let req = PutMetrics {
467            session_id: self.session_id,
468            update,
469        };
470
471        self.client
472            .rpc(req)
473            .await
474            .map_err(|_| RemoteError::InternalServerError)??;
475
476        Ok(())
477    }
478
479    async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
480        trace!("client actor grant capability");
481        self.auth().await?;
482
483        self.client
484            .rpc(crate::protocol::GrantCap { cap })
485            .await
486            .map_err(|_| RemoteError::InternalServerError)??;
487
488        Ok(())
489    }
490
491    #[cfg(feature = "net_diagnostics")]
492    async fn put_network_diagnostics(
493        &mut self,
494        report: crate::net_diagnostics::DiagnosticsReport,
495    ) -> Result<(), Error> {
496        trace!("client actor publish network diagnostics");
497        self.auth().await?;
498
499        let req = PutNetworkDiagnostics { report };
500
501        self.client
502            .rpc(req)
503            .await
504            .map_err(|_| RemoteError::InternalServerError)??;
505
506        Ok(())
507    }
508}
509
#[cfg(test)]
mod tests {
    use iroh::{Endpoint, EndpointAddr, SecretKey};
    use temp_env_vars::temp_env_vars;

    use crate::{
        Client,
        api_secret::ApiSecret,
        caps::{Cap, Caps},
        client::API_SECRET_ENV_VAR_NAME,
    };

    /// An API secret placed in the environment must populate both the remote
    /// and the capability on the builder.
    #[tokio::test]
    #[temp_env_vars]
    async fn test_api_key_from_env() {
        let mut rng = rand::rng();
        let issuer_key = SecretKey::generate(&mut rng);
        let remote_id = SecretKey::generate(&mut rng).public();
        let encoded_secret = ApiSecret::new(issuer_key.clone(), remote_id).to_string();
        // SAFETY: NOTE(review): presumably `#[temp_env_vars]` serializes tests
        // that mutate the environment and restores it afterwards, so nothing
        // else reads the environment concurrently; confirm against that crate.
        unsafe {
            std::env::set_var(API_SECRET_ENV_VAR_NAME, encoded_secret);
        };

        let endpoint = Endpoint::empty_builder(iroh::RelayMode::Disabled)
            .bind()
            .await
            .unwrap();

        let builder = Client::builder(&endpoint).api_secret_from_env().unwrap();

        assert_eq!(builder.remote, Some(EndpointAddr::from(remote_id)));

        // Check the capability field by field: comparing against a freshly
        // created rcan would be flaky because of differing timestamps.
        let granted = builder.cap.as_ref().expect("expected capability to be set");
        assert_eq!(granted.capability(), &Caps::new([Cap::Client]));
        assert_eq!(granted.audience(), &endpoint.id().as_verifying_key());
        assert_eq!(granted.issuer(), &issuer_key.public().as_verifying_key());
    }

    /// With the metrics interval disabled, a manual metrics push must not
    /// panic. The push itself is expected to fail.
    #[tokio::test]
    async fn test_no_metrics_interval() {
        let mut rng = rand::rng();
        let issuer_key = SecretKey::generate(&mut rng);
        let remote_id = SecretKey::generate(&mut rng).public();
        let api_secret = ApiSecret::new(issuer_key.clone(), remote_id);

        let endpoint = Endpoint::empty_builder(iroh::RelayMode::Disabled)
            .bind()
            .await
            .unwrap();

        let client = Client::builder(&endpoint)
            .disable_metrics_interval()
            .api_secret(api_secret)
            .unwrap()
            .build()
            .await
            .unwrap();

        let outcome = client.push_metrics().await;
        assert!(outcome.is_err());
    }
}