iroh_gossip/proto/
sim.rs

1//! Simulation framework for testing the protocol implementation
2
3use std::{
4    collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque},
5    fmt,
6    str::FromStr,
7};
8
9use bytes::Bytes;
10use n0_future::time::{Duration, Instant};
11use rand::{seq::IteratorRandom, Rng, SeedableRng};
12use rand_chacha::ChaCha12Rng;
13use serde::{Deserialize, Serialize};
14use tracing::{debug, debug_span, info, info_span, trace, warn};
15
16use super::{Command, Config, Event, InEvent, OutEvent, PeerIdentity, State, TopicId};
17use crate::proto::{PeerData, Scope};
18
/// Fixed per-message latency used by [`LatencyConfig::default_static`].
const DEFAULT_LATENCY_STATIC: Duration = Duration::from_millis(50);
/// Lower bound of the latency range used by [`LatencyConfig::default_dynamic`].
const DEFAULT_LATENCY_MIN: Duration = Duration::from_millis(10);
/// Upper bound of the latency range used by [`LatencyConfig::default_dynamic`].
const DEFAULT_LATENCY_MAX: Duration = Duration::from_millis(100);
22
/// Configuration for a [`Network`].
///
/// Both fields fall back to their `Default` values when they are missing from the
/// serialized form.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NetworkConfig {
    /// Configures the latency between peers.
    #[serde(default)]
    pub latency: LatencyConfig,
    /// Default protocol config for all peers.
    ///
    /// Peers inserted via [`Network::insert_with_config`] use their own config instead.
    #[serde(default)]
    pub proto: Config,
}
33
34impl From<Config> for NetworkConfig {
35    fn from(config: Config) -> Self {
36        Self {
37            latency: Default::default(),
38            proto: config,
39        }
40    }
41}
42
/// Configures the latency between peers.
///
/// Variant names are serialized in lowercase (`static`, `dynamic`). Durations use the
/// `humantime_serde` format.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LatencyConfig {
    /// Use the same latency, always.
    Static(#[serde(with = "humantime_serde")] Duration),
    /// Choose a random latency for each connection within the specified bounds.
    Dynamic {
        /// The lower bound for the latency between two peers.
        #[serde(with = "humantime_serde")]
        min: Duration,
        /// The upper bound for the latency between two peers.
        #[serde(with = "humantime_serde")]
        max: Duration,
    },
}
59
60impl LatencyConfig {
61    /// Returns a default latency config with a static latency.
62    pub fn default_static() -> Self {
63        Self::Static(DEFAULT_LATENCY_STATIC)
64    }
65
66    /// Returns a default latency config with a dynamic latency.
67    pub fn default_dynamic() -> Self {
68        Self::Dynamic {
69            min: DEFAULT_LATENCY_MIN,
70            max: DEFAULT_LATENCY_MAX,
71        }
72    }
73
74    /// Creates a new latency config with the provided min and max values in milliseconds.
75    pub fn random_ms(min: u64, max: u64) -> Self {
76        Self::Dynamic {
77            min: Duration::from_millis(min),
78            max: Duration::from_millis(max),
79        }
80    }
81
82    /// Returns the maximum latency possible.
83    pub fn max(&self) -> Duration {
84        match self {
85            Self::Static(dur) => *dur,
86            Self::Dynamic { max, .. } => *max,
87        }
88    }
89
90    /// Returns a new latency value to use for a peer connection.
91    pub fn r#gen(&self, mut rng: impl Rng) -> Duration {
92        match self {
93            Self::Static(d) => *d,
94            // TODO(frando): use uniform distribution?
95            Self::Dynamic { min, max } => rng.random_range(*min..*max),
96        }
97    }
98}
99
100impl Default for LatencyConfig {
101    fn default() -> Self {
102        Self::default_dynamic()
103    }
104}
105
/// Test network implementation.
///
/// A discrete event simulation of a gossip swarm.
///
/// The simulated clock (`time`) is only advanced by processing queued events and by the
/// `run_*` methods. It is not tied to wall-clock time after creation.
#[derive(Debug)]
pub struct Network<PI, R> {
    /// Instant captured when the network was created. [`Network::elapsed`] is measured from here.
    start: Instant,
    /// Current simulated time.
    time: Instant,
    /// Number of ticks processed so far (shown in the per-tick log span).
    tick: usize,
    /// Protocol state of each peer, keyed by peer id.
    peers: BTreeMap<PI, State<PI, R>>,
    /// Currently open connections between peers.
    conns: BTreeSet<ConnId<PI>>,
    /// Events emitted by peers that have not yet been drained by the caller.
    events: VecDeque<(PI, TopicId, Event<PI>)>,
    /// Latency per connection. Sampled on first use and reused afterwards.
    latencies: BTreeMap<ConnId<PI>, Duration>,
    /// Network-wide RNG. Per-peer RNGs are derived from it and latencies are sampled from it.
    rng: R,
    /// Latency model and default protocol config.
    config: NetworkConfig,
    /// Pending input events, ordered by delivery time.
    queue: TimedEventQueue<PI>,
}
122
123impl<PI, R> Network<PI, R> {
124    /// Creates a new network.
125    pub fn new(config: NetworkConfig, rng: R) -> Self {
126        let time = Instant::now();
127        Self {
128            tick: 0,
129            start: time,
130            time,
131            config,
132            queue: Default::default(),
133            peers: Default::default(),
134            conns: Default::default(),
135            events: Default::default(),
136            latencies: BTreeMap::new(),
137            rng,
138        }
139    }
140}
141
142impl<PI: PeerIdentity + fmt::Display, R: Rng + SeedableRng + Clone> Network<PI, R> {
143    /// Inserts a new peer.
144    ///
145    /// Panics if the peer already exists.
146    pub fn insert(&mut self, peer_id: PI) {
147        let config = self.config.proto.clone();
148        self.insert_with_config(peer_id, config);
149    }
150
151    /// Inserts a new peer with the specified protocol config.
152    ///
153    /// Panics if the peer already exists.
154    pub fn insert_with_config(&mut self, peer_id: PI, config: Config) {
155        assert!(
156            !self.peers.contains_key(&peer_id),
157            "duplicate peer: {peer_id:?}"
158        );
159        let rng = R::from_rng(&mut self.rng);
160        let state = State::new(peer_id, PeerData::default(), config, rng);
161        self.peers.insert(peer_id, state);
162    }
163
164    /// Inserts a new peer and joins a topic with a set of bootstrap nodes.
165    ///
166    /// Panics if the peer already exists.
167    pub fn insert_and_join(&mut self, peer_id: PI, topic: TopicId, bootstrap: Vec<PI>) {
168        self.insert(peer_id);
169        self.command(peer_id, topic, Command::Join(bootstrap));
170    }
171}
172
173impl<PI: PeerIdentity + fmt::Display, R: Rng + Clone> Network<PI, R> {
174    /// Drains all queued events.
175    pub fn events(&mut self) -> impl Iterator<Item = (PI, TopicId, Event<PI>)> + '_ {
176        self.events.drain(..)
177    }
178
179    /// Drains all queued events and returns them in a sorted vector.
180    pub fn events_sorted(&mut self) -> Vec<(PI, TopicId, Event<PI>)> {
181        sort(self.events().collect())
182    }
183
184    /// Returns all active connections.
185    pub fn conns(&self) -> Vec<(PI, PI)> {
186        sort(self.conns.iter().cloned().map(Into::into).collect())
187    }
188
189    /// Queues and performs a command.
190    pub fn command(&mut self, peer: PI, topic: TopicId, command: Command<PI>) {
191        debug!(?peer, "~~ COMMAND {command:?}");
192        self.queue
193            .insert(self.time, peer, InEvent::Command(topic, command));
194        self.tick();
195    }
196
197    /// Returns an iterator over the [`State`] for each peer.
198    pub fn peer_states(&self) -> impl Iterator<Item = &State<PI, R>> {
199        self.peers.values()
200    }
201
202    /// Returns an iterator over the node ids of all peers.
203    pub fn peer_ids(&self) -> impl Iterator<Item = PI> + '_ {
204        self.peers.keys().cloned()
205    }
206
207    /// Returns the [`State`] for a peer, if it exists.
208    pub fn peer(&self, peer: &PI) -> Option<&State<PI, R>> {
209        self.peers.get(peer)
210    }
211
212    /// Returns the neighbors a peer has on the swarm membership layer.
213    pub fn neighbors(&self, peer: &PI, topic: &TopicId) -> Option<Vec<PI>> {
214        let peer = self.peer(peer)?;
215        let state = peer.state(topic)?;
216        Some(state.swarm.active_view.iter().cloned().collect::<Vec<_>>())
217    }
218
219    /// Removes a peer, breaking all connections to other peers.
220    pub fn remove(&mut self, peer: &PI) {
221        let remove_conns: Vec<_> = self
222            .conns
223            .iter()
224            .filter(|&c| c.peers().contains(peer))
225            .cloned()
226            .collect();
227        for conn in remove_conns.into_iter() {
228            self.kill_connection(*peer, conn.other(*peer).unwrap());
229        }
230        self.peers.remove(peer);
231    }
232
233    /// Returns the time elapsed since starting the network.
234    pub fn elapsed(&self) -> Duration {
235        self.time.duration_since(self.start)
236    }
237
238    /// Returns the time elapsed since starting the network, formatted as seconds with limited decimals.
239    pub fn elapsed_fmt(&self) -> String {
240        format!("{:>2.4}s", self.elapsed().as_secs_f32())
241    }
242
243    /// Runs the simulation for `n` times the maximum latency between peers.
244    pub fn run_trips(&mut self, n: usize) {
245        let duration = self.config.latency.max() * n as u32;
246        self.run_duration(duration)
247    }
248
249    /// Runs the simulation for `timeout`.
250    pub fn run_duration(&mut self, timeout: Duration) {
251        let end = self.time + timeout;
252        while self.queue.next_before(end) {
253            self.tick();
254        }
255        assert!(self.time <= end);
256        self.time = end;
257    }
258
259    /// Runs the simulation while `f` returns `true`.
260    ///
261    /// The callback will be called for each emitted event.
262    pub fn run_while(&mut self, mut f: impl FnMut(PI, TopicId, Event<PI>) -> bool) {
263        loop {
264            while let Some((peer, topic, event)) = self.events.pop_front() {
265                if !f(peer, topic, event) {
266                    return;
267                }
268            }
269            self.tick();
270        }
271    }
272
273    /// Runs the simulation while `f` returns `true`, aborting after `timeout`.
274    ///
275    /// The callback will be called for each emitted event.
276    pub fn run_while_with_timeout(
277        &mut self,
278        timeout: Duration,
279        mut f: impl FnMut(PI, TopicId, Event<PI>) -> bool,
280    ) {
281        let end = self.time + timeout;
282        loop {
283            while let Some((peer, topic, event)) = self.events.pop_front() {
284                if !f(peer, topic, event) {
285                    return;
286                }
287            }
288            if self.queue.next_before(end) {
289                self.tick();
290            } else {
291                break;
292            }
293        }
294        assert!(self.time <= end);
295        self.time = end;
296    }
297
298    fn tick(&mut self) {
299        self.tick += 1;
300        let Some((time, peer, event)) = self.queue.pop() else {
301            warn!("tick on empty queue");
302            return;
303        };
304        assert!(time >= self.time);
305        self.time = time;
306        let span = debug_span!("tick", %peer, tick = %self.tick, t = %self.elapsed_fmt());
307        let _guard = span.enter();
308        debug!("~~ TICK ");
309
310        let Some(state) = self.peers.get_mut(&peer) else {
311            // TODO: queue PeerDisconnected for sender?
312            warn!(?time, ?peer, ?event, "event for dead peer");
313            return;
314        };
315        if let InEvent::RecvMessage(from, _message) = &event {
316            self.conns.insert((*from, peer).into());
317        }
318        let out = state.handle(event, self.time, None);
319        let mut kill = vec![];
320        for event in out {
321            match event {
322                OutEvent::SendMessage(to, message) => {
323                    let latency = latency_between(
324                        &self.config.latency,
325                        &mut self.latencies,
326                        &peer,
327                        &to,
328                        &mut self.rng,
329                    );
330                    self.queue
331                        .insert(self.time + latency, to, InEvent::RecvMessage(peer, message));
332                }
333                OutEvent::ScheduleTimer(time, timer) => {
334                    self.queue
335                        .insert(self.time + time, peer, InEvent::TimerExpired(timer));
336                }
337                OutEvent::DisconnectPeer(to) => {
338                    debug!(peer = ?peer, other = ?to, "disconnect");
339                    kill.push((peer, to));
340                }
341                OutEvent::EmitEvent(topic, event) => {
342                    debug!(peer = ?peer, "emit {event:?}");
343                    self.events.push_back((peer, topic, event));
344                }
345                OutEvent::PeerData(_peer, _data) => {}
346            }
347        }
348        for (from, to) in kill {
349            self.kill_connection(from, to);
350        }
351    }
352
353    /// Breaks the connection between two peers.
354    ///
355    /// The `to` peer will received a [`InEvent::PeerDisconnected`] after a latency interval.
356    fn kill_connection(&mut self, from: PI, to: PI) {
357        let conn = ConnId::from((from, to));
358        if self.conns.remove(&conn) {
359            // We add the event a microsecond after the regular latency between the two peers,
360            // so that any messages queued from the current time arrive before the disconnected event.
361            let latency = latency_between(
362                &self.config.latency,
363                &mut self.latencies,
364                &from,
365                &to,
366                &mut self.rng,
367            ) + Duration::from_micros(1);
368            self.queue
369                .insert(self.time + latency, to, InEvent::PeerDisconnected(from));
370        }
371    }
372
373    /// Checks if all neighbor and eager relations are synchronous.
374    ///
375    /// Iterates over all peers, and checks for each peer X:
376    /// - that all active view members (neighbors) have X listed as a neighbor as well
377    /// - that all eager peers have X listed as eager as well
378    ///
379    /// Returns `true` if this is holds, otherwise returns `false`.
380    ///
381    /// Logs, at debug level, the cases where the above doesn't hold.
382    pub fn check_synchronicity(&self) -> bool {
383        let mut ok = true;
384        for state in self.peers.values() {
385            let peer = *state.me();
386            for (topic, state) in state.states() {
387                for other in state.swarm.active_view.iter() {
388                    let other_state = &self
389                        .peers
390                        .get(other)
391                        .unwrap()
392                        .state(topic)
393                        .unwrap()
394                        .swarm
395                        .active_view;
396                    if !other_state.contains(&peer) {
397                        debug!(node = %peer, other = ?other, "missing active_view peer in other");
398                        ok = false;
399                    }
400                }
401                for other in state.gossip.eager_push_peers.iter() {
402                    let other_state = &self
403                        .peers
404                        .get(other)
405                        .unwrap()
406                        .state(topic)
407                        .unwrap()
408                        .gossip
409                        .eager_push_peers;
410                    if !other_state.contains(&peer) {
411                        debug!(node = %peer, other = ?other, "missing eager_push peer in other");
412                        ok = false;
413                    }
414                }
415            }
416        }
417        ok
418    }
419
420    /// Returns a report with histograms on active, passive, eager and lazy counts.
421    pub fn report(&self) -> NetworkReport<PI> {
422        let mut histograms = NetworkHistograms::default();
423        let mut peers_without_neighbors = Vec::new();
424        for (id, peer) in self.peers.iter() {
425            let state = peer.state(&TOPIC).unwrap();
426            add_one(&mut histograms.active, state.swarm.active_view.len());
427            add_one(&mut histograms.passive, state.swarm.passive_view.len());
428            add_one(&mut histograms.eager, state.gossip.eager_push_peers.len());
429            add_one(&mut histograms.lazy, state.gossip.lazy_push_peers.len());
430            if state.swarm.active_view.is_empty() {
431                peers_without_neighbors.push(*id);
432                trace!(node=%id, active = ?state.swarm.active_view.iter().collect::<Vec<_>>(), passive=?state.swarm.passive_view.iter().collect::<Vec<_>>(), "active view empty^");
433            }
434        }
435        NetworkReport {
436            histograms,
437            peer_count: self.peers.len(),
438            peers_without_neighbors,
439        }
440    }
441}
442
443fn latency_between<PI: PeerIdentity + Ord + PartialOrd, R: Rng>(
444    latency_config: &LatencyConfig,
445    latencies: &mut BTreeMap<ConnId<PI>, Duration>,
446    a: &PI,
447    b: &PI,
448    rng: &mut R,
449) -> Duration {
450    let id: ConnId<PI> = (*a, *b).into();
451    *latencies
452        .entry(id)
453        .or_insert_with(|| latency_config.r#gen(rng))
454}
455
456#[derive(Debug)]
457struct TimedEventQueue<PI> {
458    seq: i32,
459    events: BinaryHeap<(TimedEvent<PeerEvent<PI>>, i32)>,
460}
461
462impl<PI> Default for TimedEventQueue<PI> {
463    fn default() -> Self {
464        Self {
465            seq: 0,
466            events: Default::default(),
467        }
468    }
469}
470
471impl<PI> TimedEventQueue<PI> {
472    fn insert(&mut self, time: Instant, peer: PI, event: InEvent<PI>) {
473        let seq = self.seq;
474        self.seq += 1;
475        self.events.push((
476            TimedEvent {
477                time,
478                event: PeerEvent(peer, event),
479            },
480            -seq,
481        ))
482    }
483
484    fn pop(&mut self) -> Option<(Instant, PI, InEvent<PI>)> {
485        self.events
486            .pop()
487            .map(|(e, _)| (e.time, e.event.0, e.event.1))
488    }
489
490    fn peek_next(&self) -> Option<Instant> {
491        self.events.peek().map(|(e, _)| e.time)
492    }
493
494    fn next_before(&self, before: Instant) -> bool {
495        match self.peek_next() {
496            None => false,
497            Some(at) => at <= before,
498        }
499    }
500}
501
/// An event tagged with the simulated time at which it should be delivered.
///
/// Equality and ordering look only at `time`, and the ordering is reversed: an earlier
/// event compares as greater, so it is popped first from a max-heap.
#[derive(Debug)]
struct TimedEvent<E> {
    /// Simulated delivery time.
    time: Instant,
    /// The payload to deliver.
    event: E,
}
507
/// An input event together with the peer that will handle it: `(recipient, event)`.
#[derive(Debug)]
struct PeerEvent<PI>(PI, InEvent<PI>);
510
511impl<E> Eq for TimedEvent<E> {}
512
513impl<E> PartialEq for TimedEvent<E> {
514    fn eq(&self, other: &Self) -> bool {
515        self.time.eq(&other.time)
516    }
517}
518
519impl<E> PartialOrd for TimedEvent<E> {
520    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
521        Some(self.cmp(other))
522    }
523}
524
525impl<E> Ord for TimedEvent<E> {
526    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
527        self.time.cmp(&other.time).reverse()
528    }
529}
530
/// The peer id type used in the simulator.
///
/// [`Simulator::bootstrap`] numbers peers sequentially, starting at `0`.
type PeerId = u64;
533
/// Configuration for the [`Simulator`].
///
/// [`SimulatorConfig::from_env`] builds one from environment variables.
#[derive(Debug, Serialize, Deserialize)]
pub struct SimulatorConfig {
    /// Seed for the random number generator used in the nodes.
    ///
    /// It seeds the network RNG, and the per-peer RNGs are derived from that one.
    pub rng_seed: u64,
    /// Number of nodes to create.
    pub peers: usize,
    /// Timeout after which a gossip round is aborted.
    pub gossip_round_timeout: Duration,
}
544
/// Ways to bootstrap the swarm.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[serde(rename_all = "lowercase")]
pub enum BootstrapMode {
    /// All peers join a single peer (peer `0`).
    #[default]
    Single,
    /// First, `count` bootstrap peers are created, and all of them join peer `0`.
    /// After a warm-up period, each remaining peer joins one of these first `count`
    /// peers, chosen at random.
    Set {
        /// Number of bootstrap peers to join first.
        count: u64,
    },
}
559
560impl SimulatorConfig {
561    /// Creates a [`SimulatorConfig`] by reading from environment variables.
562    ///
563    /// [`Self::peers`] is read from `PEERS`, defaulting to `100` if unset.
564    /// [`Self::rng_seed`] is read from `SEED`, defaulting to `0` if unset.
565    /// [`Self::gossip_round_timeout`] is read, as seconds, from `GOSSIP_ROUND_TIMEOUT`, defaulting to `5` if unset.
566    pub fn from_env() -> Self {
567        let peer = read_var("PEERS", 100);
568        Self {
569            rng_seed: read_var("SEED", 0),
570            peers: peer,
571            gossip_round_timeout: Duration::from_secs(read_var("GOSSIP_ROUND_TIMEOUT", 5)),
572        }
573    }
574}
575
576impl Default for SimulatorConfig {
577    fn default() -> Self {
578        Self {
579            rng_seed: 0,
580            peers: 100,
581            gossip_round_timeout: Duration::from_secs(5),
582        }
583    }
584}
585
/// Statistics for a gossip round.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RoundStats {
    /// The (simulated) time this round took in total.
    pub duration: Duration,
    /// The relative message redundancy (RMR) in this round.
    pub rmr: f32,
    /// The maximum last delivery hop (LDH) in this round.
    pub ldh: f32,
    /// The number of undelivered messages in this round.
    ///
    /// Stored as `f32`; aggregates such as [`RoundStats::mean`] can make it fractional.
    pub missed: f32,
}
598
/// Difference (as factors) between two [`RoundStats`].
///
/// Produced by [`RoundStats::diff`]. Each field is `(other - self) / self`, with these
/// special cases:
/// - `0.0` if both values are zero
/// - `-1.0` if only `other` is zero
/// - `1.0` if only `self` is zero
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RoundStatsDiff {
    /// The difference in [`RoundStats::duration`], as a factor.
    pub duration: f32,
    /// The difference in [`RoundStats::rmr`], as a factor.
    pub rmr: f32,
    /// The difference in [`RoundStats::ldh`], as a factor.
    pub ldh: f32,
    /// The difference in [`RoundStats::missed`], as a factor.
    pub missed: f32,
}
611
612impl fmt::Display for RoundStats {
613    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
614        write!(
615            f,
616            "RMR {:>6.2} LDH {:>6.2} duration {:>6.2}ms missed {:>10.2}",
617            self.rmr,
618            self.ldh,
619            self.duration.as_millis(),
620            self.missed
621        )
622    }
623}
624
625impl RoundStats {
626    fn new_max() -> Self {
627        Self {
628            duration: Duration::MAX,
629            rmr: f32::MAX,
630            ldh: f32::MAX,
631            missed: f32::MAX,
632        }
633    }
634
635    /// Calculates the mean for each value in a list of [`RoundStats`].
636    pub fn mean<'a>(rounds: impl IntoIterator<Item = &'a RoundStats>) -> RoundStats {
637        let (len, mut avg) =
638            rounds
639                .into_iter()
640                .fold((0., RoundStats::default()), |(len, mut agg), round| {
641                    agg.rmr += round.rmr;
642                    agg.ldh += round.ldh;
643                    agg.duration += round.duration;
644                    agg.missed += round.missed;
645                    (len + 1., agg)
646                });
647        avg.rmr /= len;
648        avg.ldh /= len;
649        avg.missed /= len;
650        avg.duration /= len as u32;
651        avg
652    }
653
654    /// Calculates the minimum for each value in a list of [`RoundStats`].
655    pub fn min<'a>(rounds: impl IntoIterator<Item = &'a RoundStats>) -> RoundStats {
656        rounds
657            .into_iter()
658            .fold(RoundStats::new_max(), |mut agg, round| {
659                agg.rmr = agg.rmr.min(round.rmr);
660                agg.ldh = agg.ldh.min(round.ldh);
661                agg.duration = agg.duration.min(round.duration);
662                agg.missed = agg.missed.min(round.missed);
663                agg
664            })
665    }
666
667    /// Calculates the maximum for each value in a list of [`RoundStats`].
668    pub fn max<'a>(rounds: impl IntoIterator<Item = &'a RoundStats>) -> RoundStats {
669        rounds
670            .into_iter()
671            .fold(RoundStats::default(), |mut agg, round| {
672                agg.rmr = agg.rmr.max(round.rmr);
673                agg.ldh = agg.ldh.max(round.ldh);
674                agg.duration = agg.duration.max(round.duration);
675                agg.missed = agg.missed.max(round.missed);
676                agg
677            })
678    }
679
680    /// Calculates the minimum, maximum, and mean for each value in a list of [`RoundStats`].
681    pub fn avg(rounds: &[RoundStats]) -> RoundStatsAvg {
682        let len = rounds.len();
683        let min = Self::min(rounds);
684        let max = Self::max(rounds);
685        let mean = Self::mean(rounds);
686        RoundStatsAvg {
687            len,
688            min,
689            max,
690            mean,
691        }
692    }
693
694    /// Calculates the difference factors for each value between `self` and `other`.
695    pub fn diff(&self, other: &Self) -> RoundStatsDiff {
696        RoundStatsDiff {
697            duration: diff_percent(self.duration.as_secs_f32(), other.duration.as_secs_f32()),
698            rmr: diff_percent(self.rmr, other.rmr),
699            ldh: diff_percent(self.ldh, other.ldh),
700            missed: diff_percent(self.missed, other.missed),
701        }
702    }
703}
704
/// Returns the relative change from `a` to `b` as a factor, i.e. `(b - a) / a`.
///
/// Zero values are special-cased to avoid dividing by zero:
/// - `0.0` if both are zero
/// - `-1.0` if only `b` is zero
/// - `1.0` if only `a` is zero
fn diff_percent(a: f32, b: f32) -> f32 {
    match (a == 0.0, b == 0.0) {
        (true, true) => 0.0,
        (false, true) => -1.0,
        (true, false) => 1.0,
        (false, false) => (b - a) / a,
    }
}
716
/// Summary values for a list of [`RoundStats`].
///
/// Produced by [`RoundStats::avg`].
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RoundStatsAvg {
    /// The number of rounds for which this average is calculated.
    pub len: usize,
    /// The minimum values of the list.
    pub min: RoundStats,
    /// The maximum values of the list.
    pub max: RoundStats,
    /// The mean values of the list.
    pub mean: RoundStats,
}
729
/// Difference, in factors, between two [`RoundStatsAvg`].
///
/// Produced by [`RoundStatsAvg::diff`].
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RoundStatsAvgDiff {
    /// The difference, as factors, in the minimums.
    pub min: RoundStatsDiff,
    /// The difference, as factors, in the maximums.
    pub max: RoundStatsDiff,
    /// The difference, as factors, in the mean values.
    pub mean: RoundStatsDiff,
}
740
741impl RoundStatsAvg {
742    /// Calculates the difference, as factors, between `self` and `other`.
743    pub fn diff(&self, other: &Self) -> RoundStatsAvgDiff {
744        RoundStatsAvgDiff {
745            min: self.min.diff(&other.min),
746            max: self.max.diff(&other.max),
747            mean: self.mean.diff(&other.mean),
748        }
749    }
750
751    /// Calculates the average between a list of [`RoundStatsAvg`].
752    pub fn avg(rows: &[Self]) -> Self {
753        let len = rows.iter().map(|row| row.len).sum();
754        let min = RoundStats::min(rows.iter().map(|x| &x.min));
755        let max = RoundStats::min(rows.iter().map(|x| &x.max));
756        let mean = RoundStats::min(rows.iter().map(|x| &x.mean));
757        Self {
758            min,
759            max,
760            mean,
761            len,
762        }
763    }
764}
765
/// Histograms on the distribution of peers in the network.
///
/// For each field, the map's key is the bucket value and the map's value is the number
/// of peers that fall into that bucket. Built by [`Network::report`].
#[derive(Debug, Default)]
pub struct NetworkHistograms {
    /// Distribution of active view (neighbor) counts.
    pub active: BTreeMap<usize, usize>,
    /// Distribution of passive view counts.
    pub passive: BTreeMap<usize, usize>,
    /// Distribution of eager peer counts.
    pub eager: BTreeMap<usize, usize>,
    /// Distribution of lazy peer counts.
    pub lazy: BTreeMap<usize, usize>,
}
781
/// Returns the mean bucket value of a histogram, weighted by each bucket's count.
///
/// Returns `0.0` if the histogram holds no entries.
fn avg(map: &BTreeMap<usize, usize>) -> f32 {
    let count: usize = map.values().sum();
    if count == 0 {
        return 0.;
    }
    let sum: usize = map.iter().map(|(bucket, peers)| bucket * peers).sum();
    sum as f32 / count as f32
}
792
/// Returns the smallest bucket key of a histogram, or `0` if the histogram is empty.
fn min(map: &BTreeMap<usize, usize>) -> usize {
    map.keys().next().copied().unwrap_or(0)
}
796
/// Returns the largest bucket key of a histogram, or `0` if the histogram is empty.
fn max(map: &BTreeMap<usize, usize>) -> usize {
    map.keys().next_back().copied().unwrap_or(0)
}
800
801impl fmt::Display for NetworkHistograms {
802    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
803        write!(
804            f,
805            "    eager {:?}\n    lazy {:?}\n    active {:?}\n    passive {:?}",
806            self.eager, self.lazy, self.active, self.passive
807        )
808    }
809}
810
/// A report on the state of a [`Network`].
///
/// Produced by [`Network::report`].
#[derive(Debug)]
pub struct NetworkReport<PI> {
    /// The number of peers in the network.
    pub peer_count: usize,
    /// List of peers that don't have any neighbors (empty active view).
    pub peers_without_neighbors: Vec<PI>,
    /// Histograms of peer distribution metrics.
    pub histograms: NetworkHistograms,
}
821
822impl<PI> NetworkReport<PI> {
823    /// Returns `true` if the network contains peers that have no active neighbors.
824    pub fn has_peers_with_no_neighbors(&self) -> bool {
825        *self.histograms.active.get(&0).unwrap_or(&0) > 0
826    }
827}
828
829impl<PI> fmt::Display for NetworkReport<PI> {
830    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
831        writeln!(f, "peers: {}\n{}", self.peer_count, self.histograms)?;
832        if self.peers_without_neighbors.is_empty() {
833            writeln!(f, "(all have neighbors)")
834        } else {
835            writeln!(
836                f,
837                "({} peers have no neighbors)",
838                self.peers_without_neighbors.len()
839            )
840        }
841    }
842}
843
/// The single topic used by the [`Simulator`] (32 zero bytes). [`Network::report`] also
/// inspects this topic.
const TOPIC: TopicId = TopicId::from_bytes([0u8; 32]);
845
/// A simulator for the gossip protocol.
///
/// Drives a [`Network`] of peers with `u64` ids. All randomness comes from a ChaCha12
/// RNG seeded with [`SimulatorConfig::rng_seed`].
#[derive(Debug)]
pub struct Simulator {
    /// Configuration of the simulator.
    pub config: SimulatorConfig,
    /// The [`Network`].
    pub network: Network<PeerId, rand_chacha::ChaCha12Rng>,
    /// List of [`RoundStats`] of all previous rounds.
    round_stats: Vec<RoundStats>,
}
856
857impl Simulator {
858    /// Creates a new simulator.
859    pub fn new(
860        simulator_config: SimulatorConfig,
861        network_config: impl Into<NetworkConfig>,
862    ) -> Self {
863        let network_config = network_config.into();
864        info!("start {simulator_config:?} {network_config:?}");
865        let rng = rand_chacha::ChaCha12Rng::seed_from_u64(simulator_config.rng_seed);
866        Self {
867            network: Network::new(network_config, rng),
868            config: simulator_config,
869            round_stats: Default::default(),
870        }
871    }
872
873    /// Creates a new random number generator, derived from the simuator's RNG.
874    pub fn rng(&mut self) -> ChaCha12Rng {
875        ChaCha12Rng::from_rng(&mut self.network.rng)
876    }
877
878    /// Returns the peer id of a random peer.
879    pub fn random_peer(&mut self) -> PeerId {
880        *self
881            .network
882            .peers
883            .keys()
884            .choose(&mut self.network.rng)
885            .unwrap()
886    }
887
888    /// Returns the number of peers.
889    pub fn peer_count(&self) -> usize {
890        self.network.peers.len()
891    }
892
893    /// Removes `n` peers from the network.
894    pub fn remove_peers(&mut self, n: usize) {
895        for _i in 0..n {
896            let key = self.random_peer();
897            self.network.remove(&key);
898        }
899    }
900
901    /// Returns a report on the current state of the network.
902    pub fn report(&mut self) -> NetworkReport<PeerId> {
903        let report = self.network.report();
904        let min_active_len = min(&report.histograms.active);
905        let max_active_len = max(&report.histograms.active);
906        let avg = avg(&report.histograms.active);
907        let len = report.peer_count;
908        debug!(
909            "nodes {len} active: avg {avg:2.2} min {min_active_len} max {max_active_len} empty {}",
910            report.peers_without_neighbors.len()
911        );
912        report
913    }
914
915    /// Bootstraps the network.
916    ///
917    /// See [`BootstrapMode`] for details.
918    ///
919    /// Returns the [`NetworkReport`] after finishing the bootstrap.
920    pub fn bootstrap(&mut self, bootstrap_mode: BootstrapMode) -> NetworkReport<PeerId> {
921        let span = info_span!("bootstrap");
922        let _guard = span.enter();
923        info!("bootstrap {bootstrap_mode:?}");
924
925        let node_count = self.config.peers as u64;
926
927        match bootstrap_mode {
928            BootstrapMode::Single => {
929                self.network.insert_and_join(0, TOPIC, vec![]);
930                for i in 1..node_count {
931                    self.network.insert_and_join(i, TOPIC, vec![0]);
932                }
933                self.network.run_trips(20);
934            }
935            BootstrapMode::Set { count } => {
936                self.network.insert_and_join(0, TOPIC, vec![]);
937                for i in 1..count {
938                    self.network.insert_and_join(i, TOPIC, vec![0]);
939                }
940
941                self.network.run_trips(7);
942
943                for i in count..node_count {
944                    let contact = self.network.rng.random_range(0..count);
945                    self.network.insert_and_join(i, TOPIC, vec![contact]);
946                }
947
948                self.network.run_trips(20);
949            }
950        }
951
952        let report = self.report();
953        if report.has_peers_with_no_neighbors() {
954            warn!("failed to keep all nodes active after warmup: {report:?}");
955        } else {
956            info!("bootstrap complete, all nodes active");
957        }
958        report
959    }
960
961    /// Runs a round of gossiping.
962    ///
963    /// `messages` is a list of `(sender, message)` pairs. All messages will be sent simultaneously.
964    /// The round will run until all peers received all messages, or until [`SimulatorConfig::gossip_round_timeout`]
965    /// is elapsed.
966    ///
967    /// Returns the number of undelivered messages.
968    pub fn gossip_round(&mut self, messages: Vec<(PeerId, Bytes)>) -> usize {
969        let span = debug_span!("g", r = self.round_stats.len());
970        let _guard = span.enter();
971        self.reset_stats();
972        let start = self.network.time;
973        let expected_count: usize = messages.len() * (self.network.peers.len() - 1);
974        info!(
975            time=%self.network.elapsed_fmt(),
976            "round {i}: send {len} messages / recv {expected_count} total",
977            len = messages.len(),
978            i = self.round_stats.len()
979        );
980
981        // Send messages and keep track of expected receives
982        let mut missing: BTreeMap<PeerId, BTreeSet<Bytes>> = BTreeMap::new();
983        for (from, message) in messages {
984            for peer in self.network.peer_ids().filter(|p| *p != from) {
985                missing.entry(peer).or_default().insert(message.clone());
986            }
987            self.network
988                .command(from, TOPIC, Command::Broadcast(message, Scope::Swarm));
989        }
990
991        let timeout = self.config.gossip_round_timeout;
992        let mut received_count = 0;
993        self.network
994            .run_while_with_timeout(timeout, |peer, _topic, event| {
995                if let Event::Received(message) = event {
996                    let set = missing.get_mut(&peer).unwrap();
997                    if !set.remove(&message.content) {
998                        panic!("received duplicate message event");
999                    } else if set.is_empty() {
1000                        missing.remove(&peer);
1001                    }
1002                    received_count += 1;
1003                }
1004                received_count != expected_count
1005            });
1006
1007        let missing_count: usize = expected_count - received_count;
1008        if missing_count == 0 {
1009            info!("break: all messages received by all peers");
1010        } else {
1011            warn!("break: max ticks for round exceeded (still missing {missing_count})");
1012            debug!("missing: {missing:?}");
1013        }
1014        let elapsed = self.network.time.duration_since(start);
1015        self.report_gossip_round(expected_count, missing_count, elapsed);
1016        missing_count
1017    }
1018
1019    fn report_gossip_round(
1020        &mut self,
1021        expected_recv_count: usize,
1022        missed: usize,
1023        duration: Duration,
1024    ) {
1025        let payloud_msg_count = self.total_payload_messages();
1026        let ctrl_msg_count = self.total_control_messages();
1027        let rmr_expected_count = expected_recv_count - missed;
1028        let rmr = (payloud_msg_count as f32 / (rmr_expected_count as f32 - 1.)) - 1.;
1029        let ldh = self.max_ldh();
1030
1031        let round_stats = RoundStats {
1032            duration,
1033            rmr,
1034            ldh: ldh as f32,
1035            missed: missed as f32,
1036        };
1037        let histograms = self.network.report().histograms;
1038        info!(
1039            "round {}: pay {} ctrl {} {round_stats} \n{histograms}",
1040            self.round_stats.len(),
1041            payloud_msg_count,
1042            ctrl_msg_count,
1043        );
1044        self.round_stats.push(round_stats);
1045    }
1046
1047    /// Calculates the [`RoundStatsAvg`] of all gossip rounds.
1048    pub fn round_stats_average(&self) -> RoundStatsAvg {
1049        RoundStats::avg(&self.round_stats)
1050    }
1051
1052    fn reset_stats(&mut self) {
1053        for state in self.network.peers.values_mut() {
1054            state.reset_stats(&TOPIC);
1055        }
1056    }
1057
1058    fn max_ldh(&self) -> u16 {
1059        let mut max = 0;
1060        for state in self.network.peers.values() {
1061            let state = state.state(&TOPIC).unwrap();
1062            let stats = state.gossip.stats();
1063            max = max.max(stats.max_last_delivery_hop);
1064        }
1065        max
1066    }
1067
1068    fn total_payload_messages(&self) -> u64 {
1069        let mut sum = 0;
1070        for state in self.network.peers.values() {
1071            let state = state.state(&TOPIC).unwrap();
1072            let stats = state.gossip.stats();
1073            sum += stats.payload_messages_received;
1074        }
1075        sum
1076    }
1077
1078    fn total_control_messages(&self) -> u64 {
1079        let mut sum = 0;
1080        for state in self.network.peers.values() {
1081            let state = state.state(&TOPIC).unwrap();
1082            let stats = state.gossip.stats();
1083            sum += stats.control_messages_received;
1084        }
1085        sum
1086    }
1087}
1088
/// Increments the counter stored under `key`, starting from zero if the key is not present yet.
fn add_one(map: &mut BTreeMap<usize, usize>, key: usize) {
    *map.entry(key).or_insert(0) += 1;
}
1093
/// Helper struct for active connections: a pair of peers kept in sorted order.
///
/// Building it from `(a, b)` or from `(b, a)` yields the same value, so it identifies a
/// connection independently of which side is named first.
#[derive(Debug, Clone, PartialOrd, Ord, Eq, PartialEq, Hash)]
struct ConnId<PI>([PI; 2]);
impl<PI: Ord + Copy> ConnId<PI> {
    /// Creates the id for the connection between `a` and `b`; argument order is irrelevant.
    fn new(a: PI, b: PI) -> Self {
        if b < a {
            Self([b, a])
        } else {
            Self([a, b])
        }
    }

    /// Returns both peers, smaller one first.
    fn peers(&self) -> [PI; 2] {
        self.0
    }

    /// Given one endpoint of the connection, returns the other one.
    ///
    /// Returns `None` if `other` is not part of this connection.
    fn other(&self, other: PI) -> Option<PI> {
        let [first, second] = self.0;
        match other {
            x if x == first => Some(second),
            x if x == second => Some(first),
            _ => None,
        }
    }
}
/// Builds a [`ConnId`] from a pair of peers, normalizing their order.
impl<PI: Ord + Copy> From<(PI, PI)> for ConnId<PI> {
    fn from(pair: (PI, PI)) -> Self {
        let (a, b) = pair;
        ConnId::new(a, b)
    }
}
/// Splits a [`ConnId`] back into its two peers, in the order they are stored.
impl<PI: Copy> From<ConnId<PI>> for (PI, PI) {
    fn from(conn: ConnId<PI>) -> (PI, PI) {
        let [lo, hi] = conn.0;
        (lo, hi)
    }
}
1127
/// Takes ownership of `items` and returns them in ascending order (stable sort).
fn sort<T: Ord + Clone>(mut items: Vec<T>) -> Vec<T> {
    items.sort();
    items
}
1133
/// Reads the environment variable `name` and parses it as `T`, falling back to `default`.
///
/// `default` is returned when the variable is unset or its value is not valid Unicode.
///
/// # Panics
///
/// Panics if the variable is set but its value cannot be parsed as `T`. The panic message names
/// the variable and includes the raw value and the parse error.
fn read_var<T: FromStr<Err: fmt::Display + fmt::Debug>>(name: &str, default: T) -> T {
    match std::env::var(name) {
        Ok(value) => value.parse().unwrap_or_else(|err| {
            panic!("Failed to parse environment variable {name} (value {value:?}): {err}")
        }),
        Err(_) => default,
    }
}