1use std::{
4 collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque},
5 fmt,
6 str::FromStr,
7};
8
9use bytes::Bytes;
10use n0_future::time::{Duration, Instant};
11use rand::{seq::IteratorRandom, Rng, SeedableRng};
12use rand_chacha::ChaCha12Rng;
13use serde::{Deserialize, Serialize};
14use tracing::{debug, debug_span, info, info_span, trace, warn};
15
16use super::{Command, Config, Event, InEvent, OutEvent, PeerIdentity, State, TopicId};
17use crate::proto::{PeerData, Scope};
18
/// Latency used by [`LatencyConfig::default_static`].
const DEFAULT_LATENCY_STATIC: Duration = Duration::from_millis(50);
/// Lower latency bound used by [`LatencyConfig::default_dynamic`].
const DEFAULT_LATENCY_MIN: Duration = Duration::from_millis(10);
/// Upper latency bound used by [`LatencyConfig::default_dynamic`].
const DEFAULT_LATENCY_MAX: Duration = Duration::from_millis(100);
22
/// Configuration for a simulated [`Network`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NetworkConfig {
    /// Latency model for connections; uses the `Default` value when missing from
    /// deserialized input.
    #[serde(default)]
    pub latency: LatencyConfig,
    /// Protocol configuration that [`Network::insert`] clones for every new peer;
    /// uses the `Default` value when missing from deserialized input.
    #[serde(default)]
    pub proto: Config,
}
33
34impl From<Config> for NetworkConfig {
35 fn from(config: Config) -> Self {
36 Self {
37 latency: Default::default(),
38 proto: config,
39 }
40 }
41}
42
/// Latency model for the simulated connections.
///
/// Variant names are (de)serialized in lowercase and durations go through
/// `humantime_serde`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LatencyConfig {
    /// A single fixed latency.
    Static(#[serde(with = "humantime_serde")] Duration),
    /// A latency drawn at random between `min` and `max`.
    Dynamic {
        /// Lower bound of the latency range.
        #[serde(with = "humantime_serde")]
        min: Duration,
        /// Upper bound of the latency range.
        #[serde(with = "humantime_serde")]
        max: Duration,
    },
}
59
60impl LatencyConfig {
61 pub fn default_static() -> Self {
63 Self::Static(DEFAULT_LATENCY_STATIC)
64 }
65
66 pub fn default_dynamic() -> Self {
68 Self::Dynamic {
69 min: DEFAULT_LATENCY_MIN,
70 max: DEFAULT_LATENCY_MAX,
71 }
72 }
73
74 pub fn random_ms(min: u64, max: u64) -> Self {
76 Self::Dynamic {
77 min: Duration::from_millis(min),
78 max: Duration::from_millis(max),
79 }
80 }
81
82 pub fn max(&self) -> Duration {
84 match self {
85 Self::Static(dur) => *dur,
86 Self::Dynamic { max, .. } => *max,
87 }
88 }
89
90 pub fn r#gen(&self, mut rng: impl Rng) -> Duration {
92 match self {
93 Self::Static(d) => *d,
94 Self::Dynamic { min, max } => rng.random_range(*min..*max),
96 }
97 }
98}
99
100impl Default for LatencyConfig {
101 fn default() -> Self {
102 Self::default_dynamic()
103 }
104}
105
/// A simulated network of peers driven by a queue of timed events.
#[derive(Debug)]
pub struct Network<PI, R> {
    /// Instant at which the network was created; reference point for [`Network::elapsed`].
    start: Instant,
    /// Current simulated time; advanced to the timestamp of each processed event.
    time: Instant,
    /// Number of ticks performed so far.
    tick: usize,
    /// Protocol state of every peer, keyed by peer id.
    peers: BTreeMap<PI, State<PI, R>>,
    /// Connections considered open; one is recorded whenever a message is delivered.
    conns: BTreeSet<ConnId<PI>>,
    /// Events emitted by peers that have not been consumed yet.
    events: VecDeque<(PI, TopicId, Event<PI>)>,
    /// Latency chosen for each pair of peers, generated on first use.
    latencies: BTreeMap<ConnId<PI>, Duration>,
    /// Random number generator used for latencies and to derive per-peer generators.
    rng: R,
    /// Network-wide configuration.
    config: NetworkConfig,
    /// Pending events ordered by the time at which they are due.
    queue: TimedEventQueue<PI>,
}
122
123impl<PI, R> Network<PI, R> {
124 pub fn new(config: NetworkConfig, rng: R) -> Self {
126 let time = Instant::now();
127 Self {
128 tick: 0,
129 start: time,
130 time,
131 config,
132 queue: Default::default(),
133 peers: Default::default(),
134 conns: Default::default(),
135 events: Default::default(),
136 latencies: BTreeMap::new(),
137 rng,
138 }
139 }
140}
141
142impl<PI: PeerIdentity + fmt::Display, R: Rng + SeedableRng + Clone> Network<PI, R> {
143 pub fn insert(&mut self, peer_id: PI) {
147 let config = self.config.proto.clone();
148 self.insert_with_config(peer_id, config);
149 }
150
151 pub fn insert_with_config(&mut self, peer_id: PI, config: Config) {
155 assert!(
156 !self.peers.contains_key(&peer_id),
157 "duplicate peer: {peer_id:?}"
158 );
159 let rng = R::from_rng(&mut self.rng);
160 let state = State::new(peer_id, PeerData::default(), config, rng);
161 self.peers.insert(peer_id, state);
162 }
163
164 pub fn insert_and_join(&mut self, peer_id: PI, topic: TopicId, bootstrap: Vec<PI>) {
168 self.insert(peer_id);
169 self.command(peer_id, topic, Command::Join(bootstrap));
170 }
171}
172
173impl<PI: PeerIdentity + fmt::Display, R: Rng + Clone> Network<PI, R> {
174 pub fn events(&mut self) -> impl Iterator<Item = (PI, TopicId, Event<PI>)> + '_ {
176 self.events.drain(..)
177 }
178
179 pub fn events_sorted(&mut self) -> Vec<(PI, TopicId, Event<PI>)> {
181 sort(self.events().collect())
182 }
183
184 pub fn conns(&self) -> Vec<(PI, PI)> {
186 sort(self.conns.iter().cloned().map(Into::into).collect())
187 }
188
189 pub fn command(&mut self, peer: PI, topic: TopicId, command: Command<PI>) {
191 debug!(?peer, "~~ COMMAND {command:?}");
192 self.queue
193 .insert(self.time, peer, InEvent::Command(topic, command));
194 self.tick();
195 }
196
197 pub fn peer_states(&self) -> impl Iterator<Item = &State<PI, R>> {
199 self.peers.values()
200 }
201
202 pub fn peer_ids(&self) -> impl Iterator<Item = PI> + '_ {
204 self.peers.keys().cloned()
205 }
206
207 pub fn peer(&self, peer: &PI) -> Option<&State<PI, R>> {
209 self.peers.get(peer)
210 }
211
212 pub fn neighbors(&self, peer: &PI, topic: &TopicId) -> Option<Vec<PI>> {
214 let peer = self.peer(peer)?;
215 let state = peer.state(topic)?;
216 Some(state.swarm.active_view.iter().cloned().collect::<Vec<_>>())
217 }
218
219 pub fn remove(&mut self, peer: &PI) {
221 let remove_conns: Vec<_> = self
222 .conns
223 .iter()
224 .filter(|&c| c.peers().contains(peer))
225 .cloned()
226 .collect();
227 for conn in remove_conns.into_iter() {
228 self.kill_connection(*peer, conn.other(*peer).unwrap());
229 }
230 self.peers.remove(peer);
231 }
232
233 pub fn elapsed(&self) -> Duration {
235 self.time.duration_since(self.start)
236 }
237
238 pub fn elapsed_fmt(&self) -> String {
240 format!("{:>2.4}s", self.elapsed().as_secs_f32())
241 }
242
243 pub fn run_trips(&mut self, n: usize) {
245 let duration = self.config.latency.max() * n as u32;
246 self.run_duration(duration)
247 }
248
249 pub fn run_duration(&mut self, timeout: Duration) {
251 let end = self.time + timeout;
252 while self.queue.next_before(end) {
253 self.tick();
254 }
255 assert!(self.time <= end);
256 self.time = end;
257 }
258
259 pub fn run_while(&mut self, mut f: impl FnMut(PI, TopicId, Event<PI>) -> bool) {
263 loop {
264 while let Some((peer, topic, event)) = self.events.pop_front() {
265 if !f(peer, topic, event) {
266 return;
267 }
268 }
269 self.tick();
270 }
271 }
272
273 pub fn run_while_with_timeout(
277 &mut self,
278 timeout: Duration,
279 mut f: impl FnMut(PI, TopicId, Event<PI>) -> bool,
280 ) {
281 let end = self.time + timeout;
282 loop {
283 while let Some((peer, topic, event)) = self.events.pop_front() {
284 if !f(peer, topic, event) {
285 return;
286 }
287 }
288 if self.queue.next_before(end) {
289 self.tick();
290 } else {
291 break;
292 }
293 }
294 assert!(self.time <= end);
295 self.time = end;
296 }
297
298 fn tick(&mut self) {
299 self.tick += 1;
300 let Some((time, peer, event)) = self.queue.pop() else {
301 warn!("tick on empty queue");
302 return;
303 };
304 assert!(time >= self.time);
305 self.time = time;
306 let span = debug_span!("tick", %peer, tick = %self.tick, t = %self.elapsed_fmt());
307 let _guard = span.enter();
308 debug!("~~ TICK ");
309
310 let Some(state) = self.peers.get_mut(&peer) else {
311 warn!(?time, ?peer, ?event, "event for dead peer");
313 return;
314 };
315 if let InEvent::RecvMessage(from, _message) = &event {
316 self.conns.insert((*from, peer).into());
317 }
318 let out = state.handle(event, self.time, None);
319 let mut kill = vec![];
320 for event in out {
321 match event {
322 OutEvent::SendMessage(to, message) => {
323 let latency = latency_between(
324 &self.config.latency,
325 &mut self.latencies,
326 &peer,
327 &to,
328 &mut self.rng,
329 );
330 self.queue
331 .insert(self.time + latency, to, InEvent::RecvMessage(peer, message));
332 }
333 OutEvent::ScheduleTimer(time, timer) => {
334 self.queue
335 .insert(self.time + time, peer, InEvent::TimerExpired(timer));
336 }
337 OutEvent::DisconnectPeer(to) => {
338 debug!(peer = ?peer, other = ?to, "disconnect");
339 kill.push((peer, to));
340 }
341 OutEvent::EmitEvent(topic, event) => {
342 debug!(peer = ?peer, "emit {event:?}");
343 self.events.push_back((peer, topic, event));
344 }
345 OutEvent::PeerData(_peer, _data) => {}
346 }
347 }
348 for (from, to) in kill {
349 self.kill_connection(from, to);
350 }
351 }
352
353 fn kill_connection(&mut self, from: PI, to: PI) {
357 let conn = ConnId::from((from, to));
358 if self.conns.remove(&conn) {
359 let latency = latency_between(
362 &self.config.latency,
363 &mut self.latencies,
364 &from,
365 &to,
366 &mut self.rng,
367 ) + Duration::from_micros(1);
368 self.queue
369 .insert(self.time + latency, to, InEvent::PeerDisconnected(from));
370 }
371 }
372
373 pub fn check_synchronicity(&self) -> bool {
383 let mut ok = true;
384 for state in self.peers.values() {
385 let peer = *state.me();
386 for (topic, state) in state.states() {
387 for other in state.swarm.active_view.iter() {
388 let other_state = &self
389 .peers
390 .get(other)
391 .unwrap()
392 .state(topic)
393 .unwrap()
394 .swarm
395 .active_view;
396 if !other_state.contains(&peer) {
397 debug!(node = %peer, other = ?other, "missing active_view peer in other");
398 ok = false;
399 }
400 }
401 for other in state.gossip.eager_push_peers.iter() {
402 let other_state = &self
403 .peers
404 .get(other)
405 .unwrap()
406 .state(topic)
407 .unwrap()
408 .gossip
409 .eager_push_peers;
410 if !other_state.contains(&peer) {
411 debug!(node = %peer, other = ?other, "missing eager_push peer in other");
412 ok = false;
413 }
414 }
415 }
416 }
417 ok
418 }
419
420 pub fn report(&self) -> NetworkReport<PI> {
422 let mut histograms = NetworkHistograms::default();
423 let mut peers_without_neighbors = Vec::new();
424 for (id, peer) in self.peers.iter() {
425 let state = peer.state(&TOPIC).unwrap();
426 add_one(&mut histograms.active, state.swarm.active_view.len());
427 add_one(&mut histograms.passive, state.swarm.passive_view.len());
428 add_one(&mut histograms.eager, state.gossip.eager_push_peers.len());
429 add_one(&mut histograms.lazy, state.gossip.lazy_push_peers.len());
430 if state.swarm.active_view.is_empty() {
431 peers_without_neighbors.push(*id);
432 trace!(node=%id, active = ?state.swarm.active_view.iter().collect::<Vec<_>>(), passive=?state.swarm.passive_view.iter().collect::<Vec<_>>(), "active view empty^");
433 }
434 }
435 NetworkReport {
436 histograms,
437 peer_count: self.peers.len(),
438 peers_without_neighbors,
439 }
440 }
441}
442
443fn latency_between<PI: PeerIdentity + Ord + PartialOrd, R: Rng>(
444 latency_config: &LatencyConfig,
445 latencies: &mut BTreeMap<ConnId<PI>, Duration>,
446 a: &PI,
447 b: &PI,
448 rng: &mut R,
449) -> Duration {
450 let id: ConnId<PI> = (*a, *b).into();
451 *latencies
452 .entry(id)
453 .or_insert_with(|| latency_config.r#gen(rng))
454}
455
/// Queue of per-peer events ordered by the time at which they are due.
#[derive(Debug)]
struct TimedEventQueue<PI> {
    /// Counter handed out to inserted events; used to order events with equal time.
    seq: i32,
    /// Heap of pending events, each paired with its negated sequence number.
    events: BinaryHeap<(TimedEvent<PeerEvent<PI>>, i32)>,
}
461
462impl<PI> Default for TimedEventQueue<PI> {
463 fn default() -> Self {
464 Self {
465 seq: 0,
466 events: Default::default(),
467 }
468 }
469}
470
471impl<PI> TimedEventQueue<PI> {
472 fn insert(&mut self, time: Instant, peer: PI, event: InEvent<PI>) {
473 let seq = self.seq;
474 self.seq += 1;
475 self.events.push((
476 TimedEvent {
477 time,
478 event: PeerEvent(peer, event),
479 },
480 -seq,
481 ))
482 }
483
484 fn pop(&mut self) -> Option<(Instant, PI, InEvent<PI>)> {
485 self.events
486 .pop()
487 .map(|(e, _)| (e.time, e.event.0, e.event.1))
488 }
489
490 fn peek_next(&self) -> Option<Instant> {
491 self.events.peek().map(|(e, _)| e.time)
492 }
493
494 fn next_before(&self, before: Instant) -> bool {
495 match self.peek_next() {
496 None => false,
497 Some(at) => at <= before,
498 }
499 }
500}
501
/// An event tagged with the time at which it is due.
///
/// Equality and ordering consider only `time`.
#[derive(Debug)]
struct TimedEvent<E> {
    /// Time at which the event is due.
    time: Instant,
    /// The payload; ignored by comparisons.
    event: E,
}
507
/// An input event together with the peer that should handle it.
#[derive(Debug)]
struct PeerEvent<PI>(PI, InEvent<PI>);
510
511impl<E> Eq for TimedEvent<E> {}
512
513impl<E> PartialEq for TimedEvent<E> {
514 fn eq(&self, other: &Self) -> bool {
515 self.time.eq(&other.time)
516 }
517}
518
519impl<E> PartialOrd for TimedEvent<E> {
520 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
521 Some(self.cmp(other))
522 }
523}
524
525impl<E> Ord for TimedEvent<E> {
526 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
527 self.time.cmp(&other.time).reverse()
528 }
529}
530
/// Peer identifier type used by the [`Simulator`].
type PeerId = u64;
533
/// Configuration for a [`Simulator`].
#[derive(Debug, Serialize, Deserialize)]
pub struct SimulatorConfig {
    /// Seed for the simulator's random number generator.
    pub rng_seed: u64,
    /// Number of peers created when bootstrapping.
    pub peers: usize,
    /// Maximum simulated time a single gossip round may take.
    pub gossip_round_timeout: Duration,
}
544
/// How peers find their first contact when the simulator bootstraps the network.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[serde(rename_all = "lowercase")]
pub enum BootstrapMode {
    /// Every peer joins through peer `0`.
    #[default]
    Single,
    /// The first `count` peers join through peer `0`; each remaining peer joins
    /// through a randomly chosen peer from that initial set.
    Set {
        /// Size of the initial set of peers.
        count: u64,
    },
}
559
560impl SimulatorConfig {
561 pub fn from_env() -> Self {
567 let peer = read_var("PEERS", 100);
568 Self {
569 rng_seed: read_var("SEED", 0),
570 peers: peer,
571 gossip_round_timeout: Duration::from_secs(read_var("GOSSIP_ROUND_TIMEOUT", 5)),
572 }
573 }
574}
575
576impl Default for SimulatorConfig {
577 fn default() -> Self {
578 Self {
579 rng_seed: 0,
580 peers: 100,
581 gossip_round_timeout: Duration::from_secs(5),
582 }
583 }
584}
585
/// Statistics for a single gossip round.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RoundStats {
    /// Simulated time the round took.
    pub duration: Duration,
    /// Redundancy metric computed from the payload messages received in the round
    /// (presumably "relative message redundancy" — confirm against the protocol docs).
    pub rmr: f32,
    /// Largest last-delivery-hop value reported by any peer in the round.
    pub ldh: f32,
    /// Number of expected message deliveries that did not happen.
    pub missed: f32,
}
598
/// Relative difference between two [`RoundStats`], as produced by [`RoundStats::diff`].
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RoundStatsDiff {
    /// Relative difference of the durations.
    pub duration: f32,
    /// Relative difference of the `rmr` values.
    pub rmr: f32,
    /// Relative difference of the `ldh` values.
    pub ldh: f32,
    /// Relative difference of the `missed` values.
    pub missed: f32,
}
611
612impl fmt::Display for RoundStats {
613 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
614 write!(
615 f,
616 "RMR {:>6.2} LDH {:>6.2} duration {:>6.2}ms missed {:>10.2}",
617 self.rmr,
618 self.ldh,
619 self.duration.as_millis(),
620 self.missed
621 )
622 }
623}
624
625impl RoundStats {
626 fn new_max() -> Self {
627 Self {
628 duration: Duration::MAX,
629 rmr: f32::MAX,
630 ldh: f32::MAX,
631 missed: f32::MAX,
632 }
633 }
634
635 pub fn mean<'a>(rounds: impl IntoIterator<Item = &'a RoundStats>) -> RoundStats {
637 let (len, mut avg) =
638 rounds
639 .into_iter()
640 .fold((0., RoundStats::default()), |(len, mut agg), round| {
641 agg.rmr += round.rmr;
642 agg.ldh += round.ldh;
643 agg.duration += round.duration;
644 agg.missed += round.missed;
645 (len + 1., agg)
646 });
647 avg.rmr /= len;
648 avg.ldh /= len;
649 avg.missed /= len;
650 avg.duration /= len as u32;
651 avg
652 }
653
654 pub fn min<'a>(rounds: impl IntoIterator<Item = &'a RoundStats>) -> RoundStats {
656 rounds
657 .into_iter()
658 .fold(RoundStats::new_max(), |mut agg, round| {
659 agg.rmr = agg.rmr.min(round.rmr);
660 agg.ldh = agg.ldh.min(round.ldh);
661 agg.duration = agg.duration.min(round.duration);
662 agg.missed = agg.missed.min(round.missed);
663 agg
664 })
665 }
666
667 pub fn max<'a>(rounds: impl IntoIterator<Item = &'a RoundStats>) -> RoundStats {
669 rounds
670 .into_iter()
671 .fold(RoundStats::default(), |mut agg, round| {
672 agg.rmr = agg.rmr.max(round.rmr);
673 agg.ldh = agg.ldh.max(round.ldh);
674 agg.duration = agg.duration.max(round.duration);
675 agg.missed = agg.missed.max(round.missed);
676 agg
677 })
678 }
679
680 pub fn avg(rounds: &[RoundStats]) -> RoundStatsAvg {
682 let len = rounds.len();
683 let min = Self::min(rounds);
684 let max = Self::max(rounds);
685 let mean = Self::mean(rounds);
686 RoundStatsAvg {
687 len,
688 min,
689 max,
690 mean,
691 }
692 }
693
694 pub fn diff(&self, other: &Self) -> RoundStatsDiff {
696 RoundStatsDiff {
697 duration: diff_percent(self.duration.as_secs_f32(), other.duration.as_secs_f32()),
698 rmr: diff_percent(self.rmr, other.rmr),
699 ldh: diff_percent(self.ldh, other.ldh),
700 missed: diff_percent(self.missed, other.missed),
701 }
702 }
703}
704
/// Returns the relative change from `a` to `b` as a fraction of `a`.
///
/// Zero operands are special-cased: `0.0` when both are zero, `-1.0` when only
/// `b` is zero and `1.0` when only `a` is zero.
fn diff_percent(a: f32, b: f32) -> f32 {
    match (a == 0.0, b == 0.0) {
        (true, true) => 0.0,
        (false, true) => -1.0,
        (true, false) => 1.0,
        (false, false) => (b - a) / a,
    }
}
716
/// Minimum, maximum and mean of a list of [`RoundStats`].
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RoundStatsAvg {
    /// Number of rounds that were aggregated.
    pub len: usize,
    /// Field-wise minimum.
    pub min: RoundStats,
    /// Field-wise maximum.
    pub max: RoundStats,
    /// Field-wise mean.
    pub mean: RoundStats,
}
729
/// Relative difference between two [`RoundStatsAvg`], as produced by [`RoundStatsAvg::diff`].
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RoundStatsAvgDiff {
    /// Difference of the minimums.
    pub min: RoundStatsDiff,
    /// Difference of the maximums.
    pub max: RoundStatsDiff,
    /// Difference of the means.
    pub mean: RoundStatsDiff,
}
740
741impl RoundStatsAvg {
742 pub fn diff(&self, other: &Self) -> RoundStatsAvgDiff {
744 RoundStatsAvgDiff {
745 min: self.min.diff(&other.min),
746 max: self.max.diff(&other.max),
747 mean: self.mean.diff(&other.mean),
748 }
749 }
750
751 pub fn avg(rows: &[Self]) -> Self {
753 let len = rows.iter().map(|row| row.len).sum();
754 let min = RoundStats::min(rows.iter().map(|x| &x.min));
755 let max = RoundStats::min(rows.iter().map(|x| &x.max));
756 let mean = RoundStats::min(rows.iter().map(|x| &x.mean));
757 Self {
758 min,
759 max,
760 mean,
761 len,
762 }
763 }
764}
765
/// Histograms of per-peer set sizes.
///
/// Each map goes from a set size to the number of peers whose set has that size.
#[derive(Debug, Default)]
pub struct NetworkHistograms {
    /// Sizes of the peers' active views.
    pub active: BTreeMap<usize, usize>,
    /// Sizes of the peers' passive views.
    pub passive: BTreeMap<usize, usize>,
    /// Sizes of the peers' eager push sets.
    pub eager: BTreeMap<usize, usize>,
    /// Sizes of the peers' lazy push sets.
    pub lazy: BTreeMap<usize, usize>,
}
781
/// Returns the weighted average of the keys of a histogram, where each key is
/// weighted by its count. Returns `0.` when the total count is zero.
fn avg(map: &BTreeMap<usize, usize>) -> f32 {
    let mut weighted_sum: usize = 0;
    let mut total: usize = 0;
    for (size, occurrences) in map {
        weighted_sum += size * occurrences;
        total += *occurrences;
    }
    if total == 0 {
        return 0.;
    }
    weighted_sum as f32 / total as f32
}
792
/// Returns the smallest key of the histogram, or `0` if it is empty.
fn min(map: &BTreeMap<usize, usize>) -> usize {
    // Keys of a `BTreeMap` iterate in ascending order.
    map.keys().next().copied().unwrap_or(0)
}
796
/// Returns the largest key of the histogram, or `0` if it is empty.
fn max(map: &BTreeMap<usize, usize>) -> usize {
    // Keys of a `BTreeMap` iterate in ascending order, so the last one is the largest.
    map.keys().next_back().copied().unwrap_or(0)
}
800
801impl fmt::Display for NetworkHistograms {
802 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
803 write!(
804 f,
805 " eager {:?}\n lazy {:?}\n active {:?}\n passive {:?}",
806 self.eager, self.lazy, self.active, self.passive
807 )
808 }
809}
810
/// Summary of the state of a [`Network`].
#[derive(Debug)]
pub struct NetworkReport<PI> {
    /// Number of peers in the network.
    pub peer_count: usize,
    /// Peers whose active view is empty.
    pub peers_without_neighbors: Vec<PI>,
    /// Histograms of view and push set sizes.
    pub histograms: NetworkHistograms,
}
821
822impl<PI> NetworkReport<PI> {
823 pub fn has_peers_with_no_neighbors(&self) -> bool {
825 *self.histograms.active.get(&0).unwrap_or(&0) > 0
826 }
827}
828
829impl<PI> fmt::Display for NetworkReport<PI> {
830 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
831 writeln!(f, "peers: {}\n{}", self.peer_count, self.histograms)?;
832 if self.peers_without_neighbors.is_empty() {
833 writeln!(f, "(all have neighbors)")
834 } else {
835 writeln!(
836 f,
837 "({} peers have no neighbors)",
838 self.peers_without_neighbors.len()
839 )
840 }
841 }
842}
843
/// The all-zero topic id used by the [`Simulator`] and by [`Network::report`].
const TOPIC: TopicId = TopicId::from_bytes([0u8; 32]);
845
/// Drives a [`Network`] of peers on a single topic and records statistics per gossip round.
#[derive(Debug)]
pub struct Simulator {
    /// Configuration of the simulator.
    pub config: SimulatorConfig,
    /// The simulated network.
    pub network: Network<PeerId, rand_chacha::ChaCha12Rng>,
    /// Statistics of the gossip rounds performed so far, in order.
    round_stats: Vec<RoundStats>,
}
856
857impl Simulator {
858 pub fn new(
860 simulator_config: SimulatorConfig,
861 network_config: impl Into<NetworkConfig>,
862 ) -> Self {
863 let network_config = network_config.into();
864 info!("start {simulator_config:?} {network_config:?}");
865 let rng = rand_chacha::ChaCha12Rng::seed_from_u64(simulator_config.rng_seed);
866 Self {
867 network: Network::new(network_config, rng),
868 config: simulator_config,
869 round_stats: Default::default(),
870 }
871 }
872
873 pub fn rng(&mut self) -> ChaCha12Rng {
875 ChaCha12Rng::from_rng(&mut self.network.rng)
876 }
877
878 pub fn random_peer(&mut self) -> PeerId {
880 *self
881 .network
882 .peers
883 .keys()
884 .choose(&mut self.network.rng)
885 .unwrap()
886 }
887
888 pub fn peer_count(&self) -> usize {
890 self.network.peers.len()
891 }
892
893 pub fn remove_peers(&mut self, n: usize) {
895 for _i in 0..n {
896 let key = self.random_peer();
897 self.network.remove(&key);
898 }
899 }
900
901 pub fn report(&mut self) -> NetworkReport<PeerId> {
903 let report = self.network.report();
904 let min_active_len = min(&report.histograms.active);
905 let max_active_len = max(&report.histograms.active);
906 let avg = avg(&report.histograms.active);
907 let len = report.peer_count;
908 debug!(
909 "nodes {len} active: avg {avg:2.2} min {min_active_len} max {max_active_len} empty {}",
910 report.peers_without_neighbors.len()
911 );
912 report
913 }
914
915 pub fn bootstrap(&mut self, bootstrap_mode: BootstrapMode) -> NetworkReport<PeerId> {
921 let span = info_span!("bootstrap");
922 let _guard = span.enter();
923 info!("bootstrap {bootstrap_mode:?}");
924
925 let node_count = self.config.peers as u64;
926
927 match bootstrap_mode {
928 BootstrapMode::Single => {
929 self.network.insert_and_join(0, TOPIC, vec![]);
930 for i in 1..node_count {
931 self.network.insert_and_join(i, TOPIC, vec![0]);
932 }
933 self.network.run_trips(20);
934 }
935 BootstrapMode::Set { count } => {
936 self.network.insert_and_join(0, TOPIC, vec![]);
937 for i in 1..count {
938 self.network.insert_and_join(i, TOPIC, vec![0]);
939 }
940
941 self.network.run_trips(7);
942
943 for i in count..node_count {
944 let contact = self.network.rng.random_range(0..count);
945 self.network.insert_and_join(i, TOPIC, vec![contact]);
946 }
947
948 self.network.run_trips(20);
949 }
950 }
951
952 let report = self.report();
953 if report.has_peers_with_no_neighbors() {
954 warn!("failed to keep all nodes active after warmup: {report:?}");
955 } else {
956 info!("bootstrap complete, all nodes active");
957 }
958 report
959 }
960
961 pub fn gossip_round(&mut self, messages: Vec<(PeerId, Bytes)>) -> usize {
969 let span = debug_span!("g", r = self.round_stats.len());
970 let _guard = span.enter();
971 self.reset_stats();
972 let start = self.network.time;
973 let expected_count: usize = messages.len() * (self.network.peers.len() - 1);
974 info!(
975 time=%self.network.elapsed_fmt(),
976 "round {i}: send {len} messages / recv {expected_count} total",
977 len = messages.len(),
978 i = self.round_stats.len()
979 );
980
981 let mut missing: BTreeMap<PeerId, BTreeSet<Bytes>> = BTreeMap::new();
983 for (from, message) in messages {
984 for peer in self.network.peer_ids().filter(|p| *p != from) {
985 missing.entry(peer).or_default().insert(message.clone());
986 }
987 self.network
988 .command(from, TOPIC, Command::Broadcast(message, Scope::Swarm));
989 }
990
991 let timeout = self.config.gossip_round_timeout;
992 let mut received_count = 0;
993 self.network
994 .run_while_with_timeout(timeout, |peer, _topic, event| {
995 if let Event::Received(message) = event {
996 let set = missing.get_mut(&peer).unwrap();
997 if !set.remove(&message.content) {
998 panic!("received duplicate message event");
999 } else if set.is_empty() {
1000 missing.remove(&peer);
1001 }
1002 received_count += 1;
1003 }
1004 received_count != expected_count
1005 });
1006
1007 let missing_count: usize = expected_count - received_count;
1008 if missing_count == 0 {
1009 info!("break: all messages received by all peers");
1010 } else {
1011 warn!("break: max ticks for round exceeded (still missing {missing_count})");
1012 debug!("missing: {missing:?}");
1013 }
1014 let elapsed = self.network.time.duration_since(start);
1015 self.report_gossip_round(expected_count, missing_count, elapsed);
1016 missing_count
1017 }
1018
1019 fn report_gossip_round(
1020 &mut self,
1021 expected_recv_count: usize,
1022 missed: usize,
1023 duration: Duration,
1024 ) {
1025 let payloud_msg_count = self.total_payload_messages();
1026 let ctrl_msg_count = self.total_control_messages();
1027 let rmr_expected_count = expected_recv_count - missed;
1028 let rmr = (payloud_msg_count as f32 / (rmr_expected_count as f32 - 1.)) - 1.;
1029 let ldh = self.max_ldh();
1030
1031 let round_stats = RoundStats {
1032 duration,
1033 rmr,
1034 ldh: ldh as f32,
1035 missed: missed as f32,
1036 };
1037 let histograms = self.network.report().histograms;
1038 info!(
1039 "round {}: pay {} ctrl {} {round_stats} \n{histograms}",
1040 self.round_stats.len(),
1041 payloud_msg_count,
1042 ctrl_msg_count,
1043 );
1044 self.round_stats.push(round_stats);
1045 }
1046
1047 pub fn round_stats_average(&self) -> RoundStatsAvg {
1049 RoundStats::avg(&self.round_stats)
1050 }
1051
1052 fn reset_stats(&mut self) {
1053 for state in self.network.peers.values_mut() {
1054 state.reset_stats(&TOPIC);
1055 }
1056 }
1057
1058 fn max_ldh(&self) -> u16 {
1059 let mut max = 0;
1060 for state in self.network.peers.values() {
1061 let state = state.state(&TOPIC).unwrap();
1062 let stats = state.gossip.stats();
1063 max = max.max(stats.max_last_delivery_hop);
1064 }
1065 max
1066 }
1067
1068 fn total_payload_messages(&self) -> u64 {
1069 let mut sum = 0;
1070 for state in self.network.peers.values() {
1071 let state = state.state(&TOPIC).unwrap();
1072 let stats = state.gossip.stats();
1073 sum += stats.payload_messages_received;
1074 }
1075 sum
1076 }
1077
1078 fn total_control_messages(&self) -> u64 {
1079 let mut sum = 0;
1080 for state in self.network.peers.values() {
1081 let state = state.state(&TOPIC).unwrap();
1082 let stats = state.gossip.stats();
1083 sum += stats.control_messages_received;
1084 }
1085 sum
1086 }
1087}
1088
/// Increments the count stored under `key`, starting from zero if the key is new.
fn add_one(map: &mut BTreeMap<usize, usize>, key: usize) {
    *map.entry(key).or_insert(0) += 1;
}
1093
/// Identifier of an undirected connection: the two endpoints in ascending order.
#[derive(Debug, Clone, PartialOrd, Ord, Eq, PartialEq, Hash)]
struct ConnId<PI>([PI; 2]);

impl<PI: Ord + Copy> ConnId<PI> {
    /// Creates the id for the connection between `a` and `b`, regardless of argument order.
    fn new(a: PI, b: PI) -> Self {
        if b < a {
            Self([b, a])
        } else {
            Self([a, b])
        }
    }

    /// Returns both endpoints, smaller one first.
    fn peers(&self) -> [PI; 2] {
        let Self(pair) = self;
        *pair
    }

    /// Returns the endpoint opposite to `other`, or `None` if `other` is not an endpoint.
    fn other(&self, other: PI) -> Option<PI> {
        let [low, high] = self.0;
        match other {
            p if p == low => Some(high),
            p if p == high => Some(low),
            _ => None,
        }
    }
}

impl<PI: Ord + Copy> From<(PI, PI)> for ConnId<PI> {
    fn from(pair: (PI, PI)) -> Self {
        let (a, b) = pair;
        Self::new(a, b)
    }
}

impl<PI: Copy> From<ConnId<PI>> for (PI, PI) {
    fn from(conn: ConnId<PI>) -> (PI, PI) {
        let ConnId([low, high]) = conn;
        (low, high)
    }
}
1127
/// Sorts the given items in ascending order and returns them.
fn sort<T: Ord + Clone>(mut items: Vec<T>) -> Vec<T> {
    items.sort();
    items
}
1133
/// Reads the environment variable `name` and parses it as `T`.
///
/// Returns `default` if the variable cannot be read (e.g. it is not set).
///
/// # Panics
///
/// Panics if the variable is set but its value cannot be parsed as `T`.
fn read_var<T: FromStr<Err: fmt::Display + fmt::Debug>>(name: &str, default: T) -> T {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.parse::<T>() {
        Ok(value) => value,
        Err(_) => panic!("Failed to parse environment variable {name}"),
    }
}