iroh_n0des/simulation/
trace.rs1use std::{
2 io::BufRead,
3 sync::{Arc, Mutex, OnceLock},
4};
5
6use tracing_subscriber::{
7 EnvFilter, Layer, Registry,
8 fmt::{MakeWriter, writer::MutexGuardWriter},
9 layer::SubscriberExt,
10 util::{SubscriberInitExt, TryInitError},
11};
12
13use super::proto::ActiveTrace;
14use crate::simulation::ENV_TRACE_SERVER;
15
16const ENV_RUST_LOG: &str = "RUST_LOG";
17const ENV_SIM_LOG: &str = "SIM_LOG";
18
19pub fn init() {
20 static DID_INIT: OnceLock<bool> = OnceLock::new();
21 if DID_INIT.set(true).is_ok() {
22 try_init().expect("unreachable: checked OnceLock before init");
23 }
24}
25
26pub fn create_writer_and_layer() -> (impl Layer<Registry>, LineWriter) {
27 let writer = LineWriter::default();
28 let layer = tracing_subscriber::fmt::layer()
29 .json()
30 .with_writer(writer.clone());
31 (layer, writer)
32}
33
34pub fn try_init() -> Result<(), TryInitError> {
35 let print_layer = if let Ok(directive) = std::env::var(ENV_RUST_LOG) {
36 let layer = tracing_subscriber::fmt::layer()
37 .with_writer(|| TestWriter)
38 .event_format(tracing_subscriber::fmt::format().with_line_number(true))
39 .with_level(true)
40 .with_filter(EnvFilter::new(directive));
41 Some(layer)
42 } else {
43 None
44 };
45
46 let json_layer = if std::env::var(ENV_TRACE_SERVER).is_ok() {
47 tracing::info!("setting up irpc tracing subscriber");
48 let directive = std::env::var(ENV_SIM_LOG).unwrap_or_else(|_| "debug".to_string());
49 let layer = tracing_subscriber::fmt::layer()
50 .json()
51 .with_writer(global_writer())
52 .with_filter(EnvFilter::new(directive));
53 Some(layer)
54 } else {
55 None
56 };
57 tracing_subscriber::registry()
58 .with(print_layer)
59 .with(json_layer)
60 .try_init()
61}
62
63pub fn global_writer() -> LineWriter {
64 static WRITER: OnceLock<LineWriter> = OnceLock::new();
65 WRITER.get_or_init(Default::default).clone()
66}
67
68pub async fn submit_logs(client: &ActiveTrace) -> anyhow::Result<()> {
69 let writer = global_writer();
70 writer.submit(client).await?;
71 Ok(())
72}
73
74pub fn get_logs() -> Vec<String> {
75 let writer = global_writer();
76 writer.get()
77}
78
/// Writer that forwards everything it receives through the `print!` macro.
///
/// NOTE(review): presumably this goes through `print!` (rather than writing to stdout
/// directly) so that the test harness's output capture applies to log lines — confirm.
#[derive(Debug)]
struct TestWriter;

impl std::io::Write for TestWriter {
    /// Prints `buf` as text and reports the whole buffer as written.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is not valid UTF-8.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let text = std::str::from_utf8(buf).expect("tried to log invalid UTF-8");
        print!("{text}");
        Ok(buf.len())
    }

    /// Flushes the process's standard output handle.
    fn flush(&mut self) -> std::io::Result<()> {
        std::io::Write::flush(&mut std::io::stdout())
    }
}
98
/// Cloneable handle to an in-memory byte buffer.
///
/// The buffer lives behind an `Arc`, so every clone refers to the same bytes.
#[derive(Default, Clone)]
pub struct LineWriter {
    /// Bytes accumulated so far, guarded by a mutex.
    buf: Arc<Mutex<Vec<u8>>>,
}
103
104impl<'a> MakeWriter<'a> for LineWriter {
105 type Writer = MutexGuardWriter<'a, Vec<u8>>;
106
107 fn make_writer(&'a self) -> Self::Writer {
108 self.buf.make_writer()
109 }
110}
111
112impl LineWriter {
113 pub fn clear(&self) {
114 self.buf.lock().expect("lock poisoned").clear();
115 }
116
117 pub fn get(&self) -> Vec<String> {
118 let mut buf = self.buf.lock().expect("lock poisoned");
119 let lines = buf
120 .lines()
121 .filter_map(|line| match line {
122 Ok(line) => Some(line),
123 Err(err) => {
124 tracing::warn!("Skipping invalid log line: {err:?}");
125 None
126 }
127 })
128 .collect();
129 buf.clear();
130 lines
131 }
132
133 pub async fn submit(&self, client: &ActiveTrace) -> anyhow::Result<()> {
134 let lines = self.get();
135 client.put_logs(lines).await?;
136 Ok(())
137 }
138}