1use std::{
2 str::FromStr,
3 sync::{Arc, RwLock},
4};
5
6use anyhow::{Result, anyhow, ensure};
7use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
8use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder};
9use irpc_iroh::IrohLazyRemoteConnection;
10use n0_error::StackResultExt;
11use n0_future::{task::AbortOnDropHandle, time::Duration};
12use rcan::Rcan;
13use tokio::sync::oneshot;
14use tracing::{debug, trace, warn};
15use uuid::Uuid;
16
17use crate::{
18 api_secret::{API_SECRET_ENV_VAR_NAME, ApiSecret},
19 caps::{Caps, DEFAULT_CAP_EXPIRY},
20 net_diagnostics::{DiagnosticsReport, checks::run_diagnostics},
21 protocol::{
22 ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutMetrics,
23 PutNetworkDiagnostics, RemoteError,
24 },
25};
26
/// Handle to a running iroh-services client.
///
/// All requests are forwarded over a channel to a background actor task.
/// Clones share that same channel and the same task handle.
#[derive(Debug, Clone)]
pub struct Client {
    /// Local endpoint; its secret key signs capability grants and it is the
    /// subject of network diagnostics runs.
    #[allow(dead_code)]
    endpoint: Endpoint,
    /// Sending half of the actor's inbox.
    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
    /// Shared handle to the actor task. Per the `must_use` note on
    /// `ClientBuilder::build`, dropping the client cancels the client tasks.
    _actor_task: Arc<AbortOnDropHandle<()>>,
}
58
/// Collects the configuration needed to construct a [`Client`].
pub struct ClientBuilder {
    /// Expiry passed along when this builder mints a capability token from an
    /// API secret or an SSH key.
    #[allow(dead_code)]
    cap_expiry: Duration,
    /// Capability token presented to the remote; required by `build`.
    cap: Option<Rcan<Caps>>,
    /// Local endpoint the client operates on.
    endpoint: Endpoint,
    /// Optional name the actor sends to the remote on startup.
    name: Option<String>,
    /// Period between automatic metrics pushes; `None` disables the timer.
    metrics_interval: Option<Duration>,
    /// Address of the remote service; required by `build`.
    remote: Option<EndpointAddr>,
    /// Metrics registry whose contents are exported on every push.
    registry: Registry,
}
71
72impl ClientBuilder {
73 pub fn new(endpoint: &Endpoint) -> Self {
74 let mut registry = Registry::default();
75 registry.register_all(endpoint.metrics());
76
77 Self {
78 cap: None,
79 cap_expiry: DEFAULT_CAP_EXPIRY,
80 endpoint: endpoint.clone(),
81 name: None,
82 metrics_interval: Some(Duration::from_secs(60)),
83 remote: None,
84 registry,
85 }
86 }
87
88 pub fn register_metrics_group(mut self, metrics_group: Arc<dyn MetricsGroup>) -> Self {
92 self.registry.register(metrics_group);
93 self
94 }
95
96 pub fn metrics_interval(mut self, interval: Duration) -> Self {
100 self.metrics_interval = Some(interval);
101 self
102 }
103
104 pub fn disable_metrics_interval(mut self) -> Self {
106 self.metrics_interval = None;
107 self
108 }
109
110 pub fn name(mut self, name: impl Into<String>) -> Result<Self> {
125 let name = name.into();
126 validate_name(&name).map_err(BuildError::InvalidName)?;
127 self.name = Some(name);
128 Ok(self)
129 }
130
131 pub fn api_secret_from_env(self) -> Result<Self> {
133 let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
134 self.api_secret(ticket)
135 }
136
137 pub fn api_secret_from_str(self, secret_key: &str) -> Result<Self> {
139 let key = ApiSecret::from_str(secret_key).context("invalid iroh services api secret")?;
140 self.api_secret(key)
141 }
142
143 pub fn api_secret(mut self, ticket: ApiSecret) -> Result<Self> {
150 let local_id = self.endpoint.id();
151 let rcan = crate::caps::create_api_token_from_secret_key(
152 ticket.secret,
153 local_id,
154 self.cap_expiry,
155 Caps::for_shared_secret(),
156 )?;
157
158 self.remote = Some(ticket.remote);
159 self.rcan(rcan)
160 }
161
162 #[cfg(not(target_arch = "wasm32"))]
166 pub async fn ssh_key_from_file<P: AsRef<std::path::Path>>(self, path: P) -> Result<Self> {
167 let file_content = tokio::fs::read_to_string(path).await?;
168 self.ssh_key(&file_content)
169 }
170
171 #[cfg(not(target_arch = "wasm32"))]
173 pub fn ssh_key(mut self, pem: &str) -> Result<Self> {
174 let local_id = self.endpoint.id();
175 let rcan = crate::caps::create_api_token_from_openssh_pem(
176 pem,
177 local_id,
178 self.cap_expiry,
179 Caps::all(),
180 )?;
181 self.cap.replace(rcan);
182
183 Ok(self)
184 }
185
186 pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
188 ensure!(
189 EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
190 "invalid audience"
191 );
192 self.cap.replace(cap);
193 Ok(self)
194 }
195
196 pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
199 self.remote = Some(remote.into());
200 self
201 }
202
203 #[must_use = "dropping the client will silently cancel all client tasks"]
205 pub async fn build(self) -> Result<Client, BuildError> {
206 debug!("starting iroh-services client");
207 let remote = self.remote.ok_or(BuildError::MissingRemote)?;
208 let capabilities = self.cap.ok_or(BuildError::MissingCapability)?;
209
210 let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
211 let irpc_client = IrohServicesClient::boxed(conn);
212
213 let (tx, rx) = tokio::sync::mpsc::channel(1);
214 let actor_task = AbortOnDropHandle::new(n0_future::task::spawn(
215 ClientActor {
216 capabilities,
217 client: irpc_client,
218 name: self.name.clone(),
219 session_id: Uuid::new_v4(),
220 authorized: false,
221 }
222 .run(self.name, self.registry, self.metrics_interval, rx),
223 ));
224
225 Ok(Client {
226 endpoint: self.endpoint,
227 message_channel: tx,
228 _actor_task: Arc::new(actor_task),
229 })
230 }
231}
232
/// Errors related to configuring and building a [`Client`].
#[derive(thiserror::Error, Debug)]
pub enum BuildError {
    /// `build` was called without a remote address.
    #[error("Missing remote endpoint to dial")]
    MissingRemote,
    /// `build` was called without a capability token.
    #[error("Missing capability")]
    MissingCapability,
    /// The remote closed the connection with application error code 401
    /// (see the `From<irpc::Error>` conversion).
    #[error("Unauthorized")]
    Unauthorized,
    /// An error reported by the remote service.
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// Any other RPC-level failure.
    #[error("Rpc connection error: {0}")]
    Rpc(irpc::Error),
    /// Failure while connecting to the remote endpoint.
    #[error("Connection error: {0}")]
    Connect(ConnectError),
    /// The requested endpoint name failed local validation.
    #[error("Invalid endpoint name: {0}")]
    InvalidName(#[from] ValidateNameError),
}
250
251impl From<irpc::Error> for BuildError {
252 fn from(value: irpc::Error) -> Self {
253 match value {
254 irpc::Error::Request {
255 source:
256 irpc::RequestError::Connection {
257 source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
258 ..
259 },
260 ..
261 } if frame.error_code == 401u32.into() => Self::Unauthorized,
262 value => Self::Rpc(value),
263 }
264 }
265}
266
/// Minimum accepted client name length, measured in UTF-8 bytes
/// (`validate_name` compares against `str::len`).
pub const CLIENT_NAME_MIN_LENGTH: usize = 2;
/// Maximum accepted client name length, measured in UTF-8 bytes
/// (`validate_name` compares against `str::len`).
pub const CLIENT_NAME_MAX_LENGTH: usize = 128;
271
272#[derive(Debug, thiserror::Error)]
274pub enum ValidateNameError {
275 #[error("Name is too long (must be no more than {CLIENT_NAME_MAX_LENGTH} characters).")]
276 TooLong,
277 #[error("Name is too short (must be at least {CLIENT_NAME_MIN_LENGTH} characters).")]
278 TooShort,
279}
280
281fn validate_name(name: &str) -> Result<(), ValidateNameError> {
282 if name.len() < CLIENT_NAME_MIN_LENGTH {
283 Err(ValidateNameError::TooShort)
284 } else if name.len() > CLIENT_NAME_MAX_LENGTH {
285 Err(ValidateNameError::TooLong)
286 } else {
287 Ok(())
288 }
289}
290
/// Errors returned by the methods of a running [`Client`].
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The requested endpoint name failed local validation.
    #[error("Invalid endpoint name: {0}")]
    InvalidName(#[from] ValidateNameError),
    /// An error reported by the remote service or by the actor on its behalf.
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// An RPC-level failure.
    #[error("Connection error: {0}")]
    Rpc(#[from] irpc::Error),
    /// Any other failure, e.g. the internal actor channel being closed.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
302
303impl Client {
304 pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
305 ClientBuilder::new(endpoint)
306 }
307
308 pub async fn name(&self) -> Result<Option<String>, Error> {
310 let (tx, rx) = oneshot::channel();
311 self.message_channel
312 .send(ClientActorMessage::ReadName { done: tx })
313 .await
314 .map_err(|_| Error::Other(anyhow!("sending name read request")))?;
315
316 rx.await
317 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))
318 }
319
320 pub async fn set_name(&self, name: impl Into<String>) -> Result<(), Error> {
325 set_name_inner(self.message_channel.clone(), name.into()).await
326 }
327
328 pub async fn ping(&self) -> Result<Pong, Error> {
330 let (tx, rx) = oneshot::channel();
331 self.message_channel
332 .send(ClientActorMessage::Ping { done: tx })
333 .await
334 .map_err(|_| Error::Other(anyhow!("sending ping request")))?;
335
336 rx.await
337 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
338 .map_err(Error::Remote)
339 }
340
341 pub async fn push_metrics(&self) -> Result<(), Error> {
345 let (tx, rx) = oneshot::channel();
346 self.message_channel
347 .send(ClientActorMessage::SendMetrics { done: tx })
348 .await
349 .map_err(|_| Error::Other(anyhow!("sending metrics")))?;
350
351 rx.await
352 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
353 .map_err(Error::Remote)
354 }
355
356 pub async fn grant_capability(
360 &self,
361 remote_id: EndpointId,
362 caps: impl IntoIterator<Item = impl Into<crate::caps::Cap>>,
363 ) -> Result<(), Error> {
364 let cap = crate::caps::create_grant_token(
365 self.endpoint.secret_key().clone(),
366 remote_id,
367 DEFAULT_CAP_EXPIRY,
368 Caps::new(caps),
369 )
370 .map_err(Error::Other)?;
371
372 let (tx, rx) = oneshot::channel();
373 self.message_channel
374 .send(ClientActorMessage::GrantCap {
375 cap: Box::new(cap),
376 done: tx,
377 })
378 .await
379 .map_err(|_| Error::Other(anyhow!("granting capability")))?;
380
381 rx.await
382 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
383 }
384
385 pub async fn net_diagnostics(&self, send: bool) -> Result<DiagnosticsReport, Error> {
387 let report = run_diagnostics(&self.endpoint).await?;
388 if send {
389 let (tx, rx) = oneshot::channel();
390 self.message_channel
391 .send(ClientActorMessage::PutNetworkDiagnostics {
392 done: tx,
393 report: Box::new(report.clone()),
394 })
395 .await
396 .map_err(|_| Error::Other(anyhow!("sending network diagnostics report")))?;
397
398 let _ = rx
399 .await
400 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
401 }
402
403 Ok(report)
404 }
405}
406
/// Requests the [`Client`] handle sends to the background actor.
///
/// Every variant carries a `done` oneshot sender on which the actor reports
/// the outcome.
enum ClientActorMessage {
    /// Push the current metrics to the remote now.
    SendMetrics {
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
    /// Ping the remote and return its reply.
    Ping {
        done: oneshot::Sender<Result<Pong, RemoteError>>,
    },
    /// Submit a capability grant to the remote.
    #[allow(dead_code)]
    GrantCap {
        // Boxed token to submit.
        cap: Box<Rcan<Caps>>,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Upload a network diagnostics report to the remote.
    PutNetworkDiagnostics {
        report: Box<DiagnosticsReport>,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Read the endpoint name currently held by the actor.
    ReadName {
        done: oneshot::Sender<Option<String>>,
    },
    /// Ask the remote to assign `name` to this endpoint.
    NameEndpoint {
        name: String,
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
}
433
/// State owned by the background task that talks to the remote service.
struct ClientActor {
    /// Capability token sent with the `Auth` request.
    capabilities: Rcan<Caps>,
    /// RPC client used for every request to the remote.
    client: IrohServicesClient,
    /// Endpoint name; updated after the remote accepts a `NameEndpoint` request.
    name: Option<String>,
    /// Identifier attached to every metrics push from this actor.
    session_id: Uuid,
    /// Whether an `Auth` request has succeeded since the flag was last cleared.
    authorized: bool,
}
441
442impl ClientActor {
443 async fn run(
444 mut self,
445 initial_name: Option<String>,
446 registry: Registry,
447 interval: Option<Duration>,
448 mut inbox: tokio::sync::mpsc::Receiver<ClientActorMessage>,
449 ) {
450 let registry = Arc::new(RwLock::new(registry));
451 let mut encoder = Encoder::new(registry);
452 let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
453 trace!("starting client actor");
454
455 if let Some(name) = initial_name
456 && let Err(err) = self.send_name_endpoint(name).await
457 {
458 warn!(err = %err, "failed setting endpoint name on startup");
459 }
460
461 loop {
462 trace!("client actor tick");
463 tokio::select! {
464 biased;
465 Some(msg) = inbox.recv() => {
466 match msg {
467 ClientActorMessage::Ping{ done } => {
468 let res = self.send_ping().await;
469 if let Err(err) = done.send(res) {
470 debug!("failed to send ping: {:#?}", err);
471 self.authorized = false;
472 }
473 },
474 ClientActorMessage::SendMetrics{ done } => {
475 trace!("sending metrics manually triggered");
476 let res = self.send_metrics(&mut encoder).await;
477 if let Err(err) = done.send(res) {
478 debug!("failed to push metrics: {:#?}", err);
479 self.authorized = false;
480 }
481 }
482 ClientActorMessage::GrantCap{ cap, done } => {
483 let res = self.grant_cap(*cap).await;
484 if let Err(err) = done.send(res) {
485 warn!("failed to grant capability: {:#?}", err);
486 }
487 }
488 ClientActorMessage::ReadName{ done } => {
489 if let Err(err) = done.send(self.name.clone()) {
490 warn!("sending name value: {:#?}", err);
491 }
492 }
493 ClientActorMessage::NameEndpoint{ name, done } => {
494 let res = self.send_name_endpoint(name).await;
495 if let Err(err) = done.send(res) {
496 warn!("failed to name endpoint: {:#?}", err);
497 }
498 }
499 ClientActorMessage::PutNetworkDiagnostics{ report, done } => {
500 let res = self.put_network_diagnostics(*report).await;
501 if let Err(err) = done.send(res) {
502 warn!("failed to publish network diagnostics: {:#?}", err);
503 }
504 }
505 }
506 }
507 _ = async {
508 if let Some(ref mut timer) = metrics_timer {
509 timer.tick().await;
510 } else {
511 std::future::pending::<()>().await;
512 }
513 } => {
514 trace!("metrics send tick");
515 if let Err(err) = self.send_metrics(&mut encoder).await {
516 debug!("failed to push metrics: {:#?}", err);
517 self.authorized = false;
518 }
519 },
520 }
521 }
522 }
523
524 async fn auth(&mut self) -> Result<(), RemoteError> {
526 if self.authorized {
527 return Ok(());
528 }
529 trace!("client authorizing");
530 self.client
531 .rpc(Auth {
532 caps: self.capabilities.clone(),
533 })
534 .await
535 .inspect_err(|e| debug!("authorization failed: {:?}", e))
536 .map_err(|e| RemoteError::AuthError(e.to_string()))?;
537 self.authorized = true;
538 Ok(())
539 }
540
541 async fn send_ping(&mut self) -> Result<Pong, RemoteError> {
542 trace!("client actor send ping");
543 self.auth().await?;
544
545 let req = rand::random();
546 self.client
547 .rpc(Ping { req_id: req })
548 .await
549 .inspect_err(|e| warn!("rpc ping error: {e}"))
550 .map_err(|_| RemoteError::InternalServerError)
551 }
552
553 async fn send_name_endpoint(&mut self, name: String) -> Result<(), RemoteError> {
554 trace!("client sending name endpoint request");
555 self.auth().await?;
556
557 self.client
558 .rpc(NameEndpoint { name: name.clone() })
559 .await
560 .inspect_err(|e| debug!("name endpoint error: {e}"))
561 .map_err(|_| RemoteError::InternalServerError)??;
562 self.name = Some(name);
563 Ok(())
564 }
565
566 async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> {
567 trace!("client actor send metrics");
568 self.auth().await?;
569
570 let update = encoder.export();
571 let req = PutMetrics {
573 session_id: self.session_id,
574 update,
575 };
576
577 self.client
578 .rpc(req)
579 .await
580 .map_err(|_| RemoteError::InternalServerError)??;
581
582 Ok(())
583 }
584
585 async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
586 trace!("client actor grant capability");
587 self.auth().await?;
588
589 self.client
590 .rpc(crate::protocol::GrantCap { cap })
591 .await
592 .map_err(|_| RemoteError::InternalServerError)??;
593
594 Ok(())
595 }
596
597 async fn put_network_diagnostics(
598 &mut self,
599 report: crate::net_diagnostics::DiagnosticsReport,
600 ) -> Result<(), Error> {
601 trace!("client actor publish network diagnostics");
602 self.auth().await?;
603
604 let req = PutNetworkDiagnostics { report };
605
606 self.client
607 .rpc(req)
608 .await
609 .map_err(|_| RemoteError::InternalServerError)??;
610
611 Ok(())
612 }
613}
614
615async fn set_name_inner(
616 message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
617 name: String,
618) -> Result<(), Error> {
619 validate_name(&name)?;
620 debug!(name_len = name.len(), "calling set name");
621 let (tx, rx) = oneshot::channel();
622 message_channel
623 .send(ClientActorMessage::NameEndpoint { name, done: tx })
624 .await
625 .map_err(|_| Error::Other(anyhow!("sending name endpoint request")))?;
626 rx.await
627 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
628 .map_err(Error::Remote)
629}
630
#[cfg(test)]
mod tests {
    use iroh::{Endpoint, EndpointAddr, SecretKey, endpoint::presets};
    use rand::{RngExt, SeedableRng};
    use temp_env_vars::temp_env_vars;

    use crate::{
        Client,
        api_secret::ApiSecret,
        caps::{Cap, Caps},
        client::{API_SECRET_ENV_VAR_NAME, BuildError, ValidateNameError},
    };

    /// An API secret read from the environment configures both the remote
    /// address and the capability token of the builder.
    #[tokio::test]
    #[temp_env_vars]
    async fn test_api_key_from_env() {
        // Seeded RNG keeps the generated keys deterministic.
        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
        let shared_secret = SecretKey::from_bytes(&rng.random());
        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
        // SAFETY: NOTE(review): `set_var` is only sound while no other thread
        // touches the environment; presumably `#[temp_env_vars]` isolates and
        // restores the environment for this test — confirm.
        unsafe {
            std::env::set_var(API_SECRET_ENV_VAR_NAME, api_secret.to_string());
        };

        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();

        let builder = Client::builder(&endpoint).api_secret_from_env().unwrap();

        // The remote comes from the secret itself.
        let fake_endpoint_addr: EndpointAddr = fake_endpoint_id.into();
        assert_eq!(builder.remote, Some(fake_endpoint_addr));

        // The token is issued by the shared secret for the local endpoint.
        let cap = builder.cap.as_ref().expect("expected capability to be set");
        assert_eq!(cap.capability(), &Caps::new([Cap::Client]));
        assert_eq!(cap.audience(), &endpoint.id().as_verifying_key());
        assert_eq!(cap.issuer(), &shared_secret.public().as_verifying_key());
    }

    /// A client built with the metrics timer disabled still accepts a manual
    /// push; the push is expected to fail here, presumably because the fake
    /// remote cannot be reached.
    #[tokio::test]
    async fn test_no_metrics_interval() {
        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(1);
        let shared_secret = SecretKey::from_bytes(&rng.random());
        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);

        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();

        let client = Client::builder(&endpoint)
            .disable_metrics_interval()
            .api_secret(api_secret)
            .unwrap()
            .build()
            .await
            .unwrap();

        let err = client.push_metrics().await;
        assert!(err.is_err());
    }

    /// Name validation on the builder: a valid multi-byte name is stored, and
    /// names outside the byte-length bounds are rejected with the matching error.
    #[tokio::test]
    async fn test_name() {
        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
        let shared_secret = SecretKey::from_bytes(&rng.random());
        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);

        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();

        let builder = Client::builder(&endpoint)
            .name("my-node 👋")
            .unwrap()
            .api_secret(api_secret)
            .unwrap();

        assert_eq!(builder.name, Some("my-node 👋".to_string()));

        // One byte is below the minimum length.
        let Err(err) = Client::builder(&endpoint).name("a") else {
            panic!("name should fail for strings under 2 bytes");
        };
        assert!(matches!(
            err.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidName(ValidateNameError::TooShort))
        ));

        // 129 four-byte characters are well above the maximum length.
        let too_long_name = "👋".repeat(129);
        let Err(err) = Client::builder(&endpoint).name(&too_long_name) else {
            panic!("name should fail for strings over 128 bytes");
        };
        assert!(matches!(
            err.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidName(ValidateNameError::TooLong))
        ));
    }
}