1use anyhow::{Result, ensure};
2use iroh::{
3 Endpoint, EndpointId,
4 endpoint::Connection,
5 protocol::{AcceptError, ProtocolHandler},
6};
7use irpc::WithChannels;
8use irpc_iroh::read_request;
9use n0_error::AnyError;
10use n0_future::time::Duration;
11use rcan::{Capability, CapabilityOrigin, Rcan};
12use tracing::{debug, warn};
13
14use crate::{
15 caps::{Caps, LogsCap, NetDiagnosticsCap},
16 logs::LogCollector,
17 protocol::{ClientHostMessage, ClientHostProtocol, FetchLogs, RemoteError},
18};
19
/// ALPN protocol identifier for the client-host protocol.
///
/// The tests in this file register [`ClientHost`] with the router under this
/// value and dial it with the same value.
pub const CLIENT_HOST_ALPN: &[u8] = b"n0/n0des-client-host/1";

/// irpc client type for talking to a [`ClientHost`] via [`ClientHostProtocol`].
pub type ClientHostClient = irpc::Client<ClientHostProtocol>;
24
/// Protocol handler that authenticates a remote peer with an RCAN and then
/// serves a single [`ClientHostProtocol`] request on the connection.
#[derive(Debug, Clone)]
pub struct ClientHost {
    /// Local endpoint. Its id is what an RCAN issuer must match, and network
    /// diagnostics are run against it.
    endpoint: Endpoint,
    /// Collector backing the `SetLogLevel` and `FetchLogs` requests. `None`
    /// until one is attached with `with_log_collector`; while `None`, those
    /// requests are answered with an error.
    log_collector: Option<LogCollector>,
}
31
32impl ProtocolHandler for ClientHost {
33 async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
34 self.handle_connection(connection).await.map_err(|e| {
35 let boxed: Box<dyn std::error::Error + Send + Sync> = e.into();
36 AcceptError::from(AnyError::from(boxed))
37 })
38 }
39}
40
41impl ClientHost {
42 pub fn new(endpoint: &Endpoint) -> Self {
43 Self {
44 endpoint: endpoint.clone(),
45 log_collector: None,
46 }
47 }
48
49 pub fn with_log_collector(mut self, collector: LogCollector) -> Self {
57 self.log_collector = Some(collector);
58 self
59 }
60
61 async fn handle_connection(&self, connection: Connection) -> Result<()> {
62 let remote_node_id = connection.remote_id();
63 let Some(first_request) = read_request::<ClientHostProtocol>(&connection).await? else {
64 return Ok(());
65 };
66
67 let ClientHostMessage::Auth(WithChannels { inner, tx, .. }) = first_request else {
68 debug!(remote_node_id = %remote_node_id.fmt_short(), "Expected initial auth message");
69 connection.close(400u32.into(), b"Expected initial auth message");
70 return Ok(());
71 };
72 let rcan = inner.caps;
73 let capability = rcan.capability();
74
75 let res = verify_rcan(&self.endpoint, remote_node_id, &rcan);
76 match res {
77 Ok(()) => tx.send(()).await?,
78 Err(err) => {
79 warn!("authentication failed: {err:?}");
80 connection.close(401u32.into(), b"Unauthorized");
81 return Ok(());
82 }
83 }
84
85 let Some(request) = read_request::<ClientHostProtocol>(&connection).await? else {
87 return Ok(());
88 };
89
90 match request {
91 ClientHostMessage::Auth(_) => {
92 connection.close(400u32.into(), b"Unexpected auth message");
93 anyhow::bail!("unexpected auth message");
94 }
95 ClientHostMessage::RunNetworkDiagnostics(msg) => {
96 let WithChannels { tx, .. } = msg;
97 let needed_caps = Caps::new([NetDiagnosticsCap::GetAny]);
98 if !capability.permits(&needed_caps) {
99 return send_missing_caps(tx, needed_caps).await;
100 }
101
102 let report =
103 crate::net_diagnostics::checks::run_diagnostics(&self.endpoint).await?;
104 tx.send(Ok(report))
105 .await
106 .inspect_err(|e| warn!("sending network diagnostics response: {:?}", e))?;
107 }
108 ClientHostMessage::SetLogLevel(msg) => {
109 let WithChannels { inner, tx, .. } = msg;
110 let needed_caps = Caps::new([LogsCap::SetLevel]);
111 if !capability.permits(&needed_caps) {
112 return send_missing_caps(tx, needed_caps).await;
113 }
114 let Some(ref collector) = self.log_collector else {
115 tx.send(Err(RemoteError::AuthError(
116 "log collection is not enabled on this client".into(),
117 )))
118 .await?;
119 return Ok(());
120 };
121 let expires_in = inner.expires_in_secs.map(Duration::from_secs);
122 match collector.set_filter(
123 &inner.directives,
124 expires_in,
125 inner.revert_to.as_deref(),
126 ) {
127 Ok(()) => {
128 debug!(
129 directives = %inner.directives,
130 expires_in_secs = ?inner.expires_in_secs,
131 "applied log level override"
132 );
133 tx.send(Ok(())).await?;
134 }
135 Err(err) => {
136 warn!(?err, "failed to apply log level override");
137 tx.send(Err(RemoteError::AuthError(err.to_string())))
138 .await?;
139 }
140 }
141 }
142 ClientHostMessage::FetchLogs(msg) => {
143 let WithChannels { inner, tx, .. } = msg;
144 let needed_caps = Caps::new([LogsCap::Fetch]);
145 if !capability.permits(&needed_caps) {
146 let _ = tx
147 .send(Err(RemoteError::MissingCapability(needed_caps)))
148 .await;
149 } else if let Some(collector) = self.log_collector.clone() {
150 stream_current_log_file(collector, inner, tx).await;
151 } else {
152 let _ = tx
153 .send(Err(RemoteError::AuthError(
154 "log collection is not enabled on this client".into(),
155 )))
156 .await;
157 }
158 }
159 }
160
161 connection.closed().await;
162 Ok(())
163 }
164}
165
166fn verify_rcan(endpoint: &Endpoint, remote_node: EndpointId, rcan: &Rcan<Caps>) -> Result<()> {
167 ensure!(
169 matches!(rcan.capability_origin(), CapabilityOrigin::Issuer),
170 "invalid capability origin: expected first-party token"
171 );
172
173 ensure!(
175 EndpointId::try_from(rcan.issuer().as_bytes())
176 .map(|id| id == endpoint.id())
177 .unwrap_or(false),
178 "invalid issuer: RCAN was not issued by this endpoint"
179 );
180
181 ensure!(
183 EndpointId::try_from(rcan.audience().as_bytes())
184 .map(|id| id == remote_node)
185 .unwrap_or(false),
186 "invalid audience: RCAN audience does not match remote node"
187 );
188
189 Ok(())
190}
191
192async fn send_missing_caps<T>(
193 tx: irpc::channel::oneshot::Sender<Result<T, RemoteError>>,
194 missing_caps: Caps,
195) -> Result<()> {
196 tx.send(Err(RemoteError::MissingCapability(missing_caps)))
197 .await?;
198 Ok(())
199}
200
/// Size in bytes (64 KiB) of the read buffer used by `stream_current_log_file`,
/// and therefore the largest chunk it sends in one message.
const FETCH_LOGS_CHUNK_BYTES: usize = 64 * 1024;
205
206async fn stream_current_log_file(
212 collector: LogCollector,
213 request: FetchLogs,
214 tx: irpc::channel::mpsc::Sender<Result<Vec<u8>, RemoteError>>,
215) {
216 use tokio::io::AsyncReadExt;
217
218 let path = match collector.current_log_file() {
219 Ok(Some(p)) => p,
220 Ok(None) => {
221 let _ = tx
222 .send(Err(RemoteError::AuthError(
223 "no log file is present on this client".into(),
224 )))
225 .await;
226 return;
227 }
228 Err(err) => {
229 warn!(?err, "failed to locate current log file");
230 let _ = tx.send(Err(RemoteError::InternalServerError)).await;
231 return;
232 }
233 };
234
235 let mut file = match tokio::fs::File::open(&path).await {
236 Ok(f) => f,
237 Err(err) => {
238 warn!(?err, path = %path.display(), "failed to open log file");
239 let _ = tx.send(Err(RemoteError::InternalServerError)).await;
240 return;
241 }
242 };
243
244 let max_bytes = request.max_bytes.unwrap_or(u64::MAX);
245 let mut sent: u64 = 0;
246 let mut buf = vec![0u8; FETCH_LOGS_CHUNK_BYTES];
247 loop {
248 let remaining = max_bytes.saturating_sub(sent);
249 if remaining == 0 {
250 break;
251 }
252 let take = (remaining.min(buf.len() as u64)) as usize;
253 let n = match file.read(&mut buf[..take]).await {
254 Ok(0) => break,
255 Ok(n) => n,
256 Err(err) => {
257 warn!(?err, "log file read failed");
258 let _ = tx.send(Err(RemoteError::InternalServerError)).await;
259 return;
260 }
261 };
262 if tx.send(Ok(buf[..n].to_vec())).await.is_err() {
263 return;
265 }
266 sent = sent.saturating_add(n as u64);
267 }
268 debug!(
269 path = %path.display(),
270 bytes = sent,
271 "streamed log file to remote"
272 );
273}
274
#[cfg(test)]
mod tests {
    use iroh::{address_lookup::MemoryLookup, endpoint::presets, protocol::Router};
    use irpc_iroh::IrohLazyRemoteConnection;
    use n0_future::time::Duration;

    use super::*;
    use crate::{
        caps::create_grant_token,
        logs::{self, FileLoggerConfig},
        protocol::{Auth, FetchLogs as FetchLogsReq, RunNetworkDiagnostics},
    };

    /// Binds one minimal endpoint that uses `lookup` for address lookup.
    async fn bind_endpoint(lookup: &MemoryLookup) -> Endpoint {
        iroh::Endpoint::builder(presets::Minimal)
            .address_lookup(lookup.clone())
            .bind()
            .await
            .unwrap()
    }

    /// Binds a `(server, client)` endpoint pair sharing one in-memory lookup.
    async fn bind_endpoint_pair() -> (Endpoint, Endpoint) {
        let lookup = MemoryLookup::new();
        let server_ep = bind_endpoint(&lookup).await;
        let client_ep = bind_endpoint(&lookup).await;
        (server_ep, client_ep)
    }

    /// Spawns a router on `server_ep` serving `host` under `CLIENT_HOST_ALPN`.
    fn spawn_host(server_ep: &Endpoint, host: ClientHost) -> Router {
        Router::builder(server_ep.clone())
            .accept(CLIENT_HOST_ALPN, host)
            .spawn()
    }

    /// Issues a one-hour RCAN signed by `issuer` for `audience`, granting `caps`.
    fn issue_token(issuer: &Endpoint, audience: &Endpoint, caps: Caps) -> Rcan<Caps> {
        create_grant_token(
            issuer.secret_key().clone(),
            audience.id(),
            Duration::from_secs(3600),
            caps,
        )
        .unwrap()
    }

    /// Builds a client that dials `server_ep` from `client_ep` on `CLIENT_HOST_ALPN`.
    fn connect(client_ep: &Endpoint, server_ep: &Endpoint) -> ClientHostClient {
        let conn = IrohLazyRemoteConnection::new(
            client_ep.clone(),
            server_ep.addr(),
            CLIENT_HOST_ALPN.to_vec(),
        );
        ClientHostClient::boxed(conn)
    }

    /// A server-issued token permits running diagnostics, and the report
    /// names the server endpoint.
    #[tokio::test]
    async fn test_diagnostics_host_run_diagnostics() {
        let (server_ep, client_ep) = bind_endpoint_pair().await;
        let router = spawn_host(&server_ep, ClientHost::new(&server_ep));

        let rcan = issue_token(&server_ep, &client_ep, Caps::for_shared_secret());
        let client = connect(&client_ep, &server_ep);

        client.rpc(Auth { caps: rcan }).await.unwrap();

        let result = client.rpc(RunNetworkDiagnostics).await.unwrap();
        let report = result.expect("expected Ok(DiagnosticsReport)");
        assert_eq!(report.endpoint_id, server_ep.id());

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }

    /// A token the client signed for itself must be rejected, because the
    /// issuer is not the server endpoint.
    ///
    /// This goes through the same ALPN and client type as the other tests so
    /// that the rejection comes from the client-host auth path.
    #[tokio::test]
    async fn test_client_host_rejects_self_signed_rcan() {
        let (server_ep, client_ep) = bind_endpoint_pair().await;
        let router = spawn_host(&server_ep, ClientHost::new(&server_ep));

        // Issuer and audience are both the client: a self-signed token.
        let rcan = issue_token(&client_ep, &client_ep, Caps::for_shared_secret());
        let client = connect(&client_ep, &server_ep);

        let result = client.rpc(Auth { caps: rcan }).await;
        assert!(
            result.is_err(),
            "expected auth to be rejected for self-signed RCAN"
        );

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }

    /// With the `LogsCap::Fetch` capability, the streamed bytes equal the
    /// contents of the log file on disk.
    #[tokio::test]
    async fn test_fetch_logs_streams_current_file() {
        let tmp = tempfile::tempdir().unwrap();
        let (collector, _layer, guard) =
            logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("fetch-test"))
                .unwrap();
        // The log file is written by hand below, so the guard is released
        // right away.
        drop(guard);

        // 800_000 bytes: large enough to span several 64 KiB chunks.
        let payload: Vec<u8> = (0..200_000u32).flat_map(|i| i.to_le_bytes()).collect();
        let file_path = tmp.path().join("fetch-test.2026-05-14");
        std::fs::write(&file_path, &payload).unwrap();

        let (server_ep, client_ep) = bind_endpoint_pair().await;
        let host = ClientHost::new(&server_ep).with_log_collector(collector);
        let router = spawn_host(&server_ep, host);

        let rcan = issue_token(&server_ep, &client_ep, Caps::new([LogsCap::Fetch]));
        let client = connect(&client_ep, &server_ep);
        client.rpc(Auth { caps: rcan }).await.unwrap();

        let mut rx = client
            .server_streaming(FetchLogsReq { max_bytes: None }, 16)
            .await
            .unwrap();

        let mut got: Vec<u8> = Vec::new();
        while let Some(chunk) = rx.recv().await.expect("server stream irpc error") {
            let bytes = chunk.expect("server returned RemoteError");
            got.extend_from_slice(&bytes);
        }
        assert_eq!(got, payload, "streamed bytes should match the file");

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }

    /// A token without `LogsCap::Fetch` gets exactly one `MissingCapability`
    /// error, after which the stream ends.
    #[tokio::test]
    async fn test_fetch_logs_rejects_missing_cap() {
        let tmp = tempfile::tempdir().unwrap();
        let (collector, _layer, guard) =
            logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("noaccess"))
                .unwrap();
        drop(guard);

        let (server_ep, client_ep) = bind_endpoint_pair().await;
        let host = ClientHost::new(&server_ep).with_log_collector(collector);
        let router = spawn_host(&server_ep, host);

        // Grants only SetLevel, not Fetch.
        let rcan = issue_token(&server_ep, &client_ep, Caps::new([LogsCap::SetLevel]));
        let client = connect(&client_ep, &server_ep);
        client.rpc(Auth { caps: rcan }).await.unwrap();

        let mut rx = client
            .server_streaming(FetchLogsReq { max_bytes: None }, 4)
            .await
            .unwrap();

        let first = rx
            .recv()
            .await
            .expect("server stream irpc error")
            .expect("stream should produce one error");
        assert!(matches!(first, Err(RemoteError::MissingCapability(_))));
        assert!(
            rx.recv().await.expect("server stream irpc error").is_none(),
            "stream should close after error",
        );

        router.shutdown().await.unwrap();
        client_ep.close().await;
    }
}