1use std::{
2 collections::BTreeMap,
3 str::FromStr,
4 sync::{Arc, RwLock},
5};
6
7use anyhow::{Result, anyhow, ensure};
8use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::ConnectError};
9use iroh_metrics::{MetricsGroup, Registry, encoding::Encoder};
10use irpc_iroh::IrohLazyRemoteConnection;
11use n0_error::StackResultExt;
12use n0_future::{task::AbortOnDropHandle, time::Duration};
13use rcan::Rcan;
14use tokio::sync::oneshot;
15use tracing::{debug, trace, warn};
16use uuid::Uuid;
17
18use crate::{
19 api_secret::ApiSecret,
20 caps::Caps,
21 net_diagnostics::{DiagnosticsReport, checks::run_diagnostics},
22 protocol::{
23 ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutMetrics,
24 PutNetworkDiagnostics, RemoteError, SetAttributes, SetGroup,
25 },
26};
27
/// Handle to a running iroh-services client.
///
/// Every request is forwarded over `message_channel` to a background
/// [`ClientActor`] task. Clones share the same channel and the same actor
/// task handle.
#[derive(Debug, Clone)]
pub struct Client {
    // NOTE(review): the field is read by `grant_capability` and
    // `net_diagnostics`, so this `allow` may no longer be needed — confirm.
    #[allow(dead_code)]
    endpoint: Endpoint,
    /// Sender side of the request channel consumed by the actor.
    message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
    /// Handle of the spawned actor task, shared between clones.
    _actor_task: Arc<AbortOnDropHandle<()>>,
}
59
/// Collects the configuration needed to construct a [`Client`].
///
/// Create one with [`ClientBuilder::new`] or [`Client::builder`], chain the
/// configuration methods, and finish with [`ClientBuilder::build`].
pub struct ClientBuilder {
    /// Expiry handed to the capability-token helpers when a token is created
    /// from an API secret or an SSH key.
    #[allow(dead_code)]
    cap_expiry: Duration,
    /// Capability token presented to the remote; required by `build`.
    cap: Option<Rcan<Caps>>,
    /// Local endpoint used to dial the remote.
    endpoint: Endpoint,
    /// Optional endpoint name sent to the remote when the actor starts.
    name: Option<String>,
    /// Optional endpoint group sent to the remote when the actor starts.
    group: Option<String>,
    /// Optional endpoint attributes sent to the remote when the actor starts.
    attributes: Option<BTreeMap<String, String>>,
    /// Period of the automatic metrics push; `None` disables the timer.
    metrics_interval: Option<Duration>,
    /// Address of the remote service; required by `build`.
    remote: Option<EndpointAddr>,
    /// Metrics registry whose contents are exported to the remote.
    registry: Registry,
}
74
/// Default capability expiry: 30 days, expressed in seconds.
const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24 * 30);

/// Name of the environment variable read by
/// [`ClientBuilder::api_secret_from_env`].
pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET";
77
78impl ClientBuilder {
79 pub fn new(endpoint: &Endpoint) -> Self {
80 let mut registry = Registry::default();
81 registry.register_all(endpoint.metrics());
82
83 Self {
84 cap: None,
85 cap_expiry: DEFAULT_CAP_EXPIRY,
86 endpoint: endpoint.clone(),
87 name: None,
88 group: None,
89 attributes: None,
90 metrics_interval: Some(Duration::from_secs(60)),
91 remote: None,
92 registry,
93 }
94 }
95
96 pub fn register_metrics_group(mut self, metrics_group: Arc<dyn MetricsGroup>) -> Self {
100 self.registry.register(metrics_group);
101 self
102 }
103
104 pub fn metrics_interval(mut self, interval: Duration) -> Self {
108 self.metrics_interval = Some(interval);
109 self
110 }
111
112 pub fn disable_metrics_interval(mut self) -> Self {
114 self.metrics_interval = None;
115 self
116 }
117
118 pub fn name(mut self, name: impl Into<String>) -> Result<Self> {
133 let name = name.into();
134 validate_name(&name).map_err(BuildError::InvalidName)?;
135 self.name = Some(name);
136 Ok(self)
137 }
138
139 pub fn group(mut self, group: impl Into<String>) -> Result<Self> {
144 let group = group.into();
145 validate_name(&group).map_err(BuildError::InvalidGroup)?;
146 self.group = Some(group);
147 Ok(self)
148 }
149
150 pub fn attributes<I, K, V>(mut self, attrs: I) -> Result<Self>
166 where
167 I: IntoIterator<Item = (K, V)>,
168 K: Into<String>,
169 V: Into<String>,
170 {
171 let collected: BTreeMap<String, String> = attrs
172 .into_iter()
173 .map(|(k, v)| (k.into(), v.into()))
174 .collect();
175 validate_attributes(&collected).map_err(BuildError::InvalidAttributes)?;
176 self.attributes = Some(collected);
177 Ok(self)
178 }
179
180 pub fn api_secret_from_env(self) -> Result<Self> {
182 let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
183 self.api_secret(ticket)
184 }
185
186 pub fn api_secret_from_str(self, secret_key: &str) -> Result<Self> {
188 let key = ApiSecret::from_str(secret_key).context("invalid iroh services api secret")?;
189 self.api_secret(key)
190 }
191
192 pub fn api_secret(mut self, ticket: ApiSecret) -> Result<Self> {
199 let local_id = self.endpoint.id();
200 let rcan = crate::caps::create_api_token_from_secret_key(
201 ticket.secret,
202 local_id,
203 self.cap_expiry,
204 Caps::for_shared_secret(),
205 )?;
206
207 self.remote = Some(ticket.remote);
208 self.rcan(rcan)
209 }
210
211 #[cfg(not(target_arch = "wasm32"))]
215 pub async fn ssh_key_from_file<P: AsRef<std::path::Path>>(self, path: P) -> Result<Self> {
216 let file_content = tokio::fs::read_to_string(path).await?;
217 self.ssh_key(&file_content)
218 }
219
220 #[cfg(not(target_arch = "wasm32"))]
222 pub fn ssh_key(mut self, pem: &str) -> Result<Self> {
223 let local_id = self.endpoint.id();
224 let rcan = crate::caps::create_api_token_from_openssh_pem(
225 pem,
226 local_id,
227 self.cap_expiry,
228 Caps::all(),
229 )?;
230 self.cap.replace(rcan);
231
232 Ok(self)
233 }
234
235 pub fn rcan(mut self, cap: Rcan<Caps>) -> Result<Self> {
237 ensure!(
238 EndpointId::from_verifying_key(*cap.audience()) == self.endpoint.id(),
239 "invalid audience"
240 );
241 self.cap.replace(cap);
242 Ok(self)
243 }
244
245 pub fn remote(mut self, remote: impl Into<EndpointAddr>) -> Self {
248 self.remote = Some(remote.into());
249 self
250 }
251
252 #[must_use = "dropping the client will silently cancel all client tasks"]
254 pub async fn build(self) -> Result<Client, BuildError> {
255 debug!("starting iroh-services client");
256 let remote = self.remote.ok_or(BuildError::MissingRemote)?;
257 let capabilities = self.cap.ok_or(BuildError::MissingCapability)?;
258
259 let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
260 let irpc_client = IrohServicesClient::boxed(conn);
261
262 let (tx, rx) = tokio::sync::mpsc::channel(1);
263 let actor_task = AbortOnDropHandle::new(n0_future::task::spawn(
264 ClientActor {
265 capabilities,
266 client: irpc_client,
267 name: self.name.clone(),
268 group: self.group.clone(),
269 attributes: self.attributes.clone().unwrap_or_default(),
270 session_id: Uuid::new_v4(),
271 authorized: false,
272 }
273 .run(
274 self.name,
275 self.group,
276 self.attributes,
277 self.registry,
278 self.metrics_interval,
279 rx,
280 ),
281 ));
282
283 Ok(Client {
284 endpoint: self.endpoint,
285 message_channel: tx,
286 _actor_task: Arc::new(actor_task),
287 })
288 }
289}
290
/// Errors produced while configuring or building a [`Client`].
#[derive(thiserror::Error, Debug)]
pub enum BuildError {
    /// `build` was called without a remote address.
    #[error("Missing remote endpoint to dial")]
    MissingRemote,
    /// `build` was called without a capability token.
    #[error("Missing capability")]
    MissingCapability,
    /// Produced by the `From<irpc::Error>` conversion when the connection was
    /// closed by the application with error code 401.
    #[error("Unauthorized")]
    Unauthorized,
    /// An error reported by the remote.
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// Any other RPC error.
    #[error("Rpc connection error: {0}")]
    Rpc(irpc::Error),
    /// A connection error.
    #[error("Connection error: {0}")]
    Connect(ConnectError),
    /// The endpoint name failed validation.
    #[error("Invalid endpoint name: {0}")]
    InvalidName(#[from] ValidateNameError),
    /// The endpoint group failed validation.
    #[error("Invalid endpoint group: {0}")]
    InvalidGroup(ValidateNameError),
    /// The endpoint attributes failed validation.
    #[error("Invalid endpoint attributes: {0}")]
    InvalidAttributes(#[from] ValidateAttributesError),
}
312
313impl From<irpc::Error> for BuildError {
314 fn from(value: irpc::Error) -> Self {
315 match value {
316 irpc::Error::Request {
317 source:
318 irpc::RequestError::Connection {
319 source: iroh::endpoint::ConnectionError::ApplicationClosed(frame),
320 ..
321 },
322 ..
323 } if frame.error_code == 401u32.into() => Self::Unauthorized,
324 value => Self::Rpc(value),
325 }
326 }
327}
328
/// Minimum accepted length of a name, group or attribute key, compared
/// against `str::len()` (a byte count) by `validate_name`.
pub const CLIENT_NAME_MIN_LENGTH: usize = 2;
/// Maximum accepted length of a name, group or attribute key, compared
/// against `str::len()` (a byte count) by `validate_name`.
pub const CLIENT_NAME_MAX_LENGTH: usize = 128;
333
334#[derive(Debug, thiserror::Error)]
336pub enum ValidateNameError {
337 #[error("Name is too long (must be no more than {CLIENT_NAME_MAX_LENGTH} characters).")]
338 TooLong,
339 #[error("Name is too short (must be at least {CLIENT_NAME_MIN_LENGTH} characters).")]
340 TooShort,
341}
342
343fn validate_name(name: &str) -> Result<(), ValidateNameError> {
344 if name.len() < CLIENT_NAME_MIN_LENGTH {
345 Err(ValidateNameError::TooShort)
346 } else if name.len() > CLIENT_NAME_MAX_LENGTH {
347 Err(ValidateNameError::TooLong)
348 } else {
349 Ok(())
350 }
351}
352
/// Maximum accepted length of a single attribute value, in bytes.
pub const CLIENT_ATTRIBUTE_VALUE_MAX_LENGTH: usize = 128;
/// Maximum number of attributes accepted in one attribute map.
pub const CLIENT_ATTRIBUTES_MAX_COUNT: usize = 128;
357
/// Reasons an attribute map is rejected by `validate_attributes`.
#[derive(Debug, thiserror::Error)]
pub enum ValidateAttributesError {
    /// The map holds more than [`CLIENT_ATTRIBUTES_MAX_COUNT`] entries.
    #[error("Too many attributes (must be no more than {CLIENT_ATTRIBUTES_MAX_COUNT}).")]
    TooManyEntries,
    /// A key failed the same validation that is applied to names.
    #[error("Invalid attribute key: {0}")]
    InvalidKey(#[from] ValidateNameError),
    /// A value is longer than [`CLIENT_ATTRIBUTE_VALUE_MAX_LENGTH`] bytes.
    #[error(
        "Attribute value too long (must be no more than {CLIENT_ATTRIBUTE_VALUE_MAX_LENGTH} bytes)."
    )]
    ValueTooLong,
}
370
371fn validate_attributes(attrs: &BTreeMap<String, String>) -> Result<(), ValidateAttributesError> {
372 if attrs.len() > CLIENT_ATTRIBUTES_MAX_COUNT {
373 return Err(ValidateAttributesError::TooManyEntries);
374 }
375 for (k, v) in attrs {
376 validate_name(k)?;
377 if v.len() > CLIENT_ATTRIBUTE_VALUE_MAX_LENGTH {
378 return Err(ValidateAttributesError::ValueTooLong);
379 }
380 }
381 Ok(())
382}
383
/// Errors returned by the methods of a running [`Client`].
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The endpoint name failed validation.
    #[error("Invalid endpoint name: {0}")]
    InvalidName(#[from] ValidateNameError),
    /// The endpoint group failed validation.
    #[error("Invalid endpoint group: {0}")]
    InvalidGroup(ValidateNameError),
    /// The endpoint attributes failed validation.
    #[error("Invalid endpoint attributes: {0}")]
    InvalidAttributes(#[from] ValidateAttributesError),
    /// An error reported by the remote or by the actor on its behalf.
    #[error("Remote error: {0}")]
    Remote(#[from] RemoteError),
    /// An RPC-level error.
    #[error("Connection error: {0}")]
    Rpc(#[from] irpc::Error),
    /// Any other failure, including errors on the internal actor channels.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
399
400impl Client {
    /// Returns a [`ClientBuilder`] for `endpoint`; shorthand for
    /// [`ClientBuilder::new`].
    pub fn builder(endpoint: &Endpoint) -> ClientBuilder {
        ClientBuilder::new(endpoint)
    }
404
405 pub async fn name(&self) -> Result<Option<String>, Error> {
407 let (tx, rx) = oneshot::channel();
408 self.message_channel
409 .send(ClientActorMessage::ReadName { done: tx })
410 .await
411 .map_err(|_| Error::Other(anyhow!("sending name read request")))?;
412
413 rx.await
414 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))
415 }
416
417 pub async fn set_name(&self, name: impl Into<String>) -> Result<(), Error> {
422 set_name_inner(self.message_channel.clone(), name.into()).await
423 }
424
425 pub async fn set_group(&self, group: impl Into<String>) -> Result<(), Error> {
430 set_group_inner(self.message_channel.clone(), group.into()).await
431 }
432
433 pub async fn set_attributes<I, K, V>(&self, attrs: I) -> Result<(), Error>
452 where
453 I: IntoIterator<Item = (K, V)>,
454 K: Into<String>,
455 V: Into<String>,
456 {
457 let collected: BTreeMap<String, String> = attrs
458 .into_iter()
459 .map(|(k, v)| (k.into(), v.into()))
460 .collect();
461 set_attributes_inner(self.message_channel.clone(), collected).await
462 }
463
464 pub async fn ping(&self) -> Result<Pong, Error> {
466 let (tx, rx) = oneshot::channel();
467 self.message_channel
468 .send(ClientActorMessage::Ping { done: tx })
469 .await
470 .map_err(|_| Error::Other(anyhow!("sending ping request")))?;
471
472 rx.await
473 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
474 .map_err(Error::Remote)
475 }
476
477 pub async fn push_metrics(&self) -> Result<(), Error> {
481 let (tx, rx) = oneshot::channel();
482 self.message_channel
483 .send(ClientActorMessage::SendMetrics { done: tx })
484 .await
485 .map_err(|_| Error::Other(anyhow!("sending metrics")))?;
486
487 rx.await
488 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
489 .map_err(Error::Remote)
490 }
491
492 pub async fn grant_capability(
496 &self,
497 remote_id: EndpointId,
498 caps: impl IntoIterator<Item = impl Into<crate::caps::Cap>>,
499 ) -> Result<(), Error> {
500 let cap = crate::caps::create_grant_token(
501 self.endpoint.secret_key().clone(),
502 remote_id,
503 DEFAULT_CAP_EXPIRY,
504 Caps::new(caps),
505 )
506 .map_err(Error::Other)?;
507
508 let (tx, rx) = oneshot::channel();
509 self.message_channel
510 .send(ClientActorMessage::GrantCap {
511 cap: Box::new(cap),
512 done: tx,
513 })
514 .await
515 .map_err(|_| Error::Other(anyhow!("granting capability")))?;
516
517 rx.await
518 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
519 }
520
521 pub async fn net_diagnostics(&self, send: bool) -> Result<DiagnosticsReport, Error> {
523 let report = run_diagnostics(&self.endpoint).await?;
524 if send {
525 let (tx, rx) = oneshot::channel();
526 self.message_channel
527 .send(ClientActorMessage::PutNetworkDiagnostics {
528 done: tx,
529 report: Box::new(report.clone()),
530 })
531 .await
532 .map_err(|_| Error::Other(anyhow!("sending network diagnostics report")))?;
533
534 let _ = rx
535 .await
536 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?;
537 }
538
539 Ok(report)
540 }
541}
542
/// Requests sent from [`Client`] handles to the background [`ClientActor`].
///
/// Every variant carries a `done` oneshot sender on which the actor reports
/// the outcome.
enum ClientActorMessage {
    /// Push the current metrics to the remote now.
    SendMetrics {
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
    /// Ping the remote.
    Ping {
        done: oneshot::Sender<Result<Pong, RemoteError>>,
    },
    /// Submit a capability grant to the remote.
    // NOTE(review): this variant is constructed by `Client::grant_capability`,
    // so the `allow` may be stale — confirm.
    #[allow(dead_code)]
    GrantCap {
        cap: Box<Rcan<Caps>>,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Publish a network diagnostics report to the remote.
    PutNetworkDiagnostics {
        report: Box<DiagnosticsReport>,
        done: oneshot::Sender<Result<(), Error>>,
    },
    /// Read the name currently held by the actor.
    ReadName {
        done: oneshot::Sender<Option<String>>,
    },
    /// Set the endpoint name on the remote.
    NameEndpoint {
        name: String,
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
    /// Set the endpoint group on the remote.
    SetGroup {
        group: String,
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
    /// Set the endpoint attributes on the remote.
    SetAttributes {
        attributes: BTreeMap<String, String>,
        done: oneshot::Sender<Result<(), RemoteError>>,
    },
}
577
/// State owned by the background task that talks to the remote.
struct ClientActor {
    /// Capability token sent with every `Auth` request.
    capabilities: Rcan<Caps>,
    /// RPC client used for all requests to the remote.
    client: IrohServicesClient,
    /// Endpoint name; updated after a successful `NameEndpoint` RPC.
    name: Option<String>,
    /// Endpoint group; updated after a successful `SetGroup` RPC.
    group: Option<String>,
    /// Endpoint attributes; updated after a successful `SetAttributes` RPC.
    attributes: BTreeMap<String, String>,
    /// Identifier attached to every metrics push; generated once in `build`.
    session_id: Uuid,
    /// Whether `auth` has succeeded; while `true`, `auth` is a no-op.
    authorized: bool,
}
587
588impl ClientActor {
589 async fn run(
590 mut self,
591 initial_name: Option<String>,
592 initial_group: Option<String>,
593 initial_attributes: Option<BTreeMap<String, String>>,
594 registry: Registry,
595 interval: Option<Duration>,
596 mut inbox: tokio::sync::mpsc::Receiver<ClientActorMessage>,
597 ) {
598 let registry = Arc::new(RwLock::new(registry));
599 let mut encoder = Encoder::new(registry);
600 let mut metrics_timer = interval.map(|interval| n0_future::time::interval(interval));
601 trace!("starting client actor");
602
603 if let Some(name) = initial_name
604 && let Err(err) = self.send_name_endpoint(name).await
605 {
606 warn!(err = %err, "failed setting endpoint name on startup");
607 }
608
609 if let Some(group) = initial_group
610 && let Err(err) = self.send_set_group(group).await
611 {
612 warn!(err = %err, "failed setting endpoint group on startup");
613 }
614
615 if let Some(attributes) = initial_attributes
616 && let Err(err) = self.send_set_attributes(attributes).await
617 {
618 warn!(err = %err, "failed setting endpoint attributes on startup");
619 }
620
621 loop {
622 trace!("client actor tick");
623 tokio::select! {
624 biased;
625 Some(msg) = inbox.recv() => {
626 match msg {
627 ClientActorMessage::Ping{ done } => {
628 let res = self.send_ping().await;
629 if let Err(err) = done.send(res) {
630 debug!("failed to send ping: {:#?}", err);
631 self.authorized = false;
632 }
633 },
634 ClientActorMessage::SendMetrics{ done } => {
635 trace!("sending metrics manually triggered");
636 let res = self.send_metrics(&mut encoder).await;
637 if let Err(err) = done.send(res) {
638 debug!("failed to push metrics: {:#?}", err);
639 self.authorized = false;
640 }
641 }
642 ClientActorMessage::GrantCap{ cap, done } => {
643 let res = self.grant_cap(*cap).await;
644 if let Err(err) = done.send(res) {
645 warn!("failed to grant capability: {:#?}", err);
646 }
647 }
648 ClientActorMessage::ReadName{ done } => {
649 if let Err(err) = done.send(self.name.clone()) {
650 warn!("sending name value: {:#?}", err);
651 }
652 }
653 ClientActorMessage::NameEndpoint{ name, done } => {
654 let res = self.send_name_endpoint(name).await;
655 if let Err(err) = done.send(res) {
656 warn!("failed to name endpoint: {:#?}", err);
657 }
658 }
659 ClientActorMessage::SetGroup{ group, done } => {
660 let res = self.send_set_group(group).await;
661 if let Err(err) = done.send(res) {
662 warn!("failed to set group: {:#?}", err);
663 }
664 }
665 ClientActorMessage::SetAttributes{ attributes, done } => {
666 let res = self.send_set_attributes(attributes).await;
667 if let Err(err) = done.send(res) {
668 warn!("failed to set attributes: {:#?}", err);
669 }
670 }
671 ClientActorMessage::PutNetworkDiagnostics{ report, done } => {
672 let res = self.put_network_diagnostics(*report).await;
673 if let Err(err) = done.send(res) {
674 warn!("failed to publish network diagnostics: {:#?}", err);
675 }
676 }
677 }
678 }
679 _ = async {
680 if let Some(ref mut timer) = metrics_timer {
681 timer.tick().await;
682 } else {
683 std::future::pending::<()>().await;
684 }
685 } => {
686 trace!("metrics send tick");
687 if let Err(err) = self.send_metrics(&mut encoder).await {
688 debug!("failed to push metrics: {:#?}", err);
689 self.authorized = false;
690 }
691 },
692 }
693 }
694 }
695
696 async fn auth(&mut self) -> Result<(), RemoteError> {
698 if self.authorized {
699 return Ok(());
700 }
701 trace!("client authorizing");
702 self.client
703 .rpc(Auth {
704 caps: self.capabilities.clone(),
705 })
706 .await
707 .inspect_err(|e| debug!("authorization failed: {:?}", e))
708 .map_err(|e| RemoteError::AuthError(e.to_string()))?;
709 self.authorized = true;
710 Ok(())
711 }
712
713 async fn send_ping(&mut self) -> Result<Pong, RemoteError> {
714 trace!("client actor send ping");
715 self.auth().await?;
716
717 let req = rand::random();
718 self.client
719 .rpc(Ping { req_id: req })
720 .await
721 .inspect_err(|e| warn!("rpc ping error: {e}"))
722 .map_err(|_| RemoteError::InternalServerError)
723 }
724
725 async fn send_name_endpoint(&mut self, name: String) -> Result<(), RemoteError> {
726 trace!("client sending name endpoint request");
727 self.auth().await?;
728
729 self.client
730 .rpc(NameEndpoint { name: name.clone() })
731 .await
732 .inspect_err(|e| debug!("name endpoint error: {e}"))
733 .map_err(|_| RemoteError::InternalServerError)??;
734 self.name = Some(name);
735 Ok(())
736 }
737
738 async fn send_set_group(&mut self, group: String) -> Result<(), RemoteError> {
739 trace!("client sending set group request");
740 self.auth().await?;
741
742 self.client
743 .rpc(SetGroup {
744 group: group.clone(),
745 })
746 .await
747 .inspect_err(|e| debug!("set group error: {e}"))
748 .map_err(|_| RemoteError::InternalServerError)??;
749 self.group = Some(group);
750 Ok(())
751 }
752
753 async fn send_set_attributes(
754 &mut self,
755 attributes: BTreeMap<String, String>,
756 ) -> Result<(), RemoteError> {
757 trace!("client sending set attributes request");
758 self.auth().await?;
759
760 self.client
761 .rpc(SetAttributes {
762 attributes: attributes.clone(),
763 })
764 .await
765 .inspect_err(|e| debug!("set attributes error: {e}"))
766 .map_err(|_| RemoteError::InternalServerError)??;
767 self.attributes = attributes;
768 Ok(())
769 }
770
771 async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> {
772 trace!("client actor send metrics");
773 self.auth().await?;
774
775 let update = encoder.export();
776 let req = PutMetrics {
778 session_id: self.session_id,
779 update,
780 };
781
782 self.client
783 .rpc(req)
784 .await
785 .map_err(|_| RemoteError::InternalServerError)??;
786
787 Ok(())
788 }
789
790 async fn grant_cap(&mut self, cap: Rcan<Caps>) -> Result<(), Error> {
791 trace!("client actor grant capability");
792 self.auth().await?;
793
794 self.client
795 .rpc(crate::protocol::GrantCap { cap })
796 .await
797 .map_err(|_| RemoteError::InternalServerError)??;
798
799 Ok(())
800 }
801
802 async fn put_network_diagnostics(
803 &mut self,
804 report: crate::net_diagnostics::DiagnosticsReport,
805 ) -> Result<(), Error> {
806 trace!("client actor publish network diagnostics");
807 self.auth().await?;
808
809 let req = PutNetworkDiagnostics { report };
810
811 self.client
812 .rpc(req)
813 .await
814 .map_err(|_| RemoteError::InternalServerError)??;
815
816 Ok(())
817 }
818}
819
820async fn set_name_inner(
821 message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
822 name: String,
823) -> Result<(), Error> {
824 validate_name(&name)?;
825 debug!(name_len = name.len(), "calling set name");
826 let (tx, rx) = oneshot::channel();
827 message_channel
828 .send(ClientActorMessage::NameEndpoint { name, done: tx })
829 .await
830 .map_err(|_| Error::Other(anyhow!("sending name endpoint request")))?;
831 rx.await
832 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
833 .map_err(Error::Remote)
834}
835
836async fn set_group_inner(
837 message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
838 group: String,
839) -> Result<(), Error> {
840 validate_name(&group).map_err(Error::InvalidGroup)?;
841 debug!(group_len = group.len(), "calling set group");
842 let (tx, rx) = oneshot::channel();
843 message_channel
844 .send(ClientActorMessage::SetGroup { group, done: tx })
845 .await
846 .map_err(|_| Error::Other(anyhow!("sending set group request")))?;
847 rx.await
848 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
849 .map_err(Error::Remote)
850}
851
852async fn set_attributes_inner(
853 message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
854 attributes: BTreeMap<String, String>,
855) -> Result<(), Error> {
856 validate_attributes(&attributes)?;
857 debug!(attr_count = attributes.len(), "calling set attributes");
858 let (tx, rx) = oneshot::channel();
859 message_channel
860 .send(ClientActorMessage::SetAttributes {
861 attributes,
862 done: tx,
863 })
864 .await
865 .map_err(|_| Error::Other(anyhow!("sending set attributes request")))?;
866 rx.await
867 .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))?
868 .map_err(Error::Remote)
869}
870
871#[cfg(test)]
872mod tests {
873 use std::collections::HashMap;
874
875 use iroh::{Endpoint, EndpointAddr, SecretKey, endpoint::presets};
876 use rand::{RngExt, SeedableRng};
877 use temp_env_vars::temp_env_vars;
878
879 use crate::{
880 Client,
881 api_secret::ApiSecret,
882 caps::{Cap, Caps},
883 client::{
884 API_SECRET_ENV_VAR_NAME, BuildError, CLIENT_ATTRIBUTES_MAX_COUNT,
885 ValidateAttributesError, ValidateNameError,
886 },
887 };
888
    /// `api_secret_from_env` must read the secret from the environment and
    /// populate both the remote and the capability on the builder.
    #[tokio::test]
    #[temp_env_vars]
    async fn test_api_key_from_env() {
        // Seeded RNG so the generated keys are reproducible.
        let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
        let shared_secret = SecretKey::from_bytes(&rng.random());
        let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
        let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
        // NOTE(review): `set_var` is unsafe because it mutates process-wide
        // state; presumably `#[temp_env_vars]` isolates and restores the
        // environment for this test — confirm.
        unsafe {
            std::env::set_var(API_SECRET_ENV_VAR_NAME, api_secret.to_string());
        };

        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();

        let builder = Client::builder(&endpoint).api_secret_from_env().unwrap();

        // The remote must be the endpoint id embedded in the secret.
        let fake_endpoint_addr: EndpointAddr = fake_endpoint_id.into();
        assert_eq!(builder.remote, Some(fake_endpoint_addr));

        // The capability must be issued by the shared secret for the local
        // endpoint and carry the client capability.
        let cap = builder.cap.as_ref().expect("expected capability to be set");
        assert_eq!(cap.capability(), &Caps::new([Cap::Client]));
        assert_eq!(cap.audience(), &endpoint.id().as_verifying_key());
        assert_eq!(cap.issuer(), &shared_secret.public().as_verifying_key());
    }
915
916 #[tokio::test]
919 async fn test_no_metrics_interval() {
920 let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(1);
921 let shared_secret = SecretKey::from_bytes(&rng.random());
922 let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
923 let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
924
925 let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();
926
927 let client = Client::builder(&endpoint)
928 .disable_metrics_interval()
929 .api_secret(api_secret)
930 .unwrap()
931 .build()
932 .await
933 .unwrap();
934
935 let err = client.push_metrics().await;
936 assert!(err.is_err());
937 }
938
939 #[tokio::test]
940 async fn test_name() {
941 let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
942 let shared_secret = SecretKey::from_bytes(&rng.random());
943 let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
944 let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
945
946 let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();
947
948 let builder = Client::builder(&endpoint)
949 .name("my-node 👋")
950 .unwrap()
951 .api_secret(api_secret)
952 .unwrap();
953
954 assert_eq!(builder.name, Some("my-node 👋".to_string()));
955
956 let Err(err) = Client::builder(&endpoint).name("a") else {
957 panic!("name should fail for strings under 2 bytes");
958 };
959 assert!(matches!(
960 err.downcast_ref::<BuildError>(),
961 Some(BuildError::InvalidName(ValidateNameError::TooShort))
962 ));
963
964 let too_long_name = "👋".repeat(129);
965 let Err(err) = Client::builder(&endpoint).name(&too_long_name) else {
966 panic!("name should fail for strings over 128 bytes");
967 };
968 assert!(matches!(
969 err.downcast_ref::<BuildError>(),
970 Some(BuildError::InvalidName(ValidateNameError::TooLong))
971 ));
972 }
973
974 #[tokio::test]
975 async fn test_group() {
976 let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0);
977 let shared_secret = SecretKey::from_bytes(&rng.random());
978 let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public();
979 let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id);
980
981 let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();
982
983 let builder = Client::builder(&endpoint)
984 .group("staging")
985 .unwrap()
986 .api_secret(api_secret)
987 .unwrap();
988
989 assert_eq!(builder.group, Some("staging".to_string()));
990
991 let Err(err) = Client::builder(&endpoint).group("a") else {
992 panic!("group should fail for strings under 2 bytes");
993 };
994 assert!(matches!(
995 err.downcast_ref::<BuildError>(),
996 Some(BuildError::InvalidGroup(ValidateNameError::TooShort))
997 ));
998
999 let too_long_group = "👋".repeat(129);
1000 let Err(err) = Client::builder(&endpoint).group(&too_long_group) else {
1001 panic!("group should fail for strings over 128 bytes");
1002 };
1003 assert!(matches!(
1004 err.downcast_ref::<BuildError>(),
1005 Some(BuildError::InvalidGroup(ValidateNameError::TooLong))
1006 ));
1007 }
1008
    /// Exercises `ClientBuilder::attributes` with accepted inputs (empty
    /// iterator, array of pairs, `HashMap`, empty value) and with each
    /// validation failure (value too long, key too short, too many entries).
    #[tokio::test]
    async fn test_attributes() {
        let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap();

        // An empty iterator yields an empty, but present, attribute map.
        let builder = Client::builder(&endpoint)
            .attributes(std::iter::empty::<(String, String)>())
            .unwrap();
        assert_eq!(builder.attributes.as_ref().map(|m| m.len()), Some(0));

        // An array of `&str` pairs is accepted.
        let builder = Client::builder(&endpoint)
            .attributes([("env", "prod"), ("region", "us-west")])
            .unwrap();
        let attrs = builder.attributes.as_ref().expect("attributes set");
        assert_eq!(attrs.get("env").map(String::as_str), Some("prod"));
        assert_eq!(attrs.get("region").map(String::as_str), Some("us-west"));

        // A `HashMap` is accepted, and an empty value is valid.
        let mut map: HashMap<String, String> = HashMap::new();
        map.insert("k1".into(), "v1".into());
        map.insert("k2".into(), "".into());
        let builder = Client::builder(&endpoint).attributes(map).unwrap();
        let attrs = builder.attributes.as_ref().expect("attributes set");
        assert_eq!(attrs.get("k2").map(String::as_str), Some(""));

        // A value one byte over the limit is rejected.
        let too_long_value = "x".repeat(129);
        let Err(err) = Client::builder(&endpoint).attributes([("ok", too_long_value.as_str())])
        else {
            panic!("attributes should fail for value over 128 bytes");
        };
        assert!(matches!(
            err.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidAttributes(
                ValidateAttributesError::ValueTooLong
            ))
        ));

        // Keys go through the same validation as names.
        let Err(err) = Client::builder(&endpoint).attributes([("a", "v")]) else {
            panic!("attributes should fail for key under 2 bytes");
        };
        assert!(matches!(
            err.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidAttributes(
                ValidateAttributesError::InvalidKey(ValidateNameError::TooShort)
            ))
        ));

        // One entry more than the maximum count is rejected.
        let big: Vec<(String, String)> = (0..(CLIENT_ATTRIBUTES_MAX_COUNT + 1))
            .map(|i| (format!("key_{i:04}"), format!("val_{i}")))
            .collect();
        let Err(err) = Client::builder(&endpoint).attributes(big) else {
            panic!("attributes should fail for more than 128 entries");
        };
        assert!(matches!(
            err.downcast_ref::<BuildError>(),
            Some(BuildError::InvalidAttributes(
                ValidateAttributesError::TooManyEntries
            ))
        ));
    }
1073}