1use anyhow::{Result, ensure};
2use iroh::{
3 Endpoint, EndpointId,
4 endpoint::Connection,
5 protocol::{AcceptError, ProtocolHandler},
6};
7use irpc::WithChannels;
8use irpc_iroh::read_request;
9use n0_error::AnyError;
10#[cfg(not(target_arch = "wasm32"))]
11use n0_future::time::Duration;
12use rcan::{Capability, CapabilityOrigin, Rcan};
13use tracing::{debug, warn};
14
15use crate::{
16 caps::{Caps, LogsCap, NetDiagnosticsCap},
17 protocol::{ClientHostMessage, ClientHostProtocol, RemoteError},
18};
19#[cfg(not(target_arch = "wasm32"))]
21use crate::{logs::LogCollector, protocol::FetchLogs};
22
/// ALPN identifier under which the [`ClientHost`] protocol handler is registered
/// and which clients dial to reach it.
pub const CLIENT_HOST_ALPN: &[u8] = b"n0/n0des-client-host/1";
25
/// irpc client speaking the [`ClientHostProtocol`].
pub type ClientHostClient = irpc::Client<ClientHostProtocol>;
27
/// Protocol handler that serves [`ClientHostProtocol`] requests (network
/// diagnostics, log level changes, log fetching) to authenticated remotes.
#[derive(Debug, Clone)]
pub struct ClientHost {
    /// Local endpoint: its id is the only accepted RCAN issuer, and it is the
    /// endpoint network diagnostics are run against.
    endpoint: Endpoint,
    /// Log collector backing the log-level and fetch-logs requests. `None`
    /// means log collection is not enabled; the field does not exist on wasm32.
    #[cfg(not(target_arch = "wasm32"))]
    log_collector: Option<LogCollector>,
}
35
36impl ProtocolHandler for ClientHost {
37 async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
38 self.handle_connection(connection).await.map_err(|e| {
39 let boxed: Box<dyn std::error::Error + Send + Sync> = e.into();
40 AcceptError::from(AnyError::from(boxed))
41 })
42 }
43}
44
45impl ClientHost {
46 pub fn new(endpoint: &Endpoint) -> Self {
47 Self {
48 endpoint: endpoint.clone(),
49 #[cfg(not(target_arch = "wasm32"))]
50 log_collector: None,
51 }
52 }
53
54 #[cfg(not(target_arch = "wasm32"))]
65 pub fn with_log_collector(mut self, collector: LogCollector) -> Self {
66 self.log_collector = Some(collector);
67 self
68 }
69
70 async fn handle_connection(&self, connection: Connection) -> Result<()> {
71 let remote_node_id = connection.remote_id();
72 let Some(first_request) = read_request::<ClientHostProtocol>(&connection).await? else {
73 return Ok(());
74 };
75
76 let ClientHostMessage::Auth(WithChannels { inner, tx, .. }) = first_request else {
77 debug!(remote_node_id = %remote_node_id.fmt_short(), "Expected initial auth message");
78 connection.close(400u32.into(), b"Expected initial auth message");
79 return Ok(());
80 };
81 let rcan = inner.caps;
82 let capability = rcan.capability();
83
84 let res = verify_rcan(&self.endpoint, remote_node_id, &rcan);
85 match res {
86 Ok(()) => tx.send(()).await?,
87 Err(err) => {
88 warn!("authentication failed: {err:?}");
89 connection.close(401u32.into(), b"Unauthorized");
90 return Ok(());
91 }
92 }
93
94 let Some(request) = read_request::<ClientHostProtocol>(&connection).await? else {
96 return Ok(());
97 };
98
99 match request {
100 ClientHostMessage::Auth(_) => {
101 connection.close(400u32.into(), b"Unexpected auth message");
102 anyhow::bail!("unexpected auth message");
103 }
104 ClientHostMessage::RunNetworkDiagnostics(msg) => {
105 let WithChannels { tx, .. } = msg;
106 let needed_caps = Caps::new([NetDiagnosticsCap::GetAny]);
107 if !capability.permits(&needed_caps) {
108 return send_missing_caps(tx, needed_caps).await;
109 }
110
111 let report =
112 crate::net_diagnostics::checks::run_diagnostics(&self.endpoint).await?;
113 tx.send(Ok(report))
114 .await
115 .inspect_err(|e| warn!("sending network diagnostics response: {:?}", e))?;
116 }
117 ClientHostMessage::SetLogLevel(msg) => {
118 let WithChannels { inner, tx, .. } = msg;
119 let needed_caps = Caps::new([LogsCap::SetLevel]);
120 if !capability.permits(&needed_caps) {
121 return send_missing_caps(tx, needed_caps).await;
122 }
123 #[cfg(not(target_arch = "wasm32"))]
124 {
125 let Some(ref collector) = self.log_collector else {
126 tx.send(Err(RemoteError::AuthError(
127 "log collection is not enabled on this client".into(),
128 )))
129 .await?;
130 return Ok(());
131 };
132 let expires_in = inner.expires_in_secs.map(Duration::from_secs);
133 match collector.set_filter(
134 &inner.directives,
135 expires_in,
136 inner.revert_to.as_deref(),
137 ) {
138 Ok(()) => {
139 debug!(
140 directives = %inner.directives,
141 expires_in_secs = ?inner.expires_in_secs,
142 "applied log level override"
143 );
144 tx.send(Ok(())).await?;
145 }
146 Err(err) => {
147 warn!(?err, "failed to apply log level override");
148 tx.send(Err(RemoteError::AuthError(err.to_string())))
149 .await?;
150 }
151 }
152 }
153 #[cfg(target_arch = "wasm32")]
156 {
157 let _ = inner;
158 tx.send(Err(RemoteError::AuthError(
159 "log collection is not available on this client".into(),
160 )))
161 .await?;
162 }
163 }
164 ClientHostMessage::FetchLogs(msg) => {
165 let WithChannels { inner, tx, .. } = msg;
166 let needed_caps = Caps::new([LogsCap::Fetch]);
167 if !capability.permits(&needed_caps) {
168 let _ = tx
169 .send(Err(RemoteError::MissingCapability(needed_caps)))
170 .await;
171 } else {
172 #[cfg(not(target_arch = "wasm32"))]
173 if let Some(collector) = self.log_collector.clone() {
174 stream_current_log_file(collector, inner, tx).await;
175 } else {
176 let _ = tx
177 .send(Err(RemoteError::AuthError(
178 "log collection is not enabled on this client".into(),
179 )))
180 .await;
181 }
182 #[cfg(target_arch = "wasm32")]
185 {
186 let _ = inner;
187 let _ = tx
188 .send(Err(RemoteError::AuthError(
189 "log collection is not available on this client".into(),
190 )))
191 .await;
192 }
193 }
194 }
195 }
196
197 connection.closed().await;
198 Ok(())
199 }
200}
201
202fn verify_rcan(endpoint: &Endpoint, remote_node: EndpointId, rcan: &Rcan<Caps>) -> Result<()> {
203 ensure!(
205 matches!(rcan.capability_origin(), CapabilityOrigin::Issuer),
206 "invalid capability origin: expected first-party token"
207 );
208
209 ensure!(
211 EndpointId::try_from(rcan.issuer().as_bytes())
212 .map(|id| id == endpoint.id())
213 .unwrap_or(false),
214 "invalid issuer: RCAN was not issued by this endpoint"
215 );
216
217 ensure!(
219 EndpointId::try_from(rcan.audience().as_bytes())
220 .map(|id| id == remote_node)
221 .unwrap_or(false),
222 "invalid audience: RCAN audience does not match remote node"
223 );
224
225 Ok(())
226}
227
228async fn send_missing_caps<T>(
229 tx: irpc::channel::oneshot::Sender<Result<T, RemoteError>>,
230 missing_caps: Caps,
231) -> Result<()> {
232 tx.send(Err(RemoteError::MissingCapability(missing_caps)))
233 .await?;
234 Ok(())
235}
236
/// Size in bytes of the read buffer used when streaming a log file; each chunk
/// sent to the remote is at most this large.
#[cfg(not(target_arch = "wasm32"))]
const FETCH_LOGS_CHUNK_BYTES: usize = 64 * 1024;
242
243#[cfg(not(target_arch = "wasm32"))]
249async fn stream_current_log_file(
250 collector: LogCollector,
251 request: FetchLogs,
252 tx: irpc::channel::mpsc::Sender<Result<Vec<u8>, RemoteError>>,
253) {
254 use tokio::io::AsyncReadExt;
255
256 let path = match collector.current_log_file() {
257 Ok(Some(p)) => p,
258 Ok(None) => {
259 let _ = tx
260 .send(Err(RemoteError::AuthError(
261 "no log file is present on this client".into(),
262 )))
263 .await;
264 return;
265 }
266 Err(err) => {
267 warn!(?err, "failed to locate current log file");
268 let _ = tx.send(Err(RemoteError::InternalServerError)).await;
269 return;
270 }
271 };
272
273 let mut file = match tokio::fs::File::open(&path).await {
274 Ok(f) => f,
275 Err(err) => {
276 warn!(?err, path = %path.display(), "failed to open log file");
277 let _ = tx.send(Err(RemoteError::InternalServerError)).await;
278 return;
279 }
280 };
281
282 let max_bytes = request.max_bytes.unwrap_or(u64::MAX);
283 let mut sent: u64 = 0;
284 let mut buf = vec![0u8; FETCH_LOGS_CHUNK_BYTES];
285 loop {
286 let remaining = max_bytes.saturating_sub(sent);
287 if remaining == 0 {
288 break;
289 }
290 let take = (remaining.min(buf.len() as u64)) as usize;
291 let n = match file.read(&mut buf[..take]).await {
292 Ok(0) => break,
293 Ok(n) => n,
294 Err(err) => {
295 warn!(?err, "log file read failed");
296 let _ = tx.send(Err(RemoteError::InternalServerError)).await;
297 return;
298 }
299 };
300 if tx.send(Ok(buf[..n].to_vec())).await.is_err() {
301 return;
303 }
304 sent = sent.saturating_add(n as u64);
305 }
306 debug!(
307 path = %path.display(),
308 bytes = sent,
309 "streamed log file to remote"
310 );
311}
312
#[cfg(test)]
mod tests {
    use iroh::{address_lookup::MemoryLookup, endpoint::presets, protocol::Router};
    use irpc_iroh::IrohLazyRemoteConnection;
    use n0_future::time::Duration;

    use super::*;
    use crate::{
        caps::create_grant_token,
        logs::{self, FileLoggerConfig},
        protocol::{Auth, FetchLogs as FetchLogsReq, RunNetworkDiagnostics},
    };

    /// Binds a minimal endpoint that uses the shared in-memory address lookup.
    async fn bind_endpoint(lookup: &MemoryLookup) -> Endpoint {
        iroh::Endpoint::builder(presets::Minimal)
            .address_lookup(lookup.clone())
            .bind()
            .await
            .unwrap()
    }

    /// Serves `host` on `server_ep` under [`CLIENT_HOST_ALPN`].
    fn serve(server_ep: &Endpoint, host: ClientHost) -> Router {
        Router::builder(server_ep.clone())
            .accept(CLIENT_HOST_ALPN, host)
            .spawn()
    }

    /// Builds a lazily-connecting client-host client from `client_ep` to `server_ep`.
    fn connect(client_ep: &Endpoint, server_ep: &Endpoint) -> ClientHostClient {
        let conn = IrohLazyRemoteConnection::new(
            client_ep.clone(),
            server_ep.addr(),
            CLIENT_HOST_ALPN.to_vec(),
        );
        ClientHostClient::boxed(conn)
    }

    /// A remote holding a server-issued grant can run network diagnostics.
    #[tokio::test]
    async fn test_diagnostics_host_run_diagnostics() {
        let lookup = MemoryLookup::new();
        let server_ep = bind_endpoint(&lookup).await;
        let client_ep = bind_endpoint(&lookup).await;

        let router = serve(&server_ep, ClientHost::new(&server_ep));

        // Token issued by the server, addressed to the client.
        let rcan = create_grant_token(
            server_ep.secret_key().clone(),
            client_ep.id(),
            Duration::from_secs(3600),
            Caps::for_shared_secret(),
        )
        .unwrap();

        let client = connect(&client_ep, &server_ep);
        client.rpc(Auth { caps: rcan }).await.unwrap();

        let result = client.rpc(RunNetworkDiagnostics).await.unwrap();
        let report = result.expect("expected Ok(DiagnosticsReport)");
        assert_eq!(report.endpoint_id, server_ep.id());

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }

    /// A token the client issued to itself must be rejected by the host.
    ///
    /// The handler is registered under [`CLIENT_HOST_ALPN`] and reached through
    /// [`ClientHostClient`], so the rejection comes from the client-host
    /// authentication path itself.
    #[tokio::test]
    async fn test_client_host_rejects_self_signed_rcan() {
        let lookup = MemoryLookup::new();
        let server_ep = bind_endpoint(&lookup).await;
        let client_ep = bind_endpoint(&lookup).await;

        let router = serve(&server_ep, ClientHost::new(&server_ep));

        // Issuer is the client rather than the server.
        let rcan = create_grant_token(
            client_ep.secret_key().clone(),
            client_ep.id(),
            Duration::from_secs(3600),
            Caps::for_shared_secret(),
        )
        .unwrap();

        let client = connect(&client_ep, &server_ep);

        let result = client.rpc(Auth { caps: rcan }).await;
        assert!(
            result.is_err(),
            "expected auth to be rejected for self-signed RCAN"
        );

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }

    /// With the fetch capability, the whole current log file is streamed back
    /// in order.
    #[tokio::test]
    async fn test_fetch_logs_streams_current_file() {
        let tmp = tempfile::tempdir().unwrap();
        let (collector, _layer, guard) =
            logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("fetch-test"))
                .unwrap();
        drop(guard);

        // 800,000 bytes, larger than the 64 KiB chunk size, so the stream
        // spans several chunks.
        let payload: Vec<u8> = (0..200_000u32).flat_map(|i| i.to_le_bytes()).collect();
        // Presumably this name matches what the collector treats as its
        // current log file for the "fetch-test" prefix — confirm against `logs`.
        let file_path = tmp.path().join("fetch-test.2026-05-14");
        std::fs::write(&file_path, &payload).unwrap();

        let lookup = MemoryLookup::new();
        let server_ep = bind_endpoint(&lookup).await;
        let client_ep = bind_endpoint(&lookup).await;

        let host = ClientHost::new(&server_ep).with_log_collector(collector);
        let router = serve(&server_ep, host);

        let rcan = create_grant_token(
            server_ep.secret_key().clone(),
            client_ep.id(),
            Duration::from_secs(3600),
            Caps::new([LogsCap::Fetch]),
        )
        .unwrap();
        let client = connect(&client_ep, &server_ep);
        client.rpc(Auth { caps: rcan }).await.unwrap();

        let mut rx = client
            .server_streaming(FetchLogsReq { max_bytes: None }, 16)
            .await
            .unwrap();

        let mut got: Vec<u8> = Vec::new();
        while let Some(chunk) = rx.recv().await.expect("server stream irpc error") {
            let bytes = chunk.expect("server returned RemoteError");
            got.extend_from_slice(&bytes);
        }
        assert_eq!(got, payload, "streamed bytes should match the file");

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }

    /// Without the fetch capability, the stream yields one
    /// `MissingCapability` error and then ends.
    #[tokio::test]
    async fn test_fetch_logs_rejects_missing_cap() {
        let tmp = tempfile::tempdir().unwrap();
        let (collector, _layer, guard) =
            logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("noaccess"))
                .unwrap();
        drop(guard);

        let lookup = MemoryLookup::new();
        let server_ep = bind_endpoint(&lookup).await;
        let client_ep = bind_endpoint(&lookup).await;

        let host = ClientHost::new(&server_ep).with_log_collector(collector);
        let router = serve(&server_ep, host);

        // Grants only the set-level capability, not fetch.
        let rcan = create_grant_token(
            server_ep.secret_key().clone(),
            client_ep.id(),
            Duration::from_secs(3600),
            Caps::new([LogsCap::SetLevel]),
        )
        .unwrap();
        let client = connect(&client_ep, &server_ep);
        client.rpc(Auth { caps: rcan }).await.unwrap();

        let mut rx = client
            .server_streaming(FetchLogsReq { max_bytes: None }, 4)
            .await
            .unwrap();

        let first = rx
            .recv()
            .await
            .expect("server stream irpc error")
            .expect("stream should produce one error");
        assert!(matches!(first, Err(RemoteError::MissingCapability(_))));
        assert!(
            rx.recv().await.expect("server stream irpc error").is_none(),
            "stream should close after error",
        );

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }
}