iroh_gossip/proto/
state.rs

1//! The protocol state of the `iroh-gossip` protocol.
2
3use std::collections::{hash_map, HashMap, HashSet};
4
5use n0_future::time::{Duration, Instant};
6use rand::Rng;
7use serde::{Deserialize, Serialize};
8use tracing::trace;
9
10use crate::{
11    metrics::Metrics,
12    proto::{
13        topic::{self, Command},
14        util::idbytes_impls,
15        Config, PeerData, PeerIdentity, MIN_MAX_MESSAGE_SIZE,
16    },
17};
18
/// The identifier for a topic
///
/// An opaque 32-byte value; topics are distinguished only by byte equality.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Serialize, Ord, PartialOrd, Deserialize)]
pub struct TopicId([u8; 32]);
idbytes_impls!(TopicId, "TopicId");
23
24impl TopicId {
25    /// Convert to a hex string limited to the first 5 bytes for a friendly string
26    /// representation of the key.
27    pub fn fmt_short(&self) -> String {
28        data_encoding::HEXLOWER.encode(&self.as_bytes()[..5])
29    }
30}
31
/// Protocol wire message
///
/// This is the wire frame of the `iroh-gossip` protocol: every frame carries the
/// [`TopicId`] it belongs to plus the topic-level protocol message.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message<PI> {
    /// The topic this message belongs to.
    pub(crate) topic: TopicId,
    /// The topic-level protocol message.
    pub(crate) message: topic::Message<PI>,
}
40
impl<PI> Message<PI> {
    /// Get the kind of this message (data or control).
    ///
    /// Delegates to the inner topic-level message.
    pub fn kind(&self) -> MessageKind {
        self.message.kind()
    }
}
47
48impl<PI: Serialize> Message<PI> {
49    pub(crate) fn postcard_header_size() -> usize {
50        // We create a message that has no payload (gossip::Message::Prune), calculate the encoded size,
51        // and subtract 1 for the discriminator of the inner gossip::Message enum.
52        let m = Self {
53            topic: TopicId(Default::default()),
54            message: topic::Message::<PI>::Gossip(super::plumtree::Message::Prune),
55        };
56        postcard::experimental::serialized_size(&m).unwrap() - 1
57    }
58}
59
/// Whether this is a control or data message
///
/// Derives `Copy`/`Eq` eagerly: the enum is a tiny, field-less classifier and
/// callers should be able to copy and compare it freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageKind {
    /// A data message.
    Data,
    /// A control message.
    Control,
}
68
impl<PI: Serialize> Message<PI> {
    /// Get the encoded size of this message
    ///
    /// Computes the postcard-serialized size; returns an error if the message
    /// cannot be serialized.
    pub fn size(&self) -> postcard::Result<usize> {
        postcard::experimental::serialized_size(&self)
    }
}
75
/// A timer to be registered into the runtime
///
/// As the implementation of the protocol is an IO-less state machine, registering timers does not
/// happen within the protocol implementation. Instead, these `Timer` structs are emitted as
/// [`OutEvent`]s. The implementer must register the timer in its runtime to fire after the
/// [`Duration`] given in [`OutEvent::ScheduleTimer`], and once triggered inject an
/// [`InEvent::TimerExpired`] into the protocol state.
#[derive(Clone, Debug)]
pub struct Timer<PI> {
    /// The topic this timer belongs to.
    topic: TopicId,
    /// The topic-level timer to deliver back on expiry.
    timer: topic::Timer<PI>,
}
87
/// Input event to the protocol state.
#[derive(Clone, Debug)]
pub enum InEvent<PI> {
    /// Message received from the network. The first element is the sender.
    RecvMessage(PI, Message<PI>),
    /// Execute a command from the application on the given topic.
    Command(TopicId, Command<PI>),
    /// Trigger a previously scheduled timer.
    TimerExpired(Timer<PI>),
    /// Peer disconnected on the network level.
    PeerDisconnected(PI),
    /// Update the opaque peer data about yourself.
    UpdatePeerData(PeerData),
}
102
/// Output event from the protocol state.
#[derive(Debug, Clone)]
pub enum OutEvent<PI> {
    /// Send a message on the network to the given peer.
    SendMessage(PI, Message<PI>),
    /// Emit an event to the application.
    EmitEvent(TopicId, topic::Event<PI>),
    /// Schedule a timer. The runtime is responsible for sending an [`InEvent::TimerExpired`]
    /// after the duration.
    ScheduleTimer(Duration, Timer<PI>),
    /// Close the connection to a peer on the network level.
    DisconnectPeer(PI),
    /// Updated peer data about the given peer.
    PeerData(PI, PeerData),
}
118
/// Tracks, per connected peer, the set of topics interested in that connection.
type ConnsMap<PI> = HashMap<PI, HashSet<TopicId>>;
/// Buffer of [`OutEvent`]s, drained and returned by [`State::handle`].
type Outbox<PI> = Vec<OutEvent<PI>>;
121
/// An [`InEvent`] mapped to the topic state(s) it must be delivered to.
enum InEventMapped<PI> {
    /// Deliver to the state of every joined topic.
    All(topic::InEvent<PI>),
    /// Deliver only to the state of the given topic.
    TopicEvent(TopicId, topic::InEvent<PI>),
}
126
127impl<PI> From<InEvent<PI>> for InEventMapped<PI> {
128    fn from(event: InEvent<PI>) -> InEventMapped<PI> {
129        match event {
130            InEvent::RecvMessage(from, Message { topic, message }) => {
131                Self::TopicEvent(topic, topic::InEvent::RecvMessage(from, message))
132            }
133            InEvent::Command(topic, command) => {
134                Self::TopicEvent(topic, topic::InEvent::Command(command))
135            }
136            InEvent::TimerExpired(Timer { topic, timer }) => {
137                Self::TopicEvent(topic, topic::InEvent::TimerExpired(timer))
138            }
139            InEvent::PeerDisconnected(peer) => Self::All(topic::InEvent::PeerDisconnected(peer)),
140            InEvent::UpdatePeerData(data) => Self::All(topic::InEvent::UpdatePeerData(data)),
141        }
142    }
143}
144
/// The state of the `iroh-gossip` protocol.
///
/// The implementation works as an IO-less state machine. The implementer injects events through
/// [`Self::handle`], which returns an iterator of [`OutEvent`]s to be processed.
///
/// This struct contains a map of [`topic::State`] for each topic that was joined. It mostly acts as
/// a forwarder of [`InEvent`]s to matching topic state. Each topic's state is completely
/// independent; thus the actual protocol logic lives with [`topic::State`].
#[derive(Debug)]
pub struct State<PI, R> {
    /// The local node's identity.
    me: PI,
    /// The local node's opaque peer data, handed to newly created topic states.
    me_data: PeerData,
    /// Protocol configuration, cloned into each topic state.
    config: Config,
    /// Random number generator, cloned into each topic state.
    rng: R,
    /// Per-topic protocol state for every joined topic.
    states: HashMap<TopicId, topic::State<PI, R>>,
    /// Buffer of output events, drained at the end of [`Self::handle`].
    outbox: Outbox<PI>,
    /// Which topics are interested in each connected peer.
    peer_topics: ConnsMap<PI>,
}
163
impl<PI: PeerIdentity, R: Rng + Clone> State<PI, R> {
    /// Create a new protocol state instance.
    ///
    /// `me` is the [`PeerIdentity`] of the local node, `peer_data` is the initial [`PeerData`]
    /// (which can be updated over time).
    /// For the protocol to perform as recommended in the papers, the [`Config`] should be
    /// identical for all nodes in the network.
    ///
    /// ## Panics
    ///
    /// Panics if [`Config::max_message_size`] is below [`MIN_MAX_MESSAGE_SIZE`].
    pub fn new(me: PI, me_data: PeerData, config: Config, rng: R) -> Self {
        assert!(
            config.max_message_size >= MIN_MAX_MESSAGE_SIZE,
            "max_message_size must be at least {MIN_MAX_MESSAGE_SIZE}"
        );
        Self {
            me,
            me_data,
            config,
            rng,
            states: Default::default(),
            outbox: Default::default(),
            peer_topics: Default::default(),
        }
    }

    /// Get a reference to the node's [`PeerIdentity`]
    pub fn me(&self) -> &PI {
        &self.me
    }

    /// Get a reference to the protocol state for a topic.
    ///
    /// Returns `None` if the topic was not joined.
    pub fn state(&self, topic: &TopicId) -> Option<&topic::State<PI, R>> {
        self.states.get(topic)
    }

    /// Resets the tracked stats for a topic.
    ///
    /// Does nothing if the topic was not joined.
    pub fn reset_stats(&mut self, topic: &TopicId) {
        if let Some(state) = self.states.get_mut(topic) {
            state.reset_stats();
        }
    }

    /// Get an iterator of all joined topics.
    pub fn topics(&self) -> impl Iterator<Item = &TopicId> {
        self.states.keys()
    }

    /// Get an iterator for the states of all joined topics.
    pub fn states(&self) -> impl Iterator<Item = (&TopicId, &topic::State<PI, R>)> {
        self.states.iter()
    }

    /// Check if a topic has any active (connected) peers.
    ///
    /// Returns `false` if the topic was not joined.
    pub fn has_active_peers(&self, topic: &TopicId) -> bool {
        self.state(topic)
            .map(|s| s.has_active_peers())
            .unwrap_or(false)
    }

    /// Returns the maximum message size configured in the gossip protocol.
    pub fn max_message_size(&self) -> usize {
        self.config.max_message_size
    }

    /// Handle an [`InEvent`]
    ///
    /// This returns an iterator of [`OutEvent`]s that must be processed.
    pub fn handle(
        &mut self,
        event: InEvent<PI>,
        now: Instant,
        metrics: Option<&Metrics>,
    ) -> impl Iterator<Item = OutEvent<PI>> + '_ + use<'_, PI, R> {
        trace!("in : {event:?}");
        // track receive-side metrics before the event is consumed below
        if let Some(metrics) = &metrics {
            track_in_event(&event, metrics);
        }

        // map the event to either a single topic or all joined topics
        let event: InEventMapped<PI> = event.into();

        match event {
            InEventMapped::TopicEvent(topic, event) => {
                // when receiving a join command, initialize state if it doesn't exist
                if matches!(&event, topic::InEvent::Command(Command::Join(_peers))) {
                    if let hash_map::Entry::Vacant(e) = self.states.entry(topic) {
                        e.insert(topic::State::with_rng(
                            self.me,
                            Some(self.me_data.clone()),
                            self.config.clone(),
                            self.rng.clone(),
                        ));
                    }
                }

                // when receiving a quit command, note this and drop the topic state after
                // processing this last event
                let quit = matches!(event, topic::InEvent::Command(Command::Quit));

                // pass the event to the state handler
                if let Some(state) = self.states.get_mut(&topic) {
                    // when receiving messages, update our conn map to take note that this topic state may want
                    // to keep this connection
                    if let topic::InEvent::RecvMessage(from, _message) = &event {
                        self.peer_topics.entry(*from).or_default().insert(topic);
                    }
                    let out = state.handle(event, now);
                    for event in out {
                        handle_out_event(topic, event, &mut self.peer_topics, &mut self.outbox);
                    }
                }

                if quit {
                    self.states.remove(&topic);
                }
            }
            // when a peer disconnected on the network level, forward event to all states
            InEventMapped::All(event) => {
                // remember updated local peer data so that newly joined topics start with it
                if let topic::InEvent::UpdatePeerData(data) = &event {
                    self.me_data = data.clone();
                }
                for (topic, state) in self.states.iter_mut() {
                    let out = state.handle(event.clone(), now);
                    for event in out {
                        handle_out_event(*topic, event, &mut self.peer_topics, &mut self.outbox);
                    }
                }
            }
        }

        // track metrics
        if let Some(metrics) = &metrics {
            track_out_events(&self.outbox, metrics);
        }

        self.outbox.drain(..)
    }
}
303
304fn handle_out_event<PI: PeerIdentity>(
305    topic: TopicId,
306    event: topic::OutEvent<PI>,
307    conns: &mut ConnsMap<PI>,
308    outbox: &mut Outbox<PI>,
309) {
310    trace!("out: {event:?}");
311    match event {
312        topic::OutEvent::SendMessage(to, message) => {
313            outbox.push(OutEvent::SendMessage(to, Message { topic, message }))
314        }
315        topic::OutEvent::EmitEvent(event) => outbox.push(OutEvent::EmitEvent(topic, event)),
316        topic::OutEvent::ScheduleTimer(delay, timer) => {
317            outbox.push(OutEvent::ScheduleTimer(delay, Timer { topic, timer }))
318        }
319        topic::OutEvent::DisconnectPeer(peer) => {
320            let empty = conns
321                .get_mut(&peer)
322                .map(|list| list.remove(&topic) && list.is_empty())
323                .unwrap_or(false);
324            if empty {
325                conns.remove(&peer);
326                outbox.push(OutEvent::DisconnectPeer(peer));
327            }
328        }
329        topic::OutEvent::PeerData(peer, data) => outbox.push(OutEvent::PeerData(peer, data)),
330    }
331}
332
333fn track_out_events<PI: Serialize>(events: &[OutEvent<PI>], metrics: &Metrics) {
334    for event in events {
335        match event {
336            OutEvent::SendMessage(_to, message) => match message.kind() {
337                MessageKind::Data => {
338                    metrics.msgs_data_sent.inc();
339                    metrics
340                        .msgs_data_sent_size
341                        .inc_by(message.size().unwrap_or(0) as u64);
342                }
343                MessageKind::Control => {
344                    metrics.msgs_ctrl_sent.inc();
345                    metrics
346                        .msgs_ctrl_sent_size
347                        .inc_by(message.size().unwrap_or(0) as u64);
348                }
349            },
350            OutEvent::EmitEvent(_topic, event) => match event {
351                super::Event::NeighborUp(_peer) => {
352                    metrics.neighbor_up.inc();
353                }
354                super::Event::NeighborDown(_peer) => {
355                    metrics.neighbor_down.inc();
356                }
357                _ => {}
358            },
359            _ => {}
360        }
361    }
362}
363
364fn track_in_event<PI: Serialize>(event: &InEvent<PI>, metrics: &Metrics) {
365    if let InEvent::RecvMessage(_from, message) = event {
366        match message.kind() {
367            MessageKind::Data => {
368                metrics.msgs_data_recv.inc();
369                metrics
370                    .msgs_data_recv_size
371                    .inc_by(message.size().unwrap_or(0) as u64);
372            }
373            MessageKind::Control => {
374                metrics.msgs_ctrl_recv.inc();
375                metrics
376                    .msgs_ctrl_recv_size
377                    .inc_by(message.size().unwrap_or(0) as u64);
378            }
379        }
380    }
381}