1use std::{
2 env::VarError,
3 path::Path,
4 str::FromStr,
5 sync::{Arc, RwLock},
6 time::Duration,
7};
8
9use anyhow::{Result, anyhow, ensure};
10use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
11use iroh_metrics::{Registry, encoding::Encoder};
12use irpc_iroh::IrohLazyRemoteConnection;
13use n0_error::StackResultExt;
14use n0_future::task::AbortOnDropHandle;
15use rcan::Rcan;
16use tracing::{debug, trace, warn};
17use uuid::Uuid;
18
19use crate::{
20 caps::Caps,
21 protocol::{ALPN, Auth, N0desClient, Ping, PutMetrics, RemoteError},
22 ticket::N0desTicket,
23};
24
25#[derive(Debug)]
26pub struct Client {
27 client: N0desClient,
28 _metrics_task: Option<AbortOnDropHandle<()>>,
29}
30
31impl Drop for Client {
32 fn drop(&mut self) {
33 debug!("n0des client is being dropped");
34 }
35}
36
37pub struct ClientBuilder {
39 cap_expiry: Duration,
40 cap: Option<Rcan<Caps>>,
41 endpoint: Endpoint,
42 enable_metrics: Option<Duration>,
43 remote: Option<EndpointAddr>,
44}
45
46const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24 * 30); impl ClientBuilder {
49 pub fn new(endpoint: &Endpoint) -> Self {
50 Self {
51 cap: None,
52 cap_expiry: DEFAULT_CAP_EXPIRY,
53 endpoint: endpoint.clone(),
54 enable_metrics: Some(Duration::from_secs(10)),
55 remote: None,
56 }
57 }
58
59 pub fn metrics_interval(mut self, interval: Duration) -> Self {
63 self.enable_metrics = Some(interval);
64 self
65 }
66
67 pub fn disable_metrics(mut self) -> Self {
69 self.enable_metrics = None;
70 self
71 }
72
73 pub fn secret_from_env(self) -> Result<Self> {
75 match std::env::var("N0DES_SECRET") {
76 Ok(ticket_string) => {
77 let ticket =
78 N0desTicket::from_str(&ticket_string).context("invalid N0DES_SECRET")?;
79 self.ticket(ticket)
80 }
81 Err(VarError::NotPresent) => {
82 Err(anyhow!("N0DES_SECRET environment variable is not set"))
83 }
84 Err(VarError::NotUnicode(e)) => Err(anyhow!(
85 "N0DES_SECRET environment variable is not valid unicode: {:?}",
86 e
87 )),
88 }
89 }
90
91 pub fn ticket(mut self, ticket: N0desTicket) -> Result<Self> {
95 let local_id = self.endpoint.id();
96 let rcan = crate::caps::create_api_token_from_secret_key(
97 ticket.secret,
98 local_id,
99 self.cap_expiry,
100 Caps::for_shared_secret(),
101 )?;
102
103 self.remote = Some(ticket.remote);
104 self.rcan(rcan)
105 }
106
107 pub async fn ssh_key_from_file<P: AsRef<Path>>(self, path: P) -> Result<Self> {
109 let file_content = tokio::fs::read_to_string(path).await?;
110 let private_key = ssh_key::PrivateKey::from_openssh(&file_content)?;
111
112 self.ssh_key(&private_key)
113 }
114
115 pub fn ssh_key(mut self, key: &ssh_key::PrivateKey) -> Result<Self> {
117 let local_id = self.endpoint.id();
118 let rcan = crate::caps::create_api_token(key, local_id, self.cap_expiry, Caps::all())?;
119 self.cap.replace(rcan);
120
121 Ok(self)
122 }
123
124 pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
126 ensure!(
127 EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
128 "invalid audience"
129 );
130 self.cap.replace(cap);
131 Ok(self)
132 }
133
134 pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
137 self.remote = Some(remote.into());
138 self
139 }
140
141 pub async fn build(self) -> Result<Client, BuildError> {
143 debug!("starting iroh-n0des client");
144 let remote = self.remote.ok_or(BuildError::MissingRemote)?;
145 let cap = self.cap.ok_or(BuildError::MissingCapability)?;
146
147 let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
148 let client = N0desClient::boxed(conn);
149
150 let () = client.rpc(Auth { caps: cap }).await?;
152
153 let metrics_task = self.enable_metrics.map(|interval| {
154 debug!(interval = ?interval, "starting metrics task");
155 AbortOnDropHandle::new(n0_future::task::spawn(
156 MetricsTask {
157 client: client.clone(),
158 session_id: Uuid::new_v4(),
159 endpoint: self.endpoint.clone(),
160 }
161 .run(interval),
162 ))
163 });
164
165 Ok(Client {
166 client,
167 _metrics_task: metrics_task,
168 })
169 }
170}
171
172#[derive(thiserror::Error, Debug)]
173pub enum BuildError {
174 #[error("Missing remote endpoint to dial")]
175 MissingRemote,
176 #[error("Missing capability")]
177 MissingCapability,
178 #[error("Unauthorized")]
179 Unauthorized,
180 #[error("Remote error: {0}")]
181 Remote(#[from] RemoteError),
182 #[error("Rpc connection error: {0}")]
183 Rpc(irpc::Error),
184 #[error("Connection error: {0}")]
185 Connect(ConnectError),
186}
187
188impl From<irpc::Error> for BuildError {
189 fn from(value: irpc::Error) -> Self {
190 match value {
191 irpc::Error::Request {
192 source:
193 irpc::RequestError::Connection {
194 source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
195 ..
196 },
197 ..
198 } if frame.error_code == 401u32.into() => Self::Unauthorized,
199 value => Self::Rpc(value),
200 }
201 }
202}
203
204#[derive(thiserror::Error, Debug)]
205pub enum Error {
206 #[error("Remote error: {0}")]
207 Remote(#[from] RemoteError),
208 #[error("Connection error: {0}")]
209 Rpc(#[from] irpc::Error),
210 #[error(transparent)]
211 Other(#[from] anyhow::Error),
212}
213
214impl Client {
215 pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
216 ClientBuilder::new(endpoint)
217 }
218
219 pub async fn ping(&mut self) -> Result<(), Error> {
221 let req = rand::random();
222 let pong = self.client.rpc(Ping { req }).await?;
223 if pong.req == req {
224 Ok(())
225 } else {
226 Err(Error::Other(anyhow!("unexpected pong response")))
227 }
228 }
229}
230
231struct MetricsTask {
232 client: N0desClient,
233 session_id: Uuid,
234 endpoint: Endpoint,
235}
236
237impl MetricsTask {
238 async fn run(self, interval: Duration) {
239 let mut registry = Registry::default();
240 registry.register_all(self.endpoint.metrics());
241 let registry = Arc::new(RwLock::new(registry));
242 let mut encoder = Encoder::new(registry);
243
244 let mut metrics_timer = tokio::time::interval(interval);
245
246 loop {
247 metrics_timer.tick().await;
248 trace!("metrics send tick");
249 if let Err(err) = self.send_metrics(&mut encoder).await {
250 warn!("failed to push metrics: {:#?}", err);
251 }
252 }
253 }
254
255 async fn send_metrics(&self, encoder: &mut Encoder) -> Result<()> {
256 let update = encoder.export();
257 let req = PutMetrics {
258 session_id: self.session_id,
259 update,
260 };
261
262 self.client
263 .rpc(req)
264 .await
265 .inspect_err(|e| warn!("rpc send metrics error: {e}"))?
266 .inspect_err(|e| warn!("metrics server response error: {e}"))?;
267 trace!("sent metrics");
268 Ok(())
269 }
270}
271
272#[cfg(test)]
273mod tests {
274 use iroh::{Endpoint, EndpointAddr, SecretKey};
275
276 use crate::{Client, caps::Caps, ticket::N0desTicket};
277
278 #[tokio::test]
279 async fn test_builder_from_env() {
280 let mut rng = rand::rng();
282 let shared_secret = SecretKey::generate(&mut rng);
283 let fake_endpoint_id = SecretKey::generate(&mut rng).public();
284 let n0des_ticket = N0desTicket::new(shared_secret.clone(), fake_endpoint_id);
285 unsafe {
286 std::env::set_var("N0DES_SECRET", n0des_ticket.to_string());
287 };
288
289 let endpoint = Endpoint::empty_builder(iroh::RelayMode::Disabled)
290 .bind()
291 .await
292 .unwrap();
293
294 let builder = Client::builder(&endpoint).secret_from_env().unwrap();
295
296 let fake_endpoint_addr: EndpointAddr = fake_endpoint_id.into();
297 assert_eq!(builder.remote, Some(fake_endpoint_addr));
298
299 let rcan = crate::caps::create_api_token_from_secret_key(
300 shared_secret,
301 endpoint.id(),
302 builder.cap_expiry,
303 Caps::for_shared_secret(),
304 )
305 .unwrap();
306 assert_eq!(builder.cap, Some(rcan));
307
308 unsafe {
309 std::env::remove_var("N0DES_SECRET");
310 };
311 }
312}