1#![allow(missing_docs)]
2
3use std::{
4 collections::{HashMap, HashSet},
5 sync::Arc,
6};
7
8use anyhow::{Context, Result};
9use iroh::{address_lookup::memory::MemoryLookup, Endpoint, EndpointAddr, EndpointId, PublicKey};
10use iroh_blobs::{
11 api::{
12 blobs::BlobStatus,
13 downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
14 Store,
15 },
16 Hash, HashAndFormat,
17};
18use iroh_gossip::net::Gossip;
19use n0_future::{task::JoinSet, time::SystemTime, FutureExt};
20use serde::{Deserialize, Serialize};
21use tokio::sync::{self, mpsc, oneshot};
22use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
23
24use super::state::{NamespaceStates, Origin, SyncReason};
26use crate::{
27 actor::{OpenOpts, SyncHandle},
28 engine::gossip::GossipState,
29 metrics::Metrics,
30 net::{
31 connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
32 SyncFinished,
33 },
34 AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
35};
36
/// An operation exchanged with other peers over gossip.
///
/// Values are serialized with `postcard` before being handed to the gossip layer.
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    /// A signed entry. Broadcast when an entry is inserted locally into a namespace that is
    /// currently syncing.
    Put(SignedEntry),
    /// Content for the given hash is available. Broadcast to neighbors after a download of
    /// that hash completed successfully.
    ContentReady(Hash),
    /// A report about a finished sync. Broadcast to neighbors after a sync in which at least
    /// one entry was received.
    SyncReport(SyncReport),
}
49
/// Report that is broadcast to neighbors after a sync in which entries were received.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    /// The namespace the sync was performed for.
    namespace: NamespaceId,
    /// Encoded author heads received during the sync; the receiving side decodes them with
    /// `AuthorHeads::decode`.
    heads: Vec<u8>,
}
57
58#[derive(derive_more::Debug, strum::Display)]
60pub enum ToLiveActor {
61 StartSync {
62 namespace: NamespaceId,
63 peers: Vec<EndpointAddr>,
64 #[debug("onsehot::Sender")]
65 reply: sync::oneshot::Sender<anyhow::Result<()>>,
66 },
67 Leave {
68 namespace: NamespaceId,
69 kill_subscribers: bool,
70 #[debug("onsehot::Sender")]
71 reply: sync::oneshot::Sender<anyhow::Result<()>>,
72 },
73 Shutdown {
74 reply: sync::oneshot::Sender<()>,
75 },
76 Subscribe {
77 namespace: NamespaceId,
78 #[debug("sender")]
79 sender: async_channel::Sender<Event>,
80 #[debug("oneshot::Sender")]
81 reply: sync::oneshot::Sender<Result<()>>,
82 },
83 HandleConnection {
84 conn: iroh::endpoint::Connection,
85 },
86 AcceptSyncRequest {
87 namespace: NamespaceId,
88 peer: PublicKey,
89 #[debug("oneshot::Sender")]
90 reply: sync::oneshot::Sender<AcceptOutcome>,
91 },
92
93 IncomingSyncReport {
94 from: PublicKey,
95 report: SyncReport,
96 },
97 NeighborContentReady {
98 namespace: NamespaceId,
99 node: PublicKey,
100 hash: Hash,
101 },
102 NeighborUp {
103 namespace: NamespaceId,
104 peer: PublicKey,
105 },
106 NeighborDown {
107 namespace: NamespaceId,
108 peer: PublicKey,
109 },
110}
111
/// Events delivered to the subscribers of a namespace.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    /// A download finished successfully and the content is now available.
    ContentReady {
        /// The hash of the downloaded content.
        hash: Hash,
    },
    /// A peer became a neighbor for the namespace.
    NeighborUp(PublicKey),
    /// A peer is no longer a neighbor for the namespace.
    NeighborDown(PublicKey),
    /// A sync with a peer finished, successfully or not.
    SyncFinished(SyncEvent),
    /// Emitted after a sync finished while no downloads were queued for the namespace, or
    /// once the last queued download of a namespace completed after such a sync.
    PendingContentReady,
}
135
/// Output of a task in `running_sync_connect`: the namespace, the peer, the reason the sync
/// was started, and the outcome of the outgoing sync.
type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
/// Output of a task in `running_sync_accept`: the outcome of handling an incoming connection.
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
/// Output of a task in `download_tasks`: the namespace and hash the download was started for,
/// and the download result.
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);
144
/// Actor that drives live sync: it handles inbox messages, replica events, outgoing and
/// incoming syncs, content downloads and gossip, all from a single run loop.
pub struct LiveActor {
    /// Receiver for messages sent to this actor.
    inbox: mpsc::Receiver<ToLiveActor>,
    /// Handle used to open, close and query replicas.
    sync: SyncHandle,
    /// Endpoint used for outgoing sync connections and to obtain our own id.
    endpoint: Endpoint,
    /// Blob store that is queried for the status of content hashes.
    bao_store: Store,
    /// Downloader used to fetch missing content.
    downloader: Downloader,
    /// Address lookup into which addresses of joined peers are inserted.
    memory_lookup: MemoryLookup,
    /// Sender half of the replica event channel; a clone is passed along when a replica is
    /// opened for syncing and when unsubscribing on leave.
    replica_events_tx: async_channel::Sender<crate::Event>,
    /// Receiver half of the replica event channel, polled in the run loop.
    replica_events_rx: async_channel::Receiver<crate::Event>,

    /// Sender for this actor's own inbox; cloned into the gossip state and into the
    /// accept callback of incoming connections.
    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    /// Gossip state for the joined namespaces.
    gossip: GossipState,

    /// Running tasks for syncs that we initiated.
    running_sync_connect: JoinSet<SyncConnectRes>,
    /// Running tasks for incoming connections.
    running_sync_accept: JoinSet<SyncAcceptRes>,
    /// Running download tasks.
    download_tasks: JoinSet<DownloadRes>,
    /// Hashes that are known to be missing and for which no download is currently queued.
    missing_hashes: HashSet<Hash>,
    /// Hashes with a queued download, indexed by hash and by namespace.
    queued_hashes: QueuedHashes,
    /// Nodes known to provide a hash; handed to the downloader for provider discovery.
    hash_providers: ProviderNodes,

    /// Event subscribers per namespace.
    subscribers: SubscribersMap,

    /// Sync state per namespace.
    state: NamespaceStates,
    /// Metrics counters.
    metrics: Arc<Metrics>,
}
183impl LiveActor {
184 #[allow(clippy::too_many_arguments)]
186 pub fn new(
187 sync: SyncHandle,
188 endpoint: Endpoint,
189 gossip: Gossip,
190 bao_store: Store,
191 downloader: Downloader,
192 inbox: mpsc::Receiver<ToLiveActor>,
193 sync_actor_tx: mpsc::Sender<ToLiveActor>,
194 metrics: Arc<Metrics>,
195 ) -> Result<Self> {
196 let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
197 let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
198 let memory_lookup = MemoryLookup::new();
199 endpoint.address_lookup()?.add(memory_lookup.clone());
200 Ok(Self {
201 inbox,
202 sync,
203 replica_events_rx,
204 replica_events_tx,
205 endpoint,
206 memory_lookup,
207 gossip: gossip_state,
208 bao_store,
209 downloader,
210 sync_actor_tx,
211 running_sync_connect: Default::default(),
212 running_sync_accept: Default::default(),
213 subscribers: Default::default(),
214 download_tasks: Default::default(),
215 state: Default::default(),
216 missing_hashes: Default::default(),
217 queued_hashes: Default::default(),
218 hash_providers: Default::default(),
219 metrics,
220 })
221 }
222
223 pub async fn run(mut self) -> Result<()> {
225 let shutdown_reply = self.run_inner().await;
226 if let Err(err) = self.shutdown().await {
227 error!(?err, "Error during shutdown");
228 }
229 drop(self);
230 match shutdown_reply {
231 Ok(reply) => {
232 reply.send(()).ok();
233 Ok(())
234 }
235 Err(err) => Err(err),
236 }
237 }
238
    /// The main loop of the actor.
    ///
    /// Each iteration waits for the first ready event source and handles it. Returns the
    /// reply sender of the `Shutdown` message once one is received.
    ///
    /// # Errors
    ///
    /// Returns an error if the inbox or the replica event channel is closed, if handling an
    /// actor message fails, or if joining a sync or download task fails.
    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        // Tick counter, only used for trace logging.
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            tokio::select! {
                // Poll the branches in the order they are written; inbox messages come first.
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        // Shutdown ends the loop; the reply is sent by the caller afterwards.
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    // A failure to process a single replica event is logged, not fatal.
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                // The join set branches are disabled while their set is empty.
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;

                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                // Gossip errors are logged and the loop keeps running.
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }
295
296 async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
297 match msg {
298 ToLiveActor::Shutdown { .. } => {
299 unreachable!("handled in run");
300 }
301 ToLiveActor::IncomingSyncReport { from, report } => {
302 self.on_sync_report(from, report).await
303 }
304 ToLiveActor::NeighborUp { namespace, peer } => {
305 debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
306 self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
307 self.subscribers
308 .send(&namespace, Event::NeighborUp(peer))
309 .await;
310 }
311 ToLiveActor::NeighborDown { namespace, peer } => {
312 debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
313 self.subscribers
314 .send(&namespace, Event::NeighborDown(peer))
315 .await;
316 }
317 ToLiveActor::StartSync {
318 namespace,
319 peers,
320 reply,
321 } => {
322 let res = self.start_sync(namespace, peers).await;
323 reply.send(res).ok();
324 }
325 ToLiveActor::Leave {
326 namespace,
327 kill_subscribers,
328 reply,
329 } => {
330 let res = self.leave(namespace, kill_subscribers).await;
331 reply.send(res).ok();
332 }
333 ToLiveActor::Subscribe {
334 namespace,
335 sender,
336 reply,
337 } => {
338 self.subscribers.subscribe(namespace, sender);
339 reply.send(Ok(())).ok();
340 }
341 ToLiveActor::HandleConnection { conn } => {
342 self.handle_connection(conn).await;
343 }
344 ToLiveActor::AcceptSyncRequest {
345 namespace,
346 peer,
347 reply,
348 } => {
349 let outcome = self.accept_sync_request(namespace, peer);
350 reply.send(outcome).ok();
351 }
352 ToLiveActor::NeighborContentReady {
353 namespace,
354 node,
355 hash,
356 } => {
357 self.on_neighbor_content_ready(namespace, node, hash).await;
358 }
359 };
360 Ok(true)
361 }
362
363 #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
364 fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
365 if !self.state.start_connect(&namespace, peer, reason) {
366 return;
367 }
368 let endpoint = self.endpoint.clone();
369 let sync = self.sync.clone();
370 let metrics = self.metrics.clone();
371 let fut = async move {
372 let res = connect_and_sync(
373 &endpoint,
374 &sync,
375 namespace,
376 EndpointAddr::new(peer),
377 Some(&metrics),
378 )
379 .await;
380 (namespace, peer, reason, res)
381 }
382 .instrument(Span::current());
383 self.running_sync_connect.spawn(fut);
384 }
385
386 async fn shutdown(&mut self) -> anyhow::Result<()> {
387 self.subscribers.clear();
389 let (gossip_shutdown_res, _store) = tokio::join!(
390 self.gossip.shutdown(),
392 self.sync.shutdown()
394 );
395 gossip_shutdown_res?;
396 Ok(())
399 }
400
401 async fn start_sync(
402 &mut self,
403 namespace: NamespaceId,
404 mut peers: Vec<EndpointAddr>,
405 ) -> Result<()> {
406 debug!(?namespace, peers = peers.len(), "start sync");
407 if !self.state.is_syncing(&namespace) {
409 let opts = OpenOpts::default()
410 .sync()
411 .subscribe(self.replica_events_tx.clone());
412 self.sync.open(namespace, opts).await?;
413 self.state.insert(namespace);
414 }
415 match self.sync.get_sync_peers(namespace).await {
417 Ok(None) => {
418 }
420 Ok(Some(known_useful_peers)) => {
421 let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
422 match PublicKey::from_bytes(&peer_id_bytes) {
425 Ok(public_key) => Some(EndpointAddr::new(public_key)),
426 Err(_signing_error) => {
427 warn!("potential db corruption: peers per doc can't be decoded");
428 None
429 }
430 }
431 });
432 peers.extend(as_node_addr);
433 }
434 Err(e) => {
435 warn!(%e, "db error reading peers per document")
437 }
438 }
439 self.join_peers(namespace, peers).await?;
440 Ok(())
441 }
442
443 async fn leave(
444 &mut self,
445 namespace: NamespaceId,
446 kill_subscribers: bool,
447 ) -> anyhow::Result<()> {
448 if self.state.remove(&namespace) {
450 self.sync.set_sync(namespace, false).await?;
451 self.sync
452 .unsubscribe(namespace, self.replica_events_tx.clone())
453 .await?;
454 self.sync.close(namespace).await?;
455 self.gossip.quit(&namespace);
456 }
457 if kill_subscribers {
458 self.subscribers.remove(&namespace);
459 }
460 Ok(())
461 }
462
463 async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
464 let mut peer_ids = Vec::new();
465
466 for peer in peers.into_iter() {
468 let peer_id = peer.id;
469 if !peer.is_empty() {
472 self.memory_lookup.add_endpoint_info(peer);
473 }
474 peer_ids.push(peer_id);
475 }
476
477 self.gossip.join(namespace, peer_ids.clone()).await?;
479
480 if !peer_ids.is_empty() {
481 for peer in peer_ids {
483 self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
484 }
485 }
486 Ok(())
487 }
488
489 #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
490 async fn on_sync_via_connect_finished(
491 &mut self,
492 namespace: NamespaceId,
493 peer: PublicKey,
494 reason: SyncReason,
495 result: Result<SyncFinished, ConnectError>,
496 ) {
497 match result {
498 Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
499 debug!(?reason, "remote abort, already syncing");
500 }
501 res => {
502 self.on_sync_finished(
503 namespace,
504 peer,
505 Origin::Connect(reason),
506 res.map_err(Into::into),
507 )
508 .await
509 }
510 }
511 }
512
513 #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
514 async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
515 match res {
516 Ok(state) => {
517 self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
518 .await
519 }
520 Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
521 debug!(?reason, "aborted by us");
523 }
524 Err(err) => {
525 if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
526 self.on_sync_finished(
527 namespace,
528 peer,
529 Origin::Accept,
530 Err(anyhow::Error::from(err)),
531 )
532 .await;
533 } else {
534 debug!(?err, "failed before reading the first message");
535 }
536 }
537 }
538 }
539
    /// Processes a finished sync, regardless of which side initiated it.
    ///
    /// On success the peer is registered as useful for the namespace and, if entries were
    /// received, a sync report is broadcast to the neighbors. Afterwards the sync state is
    /// updated, a `SyncFinished` event is emitted, the `PendingContentReady` bookkeeping is
    /// updated, and a resync is started if the state asks for one.
    async fn on_sync_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        origin: Origin,
        result: Result<SyncFinished>,
    ) {
        match &result {
            Err(ref err) => {
                warn!(?origin, ?err, "sync failed");
            }
            Ok(ref details) => {
                info!(
                    sent = %details.outcome.num_sent,
                    recv = %details.outcome.num_recv,
                    t_connect = ?details.timings.connect,
                    t_process = ?details.timings.process,
                    "sync finished",
                );

                // Remember the peer for this namespace; failing to do so is not fatal.
                if let Err(e) = self
                    .sync
                    .register_useful_peer(namespace, *peer.as_bytes())
                    .await
                {
                    debug!(%e, "failed to register peer for document")
                }

                // Only tell the neighbors about the sync if we actually received entries.
                if details.outcome.num_recv > 0 {
                    info!("broadcast sync report to neighbors");
                    match details
                        .outcome
                        .heads_received
                        .encode(Some(self.gossip.max_message_size()))
                    {
                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
                        Ok(heads) => {
                            let report = SyncReport { namespace, heads };
                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
                                .await;
                        }
                    }
                }
            }
        };

        // Build the event payload before `result` is moved into the state below.
        let result_for_event = match &result {
            Ok(details) => Ok(details.into()),
            Err(err) => Err(err.to_string()),
        };

        // If the state does not report a finished sync, no event is emitted.
        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
            return;
        };

        let ev = SyncEvent {
            peer,
            origin,
            result: result_for_event,
            finished: SystemTime::now(),
            started,
        };
        self.subscribers
            .send(&namespace, Event::SyncFinished(ev))
            .await;

        // If downloads are still queued for this namespace, allow `PendingContentReady` to be
        // emitted once they have completed. Otherwise emit it right away and disallow
        // emitting it again until the flag is set by a later sync.
        if self.queued_hashes.contains_namespace(&namespace) {
            self.state.set_may_emit_ready(&namespace, true);
        } else {
            self.subscribers
                .send(&namespace, Event::PendingContentReady)
                .await;
            self.state.set_may_emit_ready(&namespace, false);
        }

        if resync {
            self.sync_with_peer(namespace, peer, SyncReason::Resync);
        }
    }
626
627 async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
628 if !self.state.is_syncing(&namespace) {
629 return;
630 }
631
632 let msg = match postcard::to_stdvec(op) {
633 Ok(msg) => msg,
634 Err(err) => {
635 error!(?err, ?op, "Failed to serialize message:");
636 return;
637 }
638 };
639 self.gossip
641 .broadcast_neighbors(&namespace, msg.into())
642 .await;
643 }
644
645 async fn on_download_ready(
646 &mut self,
647 namespace: NamespaceId,
648 hash: Hash,
649 res: Result<(), anyhow::Error>,
650 ) {
651 let completed_namespaces = self.queued_hashes.remove_hash(&hash);
652 debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
653 if res.is_ok() {
654 self.subscribers
655 .send(&namespace, Event::ContentReady { hash })
656 .await;
657 self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
659 .await;
660 } else {
661 self.missing_hashes.insert(hash);
662 }
663 for namespace in completed_namespaces.iter() {
664 if let Some(true) = self.state.may_emit_ready(namespace) {
665 self.subscribers
666 .send(namespace, Event::PendingContentReady)
667 .await;
668 }
669 }
670 }
671
672 async fn on_neighbor_content_ready(
673 &mut self,
674 namespace: NamespaceId,
675 node: EndpointId,
676 hash: Hash,
677 ) {
678 self.start_download(namespace, hash, node, true).await;
679 }
680
681 #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
682 async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
683 let namespace = report.namespace;
684 if !self.state.is_syncing(&namespace) {
685 return;
686 }
687 let heads = match AuthorHeads::decode(&report.heads) {
688 Ok(heads) => heads,
689 Err(err) => {
690 warn!(?err, "failed to decode AuthorHeads");
691 return;
692 }
693 };
694 match self.sync.has_news_for_us(report.namespace, heads).await {
695 Ok(Some(updated_authors)) => {
696 info!(%updated_authors, "news reported: sync now");
697 self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
698 }
699 Ok(None) => {
700 debug!("no news reported: nothing to do");
701 }
702 Err(err) => {
703 warn!("sync actor error: {err:?}");
704 }
705 }
706 }
707
708 async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
709 match event {
710 crate::Event::LocalInsert { namespace, entry } => {
711 debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
712 if self.state.is_syncing(&namespace) {
714 let op = Op::Put(entry.clone());
715 let message = postcard::to_stdvec(&op)?.into();
716 self.gossip.broadcast(&namespace, message).await;
717 }
718 }
719 crate::Event::RemoteInsert {
720 namespace,
721 entry,
722 from,
723 should_download,
724 remote_content_status,
725 } => {
726 debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
727 if should_download {
730 let hash = entry.content_hash();
731 if matches!(remote_content_status, ContentStatus::Complete) {
732 let node_id = PublicKey::from_bytes(&from)?;
733 self.start_download(namespace, hash, node_id, false).await;
734 } else {
735 self.missing_hashes.insert(hash);
736 }
737 }
738 }
739 }
740
741 Ok(())
742 }
743
    /// Records `node` as a provider for `hash` and starts a download if needed.
    ///
    /// - If the blob store reports the blob as complete, the hash is removed from the
    ///   missing set and nothing else happens.
    /// - If a download for the hash is already queued, `namespace` is added to the queue
    ///   entry.
    /// - Otherwise a download is started, unless `only_if_missing` is set and the hash is
    ///   not in the missing set.
    async fn start_download(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        node: PublicKey,
        only_if_missing: bool,
    ) {
        let entry_status = self.bao_store.blobs().status(hash).await;
        if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
            self.missing_hashes.remove(&hash);
            return;
        }
        // Remember the provider even if no download is started now.
        self.hash_providers
            .0
            .lock()
            .expect("poisoned")
            .entry(hash)
            .or_default()
            .insert(node);
        if self.queued_hashes.contains_hash(&hash) {
            // A download is already queued; only record the additional namespace.
            self.queued_hashes.insert(hash, namespace);
        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
            let req = DownloadRequest::new(
                HashAndFormat::raw(hash),
                self.hash_providers.clone(),
                SplitStrategy::None,
            );
            let handle = self.downloader.download_with_opts(req);

            // The hash moves from the missing set to the queue.
            self.queued_hashes.insert(hash, namespace);
            self.missing_hashes.remove(&hash);
            self.download_tasks.spawn(async move {
                (
                    namespace,
                    hash,
                    handle.await.map_err(|e| anyhow::anyhow!(e)),
                )
            });
        }
    }
784
    /// Handles an incoming connection by spawning a task onto `running_sync_accept`.
    ///
    /// The decision whether to accept a sync request is made by this actor: the callback
    /// passed to `handle_connection` sends an `AcceptSyncRequest` message to the actor's
    /// inbox and waits for the reply.
    #[instrument("accept", skip_all)]
    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
        let to_actor_tx = self.sync_actor_tx.clone();
        let accept_request_cb = move |namespace, peer| {
            // Each invocation gets its own sender clone to move into the future.
            let to_actor_tx = to_actor_tx.clone();
            async move {
                let (reply_tx, reply_rx) = oneshot::channel();
                // A send failure drops `reply_tx`, which surfaces as an error on `reply_rx`
                // below.
                to_actor_tx
                    .send(ToLiveActor::AcceptSyncRequest {
                        namespace,
                        peer,
                        reply: reply_tx,
                    })
                    .await
                    .ok();
                match reply_rx.await {
                    Ok(outcome) => outcome,
                    Err(err) => {
                        warn!(
                            "accept request callback failed to retrieve reply from actor: {err:?}"
                        );
                        // Without a decision from the actor the request is rejected.
                        AcceptOutcome::Reject(AbortReason::InternalServerError)
                    }
                }
            }
            .boxed()
        };
        debug!("incoming connection");
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        self.running_sync_accept.spawn(
            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
                .instrument(Span::current()),
        );
    }
820
821 pub fn accept_sync_request(
822 &mut self,
823 namespace: NamespaceId,
824 peer: PublicKey,
825 ) -> AcceptOutcome {
826 self.state
827 .accept_request(&self.endpoint.id(), &namespace, peer)
828 }
829}
830
/// Describes a finished sync; carried by `Event::SyncFinished`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    /// The peer we synced with.
    pub peer: PublicKey,
    /// How the sync was started.
    pub origin: Origin,
    /// Timestamp taken when the event was created after the sync finished.
    pub finished: SystemTime,
    /// Timestamp of the start of the sync, as recorded by the namespace state.
    pub started: SystemTime,
    /// Details of a successful sync, or the error message of a failed one.
    pub result: std::result::Result<SyncDetails, String>,
}
845
/// Entry counts of a successful sync.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries received from the peer.
    pub entries_received: usize,
    /// Number of entries sent to the peer.
    pub entries_sent: usize,
}
853
854impl From<&SyncFinished> for SyncDetails {
855 fn from(value: &SyncFinished) -> Self {
856 Self {
857 entries_received: value.outcome.num_recv,
858 entries_sent: value.outcome.num_sent,
859 }
860 }
861}
862
/// Event subscribers, grouped by namespace.
#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);
865
866impl SubscribersMap {
867 fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
868 self.0.entry(namespace).or_default().subscribe(sender);
869 }
870
871 async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
872 debug!(namespace=%namespace.fmt_short(), %event, "emit event");
873 let Some(subscribers) = self.0.get_mut(namespace) else {
874 return false;
875 };
876
877 if !subscribers.send(event).await {
878 self.0.remove(namespace);
879 }
880 true
881 }
882
883 fn remove(&mut self, namespace: &NamespaceId) {
884 self.0.remove(namespace);
885 }
886
887 fn clear(&mut self) {
888 self.0.clear();
889 }
890}
891
/// Hashes with a queued download, indexed in both directions.
///
/// Both maps are updated together by `insert` and `remove_hash`.
#[derive(Debug, Default)]
struct QueuedHashes {
    /// For each queued hash, the namespaces that are waiting for it.
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    /// For each namespace, the hashes it is waiting for.
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}
897
/// Shared map from a content hash to the endpoints known to provide it.
///
/// Clones share the same underlying map through the `Arc`.
#[derive(Debug, Clone, Default)]
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<EndpointId>>>>);
900
901impl ContentDiscovery for ProviderNodes {
902 fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<EndpointId> {
903 let nodes = self
904 .0
905 .lock()
906 .expect("poisoned")
907 .get(&hash.hash)
908 .into_iter()
909 .flatten()
910 .cloned()
911 .collect::<Vec<_>>();
912 Box::pin(n0_future::stream::iter(nodes))
913 }
914}
915
916impl QueuedHashes {
917 fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
918 self.by_hash.entry(hash).or_default().insert(namespace);
919 self.by_namespace.entry(namespace).or_default().insert(hash);
920 }
921
922 fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
926 let namespaces = self.by_hash.remove(hash).unwrap_or_default();
927 let mut removed_namespaces = vec![];
928 for namespace in namespaces {
929 if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
930 hashes.remove(hash);
931 if hashes.is_empty() {
932 self.by_namespace.remove(&namespace);
933 removed_namespaces.push(namespace);
934 }
935 }
936 }
937 removed_namespaces
938 }
939
940 fn contains_hash(&self, hash: &Hash) -> bool {
941 self.by_hash.contains_key(hash)
942 }
943
944 fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
945 self.by_namespace.contains_key(namespace)
946 }
947}
948
/// The event senders subscribed to a single namespace.
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);
951
952impl Subscribers {
953 fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
954 self.0.push(sender)
955 }
956
957 async fn send(&mut self, event: Event) -> bool {
958 let futs = self.0.iter().map(|sender| sender.send(event.clone()));
959 let res = futures_buffered::join_all(futs).await;
960 for (i, res) in res.into_iter().enumerate().rev() {
962 if res.is_err() {
963 self.0.remove(i);
964 }
965 }
966 !self.0.is_empty()
967 }
968}
969
970fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
971 match res {
972 Ok(res) => res.peer.fmt_short().to_string(),
973 Err(err) => err
974 .peer()
975 .map(|x| x.fmt_short().to_string())
976 .unwrap_or_else(|| "unknown".to_string()),
977 }
978}
979
980fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
981 match res {
982 Ok(res) => res.namespace.fmt_short(),
983 Err(err) => err
984 .namespace()
985 .map(|x| x.fmt_short())
986 .unwrap_or_else(|| "unknown".to_string()),
987 }
988}
989
#[cfg(test)]
mod tests {
    use super::*;

    /// Sending to subscribers whose receivers are all closed must not panic, must remove
    /// every closed sender, and must report that no subscriber is left.
    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let (a_tx, a_rx) = async_channel::unbounded();
        let (b_tx, b_rx) = async_channel::unbounded();
        let mut subscribers = Subscribers::default();
        subscribers.subscribe(a_tx);
        subscribers.subscribe(b_tx);
        // Close both channels so that every send fails.
        drop(a_rx);
        drop(b_rx);
        let any_left = subscribers.send(Event::NeighborUp(pk)).await;
        assert!(!any_left, "no subscriber should be left");
        assert!(subscribers.0.is_empty(), "closed senders must be removed");
    }
}