1use std::collections::{HashMap, HashSet};
10
11use derive_more::{From, Sub};
12use n0_future::time::Duration;
13use rand::{rngs::ThreadRng, Rng};
14use serde::{Deserialize, Serialize};
15use tracing::debug;
16
17use super::{util::IndexSet, PeerData, PeerIdentity, PeerInfo, IO};
18
19#[derive(Debug)]
21pub enum InEvent<PI> {
22 RecvMessage(PI, Message<PI>),
24 TimerExpired(Timer<PI>),
26 PeerDisconnected(PI),
28 RequestJoin(PI),
30 UpdatePeerData(PeerData),
32 Quit,
34}
35
36#[derive(Debug)]
38pub enum OutEvent<PI> {
39 SendMessage(PI, Message<PI>),
41 ScheduleTimer(Duration, Timer<PI>),
43 DisconnectPeer(PI),
45 EmitEvent(Event<PI>),
47 PeerData(PI, PeerData),
49}
50
/// Changes of the active view, reported through [`OutEvent::EmitEvent`].
#[derive(Debug, Clone)]
pub enum Event<PI> {
    /// Peer `PI` was inserted into the active view.
    NeighborUp(PI),
    /// Peer `PI` was removed from the active view.
    NeighborDown(PI),
}
59
/// Timers that [`State`] asks the IO layer to schedule.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Timer<PI> {
    /// Periodic timer that triggers a shuffle and re-arms itself.
    DoShuffle,
    /// Deadline for peer `PI` to answer a `Neighbor` request.
    PendingNeighborRequest(PI),
}
66
67#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
69pub enum Message<PI> {
70 Join(Option<PeerData>),
72 ForwardJoin(ForwardJoin<PI>),
75 Shuffle(Shuffle<PI>),
78 ShuffleReply(ShuffleReply<PI>),
81 Neighbor(Neighbor),
84 Disconnect(Disconnect),
88}
89
90#[derive(From, Sub, Eq, PartialEq, Clone, Debug, Copy, Serialize, Deserialize)]
95pub struct Ttl(pub u16);
96impl Ttl {
97 pub fn expired(&self) -> bool {
98 *self == Ttl(0)
99 }
100 pub fn next(&self) -> Ttl {
101 Ttl(self.0.saturating_sub(1))
102 }
103}
104
105#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
109pub struct ForwardJoin<PI> {
110 peer: PeerInfo<PI>,
112 ttl: Ttl,
114}
115
116#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
119pub struct Shuffle<PI> {
120 origin: PI,
122 nodes: Vec<PeerInfo<PI>>,
124 ttl: Ttl,
126}
127
128#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
133pub struct ShuffleReply<PI> {
134 nodes: Vec<PeerInfo<PI>>,
136}
137
138#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
142pub enum Priority {
143 High,
147 Low,
149}
150
151#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
154pub struct Neighbor {
155 priority: Priority,
157 data: Option<PeerData>,
159}
160
161#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
163pub struct Disconnect {
164 alive: bool,
167 _respond: bool,
169}
170
171#[derive(Clone, Debug, Serialize, Deserialize)]
173#[serde(default)]
174pub struct Config {
175 pub active_view_capacity: usize,
177 pub passive_view_capacity: usize,
180 pub active_random_walk_length: Ttl,
183 pub passive_random_walk_length: Ttl,
186 pub shuffle_random_walk_length: Ttl,
188 pub shuffle_active_view_count: usize,
190 pub shuffle_passive_view_count: usize,
192 pub shuffle_interval: Duration,
194 pub neighbor_request_timeout: Duration,
196}
197impl Default for Config {
198 fn default() -> Self {
200 Self {
201 active_view_capacity: 5,
203 passive_view_capacity: 30,
205 active_random_walk_length: Ttl(6),
207 passive_random_walk_length: Ttl(3),
209 shuffle_random_walk_length: Ttl(6),
211 shuffle_active_view_count: 3,
213 shuffle_passive_view_count: 4,
215 shuffle_interval: Duration::from_secs(60),
217 neighbor_request_timeout: Duration::from_millis(500),
219 }
220 }
221}
222
/// Counters collected by [`State`].
#[derive(Clone, Debug, Default)]
pub struct Stats {
    /// Number of non-`Disconnect` messages received from peers that were not in the active view
    /// when the message arrived.
    total_connections: usize,
}
227
228#[derive(Debug)]
230pub struct State<PI, RG = ThreadRng> {
231 me: PI,
233 me_data: Option<PeerData>,
235 pub(crate) active_view: IndexSet<PI>,
237 pub(crate) passive_view: IndexSet<PI>,
239 config: Config,
241 shuffle_scheduled: bool,
243 rng: RG,
245 pub(crate) stats: Stats,
247 pending_neighbor_requests: HashSet<PI>,
249 peer_data: HashMap<PI, PeerData>,
251 alive_disconnect_peers: HashSet<PI>,
253}
254
255impl<PI, RG> State<PI, RG>
256where
257 PI: PeerIdentity,
258 RG: Rng,
259{
260 pub fn new(me: PI, me_data: Option<PeerData>, config: Config, rng: RG) -> Self {
261 Self {
262 me,
263 me_data,
264 active_view: IndexSet::new(),
265 passive_view: IndexSet::new(),
266 config,
267 shuffle_scheduled: false,
268 rng,
269 stats: Stats::default(),
270 pending_neighbor_requests: Default::default(),
271 peer_data: Default::default(),
272 alive_disconnect_peers: Default::default(),
273 }
274 }
275
276 pub fn handle(&mut self, event: InEvent<PI>, io: &mut impl IO<PI>) {
277 match event {
278 InEvent::RecvMessage(from, message) => self.handle_message(from, message, io),
279 InEvent::TimerExpired(timer) => match timer {
280 Timer::DoShuffle => self.handle_shuffle_timer(io),
281 Timer::PendingNeighborRequest(peer) => self.handle_pending_neighbor_timer(peer, io),
282 },
283 InEvent::PeerDisconnected(peer) => self.handle_connection_closed(peer, io),
284 InEvent::RequestJoin(peer) => self.handle_join(peer, io),
285 InEvent::UpdatePeerData(data) => {
286 self.me_data = Some(data);
287 }
288 InEvent::Quit => self.handle_quit(io),
289 }
290
291 if !self.shuffle_scheduled {
293 io.push(OutEvent::ScheduleTimer(
294 self.config.shuffle_interval,
295 Timer::DoShuffle,
296 ));
297 self.shuffle_scheduled = true;
298 }
299 }
300
301 fn handle_message(&mut self, from: PI, message: Message<PI>, io: &mut impl IO<PI>) {
302 let is_disconnect = matches!(message, Message::Disconnect(Disconnect { .. }));
303 if !is_disconnect && !self.active_view.contains(&from) {
304 self.stats.total_connections += 1;
305 }
306 match message {
307 Message::Join(data) => self.on_join(from, data, io),
308 Message::ForwardJoin(details) => self.on_forward_join(from, details, io),
309 Message::Shuffle(details) => self.on_shuffle(from, details, io),
310 Message::ShuffleReply(details) => self.on_shuffle_reply(details, io),
311 Message::Neighbor(details) => self.on_neighbor(from, details, io),
312 Message::Disconnect(details) => self.on_disconnect(from, details, io),
313 }
314
315 if !is_disconnect && !self.active_view.contains(&from) {
318 io.push(OutEvent::DisconnectPeer(from));
319 }
320 }
321
322 fn handle_join(&mut self, peer: PI, io: &mut impl IO<PI>) {
323 io.push(OutEvent::SendMessage(
324 peer,
325 Message::Join(self.me_data.clone()),
326 ));
327 }
328
329 fn on_disconnect(&mut self, peer: PI, details: Disconnect, io: &mut impl IO<PI>) {
331 self.pending_neighbor_requests.remove(&peer);
332 if self.active_view.contains(&peer) {
333 self.remove_active(
334 &peer,
335 RemovalReason::DisconnectReceived {
336 is_alive: details.alive,
337 },
338 io,
339 );
340 } else if details.alive && self.passive_view.contains(&peer) {
341 self.alive_disconnect_peers.insert(peer);
342 }
343 }
344
345 fn handle_connection_closed(&mut self, peer: PI, io: &mut impl IO<PI>) {
347 self.pending_neighbor_requests.remove(&peer);
348 if self.active_view.contains(&peer) {
349 self.remove_active(&peer, RemovalReason::ConnectionClosed, io);
350 } else if !self.alive_disconnect_peers.remove(&peer) {
351 self.passive_view.remove(&peer);
352 self.peer_data.remove(&peer);
353 }
354 }
355
356 fn handle_quit(&mut self, io: &mut impl IO<PI>) {
357 for peer in self.active_view.clone().into_iter() {
358 self.active_view.remove(&peer);
359 self.send_disconnect(peer, false, io);
360 }
361 }
362
363 fn send_disconnect(&mut self, peer: PI, alive: bool, io: &mut impl IO<PI>) {
364 self.send_shuffle_reply(
368 peer,
369 self.config.shuffle_active_view_count + self.config.shuffle_passive_view_count,
370 io,
371 );
372 let message = Message::Disconnect(Disconnect {
373 alive,
374 _respond: false,
375 });
376 io.push(OutEvent::SendMessage(peer, message));
377 io.push(OutEvent::DisconnectPeer(peer));
378 }
379
380 fn on_join(&mut self, peer: PI, data: Option<PeerData>, io: &mut impl IO<PI>) {
381 self.add_active(peer, data.clone(), Priority::High, true, io);
384
385 let ttl = self.config.active_random_walk_length;
393 let peer_info = PeerInfo { id: peer, data };
394 for node in self.active_view.iter_without(&peer) {
395 let message = Message::ForwardJoin(ForwardJoin {
396 peer: peer_info.clone(),
397 ttl,
398 });
399 io.push(OutEvent::SendMessage(*node, message));
400 }
401 }
402
403 fn on_forward_join(&mut self, sender: PI, message: ForwardJoin<PI>, io: &mut impl IO<PI>) {
404 let peer_id = message.peer.id;
405 if self.active_view.contains(&peer_id) {
407 self.insert_peer_info(message.peer, io);
408 self.send_neighbor(peer_id, Priority::High, io);
409 }
410 else if message.ttl.expired() || self.active_view.len() <= 1 {
413 self.insert_peer_info(message.peer, io);
414 self.send_neighbor(peer_id, Priority::High, io);
419 } else {
420 if message.ttl == self.config.passive_random_walk_length {
422 self.add_passive(peer_id, message.peer.data.clone(), io);
423 }
424 if !self.active_view.contains(&peer_id)
429 && !self.pending_neighbor_requests.contains(&peer_id)
430 {
431 match self
432 .active_view
433 .pick_random_without(&[&sender], &mut self.rng)
434 {
435 None => {
436 unreachable!("if the peer was not added, there are at least two peers in our active view.");
437 }
438 Some(next) => {
439 let message = Message::ForwardJoin(ForwardJoin {
440 peer: message.peer,
441 ttl: message.ttl.next(),
442 });
443 io.push(OutEvent::SendMessage(*next, message));
444 }
445 }
446 }
447 }
448 }
449
450 fn on_neighbor(&mut self, from: PI, details: Neighbor, io: &mut impl IO<PI>) {
451 let is_reply = self.pending_neighbor_requests.remove(&from);
452 let do_reply = !is_reply;
453 if !self.add_active(from, details.data, details.priority, do_reply, io) {
458 self.send_disconnect(from, true, io);
459 }
460 }
461
462 fn peer_info(&self, id: &PI) -> PeerInfo<PI> {
464 let data = self.peer_data.get(id).cloned();
465 PeerInfo { id: *id, data }
466 }
467
468 fn insert_peer_info(&mut self, peer_info: PeerInfo<PI>, io: &mut impl IO<PI>) {
469 if let Some(data) = peer_info.data {
470 let old = self.peer_data.remove(&peer_info.id);
471 let same = matches!(old, Some(old) if old == data);
472 if !same && !data.0.is_empty() {
473 io.push(OutEvent::PeerData(peer_info.id, data.clone()));
474 }
475 self.peer_data.insert(peer_info.id, data);
476 }
477 }
478
479 fn on_shuffle(&mut self, from: PI, shuffle: Shuffle<PI>, io: &mut impl IO<PI>) {
487 if shuffle.ttl.expired() || self.active_view.len() <= 1 {
488 let len = shuffle.nodes.len();
489 for node in shuffle.nodes {
490 self.add_passive(node.id, node.data, io);
491 }
492 self.send_shuffle_reply(shuffle.origin, len, io);
493 } else if let Some(node) = self
494 .active_view
495 .pick_random_without(&[&shuffle.origin, &from], &mut self.rng)
496 {
497 let message = Message::Shuffle(Shuffle {
498 origin: shuffle.origin,
499 nodes: shuffle.nodes,
500 ttl: shuffle.ttl.next(),
501 });
502 io.push(OutEvent::SendMessage(*node, message));
503 }
504 }
505
506 fn send_shuffle_reply(&mut self, to: PI, len: usize, io: &mut impl IO<PI>) {
507 let mut nodes = self.passive_view.shuffled_and_capped(len, &mut self.rng);
508 if nodes.len() < len {
511 nodes.extend(
512 self.active_view
513 .shuffled_and_capped(len - nodes.len(), &mut self.rng),
514 );
515 }
516 let nodes = nodes.into_iter().map(|id| self.peer_info(&id));
517 let message = Message::ShuffleReply(ShuffleReply {
518 nodes: nodes.collect(),
519 });
520 io.push(OutEvent::SendMessage(to, message));
521 }
522
523 fn on_shuffle_reply(&mut self, message: ShuffleReply<PI>, io: &mut impl IO<PI>) {
524 for node in message.nodes {
525 self.add_passive(node.id, node.data, io);
526 }
527 self.refill_active_from_passive(&[], io);
528 }
529
530 fn handle_shuffle_timer(&mut self, io: &mut impl IO<PI>) {
531 if let Some(node) = self.active_view.pick_random(&mut self.rng) {
532 let active = self.active_view.shuffled_without_and_capped(
533 &[node],
534 self.config.shuffle_active_view_count,
535 &mut self.rng,
536 );
537 let passive = self.passive_view.shuffled_without_and_capped(
538 &[node],
539 self.config.shuffle_passive_view_count,
540 &mut self.rng,
541 );
542 let nodes = active
543 .iter()
544 .chain(passive.iter())
545 .map(|id| self.peer_info(id));
546 let me = PeerInfo {
547 id: self.me,
548 data: self.me_data.clone(),
549 };
550 let nodes = nodes.chain([me]);
551 let message = Shuffle {
552 origin: self.me,
553 nodes: nodes.collect(),
554 ttl: self.config.shuffle_random_walk_length,
555 };
556 io.push(OutEvent::SendMessage(*node, Message::Shuffle(message)));
557 }
558 io.push(OutEvent::ScheduleTimer(
559 self.config.shuffle_interval,
560 Timer::DoShuffle,
561 ));
562 }
563
564 fn passive_is_full(&self) -> bool {
565 self.passive_view.len() >= self.config.passive_view_capacity
566 }
567
568 fn active_is_full(&self) -> bool {
569 self.active_view.len() >= self.config.active_view_capacity
570 }
571
572 fn add_passive(&mut self, peer: PI, data: Option<PeerData>, io: &mut impl IO<PI>) {
577 self.insert_peer_info((peer, data).into(), io);
578 if self.active_view.contains(&peer) || self.passive_view.contains(&peer) || peer == self.me
579 {
580 return;
581 }
582 if self.passive_is_full() {
583 self.passive_view.remove_random(&mut self.rng);
584 }
585 self.passive_view.insert(peer);
586 }
587
588 fn remove_active(&mut self, peer: &PI, reason: RemovalReason, io: &mut impl IO<PI>) {
592 if let Some(idx) = self.active_view.get_index_of(peer) {
593 let removed_peer = self.remove_active_by_index(idx, reason, io).unwrap();
594 self.refill_active_from_passive(&[&removed_peer], io);
595 }
596 }
597
598 fn refill_active_from_passive(&mut self, skip_peers: &[&PI], io: &mut impl IO<PI>) {
599 if self.active_view.len() + self.pending_neighbor_requests.len()
600 >= self.config.active_view_capacity
601 {
602 return;
603 }
604 let mut skip_peers = skip_peers.to_vec();
611 skip_peers.extend(self.pending_neighbor_requests.iter());
612
613 if let Some(node) = self
614 .passive_view
615 .pick_random_without(&skip_peers, &mut self.rng)
616 .copied()
617 {
618 let priority = match self.active_view.is_empty() {
619 true => Priority::High,
620 false => Priority::Low,
621 };
622 self.send_neighbor(node, priority, io);
623 io.push(OutEvent::ScheduleTimer(
626 self.config.neighbor_request_timeout,
627 Timer::PendingNeighborRequest(node),
628 ));
629 };
630 }
631
632 fn handle_pending_neighbor_timer(&mut self, peer: PI, io: &mut impl IO<PI>) {
633 if self.pending_neighbor_requests.remove(&peer) {
634 self.passive_view.remove(&peer);
635 self.refill_active_from_passive(&[], io);
636 }
637 }
638
639 fn remove_active_by_index(
640 &mut self,
641 peer_index: usize,
642 reason: RemovalReason,
643 io: &mut impl IO<PI>,
644 ) -> Option<PI> {
645 if let Some(peer) = self.active_view.remove_index(peer_index) {
646 io.push(OutEvent::EmitEvent(Event::NeighborDown(peer)));
647
648 match reason {
649 RemovalReason::Random => self.send_disconnect(peer, true, io),
651 RemovalReason::DisconnectReceived { is_alive: _ } => {
653 io.push(OutEvent::DisconnectPeer(peer))
654 }
655 RemovalReason::ConnectionClosed => {}
657 }
658
659 let keep_as_passive = match reason {
660 RemovalReason::ConnectionClosed => self.alive_disconnect_peers.remove(&peer),
662 RemovalReason::DisconnectReceived { is_alive } => is_alive,
664 RemovalReason::Random => true,
666 };
667
668 if keep_as_passive {
669 let data = self.peer_data.remove(&peer);
670 self.add_passive(peer, data, io);
671 if !matches!(reason, RemovalReason::ConnectionClosed) {
673 self.alive_disconnect_peers.insert(peer);
674 }
675 }
676 debug!(other = ?peer, "removed from active view, reason: {reason:?}");
677 Some(peer)
678 } else {
679 None
680 }
681 }
682
683 fn free_random_slot_in_active_view(&mut self, io: &mut impl IO<PI>) {
685 if let Some(index) = self.active_view.pick_random_index(&mut self.rng) {
686 self.remove_active_by_index(index, RemovalReason::Random, io);
687 }
688 }
689
690 fn add_active(
696 &mut self,
697 peer: PI,
698 data: Option<PeerData>,
699 priority: Priority,
700 reply: bool,
701 io: &mut impl IO<PI>,
702 ) -> bool {
703 if peer == self.me {
704 return false;
705 }
706 self.insert_peer_info((peer, data).into(), io);
707 if self.active_view.contains(&peer) {
708 if reply {
709 self.send_neighbor(peer, priority, io);
710 }
711 return true;
712 }
713 match (priority, self.active_is_full()) {
714 (Priority::High, is_full) => {
715 if is_full {
716 self.free_random_slot_in_active_view(io);
717 }
718 self.add_active_unchecked(peer, Priority::High, reply, io);
719 true
720 }
721 (Priority::Low, false) => {
722 self.add_active_unchecked(peer, Priority::Low, reply, io);
723 true
724 }
725 (Priority::Low, true) => false,
726 }
727 }
728
729 fn add_active_unchecked(
730 &mut self,
731 peer: PI,
732 priority: Priority,
733 reply: bool,
734 io: &mut impl IO<PI>,
735 ) {
736 self.passive_view.remove(&peer);
737 if self.active_view.insert(peer) {
738 debug!(other = ?peer, "add to active view");
739 io.push(OutEvent::EmitEvent(Event::NeighborUp(peer)));
740 if reply {
741 self.send_neighbor(peer, priority, io);
742 }
743 }
744 }
745
746 fn send_neighbor(&mut self, peer: PI, priority: Priority, io: &mut impl IO<PI>) {
747 if self.pending_neighbor_requests.insert(peer) {
748 let message = Message::Neighbor(Neighbor {
749 priority,
750 data: self.me_data.clone(),
751 });
752 io.push(OutEvent::SendMessage(peer, message));
753 }
754 }
755}
756
/// Why a peer is being removed from the active view.
#[derive(Debug)]
enum RemovalReason {
    /// The connection to the peer was reported as closed.
    ConnectionClosed,
    /// The peer sent a `Disconnect`; `is_alive` is the `alive` flag of that message.
    DisconnectReceived { is_alive: bool },
    /// The peer was evicted at random to free a slot.
    Random,
}