// iroh_gossip/proto/hyparview.rs

//! Implementation of the HyParView membership protocol
//!
//! The implementation is based on [this paper][paper] by João Leitão, José Pereira, Luís Rodrigues,
//! and the [example implementation][impl] by Bartosz Sypytkowski.
//!
//! [paper]: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
//! [impl]: https://gist.github.com/Horusiath/84fac596101b197da0546d1697580d99
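//!
//! The protocol maintains a small *active view* of peers to which we hold connections, and a
//! larger *passive view* of peers we know about but are not connected to, which is used to
//! replenish the active view when connections fail.
//!
//! A hedged construction sketch; the peer identity value `me` (of some [`PeerIdentity`] type)
//! is an assumption made for illustration:
//!
//! ```ignore
//! let mut state = State::new(me, None, Config::default(), rand::thread_rng());
//! ```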

use std::collections::{HashMap, HashSet};

use derive_more::{From, Sub};
use n0_future::time::Duration;
use rand::{rngs::ThreadRng, Rng};
use serde::{Deserialize, Serialize};
use tracing::debug;

use super::{util::IndexSet, PeerData, PeerIdentity, PeerInfo, IO};

/// Input event for HyParView
#[derive(Debug)]
pub enum InEvent<PI> {
    /// A [`Message`] was received from a peer.
    RecvMessage(PI, Message<PI>),
    /// A timer has expired.
    TimerExpired(Timer<PI>),
    /// A peer was disconnected on the IO layer.
    PeerDisconnected(PI),
    /// Send a join request to a peer.
    RequestJoin(PI),
    /// Update the peer data that is transmitted on join requests.
    UpdatePeerData(PeerData),
    /// Quit the swarm, informing peers about us leaving.
    Quit,
}

/// Output event for HyParView
#[derive(Debug)]
pub enum OutEvent<PI> {
    /// Ask the IO layer to send a [`Message`] to peer `PI`.
    SendMessage(PI, Message<PI>),
    /// Schedule a [`Timer`].
    ScheduleTimer(Duration, Timer<PI>),
    /// Ask the IO layer to close the connection to peer `PI`.
    DisconnectPeer(PI),
    /// Emit an [`Event`] to the application.
    EmitEvent(Event<PI>),
    /// New [`PeerData`] was received for peer `PI`.
    PeerData(PI, PeerData),
}

/// Event emitted by the [`State`] to the application.
#[derive(Clone, Debug)]
pub enum Event<PI> {
    /// A peer was added to our set of active connections.
    NeighborUp(PI),
    /// A peer was removed from our set of active connections.
    NeighborDown(PI),
}

/// Kinds of timers HyParView needs to schedule.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Timer<PI> {
    /// Time to send out the next periodic [`Message::Shuffle`].
    DoShuffle,
    /// Check whether the pending [`Message::Neighbor`] request to this peer was answered.
    PendingNeighborRequest(PI),
}

/// Messages that we can send to and receive from peers within the topic.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum Message<PI> {
    /// Sent to a peer if you want to join the swarm
    Join(Option<PeerData>),
    /// When a peer receives a Join, it sends a ForwardJoin to the peers in its active view to
    /// introduce the new member.
    ForwardJoin(ForwardJoin<PI>),
    /// A shuffle request is sent occasionally to re-shuffle the passive view with contacts from
    /// other peers.
    Shuffle(Shuffle<PI>),
    /// Peers reply to [`Message::Shuffle`] requests with a random selection of peers from their
    /// active and passive views.
    ShuffleReply(ShuffleReply<PI>),
    /// Request to add the sender to the recipient's active view. If [`Neighbor::priority`] is
    /// [`Priority::High`], the request cannot be denied.
    Neighbor(Neighbor),
    /// Request to disconnect from a peer.
    /// If [`Disconnect::alive`] is true, the other peer is not shutting down, so it should be
    /// added to the passive set.
    Disconnect(Disconnect),
}

/// The time-to-live of a forwarded message.
///
/// Each time a message is forwarded, the `Ttl` is decreased by 1. Once the `Ttl` reaches 0, the
/// message should not be forwarded further.
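///
/// A minimal usage sketch of the decrement-until-expired behavior:
///
/// ```ignore
/// let ttl = Ttl(2).next(); // Ttl(1)
/// let ttl = ttl.next();    // Ttl(0); `next` saturates and never underflows
/// assert!(ttl.expired());
/// ```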
#[derive(From, Sub, Eq, PartialEq, Clone, Debug, Copy, Serialize, Deserialize)]
pub struct Ttl(pub u16);

impl Ttl {
    pub fn expired(&self) -> bool {
        *self == Ttl(0)
    }

    pub fn next(&self) -> Ttl {
        Ttl(self.0.saturating_sub(1))
    }
}

/// A message informing other peers that a new peer joined the swarm for this topic.
///
/// Will be forwarded in a random walk until `ttl` reaches 0.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ForwardJoin<PI> {
    /// The peer that newly joined the swarm
    peer: PeerInfo<PI>,
    /// The time-to-live for this message
    ttl: Ttl,
}

/// Shuffle messages are sent occasionally to shuffle our passive view with peers from other
/// peers' active and passive views.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Shuffle<PI> {
    /// The peer that initiated the shuffle request.
    origin: PI,
    /// A random subset of the active and passive peers of the `origin` peer.
    nodes: Vec<PeerInfo<PI>>,
    /// The time-to-live for this message.
    ttl: Ttl,
}

/// Once a shuffle message reaches a [`Ttl`] of 0, a peer replies with a `ShuffleReply`.
///
/// The reply is sent to the peer that initiated the shuffle and contains a subset of the active
/// and passive views of the peer at the end of the random walk.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ShuffleReply<PI> {
    /// A random subset of the active and passive peers of the peer sending the `ShuffleReply`.
    nodes: Vec<PeerInfo<PI>>,
}

/// The priority of a [`Message::Neighbor`] request.
///
/// This is `High` if the sender does not have any active peers, and `Low` otherwise.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum Priority {
    /// High priority join that may not be denied.
    ///
    /// A peer may only send high priority joins if it doesn't have any active peers at the moment.
    High,
    /// Low priority join that can be denied.
    Low,
}

/// A neighbor message is sent after adding a peer to our active view to inform them that we are
/// now neighbors.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Neighbor {
    /// The priority of the `Join` or `ForwardJoin` message that triggered this neighbor request.
    priority: Priority,
    /// The user data of the peer sending this message.
    data: Option<PeerData>,
}

/// Message sent when leaving the swarm or shutting down, to inform peers that we are gone.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Disconnect {
    /// `true` if we are only closing the connection because our connection limits are reached
    /// (and thus remain alive), `false` if we are actually shutting down.
    alive: bool,
    /// Obsolete field (kept in the struct to maintain wire compatibility).
    _respond: bool,
}

/// Configuration for the swarm membership layer
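///
/// All fields have defaults, so a hedged sketch overriding a single knob might look like this
/// (`Duration` as imported in this module):
///
/// ```ignore
/// let config = Config {
///     shuffle_interval: Duration::from_secs(30),
///     ..Default::default()
/// };
/// ```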
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Number of peers to which active connections are maintained
    pub active_view_capacity: usize,
    /// Number of peers for which contact information is remembered,
    /// but to which we are not actively connected.
    pub passive_view_capacity: usize,
    /// Number of hops a `ForwardJoin` message is propagated until the new peer's info
    /// is added to a peer's active view.
    pub active_random_walk_length: Ttl,
    /// Number of hops a `ForwardJoin` message is propagated until the new peer's info
    /// is added to a peer's passive view.
    pub passive_random_walk_length: Ttl,
    /// Number of hops a `Shuffle` message is propagated until a peer replies to it.
    pub shuffle_random_walk_length: Ttl,
    /// Number of active peers to be included in a `Shuffle` request.
    pub shuffle_active_view_count: usize,
    /// Number of passive peers to be included in a `Shuffle` request.
    pub shuffle_passive_view_count: usize,
    /// Interval duration for shuffle requests
    pub shuffle_interval: Duration,
    /// Timeout after which a `Neighbor` request is considered failed
    pub neighbor_request_timeout: Duration,
}

impl Default for Config {
    /// Default values for the HyParView layer
    fn default() -> Self {
        Self {
            // From the paper (p9)
            active_view_capacity: 5,
            // From the paper (p9)
            passive_view_capacity: 30,
            // From the paper (p9)
            active_random_walk_length: Ttl(6),
            // From the paper (p9)
            passive_random_walk_length: Ttl(3),
            // From the paper (p9)
            shuffle_random_walk_length: Ttl(6),
            // From the paper (p9)
            shuffle_active_view_count: 3,
            // From the paper (p9)
            shuffle_passive_view_count: 4,
            // Wild guess
            shuffle_interval: Duration::from_secs(60),
            // Wild guess
            neighbor_request_timeout: Duration::from_millis(500),
        }
    }
}

/// Statistics gathered by the HyParView state.
#[derive(Default, Debug, Clone)]
pub struct Stats {
    /// Counts each non-`Disconnect` message received from a peer that is not in our active view;
    /// such messages arrive over short-lived connections.
    total_connections: usize,
}

/// The state of the HyParView protocol
#[derive(Debug)]
pub struct State<PI, RG = ThreadRng> {
    /// Our peer identity
    me: PI,
    /// Our opaque user data to transmit to peers on join messages
    me_data: Option<PeerData>,
    /// The active view, i.e. peers we are connected to
    pub(crate) active_view: IndexSet<PI>,
    /// The passive view, i.e. peers we know about but are not connected to at the moment
    pub(crate) passive_view: IndexSet<PI>,
    /// Protocol configuration (cannot change at runtime)
    config: Config,
    /// Whether a shuffle timer is currently scheduled
    shuffle_scheduled: bool,
    /// Random number generator
    rng: RG,
    /// Statistics
    pub(crate) stats: Stats,
    /// The set of neighbor requests we sent out but did not yet receive a reply for
    pending_neighbor_requests: HashSet<PI>,
    /// The opaque user peer data we received for other peers
    peer_data: HashMap<PI, PeerData>,
    /// List of peers that are disconnecting, but which we want to keep in the passive set
    /// once the connection closes
    alive_disconnect_peers: HashSet<PI>,
}

impl<PI, RG> State<PI, RG>
where
    PI: PeerIdentity,
    RG: Rng,
{
    pub fn new(me: PI, me_data: Option<PeerData>, config: Config, rng: RG) -> Self {
        Self {
            me,
            me_data,
            active_view: IndexSet::new(),
            passive_view: IndexSet::new(),
            config,
            shuffle_scheduled: false,
            rng,
            stats: Stats::default(),
            pending_neighbor_requests: Default::default(),
            peer_data: Default::default(),
            alive_disconnect_peers: Default::default(),
        }
    }

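    /// Handle an [`InEvent`], pushing any resulting [`OutEvent`]s to `io`.
    ///
    /// A hedged driving-loop sketch; `io` is assumed to be some buffer implementing [`IO`]
    /// that the caller drains afterwards, and `bootstrap_peer` is a placeholder:
    ///
    /// ```ignore
    /// state.handle(InEvent::RequestJoin(bootstrap_peer), &mut io);
    /// // The caller now dispatches the buffered `OutEvent`s:
    /// // `SendMessage`, `ScheduleTimer`, `DisconnectPeer`, ...
    /// ```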
    pub fn handle(&mut self, event: InEvent<PI>, io: &mut impl IO<PI>) {
        match event {
            InEvent::RecvMessage(from, message) => self.handle_message(from, message, io),
            InEvent::TimerExpired(timer) => match timer {
                Timer::DoShuffle => self.handle_shuffle_timer(io),
                Timer::PendingNeighborRequest(peer) => self.handle_pending_neighbor_timer(peer, io),
            },
            InEvent::PeerDisconnected(peer) => self.handle_connection_closed(peer, io),
            InEvent::RequestJoin(peer) => self.handle_join(peer, io),
            InEvent::UpdatePeerData(data) => {
                self.me_data = Some(data);
            }
            InEvent::Quit => self.handle_quit(io),
        }

        // this will only happen on the first call
        if !self.shuffle_scheduled {
            io.push(OutEvent::ScheduleTimer(
                self.config.shuffle_interval,
                Timer::DoShuffle,
            ));
            self.shuffle_scheduled = true;
        }
    }

    fn handle_message(&mut self, from: PI, message: Message<PI>, io: &mut impl IO<PI>) {
        let is_disconnect = matches!(message, Message::Disconnect(Disconnect { .. }));
        if !is_disconnect && !self.active_view.contains(&from) {
            self.stats.total_connections += 1;
        }
        match message {
            Message::Join(data) => self.on_join(from, data, io),
            Message::ForwardJoin(details) => self.on_forward_join(from, details, io),
            Message::Shuffle(details) => self.on_shuffle(from, details, io),
            Message::ShuffleReply(details) => self.on_shuffle_reply(details, io),
            Message::Neighbor(details) => self.on_neighbor(from, details, io),
            Message::Disconnect(details) => self.on_disconnect(from, details, io),
        }

        // Disconnect from passive nodes right after receiving a message.
        // TODO(frando): I'm not sure anymore that this is correct. Maybe remove?
        if !is_disconnect && !self.active_view.contains(&from) {
            io.push(OutEvent::DisconnectPeer(from));
        }
    }

    fn handle_join(&mut self, peer: PI, io: &mut impl IO<PI>) {
        io.push(OutEvent::SendMessage(
            peer,
            Message::Join(self.me_data.clone()),
        ));
    }

    /// We received a disconnect message.
    fn on_disconnect(&mut self, peer: PI, details: Disconnect, io: &mut impl IO<PI>) {
        self.pending_neighbor_requests.remove(&peer);
        if self.active_view.contains(&peer) {
            self.remove_active(
                &peer,
                RemovalReason::DisconnectReceived {
                    is_alive: details.alive,
                },
                io,
            );
        } else if details.alive && self.passive_view.contains(&peer) {
            self.alive_disconnect_peers.insert(peer);
        }
    }

    /// A connection was closed by the peer.
    fn handle_connection_closed(&mut self, peer: PI, io: &mut impl IO<PI>) {
        self.pending_neighbor_requests.remove(&peer);
        if self.active_view.contains(&peer) {
            self.remove_active(&peer, RemovalReason::ConnectionClosed, io);
        } else if !self.alive_disconnect_peers.remove(&peer) {
            self.passive_view.remove(&peer);
            self.peer_data.remove(&peer);
        }
    }

    fn handle_quit(&mut self, io: &mut impl IO<PI>) {
        for peer in self.active_view.clone().into_iter() {
            self.active_view.remove(&peer);
            self.send_disconnect(peer, false, io);
        }
    }

    fn send_disconnect(&mut self, peer: PI, alive: bool, io: &mut impl IO<PI>) {
        // Before disconnecting, send a `ShuffleReply` with some of our nodes to
        // prevent the other node from running out of connections. This is especially
        // relevant if the other node just joined the swarm.
        self.send_shuffle_reply(
            peer,
            self.config.shuffle_active_view_count + self.config.shuffle_passive_view_count,
            io,
        );
        let message = Message::Disconnect(Disconnect {
            alive,
            _respond: false,
        });
        io.push(OutEvent::SendMessage(peer, message));
        io.push(OutEvent::DisconnectPeer(peer));
    }

    fn on_join(&mut self, peer: PI, data: Option<PeerData>, io: &mut impl IO<PI>) {
        // "A node that receives a join request will start by adding the new
        // node to its active view, even if it has to drop a random node from it. (6)"
        self.add_active(peer, data.clone(), Priority::High, true, io);

        // "The contact node c will then send to all other nodes in its active view a ForwardJoin
        // request containing the new node identifier. Associated to the join procedure,
        // there are two configuration parameters, named Active Random Walk Length (ARWL),
        // that specifies the maximum number of hops a ForwardJoin request is propagated,
        // and Passive Random Walk Length (PRWL), that specifies at which point in the walk the node
        // is inserted in a passive view. To use these parameters, the ForwardJoin request carries
        // a “time to live” field that is initially set to ARWL and decreased at every hop. (7)"
        let ttl = self.config.active_random_walk_length;
        let peer_info = PeerInfo { id: peer, data };
        for node in self.active_view.iter_without(&peer) {
            let message = Message::ForwardJoin(ForwardJoin {
                peer: peer_info.clone(),
                ttl,
            });
            io.push(OutEvent::SendMessage(*node, message));
        }
    }

    fn on_forward_join(&mut self, sender: PI, message: ForwardJoin<PI>, io: &mut impl IO<PI>) {
        let peer_id = message.peer.id;
        // If the peer is already in our active view, we renew our neighbor relationship.
        if self.active_view.contains(&peer_id) {
            self.insert_peer_info(message.peer, io);
            self.send_neighbor(peer_id, Priority::High, io);
        }
        // "i) If the time to live is equal to zero or if the number of nodes in p’s active view is equal to one,
        // it will add the new node to its active view (7)"
        else if message.ttl.expired() || self.active_view.len() <= 1 {
            self.insert_peer_info(message.peer, io);
            // Modification from the paper: Instead of adding the peer directly to our active view,
            // we only send the Neighbor message. We will add the peer to our active view once we
            // receive a reply from our neighbor.
            // This prevents us from adding unreachable peers to our active view.
            self.send_neighbor(peer_id, Priority::High, io);
        } else {
            // "ii) If the time to live is equal to PRWL, p will insert the new node into its passive view"
            if message.ttl == self.config.passive_random_walk_length {
                self.add_passive(peer_id, message.peer.data.clone(), io);
            }
            // "iii) The time to live field is decremented."
            // "iv) If, at this point, n has not been inserted
            // in p’s active view, p will forward the request to a random node in its active view
            // (different from the one from which the request was received)."
            if !self.active_view.contains(&peer_id)
                && !self.pending_neighbor_requests.contains(&peer_id)
            {
                match self
                    .active_view
                    .pick_random_without(&[&sender], &mut self.rng)
                {
                    None => {
                        unreachable!("if the peer was not added, there are at least two peers in our active view.");
                    }
                    Some(next) => {
                        let message = Message::ForwardJoin(ForwardJoin {
                            peer: message.peer,
                            ttl: message.ttl.next(),
                        });
                        io.push(OutEvent::SendMessage(*next, message));
                    }
                }
            }
        }
    }

    fn on_neighbor(&mut self, from: PI, details: Neighbor, io: &mut impl IO<PI>) {
        let is_reply = self.pending_neighbor_requests.remove(&from);
        let do_reply = !is_reply;
        // "A node q that receives a high priority neighbor request will always accept the request, even
        // if it has to drop a random member from its active view (again, the member that is dropped will
        // receive a Disconnect notification). If a node q receives a low priority Neighbor request, it will
        // only accept the request if it has a free slot in its active view, otherwise it will refuse the request."
        if !self.add_active(from, details.data, details.priority, do_reply, io) {
            self.send_disconnect(from, true, io);
        }
    }

    /// Get the [`PeerInfo`] for a peer.
    fn peer_info(&self, id: &PI) -> PeerInfo<PI> {
        let data = self.peer_data.get(id).cloned();
        PeerInfo { id: *id, data }
    }

    fn insert_peer_info(&mut self, peer_info: PeerInfo<PI>, io: &mut impl IO<PI>) {
        if let Some(data) = peer_info.data {
            let old = self.peer_data.remove(&peer_info.id);
            let same = matches!(old, Some(old) if old == data);
            if !same && !data.0.is_empty() {
                io.push(OutEvent::PeerData(peer_info.id, data.clone()));
            }
            self.peer_data.insert(peer_info.id, data);
        }
    }

    /// Handle a [`Message::Shuffle`]
    ///
    /// > A node q that receives a Shuffle request will first decrease its time to live. If the time
    /// > to live of the message is greater than zero and the number of nodes in q’s active view is
    /// > greater than 1, the node will select a random node from its active view, different from the
    /// > one he received this shuffle message from, and simply forwards the Shuffle request.
    /// > Otherwise, node q accepts the Shuffle request and send back (p.8)
    fn on_shuffle(&mut self, from: PI, shuffle: Shuffle<PI>, io: &mut impl IO<PI>) {
        if shuffle.ttl.expired() || self.active_view.len() <= 1 {
            let len = shuffle.nodes.len();
            for node in shuffle.nodes {
                self.add_passive(node.id, node.data, io);
            }
            self.send_shuffle_reply(shuffle.origin, len, io);
        } else if let Some(node) = self
            .active_view
            .pick_random_without(&[&shuffle.origin, &from], &mut self.rng)
        {
            let message = Message::Shuffle(Shuffle {
                origin: shuffle.origin,
                nodes: shuffle.nodes,
                ttl: shuffle.ttl.next(),
            });
            io.push(OutEvent::SendMessage(*node, message));
        }
    }

    fn send_shuffle_reply(&mut self, to: PI, len: usize, io: &mut impl IO<PI>) {
        let mut nodes = self.passive_view.shuffled_and_capped(len, &mut self.rng);
        // If we don't have enough passive nodes for the expected length, we fill with
        // active nodes.
        if nodes.len() < len {
            nodes.extend(
                self.active_view
                    .shuffled_and_capped(len - nodes.len(), &mut self.rng),
            );
        }
        let nodes = nodes.into_iter().map(|id| self.peer_info(&id));
        let message = Message::ShuffleReply(ShuffleReply {
            nodes: nodes.collect(),
        });
        io.push(OutEvent::SendMessage(to, message));
    }

    fn on_shuffle_reply(&mut self, message: ShuffleReply<PI>, io: &mut impl IO<PI>) {
        for node in message.nodes {
            self.add_passive(node.id, node.data, io);
        }
        self.refill_active_from_passive(&[], io);
    }

    fn handle_shuffle_timer(&mut self, io: &mut impl IO<PI>) {
        if let Some(node) = self.active_view.pick_random(&mut self.rng) {
            let active = self.active_view.shuffled_without_and_capped(
                &[node],
                self.config.shuffle_active_view_count,
                &mut self.rng,
            );
            let passive = self.passive_view.shuffled_without_and_capped(
                &[node],
                self.config.shuffle_passive_view_count,
                &mut self.rng,
            );
            let nodes = active
                .iter()
                .chain(passive.iter())
                .map(|id| self.peer_info(id));
            let me = PeerInfo {
                id: self.me,
                data: self.me_data.clone(),
            };
            let nodes = nodes.chain([me]);
            let message = Shuffle {
                origin: self.me,
                nodes: nodes.collect(),
                ttl: self.config.shuffle_random_walk_length,
            };
            io.push(OutEvent::SendMessage(*node, Message::Shuffle(message)));
        }
        io.push(OutEvent::ScheduleTimer(
            self.config.shuffle_interval,
            Timer::DoShuffle,
        ));
    }

    fn passive_is_full(&self) -> bool {
        self.passive_view.len() >= self.config.passive_view_capacity
    }

    fn active_is_full(&self) -> bool {
        self.active_view.len() >= self.config.active_view_capacity
    }

    /// Add a peer to the passive view.
    ///
    /// If the passive view is full, it will first remove a random peer and then insert the new peer.
    /// If a peer is currently in the active view it will not be added.
    fn add_passive(&mut self, peer: PI, data: Option<PeerData>, io: &mut impl IO<PI>) {
        self.insert_peer_info((peer, data).into(), io);
        if self.active_view.contains(&peer) || self.passive_view.contains(&peer) || peer == self.me
        {
            return;
        }
        if self.passive_is_full() {
            self.passive_view.remove_random(&mut self.rng);
        }
        self.passive_view.insert(peer);
    }

    /// Remove a peer from the active view.
    ///
    /// If `reason` is [`RemovalReason::Random`], a [`Disconnect`] message will be sent to the peer.
    fn remove_active(&mut self, peer: &PI, reason: RemovalReason, io: &mut impl IO<PI>) {
        if let Some(idx) = self.active_view.get_index_of(peer) {
            let removed_peer = self.remove_active_by_index(idx, reason, io).unwrap();
            self.refill_active_from_passive(&[&removed_peer], io);
        }
    }

    fn refill_active_from_passive(&mut self, skip_peers: &[&PI], io: &mut impl IO<PI>) {
        if self.active_view.len() + self.pending_neighbor_requests.len()
            >= self.config.active_view_capacity
        {
            return;
        }
        // "When a node p suspects that one of the nodes present in its active view has failed
        // (by either disconnecting or blocking), it selects a random node q from its passive view and
        // attempts to establish a TCP connection with q. If the connection fails to establish,
        // node q is considered failed and removed from p’s passive view; another node q′ is selected
        // at random and a new attempt is made. The procedure is repeated until a connection is established
        // with success." (p7)
        let mut skip_peers = skip_peers.to_vec();
        skip_peers.extend(self.pending_neighbor_requests.iter());

        if let Some(node) = self
            .passive_view
            .pick_random_without(&skip_peers, &mut self.rng)
            .copied()
        {
            let priority = match self.active_view.is_empty() {
                true => Priority::High,
                false => Priority::Low,
            };
            self.send_neighbor(node, priority, io);
            // schedule a timer that checks if the node replied with a neighbor message,
            // otherwise try again with another passive node.
            io.push(OutEvent::ScheduleTimer(
                self.config.neighbor_request_timeout,
                Timer::PendingNeighborRequest(node),
            ));
        }
    }

    fn handle_pending_neighbor_timer(&mut self, peer: PI, io: &mut impl IO<PI>) {
        if self.pending_neighbor_requests.remove(&peer) {
            self.passive_view.remove(&peer);
            self.refill_active_from_passive(&[], io);
        }
    }

    fn remove_active_by_index(
        &mut self,
        peer_index: usize,
        reason: RemovalReason,
        io: &mut impl IO<PI>,
    ) -> Option<PI> {
        if let Some(peer) = self.active_view.remove_index(peer_index) {
            io.push(OutEvent::EmitEvent(Event::NeighborDown(peer)));

            match reason {
                // send a disconnect message, then close the connection.
                RemovalReason::Random => self.send_disconnect(peer, true, io),
                // close the connection without sending anything further.
                RemovalReason::DisconnectReceived { is_alive: _ } => {
                    io.push(OutEvent::DisconnectPeer(peer))
                }
                // do nothing, the connection is already closed.
                RemovalReason::ConnectionClosed => {}
            }

            let keep_as_passive = match reason {
                // keep alive if previously marked as alive.
                RemovalReason::ConnectionClosed => self.alive_disconnect_peers.remove(&peer),
                // keep alive if the other peer said it is still alive.
                RemovalReason::DisconnectReceived { is_alive } => is_alive,
                // keep alive (we removed the peer at random; it is still alive).
                RemovalReason::Random => true,
            };

            if keep_as_passive {
                let data = self.peer_data.remove(&peer);
                self.add_passive(peer, data, io);
                // mark the peer as alive, so it doesn't get removed from the passive view
                // when the connection closes.
                if !matches!(reason, RemovalReason::ConnectionClosed) {
                    self.alive_disconnect_peers.insert(peer);
                }
            }
            debug!(other = ?peer, "removed from active view, reason: {reason:?}");
            Some(peer)
        } else {
            None
        }
    }

    /// Remove a random peer from the active view.
    fn free_random_slot_in_active_view(&mut self, io: &mut impl IO<PI>) {
        if let Some(index) = self.active_view.pick_random_index(&mut self.rng) {
            self.remove_active_by_index(index, RemovalReason::Random, io);
        }
    }

    /// Add a peer to the active view.
    ///
    /// If the active view is currently full, a random peer will be removed first.
    /// If `reply` is true, a `Neighbor` message is sent to the peer. If `priority` is
    /// [`Priority::High`], the peer may not deny the request.
    fn add_active(
        &mut self,
        peer: PI,
        data: Option<PeerData>,
        priority: Priority,
        reply: bool,
        io: &mut impl IO<PI>,
    ) -> bool {
        if peer == self.me {
            return false;
        }
        self.insert_peer_info((peer, data).into(), io);
        if self.active_view.contains(&peer) {
            if reply {
                self.send_neighbor(peer, priority, io);
            }
            return true;
        }
        match (priority, self.active_is_full()) {
            (Priority::High, is_full) => {
                if is_full {
                    self.free_random_slot_in_active_view(io);
                }
                self.add_active_unchecked(peer, Priority::High, reply, io);
                true
            }
            (Priority::Low, false) => {
                self.add_active_unchecked(peer, Priority::Low, reply, io);
                true
            }
            (Priority::Low, true) => false,
        }
    }

    fn add_active_unchecked(
        &mut self,
        peer: PI,
        priority: Priority,
        reply: bool,
        io: &mut impl IO<PI>,
    ) {
        self.passive_view.remove(&peer);
        if self.active_view.insert(peer) {
            debug!(other = ?peer, "add to active view");
            io.push(OutEvent::EmitEvent(Event::NeighborUp(peer)));
            if reply {
                self.send_neighbor(peer, priority, io);
            }
        }
    }

    fn send_neighbor(&mut self, peer: PI, priority: Priority, io: &mut impl IO<PI>) {
        if self.pending_neighbor_requests.insert(peer) {
            let message = Message::Neighbor(Neighbor {
                priority,
                data: self.me_data.clone(),
            });
            io.push(OutEvent::SendMessage(peer, message));
        }
    }
}

#[derive(Debug)]
enum RemovalReason {
    /// A peer is removed because the connection was closed ungracefully.
    ConnectionClosed,
    /// A peer is removed because we received a disconnect message.
    DisconnectReceived { is_alive: bool },
    /// A peer is removed after random selection to make room for a newly joined peer.
    Random,
}