iroh_n0des/
simulation.rs

1use std::{
2    any::Any,
3    fmt::Debug,
4    marker::PhantomData,
5    pin::Pin,
6    sync::{Arc, RwLock},
7    time::Duration,
8};
9
10use anyhow::{Context, Result};
11use bytes::Bytes;
12use iroh::{Endpoint, NodeAddr, NodeId, SecretKey, Watcher};
13use iroh_metrics::encoding::Encoder;
14use iroh_n0des::{
15    Registry,
16    simulation::proto::{ActiveTrace, NodeInfo, TraceClient, TraceInfo},
17};
18use n0_future::IterExt;
19use n0_watcher::Watchable;
20use proto::{GetTraceResponse, NodeInfoWithAddr, Scope};
21use serde::{Serialize, de::DeserializeOwned};
22use tokio::{sync::Semaphore, time::MissedTickBehavior};
23use tokio_util::sync::CancellationToken;
24use tracing::{Instrument, debug, error_span, info, warn};
25use uuid::Uuid;
26
27use crate::simulation::proto::CheckpointId;
28
29pub mod events;
30pub mod proto;
31pub mod trace;
32
/// Name of the environment variable that selects isolated mode.
///
/// Its value is parsed as the index of the single node to run.
pub const ENV_TRACE_ISOLATED: &str = "N0DES_TRACE_ISOLATED";
/// Name of the environment variable that, when set, selects initialization-only mode.
pub const ENV_TRACE_INIT_ONLY: &str = "N0DES_TRACE_INIT_ONLY";
/// Name of the environment variable holding the trace server address.
pub const ENV_TRACE_SERVER: &str = "N0DES_TRACE_SERVER";
/// Name of the environment variable holding the simulation session ID.
pub const ENV_TRACE_SESSION_ID: &str = "N0DES_SESSION_ID";

/// Tick period of the per-node task that pushes metrics to the trace server.
const METRICS_INTERVAL: Duration = Duration::from_millis(1_000);
43
44type BoxedSetupFn<D> = Box<dyn 'static + Send + Sync + FnOnce() -> BoxFuture<'static, Result<D>>>;
45
46type BoxedSpawnFn<D> = Arc<
47    dyn 'static
48        + Send
49        + Sync
50        + for<'a> Fn(&'a mut SpawnContext<'a, D>) -> BoxFuture<'a, Result<BoxNode>>,
51>;
52type BoxedRoundFn<D> = Arc<
53    dyn 'static
54        + Send
55        + Sync
56        + for<'a> Fn(&'a mut BoxNode, &'a RoundContext<'a, D>) -> BoxFuture<'a, Result<bool>>,
57>;
58type RoundLabelFn<D> = Arc<dyn 'static + Send + Sync + for<'a> Fn(&'a D, u32) -> Option<String>>;
59
60type BoxedCheckFn<D> = Arc<dyn Fn(&BoxNode, &RoundContext<'_, D>) -> Result<()>>;
61
/// Helper trait describing an async callback over two borrowed arguments.
///
/// A plain `impl Fn() -> Fut` bound cannot tie the lifetime of the returned
/// future to the lifetime of the function's parameters. `impl AsyncFn` could
/// express that, but it offers no way to require the returned future to be
/// `Send`. Naming the future as an associated type allows both.
pub trait AsyncCallback<'a, A1: 'a, A2: 'a, T: 'a>:
    Fn(&'a mut A1, &'a A2) -> Self::Fut + Send + Sync + 'static
{
    /// The future returned by the callback; it is `Send` and resolves to `T`.
    type Fut: Send + Future<Output = T>;
}

/// Every `'static + Send + Sync` function with a matching signature and a
/// `Send` future is an [`AsyncCallback`].
impl<'a, A1, A2, T, Ret, F> AsyncCallback<'a, A1, A2, T> for F
where
    A1: 'a,
    A2: 'a,
    T: 'a,
    Ret: Future<Output = T> + Send,
    F: Fn(&'a mut A1, &'a A2) -> Ret + Send + Sync + 'static,
{
    type Fut = Ret;
}
81
82/// Trait for user-defined setup data that can be shared across simulation nodes.
83///
84/// The setup data must be serializable, deserializable, cloneable, and thread-safe
85/// to be distributed across simulation nodes.
86pub trait SetupData: Serialize + DeserializeOwned + Send + Sync + Clone + Debug + 'static {}
87impl<T> SetupData for T where T: Serialize + DeserializeOwned + Send + Sync + Clone + Debug + 'static
88{}
89
90/// Context provided when spawning a new simulation node.
91///
92/// Contains all the necessary information and resources for initializing
93/// a node, including its index, the shared setup data, and a metrics registry.
94pub struct SpawnContext<'a, D = ()> {
95    secret_key: SecretKey,
96    node_idx: u32,
97    setup_data: &'a D,
98    registry: &'a mut Registry,
99}
100
101impl<'a, D: SetupData> SpawnContext<'a, D> {
102    /// Returns the index of this node in the simulation.
103    pub fn node_index(&self) -> u32 {
104        self.node_idx
105    }
106
107    /// Returns a reference to the setup data for this simulation.
108    pub fn setup_data(&self) -> &D {
109        self.setup_data
110    }
111
112    /// Returns a mutable reference to a metrics registry.
113    ///
114    /// Use this to register custom metrics for the node being spawned.
115    pub fn metrics_registry(&mut self) -> &mut Registry {
116        self.registry
117    }
118
119    /// Returns the secret key for this node.
120    pub fn secret_key(&self) -> SecretKey {
121        self.secret_key.clone()
122    }
123
124    /// Returns the node id of this node.
125    pub fn node_id(&self) -> NodeId {
126        self.secret_key.public()
127    }
128
129    /// Creates and binds a new endpoint with this node's secret key.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the endpoint fails to bind to a local address.
134    pub async fn bind_endpoint(&self) -> Result<Endpoint> {
135        let ep = Endpoint::builder()
136            .discovery_n0()
137            .secret_key(self.secret_key())
138            .bind()
139            .await?;
140        Ok(ep)
141    }
142}
143
144/// Context provided during each simulation round.
145///
146/// Contains information about the current round, this node's identity,
147/// the shared setup data, and the addresses of all participating nodes.
148pub struct RoundContext<'a, D = ()> {
149    round: u32,
150    node_index: u32,
151    setup_data: &'a D,
152    all_nodes: &'a Vec<NodeInfoWithAddr>,
153}
154
155impl<'a, D> RoundContext<'a, D> {
156    /// Returns the current round number.
157    pub fn round(&self) -> u32 {
158        self.round
159    }
160
161    /// Returns the index of this node in the simulation.
162    pub fn node_index(&self) -> u32 {
163        self.node_index
164    }
165
166    /// Returns a reference to the shared setup data for this simulation.
167    pub fn setup_data(&self) -> &D {
168        self.setup_data
169    }
170
171    /// Returns an iterator over the addresses of all nodes except the specified one.
172    pub fn all_other_nodes(&self, me: NodeId) -> impl Iterator<Item = &NodeAddr> + '_ {
173        self.all_nodes
174            .iter()
175            .filter(move |n| n.info.node_id != Some(me))
176            .flat_map(|n| &n.addr)
177    }
178
179    /// Returns the address of the node with the given index.
180    ///
181    /// # Errors
182    ///
183    /// Returns an error if no node with the specified index exists.
184    pub fn addr(&self, idx: u32) -> Result<NodeAddr> {
185        self.all_nodes
186            .iter()
187            .find(|n| n.info.idx == idx)
188            .cloned()
189            .context("node not found")?
190            .addr
191            .context("node has no address")
192    }
193
194    /// Returns the address of this node.
195    pub fn self_addr(&self) -> Option<&NodeAddr> {
196        self.all_nodes
197            .iter()
198            .find(|n| n.info.idx == self.node_index)
199            .and_then(|info| info.addr.as_ref())
200    }
201
202    pub fn try_self_addr(&self) -> Result<&NodeAddr> {
203        self.self_addr().context("missing node address")
204    }
205
206    /// Returns the total number of nodes participating in the simulation.
207    pub fn node_count(&self) -> usize {
208        self.all_nodes.len()
209    }
210}
211
212/// Trait for types that can be spawned as simulation nodes.
213///
214/// This trait is generic over `D: SetupData`, which is the type returned from the
215/// user-defined setup function (see [`Builder::with_setup`]). If not using the setup
216/// step, `D` defaults to the unit type `()`.
217///
218/// Implement this trait on your node type to be able to spawn the node in a simulation
219/// context. The only required method is [`Spawn::spawn`], which must return your spawned node.
220pub trait Spawn<D: SetupData = ()>: Node + 'static {
221    /// Spawns a new instance of this node type.
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if the node fails to initialize properly.
226    fn spawn(context: &mut SpawnContext<'_, D>) -> impl Future<Output = Result<Self>> + Send
227    where
228        Self: Sized;
229
230    fn round_label(_setup_data: &D, _round: u32) -> Option<String> {
231        None
232    }
233
234    /// Spawns a new instance as a dynamically-typed node.
235    ///
236    /// This calls `spawn` and boxes the result.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if the node fails to initialize properly.
241    fn spawn_dyn<'a>(context: &'a mut SpawnContext<'a, D>) -> BoxFuture<'a, Result<BoxNode>>
242    where
243        Self: Sized,
244    {
245        Box::pin(async move {
246            let node = Self::spawn(context).await?;
247            let node: Box<dyn DynNode> = Box::new(node);
248            anyhow::Ok(node)
249        })
250    }
251
252    /// Creates a new builder for this node type with the given round function.
253    ///
254    /// The round function will be called each simulation round and should return
255    /// `Ok(true)` to continue or `Ok(false)` to stop early.
256    fn builder(
257        round_fn: impl for<'a> AsyncCallback<'a, Self, RoundContext<'a, D>, Result<bool>>,
258    ) -> NodeBuilder<Self, D>
259    where
260        Self: Sized,
261    {
262        NodeBuilder::new(round_fn)
263    }
264}
265
266/// Trait for simulation node implementations.
267///
268/// Provides basic functionality for nodes including optional endpoint access
269/// and cleanup on shutdown.
270///
271/// For a node to be usable in a simulation, you also need to implement [`Spawn`]
272/// for your node struct.
273pub trait Node: Send + 'static {
274    /// Returns a reference to this node's endpoint, if any.
275    ///
276    /// The default implementation returns `None`.
277    fn endpoint(&self) -> Option<&Endpoint> {
278        None
279    }
280
281    /// Shuts down this node, performing any necessary cleanup.
282    ///
283    /// The default implementation does nothing and returns success.
284    ///
285    /// # Errors
286    ///
287    /// Returns an error if shutdown fails.
288    fn shutdown(&mut self) -> impl Future<Output = Result<()>> + Send + '_ {
289        async { Ok(()) }
290    }
291}
292
293type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
294
295/// A boxed dynamically-typed simulation node.
296pub type BoxNode = Box<dyn DynNode>;
297
298/// Trait for dynamically-typed simulation nodes.
299///
300/// This trait enables type erasure for nodes while preserving essential
301/// functionality like shutdown, endpoint access, and type casting.
302pub trait DynNode: Send + Any + 'static {
303    /// Shuts down this node, performing any necessary cleanup.
304    ///
305    /// The default implementation does nothing and returns success.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if shutdown fails.
310    fn shutdown(&mut self) -> BoxFuture<'_, Result<()>> {
311        Box::pin(async { Ok(()) })
312    }
313
314    /// Returns a reference to this node's endpoint, if any.
315    ///
316    /// The default implementation returns `None`.
317    fn endpoint(&self) -> Option<&Endpoint> {
318        None
319    }
320
321    fn round_label(&self, _round: u32) -> Option<String> {
322        None
323    }
324
325    /// Returns a reference to this node as `Any` for downcasting.
326    fn as_any(&self) -> &dyn Any;
327
328    /// Returns a mutable reference to this node as `Any` for downcasting.
329    fn as_any_mut(&mut self) -> &mut dyn Any;
330}
331
332impl<T: Node + Sized> DynNode for T {
333    fn shutdown(&mut self) -> BoxFuture<'_, Result<()>> {
334        Box::pin(<Self as Node>::shutdown(self))
335    }
336
337    fn endpoint(&self) -> Option<&Endpoint> {
338        <Self as Node>::endpoint(self)
339    }
340
341    fn as_any(&self) -> &dyn Any {
342        self
343    }
344
345    fn as_any_mut(&mut self) -> &mut dyn Any {
346        self
347    }
348}
349
350#[derive()]
351/// Builder for constructing simulation configurations.
352///
353/// Allows configuring the setup function, node spawners, and number of rounds
354/// before building the final simulation.
355pub struct Builder<D = ()> {
356    setup_fn: BoxedSetupFn<D>,
357    node_builders: Vec<NodeBuilderWithCount<D>>,
358    rounds: u32,
359}
360
361#[derive(Clone)]
362/// Builder for configuring individual nodes in a simulation.
363///
364/// Provides methods to set up spawn functions, round functions, and optional
365/// check functions for a specific node type.
366pub struct NodeBuilder<N, D> {
367    phantom: PhantomData<N>,
368    spawn_fn: BoxedSpawnFn<D>,
369    round_fn: BoxedRoundFn<D>,
370    round_label_fn: RoundLabelFn<D>,
371    check_fn: Option<BoxedCheckFn<D>>,
372}
373
374#[derive(Clone)]
375struct ErasedNodeBuilder<D> {
376    spawn_fn: BoxedSpawnFn<D>,
377    round_fn: BoxedRoundFn<D>,
378    round_label_fn: RoundLabelFn<D>,
379    check_fn: Option<BoxedCheckFn<D>>,
380}
381
382impl<T, N: Spawn<D>, D: SetupData> From<T> for NodeBuilder<N, D>
383where
384    T: for<'a> AsyncCallback<'a, N, RoundContext<'a, D>, Result<bool>>,
385{
386    fn from(value: T) -> Self {
387        Self::new(value)
388    }
389}
390
391impl<N: Spawn<D>, D: SetupData> NodeBuilder<N, D> {
392    /// Creates a new node builder with the given round function.
393    ///
394    /// The round function will be called each simulation round and should return
395    /// `Ok(true)` to continue or `Ok(false)` to stop early.
396    pub fn new(
397        round_fn: impl for<'a> AsyncCallback<'a, N, RoundContext<'a, D>, Result<bool>>,
398    ) -> Self {
399        let spawn_fn: BoxedSpawnFn<D> = Arc::new(N::spawn_dyn);
400        let round_label_fn: RoundLabelFn<D> = Arc::new(N::round_label);
401        let round_fn: BoxedRoundFn<D> = Arc::new(move |node, context| {
402            let node = node
403                .as_any_mut()
404                .downcast_mut::<N>()
405                .expect("unreachable: type is statically guaranteed");
406            Box::pin(round_fn(node, context))
407        });
408        Self {
409            phantom: PhantomData,
410            spawn_fn,
411            round_fn,
412            check_fn: None,
413            round_label_fn,
414        }
415    }
416
417    /// Adds a check function that will be called after each round.
418    ///
419    /// The check function can verify node state and return an error to fail
420    /// the simulation if invariants are violated.
421    ///
422    /// # Errors
423    ///
424    /// The check function should return an error if validation fails.
425    pub fn check(
426        mut self,
427        check_fn: impl 'static + for<'a> Fn(&'a N, &RoundContext<'a, D>) -> Result<()>,
428    ) -> Self {
429        let check_fn: BoxedCheckFn<D> = Arc::new(move |node, context| {
430            let node = node
431                .as_any()
432                .downcast_ref::<N>()
433                .expect("unreachable: type is statically guaranteed");
434            check_fn(node, context)
435        });
436        self.check_fn = Some(check_fn);
437        self
438    }
439
440    fn erase(self) -> ErasedNodeBuilder<D> {
441        ErasedNodeBuilder {
442            spawn_fn: self.spawn_fn,
443            round_fn: self.round_fn,
444            check_fn: self.check_fn,
445            round_label_fn: self.round_label_fn,
446        }
447    }
448}
449
450struct SimNode<D> {
451    node: BoxNode,
452    trace_id: Uuid,
453    idx: u32,
454    round_fn: BoxedRoundFn<D>,
455    check_fn: Option<BoxedCheckFn<D>>,
456    round_label_fn: RoundLabelFn<D>,
457    round: u32,
458    info: NodeInfo,
459    metrics: Arc<RwLock<Registry>>,
460    checkpoint_watcher: n0_watcher::Watchable<CheckpointId>,
461    all_nodes: Vec<NodeInfoWithAddr>,
462}
463
464impl<D: SetupData> SimNode<D> {
465    async fn spawn_and_run(
466        builder: NodeBuilderWithIdx<D>,
467        client: TraceClient,
468        trace_id: Uuid,
469        setup_data: &D,
470        rounds: u32,
471    ) -> Result<()> {
472        let secret_key = SecretKey::generate(&mut rand::rng());
473        let NodeBuilderWithIdx { node_idx, builder } = builder;
474        let info = NodeInfo {
475            // TODO: Only assign node id if endpoint was created.
476            node_id: Some(secret_key.public()),
477            idx: node_idx,
478            label: None,
479        };
480        let mut registry = Registry::default();
481        let mut context = SpawnContext {
482            setup_data,
483            node_idx,
484            secret_key,
485            registry: &mut registry,
486        };
487        let node = (builder.spawn_fn)(&mut context).await?;
488
489        if let Some(endpoint) = node.endpoint() {
490            registry.register_all(endpoint.metrics());
491        }
492
493        let mut node = Self {
494            node,
495            trace_id,
496            idx: node_idx,
497            info,
498            round: 0,
499            round_fn: builder.round_fn,
500            check_fn: builder.check_fn,
501            round_label_fn: builder.round_label_fn,
502            checkpoint_watcher: Watchable::new(0),
503            metrics: Arc::new(RwLock::new(registry)),
504            all_nodes: Default::default(),
505        };
506
507        let res = node
508            .run(&client, setup_data, rounds)
509            .await
510            .with_context(|| format!("node {} failed", node.idx));
511        if let Err(err) = &res {
512            warn!("node failed: {err:#}");
513        }
514        res
515    }
516
517    async fn run(&mut self, client: &TraceClient, setup_data: &D, rounds: u32) -> Result<()> {
518        let client = client.start_node(self.trace_id, self.info.clone()).await?;
519
520        info!(idx = self.idx, "start");
521
522        // Spawn a task to periodically put metrics.
523        if let Some(node_id) = self.node_id() {
524            let client = client.clone();
525            let mut watcher = self.checkpoint_watcher.watch();
526            let mut metrics_encoder = Encoder::new(self.metrics.clone());
527            tokio::task::spawn(
528                async move {
529                    let mut interval = tokio::time::interval(METRICS_INTERVAL);
530                    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
531                    loop {
532                        let checkpoint = tokio::select! {
533                            _ = interval.tick() => None,
534                            checkpoint = watcher.updated() => {
535                                match checkpoint {
536                                    Err(_) => break,
537                                    Ok(checkpoint) => Some(checkpoint)
538                                }
539                            }
540                        };
541                        if let Err(err) = client
542                            .put_metrics(node_id, checkpoint, metrics_encoder.export())
543                            .await
544                        {
545                            warn!(?err, "failed to put metrics, stop metrics task");
546                            break;
547                        }
548                    }
549                }
550                .instrument(error_span!("metrics")),
551            );
552        }
553
554        let info = NodeInfoWithAddr {
555            addr: self.my_addr().await,
556            info: self.info.clone(),
557        };
558        let all_nodes = client.wait_start(info).await?;
559        self.all_nodes = all_nodes;
560
561        let result = self.run_rounds(&client, setup_data, rounds).await;
562
563        if let Err(err) = self.node.shutdown().await {
564            warn!("failure during node shutdown: {err:#}");
565        }
566
567        client.end(to_str_err(&result)).await?;
568
569        result
570    }
571
572    async fn run_rounds(
573        &mut self,
574        client: &ActiveTrace,
575        setup_data: &D,
576        rounds: u32,
577    ) -> Result<()> {
578        while self.round < rounds {
579            if !self
580                .run_round(client, setup_data)
581                .await
582                .with_context(|| format!("failed at round {}", self.round))?
583            {
584                return Ok(());
585            }
586            self.round += 1;
587        }
588        Ok(())
589    }
590
591    #[tracing::instrument(name="round", skip_all, fields(round=self.round))]
592    async fn run_round(&mut self, client: &ActiveTrace, setup_data: &D) -> Result<bool> {
593        let context = RoundContext {
594            round: self.round,
595            node_index: self.idx,
596            setup_data,
597            all_nodes: &self.all_nodes,
598        };
599
600        let label = (self.round_label_fn)(setup_data, self.round)
601            .unwrap_or_else(|| format!("Round {}", self.round));
602
603        info!(%label, "start round");
604
605        let result = (self.round_fn)(&mut self.node, &context)
606            .await
607            .context("round function failed");
608
609        info!(%label, "end round");
610
611        let checkpoint = (context.round + 1) as u64;
612        self.checkpoint_watcher.set(checkpoint).ok();
613        client
614            .put_checkpoint(checkpoint, Some(label), to_str_err(&result))
615            .await
616            .context("put checkpoint")?;
617
618        client
619            .wait_checkpoint(checkpoint)
620            .await
621            .context("wait checkpoint")?;
622
623        match result {
624            Ok(out) => {
625                if let Some(check_fn) = self.check_fn.as_ref() {
626                    (check_fn)(&self.node, &context).context("check function failed")?;
627                }
628                Ok(out)
629            }
630            Err(err) => Err(err),
631        }
632    }
633
634    fn node_id(&self) -> Option<NodeId> {
635        self.info.node_id
636    }
637
638    async fn my_addr(&self) -> Option<NodeAddr> {
639        if let Some(endpoint) = self.node.endpoint() {
640            Some(node_addr(endpoint).await)
641        } else {
642            None
643        }
644    }
645}
646
647async fn node_addr(endpoint: &Endpoint) -> NodeAddr {
648    endpoint.online().await;
649    endpoint.node_addr()
650}
651
652impl Default for Builder<()> {
653    fn default() -> Self {
654        Self::new()
655    }
656}
657
658impl Builder<()> {
659    /// Creates a new simulation builder with empty setup data.
660    pub fn new() -> Builder<()> {
661        let setup_fn: BoxedSetupFn<()> = Box::new(move || Box::pin(async move { Ok(()) }));
662        Builder {
663            node_builders: Vec::new(),
664            setup_fn,
665            rounds: 0,
666        }
667    }
668}
669impl<D: SetupData> Builder<D> {
670    /// Creates a new simulation builder with a setup function for setup data.
671    ///
672    /// The setup function is called once before the simulation starts to
673    /// initialize the setup data that will be shared across all nodes.
674    ///
675    /// The setup function can return any type that implements [`SetupData`],
676    /// which is an auto-implemented supertrait for all types that are
677    /// serializable, cloneable, and thread-safe. See [`SetupData`] for details.
678    ///
679    /// # Errors
680    ///
681    /// The setup function should return an error if initialization fails.
682    pub fn with_setup<F, Fut>(setup_fn: F) -> Builder<D>
683    where
684        F: 'static + Send + Sync + FnOnce() -> Fut,
685        Fut: 'static + Send + Future<Output = Result<D>>,
686    {
687        let setup_fn: BoxedSetupFn<D> = Box::new(move || Box::pin(setup_fn()));
688        Builder {
689            node_builders: Vec::new(),
690            setup_fn,
691            rounds: 0,
692        }
693    }
694
695    /// Sets the number of rounds this simulation will run.
696    pub fn rounds(mut self, rounds: u32) -> Self {
697        self.rounds = rounds;
698        self
699    }
700
701    /// Adds a group of nodes to spawn in this simulation.
702    ///
703    /// Each node will be created using the provided node builder configuration.
704    ///
705    /// You can create a [`NodeBuilder`] from any type that implements [`Spawn<D>`] where
706    /// `D` is the type returned from [`Self::with_setup`]. If you are not using the setup
707    /// step, `D` defaults to the unit type `()`.
708    pub fn spawn<N: Spawn<D>>(
709        mut self,
710        node_count: u32,
711        node_builder: impl Into<NodeBuilder<N, D>>,
712    ) -> Self {
713        let node_builder = node_builder.into();
714        self.node_builders.push(NodeBuilderWithCount {
715            count: node_count,
716            builder: node_builder.erase(),
717        });
718        self
719    }
720
721    /// Builds the final simulation from this configuration.
722    ///
723    /// This method initializes tracing, runs the setup function, and prepares
724    /// all nodes for execution based on the current run mode.
725    ///
726    /// # Errors
727    ///
728    /// Returns an error if setup fails, tracing initialization fails, or
729    /// the configuration is invalid for the current run mode.
730    pub async fn build(self, name: &str) -> Result<Simulation<D>> {
731        let client = TraceClient::from_env_or_local()?;
732        let run_mode = RunMode::from_env()?;
733
734        debug!(%name, ?run_mode, "build simulation run");
735
736        let (trace_id, setup_data) = if matches!(run_mode, RunMode::InitOnly | RunMode::Integrated)
737        {
738            let setup_data = (self.setup_fn)().await?;
739            let encoded_setup_data = Bytes::from(postcard::to_stdvec(&setup_data)?);
740            let node_count = self.node_builders.iter().map(|builder| builder.count).sum();
741            let trace_id = client
742                .init_trace(
743                    TraceInfo {
744                        name: name.to_string(),
745                        node_count,
746                        expected_checkpoints: Some(self.rounds as u64),
747                    },
748                    Some(encoded_setup_data),
749                )
750                .await?;
751            info!(%name, node_count, %trace_id, "init simulation");
752
753            (trace_id, setup_data)
754        } else {
755            let info = client.get_trace(name.to_string()).await?;
756            let GetTraceResponse {
757                trace_id,
758                info,
759                setup_data,
760            } = info;
761            info!(%name, node_count=info.node_count, %trace_id, "get simulation");
762            let setup_data = setup_data.context("expected setup data to be set")?;
763            let setup_data: D =
764                postcard::from_bytes(&setup_data).context("failed to decode setup data")?;
765            (trace_id, setup_data)
766        };
767
768        let mut node_builders = self
769            .node_builders
770            .into_iter()
771            .flat_map(|builder| (0..builder.count).map(move |_| builder.builder.clone()))
772            .enumerate()
773            .map(|(node_idx, builder)| NodeBuilderWithIdx {
774                node_idx: node_idx as u32,
775                builder,
776            });
777
778        let node_builders: Vec<_> = match run_mode {
779            RunMode::InitOnly => vec![],
780            RunMode::Integrated => node_builders.collect(),
781            RunMode::Isolated(idx) => vec![
782                node_builders
783                    .nth(idx as usize)
784                    .context("invalid isolated index")?,
785            ],
786        };
787
788        Ok(Simulation {
789            run_mode,
790            max_rounds: self.rounds,
791            node_builders,
792            client,
793            trace_id,
794            setup_data,
795        })
796    }
797}
798
799struct NodeBuilderWithCount<D> {
800    count: u32,
801    builder: ErasedNodeBuilder<D>,
802}
803
804struct NodeBuilderWithIdx<D> {
805    node_idx: u32,
806    builder: ErasedNodeBuilder<D>,
807}
808
809/// A configured simulation ready to run.
810///
811/// Contains all the necessary components including the setup data, node spawners,
812/// and tracing client to execute a simulation run.
813pub struct Simulation<D> {
814    trace_id: Uuid,
815    run_mode: RunMode,
816    client: TraceClient,
817    setup_data: D,
818    max_rounds: u32,
819    node_builders: Vec<NodeBuilderWithIdx<D>>,
820}
821
822impl<D: SetupData> Simulation<D> {
823    /// Runs this simulation to completion.
824    ///
825    /// Spawns all configured nodes concurrently and executes the specified
826    /// number of simulation rounds.
827    ///
828    /// # Errors
829    ///
830    /// Returns an error if any node fails to spawn or if any round fails to execute.
831    pub async fn run(self) -> Result<()> {
832        let cancel_token = CancellationToken::new();
833
834        // Spawn a task to submit logs.
835        let logs_scope = match self.run_mode {
836            RunMode::Isolated(idx) => Some(Scope::Isolated(idx)),
837            RunMode::Integrated => Some(Scope::Integrated),
838            // Do not push logs for init-only runs.
839            RunMode::InitOnly => None,
840        };
841        let logs_task = if let Some(scope) = logs_scope {
842            Some(spawn_logs_task(
843                self.client.clone(),
844                self.trace_id,
845                scope,
846                cancel_token.clone(),
847            ))
848        } else {
849            None
850        };
851
852        // Spawn and run all nodes concurrently.
853        let result = self
854            .node_builders
855            .into_iter()
856            .map(async |builder| {
857                let span = error_span!("sim-node", idx = builder.node_idx);
858                SimNode::spawn_and_run(
859                    builder,
860                    self.client.clone(),
861                    self.trace_id,
862                    &self.setup_data,
863                    self.max_rounds,
864                )
865                .instrument(span)
866                .await
867            })
868            .try_join_all()
869            .await
870            .map(|_list| ());
871
872        cancel_token.cancel();
873        if let Some(join_handle) = logs_task {
874            join_handle.await?;
875        }
876
877        if matches!(self.run_mode, RunMode::Integrated) {
878            self.client
879                .close_trace(self.trace_id, to_str_err(&result))
880                .await?;
881        }
882
883        result
884    }
885}
886
/// The mode a simulation run executes in, as derived from the environment.
#[derive(Debug, Copy, Clone)]
enum RunMode {
    /// Only initialize the trace; no nodes are run.
    InitOnly,
    /// Initialize the trace and run all nodes.
    Integrated,
    /// Run only the node with the given index against an existing trace.
    Isolated(u32),
}
893
894impl RunMode {
895    fn from_env() -> Result<Self> {
896        if std::env::var(ENV_TRACE_INIT_ONLY).is_ok() {
897            Ok(Self::InitOnly)
898        } else {
899            match std::env::var(ENV_TRACE_ISOLATED) {
900                Err(_) => Ok(Self::Integrated),
901                Ok(s) => {
902                    let idx = s.parse().with_context(|| {
903                        format!("Failed to parse env var `{ENV_TRACE_ISOLATED}` as number")
904                    })?;
905                    Ok(Self::Isolated(idx))
906                }
907            }
908        }
909    }
910}
911
912/// Spawns a task that periodically submits the collected logs from our global
913/// tracing subscriber to a trace server.
914fn spawn_logs_task(
915    client: TraceClient,
916    trace_id: Uuid,
917    scope: Scope,
918    cancel_token: CancellationToken,
919) -> tokio::task::JoinHandle<()> {
920    tokio::task::spawn(async move {
921        loop {
922            if cancel_token
923                .run_until_cancelled(tokio::time::sleep(Duration::from_secs(1)))
924                .await
925                .is_none()
926            {
927                break;
928            }
929            let lines = self::trace::get_logs();
930            if lines.is_empty() {
931                continue;
932            }
933            // 500 chosen so that we stay below ~16MB of logs (irpc's MAX_MESSAGE_SIZE limit).
934            // This gives us ~32KB per log line on average.
935            for lines_chunk in lines.chunks(500) {
936                if let Err(e) = client.put_logs(trace_id, scope, lines_chunk.to_vec()).await {
937                    eprintln!(
938                        "warning: failed to submit logs due to error, stopping log submission now: {e:?}"
939                    );
940                    break;
941                }
942            }
943            if cancel_token.is_cancelled() {
944                break;
945            }
946        }
947    })
948}
949
950static PERMIT: Semaphore = Semaphore::const_new(1);
951
952/// Runs a simulation function with proper setup and cleanup.
953///
954/// This function handles tracing initialization, sequential execution (via semaphore),
955/// log management, and error reporting for simulation functions.
956///
957/// # Errors
958///
959/// Returns an error if the simulation function fails, the builder fails,
960/// or the simulation execution fails.
961#[doc(hidden)]
962pub async fn run_sim_fn<F, Fut, D, E>(name: &str, sim_fn: F) -> anyhow::Result<()>
963where
964    F: Fn() -> Fut,
965    Fut: Future<Output = Result<Builder<D>, E>>,
966    D: SetupData,
967    anyhow::Error: From<E>,
968{
969    // Ensure simulations run sequentially so that we can extract logs properly.
970    let permit = PERMIT.acquire().await.expect("semaphore closed");
971
972    // Init the global tracing subscriber.
973    self::trace::init();
974    // Clear remaining logs from previous runs.
975    self::trace::global_writer().clear();
976
977    eprintln!("running simulation: {name}");
978    let result = sim_fn()
979        .await
980        .map_err(anyhow::Error::from)
981        .with_context(|| format!("simulation builder function `{name}` failed"))?
982        .build(name)
983        .await
984        .with_context(|| format!("simulation `{name}` failed to start"))?
985        .run()
986        .await
987        .with_context(|| format!("simulation `{name}` failed to complete"));
988
989    match &result {
990        Ok(()) => eprintln!("simulation `{name}` passed"),
991        Err(err) => eprintln!("simulation `{name}` failed: {err:#}"),
992    };
993
994    drop(permit);
995
996    result
997}
998
999fn to_str_err<T>(res: &Result<T, anyhow::Error>) -> Result<(), String> {
1000    if let Some(err) = res.as_ref().err() {
1001        Err(format!("{err:?}"))
1002    } else {
1003        Ok(())
1004    }
1005}