iroh_gossip/proto/
topic.rs

//! This module contains the implementation of the gossiping protocol for an individual topic.
2
3use std::collections::VecDeque;
4
5use bytes::Bytes;
6use derive_more::From;
7use n0_future::time::{Duration, Instant};
8use rand::{Rng, SeedableRng};
9use serde::{Deserialize, Serialize};
10
11use super::{
12    hyparview::{self, InEvent as SwarmIn},
13    plumtree::{self, GossipEvent, InEvent as GossipIn, Scope},
14    state::MessageKind,
15    PeerData, PeerIdentity, DEFAULT_MAX_MESSAGE_SIZE,
16};
17use crate::proto::MIN_MAX_MESSAGE_SIZE;
18
19/// Input event to the topic state handler.
20#[derive(Clone, Debug)]
21pub enum InEvent<PI> {
22    /// Message received from the network.
23    RecvMessage(PI, Message<PI>),
24    /// Execute a command from the application.
25    Command(Command<PI>),
26    /// Trigger a previously scheduled timer.
27    TimerExpired(Timer<PI>),
28    /// Peer disconnected on the network level.
29    PeerDisconnected(PI),
30    /// Update the opaque peer data about yourself.
31    UpdatePeerData(PeerData),
32}
33
34/// An output event from the state handler.
35#[derive(Debug, PartialEq, Eq)]
36pub enum OutEvent<PI> {
37    /// Send a message on the network
38    SendMessage(PI, Message<PI>),
39    /// Emit an event to the application.
40    EmitEvent(Event<PI>),
41    /// Schedule a timer. The runtime is responsible for sending an [InEvent::TimerExpired]
42    /// after the duration.
43    ScheduleTimer(Duration, Timer<PI>),
44    /// Close the connection to a peer on the network level.
45    DisconnectPeer(PI),
46    /// Emitted when new [`PeerData`] was received for a peer.
47    PeerData(PI, PeerData),
48}
49
50impl<PI> From<hyparview::OutEvent<PI>> for OutEvent<PI> {
51    fn from(event: hyparview::OutEvent<PI>) -> Self {
52        use hyparview::OutEvent::*;
53        match event {
54            SendMessage(to, message) => Self::SendMessage(to, message.into()),
55            ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()),
56            DisconnectPeer(peer) => Self::DisconnectPeer(peer),
57            EmitEvent(event) => Self::EmitEvent(event.into()),
58            PeerData(peer, data) => Self::PeerData(peer, data),
59        }
60    }
61}
62
63impl<PI> From<plumtree::OutEvent<PI>> for OutEvent<PI> {
64    fn from(event: plumtree::OutEvent<PI>) -> Self {
65        use plumtree::OutEvent::*;
66        match event {
67            SendMessage(to, message) => Self::SendMessage(to, message.into()),
68            ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()),
69            EmitEvent(event) => Self::EmitEvent(event.into()),
70        }
71    }
72}
73
74/// A trait for a concrete type to push `OutEvent`s to.
75///
76/// The implementation is generic over this trait, which allows the upper layer to supply a
77/// container of their choice for `OutEvent`s emitted from the protocol state.
78pub trait IO<PI: Clone> {
79    /// Push an event in the IO container
80    fn push(&mut self, event: impl Into<OutEvent<PI>>);
81
82    /// Push all events from an iterator into the IO container
83    fn push_from_iter(&mut self, iter: impl IntoIterator<Item = impl Into<OutEvent<PI>>>) {
84        for event in iter.into_iter() {
85            self.push(event);
86        }
87    }
88}
89
90/// A protocol message for a particular topic
91#[derive(From, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
92pub enum Message<PI> {
93    /// A message of the swarm membership layer
94    Swarm(hyparview::Message<PI>),
95    /// A message of the gossip broadcast layer
96    Gossip(plumtree::Message),
97}
98
99impl<PI> Message<PI> {
100    /// Get the kind of this message
101    pub fn kind(&self) -> MessageKind {
102        match self {
103            Message::Swarm(_) => MessageKind::Control,
104            Message::Gossip(message) => match message {
105                plumtree::Message::Gossip(_) => MessageKind::Data,
106                _ => MessageKind::Control,
107            },
108        }
109    }
110
111    /// Returns `true` if this is a disconnect message (which is the last message sent to a peer per topic).
112    pub fn is_disconnect(&self) -> bool {
113        matches!(self, Message::Swarm(hyparview::Message::Disconnect(_)))
114    }
115}
116
117impl<PI: Serialize> Message<PI> {
118    /// Get the encoded size of this message
119    pub fn size(&self) -> postcard::Result<usize> {
120        postcard::experimental::serialized_size(&self)
121    }
122}
123
124/// An event to be emitted to the application for a particular topic.
125#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
126pub enum Event<PI> {
127    /// We have a new, direct neighbor in the swarm membership layer for this topic
128    NeighborUp(PI),
129    /// We dropped direct neighbor in the swarm membership layer for this topic
130    NeighborDown(PI),
131    /// A gossip message was received for this topic
132    Received(GossipEvent<PI>),
133}
134
135impl<PI> From<hyparview::Event<PI>> for Event<PI> {
136    fn from(value: hyparview::Event<PI>) -> Self {
137        match value {
138            hyparview::Event::NeighborUp(peer) => Self::NeighborUp(peer),
139            hyparview::Event::NeighborDown(peer) => Self::NeighborDown(peer),
140        }
141    }
142}
143
144impl<PI> From<plumtree::Event<PI>> for Event<PI> {
145    fn from(value: plumtree::Event<PI>) -> Self {
146        match value {
147            plumtree::Event::Received(event) => Self::Received(event),
148        }
149    }
150}
151
152/// A timer to be registered for a particular topic.
153///
154/// This should be treated as an opaque value by the implementer and, once emitted, simply returned
155/// to the protocol through [`InEvent::TimerExpired`].
156#[derive(Clone, From, Debug, PartialEq, Eq)]
157pub enum Timer<PI> {
158    /// A timer for the swarm layer
159    Swarm(hyparview::Timer<PI>),
160    /// A timer for the gossip layer
161    Gossip(plumtree::Timer),
162}
163
164/// A command to the protocol state for a particular topic.
165#[derive(Clone, derive_more::Debug)]
166pub enum Command<PI> {
167    /// Join this topic and connect to peers.
168    ///
169    /// If the list of peers is empty, will prepare the state and accept incoming join requests,
170    /// but only become operational after the first join request by another peer.
171    Join(Vec<PI>),
172    /// Broadcast a message for this topic.
173    Broadcast(#[debug("<{}b>", _0.len())] Bytes, Scope),
174    /// Leave this topic and drop all state.
175    Quit,
176}
177
178impl<PI: Clone> IO<PI> for VecDeque<OutEvent<PI>> {
179    fn push(&mut self, event: impl Into<OutEvent<PI>>) {
180        self.push_back(event.into())
181    }
182}
183
184/// Protocol configuration
185#[derive(Clone, Debug, Serialize, Deserialize)]
186#[serde(default)]
187pub struct Config {
188    /// Configuration for the swarm membership layer
189    pub membership: hyparview::Config,
190    /// Configuration for the gossip broadcast layer
191    pub broadcast: plumtree::Config,
192    /// Max message size in bytes.
193    ///
194    /// This size should be the same across a network to ensure all nodes can transmit and read large messages.
195    ///
196    /// At minimum, this size should be large enough to send gossip control messages. This can vary, depending on the size of the [`PeerIdentity`] you use and the size of the [`PeerData`] you transmit in your messages.
197    ///
198    /// The default is [`DEFAULT_MAX_MESSAGE_SIZE`].
199    pub max_message_size: usize,
200}
201
202impl Default for Config {
203    fn default() -> Self {
204        Self {
205            membership: Default::default(),
206            broadcast: Default::default(),
207            max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
208        }
209    }
210}
211
212/// The topic state maintains the swarm membership and broadcast tree for a particular topic.
213#[derive(Debug)]
214pub struct State<PI, R> {
215    me: PI,
216    pub(crate) swarm: hyparview::State<PI, R>,
217    pub(crate) gossip: plumtree::State<PI>,
218    outbox: VecDeque<OutEvent<PI>>,
219    stats: Stats,
220}
221
222impl<PI: PeerIdentity> State<PI, rand::rngs::StdRng> {
223    /// Initialize the local state with the default random number generator.
224    ///
225    /// ## Panics
226    ///
227    /// Panics if [`Config::max_message_size`] is below [`MIN_MAX_MESSAGE_SIZE`].
228    pub fn new(me: PI, me_data: Option<PeerData>, config: Config) -> Self {
229        Self::with_rng(me, me_data, config, rand::rngs::StdRng::from_os_rng())
230    }
231}
232
233impl<PI, R> State<PI, R> {
234    /// The address of your local endpoint.
235    pub fn endpoint(&self) -> &PI {
236        &self.me
237    }
238}
239
240impl<PI: PeerIdentity, R: Rng> State<PI, R> {
241    /// Initialize the local state with a custom random number generator.
242    ///
243    /// ## Panics
244    ///
245    /// Panics if [`Config::max_message_size`] is below [`MIN_MAX_MESSAGE_SIZE`].
246    pub fn with_rng(me: PI, me_data: Option<PeerData>, config: Config, rng: R) -> Self {
247        assert!(
248            config.max_message_size >= MIN_MAX_MESSAGE_SIZE,
249            "max_message_size must be at least {MIN_MAX_MESSAGE_SIZE}"
250        );
251        let max_payload_size =
252            config.max_message_size - super::Message::<PI>::postcard_header_size();
253        Self {
254            swarm: hyparview::State::new(me, me_data, config.membership, rng),
255            gossip: plumtree::State::new(me, config.broadcast, max_payload_size),
256            me,
257            outbox: VecDeque::new(),
258            stats: Stats::default(),
259        }
260    }
261
262    /// Handle an incoming event.
263    ///
264    /// Returns an iterator of outgoing events that must be processed by the application.
265    pub fn handle(
266        &mut self,
267        event: InEvent<PI>,
268        now: Instant,
269    ) -> impl Iterator<Item = OutEvent<PI>> + '_ {
270        let io = &mut self.outbox;
271        // Process the event, store out events in outbox.
272        match event {
273            InEvent::Command(command) => match command {
274                Command::Join(peers) => {
275                    for peer in peers {
276                        self.swarm.handle(SwarmIn::RequestJoin(peer), io);
277                    }
278                }
279                Command::Broadcast(data, scope) => {
280                    self.gossip
281                        .handle(GossipIn::Broadcast(data, scope), now, io)
282                }
283                Command::Quit => self.swarm.handle(SwarmIn::Quit, io),
284            },
285            InEvent::RecvMessage(from, message) => {
286                self.stats.messages_received += 1;
287                match message {
288                    Message::Swarm(message) => {
289                        self.swarm.handle(SwarmIn::RecvMessage(from, message), io)
290                    }
291                    Message::Gossip(message) => {
292                        self.gossip
293                            .handle(GossipIn::RecvMessage(from, message), now, io)
294                    }
295                }
296            }
297            InEvent::TimerExpired(timer) => match timer {
298                Timer::Swarm(timer) => self.swarm.handle(SwarmIn::TimerExpired(timer), io),
299                Timer::Gossip(timer) => self.gossip.handle(GossipIn::TimerExpired(timer), now, io),
300            },
301            InEvent::PeerDisconnected(peer) => {
302                self.swarm.handle(SwarmIn::PeerDisconnected(peer), io);
303                self.gossip.handle(GossipIn::NeighborDown(peer), now, io);
304            }
305            InEvent::UpdatePeerData(data) => self.swarm.handle(SwarmIn::UpdatePeerData(data), io),
306        }
307
308        // Forward NeighborUp and NeighborDown events from hyparview to plumtree
309        let mut io = VecDeque::new();
310        for event in self.outbox.iter() {
311            match event {
312                OutEvent::EmitEvent(Event::NeighborUp(peer)) => {
313                    self.gossip
314                        .handle(GossipIn::NeighborUp(*peer), now, &mut io)
315                }
316                OutEvent::EmitEvent(Event::NeighborDown(peer)) => {
317                    self.gossip
318                        .handle(GossipIn::NeighborDown(*peer), now, &mut io)
319                }
320                _ => {}
321            }
322        }
323        // Note that this is a no-op because plumtree::handle(NeighborUp | NeighborDown)
324        // above does not emit any OutEvents.
325        self.outbox.extend(io.drain(..));
326
327        // Update sent message counter
328        self.stats.messages_sent += self
329            .outbox
330            .iter()
331            .filter(|event| matches!(event, OutEvent::SendMessage(_, _)))
332            .count();
333
334        self.outbox.drain(..)
335    }
336
337    /// Get stats on how many messages were sent and received.
338    // TODO: Remove/replace with metrics?
339    pub fn stats(&self) -> &Stats {
340        &self.stats
341    }
342
343    /// Reset all statistics.
344    pub fn reset_stats(&mut self) {
345        self.gossip.stats = Default::default();
346        self.swarm.stats = Default::default();
347        self.stats = Default::default();
348    }
349
350    /// Get statistics for the gossip broadcast state
351    ///
352    /// TODO: Remove/replace with metrics?
353    pub fn gossip_stats(&self) -> &plumtree::Stats {
354        self.gossip.stats()
355    }
356
357    /// Check if this topic has any active (connected) peers.
358    pub fn has_active_peers(&self) -> bool {
359        !self.swarm.active_view.is_empty()
360    }
361}
362
/// Statistics for the protocol state of a topic
///
/// Both counters start at zero (see [`Default`]). The type implements `PartialEq`/`Eq` so that
/// two snapshots can be compared directly, e.g. in assertions.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Stats {
    /// Number of messages sent
    pub messages_sent: usize,
    /// Number of messages received
    pub messages_received: usize,
}