1use std::{
2 str::FromStr,
3 sync::{Arc, RwLock},
4};
5
6use anyhow::{Result, anyhow, ensure};
7use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
8use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder};
9use irpc_iroh::IrohLazyRemoteConnection;
10use n0_error::StackResultExt;
11use n0_future::{task::AbortOnDropHandle, time::Duration};
12use rcan::Rcan;
13use tokio::sync::oneshot;
14use tracing::{debug, trace, warn};
15use uuid::Uuid;
16
17use crate::{
18 api_secret::{API_SECRET_ENV_VAR_NAME, ApiSecret},
19 caps::{Caps, DEFAULT_CAP_EXPIRY},
20 net_diagnostics::{DiagnosticsReport, checks::run_diagnostics},
21 protocol::{
22 ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutMetrics,
23 PutNetworkDiagnostics, RemoteError,
24 },
25};
26
/// Handle to the iroh-services client.
///
/// All requests are forwarded over a channel to a background actor task.
/// Clones share the same channel and the same actor task handle.
#[derive(Debug, Clone)]
pub struct Client {
    /// Endpoint the client was built from; used to run network diagnostics
    /// and to sign capability grants.
    #[allow(dead_code)]
    endpoint: Endpoint,
    /// Sending half of the channel the actor reads its requests from.
    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
    /// Handle to the spawned actor task, shared between clones.
    /// NOTE(review): per the `#[must_use]` message on `ClientBuilder::build`,
    /// dropping the client cancels the client tasks — presumably once the
    /// last clone releases this handle; confirm against `AbortOnDropHandle`.
    _actor_task: Arc<AbortOnDropHandle<()>>,
}
58
/// Builder that collects everything needed to construct a [`Client`].
pub struct ClientBuilder {
    /// Expiry handed to the capability-token constructors used by
    /// `api_secret` and `ssh_key`.
    #[allow(dead_code)]
    cap_expiry: Duration,
    /// Capability token to present to the remote; `build` fails with
    /// `BuildError::MissingCapability` when unset.
    cap: Option<Rcan<Caps>>,
    /// Endpoint used to dial the remote.
    endpoint: Endpoint,
    /// Optional endpoint name, validated by `name` before being stored.
    name: Option<String>,
    /// Period between automatic metrics pushes; `None` disables them.
    metrics_interval: Option<Duration>,
    /// Address of the remote service; `build` fails with
    /// `BuildError::MissingRemote` when unset.
    remote: Option<EndpointAddr>,
    /// Registry holding every metrics group that will be exported.
    registry: Registry,
    /// Optional log collector handed to the actor (not available on wasm32).
    #[cfg(not(target_arch = "wasm32"))]
    log_collector: Option<crate::logs::LogCollector>,
}
73
74impl ClientBuilder {
75 pub fn new(endpoint: &Endpoint) -> Self {
76 let mut registry = Registry::default();
77 registry.register_all(endpoint.metrics());
78
79 Self {
80 cap: None,
81 cap_expiry: DEFAULT_CAP_EXPIRY,
82 endpoint: endpoint.clone(),
83 name: None,
84 metrics_interval: Some(Duration::from_secs(60)),
85 remote: None,
86 registry,
87 #[cfg(not(target_arch = "wasm32"))]
88 log_collector: None,
89 }
90 }
91
92 #[cfg(not(target_arch = "wasm32"))]
101 pub fn with_log_collector(mut self, collector: crate::logs::LogCollector) -> Self {
102 self.log_collector = Some(collector);
103 self
104 }
105
106 pub fn register_metrics_group(mut self, metrics_group: Arc<dyn MetricsGroup>) -> Self {
110 self.registry.register(metrics_group);
111 self
112 }
113
114 pub fn metrics_interval(mut self, interval: Duration) -> Self {
118 self.metrics_interval = Some(interval);
119 self
120 }
121
122 pub fn disable_metrics_interval(mut self) -> Self {
124 self.metrics_interval = None;
125 self
126 }
127
128 pub fn name(mut self, name: impl Into<String>) -> Result<Self> {
143 let name = name.into();
144 validate_name(&name).map_err(BuildError::InvalidName)?;
145 self.name = Some(name);
146 Ok(self)
147 }
148
149 pub fn api_secret_from_env(self) -> Result<Self> {
151 let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
152 self.api_secret(ticket)
153 }
154
155 pub fn api_secret_from_str(self, secret_key: &str) -> Result<Self> {
157 let key = ApiSecret::from_str(secret_key).context("invalid iroh services api secret")?;
158 self.api_secret(key)
159 }
160
161 pub fn api_secret(mut self, ticket: ApiSecret) -> Result<Self> {
168 let local_id = self.endpoint.id();
169 let rcan = crate::caps::create_api_token_from_secret_key(
170 ticket.secret,
171 local_id,
172 self.cap_expiry,
173 Caps::for_shared_secret(),
174 )?;
175
176 self.remote = Some(ticket.remote);
177 self.rcan(rcan)
178 }
179
180 #[cfg(not(target_arch = "wasm32"))]
184 pub async fn ssh_key_from_file<P: AsRef<std::path::Path>>(self, path: P) -> Result<Self> {
185 let file_content = tokio::fs::read_to_string(path).await?;
186 self.ssh_key(&file_content)
187 }
188
189 #[cfg(not(target_arch = "wasm32"))]
191 pub fn ssh_key(mut self, pem: &str) -> Result<Self> {
192 let local_id = self.endpoint.id();
193 let rcan = crate::caps::create_api_token_from_openssh_pem(
194 pem,
195 local_id,
196 self.cap_expiry,
197 Caps::all(),
198 )?;
199 self.cap.replace(rcan);
200
201 Ok(self)
202 }
203
204 pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
206 ensure!(
207 EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
208 "invalid audience"
209 );
210 self.cap.replace(cap);
211 Ok(self)
212 }
213
214 pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
217 self.remote = Some(remote.into());
218 self
219 }
220
221 #[must_use = "dropping the client will silently cancel all client tasks"]
223 pub async fn build(self) -> Result<Client, BuildError> {
224 debug!("starting iroh-services client");
225 let remote = self.remote.ok_or(BuildError::MissingRemote)?;
226 let capabilities = self.cap.ok_or(BuildError::MissingCapability)?;
227
228 let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
229 let irpc_client = IrohServicesClient::boxed(conn);
230
231 let session_id = Uuid::new_v4();
232 let (tx, rx) = tokio::sync::mpsc::channel(8);
237 let actor_task = AbortOnDropHandle::new(n0_future::task::spawn(
238 ClientActor {
239 capabilities,
240 client: irpc_client,
241 name: self.name.clone(),
242 session_id,
243 authorized: false,
244 #[cfg(not(target_arch = "wasm32"))]
245 log_collector: self.log_collector,
246 }
247 .run(self.name, self.registry, self.metrics_interval, rx),
248 ));
249
250 Ok(Client {
251 endpoint: self.endpoint,
252 message_channel: tx,
253 _actor_task: Arc::new(actor_task),
254 })
255 }
256}
257
/// Errors describing why a client could not be built or connected.
#[derive(thiserror::Error, Debug)]
pub enum BuildError {
    /// `ClientBuilder::build` was called without a remote address.
    #[error("Missing remote endpoint to dial")]
    MissingRemote,
    /// `ClientBuilder::build` was called without a capability token.
    #[error("Missing capability")]
    MissingCapability,
    /// Produced by the `From<irpc::Error>` conversion when the connection
    /// was closed by the application with error code 401.
    #[error("Unauthorized")]
    Unauthorized,
    /// The remote answered with an error.
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// Any other RPC error.
    #[error("Rpc connection error: {0}")]
    Rpc(irpc::Error),
    /// Error while connecting to the remote endpoint.
    #[error("Connection error: {0}")]
    Connect(ConnectError),
    /// The endpoint name was rejected by name validation.
    #[error("Invalid endpoint name: {0}")]
    InvalidName(#[from] ValidateNameError),
}
275
276impl From<irpc::Error> for BuildError {
277 fn from(value: irpc::Error) -> Self {
278 match value {
279 irpc::Error::Request {
280 source:
281 irpc::RequestError::Connection {
282 source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
283 ..
284 },
285 ..
286 } if frame.error_code == 401u32.into() => Self::Unauthorized,
287 value => Self::Rpc(value),
288 }
289 }
290}
291
/// Minimum accepted client name length; `validate_name` compares it against
/// `str::len()`, i.e. the UTF-8 byte length.
pub const CLIENT_NAME_MIN_LENGTH: usize = 2;
/// Maximum accepted client name length; `validate_name` compares it against
/// `str::len()`, i.e. the UTF-8 byte length.
pub const CLIENT_NAME_MAX_LENGTH: usize = 128;
296
/// Reasons a client name is rejected by `validate_name`.
#[derive(Debug, thiserror::Error)]
pub enum ValidateNameError {
    /// The name is longer than `CLIENT_NAME_MAX_LENGTH`.
    #[error("Name is too long (must be no more than {CLIENT_NAME_MAX_LENGTH} characters).")]
    TooLong,
    /// The name is shorter than `CLIENT_NAME_MIN_LENGTH`.
    #[error("Name is too short (must be at least {CLIENT_NAME_MIN_LENGTH} characters).")]
    TooShort,
}
305
306fn validate_name(name: &str) -> Result<(), ValidateNameError> {
307 if name.len() < CLIENT_NAME_MIN_LENGTH {
308 Err(ValidateNameError::TooShort)
309 } else if name.len() > CLIENT_NAME_MAX_LENGTH {
310 Err(ValidateNameError::TooLong)
311 } else {
312 Ok(())
313 }
314}
315
/// Errors returned by the methods of [`Client`].
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The endpoint name was rejected by name validation.
    #[error("Invalid endpoint name: {0}")]
    InvalidName(#[from] ValidateNameError),
    /// The remote answered with an error.
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// An RPC-level error.
    #[error("Connection error: {0}")]
    Rpc(#[from] irpc::Error),
    /// Any other failure, e.g. the internal actor channel being unavailable.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
327
328impl Client {
329 pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
330 ClientBuilder::new(endpoint)
331 }
332
333 pub async fn name(&self) -> Result<Option<String>, Error> {
335 let (tx, rx) = oneshot::channel();
336 self.message_channel
337 .send(ClientActorMessage::ReadName { done: tx })
338 .await
339 .map_err(|_| Error::Other(anyhow!("sending name read request")))?;
340
341 rx.await
342 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))
343 }
344
345 pub async fn set_name(&self, name: impl Into<String>) -> Result<(), Error> {
350 set_name_inner(self.message_channel.clone(), name.into()).await
351 }
352
353 pub async fn ping(&self) -> Result<Pong, Error> {
355 let (tx, rx) = oneshot::channel();
356 self.message_channel
357 .send(ClientActorMessage::Ping { done: tx })
358 .await
359 .map_err(|_| Error::Other(anyhow!("sending ping request")))?;
360
361 rx.await
362 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
363 .map_err(Error::Remote)
364 }
365
366 pub async fn push_metrics(&self) -> Result<(), Error> {
370 let (tx, rx) = oneshot::channel();
371 self.message_channel
372 .send(ClientActorMessage::SendMetrics { done: tx })
373 .await
374 .map_err(|_| Error::Other(anyhow!("sending metrics")))?;
375
376 rx.await
377 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
378 .map_err(Error::Remote)
379 }
380
381 pub async fn grant_capability(
385 &self,
386 remote_id: EndpointId,
387 caps: impl IntoIterator<Item = impl Into<crate::caps::Cap>>,
388 ) -> Result<(), Error> {
389 let cap = crate::caps::create_grant_token(
390 self.endpoint.secret_key().clone(),
391 remote_id,
392 DEFAULT_CAP_EXPIRY,
393 Caps::new(caps),
394 )
395 .map_err(Error::Other)?;
396
397 let (tx, rx) = oneshot::channel();
398 self.message_channel
399 .send(ClientActorMessage::GrantCap {
400 cap: Box::new(cap),
401 done: tx,
402 })
403 .await
404 .map_err(|_| Error::Other(anyhow!("granting capability")))?;
405
406 rx.await
407 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
408 }
409
410 pub async fn net_diagnostics(&self, send: bool) -> Result<DiagnosticsReport, Error> {
412 let report = run_diagnostics(&self.endpoint).await?;
413 if send {
414 let (tx, rx) = oneshot::channel();
415 self.message_channel
416 .send(ClientActorMessage::PutNetworkDiagnostics {
417 done: tx,
418 report: Box::new(report.clone()),
419 })
420 .await
421 .map_err(|_| Error::Other(anyhow!("sending network diagnostics report")))?;
422
423 let _ = rx
424 .await
425 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
426 }
427
428 Ok(report)
429 }
430}
431
/// Requests sent from [`Client`] handles to the background actor.
///
/// Every variant carries a `done` oneshot sender on which the actor reports
/// the outcome.
enum ClientActorMessage {
    /// Push the current metrics to the remote now.
    SendMetrics {
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
    /// Ping the remote service.
    Ping {
        done: oneshot::Sender<Result<Pong, RemoteError>>,
    },
    /// Send a capability grant to the remote service.
    #[allow(dead_code)]
    GrantCap {
        /// Token to send, boxed before it is put into the message.
        cap: Box<Rcan<Caps>>,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Upload a network diagnostics report.
    PutNetworkDiagnostics {
        report: Box<DiagnosticsReport>,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Read the endpoint name held by the actor.
    ReadName {
        done: oneshot::Sender<Option<String>>,
    },
    /// Ask the remote to assign `name` to this endpoint.
    NameEndpoint {
        name: String,
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
}
458
/// State of the background task that performs all RPCs for a [`Client`].
struct ClientActor {
    /// Capability token sent with the `Auth` request.
    capabilities: Rcan<Caps>,
    /// RPC client used to talk to the remote service.
    client: IrohServicesClient,
    /// Endpoint name; updated after the remote accepted a naming request.
    name: Option<String>,
    /// Identifier generated once per built client and attached to every
    /// metrics push.
    session_id: Uuid,
    /// Whether an `Auth` request has succeeded; `auth` is skipped while set.
    authorized: bool,
    /// Optional log collector configured after authorization (not on wasm32).
    #[cfg(not(target_arch = "wasm32"))]
    log_collector: Option<crate::logs::LogCollector>,
}
468
469impl ClientActor {
470 async fn run(
471 mut self,
472 initial_name: Option<String>,
473 registry: Registry,
474 interval: Option<Duration>,
475 mut inbox: tokio::sync::mpsc::Receiver<ClientActorMessage>,
476 ) {
477 let registry = Arc::new(RwLock::new(registry));
478 let mut encoder = Encoder::new(registry);
479 let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
480 trace!("starting client actor");
481
482 if let Some(name) = initial_name
483 && let Err(err) = self.send_name_endpoint(name).await
484 {
485 warn!(err = %err, "failed setting endpoint name on startup");
486 }
487
488 loop {
489 trace!("client actor tick");
490 tokio::select! {
491 biased;
492 Some(msg) = inbox.recv() => {
493 match msg {
494 ClientActorMessage::Ping{ done } => {
495 let res = self.send_ping().await;
496 if let Err(err) = done.send(res) {
497 debug!("failed to send ping: {:#?}", err);
498 self.authorized = false;
499 }
500 },
501 ClientActorMessage::SendMetrics{ done } => {
502 trace!("sending metrics manually triggered");
503 let res = self.send_metrics(&mut encoder).await;
504 if let Err(err) = done.send(res) {
505 debug!("failed to push metrics: {:#?}", err);
506 self.authorized = false;
507 }
508 }
509 ClientActorMessage::GrantCap{ cap, done } => {
510 let res = self.grant_cap(*cap).await;
511 if let Err(err) = done.send(res) {
512 warn!("failed to grant capability: {:#?}", err);
513 }
514 }
515 ClientActorMessage::ReadName{ done } => {
516 if let Err(err) = done.send(self.name.clone()) {
517 warn!("sending name value: {:#?}", err);
518 }
519 }
520 ClientActorMessage::NameEndpoint{ name, done } => {
521 let res = self.send_name_endpoint(name).await;
522 if let Err(err) = done.send(res) {
523 warn!("failed to name endpoint: {:#?}", err);
524 }
525 }
526 ClientActorMessage::PutNetworkDiagnostics{ report, done } => {
527 let res = self.put_network_diagnostics(*report).await;
528 if let Err(err) = done.send(res) {
529 warn!("failed to publish network diagnostics: {:#?}", err);
530 }
531 }
532 }
533 }
534 _ = async {
535 if let Some(ref mut timer) = metrics_timer {
536 timer.tick().await;
537 } else {
538 std::future::pending::<()>().await;
539 }
540 } => {
541 trace!("metrics send tick");
542 if let Err(err) = self.send_metrics(&mut encoder).await {
543 debug!("failed to push metrics: {:#?}", err);
544 self.authorized = false;
545 }
546 },
547 }
548 }
549 }
550
551 async fn auth(&mut self) -> Result<(), RemoteError> {
553 if self.authorized {
554 return Ok(());
555 }
556 trace!("client authorizing");
557 self.client
558 .rpc(Auth {
559 caps: self.capabilities.clone(),
560 })
561 .await
562 .inspect_err(|e| debug!("authorization failed: {:?}", e))
563 .map_err(|e| RemoteError::AuthError(e.to_string()))?;
564 self.authorized = true;
565
566 #[cfg(not(target_arch = "wasm32"))]
573 if let Some(collector) = self.log_collector.as_ref() {
574 match self.client.rpc(crate::protocol::GetLogLevel).await {
575 Ok(Ok(Some(settings))) => {
576 let expires_in = settings.expires_in_secs.map(Duration::from_secs);
577 if let Err(err) = collector.set_filter(
578 &settings.directives,
579 expires_in,
580 settings.revert_to.as_deref(),
581 ) {
582 warn!(?err, "failed to apply initial log level");
583 }
584 }
585 Ok(Ok(None)) => {
586 }
588 Ok(Err(err)) => {
589 debug!(?err, "cloud rejected initial GetLogLevel");
590 }
591 Err(err) => {
592 debug!(?err, "initial GetLogLevel rpc failed");
593 }
594 }
595 }
596
597 Ok(())
598 }
599
600 async fn send_ping(&mut self) -> Result<Pong, RemoteError> {
601 trace!("client actor send ping");
602 self.auth().await?;
603
604 let req = rand::random();
605 self.client
606 .rpc(Ping { req_id: req })
607 .await
608 .inspect_err(|e| warn!("rpc ping error: {e}"))
609 .map_err(|_| RemoteError::InternalServerError)
610 }
611
612 async fn send_name_endpoint(&mut self, name: String) -> Result<(), RemoteError> {
613 trace!("client sending name endpoint request");
614 self.auth().await?;
615
616 self.client
617 .rpc(NameEndpoint { name: name.clone() })
618 .await
619 .inspect_err(|e| debug!("name endpoint error: {e}"))
620 .map_err(|_| RemoteError::InternalServerError)??;
621 self.name = Some(name);
622 Ok(())
623 }
624
625 async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> {
626 trace!("client actor send metrics");
627 self.auth().await?;
628
629 let update = encoder.export();
630 let req = PutMetrics {
632 session_id: self.session_id,
633 update,
634 };
635
636 self.client
637 .rpc(req)
638 .await
639 .map_err(|_| RemoteError::InternalServerError)??;
640
641 Ok(())
642 }
643
644 async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
645 trace!("client actor grant capability");
646 self.auth().await?;
647
648 self.client
649 .rpc(crate::protocol::GrantCap { cap })
650 .await
651 .map_err(|_| RemoteError::InternalServerError)??;
652
653 Ok(())
654 }
655
656 async fn put_network_diagnostics(
657 &mut self,
658 report: crate::net_diagnostics::DiagnosticsReport,
659 ) -> Result<(), Error> {
660 trace!("client actor publish network diagnostics");
661 self.auth().await?;
662
663 let req = PutNetworkDiagnostics { report };
664
665 self.client
666 .rpc(req)
667 .await
668 .map_err(|_| RemoteError::InternalServerError)??;
669
670 Ok(())
671 }
672}
673
674async fn set_name_inner(
675 message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
676 name: String,
677) -> Result<(), Error> {
678 validate_name(&name)?;
679 debug!(name_len = name.len(), "calling set name");
680 let (tx, rx) = oneshot::channel();
681 message_channel
682 .send(ClientActorMessage::NameEndpoint { name, done: tx })
683 .await
684 .map_err(|_| Error::Other(anyhow!("sending name endpoint request")))?;
685 rx.await
686 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
687 .map_err(Error::Remote)
688}
689
#[cfg(test)]
mod tests {
    use iroh::{Endpoint, EndpointAddr, SecretKey, endpoint::presets};
    use rand::{RngExt, SeedableRng};
    use temp_env_vars::temp_env_vars;

    use crate::{
        Client,
        api_secret::ApiSecret,
        caps::{Cap, Caps},
        client::{API_SECRET_ENV_VAR_NAME, BuildError, ValidateNameError},
    };

    /// An API secret read from the environment must set both the remote and a
    /// capability token issued by the shared secret for the local endpoint.
    #[tokio::test]
    #[temp_env_vars]
    async fn test_api_key_from_env() {
        // Seeded RNG keeps the generated keys deterministic.
        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
        let shared_secret = SecretKey::from_bytes(&rng.random());
        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
        // SAFETY: NOTE(review): `set_var` is only sound while no other thread
        // accesses the environment concurrently; presumably `#[temp_env_vars]`
        // isolates and restores the environment for this test — confirm.
        unsafe {
            std::env::set_var(API_SECRET_ENV_VAR_NAME, api_secret.to_string());
        };

        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();

        let builder = Client::builder(&endpoint).api_secret_from_env().unwrap();

        let fake_endpoint_addr: EndpointAddr = fake_endpoint_id.into();
        assert_eq!(builder.remote, Some(fake_endpoint_addr));

        let cap = builder.cap.as_ref().expect("expected capability to be set");
        // The token is issued by the shared secret for the local endpoint.
        assert_eq!(cap.capability(), &Caps::new([Cap::Client]));
        assert_eq!(cap.audience(), &endpoint.id().as_verifying_key());
        assert_eq!(cap.issuer(), &shared_secret.public().as_verifying_key());
    }

    /// A client with the automatic metrics push disabled can still be built
    /// and asked to push metrics manually.
    #[tokio::test]
    async fn test_no_metrics_interval() {
        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(1);
        let shared_secret = SecretKey::from_bytes(&rng.random());
        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);

        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();

        let client = Client::builder(&endpoint)
            .disable_metrics_interval()
            .api_secret(api_secret)
            .unwrap()
            .build()
            .await
            .unwrap();

        // The push is expected to fail — presumably because the fake remote
        // cannot be reached; the point is that the call returns an error
        // instead of hanging.
        let err = client.push_metrics().await;
        assert!(err.is_err());
    }

    /// Name validation accepts UTF-8 names within the length bounds and
    /// reports `TooShort` / `TooLong` outside of them.
    #[tokio::test]
    async fn test_name() {
        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
        let shared_secret = SecretKey::from_bytes(&rng.random());
        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);

        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();

        let builder = Client::builder(&endpoint)
            .name("my-node 👋")
            .unwrap()
            .api_secret(api_secret)
            .unwrap();

        assert_eq!(builder.name, Some("my-node 👋".to_string()));

        // A single ASCII character is one byte, below the two-byte minimum.
        let Err(err) = Client::builder(&endpoint).name("a") else {
            panic!("name should fail for strings under 2 bytes");
        };
        assert!(matches!(
            err.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidName(ValidateNameError::TooShort))
        ));

        // 129 four-byte characters: well above the 128-byte maximum.
        let too_long_name = "👋".repeat(129);
        let Err(err) = Client::builder(&endpoint).name(&too_long_name) else {
            panic!("name should fail for strings over 128 bytes");
        };
        assert!(matches!(
            err.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidName(ValidateNameError::TooLong))
        ));
    }
}