// iroh_gossip/proto.rs
//! Implementation of the iroh-gossip protocol, as an IO-less state machine
//!
//! This module implements the iroh-gossip protocol. The entry point is [`State`], which contains
//! the protocol state for a node.
//!
//! The iroh-gossip protocol is made up of two parts: a swarm membership protocol, based on
//! [HyParView][hyparview], and a gossip broadcasting protocol, based on [PlumTree][plumtree].
//!
//! For a full explanation, reading the two papers is recommended. What follows is a brief
//! outline of the protocols.
//!
//! All protocol messages are namespaced by a [`TopicId`], a 32-byte identifier. Topics are
//! separate swarms and broadcast scopes. The HyParView and PlumTree algorithms both work in the
//! scope of a single topic. Thus, joining multiple topics increases the number of open connections
//! to peers and the size of the local routing table.
//!
//! The **membership protocol** ([HyParView][hyparview]) is a cluster protocol where each peer
//! maintains a partial view of all nodes in the swarm.
//! A peer joins the swarm for a topic by connecting to any known peer that is a member of this
//! topic's swarm. Obtaining this initial contact info happens out of band. The peer then sends
//! a `Join` message to that initial peer. All peers maintain a list of `active` peers and a list
//! of `passive` peers. Active peers are those that a peer maintains active connections to.
//! The passive peers form an address book of additional peers. If an active peer goes offline,
//! its slot is filled with a random peer from the passive set. In the default configuration, the
//! active view has a size of 5 and the passive view a size of 30.
//! The HyParView protocol ensures that active connections are always bidirectional, and regularly
//! exchanges nodes for the passive view in a `Shuffle` operation.
//! Thus, this protocol provides a high degree of reliability and auto-recovery in the case of node
//! failures.
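//!
//! As a rough illustration of the failover rule described above, here is a minimal,
//! self-contained sketch. `Views`, `on_peer_down`, and the `pick` parameter are hypothetical
//! names that do not exist in this crate. The sketch replaces the random choice with a
//! caller-supplied index, and it leaves out the connection handshake and the `Shuffle` exchange.
//!
//! ```rust
//! use std::collections::BTreeSet;
//!
//! /// Hypothetical, simplified model of the two views (not this crate's types).
//! struct Views {
//!     active: BTreeSet<u32>,
//!     passive: BTreeSet<u32>,
//!     active_capacity: usize,
//! }
//!
//! impl Views {
//!     /// Drop a failed active peer and promote one passive peer into the free slot.
//!     /// `pick` stands in for the random choice (an assumption made for determinism).
//!     fn on_peer_down(&mut self, peer: u32, pick: usize) -> Option<u32> {
//!         if !self.active.remove(&peer) {
//!             return None; // the peer was not in the active view
//!         }
//!         if self.passive.is_empty() || self.active.len() >= self.active_capacity {
//!             return None; // nothing to promote, or no free slot
//!         }
//!         let candidate = *self.passive.iter().nth(pick % self.passive.len())?;
//!         self.passive.remove(&candidate);
//!         self.active.insert(candidate);
//!         Some(candidate)
//!     }
//! }
//!
//! fn main() {
//!     let mut views = Views {
//!         active: BTreeSet::from([1, 2]),
//!         passive: BTreeSet::from([7, 8, 9]),
//!         active_capacity: 2,
//!     };
//!     // Peer 2 goes offline; index 1 of the sorted passive set {7, 8, 9} is peer 8.
//!     assert_eq!(views.on_peer_down(2, 1), Some(8));
//!     assert!(views.active.contains(&8) && !views.passive.contains(&8));
//! }
//! ```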
//!
//! The **gossip protocol** ([PlumTree][plumtree]) builds upon the membership protocol. It exposes
//! a method to broadcast messages to all peers in the swarm. On each node, it maintains two sets
//! of peers: an `eager` set and a `lazy` set. Both are subsets of the `active` view from the
//! membership protocol. When broadcasting a message from the local node, or upon receiving a
//! broadcast message, the message is pushed to all peers in the eager set. Additionally, the hash
//! of the message (which uniquely identifies it), but not the message content, is lazily pushed
//! to all peers in the `lazy` set. Peers that receive such a lazy push (called an `IHave`) may
//! request the message content after a timeout if they have not received the message from one of
//! their eager peers in the meantime. When a message is requested from a currently-lazy peer,
//! that peer is also upgraded to an eager peer from that moment on. This strategy self-optimizes
//! the messaging graph by latency. Note, however, that this optimization works best if the
//! messaging paths are stable, i.e. if it is always the same peer that broadcasts. If not, the
//! relative message redundancy will grow and the ideal messaging graph might change frequently.
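//!
//! The eager and lazy push rules above can be sketched roughly as follows. `Peers`, `Out`,
//! `push`, and `on_ihave_timeout` are hypothetical names that do not exist in this crate, and a
//! plain `u64` stands in for the message hash. The sketch covers only the behavior described
//! in this outline; see the paper for the rest of the algorithm.
//!
//! ```rust
//! use std::collections::{BTreeSet, HashSet};
//!
//! /// Hypothetical outgoing messages of the sketch.
//! #[derive(Debug, PartialEq)]
//! enum Out {
//!     Gossip { to: u32, id: u64 }, // full message content
//!     IHave { to: u32, id: u64 },  // message id only
//!     Graft { to: u32, id: u64 },  // request for the content
//! }
//!
//! /// Hypothetical per-node state (not this crate's types).
//! struct Peers {
//!     eager: BTreeSet<u32>,
//!     lazy: BTreeSet<u32>,
//!     seen: HashSet<u64>,
//! }
//!
//! impl Peers {
//!     /// Broadcast (`from == None`) or forward a message that arrived from `from`.
//!     fn push(&mut self, id: u64, from: Option<u32>) -> Vec<Out> {
//!         if !self.seen.insert(id) {
//!             return Vec::new(); // already handled, do not forward again
//!         }
//!         let mut out = Vec::new();
//!         for &to in self.eager.iter().filter(|p| Some(**p) != from) {
//!             out.push(Out::Gossip { to, id });
//!         }
//!         for &to in self.lazy.iter().filter(|p| Some(**p) != from) {
//!             out.push(Out::IHave { to, id });
//!         }
//!         out
//!     }
//!
//!     /// The timeout for an announced message fired. If the content is still missing,
//!     /// request it from the announcer and upgrade that peer to eager.
//!     fn on_ihave_timeout(&mut self, id: u64, announcer: u32) -> Option<Out> {
//!         if self.seen.contains(&id) {
//!             return None; // an eager peer delivered the message in the meantime
//!         }
//!         self.lazy.remove(&announcer);
//!         self.eager.insert(announcer);
//!         Some(Out::Graft { to: announcer, id })
//!     }
//! }
//!
//! fn main() {
//!     let mut peers = Peers {
//!         eager: BTreeSet::from([1]),
//!         lazy: BTreeSet::from([2, 3]),
//!         seen: HashSet::new(),
//!     };
//!     let out = peers.push(42, None);
//!     assert_eq!(out[0], Out::Gossip { to: 1, id: 42 });
//!     assert_eq!(out[1], Out::IHave { to: 2, id: 42 });
//!     // Peer 3 announced message 7, which never arrived from an eager peer.
//!     assert_eq!(peers.on_ihave_timeout(7, 3), Some(Out::Graft { to: 3, id: 7 }));
//!     assert!(peers.eager.contains(&3));
//! }
//! ```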
//!
//! [hyparview]: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
//! [plumtree]: https://asc.di.fct.unl.pt/~jleitao/pdf/srds07-leitao.pdf

use std::{fmt, hash::Hash};

use bytes::Bytes;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

mod hyparview;
mod plumtree;
pub mod state;
pub mod topic;
pub mod util;

#[cfg(any(test, feature = "test-utils"))]
pub mod sim;

pub use hyparview::Config as HyparviewConfig;
pub use plumtree::{Config as PlumtreeConfig, DeliveryScope, Scope};
pub use state::{InEvent, Message, OutEvent, State, Timer, TopicId};
pub use topic::{Command, Config, Event, IO};

/// The default maximum size in bytes for a gossip message.
/// This is a sane but arbitrary default and can be changed in the [`Config`].
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 4096;

/// The minimum allowed value for [`Config::max_message_size`].
pub const MIN_MAX_MESSAGE_SIZE: usize = 512;

/// The identifier for a peer.
///
/// The protocol implementation is generic over this trait. When implementing the protocol,
/// a concrete type must be chosen that will then be used throughout the implementation to identify
/// and index individual peers.
///
/// Note that the concrete type will be used in protocol messages. Therefore, implementations of
/// the protocol are only compatible if the same concrete type is supplied for this trait.
///
/// TODO: Rename to `PeerId`? It does not necessarily refer to a peer's address, as long as the
/// networking layer can translate the value of its concrete type into an address.
pub trait PeerIdentity: Hash + Eq + Ord + Copy + fmt::Debug + Serialize + DeserializeOwned {}
impl<T> PeerIdentity for T where
    T: Hash + Eq + Ord + Copy + fmt::Debug + Serialize + DeserializeOwned
{
}

/// Opaque binary data that is transmitted on messages that introduce new peers.
///
/// Implementations may use these bytes to supply addresses or other information needed to connect
/// to a peer that is not included in the peer's [`PeerIdentity`].
#[derive(derive_more::Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[debug("PeerData({}b)", self.0.len())]
pub struct PeerData(Bytes);

impl PeerData {
    /// Create a new [`PeerData`] from a byte buffer.
    pub fn new(data: impl Into<Bytes>) -> Self {
        Self(data.into())
    }

    /// Get a reference to the contained [`bytes::Bytes`].
    pub fn inner(&self) -> &bytes::Bytes {
        &self.0
    }

    /// Get the peer data as a byte slice.
    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }
}

/// PeerInfo contains a peer's identifier and the opaque peer data as provided by the implementer.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
struct PeerInfo<PI> {
    pub id: PI,
    pub data: Option<PeerData>,
}

impl<PI> From<(PI, Option<PeerData>)> for PeerInfo<PI> {
    fn from((id, data): (PI, Option<PeerData>)) -> Self {
        Self { id, data }
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashSet, env, fmt, str::FromStr};

    use rand::SeedableRng;
    use rand_chacha::ChaCha12Rng;
    use tracing_test::traced_test;

    use super::{Command, Config, Event};
    use crate::proto::{
        sim::{LatencyConfig, Network, NetworkConfig},
        Scope, TopicId,
    };

    #[test]
    #[traced_test]
    fn hyparview_smoke() {
        // Create a network with 4 nodes and active_view_capacity 2
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let mut config = Config::default();
        config.membership.active_view_capacity = 2;
        let network_config = NetworkConfig {
            proto: config,
            latency: LatencyConfig::default_static(),
        };
        let mut network = Network::new(network_config, rng);
        for i in 0..4 {
            network.insert(i);
        }

        let t: TopicId = [0u8; 32].into();

        // Do some joins between nodes 0,1,2
        network.command(0, t, Command::Join(vec![1, 2]));
        network.command(1, t, Command::Join(vec![2]));
        network.command(2, t, Command::Join(vec![]));

        network.run_trips(3);

        // Confirm emitted events
        let actual = network.events_sorted();
        let expected = sort(vec![
            (0, t, Event::NeighborUp(1)),
            (0, t, Event::NeighborUp(2)),
            (1, t, Event::NeighborUp(2)),
            (1, t, Event::NeighborUp(0)),
            (2, t, Event::NeighborUp(0)),
            (2, t, Event::NeighborUp(1)),
        ]);
        assert_eq!(actual, expected);

        // Confirm active connections
        assert_eq!(network.conns(), vec![(0, 1), (0, 2), (1, 2)]);

        // Now let node 3 join node 0.
        // Node 0 is full, so it will disconnect from either node 1 or node 2.
        network.command(3, t, Command::Join(vec![0]));

        network.run_trips(2);

        // Confirm emitted events. There are two options, because whether node 0 disconnects
        // from node 1 or node 2 is random.
        let actual = network.events_sorted();
        eprintln!("actual {actual:#?}");
        let expected1 = sort(vec![
            (3, t, Event::NeighborUp(0)),
            (0, t, Event::NeighborUp(3)),
            (0, t, Event::NeighborDown(1)),
            (1, t, Event::NeighborDown(0)),
        ]);
        let expected2 = sort(vec![
            (3, t, Event::NeighborUp(0)),
            (0, t, Event::NeighborUp(3)),
            (0, t, Event::NeighborDown(2)),
            (2, t, Event::NeighborDown(0)),
        ]);
        assert!((actual == expected1) || (actual == expected2));

        // Confirm active connections.
        if actual == expected1 {
            assert_eq!(network.conns(), vec![(0, 2), (0, 3), (1, 2)]);
        } else {
            assert_eq!(network.conns(), vec![(0, 1), (0, 3), (1, 2)]);
        }
        assert!(network.check_synchronicity());
    }

    #[test]
    #[traced_test]
    fn plumtree_smoke() {
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let network_config = NetworkConfig {
            proto: Config::default(),
            latency: LatencyConfig::default_static(),
        };
        let mut network = Network::new(network_config, rng);
        // build a network with 6 nodes
        for i in 0..6 {
            network.insert(i);
        }

        let t = [0u8; 32].into();

        // let node 0 join the topic but do not connect to any peers
        network.command(0, t, Command::Join(vec![]));
        // connect nodes 1 and 2 to node 0
        (1..3).for_each(|i| network.command(i, t, Command::Join(vec![0])));
        // connect nodes 4 and 5 to node 3
        network.command(3, t, Command::Join(vec![]));
        (4..6).for_each(|i| network.command(i, t, Command::Join(vec![3])));
        // run ticks and drain events

        network.run_trips(4);

        let _ = network.events();
        assert!(network.check_synchronicity());

        // now broadcast a first message
        network.command(
            1,
            t,
            Command::Broadcast(b"hi1".to_vec().into(), Scope::Swarm),
        );

        network.run_trips(4);

        let events = network.events();
        let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
        // message should be received by two other nodes
        assert_eq!(received.count(), 2);
        assert!(network.check_synchronicity());

        // now connect the two sections of the swarm
        network.command(2, t, Command::Join(vec![5]));
        network.run_trips(3);
        let _ = network.events();
        println!("{}", network.report());

        // now broadcast again
        network.command(
            1,
            t,
            Command::Broadcast(b"hi2".to_vec().into(), Scope::Swarm),
        );
        network.run_trips(5);
        let events = network.events();
        let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
        // message should be received by all 5 other nodes
        assert_eq!(received.count(), 5);
        assert!(network.check_synchronicity());
        println!("{}", network.report());
    }

    #[test]
    #[traced_test]
    fn quit() {
        // Create a network with 4 nodes and active_view_capacity 2
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let mut config = Config::default();
        config.membership.active_view_capacity = 2;
        let mut network = Network::new(config.into(), rng);
        let num = 4;
        for i in 0..num {
            network.insert(i);
        }

        let t: TopicId = [0u8; 32].into();

        // join all nodes
        network.command(0, t, Command::Join(vec![]));
        network.command(1, t, Command::Join(vec![0]));
        network.command(2, t, Command::Join(vec![1]));
        network.command(3, t, Command::Join(vec![2]));
        network.run_trips(2);

        // assert all peers appear in the connections
        let all_conns: HashSet<u64> = HashSet::from_iter((0u64..4).flat_map(|p| {
            network
                .neighbors(&p, &t)
                .into_iter()
                .flat_map(|x| x.into_iter())
        }));
        assert_eq!(all_conns, HashSet::from_iter([0, 1, 2, 3]));
        assert!(network.check_synchronicity());

        // let node 3 leave the swarm
        network.command(3, t, Command::Quit);
        network.run_trips(4);
        assert!(network.peer(&3).unwrap().state(&t).is_none());

        // assert all peers without peer 3 appear in the connections
        let all_conns: HashSet<u64> = HashSet::from_iter((0..num).flat_map(|p| {
            network
                .neighbors(&p, &t)
                .into_iter()
                .flat_map(|x| x.into_iter())
        }));
        assert_eq!(all_conns, HashSet::from_iter([0, 1, 2]));
        assert!(network.check_synchronicity());
    }

    fn read_var<T: FromStr<Err: fmt::Display + fmt::Debug>>(name: &str, default: T) -> T {
        env::var(name)
            .map(|x| {
                x.parse()
                    .unwrap_or_else(|_| panic!("Failed to parse environment variable {name}"))
            })
            .unwrap_or(default)
    }

    fn sort<T: Ord + Clone>(items: Vec<T>) -> Vec<T> {
        let mut sorted = items;
        sorted.sort();
        sorted
    }
}