Skip to main content

iroh_services/
logs.rs

//! Client-side log collection: a `tracing-subscriber` layer that buffers
//! structured log records for shipment to iroh-services, plus a reload handle
//! that lets the cloud control the level filter at runtime.
//!
//! # The cloud is the source of truth
//!
//! The level filter starts at `off`. No tracing events are captured until the
//! cloud pushes a [`crate::protocol::SetLogLevel`] over the [`ClientHost`]
//! channel. The cloud sends one immediately after authenticating a connected
//! endpoint, derived from the per-endpoint `endpoint_log_settings` row (when
//! present) plus the project default.
//!
//! Concretely, [`install`] and [`layer`] take no level argument: the client
//! process does not get to choose its own log level. The level in effect is
//! whatever the dashboard or REST API has decided.
//!
//! [`ClientHost`]: crate::ClientHost
//!
//! # Typical usage
//!
//! ```no_run
//! use iroh_services::logs;
//!
//! # async fn run() -> anyhow::Result<()> {
//! // Buffer-only subscriber; the filter starts at `off`.
//! let collector = logs::install()?;
//!
//! // Alternatively, instead of `logs::install()`, build the layer with
//! // `logs::layer()` and compose it with a stderr fmt layer to also render
//! // events locally (the fmt layer is not affected by the cloud-controlled
//! // filter, which wraps only the buffering layer):
//! //
//! //     use tracing_subscriber::prelude::*;
//! //     let (collector, log_layer) = iroh_services::logs::layer();
//! //     tracing_subscriber::registry()
//! //         .with(log_layer)
//! //         .with(tracing_subscriber::fmt::layer())
//! //         .init();
//!
//! // Hand the collector to the client builder so it pushes batches over RPC,
//! // and to the ClientHost so the cloud can override the level dynamically.
//! # let _ = collector;
//! # Ok(())
//! # }
//! ```
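//!
//! Backed by a bounded [`VecDeque`](std::collections::VecDeque) of
//! [`LogLine`]. When the buffer is full, the oldest entry is evicted. Lines
//! beyond [`DEFAULT_RATE_PER_SECOND`] in any one-second window are discarded.
//! Both kinds of loss are counted and reported with the next drained batch.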
46
47use std::{
48    collections::VecDeque,
49    sync::{Arc, Mutex},
50    time::Instant,
51};
52
53use n0_future::{
54    task::{AbortOnDropHandle, JoinHandle},
55    time::Duration,
56};
57use tracing::{Event, Subscriber, debug, warn};
58use tracing_subscriber::{
59    EnvFilter, Layer, Registry,
60    fmt::{
61        format::Writer,
62        time::{FormatTime, SystemTime},
63    },
64    layer::{Context, SubscriberExt as _},
65    registry::LookupSpan,
66    reload,
67    util::SubscriberInitExt as _,
68};
69
70use crate::protocol::{FieldValue, LogLine, SpanInfo};
71
/// Upper bound on the number of log lines held in memory while they wait to
/// be shipped to the cloud.
///
/// Once this many lines are pending, each new line evicts the oldest one and
/// bumps the drop counter. Deliberately simple; revisit once real usage data
/// is available.
pub const DEFAULT_BUFFER_CAPACITY: usize = 1_000;

/// Cap on how many log lines a collector accepts in any one-second window.
///
/// Lines above this rate are discarded and counted as dropped. The value
/// leaves room for meaningful debug-level output, while stopping a runaway
/// logging loop from growing the backlog without limit.
pub const DEFAULT_RATE_PER_SECOND: u32 = 100;
84
85/// Errors that can occur while installing the log collector.
86#[derive(Debug, thiserror::Error)]
87pub enum InstallError {
88    /// The default tracing dispatcher is already set; install once at startup.
89    #[error("global tracing dispatcher is already set")]
90    AlreadyInstalled,
91    /// The supplied directives string was rejected by `EnvFilter`.
92    #[error("invalid filter directives: {0}")]
93    InvalidDirectives(String),
94}
95
96/// Errors that can occur while changing the active filter at runtime.
97#[derive(Debug, thiserror::Error)]
98pub enum SetFilterError {
99    /// The supplied directives string was rejected by `EnvFilter`.
100    #[error("invalid filter directives: {0}")]
101    InvalidDirectives(String),
102    /// Reloading the filter failed because the subscriber went away.
103    #[error("reload handle is no longer valid")]
104    ReloadFailed,
105}
106
107/// Handle to the buffered log collector. Cheap to clone; all clones share the
108/// same backing buffer and reload handle.
109#[derive(Clone)]
110pub struct LogCollector {
111    inner: Arc<CollectorInner>,
112}
113
114struct CollectorInner {
115    buffer: Mutex<RingBuffer>,
116    reload_handle: reload::Handle<EnvFilter, Registry>,
117    revert_task: Mutex<Option<AbortOnDropHandle<()>>>,
118}
119
/// `EnvFilter` directive that disables all events.
///
/// This is both the initial filter and the default revert target, so the
/// buffer records nothing until the cloud sends a `SetLogLevel` with a more
/// permissive level.
const OFF_DIRECTIVES: &str = "off";
123
124struct RingBuffer {
125    lines: VecDeque<LogLine>,
126    dropped: u32,
127    capacity: usize,
128    rate_per_second: u32,
129    window_start: Instant,
130    window_count: u32,
131}
132
133impl RingBuffer {
134    fn new(capacity: usize, rate_per_second: u32) -> Self {
135        Self {
136            lines: VecDeque::with_capacity(capacity.min(64)),
137            dropped: 0,
138            capacity,
139            rate_per_second,
140            window_start: Instant::now(),
141            window_count: 0,
142        }
143    }
144
145    fn push(&mut self, line: LogLine) {
146        let now = Instant::now();
147        if now.duration_since(self.window_start) >= Duration::from_secs(1) {
148            self.window_start = now;
149            self.window_count = 0;
150        }
151        if self.window_count >= self.rate_per_second {
152            self.dropped = self.dropped.saturating_add(1);
153            return;
154        }
155        self.window_count += 1;
156
157        if self.lines.len() == self.capacity {
158            self.lines.pop_front();
159            self.dropped = self.dropped.saturating_add(1);
160        }
161        self.lines.push_back(line);
162    }
163
164    fn drain(&mut self, max: usize) -> (Vec<LogLine>, u32) {
165        let take = self.lines.len().min(max);
166        let lines: Vec<LogLine> = self.lines.drain(..take).collect();
167        let dropped = std::mem::take(&mut self.dropped);
168        (lines, dropped)
169    }
170}
171
172impl LogCollector {
173    /// Returns the current number of buffered lines.
174    pub fn buffered(&self) -> usize {
175        self.inner.buffer.lock().expect("poisoned").lines.len()
176    }
177
178    /// Drains up to `max` lines from the buffer, along with the count of lines
179    /// dropped since the last drain.
180    pub fn drain(&self, max: usize) -> (Vec<LogLine>, u32) {
181        self.inner.buffer.lock().expect("poisoned").drain(max)
182    }
183
184    /// Sets the active filter directives. When `expires_in` is set,
185    /// schedules a revert after that duration. The revert target is
186    /// `revert_to` when supplied; `None` means revert to `off`.
187    pub fn set_filter(
188        &self,
189        directives: &str,
190        expires_in: Option<Duration>,
191        revert_to: Option<&str>,
192    ) -> Result<(), SetFilterError> {
193        let filter = EnvFilter::try_new(directives)
194            .map_err(|e| SetFilterError::InvalidDirectives(e.to_string()))?;
195        self.inner
196            .reload_handle
197            .reload(filter)
198            .map_err(|_| SetFilterError::ReloadFailed)?;
199
200        let mut guard = self.inner.revert_task.lock().expect("poisoned");
201        *guard = None;
202
203        if let Some(expires_in) = expires_in {
204            let collector = self.clone();
205            let revert_to = revert_to.map(str::to_string);
206            let handle: JoinHandle<()> = n0_future::task::spawn(async move {
207                n0_future::time::sleep(expires_in).await;
208                let target = revert_to.as_deref();
209                if let Err(err) = collector.revert(target) {
210                    warn!(?err, "failed to revert log filter");
211                }
212            });
213            *guard = Some(AbortOnDropHandle::new(handle));
214        }
215        Ok(())
216    }
217
218    /// Reverts the active filter to `to`, or to the off state when `to` is
219    /// `None`.
220    pub fn revert(&self, to: Option<&str>) -> Result<(), SetFilterError> {
221        let directives = to.unwrap_or(OFF_DIRECTIVES);
222        let filter = EnvFilter::try_new(directives)
223            .map_err(|e| SetFilterError::InvalidDirectives(e.to_string()))?;
224        self.inner
225            .reload_handle
226            .reload(filter)
227            .map_err(|_| SetFilterError::ReloadFailed)
228    }
229}
230
231impl std::fmt::Debug for LogCollector {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        f.debug_struct("LogCollector")
234            .field("buffered", &self.buffered())
235            .finish()
236    }
237}
238
239/// Installs a global tracing subscriber whose only output is a JSON-buffering
240/// layer that ships records to the cloud. The level filter starts at `off`;
241/// the cloud must push a `SetLogLevel` for any events to be captured.
242///
243/// Call exactly once at process start. For local console output in addition
244/// to cloud shipping, use [`layer`] and compose your own subscriber.
245pub fn install() -> Result<LogCollector, InstallError> {
246    let (collector, layer) = layer();
247    tracing_subscriber::registry()
248        .with(layer)
249        .try_init()
250        .map_err(|_| InstallError::AlreadyInstalled)?;
251    debug!("iroh-services log collector installed");
252    Ok(collector)
253}
254
255/// Builds the buffer layer and its [`LogCollector`] handle without installing
256/// a global subscriber. Use this when composing the collector with other
257/// layers; it returns the layer pre-wrapped in the reloadable filter.
258///
259/// Typical pattern for buffer + stderr fmt:
260///
261/// ```no_run
262/// use iroh_services::logs;
263/// use tracing_subscriber::prelude::*;
264///
265/// let (collector, log_layer) = logs::layer();
266/// tracing_subscriber::registry()
267///     .with(log_layer)
268///     .with(tracing_subscriber::fmt::layer())
269///     .try_init()
270///     .ok();
271/// # let _ = collector;
272/// ```
273pub fn layer() -> (LogCollector, impl Layer<Registry> + Send + Sync + 'static) {
274    // `EnvFilter::try_new("off")` cannot fail; "off" is always valid.
275    let filter = EnvFilter::try_new(OFF_DIRECTIVES).expect("'off' is always a valid directive");
276    let (filter, reload_handle) = reload::Layer::new(filter);
277
278    let inner = Arc::new(CollectorInner {
279        buffer: Mutex::new(RingBuffer::new(
280            DEFAULT_BUFFER_CAPACITY,
281            DEFAULT_RATE_PER_SECOND,
282        )),
283        reload_handle,
284        revert_task: Mutex::new(None),
285    });
286    let collector = LogCollector {
287        inner: inner.clone(),
288    };
289    let buffer_layer = BufferLayer { inner };
290    (collector, buffer_layer.with_filter(filter))
291}
292
293struct BufferLayer {
294    inner: Arc<CollectorInner>,
295}
296
297impl<S> Layer<S> for BufferLayer
298where
299    S: Subscriber + for<'a> LookupSpan<'a>,
300{
301    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
302        let metadata = event.metadata();
303        let mut timestamp = String::new();
304        let _ = SystemTime.format_time(&mut Writer::new(&mut timestamp));
305
306        let mut field_visitor = FieldVisitor::default();
307        event.record(&mut field_visitor);
308
309        let mut spans: Vec<SpanInfo> = Vec::new();
310        if let Some(scope) = ctx.event_scope(event) {
311            for span in scope.from_root() {
312                spans.push(SpanInfo {
313                    name: span.name().to_string(),
314                    fields: Vec::new(),
315                });
316            }
317        }
318
319        let line = LogLine {
320            timestamp,
321            level: metadata.level().to_string(),
322            target: metadata.target().to_string(),
323            fields: field_visitor.fields,
324            spans,
325        };
326
327        self.inner.buffer.lock().expect("poisoned").push(line);
328    }
329}
330
331#[derive(Default)]
332struct FieldVisitor {
333    fields: Vec<(String, FieldValue)>,
334}
335
336impl FieldVisitor {
337    fn push(&mut self, field: &tracing::field::Field, value: FieldValue) {
338        self.fields.push((field.name().to_string(), value));
339    }
340}
341
342impl tracing::field::Visit for FieldVisitor {
343    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
344        self.push(field, FieldValue::Str(value.to_string()));
345    }
346
347    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
348        self.push(field, FieldValue::I64(value));
349    }
350
351    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
352        self.push(field, FieldValue::U64(value));
353    }
354
355    fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
356        self.push(field, FieldValue::Other(value.to_string()));
357    }
358
359    fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
360        self.push(field, FieldValue::Other(value.to_string()));
361    }
362
363    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
364        self.push(field, FieldValue::Bool(value));
365    }
366
367    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
368        self.push(field, FieldValue::F64(value));
369    }
370
371    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
372        // The implicit `message` field arrives here when the producer used a
373        // bare format string (`info!("hello {x}")`). Store it as a plain
374        // string so the dashboard does not show it wrapped in quotes from a
375        // generic `Debug` formatter.
376        if field.name() == "message" {
377            self.push(field, FieldValue::Str(format!("{value:?}")));
378        } else {
379            self.push(field, FieldValue::Other(format!("{value:?}")));
380        }
381    }
382}
383
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an INFO line for target `test`, using `timestamp` as an ID.
    fn sample_line(timestamp: String) -> LogLine {
        LogLine {
            timestamp,
            level: "INFO".into(),
            target: "test".into(),
            fields: Vec::new(),
            spans: Vec::new(),
        }
    }

    /// Whether `line` has a `message` field holding exactly `want` as a string.
    fn message_is(line: &LogLine, want: &str) -> bool {
        line.fields.iter().any(|(key, value)| {
            key == "message" && matches!(value, FieldValue::Str(text) if text == want)
        })
    }

    /// Whether any line in `lines` carries the message `want`.
    fn contains_message(lines: &[LogLine], want: &str) -> bool {
        lines.iter().any(|line| message_is(line, want))
    }

    #[test]
    fn ring_buffer_rolls_over_oldest() {
        let mut buf = RingBuffer::new(2, 1000);
        for i in 0..5 {
            buf.push(sample_line(i.to_string()));
        }
        let (lines, dropped) = buf.drain(10);
        let stamps: Vec<&str> = lines.iter().map(|line| line.timestamp.as_str()).collect();
        assert_eq!(stamps, ["3", "4"]);
        assert_eq!(dropped, 3);
    }

    #[test]
    fn ring_buffer_throttles_per_second() {
        let mut buf = RingBuffer::new(1000, 2);
        for i in 0..10 {
            buf.push(sample_line(i.to_string()));
        }
        let (lines, dropped) = buf.drain(100);
        assert_eq!(lines.len(), 2);
        assert_eq!(dropped, 8);
    }

    #[tokio::test]
    async fn collector_reload_changes_filter_then_reverts() {
        // If another test already owns the global dispatcher there is nothing
        // meaningful to check here.
        let collector = match install() {
            Ok(collector) => collector,
            Err(InstallError::AlreadyInstalled) => return,
            Err(err) => panic!("install: {err}"),
        };

        // The starting filter is "off": even INFO lines are discarded.
        tracing::info!(target: "logtest", "should not appear yet");
        let (lines, _) = collector.drain(100);
        assert!(!contains_message(&lines, "should not appear yet"));

        // Cloud raises the level to INFO.
        collector.set_filter("info", None, None).unwrap();
        tracing::info!(target: "logtest", "first info");
        tracing::trace!(target: "logtest", "should not appear");
        let (lines, _) = collector.drain(100);
        assert!(
            lines
                .iter()
                .any(|line| line.target == "logtest" && message_is(line, "first info"))
        );
        assert!(!contains_message(&lines, "should not appear"));

        // Cloud raises to TRACE with a short TTL, reverting to INFO afterwards.
        collector
            .set_filter("trace", Some(Duration::from_millis(150)), Some("info"))
            .unwrap();
        tracing::trace!(target: "logtest", "should appear");
        let (lines, _) = collector.drain(100);
        assert!(contains_message(&lines, "should appear"));

        // Wait comfortably past the TTL; TRACE must be filtered out again.
        n0_future::time::sleep(Duration::from_millis(300)).await;
        tracing::trace!(target: "logtest", "should not appear after revert");
        let (lines, _) = collector.drain(100);
        assert!(!contains_message(&lines, "should not appear after revert"));
    }
}