1use std::{
2 str::FromStr,
3 sync::{Arc, RwLock},
4};
5
6use anyhow::{Result, anyhow, ensure};
7use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
8use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder};
9use irpc_iroh::IrohLazyRemoteConnection;
10use n0_error::StackResultExt;
11use n0_future::{task::AbortOnDropHandle, time::Duration};
12use rcan::Rcan;
13use tokio::sync::oneshot;
14use tracing::{debug, trace, warn};
15use uuid::Uuid;
16
17#[cfg(feature = "net_diagnostics")]
18use crate::net_diagnostics::{DiagnosticsReport, checks::run_diagnostics};
19#[cfg(feature = "net_diagnostics")]
20use crate::protocol::PutNetworkDiagnostics;
21use crate::{
22 alerts::LogMonitor,
23 api_secret::ApiSecret,
24 caps::Caps,
25 protocol::{
26 ALPN, AlertInfo, Auth, IrohServicesClient, Ping, Pong, PutMetrics, RemoteError, SendAlert,
27 },
28};
29
30#[derive(Debug, Clone)]
54pub struct Client {
55 #[allow(dead_code)]
57 endpoint: Endpoint,
58 message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
59 _actor_task: Arc<AbortOnDropHandle<()>>,
60}
61
62pub struct ClientBuilder {
65 #[allow(dead_code)]
66 cap_expiry: Duration,
67 cap: Option<Rcan<Caps>>,
68 endpoint: Endpoint,
69 metrics_interval: Option<Duration>,
70 remote: Option<EndpointAddr>,
71 registry: Registry,
72}
73
74const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24 * 30); pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET";
76
77impl ClientBuilder {
78 pub fn new(endpoint: &Endpoint) -> Self {
79 let mut registry = Registry::default();
80 registry.register_all(endpoint.metrics());
81
82 Self {
83 cap: None,
84 cap_expiry: DEFAULT_CAP_EXPIRY,
85 endpoint: endpoint.clone(),
86 metrics_interval: Some(Duration::from_secs(60)),
87 remote: None,
88 registry,
89 }
90 }
91
92 pub fn register_metrics_group(mut self, metrics_group: Arc<dyn MetricsGroup>) -> Self {
96 self.registry.register(metrics_group);
97 self
98 }
99
100 pub fn metrics_interval(mut self, interval: Duration) -> Self {
104 self.metrics_interval = Some(interval);
105 self
106 }
107
108 pub fn disable_metrics_interval(mut self) -> Self {
110 self.metrics_interval = None;
111 self
112 }
113
114 pub fn api_secret_from_env(self) -> Result<Self> {
116 let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
117 self.api_secret(ticket)
118 }
119
120 pub fn api_secret_from_str(self, secret_key: &str) -> Result<Self> {
122 let key = ApiSecret::from_str(secret_key).context("invalid iroh services api secret")?;
123 self.api_secret(key)
124 }
125
126 pub fn api_secret(mut self, ticket: ApiSecret) -> Result<Self> {
133 let local_id = self.endpoint.id();
134 let rcan = crate::caps::create_api_token_from_secret_key(
135 ticket.secret,
136 local_id,
137 self.cap_expiry,
138 Caps::for_shared_secret(),
139 )?;
140
141 self.remote = Some(ticket.remote);
142 self.rcan(rcan)
143 }
144
145 #[cfg(feature = "ssh-key")]
147 pub async fn ssh_key_from_file<P: AsRef<std::path::Path>>(self, path: P) -> Result<Self> {
148 let file_content = tokio::fs::read_to_string(path).await?;
149 let private_key = ssh_key::PrivateKey::from_openssh(&file_content)?;
150
151 self.ssh_key(&private_key)
152 }
153
154 #[cfg(feature = "ssh-key")]
156 pub fn ssh_key(mut self, key: &ssh_key::PrivateKey) -> Result<Self> {
157 let local_id = self.endpoint.id();
158 let rcan = crate::caps::create_api_token_from_ssh_key(
159 key,
160 local_id,
161 self.cap_expiry,
162 Caps::all(),
163 )?;
164 self.cap.replace(rcan);
165
166 Ok(self)
167 }
168
169 pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
171 ensure!(
172 EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
173 "invalid audience"
174 );
175 self.cap.replace(cap);
176 Ok(self)
177 }
178
179 pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
182 self.remote = Some(remote.into());
183 self
184 }
185
186 #[must_use = "dropping the client will silently cancel all client tasks"]
188 pub async fn build(self) -> Result<Client, BuildError> {
189 debug!("starting iroh-services client");
190 let remote = self.remote.ok_or(BuildError::MissingRemote)?;
191 let capabilities = self.cap.ok_or(BuildError::MissingCapability)?;
192
193 let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
194 let client = IrohServicesClient::boxed(conn);
195
196 let (tx, rx) = tokio::sync::mpsc::channel(1);
197 let metrics_task = AbortOnDropHandle::new(n0_future::task::spawn(
198 ClientActor {
199 capabilities,
200 client,
201 session_id: Uuid::new_v4(),
202 authorized: false,
203 }
204 .run(self.registry, self.metrics_interval, rx),
205 ));
206
207 Ok(Client {
208 endpoint: self.endpoint,
209 message_channel: tx,
210 _actor_task: Arc::new(metrics_task),
211 })
212 }
213}
214
215#[derive(thiserror::Error, Debug)]
216pub enum BuildError {
217 #[error("Missing remote endpoint to dial")]
218 MissingRemote,
219 #[error("Missing capability")]
220 MissingCapability,
221 #[error("Unauthorized")]
222 Unauthorized,
223 #[error("Remote error: {0}")]
224 Remote(#[from] RemoteError),
225 #[error("Rpc connection error: {0}")]
226 Rpc(irpc::Error),
227 #[error("Connection error: {0}")]
228 Connect(ConnectError),
229}
230
231impl From<irpc::Error> for BuildError {
232 fn from(value: irpc::Error) -> Self {
233 match value {
234 irpc::Error::Request {
235 source:
236 irpc::RequestError::Connection {
237 source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
238 ..
239 },
240 ..
241 } if frame.error_code == 401u32.into() => Self::Unauthorized,
242 value => Self::Rpc(value),
243 }
244 }
245}
246
247#[derive(thiserror::Error, Debug)]
248pub enum Error {
249 #[error("Remote error: {0}")]
250 Remote(#[from] RemoteError),
251 #[error("Connection error: {0}")]
252 Rpc(#[from] irpc::Error),
253 #[error(transparent)]
254 Other(#[from] anyhow::Error),
255}
256
257impl Client {
258 pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
259 ClientBuilder::new(endpoint)
260 }
261
262 pub async fn ping(&self) -> Result<Pong, Error> {
264 let (tx, rx) = oneshot::channel();
265 self.message_channel
266 .send(ClientActorMessage::Ping { done: tx })
267 .await
268 .map_err(|_| Error::Other(anyhow!("sending ping request")))?;
269
270 rx.await
271 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
272 .map_err(Error::Remote)
273 }
274
275 pub async fn push_metrics(&self) -> Result<(), Error> {
279 let (tx, rx) = oneshot::channel();
280 self.message_channel
281 .send(ClientActorMessage::SendMetrics { done: tx })
282 .await
283 .map_err(|_| Error::Other(anyhow!("sending metrics")))?;
284
285 rx.await
286 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
287 .map_err(Error::Remote)
288 }
289
290 pub async fn enable_alerts(&self) -> Result<LogMonitor, Error> {
308 let (tx, rx) = tokio::sync::mpsc::channel(64);
309 let (done_tx, done_rx) = oneshot::channel();
310 self.message_channel
311 .send(ClientActorMessage::EnableAlerts { rx, done: done_tx })
312 .await
313 .map_err(|_| Error::Other(anyhow!("enabling alerts")))?;
314 done_rx
315 .await
316 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
317 Ok(LogMonitor::new(tx))
318 }
319
320 #[cfg(feature = "client_host")]
324 pub async fn grant_capability(
325 &self,
326 remote_id: EndpointId,
327 caps: impl IntoIterator<Item = impl Into<crate::caps::Cap>>,
328 ) -> Result<(), Error> {
329 let cap = crate::caps::create_grant_token(
330 self.endpoint.secret_key().clone(),
331 remote_id,
332 DEFAULT_CAP_EXPIRY,
333 Caps::new(caps),
334 )
335 .map_err(Error::Other)?;
336
337 let (tx, rx) = oneshot::channel();
338 self.message_channel
339 .send(ClientActorMessage::GrantCap {
340 cap: Box::new(cap),
341 done: tx,
342 })
343 .await
344 .map_err(|_| Error::Other(anyhow!("granting capability")))?;
345
346 rx.await
347 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
348 }
349
350 #[cfg(feature = "net_diagnostics")]
352 pub async fn net_diagnostics(&self, send: bool) -> Result<DiagnosticsReport, Error> {
353 let report = run_diagnostics(&self.endpoint).await?;
354 if send {
355 let (tx, rx) = oneshot::channel();
356 self.message_channel
357 .send(ClientActorMessage::PutNetworkDiagnostics {
358 done: tx,
359 report: Box::new(report.clone()),
360 })
361 .await
362 .map_err(|_| Error::Other(anyhow!("sending network diagnostics report")))?;
363
364 let _ = rx
365 .await
366 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
367 }
368
369 Ok(report)
370 }
371}
372
373enum ClientActorMessage {
374 SendMetrics {
375 done: oneshot::Sender<Result<(), RemoteError>>,
376 },
377 Ping {
378 done: oneshot::Sender<Result<Pong, RemoteError>>,
379 },
380 EnableAlerts {
381 rx: tokio::sync::mpsc::Receiver<AlertInfo>,
382 done: oneshot::Sender<()>,
383 },
384 #[allow(dead_code)]
386 GrantCap {
387 cap: Box<Rcan<Caps>>,
389 done: oneshot::Sender<Result<(), Error>>,
390 },
391 #[cfg(feature = "net_diagnostics")]
392 PutNetworkDiagnostics {
393 report: Box<DiagnosticsReport>,
394 done: oneshot::Sender<Result<(), Error>>,
395 },
396}
397
398struct ClientActor {
399 capabilities: Rcan<Caps>,
400 client: IrohServicesClient,
401 session_id: Uuid,
402 authorized: bool,
403}
404
405impl ClientActor {
406 async fn run(
407 mut self,
408 registry: Registry,
409 interval: Option<Duration>,
410 mut inbox: tokio::sync::mpsc::Receiver<ClientActorMessage>,
411 ) {
412 let registry = Arc::new(RwLock::new(registry));
413 let mut encoder = Encoder::new(registry);
414 let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
415 let mut alert_rx: Option<tokio::sync::mpsc::Receiver<AlertInfo>> = None;
416 trace!("starting client actor");
417 loop {
418 trace!("client actor tick");
419 tokio::select! {
420 biased;
421 Some(msg) = inbox.recv() => {
422 match msg {
423 ClientActorMessage::Ping{ done } => {
424 let res = self.send_ping().await;
425 if let Err(err) = done.send(res) {
426 debug!("failed to send ping: {:#?}", err);
427 self.authorized = false;
428 }
429 },
430 ClientActorMessage::EnableAlerts{ rx, done } => {
431 alert_rx = Some(rx);
432 let _ = done.send(());
433 }
434 ClientActorMessage::SendMetrics{ done } => {
435 trace!("sending metrics manually triggered");
436 let res = self.send_metrics(&mut encoder).await;
437 if let Err(err) = done.send(res) {
438 debug!("failed to push metrics: {:#?}", err);
439 self.authorized = false;
440 }
441 }
442 ClientActorMessage::GrantCap{ cap, done } => {
443 let res = self.grant_cap(*cap).await;
444 if let Err(err) = done.send(res) {
445 warn!("failed to grant capability: {:#?}", err);
446 }
447 }
448 #[cfg(feature = "net_diagnostics")]
449 ClientActorMessage::PutNetworkDiagnostics{ report, done } => {
450 let res = self.put_network_diagnostics(*report).await;
451 if let Err(err) = done.send(res) {
452 warn!("failed to publish network diagnostics: {:#?}", err);
453 }
454 }
455 }
456 }
457 _ = async {
458 if let Some(ref mut timer) = metrics_timer {
459 timer.tick().await;
460 } else {
461 std::future::pending::<()>().await;
462 }
463 } => {
464 trace!("metrics send tick");
465 if let Err(err) = self.send_metrics(&mut encoder).await {
466 debug!("failed to push metrics: {:#?}", err);
467 self.authorized = false;
468 }
469 },
470 Some(alert) = async {
471 if let Some(ref mut rx) = alert_rx {
472 rx.recv().await
473 } else {
474 std::future::pending::<Option<AlertInfo>>().await
475 }
476 } => {
477 if let Err(err) = self.send_alert(alert).await {
478 debug!("failed to send alert: {:#?}", err);
479 self.authorized = false;
480 }
481 },
482 }
483 }
484 }
485
486 async fn auth(&mut self) -> Result<(), RemoteError> {
488 if self.authorized {
489 return Ok(());
490 }
491 trace!("client authorizing");
492 self.client
493 .rpc(Auth {
494 caps: self.capabilities.clone(),
495 })
496 .await
497 .inspect_err(|e| debug!("authorization failed: {:?}", e))
498 .map_err(|e| RemoteError::AuthError(e.to_string()))?;
499 self.authorized = true;
500 Ok(())
501 }
502
503 async fn send_ping(&mut self) -> Result<Pong, RemoteError> {
504 trace!("client actor send ping");
505 self.auth().await?;
506
507 let req = rand::random();
508 self.client
509 .rpc(Ping { req_id: req })
510 .await
511 .inspect_err(|e| warn!("rpc ping error: {e}"))
512 .map_err(|_| RemoteError::InternalServerError)
513 }
514
515 async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> {
516 trace!("client actor send metrics");
517 self.auth().await?;
518
519 let update = encoder.export();
520 let req = PutMetrics {
522 session_id: self.session_id,
523 update,
524 };
525
526 self.client
527 .rpc(req)
528 .await
529 .map_err(|_| RemoteError::InternalServerError)??;
530
531 Ok(())
532 }
533
534 async fn send_alert(&mut self, alert: AlertInfo) -> Result<(), RemoteError> {
535 trace!("client actor send alert");
536 self.auth().await?;
537
538 let req = SendAlert {
539 session_id: self.session_id,
540 alert,
541 };
542
543 self.client
544 .rpc(req)
545 .await
546 .map_err(|_| RemoteError::InternalServerError)??;
547
548 Ok(())
549 }
550
551 async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
552 trace!("client actor grant capability");
553 self.auth().await?;
554
555 self.client
556 .rpc(crate::protocol::GrantCap { cap })
557 .await
558 .map_err(|_| RemoteError::InternalServerError)??;
559
560 Ok(())
561 }
562
563 #[cfg(feature = "net_diagnostics")]
564 async fn put_network_diagnostics(
565 &mut self,
566 report: crate::net_diagnostics::DiagnosticsReport,
567 ) -> Result<(), Error> {
568 trace!("client actor publish network diagnostics");
569 self.auth().await?;
570
571 let req = PutNetworkDiagnostics { report };
572
573 self.client
574 .rpc(req)
575 .await
576 .map_err(|_| RemoteError::InternalServerError)??;
577
578 Ok(())
579 }
580}
581
582#[cfg(test)]
583mod tests {
584 use iroh::{Endpoint, EndpointAddr, SecretKey};
585 use temp_env_vars::temp_env_vars;
586
587 use crate::{
588 Client,
589 api_secret::ApiSecret,
590 caps::{Cap, Caps},
591 client::API_SECRET_ENV_VAR_NAME,
592 };
593
594 #[tokio::test]
595 #[temp_env_vars]
596 async fn test_api_key_from_env() {
597 let mut rng = rand::rng();
599 let shared_secret = SecretKey::generate(&mut rng);
600 let fake_endpoint_id = SecretKey::generate(&mut rng).public();
601 let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
602 unsafe {
603 std::env::set_var(API_SECRET_ENV_VAR_NAME, api_secret.to_string());
604 };
605
606 let endpoint = Endpoint::empty_builder().bind().await.unwrap();
607
608 let builder = Client::builder(&endpoint).api_secret_from_env().unwrap();
609
610 let fake_endpoint_addr: EndpointAddr = fake_endpoint_id.into();
611 assert_eq!(builder.remote, Some(fake_endpoint_addr));
612
613 let cap = builder.cap.as_ref().expect("expected capability to be set");
616 assert_eq!(cap.capability(), &Caps::new([Cap::Client]));
617 assert_eq!(cap.audience(), &endpoint.id().as_verifying_key());
618 assert_eq!(cap.issuer(), &shared_secret.public().as_verifying_key());
619 }
620
621 #[tokio::test]
624 async fn test_no_metrics_interval() {
625 let mut rng = rand::rng();
626 let shared_secret = SecretKey::generate(&mut rng);
627 let fake_endpoint_id = SecretKey::generate(&mut rng).public();
628 let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
629
630 let endpoint = Endpoint::empty_builder().bind().await.unwrap();
631
632 let client = Client::builder(&endpoint)
633 .disable_metrics_interval()
634 .api_secret(api_secret)
635 .unwrap()
636 .build()
637 .await
638 .unwrap();
639
640 let err = client.push_metrics().await;
641 assert!(err.is_err());
642 }
643}