Skip to main content

iroh_services/
client_host.rs

1use anyhow::{Result, ensure};
2use iroh::{
3    Endpoint, EndpointId,
4    endpoint::Connection,
5    protocol::{AcceptError, ProtocolHandler},
6};
7use irpc::WithChannels;
8use irpc_iroh::read_request;
9use n0_error::AnyError;
10#[cfg(not(target_arch = "wasm32"))]
11use n0_future::time::Duration;
12use rcan::{Capability, CapabilityOrigin, Rcan};
13use tracing::{debug, warn};
14
15use crate::{
16    caps::{Caps, LogsCap, NetDiagnosticsCap},
17    protocol::{ClientHostMessage, ClientHostProtocol, RemoteError},
18};
19// File-based log collection is native-only.
20#[cfg(not(target_arch = "wasm32"))]
21use crate::{logs::LogCollector, protocol::FetchLogs};
22
23/// The ALPN for sending messages from the cloud node to the client.
24pub const CLIENT_HOST_ALPN: &[u8] = b"n0/n0des-client-host/1";
25
26pub type ClientHostClient = irpc::Client<ClientHostProtocol>;
27
28/// Protocol handler for cloud-to-endpoint connections.
29#[derive(Debug, Clone)]
30pub struct ClientHost {
31    endpoint: Endpoint,
32    #[cfg(not(target_arch = "wasm32"))]
33    log_collector: Option<LogCollector>,
34}
35
36impl ProtocolHandler for ClientHost {
37    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
38        self.handle_connection(connection).await.map_err(|e| {
39            let boxed: Box<dyn std::error::Error + Send + Sync> = e.into();
40            AcceptError::from(AnyError::from(boxed))
41        })
42    }
43}
44
45impl ClientHost {
46    pub fn new(endpoint: &Endpoint) -> Self {
47        Self {
48            endpoint: endpoint.clone(),
49            #[cfg(not(target_arch = "wasm32"))]
50            log_collector: None,
51        }
52    }
53
54    /// Enables the cloud to set the log level filter at runtime via the
55    /// [`SetLogLevel`] callback.
56    ///
57    /// Without a collector the handler still accepts the message but responds
58    /// with [`RemoteError::AuthError`] indicating the feature is disabled.
59    ///
60    /// File-based log collection is native-only, so this is unavailable on
61    /// wasm.
62    ///
63    /// [`SetLogLevel`]: crate::protocol::SetLogLevel
64    #[cfg(not(target_arch = "wasm32"))]
65    pub fn with_log_collector(mut self, collector: LogCollector) -> Self {
66        self.log_collector = Some(collector);
67        self
68    }
69
70    async fn handle_connection(&self, connection: Connection) -> Result<()> {
71        let remote_node_id = connection.remote_id();
72        let Some(first_request) = read_request::<ClientHostProtocol>(&connection).await? else {
73            return Ok(());
74        };
75
76        let ClientHostMessage::Auth(WithChannels { inner, tx, .. }) = first_request else {
77            debug!(remote_node_id = %remote_node_id.fmt_short(), "Expected initial auth message");
78            connection.close(400u32.into(), b"Expected initial auth message");
79            return Ok(());
80        };
81        let rcan = inner.caps;
82        let capability = rcan.capability();
83
84        let res = verify_rcan(&self.endpoint, remote_node_id, &rcan);
85        match res {
86            Ok(()) => tx.send(()).await?,
87            Err(err) => {
88                warn!("authentication failed: {err:?}");
89                connection.close(401u32.into(), b"Unauthorized");
90                return Ok(());
91            }
92        }
93
94        // Read exactly one callback request
95        let Some(request) = read_request::<ClientHostProtocol>(&connection).await? else {
96            return Ok(());
97        };
98
99        match request {
100            ClientHostMessage::Auth(_) => {
101                connection.close(400u32.into(), b"Unexpected auth message");
102                anyhow::bail!("unexpected auth message");
103            }
104            ClientHostMessage::RunNetworkDiagnostics(msg) => {
105                let WithChannels { tx, .. } = msg;
106                let needed_caps = Caps::new([NetDiagnosticsCap::GetAny]);
107                if !capability.permits(&needed_caps) {
108                    return send_missing_caps(tx, needed_caps).await;
109                }
110
111                let report =
112                    crate::net_diagnostics::checks::run_diagnostics(&self.endpoint).await?;
113                tx.send(Ok(report))
114                    .await
115                    .inspect_err(|e| warn!("sending network diagnostics response: {:?}", e))?;
116            }
117            ClientHostMessage::SetLogLevel(msg) => {
118                let WithChannels { inner, tx, .. } = msg;
119                let needed_caps = Caps::new([LogsCap::SetLevel]);
120                if !capability.permits(&needed_caps) {
121                    return send_missing_caps(tx, needed_caps).await;
122                }
123                #[cfg(not(target_arch = "wasm32"))]
124                {
125                    let Some(ref collector) = self.log_collector else {
126                        tx.send(Err(RemoteError::AuthError(
127                            "log collection is not enabled on this client".into(),
128                        )))
129                        .await?;
130                        return Ok(());
131                    };
132                    let expires_in = inner.expires_in_secs.map(Duration::from_secs);
133                    match collector.set_filter(
134                        &inner.directives,
135                        expires_in,
136                        inner.revert_to.as_deref(),
137                    ) {
138                        Ok(()) => {
139                            debug!(
140                                directives = %inner.directives,
141                                expires_in_secs = ?inner.expires_in_secs,
142                                "applied log level override"
143                            );
144                            tx.send(Ok(())).await?;
145                        }
146                        Err(err) => {
147                            warn!(?err, "failed to apply log level override");
148                            tx.send(Err(RemoteError::AuthError(err.to_string())))
149                                .await?;
150                        }
151                    }
152                }
153                // The file logger is native-only; on wasm there is no
154                // collector to drive.
155                #[cfg(target_arch = "wasm32")]
156                {
157                    let _ = inner;
158                    tx.send(Err(RemoteError::AuthError(
159                        "log collection is not available on this client".into(),
160                    )))
161                    .await?;
162                }
163            }
164            ClientHostMessage::FetchLogs(msg) => {
165                let WithChannels { inner, tx, .. } = msg;
166                let needed_caps = Caps::new([LogsCap::Fetch]);
167                if !capability.permits(&needed_caps) {
168                    let _ = tx
169                        .send(Err(RemoteError::MissingCapability(needed_caps)))
170                        .await;
171                } else {
172                    #[cfg(not(target_arch = "wasm32"))]
173                    if let Some(collector) = self.log_collector.clone() {
174                        stream_current_log_file(collector, inner, tx).await;
175                    } else {
176                        let _ = tx
177                            .send(Err(RemoteError::AuthError(
178                                "log collection is not enabled on this client".into(),
179                            )))
180                            .await;
181                    }
182                    // The file logger is native-only; on wasm there is no
183                    // log file to stream.
184                    #[cfg(target_arch = "wasm32")]
185                    {
186                        let _ = inner;
187                        let _ = tx
188                            .send(Err(RemoteError::AuthError(
189                                "log collection is not available on this client".into(),
190                            )))
191                            .await;
192                    }
193                }
194            }
195        }
196
197        connection.closed().await;
198        Ok(())
199    }
200}
201
202fn verify_rcan(endpoint: &Endpoint, remote_node: EndpointId, rcan: &Rcan<Caps>) -> Result<()> {
203    // Must be a first-party token (not delegated)
204    ensure!(
205        matches!(rcan.capability_origin(), CapabilityOrigin::Issuer),
206        "invalid capability origin: expected first-party token"
207    );
208
209    // Issuer must be this endpoint (we issued this grant)
210    ensure!(
211        EndpointId::try_from(rcan.issuer().as_bytes())
212            .map(|id| id == endpoint.id())
213            .unwrap_or(false),
214        "invalid issuer: RCAN was not issued by this endpoint"
215    );
216
217    // Audience must be the remote node (the token is for them)
218    ensure!(
219        EndpointId::try_from(rcan.audience().as_bytes())
220            .map(|id| id == remote_node)
221            .unwrap_or(false),
222        "invalid audience: RCAN audience does not match remote node"
223    );
224
225    Ok(())
226}
227
228async fn send_missing_caps<T>(
229    tx: irpc::channel::oneshot::Sender<Result<T, RemoteError>>,
230    missing_caps: Caps,
231) -> Result<()> {
232    tx.send(Err(RemoteError::MissingCapability(missing_caps)))
233        .await?;
234    Ok(())
235}
236
237/// Chunk size for streaming the rolling file back. 64 KiB is large enough
238/// to amortize the round-trip overhead and small enough that a tight
239/// `max_bytes` clamp still produces granular cut-off points.
240#[cfg(not(target_arch = "wasm32"))]
241const FETCH_LOGS_CHUNK_BYTES: usize = 64 * 1024;
242
243/// Open the collector's currently-active rolling file and stream it back
244/// over `tx` in 64 KiB chunks. Stops at end-of-file or when
245/// `request.max_bytes` is reached. Errors during read are reported as a
246/// terminal `Err` chunk; the receiver should treat the stream's end as
247/// success.
248#[cfg(not(target_arch = "wasm32"))]
249async fn stream_current_log_file(
250    collector: LogCollector,
251    request: FetchLogs,
252    tx: irpc::channel::mpsc::Sender<Result<Vec<u8>, RemoteError>>,
253) {
254    use tokio::io::AsyncReadExt;
255
256    let path = match collector.current_log_file() {
257        Ok(Some(p)) => p,
258        Ok(None) => {
259            let _ = tx
260                .send(Err(RemoteError::AuthError(
261                    "no log file is present on this client".into(),
262                )))
263                .await;
264            return;
265        }
266        Err(err) => {
267            warn!(?err, "failed to locate current log file");
268            let _ = tx.send(Err(RemoteError::InternalServerError)).await;
269            return;
270        }
271    };
272
273    let mut file = match tokio::fs::File::open(&path).await {
274        Ok(f) => f,
275        Err(err) => {
276            warn!(?err, path = %path.display(), "failed to open log file");
277            let _ = tx.send(Err(RemoteError::InternalServerError)).await;
278            return;
279        }
280    };
281
282    let max_bytes = request.max_bytes.unwrap_or(u64::MAX);
283    let mut sent: u64 = 0;
284    let mut buf = vec![0u8; FETCH_LOGS_CHUNK_BYTES];
285    loop {
286        let remaining = max_bytes.saturating_sub(sent);
287        if remaining == 0 {
288            break;
289        }
290        let take = (remaining.min(buf.len() as u64)) as usize;
291        let n = match file.read(&mut buf[..take]).await {
292            Ok(0) => break,
293            Ok(n) => n,
294            Err(err) => {
295                warn!(?err, "log file read failed");
296                let _ = tx.send(Err(RemoteError::InternalServerError)).await;
297                return;
298            }
299        };
300        if tx.send(Ok(buf[..n].to_vec())).await.is_err() {
301            // Receiver hung up; nothing more to do.
302            return;
303        }
304        sent = sent.saturating_add(n as u64);
305    }
306    debug!(
307        path = %path.display(),
308        bytes = sent,
309        "streamed log file to remote"
310    );
311}
312
313#[cfg(test)]
314mod tests {
315    use iroh::{address_lookup::MemoryLookup, endpoint::presets, protocol::Router};
316    use irpc_iroh::IrohLazyRemoteConnection;
317    use n0_future::time::Duration;
318
319    use super::*;
320    use crate::{
321        ALPN,
322        caps::create_grant_token,
323        logs::{self, FileLoggerConfig},
324        protocol::{Auth, FetchLogs as FetchLogsReq, IrohServicesClient, RunNetworkDiagnostics},
325    };
326
327    #[tokio::test]
328    async fn test_diagnostics_host_run_diagnostics() {
329        let lookup = MemoryLookup::new();
330        let server_ep = iroh::Endpoint::builder(presets::Minimal)
331            .address_lookup(lookup.clone())
332            .bind()
333            .await
334            .unwrap();
335
336        let client_ep = iroh::Endpoint::builder(presets::Minimal)
337            .address_lookup(lookup.clone())
338            .bind()
339            .await
340            .unwrap();
341
342        let host = ClientHost::new(&server_ep);
343        let router = Router::builder(server_ep.clone())
344            .accept(CLIENT_HOST_ALPN, host)
345            .spawn();
346
347        // The server grants capabilities to the client.
348        let rcan = create_grant_token(
349            server_ep.secret_key().clone(),
350            client_ep.id(),
351            Duration::from_secs(3600),
352            Caps::for_shared_secret(),
353        )
354        .unwrap();
355
356        // Connect on the net diagnostics ALPN
357        let conn = IrohLazyRemoteConnection::new(
358            client_ep.clone(),
359            server_ep.addr(),
360            CLIENT_HOST_ALPN.to_vec(),
361        );
362        let client = ClientHostClient::boxed(conn);
363
364        // authenticate with the server-issued grant
365        client.rpc(Auth { caps: rcan }).await.unwrap();
366
367        // send RunNetworkDiagnostics and verify we get a report back
368        let result = client.rpc(RunNetworkDiagnostics).await.unwrap();
369        let report = result.expect("expected Ok(DiagnosticsReport)");
370        assert_eq!(report.endpoint_id, server_ep.id());
371
372        router.shutdown().await.unwrap();
373        client_ep.close().await;
374    }
375
376    #[tokio::test]
377    async fn test_client_host_rejects_self_signed_rcan() {
378        let lookup = MemoryLookup::new();
379        let server_ep = iroh::Endpoint::builder(presets::Minimal)
380            .address_lookup(lookup.clone())
381            .bind()
382            .await
383            .unwrap();
384
385        let client_ep = iroh::Endpoint::builder(presets::Minimal)
386            .address_lookup(lookup.clone())
387            .bind()
388            .await
389            .unwrap();
390
391        let host = ClientHost::new(&server_ep);
392        let router = Router::builder(server_ep.clone())
393            .accept(ALPN, host)
394            .spawn();
395
396        // Client creates its own RCAN (self-signed, not issued by server).
397        let rcan = create_grant_token(
398            client_ep.secret_key().clone(),
399            client_ep.id(),
400            Duration::from_secs(3600),
401            Caps::for_shared_secret(),
402        )
403        .unwrap();
404
405        let conn =
406            IrohLazyRemoteConnection::new(client_ep.clone(), server_ep.addr(), ALPN.to_vec());
407        let client = IrohServicesClient::boxed(conn);
408
409        // auth should fail because the RCAN issuer is the client, not the server
410        let result = client.rpc(Auth { caps: rcan }).await;
411        assert!(
412            result.is_err(),
413            "expected auth to be rejected for self-signed RCAN"
414        );
415
416        router.shutdown().await.unwrap();
417        client_ep.close().await;
418    }
419
420    /// FetchLogs streams the currently-active rolling file from the
421    /// endpoint back to the cloud caller in chunks.
422    #[tokio::test]
423    async fn test_fetch_logs_streams_current_file() {
424        let tmp = tempfile::tempdir().unwrap();
425        // Stand up a LogCollector pointing at the tempdir but skip the
426        // global subscriber init: we want a controlled file we wrote
427        // directly, not whatever the appender buffers.
428        let (collector, _layer, guard) =
429            logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("fetch-test"))
430                .unwrap();
431        drop(guard);
432
433        // Write a known payload to a file matching the prefix so
434        // `current_log_file` picks it up.
435        let payload: Vec<u8> = (0..200_000u32).flat_map(|i| i.to_le_bytes()).collect();
436        let file_path = tmp.path().join("fetch-test.2026-05-14");
437        std::fs::write(&file_path, &payload).unwrap();
438
439        let lookup = MemoryLookup::new();
440        let server_ep = iroh::Endpoint::builder(presets::Minimal)
441            .address_lookup(lookup.clone())
442            .bind()
443            .await
444            .unwrap();
445        let client_ep = iroh::Endpoint::builder(presets::Minimal)
446            .address_lookup(lookup.clone())
447            .bind()
448            .await
449            .unwrap();
450
451        let host = ClientHost::new(&server_ep).with_log_collector(collector);
452        let router = Router::builder(server_ep.clone())
453            .accept(CLIENT_HOST_ALPN, host)
454            .spawn();
455
456        // Issue a grant that includes LogsCap::Fetch.
457        let rcan = create_grant_token(
458            server_ep.secret_key().clone(),
459            client_ep.id(),
460            Duration::from_secs(3600),
461            Caps::new([LogsCap::Fetch]),
462        )
463        .unwrap();
464        let conn = IrohLazyRemoteConnection::new(
465            client_ep.clone(),
466            server_ep.addr(),
467            CLIENT_HOST_ALPN.to_vec(),
468        );
469        let client = ClientHostClient::boxed(conn);
470        client.rpc(Auth { caps: rcan }).await.unwrap();
471
472        let mut rx = client
473            .server_streaming(FetchLogsReq { max_bytes: None }, 16)
474            .await
475            .unwrap();
476
477        let mut got: Vec<u8> = Vec::new();
478        while let Some(chunk) = rx.recv().await.expect("server stream irpc error") {
479            let bytes = chunk.expect("server returned RemoteError");
480            got.extend_from_slice(&bytes);
481        }
482        assert_eq!(got, payload, "streamed bytes should match the file");
483
484        router.shutdown().await.unwrap();
485        client_ep.close().await;
486    }
487
488    /// Endpoints without `LogsCap::Fetch` get a `MissingCapability` error
489    /// on the stream and no data.
490    #[tokio::test]
491    async fn test_fetch_logs_rejects_missing_cap() {
492        let tmp = tempfile::tempdir().unwrap();
493        let (collector, _layer, guard) =
494            logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("noaccess"))
495                .unwrap();
496        drop(guard);
497
498        let lookup = MemoryLookup::new();
499        let server_ep = iroh::Endpoint::builder(presets::Minimal)
500            .address_lookup(lookup.clone())
501            .bind()
502            .await
503            .unwrap();
504        let client_ep = iroh::Endpoint::builder(presets::Minimal)
505            .address_lookup(lookup.clone())
506            .bind()
507            .await
508            .unwrap();
509
510        let host = ClientHost::new(&server_ep).with_log_collector(collector);
511        let router = Router::builder(server_ep.clone())
512            .accept(CLIENT_HOST_ALPN, host)
513            .spawn();
514
515        // Grant SetLevel, not Fetch.
516        let rcan = create_grant_token(
517            server_ep.secret_key().clone(),
518            client_ep.id(),
519            Duration::from_secs(3600),
520            Caps::new([LogsCap::SetLevel]),
521        )
522        .unwrap();
523        let conn = IrohLazyRemoteConnection::new(
524            client_ep.clone(),
525            server_ep.addr(),
526            CLIENT_HOST_ALPN.to_vec(),
527        );
528        let client = ClientHostClient::boxed(conn);
529        client.rpc(Auth { caps: rcan }).await.unwrap();
530
531        let mut rx = client
532            .server_streaming(FetchLogsReq { max_bytes: None }, 4)
533            .await
534            .unwrap();
535
536        let first = rx
537            .recv()
538            .await
539            .expect("server stream irpc error")
540            .expect("stream should produce one error");
541        assert!(matches!(first, Err(RemoteError::MissingCapability(_))));
542        assert!(
543            rx.recv().await.expect("server stream irpc error").is_none(),
544            "stream should close after error",
545        );
546
547        router.shutdown().await.unwrap();
548        client_ep.close().await;
549    }
550}