//! Implementation of the iroh-gossip protocol, as an IO-less state machine
//!
//! This module implements the iroh-gossip protocol. The entry point is [`State`], which contains
//! the protocol state for a node.
//!
//! The iroh-gossip protocol consists of two parts: a swarm membership protocol, based on
//! [HyParView][hyparview], and a gossip broadcasting protocol, based on [PlumTree][plumtree].
//!
//! For a full explanation, read the two papers; what follows is a brief outline of both
//! protocols.
//!
//! All protocol messages are namespaced by a [`TopicId`], a 32-byte identifier. Each topic is a
//! separate swarm and broadcast scope. The HyParView and PlumTree algorithms both work in the
//! scope of a single topic. Thus, joining multiple topics increases the number of open connections
//! to peers and the size of the local routing table.
//!
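//! The following is a minimal, dependency-free sketch of topic scoping. `TopicId`, `TopicState`
//! and `Router` here are hypothetical stand-ins, not this crate's types: each topic keeps its own
//! active view, so a peer shared by two topics occupies two routing entries.
//!
//! ```rust
//! use std::collections::{HashMap, HashSet};
//!
//! /// Hypothetical 32-byte topic identifier (a plain array, not this crate's `TopicId`).
//! type TopicId = [u8; 32];
//!
//! /// Hypothetical per-topic state: every topic has its own, independent active view.
//! #[derive(Default)]
//! struct TopicState {
//!     active: HashSet<u64>,
//! }
//!
//! /// Hypothetical router that keeps one `TopicState` per joined topic.
//! #[derive(Default)]
//! struct Router {
//!     topics: HashMap<TopicId, TopicState>,
//! }
//!
//! impl Router {
//!     /// Records a connection to `peer` in the scope of `topic` only.
//!     fn connect(&mut self, topic: TopicId, peer: u64) {
//!         self.topics.entry(topic).or_default().active.insert(peer);
//!     }
//!
//!     /// Total number of (topic, peer) entries, i.e. the size of the local routing table.
//!     fn routing_entries(&self) -> usize {
//!         self.topics.values().map(|t| t.active.len()).sum()
//!     }
//! }
//! ```
//!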
//! The **membership protocol** ([HyParView][hyparview]) is a cluster protocol in which each peer
//! maintains a partial view of the nodes in the swarm.
//! A peer joins the swarm for a topic by connecting to any known peer that is already a member of
//! this topic's swarm; this initial contact information is obtained out of band. The peer then
//! sends a `Join` message to that initial peer. Every peer maintains two lists of peers: `active`
//! and `passive`. Active peers are those to which a node keeps open connections. The passive peers
//! form an address book of additional peers. If an active peer goes offline, its slot is filled
//! with a random peer from the passive set. In the default configuration, the active view has a
//! capacity of 5 and the passive view a capacity of 30.
//! The HyParView protocol ensures that active connections are always bidirectional, and it
//! regularly exchanges nodes for the passive view in a `Shuffle` operation.
//! As a result, the protocol provides a high degree of reliability and automatic recovery when
//! nodes fail.
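//!
//! The view maintenance described above can be sketched roughly as follows. This is a simplified,
//! hypothetical model: the `Views` type and the `XorShift` generator are illustrative and not part
//! of this crate. It omits `Join`, `Shuffle` and all other message exchanges (including the
//! handshake a real node performs before promoting a passive peer) and only shows how the two
//! bounded views interact when peers are added or fail.
//!
//! ```rust
//! /// Hypothetical peer id type for the sketch.
//! type PeerId = u64;
//!
//! /// Tiny xorshift PRNG so the sketch needs no external crates (not cryptographic).
//! struct XorShift(u64);
//!
//! impl XorShift {
//!     fn next(&mut self) -> u64 {
//!         let mut x = self.0;
//!         x ^= x << 13;
//!         x ^= x >> 7;
//!         x ^= x << 17;
//!         self.0 = x;
//!         x
//!     }
//!
//!     /// Picks an index in `0..len`; `len` must be non-zero.
//!     fn index(&mut self, len: usize) -> usize {
//!         (self.next() % len as u64) as usize
//!     }
//! }
//!
//! /// Hypothetical active/passive views using the default capacities from the text (5 and 30).
//! struct Views {
//!     active: Vec<PeerId>,
//!     passive: Vec<PeerId>,
//!     active_cap: usize,
//!     passive_cap: usize,
//!     rng: XorShift,
//! }
//!
//! impl Views {
//!     fn new(seed: u64) -> Self {
//!         Views {
//!             active: Vec::new(),
//!             passive: Vec::new(),
//!             active_cap: 5,
//!             passive_cap: 30,
//!             // xorshift must not start at zero, so force the lowest bit on.
//!             rng: XorShift(seed | 1),
//!         }
//!     }
//!
//!     /// Adds `peer` to the passive address book, evicting a random entry if it is full.
//!     fn add_passive(&mut self, peer: PeerId) {
//!         if self.active.contains(&peer) || self.passive.contains(&peer) {
//!             return;
//!         }
//!         if self.passive.len() >= self.passive_cap {
//!             let i = self.rng.index(self.passive.len());
//!             self.passive.swap_remove(i);
//!         }
//!         self.passive.push(peer);
//!     }
//!
//!     /// Adds `peer` to the active view. If the view is full, a random active peer is demoted
//!     /// to the passive view and returned (a real node would disconnect from it).
//!     fn add_active(&mut self, peer: PeerId) -> Option<PeerId> {
//!         if self.active.contains(&peer) {
//!             return None;
//!         }
//!         self.passive.retain(|p| *p != peer);
//!         let mut dropped = None;
//!         if self.active.len() >= self.active_cap {
//!             let i = self.rng.index(self.active.len());
//!             let old = self.active.swap_remove(i);
//!             self.add_passive(old);
//!             dropped = Some(old);
//!         }
//!         self.active.push(peer);
//!         dropped
//!     }
//!
//!     /// Removes a failed active peer and fills its slot with a random passive peer.
//!     fn neighbor_down(&mut self, peer: PeerId) -> Option<PeerId> {
//!         self.active.retain(|p| *p != peer);
//!         if self.passive.is_empty() {
//!             return None;
//!         }
//!         let i = self.rng.index(self.passive.len());
//!         let replacement = self.passive.swap_remove(i);
//!         self.active.push(replacement);
//!         Some(replacement)
//!     }
//! }
//! ```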
//!
//! The **gossip protocol** ([PlumTree][plumtree]) builds upon the membership protocol. It exposes
//! a method to broadcast messages to all peers in the swarm. On each node, it maintains two sets
//! of peers: an `eager` set and a `lazy` set. Both are subsets of the `active` view from the
//! membership protocol. When broadcasting a message from the local node, or upon receiving a
//! broadcast message, the node pushes the message to all peers in the eager set. Additionally, the
//! hash of the message (which uniquely identifies it), but not the message content, is lazily
//! pushed to all peers in the `lazy` set. Peers that receive such a lazy push (called an `IHave`)
//! may request the message content after a timeout if none of their eager peers has delivered it
//! in the meantime. When a node requests a message from a currently-lazy peer, that peer is also
//! promoted to the eager set from then on. This strategy self-optimizes the messaging graph by
//! latency. Note, however, that this optimization works best if the messaging paths are stable,
//! i.e. if the same peer always broadcasts. If not, the relative message redundancy grows and the
//! ideal messaging graph may change frequently.
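//!
//! A minimal sketch of the eager/lazy bookkeeping described above follows. The `Node` and `Action`
//! types are hypothetical, and `u64` message ids stand in for message hashes. Timers, payloads and
//! networking are left out; the sketch only models how the eager and lazy sets change. Following
//! the PlumTree paper, a peer that delivers a duplicate is demoted to lazy with a `Prune`.
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! type PeerId = u64;
//! /// Hypothetical stand-in for the message hash.
//! type MessageId = u64;
//!
//! /// Hypothetical outgoing actions of a single node.
//! #[derive(Debug, PartialEq)]
//! enum Action {
//!     Push(PeerId, MessageId),  // full message to an eager peer
//!     IHave(PeerId, MessageId), // only the id, to a lazy peer
//!     Prune(PeerId),            // tell a peer that sent a duplicate to treat us as lazy
//!     Graft(PeerId, MessageId), // request the content and promote the peer to eager
//! }
//!
//! struct Node {
//!     eager: HashSet<PeerId>,
//!     lazy: HashSet<PeerId>,
//!     received: HashSet<MessageId>,
//! }
//!
//! impl Node {
//!     /// Handles a full message from `from`, or a local broadcast when `from` is `None`.
//!     fn on_message(&mut self, from: Option<PeerId>, id: MessageId) -> Vec<Action> {
//!         let mut out = Vec::new();
//!         if !self.received.insert(id) {
//!             // Duplicate: the link to the sender is redundant, so demote it to lazy.
//!             if let Some(p) = from {
//!                 self.eager.remove(&p);
//!                 self.lazy.insert(p);
//!                 out.push(Action::Prune(p));
//!             }
//!             return out;
//!         }
//!         if let Some(p) = from {
//!             // The sender delivered the message first, so keep it (or make it) eager.
//!             self.lazy.remove(&p);
//!             self.eager.insert(p);
//!         }
//!         for &p in &self.eager {
//!             if Some(p) != from {
//!                 out.push(Action::Push(p, id));
//!             }
//!         }
//!         for &p in &self.lazy {
//!             out.push(Action::IHave(p, id));
//!         }
//!         out
//!     }
//!
//!     /// Called when the timer for an `IHave` of `id` received from `from` expires.
//!     fn on_ihave_timeout(&mut self, from: PeerId, id: MessageId) -> Option<Action> {
//!         if self.received.contains(&id) {
//!             return None; // an eager peer delivered it in the meantime
//!         }
//!         self.lazy.remove(&from);
//!         self.eager.insert(from);
//!         Some(Action::Graft(from, id))
//!     }
//! }
//! ```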
//!
//! [hyparview]: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
//! [plumtree]: https://asc.di.fct.unl.pt/~jleitao/pdf/srds07-leitao.pdf
use std::{fmt, hash::Hash};
use bytes::Bytes;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
mod hyparview;
mod plumtree;
pub mod state;
pub mod topic;
pub mod util;
#[cfg(test)]
mod tests;
pub use hyparview::Config as HyparviewConfig;
pub use plumtree::{Config as PlumtreeConfig, DeliveryScope, Scope};
pub use state::{InEvent, Message, OutEvent, State, Timer, TopicId};
pub use topic::{Command, Config, Event, IO};
/// The identifier for a peer.
///
/// The protocol implementation is generic over this trait. When implementing the protocol,
/// a concrete type must be chosen that will then be used throughout the implementation to identify
/// and index individual peers.
///
/// Note that the concrete type will be used in protocol messages. Therefore, implementations of
/// the protocol are only compatible if the same concrete type is supplied for this trait.
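///
/// The blanket-impl pattern used for this trait can be illustrated with a std-only sketch. The
/// `Identity` trait and `count_unique` function below are hypothetical, and the serde bounds
/// (`Serialize + DeserializeOwned`) are dropped so that the example needs no dependencies:
///
/// ```rust
/// use std::{collections::HashSet, fmt, hash::Hash};
///
/// /// Hypothetical trimmed-down identity trait (serde bounds omitted).
/// trait Identity: Hash + Eq + Copy + fmt::Debug {}
///
/// /// Blanket impl: every type that meets the bounds is an `Identity` automatically.
/// impl<T> Identity for T where T: Hash + Eq + Copy + fmt::Debug {}
///
/// /// Generic code can then index peers by whichever concrete identity type was chosen.
/// fn count_unique<PI: Identity>(peers: &[PI]) -> usize {
///     peers.iter().copied().collect::<HashSet<PI>>().len()
/// }
/// ```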
///
/// TODO: Rename to `PeerId`? It does not necessarily refer to a peer's address, as long as the
/// networking layer can translate the value of its concrete type into an address.
pub trait PeerIdentity: Hash + Eq + Copy + fmt::Debug + Serialize + DeserializeOwned {}
impl<T> PeerIdentity for T where T: Hash + Eq + Copy + fmt::Debug + Serialize + DeserializeOwned {}
/// Opaque binary data that is transmitted on messages that introduce new peers.
///
/// Implementations may use these bytes to supply addresses or other information needed to connect
/// to a peer that is not included in the peer's [`PeerIdentity`].
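///
/// As a hypothetical example, an implementation could carry a socket address in these bytes. The
/// text encoding below is an assumption chosen for illustration (the protocol does not interpret
/// the bytes); the resulting `Vec<u8>` could then be wrapped with [`PeerData::new`].
///
/// ```rust
/// use std::net::SocketAddr;
///
/// /// Hypothetical encoding: the socket address as UTF-8 text.
/// fn encode_addr(addr: SocketAddr) -> Vec<u8> {
///     addr.to_string().into_bytes()
/// }
///
/// /// Decodes bytes produced by `encode_addr`; returns `None` on malformed input.
/// fn decode_addr(data: &[u8]) -> Option<SocketAddr> {
///     std::str::from_utf8(data).ok()?.parse().ok()
/// }
/// ```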
#[derive(derive_more::Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[debug("PeerData({}b)", self.0.len())]
pub struct PeerData(Bytes);
impl PeerData {
    /// Create a new [`PeerData`] from a byte buffer.
    pub fn new(data: impl Into<Bytes>) -> Self {
        Self(data.into())
    }

    /// Get a reference to the contained [`bytes::Bytes`].
    pub fn inner(&self) -> &bytes::Bytes {
        &self.0
    }

    /// Get the peer data as a byte slice.
    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }

    /// Returns true if the peer data is empty.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}
/// `PeerInfo` contains a peer's identifier and the opaque peer data as provided by the implementer.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
struct PeerInfo<PI> {
    pub id: PI,
    pub data: Option<PeerData>,
}

impl<PI> From<(PI, Option<PeerData>)> for PeerInfo<PI> {
    fn from((id, data): (PI, Option<PeerData>)) -> Self {
        Self { id, data }
    }
}
#[cfg(test)]
mod test {
    use std::{collections::HashSet, env};

    use n0_future::time::Instant;
    use rand::SeedableRng;
    use tracing_test::traced_test;

    use super::{Command, Config, Event, State};
    use crate::proto::{
        tests::{
            assert_synchronous_active, report_round_distribution, sort, Network, Simulator,
            SimulatorConfig,
        },
        Scope, TopicId,
    };
    #[test]
    #[traced_test]
    fn hyparview_smoke() {
        // Create a network with 4 nodes and active_view_capacity 2
        let mut config = Config::default();
        config.membership.active_view_capacity = 2;
        let mut network = Network::new(Instant::now());
        let rng = rand_chacha::ChaCha12Rng::seed_from_u64(99);
        for i in 0..4 {
            network.push(State::new(
                i,
                Default::default(),
                config.clone(),
                rng.clone(),
            ));
        }

        let t: TopicId = [0u8; 32].into();

        // Do some joins between nodes 0, 1 and 2
        network.command(0, t, Command::Join(vec![1]));
        network.command(0, t, Command::Join(vec![2]));
        network.command(1, t, Command::Join(vec![2]));
        network.command(2, t, Command::Join(vec![]));
        network.ticks(10);

        // Confirm emitted events
        let actual = network.events_sorted();
        let expected = sort(vec![
            (0, t, Event::NeighborUp(1)),
            (0, t, Event::NeighborUp(2)),
            (1, t, Event::NeighborUp(2)),
            (1, t, Event::NeighborUp(0)),
            (2, t, Event::NeighborUp(0)),
            (2, t, Event::NeighborUp(1)),
        ]);
        assert_eq!(actual, expected);

        // Confirm active connections
        assert_eq!(network.conns(), vec![(0, 1), (0, 2), (1, 2)]);

        // Now let node 3 join node 0.
        // Node 0 is full, so it will disconnect from either node 1 or node 2.
        network.command(3, t, Command::Join(vec![0]));
        network.ticks(10);

        // Confirm emitted events. There are two possible outcomes because whether node 0
        // disconnects from node 1 or node 2 is random.
        let actual = network.events_sorted();
        eprintln!("actual {actual:?}");
        let expected1 = sort(vec![
            (3, t, Event::NeighborUp(0)),
            (0, t, Event::NeighborUp(3)),
            (0, t, Event::NeighborDown(1)),
            (1, t, Event::NeighborDown(0)),
        ]);
        let expected2 = sort(vec![
            (3, t, Event::NeighborUp(0)),
            (0, t, Event::NeighborUp(3)),
            (0, t, Event::NeighborDown(2)),
            (2, t, Event::NeighborDown(0)),
        ]);
        assert!((actual == expected1) || (actual == expected2));

        // Confirm active connections.
        if actual == expected1 {
            assert_eq!(network.conns(), vec![(0, 2), (0, 3), (1, 2)]);
        } else {
            assert_eq!(network.conns(), vec![(0, 1), (0, 3), (1, 2)]);
        }
        assert!(assert_synchronous_active(&network));
    }
    #[test]
    #[traced_test]
    fn plumtree_smoke() {
        let config = Config::default();
        let mut network = Network::new(Instant::now());
        let broadcast_ticks = 12;
        let join_ticks = 13;
        // build a network with 6 nodes
        let rng = rand_chacha::ChaCha12Rng::seed_from_u64(99);
        for i in 0..6 {
            network.push(State::new(
                i,
                Default::default(),
                config.clone(),
                rng.clone(),
            ));
        }

        let t = [0u8; 32].into();

        // let node 0 join the topic but do not connect to any peers
        network.command(0, t, Command::Join(vec![]));
        // connect nodes 1 and 2 to node 0
        (1..3).for_each(|i| network.command(i, t, Command::Join(vec![0])));
        // connect nodes 4 and 5 to node 3
        network.command(3, t, Command::Join(vec![]));
        (4..6).for_each(|i| network.command(i, t, Command::Join(vec![3])));
        // run ticks and drain events
        network.ticks(join_ticks);
        let _ = network.events();
        assert!(assert_synchronous_active(&network));

        // now broadcast a first message
        network.command(
            1,
            t,
            Command::Broadcast(b"hi1".to_vec().into(), Scope::Swarm),
        );
        network.ticks(broadcast_ticks);
        let events = network.events();
        let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
        // message should be received by two other nodes
        assert_eq!(received.count(), 2);
        assert!(assert_synchronous_active(&network));

        // now connect the two sections of the swarm
        network.command(2, t, Command::Join(vec![5]));
        network.ticks(join_ticks);
        let _ = network.events();
        report_round_distribution(&network);

        // now broadcast again
        network.command(
            1,
            t,
            Command::Broadcast(b"hi2".to_vec().into(), Scope::Swarm),
        );
        network.ticks(broadcast_ticks);
        let events = network.events();
        let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
        // message should be received by all 5 other nodes
        assert_eq!(received.count(), 5);
        assert!(assert_synchronous_active(&network));
        report_round_distribution(&network);
    }
    #[test]
    #[traced_test]
    fn big_multiple_sender() {
        let mut gossip_config = Config::default();
        gossip_config.broadcast.optimization_threshold = (read_var("OPTIM", 7) as u16).into();
        let config = SimulatorConfig {
            peers_count: read_var("PEERS", 100),
            ..Default::default()
        };
        let rounds = read_var("ROUNDS", 10);
        let mut simulator = Simulator::new(config, gossip_config);
        simulator.init();
        simulator.bootstrap();
        for i in 0..rounds {
            // a different node sends in each round
            let from = i + 1;
            let message = format!("m{i}").into_bytes().into();
            simulator.gossip_round(from, message);
        }
        simulator.report_round_sums();
    }
    #[test]
    #[traced_test]
    fn big_single_sender() {
        let mut gossip_config = Config::default();
        gossip_config.broadcast.optimization_threshold = (read_var("OPTIM", 7) as u16).into();
        let config = SimulatorConfig {
            peers_count: read_var("PEERS", 100),
            ..Default::default()
        };
        let rounds = read_var("ROUNDS", 10);
        let mut simulator = Simulator::new(config, gossip_config);
        simulator.init();
        simulator.bootstrap();
        for i in 0..rounds {
            // the same node sends in every round
            let from = 2;
            let message = format!("m{i}").into_bytes().into();
            simulator.gossip_round(from, message);
        }
        simulator.report_round_sums();
    }
    #[test]
    #[traced_test]
    fn quit() {
        // Create a network with 4 nodes and active_view_capacity 2
        let mut config = Config::default();
        config.membership.active_view_capacity = 2;
        let mut network = Network::new(Instant::now());
        let num = 4;
        let rng = rand_chacha::ChaCha12Rng::seed_from_u64(99);
        for i in 0..num {
            network.push(State::new(
                i,
                Default::default(),
                config.clone(),
                rng.clone(),
            ));
        }

        let t: TopicId = [0u8; 32].into();

        // join all nodes
        network.command(0, t, Command::Join(vec![]));
        network.command(1, t, Command::Join(vec![0]));
        network.command(2, t, Command::Join(vec![1]));
        network.command(3, t, Command::Join(vec![2]));
        network.ticks(10);

        // assert that all peers appear in the connections
        let all_conns: HashSet<i32> = HashSet::from_iter((0..num).flat_map(|pa| {
            network
                .get_active(&pa, &t)
                .unwrap()
                .into_iter()
                .flat_map(|x| x.into_iter())
        }));
        assert_eq!(all_conns, HashSet::from_iter([0, 1, 2, 3]));
        assert!(assert_synchronous_active(&network));

        // let node 3 leave the swarm
        network.command(3, t, Command::Quit);
        network.ticks(4);
        assert!(network.peer(&3).unwrap().state(&t).is_none());

        // assert that all peers except peer 3 appear in the connections
        let all_conns: HashSet<i32> = HashSet::from_iter((0..num).flat_map(|pa| {
            network
                .get_active(&pa, &t)
                .unwrap()
                .into_iter()
                .flat_map(|x| x.into_iter())
        }));
        assert_eq!(all_conns, HashSet::from_iter([0, 1, 2]));
        assert!(assert_synchronous_active(&network));
    }
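    /// Reads a `usize` from the environment variable `name`, falling back to `default` if unset.
    fn read_var(name: &str, default: usize) -> usize {
        env::var(name)
            .unwrap_or_else(|_| default.to_string())
            .parse()
            .expect("environment variable must be a non-negative integer")
    }
}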