Skip to main content

iroh_services/
client_host.rs

1use anyhow::{Result, ensure};
2use iroh::{
3    Endpoint, EndpointId,
4    endpoint::Connection,
5    protocol::{AcceptError, ProtocolHandler},
6};
7use irpc::WithChannels;
8use irpc_iroh::read_request;
9use n0_error::AnyError;
10use n0_future::time::Duration;
11use rcan::{Capability, CapabilityOrigin, Rcan};
12use tracing::{debug, warn};
13
14use crate::{
15    caps::{Caps, LogsCap, NetDiagnosticsCap},
16    logs::LogCollector,
17    protocol::{ClientHostMessage, ClientHostProtocol, RemoteError},
18};
19
/// The ALPN for sending messages from the cloud node to the client.
///
/// A [`ClientHost`] is registered under this identifier, and the calling side
/// dials the same identifier to reach it.
pub const CLIENT_HOST_ALPN: &[u8] = b"n0/n0des-client-host/1";
22
23pub type ClientHostClient = irpc::Client<ClientHostProtocol>;
24
25/// Protocol handler for cloud-to-endpoint connections.
26#[derive(Debug, Clone)]
27pub struct ClientHost {
28    endpoint: Endpoint,
29    log_collector: Option<LogCollector>,
30}
31
32impl ProtocolHandler for ClientHost {
33    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
34        self.handle_connection(connection).await.map_err(|e| {
35            let boxed: Box<dyn std::error::Error + Send + Sync> = e.into();
36            AcceptError::from(AnyError::from(boxed))
37        })
38    }
39}
40
41impl ClientHost {
42    pub fn new(endpoint: &Endpoint) -> Self {
43        Self {
44            endpoint: endpoint.clone(),
45            log_collector: None,
46        }
47    }
48
49    /// Enables the cloud to set the log level filter at runtime via the
50    /// [`SetLogLevel`] callback.
51    ///
52    /// Without a collector the handler still accepts the message but responds
53    /// with [`RemoteError::AuthError`] indicating the feature is disabled.
54    ///
55    /// [`SetLogLevel`]: crate::protocol::SetLogLevel
56    pub fn with_log_collector(mut self, collector: LogCollector) -> Self {
57        self.log_collector = Some(collector);
58        self
59    }
60
61    async fn handle_connection(&self, connection: Connection) -> Result<()> {
62        let remote_node_id = connection.remote_id();
63        let Some(first_request) = read_request::<ClientHostProtocol>(&connection).await? else {
64            return Ok(());
65        };
66
67        let ClientHostMessage::Auth(WithChannels { inner, tx, .. }) = first_request else {
68            debug!(remote_node_id = %remote_node_id.fmt_short(), "Expected initial auth message");
69            connection.close(400u32.into(), b"Expected initial auth message");
70            return Ok(());
71        };
72        let rcan = inner.caps;
73        let capability = rcan.capability();
74
75        let res = verify_rcan(&self.endpoint, remote_node_id, &rcan);
76        match res {
77            Ok(()) => tx.send(()).await?,
78            Err(err) => {
79                warn!("authentication failed: {err:?}");
80                connection.close(401u32.into(), b"Unauthorized");
81                return Ok(());
82            }
83        }
84
85        // Read exactly one callback request
86        let Some(request) = read_request::<ClientHostProtocol>(&connection).await? else {
87            return Ok(());
88        };
89
90        match request {
91            ClientHostMessage::Auth(_) => {
92                connection.close(400u32.into(), b"Unexpected auth message");
93                anyhow::bail!("unexpected auth message");
94            }
95            ClientHostMessage::RunNetworkDiagnostics(msg) => {
96                let WithChannels { tx, .. } = msg;
97                let needed_caps = Caps::new([NetDiagnosticsCap::GetAny]);
98                if !capability.permits(&needed_caps) {
99                    return send_missing_caps(tx, needed_caps).await;
100                }
101
102                let report =
103                    crate::net_diagnostics::checks::run_diagnostics(&self.endpoint).await?;
104                tx.send(Ok(report))
105                    .await
106                    .inspect_err(|e| warn!("sending network diagnostics response: {:?}", e))?;
107            }
108            ClientHostMessage::SetLogLevel(msg) => {
109                let WithChannels { inner, tx, .. } = msg;
110                let needed_caps = Caps::new([LogsCap::SetLevel]);
111                if !capability.permits(&needed_caps) {
112                    return send_missing_caps(tx, needed_caps).await;
113                }
114                let Some(ref collector) = self.log_collector else {
115                    tx.send(Err(RemoteError::AuthError(
116                        "log collection is not enabled on this client".into(),
117                    )))
118                    .await?;
119                    return Ok(());
120                };
121                let expires_in = inner.expires_in_secs.map(Duration::from_secs);
122                match collector.set_filter(
123                    &inner.directives,
124                    expires_in,
125                    inner.revert_to.as_deref(),
126                ) {
127                    Ok(()) => {
128                        debug!(
129                            directives = %inner.directives,
130                            expires_in_secs = ?inner.expires_in_secs,
131                            "applied log level override"
132                        );
133                        tx.send(Ok(())).await?;
134                    }
135                    Err(err) => {
136                        warn!(?err, "failed to apply log level override");
137                        tx.send(Err(RemoteError::AuthError(err.to_string())))
138                            .await?;
139                    }
140                }
141            }
142        }
143
144        connection.closed().await;
145        Ok(())
146    }
147}
148
149fn verify_rcan(endpoint: &Endpoint, remote_node: EndpointId, rcan: &Rcan<Caps>) -> Result<()> {
150    // Must be a first-party token (not delegated)
151    ensure!(
152        matches!(rcan.capability_origin(), CapabilityOrigin::Issuer),
153        "invalid capability origin: expected first-party token"
154    );
155
156    // Issuer must be this endpoint (we issued this grant)
157    ensure!(
158        EndpointId::try_from(rcan.issuer().as_bytes())
159            .map(|id| id == endpoint.id())
160            .unwrap_or(false),
161        "invalid issuer: RCAN was not issued by this endpoint"
162    );
163
164    // Audience must be the remote node (the token is for them)
165    ensure!(
166        EndpointId::try_from(rcan.audience().as_bytes())
167            .map(|id| id == remote_node)
168            .unwrap_or(false),
169        "invalid audience: RCAN audience does not match remote node"
170    );
171
172    Ok(())
173}
174
175async fn send_missing_caps<T>(
176    tx: irpc::channel::oneshot::Sender<Result<T, RemoteError>>,
177    missing_caps: Caps,
178) -> Result<()> {
179    tx.send(Err(RemoteError::MissingCapability(missing_caps)))
180        .await?;
181    Ok(())
182}
183
#[cfg(test)]
mod tests {
    use iroh::{address_lookup::MemoryLookup, endpoint::presets, protocol::Router};
    use irpc_iroh::IrohLazyRemoteConnection;
    use n0_future::time::Duration;

    use super::*;
    use crate::{
        caps::create_grant_token,
        protocol::{Auth, RunNetworkDiagnostics},
    };

    /// Binds a minimal endpoint that resolves peers through the shared `lookup`.
    async fn bind_endpoint(lookup: &MemoryLookup) -> iroh::Endpoint {
        iroh::Endpoint::builder(presets::Minimal)
            .address_lookup(lookup.clone())
            .bind()
            .await
            .unwrap()
    }

    /// Builds a lazily-connecting client that dials `server_ep` on the
    /// client-host ALPN from `client_ep`.
    fn client_host_client(client_ep: &iroh::Endpoint, server_ep: &iroh::Endpoint) -> ClientHostClient {
        let conn = IrohLazyRemoteConnection::new(
            client_ep.clone(),
            server_ep.addr(),
            CLIENT_HOST_ALPN.to_vec(),
        );
        ClientHostClient::boxed(conn)
    }

    #[tokio::test]
    async fn test_diagnostics_host_run_diagnostics() {
        let lookup = MemoryLookup::new();
        let server_ep = bind_endpoint(&lookup).await;
        let client_ep = bind_endpoint(&lookup).await;

        let host = ClientHost::new(&server_ep);
        let router = Router::builder(server_ep.clone())
            .accept(CLIENT_HOST_ALPN, host)
            .spawn();

        // The server grants capabilities to the client.
        let rcan = create_grant_token(
            server_ep.secret_key().clone(),
            client_ep.id(),
            Duration::from_secs(3600),
            Caps::for_shared_secret(),
        )
        .unwrap();

        // Connect on the client-host ALPN
        let client = client_host_client(&client_ep, &server_ep);

        // authenticate with the server-issued grant
        client.rpc(Auth { caps: rcan }).await.unwrap();

        // send RunNetworkDiagnostics and verify we get a report back
        let result = client.rpc(RunNetworkDiagnostics).await.unwrap();
        let report = result.expect("expected Ok(DiagnosticsReport)");
        assert_eq!(report.endpoint_id, server_ep.id());

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }

    #[tokio::test]
    async fn test_client_host_rejects_self_signed_rcan() {
        let lookup = MemoryLookup::new();
        let server_ep = bind_endpoint(&lookup).await;
        let client_ep = bind_endpoint(&lookup).await;

        // Mount the handler on its own ALPN and talk to it with its own client,
        // so a rejection can only come from the grant check rather than from a
        // protocol mismatch.
        let host = ClientHost::new(&server_ep);
        let router = Router::builder(server_ep.clone())
            .accept(CLIENT_HOST_ALPN, host)
            .spawn();

        // Client creates its own RCAN (self-signed, not issued by server).
        let rcan = create_grant_token(
            client_ep.secret_key().clone(),
            client_ep.id(),
            Duration::from_secs(3600),
            Caps::for_shared_secret(),
        )
        .unwrap();

        let client = client_host_client(&client_ep, &server_ep);

        // auth should fail because the RCAN issuer is the client, not the server
        let result = client.rpc(Auth { caps: rcan }).await;
        assert!(
            result.is_err(),
            "expected auth to be rejected for self-signed RCAN"
        );

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }
}