iroh_n0des/
simulation.rs

1use std::{
2    any::Any,
3    fmt::Debug,
4    marker::PhantomData,
5    pin::Pin,
6    sync::{Arc, RwLock},
7    time::Duration,
8};
9
10use anyhow::{Context, Result};
11use bytes::Bytes;
12use iroh::{Endpoint, NodeAddr, NodeId, SecretKey, Watcher};
13use iroh_metrics::encoding::Encoder;
14use iroh_n0des::{
15    Registry,
16    simulation::proto::{ActiveTrace, NodeInfo, TraceClient, TraceInfo},
17};
18use n0_future::IterExt;
19use n0_watcher::Watchable;
20use proto::{GetTraceResponse, NodeInfoWithAddr, Scope};
21use serde::{Serialize, de::DeserializeOwned};
22use tokio::{sync::Semaphore, time::MissedTickBehavior};
23use tokio_util::sync::CancellationToken;
24use tracing::{Instrument, debug, error_span, info, warn};
25use uuid::Uuid;
26
27use crate::simulation::proto::CheckpointId;
28
29pub mod events;
30pub mod proto;
31pub mod trace;
32
/// Name of the environment variable that selects a single node, by index, to run in isolation.
pub const ENV_TRACE_ISOLATED: &str = "N0DES_TRACE_ISOLATED";
/// Name of the environment variable that switches on initialization-only mode.
pub const ENV_TRACE_INIT_ONLY: &str = "N0DES_TRACE_INIT_ONLY";
/// Name of the environment variable carrying the trace server address.
pub const ENV_TRACE_SERVER: &str = "N0DES_TRACE_SERVER";
/// Name of the environment variable carrying the simulation session ID.
pub const ENV_TRACE_SESSION_ID: &str = "N0DES_SESSION_ID";

/// Period of the timer that triggers metrics submission for a running node.
const METRICS_INTERVAL: Duration = Duration::from_secs(1);
43
44type BoxedSpawnFn<C> = Arc<
45    dyn 'static
46        + Send
47        + Sync
48        + for<'a> Fn(&'a mut SpawnContext<'a, C>) -> BoxFuture<'a, Result<BoxNode<C>>>,
49>;
50type BoxedRoundFn<C> = Arc<
51    dyn 'static
52        + Send
53        + Sync
54        + for<'a> Fn(&'a mut BoxNode<C>, &'a RoundContext<'a, C>) -> BoxFuture<'a, Result<bool>>,
55>;
56
57type BoxedCheckFn<C> = Arc<dyn Fn(&BoxNode<C>, &RoundContext<'_, C>) -> Result<()>>;
58
/// Helper trait describing an async callback over two borrowed arguments.
///
/// A plain `impl Fn() -> Fut` cannot tie the lifetime of the returned future to
/// the lifetime of the function's reference parameters. `impl AsyncFn` could, but
/// it offers no way to require the returned future to be `Send`. This trait names
/// the future type explicitly so that both can be expressed.
pub trait AsyncCallback<'a, A1, A2, T>:
    Fn(&'a mut A1, &'a A2) -> Self::Fut + Send + Sync + 'static
where
    A1: 'a,
    A2: 'a,
    T: 'a,
{
    /// The future returned by the callback.
    type Fut: Future<Output = T> + Send;
}

/// Every matching `Send + Sync + 'static` function whose future is `Send` is a callback.
impl<'a, A1, A2, T, Out, F> AsyncCallback<'a, A1, A2, T> for F
where
    A1: 'a,
    A2: 'a,
    T: 'a,
    F: Fn(&'a mut A1, &'a A2) -> Out + Send + Sync + 'static,
    Out: Future<Output = T> + Send,
{
    type Fut = Out;
}
78
79/// Trait for user-defined setup data that can be shared across simulation nodes.
80///
81/// The setup data must be serializable, deserializable, cloneable, and thread-safe
82/// to be distributed across simulation nodes.
83pub trait SetupData: Serialize + DeserializeOwned + Send + Sync + Clone + Debug + 'static {}
84impl<T> SetupData for T where T: Serialize + DeserializeOwned + Send + Sync + Clone + Debug + 'static
85{}
86
87pub trait Ctx: Send + Sync + 'static {
88    type Config: SetupData + Default;
89    type Setup: SetupData;
90
91    /// Runs once for each simulation environment.
92    fn setup(config: &Self::Config) -> impl Future<Output = Result<Self::Setup>> + Send;
93
94    fn round_label(_config: &Self::Config, _round: u32) -> Option<String> {
95        None
96    }
97}
98
99impl Ctx for () {
100    type Config = ();
101    type Setup = ();
102
103    async fn setup(_config: &Self::Config) -> Result<Self::Setup> {
104        Ok(())
105    }
106}
107
108/// Context provided when spawning a new simulation node.
109///
110/// Contains all the necessary information and resources for initializing
111/// a node, including its index, the shared setup data, and a metrics registry.
112pub struct SpawnContext<'a, C: Ctx = ()> {
113    ctx: &'a StaticCtx<C>,
114    secret_key: SecretKey,
115    node_idx: u32,
116    registry: &'a mut Registry,
117}
118
119impl<'a, C: Ctx> SpawnContext<'a, C> {
120    /// Returns the index of this node in the simulation.
121    pub fn node_index(&self) -> u32 {
122        self.node_idx
123    }
124
125    /// Returns a reference to the setup data for this simulation.
126    pub fn setup_data(&self) -> &C::Setup {
127        &self.ctx.setup
128    }
129
130    pub fn config(&self) -> &C::Config {
131        &self.ctx.config
132    }
133
134    /// Returns a mutable reference to a metrics registry.
135    ///
136    /// Use this to register custom metrics for the node being spawned.
137    pub fn metrics_registry(&mut self) -> &mut Registry {
138        self.registry
139    }
140
141    /// Returns the secret key for this node.
142    pub fn secret_key(&self) -> SecretKey {
143        self.secret_key.clone()
144    }
145
146    /// Returns the node id of this node.
147    pub fn node_id(&self) -> NodeId {
148        self.secret_key.public()
149    }
150
151    /// Creates and binds a new endpoint with this node's secret key.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the endpoint fails to bind to a local address.
156    pub async fn bind_endpoint(&self) -> Result<Endpoint> {
157        let ep = Endpoint::builder()
158            .discovery_n0()
159            .secret_key(self.secret_key())
160            .bind()
161            .await?;
162        Ok(ep)
163    }
164}
165
166/// Context provided during each simulation round.
167///
168/// Contains information about the current round, this node's identity,
169/// the shared setup data, and the addresses of all participating nodes.
170pub struct RoundContext<'a, C: Ctx = ()> {
171    round: u32,
172    node_index: u32,
173    all_nodes: &'a Vec<NodeInfoWithAddr>,
174    ctx: &'a StaticCtx<C>,
175}
176
177impl<'a, C: Ctx> RoundContext<'a, C> {
178    /// Returns the current round number.
179    pub fn round(&self) -> u32 {
180        self.round
181    }
182
183    /// Returns the index of this node in the simulation.
184    pub fn node_index(&self) -> u32 {
185        self.node_index
186    }
187
188    /// Returns a reference to the shared setup data for this simulation.
189    pub fn setup_data(&self) -> &C::Setup {
190        &self.ctx.setup
191    }
192
193    pub fn config(&self) -> &C::Config {
194        &self.ctx.config
195    }
196
197    /// Returns an iterator over the addresses of all nodes except the specified one.
198    pub fn all_other_nodes(&self, me: NodeId) -> impl Iterator<Item = &NodeAddr> + '_ {
199        self.all_nodes
200            .iter()
201            .filter(move |n| n.info.node_id != Some(me))
202            .flat_map(|n| &n.addr)
203    }
204
205    /// Returns the address of the node with the given index.
206    ///
207    /// # Errors
208    ///
209    /// Returns an error if no node with the specified index exists.
210    pub fn addr(&self, idx: u32) -> Result<NodeAddr> {
211        self.all_nodes
212            .iter()
213            .find(|n| n.info.idx == idx)
214            .cloned()
215            .context("node not found")?
216            .addr
217            .context("node has no address")
218    }
219
220    /// Returns the address of this node.
221    pub fn self_addr(&self) -> Option<&NodeAddr> {
222        self.all_nodes
223            .iter()
224            .find(|n| n.info.idx == self.node_index)
225            .and_then(|info| info.addr.as_ref())
226    }
227
228    pub fn try_self_addr(&self) -> Result<&NodeAddr> {
229        self.self_addr().context("missing node address")
230    }
231
232    /// Returns the total number of nodes participating in the simulation.
233    pub fn node_count(&self) -> usize {
234        self.all_nodes.len()
235    }
236}
237
238/// Trait for simulation node implementations.
239///
240/// Provides basic functionality for nodes including optional endpoint access
241/// and cleanup on shutdown.
242///
243/// To use a node in a simulation, implement this trait for your type and
244/// provide an async `spawn` function. Add the node to a simulation with
245/// `NodeBuilder::new(round_fn)` and `Builder::spawn(...)`.
246pub trait Node<C: Ctx = ()>: Send + 'static + Sized {
247    /// Returns a reference to this node's endpoint, if any.
248    fn endpoint(&self) -> Option<&Endpoint>;
249
250    /// Shuts down this node, performing any necessary cleanup.
251    ///
252    /// The default implementation does nothing and returns success.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if shutdown fails.
257    fn shutdown(&mut self) -> impl Future<Output = Result<()>> + Send + '_ {
258        async { Ok(()) }
259    }
260
261    /// Spawns a new instance of this node type.
262    ///
263    /// # Errors
264    ///
265    /// Returns an error if the node fails to initialize properly.
266    fn spawn(context: &mut SpawnContext<'_, C>) -> impl Future<Output = Result<Self>> + Send;
267
268    /// Spawns a new instance as a dynamically-typed node.
269    ///
270    /// This calls `spawn` and boxes the result.
271    ///
272    /// # Errors
273    ///
274    /// Returns an error if the node fails to initialize properly.
275    fn spawn_dyn<'a>(context: &'a mut SpawnContext<'_, C>) -> BoxFuture<'a, Result<BoxNode<C>>> {
276        Box::pin(async {
277            let node = Self::spawn(context).await?;
278            let node: Box<dyn DynNode<C>> = Box::new(node);
279            anyhow::Ok(node)
280        })
281    }
282
283    fn node_label(&self, _context: &SpawnContext<C>) -> Option<String> {
284        None
285    }
286}
287
288type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
289
290/// A boxed dynamically-typed simulation node.
291pub type BoxNode<C> = Box<dyn DynNode<C> + 'static>;
292
293/// Trait for dynamically-typed simulation nodes.
294///
295/// This trait enables type erasure for nodes while preserving essential
296/// functionality like shutdown, endpoint access, and type casting.
297pub trait DynNode<C: Ctx>: Send + Any + 'static {
298    /// Shuts down this node, performing any necessary cleanup.
299    ///
300    /// The default implementation does nothing and returns success.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if shutdown fails.
305    fn shutdown(&mut self) -> BoxFuture<'_, Result<()>> {
306        Box::pin(async { Ok(()) })
307    }
308
309    /// Returns a reference to this node's endpoint, if any.
310    ///
311    /// The default implementation returns `None`.
312    fn endpoint(&self) -> Option<&Endpoint> {
313        None
314    }
315
316    fn node_label(&self, _context: &SpawnContext<C>) -> Option<String> {
317        None
318    }
319
320    /// Returns a reference to this node as `Any` for downcasting.
321    fn as_any(&self) -> &dyn Any;
322
323    /// Returns a mutable reference to this node as `Any` for downcasting.
324    fn as_any_mut(&mut self) -> &mut dyn Any;
325}
326
327impl<C: Ctx, N: Node<C> + Sized> DynNode<C> for N {
328    fn shutdown(&mut self) -> BoxFuture<'_, Result<()>> {
329        Box::pin(<Self as Node<C>>::shutdown(self))
330    }
331
332    fn endpoint(&self) -> Option<&Endpoint> {
333        <Self as Node<C>>::endpoint(self)
334    }
335
336    fn node_label(&self, context: &SpawnContext<C>) -> Option<String> {
337        <Self as Node<C>>::node_label(self, context)
338    }
339
340    fn as_any(&self) -> &dyn Any {
341        self
342    }
343
344    fn as_any_mut(&mut self) -> &mut dyn Any {
345        self
346    }
347}
348
349#[derive()]
350/// Builder for constructing simulation configurations.
351///
352/// Allows configuring the setup function, node spawners, and number of rounds
353/// before building the final simulation.
354pub struct Builder<C: Ctx = ()> {
355    config: C::Config,
356    node_builders: Vec<NodeBuilderWithCount<C>>,
357    rounds: u32,
358}
359
360struct SimFns<C: Ctx> {
361    spawn: BoxedSpawnFn<C>,
362    round: BoxedRoundFn<C>,
363    check: Option<BoxedCheckFn<C>>,
364}
365
366impl<C: Ctx> Clone for SimFns<C> {
367    fn clone(&self) -> Self {
368        Self {
369            round: self.round.clone(),
370            check: self.check.clone(),
371            spawn: self.spawn.clone(),
372        }
373    }
374}
375
376/// Builder for configuring individual nodes in a simulation.
377///
378/// Provides methods to set up spawn functions, round functions, and optional
379/// check functions for a specific node type.
380#[derive(Clone)]
381pub struct NodeBuilder<N: Node<C>, C: Ctx> {
382    phantom: PhantomData<N>,
383    fns: SimFns<C>,
384}
385
386struct ErasedNodeBuilder<C: Ctx> {
387    fns: SimFns<C>,
388}
389
390impl<C: Ctx> Clone for ErasedNodeBuilder<C> {
391    fn clone(&self) -> Self {
392        Self {
393            fns: self.fns.clone(),
394        }
395    }
396}
397
398impl<N: Node<C>, C: Ctx> NodeBuilder<N, C> {
399    /// Creates a new node builder with the given round function.
400    ///
401    /// The round function will be called each simulation round and should return
402    /// `Ok(true)` to continue or `Ok(false)` to stop early.
403    pub fn new(
404        round_fn: impl for<'a> AsyncCallback<'a, N, RoundContext<'a, C>, Result<bool>>,
405    ) -> Self {
406        let spawn_fn: BoxedSpawnFn<C> = Arc::new(N::spawn_dyn);
407        let round_fn: BoxedRoundFn<C> = Arc::new(move |node, context| {
408            let node = node
409                .as_any_mut()
410                .downcast_mut::<N>()
411                .expect("unreachable: type is statically guaranteed");
412            Box::pin(round_fn(node, context))
413        });
414        let fns = SimFns {
415            spawn: spawn_fn,
416            round: round_fn,
417            check: None,
418        };
419        Self {
420            phantom: PhantomData,
421            fns,
422        }
423    }
424
425    /// Adds a check function that will be called after each round.
426    ///
427    /// The check function can verify node state and return an error to fail
428    /// the simulation if invariants are violated.
429    ///
430    /// # Errors
431    ///
432    /// The check function should return an error if validation fails.
433    pub fn check(
434        mut self,
435        check_fn: impl 'static + for<'a> Fn(&'a N, &RoundContext<'a, C>) -> Result<()>,
436    ) -> Self {
437        let check_fn: BoxedCheckFn<C> = Arc::new(move |node, context| {
438            let node = node
439                .as_any()
440                .downcast_ref::<N>()
441                .expect("unreachable: type is statically guaranteed");
442            check_fn(node, context)
443        });
444        self.fns.check = Some(check_fn);
445        self
446    }
447
448    fn erase(self) -> ErasedNodeBuilder<C> {
449        ErasedNodeBuilder { fns: self.fns }
450    }
451}
452
453struct SimNode<'a, C: Ctx> {
454    node: BoxNode<C>,
455    ctx: &'a StaticCtx<C>,
456    client: ActiveTrace,
457    idx: u32,
458    fns: SimFns<C>,
459    round: u32,
460    info: NodeInfo,
461    metrics: Arc<RwLock<Registry>>,
462    checkpoint_watcher: n0_watcher::Watchable<CheckpointId>,
463    all_nodes: Vec<NodeInfoWithAddr>,
464}
465
466impl<'a, C: Ctx> SimNode<'a, C> {
467    async fn spawn_and_run(builder: NodeBuilderWithIdx<C>, ctx: &'a StaticCtx<C>) -> Result<()> {
468        let secret_key = SecretKey::generate(&mut rand::rng());
469        let NodeBuilderWithIdx { node_idx, builder } = builder;
470        let mut registry = Registry::default();
471        let node_id = secret_key.public();
472        let mut context = SpawnContext::<C> {
473            ctx,
474            node_idx,
475            secret_key: secret_key.clone(),
476            registry: &mut registry,
477        };
478        let node = (builder.fns.spawn)(&mut context).await?;
479
480        // TODO: Borrow checker forces to recreate the context, I think this can be avoided but can't figure it out.
481        let context = SpawnContext::<C> {
482            ctx,
483            node_idx,
484            secret_key: secret_key.clone(),
485            registry: &mut registry,
486        };
487        let label = node.node_label(&context);
488
489        let info = NodeInfo {
490            // TODO: Only assign node id if endpoint was created.
491            node_id: Some(node_id),
492            idx: node_idx,
493            label,
494        };
495
496        if let Some(endpoint) = node.endpoint() {
497            registry.register_all(endpoint.metrics());
498        }
499        let client = ctx.client.start_node(ctx.trace_id, info.clone()).await?;
500
501        let mut node = Self {
502            node,
503            ctx,
504            client,
505            idx: node_idx,
506            info,
507            round: 0,
508            fns: builder.fns,
509            checkpoint_watcher: Watchable::new(0),
510            metrics: Arc::new(RwLock::new(registry)),
511            all_nodes: Default::default(),
512        };
513
514        let res = node
515            .run()
516            .await
517            .with_context(|| format!("node {} failed", node.idx));
518        if let Err(err) = &res {
519            warn!("node failed: {err:#}");
520        }
521        res
522    }
523
524    async fn run(&mut self) -> Result<()> {
525        info!(idx = self.idx, "start");
526
527        // Spawn a task to periodically put metrics.
528        if let Some(node_id) = self.node_id() {
529            let client = self.client.clone();
530            let mut watcher = self.checkpoint_watcher.watch();
531            let mut metrics_encoder = Encoder::new(self.metrics.clone());
532            tokio::task::spawn(
533                async move {
534                    let mut interval = tokio::time::interval(METRICS_INTERVAL);
535                    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
536                    loop {
537                        let checkpoint = tokio::select! {
538                            _ = interval.tick() => None,
539                            checkpoint = watcher.updated() => {
540                                match checkpoint {
541                                    Err(_) => break,
542                                    Ok(checkpoint) => Some(checkpoint)
543                                }
544                            }
545                        };
546                        if let Err(err) = client
547                            .put_metrics(node_id, checkpoint, metrics_encoder.export())
548                            .await
549                        {
550                            warn!(?err, "failed to put metrics, stop metrics task");
551                            break;
552                        }
553                    }
554                }
555                .instrument(error_span!("metrics")),
556            );
557        }
558
559        let info = NodeInfoWithAddr {
560            addr: self.my_addr().await,
561            info: self.info.clone(),
562        };
563        self.all_nodes = self.client.wait_start(info).await?;
564
565        let result = self.run_rounds().await;
566
567        if let Err(err) = self.node.shutdown().await {
568            warn!("failure during node shutdown: {err:#}");
569        }
570
571        self.client.end(to_str_err(&result)).await?;
572
573        result
574    }
575
576    async fn run_rounds(&mut self) -> Result<()> {
577        while self.round < self.ctx.max_rounds {
578            if !self
579                .run_round()
580                .await
581                .with_context(|| format!("failed at round {}", self.round))?
582            {
583                return Ok(());
584            }
585            self.round += 1;
586        }
587        Ok(())
588    }
589
590    #[tracing::instrument(name="round", skip_all, fields(round=self.round))]
591    async fn run_round(&mut self) -> Result<bool> {
592        let context = RoundContext {
593            round: self.round,
594            node_index: self.idx,
595            all_nodes: &self.all_nodes,
596            ctx: self.ctx,
597        };
598
599        let label = C::round_label(&self.ctx.config, self.round)
600            .unwrap_or_else(|| format!("Round {}", self.round));
601
602        info!(%label, "start round");
603
604        let result = (self.fns.round)(&mut self.node, &context)
605            .await
606            .context("round function failed");
607
608        info!(%label, "end round");
609
610        let checkpoint = (context.round + 1) as u64;
611        self.checkpoint_watcher.set(checkpoint).ok();
612        self.client
613            .put_checkpoint(checkpoint, Some(label), to_str_err(&result))
614            .await
615            .context("put checkpoint")?;
616
617        self.client
618            .wait_checkpoint(checkpoint)
619            .await
620            .context("wait checkpoint")?;
621
622        match result {
623            Ok(out) => {
624                if let Some(check_fn) = self.fns.check.as_ref() {
625                    (check_fn)(&self.node, &context).context("check function failed")?;
626                }
627                Ok(out)
628            }
629            Err(err) => Err(err),
630        }
631    }
632
633    fn node_id(&self) -> Option<NodeId> {
634        self.info.node_id
635    }
636
637    async fn my_addr(&self) -> Option<NodeAddr> {
638        if let Some(endpoint) = self.node.endpoint() {
639            Some(node_addr(endpoint).await)
640        } else {
641            None
642        }
643    }
644}
645
646async fn node_addr(endpoint: &Endpoint) -> NodeAddr {
647    endpoint.online().await;
648    endpoint.node_addr()
649}
650
651impl Default for Builder<()> {
652    fn default() -> Self {
653        Self::new()
654    }
655}
656
657impl<C: Ctx> Builder<C> {
658    /// Creates a new simulation builder.
659    ///
660    /// The context's setup function is called once before the simulation starts to
661    /// initialize the setup data that will be shared across all nodes.
662    ///
663    /// The setup function can return any type that implements [`SetupData`],
664    /// which is an auto-implemented supertrait for all types that are
665    /// serializable, cloneable, and thread-safe. See [`SetupData`] for details.
666    ///
667    /// # Errors
668    ///
669    /// The setup function should return an error if initialization fails.
670    pub fn new() -> Self {
671        Self::with_config(Default::default())
672    }
673
674    pub fn with_config(config: C::Config) -> Self {
675        Self {
676            config,
677            node_builders: vec![],
678            rounds: 0,
679        }
680    }
681
682    /// Sets the number of rounds this simulation will run.
683    pub fn rounds(mut self, rounds: u32) -> Self {
684        self.rounds = rounds;
685        self
686    }
687
688    /// Adds a group of nodes to spawn in this simulation.
689    ///
690    /// Each node will be created using the provided node builder configuration.
691    ///
692    /// Create a [`NodeBuilder`] with `NodeBuilder::new(round_fn)` for any type
693    /// that implements [`Node<C>`]. Shared setup data is provided via your
694    /// [`Ctx`] implementation (`Ctx::setup`), not via a builder closure.
695    pub fn spawn<N: Node<C>>(
696        mut self,
697        node_count: u32,
698        builder: impl Into<NodeBuilder<N, C>>,
699    ) -> Self {
700        let builder = builder.into();
701        self.node_builders.push(NodeBuilderWithCount {
702            count: node_count,
703            builder: builder.erase(),
704        });
705        self
706    }
707
708    /// Builds the final simulation from this configuration.
709    ///
710    /// This method initializes tracing, runs the setup function, and prepares
711    /// all nodes for execution based on the current run mode.
712    ///
713    /// # Errors
714    ///
715    /// Returns an error if setup fails, tracing initialization fails, or
716    /// the configuration is invalid for the current run mode.
717    pub async fn build(self, name: &str) -> Result<Simulation<C>> {
718        let client = TraceClient::from_env_or_local()?;
719        let run_mode = RunMode::from_env()?;
720
721        debug!(%name, ?run_mode, "build simulation run");
722
723        let (trace_id, setup_data) = if matches!(run_mode, RunMode::InitOnly | RunMode::Integrated)
724        {
725            let setup_data = C::setup(&self.config).await?;
726            let encoded_setup_data = Bytes::from(postcard::to_stdvec(&setup_data)?);
727            let node_count = self.node_builders.iter().map(|builder| builder.count).sum();
728            let trace_id = client
729                .init_trace(
730                    TraceInfo {
731                        name: name.to_string(),
732                        node_count,
733                        expected_checkpoints: Some(self.rounds as u64),
734                    },
735                    Some(encoded_setup_data),
736                )
737                .await?;
738            info!(%name, node_count, %trace_id, "init simulation");
739
740            (trace_id, setup_data)
741        } else {
742            let info = client.get_trace(name.to_string()).await?;
743            let GetTraceResponse {
744                trace_id,
745                info,
746                setup_data,
747            } = info;
748            info!(%name, node_count=info.node_count, %trace_id, "get simulation");
749            let setup_data = setup_data.context("expected setup data to be set")?;
750            let setup_data: C::Setup =
751                postcard::from_bytes(&setup_data).context("failed to decode setup data")?;
752            (trace_id, setup_data)
753        };
754
755        // map all the builders with their count into a flat iter of (i, builder)
756        let mut builders = self
757            .node_builders
758            .iter()
759            .flat_map(|builder| (0..builder.count).map(|_| &builder.builder))
760            .enumerate();
761
762        // build the list of builders *we* want to run:
763        let node_builders: Vec<_> = match run_mode {
764            // init only: run nothing
765            RunMode::InitOnly => vec![],
766            // integrated: run all nodes
767            RunMode::Integrated => builders.map(NodeBuilderWithIdx::from_tuple).collect(),
768            // isolated: run single node
769            RunMode::Isolated(idx) => {
770                let item = builders
771                    .nth(idx as usize)
772                    .context("invalid isolated index")?;
773                vec![NodeBuilderWithIdx::from_tuple(item)]
774            }
775        };
776
777        let ctx = StaticCtx {
778            setup: setup_data,
779            config: self.config,
780            trace_id,
781            client,
782            run_mode,
783            max_rounds: self.rounds,
784        };
785
786        Ok(Simulation { node_builders, ctx })
787    }
788}
789
790struct NodeBuilderWithCount<C: Ctx> {
791    count: u32,
792    builder: ErasedNodeBuilder<C>,
793}
794
795struct NodeBuilderWithIdx<C: Ctx> {
796    node_idx: u32,
797    builder: ErasedNodeBuilder<C>,
798}
799
800impl<C: Ctx> NodeBuilderWithIdx<C> {
801    fn from_tuple((node_idx, builder): (usize, &ErasedNodeBuilder<C>)) -> Self {
802        Self {
803            node_idx: node_idx as u32,
804            builder: builder.clone(),
805        }
806    }
807}
808
809/// A configured simulation ready to run.
810///
811/// Contains all the necessary components including the setup data, node spawners,
812/// and tracing client to execute a simulation run.
813pub struct Simulation<C: Ctx> {
814    ctx: StaticCtx<C>,
815    node_builders: Vec<NodeBuilderWithIdx<C>>,
816}
817
818impl<C: Ctx> Simulation<C> {
819    /// Runs this simulation to completion.
820    ///
821    /// Spawns all configured nodes concurrently and executes the specified
822    /// number of simulation rounds.
823    ///
824    /// # Errors
825    ///
826    /// Returns an error if any node fails to spawn or if any round fails to execute.
827    pub async fn run(self) -> Result<()> {
828        let cancel_token = CancellationToken::new();
829
830        // Spawn a task to submit logs.
831        let logs_scope = match self.ctx.run_mode {
832            RunMode::Isolated(idx) => Some(Scope::Isolated(idx)),
833            RunMode::Integrated => Some(Scope::Integrated),
834            // Do not push logs for init-only runs.
835            RunMode::InitOnly => None,
836        };
837        let logs_task = if let Some(scope) = logs_scope {
838            Some(spawn_logs_task(
839                self.ctx.client.clone(),
840                self.ctx.trace_id,
841                scope,
842                cancel_token.clone(),
843            ))
844        } else {
845            None
846        };
847
848        // Spawn and run all nodes concurrently.
849        let result = self
850            .node_builders
851            .into_iter()
852            .map(async |builder| {
853                let span = error_span!("sim-node", idx = builder.node_idx);
854                SimNode::spawn_and_run(builder, &self.ctx)
855                    .instrument(span)
856                    .await
857            })
858            .try_join_all()
859            .await
860            .map(|_list| ());
861
862        cancel_token.cancel();
863        if let Some(join_handle) = logs_task {
864            join_handle.await?;
865        }
866
867        if matches!(self.ctx.run_mode, RunMode::Integrated) {
868            self.ctx
869                .client
870                .close_trace(self.ctx.trace_id, to_str_err(&result))
871                .await?;
872        }
873
874        result
875    }
876}
877
878#[derive(Debug, Clone)]
879struct StaticCtx<C: Ctx> {
880    setup: C::Setup,
881    config: C::Config,
882    trace_id: Uuid,
883    client: TraceClient,
884    run_mode: RunMode,
885    max_rounds: u32,
886}
887
888#[derive(Debug, Copy, Clone)]
889enum RunMode {
890    InitOnly,
891    Integrated,
892    Isolated(u32),
893}
894
895impl RunMode {
896    fn from_env() -> Result<Self> {
897        if std::env::var(ENV_TRACE_INIT_ONLY).is_ok() {
898            Ok(Self::InitOnly)
899        } else {
900            match std::env::var(ENV_TRACE_ISOLATED) {
901                Err(_) => Ok(Self::Integrated),
902                Ok(s) => {
903                    let idx = s.parse().with_context(|| {
904                        format!("Failed to parse env var `{ENV_TRACE_ISOLATED}` as number")
905                    })?;
906                    Ok(Self::Isolated(idx))
907                }
908            }
909        }
910    }
911}
912
913/// Spawns a task that periodically submits the collected logs from our global
914/// tracing subscriber to a trace server.
915fn spawn_logs_task(
916    client: TraceClient,
917    trace_id: Uuid,
918    scope: Scope,
919    cancel_token: CancellationToken,
920) -> tokio::task::JoinHandle<()> {
921    tokio::task::spawn(async move {
922        loop {
923            if cancel_token
924                .run_until_cancelled(tokio::time::sleep(Duration::from_secs(1)))
925                .await
926                .is_none()
927            {
928                break;
929            }
930            let lines = self::trace::get_logs();
931            if lines.is_empty() {
932                continue;
933            }
934            // 500 chosen so that we stay below ~16MB of logs (irpc's MAX_MESSAGE_SIZE limit).
935            // This gives us ~32KB per log line on average.
936            for lines_chunk in lines.chunks(500) {
937                if let Err(e) = client.put_logs(trace_id, scope, lines_chunk.to_vec()).await {
938                    eprintln!(
939                        "warning: failed to submit logs due to error, stopping log submission now: {e:?}"
940                    );
941                    break;
942                }
943            }
944            if cancel_token.is_cancelled() {
945                break;
946            }
947        }
948    })
949}
950
951static PERMIT: Semaphore = Semaphore::const_new(1);
952
953/// Runs a simulation function with proper setup and cleanup.
954///
955/// This function handles tracing initialization, sequential execution (via semaphore),
956/// log management, and error reporting for simulation functions.
957///
958/// # Errors
959///
960/// Returns an error if the simulation function fails, the builder fails,
961/// or the simulation execution fails.
962#[doc(hidden)]
963pub async fn run_sim_fn<F, Fut, C, E>(name: &str, sim_fn: F) -> anyhow::Result<()>
964where
965    F: Fn() -> Fut,
966    Fut: Future<Output = Result<Builder<C>, E>>,
967    C: Ctx,
968    anyhow::Error: From<E>,
969{
970    // Ensure simulations run sequentially so that we can extract logs properly.
971    let permit = PERMIT.acquire().await.expect("semaphore closed");
972
973    // Init the global tracing subscriber.
974    self::trace::init();
975    // Clear remaining logs from previous runs.
976    self::trace::global_writer().clear();
977
978    eprintln!("running simulation: {name}");
979    let result = sim_fn()
980        .await
981        .map_err(anyhow::Error::from)
982        .with_context(|| format!("simulation builder function `{name}` failed"))?
983        .build(name)
984        .await
985        .with_context(|| format!("simulation `{name}` failed to start"))?
986        .run()
987        .await
988        .with_context(|| format!("simulation `{name}` failed to complete"));
989
990    match &result {
991        Ok(()) => eprintln!("simulation `{name}` passed"),
992        Err(err) => eprintln!("simulation `{name}` failed: {err:#}"),
993    };
994
995    drop(permit);
996
997    result
998}
999
1000fn to_str_err<T>(res: &Result<T, anyhow::Error>) -> Result<(), String> {
1001    if let Some(err) = res.as_ref().err() {
1002        Err(format!("{err:?}"))
1003    } else {
1004        Ok(())
1005    }
1006}