iroh_gossip/proto/plumtree.rs
//! Implementation of the Plumtree epidemic broadcast tree protocol
//!
//! The implementation is based on [this paper][paper] by João Leitão, José Pereira, Luís Rodrigues
//! and the [example implementation][impl] by Bartosz Sypytkowski.
//!
//! [paper]: https://asc.di.fct.unl.pt/~jleitao/pdf/srds07-leitao.pdf
//! [impl]: https://gist.github.com/Horusiath/84fac596101b197da0546d1697580d99

use std::{
    collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
    hash::Hash,
};

use bytes::Bytes;
use derive_more::{Add, From, Sub};
use n0_future::time::{Duration, Instant};
use postcard::experimental::max_size::MaxSize;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};

use super::{
    util::{idbytes_impls, TimeBoundCache},
    PeerIdentity, IO,
};

/// A message identifier, which is the message content's blake3 hash.
#[derive(Serialize, Deserialize, Clone, Hash, Copy, PartialEq, Eq, MaxSize)]
pub struct MessageId([u8; 32]);
idbytes_impls!(MessageId, "MessageId");

impl MessageId {
    /// Create a [`MessageId`] by hashing the message content.
    ///
    /// This hashes the input with [`blake3::hash`].
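    ///
    /// A minimal sketch of the relationship between content and id (marked `ignore`
    /// since the public import path is an assumption):
    /// ```ignore
    /// let id = MessageId::from_content(b"hello");
    /// assert_eq!(id, MessageId::from(blake3::hash(b"hello")));
    /// ```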
    pub fn from_content(message: &[u8]) -> Self {
        Self::from(blake3::hash(message))
    }
}

/// Events Plumtree is informed of from the peer sampling service and IO layer.
#[derive(Debug)]
pub enum InEvent<PI> {
    /// A [`Message`] was received from the peer.
    RecvMessage(PI, Message),
    /// Broadcast the contained payload to the given scope.
    Broadcast(Bytes, Scope),
    /// A timer has expired.
    TimerExpired(Timer),
    /// New member `PI` has joined the topic.
    NeighborUp(PI),
    /// Peer `PI` has disconnected from the topic.
    NeighborDown(PI),
}

/// Events Plumtree emits.
#[derive(Debug, PartialEq, Eq)]
pub enum OutEvent<PI> {
    /// Ask the IO layer to send a [`Message`] to peer `PI`.
    SendMessage(PI, Message),
    /// Schedule a [`Timer`].
    ScheduleTimer(Duration, Timer),
    /// Emit an [`Event`] to the application.
    EmitEvent(Event<PI>),
}

/// Kinds of timers Plumtree needs to schedule.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Timer {
    /// Request the content for [`MessageId`] by sending [`Message::Graft`].
    ///
    /// The message will be sent to a peer that sent us a [`Message::IHave`] for this [`MessageId`],
    /// which will send us the message content in reply and also move the peer into the eager set.
    /// This is a no-op if the message for [`MessageId`] has already been received from another
    /// peer by the time the timer fires.
    SendGraft(MessageId),
    /// Dispatch the [`Message::IHave`] in our lazy push queue.
    DispatchLazyPush,
    /// Evict the message cache.
    EvictCache,
}

/// Event emitted by the [`State`] to the application.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event<PI> {
    /// A new gossip message was received.
    Received(GossipEvent<PI>),
}

#[derive(Clone, derive_more::Debug, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct GossipEvent<PI> {
    /// The content of the gossip message.
    #[debug("<{}b>", content.len())]
    pub content: Bytes,
    /// The peer that we received the gossip message from. Note that this is not the peer that
    /// originally broadcast the message, but the peer before us in the gossiping path.
    pub delivered_from: PI,
    /// The broadcast scope of the message.
    pub scope: DeliveryScope,
}

impl<PI> GossipEvent<PI> {
    fn from_message(message: &Gossip, from: PI) -> Self {
        Self {
            content: message.content.clone(),
            scope: message.scope,
            delivered_from: from,
        }
    }
}

/// Number of delivery hops a message has taken.
#[derive(
    From,
    Add,
    Sub,
    Serialize,
    Deserialize,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Clone,
    Copy,
    Debug,
    Hash,
    MaxSize,
)]
pub struct Round(u16);

impl Round {
    pub fn next(&self) -> Round {
        Round(self.0 + 1)
    }
}

/// Messages that we can send and receive from peers within the topic.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum Message {
    /// When receiving Gossip, emit as event and forward full message to eager peers and (after a
    /// delay) message IDs to lazy peers.
    Gossip(Gossip),
    /// When receiving Prune, move the peer from the eager to the lazy set.
    Prune,
    /// When receiving Graft, move the peer to the eager set and send the full content for the
    /// included message ID.
    Graft(Graft),
    /// When receiving IHave, do nothing initially, and request the messages for the included
    /// message IDs after some time if they aren't pushed eagerly to us.
    IHave(Vec<IHave>),
}

/// Payload messages transmitted by the protocol.
#[derive(Serialize, Deserialize, Clone, derive_more::Debug, PartialEq, Eq)]
pub struct Gossip {
    /// Id of the message.
    id: MessageId,
    /// Message contents.
    #[debug("<{}b>", content.len())]
    content: Bytes,
    /// Scope to broadcast to.
    scope: DeliveryScope,
}

impl Gossip {
    fn round(&self) -> Option<Round> {
        match self.scope {
            DeliveryScope::Swarm(round) => Some(round),
            DeliveryScope::Neighbors => None,
        }
    }
}

/// The scope to deliver the message to.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Copy)]
pub enum DeliveryScope {
    /// This message was received from the swarm, with a distance (in hops) travelled from the
    /// original broadcaster.
    Swarm(Round),
    /// This message was received from a direct neighbor that broadcast the message to neighbors
    /// only.
    Neighbors,
}

impl DeliveryScope {
    /// Whether this message was directly received from its publisher.
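    ///
    /// A small sketch of the rule this encodes, where round 0 means zero forwarding
    /// hops (marked `ignore` since the public import path is an assumption):
    /// ```ignore
    /// assert!(DeliveryScope::Neighbors.is_direct());
    /// assert!(DeliveryScope::Swarm(Round::from(0u16)).is_direct());
    /// assert!(!DeliveryScope::Swarm(Round::from(1u16)).is_direct());
    /// ```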
    pub fn is_direct(&self) -> bool {
        matches!(self, Self::Neighbors | Self::Swarm(Round(0)))
    }
}

/// The broadcast scope of a gossip message.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Copy)]
pub enum Scope {
    /// The message is broadcast to all peers in the swarm.
    Swarm,
    /// The message is broadcast only to the immediate neighbors of a peer.
    Neighbors,
}

impl Gossip {
    /// Get a clone of this `Gossip` message and increase the delivery round by 1.
    pub fn next_round(&self) -> Option<Gossip> {
        match self.scope {
            DeliveryScope::Neighbors => None,
            DeliveryScope::Swarm(round) => Some(Gossip {
                id: self.id,
                content: self.content.clone(),
                scope: DeliveryScope::Swarm(round.next()),
            }),
        }
    }

    /// Validate that the message id is the blake3 hash of the message content.
    pub fn validate(&self) -> bool {
        let expected = MessageId::from_content(&self.content);
        expected == self.id
    }
}

/// Control message to inform peers we have a message without transmitting the whole payload.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, MaxSize)]
pub struct IHave {
    /// Id of the message.
    pub(crate) id: MessageId,
    /// Delivery round of the message.
    pub(crate) round: Round,
}

/// Control message to signal a peer that they have been moved to the eager set, and to ask the
/// peer to do the same with this node.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Graft {
    /// Message id that triggers the graft, if any.
    /// On receiving a graft, the payload message must be sent in reply if a message id is set.
    id: Option<MessageId>,
    /// Delivery round of the [`Message::IHave`] that triggered this Graft message.
    round: Round,
}

/// Configuration for the gossip broadcast layer.
///
/// Currently, the expectation is that the configuration is the same for all peers in the
/// network (as recommended in the paper).
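///
/// A sketch of overriding individual fields while keeping the defaults for the rest
/// (the durations here are illustrative, not recommendations):
/// ```ignore
/// let config = Config {
///     graft_timeout_1: Duration::from_millis(120),
///     message_cache_retention: Duration::from_secs(60),
///     ..Default::default()
/// };
/// ```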
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// When receiving an `IHave` message, this timeout is registered. If the message for the
    /// `IHave` was not received once the timeout expires, a `Graft` message is sent to the
    /// peer that sent us the `IHave` to request the message payload.
    ///
    /// The plumtree paper notes:
    /// > The timeout value is a protocol parameter that should be configured considering the
    /// > diameter of the overlay and a target maximum recovery latency, defined by the application
    /// > requirements. (p.8)
    pub graft_timeout_1: Duration,
    /// This timeout is registered when sending a `Graft` message. If a reply has not been
    /// received once the timeout expires, we send another `Graft` message to the next peer that
    /// sent us an `IHave` for this message.
    ///
    /// The plumtree paper notes:
    /// > This second timeout value should be smaller than the first, in the order of an average
    /// > round trip time to a neighbor.
    pub graft_timeout_2: Duration,
    /// Timeout after which `IHave` messages are pushed to peers.
    pub dispatch_timeout: Duration,
    /// The protocol performs a tree optimization, which promotes lazy peers to eager peers if the
    /// [`Message::IHave`] messages received from them have a lower number of hops from the
    /// message's origin than the [`Message::Gossip`] messages received from our eager peers. This
    /// parameter is the number of hops by which the lazy peers must be closer to the origin than
    /// our eager peers to be promoted to eager peers.
    pub optimization_threshold: Round,

    /// Duration for which to keep gossip messages in the internal message cache.
    ///
    /// Messages broadcast from this node or received from other nodes are kept in an internal
    /// cache for this duration before being evicted. If this is too low, other nodes will not be
    /// able to retrieve messages when they need them. If this is too high, the cache will grow
    /// needlessly large.
    ///
    /// Should be on the order of several round trip times to peers.
    pub message_cache_retention: Duration,

    /// Duration for which to keep the `MessageId`s of received messages.
    ///
    /// Should be at least as long as [`Self::message_cache_retention`], and will usually be
    /// longer, to avoid accidentally processing messages multiple times.
    pub message_id_retention: Duration,

    /// How often the internal caches will be checked for expired items.
    pub cache_evict_interval: Duration,
}

impl Default for Config {
    /// Sensible defaults for the plumtree configuration.
    //
    // TODO: Find out what good defaults are for the three timeouts here. Current numbers are
    // guesses that need validation. The paper does not have concrete recommendations for these
    // numbers.
    fn default() -> Self {
        Self {
            // Paper: "The timeout value is a protocol parameter that should be configured considering
            // the diameter of the overlay and a target maximum recovery latency, defined by the
            // application requirements. This is a parameter that should be statically configured
            // at deployment time." (p. 8)
            //
            // Earthstar seems to use 5ms, see https://github.com/earthstar-project/earthstar/blob/1523c640fedf106f598bf79b184fb0ada64b1cc0/src/syncer/plum_tree.ts#L75
            // However, the paper suggests something closer to a few round trips.
            graft_timeout_1: Duration::from_millis(80),

            // Paper: "This second timeout value should be smaller than the first, in the order of an
            // average round trip time to a neighbor." (p. 9)
            //
            // Earthstar doesn't seem to have this second step.
            graft_timeout_2: Duration::from_millis(40),

            // Again, the paper does not recommend a concrete number here. It should likely be
            // quite small, so as not to delay messages needlessly. This is also the time frame
            // in which `IHave`s are aggregated to save on packets.
            //
            // Earthstar seems to dispatch immediately.
            dispatch_timeout: Duration::from_millis(5),

            // This number comes from the experiment settings in the plumtree paper (p. 12).
            optimization_threshold: Round(7),

            // This is a certainly-high-enough value for usual operation.
            message_cache_retention: Duration::from_secs(30),
            message_id_retention: Duration::from_secs(90),
            cache_evict_interval: Duration::from_secs(1),
        }
    }
}

/// Stats about this topic's plumtree.
#[derive(Debug, Default, Clone)]
pub struct Stats {
    /// Number of payload messages received so far.
    ///
    /// See [`Message::Gossip`].
    pub payload_messages_received: u64,
    /// Number of control messages received so far.
    ///
    /// See [`Message::Prune`], [`Message::Graft`], [`Message::IHave`].
    pub control_messages_received: u64,
    /// Max round seen so far.
    pub max_last_delivery_hop: u16,
}

/// State of the plumtree.
#[derive(Debug)]
pub struct State<PI> {
    /// Our address.
    me: PI,
    /// Configuration for this plumtree.
    config: Config,

    /// Set of peers used for payload exchange.
    pub(crate) eager_push_peers: BTreeSet<PI>,
    /// Set of peers used for control message exchange.
    pub(crate) lazy_push_peers: BTreeSet<PI>,

    lazy_push_queue: BTreeMap<PI, Vec<IHave>>,

    /// Messages for which a [`MessageId`] has been seen via a [`Message::IHave`] but we have not
    /// yet received the full payload. For each, we store the peers that have claimed to have this
    /// message.
    missing_messages: HashMap<MessageId, VecDeque<(PI, Round)>>,
    /// Messages for which the full payload has been seen.
    received_messages: TimeBoundCache<MessageId, ()>,
    /// Payloads of received messages.
    cache: TimeBoundCache<MessageId, Gossip>,

    /// Message ids for which a [`Timer::SendGraft`] has been scheduled.
    graft_timer_scheduled: HashSet<MessageId>,
    /// Whether a [`Timer::DispatchLazyPush`] has been scheduled.
    dispatch_timer_scheduled: bool,

    /// Set to true once the first event has been handled. Used for initial timer scheduling.
    init: bool,

    /// [`Stats`] of this plumtree.
    pub(crate) stats: Stats,

    max_message_size: usize,
}

impl<PI: PeerIdentity> State<PI> {
    /// Initialize the [`State`] of a plumtree.
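    ///
    /// A minimal sketch of driving the state machine, assuming `u32` peer ids and a
    /// `VecDeque<OutEvent<u32>>` as the `IO` sink, as in the tests below (marked
    /// `ignore` since the public import path is an assumption):
    /// ```ignore
    /// let mut state = State::new(1u32, Config::default(), 1024);
    /// let mut io = VecDeque::new();
    /// state.handle(
    ///     InEvent::Broadcast(Bytes::from_static(b"hi"), Scope::Swarm),
    ///     Instant::now(),
    ///     &mut io,
    /// );
    /// // `io` now holds the `OutEvent`s for the caller to execute.
    /// ```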
    pub fn new(me: PI, config: Config, max_message_size: usize) -> Self {
        Self {
            me,
            eager_push_peers: Default::default(),
            lazy_push_peers: Default::default(),
            lazy_push_queue: Default::default(),
            config,
            missing_messages: Default::default(),
            received_messages: Default::default(),
            graft_timer_scheduled: Default::default(),
            dispatch_timer_scheduled: false,
            cache: Default::default(),
            init: false,
            stats: Default::default(),
            max_message_size,
        }
    }

    /// Handle an [`InEvent`].
    pub fn handle(&mut self, event: InEvent<PI>, now: Instant, io: &mut impl IO<PI>) {
        if !self.init {
            self.init = true;
            self.on_evict_cache_timer(now, io)
        }
        match event {
            InEvent::RecvMessage(from, message) => self.handle_message(from, message, now, io),
            InEvent::Broadcast(data, scope) => self.broadcast(data, scope, now, io),
            InEvent::NeighborUp(peer) => self.on_neighbor_up(peer),
            InEvent::NeighborDown(peer) => self.on_neighbor_down(peer),
            InEvent::TimerExpired(timer) => match timer {
                Timer::DispatchLazyPush => self.on_dispatch_timer(io),
                Timer::SendGraft(id) => {
                    self.on_send_graft_timer(id, io);
                }
                Timer::EvictCache => self.on_evict_cache_timer(now, io),
            },
        }
    }

    /// Get access to the [`Stats`] of the plumtree.
    pub fn stats(&self) -> &Stats {
        &self.stats
    }

    /// Handle receiving a [`Message`].
    fn handle_message(&mut self, sender: PI, message: Message, now: Instant, io: &mut impl IO<PI>) {
        if matches!(message, Message::Gossip(_)) {
            self.stats.payload_messages_received += 1;
        } else {
            self.stats.control_messages_received += 1;
        }
        match message {
            Message::Gossip(details) => self.on_gossip(sender, details, now, io),
            Message::Prune => self.on_prune(sender),
            Message::IHave(details) => self.on_ihave(sender, details, io),
            Message::Graft(details) => self.on_graft(sender, details, io),
        }
    }

    /// Dispatches messages from the lazy queue to lazy peers.
    fn on_dispatch_timer(&mut self, io: &mut impl IO<PI>) {
        let chunk_size = self.max_message_size
            // Space for discriminator
            - 1
            // Space for length prefix
            - 2;
        let chunk_len = chunk_size / IHave::POSTCARD_MAX_SIZE;
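        // Illustrative sizing, not a measured value: postcard encodes an `IHave` as the
        // 32-byte id plus a varint round of at most 3 bytes, so `POSTCARD_MAX_SIZE`
        // should be 35 and a 1024-byte `max_message_size` batches up to 29 ids.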
        while let Some((peer, list)) = self.lazy_push_queue.pop_first() {
            for chunk in list.chunks(chunk_len) {
                io.push(OutEvent::SendMessage(peer, Message::IHave(chunk.to_vec())));
            }
        }

        self.dispatch_timer_scheduled = false;
    }

    /// Send a gossip message.
    ///
    /// Will be pushed in full to eager peers.
    /// Pushing the message id to the lazy peers is delayed by a timer.
    fn broadcast(&mut self, content: Bytes, scope: Scope, now: Instant, io: &mut impl IO<PI>) {
        let id = MessageId::from_content(&content);
        let scope = match scope {
            Scope::Neighbors => DeliveryScope::Neighbors,
            Scope::Swarm => DeliveryScope::Swarm(Round(0)),
        };
        let message = Gossip { id, content, scope };
        let me = self.me;
        if let DeliveryScope::Swarm(_) = scope {
            self.received_messages
                .insert(id, (), now + self.config.message_id_retention);
            self.cache.insert(
                id,
                message.clone(),
                now + self.config.message_cache_retention,
            );
            self.lazy_push(message.clone(), &me, io);
        }

        self.eager_push(message.clone(), &me, io);
    }

    /// Handle receiving a [`Message::Gossip`].
    fn on_gossip(&mut self, sender: PI, message: Gossip, now: Instant, io: &mut impl IO<PI>) {
        // Validate that the message id is the blake3 hash of the message content.
        if !message.validate() {
            // TODO: Do we want to take any measures against the sender if we received a message
            // with a spoofed message id?
            warn!(
                peer = ?sender,
                "Received a message with spoofed message id ({})", message.id
            );
            return;
        }

        // if we already received this message: move the peer to the lazy set
        // and notify the peer about this.
        if self.received_messages.contains_key(&message.id) {
            self.add_lazy(sender);
            io.push(OutEvent::SendMessage(sender, Message::Prune));
        // otherwise store the message, emit to application and forward to peers
        } else {
            if let DeliveryScope::Swarm(prev_round) = message.scope {
                // insert the message in the list of received messages
                self.received_messages.insert(
                    message.id,
                    (),
                    now + self.config.message_id_retention,
                );
                // increase the round for forwarding the message, and add to cache
                // to reply to Graft messages later
                // TODO: add callback/event to application to get missing messages that were received before?
                let message = message.next_round().expect("just checked");

                self.cache.insert(
                    message.id,
                    message.clone(),
                    now + self.config.message_cache_retention,
                );
                // push the message to our peers
                self.eager_push(message.clone(), &sender, io);
                self.lazy_push(message.clone(), &sender, io);
                // cleanup places where we track missing messages
                self.graft_timer_scheduled.remove(&message.id);
                let previous_ihaves = self.missing_messages.remove(&message.id);
                // do the optimization step from the paper
                if let Some(previous_ihaves) = previous_ihaves {
                    self.optimize_tree(&sender, &message, previous_ihaves, io);
                }
                self.stats.max_last_delivery_hop =
                    self.stats.max_last_delivery_hop.max(prev_round.0);
            }

            // emit event to application
            io.push(OutEvent::EmitEvent(Event::Received(
                GossipEvent::from_message(&message, sender),
            )));
        }
    }

    /// Optimize the tree by pruning the `sender` of a [`Message::Gossip`] if we previously
    /// received a [`Message::IHave`] for the same message with a much lower number of delivery
    /// hops from the original broadcaster of the message.
    ///
    /// See [`Config::optimization_threshold`].
    fn optimize_tree(
        &mut self,
        gossip_sender: &PI,
        message: &Gossip,
        previous_ihaves: VecDeque<(PI, Round)>,
        io: &mut impl IO<PI>,
    ) {
        let round = message.round().expect("only called for swarm messages");
        let best_ihave = previous_ihaves
            .iter()
            .min_by(|(_a_peer, a_round), (_b_peer, b_round)| a_round.cmp(b_round))
            .copied();

        if let Some((ihave_peer, ihave_round)) = best_ihave {
            if (ihave_round < round) && (round - ihave_round) >= self.config.optimization_threshold
            {
                // Graft the sender of the IHave, but only if it's not already eager.
                if !self.eager_push_peers.contains(&ihave_peer) {
                    let message = Message::Graft(Graft {
                        id: None,
                        round: ihave_round,
                    });
                    self.add_eager(ihave_peer);
                    io.push(OutEvent::SendMessage(ihave_peer, message));
                }
                // Prune the sender of the Gossip.
                self.add_lazy(*gossip_sender);
                io.push(OutEvent::SendMessage(*gossip_sender, Message::Prune));
            }
        }
    }

    /// Handle receiving a [`Message::Prune`].
    fn on_prune(&mut self, sender: PI) {
        self.add_lazy(sender);
    }

    /// Handle receiving a [`Message::IHave`].
    ///
    /// > When a node receives an IHAVE message, it simply marks the corresponding message as
    /// > missing. It then starts a timer, with a predefined timeout value, and waits for the missing
    /// > message to be received via eager push before the timer expires. The timeout value is a
    /// > protocol parameter that should be configured considering the diameter of the overlay and a
    /// > target maximum recovery latency, defined by the application requirements. This is a
    /// > parameter that should be statically configured at deployment time. (p8)
    fn on_ihave(&mut self, sender: PI, ihaves: Vec<IHave>, io: &mut impl IO<PI>) {
        for ihave in ihaves {
            if !self.received_messages.contains_key(&ihave.id) {
                self.missing_messages
                    .entry(ihave.id)
                    .or_default()
                    .push_back((sender, ihave.round));

                if !self.graft_timer_scheduled.contains(&ihave.id) {
                    self.graft_timer_scheduled.insert(ihave.id);
                    io.push(OutEvent::ScheduleTimer(
                        self.config.graft_timeout_1,
                        Timer::SendGraft(ihave.id),
                    ));
                }
            }
        }
    }

    /// A scheduled [`Timer::SendGraft`] has reached its deadline.
    fn on_send_graft_timer(&mut self, id: MessageId, io: &mut impl IO<PI>) {
        self.graft_timer_scheduled.remove(&id);
        // if the message was received before the timer ran out, there is no need to request it
        // again
        if self.received_messages.contains_key(&id) {
            return;
        }
        // get the first peer that advertised this message
        let entry = self
            .missing_messages
            .get_mut(&id)
            .and_then(|entries| entries.pop_front());
        if let Some((peer, round)) = entry {
            self.add_eager(peer);
            let message = Message::Graft(Graft {
                id: Some(id),
                round,
            });
            io.push(OutEvent::SendMessage(peer, message));

            // "when a GRAFT message is sent, another timer is started to expire after a certain timeout,
            // to ensure that the message will be requested to another neighbor if it is not received
            // meanwhile. This second timeout value should be smaller than the first, in the order of
            // an average round trip time to a neighbor." (p9)
            io.push(OutEvent::ScheduleTimer(
                self.config.graft_timeout_2,
                Timer::SendGraft(id),
            ));
        }
    }

    /// Handle receiving a [`Message::Graft`].
    fn on_graft(&mut self, sender: PI, details: Graft, io: &mut impl IO<PI>) {
        self.add_eager(sender);
        if let Some(id) = details.id {
            if let Some(message) = self.cache.get(&id) {
                io.push(OutEvent::SendMessage(
                    sender,
                    Message::Gossip(message.clone()),
                ));
            } else {
                debug!(?id, peer=?sender, "on_graft failed to graft: message not in cache");
            }
        }
    }

    /// Handle an [`InEvent::NeighborUp`] when a peer joins the topic.
    fn on_neighbor_up(&mut self, peer: PI) {
        self.add_eager(peer);
    }

    /// Handle an [`InEvent::NeighborDown`] when a peer leaves the topic.
    /// > When a neighbor is detected to leave the overlay, it is simply removed from the
    /// > membership. Furthermore, the record of IHAVE messages sent from failed members is deleted
    /// > from the missing history. (p9)
    fn on_neighbor_down(&mut self, peer: PI) {
        self.missing_messages.retain(|_message_id, ihaves| {
            ihaves.retain(|(ihave_peer, _round)| *ihave_peer != peer);
            !ihaves.is_empty()
        });
        self.eager_push_peers.remove(&peer);
        self.lazy_push_peers.remove(&peer);
    }

    fn on_evict_cache_timer(&mut self, now: Instant, io: &mut impl IO<PI>) {
        self.cache.expire_until(now);
        io.push(OutEvent::ScheduleTimer(
            self.config.cache_evict_interval,
            Timer::EvictCache,
        ));
    }

    /// Moves peer into the eager set.
    fn add_eager(&mut self, peer: PI) {
        self.lazy_push_peers.remove(&peer);
        self.eager_push_peers.insert(peer);
    }

    /// Moves peer into the lazy set.
    fn add_lazy(&mut self, peer: PI) {
        self.eager_push_peers.remove(&peer);
        self.lazy_push_peers.insert(peer);
    }

    /// Immediately sends the message to eager peers.
    fn eager_push(&mut self, gossip: Gossip, sender: &PI, io: &mut impl IO<PI>) {
        for peer in self
            .eager_push_peers
            .iter()
            .filter(|peer| **peer != self.me && *peer != sender)
        {
            io.push(OutEvent::SendMessage(
                *peer,
                Message::Gossip(gossip.clone()),
            ));
        }
    }

    /// Queue lazy message announcements into the queue that will be sent out as batched
    /// [`Message::IHave`] messages once the [`Timer::DispatchLazyPush`] timer is triggered.
    fn lazy_push(&mut self, gossip: Gossip, sender: &PI, io: &mut impl IO<PI>) {
        let Some(round) = gossip.round() else {
            return;
        };
        for peer in self.lazy_push_peers.iter().filter(|x| *x != sender) {
            self.lazy_push_queue.entry(*peer).or_default().push(IHave {
                id: gossip.id,
                round,
            });
        }
        if !self.dispatch_timer_scheduled {
            io.push(OutEvent::ScheduleTimer(
                self.config.dispatch_timeout,
                Timer::DispatchLazyPush,
            ));
            self.dispatch_timer_scheduled = true;
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn optimize_tree() {
        let mut io = VecDeque::new();
        let config: Config = Default::default();
        let mut state = State::new(1, config.clone(), 1024);
        let now = Instant::now();

        // we receive an IHave message from peer 2
        // it has `round: 2` which means that the peer that sent us the IHave was
        // two hops away from the original sender of the message
        let content: Bytes = b"hi".to_vec().into();
        let id = MessageId::from_content(&content);
        let event = InEvent::RecvMessage(
            2u32,
            Message::IHave(vec![IHave {
                id,
                round: Round(2),
            }]),
        );
        state.handle(event, now, &mut io);
        io.clear();
        // we then receive a `Gossip` message with the same `MessageId` from peer 3
        // the message has `round: 6`, which means it travelled 6 hops until it reached us
        // this is fewer hops than via peer 2, but not enough to trigger the optimization,
        // because we use the default config which has `optimization_threshold: 7`
        let event = InEvent::RecvMessage(
            3,
            Message::Gossip(Gossip {
                id,
                content: content.clone(),
                scope: DeliveryScope::Swarm(Round(6)),
            }),
        );
        state.handle(event, now, &mut io);
        let expected = {
            // we expect a dispatch timer schedule and receive event, but no Graft or Prune
            // messages
            let mut io = VecDeque::new();
            io.push(OutEvent::ScheduleTimer(
                config.dispatch_timeout,
                Timer::DispatchLazyPush,
            ));
            io.push(OutEvent::EmitEvent(Event::Received(GossipEvent {
                content,
                delivered_from: 3,
                scope: DeliveryScope::Swarm(Round(6)),
            })));
            io
        };
        assert_eq!(io, expected);
        io.clear();

        // now we run the same flow again, but this time peer 3 is 9 hops away from the
        // message's sender. this will trigger the optimization:
        // peer 2 will be promoted to eager and peer 3 demoted to lazy

        let content: Bytes = b"hi2".to_vec().into();
        let id = MessageId::from_content(&content);
        let event = InEvent::RecvMessage(
            2u32,
            Message::IHave(vec![IHave {
                id,
                round: Round(2),
            }]),
        );
        state.handle(event, now, &mut io);
        io.clear();

        let event = InEvent::RecvMessage(
            3,
            Message::Gossip(Gossip {
                id,
                content: content.clone(),
                scope: DeliveryScope::Swarm(Round(9)),
            }),
        );
        state.handle(event, now, &mut io);
        let expected = {
            // this time we expect the Graft and Prune messages to be sent, performing the
            // optimization step
            let mut io = VecDeque::new();
            io.push(OutEvent::SendMessage(
                2,
                Message::Graft(Graft {
                    id: None,
                    round: Round(2),
                }),
            ));
            io.push(OutEvent::SendMessage(3, Message::Prune));
            io.push(OutEvent::EmitEvent(Event::Received(GossipEvent {
                content,
                delivered_from: 3,
                scope: DeliveryScope::Swarm(Round(9)),
            })));
            io
        };
        assert_eq!(io, expected);
    }

    #[test]
    fn spoofed_messages_are_ignored() {
        let config: Config = Default::default();
        let mut state = State::new(1, config.clone(), 1024);
        let now = Instant::now();

        // we receive a correct gossip message and expect the Received event to be emitted
        let content: Bytes = b"hello1".to_vec().into();
        let message = Message::Gossip(Gossip {
            content: content.clone(),
            id: MessageId::from_content(&content),
            scope: DeliveryScope::Swarm(Round(1)),
        });
        let mut io = VecDeque::new();
        state.handle(InEvent::RecvMessage(2, message), now, &mut io);
        let expected = {
            let mut io = VecDeque::new();
            io.push(OutEvent::ScheduleTimer(
                config.cache_evict_interval,
                Timer::EvictCache,
            ));
            io.push(OutEvent::ScheduleTimer(
                config.dispatch_timeout,
                Timer::DispatchLazyPush,
            ));
            io.push(OutEvent::EmitEvent(Event::Received(GossipEvent {
                content,
                delivered_from: 2,
                scope: DeliveryScope::Swarm(Round(1)),
            })));
            io
        };
        assert_eq!(io, expected);

        // now we receive a message with a spoofed id and expect no event to be emitted
        let content: Bytes = b"hello2".to_vec().into();
        let message = Message::Gossip(Gossip {
            content,
            id: MessageId::from_content(b"foo"),
            scope: DeliveryScope::Swarm(Round(1)),
        });
        let mut io = VecDeque::new();
        state.handle(InEvent::RecvMessage(2, message), now, &mut io);
        let expected = VecDeque::new();
        assert_eq!(io, expected);
    }

    #[test]
    fn cache_is_evicted() {
        let config: Config = Default::default();
        let mut state = State::new(1, config.clone(), 1024);
        let now = Instant::now();
        let content: Bytes = b"hello1".to_vec().into();
        let message = Message::Gossip(Gossip {
            content: content.clone(),
            id: MessageId::from_content(&content),
            scope: DeliveryScope::Swarm(Round(1)),
        });
        let mut io = VecDeque::new();
        state.handle(InEvent::RecvMessage(2, message), now, &mut io);
        assert_eq!(state.cache.len(), 1);

        let now = now + Duration::from_secs(1);
        state.handle(InEvent::TimerExpired(Timer::EvictCache), now, &mut io);
        assert_eq!(state.cache.len(), 1);

        let now = now + config.message_cache_retention;
        state.handle(InEvent::TimerExpired(Timer::EvictCache), now, &mut io);
        assert_eq!(state.cache.len(), 0);
    }
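
    // A sketch of the IHave -> Graft recovery path, using the same `u32` peer ids and
    // `VecDeque` IO sink as the tests above. The scenario (peer numbers, rounds) is
    // illustrative, not taken from the paper.
    #[test]
    fn ihave_triggers_graft() {
        let mut io = VecDeque::new();
        let mut state = State::new(1u32, Default::default(), 1024);
        let now = Instant::now();

        // peer 2 announces a message we have not seen yet
        let content: Bytes = b"hi".to_vec().into();
        let id = MessageId::from_content(&content);
        state.handle(
            InEvent::RecvMessage(2, Message::IHave(vec![IHave { id, round: Round(1) }])),
            now,
            &mut io,
        );
        // a graft timer must have been scheduled for the missing message
        assert!(io.iter().any(|ev| matches!(
            ev,
            OutEvent::ScheduleTimer(_, Timer::SendGraft(gid)) if *gid == id
        )));
        io.clear();

        // if the timer fires before the payload arrives, we send a Graft to peer 2 to
        // request it, which also promotes peer 2 to the eager set
        state.handle(InEvent::TimerExpired(Timer::SendGraft(id)), now, &mut io);
        assert!(io
            .iter()
            .any(|ev| matches!(ev, OutEvent::SendMessage(2, Message::Graft(_)))));
        assert!(state.eager_push_peers.contains(&2));
    }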
}