1use std::collections::{hash_map, HashMap, HashSet};
4
5use n0_future::time::{Duration, Instant};
6use rand::Rng;
7use serde::{Deserialize, Serialize};
8use tracing::trace;
9
10use crate::{
11 metrics::Metrics,
12 proto::{
13 topic::{self, Command},
14 util::idbytes_impls,
15 Config, PeerData, PeerIdentity, MIN_MAX_MESSAGE_SIZE,
16 },
17};
18
/// Identifier of a gossip topic: an opaque 32-byte value.
///
/// The derives make it cheap to copy, usable as a key in hashed and ordered
/// collections, and (de)serializable with serde.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Serialize, Ord, PartialOrd, Deserialize)]
pub struct TopicId([u8; 32]);
// NOTE(review): `idbytes_impls!` lives in `proto::util`; presumably it generates the byte
// accessors (such as the `as_bytes` used by `fmt_short`) and formatting impls — confirm there.
idbytes_impls!(TopicId, "TopicId");
23
24impl TopicId {
25 pub fn fmt_short(&self) -> String {
28 data_encoding::HEXLOWER.encode(&self.as_bytes()[..5])
29 }
30}
31
/// A topic-level protocol message tagged with the topic it belongs to.
///
/// Field order matters for the serialized form: `topic` is encoded before `message`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message<PI> {
    /// The topic this message is addressed to.
    pub(crate) topic: TopicId,
    /// The wrapped per-topic message.
    pub(crate) message: topic::Message<PI>,
}
40
41impl<PI> Message<PI> {
42 pub fn kind(&self) -> MessageKind {
44 self.message.kind()
45 }
46}
47
48impl<PI: Serialize> Message<PI> {
49 pub(crate) fn postcard_header_size() -> usize {
50 let m = Self {
53 topic: TopicId(Default::default()),
54 message: topic::Message::<PI>::Gossip(super::plumtree::Message::Prune),
55 };
56 postcard::experimental::serialized_size(&m).unwrap() - 1
57 }
58}
59
/// Coarse classification of a [`Message`], as returned by [`Message::kind`].
///
/// Within this file it is used to choose which metrics counters a sent or
/// received message is accounted under.
#[derive(Debug)]
pub enum MessageKind {
    /// Counted under the `msgs_data_*` metrics.
    // NOTE(review): presumably messages that carry user payload — confirm in `topic::Message::kind`.
    Data,
    /// Counted under the `msgs_ctrl_*` metrics.
    // NOTE(review): presumably protocol-internal messages without user payload — confirm.
    Control,
}
68
69impl<PI: Serialize> Message<PI> {
70 pub fn size(&self) -> postcard::Result<usize> {
72 postcard::experimental::serialized_size(&self)
73 }
74}
75
/// A per-topic timer tagged with the topic that scheduled it.
///
/// Handed out through [`OutEvent::ScheduleTimer`] and expected back through
/// [`InEvent::TimerExpired`], where it is routed to the state of `topic`.
#[derive(Clone, Debug)]
pub struct Timer<PI> {
    /// The topic whose state scheduled this timer.
    topic: TopicId,
    /// The wrapped topic-level timer.
    timer: topic::Timer<PI>,
}
87
/// Input events for the multi-topic protocol [`State`].
///
/// The first three variants are routed to a single topic's state; the last two
/// are forwarded to every topic state (see the `From` impl for `InEventMapped`).
#[derive(Clone, Debug)]
pub enum InEvent<PI> {
    /// A message was received from the given peer.
    RecvMessage(PI, Message<PI>),
    /// A command to execute on the given topic.
    Command(TopicId, Command<PI>),
    /// A previously scheduled timer has expired.
    TimerExpired(Timer<PI>),
    /// The given peer disconnected; forwarded to all topic states.
    PeerDisconnected(PI),
    /// Our own peer data changed; stored on the state and forwarded to all topic states.
    UpdatePeerData(PeerData),
}
102
/// Output events produced by [`State::handle`].
#[derive(Debug, Clone)]
pub enum OutEvent<PI> {
    /// Send a message to the given peer.
    SendMessage(PI, Message<PI>),
    /// An event emitted by the state of the given topic.
    EmitEvent(TopicId, topic::Event<PI>),
    /// Schedule a timer; it is expected back as [`InEvent::TimerExpired`] after the duration.
    ScheduleTimer(Duration, Timer<PI>),
    /// Disconnect from a peer.
    ///
    /// Only emitted once no tracked topic still holds the peer (see `handle_out_event`).
    DisconnectPeer(PI),
    /// Peer data associated with the given peer, passed through from a topic state.
    PeerData(PI, PeerData),
}
118
/// For each peer, the set of topics that currently hold on to the connection.
type ConnsMap<PI> = HashMap<PI, HashSet<TopicId>>;
/// Buffer of output events accumulated while handling one input event.
type Outbox<PI> = Vec<OutEvent<PI>>;
121
/// Internal routing form of an [`InEvent`]: either broadcast or topic-specific.
enum InEventMapped<PI> {
    /// An event that is forwarded to every topic state.
    All(topic::InEvent<PI>),
    /// An event that is delivered only to the state of the given topic.
    TopicEvent(TopicId, topic::InEvent<PI>),
}
126
127impl<PI> From<InEvent<PI>> for InEventMapped<PI> {
128 fn from(event: InEvent<PI>) -> InEventMapped<PI> {
129 match event {
130 InEvent::RecvMessage(from, Message { topic, message }) => {
131 Self::TopicEvent(topic, topic::InEvent::RecvMessage(from, message))
132 }
133 InEvent::Command(topic, command) => {
134 Self::TopicEvent(topic, topic::InEvent::Command(command))
135 }
136 InEvent::TimerExpired(Timer { topic, timer }) => {
137 Self::TopicEvent(topic, topic::InEvent::TimerExpired(timer))
138 }
139 InEvent::PeerDisconnected(peer) => Self::All(topic::InEvent::PeerDisconnected(peer)),
140 InEvent::UpdatePeerData(data) => Self::All(topic::InEvent::UpdatePeerData(data)),
141 }
142 }
143}
144
/// Protocol state that multiplexes any number of topics over one set of peers.
///
/// Each topic has its own `topic::State`; this type routes input events to the
/// right topic state(s) and collects their output events.
#[derive(Debug)]
pub struct State<PI, R> {
    /// Our own peer identity.
    me: PI,
    /// Our own peer data; replaced when an `UpdatePeerData` event is handled.
    me_data: PeerData,
    /// Configuration that is cloned into every newly created topic state.
    config: Config,
    /// Random number generator that is cloned into every newly created topic state.
    rng: R,
    /// The per-topic protocol states, keyed by topic id.
    states: HashMap<TopicId, topic::State<PI, R>>,
    /// Output events buffered during `handle` and drained when it returns.
    outbox: Outbox<PI>,
    /// For each peer, the topics that received a message from it and have not released it yet.
    peer_topics: ConnsMap<PI>,
}
163
164impl<PI: PeerIdentity, R: Rng + Clone> State<PI, R> {
165 pub fn new(me: PI, me_data: PeerData, config: Config, rng: R) -> Self {
176 assert!(
177 config.max_message_size >= MIN_MAX_MESSAGE_SIZE,
178 "max_message_size must be at least {MIN_MAX_MESSAGE_SIZE}"
179 );
180 Self {
181 me,
182 me_data,
183 config,
184 rng,
185 states: Default::default(),
186 outbox: Default::default(),
187 peer_topics: Default::default(),
188 }
189 }
190
191 pub fn me(&self) -> &PI {
193 &self.me
194 }
195
196 pub fn state(&self, topic: &TopicId) -> Option<&topic::State<PI, R>> {
198 self.states.get(topic)
199 }
200
201 pub fn reset_stats(&mut self, topic: &TopicId) {
203 if let Some(state) = self.states.get_mut(topic) {
204 state.reset_stats();
205 }
206 }
207
208 pub fn topics(&self) -> impl Iterator<Item = &TopicId> {
210 self.states.keys()
211 }
212
213 pub fn states(&self) -> impl Iterator<Item = (&TopicId, &topic::State<PI, R>)> {
215 self.states.iter()
216 }
217
218 pub fn has_active_peers(&self, topic: &TopicId) -> bool {
220 self.state(topic)
221 .map(|s| s.has_active_peers())
222 .unwrap_or(false)
223 }
224
225 pub fn max_message_size(&self) -> usize {
227 self.config.max_message_size
228 }
229
230 pub fn handle(
234 &mut self,
235 event: InEvent<PI>,
236 now: Instant,
237 metrics: Option<&Metrics>,
238 ) -> impl Iterator<Item = OutEvent<PI>> + '_ + use<'_, PI, R> {
239 trace!("in : {event:?}");
240 if let Some(metrics) = &metrics {
241 track_in_event(&event, metrics);
242 }
243
244 let event: InEventMapped<PI> = event.into();
245
246 match event {
247 InEventMapped::TopicEvent(topic, event) => {
248 if matches!(&event, topic::InEvent::Command(Command::Join(_peers))) {
250 if let hash_map::Entry::Vacant(e) = self.states.entry(topic) {
251 e.insert(topic::State::with_rng(
252 self.me,
253 Some(self.me_data.clone()),
254 self.config.clone(),
255 self.rng.clone(),
256 ));
257 }
258 }
259
260 let quit = matches!(event, topic::InEvent::Command(Command::Quit));
263
264 if let Some(state) = self.states.get_mut(&topic) {
266 if let topic::InEvent::RecvMessage(from, _message) = &event {
269 self.peer_topics.entry(*from).or_default().insert(topic);
270 }
271 let out = state.handle(event, now);
272 for event in out {
273 handle_out_event(topic, event, &mut self.peer_topics, &mut self.outbox);
274 }
275 }
276
277 if quit {
278 self.states.remove(&topic);
279 }
280 }
281 InEventMapped::All(event) => {
283 if let topic::InEvent::UpdatePeerData(data) = &event {
284 self.me_data = data.clone();
285 }
286 for (topic, state) in self.states.iter_mut() {
287 let out = state.handle(event.clone(), now);
288 for event in out {
289 handle_out_event(*topic, event, &mut self.peer_topics, &mut self.outbox);
290 }
291 }
292 }
293 }
294
295 if let Some(metrics) = &metrics {
297 track_out_events(&self.outbox, metrics);
298 }
299
300 self.outbox.drain(..)
301 }
302}
303
304fn handle_out_event<PI: PeerIdentity>(
305 topic: TopicId,
306 event: topic::OutEvent<PI>,
307 conns: &mut ConnsMap<PI>,
308 outbox: &mut Outbox<PI>,
309) {
310 trace!("out: {event:?}");
311 match event {
312 topic::OutEvent::SendMessage(to, message) => {
313 outbox.push(OutEvent::SendMessage(to, Message { topic, message }))
314 }
315 topic::OutEvent::EmitEvent(event) => outbox.push(OutEvent::EmitEvent(topic, event)),
316 topic::OutEvent::ScheduleTimer(delay, timer) => {
317 outbox.push(OutEvent::ScheduleTimer(delay, Timer { topic, timer }))
318 }
319 topic::OutEvent::DisconnectPeer(peer) => {
320 let empty = conns
321 .get_mut(&peer)
322 .map(|list| list.remove(&topic) && list.is_empty())
323 .unwrap_or(false);
324 if empty {
325 conns.remove(&peer);
326 outbox.push(OutEvent::DisconnectPeer(peer));
327 }
328 }
329 topic::OutEvent::PeerData(peer, data) => outbox.push(OutEvent::PeerData(peer, data)),
330 }
331}
332
333fn track_out_events<PI: Serialize>(events: &[OutEvent<PI>], metrics: &Metrics) {
334 for event in events {
335 match event {
336 OutEvent::SendMessage(_to, message) => match message.kind() {
337 MessageKind::Data => {
338 metrics.msgs_data_sent.inc();
339 metrics
340 .msgs_data_sent_size
341 .inc_by(message.size().unwrap_or(0) as u64);
342 }
343 MessageKind::Control => {
344 metrics.msgs_ctrl_sent.inc();
345 metrics
346 .msgs_ctrl_sent_size
347 .inc_by(message.size().unwrap_or(0) as u64);
348 }
349 },
350 OutEvent::EmitEvent(_topic, event) => match event {
351 super::Event::NeighborUp(_peer) => {
352 metrics.neighbor_up.inc();
353 }
354 super::Event::NeighborDown(_peer) => {
355 metrics.neighbor_down.inc();
356 }
357 _ => {}
358 },
359 _ => {}
360 }
361 }
362}
363
364fn track_in_event<PI: Serialize>(event: &InEvent<PI>, metrics: &Metrics) {
365 if let InEvent::RecvMessage(_from, message) = event {
366 match message.kind() {
367 MessageKind::Data => {
368 metrics.msgs_data_recv.inc();
369 metrics
370 .msgs_data_recv_size
371 .inc_by(message.size().unwrap_or(0) as u64);
372 }
373 MessageKind::Control => {
374 metrics.msgs_ctrl_recv.inc();
375 metrics
376 .msgs_ctrl_recv_size
377 .inc_by(message.size().unwrap_or(0) as u64);
378 }
379 }
380 }
381}