#![allow(missing_docs)]

use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::SystemTime,
};

use anyhow::{Context, Result};
use futures_lite::FutureExt;
use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
use iroh_blobs::{
    api::{
        blobs::BlobStatus,
        downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
        Store,
    },
    Hash, HashAndFormat,
};
use iroh_gossip::net::Gossip;
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{self, mpsc, oneshot},
    task::JoinSet,
};
use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};

use super::state::{NamespaceStates, Origin, SyncReason};
use crate::{
    actor::{OpenOpts, SyncHandle},
    engine::gossip::GossipState,
    metrics::Metrics,
    net::{
        connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
        SyncFinished,
    },
    AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
};

const SOURCE_NAME: &str = "docs_engine";
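
/// An operation on a document, broadcast to peers over gossip.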
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    Put(SignedEntry),
    ContentReady(Hash),
    SyncReport(SyncReport),
}
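
/// Report of a finished sync, gossiped to neighbors so they can check whether they have news for us.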
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    namespace: NamespaceId,
    heads: Vec<u8>,
}
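
/// Messages sent to the [`LiveActor`].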
#[derive(derive_more::Debug, strum::Display)]
pub enum ToLiveActor {
    StartSync {
        namespace: NamespaceId,
        peers: Vec<NodeAddr>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Leave {
        namespace: NamespaceId,
        kill_subscribers: bool,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Shutdown {
        reply: sync::oneshot::Sender<()>,
    },
    Subscribe {
        namespace: NamespaceId,
        #[debug("sender")]
        sender: async_channel::Sender<Event>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<Result<()>>,
    },
    HandleConnection {
        conn: iroh::endpoint::Connection,
    },
    AcceptSyncRequest {
        namespace: NamespaceId,
        peer: PublicKey,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<AcceptOutcome>,
    },

    IncomingSyncReport {
        from: PublicKey,
        report: SyncReport,
    },
    NeighborContentReady {
        namespace: NamespaceId,
        node: PublicKey,
        hash: Hash,
    },
    NeighborUp {
        namespace: NamespaceId,
        peer: PublicKey,
    },
    NeighborDown {
        namespace: NamespaceId,
        peer: PublicKey,
    },
}
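
/// Events emitted to subscribers of a live-synced document.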
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    ContentReady {
        hash: Hash,
    },
    NeighborUp(PublicKey),
    NeighborDown(PublicKey),
    SyncFinished(SyncEvent),
    PendingContentReady,
}
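
// Result types of the tasks tracked in the actor's `JoinSet`s.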
type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);
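
/// The actor that drives live sync for documents: it handles gossip messages,
/// sync connections, and content downloads.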
pub struct LiveActor {
    inbox: mpsc::Receiver<ToLiveActor>,
    sync: SyncHandle,
    endpoint: Endpoint,
    bao_store: Store,
    downloader: Downloader,
    replica_events_tx: async_channel::Sender<crate::Event>,
    replica_events_rx: async_channel::Receiver<crate::Event>,

    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    gossip: GossipState,

    running_sync_connect: JoinSet<SyncConnectRes>,
    running_sync_accept: JoinSet<SyncAcceptRes>,
    download_tasks: JoinSet<DownloadRes>,
    missing_hashes: HashSet<Hash>,
    queued_hashes: QueuedHashes,
    hash_providers: ProviderNodes,

    subscribers: SubscribersMap,

    state: NamespaceStates,
    metrics: Arc<Metrics>,
}

impl LiveActor {
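    /// Creates a new live actor. Call [`Self::run`] to start it.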
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        sync: SyncHandle,
        endpoint: Endpoint,
        gossip: Gossip,
        bao_store: Store,
        downloader: Downloader,
        inbox: mpsc::Receiver<ToLiveActor>,
        sync_actor_tx: mpsc::Sender<ToLiveActor>,
        metrics: Arc<Metrics>,
    ) -> Self {
        let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
        let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
        Self {
            inbox,
            sync,
            replica_events_rx,
            replica_events_tx,
            endpoint,
            gossip: gossip_state,
            bao_store,
            downloader,
            sync_actor_tx,
            running_sync_connect: Default::default(),
            running_sync_accept: Default::default(),
            subscribers: Default::default(),
            download_tasks: Default::default(),
            state: Default::default(),
            missing_hashes: Default::default(),
            queued_hashes: Default::default(),
            hash_providers: Default::default(),
            metrics,
        }
    }
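
    /// Runs the actor loop until a shutdown message is received, then shuts down cleanly.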
    pub async fn run(mut self) -> Result<()> {
        let shutdown_reply = self.run_inner().await;
        if let Err(err) = self.shutdown().await {
            error!(?err, "Error during shutdown");
        }
        drop(self);
        match shutdown_reply {
            Ok(reply) => {
                reply.send(()).ok();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }
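
    /// Main event loop: multiplexes actor messages, replica events, finished sync
    /// and download tasks, and gossip progress.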
    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            tokio::select! {
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;
                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }

    async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
        match msg {
            ToLiveActor::Shutdown { .. } => {
                unreachable!("handled in run");
            }
            ToLiveActor::IncomingSyncReport { from, report } => {
                self.on_sync_report(from, report).await
            }
            ToLiveActor::NeighborUp { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
                self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
                self.subscribers
                    .send(&namespace, Event::NeighborUp(peer))
                    .await;
            }
            ToLiveActor::NeighborDown { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
                self.subscribers
                    .send(&namespace, Event::NeighborDown(peer))
                    .await;
            }
            ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            } => {
                let res = self.start_sync(namespace, peers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            } => {
                let res = self.leave(namespace, kill_subscribers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Subscribe {
                namespace,
                sender,
                reply,
            } => {
                self.subscribers.subscribe(namespace, sender);
                reply.send(Ok(())).ok();
            }
            ToLiveActor::HandleConnection { conn } => {
                self.handle_connection(conn).await;
            }
            ToLiveActor::AcceptSyncRequest {
                namespace,
                peer,
                reply,
            } => {
                let outcome = self.accept_sync_request(namespace, peer);
                reply.send(outcome).ok();
            }
            ToLiveActor::NeighborContentReady {
                namespace,
                node,
                hash,
            } => {
                self.on_neighbor_content_ready(namespace, node, hash).await;
            }
        };
        Ok(true)
    }
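
    /// Spawns a task to connect to `peer` and sync `namespace`, unless a sync with
    /// this peer is already in progress.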
    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
        if !self.state.start_connect(&namespace, peer, reason) {
            return;
        }
        let endpoint = self.endpoint.clone();
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        let fut = async move {
            let res = connect_and_sync(
                &endpoint,
                &sync,
                namespace,
                NodeAddr::new(peer),
                Some(&metrics),
            )
            .await;
            (namespace, peer, reason, res)
        }
        .instrument(Span::current());
        self.running_sync_connect.spawn(fut);
    }
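
    /// Clears subscribers and shuts down the gossip state and the sync actor.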
    async fn shutdown(&mut self) -> anyhow::Result<()> {
        self.subscribers.clear();
        let (gossip_shutdown_res, _store) = tokio::join!(
            self.gossip.shutdown(),
            self.sync.shutdown()
        );
        gossip_shutdown_res?;
        Ok(())
    }
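
    /// Opens the replica for syncing, extends the peer list with known useful peers
    /// from the store, and joins them.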
    async fn start_sync(&mut self, namespace: NamespaceId, mut peers: Vec<NodeAddr>) -> Result<()> {
        debug!(?namespace, peers = peers.len(), "start sync");
        if !self.state.is_syncing(&namespace) {
            let opts = OpenOpts::default()
                .sync()
                .subscribe(self.replica_events_tx.clone());
            self.sync.open(namespace, opts).await?;
            self.state.insert(namespace);
        }
        match self.sync.get_sync_peers(namespace).await {
            Ok(None) => {}
            Ok(Some(known_useful_peers)) => {
                let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
                    match PublicKey::from_bytes(&peer_id_bytes) {
                        Ok(public_key) => Some(NodeAddr::new(public_key)),
                        Err(_signing_error) => {
                            warn!("potential db corruption: peers per doc can't be decoded");
                            None
                        }
                    }
                });
                peers.extend(as_node_addr);
            }
            Err(e) => {
                warn!(%e, "db error reading peers per document")
            }
        }
        self.join_peers(namespace, peers).await?;
        Ok(())
    }
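
    /// Stops syncing `namespace` and optionally drops its event subscribers.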
    async fn leave(
        &mut self,
        namespace: NamespaceId,
        kill_subscribers: bool,
    ) -> anyhow::Result<()> {
        if self.state.remove(&namespace) {
            self.sync.set_sync(namespace, false).await?;
            self.sync
                .unsubscribe(namespace, self.replica_events_tx.clone())
                .await?;
            self.sync.close(namespace).await?;
            self.gossip.quit(&namespace);
        }
        if kill_subscribers {
            self.subscribers.remove(&namespace);
        }
        Ok(())
    }
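
    /// Registers the peers' addresses with the endpoint, joins the gossip swarm,
    /// and starts a sync with each peer.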
    async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
        let mut peer_ids = Vec::new();

        for peer in peers.into_iter() {
            let peer_id = peer.node_id;
            if peer.is_empty() {
                peer_ids.push(peer_id)
            } else {
                match self.endpoint.add_node_addr_with_source(peer, SOURCE_NAME) {
                    Ok(()) => {
                        peer_ids.push(peer_id);
                    }
                    Err(err) => {
                        warn!(peer = %peer_id.fmt_short(), "failed to add known addrs: {err:?}");
                    }
                }
            }
        }

        self.gossip.join(namespace, peer_ids.clone()).await?;

        if !peer_ids.is_empty() {
            for peer in peer_ids {
                self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
            }
        }
        Ok(())
    }
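
    /// Handles the result of a sync that we initiated.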
    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    async fn on_sync_via_connect_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        reason: SyncReason,
        result: Result<SyncFinished, ConnectError>,
    ) {
        match result {
            Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
                debug!(?reason, "remote abort, already syncing");
            }
            res => {
                self.on_sync_finished(
                    namespace,
                    peer,
                    Origin::Connect(reason),
                    res.map_err(Into::into),
                )
                .await
            }
        }
    }
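
    /// Handles the result of a sync initiated by a remote peer.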
    #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
    async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
        match res {
            Ok(state) => {
                self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
                    .await
            }
            Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
                debug!(?reason, "aborted by us");
            }
            Err(err) => {
                if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
                    self.on_sync_finished(
                        namespace,
                        peer,
                        Origin::Accept,
                        Err(anyhow::Error::from(err)),
                    )
                    .await;
                } else {
                    debug!(?err, "failed before reading the first message");
                }
            }
        }
    }
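
    /// Records the sync result, notifies subscribers, and broadcasts a sync report
    /// to neighbors if new entries were received.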
    async fn on_sync_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        origin: Origin,
        result: Result<SyncFinished>,
    ) {
        match &result {
            Err(ref err) => {
                warn!(?origin, ?err, "sync failed");
            }
            Ok(ref details) => {
                info!(
                    sent = %details.outcome.num_sent,
                    recv = %details.outcome.num_recv,
                    t_connect = ?details.timings.connect,
                    t_process = ?details.timings.process,
                    "sync finished",
                );

                if let Err(e) = self
                    .sync
                    .register_useful_peer(namespace, *peer.as_bytes())
                    .await
                {
                    debug!(%e, "failed to register peer for document")
                }

                if details.outcome.num_recv > 0 {
                    info!("broadcast sync report to neighbors");
                    match details
                        .outcome
                        .heads_received
                        .encode(Some(self.gossip.max_message_size()))
                    {
                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
                        Ok(heads) => {
                            let report = SyncReport { namespace, heads };
                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
                                .await;
                        }
                    }
                }
            }
        };

        let result_for_event = match &result {
            Ok(details) => Ok(details.into()),
            Err(err) => Err(err.to_string()),
        };

        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
            return;
        };

        let ev = SyncEvent {
            peer,
            origin,
            result: result_for_event,
            finished: SystemTime::now(),
            started,
        };
        self.subscribers
            .send(&namespace, Event::SyncFinished(ev))
            .await;
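
        // Only emit `PendingContentReady` once all queued downloads for this namespace have finished.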
        if self.queued_hashes.contains_namespace(&namespace) {
            self.state.set_may_emit_ready(&namespace, true);
        } else {
            self.subscribers
                .send(&namespace, Event::PendingContentReady)
                .await;
            self.state.set_may_emit_ready(&namespace, false);
        }

        if resync {
            self.sync_with_peer(namespace, peer, SyncReason::Resync);
        }
    }
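
    /// Serializes `op` with postcard and broadcasts it to the direct gossip neighbors of `namespace`.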
    async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
        if !self.state.is_syncing(&namespace) {
            return;
        }

        let msg = match postcard::to_stdvec(op) {
            Ok(msg) => msg,
            Err(err) => {
                error!(?err, ?op, "Failed to serialize message");
                return;
            }
        };
        self.gossip
            .broadcast_neighbors(&namespace, msg.into())
            .await;
    }
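
    /// Handles a finished download: notifies subscribers and neighbors on success,
    /// or marks the hash as missing on failure.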
    async fn on_download_ready(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        res: Result<(), anyhow::Error>,
    ) {
        let completed_namespaces = self.queued_hashes.remove_hash(&hash);
        debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
        if res.is_ok() {
            self.subscribers
                .send(&namespace, Event::ContentReady { hash })
                .await;
            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
                .await;
        } else {
            self.missing_hashes.insert(hash);
        }
        for namespace in completed_namespaces.iter() {
            if let Some(true) = self.state.may_emit_ready(namespace) {
                self.subscribers
                    .send(namespace, Event::PendingContentReady)
                    .await;
            }
        }
    }
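
    /// Starts a download after a neighbor announced that it has the content for `hash`.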
    async fn on_neighbor_content_ready(
        &mut self,
        namespace: NamespaceId,
        node: NodeId,
        hash: Hash,
    ) {
        self.start_download(namespace, hash, node, true).await;
    }
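
    /// Handles a sync report from a peer: if it contains news for us, starts a sync with that peer.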
    #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
    async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
        let namespace = report.namespace;
        if !self.state.is_syncing(&namespace) {
            return;
        }
        let heads = match AuthorHeads::decode(&report.heads) {
            Ok(heads) => heads,
            Err(err) => {
                warn!(?err, "failed to decode AuthorHeads");
                return;
            }
        };
        match self.sync.has_news_for_us(report.namespace, heads).await {
            Ok(Some(updated_authors)) => {
                info!(%updated_authors, "news reported: sync now");
                self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
            }
            Ok(None) => {
                debug!("no news reported: nothing to do");
            }
            Err(err) => {
                warn!("sync actor error: {err:?}");
            }
        }
    }
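
    /// Handles replica events: gossips local inserts and downloads content for remote inserts.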
    async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
        match event {
            crate::Event::LocalInsert { namespace, entry } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
                if self.state.is_syncing(&namespace) {
                    let op = Op::Put(entry.clone());
                    let message = postcard::to_stdvec(&op)?.into();
                    self.gossip.broadcast(&namespace, message).await;
                }
            }
            crate::Event::RemoteInsert {
                namespace,
                entry,
                from,
                should_download,
                remote_content_status,
            } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
                if should_download {
                    let hash = entry.content_hash();
                    if matches!(remote_content_status, ContentStatus::Complete) {
                        let node_id = PublicKey::from_bytes(&from)?;
                        self.start_download(namespace, hash, node_id, false).await;
                    } else {
                        self.missing_hashes.insert(hash);
                    }
                }
            }
        }

        Ok(())
    }
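
    /// Queues a download for `hash` from `node`, unless the blob is already complete locally.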
    async fn start_download(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        node: PublicKey,
        only_if_missing: bool,
    ) {
        let entry_status = self.bao_store.blobs().status(hash).await;
        if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
            self.missing_hashes.remove(&hash);
            return;
        }
        self.hash_providers
            .0
            .lock()
            .expect("poisoned")
            .entry(hash)
            .or_default()
            .insert(node);
        if self.queued_hashes.contains_hash(&hash) {
            self.queued_hashes.insert(hash, namespace);
        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
            let req = DownloadRequest::new(
                HashAndFormat::raw(hash),
                self.hash_providers.clone(),
                SplitStrategy::None,
            );
            let handle = self.downloader.download_with_opts(req);

            self.queued_hashes.insert(hash, namespace);
            self.missing_hashes.remove(&hash);
            self.download_tasks
                .spawn(async move { (namespace, hash, handle.await) });
        }
    }
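
    /// Accepts an incoming sync connection and spawns a task to handle it.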
    #[instrument("accept", skip_all)]
    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
        let to_actor_tx = self.sync_actor_tx.clone();
        let accept_request_cb = move |namespace, peer| {
            let to_actor_tx = to_actor_tx.clone();
            async move {
                let (reply_tx, reply_rx) = oneshot::channel();
                to_actor_tx
                    .send(ToLiveActor::AcceptSyncRequest {
                        namespace,
                        peer,
                        reply: reply_tx,
                    })
                    .await
                    .ok();
                match reply_rx.await {
                    Ok(outcome) => outcome,
                    Err(err) => {
                        warn!(
                            "accept request callback failed to retrieve reply from actor: {err:?}"
                        );
                        AcceptOutcome::Reject(AbortReason::InternalServerError)
                    }
                }
            }
            .boxed()
        };
        debug!("incoming connection");
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        self.running_sync_accept.spawn(
            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
                .instrument(Span::current()),
        );
    }
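
    /// Decides whether to accept a sync request for `namespace` from `peer`.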
    pub fn accept_sync_request(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
    ) -> AcceptOutcome {
        self.state
            .accept_request(&self.endpoint.node_id(), &namespace, peer)
    }
}
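
/// Event emitted when a sync attempt with a single peer has finished.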
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    pub peer: PublicKey,
    pub origin: Origin,
    pub finished: SystemTime,
    pub started: SystemTime,
    pub result: std::result::Result<SyncDetails, String>,
}
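
/// Entry counts of a successful sync.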
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    pub entries_received: usize,
    pub entries_sent: usize,
}

impl From<&SyncFinished> for SyncDetails {
    fn from(value: &SyncFinished) -> Self {
        Self {
            entries_received: value.outcome.num_recv,
            entries_sent: value.outcome.num_sent,
        }
    }
}
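
/// Per-namespace map of event subscribers.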
#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);

impl SubscribersMap {
    fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
        self.0.entry(namespace).or_default().subscribe(sender);
    }

    async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
        debug!(namespace=%namespace.fmt_short(), %event, "emit event");
        let Some(subscribers) = self.0.get_mut(namespace) else {
            return false;
        };

        if !subscribers.send(event).await {
            self.0.remove(namespace);
        }
        true
    }

    fn remove(&mut self, namespace: &NamespaceId) {
        self.0.remove(namespace);
    }

    fn clear(&mut self) {
        self.0.clear();
    }
}
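
/// Hashes queued for download, indexed both by hash and by namespace.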
#[derive(Debug, Default)]
struct QueuedHashes {
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}
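
/// Shared map of nodes known to provide a hash, used as the downloader's content discovery source.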
#[derive(Debug, Clone, Default)]
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<NodeId>>>>);

impl ContentDiscovery for ProviderNodes {
    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<NodeId> {
        let nodes = self
            .0
            .lock()
            .expect("poisoned")
            .get(&hash.hash)
            .into_iter()
            .flatten()
            .cloned()
            .collect::<Vec<_>>();
        Box::pin(n0_future::stream::iter(nodes))
    }
}

impl QueuedHashes {
    fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
        self.by_hash.entry(hash).or_default().insert(namespace);
        self.by_namespace.entry(namespace).or_default().insert(hash);
    }

    fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
        let namespaces = self.by_hash.remove(hash).unwrap_or_default();
        let mut removed_namespaces = vec![];
        for namespace in namespaces {
            if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
                hashes.remove(hash);
                if hashes.is_empty() {
                    self.by_namespace.remove(&namespace);
                    removed_namespaces.push(namespace);
                }
            }
        }
        removed_namespaces
    }

    fn contains_hash(&self, hash: &Hash) -> bool {
        self.by_hash.contains_key(hash)
    }

    fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
        self.by_namespace.contains_key(namespace)
    }
}
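
/// List of event channel senders; senders whose receivers were dropped are removed on send.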
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);

impl Subscribers {
    fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
        self.0.push(sender)
    }

    async fn send(&mut self, event: Event) -> bool {
        let futs = self.0.iter().map(|sender| sender.send(event.clone()));
        let res = futures_buffered::join_all(futs).await;
        for (i, res) in res.into_iter().enumerate().rev() {
            if res.is_err() {
                self.0.remove(i);
            }
        }
        !self.0.is_empty()
    }
}

fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.peer.fmt_short(),
        Err(err) => err
            .peer()
            .map(|x| x.fmt_short())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.namespace.fmt_short(),
        Err(err) => err
            .namespace()
            .map(|x| x.fmt_short())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let (a_tx, a_rx) = async_channel::unbounded();
        let (b_tx, b_rx) = async_channel::unbounded();
        let mut subscribers = Subscribers::default();
        subscribers.subscribe(a_tx);
        subscribers.subscribe(b_tx);
        drop(a_rx);
        drop(b_rx);
        subscribers.send(Event::NeighborUp(pk)).await;
    }
}