1use std::collections::VecDeque;
4
5use bytes::Bytes;
6use derive_more::From;
7use n0_future::time::{Duration, Instant};
8use rand::{Rng, SeedableRng};
9use serde::{Deserialize, Serialize};
10
11use super::{
12 hyparview::{self, InEvent as SwarmIn},
13 plumtree::{self, GossipEvent, InEvent as GossipIn, Scope},
14 state::MessageKind,
15 PeerData, PeerIdentity, DEFAULT_MAX_MESSAGE_SIZE,
16};
17use crate::proto::MIN_MAX_MESSAGE_SIZE;
18
19#[derive(Clone, Debug)]
21pub enum InEvent<PI> {
22 RecvMessage(PI, Message<PI>),
24 Command(Command<PI>),
26 TimerExpired(Timer<PI>),
28 PeerDisconnected(PI),
30 UpdatePeerData(PeerData),
32}
33
34#[derive(Debug, PartialEq, Eq)]
36pub enum OutEvent<PI> {
37 SendMessage(PI, Message<PI>),
39 EmitEvent(Event<PI>),
41 ScheduleTimer(Duration, Timer<PI>),
44 DisconnectPeer(PI),
46 PeerData(PI, PeerData),
48}
49
50impl<PI> From<hyparview::OutEvent<PI>> for OutEvent<PI> {
51 fn from(event: hyparview::OutEvent<PI>) -> Self {
52 use hyparview::OutEvent::*;
53 match event {
54 SendMessage(to, message) => Self::SendMessage(to, message.into()),
55 ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()),
56 DisconnectPeer(peer) => Self::DisconnectPeer(peer),
57 EmitEvent(event) => Self::EmitEvent(event.into()),
58 PeerData(peer, data) => Self::PeerData(peer, data),
59 }
60 }
61}
62
63impl<PI> From<plumtree::OutEvent<PI>> for OutEvent<PI> {
64 fn from(event: plumtree::OutEvent<PI>) -> Self {
65 use plumtree::OutEvent::*;
66 match event {
67 SendMessage(to, message) => Self::SendMessage(to, message.into()),
68 ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()),
69 EmitEvent(event) => Self::EmitEvent(event.into()),
70 }
71 }
72}
73
74pub trait IO<PI: Clone> {
79 fn push(&mut self, event: impl Into<OutEvent<PI>>);
81
82 fn push_from_iter(&mut self, iter: impl IntoIterator<Item = impl Into<OutEvent<PI>>>) {
84 for event in iter.into_iter() {
85 self.push(event);
86 }
87 }
88}
89
90#[derive(From, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
92pub enum Message<PI> {
93 Swarm(hyparview::Message<PI>),
95 Gossip(plumtree::Message),
97}
98
99impl<PI> Message<PI> {
100 pub fn kind(&self) -> MessageKind {
102 match self {
103 Message::Swarm(_) => MessageKind::Control,
104 Message::Gossip(message) => match message {
105 plumtree::Message::Gossip(_) => MessageKind::Data,
106 _ => MessageKind::Control,
107 },
108 }
109 }
110
111 pub fn is_disconnect(&self) -> bool {
113 matches!(self, Message::Swarm(hyparview::Message::Disconnect(_)))
114 }
115}
116
117impl<PI: Serialize> Message<PI> {
118 pub fn size(&self) -> postcard::Result<usize> {
120 postcard::experimental::serialized_size(&self)
121 }
122}
123
124#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
126pub enum Event<PI> {
127 NeighborUp(PI),
129 NeighborDown(PI),
131 Received(GossipEvent<PI>),
133}
134
135impl<PI> From<hyparview::Event<PI>> for Event<PI> {
136 fn from(value: hyparview::Event<PI>) -> Self {
137 match value {
138 hyparview::Event::NeighborUp(peer) => Self::NeighborUp(peer),
139 hyparview::Event::NeighborDown(peer) => Self::NeighborDown(peer),
140 }
141 }
142}
143
144impl<PI> From<plumtree::Event<PI>> for Event<PI> {
145 fn from(value: plumtree::Event<PI>) -> Self {
146 match value {
147 plumtree::Event::Received(event) => Self::Received(event),
148 }
149 }
150}
151
152#[derive(Clone, From, Debug, PartialEq, Eq)]
157pub enum Timer<PI> {
158 Swarm(hyparview::Timer<PI>),
160 Gossip(plumtree::Timer),
162}
163
164#[derive(Clone, derive_more::Debug)]
166pub enum Command<PI> {
167 Join(Vec<PI>),
172 Broadcast(#[debug("<{}b>", _0.len())] Bytes, Scope),
174 Quit,
176}
177
178impl<PI: Clone> IO<PI> for VecDeque<OutEvent<PI>> {
179 fn push(&mut self, event: impl Into<OutEvent<PI>>) {
180 self.push_back(event.into())
181 }
182}
183
184#[derive(Clone, Debug, Serialize, Deserialize)]
186#[serde(default)]
187pub struct Config {
188 pub membership: hyparview::Config,
190 pub broadcast: plumtree::Config,
192 pub max_message_size: usize,
200}
201
202impl Default for Config {
203 fn default() -> Self {
204 Self {
205 membership: Default::default(),
206 broadcast: Default::default(),
207 max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
208 }
209 }
210}
211
212#[derive(Debug)]
214pub struct State<PI, R> {
215 me: PI,
216 pub(crate) swarm: hyparview::State<PI, R>,
217 pub(crate) gossip: plumtree::State<PI>,
218 outbox: VecDeque<OutEvent<PI>>,
219 stats: Stats,
220}
221
222impl<PI: PeerIdentity> State<PI, rand::rngs::StdRng> {
223 pub fn new(me: PI, me_data: Option<PeerData>, config: Config) -> Self {
229 Self::with_rng(me, me_data, config, rand::rngs::StdRng::from_os_rng())
230 }
231}
232
233impl<PI, R> State<PI, R> {
234 pub fn endpoint(&self) -> &PI {
236 &self.me
237 }
238}
239
240impl<PI: PeerIdentity, R: Rng> State<PI, R> {
241 pub fn with_rng(me: PI, me_data: Option<PeerData>, config: Config, rng: R) -> Self {
247 assert!(
248 config.max_message_size >= MIN_MAX_MESSAGE_SIZE,
249 "max_message_size must be at least {MIN_MAX_MESSAGE_SIZE}"
250 );
251 let max_payload_size =
252 config.max_message_size - super::Message::<PI>::postcard_header_size();
253 Self {
254 swarm: hyparview::State::new(me, me_data, config.membership, rng),
255 gossip: plumtree::State::new(me, config.broadcast, max_payload_size),
256 me,
257 outbox: VecDeque::new(),
258 stats: Stats::default(),
259 }
260 }
261
262 pub fn handle(
266 &mut self,
267 event: InEvent<PI>,
268 now: Instant,
269 ) -> impl Iterator<Item = OutEvent<PI>> + '_ {
270 let io = &mut self.outbox;
271 match event {
273 InEvent::Command(command) => match command {
274 Command::Join(peers) => {
275 for peer in peers {
276 self.swarm.handle(SwarmIn::RequestJoin(peer), io);
277 }
278 }
279 Command::Broadcast(data, scope) => {
280 self.gossip
281 .handle(GossipIn::Broadcast(data, scope), now, io)
282 }
283 Command::Quit => self.swarm.handle(SwarmIn::Quit, io),
284 },
285 InEvent::RecvMessage(from, message) => {
286 self.stats.messages_received += 1;
287 match message {
288 Message::Swarm(message) => {
289 self.swarm.handle(SwarmIn::RecvMessage(from, message), io)
290 }
291 Message::Gossip(message) => {
292 self.gossip
293 .handle(GossipIn::RecvMessage(from, message), now, io)
294 }
295 }
296 }
297 InEvent::TimerExpired(timer) => match timer {
298 Timer::Swarm(timer) => self.swarm.handle(SwarmIn::TimerExpired(timer), io),
299 Timer::Gossip(timer) => self.gossip.handle(GossipIn::TimerExpired(timer), now, io),
300 },
301 InEvent::PeerDisconnected(peer) => {
302 self.swarm.handle(SwarmIn::PeerDisconnected(peer), io);
303 self.gossip.handle(GossipIn::NeighborDown(peer), now, io);
304 }
305 InEvent::UpdatePeerData(data) => self.swarm.handle(SwarmIn::UpdatePeerData(data), io),
306 }
307
308 let mut io = VecDeque::new();
310 for event in self.outbox.iter() {
311 match event {
312 OutEvent::EmitEvent(Event::NeighborUp(peer)) => {
313 self.gossip
314 .handle(GossipIn::NeighborUp(*peer), now, &mut io)
315 }
316 OutEvent::EmitEvent(Event::NeighborDown(peer)) => {
317 self.gossip
318 .handle(GossipIn::NeighborDown(*peer), now, &mut io)
319 }
320 _ => {}
321 }
322 }
323 self.outbox.extend(io.drain(..));
326
327 self.stats.messages_sent += self
329 .outbox
330 .iter()
331 .filter(|event| matches!(event, OutEvent::SendMessage(_, _)))
332 .count();
333
334 self.outbox.drain(..)
335 }
336
337 pub fn stats(&self) -> &Stats {
340 &self.stats
341 }
342
343 pub fn reset_stats(&mut self) {
345 self.gossip.stats = Default::default();
346 self.swarm.stats = Default::default();
347 self.stats = Default::default();
348 }
349
350 pub fn gossip_stats(&self) -> &plumtree::Stats {
354 self.gossip.stats()
355 }
356
357 pub fn has_active_peers(&self) -> bool {
359 !self.swarm.active_view.is_empty()
360 }
361}
362
/// Message counters of a topic [`State`]. Both counters start at zero.
#[derive(Clone, Debug, Default)]
pub struct Stats {
    /// Number of `SendMessage` out events produced by `State::handle`.
    pub messages_sent: usize,
    /// Number of `RecvMessage` in events handled by `State::handle`.
    pub messages_received: usize,
}