1use std::{
2 str::FromStr,
3 sync::{Arc, RwLock},
4};
5
6use anyhow::{Result, anyhow, ensure};
7use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
8use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder};
9use irpc_iroh::IrohLazyRemoteConnection;
10use n0_error::StackResultExt;
11use n0_future::{task::AbortOnDropHandle, time::Duration};
12use rcan::Rcan;
13use tokio::sync::oneshot;
14use tracing::{debug, trace, warn};
15use uuid::Uuid;
16
17#[cfg(feature = "net_diagnostics")]
18use crate::net_diagnostics::{DiagnosticsReport, checks::run_diagnostics};
19#[cfg(feature = "net_diagnostics")]
20use crate::protocol::PutNetworkDiagnostics;
21use crate::{
22 api_secret::ApiSecret,
23 caps::Caps,
24 protocol::{ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutMetrics, RemoteError},
25};
26
/// Handle to the iroh-services client.
///
/// Public methods forward requests over `message_channel` to a background
/// `ClientActor` task and wait for its reply. Clones share the same channel
/// and the same task handle.
#[derive(Debug, Clone)]
pub struct Client {
    // Only read by the feature-gated `grant_capability` (`client_host`) and
    // `net_diagnostics` (`net_diagnostics`) methods, hence the lint allowance.
    #[allow(dead_code)]
    endpoint: Endpoint,
    /// Sender side of the actor's inbox.
    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
    /// Handle of the spawned actor task, shared between clones.
    // NOTE(review): going by the type name, the task is aborted once the last
    // clone drops this handle — confirm against the `n0_future` docs.
    _actor_task: Arc<AbortOnDropHandle<()>>,
}
58
/// Collects the settings needed to construct a [`Client`].
///
/// `build` requires both a remote address and a capability token; the other
/// fields have defaults set in `new`.
pub struct ClientBuilder {
    // Expiry handed to the token constructors in `api_secret` and `ssh_key`.
    #[allow(dead_code)]
    cap_expiry: Duration,
    /// Capability token sent to the remote when authorizing; `None` until one
    /// of the credential methods is called.
    cap: Option<Rcan<Caps>>,
    /// Local endpoint used to dial the remote.
    endpoint: Endpoint,
    /// Optional endpoint name, validated when set.
    name: Option<String>,
    /// Period of the automatic metrics push; `None` disables it.
    metrics_interval: Option<Duration>,
    /// Address of the remote service to dial.
    remote: Option<EndpointAddr>,
    /// Metrics registry exported to the remote; seeded with the endpoint's
    /// own metrics in `new`.
    registry: Registry,
}
71
/// Expiry passed to the capability token constructors: 30 days.
const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Name of the environment variable read by `ClientBuilder::api_secret_from_env`.
pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET";
74
75impl ClientBuilder {
76 pub fn new(endpoint: &Endpoint) -> Self {
77 let mut registry = Registry::default();
78 registry.register_all(endpoint.metrics());
79
80 Self {
81 cap: None,
82 cap_expiry: DEFAULT_CAP_EXPIRY,
83 endpoint: endpoint.clone(),
84 name: None,
85 metrics_interval: Some(Duration::from_secs(60)),
86 remote: None,
87 registry,
88 }
89 }
90
91 pub fn register_metrics_group(mut self, metrics_group: Arc<dyn MetricsGroup>) -> Self {
95 self.registry.register(metrics_group);
96 self
97 }
98
99 pub fn metrics_interval(mut self, interval: Duration) -> Self {
103 self.metrics_interval = Some(interval);
104 self
105 }
106
107 pub fn disable_metrics_interval(mut self) -> Self {
109 self.metrics_interval = None;
110 self
111 }
112
113 pub fn name(mut self, name: impl Into<String>) -> Result<Self> {
128 let name = name.into();
129 validate_name(&name).map_err(BuildError::InvalidName)?;
130 self.name = Some(name);
131 Ok(self)
132 }
133
134 pub fn api_secret_from_env(self) -> Result<Self> {
136 let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
137 self.api_secret(ticket)
138 }
139
140 pub fn api_secret_from_str(self, secret_key: &str) -> Result<Self> {
142 let key = ApiSecret::from_str(secret_key).context("invalid iroh services api secret")?;
143 self.api_secret(key)
144 }
145
146 pub fn api_secret(mut self, ticket: ApiSecret) -> Result<Self> {
153 let local_id = self.endpoint.id();
154 let rcan = crate::caps::create_api_token_from_secret_key(
155 ticket.secret,
156 local_id,
157 self.cap_expiry,
158 Caps::for_shared_secret(),
159 )?;
160
161 self.remote = Some(ticket.remote);
162 self.rcan(rcan)
163 }
164
165 #[cfg(feature = "ssh-key")]
167 pub async fn ssh_key_from_file<P: AsRef<std::path::Path>>(self, path: P) -> Result<Self> {
168 let file_content = tokio::fs::read_to_string(path).await?;
169 let private_key = ssh_key::PrivateKey::from_openssh(&file_content)?;
170
171 self.ssh_key(&private_key)
172 }
173
174 #[cfg(feature = "ssh-key")]
176 pub fn ssh_key(mut self, key: &ssh_key::PrivateKey) -> Result<Self> {
177 let local_id = self.endpoint.id();
178 let rcan = crate::caps::create_api_token_from_ssh_key(
179 key,
180 local_id,
181 self.cap_expiry,
182 Caps::all(),
183 )?;
184 self.cap.replace(rcan);
185
186 Ok(self)
187 }
188
189 pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
191 ensure!(
192 EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
193 "invalid audience"
194 );
195 self.cap.replace(cap);
196 Ok(self)
197 }
198
199 pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
202 self.remote = Some(remote.into());
203 self
204 }
205
206 #[must_use = "dropping the client will silently cancel all client tasks"]
208 pub async fn build(self) -> Result<Client, BuildError> {
209 debug!("starting iroh-services client");
210 let remote = self.remote.ok_or(BuildError::MissingRemote)?;
211 let capabilities = self.cap.ok_or(BuildError::MissingCapability)?;
212
213 let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
214 let irpc_client = IrohServicesClient::boxed(conn);
215
216 let (tx, rx) = tokio::sync::mpsc::channel(1);
217 let actor_task = AbortOnDropHandle::new(n0_future::task::spawn(
218 ClientActor {
219 capabilities,
220 client: irpc_client,
221 name: self.name.clone(),
222 session_id: Uuid::new_v4(),
223 authorized: false,
224 }
225 .run(self.name, self.registry, self.metrics_interval, rx),
226 ));
227
228 Ok(Client {
229 endpoint: self.endpoint,
230 message_channel: tx,
231 _actor_task: Arc::new(actor_task),
232 })
233 }
234}
235
/// Errors associated with constructing a [`Client`].
#[derive(thiserror::Error, Debug)]
pub enum BuildError {
    /// No remote address was configured on the builder.
    #[error("Missing remote endpoint to dial")]
    MissingRemote,
    /// No capability token was configured on the builder.
    #[error("Missing capability")]
    MissingCapability,
    /// Produced by the `From<irpc::Error>` conversion when the connection was
    /// closed by the peer with application error code 401.
    #[error("Unauthorized")]
    Unauthorized,
    /// An error reported by the remote service.
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// Any other RPC error (see the `From<irpc::Error>` conversion).
    #[error("Rpc connection error: {0}")]
    Rpc(irpc::Error),
    /// Failure while connecting to the remote endpoint.
    #[error("Connection error: {0}")]
    Connect(ConnectError),
    /// The requested endpoint name failed validation.
    #[error("Invalid endpoint name: {0}")]
    InvalidName(#[from] ValidateNameError),
}
253
254impl From<irpc::Error> for BuildError {
255 fn from(value: irpc::Error) -> Self {
256 match value {
257 irpc::Error::Request {
258 source:
259 irpc::RequestError::Connection {
260 source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
261 ..
262 },
263 ..
264 } if frame.error_code == 401u32.into() => Self::Unauthorized,
265 value => Self::Rpc(value),
266 }
267 }
268}
269
/// Minimum accepted endpoint name length; `validate_name` compares it against
/// `str::len()`, i.e. the UTF-8 byte length.
pub const CLIENT_NAME_MIN_LENGTH: usize = 2;
/// Maximum accepted endpoint name length; `validate_name` compares it against
/// `str::len()`, i.e. the UTF-8 byte length.
pub const CLIENT_NAME_MAX_LENGTH: usize = 128;
274
275#[derive(Debug, thiserror::Error)]
277pub enum ValidateNameError {
278 #[error("Name is too long (must be no more than {CLIENT_NAME_MAX_LENGTH} characters).")]
279 TooLong,
280 #[error("Name is too short (must be at least {CLIENT_NAME_MIN_LENGTH} characters).")]
281 TooShort,
282}
283
284fn validate_name(name: &str) -> Result<(), ValidateNameError> {
285 if name.len() < CLIENT_NAME_MIN_LENGTH {
286 Err(ValidateNameError::TooShort)
287 } else if name.len() > CLIENT_NAME_MAX_LENGTH {
288 Err(ValidateNameError::TooLong)
289 } else {
290 Ok(())
291 }
292}
293
/// Errors returned by [`Client`] methods.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The requested endpoint name failed validation.
    #[error("Invalid endpoint name: {0}")]
    InvalidName(#[from] ValidateNameError),
    /// An error reported by the remote service, or produced by the client
    /// actor while talking to it.
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// An RPC-level error.
    #[error("Connection error: {0}")]
    Rpc(#[from] irpc::Error),
    /// Any other failure, including failures of the internal channels between
    /// the [`Client`] handle and its actor.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
305
306impl Client {
307 pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
308 ClientBuilder::new(endpoint)
309 }
310
311 pub async fn name(&self) -> Result<Option<String>, Error> {
313 let (tx, rx) = oneshot::channel();
314 self.message_channel
315 .send(ClientActorMessage::ReadName { done: tx })
316 .await
317 .map_err(|_| Error::Other(anyhow!("sending name read request")))?;
318
319 rx.await
320 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))
321 }
322
323 pub async fn set_name(&self, name: impl Into<String>) -> Result<(), Error> {
328 set_name_inner(self.message_channel.clone(), name.into()).await
329 }
330
331 pub async fn ping(&self) -> Result<Pong, Error> {
333 let (tx, rx) = oneshot::channel();
334 self.message_channel
335 .send(ClientActorMessage::Ping { done: tx })
336 .await
337 .map_err(|_| Error::Other(anyhow!("sending ping request")))?;
338
339 rx.await
340 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
341 .map_err(Error::Remote)
342 }
343
344 pub async fn push_metrics(&self) -> Result<(), Error> {
348 let (tx, rx) = oneshot::channel();
349 self.message_channel
350 .send(ClientActorMessage::SendMetrics { done: tx })
351 .await
352 .map_err(|_| Error::Other(anyhow!("sending metrics")))?;
353
354 rx.await
355 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
356 .map_err(Error::Remote)
357 }
358
359 #[cfg(feature = "client_host")]
363 pub async fn grant_capability(
364 &self,
365 remote_id: EndpointId,
366 caps: impl IntoIterator<Item = impl Into<crate::caps::Cap>>,
367 ) -> Result<(), Error> {
368 let cap = crate::caps::create_grant_token(
369 self.endpoint.secret_key().clone(),
370 remote_id,
371 DEFAULT_CAP_EXPIRY,
372 Caps::new(caps),
373 )
374 .map_err(Error::Other)?;
375
376 let (tx, rx) = oneshot::channel();
377 self.message_channel
378 .send(ClientActorMessage::GrantCap {
379 cap: Box::new(cap),
380 done: tx,
381 })
382 .await
383 .map_err(|_| Error::Other(anyhow!("granting capability")))?;
384
385 rx.await
386 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
387 }
388
389 #[cfg(feature = "net_diagnostics")]
391 pub async fn net_diagnostics(&self, send: bool) -> Result<DiagnosticsReport, Error> {
392 let report = run_diagnostics(&self.endpoint).await?;
393 if send {
394 let (tx, rx) = oneshot::channel();
395 self.message_channel
396 .send(ClientActorMessage::PutNetworkDiagnostics {
397 done: tx,
398 report: Box::new(report.clone()),
399 })
400 .await
401 .map_err(|_| Error::Other(anyhow!("sending network diagnostics report")))?;
402
403 let _ = rx
404 .await
405 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
406 }
407
408 Ok(report)
409 }
410}
411
/// Requests sent from a [`Client`] handle to its `ClientActor`.
///
/// Every variant carries a `done` oneshot sender on which the actor reports
/// the outcome.
enum ClientActorMessage {
    /// Push a metrics update to the remote now.
    SendMetrics {
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
    /// Ping the remote.
    Ping {
        done: oneshot::Sender<Result<Pong, RemoteError>>,
    },
    // Only constructed by `Client::grant_capability`, which is gated behind
    // the `client_host` feature.
    #[allow(dead_code)]
    GrantCap {
        /// Token to submit; boxed before being placed in the message.
        cap: Box<Rcan<Caps>>,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Upload a network diagnostics report.
    #[cfg(feature = "net_diagnostics")]
    PutNetworkDiagnostics {
        report: Box<DiagnosticsReport>,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Read the endpoint name held by the actor (no remote request involved).
    ReadName {
        done: oneshot::Sender<Option<String>>,
    },
    /// Ask the remote to name this endpoint `name`.
    NameEndpoint {
        name: String,
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
}
439
/// State owned by the background task that talks to the remote service.
struct ClientActor {
    /// Capability token sent with the `Auth` request.
    capabilities: Rcan<Caps>,
    /// RPC client for the remote service.
    client: IrohServicesClient,
    /// Endpoint name; updated after the remote accepts a `NameEndpoint` request.
    name: Option<String>,
    /// Identifier attached to every metrics update; generated once when the
    /// client is built.
    session_id: Uuid,
    /// Whether an `Auth` request has succeeded and is still assumed valid;
    /// while `true`, `auth` skips the request.
    authorized: bool,
}
447
448impl ClientActor {
449 async fn run(
450 mut self,
451 initial_name: Option<String>,
452 registry: Registry,
453 interval: Option<Duration>,
454 mut inbox: tokio::sync::mpsc::Receiver<ClientActorMessage>,
455 ) {
456 let registry = Arc::new(RwLock::new(registry));
457 let mut encoder = Encoder::new(registry);
458 let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
459 trace!("starting client actor");
460
461 if let Some(name) = initial_name
462 && let Err(err) = self.send_name_endpoint(name).await
463 {
464 warn!(err = %err, "failed setting endpoint name on startup");
465 }
466
467 loop {
468 trace!("client actor tick");
469 tokio::select! {
470 biased;
471 Some(msg) = inbox.recv() => {
472 match msg {
473 ClientActorMessage::Ping{ done } => {
474 let res = self.send_ping().await;
475 if let Err(err) = done.send(res) {
476 debug!("failed to send ping: {:#?}", err);
477 self.authorized = false;
478 }
479 },
480 ClientActorMessage::SendMetrics{ done } => {
481 trace!("sending metrics manually triggered");
482 let res = self.send_metrics(&mut encoder).await;
483 if let Err(err) = done.send(res) {
484 debug!("failed to push metrics: {:#?}", err);
485 self.authorized = false;
486 }
487 }
488 ClientActorMessage::GrantCap{ cap, done } => {
489 let res = self.grant_cap(*cap).await;
490 if let Err(err) = done.send(res) {
491 warn!("failed to grant capability: {:#?}", err);
492 }
493 }
494 ClientActorMessage::ReadName{ done } => {
495 if let Err(err) = done.send(self.name.clone()) {
496 warn!("sending name value: {:#?}", err);
497 }
498 }
499 ClientActorMessage::NameEndpoint{ name, done } => {
500 let res = self.send_name_endpoint(name).await;
501 if let Err(err) = done.send(res) {
502 warn!("failed to name endpoint: {:#?}", err);
503 }
504 }
505 #[cfg(feature = "net_diagnostics")]
506 ClientActorMessage::PutNetworkDiagnostics{ report, done } => {
507 let res = self.put_network_diagnostics(*report).await;
508 if let Err(err) = done.send(res) {
509 warn!("failed to publish network diagnostics: {:#?}", err);
510 }
511 }
512 }
513 }
514 _ = async {
515 if let Some(ref mut timer) = metrics_timer {
516 timer.tick().await;
517 } else {
518 std::future::pending::<()>().await;
519 }
520 } => {
521 trace!("metrics send tick");
522 if let Err(err) = self.send_metrics(&mut encoder).await {
523 debug!("failed to push metrics: {:#?}", err);
524 self.authorized = false;
525 }
526 },
527 }
528 }
529 }
530
531 async fn auth(&mut self) -> Result<(), RemoteError> {
533 if self.authorized {
534 return Ok(());
535 }
536 trace!("client authorizing");
537 self.client
538 .rpc(Auth {
539 caps: self.capabilities.clone(),
540 })
541 .await
542 .inspect_err(|e| debug!("authorization failed: {:?}", e))
543 .map_err(|e| RemoteError::AuthError(e.to_string()))?;
544 self.authorized = true;
545 Ok(())
546 }
547
548 async fn send_ping(&mut self) -> Result<Pong, RemoteError> {
549 trace!("client actor send ping");
550 self.auth().await?;
551
552 let req = rand::random();
553 self.client
554 .rpc(Ping { req_id: req })
555 .await
556 .inspect_err(|e| warn!("rpc ping error: {e}"))
557 .map_err(|_| RemoteError::InternalServerError)
558 }
559
560 async fn send_name_endpoint(&mut self, name: String) -> Result<(), RemoteError> {
561 trace!("client sending name endpoint request");
562 self.auth().await?;
563
564 self.client
565 .rpc(NameEndpoint { name: name.clone() })
566 .await
567 .inspect_err(|e| debug!("name endpoint error: {e}"))
568 .map_err(|_| RemoteError::InternalServerError)??;
569 self.name = Some(name);
570 Ok(())
571 }
572
573 async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> {
574 trace!("client actor send metrics");
575 self.auth().await?;
576
577 let update = encoder.export();
578 let req = PutMetrics {
580 session_id: self.session_id,
581 update,
582 };
583
584 self.client
585 .rpc(req)
586 .await
587 .map_err(|_| RemoteError::InternalServerError)??;
588
589 Ok(())
590 }
591
592 async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
593 trace!("client actor grant capability");
594 self.auth().await?;
595
596 self.client
597 .rpc(crate::protocol::GrantCap { cap })
598 .await
599 .map_err(|_| RemoteError::InternalServerError)??;
600
601 Ok(())
602 }
603
604 #[cfg(feature = "net_diagnostics")]
605 async fn put_network_diagnostics(
606 &mut self,
607 report: crate::net_diagnostics::DiagnosticsReport,
608 ) -> Result<(), Error> {
609 trace!("client actor publish network diagnostics");
610 self.auth().await?;
611
612 let req = PutNetworkDiagnostics { report };
613
614 self.client
615 .rpc(req)
616 .await
617 .map_err(|_| RemoteError::InternalServerError)??;
618
619 Ok(())
620 }
621}
622
623async fn set_name_inner(
624 message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
625 name: String,
626) -> Result<(), Error> {
627 validate_name(&name)?;
628 debug!(name_len = name.len(), "calling set name");
629 let (tx, rx) = oneshot::channel();
630 message_channel
631 .send(ClientActorMessage::NameEndpoint { name, done: tx })
632 .await
633 .map_err(|_| Error::Other(anyhow!("sending name endpoint request")))?;
634 rx.await
635 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
636 .map_err(Error::Remote)
637}
638
#[cfg(test)]
mod tests {
    use iroh::{Endpoint, EndpointAddr, SecretKey};
    use temp_env_vars::temp_env_vars;

    use crate::{
        Client,
        api_secret::ApiSecret,
        caps::{Cap, Caps},
        client::{API_SECRET_ENV_VAR_NAME, BuildError, ValidateNameError},
    };

    /// An API secret placed in the environment configures both the remote
    /// address and a capability token issued by the shared secret for the
    /// local endpoint.
    #[tokio::test]
    #[temp_env_vars]
    async fn test_api_key_from_env() {
        use rand::SeedableRng;

        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0);
        let issuer_key = SecretKey::generate(&mut rng);
        let remote_id = SecretKey::generate(&mut rng).public();
        let encoded_secret = ApiSecret::new(issuer_key.clone(), remote_id).to_string();
        // NOTE(review): mutating the process environment is only sound while
        // no other thread accesses it; presumably `#[temp_env_vars]` is what
        // isolates this test — confirm.
        unsafe {
            std::env::set_var(API_SECRET_ENV_VAR_NAME, encoded_secret);
        };

        let endpoint = Endpoint::empty_builder().bind().await.unwrap();
        let builder = Client::builder(&endpoint).api_secret_from_env().unwrap();

        let expected_remote: EndpointAddr = remote_id.into();
        assert_eq!(builder.remote, Some(expected_remote));

        let token = builder.cap.as_ref().expect("expected capability to be set");
        assert_eq!(token.capability(), &Caps::new([Cap::Client]));
        assert_eq!(token.audience(), &endpoint.id().as_verifying_key());
        assert_eq!(token.issuer(), &issuer_key.public().as_verifying_key());
    }

    /// A client built with the automatic metrics push disabled still accepts
    /// a manual push; against this generated remote id the push reports an
    /// error.
    #[tokio::test]
    async fn test_no_metrics_interval() {
        use rand::SeedableRng;

        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1);
        let issuer_key = SecretKey::generate(&mut rng);
        let remote_id = SecretKey::generate(&mut rng).public();
        let secret = ApiSecret::new(issuer_key.clone(), remote_id);

        let endpoint = Endpoint::empty_builder().bind().await.unwrap();

        let configured = Client::builder(&endpoint)
            .disable_metrics_interval()
            .api_secret(secret)
            .unwrap();
        let client = configured.build().await.unwrap();

        let outcome = client.push_metrics().await;
        assert!(outcome.is_err());
    }

    /// Names within the length limits are stored on the builder; names that
    /// are too short or too long are rejected with the matching error.
    #[tokio::test]
    async fn test_name() {
        use rand::SeedableRng;

        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0);
        let issuer_key = SecretKey::generate(&mut rng);
        let remote_id = SecretKey::generate(&mut rng).public();
        let secret = ApiSecret::new(issuer_key.clone(), remote_id);

        let endpoint = Endpoint::empty_builder().bind().await.unwrap();

        let named = Client::builder(&endpoint).name("my-node 👋").unwrap();
        let builder = named.api_secret(secret).unwrap();
        assert_eq!(builder.name, Some("my-node 👋".to_string()));

        let short_err = match Client::builder(&endpoint).name("a") {
            Err(err) => err,
            Ok(_) => panic!("name should fail for strings under 2 bytes"),
        };
        assert!(matches!(
            short_err.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidName(ValidateNameError::TooShort))
        ));

        let too_long_name = "👋".repeat(129);
        let long_err = match Client::builder(&endpoint).name(&too_long_name) {
            Err(err) => err,
            Ok(_) => panic!("name should fail for strings over 128 bytes"),
        };
        assert!(matches!(
            long_err.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidName(ValidateNameError::TooLong))
        ));
    }
}
738}