iroh_gossip/proto/
plumtree.rs

//! Implementation of the Plumtree epidemic broadcast tree protocol
//!
//! The implementation is based on [this paper][paper] by João Leitão, José Pereira, Luís Rodrigues
//! and the [example implementation][impl] by Bartosz Sypytkowski
//!
//! [paper]: https://asc.di.fct.unl.pt/~jleitao/pdf/srds07-leitao.pdf
//! [impl]: https://gist.github.com/Horusiath/84fac596101b197da0546d1697580d99
8
9use std::{
10    collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
11    hash::Hash,
12};
13
14use bytes::Bytes;
15use derive_more::{Add, From, Sub};
16use n0_future::time::{Duration, Instant};
17use postcard::experimental::max_size::MaxSize;
18use serde::{Deserialize, Serialize};
19use tracing::{debug, warn};
20
21use super::{
22    util::{idbytes_impls, TimeBoundCache},
23    PeerIdentity, IO,
24};
25
26/// A message identifier, which is the message content's blake3 hash.
27#[derive(Serialize, Deserialize, Clone, Hash, Copy, PartialEq, Eq, MaxSize)]
28pub struct MessageId([u8; 32]);
29idbytes_impls!(MessageId, "MessageId");
30
31impl MessageId {
32    /// Create a `[MessageId]` by hashing the message content.
33    ///
34    /// This hashes the input with [`blake3::hash`].
35    pub fn from_content(message: &[u8]) -> Self {
36        Self::from(blake3::hash(message))
37    }
38}
39
40/// Events Plumtree is informed of from the peer sampling service and IO layer.
41#[derive(Debug)]
42pub enum InEvent<PI> {
43    /// A [`Message`] was received from the peer.
44    RecvMessage(PI, Message),
45    /// Broadcast the contained payload to the given scope.
46    Broadcast(Bytes, Scope),
47    /// A timer has expired.
48    TimerExpired(Timer),
49    /// New member `PI` has joined the topic.
50    NeighborUp(PI),
51    /// Peer `PI` has disconnected from the topic.
52    NeighborDown(PI),
53}
54
55/// Events Plumtree emits.
56#[derive(Debug, PartialEq, Eq)]
57pub enum OutEvent<PI> {
58    /// Ask the IO layer to send a [`Message`] to peer `PI`.
59    SendMessage(PI, Message),
60    /// Schedule a [`Timer`].
61    ScheduleTimer(Duration, Timer),
62    /// Emit an [`Event`] to the application.
63    EmitEvent(Event<PI>),
64}
65
66/// Kinds of timers Plumtree needs to schedule.
67#[derive(Clone, Debug, Eq, PartialEq)]
68pub enum Timer {
69    /// Request the content for [`MessageId`] by sending [`Message::Graft`].
70    ///
71    /// The message will be sent to a peer that sent us an [`Message::IHave`] for this [`MessageId`],
72    /// which will send us the message content in reply and also move the peer into the eager set.
73    /// Will be a no-op if the message for [`MessageId`] was already received from another peer by now.
74    SendGraft(MessageId),
75    /// Dispatch the [`Message::IHave`] in our lazy push queue.
76    DispatchLazyPush,
77    /// Evict the message cache
78    EvictCache,
79}
80
81/// Event emitted by the [`State`] to the application.
82#[derive(Clone, Debug, PartialEq, Eq)]
83pub enum Event<PI> {
84    /// A new gossip message was received.
85    Received(GossipEvent<PI>),
86}
87
88#[derive(Clone, derive_more::Debug, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
89pub struct GossipEvent<PI> {
90    /// The content of the gossip message.
91    #[debug("<{}b>", content.len())]
92    pub content: Bytes,
93    /// The peer that we received the gossip message from. Note that this is not the peer that
94    /// originally broadcasted the message, but the peer before us in the gossiping path.
95    pub delivered_from: PI,
96    /// The broadcast scope of the message.
97    pub scope: DeliveryScope,
98}
99
100impl<PI> GossipEvent<PI> {
101    fn from_message(message: &Gossip, from: PI) -> Self {
102        Self {
103            content: message.content.clone(),
104            scope: message.scope,
105            delivered_from: from,
106        }
107    }
108}
109
110/// Number of delivery hops a message has taken.
111#[derive(
112    From,
113    Add,
114    Sub,
115    Serialize,
116    Deserialize,
117    Eq,
118    PartialEq,
119    PartialOrd,
120    Ord,
121    Clone,
122    Copy,
123    Debug,
124    Hash,
125    MaxSize,
126)]
127pub struct Round(u16);
128
129impl Round {
130    pub fn next(&self) -> Round {
131        Round(self.0 + 1)
132    }
133}
134
135/// Messages that we can send and receive from peers within the topic.
136#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
137pub enum Message {
138    /// When receiving Gossip, emit as event and forward full message to eager peer and (after a
139    /// delay) message IDs to lazy peers.
140    Gossip(Gossip),
141    /// When receiving Prune, move the peer from the eager to the lazy set.
142    Prune,
143    /// When receiving Graft, move the peer to the eager set and send the full content for the
144    /// included message ID.
145    Graft(Graft),
146    /// When receiving IHave, do nothing initially, and request the messages for the included
147    /// message IDs after some time if they aren't pushed eagerly to us.
148    IHave(Vec<IHave>),
149}
150
151/// Payload messages transmitted by the protocol.
152#[derive(Serialize, Deserialize, Clone, derive_more::Debug, PartialEq, Eq)]
153pub struct Gossip {
154    /// Id of the message.
155    id: MessageId,
156    /// Message contents.
157    #[debug("<{}b>", content.len())]
158    content: Bytes,
159    /// Scope to broadcast to.
160    scope: DeliveryScope,
161}
162
163impl Gossip {
164    fn round(&self) -> Option<Round> {
165        match self.scope {
166            DeliveryScope::Swarm(round) => Some(round),
167            DeliveryScope::Neighbors => None,
168        }
169    }
170}
171
172/// The scope to deliver the message to.
173#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Copy)]
174pub enum DeliveryScope {
175    /// This message was received from the swarm, with a distance (in hops) travelled from the
176    /// original broadcaster.
177    Swarm(Round),
178    /// This message was received from a direct neighbor that broadcasted the message to neighbors
179    /// only.
180    Neighbors,
181}
182
183impl DeliveryScope {
184    /// Whether this message was directly received from its publisher.
185    pub fn is_direct(&self) -> bool {
186        matches!(self, Self::Neighbors | Self::Swarm(Round(0)))
187    }
188}
189
190/// The broadcast scope of a gossip message.
191#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Copy)]
192pub enum Scope {
193    /// The message is broadcast to all peers in the swarm.
194    Swarm,
195    /// The message is broadcast only to the immediate neighbors of a peer.
196    Neighbors,
197}
198
199impl Gossip {
200    /// Get a clone of this `Gossip` message and increase the delivery round by 1.
201    pub fn next_round(&self) -> Option<Gossip> {
202        match self.scope {
203            DeliveryScope::Neighbors => None,
204            DeliveryScope::Swarm(round) => Some(Gossip {
205                id: self.id,
206                content: self.content.clone(),
207                scope: DeliveryScope::Swarm(round.next()),
208            }),
209        }
210    }
211
212    /// Validate that the message id is the blake3 hash of the message content.
213    pub fn validate(&self) -> bool {
214        let expected = MessageId::from_content(&self.content);
215        expected == self.id
216    }
217}
218
219/// Control message to inform peers we have a message without transmitting the whole payload.
220#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, MaxSize)]
221pub struct IHave {
222    /// Id of the message.
223    pub(crate) id: MessageId,
224    /// Delivery round of the message.
225    pub(crate) round: Round,
226}
227
228/// Control message to signal a peer that they have been moved to the eager set, and to ask the
229/// peer to do the same with this node.
230#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
231pub struct Graft {
232    /// Message id that triggers the graft, if any.
233    /// On receiving a graft, the payload message must be sent in reply if a message id is set.
234    id: Option<MessageId>,
235    /// Delivery round of the [`Message::IHave`] that triggered this Graft message.
236    round: Round,
237}
238
239/// Configuration for the gossip broadcast layer.
240///
241/// Currently, the expectation is that the configuration is the same for all peers in the
242/// network (as recommended in the paper).
243#[derive(Clone, Debug, Serialize, Deserialize)]
244#[serde(default)]
245pub struct Config {
246    /// When receiving an `IHave` message, this timeout is registered. If the message for the
247    /// `IHave` was not received once the timeout is expired, a `Graft` message is sent to the
248    /// peer that sent us the `IHave` to request the message payload.
249    ///
250    /// The plumtree paper notes:
251    /// > The timeout value is a protocol parameter that should be configured considering the
252    /// > diameter of the overlay and a target maximum recovery latency, defined by the application
253    /// > requirements. (p.8)
254    pub graft_timeout_1: Duration,
255    /// This timeout is registered when sending a `Graft` message. If a reply has not been
256    /// received once the timeout expires, we send another `Graft` message to the next peer that
257    /// sent us an `IHave` for this message.
258    ///
259    /// The plumtree paper notes:
260    /// > This second timeout value should be smaller that the first, in the order of an average
261    /// > round trip time to a neighbor.
262    pub graft_timeout_2: Duration,
263    /// Timeout after which `IHave` messages are pushed to peers.
264    pub dispatch_timeout: Duration,
265    /// The protocol performs a tree optimization, which promotes lazy peers to eager peers if the
266    /// `Message::IHave` messages received from them have a lower number of hops from the
267    /// message's origin as the `InEvent::Broadcast` messages received from our eager peers. This
268    /// parameter is the number of hops that the lazy peers must be closer to the origin than our
269    /// eager peers to be promoted to become an eager peer.
270    pub optimization_threshold: Round,
271
272    /// Duration for which to keep gossip messages in the internal message cache.
273    ///
274    /// Messages broadcast from this node or received from other nodes are kept in an internal
275    /// cache for this duration before being evicted. If this is too low, other nodes will not be
276    /// able to retrieve messages once they need them. If this is high, the cache will grow.
277    ///
278    /// Should be at least around several round trip times to peers.
279    pub message_cache_retention: Duration,
280
281    /// Duration for which to keep the `MessageId`s for received messages.
282    ///
283    /// Should be at least as long as [`Self::message_cache_retention`], usually will be longer to
284    /// not accidentally receive messages multiple times.
285    pub message_id_retention: Duration,
286
287    /// How often the internal caches will be checked for expired items.
288    pub cache_evict_interval: Duration,
289}
290
291impl Default for Config {
292    /// Sensible defaults for the plumtree configuration
293    //
294    // TODO: Find out what good defaults are for the three timeouts here. Current numbers are
295    // guesses that need validation. The paper does not have concrete recommendations for these
296    // numbers.
297    fn default() -> Self {
298        Self {
299            // Paper: "The timeout value is a protocol parameter that should be configured considering
300            // the diameter of the overlay and a target maximum recovery latency, defined by the
301            // application requirements. This is a parameter that should be statically configured
302            // at deployment time." (p. 8)
303            //
304            // Earthstar has 5ms it seems, see https://github.com/earthstar-project/earthstar/blob/1523c640fedf106f598bf79b184fb0ada64b1cc0/src/syncer/plum_tree.ts#L75
305            // However in the paper it is more like a few roundtrips if I read things correctly.
306            graft_timeout_1: Duration::from_millis(80),
307
308            // Paper: "This second timeout value should be smaller that the first, in the order of an
309            // average round trip time to a neighbor." (p. 9)
310            //
311            // Earthstar doesn't have this step from my reading.
312            graft_timeout_2: Duration::from_millis(40),
313
314            // Again, paper does not tell a recommended number here. Likely should be quite small,
315            // as to not delay messages without need. This would also be the time frame in which
316            // `IHave`s are aggregated to save on packets.
317            //
318            // Eartstar dispatches immediately from my reading.
319            dispatch_timeout: Duration::from_millis(5),
320
321            // This number comes from experiment settings the plumtree paper (p. 12)
322            optimization_threshold: Round(7),
323
324            // This is a certainly-high-enough value for usual operation.
325            message_cache_retention: Duration::from_secs(30),
326            message_id_retention: Duration::from_secs(90),
327            cache_evict_interval: Duration::from_secs(1),
328        }
329    }
330}
331
/// Counters describing the activity of this topic's plumtree.
#[derive(Debug, Default, Clone)]
pub struct Stats {
    /// How many payload messages ([`Message::Gossip`]) have been received.
    pub payload_messages_received: u64,
    /// How many control messages have been received: [`Message::Prune`], [`Message::Graft`]
    /// and [`Message::IHave`].
    pub control_messages_received: u64,
    /// Highest delivery round observed on a received message so far.
    pub max_last_delivery_hop: u16,
}
346
347/// State of the plumtree.
348#[derive(Debug)]
349pub struct State<PI> {
350    /// Our address.
351    me: PI,
352    /// Configuration for this plumtree.
353    config: Config,
354
355    /// Set of peers used for payload exchange.
356    pub(crate) eager_push_peers: BTreeSet<PI>,
357    /// Set of peers used for control message exchange.
358    pub(crate) lazy_push_peers: BTreeSet<PI>,
359
360    lazy_push_queue: BTreeMap<PI, Vec<IHave>>,
361
362    /// Messages for which a [`MessageId`] has been seen via a [`Message::IHave`] but we have not
363    /// yet received the full payload. For each, we store the peers that have claimed to have this
364    /// message.
365    missing_messages: HashMap<MessageId, VecDeque<(PI, Round)>>,
366    /// Messages for which the full payload has been seen.
367    received_messages: TimeBoundCache<MessageId, ()>,
368    /// Payloads of received messages.
369    cache: TimeBoundCache<MessageId, Gossip>,
370
371    /// Message ids for which a [`Timer::SendGraft`] has been scheduled.
372    graft_timer_scheduled: HashSet<MessageId>,
373    /// Whether a [`Timer::DispatchLazyPush`] has been scheduled.
374    dispatch_timer_scheduled: bool,
375
376    /// Set to false after the first message is received. Used for initial timer scheduling.
377    init: bool,
378
379    /// [`Stats`] of this plumtree.
380    pub(crate) stats: Stats,
381
382    max_message_size: usize,
383}
384
385impl<PI: PeerIdentity> State<PI> {
386    /// Initialize the [`State`] of a plumtree.
387    pub fn new(me: PI, config: Config, max_message_size: usize) -> Self {
388        Self {
389            me,
390            eager_push_peers: Default::default(),
391            lazy_push_peers: Default::default(),
392            lazy_push_queue: Default::default(),
393            config,
394            missing_messages: Default::default(),
395            received_messages: Default::default(),
396            graft_timer_scheduled: Default::default(),
397            dispatch_timer_scheduled: false,
398            cache: Default::default(),
399            init: false,
400            stats: Default::default(),
401            max_message_size,
402        }
403    }
404
405    /// Handle an [`InEvent`].
406    pub fn handle(&mut self, event: InEvent<PI>, now: Instant, io: &mut impl IO<PI>) {
407        if !self.init {
408            self.init = true;
409            self.on_evict_cache_timer(now, io)
410        }
411        match event {
412            InEvent::RecvMessage(from, message) => self.handle_message(from, message, now, io),
413            InEvent::Broadcast(data, scope) => self.broadcast(data, scope, now, io),
414            InEvent::NeighborUp(peer) => self.on_neighbor_up(peer),
415            InEvent::NeighborDown(peer) => self.on_neighbor_down(peer),
416            InEvent::TimerExpired(timer) => match timer {
417                Timer::DispatchLazyPush => self.on_dispatch_timer(io),
418                Timer::SendGraft(id) => {
419                    self.on_send_graft_timer(id, io);
420                }
421                Timer::EvictCache => self.on_evict_cache_timer(now, io),
422            },
423        }
424    }
425
426    /// Get access to the [`Stats`] of the plumtree.
427    pub fn stats(&self) -> &Stats {
428        &self.stats
429    }
430
431    /// Handle receiving a [`Message`].
432    fn handle_message(&mut self, sender: PI, message: Message, now: Instant, io: &mut impl IO<PI>) {
433        if matches!(message, Message::Gossip(_)) {
434            self.stats.payload_messages_received += 1;
435        } else {
436            self.stats.control_messages_received += 1;
437        }
438        match message {
439            Message::Gossip(details) => self.on_gossip(sender, details, now, io),
440            Message::Prune => self.on_prune(sender),
441            Message::IHave(details) => self.on_ihave(sender, details, io),
442            Message::Graft(details) => self.on_graft(sender, details, io),
443        }
444    }
445
446    /// Dispatches messages from lazy queue over to lazy peers.
447    fn on_dispatch_timer(&mut self, io: &mut impl IO<PI>) {
448        let chunk_size = self.max_message_size
449            // Space for discriminator
450            - 1
451            // Space for length prefix
452            - 2;
453        let chunk_len = chunk_size / IHave::POSTCARD_MAX_SIZE;
454        while let Some((peer, list)) = self.lazy_push_queue.pop_first() {
455            for chunk in list.chunks(chunk_len) {
456                io.push(OutEvent::SendMessage(peer, Message::IHave(chunk.to_vec())));
457            }
458        }
459
460        self.dispatch_timer_scheduled = false;
461    }
462
463    /// Send a gossip message.
464    ///
465    /// Will be pushed in full to eager peers.
466    /// Pushing the message id to the lazy peers is delayed by a timer.
467    fn broadcast(&mut self, content: Bytes, scope: Scope, now: Instant, io: &mut impl IO<PI>) {
468        let id = MessageId::from_content(&content);
469        let scope = match scope {
470            Scope::Neighbors => DeliveryScope::Neighbors,
471            Scope::Swarm => DeliveryScope::Swarm(Round(0)),
472        };
473        let message = Gossip { id, content, scope };
474        let me = self.me;
475        if let DeliveryScope::Swarm(_) = scope {
476            self.received_messages
477                .insert(id, (), now + self.config.message_id_retention);
478            self.cache.insert(
479                id,
480                message.clone(),
481                now + self.config.message_cache_retention,
482            );
483            self.lazy_push(message.clone(), &me, io);
484        }
485
486        self.eager_push(message.clone(), &me, io);
487    }
488
489    /// Handle receiving a [`Message::Gossip`].
490    fn on_gossip(&mut self, sender: PI, message: Gossip, now: Instant, io: &mut impl IO<PI>) {
491        // Validate that the message id is the blake3 hash of the message content.
492        if !message.validate() {
493            // TODO: Do we want to take any measures against the sender if we received a message
494            // with a spoofed message id?
495            warn!(
496                peer = ?sender,
497                "Received a message with spoofed message id ({})", message.id
498            );
499            return;
500        }
501
502        // if we already received this message: move peer to lazy set
503        // and notify peer about this.
504        if self.received_messages.contains_key(&message.id) {
505            self.add_lazy(sender);
506            io.push(OutEvent::SendMessage(sender, Message::Prune));
507        // otherwise store the message, emit to application and forward to peers
508        } else {
509            if let DeliveryScope::Swarm(prev_round) = message.scope {
510                // insert the message in the list of received messages
511                self.received_messages.insert(
512                    message.id,
513                    (),
514                    now + self.config.message_id_retention,
515                );
516                // increase the round for forwarding the message, and add to cache
517                // to reply to Graft messages later
518                // TODO: add callback/event to application to get missing messages that were received before?
519                let message = message.next_round().expect("just checked");
520
521                self.cache.insert(
522                    message.id,
523                    message.clone(),
524                    now + self.config.message_cache_retention,
525                );
526                // push the message to our peers
527                self.eager_push(message.clone(), &sender, io);
528                self.lazy_push(message.clone(), &sender, io);
529                // cleanup places where we track missing messages
530                self.graft_timer_scheduled.remove(&message.id);
531                let previous_ihaves = self.missing_messages.remove(&message.id);
532                // do the optimization step from the paper
533                if let Some(previous_ihaves) = previous_ihaves {
534                    self.optimize_tree(&sender, &message, previous_ihaves, io);
535                }
536                self.stats.max_last_delivery_hop =
537                    self.stats.max_last_delivery_hop.max(prev_round.0);
538            }
539
540            // emit event to application
541            io.push(OutEvent::EmitEvent(Event::Received(
542                GossipEvent::from_message(&message, sender),
543            )));
544        }
545    }
546
547    /// Optimize the tree by pruning the `sender` of a [`Message::Gossip`] if we previously
548    /// received a [`Message::IHave`] for the same message with a much lower number of delivery
549    /// hops from the original broadcaster of the message.
550    ///
551    /// See [Config::optimization_threshold].
552    fn optimize_tree(
553        &mut self,
554        gossip_sender: &PI,
555        message: &Gossip,
556        previous_ihaves: VecDeque<(PI, Round)>,
557        io: &mut impl IO<PI>,
558    ) {
559        let round = message.round().expect("only called for swarm messages");
560        let best_ihave = previous_ihaves
561            .iter()
562            .min_by(|(_a_peer, a_round), (_b_peer, b_round)| a_round.cmp(b_round))
563            .copied();
564
565        if let Some((ihave_peer, ihave_round)) = best_ihave {
566            if (ihave_round < round) && (round - ihave_round) >= self.config.optimization_threshold
567            {
568                // Graft the sender of the IHave, but only if it's not already eager.
569                if !self.eager_push_peers.contains(&ihave_peer) {
570                    let message = Message::Graft(Graft {
571                        id: None,
572                        round: ihave_round,
573                    });
574                    self.add_eager(ihave_peer);
575                    io.push(OutEvent::SendMessage(ihave_peer, message));
576                }
577                // Prune the sender of the Gossip.
578                self.add_lazy(*gossip_sender);
579                io.push(OutEvent::SendMessage(*gossip_sender, Message::Prune));
580            }
581        }
582    }
583
584    /// Handle receiving a [`Message::Prune`].
585    fn on_prune(&mut self, sender: PI) {
586        self.add_lazy(sender);
587    }
588
589    /// Handle receiving a [`Message::IHave`].
590    ///
591    /// > When a node receives a IHAVE message, it simply marks the corresponding message as
592    /// > missing It then starts a timer, with a predefined timeout value, and waits for the missing
593    /// > message to be received via eager push before the timer expires. The timeout value is a
594    /// > protocol parameter that should be configured considering the diameter of the overlay and a
595    /// > target maximum recovery latency, defined by the application requirements. This is a
596    /// > parameter that should be statically configured at deployment time. (p8)
597    fn on_ihave(&mut self, sender: PI, ihaves: Vec<IHave>, io: &mut impl IO<PI>) {
598        for ihave in ihaves {
599            if !self.received_messages.contains_key(&ihave.id) {
600                self.missing_messages
601                    .entry(ihave.id)
602                    .or_default()
603                    .push_back((sender, ihave.round));
604
605                if !self.graft_timer_scheduled.contains(&ihave.id) {
606                    self.graft_timer_scheduled.insert(ihave.id);
607                    io.push(OutEvent::ScheduleTimer(
608                        self.config.graft_timeout_1,
609                        Timer::SendGraft(ihave.id),
610                    ));
611                }
612            }
613        }
614    }
615
616    /// A scheduled [`Timer::SendGraft`] has reached it's deadline.
617    fn on_send_graft_timer(&mut self, id: MessageId, io: &mut impl IO<PI>) {
618        self.graft_timer_scheduled.remove(&id);
619        // if the message was received before the timer ran out, there is no need to request it
620        // again
621        if self.received_messages.contains_key(&id) {
622            return;
623        }
624        // get the first peer that advertised this message
625        let entry = self
626            .missing_messages
627            .get_mut(&id)
628            .and_then(|entries| entries.pop_front());
629        if let Some((peer, round)) = entry {
630            self.add_eager(peer);
631            let message = Message::Graft(Graft {
632                id: Some(id),
633                round,
634            });
635            io.push(OutEvent::SendMessage(peer, message));
636
637            // "when a GRAFT message is sent, another timer is started to expire after a certain timeout,
638            // to ensure that the message will be requested to another neighbor if it is not received
639            // meanwhile. This second timeout value should be smaller that the first, in the order of
640            // an average round trip time to a neighbor." (p9)
641            io.push(OutEvent::ScheduleTimer(
642                self.config.graft_timeout_2,
643                Timer::SendGraft(id),
644            ));
645        }
646    }
647
648    /// Handle receiving a [`Message::Graft`].
649    fn on_graft(&mut self, sender: PI, details: Graft, io: &mut impl IO<PI>) {
650        self.add_eager(sender);
651        if let Some(id) = details.id {
652            if let Some(message) = self.cache.get(&id) {
653                io.push(OutEvent::SendMessage(
654                    sender,
655                    Message::Gossip(message.clone()),
656                ));
657            } else {
658                debug!(?id, peer=?sender, "on_graft failed to graft: message not in cache");
659            }
660        }
661    }
662
663    /// Handle a [`InEvent::NeighborUp`] when a peer joins the topic.
664    fn on_neighbor_up(&mut self, peer: PI) {
665        self.add_eager(peer);
666    }
667
668    /// Handle a [`InEvent::NeighborDown`] when a peer leaves the topic.
669    /// > When a neighbor is detected to leave the overlay, it is simple removed from the
670    /// > membership. Furthermore, the record of IHAVE messages sent from failed members is deleted
671    /// > from the missing history. (p9)
672    fn on_neighbor_down(&mut self, peer: PI) {
673        self.missing_messages.retain(|_message_id, ihaves| {
674            ihaves.retain(|(ihave_peer, _round)| *ihave_peer != peer);
675            !ihaves.is_empty()
676        });
677        self.eager_push_peers.remove(&peer);
678        self.lazy_push_peers.remove(&peer);
679    }
680
681    fn on_evict_cache_timer(&mut self, now: Instant, io: &mut impl IO<PI>) {
682        self.cache.expire_until(now);
683        io.push(OutEvent::ScheduleTimer(
684            self.config.cache_evict_interval,
685            Timer::EvictCache,
686        ));
687    }
688
689    /// Moves peer into eager set.
690    fn add_eager(&mut self, peer: PI) {
691        self.lazy_push_peers.remove(&peer);
692        self.eager_push_peers.insert(peer);
693    }
694
695    /// Moves peer into lazy set.
696    fn add_lazy(&mut self, peer: PI) {
697        self.eager_push_peers.remove(&peer);
698        self.lazy_push_peers.insert(peer);
699    }
700
701    /// Immediately sends message to eager peers.
702    fn eager_push(&mut self, gossip: Gossip, sender: &PI, io: &mut impl IO<PI>) {
703        for peer in self
704            .eager_push_peers
705            .iter()
706            .filter(|peer| **peer != self.me && *peer != sender)
707        {
708            io.push(OutEvent::SendMessage(
709                *peer,
710                Message::Gossip(gossip.clone()),
711            ));
712        }
713    }
714
715    /// Queue lazy message announcements into the queue that will be sent out as batched
716    /// [`Message::IHave`] messages once the [`Timer::DispatchLazyPush`] timer is triggered.
717    fn lazy_push(&mut self, gossip: Gossip, sender: &PI, io: &mut impl IO<PI>) {
718        let Some(round) = gossip.round() else {
719            return;
720        };
721        for peer in self.lazy_push_peers.iter().filter(|x| *x != sender) {
722            self.lazy_push_queue.entry(*peer).or_default().push(IHave {
723                id: gossip.id,
724                round,
725            });
726        }
727        if !self.dispatch_timer_scheduled {
728            io.push(OutEvent::ScheduleTimer(
729                self.config.dispatch_timeout,
730                Timer::DispatchLazyPush,
731            ));
732            self.dispatch_timer_scheduled = true;
733        }
734    }
735}
736
#[cfg(test)]
mod test {
    use super::*;

    /// Checks the tree-optimization step.
    ///
    /// A message delivered over an eager link is compared against the round announced in an
    /// earlier `IHave` from a lazy peer. Only when the gap is large enough (per the config's
    /// `optimization_threshold`) is the lazy peer grafted and the eager peer pruned.
    #[test]
    fn optimize_tree() {
        let mut io = VecDeque::new();
        let config: Config = Default::default();
        let mut state = State::new(1, config.clone(), 1024);
        let now = Instant::now();

        // We receive an IHave message from peer 2.
        // It has `round: 2`, which means that the peer that sent us the IHave was
        // two hops away from the original sender of the message.
        let content = Bytes::from_static(b"hi");
        let id = MessageId::from_content(&content);
        let event = InEvent::RecvMessage(
            2u32,
            Message::IHave(vec![IHave {
                id,
                round: Round(2),
            }]),
        );
        state.handle(event, now, &mut io);
        io.clear();
        // We then receive a `Gossip` message with the same `MessageId` from peer 3.
        // The message has `round: 6`, which means it travelled 6 hops until it reached us.
        // That is 4 hops more than via peer 2, but not enough to trigger the optimization,
        // because we use the default config which has `optimization_threshold: 7`.
        let event = InEvent::RecvMessage(
            3,
            Message::Gossip(Gossip {
                id,
                content: content.clone(),
                scope: DeliveryScope::Swarm(Round(6)),
            }),
        );
        state.handle(event, now, &mut io);
        let expected = {
            // We expect a dispatch timer schedule and a receive event, but no Graft or Prune
            // messages.
            let mut io = VecDeque::new();
            io.push(OutEvent::ScheduleTimer(
                config.dispatch_timeout,
                Timer::DispatchLazyPush,
            ));
            io.push(OutEvent::EmitEvent(Event::Received(GossipEvent {
                content,
                delivered_from: 3,
                scope: DeliveryScope::Swarm(Round(6)),
            })));
            io
        };
        assert_eq!(io, expected);
        io.clear();

        // Now we run the same flow again, but this time the message reaches us via peer 3
        // after 9 hops. The gap to the round announced by peer 2 (9 - 2 = 7) reaches the
        // threshold, so this triggers the optimization: peer 2 is promoted to eager and
        // peer 3 is demoted to lazy.

        let content = Bytes::from_static(b"hi2");
        let id = MessageId::from_content(&content);
        let event = InEvent::RecvMessage(
            2u32,
            Message::IHave(vec![IHave {
                id,
                round: Round(2),
            }]),
        );
        state.handle(event, now, &mut io);
        io.clear();

        let event = InEvent::RecvMessage(
            3,
            Message::Gossip(Gossip {
                id,
                content: content.clone(),
                scope: DeliveryScope::Swarm(Round(9)),
            }),
        );
        state.handle(event, now, &mut io);
        let expected = {
            // This time we expect the Graft and Prune messages to be sent, performing the
            // optimization step. No new dispatch timer is scheduled because the one from the
            // first round is still pending.
            let mut io = VecDeque::new();
            io.push(OutEvent::SendMessage(
                2,
                Message::Graft(Graft {
                    id: None,
                    round: Round(2),
                }),
            ));
            io.push(OutEvent::SendMessage(3, Message::Prune));
            io.push(OutEvent::EmitEvent(Event::Received(GossipEvent {
                content,
                delivered_from: 3,
                scope: DeliveryScope::Swarm(Round(9)),
            })));
            io
        };
        assert_eq!(io, expected);
    }

    /// A `Gossip` whose `id` does not match the hash of its content must produce no output.
    #[test]
    fn spoofed_messages_are_ignored() {
        let config: Config = Default::default();
        let mut state = State::new(1, config.clone(), 1024);
        let now = Instant::now();

        // We receive a correct gossip message and expect the Received event to be emitted.
        // Being the first message, it also schedules the cache-eviction and lazy-push
        // dispatch timers.
        let content = Bytes::from_static(b"hello1");
        let message = Message::Gossip(Gossip {
            content: content.clone(),
            id: MessageId::from_content(&content),
            scope: DeliveryScope::Swarm(Round(1)),
        });
        let mut io = VecDeque::new();
        state.handle(InEvent::RecvMessage(2, message), now, &mut io);
        let expected = {
            let mut io = VecDeque::new();
            io.push(OutEvent::ScheduleTimer(
                config.cache_evict_interval,
                Timer::EvictCache,
            ));
            io.push(OutEvent::ScheduleTimer(
                config.dispatch_timeout,
                Timer::DispatchLazyPush,
            ));
            io.push(OutEvent::EmitEvent(Event::Received(GossipEvent {
                content,
                delivered_from: 2,
                scope: DeliveryScope::Swarm(Round(1)),
            })));
            io
        };
        assert_eq!(io, expected);

        // Now we receive a message with a spoofed id and expect no event to be emitted.
        let content = Bytes::from_static(b"hello2");
        let message = Message::Gossip(Gossip {
            content,
            id: MessageId::from_content(b"foo"),
            scope: DeliveryScope::Swarm(Round(1)),
        });
        let mut io = VecDeque::new();
        state.handle(InEvent::RecvMessage(2, message), now, &mut io);
        let expected = VecDeque::new();
        assert_eq!(io, expected);
    }

    /// Cached messages survive eviction passes until `message_cache_retention` has elapsed.
    #[test]
    fn cache_is_evicted() {
        let config: Config = Default::default();
        let mut state = State::new(1, config.clone(), 1024);
        let now = Instant::now();
        let content = Bytes::from_static(b"hello1");
        let message = Message::Gossip(Gossip {
            content: content.clone(),
            id: MessageId::from_content(&content),
            scope: DeliveryScope::Swarm(Round(1)),
        });
        let mut io = VecDeque::new();
        state.handle(InEvent::RecvMessage(2, message), now, &mut io);
        assert_eq!(state.cache.len(), 1);

        // One second later the entry is still retained.
        let now = now + Duration::from_secs(1);
        state.handle(InEvent::TimerExpired(Timer::EvictCache), now, &mut io);
        assert_eq!(state.cache.len(), 1);

        // Once the full retention period has passed, the entry is evicted.
        let now = now + config.message_cache_retention;
        state.handle(InEvent::TimerExpired(Timer::EvictCache), now, &mut io);
        assert_eq!(state.cache.len(), 0);
    }
}