1#![allow(missing_docs)]
2
3use std::{
4 collections::{HashMap, HashSet},
5 sync::Arc,
6 time::SystemTime,
7};
8
9use anyhow::{Context, Result};
10use futures_lite::FutureExt;
11use iroh::{
12 discovery::static_provider::StaticProvider, Endpoint, EndpointAddr, EndpointId, PublicKey,
13};
14use iroh_blobs::{
15 api::{
16 blobs::BlobStatus,
17 downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
18 Store,
19 },
20 Hash, HashAndFormat,
21};
22use iroh_gossip::net::Gossip;
23use serde::{Deserialize, Serialize};
24use tokio::{
25 sync::{self, mpsc, oneshot},
26 task::JoinSet,
27};
28use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
29
30use super::state::{NamespaceStates, Origin, SyncReason};
32use crate::{
33 actor::{OpenOpts, SyncHandle},
34 engine::gossip::GossipState,
35 metrics::Metrics,
36 net::{
37 connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
38 SyncFinished,
39 },
40 AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
41};
42
/// An operation that is serialized with `postcard` and handed to the gossip layer.
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    /// A signed entry; broadcast when a local insert happens on a namespace that is syncing.
    Put(SignedEntry),
    /// Content for this hash is available; broadcast to neighbors after a successful download.
    ContentReady(Hash),
    /// A sync report; broadcast to neighbors after a sync in which entries were received.
    SyncReport(SyncReport),
}
55
/// Report about a finished sync, sent to neighbors so they can decide whether to sync too.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    /// The namespace the report is about.
    namespace: NamespaceId,
    /// Encoded `AuthorHeads` (produced by `AuthorHeads::encode`, read with `AuthorHeads::decode`).
    heads: Vec<u8>,
}
63
64#[derive(derive_more::Debug, strum::Display)]
66pub enum ToLiveActor {
67 StartSync {
68 namespace: NamespaceId,
69 peers: Vec<EndpointAddr>,
70 #[debug("onsehot::Sender")]
71 reply: sync::oneshot::Sender<anyhow::Result<()>>,
72 },
73 Leave {
74 namespace: NamespaceId,
75 kill_subscribers: bool,
76 #[debug("onsehot::Sender")]
77 reply: sync::oneshot::Sender<anyhow::Result<()>>,
78 },
79 Shutdown {
80 reply: sync::oneshot::Sender<()>,
81 },
82 Subscribe {
83 namespace: NamespaceId,
84 #[debug("sender")]
85 sender: async_channel::Sender<Event>,
86 #[debug("oneshot::Sender")]
87 reply: sync::oneshot::Sender<Result<()>>,
88 },
89 HandleConnection {
90 conn: iroh::endpoint::Connection,
91 },
92 AcceptSyncRequest {
93 namespace: NamespaceId,
94 peer: PublicKey,
95 #[debug("oneshot::Sender")]
96 reply: sync::oneshot::Sender<AcceptOutcome>,
97 },
98
99 IncomingSyncReport {
100 from: PublicKey,
101 report: SyncReport,
102 },
103 NeighborContentReady {
104 namespace: NamespaceId,
105 node: PublicKey,
106 hash: Hash,
107 },
108 NeighborUp {
109 namespace: NamespaceId,
110 peer: PublicKey,
111 },
112 NeighborDown {
113 namespace: NamespaceId,
114 peer: PublicKey,
115 },
116}
117
/// Events delivered to the subscribers of a namespace.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    /// A download of the content finished successfully.
    ContentReady {
        /// Hash of the content that is now available.
        hash: Hash,
    },
    /// A peer became a neighbor.
    NeighborUp(PublicKey),
    /// A peer is no longer a neighbor.
    NeighborDown(PublicKey),
    /// A sync with a peer finished (successfully or not).
    SyncFinished(SyncEvent),
    /// Emitted right after a sync finishes if no downloads are queued for the namespace,
    /// otherwise once the queued downloads of the namespace have completed.
    PendingContentReady,
}
141
/// Output of a task in `running_sync_connect`: namespace, peer, reason and the sync result.
type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
/// Output of a task in `running_sync_accept`.
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
/// Output of a task in `download_tasks`: namespace, content hash and the download result.
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);
150
/// Actor that drives live sync: it processes [`ToLiveActor`] messages and replica events,
/// runs sync and download tasks, and forwards [`Event`]s to subscribers.
pub struct LiveActor {
    /// Receiver for actor messages.
    inbox: mpsc::Receiver<ToLiveActor>,
    /// Handle used to open, close and query replicas.
    sync: SyncHandle,
    /// Endpoint used to connect to peers.
    endpoint: Endpoint,
    /// Blob store, queried for the status of content hashes.
    bao_store: Store,
    /// Downloader used to fetch missing content.
    downloader: Downloader,
    /// Discovery provider registered on the endpoint; receives peer addresses on join.
    static_provider: StaticProvider,
    /// Sender handed to the replicas on open so that they can report events.
    replica_events_tx: async_channel::Sender<crate::Event>,
    /// Receiver for the replica events, polled by the main loop.
    replica_events_rx: async_channel::Receiver<crate::Event>,

    /// Sender for [`ToLiveActor`] messages, used by the accept callback and the gossip state.
    /// NOTE(review): presumably the sending half of `inbox`; confirm at the construction site.
    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    /// Gossip state, used to join and quit topics and to broadcast messages.
    gossip: GossipState,

    /// Running sync tasks that were initiated by us.
    running_sync_connect: JoinSet<SyncConnectRes>,
    /// Running sync tasks for connections accepted from remote peers.
    running_sync_accept: JoinSet<SyncAcceptRes>,
    /// Running download tasks.
    download_tasks: JoinSet<DownloadRes>,
    /// Content hashes known to be wanted but not currently queued for download.
    missing_hashes: HashSet<Hash>,
    /// Content hashes queued for download, with the namespaces interested in them.
    queued_hashes: QueuedHashes,
    /// Nodes known to provide content, per hash.
    hash_providers: ProviderNodes,

    /// Event subscribers per namespace.
    subscribers: SubscribersMap,

    /// Sync state per namespace.
    state: NamespaceStates,
    /// Metrics counters.
    metrics: Arc<Metrics>,
}
189impl LiveActor {
190 #[allow(clippy::too_many_arguments)]
192 pub fn new(
193 sync: SyncHandle,
194 endpoint: Endpoint,
195 gossip: Gossip,
196 bao_store: Store,
197 downloader: Downloader,
198 inbox: mpsc::Receiver<ToLiveActor>,
199 sync_actor_tx: mpsc::Sender<ToLiveActor>,
200 metrics: Arc<Metrics>,
201 ) -> Self {
202 let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
203 let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
204 let static_provider = StaticProvider::new();
205 endpoint.discovery().add(static_provider.clone());
206 Self {
207 inbox,
208 sync,
209 replica_events_rx,
210 replica_events_tx,
211 endpoint,
212 static_provider,
213 gossip: gossip_state,
214 bao_store,
215 downloader,
216 sync_actor_tx,
217 running_sync_connect: Default::default(),
218 running_sync_accept: Default::default(),
219 subscribers: Default::default(),
220 download_tasks: Default::default(),
221 state: Default::default(),
222 missing_hashes: Default::default(),
223 queued_hashes: Default::default(),
224 hash_providers: Default::default(),
225 metrics,
226 }
227 }
228
229 pub async fn run(mut self) -> Result<()> {
231 let shutdown_reply = self.run_inner().await;
232 if let Err(err) = self.shutdown().await {
233 error!(?err, "Error during shutdown");
234 }
235 drop(self);
236 match shutdown_reply {
237 Ok(reply) => {
238 reply.send(()).ok();
239 Ok(())
240 }
241 Err(err) => Err(err),
242 }
243 }
244
    /// The main loop of the actor.
    ///
    /// Each iteration waits for one of: an actor message, a replica event, a finished
    /// sync task (connect or accept), a finished download task, or gossip progress.
    ///
    /// Returns the reply sender of the `Shutdown` message once one is received.
    ///
    /// # Errors
    ///
    /// Fails if the inbox or the replica event channel is closed, if handling an actor
    /// message fails, or if joining a sync or download task fails.
    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        // Tick counter, only used for trace output.
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            tokio::select! {
                // Poll the branches in the order written below instead of randomly.
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        // Shutdown ends the loop; the reply is sent by the caller.
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    // A failure to process a single replica event is logged, not fatal.
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;

                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    // Gossip failures are logged and the loop keeps running.
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }
301
302 async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
303 match msg {
304 ToLiveActor::Shutdown { .. } => {
305 unreachable!("handled in run");
306 }
307 ToLiveActor::IncomingSyncReport { from, report } => {
308 self.on_sync_report(from, report).await
309 }
310 ToLiveActor::NeighborUp { namespace, peer } => {
311 debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
312 self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
313 self.subscribers
314 .send(&namespace, Event::NeighborUp(peer))
315 .await;
316 }
317 ToLiveActor::NeighborDown { namespace, peer } => {
318 debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
319 self.subscribers
320 .send(&namespace, Event::NeighborDown(peer))
321 .await;
322 }
323 ToLiveActor::StartSync {
324 namespace,
325 peers,
326 reply,
327 } => {
328 let res = self.start_sync(namespace, peers).await;
329 reply.send(res).ok();
330 }
331 ToLiveActor::Leave {
332 namespace,
333 kill_subscribers,
334 reply,
335 } => {
336 let res = self.leave(namespace, kill_subscribers).await;
337 reply.send(res).ok();
338 }
339 ToLiveActor::Subscribe {
340 namespace,
341 sender,
342 reply,
343 } => {
344 self.subscribers.subscribe(namespace, sender);
345 reply.send(Ok(())).ok();
346 }
347 ToLiveActor::HandleConnection { conn } => {
348 self.handle_connection(conn).await;
349 }
350 ToLiveActor::AcceptSyncRequest {
351 namespace,
352 peer,
353 reply,
354 } => {
355 let outcome = self.accept_sync_request(namespace, peer);
356 reply.send(outcome).ok();
357 }
358 ToLiveActor::NeighborContentReady {
359 namespace,
360 node,
361 hash,
362 } => {
363 self.on_neighbor_content_ready(namespace, node, hash).await;
364 }
365 };
366 Ok(true)
367 }
368
369 #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
370 fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
371 if !self.state.start_connect(&namespace, peer, reason) {
372 return;
373 }
374 let endpoint = self.endpoint.clone();
375 let sync = self.sync.clone();
376 let metrics = self.metrics.clone();
377 let fut = async move {
378 let res = connect_and_sync(
379 &endpoint,
380 &sync,
381 namespace,
382 EndpointAddr::new(peer),
383 Some(&metrics),
384 )
385 .await;
386 (namespace, peer, reason, res)
387 }
388 .instrument(Span::current());
389 self.running_sync_connect.spawn(fut);
390 }
391
392 async fn shutdown(&mut self) -> anyhow::Result<()> {
393 self.subscribers.clear();
395 let (gossip_shutdown_res, _store) = tokio::join!(
396 self.gossip.shutdown(),
398 self.sync.shutdown()
400 );
401 gossip_shutdown_res?;
402 Ok(())
405 }
406
407 async fn start_sync(
408 &mut self,
409 namespace: NamespaceId,
410 mut peers: Vec<EndpointAddr>,
411 ) -> Result<()> {
412 debug!(?namespace, peers = peers.len(), "start sync");
413 if !self.state.is_syncing(&namespace) {
415 let opts = OpenOpts::default()
416 .sync()
417 .subscribe(self.replica_events_tx.clone());
418 self.sync.open(namespace, opts).await?;
419 self.state.insert(namespace);
420 }
421 match self.sync.get_sync_peers(namespace).await {
423 Ok(None) => {
424 }
426 Ok(Some(known_useful_peers)) => {
427 let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
428 match PublicKey::from_bytes(&peer_id_bytes) {
431 Ok(public_key) => Some(EndpointAddr::new(public_key)),
432 Err(_signing_error) => {
433 warn!("potential db corruption: peers per doc can't be decoded");
434 None
435 }
436 }
437 });
438 peers.extend(as_node_addr);
439 }
440 Err(e) => {
441 warn!(%e, "db error reading peers per document")
443 }
444 }
445 self.join_peers(namespace, peers).await?;
446 Ok(())
447 }
448
449 async fn leave(
450 &mut self,
451 namespace: NamespaceId,
452 kill_subscribers: bool,
453 ) -> anyhow::Result<()> {
454 if self.state.remove(&namespace) {
456 self.sync.set_sync(namespace, false).await?;
457 self.sync
458 .unsubscribe(namespace, self.replica_events_tx.clone())
459 .await?;
460 self.sync.close(namespace).await?;
461 self.gossip.quit(&namespace);
462 }
463 if kill_subscribers {
464 self.subscribers.remove(&namespace);
465 }
466 Ok(())
467 }
468
469 async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
470 let mut peer_ids = Vec::new();
471
472 for peer in peers.into_iter() {
474 let peer_id = peer.id;
475 if !peer.is_empty() {
478 self.static_provider.add_endpoint_info(peer);
479 }
480 peer_ids.push(peer_id);
481 }
482
483 self.gossip.join(namespace, peer_ids.clone()).await?;
485
486 if !peer_ids.is_empty() {
487 for peer in peer_ids {
489 self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
490 }
491 }
492 Ok(())
493 }
494
495 #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
496 async fn on_sync_via_connect_finished(
497 &mut self,
498 namespace: NamespaceId,
499 peer: PublicKey,
500 reason: SyncReason,
501 result: Result<SyncFinished, ConnectError>,
502 ) {
503 match result {
504 Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
505 debug!(?reason, "remote abort, already syncing");
506 }
507 res => {
508 self.on_sync_finished(
509 namespace,
510 peer,
511 Origin::Connect(reason),
512 res.map_err(Into::into),
513 )
514 .await
515 }
516 }
517 }
518
519 #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
520 async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
521 match res {
522 Ok(state) => {
523 self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
524 .await
525 }
526 Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
527 debug!(?reason, "aborted by us");
529 }
530 Err(err) => {
531 if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
532 self.on_sync_finished(
533 namespace,
534 peer,
535 Origin::Accept,
536 Err(anyhow::Error::from(err)),
537 )
538 .await;
539 } else {
540 debug!(?err, "failed before reading the first message");
541 }
542 }
543 }
544 }
545
546 async fn on_sync_finished(
547 &mut self,
548 namespace: NamespaceId,
549 peer: PublicKey,
550 origin: Origin,
551 result: Result<SyncFinished>,
552 ) {
553 match &result {
554 Err(ref err) => {
555 warn!(?origin, ?err, "sync failed");
556 }
557 Ok(ref details) => {
558 info!(
559 sent = %details.outcome.num_sent,
560 recv = %details.outcome.num_recv,
561 t_connect = ?details.timings.connect,
562 t_process = ?details.timings.process,
563 "sync finished",
564 );
565
566 if let Err(e) = self
568 .sync
569 .register_useful_peer(namespace, *peer.as_bytes())
570 .await
571 {
572 debug!(%e, "failed to register peer for document")
573 }
574
575 if details.outcome.num_recv > 0 {
577 info!("broadcast sync report to neighbors");
578 match details
579 .outcome
580 .heads_received
581 .encode(Some(self.gossip.max_message_size()))
582 {
583 Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
584 Ok(heads) => {
585 let report = SyncReport { namespace, heads };
586 self.broadcast_neighbors(namespace, &Op::SyncReport(report))
587 .await;
588 }
589 }
590 }
591 }
592 };
593
594 let result_for_event = match &result {
595 Ok(details) => Ok(details.into()),
596 Err(err) => Err(err.to_string()),
597 };
598
599 let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
600 return;
601 };
602
603 let ev = SyncEvent {
604 peer,
605 origin,
606 result: result_for_event,
607 finished: SystemTime::now(),
608 started,
609 };
610 self.subscribers
611 .send(&namespace, Event::SyncFinished(ev))
612 .await;
613
614 if self.queued_hashes.contains_namespace(&namespace) {
620 self.state.set_may_emit_ready(&namespace, true);
621 } else {
622 self.subscribers
623 .send(&namespace, Event::PendingContentReady)
624 .await;
625 self.state.set_may_emit_ready(&namespace, false);
626 }
627
628 if resync {
629 self.sync_with_peer(namespace, peer, SyncReason::Resync);
630 }
631 }
632
633 async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
634 if !self.state.is_syncing(&namespace) {
635 return;
636 }
637
638 let msg = match postcard::to_stdvec(op) {
639 Ok(msg) => msg,
640 Err(err) => {
641 error!(?err, ?op, "Failed to serialize message:");
642 return;
643 }
644 };
645 self.gossip
647 .broadcast_neighbors(&namespace, msg.into())
648 .await;
649 }
650
651 async fn on_download_ready(
652 &mut self,
653 namespace: NamespaceId,
654 hash: Hash,
655 res: Result<(), anyhow::Error>,
656 ) {
657 let completed_namespaces = self.queued_hashes.remove_hash(&hash);
658 debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
659 if res.is_ok() {
660 self.subscribers
661 .send(&namespace, Event::ContentReady { hash })
662 .await;
663 self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
665 .await;
666 } else {
667 self.missing_hashes.insert(hash);
668 }
669 for namespace in completed_namespaces.iter() {
670 if let Some(true) = self.state.may_emit_ready(namespace) {
671 self.subscribers
672 .send(namespace, Event::PendingContentReady)
673 .await;
674 }
675 }
676 }
677
678 async fn on_neighbor_content_ready(
679 &mut self,
680 namespace: NamespaceId,
681 node: EndpointId,
682 hash: Hash,
683 ) {
684 self.start_download(namespace, hash, node, true).await;
685 }
686
687 #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
688 async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
689 let namespace = report.namespace;
690 if !self.state.is_syncing(&namespace) {
691 return;
692 }
693 let heads = match AuthorHeads::decode(&report.heads) {
694 Ok(heads) => heads,
695 Err(err) => {
696 warn!(?err, "failed to decode AuthorHeads");
697 return;
698 }
699 };
700 match self.sync.has_news_for_us(report.namespace, heads).await {
701 Ok(Some(updated_authors)) => {
702 info!(%updated_authors, "news reported: sync now");
703 self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
704 }
705 Ok(None) => {
706 debug!("no news reported: nothing to do");
707 }
708 Err(err) => {
709 warn!("sync actor error: {err:?}");
710 }
711 }
712 }
713
714 async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
715 match event {
716 crate::Event::LocalInsert { namespace, entry } => {
717 debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
718 if self.state.is_syncing(&namespace) {
720 let op = Op::Put(entry.clone());
721 let message = postcard::to_stdvec(&op)?.into();
722 self.gossip.broadcast(&namespace, message).await;
723 }
724 }
725 crate::Event::RemoteInsert {
726 namespace,
727 entry,
728 from,
729 should_download,
730 remote_content_status,
731 } => {
732 debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
733 if should_download {
736 let hash = entry.content_hash();
737 if matches!(remote_content_status, ContentStatus::Complete) {
738 let node_id = PublicKey::from_bytes(&from)?;
739 self.start_download(namespace, hash, node_id, false).await;
740 } else {
741 self.missing_hashes.insert(hash);
742 }
743 }
744 }
745 }
746
747 Ok(())
748 }
749
750 async fn start_download(
751 &mut self,
752 namespace: NamespaceId,
753 hash: Hash,
754 node: PublicKey,
755 only_if_missing: bool,
756 ) {
757 let entry_status = self.bao_store.blobs().status(hash).await;
758 if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
759 self.missing_hashes.remove(&hash);
760 return;
761 }
762 self.hash_providers
763 .0
764 .lock()
765 .expect("poisoned")
766 .entry(hash)
767 .or_default()
768 .insert(node);
769 if self.queued_hashes.contains_hash(&hash) {
770 self.queued_hashes.insert(hash, namespace);
771 } else if !only_if_missing || self.missing_hashes.contains(&hash) {
772 let req = DownloadRequest::new(
773 HashAndFormat::raw(hash),
774 self.hash_providers.clone(),
775 SplitStrategy::None,
776 );
777 let handle = self.downloader.download_with_opts(req);
778
779 self.queued_hashes.insert(hash, namespace);
780 self.missing_hashes.remove(&hash);
781 self.download_tasks
782 .spawn(async move { (namespace, hash, handle.await) });
783 }
784 }
785
786 #[instrument("accept", skip_all)]
787 pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
788 let to_actor_tx = self.sync_actor_tx.clone();
789 let accept_request_cb = move |namespace, peer| {
790 let to_actor_tx = to_actor_tx.clone();
791 async move {
792 let (reply_tx, reply_rx) = oneshot::channel();
793 to_actor_tx
794 .send(ToLiveActor::AcceptSyncRequest {
795 namespace,
796 peer,
797 reply: reply_tx,
798 })
799 .await
800 .ok();
801 match reply_rx.await {
802 Ok(outcome) => outcome,
803 Err(err) => {
804 warn!(
805 "accept request callback failed to retrieve reply from actor: {err:?}"
806 );
807 AcceptOutcome::Reject(AbortReason::InternalServerError)
808 }
809 }
810 }
811 .boxed()
812 };
813 debug!("incoming connection");
814 let sync = self.sync.clone();
815 let metrics = self.metrics.clone();
816 self.running_sync_accept.spawn(
817 async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
818 .instrument(Span::current()),
819 );
820 }
821
822 pub fn accept_sync_request(
823 &mut self,
824 namespace: NamespaceId,
825 peer: PublicKey,
826 ) -> AcceptOutcome {
827 self.state
828 .accept_request(&self.endpoint.id(), &namespace, peer)
829 }
830}
831
/// Information about a finished sync, carried by [`Event::SyncFinished`].
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    /// The peer the sync was run with.
    pub peer: PublicKey,
    /// Whether we connected to the peer or accepted its connection.
    pub origin: Origin,
    /// Time at which the event was created after the sync finished.
    pub finished: SystemTime,
    /// Start time of the sync, as reported by the namespace state.
    pub started: SystemTime,
    /// Details of a successful sync, or the error message of a failed one.
    pub result: std::result::Result<SyncDetails, String>,
}
846
/// Entry counts of a successful sync.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries received from the peer.
    pub entries_received: usize,
    /// Number of entries sent to the peer.
    pub entries_sent: usize,
}
854
855impl From<&SyncFinished> for SyncDetails {
856 fn from(value: &SyncFinished) -> Self {
857 Self {
858 entries_received: value.outcome.num_recv,
859 entries_sent: value.outcome.num_sent,
860 }
861 }
862}
863
/// Event subscribers, grouped by namespace.
#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);
866
867impl SubscribersMap {
868 fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
869 self.0.entry(namespace).or_default().subscribe(sender);
870 }
871
872 async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
873 debug!(namespace=%namespace.fmt_short(), %event, "emit event");
874 let Some(subscribers) = self.0.get_mut(namespace) else {
875 return false;
876 };
877
878 if !subscribers.send(event).await {
879 self.0.remove(namespace);
880 }
881 true
882 }
883
884 fn remove(&mut self, namespace: &NamespaceId) {
885 self.0.remove(namespace);
886 }
887
888 fn clear(&mut self) {
889 self.0.clear();
890 }
891}
892
/// Content hashes that are queued for download, indexed both ways.
#[derive(Debug, Default)]
struct QueuedHashes {
    /// For each queued hash, the namespaces that queued it.
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    /// For each namespace, the hashes it has queued.
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}
898
/// Shared map from content hash to the nodes recorded as providers for it.
///
/// Cloning yields another handle to the same map, since the map sits behind an `Arc`.
#[derive(Debug, Clone, Default)]
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<EndpointId>>>>);
901
902impl ContentDiscovery for ProviderNodes {
903 fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<EndpointId> {
904 let nodes = self
905 .0
906 .lock()
907 .expect("poisoned")
908 .get(&hash.hash)
909 .into_iter()
910 .flatten()
911 .cloned()
912 .collect::<Vec<_>>();
913 Box::pin(n0_future::stream::iter(nodes))
914 }
915}
916
917impl QueuedHashes {
918 fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
919 self.by_hash.entry(hash).or_default().insert(namespace);
920 self.by_namespace.entry(namespace).or_default().insert(hash);
921 }
922
923 fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
927 let namespaces = self.by_hash.remove(hash).unwrap_or_default();
928 let mut removed_namespaces = vec![];
929 for namespace in namespaces {
930 if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
931 hashes.remove(hash);
932 if hashes.is_empty() {
933 self.by_namespace.remove(&namespace);
934 removed_namespaces.push(namespace);
935 }
936 }
937 }
938 removed_namespaces
939 }
940
941 fn contains_hash(&self, hash: &Hash) -> bool {
942 self.by_hash.contains_key(hash)
943 }
944
945 fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
946 self.by_namespace.contains_key(namespace)
947 }
948}
949
/// The event senders subscribed to a single namespace.
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);
952
953impl Subscribers {
954 fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
955 self.0.push(sender)
956 }
957
958 async fn send(&mut self, event: Event) -> bool {
959 let futs = self.0.iter().map(|sender| sender.send(event.clone()));
960 let res = futures_buffered::join_all(futs).await;
961 for (i, res) in res.into_iter().enumerate().rev() {
963 if res.is_err() {
964 self.0.remove(i);
965 }
966 }
967 !self.0.is_empty()
968 }
969}
970
971fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
972 match res {
973 Ok(res) => res.peer.fmt_short().to_string(),
974 Err(err) => err
975 .peer()
976 .map(|x| x.fmt_short().to_string())
977 .unwrap_or_else(|| "unknown".to_string()),
978 }
979}
980
981fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
982 match res {
983 Ok(res) => res.namespace.fmt_short(),
984 Err(err) => err
985 .namespace()
986 .map(|x| x.fmt_short())
987 .unwrap_or_else(|| "unknown".to_string()),
988 }
989}
990
#[cfg(test)]
mod tests {
    use super::*;

    /// Sending to several subscribers whose receivers are all gone must not panic and
    /// must drop every one of them.
    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let (a_tx, a_rx) = async_channel::unbounded();
        let (b_tx, b_rx) = async_channel::unbounded();
        let mut subscribers = Subscribers::default();
        subscribers.subscribe(a_tx);
        subscribers.subscribe(b_tx);
        // With both receivers dropped the channels are closed, so both sends fail.
        drop(a_rx);
        drop(b_rx);
        let any_left = subscribers.send(Event::NeighborUp(pk)).await;
        assert!(!any_left, "no subscriber should be reported as remaining");
        assert!(
            subscribers.0.is_empty(),
            "subscribers with closed channels should have been removed"
        );
    }
}