1use std::{
2 str::FromStr,
3 sync::{Arc, RwLock},
4};
5
6use anyhow::{Result, anyhow, ensure};
7use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
8use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder};
9use irpc_iroh::IrohLazyRemoteConnection;
10use n0_error::StackResultExt;
11use n0_future::{task::AbortOnDropHandle, time::Duration};
12use rcan::Rcan;
13use tokio::sync::oneshot;
14use tracing::{debug, trace, warn};
15use uuid::Uuid;
16
17use crate::{
18 api_secret::ApiSecret,
19 caps::Caps,
20 net_diagnostics::{DiagnosticsReport, checks::run_diagnostics},
21 protocol::{
22 ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutMetrics,
23 PutNetworkDiagnostics, RemoteError,
24 },
25};
26
27#[derive(Debug, Clone)]
51pub struct Client {
52 #[allow(dead_code)]
54 endpoint: Endpoint,
55 message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
56 _actor_task: Arc<AbortOnDropHandle<()>>,
57}
58
59pub struct ClientBuilder {
62 #[allow(dead_code)]
63 cap_expiry: Duration,
64 cap: Option<Rcan<Caps>>,
65 endpoint: Endpoint,
66 name: Option<String>,
67 metrics_interval: Option<Duration>,
68 remote: Option<EndpointAddr>,
69 registry: Registry,
70 log_collector: Option<crate::logs::LogCollector>,
71}
72
/// Default lifetime (30 days) of capability tokens created by the client: the builder's
/// `cap_expiry` and the tokens issued by `Client::grant_capability`.
const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Environment variable read by [`ClientBuilder::api_secret_from_env`].
pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET";
75
76impl ClientBuilder {
77 pub fn new(endpoint: &Endpoint) -> Self {
78 let mut registry = Registry::default();
79 registry.register_all(endpoint.metrics());
80
81 Self {
82 cap: None,
83 cap_expiry: DEFAULT_CAP_EXPIRY,
84 endpoint: endpoint.clone(),
85 name: None,
86 metrics_interval: Some(Duration::from_secs(60)),
87 remote: None,
88 registry,
89 log_collector: None,
90 }
91 }
92
93 pub fn with_log_collector(mut self, collector: crate::logs::LogCollector) -> Self {
102 self.log_collector = Some(collector);
103 self
104 }
105
106 pub fn register_metrics_group(mut self, metrics_group: Arc<dyn MetricsGroup>) -> Self {
110 self.registry.register(metrics_group);
111 self
112 }
113
114 pub fn metrics_interval(mut self, interval: Duration) -> Self {
118 self.metrics_interval = Some(interval);
119 self
120 }
121
122 pub fn disable_metrics_interval(mut self) -> Self {
124 self.metrics_interval = None;
125 self
126 }
127
128 pub fn name(mut self, name: impl Into<String>) -> Result<Self> {
143 let name = name.into();
144 validate_name(&name).map_err(BuildError::InvalidName)?;
145 self.name = Some(name);
146 Ok(self)
147 }
148
149 pub fn api_secret_from_env(self) -> Result<Self> {
151 let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
152 self.api_secret(ticket)
153 }
154
155 pub fn api_secret_from_str(self, secret_key: &str) -> Result<Self> {
157 let key = ApiSecret::from_str(secret_key).context("invalid iroh services api secret")?;
158 self.api_secret(key)
159 }
160
161 pub fn api_secret(mut self, ticket: ApiSecret) -> Result<Self> {
168 let local_id = self.endpoint.id();
169 let rcan = crate::caps::create_api_token_from_secret_key(
170 ticket.secret,
171 local_id,
172 self.cap_expiry,
173 Caps::for_shared_secret(),
174 )?;
175
176 self.remote = Some(ticket.remote);
177 self.rcan(rcan)
178 }
179
180 #[cfg(not(target_arch = "wasm32"))]
182 pub async fn ssh_key_from_file<P: AsRef<std::path::Path>>(self, path: P) -> Result<Self> {
183 let file_content = tokio::fs::read_to_string(path).await?;
184 let private_key = ssh_key::PrivateKey::from_openssh(&file_content)?;
185
186 self.ssh_key(&private_key)
187 }
188
189 #[cfg(not(target_arch = "wasm32"))]
191 pub fn ssh_key(mut self, key: &ssh_key::PrivateKey) -> Result<Self> {
192 let local_id = self.endpoint.id();
193 let rcan = crate::caps::create_api_token_from_ssh_key(
194 key,
195 local_id,
196 self.cap_expiry,
197 Caps::all(),
198 )?;
199 self.cap.replace(rcan);
200
201 Ok(self)
202 }
203
204 pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
206 ensure!(
207 EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
208 "invalid audience"
209 );
210 self.cap.replace(cap);
211 Ok(self)
212 }
213
214 pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
217 self.remote = Some(remote.into());
218 self
219 }
220
221 #[must_use = "dropping the client will silently cancel all client tasks"]
223 pub async fn build(self) -> Result<Client, BuildError> {
224 debug!("starting iroh-services client");
225 let remote = self.remote.ok_or(BuildError::MissingRemote)?;
226 let capabilities = self.cap.ok_or(BuildError::MissingCapability)?;
227
228 let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
229 let irpc_client = IrohServicesClient::boxed(conn);
230
231 let session_id = Uuid::new_v4();
232 let (tx, rx) = tokio::sync::mpsc::channel(8);
237 let actor_task = AbortOnDropHandle::new(n0_future::task::spawn(
238 ClientActor {
239 capabilities,
240 client: irpc_client,
241 name: self.name.clone(),
242 session_id,
243 authorized: false,
244 log_collector: self.log_collector,
245 }
246 .run(self.name, self.registry, self.metrics_interval, rx),
247 ));
248
249 Ok(Client {
250 endpoint: self.endpoint,
251 message_channel: tx,
252 _actor_task: Arc::new(actor_task),
253 })
254 }
255}
256
257#[derive(thiserror::Error, Debug)]
258pub enum BuildError {
259 #[error("Missing remote endpoint to dial")]
260 MissingRemote,
261 #[error("Missing capability")]
262 MissingCapability,
263 #[error("Unauthorized")]
264 Unauthorized,
265 #[error("Remote error: {0}")]
266 Remote(#[from] RemoteError),
267 #[error("Rpc connection error: {0}")]
268 Rpc(irpc::Error),
269 #[error("Connection error: {0}")]
270 Connect(ConnectError),
271 #[error("Invalid endpoint name: {0}")]
272 InvalidName(#[from] ValidateNameError),
273}
274
275impl From<irpc::Error> for BuildError {
276 fn from(value: irpc::Error) -> Self {
277 match value {
278 irpc::Error::Request {
279 source:
280 irpc::RequestError::Connection {
281 source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
282 ..
283 },
284 ..
285 } if frame.error_code == 401u32.into() => Self::Unauthorized,
286 value => Self::Rpc(value),
287 }
288 }
289}
290
/// Shortest endpoint name `validate_name` accepts, in bytes (`str::len`).
pub const CLIENT_NAME_MIN_LENGTH: usize = 2;
/// Longest endpoint name `validate_name` accepts, in bytes (`str::len`).
pub const CLIENT_NAME_MAX_LENGTH: usize = 128;
295
296#[derive(Debug, thiserror::Error)]
298pub enum ValidateNameError {
299 #[error("Name is too long (must be no more than {CLIENT_NAME_MAX_LENGTH} characters).")]
300 TooLong,
301 #[error("Name is too short (must be at least {CLIENT_NAME_MIN_LENGTH} characters).")]
302 TooShort,
303}
304
305fn validate_name(name: &str) -> Result<(), ValidateNameError> {
306 if name.len() < CLIENT_NAME_MIN_LENGTH {
307 Err(ValidateNameError::TooShort)
308 } else if name.len() > CLIENT_NAME_MAX_LENGTH {
309 Err(ValidateNameError::TooLong)
310 } else {
311 Ok(())
312 }
313}
314
315#[derive(thiserror::Error, Debug)]
316pub enum Error {
317 #[error("Invalid endpoint name: {0}")]
318 InvalidName(#[from] ValidateNameError),
319 #[error("Remote error: {0}")]
320 Remote(#[from] RemoteError),
321 #[error("Connection error: {0}")]
322 Rpc(#[from] irpc::Error),
323 #[error(transparent)]
324 Other(#[from] anyhow::Error),
325}
326
327impl Client {
328 pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
329 ClientBuilder::new(endpoint)
330 }
331
332 pub async fn name(&self) -> Result<Option<String>, Error> {
334 let (tx, rx) = oneshot::channel();
335 self.message_channel
336 .send(ClientActorMessage::ReadName { done: tx })
337 .await
338 .map_err(|_| Error::Other(anyhow!("sending name read request")))?;
339
340 rx.await
341 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))
342 }
343
344 pub async fn set_name(&self, name: impl Into<String>) -> Result<(), Error> {
349 set_name_inner(self.message_channel.clone(), name.into()).await
350 }
351
352 pub async fn ping(&self) -> Result<Pong, Error> {
354 let (tx, rx) = oneshot::channel();
355 self.message_channel
356 .send(ClientActorMessage::Ping { done: tx })
357 .await
358 .map_err(|_| Error::Other(anyhow!("sending ping request")))?;
359
360 rx.await
361 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
362 .map_err(Error::Remote)
363 }
364
365 pub async fn push_metrics(&self) -> Result<(), Error> {
369 let (tx, rx) = oneshot::channel();
370 self.message_channel
371 .send(ClientActorMessage::SendMetrics { done: tx })
372 .await
373 .map_err(|_| Error::Other(anyhow!("sending metrics")))?;
374
375 rx.await
376 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
377 .map_err(Error::Remote)
378 }
379
380 pub async fn grant_capability(
384 &self,
385 remote_id: EndpointId,
386 caps: impl IntoIterator<Item = impl Into<crate::caps::Cap>>,
387 ) -> Result<(), Error> {
388 let cap = crate::caps::create_grant_token(
389 self.endpoint.secret_key().clone(),
390 remote_id,
391 DEFAULT_CAP_EXPIRY,
392 Caps::new(caps),
393 )
394 .map_err(Error::Other)?;
395
396 let (tx, rx) = oneshot::channel();
397 self.message_channel
398 .send(ClientActorMessage::GrantCap {
399 cap: Box::new(cap),
400 done: tx,
401 })
402 .await
403 .map_err(|_| Error::Other(anyhow!("granting capability")))?;
404
405 rx.await
406 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
407 }
408
409 pub async fn net_diagnostics(&self, send: bool) -> Result<DiagnosticsReport, Error> {
411 let report = run_diagnostics(&self.endpoint).await?;
412 if send {
413 let (tx, rx) = oneshot::channel();
414 self.message_channel
415 .send(ClientActorMessage::PutNetworkDiagnostics {
416 done: tx,
417 report: Box::new(report.clone()),
418 })
419 .await
420 .map_err(|_| Error::Other(anyhow!("sending network diagnostics report")))?;
421
422 let _ = rx
423 .await
424 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
425 }
426
427 Ok(report)
428 }
429}
430
431enum ClientActorMessage {
432 SendMetrics {
433 done: oneshot::Sender<Result<(), RemoteError>>,
434 },
435 Ping {
436 done: oneshot::Sender<Result<Pong, RemoteError>>,
437 },
438 #[allow(dead_code)]
440 GrantCap {
441 cap: Box<Rcan<Caps>>,
443 done: oneshot::Sender<Result<(), Error>>,
444 },
445 PutNetworkDiagnostics {
446 report: Box<DiagnosticsReport>,
447 done: oneshot::Sender<Result<(), Error>>,
448 },
449 ReadName {
450 done: oneshot::Sender<Option<String>>,
451 },
452 NameEndpoint {
453 name: String,
454 done: oneshot::Sender<Result<(), RemoteError>>,
455 },
456}
457
458struct ClientActor {
459 capabilities: Rcan<Caps>,
460 client: IrohServicesClient,
461 name: Option<String>,
462 session_id: Uuid,
463 authorized: bool,
464 log_collector: Option<crate::logs::LogCollector>,
465}
466
467impl ClientActor {
468 async fn run(
469 mut self,
470 initial_name: Option<String>,
471 registry: Registry,
472 interval: Option<Duration>,
473 mut inbox: tokio::sync::mpsc::Receiver<ClientActorMessage>,
474 ) {
475 let registry = Arc::new(RwLock::new(registry));
476 let mut encoder = Encoder::new(registry);
477 let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
478 trace!("starting client actor");
479
480 if let Some(name) = initial_name
481 && let Err(err) = self.send_name_endpoint(name).await
482 {
483 warn!(err = %err, "failed setting endpoint name on startup");
484 }
485
486 loop {
487 trace!("client actor tick");
488 tokio::select! {
489 biased;
490 Some(msg) = inbox.recv() => {
491 match msg {
492 ClientActorMessage::Ping{ done } => {
493 let res = self.send_ping().await;
494 if let Err(err) = done.send(res) {
495 debug!("failed to send ping: {:#?}", err);
496 self.authorized = false;
497 }
498 },
499 ClientActorMessage::SendMetrics{ done } => {
500 trace!("sending metrics manually triggered");
501 let res = self.send_metrics(&mut encoder).await;
502 if let Err(err) = done.send(res) {
503 debug!("failed to push metrics: {:#?}", err);
504 self.authorized = false;
505 }
506 }
507 ClientActorMessage::GrantCap{ cap, done } => {
508 let res = self.grant_cap(*cap).await;
509 if let Err(err) = done.send(res) {
510 warn!("failed to grant capability: {:#?}", err);
511 }
512 }
513 ClientActorMessage::ReadName{ done } => {
514 if let Err(err) = done.send(self.name.clone()) {
515 warn!("sending name value: {:#?}", err);
516 }
517 }
518 ClientActorMessage::NameEndpoint{ name, done } => {
519 let res = self.send_name_endpoint(name).await;
520 if let Err(err) = done.send(res) {
521 warn!("failed to name endpoint: {:#?}", err);
522 }
523 }
524 ClientActorMessage::PutNetworkDiagnostics{ report, done } => {
525 let res = self.put_network_diagnostics(*report).await;
526 if let Err(err) = done.send(res) {
527 warn!("failed to publish network diagnostics: {:#?}", err);
528 }
529 }
530 }
531 }
532 _ = async {
533 if let Some(ref mut timer) = metrics_timer {
534 timer.tick().await;
535 } else {
536 std::future::pending::<()>().await;
537 }
538 } => {
539 trace!("metrics send tick");
540 if let Err(err) = self.send_metrics(&mut encoder).await {
541 debug!("failed to push metrics: {:#?}", err);
542 self.authorized = false;
543 }
544 },
545 }
546 }
547 }
548
549 async fn auth(&mut self) -> Result<(), RemoteError> {
551 if self.authorized {
552 return Ok(());
553 }
554 trace!("client authorizing");
555 self.client
556 .rpc(Auth {
557 caps: self.capabilities.clone(),
558 })
559 .await
560 .inspect_err(|e| debug!("authorization failed: {:?}", e))
561 .map_err(|e| RemoteError::AuthError(e.to_string()))?;
562 self.authorized = true;
563
564 if let Some(collector) = self.log_collector.as_ref() {
570 match self.client.rpc(crate::protocol::GetLogLevel).await {
571 Ok(Ok(Some(settings))) => {
572 let expires_in = settings.expires_in_secs.map(Duration::from_secs);
573 if let Err(err) = collector.set_filter(
574 &settings.directives,
575 expires_in,
576 settings.revert_to.as_deref(),
577 ) {
578 warn!(?err, "failed to apply initial log level");
579 }
580 }
581 Ok(Ok(None)) => {
582 }
584 Ok(Err(err)) => {
585 debug!(?err, "cloud rejected initial GetLogLevel");
586 }
587 Err(err) => {
588 debug!(?err, "initial GetLogLevel rpc failed");
589 }
590 }
591 }
592
593 Ok(())
594 }
595
596 async fn send_ping(&mut self) -> Result<Pong, RemoteError> {
597 trace!("client actor send ping");
598 self.auth().await?;
599
600 let req = rand::random();
601 self.client
602 .rpc(Ping { req_id: req })
603 .await
604 .inspect_err(|e| warn!("rpc ping error: {e}"))
605 .map_err(|_| RemoteError::InternalServerError)
606 }
607
608 async fn send_name_endpoint(&mut self, name: String) -> Result<(), RemoteError> {
609 trace!("client sending name endpoint request");
610 self.auth().await?;
611
612 self.client
613 .rpc(NameEndpoint { name: name.clone() })
614 .await
615 .inspect_err(|e| debug!("name endpoint error: {e}"))
616 .map_err(|_| RemoteError::InternalServerError)??;
617 self.name = Some(name);
618 Ok(())
619 }
620
621 async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> {
622 trace!("client actor send metrics");
623 self.auth().await?;
624
625 let update = encoder.export();
626 let req = PutMetrics {
628 session_id: self.session_id,
629 update,
630 };
631
632 self.client
633 .rpc(req)
634 .await
635 .map_err(|_| RemoteError::InternalServerError)??;
636
637 Ok(())
638 }
639
640 async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
641 trace!("client actor grant capability");
642 self.auth().await?;
643
644 self.client
645 .rpc(crate::protocol::GrantCap { cap })
646 .await
647 .map_err(|_| RemoteError::InternalServerError)??;
648
649 Ok(())
650 }
651
652 async fn put_network_diagnostics(
653 &mut self,
654 report: crate::net_diagnostics::DiagnosticsReport,
655 ) -> Result<(), Error> {
656 trace!("client actor publish network diagnostics");
657 self.auth().await?;
658
659 let req = PutNetworkDiagnostics { report };
660
661 self.client
662 .rpc(req)
663 .await
664 .map_err(|_| RemoteError::InternalServerError)??;
665
666 Ok(())
667 }
668}
669
670async fn set_name_inner(
671 message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
672 name: String,
673) -> Result<(), Error> {
674 validate_name(&name)?;
675 debug!(name_len = name.len(), "calling set name");
676 let (tx, rx) = oneshot::channel();
677 message_channel
678 .send(ClientActorMessage::NameEndpoint { name, done: tx })
679 .await
680 .map_err(|_| Error::Other(anyhow!("sending name endpoint request")))?;
681 rx.await
682 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
683 .map_err(Error::Remote)
684}
685
#[cfg(test)]
mod tests {
    use iroh::{Endpoint, EndpointAddr, SecretKey, endpoint::presets};
    use rand::{RngExt, SeedableRng};
    use temp_env_vars::temp_env_vars;

    use crate::{
        Client,
        api_secret::ApiSecret,
        caps::{Cap, Caps},
        client::{API_SECRET_ENV_VAR_NAME, BuildError, ValidateNameError},
    };

    /// Derives `(shared_secret, remote_key)` deterministically from `seed`, drawing the
    /// shared secret first.
    fn seeded_keys(seed: u64) -> (SecretKey, SecretKey) {
        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(seed);
        let shared_secret = SecretKey::from_bytes(&rng.random());
        let remote_key = SecretKey::from_bytes(&rng.random());
        (shared_secret, remote_key)
    }

    /// Binds a local endpoint using the minimal preset.
    async fn bind_endpoint() -> Endpoint {
        Endpoint::builder(presets::Minimal).bind().await.unwrap()
    }

    #[tokio::test]
    #[temp_env_vars]
    async fn test_api_key_from_env() {
        let (shared_secret, remote_key) = seeded_keys(0);
        let remote_id = remote_key.public();
        let api_secret = ApiSecret::new(shared_secret.clone(), remote_id);
        // SAFETY: `set_var` is unsound if another thread reads the environment
        // concurrently. `#[temp_env_vars]` is relied on to isolate and restore env changes
        // for this test — NOTE(review): confirm no other test thread touches the environment.
        unsafe {
            std::env::set_var(API_SECRET_ENV_VAR_NAME, api_secret.to_string());
        }

        let endpoint = bind_endpoint().await;
        let builder = Client::builder(&endpoint).api_secret_from_env().unwrap();

        let expected_remote: EndpointAddr = remote_id.into();
        assert_eq!(builder.remote, Some(expected_remote));

        let cap = builder.cap.as_ref().expect("expected capability to be set");
        assert_eq!(cap.capability(), &Caps::new([Cap::Client]));
        assert_eq!(cap.audience(), &endpoint.id().as_verifying_key());
        assert_eq!(cap.issuer(), &shared_secret.public().as_verifying_key());
    }

    #[tokio::test]
    async fn test_no_metrics_interval() {
        let (shared_secret, remote_key) = seeded_keys(1);
        let api_secret = ApiSecret::new(shared_secret, remote_key.public());
        let endpoint = bind_endpoint().await;

        let client = Client::builder(&endpoint)
            .disable_metrics_interval()
            .api_secret(api_secret)
            .unwrap()
            .build()
            .await
            .unwrap();

        assert!(client.push_metrics().await.is_err());
    }

    #[tokio::test]
    async fn test_name() {
        let (shared_secret, remote_key) = seeded_keys(0);
        let api_secret = ApiSecret::new(shared_secret, remote_key.public());
        let endpoint = bind_endpoint().await;

        let builder = Client::builder(&endpoint)
            .name("my-node 👋")
            .unwrap()
            .api_secret(api_secret)
            .unwrap();
        assert_eq!(builder.name.as_deref(), Some("my-node 👋"));

        let too_short = Client::builder(&endpoint)
            .name("a")
            .err()
            .expect("name should fail for strings under 2 bytes");
        assert!(matches!(
            too_short.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidName(ValidateNameError::TooShort))
        ));

        let too_long_name = "👋".repeat(129);
        let too_long = Client::builder(&endpoint)
            .name(&too_long_name)
            .err()
            .expect("name should fail for strings over 128 bytes");
        assert!(matches!(
            too_long.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidName(ValidateNameError::TooLong))
        ));
    }
}