1use std::{
48 collections::VecDeque,
49 sync::{Arc, Mutex},
50 time::Instant,
51};
52
53use n0_future::{
54 task::{AbortOnDropHandle, JoinHandle},
55 time::Duration,
56};
57use tracing::{Event, Subscriber, debug, warn};
58use tracing_subscriber::{
59 EnvFilter, Layer, Registry,
60 fmt::{
61 format::Writer,
62 time::{FormatTime, SystemTime},
63 },
64 layer::{Context, SubscriberExt as _},
65 registry::LookupSpan,
66 reload,
67 util::SubscriberInitExt as _,
68};
69
70use crate::protocol::{FieldValue, LogLine, SpanInfo};
71
72pub const DEFAULT_BUFFER_CAPACITY: usize = 1000;
77
78pub const DEFAULT_RATE_PER_SECOND: u32 = 100;
84
85#[derive(Debug, thiserror::Error)]
87pub enum InstallError {
88 #[error("global tracing dispatcher is already set")]
90 AlreadyInstalled,
91 #[error("invalid filter directives: {0}")]
93 InvalidDirectives(String),
94}
95
96#[derive(Debug, thiserror::Error)]
98pub enum SetFilterError {
99 #[error("invalid filter directives: {0}")]
101 InvalidDirectives(String),
102 #[error("reload handle is no longer valid")]
104 ReloadFailed,
105}
106
107#[derive(Clone)]
110pub struct LogCollector {
111 inner: Arc<CollectorInner>,
112}
113
114struct CollectorInner {
115 buffer: Mutex<RingBuffer>,
116 reload_handle: reload::Handle<EnvFilter, Registry>,
117 revert_task: Mutex<Option<AbortOnDropHandle<()>>>,
118}
119
120const OFF_DIRECTIVES: &str = "off";
123
124struct RingBuffer {
125 lines: VecDeque<LogLine>,
126 dropped: u32,
127 capacity: usize,
128 rate_per_second: u32,
129 window_start: Instant,
130 window_count: u32,
131}
132
133impl RingBuffer {
134 fn new(capacity: usize, rate_per_second: u32) -> Self {
135 Self {
136 lines: VecDeque::with_capacity(capacity.min(64)),
137 dropped: 0,
138 capacity,
139 rate_per_second,
140 window_start: Instant::now(),
141 window_count: 0,
142 }
143 }
144
145 fn push(&mut self, line: LogLine) {
146 let now = Instant::now();
147 if now.duration_since(self.window_start) >= Duration::from_secs(1) {
148 self.window_start = now;
149 self.window_count = 0;
150 }
151 if self.window_count >= self.rate_per_second {
152 self.dropped = self.dropped.saturating_add(1);
153 return;
154 }
155 self.window_count += 1;
156
157 if self.lines.len() == self.capacity {
158 self.lines.pop_front();
159 self.dropped = self.dropped.saturating_add(1);
160 }
161 self.lines.push_back(line);
162 }
163
164 fn drain(&mut self, max: usize) -> (Vec<LogLine>, u32) {
165 let take = self.lines.len().min(max);
166 let lines: Vec<LogLine> = self.lines.drain(..take).collect();
167 let dropped = std::mem::take(&mut self.dropped);
168 (lines, dropped)
169 }
170}
171
172impl LogCollector {
173 pub fn buffered(&self) -> usize {
175 self.inner.buffer.lock().expect("poisoned").lines.len()
176 }
177
178 pub fn drain(&self, max: usize) -> (Vec<LogLine>, u32) {
181 self.inner.buffer.lock().expect("poisoned").drain(max)
182 }
183
184 pub fn set_filter(
188 &self,
189 directives: &str,
190 expires_in: Option<Duration>,
191 revert_to: Option<&str>,
192 ) -> Result<(), SetFilterError> {
193 let filter = EnvFilter::try_new(directives)
194 .map_err(|e| SetFilterError::InvalidDirectives(e.to_string()))?;
195 self.inner
196 .reload_handle
197 .reload(filter)
198 .map_err(|_| SetFilterError::ReloadFailed)?;
199
200 let mut guard = self.inner.revert_task.lock().expect("poisoned");
201 *guard = None;
202
203 if let Some(expires_in) = expires_in {
204 let collector = self.clone();
205 let revert_to = revert_to.map(str::to_string);
206 let handle: JoinHandle<()> = n0_future::task::spawn(async move {
207 n0_future::time::sleep(expires_in).await;
208 let target = revert_to.as_deref();
209 if let Err(err) = collector.revert(target) {
210 warn!(?err, "failed to revert log filter");
211 }
212 });
213 *guard = Some(AbortOnDropHandle::new(handle));
214 }
215 Ok(())
216 }
217
218 pub fn revert(&self, to: Option<&str>) -> Result<(), SetFilterError> {
221 let directives = to.unwrap_or(OFF_DIRECTIVES);
222 let filter = EnvFilter::try_new(directives)
223 .map_err(|e| SetFilterError::InvalidDirectives(e.to_string()))?;
224 self.inner
225 .reload_handle
226 .reload(filter)
227 .map_err(|_| SetFilterError::ReloadFailed)
228 }
229}
230
231impl std::fmt::Debug for LogCollector {
232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 f.debug_struct("LogCollector")
234 .field("buffered", &self.buffered())
235 .finish()
236 }
237}
238
239pub fn install() -> Result<LogCollector, InstallError> {
246 let (collector, layer) = layer();
247 tracing_subscriber::registry()
248 .with(layer)
249 .try_init()
250 .map_err(|_| InstallError::AlreadyInstalled)?;
251 debug!("iroh-services log collector installed");
252 Ok(collector)
253}
254
255pub fn layer() -> (LogCollector, impl Layer<Registry> + Send + Sync + 'static) {
274 let filter = EnvFilter::try_new(OFF_DIRECTIVES).expect("'off' is always a valid directive");
276 let (filter, reload_handle) = reload::Layer::new(filter);
277
278 let inner = Arc::new(CollectorInner {
279 buffer: Mutex::new(RingBuffer::new(
280 DEFAULT_BUFFER_CAPACITY,
281 DEFAULT_RATE_PER_SECOND,
282 )),
283 reload_handle,
284 revert_task: Mutex::new(None),
285 });
286 let collector = LogCollector {
287 inner: inner.clone(),
288 };
289 let buffer_layer = BufferLayer { inner };
290 (collector, buffer_layer.with_filter(filter))
291}
292
293struct BufferLayer {
294 inner: Arc<CollectorInner>,
295}
296
297impl<S> Layer<S> for BufferLayer
298where
299 S: Subscriber + for<'a> LookupSpan<'a>,
300{
301 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
302 let metadata = event.metadata();
303 let mut timestamp = String::new();
304 let _ = SystemTime.format_time(&mut Writer::new(&mut timestamp));
305
306 let mut field_visitor = FieldVisitor::default();
307 event.record(&mut field_visitor);
308
309 let mut spans: Vec<SpanInfo> = Vec::new();
310 if let Some(scope) = ctx.event_scope(event) {
311 for span in scope.from_root() {
312 spans.push(SpanInfo {
313 name: span.name().to_string(),
314 fields: Vec::new(),
315 });
316 }
317 }
318
319 let line = LogLine {
320 timestamp,
321 level: metadata.level().to_string(),
322 target: metadata.target().to_string(),
323 fields: field_visitor.fields,
324 spans,
325 };
326
327 self.inner.buffer.lock().expect("poisoned").push(line);
328 }
329}
330
331#[derive(Default)]
332struct FieldVisitor {
333 fields: Vec<(String, FieldValue)>,
334}
335
336impl FieldVisitor {
337 fn push(&mut self, field: &tracing::field::Field, value: FieldValue) {
338 self.fields.push((field.name().to_string(), value));
339 }
340}
341
342impl tracing::field::Visit for FieldVisitor {
343 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
344 self.push(field, FieldValue::Str(value.to_string()));
345 }
346
347 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
348 self.push(field, FieldValue::I64(value));
349 }
350
351 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
352 self.push(field, FieldValue::U64(value));
353 }
354
355 fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
356 self.push(field, FieldValue::Other(value.to_string()));
357 }
358
359 fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
360 self.push(field, FieldValue::Other(value.to_string()));
361 }
362
363 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
364 self.push(field, FieldValue::Bool(value));
365 }
366
367 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
368 self.push(field, FieldValue::F64(value));
369 }
370
371 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
372 if field.name() == "message" {
377 self.push(field, FieldValue::Str(format!("{value:?}")));
378 } else {
379 self.push(field, FieldValue::Other(format!("{value:?}")));
380 }
381 }
382}
383
384#[cfg(test)]
385mod tests {
386 use super::*;
387
388 fn message_is(line: &LogLine, want: &str) -> bool {
389 line.fields
390 .iter()
391 .any(|(k, v)| k == "message" && matches!(v, FieldValue::Str(s) if s == want))
392 }
393
394 #[test]
395 fn ring_buffer_rolls_over_oldest() {
396 let mut buf = RingBuffer::new(2, 1000);
397 for i in 0..5 {
398 buf.push(LogLine {
399 timestamp: format!("{i}"),
400 level: "INFO".into(),
401 target: "test".into(),
402 fields: Vec::new(),
403 spans: Vec::new(),
404 });
405 }
406 let (lines, dropped) = buf.drain(10);
407 assert_eq!(lines.len(), 2);
408 assert_eq!(lines[0].timestamp, "3");
409 assert_eq!(lines[1].timestamp, "4");
410 assert_eq!(dropped, 3);
411 }
412
413 #[test]
414 fn ring_buffer_throttles_per_second() {
415 let mut buf = RingBuffer::new(1000, 2);
416 for i in 0..10 {
417 buf.push(LogLine {
418 timestamp: format!("{i}"),
419 level: "INFO".into(),
420 target: "test".into(),
421 fields: Vec::new(),
422 spans: Vec::new(),
423 });
424 }
425 let (lines, dropped) = buf.drain(100);
426 assert_eq!(lines.len(), 2);
427 assert_eq!(dropped, 8);
428 }
429
430 #[tokio::test]
431 async fn collector_reload_changes_filter_then_reverts() {
432 let collector = match install() {
433 Ok(c) => c,
434 Err(InstallError::AlreadyInstalled) => return,
435 Err(e) => panic!("install: {e}"),
436 };
437
438 tracing::info!(target: "logtest", "should not appear yet");
440 let (lines, _) = collector.drain(100);
441 assert!(!lines.iter().any(|l| message_is(l, "should not appear yet")));
442
443 collector.set_filter("info", None, None).unwrap();
445 tracing::info!(target: "logtest", "first info");
446 tracing::trace!(target: "logtest", "should not appear");
447 let (lines, _) = collector.drain(100);
448 assert!(
449 lines
450 .iter()
451 .any(|l| l.target == "logtest" && message_is(l, "first info"))
452 );
453 assert!(!lines.iter().any(|l| message_is(l, "should not appear")));
454
455 collector
457 .set_filter("trace", Some(Duration::from_millis(150)), Some("info"))
458 .unwrap();
459 tracing::trace!(target: "logtest", "should appear");
460 let (lines, _) = collector.drain(100);
461 assert!(lines.iter().any(|l| message_is(l, "should appear")));
462
463 n0_future::time::sleep(Duration::from_millis(300)).await;
464 tracing::trace!(target: "logtest", "should not appear after revert");
465 let (lines, _) = collector.drain(100);
466 assert!(
467 !lines
468 .iter()
469 .any(|l| message_is(l, "should not appear after revert"))
470 );
471 }
472}