1use std::{
2 str::FromStr,
3 sync::{Arc, RwLock},
4};
5
6use anyhow::{Result, anyhow, ensure};
7use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
8use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder};
9use irpc_iroh::IrohLazyRemoteConnection;
10use n0_error::StackResultExt;
11use n0_future::{task::AbortOnDropHandle, time::Duration};
12use rcan::Rcan;
13use tokio::sync::oneshot;
14use tracing::{debug, trace, warn};
15use uuid::Uuid;
16
17#[cfg(feature = "net_diagnostics")]
18use crate::net_diagnostics::{DiagnosticsReport, checks::run_diagnostics};
19#[cfg(feature = "net_diagnostics")]
20use crate::protocol::PutNetworkDiagnostics;
21use crate::{
22 api_secret::ApiSecret,
23 caps::Caps,
24 protocol::{ALPN, Auth, IrohServicesClient, Ping, Pong, PutMetrics, RemoteError},
25};
26
/// Handle for talking to the remote iroh-services endpoint.
///
/// Every request is wrapped in a `ClientActorMessage` and sent over
/// `message_channel` to a background actor task; the reply comes back on a
/// per-request oneshot channel. Clones share the same channel and the same
/// task handle.
#[derive(Debug, Clone)]
pub struct Client {
    // Only read by the feature-gated methods (`client_host`, `net_diagnostics`),
    // hence the `dead_code` allowance for builds without those features.
    #[allow(dead_code)]
    endpoint: Endpoint,
    /// Sending half of the actor's inbox.
    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
    /// Abort-on-drop handle of the actor task, shared between clones via `Arc`.
    _actor_task: Arc<AbortOnDropHandle<()>>,
}
57
/// Builder that collects everything needed to construct a [`Client`].
///
/// A capability (`cap`) and a remote address (`remote`) must both be set
/// before `build` succeeds.
pub struct ClientBuilder {
    /// Expiry passed to the token-creation helpers when a capability is
    /// derived from an API secret or an SSH key.
    #[allow(dead_code)]
    cap_expiry: Duration,
    /// Capability token presented to the remote when authorizing.
    cap: Option<Rcan<Caps>>,
    /// Local endpoint used to open the connection to the remote.
    endpoint: Endpoint,
    /// Period between automatic metrics pushes; `None` disables the timer.
    metrics_interval: Option<Duration>,
    /// Address of the remote service endpoint to dial.
    remote: Option<EndpointAddr>,
    /// Metrics registry whose contents are exported on every push.
    registry: Registry,
}
69
/// Default lifetime of capability tokens created by this module: 30 days.
const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Name of the environment variable read by `ClientBuilder::api_secret_from_env`.
pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET";
72
73impl ClientBuilder {
74 pub fn new(endpoint: &Endpoint) -> Self {
75 let mut registry = Registry::default();
76 registry.register_all(endpoint.metrics());
77
78 Self {
79 cap: None,
80 cap_expiry: DEFAULT_CAP_EXPIRY,
81 endpoint: endpoint.clone(),
82 metrics_interval: Some(Duration::from_secs(60)),
83 remote: None,
84 registry,
85 }
86 }
87
88 pub fn register_metrics_group(mut self, metrics_group: Arc<dyn MetricsGroup>) -> Self {
92 self.registry.register(metrics_group);
93 self
94 }
95
96 pub fn metrics_interval(mut self, interval: Duration) -> Self {
100 self.metrics_interval = Some(interval);
101 self
102 }
103
104 pub fn disable_metrics_interval(mut self) -> Self {
106 self.metrics_interval = None;
107 self
108 }
109
110 pub fn api_secret_from_env(self) -> Result<Self> {
112 let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
113 self.api_secret(ticket)
114 }
115
116 pub fn api_secret_from_str(self, secret_key: &str) -> Result<Self> {
118 let key = ApiSecret::from_str(secret_key).context("invalid iroh services api secret")?;
119 self.api_secret(key)
120 }
121
122 pub fn api_secret(mut self, ticket: ApiSecret) -> Result<Self> {
129 let local_id = self.endpoint.id();
130 let rcan = crate::caps::create_api_token_from_secret_key(
131 ticket.secret,
132 local_id,
133 self.cap_expiry,
134 Caps::for_shared_secret(),
135 )?;
136
137 self.remote = Some(ticket.remote);
138 self.rcan(rcan)
139 }
140
141 #[cfg(feature = "ssh-key")]
143 pub async fn ssh_key_from_file<P: AsRef<std::path::Path>>(self, path: P) -> Result<Self> {
144 let file_content = tokio::fs::read_to_string(path).await?;
145 let private_key = ssh_key::PrivateKey::from_openssh(&file_content)?;
146
147 self.ssh_key(&private_key)
148 }
149
150 #[cfg(feature = "ssh-key")]
152 pub fn ssh_key(mut self, key: &ssh_key::PrivateKey) -> Result<Self> {
153 let local_id = self.endpoint.id();
154 let rcan = crate::caps::create_api_token_from_ssh_key(
155 key,
156 local_id,
157 self.cap_expiry,
158 Caps::all(),
159 )?;
160 self.cap.replace(rcan);
161
162 Ok(self)
163 }
164
165 pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
167 ensure!(
168 EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
169 "invalid audience"
170 );
171 self.cap.replace(cap);
172 Ok(self)
173 }
174
175 pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
178 self.remote = Some(remote.into());
179 self
180 }
181
182 #[must_use = "dropping the client will silently cancel all client tasks"]
184 pub async fn build(self) -> Result<Client, BuildError> {
185 debug!("starting iroh-services client");
186 let remote = self.remote.ok_or(BuildError::MissingRemote)?;
187 let capabilities = self.cap.ok_or(BuildError::MissingCapability)?;
188
189 let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
190 let client = IrohServicesClient::boxed(conn);
191
192 let (tx, rx) = tokio::sync::mpsc::channel(1);
193 let metrics_task = AbortOnDropHandle::new(n0_future::task::spawn(
194 ClientActor {
195 capabilities,
196 client,
197 session_id: Uuid::new_v4(),
198 authorized: false,
199 }
200 .run(self.registry, self.metrics_interval, rx),
201 ));
202
203 Ok(Client {
204 endpoint: self.endpoint,
205 message_channel: tx,
206 _actor_task: Arc::new(metrics_task),
207 })
208 }
209}
210
/// Errors that can occur while building a [`Client`].
///
/// In the code visible in this file, `ClientBuilder::build` itself only
/// produces `MissingRemote` and `MissingCapability`; the remaining variants
/// are reachable through the `From` conversions.
#[derive(thiserror::Error, Debug)]
pub enum BuildError {
    /// No remote address was configured on the builder.
    #[error("Missing remote endpoint to dial")]
    MissingRemote,
    /// No capability token was configured on the builder.
    #[error("Missing capability")]
    MissingCapability,
    /// The connection was closed by the remote application with error code 401
    /// (see the `From<irpc::Error>` conversion).
    #[error("Unauthorized")]
    Unauthorized,
    /// The remote service answered with an error.
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// Any other RPC failure.
    #[error("Rpc connection error: {0}")]
    Rpc(irpc::Error),
    /// Establishing the connection failed.
    #[error("Connection error: {0}")]
    Connect(ConnectError),
}
226
227impl From<irpc::Error> for BuildError {
228 fn from(value: irpc::Error) -> Self {
229 match value {
230 irpc::Error::Request {
231 source:
232 irpc::RequestError::Connection {
233 source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
234 ..
235 },
236 ..
237 } if frame.error_code == 401u32.into() => Self::Unauthorized,
238 value => Self::Rpc(value),
239 }
240 }
241}
242
/// Errors returned by the request methods on [`Client`].
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The remote service answered with an error.
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// The RPC layer failed.
    // NOTE(review): the message says "Connection error" while `BuildError::Rpc`
    // says "Rpc connection error" for the same source type — confirm whether
    // the wording difference is intended.
    #[error("Connection error: {0}")]
    Rpc(#[from] irpc::Error),
    /// Any other failure, e.g. the internal actor channel being unavailable.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
252
253impl Client {
254 pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
255 ClientBuilder::new(endpoint)
256 }
257
258 pub async fn ping(&self) -> Result<Pong, Error> {
260 let (tx, rx) = oneshot::channel();
261 self.message_channel
262 .send(ClientActorMessage::Ping { done: tx })
263 .await
264 .map_err(|_| Error::Other(anyhow!("sending ping request")))?;
265
266 rx.await
267 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
268 .map_err(Error::Remote)
269 }
270
271 pub async fn push_metrics(&self) -> Result<(), Error> {
275 let (tx, rx) = oneshot::channel();
276 self.message_channel
277 .send(ClientActorMessage::SendMetrics { done: tx })
278 .await
279 .map_err(|_| Error::Other(anyhow!("sending metrics")))?;
280
281 rx.await
282 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
283 .map_err(Error::Remote)
284 }
285
286 #[cfg(feature = "client_host")]
290 pub async fn grant_capability(
291 &self,
292 remote_id: EndpointId,
293 caps: impl IntoIterator<Item = impl Into<crate::caps::Cap>>,
294 ) -> Result<(), Error> {
295 let cap = crate::caps::create_grant_token(
296 self.endpoint.secret_key().clone(),
297 remote_id,
298 DEFAULT_CAP_EXPIRY,
299 Caps::new(caps),
300 )
301 .map_err(Error::Other)?;
302
303 let (tx, rx) = oneshot::channel();
304 self.message_channel
305 .send(ClientActorMessage::GrantCap {
306 cap: Box::new(cap),
307 done: tx,
308 })
309 .await
310 .map_err(|_| Error::Other(anyhow!("granting capability")))?;
311
312 rx.await
313 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
314 }
315
316 #[cfg(feature = "net_diagnostics")]
318 pub async fn net_diagnostics(&self, send: bool) -> Result<DiagnosticsReport, Error> {
319 let report = run_diagnostics(&self.endpoint).await?;
320 if send {
321 let (tx, rx) = oneshot::channel();
322 self.message_channel
323 .send(ClientActorMessage::PutNetworkDiagnostics {
324 done: tx,
325 report: Box::new(report.clone()),
326 })
327 .await
328 .map_err(|_| Error::Other(anyhow!("sending network diagnostics report")))?;
329
330 let _ = rx
331 .await
332 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
333 }
334
335 Ok(report)
336 }
337}
338
/// Requests sent from a [`Client`] handle to the background actor.
///
/// Each variant carries a oneshot sender (`done`) on which the actor reports
/// the outcome of the request.
enum ClientActorMessage {
    /// Push the current metrics to the remote now.
    SendMetrics {
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
    /// Ping the remote and report its answer.
    Ping {
        done: oneshot::Sender<Result<Pong, RemoteError>>,
    },
    /// Send a capability grant to the remote.
    // Only constructed by `Client::grant_capability`, which is gated behind
    // the `client_host` feature.
    #[allow(dead_code)]
    GrantCap {
        /// Token to grant; boxed by the sender.
        cap: Box<Rcan<Caps>>,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Upload a network diagnostics report to the remote.
    #[cfg(feature = "net_diagnostics")]
    PutNetworkDiagnostics {
        report: Box<DiagnosticsReport>,
        done: oneshot::Sender<Result<(), Error>>,
    },
}
359
/// State owned by the background task that performs all RPCs for a [`Client`].
struct ClientActor {
    /// Capability token sent with the `Auth` request.
    capabilities: Rcan<Caps>,
    /// RPC client used for every request to the remote.
    client: IrohServicesClient,
    /// Identifier included in every metrics update; generated once when the
    /// client is built.
    session_id: Uuid,
    /// Whether an `Auth` request has succeeded and no failure has been seen
    /// since; when false the next request authorizes first.
    authorized: bool,
}
366
367impl ClientActor {
368 async fn run(
369 mut self,
370 registry: Registry,
371 interval: Option<Duration>,
372 mut inbox: tokio::sync::mpsc::Receiver<ClientActorMessage>,
373 ) {
374 let registry = Arc::new(RwLock::new(registry));
375 let mut encoder = Encoder::new(registry);
376 let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
377 trace!("starting client actor");
378 loop {
379 trace!("client actor tick");
380 tokio::select! {
381 biased;
382 Some(msg) = inbox.recv() => {
383 match msg {
384 ClientActorMessage::Ping{ done } => {
385 let res = self.send_ping().await;
386 if let Err(err) = done.send(res) {
387 debug!("failed to send ping: {:#?}", err);
388 self.authorized = false;
389 }
390 },
391 ClientActorMessage::SendMetrics{ done } => {
392 trace!("sending metrics manually triggered");
393 let res = self.send_metrics(&mut encoder).await;
394 if let Err(err) = done.send(res) {
395 debug!("failed to push metrics: {:#?}", err);
396 self.authorized = false;
397 }
398 }
399 ClientActorMessage::GrantCap{ cap, done } => {
400 let res = self.grant_cap(*cap).await;
401 if let Err(err) = done.send(res) {
402 warn!("failed to grant capability: {:#?}", err);
403 }
404 }
405 #[cfg(feature = "net_diagnostics")]
406 ClientActorMessage::PutNetworkDiagnostics{ report, done } => {
407 let res = self.put_network_diagnostics(*report).await;
408 if let Err(err) = done.send(res) {
409 warn!("failed to publish network diagnostics: {:#?}", err);
410 }
411 }
412 }
413 }
414 _ = async {
415 if let Some(ref mut timer) = metrics_timer {
416 timer.tick().await;
417 } else {
418 std::future::pending::<()>().await;
419 }
420 } => {
421 trace!("metrics send tick");
422 if let Err(err) = self.send_metrics(&mut encoder).await {
423 debug!("failed to push metrics: {:#?}", err);
424 self.authorized = false;
425 }
426 },
427 }
428 }
429 }
430
431 async fn auth(&mut self) -> Result<(), RemoteError> {
433 if self.authorized {
434 return Ok(());
435 }
436 trace!("client authorizing");
437 self.client
438 .rpc(Auth {
439 caps: self.capabilities.clone(),
440 })
441 .await
442 .inspect_err(|e| debug!("authorization failed: {:?}", e))
443 .map_err(|e| RemoteError::AuthError(e.to_string()))?;
444 self.authorized = true;
445 Ok(())
446 }
447
448 async fn send_ping(&mut self) -> Result<Pong, RemoteError> {
449 trace!("client actor send ping");
450 self.auth().await?;
451
452 let req = rand::random();
453 self.client
454 .rpc(Ping { req_id: req })
455 .await
456 .inspect_err(|e| warn!("rpc ping error: {e}"))
457 .map_err(|_| RemoteError::InternalServerError)
458 }
459
460 async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> {
461 trace!("client actor send metrics");
462 self.auth().await?;
463
464 let update = encoder.export();
465 let req = PutMetrics {
467 session_id: self.session_id,
468 update,
469 };
470
471 self.client
472 .rpc(req)
473 .await
474 .map_err(|_| RemoteError::InternalServerError)??;
475
476 Ok(())
477 }
478
479 async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
480 trace!("client actor grant capability");
481 self.auth().await?;
482
483 self.client
484 .rpc(crate::protocol::GrantCap { cap })
485 .await
486 .map_err(|_| RemoteError::InternalServerError)??;
487
488 Ok(())
489 }
490
491 #[cfg(feature = "net_diagnostics")]
492 async fn put_network_diagnostics(
493 &mut self,
494 report: crate::net_diagnostics::DiagnosticsReport,
495 ) -> Result<(), Error> {
496 trace!("client actor publish network diagnostics");
497 self.auth().await?;
498
499 let req = PutNetworkDiagnostics { report };
500
501 self.client
502 .rpc(req)
503 .await
504 .map_err(|_| RemoteError::InternalServerError)??;
505
506 Ok(())
507 }
508}
509
#[cfg(test)]
mod tests {
    use iroh::{Endpoint, EndpointAddr, SecretKey};
    use temp_env_vars::temp_env_vars;

    use crate::{
        Client,
        api_secret::ApiSecret,
        caps::{Cap, Caps},
        client::API_SECRET_ENV_VAR_NAME,
    };

    /// Binds a fresh endpoint with relays disabled.
    async fn bound_endpoint() -> Endpoint {
        Endpoint::empty_builder(iroh::RelayMode::Disabled)
            .bind()
            .await
            .unwrap()
    }

    /// An API secret placed in the environment must configure both the
    /// remote and a client capability issued by the shared secret for the
    /// local endpoint.
    #[tokio::test]
    #[temp_env_vars]
    async fn test_api_key_from_env() {
        let mut rng = rand::rng();
        let shared_secret = SecretKey::generate(&mut rng);
        let fake_endpoint_id = SecretKey::generate(&mut rng).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
        // SAFETY: NOTE(review) — mutating the process environment is only
        // sound while no other thread accesses it; this presumably relies on
        // `#[temp_env_vars]` to isolate the test. Confirm.
        unsafe {
            std::env::set_var(API_SECRET_ENV_VAR_NAME, api_secret.to_string());
        };

        let endpoint = bound_endpoint().await;
        let builder = Client::builder(&endpoint).api_secret_from_env().unwrap();

        let expected_remote: EndpointAddr = fake_endpoint_id.into();
        assert_eq!(builder.remote, Some(expected_remote));

        let cap = builder.cap.as_ref().expect("expected capability to be set");
        assert_eq!(cap.capability(), &Caps::new([Cap::Client]));
        assert_eq!(cap.audience(), &endpoint.id().as_verifying_key());
        assert_eq!(cap.issuer(), &shared_secret.public().as_verifying_key());
    }

    /// With the metrics timer disabled the client still builds, and a manual
    /// push against the fake remote reports an error.
    #[tokio::test]
    async fn test_no_metrics_interval() {
        let mut rng = rand::rng();
        let shared_secret = SecretKey::generate(&mut rng);
        let fake_endpoint_id = SecretKey::generate(&mut rng).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);

        let endpoint = bound_endpoint().await;
        let client = Client::builder(&endpoint)
            .disable_metrics_interval()
            .api_secret(api_secret)
            .unwrap()
            .build()
            .await
            .unwrap();

        let outcome = client.push_metrics().await;
        assert!(outcome.is_err());
    }
}