iroh_n0des/
client.rs

1use std::{
2    env::VarError,
3    path::Path,
4    str::FromStr,
5    sync::{Arc, RwLock},
6    time::Duration,
7};
8
9use anyhow::{Result, anyhow, ensure};
10use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
11use iroh_metrics::{Registry, encoding::Encoder};
12use irpc_iroh::IrohLazyRemoteConnection;
13use n0_error::StackResultExt;
14use n0_future::task::AbortOnDropHandle;
15use rcan::Rcan;
16use tracing::{debug, trace, warn};
17use uuid::Uuid;
18
19use crate::{
20    caps::Caps,
21    protocol::{ALPN, Auth, N0desClient, Ping, PutMetrics, RemoteError},
22    ticket::N0desTicket,
23};
24
25#[derive(Debug)]
26pub struct Client {
27    client: N0desClient,
28    _metrics_task: Option<AbortOnDropHandle<()>>,
29}
30
31impl Drop for Client {
32    fn drop(&mut self) {
33        debug!("n0des client is being dropped");
34    }
35}
36
37/// Constructs a n0des client
38pub struct ClientBuilder {
39    cap_expiry: Duration,
40    cap: Option<Rcan<Caps>>,
41    endpoint: Endpoint,
42    enable_metrics: Option<Duration>,
43    remote: Option<EndpointAddr>,
44}
45
/// Capability expiry used by [`ClientBuilder::new`]: 30 days.
const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24 * 30);
47
48impl ClientBuilder {
49    pub fn new(endpoint: &Endpoint) -> Self {
50        Self {
51            cap: None,
52            cap_expiry: DEFAULT_CAP_EXPIRY,
53            endpoint: endpoint.clone(),
54            enable_metrics: Some(Duration::from_secs(10)),
55            remote: None,
56        }
57    }
58
59    /// Set the metrics collection interval
60    ///
61    /// Defaults to enabled, every 60 seconds.
62    pub fn metrics_interval(mut self, interval: Duration) -> Self {
63        self.enable_metrics = Some(interval);
64        self
65    }
66
67    /// Disable metrics collection.
68    pub fn disable_metrics(mut self) -> Self {
69        self.enable_metrics = None;
70        self
71    }
72
73    /// Check N0DES_SECRET environment variable to supply a N0desTicket
74    pub fn secret_from_env(self) -> Result<Self> {
75        match std::env::var("N0DES_SECRET") {
76            Ok(ticket_string) => {
77                let ticket =
78                    N0desTicket::from_str(&ticket_string).context("invalid N0DES_SECRET")?;
79                self.ticket(ticket)
80            }
81            Err(VarError::NotPresent) => {
82                Err(anyhow!("N0DES_SECRET environment variable is not set"))
83            }
84            Err(VarError::NotUnicode(e)) => Err(anyhow!(
85                "N0DES_SECRET environment variable is not valid unicode: {:?}",
86                e
87            )),
88        }
89    }
90
91    /// Use a shared secret & remote n0des endpoint ID contained within a ticket
92    /// to construct a n0des client. The resulting client will have "Client"
93    /// capabilities.
94    pub fn ticket(mut self, ticket: N0desTicket) -> Result<Self> {
95        let local_id = self.endpoint.id();
96        let rcan = crate::caps::create_api_token_from_secret_key(
97            ticket.secret,
98            local_id,
99            self.cap_expiry,
100            Caps::for_shared_secret(),
101        )?;
102
103        self.remote = Some(ticket.remote);
104        self.rcan(rcan)
105    }
106
107    /// Loads the private ssh key from the given path, and creates the needed capability.
108    pub async fn ssh_key_from_file<P: AsRef<Path>>(self, path: P) -> Result<Self> {
109        let file_content = tokio::fs::read_to_string(path).await?;
110        let private_key = ssh_key::PrivateKey::from_openssh(&file_content)?;
111
112        self.ssh_key(&private_key)
113    }
114
115    /// Creates the capability from the provided private ssh key.
116    pub fn ssh_key(mut self, key: &ssh_key::PrivateKey) -> Result<Self> {
117        let local_id = self.endpoint.id();
118        let rcan = crate::caps::create_api_token(key, local_id, self.cap_expiry, Caps::all())?;
119        self.cap.replace(rcan);
120
121        Ok(self)
122    }
123
124    /// Sets the rcan directly.
125    pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
126        ensure!(
127            EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
128            "invalid audience"
129        );
130        self.cap.replace(cap);
131        Ok(self)
132    }
133
134    /// Sets the remote to dial, must be provided either directly by calling
135    /// this method, or via the
136    pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
137        self.remote = Some(remote.into());
138        self
139    }
140
141    /// Create a new client, connected to the provide service node
142    pub async fn build(self) -> Result<Client, BuildError> {
143        debug!("starting iroh-n0des client");
144        let remote = self.remote.ok_or(BuildError::MissingRemote)?;
145        let cap = self.cap.ok_or(BuildError::MissingCapability)?;
146
147        let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
148        let client = N0desClient::boxed(conn);
149
150        // If auth fails, the connection is aborted.
151        let () = client.rpc(Auth { caps: cap }).await?;
152
153        let metrics_task = self.enable_metrics.map(|interval| {
154            debug!(interval = ?interval, "starting metrics task");
155            AbortOnDropHandle::new(n0_future::task::spawn(
156                MetricsTask {
157                    client: client.clone(),
158                    session_id: Uuid::new_v4(),
159                    endpoint: self.endpoint.clone(),
160                }
161                .run(interval),
162            ))
163        });
164
165        Ok(Client {
166            client,
167            _metrics_task: metrics_task,
168        })
169    }
170}
171
172#[derive(thiserror::Error, Debug)]
173pub enum BuildError {
174    #[error("Missing remote endpoint to dial")]
175    MissingRemote,
176    #[error("Missing capability")]
177    MissingCapability,
178    #[error("Unauthorized")]
179    Unauthorized,
180    #[error("Remote error: {0}")]
181    Remote(#[from] RemoteError),
182    #[error("Rpc connection error: {0}")]
183    Rpc(irpc::Error),
184    #[error("Connection error: {0}")]
185    Connect(ConnectError),
186}
187
188impl From<irpc::Error> for BuildError {
189    fn from(value: irpc::Error) -> Self {
190        match value {
191            irpc::Error::Request {
192                source:
193                    irpc::RequestError::Connection {
194                        source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
195                        ..
196                    },
197                ..
198            } if frame.error_code == 401u32.into() => Self::Unauthorized,
199            value => Self::Rpc(value),
200        }
201    }
202}
203
204#[derive(thiserror::Error, Debug)]
205pub enum Error {
206    #[error("Remote error: {0}")]
207    Remote(#[from] RemoteError),
208    #[error("Connection error: {0}")]
209    Rpc(#[from] irpc::Error),
210    #[error(transparent)]
211    Other(#[from] anyhow::Error),
212}
213
214impl Client {
215    pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
216        ClientBuilder::new(endpoint)
217    }
218
219    /// Pings the remote node.
220    pub async fn ping(&mut self) -> Result<(), Error> {
221        let req = rand::random();
222        let pong = self.client.rpc(Ping { req }).await?;
223        if pong.req == req {
224            Ok(())
225        } else {
226            Err(Error::Other(anyhow!("unexpected pong response")))
227        }
228    }
229}
230
231struct MetricsTask {
232    client: N0desClient,
233    session_id: Uuid,
234    endpoint: Endpoint,
235}
236
237impl MetricsTask {
238    async fn run(self, interval: Duration) {
239        let mut registry = Registry::default();
240        registry.register_all(self.endpoint.metrics());
241        let registry = Arc::new(RwLock::new(registry));
242        let mut encoder = Encoder::new(registry);
243
244        let mut metrics_timer = tokio::time::interval(interval);
245
246        loop {
247            metrics_timer.tick().await;
248            trace!("metrics send tick");
249            if let Err(err) = self.send_metrics(&mut encoder).await {
250                warn!("failed to push metrics: {:#?}", err);
251            }
252        }
253    }
254
255    async fn send_metrics(&self, encoder: &mut Encoder) -> Result<()> {
256        let update = encoder.export();
257        let req = PutMetrics {
258            session_id: self.session_id,
259            update,
260        };
261
262        self.client
263            .rpc(req)
264            .await
265            .inspect_err(|e| warn!("rpc send metrics error: {e}"))?
266            .inspect_err(|e| warn!("metrics server response error: {e}"))?;
267        trace!("sent metrics");
268        Ok(())
269    }
270}
271
#[cfg(test)]
mod tests {
    use iroh::{Endpoint, EndpointAddr, SecretKey};

    use crate::{Client, caps::Caps, ticket::N0desTicket};

    /// Removes `N0DES_SECRET` when dropped, so that a failing assertion
    /// cannot leave the variable set for the rest of the test process.
    struct SecretVarGuard;

    impl Drop for SecretVarGuard {
        fn drop(&mut self) {
            // SAFETY: assumes nothing else reads or writes the process
            // environment concurrently — TODO confirm no other test touches
            // it while this one runs.
            unsafe {
                std::env::remove_var("N0DES_SECRET");
            };
        }
    }

    /// A ticket placed in `N0DES_SECRET` must end up as the builder's remote
    /// and capability.
    #[tokio::test]
    async fn test_builder_from_env() {
        // construct
        let mut rng = rand::rng();
        let shared_secret = SecretKey::generate(&mut rng);
        let fake_endpoint_id = SecretKey::generate(&mut rng).public();
        let n0des_ticket = N0desTicket::new(shared_secret.clone(), fake_endpoint_id);
        // SAFETY: assumes nothing else reads or writes the process
        // environment concurrently — TODO confirm no other test touches it
        // while this one runs.
        unsafe {
            std::env::set_var("N0DES_SECRET", n0des_ticket.to_string());
        };
        let _cleanup = SecretVarGuard;

        let endpoint = Endpoint::empty_builder(iroh::RelayMode::Disabled)
            .bind()
            .await
            .unwrap();

        let builder = Client::builder(&endpoint).secret_from_env().unwrap();

        let fake_endpoint_addr: EndpointAddr = fake_endpoint_id.into();
        assert_eq!(builder.remote, Some(fake_endpoint_addr));

        // Rebuild the capability the builder is expected to have created.
        let rcan = crate::caps::create_api_token_from_secret_key(
            shared_secret,
            endpoint.id(),
            builder.cap_expiry,
            Caps::for_shared_secret(),
        )
        .unwrap();
        assert_eq!(builder.cap, Some(rcan));
    }
}