//! Implementation of the iroh-gossip protocol, as an IO-less state machine.
//!
//! This module implements the iroh-gossip protocol. The entry point is [`State`], which contains
//! the protocol state for a node.
//!
//! The iroh-gossip protocol is made up of two parts: a swarm membership protocol, based on
//! [HyParView][hyparview], and a gossip broadcasting protocol, based on [PlumTree][plumtree].
//!
//! For a full explanation, read the two linked papers. What follows is a brief outline of the
//! protocols.
//!
//! All protocol messages are namespaced by a [`TopicId`], a 32-byte identifier. Topics are
//! separate swarms and broadcast scopes. The HyParView and PlumTree algorithms both work in the
//! scope of a single topic. Thus, joining multiple topics increases the number of open connections
//! to peers and the size of the local routing table.
//!
//! The **membership protocol** ([HyParView][hyparview]) is a cluster protocol in which each peer
//! maintains a partial view of the nodes in the swarm.
//! A peer joins the swarm for a topic by connecting to any known peer that is a member of this
//! topic's swarm. Obtaining this initial contact info happens out of band. The peer then sends
//! a `Join` message to that initial peer. Each peer maintains two lists of peers: `active` and
//! `passive`. Active peers are those to which the node maintains open connections. Passive peers
//! form an address book of additional peers. If one of the active peers goes offline, its slot
//! is filled with a random peer from the passive set. In the default configuration, the
//! active view has a size of 5 and the passive view a size of 30.
//! The HyParView protocol ensures that active connections are always bidirectional, and regularly
//! exchanges nodes for the passive view in a `Shuffle` operation.
//! Thus, this protocol provides a high degree of reliability and auto-recovery in the case of node
//! failures.
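/*!
The failure-recovery step described above can be sketched as follows. This is a simplified,
hypothetical model rather than this crate's `hyparview` module: `Views` and `on_neighbor_down`
are illustrative names, peers are plain `u64`s, and the random choice of a passive peer is
replaced by a caller-supplied index so that the example is deterministic. In the HyParView
paper, promoting a passive peer also involves a neighbor request that the candidate may
decline; this sketch omits that handshake.

```rust
use std::collections::BTreeSet;

// Hypothetical, simplified per-topic membership state (not this crate's types).
struct Views {
    active: BTreeSet<u64>,
    passive: BTreeSet<u64>,
    active_capacity: usize,
}

impl Views {
    // Called when an active peer goes offline: remove it from the active view and, if there
    // is room, promote a passive peer. `pick` stands in for the random choice.
    fn on_neighbor_down(&mut self, peer: u64, pick: usize) -> Option<u64> {
        if !self.active.remove(&peer) {
            return None; // not an active peer, nothing to repair
        }
        if self.active.len() >= self.active_capacity || self.passive.is_empty() {
            return None;
        }
        let candidate = *self.passive.iter().nth(pick % self.passive.len())?;
        self.passive.remove(&candidate);
        self.active.insert(candidate);
        Some(candidate)
    }
}

fn main() {
    let mut views = Views {
        active: BTreeSet::from([1, 2, 3, 4, 5]),
        passive: BTreeSet::from([10, 11, 12]),
        active_capacity: 5,
    };
    // Peer 3 goes offline; with `pick = 1` the second passive peer (11) fills the slot.
    assert_eq!(views.on_neighbor_down(3, 1), Some(11));
    assert!(views.active.contains(&11) && !views.active.contains(&3));
    assert_eq!(views.passive.len(), 2);
}
```
*/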
//!
//! The **gossip protocol** ([PlumTree][plumtree]) builds upon the membership protocol. It exposes
//! a method to broadcast messages to all peers in the swarm. On each node, it maintains two sets
//! of peers: an `eager` set and a `lazy` set. Both are subsets of the `active` view from the
//! membership protocol. When broadcasting a message from the local node, or upon receiving a
//! broadcast message, the message is pushed to all peers in the eager set. Additionally, the hash
//! of the message (which uniquely identifies it), but not the message content, is lazily pushed
//! to all peers in the `lazy` set. When receiving such lazy pushes (called `Ihaves`), those peers
//! may request the message content after a timeout if they have not received the message from one
//! of their eager peers in the meantime. When a message is requested from a currently lazy peer,
//! that peer is also promoted to the eager set from then on. This strategy self-optimizes the
//! messaging graph by latency. Note, however, that this optimization works best if the messaging
//! paths are stable, i.e. if it is always the same peer that broadcasts. If not, the relative
//! message redundancy will grow and the ideal messaging graph might change frequently.
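/*!
The eager and lazy pushes and the promotion of a lazy peer can be sketched as follows. This is
a simplified, hypothetical model rather than this crate's `plumtree` module: `Plum`, `Out` and
the method names are illustrative, peers and message ids are plain `u64`s (the id stands in for
the message hash), and the `Ihave` timeout is triggered by the caller. Following the PlumTree
paper, a duplicate delivery also moves its sender to the lazy set and answers it with a `Prune`,
which is how redundant eager links are dropped.

```rust
use std::collections::{BTreeSet, HashSet};

// Hypothetical outgoing actions of one node (not this crate's message types).
#[derive(Debug, PartialEq)]
enum Out {
    Gossip { to: u64, id: u64 }, // full message content, pushed eagerly
    IHave { to: u64, id: u64 },  // only the message id, pushed lazily
    Graft { to: u64, id: u64 },  // request for the content; also promotes the peer
    Prune { to: u64 },           // tells a redundant sender to treat us as lazy
}

struct Plum {
    eager: BTreeSet<u64>,
    lazy: BTreeSet<u64>,
    received: HashSet<u64>,
}

impl Plum {
    // Broadcast or forward `id`: content to eager peers, id only to lazy peers.
    fn push(&self, id: u64, except: Option<u64>) -> Vec<Out> {
        let mut out = Vec::new();
        for &to in self.eager.iter().filter(|p| Some(**p) != except) {
            out.push(Out::Gossip { to, id });
        }
        for &to in self.lazy.iter().filter(|p| Some(**p) != except) {
            out.push(Out::IHave { to, id });
        }
        out
    }

    // A full message arrived from `from`.
    fn on_gossip(&mut self, from: u64, id: u64) -> Vec<Out> {
        if !self.received.insert(id) {
            // Duplicate: demote the sender to lazy and ask it to do the same.
            self.eager.remove(&from);
            self.lazy.insert(from);
            return vec![Out::Prune { to: from }];
        }
        // First delivery: the sender becomes (or stays) eager, then forward to everyone else.
        self.lazy.remove(&from);
        self.eager.insert(from);
        self.push(id, Some(from))
    }

    // The timer for an `Ihave` from `from` expired; graft it if the message is still missing.
    fn on_ihave_timeout(&mut self, from: u64, id: u64) -> Option<Out> {
        if self.received.contains(&id) {
            return None;
        }
        self.lazy.remove(&from);
        self.eager.insert(from);
        Some(Out::Graft { to: from, id })
    }
}

fn main() {
    let mut node = Plum {
        eager: BTreeSet::from([1, 2]),
        lazy: BTreeSet::from([3]),
        received: HashSet::new(),
    };
    // Message 42 arrives from eager peer 1: content goes to peer 2, an Ihave to peer 3.
    let out = node.on_gossip(1, 42);
    assert_eq!(out, vec![Out::Gossip { to: 2, id: 42 }, Out::IHave { to: 3, id: 42 }]);
    // The same message arrives again from peer 2, which is therefore pruned to lazy.
    assert_eq!(node.on_gossip(2, 42), vec![Out::Prune { to: 2 }]);
    assert!(node.lazy.contains(&2));
    // Peer 3 announced message 7, but the content never arrived: graft peer 3.
    assert_eq!(node.on_ihave_timeout(3, 7), Some(Out::Graft { to: 3, id: 7 }));
    assert!(node.eager.contains(&3));
}
```

Because `push` skips the peer a message came from, a forwarded message is never echoed back
to its sender.
*/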
//!
//! [hyparview]: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
//! [plumtree]: https://asc.di.fct.unl.pt/~jleitao/pdf/srds07-leitao.pdf

use std::{fmt, hash::Hash};

use bytes::Bytes;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

mod hyparview;
mod plumtree;
pub mod state;
pub mod topic;
pub mod util;

#[cfg(any(test, feature = "test-utils"))]
pub mod sim;

pub use hyparview::Config as HyparviewConfig;
pub use plumtree::{Config as PlumtreeConfig, DeliveryScope, Scope};
pub use state::{InEvent, Message, OutEvent, State, Timer, TopicId};
pub use topic::{Command, Config, Event, IO};

/// The default maximum size in bytes for a gossip message.
/// This is a reasonable but arbitrary default and can be changed in the [`Config`].
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 4096;

/// The minimum allowed value for [`Config::max_message_size`].
pub const MIN_MAX_MESSAGE_SIZE: usize = 512;

/// The identifier for a peer.
///
/// The protocol implementation is generic over this trait. When implementing the protocol,
/// a concrete type must be chosen that will then be used throughout the implementation to identify
/// and index individual peers.
///
/// Note that the concrete type will be used in protocol messages. Therefore, implementations of
/// the protocol are only compatible if the same concrete type is supplied for this trait.
///
/// TODO: Rename to `PeerId`? It does not necessarily refer to a peer's address, as long as the
/// networking layer can translate the value of its concrete type into an address.
pub trait PeerIdentity: Hash + Eq + Ord + Copy + fmt::Debug + Serialize + DeserializeOwned {}
impl<T> PeerIdentity for T where
    T: Hash + Eq + Ord + Copy + fmt::Debug + Serialize + DeserializeOwned
{
}

/// Opaque binary data that is transmitted in messages that introduce new peers.
///
/// Implementations may use these bytes to supply addresses or other information needed to connect
/// to a peer that is not included in the peer's [`PeerIdentity`].
#[derive(derive_more::Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[debug("PeerData({}b)", self.0.len())]
pub struct PeerData(Bytes);

impl PeerData {
    /// Create a new [`PeerData`] from a byte buffer.
    pub fn new(data: impl Into<Bytes>) -> Self {
        Self(data.into())
    }

    /// Get a reference to the contained [`bytes::Bytes`].
    pub fn inner(&self) -> &bytes::Bytes {
        &self.0
    }

    /// Get the peer data as a byte slice.
    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }
}

/// `PeerInfo` contains a peer's identifier and the opaque peer data as provided by the implementer.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
struct PeerInfo<PI> {
    pub id: PI,
    pub data: Option<PeerData>,
}

impl<PI> From<(PI, Option<PeerData>)> for PeerInfo<PI> {
    fn from((id, data): (PI, Option<PeerData>)) -> Self {
        Self { id, data }
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashSet, env, fmt, str::FromStr};

    use rand::SeedableRng;
    use rand_chacha::ChaCha12Rng;
    use tracing_test::traced_test;

    use super::{Command, Config, Event};
    use crate::proto::{
        sim::{LatencyConfig, Network, NetworkConfig},
        Scope, TopicId,
    };

    #[test]
    #[traced_test]
    fn hyparview_smoke() {
        // Create a network with 4 nodes and active_view_capacity 2
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let mut config = Config::default();
        config.membership.active_view_capacity = 2;
        let network_config = NetworkConfig {
            proto: config,
            latency: LatencyConfig::default_static(),
        };
        let mut network = Network::new(network_config, rng);
        for i in 0..4 {
            network.insert(i);
        }

        let t: TopicId = [0u8; 32].into();

        // Do some joins between nodes 0, 1 and 2
        network.command(0, t, Command::Join(vec![1, 2]));
        network.command(1, t, Command::Join(vec![2]));
        network.command(2, t, Command::Join(vec![]));

        network.run_trips(3);

        // Confirm emitted events
        let actual = network.events_sorted();
        let expected = sort(vec![
            (0, t, Event::NeighborUp(1)),
            (0, t, Event::NeighborUp(2)),
            (1, t, Event::NeighborUp(2)),
            (1, t, Event::NeighborUp(0)),
            (2, t, Event::NeighborUp(0)),
            (2, t, Event::NeighborUp(1)),
        ]);
        assert_eq!(actual, expected);

        // Confirm active connections
        assert_eq!(network.conns(), vec![(0, 1), (0, 2), (1, 2)]);

        // Now let node 3 join node 0.
        // Node 0 is full, so it will disconnect from either node 1 or node 2.
        network.command(3, t, Command::Join(vec![0]));

        network.run_trips(2);

        // Confirm emitted events. There are two possible outcomes, because node 0 randomly
        // disconnects from either node 1 or node 2.
        let actual = network.events_sorted();
        eprintln!("actual {actual:#?}");
        let expected1 = sort(vec![
            (3, t, Event::NeighborUp(0)),
            (0, t, Event::NeighborUp(3)),
            (0, t, Event::NeighborDown(1)),
            (1, t, Event::NeighborDown(0)),
        ]);
        let expected2 = sort(vec![
            (3, t, Event::NeighborUp(0)),
            (0, t, Event::NeighborUp(3)),
            (0, t, Event::NeighborDown(2)),
            (2, t, Event::NeighborDown(0)),
        ]);
        assert!((actual == expected1) || (actual == expected2));

        // Confirm active connections.
        if actual == expected1 {
            assert_eq!(network.conns(), vec![(0, 2), (0, 3), (1, 2)]);
        } else {
            assert_eq!(network.conns(), vec![(0, 1), (0, 3), (1, 2)]);
        }
        assert!(network.check_synchronicity());
    }

    #[test]
    #[traced_test]
    fn plumtree_smoke() {
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let network_config = NetworkConfig {
            proto: Config::default(),
            latency: LatencyConfig::default_static(),
        };
        let mut network = Network::new(network_config, rng);
        // build a network with 6 nodes
        for i in 0..6 {
            network.insert(i);
        }

        let t = [0u8; 32].into();

        // let node 0 join the topic but do not connect to any peers
        network.command(0, t, Command::Join(vec![]));
        // connect nodes 1 and 2 to node 0
        (1..3).for_each(|i| network.command(i, t, Command::Join(vec![0])));
        // connect nodes 4 and 5 to node 3
        network.command(3, t, Command::Join(vec![]));
        (4..6).for_each(|i| network.command(i, t, Command::Join(vec![3])));

        // run ticks and drain events
        network.run_trips(4);

        let _ = network.events();
        assert!(network.check_synchronicity());

        // now broadcast a first message
        network.command(
            1,
            t,
            Command::Broadcast(b"hi1".to_vec().into(), Scope::Swarm),
        );

        network.run_trips(4);

        let events = network.events();
        let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
        // message should be received by two other nodes
        assert_eq!(received.count(), 2);
        assert!(network.check_synchronicity());

        // now connect the two sections of the swarm
        network.command(2, t, Command::Join(vec![5]));
        network.run_trips(3);
        let _ = network.events();
        println!("{}", network.report());

        // now broadcast again
        network.command(
            1,
            t,
            Command::Broadcast(b"hi2".to_vec().into(), Scope::Swarm),
        );
        network.run_trips(5);
        let events = network.events();
        let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
        // message should be received by all 5 other nodes
        assert_eq!(received.count(), 5);
        assert!(network.check_synchronicity());
        println!("{}", network.report());
    }

    #[test]
    #[traced_test]
    fn quit() {
        // Create a network with 4 nodes and active_view_capacity 2
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let mut config = Config::default();
        config.membership.active_view_capacity = 2;
        let mut network = Network::new(config.into(), rng);
        let num = 4;
        for i in 0..num {
            network.insert(i);
        }

        let t: TopicId = [0u8; 32].into();

        // join all nodes
        network.command(0, t, Command::Join(vec![]));
        network.command(1, t, Command::Join(vec![0]));
        network.command(2, t, Command::Join(vec![1]));
        network.command(3, t, Command::Join(vec![2]));
        network.run_trips(2);

        // assert that all peers appear in the connections
        let all_conns: HashSet<u64> = HashSet::from_iter((0..num).flat_map(|p| {
            network
                .neighbors(&p, &t)
                .into_iter()
                .flat_map(|x| x.into_iter())
        }));
        assert_eq!(all_conns, HashSet::from_iter([0, 1, 2, 3]));
        assert!(network.check_synchronicity());

        // let node 3 leave the swarm
        network.command(3, t, Command::Quit);
        network.run_trips(4);
        assert!(network.peer(&3).unwrap().state(&t).is_none());

        // assert that all peers except peer 3 appear in the connections
        let all_conns: HashSet<u64> = HashSet::from_iter((0..num).flat_map(|p| {
            network
                .neighbors(&p, &t)
                .into_iter()
                .flat_map(|x| x.into_iter())
        }));
        assert_eq!(all_conns, HashSet::from_iter([0, 1, 2]));
        assert!(network.check_synchronicity());
    }

    fn read_var<T: FromStr<Err: fmt::Display + fmt::Debug>>(name: &str, default: T) -> T {
        env::var(name)
            .map(|x| {
                x.parse()
                    .unwrap_or_else(|_| panic!("Failed to parse environment variable {name}"))
            })
            .unwrap_or(default)
    }

    fn sort<T: Ord + Clone>(items: Vec<T>) -> Vec<T> {
        let mut sorted = items;
        sorted.sort();
        sorted
    }
}