iroh_gossip/proto/
topic.rs

1//! This module contains the implementation of the gossiping protocol for an individual topic
2
3use std::collections::VecDeque;
4
5use bytes::Bytes;
6use derive_more::From;
7use n0_future::time::{Duration, Instant};
8use rand::Rng;
9use serde::{Deserialize, Serialize};
10
11use super::{
12    hyparview::{self, InEvent as SwarmIn},
13    plumtree::{self, GossipEvent, InEvent as GossipIn, Scope},
14    state::MessageKind,
15    PeerData, PeerIdentity, DEFAULT_MAX_MESSAGE_SIZE,
16};
17use crate::proto::MIN_MAX_MESSAGE_SIZE;
18
/// Input event to the topic state handler.
///
/// Events are passed to [`State::handle`], which routes them to the swarm membership
/// layer, the gossip broadcast layer, or both.
#[derive(Clone, Debug)]
pub enum InEvent<PI> {
    /// Message received from the network.
    ///
    /// The first field is the peer the message was received from.
    RecvMessage(PI, Message<PI>),
    /// Execute a command from the application.
    Command(Command<PI>),
    /// Trigger a previously scheduled timer.
    ///
    /// The timer is the value that was handed out through [`OutEvent::ScheduleTimer`].
    TimerExpired(Timer<PI>),
    /// Peer disconnected on the network level.
    PeerDisconnected(PI),
    /// Update the opaque peer data about yourself.
    UpdatePeerData(PeerData),
}
33
/// An output event from the state handler.
///
/// These are yielded by the iterator returned from [`State::handle`] and must be
/// processed by the caller.
#[derive(Debug, PartialEq, Eq)]
pub enum OutEvent<PI> {
    /// Send a message on the network.
    ///
    /// The first field is the peer the message is addressed to.
    SendMessage(PI, Message<PI>),
    /// Emit an event to the application.
    EmitEvent(Event<PI>),
    /// Schedule a timer. The runtime is responsible for sending an
    /// [`InEvent::TimerExpired`] with the same timer value after the duration.
    ScheduleTimer(Duration, Timer<PI>),
    /// Close the connection to a peer on the network level.
    DisconnectPeer(PI),
    /// Emitted when new [`PeerData`] was received for a peer.
    PeerData(PI, PeerData),
}
49
50impl<PI> From<hyparview::OutEvent<PI>> for OutEvent<PI> {
51    fn from(event: hyparview::OutEvent<PI>) -> Self {
52        use hyparview::OutEvent::*;
53        match event {
54            SendMessage(to, message) => Self::SendMessage(to, message.into()),
55            ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()),
56            DisconnectPeer(peer) => Self::DisconnectPeer(peer),
57            EmitEvent(event) => Self::EmitEvent(event.into()),
58            PeerData(peer, data) => Self::PeerData(peer, data),
59        }
60    }
61}
62
63impl<PI> From<plumtree::OutEvent<PI>> for OutEvent<PI> {
64    fn from(event: plumtree::OutEvent<PI>) -> Self {
65        use plumtree::OutEvent::*;
66        match event {
67            SendMessage(to, message) => Self::SendMessage(to, message.into()),
68            ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()),
69            EmitEvent(event) => Self::EmitEvent(event.into()),
70        }
71    }
72}
73
74/// A trait for a concrete type to push `OutEvent`s to.
75///
76/// The implementation is generic over this trait, which allows the upper layer to supply a
77/// container of their choice for `OutEvent`s emitted from the protocol state.
78pub trait IO<PI: Clone> {
79    /// Push an event in the IO container
80    fn push(&mut self, event: impl Into<OutEvent<PI>>);
81
82    /// Push all events from an iterator into the IO container
83    fn push_from_iter(&mut self, iter: impl IntoIterator<Item = impl Into<OutEvent<PI>>>) {
84        for event in iter.into_iter() {
85            self.push(event);
86        }
87    }
88}
89
/// A protocol message for a particular topic.
///
/// Wraps the message types of the two protocol layers. `From` is derived, so a layer
/// message converts into a topic message with `.into()`.
#[derive(From, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum Message<PI> {
    /// A message of the swarm membership layer
    Swarm(hyparview::Message<PI>),
    /// A message of the gossip broadcast layer
    Gossip(plumtree::Message),
}
98
99impl<PI> Message<PI> {
100    /// Get the kind of this message
101    pub fn kind(&self) -> MessageKind {
102        match self {
103            Message::Swarm(_) => MessageKind::Control,
104            Message::Gossip(message) => match message {
105                plumtree::Message::Gossip(_) => MessageKind::Data,
106                _ => MessageKind::Control,
107            },
108        }
109    }
110
111    /// Returns `true` if this is a disconnect message (which is the last message sent to a peer per topic).
112    pub fn is_disconnect(&self) -> bool {
113        matches!(self, Message::Swarm(hyparview::Message::Disconnect(_)))
114    }
115}
116
/// An event to be emitted to the application for a particular topic.
///
/// Reaches the application wrapped in [`OutEvent::EmitEvent`].
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub enum Event<PI> {
    /// We have a new, direct neighbor in the swarm membership layer for this topic.
    NeighborUp(PI),
    /// We dropped a direct neighbor in the swarm membership layer for this topic.
    NeighborDown(PI),
    /// A gossip message was received for this topic.
    Received(GossipEvent<PI>),
}
127
128impl<PI> From<hyparview::Event<PI>> for Event<PI> {
129    fn from(value: hyparview::Event<PI>) -> Self {
130        match value {
131            hyparview::Event::NeighborUp(peer) => Self::NeighborUp(peer),
132            hyparview::Event::NeighborDown(peer) => Self::NeighborDown(peer),
133        }
134    }
135}
136
137impl<PI> From<plumtree::Event<PI>> for Event<PI> {
138    fn from(value: plumtree::Event<PI>) -> Self {
139        match value {
140            plumtree::Event::Received(event) => Self::Received(event),
141        }
142    }
143}
144
/// A timer to be registered for a particular topic.
///
/// This should be treated as an opaque value by the implementer and, once emitted, simply returned
/// to the protocol through [`InEvent::TimerExpired`].
///
/// `From` is derived, so a timer of either layer converts into a topic timer with `.into()`.
#[derive(Clone, From, Debug, PartialEq, Eq)]
pub enum Timer<PI> {
    /// A timer for the swarm layer
    Swarm(hyparview::Timer<PI>),
    /// A timer for the gossip layer
    Gossip(plumtree::Timer),
}
156
/// A command to the protocol state for a particular topic.
///
/// Commands are delivered to the state wrapped in [`InEvent::Command`].
#[derive(Clone, derive_more::Debug)]
pub enum Command<PI> {
    /// Join this topic and connect to peers.
    ///
    /// If the list of peers is empty, will prepare the state and accept incoming join requests,
    /// but only become operational after the first join request by another peer.
    Join(Vec<PI>),
    /// Broadcast a message for this topic.
    ///
    /// The `Debug` output shows only the length of the payload, not its content.
    Broadcast(#[debug("<{}b>", _0.len())] Bytes, Scope),
    /// Leave this topic and drop all state.
    Quit,
}
170
171impl<PI: Clone> IO<PI> for VecDeque<OutEvent<PI>> {
172    fn push(&mut self, event: impl Into<OutEvent<PI>>) {
173        self.push_back(event.into())
174    }
175}
176
/// Protocol configuration
///
/// Because of `#[serde(default)]`, fields missing from a serialized config are filled in
/// from the [`Default`] implementation during deserialization.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Configuration for the swarm membership layer
    pub membership: hyparview::Config,
    /// Configuration for the gossip broadcast layer
    pub broadcast: plumtree::Config,
    /// Max message size in bytes.
    ///
    /// This size should be the same across a network to ensure all nodes can transmit and
    /// read large messages.
    ///
    /// At minimum, this size should be large enough to send gossip control messages. This
    /// can vary, depending on the size of the [`PeerIdentity`] you use and the size of the
    /// [`PeerData`] you transmit in your messages.
    ///
    /// Creating a [`State`] panics if this value is below [`MIN_MAX_MESSAGE_SIZE`].
    ///
    /// The default is [`DEFAULT_MAX_MESSAGE_SIZE`].
    pub max_message_size: usize,
}
194
195impl Default for Config {
196    fn default() -> Self {
197        Self {
198            membership: Default::default(),
199            broadcast: Default::default(),
200            max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
201        }
202    }
203}
204
/// The topic state maintains the swarm membership and broadcast tree for a particular topic.
///
/// All interaction happens through [`State::handle`], which takes an [`InEvent`] and yields
/// the resulting [`OutEvent`]s.
#[derive(Debug)]
pub struct State<PI, R> {
    /// Identity of the local endpoint.
    me: PI,
    /// State of the swarm membership layer.
    pub(crate) swarm: hyparview::State<PI, R>,
    /// State of the gossip broadcast layer.
    pub(crate) gossip: plumtree::State<PI>,
    /// Buffer collecting the out events of the current `handle` call; drained on return.
    outbox: VecDeque<OutEvent<PI>>,
    /// Counters for sent and received messages.
    stats: Stats,
}
214
215impl<PI: PeerIdentity> State<PI, rand::rngs::ThreadRng> {
216    /// Initialize the local state with the default random number generator.
217    ///
218    /// ## Panics
219    ///
220    /// Panics if [`Config::max_message_size`] is below [`MIN_MAX_MESSAGE_SIZE`].
221    pub fn new(me: PI, me_data: Option<PeerData>, config: Config) -> Self {
222        Self::with_rng(me, me_data, config, rand::rng())
223    }
224}
225
226impl<PI, R> State<PI, R> {
227    /// The address of your local endpoint.
228    pub fn endpoint(&self) -> &PI {
229        &self.me
230    }
231}
232
233impl<PI: PeerIdentity, R: Rng> State<PI, R> {
234    /// Initialize the local state with a custom random number generator.
235    ///
236    /// ## Panics
237    ///
238    /// Panics if [`Config::max_message_size`] is below [`MIN_MAX_MESSAGE_SIZE`].
239    pub fn with_rng(me: PI, me_data: Option<PeerData>, config: Config, rng: R) -> Self {
240        assert!(
241            config.max_message_size >= MIN_MAX_MESSAGE_SIZE,
242            "max_message_size must be at least {MIN_MAX_MESSAGE_SIZE}"
243        );
244        let max_payload_size =
245            config.max_message_size - super::Message::<PI>::postcard_header_size();
246        Self {
247            swarm: hyparview::State::new(me, me_data, config.membership, rng),
248            gossip: plumtree::State::new(me, config.broadcast, max_payload_size),
249            me,
250            outbox: VecDeque::new(),
251            stats: Stats::default(),
252        }
253    }
254
255    /// Handle an incoming event.
256    ///
257    /// Returns an iterator of outgoing events that must be processed by the application.
258    pub fn handle(
259        &mut self,
260        event: InEvent<PI>,
261        now: Instant,
262    ) -> impl Iterator<Item = OutEvent<PI>> + '_ {
263        let io = &mut self.outbox;
264        // Process the event, store out events in outbox.
265        match event {
266            InEvent::Command(command) => match command {
267                Command::Join(peers) => {
268                    for peer in peers {
269                        self.swarm.handle(SwarmIn::RequestJoin(peer), io);
270                    }
271                }
272                Command::Broadcast(data, scope) => {
273                    self.gossip
274                        .handle(GossipIn::Broadcast(data, scope), now, io)
275                }
276                Command::Quit => self.swarm.handle(SwarmIn::Quit, io),
277            },
278            InEvent::RecvMessage(from, message) => {
279                self.stats.messages_received += 1;
280                match message {
281                    Message::Swarm(message) => {
282                        self.swarm.handle(SwarmIn::RecvMessage(from, message), io)
283                    }
284                    Message::Gossip(message) => {
285                        self.gossip
286                            .handle(GossipIn::RecvMessage(from, message), now, io)
287                    }
288                }
289            }
290            InEvent::TimerExpired(timer) => match timer {
291                Timer::Swarm(timer) => self.swarm.handle(SwarmIn::TimerExpired(timer), io),
292                Timer::Gossip(timer) => self.gossip.handle(GossipIn::TimerExpired(timer), now, io),
293            },
294            InEvent::PeerDisconnected(peer) => {
295                self.swarm.handle(SwarmIn::PeerDisconnected(peer), io);
296                self.gossip.handle(GossipIn::NeighborDown(peer), now, io);
297            }
298            InEvent::UpdatePeerData(data) => self.swarm.handle(SwarmIn::UpdatePeerData(data), io),
299        }
300
301        // Forward NeighborUp and NeighborDown events from hyparview to plumtree
302        let mut io = VecDeque::new();
303        for event in self.outbox.iter() {
304            match event {
305                OutEvent::EmitEvent(Event::NeighborUp(peer)) => {
306                    self.gossip
307                        .handle(GossipIn::NeighborUp(*peer), now, &mut io)
308                }
309                OutEvent::EmitEvent(Event::NeighborDown(peer)) => {
310                    self.gossip
311                        .handle(GossipIn::NeighborDown(*peer), now, &mut io)
312                }
313                _ => {}
314            }
315        }
316        // Note that this is a no-op because plumtree::handle(NeighborUp | NeighborDown)
317        // above does not emit any OutEvents.
318        self.outbox.extend(io.drain(..));
319
320        // Update sent message counter
321        self.stats.messages_sent += self
322            .outbox
323            .iter()
324            .filter(|event| matches!(event, OutEvent::SendMessage(_, _)))
325            .count();
326
327        self.outbox.drain(..)
328    }
329
330    /// Get stats on how many messages were sent and received.
331    // TODO: Remove/replace with metrics?
332    pub fn stats(&self) -> &Stats {
333        &self.stats
334    }
335
336    /// Reset all statistics.
337    pub fn reset_stats(&mut self) {
338        self.gossip.stats = Default::default();
339        self.swarm.stats = Default::default();
340        self.stats = Default::default();
341    }
342
343    /// Get statistics for the gossip broadcast state
344    ///
345    /// TODO: Remove/replace with metrics?
346    pub fn gossip_stats(&self) -> &plumtree::Stats {
347        self.gossip.stats()
348    }
349
350    /// Check if this topic has any active (connected) peers.
351    pub fn has_active_peers(&self) -> bool {
352        !self.swarm.active_view.is_empty()
353    }
354}
355
/// Statistics for the protocol state of a topic
///
/// All counters start at zero and are cleared again by [`State::reset_stats`].
#[derive(Clone, Debug, Default)]
pub struct Stats {
    /// Number of messages sent, counted as the [`OutEvent::SendMessage`] events emitted by
    /// [`State::handle`].
    pub messages_sent: usize,
    /// Number of messages received, counted as the [`InEvent::RecvMessage`] events passed
    /// to [`State::handle`].
    pub messages_received: usize,
}