1use std::{
2 any::Any,
3 fmt::Debug,
4 marker::PhantomData,
5 pin::Pin,
6 sync::{Arc, RwLock},
7 time::Duration,
8};
9
10use anyhow::{Context, Result};
11use bytes::Bytes;
12use iroh::{Endpoint, NodeAddr, NodeId, SecretKey, Watcher};
13use iroh_metrics::encoding::Encoder;
14use iroh_n0des::{
15 Registry,
16 simulation::proto::{ActiveTrace, NodeInfo, TraceClient, TraceInfo},
17};
18use n0_future::IterExt;
19use n0_watcher::Watchable;
20use proto::{GetTraceResponse, NodeInfoWithAddr, Scope};
21use serde::{Serialize, de::DeserializeOwned};
22use tokio::{sync::Semaphore, time::MissedTickBehavior};
23use tokio_util::sync::CancellationToken;
24use tracing::{Instrument, debug, error_span, info, warn};
25use uuid::Uuid;
26
27use crate::simulation::proto::CheckpointId;
28
29pub mod events;
30pub mod proto;
31pub mod trace;
32
33pub const ENV_TRACE_ISOLATED: &str = "N0DES_TRACE_ISOLATED";
35pub const ENV_TRACE_INIT_ONLY: &str = "N0DES_TRACE_INIT_ONLY";
37pub const ENV_TRACE_SERVER: &str = "N0DES_TRACE_SERVER";
39pub const ENV_TRACE_SESSION_ID: &str = "N0DES_SESSION_ID";
41
42const METRICS_INTERVAL: Duration = Duration::from_secs(1);
43
44type BoxedSpawnFn<C> = Arc<
45 dyn 'static
46 + Send
47 + Sync
48 + for<'a> Fn(&'a mut SpawnContext<'a, C>) -> BoxFuture<'a, Result<BoxNode<C>>>,
49>;
50type BoxedRoundFn<C> = Arc<
51 dyn 'static
52 + Send
53 + Sync
54 + for<'a> Fn(&'a mut BoxNode<C>, &'a RoundContext<'a, C>) -> BoxFuture<'a, Result<bool>>,
55>;
56
57type BoxedCheckFn<C> = Arc<dyn Fn(&BoxNode<C>, &RoundContext<'_, C>) -> Result<()>>;
58
59pub trait AsyncCallback<'a, A1: 'a, A2: 'a, T: 'a>:
66 'static + Send + Sync + Fn(&'a mut A1, &'a A2) -> Self::Fut
67{
68 type Fut: Future<Output = T> + Send;
69}
70
71impl<'a, A1: 'a, A2: 'a, T: 'a, Out, F> AsyncCallback<'a, A1, A2, T> for F
72where
73 Out: Send + Future<Output = T>,
74 F: 'static + Sync + Send + Fn(&'a mut A1, &'a A2) -> Out,
75{
76 type Fut = Out;
77}
78
79pub trait SetupData: Serialize + DeserializeOwned + Send + Sync + Clone + Debug + 'static {}
84impl<T> SetupData for T where T: Serialize + DeserializeOwned + Send + Sync + Clone + Debug + 'static
85{}
86
87pub trait Ctx: Send + Sync + 'static {
88 type Config: SetupData + Default;
89 type Setup: SetupData;
90
91 fn setup(config: &Self::Config) -> impl Future<Output = Result<Self::Setup>> + Send;
93
94 fn round_label(_config: &Self::Config, _round: u32) -> Option<String> {
95 None
96 }
97}
98
99impl Ctx for () {
100 type Config = ();
101 type Setup = ();
102
103 async fn setup(_config: &Self::Config) -> Result<Self::Setup> {
104 Ok(())
105 }
106}
107
108pub struct SpawnContext<'a, C: Ctx = ()> {
113 ctx: &'a StaticCtx<C>,
114 secret_key: SecretKey,
115 node_idx: u32,
116 registry: &'a mut Registry,
117}
118
119impl<'a, C: Ctx> SpawnContext<'a, C> {
120 pub fn node_index(&self) -> u32 {
122 self.node_idx
123 }
124
125 pub fn setup_data(&self) -> &C::Setup {
127 &self.ctx.setup
128 }
129
130 pub fn config(&self) -> &C::Config {
131 &self.ctx.config
132 }
133
134 pub fn metrics_registry(&mut self) -> &mut Registry {
138 self.registry
139 }
140
141 pub fn secret_key(&self) -> SecretKey {
143 self.secret_key.clone()
144 }
145
146 pub fn node_id(&self) -> NodeId {
148 self.secret_key.public()
149 }
150
151 pub async fn bind_endpoint(&self) -> Result<Endpoint> {
157 let ep = Endpoint::builder()
158 .discovery_n0()
159 .secret_key(self.secret_key())
160 .bind()
161 .await?;
162 Ok(ep)
163 }
164}
165
166pub struct RoundContext<'a, C: Ctx = ()> {
171 round: u32,
172 node_index: u32,
173 all_nodes: &'a Vec<NodeInfoWithAddr>,
174 ctx: &'a StaticCtx<C>,
175}
176
177impl<'a, C: Ctx> RoundContext<'a, C> {
178 pub fn round(&self) -> u32 {
180 self.round
181 }
182
183 pub fn node_index(&self) -> u32 {
185 self.node_index
186 }
187
188 pub fn setup_data(&self) -> &C::Setup {
190 &self.ctx.setup
191 }
192
193 pub fn config(&self) -> &C::Config {
194 &self.ctx.config
195 }
196
197 pub fn all_other_nodes(&self, me: NodeId) -> impl Iterator<Item = &NodeAddr> + '_ {
199 self.all_nodes
200 .iter()
201 .filter(move |n| n.info.node_id != Some(me))
202 .flat_map(|n| &n.addr)
203 }
204
205 pub fn addr(&self, idx: u32) -> Result<NodeAddr> {
211 self.all_nodes
212 .iter()
213 .find(|n| n.info.idx == idx)
214 .cloned()
215 .context("node not found")?
216 .addr
217 .context("node has no address")
218 }
219
220 pub fn self_addr(&self) -> Option<&NodeAddr> {
222 self.all_nodes
223 .iter()
224 .find(|n| n.info.idx == self.node_index)
225 .and_then(|info| info.addr.as_ref())
226 }
227
228 pub fn try_self_addr(&self) -> Result<&NodeAddr> {
229 self.self_addr().context("missing node address")
230 }
231
232 pub fn node_count(&self) -> usize {
234 self.all_nodes.len()
235 }
236}
237
238pub trait Node<C: Ctx = ()>: Send + 'static + Sized {
247 fn endpoint(&self) -> Option<&Endpoint>;
249
250 fn shutdown(&mut self) -> impl Future<Output = Result<()>> + Send + '_ {
258 async { Ok(()) }
259 }
260
261 fn spawn(context: &mut SpawnContext<'_, C>) -> impl Future<Output = Result<Self>> + Send;
267
268 fn spawn_dyn<'a>(context: &'a mut SpawnContext<'_, C>) -> BoxFuture<'a, Result<BoxNode<C>>> {
276 Box::pin(async {
277 let node = Self::spawn(context).await?;
278 let node: Box<dyn DynNode<C>> = Box::new(node);
279 anyhow::Ok(node)
280 })
281 }
282
283 fn node_label(&self, _context: &SpawnContext<C>) -> Option<String> {
284 None
285 }
286}
287
288type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
289
290pub type BoxNode<C> = Box<dyn DynNode<C> + 'static>;
292
293pub trait DynNode<C: Ctx>: Send + Any + 'static {
298 fn shutdown(&mut self) -> BoxFuture<'_, Result<()>> {
306 Box::pin(async { Ok(()) })
307 }
308
309 fn endpoint(&self) -> Option<&Endpoint> {
313 None
314 }
315
316 fn node_label(&self, _context: &SpawnContext<C>) -> Option<String> {
317 None
318 }
319
320 fn as_any(&self) -> &dyn Any;
322
323 fn as_any_mut(&mut self) -> &mut dyn Any;
325}
326
327impl<C: Ctx, N: Node<C> + Sized> DynNode<C> for N {
328 fn shutdown(&mut self) -> BoxFuture<'_, Result<()>> {
329 Box::pin(<Self as Node<C>>::shutdown(self))
330 }
331
332 fn endpoint(&self) -> Option<&Endpoint> {
333 <Self as Node<C>>::endpoint(self)
334 }
335
336 fn node_label(&self, context: &SpawnContext<C>) -> Option<String> {
337 <Self as Node<C>>::node_label(self, context)
338 }
339
340 fn as_any(&self) -> &dyn Any {
341 self
342 }
343
344 fn as_any_mut(&mut self) -> &mut dyn Any {
345 self
346 }
347}
348
349#[derive()]
350pub struct Builder<C: Ctx = ()> {
355 config: C::Config,
356 node_builders: Vec<NodeBuilderWithCount<C>>,
357 rounds: u32,
358}
359
360struct SimFns<C: Ctx> {
361 spawn: BoxedSpawnFn<C>,
362 round: BoxedRoundFn<C>,
363 check: Option<BoxedCheckFn<C>>,
364}
365
366impl<C: Ctx> Clone for SimFns<C> {
367 fn clone(&self) -> Self {
368 Self {
369 round: self.round.clone(),
370 check: self.check.clone(),
371 spawn: self.spawn.clone(),
372 }
373 }
374}
375
376#[derive(Clone)]
381pub struct NodeBuilder<N: Node<C>, C: Ctx> {
382 phantom: PhantomData<N>,
383 fns: SimFns<C>,
384}
385
386struct ErasedNodeBuilder<C: Ctx> {
387 fns: SimFns<C>,
388}
389
390impl<C: Ctx> Clone for ErasedNodeBuilder<C> {
391 fn clone(&self) -> Self {
392 Self {
393 fns: self.fns.clone(),
394 }
395 }
396}
397
398impl<N: Node<C>, C: Ctx> NodeBuilder<N, C> {
399 pub fn new(
404 round_fn: impl for<'a> AsyncCallback<'a, N, RoundContext<'a, C>, Result<bool>>,
405 ) -> Self {
406 let spawn_fn: BoxedSpawnFn<C> = Arc::new(N::spawn_dyn);
407 let round_fn: BoxedRoundFn<C> = Arc::new(move |node, context| {
408 let node = node
409 .as_any_mut()
410 .downcast_mut::<N>()
411 .expect("unreachable: type is statically guaranteed");
412 Box::pin(round_fn(node, context))
413 });
414 let fns = SimFns {
415 spawn: spawn_fn,
416 round: round_fn,
417 check: None,
418 };
419 Self {
420 phantom: PhantomData,
421 fns,
422 }
423 }
424
425 pub fn check(
434 mut self,
435 check_fn: impl 'static + for<'a> Fn(&'a N, &RoundContext<'a, C>) -> Result<()>,
436 ) -> Self {
437 let check_fn: BoxedCheckFn<C> = Arc::new(move |node, context| {
438 let node = node
439 .as_any()
440 .downcast_ref::<N>()
441 .expect("unreachable: type is statically guaranteed");
442 check_fn(node, context)
443 });
444 self.fns.check = Some(check_fn);
445 self
446 }
447
448 fn erase(self) -> ErasedNodeBuilder<C> {
449 ErasedNodeBuilder { fns: self.fns }
450 }
451}
452
453struct SimNode<'a, C: Ctx> {
454 node: BoxNode<C>,
455 ctx: &'a StaticCtx<C>,
456 client: ActiveTrace,
457 idx: u32,
458 fns: SimFns<C>,
459 round: u32,
460 info: NodeInfo,
461 metrics: Arc<RwLock<Registry>>,
462 checkpoint_watcher: n0_watcher::Watchable<CheckpointId>,
463 all_nodes: Vec<NodeInfoWithAddr>,
464}
465
466impl<'a, C: Ctx> SimNode<'a, C> {
467 async fn spawn_and_run(builder: NodeBuilderWithIdx<C>, ctx: &'a StaticCtx<C>) -> Result<()> {
468 let secret_key = SecretKey::generate(&mut rand::rng());
469 let NodeBuilderWithIdx { node_idx, builder } = builder;
470 let mut registry = Registry::default();
471 let node_id = secret_key.public();
472 let mut context = SpawnContext::<C> {
473 ctx,
474 node_idx,
475 secret_key: secret_key.clone(),
476 registry: &mut registry,
477 };
478 let node = (builder.fns.spawn)(&mut context).await?;
479
480 let context = SpawnContext::<C> {
482 ctx,
483 node_idx,
484 secret_key: secret_key.clone(),
485 registry: &mut registry,
486 };
487 let label = node.node_label(&context);
488
489 let info = NodeInfo {
490 node_id: Some(node_id),
492 idx: node_idx,
493 label,
494 };
495
496 if let Some(endpoint) = node.endpoint() {
497 registry.register_all(endpoint.metrics());
498 }
499 let client = ctx.client.start_node(ctx.trace_id, info.clone()).await?;
500
501 let mut node = Self {
502 node,
503 ctx,
504 client,
505 idx: node_idx,
506 info,
507 round: 0,
508 fns: builder.fns,
509 checkpoint_watcher: Watchable::new(0),
510 metrics: Arc::new(RwLock::new(registry)),
511 all_nodes: Default::default(),
512 };
513
514 let res = node
515 .run()
516 .await
517 .with_context(|| format!("node {} failed", node.idx));
518 if let Err(err) = &res {
519 warn!("node failed: {err:#}");
520 }
521 res
522 }
523
524 async fn run(&mut self) -> Result<()> {
525 info!(idx = self.idx, "start");
526
527 if let Some(node_id) = self.node_id() {
529 let client = self.client.clone();
530 let mut watcher = self.checkpoint_watcher.watch();
531 let mut metrics_encoder = Encoder::new(self.metrics.clone());
532 tokio::task::spawn(
533 async move {
534 let mut interval = tokio::time::interval(METRICS_INTERVAL);
535 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
536 loop {
537 let checkpoint = tokio::select! {
538 _ = interval.tick() => None,
539 checkpoint = watcher.updated() => {
540 match checkpoint {
541 Err(_) => break,
542 Ok(checkpoint) => Some(checkpoint)
543 }
544 }
545 };
546 if let Err(err) = client
547 .put_metrics(node_id, checkpoint, metrics_encoder.export())
548 .await
549 {
550 warn!(?err, "failed to put metrics, stop metrics task");
551 break;
552 }
553 }
554 }
555 .instrument(error_span!("metrics")),
556 );
557 }
558
559 let info = NodeInfoWithAddr {
560 addr: self.my_addr().await,
561 info: self.info.clone(),
562 };
563 self.all_nodes = self.client.wait_start(info).await?;
564
565 let result = self.run_rounds().await;
566
567 if let Err(err) = self.node.shutdown().await {
568 warn!("failure during node shutdown: {err:#}");
569 }
570
571 self.client.end(to_str_err(&result)).await?;
572
573 result
574 }
575
576 async fn run_rounds(&mut self) -> Result<()> {
577 while self.round < self.ctx.max_rounds {
578 if !self
579 .run_round()
580 .await
581 .with_context(|| format!("failed at round {}", self.round))?
582 {
583 return Ok(());
584 }
585 self.round += 1;
586 }
587 Ok(())
588 }
589
590 #[tracing::instrument(name="round", skip_all, fields(round=self.round))]
591 async fn run_round(&mut self) -> Result<bool> {
592 let context = RoundContext {
593 round: self.round,
594 node_index: self.idx,
595 all_nodes: &self.all_nodes,
596 ctx: self.ctx,
597 };
598
599 let label = C::round_label(&self.ctx.config, self.round)
600 .unwrap_or_else(|| format!("Round {}", self.round));
601
602 info!(%label, "start round");
603
604 let result = (self.fns.round)(&mut self.node, &context)
605 .await
606 .context("round function failed");
607
608 info!(%label, "end round");
609
610 let checkpoint = (context.round + 1) as u64;
611 self.checkpoint_watcher.set(checkpoint).ok();
612 self.client
613 .put_checkpoint(checkpoint, Some(label), to_str_err(&result))
614 .await
615 .context("put checkpoint")?;
616
617 self.client
618 .wait_checkpoint(checkpoint)
619 .await
620 .context("wait checkpoint")?;
621
622 match result {
623 Ok(out) => {
624 if let Some(check_fn) = self.fns.check.as_ref() {
625 (check_fn)(&self.node, &context).context("check function failed")?;
626 }
627 Ok(out)
628 }
629 Err(err) => Err(err),
630 }
631 }
632
633 fn node_id(&self) -> Option<NodeId> {
634 self.info.node_id
635 }
636
637 async fn my_addr(&self) -> Option<NodeAddr> {
638 if let Some(endpoint) = self.node.endpoint() {
639 Some(node_addr(endpoint).await)
640 } else {
641 None
642 }
643 }
644}
645
646async fn node_addr(endpoint: &Endpoint) -> NodeAddr {
647 endpoint.online().await;
648 endpoint.node_addr()
649}
650
651impl Default for Builder<()> {
652 fn default() -> Self {
653 Self::new()
654 }
655}
656
657impl<C: Ctx> Builder<C> {
658 pub fn new() -> Self {
671 Self::with_config(Default::default())
672 }
673
674 pub fn with_config(config: C::Config) -> Self {
675 Self {
676 config,
677 node_builders: vec![],
678 rounds: 0,
679 }
680 }
681
682 pub fn rounds(mut self, rounds: u32) -> Self {
684 self.rounds = rounds;
685 self
686 }
687
688 pub fn spawn<N: Node<C>>(
696 mut self,
697 node_count: u32,
698 builder: impl Into<NodeBuilder<N, C>>,
699 ) -> Self {
700 let builder = builder.into();
701 self.node_builders.push(NodeBuilderWithCount {
702 count: node_count,
703 builder: builder.erase(),
704 });
705 self
706 }
707
708 pub async fn build(self, name: &str) -> Result<Simulation<C>> {
718 let client = TraceClient::from_env_or_local()?;
719 let run_mode = RunMode::from_env()?;
720
721 debug!(%name, ?run_mode, "build simulation run");
722
723 let (trace_id, setup_data) = if matches!(run_mode, RunMode::InitOnly | RunMode::Integrated)
724 {
725 let setup_data = C::setup(&self.config).await?;
726 let encoded_setup_data = Bytes::from(postcard::to_stdvec(&setup_data)?);
727 let node_count = self.node_builders.iter().map(|builder| builder.count).sum();
728 let trace_id = client
729 .init_trace(
730 TraceInfo {
731 name: name.to_string(),
732 node_count,
733 expected_checkpoints: Some(self.rounds as u64),
734 },
735 Some(encoded_setup_data),
736 )
737 .await?;
738 info!(%name, node_count, %trace_id, "init simulation");
739
740 (trace_id, setup_data)
741 } else {
742 let info = client.get_trace(name.to_string()).await?;
743 let GetTraceResponse {
744 trace_id,
745 info,
746 setup_data,
747 } = info;
748 info!(%name, node_count=info.node_count, %trace_id, "get simulation");
749 let setup_data = setup_data.context("expected setup data to be set")?;
750 let setup_data: C::Setup =
751 postcard::from_bytes(&setup_data).context("failed to decode setup data")?;
752 (trace_id, setup_data)
753 };
754
755 let mut builders = self
757 .node_builders
758 .iter()
759 .flat_map(|builder| (0..builder.count).map(|_| &builder.builder))
760 .enumerate();
761
762 let node_builders: Vec<_> = match run_mode {
764 RunMode::InitOnly => vec![],
766 RunMode::Integrated => builders.map(NodeBuilderWithIdx::from_tuple).collect(),
768 RunMode::Isolated(idx) => {
770 let item = builders
771 .nth(idx as usize)
772 .context("invalid isolated index")?;
773 vec![NodeBuilderWithIdx::from_tuple(item)]
774 }
775 };
776
777 let ctx = StaticCtx {
778 setup: setup_data,
779 config: self.config,
780 trace_id,
781 client,
782 run_mode,
783 max_rounds: self.rounds,
784 };
785
786 Ok(Simulation { node_builders, ctx })
787 }
788}
789
790struct NodeBuilderWithCount<C: Ctx> {
791 count: u32,
792 builder: ErasedNodeBuilder<C>,
793}
794
795struct NodeBuilderWithIdx<C: Ctx> {
796 node_idx: u32,
797 builder: ErasedNodeBuilder<C>,
798}
799
800impl<C: Ctx> NodeBuilderWithIdx<C> {
801 fn from_tuple((node_idx, builder): (usize, &ErasedNodeBuilder<C>)) -> Self {
802 Self {
803 node_idx: node_idx as u32,
804 builder: builder.clone(),
805 }
806 }
807}
808
809pub struct Simulation<C: Ctx> {
814 ctx: StaticCtx<C>,
815 node_builders: Vec<NodeBuilderWithIdx<C>>,
816}
817
818impl<C: Ctx> Simulation<C> {
819 pub async fn run(self) -> Result<()> {
828 let cancel_token = CancellationToken::new();
829
830 let logs_scope = match self.ctx.run_mode {
832 RunMode::Isolated(idx) => Some(Scope::Isolated(idx)),
833 RunMode::Integrated => Some(Scope::Integrated),
834 RunMode::InitOnly => None,
836 };
837 let logs_task = if let Some(scope) = logs_scope {
838 Some(spawn_logs_task(
839 self.ctx.client.clone(),
840 self.ctx.trace_id,
841 scope,
842 cancel_token.clone(),
843 ))
844 } else {
845 None
846 };
847
848 let result = self
850 .node_builders
851 .into_iter()
852 .map(async |builder| {
853 let span = error_span!("sim-node", idx = builder.node_idx);
854 SimNode::spawn_and_run(builder, &self.ctx)
855 .instrument(span)
856 .await
857 })
858 .try_join_all()
859 .await
860 .map(|_list| ());
861
862 cancel_token.cancel();
863 if let Some(join_handle) = logs_task {
864 join_handle.await?;
865 }
866
867 if matches!(self.ctx.run_mode, RunMode::Integrated) {
868 self.ctx
869 .client
870 .close_trace(self.ctx.trace_id, to_str_err(&result))
871 .await?;
872 }
873
874 result
875 }
876}
877
878#[derive(Debug, Clone)]
879struct StaticCtx<C: Ctx> {
880 setup: C::Setup,
881 config: C::Config,
882 trace_id: Uuid,
883 client: TraceClient,
884 run_mode: RunMode,
885 max_rounds: u32,
886}
887
888#[derive(Debug, Copy, Clone)]
889enum RunMode {
890 InitOnly,
891 Integrated,
892 Isolated(u32),
893}
894
895impl RunMode {
896 fn from_env() -> Result<Self> {
897 if std::env::var(ENV_TRACE_INIT_ONLY).is_ok() {
898 Ok(Self::InitOnly)
899 } else {
900 match std::env::var(ENV_TRACE_ISOLATED) {
901 Err(_) => Ok(Self::Integrated),
902 Ok(s) => {
903 let idx = s.parse().with_context(|| {
904 format!("Failed to parse env var `{ENV_TRACE_ISOLATED}` as number")
905 })?;
906 Ok(Self::Isolated(idx))
907 }
908 }
909 }
910 }
911}
912
913fn spawn_logs_task(
916 client: TraceClient,
917 trace_id: Uuid,
918 scope: Scope,
919 cancel_token: CancellationToken,
920) -> tokio::task::JoinHandle<()> {
921 tokio::task::spawn(async move {
922 loop {
923 if cancel_token
924 .run_until_cancelled(tokio::time::sleep(Duration::from_secs(1)))
925 .await
926 .is_none()
927 {
928 break;
929 }
930 let lines = self::trace::get_logs();
931 if lines.is_empty() {
932 continue;
933 }
934 for lines_chunk in lines.chunks(500) {
937 if let Err(e) = client.put_logs(trace_id, scope, lines_chunk.to_vec()).await {
938 eprintln!(
939 "warning: failed to submit logs due to error, stopping log submission now: {e:?}"
940 );
941 break;
942 }
943 }
944 if cancel_token.is_cancelled() {
945 break;
946 }
947 }
948 })
949}
950
951static PERMIT: Semaphore = Semaphore::const_new(1);
952
953#[doc(hidden)]
963pub async fn run_sim_fn<F, Fut, C, E>(name: &str, sim_fn: F) -> anyhow::Result<()>
964where
965 F: Fn() -> Fut,
966 Fut: Future<Output = Result<Builder<C>, E>>,
967 C: Ctx,
968 anyhow::Error: From<E>,
969{
970 let permit = PERMIT.acquire().await.expect("semaphore closed");
972
973 self::trace::init();
975 self::trace::global_writer().clear();
977
978 eprintln!("running simulation: {name}");
979 let result = sim_fn()
980 .await
981 .map_err(anyhow::Error::from)
982 .with_context(|| format!("simulation builder function `{name}` failed"))?
983 .build(name)
984 .await
985 .with_context(|| format!("simulation `{name}` failed to start"))?
986 .run()
987 .await
988 .with_context(|| format!("simulation `{name}` failed to complete"));
989
990 match &result {
991 Ok(()) => eprintln!("simulation `{name}` passed"),
992 Err(err) => eprintln!("simulation `{name}` failed: {err:#}"),
993 };
994
995 drop(permit);
996
997 result
998}
999
1000fn to_str_err<T>(res: &Result<T, anyhow::Error>) -> Result<(), String> {
1001 if let Some(err) = res.as_ref().err() {
1002 Err(format!("{err:?}"))
1003 } else {
1004 Ok(())
1005 }
1006}