Skip to main content

iroh_services/
client_host.rs

1use anyhow::{Result, ensure};
2use iroh::{
3    Endpoint, EndpointId,
4    endpoint::Connection,
5    protocol::{AcceptError, ProtocolHandler},
6};
7use irpc::WithChannels;
8use irpc_iroh::read_request;
9use n0_error::AnyError;
10use n0_future::time::Duration;
11use rcan::{Capability, CapabilityOrigin, Rcan};
12use tracing::{debug, warn};
13
14use crate::{
15    caps::{Caps, LogsCap, NetDiagnosticsCap},
16    logs::LogCollector,
17    protocol::{ClientHostMessage, ClientHostProtocol, FetchLogs, RemoteError},
18};
19
/// The ALPN for sending messages from the cloud node to the client.
///
/// A [`ClientHost`] must be registered under this ALPN for the cloud's
/// callbacks to reach it.
pub const CLIENT_HOST_ALPN: &[u8] = b"n0/n0des-client-host/1";

/// irpc client for [`ClientHostProtocol`], i.e. the calling side of the
/// connections that [`ClientHost`] accepts.
pub type ClientHostClient = irpc::Client<ClientHostProtocol>;
24
/// Protocol handler for cloud-to-endpoint connections.
///
/// Each accepted connection must first authenticate with an RCAN issued by
/// this endpoint, and may then send a single callback request (network
/// diagnostics, log level change, or log fetch).
#[derive(Debug, Clone)]
pub struct ClientHost {
    /// The local endpoint. Used to check that a presented RCAN was issued by
    /// this endpoint, and as the target of network diagnostics runs.
    endpoint: Endpoint,
    /// Log collector backing the log-related callbacks. `None` until
    /// [`ClientHost::with_log_collector`] is called, in which case those
    /// callbacks are answered with an error.
    log_collector: Option<LogCollector>,
}
31
32impl ProtocolHandler for ClientHost {
33    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
34        self.handle_connection(connection).await.map_err(|e| {
35            let boxed: Box<dyn std::error::Error + Send + Sync> = e.into();
36            AcceptError::from(AnyError::from(boxed))
37        })
38    }
39}
40
41impl ClientHost {
42    pub fn new(endpoint: &Endpoint) -> Self {
43        Self {
44            endpoint: endpoint.clone(),
45            log_collector: None,
46        }
47    }
48
49    /// Enables the cloud to set the log level filter at runtime via the
50    /// [`SetLogLevel`] callback.
51    ///
52    /// Without a collector the handler still accepts the message but responds
53    /// with [`RemoteError::AuthError`] indicating the feature is disabled.
54    ///
55    /// [`SetLogLevel`]: crate::protocol::SetLogLevel
56    pub fn with_log_collector(mut self, collector: LogCollector) -> Self {
57        self.log_collector = Some(collector);
58        self
59    }
60
61    async fn handle_connection(&self, connection: Connection) -> Result<()> {
62        let remote_node_id = connection.remote_id();
63        let Some(first_request) = read_request::<ClientHostProtocol>(&connection).await? else {
64            return Ok(());
65        };
66
67        let ClientHostMessage::Auth(WithChannels { inner, tx, .. }) = first_request else {
68            debug!(remote_node_id = %remote_node_id.fmt_short(), "Expected initial auth message");
69            connection.close(400u32.into(), b"Expected initial auth message");
70            return Ok(());
71        };
72        let rcan = inner.caps;
73        let capability = rcan.capability();
74
75        let res = verify_rcan(&self.endpoint, remote_node_id, &rcan);
76        match res {
77            Ok(()) => tx.send(()).await?,
78            Err(err) => {
79                warn!("authentication failed: {err:?}");
80                connection.close(401u32.into(), b"Unauthorized");
81                return Ok(());
82            }
83        }
84
85        // Read exactly one callback request
86        let Some(request) = read_request::<ClientHostProtocol>(&connection).await? else {
87            return Ok(());
88        };
89
90        match request {
91            ClientHostMessage::Auth(_) => {
92                connection.close(400u32.into(), b"Unexpected auth message");
93                anyhow::bail!("unexpected auth message");
94            }
95            ClientHostMessage::RunNetworkDiagnostics(msg) => {
96                let WithChannels { tx, .. } = msg;
97                let needed_caps = Caps::new([NetDiagnosticsCap::GetAny]);
98                if !capability.permits(&needed_caps) {
99                    return send_missing_caps(tx, needed_caps).await;
100                }
101
102                let report =
103                    crate::net_diagnostics::checks::run_diagnostics(&self.endpoint).await?;
104                tx.send(Ok(report))
105                    .await
106                    .inspect_err(|e| warn!("sending network diagnostics response: {:?}", e))?;
107            }
108            ClientHostMessage::SetLogLevel(msg) => {
109                let WithChannels { inner, tx, .. } = msg;
110                let needed_caps = Caps::new([LogsCap::SetLevel]);
111                if !capability.permits(&needed_caps) {
112                    return send_missing_caps(tx, needed_caps).await;
113                }
114                let Some(ref collector) = self.log_collector else {
115                    tx.send(Err(RemoteError::AuthError(
116                        "log collection is not enabled on this client".into(),
117                    )))
118                    .await?;
119                    return Ok(());
120                };
121                let expires_in = inner.expires_in_secs.map(Duration::from_secs);
122                match collector.set_filter(
123                    &inner.directives,
124                    expires_in,
125                    inner.revert_to.as_deref(),
126                ) {
127                    Ok(()) => {
128                        debug!(
129                            directives = %inner.directives,
130                            expires_in_secs = ?inner.expires_in_secs,
131                            "applied log level override"
132                        );
133                        tx.send(Ok(())).await?;
134                    }
135                    Err(err) => {
136                        warn!(?err, "failed to apply log level override");
137                        tx.send(Err(RemoteError::AuthError(err.to_string())))
138                            .await?;
139                    }
140                }
141            }
142            ClientHostMessage::FetchLogs(msg) => {
143                let WithChannels { inner, tx, .. } = msg;
144                let needed_caps = Caps::new([LogsCap::Fetch]);
145                if !capability.permits(&needed_caps) {
146                    let _ = tx
147                        .send(Err(RemoteError::MissingCapability(needed_caps)))
148                        .await;
149                } else if let Some(collector) = self.log_collector.clone() {
150                    stream_current_log_file(collector, inner, tx).await;
151                } else {
152                    let _ = tx
153                        .send(Err(RemoteError::AuthError(
154                            "log collection is not enabled on this client".into(),
155                        )))
156                        .await;
157                }
158            }
159        }
160
161        connection.closed().await;
162        Ok(())
163    }
164}
165
166fn verify_rcan(endpoint: &Endpoint, remote_node: EndpointId, rcan: &Rcan<Caps>) -> Result<()> {
167    // Must be a first-party token (not delegated)
168    ensure!(
169        matches!(rcan.capability_origin(), CapabilityOrigin::Issuer),
170        "invalid capability origin: expected first-party token"
171    );
172
173    // Issuer must be this endpoint (we issued this grant)
174    ensure!(
175        EndpointId::try_from(rcan.issuer().as_bytes())
176            .map(|id| id == endpoint.id())
177            .unwrap_or(false),
178        "invalid issuer: RCAN was not issued by this endpoint"
179    );
180
181    // Audience must be the remote node (the token is for them)
182    ensure!(
183        EndpointId::try_from(rcan.audience().as_bytes())
184            .map(|id| id == remote_node)
185            .unwrap_or(false),
186        "invalid audience: RCAN audience does not match remote node"
187    );
188
189    Ok(())
190}
191
192async fn send_missing_caps<T>(
193    tx: irpc::channel::oneshot::Sender<Result<T, RemoteError>>,
194    missing_caps: Caps,
195) -> Result<()> {
196    tx.send(Err(RemoteError::MissingCapability(missing_caps)))
197        .await?;
198    Ok(())
199}
200
/// Chunk size, in bytes, for streaming the rolling file back; also the size
/// of the read buffer used by `stream_current_log_file`. 64 KiB is large
/// enough to amortize the round-trip overhead and small enough that a tight
/// `max_bytes` clamp still produces granular cut-off points.
const FETCH_LOGS_CHUNK_BYTES: usize = 64 * 1024;
205
206/// Open the collector's currently-active rolling file and stream it back
207/// over `tx` in 64 KiB chunks. Stops at end-of-file or when
208/// `request.max_bytes` is reached. Errors during read are reported as a
209/// terminal `Err` chunk; the receiver should treat the stream's end as
210/// success.
211async fn stream_current_log_file(
212    collector: LogCollector,
213    request: FetchLogs,
214    tx: irpc::channel::mpsc::Sender<Result<Vec<u8>, RemoteError>>,
215) {
216    use tokio::io::AsyncReadExt;
217
218    let path = match collector.current_log_file() {
219        Ok(Some(p)) => p,
220        Ok(None) => {
221            let _ = tx
222                .send(Err(RemoteError::AuthError(
223                    "no log file is present on this client".into(),
224                )))
225                .await;
226            return;
227        }
228        Err(err) => {
229            warn!(?err, "failed to locate current log file");
230            let _ = tx.send(Err(RemoteError::InternalServerError)).await;
231            return;
232        }
233    };
234
235    let mut file = match tokio::fs::File::open(&path).await {
236        Ok(f) => f,
237        Err(err) => {
238            warn!(?err, path = %path.display(), "failed to open log file");
239            let _ = tx.send(Err(RemoteError::InternalServerError)).await;
240            return;
241        }
242    };
243
244    let max_bytes = request.max_bytes.unwrap_or(u64::MAX);
245    let mut sent: u64 = 0;
246    let mut buf = vec![0u8; FETCH_LOGS_CHUNK_BYTES];
247    loop {
248        let remaining = max_bytes.saturating_sub(sent);
249        if remaining == 0 {
250            break;
251        }
252        let take = (remaining.min(buf.len() as u64)) as usize;
253        let n = match file.read(&mut buf[..take]).await {
254            Ok(0) => break,
255            Ok(n) => n,
256            Err(err) => {
257                warn!(?err, "log file read failed");
258                let _ = tx.send(Err(RemoteError::InternalServerError)).await;
259                return;
260            }
261        };
262        if tx.send(Ok(buf[..n].to_vec())).await.is_err() {
263            // Receiver hung up; nothing more to do.
264            return;
265        }
266        sent = sent.saturating_add(n as u64);
267    }
268    debug!(
269        path = %path.display(),
270        bytes = sent,
271        "streamed log file to remote"
272    );
273}
274
#[cfg(test)]
mod tests {
    use iroh::{address_lookup::MemoryLookup, endpoint::presets, protocol::Router};
    use irpc_iroh::IrohLazyRemoteConnection;
    use n0_future::time::Duration;

    use super::*;
    use crate::{
        caps::create_grant_token,
        logs::{self, FileLoggerConfig},
        protocol::{Auth, FetchLogs as FetchLogsReq, RunNetworkDiagnostics},
    };

    /// Binds a `(server, client)` pair of minimal endpoints that share one
    /// in-memory address lookup.
    async fn bind_endpoint_pair() -> (Endpoint, Endpoint) {
        let lookup = MemoryLookup::new();
        let server_ep = iroh::Endpoint::builder(presets::Minimal)
            .address_lookup(lookup.clone())
            .bind()
            .await
            .unwrap();
        let client_ep = iroh::Endpoint::builder(presets::Minimal)
            .address_lookup(lookup)
            .bind()
            .await
            .unwrap();
        (server_ep, client_ep)
    }

    /// A caller holding a server-issued grant can authenticate and run
    /// network diagnostics against the host.
    #[tokio::test]
    async fn test_diagnostics_host_run_diagnostics() {
        let (server_ep, client_ep) = bind_endpoint_pair().await;

        let host = ClientHost::new(&server_ep);
        let router = Router::builder(server_ep.clone())
            .accept(CLIENT_HOST_ALPN, host)
            .spawn();

        // The server grants capabilities to the client.
        let rcan = create_grant_token(
            server_ep.secret_key().clone(),
            client_ep.id(),
            Duration::from_secs(3600),
            Caps::for_shared_secret(),
        )
        .unwrap();

        // Connect on the client host ALPN
        let conn = IrohLazyRemoteConnection::new(
            client_ep.clone(),
            server_ep.addr(),
            CLIENT_HOST_ALPN.to_vec(),
        );
        let client = ClientHostClient::boxed(conn);

        // authenticate with the server-issued grant
        client.rpc(Auth { caps: rcan }).await.unwrap();

        // send RunNetworkDiagnostics and verify we get a report back
        let result = client.rpc(RunNetworkDiagnostics).await.unwrap();
        let report = result.expect("expected Ok(DiagnosticsReport)");
        assert_eq!(report.endpoint_id, server_ep.id());

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }

    /// An RCAN signed by the caller itself, rather than by the host's
    /// endpoint, must be rejected during authentication.
    #[tokio::test]
    async fn test_client_host_rejects_self_signed_rcan() {
        let (server_ep, client_ep) = bind_endpoint_pair().await;

        // Mount the host on its own ALPN and talk to it with its own client
        // type, so that a rejection can only come from RCAN verification and
        // not from a protocol mismatch.
        let host = ClientHost::new(&server_ep);
        let router = Router::builder(server_ep.clone())
            .accept(CLIENT_HOST_ALPN, host)
            .spawn();

        // Client creates its own RCAN (self-signed, not issued by server).
        let rcan = create_grant_token(
            client_ep.secret_key().clone(),
            client_ep.id(),
            Duration::from_secs(3600),
            Caps::for_shared_secret(),
        )
        .unwrap();

        let conn = IrohLazyRemoteConnection::new(
            client_ep.clone(),
            server_ep.addr(),
            CLIENT_HOST_ALPN.to_vec(),
        );
        let client = ClientHostClient::boxed(conn);

        // auth should fail because the RCAN issuer is the client, not the server
        let result = client.rpc(Auth { caps: rcan }).await;
        assert!(
            result.is_err(),
            "expected auth to be rejected for self-signed RCAN"
        );

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }

    /// FetchLogs streams the currently-active rolling file from the
    /// endpoint back to the cloud caller in chunks.
    #[tokio::test]
    async fn test_fetch_logs_streams_current_file() {
        let tmp = tempfile::tempdir().unwrap();
        // Stand up a LogCollector pointing at the tempdir but skip the
        // global subscriber init: we want a controlled file we wrote
        // directly, not whatever the appender buffers.
        let (collector, _layer, guard) =
            logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("fetch-test"))
                .unwrap();
        drop(guard);

        // Write a known payload to a file matching the prefix so
        // `current_log_file` picks it up.
        let payload: Vec<u8> = (0..200_000u32).flat_map(|i| i.to_le_bytes()).collect();
        let file_path = tmp.path().join("fetch-test.2026-05-14");
        std::fs::write(&file_path, &payload).unwrap();

        let (server_ep, client_ep) = bind_endpoint_pair().await;

        let host = ClientHost::new(&server_ep).with_log_collector(collector);
        let router = Router::builder(server_ep.clone())
            .accept(CLIENT_HOST_ALPN, host)
            .spawn();

        // Issue a grant that includes LogsCap::Fetch.
        let rcan = create_grant_token(
            server_ep.secret_key().clone(),
            client_ep.id(),
            Duration::from_secs(3600),
            Caps::new([LogsCap::Fetch]),
        )
        .unwrap();
        let conn = IrohLazyRemoteConnection::new(
            client_ep.clone(),
            server_ep.addr(),
            CLIENT_HOST_ALPN.to_vec(),
        );
        let client = ClientHostClient::boxed(conn);
        client.rpc(Auth { caps: rcan }).await.unwrap();

        let mut rx = client
            .server_streaming(FetchLogsReq { max_bytes: None }, 16)
            .await
            .unwrap();

        let mut got: Vec<u8> = Vec::new();
        while let Some(chunk) = rx.recv().await.expect("server stream irpc error") {
            let bytes = chunk.expect("server returned RemoteError");
            got.extend_from_slice(&bytes);
        }
        assert_eq!(got, payload, "streamed bytes should match the file");

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }

    /// Endpoints without `LogsCap::Fetch` get a `MissingCapability` error
    /// on the stream and no data.
    #[tokio::test]
    async fn test_fetch_logs_rejects_missing_cap() {
        let tmp = tempfile::tempdir().unwrap();
        let (collector, _layer, guard) =
            logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("noaccess"))
                .unwrap();
        drop(guard);

        let (server_ep, client_ep) = bind_endpoint_pair().await;

        let host = ClientHost::new(&server_ep).with_log_collector(collector);
        let router = Router::builder(server_ep.clone())
            .accept(CLIENT_HOST_ALPN, host)
            .spawn();

        // Grant SetLevel, not Fetch.
        let rcan = create_grant_token(
            server_ep.secret_key().clone(),
            client_ep.id(),
            Duration::from_secs(3600),
            Caps::new([LogsCap::SetLevel]),
        )
        .unwrap();
        let conn = IrohLazyRemoteConnection::new(
            client_ep.clone(),
            server_ep.addr(),
            CLIENT_HOST_ALPN.to_vec(),
        );
        let client = ClientHostClient::boxed(conn);
        client.rpc(Auth { caps: rcan }).await.unwrap();

        let mut rx = client
            .server_streaming(FetchLogsReq { max_bytes: None }, 4)
            .await
            .unwrap();

        let first = rx
            .recv()
            .await
            .expect("server stream irpc error")
            .expect("stream should produce one error");
        assert!(matches!(first, Err(RemoteError::MissingCapability(_))));
        assert!(
            rx.recv().await.expect("server stream irpc error").is_none(),
            "stream should close after error",
        );

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }
}
512}