1use std::{
2 any::Any,
3 fmt::Debug,
4 marker::PhantomData,
5 pin::Pin,
6 sync::{Arc, RwLock},
7 time::Duration,
8};
9
10use anyhow::{Context, Result};
11use bytes::Bytes;
12use iroh::{Endpoint, NodeAddr, NodeId, SecretKey, Watcher};
13use iroh_metrics::encoding::Encoder;
14use iroh_n0des::{
15 Registry,
16 simulation::proto::{ActiveTrace, NodeInfo, TraceClient, TraceInfo},
17};
18use n0_future::IterExt;
19use n0_watcher::Watchable;
20use proto::{GetTraceResponse, NodeInfoWithAddr, Scope};
21use serde::{Serialize, de::DeserializeOwned};
22use tokio::{sync::Semaphore, time::MissedTickBehavior};
23use tokio_util::sync::CancellationToken;
24use tracing::{Instrument, debug, error_span, info, warn};
25use uuid::Uuid;
26
27use crate::simulation::proto::CheckpointId;
28
29pub mod events;
30pub mod proto;
31pub mod trace;
32
33pub const ENV_TRACE_ISOLATED: &str = "N0DES_TRACE_ISOLATED";
35pub const ENV_TRACE_INIT_ONLY: &str = "N0DES_TRACE_INIT_ONLY";
37pub const ENV_TRACE_SERVER: &str = "N0DES_TRACE_SERVER";
39pub const ENV_TRACE_SESSION_ID: &str = "N0DES_SESSION_ID";
41
42const METRICS_INTERVAL: Duration = Duration::from_secs(1);
43
44type BoxedSetupFn<D> = Box<dyn 'static + Send + Sync + FnOnce() -> BoxFuture<'static, Result<D>>>;
45
46type BoxedSpawnFn<D> = Arc<
47 dyn 'static
48 + Send
49 + Sync
50 + for<'a> Fn(&'a mut SpawnContext<'a, D>) -> BoxFuture<'a, Result<BoxNode>>,
51>;
52type BoxedRoundFn<D> = Arc<
53 dyn 'static
54 + Send
55 + Sync
56 + for<'a> Fn(&'a mut BoxNode, &'a RoundContext<'a, D>) -> BoxFuture<'a, Result<bool>>,
57>;
58type RoundLabelFn<D> = Arc<dyn 'static + Send + Sync + for<'a> Fn(&'a D, u32) -> Option<String>>;
59
60type BoxedCheckFn<D> = Arc<dyn Fn(&BoxNode, &RoundContext<'_, D>) -> Result<()>>;
61
62pub trait AsyncCallback<'a, A1: 'a, A2: 'a, T: 'a>:
69 'static + Send + Sync + Fn(&'a mut A1, &'a A2) -> Self::Fut
70{
71 type Fut: Future<Output = T> + Send;
72}
73
74impl<'a, A1: 'a, A2: 'a, T: 'a, Out, F> AsyncCallback<'a, A1, A2, T> for F
75where
76 Out: Send + Future<Output = T>,
77 F: 'static + Sync + Send + Fn(&'a mut A1, &'a A2) -> Out,
78{
79 type Fut = Out;
80}
81
82pub trait SetupData: Serialize + DeserializeOwned + Send + Sync + Clone + Debug + 'static {}
87impl<T> SetupData for T where T: Serialize + DeserializeOwned + Send + Sync + Clone + Debug + 'static
88{}
89
90pub struct SpawnContext<'a, D = ()> {
95 secret_key: SecretKey,
96 node_idx: u32,
97 setup_data: &'a D,
98 registry: &'a mut Registry,
99}
100
101impl<'a, D: SetupData> SpawnContext<'a, D> {
102 pub fn node_index(&self) -> u32 {
104 self.node_idx
105 }
106
107 pub fn setup_data(&self) -> &D {
109 self.setup_data
110 }
111
112 pub fn metrics_registry(&mut self) -> &mut Registry {
116 self.registry
117 }
118
119 pub fn secret_key(&self) -> SecretKey {
121 self.secret_key.clone()
122 }
123
124 pub fn node_id(&self) -> NodeId {
126 self.secret_key.public()
127 }
128
129 pub async fn bind_endpoint(&self) -> Result<Endpoint> {
135 let ep = Endpoint::builder()
136 .discovery_n0()
137 .secret_key(self.secret_key())
138 .bind()
139 .await?;
140 Ok(ep)
141 }
142}
143
144pub struct RoundContext<'a, D = ()> {
149 round: u32,
150 node_index: u32,
151 setup_data: &'a D,
152 all_nodes: &'a Vec<NodeInfoWithAddr>,
153}
154
155impl<'a, D> RoundContext<'a, D> {
156 pub fn round(&self) -> u32 {
158 self.round
159 }
160
161 pub fn node_index(&self) -> u32 {
163 self.node_index
164 }
165
166 pub fn setup_data(&self) -> &D {
168 self.setup_data
169 }
170
171 pub fn all_other_nodes(&self, me: NodeId) -> impl Iterator<Item = &NodeAddr> + '_ {
173 self.all_nodes
174 .iter()
175 .filter(move |n| n.info.node_id != Some(me))
176 .flat_map(|n| &n.addr)
177 }
178
179 pub fn addr(&self, idx: u32) -> Result<NodeAddr> {
185 self.all_nodes
186 .iter()
187 .find(|n| n.info.idx == idx)
188 .cloned()
189 .context("node not found")?
190 .addr
191 .context("node has no address")
192 }
193
194 pub fn self_addr(&self) -> Option<&NodeAddr> {
196 self.all_nodes
197 .iter()
198 .find(|n| n.info.idx == self.node_index)
199 .and_then(|info| info.addr.as_ref())
200 }
201
202 pub fn try_self_addr(&self) -> Result<&NodeAddr> {
203 self.self_addr().context("missing node address")
204 }
205
206 pub fn node_count(&self) -> usize {
208 self.all_nodes.len()
209 }
210}
211
212pub trait Spawn<D: SetupData = ()>: Node + 'static {
221 fn spawn(context: &mut SpawnContext<'_, D>) -> impl Future<Output = Result<Self>> + Send
227 where
228 Self: Sized;
229
230 fn round_label(_setup_data: &D, _round: u32) -> Option<String> {
231 None
232 }
233
234 fn spawn_dyn<'a>(context: &'a mut SpawnContext<'a, D>) -> BoxFuture<'a, Result<BoxNode>>
242 where
243 Self: Sized,
244 {
245 Box::pin(async move {
246 let node = Self::spawn(context).await?;
247 let node: Box<dyn DynNode> = Box::new(node);
248 anyhow::Ok(node)
249 })
250 }
251
252 fn builder(
257 round_fn: impl for<'a> AsyncCallback<'a, Self, RoundContext<'a, D>, Result<bool>>,
258 ) -> NodeBuilder<Self, D>
259 where
260 Self: Sized,
261 {
262 NodeBuilder::new(round_fn)
263 }
264}
265
266pub trait Node: Send + 'static {
274 fn endpoint(&self) -> Option<&Endpoint> {
278 None
279 }
280
281 fn shutdown(&mut self) -> impl Future<Output = Result<()>> + Send + '_ {
289 async { Ok(()) }
290 }
291}
292
293type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
294
295pub type BoxNode = Box<dyn DynNode>;
297
298pub trait DynNode: Send + Any + 'static {
303 fn shutdown(&mut self) -> BoxFuture<'_, Result<()>> {
311 Box::pin(async { Ok(()) })
312 }
313
314 fn endpoint(&self) -> Option<&Endpoint> {
318 None
319 }
320
321 fn round_label(&self, _round: u32) -> Option<String> {
322 None
323 }
324
325 fn as_any(&self) -> &dyn Any;
327
328 fn as_any_mut(&mut self) -> &mut dyn Any;
330}
331
332impl<T: Node + Sized> DynNode for T {
333 fn shutdown(&mut self) -> BoxFuture<'_, Result<()>> {
334 Box::pin(<Self as Node>::shutdown(self))
335 }
336
337 fn endpoint(&self) -> Option<&Endpoint> {
338 <Self as Node>::endpoint(self)
339 }
340
341 fn as_any(&self) -> &dyn Any {
342 self
343 }
344
345 fn as_any_mut(&mut self) -> &mut dyn Any {
346 self
347 }
348}
349
350#[derive()]
351pub struct Builder<D = ()> {
356 setup_fn: BoxedSetupFn<D>,
357 node_builders: Vec<NodeBuilderWithCount<D>>,
358 rounds: u32,
359}
360
361#[derive(Clone)]
362pub struct NodeBuilder<N, D> {
367 phantom: PhantomData<N>,
368 spawn_fn: BoxedSpawnFn<D>,
369 round_fn: BoxedRoundFn<D>,
370 round_label_fn: RoundLabelFn<D>,
371 check_fn: Option<BoxedCheckFn<D>>,
372}
373
374#[derive(Clone)]
375struct ErasedNodeBuilder<D> {
376 spawn_fn: BoxedSpawnFn<D>,
377 round_fn: BoxedRoundFn<D>,
378 round_label_fn: RoundLabelFn<D>,
379 check_fn: Option<BoxedCheckFn<D>>,
380}
381
382impl<T, N: Spawn<D>, D: SetupData> From<T> for NodeBuilder<N, D>
383where
384 T: for<'a> AsyncCallback<'a, N, RoundContext<'a, D>, Result<bool>>,
385{
386 fn from(value: T) -> Self {
387 Self::new(value)
388 }
389}
390
391impl<N: Spawn<D>, D: SetupData> NodeBuilder<N, D> {
392 pub fn new(
397 round_fn: impl for<'a> AsyncCallback<'a, N, RoundContext<'a, D>, Result<bool>>,
398 ) -> Self {
399 let spawn_fn: BoxedSpawnFn<D> = Arc::new(N::spawn_dyn);
400 let round_label_fn: RoundLabelFn<D> = Arc::new(N::round_label);
401 let round_fn: BoxedRoundFn<D> = Arc::new(move |node, context| {
402 let node = node
403 .as_any_mut()
404 .downcast_mut::<N>()
405 .expect("unreachable: type is statically guaranteed");
406 Box::pin(round_fn(node, context))
407 });
408 Self {
409 phantom: PhantomData,
410 spawn_fn,
411 round_fn,
412 check_fn: None,
413 round_label_fn,
414 }
415 }
416
417 pub fn check(
426 mut self,
427 check_fn: impl 'static + for<'a> Fn(&'a N, &RoundContext<'a, D>) -> Result<()>,
428 ) -> Self {
429 let check_fn: BoxedCheckFn<D> = Arc::new(move |node, context| {
430 let node = node
431 .as_any()
432 .downcast_ref::<N>()
433 .expect("unreachable: type is statically guaranteed");
434 check_fn(node, context)
435 });
436 self.check_fn = Some(check_fn);
437 self
438 }
439
440 fn erase(self) -> ErasedNodeBuilder<D> {
441 ErasedNodeBuilder {
442 spawn_fn: self.spawn_fn,
443 round_fn: self.round_fn,
444 check_fn: self.check_fn,
445 round_label_fn: self.round_label_fn,
446 }
447 }
448}
449
450struct SimNode<D> {
451 node: BoxNode,
452 trace_id: Uuid,
453 idx: u32,
454 round_fn: BoxedRoundFn<D>,
455 check_fn: Option<BoxedCheckFn<D>>,
456 round_label_fn: RoundLabelFn<D>,
457 round: u32,
458 info: NodeInfo,
459 metrics: Arc<RwLock<Registry>>,
460 checkpoint_watcher: n0_watcher::Watchable<CheckpointId>,
461 all_nodes: Vec<NodeInfoWithAddr>,
462}
463
464impl<D: SetupData> SimNode<D> {
465 async fn spawn_and_run(
466 builder: NodeBuilderWithIdx<D>,
467 client: TraceClient,
468 trace_id: Uuid,
469 setup_data: &D,
470 rounds: u32,
471 ) -> Result<()> {
472 let secret_key = SecretKey::generate(&mut rand::rng());
473 let NodeBuilderWithIdx { node_idx, builder } = builder;
474 let info = NodeInfo {
475 node_id: Some(secret_key.public()),
477 idx: node_idx,
478 label: None,
479 };
480 let mut registry = Registry::default();
481 let mut context = SpawnContext {
482 setup_data,
483 node_idx,
484 secret_key,
485 registry: &mut registry,
486 };
487 let node = (builder.spawn_fn)(&mut context).await?;
488
489 if let Some(endpoint) = node.endpoint() {
490 registry.register_all(endpoint.metrics());
491 }
492
493 let mut node = Self {
494 node,
495 trace_id,
496 idx: node_idx,
497 info,
498 round: 0,
499 round_fn: builder.round_fn,
500 check_fn: builder.check_fn,
501 round_label_fn: builder.round_label_fn,
502 checkpoint_watcher: Watchable::new(0),
503 metrics: Arc::new(RwLock::new(registry)),
504 all_nodes: Default::default(),
505 };
506
507 let res = node
508 .run(&client, setup_data, rounds)
509 .await
510 .with_context(|| format!("node {} failed", node.idx));
511 if let Err(err) = &res {
512 warn!("node failed: {err:#}");
513 }
514 res
515 }
516
517 async fn run(&mut self, client: &TraceClient, setup_data: &D, rounds: u32) -> Result<()> {
518 let client = client.start_node(self.trace_id, self.info.clone()).await?;
519
520 info!(idx = self.idx, "start");
521
522 if let Some(node_id) = self.node_id() {
524 let client = client.clone();
525 let mut watcher = self.checkpoint_watcher.watch();
526 let mut metrics_encoder = Encoder::new(self.metrics.clone());
527 tokio::task::spawn(
528 async move {
529 let mut interval = tokio::time::interval(METRICS_INTERVAL);
530 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
531 loop {
532 let checkpoint = tokio::select! {
533 _ = interval.tick() => None,
534 checkpoint = watcher.updated() => {
535 match checkpoint {
536 Err(_) => break,
537 Ok(checkpoint) => Some(checkpoint)
538 }
539 }
540 };
541 if let Err(err) = client
542 .put_metrics(node_id, checkpoint, metrics_encoder.export())
543 .await
544 {
545 warn!(?err, "failed to put metrics, stop metrics task");
546 break;
547 }
548 }
549 }
550 .instrument(error_span!("metrics")),
551 );
552 }
553
554 let info = NodeInfoWithAddr {
555 addr: self.my_addr().await,
556 info: self.info.clone(),
557 };
558 let all_nodes = client.wait_start(info).await?;
559 self.all_nodes = all_nodes;
560
561 let result = self.run_rounds(&client, setup_data, rounds).await;
562
563 if let Err(err) = self.node.shutdown().await {
564 warn!("failure during node shutdown: {err:#}");
565 }
566
567 client.end(to_str_err(&result)).await?;
568
569 result
570 }
571
572 async fn run_rounds(
573 &mut self,
574 client: &ActiveTrace,
575 setup_data: &D,
576 rounds: u32,
577 ) -> Result<()> {
578 while self.round < rounds {
579 if !self
580 .run_round(client, setup_data)
581 .await
582 .with_context(|| format!("failed at round {}", self.round))?
583 {
584 return Ok(());
585 }
586 self.round += 1;
587 }
588 Ok(())
589 }
590
591 #[tracing::instrument(name="round", skip_all, fields(round=self.round))]
592 async fn run_round(&mut self, client: &ActiveTrace, setup_data: &D) -> Result<bool> {
593 let context = RoundContext {
594 round: self.round,
595 node_index: self.idx,
596 setup_data,
597 all_nodes: &self.all_nodes,
598 };
599
600 let label = (self.round_label_fn)(setup_data, self.round)
601 .unwrap_or_else(|| format!("Round {}", self.round));
602
603 info!(%label, "start round");
604
605 let result = (self.round_fn)(&mut self.node, &context)
606 .await
607 .context("round function failed");
608
609 info!(%label, "end round");
610
611 let checkpoint = (context.round + 1) as u64;
612 self.checkpoint_watcher.set(checkpoint).ok();
613 client
614 .put_checkpoint(checkpoint, Some(label), to_str_err(&result))
615 .await
616 .context("put checkpoint")?;
617
618 client
619 .wait_checkpoint(checkpoint)
620 .await
621 .context("wait checkpoint")?;
622
623 match result {
624 Ok(out) => {
625 if let Some(check_fn) = self.check_fn.as_ref() {
626 (check_fn)(&self.node, &context).context("check function failed")?;
627 }
628 Ok(out)
629 }
630 Err(err) => Err(err),
631 }
632 }
633
634 fn node_id(&self) -> Option<NodeId> {
635 self.info.node_id
636 }
637
638 async fn my_addr(&self) -> Option<NodeAddr> {
639 if let Some(endpoint) = self.node.endpoint() {
640 Some(node_addr(endpoint).await)
641 } else {
642 None
643 }
644 }
645}
646
647async fn node_addr(endpoint: &Endpoint) -> NodeAddr {
648 endpoint.online().await;
649 endpoint.node_addr()
650}
651
652impl Default for Builder<()> {
653 fn default() -> Self {
654 Self::new()
655 }
656}
657
658impl Builder<()> {
659 pub fn new() -> Builder<()> {
661 let setup_fn: BoxedSetupFn<()> = Box::new(move || Box::pin(async move { Ok(()) }));
662 Builder {
663 node_builders: Vec::new(),
664 setup_fn,
665 rounds: 0,
666 }
667 }
668}
669impl<D: SetupData> Builder<D> {
670 pub fn with_setup<F, Fut>(setup_fn: F) -> Builder<D>
683 where
684 F: 'static + Send + Sync + FnOnce() -> Fut,
685 Fut: 'static + Send + Future<Output = Result<D>>,
686 {
687 let setup_fn: BoxedSetupFn<D> = Box::new(move || Box::pin(setup_fn()));
688 Builder {
689 node_builders: Vec::new(),
690 setup_fn,
691 rounds: 0,
692 }
693 }
694
695 pub fn rounds(mut self, rounds: u32) -> Self {
697 self.rounds = rounds;
698 self
699 }
700
701 pub fn spawn<N: Spawn<D>>(
709 mut self,
710 node_count: u32,
711 node_builder: impl Into<NodeBuilder<N, D>>,
712 ) -> Self {
713 let node_builder = node_builder.into();
714 self.node_builders.push(NodeBuilderWithCount {
715 count: node_count,
716 builder: node_builder.erase(),
717 });
718 self
719 }
720
721 pub async fn build(self, name: &str) -> Result<Simulation<D>> {
731 let client = TraceClient::from_env_or_local()?;
732 let run_mode = RunMode::from_env()?;
733
734 debug!(%name, ?run_mode, "build simulation run");
735
736 let (trace_id, setup_data) = if matches!(run_mode, RunMode::InitOnly | RunMode::Integrated)
737 {
738 let setup_data = (self.setup_fn)().await?;
739 let encoded_setup_data = Bytes::from(postcard::to_stdvec(&setup_data)?);
740 let node_count = self.node_builders.iter().map(|builder| builder.count).sum();
741 let trace_id = client
742 .init_trace(
743 TraceInfo {
744 name: name.to_string(),
745 node_count,
746 expected_checkpoints: Some(self.rounds as u64),
747 },
748 Some(encoded_setup_data),
749 )
750 .await?;
751 info!(%name, node_count, %trace_id, "init simulation");
752
753 (trace_id, setup_data)
754 } else {
755 let info = client.get_trace(name.to_string()).await?;
756 let GetTraceResponse {
757 trace_id,
758 info,
759 setup_data,
760 } = info;
761 info!(%name, node_count=info.node_count, %trace_id, "get simulation");
762 let setup_data = setup_data.context("expected setup data to be set")?;
763 let setup_data: D =
764 postcard::from_bytes(&setup_data).context("failed to decode setup data")?;
765 (trace_id, setup_data)
766 };
767
768 let mut node_builders = self
769 .node_builders
770 .into_iter()
771 .flat_map(|builder| (0..builder.count).map(move |_| builder.builder.clone()))
772 .enumerate()
773 .map(|(node_idx, builder)| NodeBuilderWithIdx {
774 node_idx: node_idx as u32,
775 builder,
776 });
777
778 let node_builders: Vec<_> = match run_mode {
779 RunMode::InitOnly => vec![],
780 RunMode::Integrated => node_builders.collect(),
781 RunMode::Isolated(idx) => vec![
782 node_builders
783 .nth(idx as usize)
784 .context("invalid isolated index")?,
785 ],
786 };
787
788 Ok(Simulation {
789 run_mode,
790 max_rounds: self.rounds,
791 node_builders,
792 client,
793 trace_id,
794 setup_data,
795 })
796 }
797}
798
799struct NodeBuilderWithCount<D> {
800 count: u32,
801 builder: ErasedNodeBuilder<D>,
802}
803
804struct NodeBuilderWithIdx<D> {
805 node_idx: u32,
806 builder: ErasedNodeBuilder<D>,
807}
808
809pub struct Simulation<D> {
814 trace_id: Uuid,
815 run_mode: RunMode,
816 client: TraceClient,
817 setup_data: D,
818 max_rounds: u32,
819 node_builders: Vec<NodeBuilderWithIdx<D>>,
820}
821
822impl<D: SetupData> Simulation<D> {
823 pub async fn run(self) -> Result<()> {
832 let cancel_token = CancellationToken::new();
833
834 let logs_scope = match self.run_mode {
836 RunMode::Isolated(idx) => Some(Scope::Isolated(idx)),
837 RunMode::Integrated => Some(Scope::Integrated),
838 RunMode::InitOnly => None,
840 };
841 let logs_task = if let Some(scope) = logs_scope {
842 Some(spawn_logs_task(
843 self.client.clone(),
844 self.trace_id,
845 scope,
846 cancel_token.clone(),
847 ))
848 } else {
849 None
850 };
851
852 let result = self
854 .node_builders
855 .into_iter()
856 .map(async |builder| {
857 let span = error_span!("sim-node", idx = builder.node_idx);
858 SimNode::spawn_and_run(
859 builder,
860 self.client.clone(),
861 self.trace_id,
862 &self.setup_data,
863 self.max_rounds,
864 )
865 .instrument(span)
866 .await
867 })
868 .try_join_all()
869 .await
870 .map(|_list| ());
871
872 cancel_token.cancel();
873 if let Some(join_handle) = logs_task {
874 join_handle.await?;
875 }
876
877 if matches!(self.run_mode, RunMode::Integrated) {
878 self.client
879 .close_trace(self.trace_id, to_str_err(&result))
880 .await?;
881 }
882
883 result
884 }
885}
886
887#[derive(Debug, Copy, Clone)]
888enum RunMode {
889 InitOnly,
890 Integrated,
891 Isolated(u32),
892}
893
894impl RunMode {
895 fn from_env() -> Result<Self> {
896 if std::env::var(ENV_TRACE_INIT_ONLY).is_ok() {
897 Ok(Self::InitOnly)
898 } else {
899 match std::env::var(ENV_TRACE_ISOLATED) {
900 Err(_) => Ok(Self::Integrated),
901 Ok(s) => {
902 let idx = s.parse().with_context(|| {
903 format!("Failed to parse env var `{ENV_TRACE_ISOLATED}` as number")
904 })?;
905 Ok(Self::Isolated(idx))
906 }
907 }
908 }
909 }
910}
911
912fn spawn_logs_task(
915 client: TraceClient,
916 trace_id: Uuid,
917 scope: Scope,
918 cancel_token: CancellationToken,
919) -> tokio::task::JoinHandle<()> {
920 tokio::task::spawn(async move {
921 loop {
922 if cancel_token
923 .run_until_cancelled(tokio::time::sleep(Duration::from_secs(1)))
924 .await
925 .is_none()
926 {
927 break;
928 }
929 let lines = self::trace::get_logs();
930 if lines.is_empty() {
931 continue;
932 }
933 for lines_chunk in lines.chunks(500) {
936 if let Err(e) = client.put_logs(trace_id, scope, lines_chunk.to_vec()).await {
937 eprintln!(
938 "warning: failed to submit logs due to error, stopping log submission now: {e:?}"
939 );
940 break;
941 }
942 }
943 if cancel_token.is_cancelled() {
944 break;
945 }
946 }
947 })
948}
949
950static PERMIT: Semaphore = Semaphore::const_new(1);
951
952#[doc(hidden)]
962pub async fn run_sim_fn<F, Fut, D, E>(name: &str, sim_fn: F) -> anyhow::Result<()>
963where
964 F: Fn() -> Fut,
965 Fut: Future<Output = Result<Builder<D>, E>>,
966 D: SetupData,
967 anyhow::Error: From<E>,
968{
969 let permit = PERMIT.acquire().await.expect("semaphore closed");
971
972 self::trace::init();
974 self::trace::global_writer().clear();
976
977 eprintln!("running simulation: {name}");
978 let result = sim_fn()
979 .await
980 .map_err(anyhow::Error::from)
981 .with_context(|| format!("simulation builder function `{name}` failed"))?
982 .build(name)
983 .await
984 .with_context(|| format!("simulation `{name}` failed to start"))?
985 .run()
986 .await
987 .with_context(|| format!("simulation `{name}` failed to complete"));
988
989 match &result {
990 Ok(()) => eprintln!("simulation `{name}` passed"),
991 Err(err) => eprintln!("simulation `{name}` failed: {err:#}"),
992 };
993
994 drop(permit);
995
996 result
997}
998
999fn to_str_err<T>(res: &Result<T, anyhow::Error>) -> Result<(), String> {
1000 if let Some(err) = res.as_ref().err() {
1001 Err(format!("{err:?}"))
1002 } else {
1003 Ok(())
1004 }
1005}