1use std::collections::VecDeque;
4
5use bytes::Bytes;
6use derive_more::From;
7use n0_future::time::{Duration, Instant};
8use rand::Rng;
9use serde::{Deserialize, Serialize};
10
11use super::{
12 hyparview::{self, InEvent as SwarmIn},
13 plumtree::{self, GossipEvent, InEvent as GossipIn, Scope},
14 state::MessageKind,
15 PeerData, PeerIdentity, DEFAULT_MAX_MESSAGE_SIZE,
16};
17use crate::proto::MIN_MAX_MESSAGE_SIZE;
18
/// Input event for the topic [`State`], consumed by [`State::handle`].
#[derive(Clone, Debug)]
pub enum InEvent<PI> {
    /// A protocol [`Message`] was received from the given peer.
    RecvMessage(PI, Message<PI>),
    /// A [`Command`] to execute on this topic.
    Command(Command<PI>),
    /// A [`Timer`] has expired.
    ///
    /// Presumably one that was requested earlier through
    /// [`OutEvent::ScheduleTimer`] — confirm against the driver of this state.
    TimerExpired(Timer<PI>),
    /// The given peer disconnected.
    ///
    /// [`State::handle`] informs both the membership and the broadcast layer.
    PeerDisconnected(PI),
    /// New [`PeerData`], forwarded to the membership layer.
    UpdatePeerData(PeerData),
}
33
/// Output event produced by the topic [`State`].
///
/// Events emitted by the membership (`hyparview`) and broadcast (`plumtree`)
/// layers are converted into this type via the `From` impls in this module.
#[derive(Debug, PartialEq, Eq)]
pub enum OutEvent<PI> {
    /// Send a [`Message`] to the given peer.
    SendMessage(PI, Message<PI>),
    /// Emit an [`Event`].
    EmitEvent(Event<PI>),
    /// Schedule a [`Timer`] with the given delay.
    ///
    /// Presumably the driver feeds the timer back as
    /// [`InEvent::TimerExpired`] once the delay elapsed — confirm with caller.
    ScheduleTimer(Duration, Timer<PI>),
    /// Disconnect from the given peer.
    DisconnectPeer(PI),
    /// [`PeerData`] associated with the given peer, as reported by the
    /// membership layer.
    PeerData(PI, PeerData),
}
49
50impl<PI> From<hyparview::OutEvent<PI>> for OutEvent<PI> {
51 fn from(event: hyparview::OutEvent<PI>) -> Self {
52 use hyparview::OutEvent::*;
53 match event {
54 SendMessage(to, message) => Self::SendMessage(to, message.into()),
55 ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()),
56 DisconnectPeer(peer) => Self::DisconnectPeer(peer),
57 EmitEvent(event) => Self::EmitEvent(event.into()),
58 PeerData(peer, data) => Self::PeerData(peer, data),
59 }
60 }
61}
62
63impl<PI> From<plumtree::OutEvent<PI>> for OutEvent<PI> {
64 fn from(event: plumtree::OutEvent<PI>) -> Self {
65 use plumtree::OutEvent::*;
66 match event {
67 SendMessage(to, message) => Self::SendMessage(to, message.into()),
68 ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()),
69 EmitEvent(event) => Self::EmitEvent(event.into()),
70 }
71 }
72}
73
74pub trait IO<PI: Clone> {
79 fn push(&mut self, event: impl Into<OutEvent<PI>>);
81
82 fn push_from_iter(&mut self, iter: impl IntoIterator<Item = impl Into<OutEvent<PI>>>) {
84 for event in iter.into_iter() {
85 self.push(event);
86 }
87 }
88}
89
/// Protocol message exchanged between peers for a topic.
///
/// Wraps the message types of the two protocol layers; the derived `From`
/// impls allow converting either inner message with `.into()`.
#[derive(From, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum Message<PI> {
    /// A message of the membership (`hyparview`) layer.
    Swarm(hyparview::Message<PI>),
    /// A message of the broadcast (`plumtree`) layer.
    Gossip(plumtree::Message),
}
98
99impl<PI> Message<PI> {
100 pub fn kind(&self) -> MessageKind {
102 match self {
103 Message::Swarm(_) => MessageKind::Control,
104 Message::Gossip(message) => match message {
105 plumtree::Message::Gossip(_) => MessageKind::Data,
106 _ => MessageKind::Control,
107 },
108 }
109 }
110
111 pub fn is_disconnect(&self) -> bool {
113 matches!(self, Message::Swarm(hyparview::Message::Disconnect(_)))
114 }
115}
116
/// Event emitted by the topic [`State`] inside [`OutEvent::EmitEvent`].
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub enum Event<PI> {
    /// The membership layer reported the given peer as a new neighbor.
    NeighborUp(PI),
    /// The membership layer reported that the given peer is no longer a neighbor.
    NeighborDown(PI),
    /// The broadcast layer reported a received [`GossipEvent`].
    Received(GossipEvent<PI>),
}
127
128impl<PI> From<hyparview::Event<PI>> for Event<PI> {
129 fn from(value: hyparview::Event<PI>) -> Self {
130 match value {
131 hyparview::Event::NeighborUp(peer) => Self::NeighborUp(peer),
132 hyparview::Event::NeighborDown(peer) => Self::NeighborDown(peer),
133 }
134 }
135}
136
137impl<PI> From<plumtree::Event<PI>> for Event<PI> {
138 fn from(value: plumtree::Event<PI>) -> Self {
139 match value {
140 plumtree::Event::Received(event) => Self::Received(event),
141 }
142 }
143}
144
/// Timer used by the topic [`State`].
///
/// Wraps the timer types of the two protocol layers so that an expired timer
/// can be routed back to the layer it belongs to (see [`State::handle`]).
#[derive(Clone, From, Debug, PartialEq, Eq)]
pub enum Timer<PI> {
    /// A timer of the membership (`hyparview`) layer.
    Swarm(hyparview::Timer<PI>),
    /// A timer of the broadcast (`plumtree`) layer.
    Gossip(plumtree::Timer),
}
156
/// Command passed to the topic [`State`] via [`InEvent::Command`].
#[derive(Clone, derive_more::Debug)]
pub enum Command<PI> {
    /// Join via the given peers.
    ///
    /// [`State::handle`] issues one `RequestJoin` to the membership layer per
    /// listed peer, in order.
    Join(Vec<PI>),
    /// Broadcast the given bytes with the given [`Scope`] through the
    /// broadcast layer.
    ///
    /// The `Debug` output prints only the payload length (as `<Nb>`), not the
    /// payload itself.
    Broadcast(#[debug("<{}b>", _0.len())] Bytes, Scope),
    /// Quit; forwarded to the membership layer as its `Quit` input.
    Quit,
}
170
171impl<PI: Clone> IO<PI> for VecDeque<OutEvent<PI>> {
172 fn push(&mut self, event: impl Into<OutEvent<PI>>) {
173 self.push_back(event.into())
174 }
175}
176
/// Configuration for the topic [`State`].
///
/// Because of `#[serde(default)]`, fields missing during deserialization are
/// taken from [`Config::default`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Configuration of the membership (`hyparview`) layer.
    pub membership: hyparview::Config,
    /// Configuration of the broadcast (`plumtree`) layer.
    pub broadcast: plumtree::Config,
    /// Maximum message size.
    ///
    /// Defaults to `DEFAULT_MAX_MESSAGE_SIZE`. [`State::with_rng`] panics if
    /// this is smaller than `MIN_MAX_MESSAGE_SIZE`, and derives the maximum
    /// payload size of the broadcast layer from it by subtracting the message
    /// header size.
    pub max_message_size: usize,
}
194
195impl Default for Config {
196 fn default() -> Self {
197 Self {
198 membership: Default::default(),
199 broadcast: Default::default(),
200 max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
201 }
202 }
203}
204
/// Protocol state for a single topic.
///
/// Combines the membership (`hyparview`) and broadcast (`plumtree`) state
/// machines and is driven exclusively through [`State::handle`].
#[derive(Debug)]
pub struct State<PI, R> {
    /// Identity of the local endpoint, see [`State::endpoint`].
    me: PI,
    /// State of the membership layer.
    pub(crate) swarm: hyparview::State<PI, R>,
    /// State of the broadcast layer.
    pub(crate) gossip: plumtree::State<PI>,
    /// Out events collected while handling one input event; drained at the
    /// end of every [`State::handle`] call.
    outbox: VecDeque<OutEvent<PI>>,
    /// Message counters of this state, see [`State::stats`].
    stats: Stats,
}
214
215impl<PI: PeerIdentity> State<PI, rand::rngs::ThreadRng> {
216 pub fn new(me: PI, me_data: Option<PeerData>, config: Config) -> Self {
222 Self::with_rng(me, me_data, config, rand::rng())
223 }
224}
225
226impl<PI, R> State<PI, R> {
227 pub fn endpoint(&self) -> &PI {
229 &self.me
230 }
231}
232
233impl<PI: PeerIdentity, R: Rng> State<PI, R> {
234 pub fn with_rng(me: PI, me_data: Option<PeerData>, config: Config, rng: R) -> Self {
240 assert!(
241 config.max_message_size >= MIN_MAX_MESSAGE_SIZE,
242 "max_message_size must be at least {MIN_MAX_MESSAGE_SIZE}"
243 );
244 let max_payload_size =
245 config.max_message_size - super::Message::<PI>::postcard_header_size();
246 Self {
247 swarm: hyparview::State::new(me, me_data, config.membership, rng),
248 gossip: plumtree::State::new(me, config.broadcast, max_payload_size),
249 me,
250 outbox: VecDeque::new(),
251 stats: Stats::default(),
252 }
253 }
254
255 pub fn handle(
259 &mut self,
260 event: InEvent<PI>,
261 now: Instant,
262 ) -> impl Iterator<Item = OutEvent<PI>> + '_ {
263 let io = &mut self.outbox;
264 match event {
266 InEvent::Command(command) => match command {
267 Command::Join(peers) => {
268 for peer in peers {
269 self.swarm.handle(SwarmIn::RequestJoin(peer), io);
270 }
271 }
272 Command::Broadcast(data, scope) => {
273 self.gossip
274 .handle(GossipIn::Broadcast(data, scope), now, io)
275 }
276 Command::Quit => self.swarm.handle(SwarmIn::Quit, io),
277 },
278 InEvent::RecvMessage(from, message) => {
279 self.stats.messages_received += 1;
280 match message {
281 Message::Swarm(message) => {
282 self.swarm.handle(SwarmIn::RecvMessage(from, message), io)
283 }
284 Message::Gossip(message) => {
285 self.gossip
286 .handle(GossipIn::RecvMessage(from, message), now, io)
287 }
288 }
289 }
290 InEvent::TimerExpired(timer) => match timer {
291 Timer::Swarm(timer) => self.swarm.handle(SwarmIn::TimerExpired(timer), io),
292 Timer::Gossip(timer) => self.gossip.handle(GossipIn::TimerExpired(timer), now, io),
293 },
294 InEvent::PeerDisconnected(peer) => {
295 self.swarm.handle(SwarmIn::PeerDisconnected(peer), io);
296 self.gossip.handle(GossipIn::NeighborDown(peer), now, io);
297 }
298 InEvent::UpdatePeerData(data) => self.swarm.handle(SwarmIn::UpdatePeerData(data), io),
299 }
300
301 let mut io = VecDeque::new();
303 for event in self.outbox.iter() {
304 match event {
305 OutEvent::EmitEvent(Event::NeighborUp(peer)) => {
306 self.gossip
307 .handle(GossipIn::NeighborUp(*peer), now, &mut io)
308 }
309 OutEvent::EmitEvent(Event::NeighborDown(peer)) => {
310 self.gossip
311 .handle(GossipIn::NeighborDown(*peer), now, &mut io)
312 }
313 _ => {}
314 }
315 }
316 self.outbox.extend(io.drain(..));
319
320 self.stats.messages_sent += self
322 .outbox
323 .iter()
324 .filter(|event| matches!(event, OutEvent::SendMessage(_, _)))
325 .count();
326
327 self.outbox.drain(..)
328 }
329
330 pub fn stats(&self) -> &Stats {
333 &self.stats
334 }
335
336 pub fn reset_stats(&mut self) {
338 self.gossip.stats = Default::default();
339 self.swarm.stats = Default::default();
340 self.stats = Default::default();
341 }
342
343 pub fn gossip_stats(&self) -> &plumtree::Stats {
347 self.gossip.stats()
348 }
349
350 pub fn has_active_peers(&self) -> bool {
352 !self.swarm.active_view.is_empty()
353 }
354}
355
/// Message counters kept by the topic [`State`].
#[derive(Clone, Debug, Default)]
pub struct Stats {
    /// Number of [`OutEvent::SendMessage`] events returned from
    /// [`State::handle`] so far.
    pub messages_sent: usize,
    /// Number of [`InEvent::RecvMessage`] events passed to
    /// [`State::handle`] so far.
    pub messages_received: usize,
}