#![allow(missing_docs)]

use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
};

use anyhow::{Context, Result};
use futures_lite::FutureExt;
use iroh::{
    discovery::static_provider::StaticProvider, Endpoint, EndpointAddr, EndpointId, PublicKey,
};
use iroh_blobs::{
    api::{
        blobs::BlobStatus,
        downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
        Store,
    },
    Hash, HashAndFormat,
};
use iroh_gossip::net::Gossip;
use n0_future::{task::JoinSet, time::SystemTime};
use serde::{Deserialize, Serialize};
use tokio::sync::{self, mpsc, oneshot};
use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};

use super::state::{NamespaceStates, Origin, SyncReason};
use crate::{
    actor::{OpenOpts, SyncHandle},
    engine::gossip::GossipState,
    metrics::Metrics,
    net::{
        connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
        SyncFinished,
    },
    AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
};

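/// An iroh-docs operation.
///
/// This is the message that is broadcast over iroh-gossip for a document.
///
/// A minimal encoding sketch, assuming the same `postcard` wire format this module uses in
/// `broadcast_neighbors` (the `entry` value is hypothetical):
///
/// ```ignore
/// let op = Op::Put(entry);
/// let bytes = postcard::to_stdvec(&op)?;
/// // `bytes` is what gets handed to the gossip layer for this namespace.
/// ```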
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    /// A new entry was inserted into the document.
    Put(SignedEntry),
    /// A peer now has content available for a hash.
    ContentReady(Hash),
    /// We synced with another peer; this is a report of the author heads we now have.
    SyncReport(SyncReport),
}

/// Report of a successful sync with the new heads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    namespace: NamespaceId,
    /// Encoded [`AuthorHeads`].
    heads: Vec<u8>,
}

/// Messages to the live sync actor.
#[derive(derive_more::Debug, strum::Display)]
pub enum ToLiveActor {
    StartSync {
        namespace: NamespaceId,
        peers: Vec<EndpointAddr>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Leave {
        namespace: NamespaceId,
        /// If true, close the event subscribers of the namespace as well.
        kill_subscribers: bool,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Shutdown {
        reply: sync::oneshot::Sender<()>,
    },
    Subscribe {
        namespace: NamespaceId,
        #[debug("sender")]
        sender: async_channel::Sender<Event>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<Result<()>>,
    },
    HandleConnection {
        conn: iroh::endpoint::Connection,
    },
    AcceptSyncRequest {
        namespace: NamespaceId,
        peer: PublicKey,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<AcceptOutcome>,
    },

    IncomingSyncReport {
        from: PublicKey,
        report: SyncReport,
    },
    NeighborContentReady {
        namespace: NamespaceId,
        node: PublicKey,
        hash: Hash,
    },
    NeighborUp {
        namespace: NamespaceId,
        peer: PublicKey,
    },
    NeighborDown {
        namespace: NamespaceId,
        peer: PublicKey,
    },
}

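/// Events informing about the progress of the live sync.
///
/// A consumer-side sketch, assuming a subscription was registered via `ToLiveActor::Subscribe`
/// and `events_rx` is the hypothetical receiving half of that `async_channel`:
///
/// ```ignore
/// while let Ok(event) = events_rx.recv().await {
///     match event {
///         Event::ContentReady { hash } => println!("content ready: {hash}"),
///         Event::SyncFinished(ev) => println!("synced with {}", ev.peer),
///         _ => {}
///     }
/// }
/// ```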
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    /// The content of an entry was downloaded and is now available at the local node.
    ContentReady {
        /// The content hash of the newly available entry content.
        hash: Hash,
    },
    /// We have a new neighbor in the swarm.
    NeighborUp(PublicKey),
    /// We lost a neighbor in the swarm.
    NeighborDown(PublicKey),
    /// A set-reconciliation sync finished.
    SyncFinished(SyncEvent),
    /// All pending content is now ready.
    ///
    /// This event is only emitted after a [`Self::SyncFinished`] event, once all content
    /// downloads that were queued during that sync have either completed or failed.
    PendingContentReady,
}

type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);

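/// The actor that drives live sync for all open documents.
///
/// A minimal wiring sketch, assuming the dependencies (`sync`, `endpoint`, `gossip`, `store`,
/// `downloader`, `metrics`) were constructed elsewhere; the channel capacity is an arbitrary
/// choice for illustration:
///
/// ```ignore
/// let (actor_tx, inbox) = tokio::sync::mpsc::channel(64);
/// let actor = LiveActor::new(
///     sync, endpoint, gossip, store, downloader, inbox, actor_tx.clone(), metrics,
/// );
/// // The actor owns its loop; `run` resolves once a `Shutdown` message is handled.
/// tokio::task::spawn(actor.run());
/// ```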
pub struct LiveActor {
    /// Receiver for actor messages.
    inbox: mpsc::Receiver<ToLiveActor>,
    sync: SyncHandle,
    endpoint: Endpoint,
    bao_store: Store,
    downloader: Downloader,
    static_provider: StaticProvider,
    replica_events_tx: async_channel::Sender<crate::Event>,
    replica_events_rx: async_channel::Receiver<crate::Event>,

    /// Sender for messages to self.
    ///
    /// Note: Must not be used from methods called directly in [`Self::run`], as that can
    /// deadlock when the channel is full. Only clone it into newly spawned tasks.
    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    gossip: GossipState,

    /// Running sync futures (initiated by us).
    running_sync_connect: JoinSet<SyncConnectRes>,
    /// Running sync futures (initiated by the remote).
    running_sync_accept: JoinSet<SyncAcceptRes>,
    /// Running download futures.
    download_tasks: JoinSet<DownloadRes>,
    /// Content hashes which are wanted but not yet queued because no provider was found.
    missing_hashes: HashSet<Hash>,
    /// Content hashes queued in the downloader.
    queued_hashes: QueuedHashes,
    /// Nodes known to provide the content for a hash.
    hash_providers: ProviderNodes,

    /// Subscribers to events on open documents.
    subscribers: SubscribersMap,

    /// Sync state per replica and peer.
    state: NamespaceStates,
    metrics: Arc<Metrics>,
}

impl LiveActor {
    /// Create the live actor.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        sync: SyncHandle,
        endpoint: Endpoint,
        gossip: Gossip,
        bao_store: Store,
        downloader: Downloader,
        inbox: mpsc::Receiver<ToLiveActor>,
        sync_actor_tx: mpsc::Sender<ToLiveActor>,
        metrics: Arc<Metrics>,
    ) -> Self {
        let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
        let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
        let static_provider = StaticProvider::new();
        endpoint.discovery().add(static_provider.clone());
        Self {
            inbox,
            sync,
            replica_events_rx,
            replica_events_tx,
            endpoint,
            static_provider,
            gossip: gossip_state,
            bao_store,
            downloader,
            sync_actor_tx,
            running_sync_connect: Default::default(),
            running_sync_accept: Default::default(),
            subscribers: Default::default(),
            download_tasks: Default::default(),
            state: Default::default(),
            missing_hashes: Default::default(),
            queued_hashes: Default::default(),
            hash_providers: Default::default(),
            metrics,
        }
    }

    /// Run the actor loop until a `Shutdown` message is received.
    pub async fn run(mut self) -> Result<()> {
        let shutdown_reply = self.run_inner().await;
        if let Err(err) = self.shutdown().await {
            error!(?err, "Error during shutdown");
        }
        drop(self);
        match shutdown_reply {
            Ok(reply) => {
                reply.send(()).ok();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            tokio::select! {
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;
                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }

    async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
        match msg {
            ToLiveActor::Shutdown { .. } => {
                unreachable!("handled in run");
            }
            ToLiveActor::IncomingSyncReport { from, report } => {
                self.on_sync_report(from, report).await
            }
            ToLiveActor::NeighborUp { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
                self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
                self.subscribers
                    .send(&namespace, Event::NeighborUp(peer))
                    .await;
            }
            ToLiveActor::NeighborDown { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
                self.subscribers
                    .send(&namespace, Event::NeighborDown(peer))
                    .await;
            }
            ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            } => {
                let res = self.start_sync(namespace, peers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            } => {
                let res = self.leave(namespace, kill_subscribers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Subscribe {
                namespace,
                sender,
                reply,
            } => {
                self.subscribers.subscribe(namespace, sender);
                reply.send(Ok(())).ok();
            }
            ToLiveActor::HandleConnection { conn } => {
                self.handle_connection(conn).await;
            }
            ToLiveActor::AcceptSyncRequest {
                namespace,
                peer,
                reply,
            } => {
                let outcome = self.accept_sync_request(namespace, peer);
                reply.send(outcome).ok();
            }
            ToLiveActor::NeighborContentReady {
                namespace,
                node,
                hash,
            } => {
                self.on_neighbor_content_ready(namespace, node, hash).await;
            }
        };
        Ok(true)
    }

    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
        if !self.state.start_connect(&namespace, peer, reason) {
            return;
        }
        let endpoint = self.endpoint.clone();
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        let fut = async move {
            let res = connect_and_sync(
                &endpoint,
                &sync,
                namespace,
                EndpointAddr::new(peer),
                Some(&metrics),
            )
            .await;
            (namespace, peer, reason, res)
        }
        .instrument(Span::current());
        self.running_sync_connect.spawn(fut);
    }

    async fn shutdown(&mut self) -> anyhow::Result<()> {
        // Cancel all event subscriptions.
        self.subscribers.clear();
        // Shut down the gossip state and the sync handle in parallel.
        let (gossip_shutdown_res, _store) = tokio::join!(
            self.gossip.shutdown(),
            self.sync.shutdown()
        );
        gossip_shutdown_res?;
        Ok(())
    }

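    /// Open a replica (if needed) and start syncing it with the given peers.
    ///
    /// Caller-side sketch of how this is reached through the actor channel; `actor_tx` is
    /// assumed to be the `mpsc::Sender<ToLiveActor>` paired with this actor's inbox, and
    /// `peers` a hypothetical list of `EndpointAddr`s:
    ///
    /// ```ignore
    /// let (reply, reply_rx) = tokio::sync::oneshot::channel();
    /// actor_tx
    ///     .send(ToLiveActor::StartSync { namespace, peers, reply })
    ///     .await?;
    /// reply_rx.await??;
    /// ```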
    async fn start_sync(
        &mut self,
        namespace: NamespaceId,
        mut peers: Vec<EndpointAddr>,
    ) -> Result<()> {
        debug!(?namespace, peers = peers.len(), "start sync");
        // Open the replica and update the state to allow syncing, if not yet syncing.
        if !self.state.is_syncing(&namespace) {
            let opts = OpenOpts::default()
                .sync()
                .subscribe(self.replica_events_tx.clone());
            self.sync.open(namespace, opts).await?;
            self.state.insert(namespace);
        }
        // Add the peers stored for this document in the database.
        match self.sync.get_sync_peers(namespace).await {
            Ok(None) => {
                // No peers stored for this document.
            }
            Ok(Some(known_useful_peers)) => {
                let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
                    // Peers are stored as raw bytes. Don't fail the operation if they can't be
                    // decoded: simply ignore the peer.
                    match PublicKey::from_bytes(&peer_id_bytes) {
                        Ok(public_key) => Some(EndpointAddr::new(public_key)),
                        Err(_signing_error) => {
                            warn!("potential db corruption: peers per doc can't be decoded");
                            None
                        }
                    }
                });
                peers.extend(as_node_addr);
            }
            Err(e) => {
                // Try to continue if peers per doc can't be read, since they are not vital for sync.
                warn!(%e, "db error reading peers per document")
            }
        }
        self.join_peers(namespace, peers).await?;
        Ok(())
    }

    async fn leave(
        &mut self,
        namespace: NamespaceId,
        kill_subscribers: bool,
    ) -> anyhow::Result<()> {
        // Only clean up the replica if we were actually syncing it.
        if self.state.remove(&namespace) {
            self.sync.set_sync(namespace, false).await?;
            self.sync
                .unsubscribe(namespace, self.replica_events_tx.clone())
                .await?;
            self.sync.close(namespace).await?;
            self.gossip.quit(&namespace);
        }
        if kill_subscribers {
            self.subscribers.remove(&namespace);
        }
        Ok(())
    }

    async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
        let mut peer_ids = Vec::new();

        // Make the peers' addressing info available to the endpoint via the static provider.
        for peer in peers.into_iter() {
            let peer_id = peer.id;
            if !peer.is_empty() {
                self.static_provider.add_endpoint_info(peer);
            }
            peer_ids.push(peer_id);
        }

        // Join the gossip swarm for this namespace with the peers.
        self.gossip.join(namespace, peer_ids.clone()).await?;

        if !peer_ids.is_empty() {
            // Trigger an initial sync with all provided peers.
            for peer in peer_ids {
                self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
            }
        }
        Ok(())
    }

    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    async fn on_sync_via_connect_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        reason: SyncReason,
        result: Result<SyncFinished, ConnectError>,
    ) {
        match result {
            Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
                debug!(?reason, "remote abort, already syncing");
            }
            res => {
                self.on_sync_finished(
                    namespace,
                    peer,
                    Origin::Connect(reason),
                    res.map_err(Into::into),
                )
                .await
            }
        }
    }

    #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
    async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
        match res {
            Ok(state) => {
                self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
                    .await
            }
            Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
                debug!(?reason, "aborted by us");
            }
            Err(err) => {
                if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
                    self.on_sync_finished(
                        namespace,
                        peer,
                        Origin::Accept,
                        Err(anyhow::Error::from(err)),
                    )
                    .await;
                } else {
                    debug!(?err, "failed before reading the first message");
                }
            }
        }
    }

    async fn on_sync_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        origin: Origin,
        result: Result<SyncFinished>,
    ) {
        match &result {
            Err(ref err) => {
                warn!(?origin, ?err, "sync failed");
            }
            Ok(ref details) => {
                info!(
                    sent = %details.outcome.num_sent,
                    recv = %details.outcome.num_recv,
                    t_connect = ?details.timings.connect,
                    t_process = ?details.timings.process,
                    "sync finished",
                );

                // Register the peer as useful for this document.
                if let Err(e) = self
                    .sync
                    .register_useful_peer(namespace, *peer.as_bytes())
                    .await
                {
                    debug!(%e, "failed to register peer for document")
                }

                // Broadcast a sync report to our neighbors, but only if we received new entries.
                if details.outcome.num_recv > 0 {
                    info!("broadcast sync report to neighbors");
                    match details
                        .outcome
                        .heads_received
                        .encode(Some(self.gossip.max_message_size()))
                    {
                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
                        Ok(heads) => {
                            let report = SyncReport { namespace, heads };
                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
                                .await;
                        }
                    }
                }
            }
        };

        let result_for_event = match &result {
            Ok(details) => Ok(details.into()),
            Err(err) => Err(err.to_string()),
        };

        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
            return;
        };

        let ev = SyncEvent {
            peer,
            origin,
            result: result_for_event,
            finished: SystemTime::now(),
            started,
        };
        self.subscribers
            .send(&namespace, Event::SyncFinished(ev))
            .await;

        // If content hashes from this sync are still queued for download, defer the
        // `PendingContentReady` event until those downloads have completed (or failed).
        // Otherwise, emit it right away.
        if self.queued_hashes.contains_namespace(&namespace) {
            self.state.set_may_emit_ready(&namespace, true);
        } else {
            self.subscribers
                .send(&namespace, Event::PendingContentReady)
                .await;
            self.state.set_may_emit_ready(&namespace, false);
        }

        if resync {
            self.sync_with_peer(namespace, peer, SyncReason::Resync);
        }
    }

    async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
        if !self.state.is_syncing(&namespace) {
            return;
        }

        let msg = match postcard::to_stdvec(op) {
            Ok(msg) => msg,
            Err(err) => {
                error!(?err, ?op, "Failed to serialize message");
                return;
            }
        };
        self.gossip
            .broadcast_neighbors(&namespace, msg.into())
            .await;
    }

    async fn on_download_ready(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        res: Result<(), anyhow::Error>,
    ) {
        let completed_namespaces = self.queued_hashes.remove_hash(&hash);
        debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
        if res.is_ok() {
            self.subscribers
                .send(&namespace, Event::ContentReady { hash })
                .await;
            // Inform our neighbors that the content is now available.
            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
                .await;
        } else {
            self.missing_hashes.insert(hash);
        }
        for namespace in completed_namespaces.iter() {
            if let Some(true) = self.state.may_emit_ready(namespace) {
                self.subscribers
                    .send(namespace, Event::PendingContentReady)
                    .await;
            }
        }
    }

    async fn on_neighbor_content_ready(
        &mut self,
        namespace: NamespaceId,
        node: EndpointId,
        hash: Hash,
    ) {
        self.start_download(namespace, hash, node, true).await;
    }

    #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
    async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
        let namespace = report.namespace;
        if !self.state.is_syncing(&namespace) {
            return;
        }
        let heads = match AuthorHeads::decode(&report.heads) {
            Ok(heads) => heads,
            Err(err) => {
                warn!(?err, "failed to decode AuthorHeads");
                return;
            }
        };
        match self.sync.has_news_for_us(report.namespace, heads).await {
            Ok(Some(updated_authors)) => {
                info!(%updated_authors, "news reported: sync now");
                self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
            }
            Ok(None) => {
                debug!("no news reported: nothing to do");
            }
            Err(err) => {
                warn!("sync actor error: {err:?}");
            }
        }
    }

    async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
        match event {
            crate::Event::LocalInsert { namespace, entry } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
                // A new entry was inserted locally. Broadcast a gossip message.
                if self.state.is_syncing(&namespace) {
                    let op = Op::Put(entry.clone());
                    let message = postcard::to_stdvec(&op)?.into();
                    self.gossip.broadcast(&namespace, message).await;
                }
            }
            crate::Event::RemoteInsert {
                namespace,
                entry,
                from,
                should_download,
                remote_content_status,
            } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
                // A new entry arrived from initial sync or gossip. Queue a download of the
                // content if the remote already has it; otherwise remember it as missing.
                if should_download {
                    let hash = entry.content_hash();
                    if matches!(remote_content_status, ContentStatus::Complete) {
                        let node_id = PublicKey::from_bytes(&from)?;
                        self.start_download(namespace, hash, node_id, false).await;
                    } else {
                        self.missing_hashes.insert(hash);
                    }
                }
            }
        }

        Ok(())
    }

    async fn start_download(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        node: PublicKey,
        only_if_missing: bool,
    ) {
        let entry_status = self.bao_store.blobs().status(hash).await;
        if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
            self.missing_hashes.remove(&hash);
            return;
        }
        // Remember this node as a provider for the hash.
        self.hash_providers
            .0
            .lock()
            .expect("poisoned")
            .entry(hash)
            .or_default()
            .insert(node);
        if self.queued_hashes.contains_hash(&hash) {
            // A download is already running; just record that this namespace wants the hash too.
            self.queued_hashes.insert(hash, namespace);
        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
            let req = DownloadRequest::new(
                HashAndFormat::raw(hash),
                self.hash_providers.clone(),
                SplitStrategy::None,
            );
            let handle = self.downloader.download_with_opts(req);

            self.queued_hashes.insert(hash, namespace);
            self.missing_hashes.remove(&hash);
            self.download_tasks
                .spawn(async move { (namespace, hash, handle.await) });
        }
    }

    #[instrument("accept", skip_all)]
    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
        let to_actor_tx = self.sync_actor_tx.clone();
        let accept_request_cb = move |namespace, peer| {
            let to_actor_tx = to_actor_tx.clone();
            async move {
                let (reply_tx, reply_rx) = oneshot::channel();
                to_actor_tx
                    .send(ToLiveActor::AcceptSyncRequest {
                        namespace,
                        peer,
                        reply: reply_tx,
                    })
                    .await
                    .ok();
                match reply_rx.await {
                    Ok(outcome) => outcome,
                    Err(err) => {
                        warn!(
                            "accept request callback failed to retrieve reply from actor: {err:?}"
                        );
                        AcceptOutcome::Reject(AbortReason::InternalServerError)
                    }
                }
            }
            .boxed()
        };
        debug!("incoming connection");
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        self.running_sync_accept.spawn(
            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
                .instrument(Span::current()),
        );
    }

    pub fn accept_sync_request(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
    ) -> AcceptOutcome {
        self.state
            .accept_request(&self.endpoint.id(), &namespace, peer)
    }
}

/// Details of a finished sync operation.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    /// Peer we synced with.
    pub peer: PublicKey,
    /// Origin of the sync exchange.
    pub origin: Origin,
    /// Timestamp when the sync finished.
    pub finished: SystemTime,
    /// Timestamp when the sync started.
    pub started: SystemTime,
    /// Result of the sync operation.
    pub result: std::result::Result<SyncDetails, String>,
}

/// Summary counts of a finished sync operation.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries received.
    pub entries_received: usize,
    /// Number of entries sent.
    pub entries_sent: usize,
}

impl From<&SyncFinished> for SyncDetails {
    fn from(value: &SyncFinished) -> Self {
        Self {
            entries_received: value.outcome.num_recv,
            entries_sent: value.outcome.num_sent,
        }
    }
}

/// Map of event subscribers per namespace.
#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);

impl SubscribersMap {
    fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
        self.0.entry(namespace).or_default().subscribe(sender);
    }

    async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
        debug!(namespace=%namespace.fmt_short(), %event, "emit event");
        let Some(subscribers) = self.0.get_mut(namespace) else {
            return false;
        };

        // Drop the whole entry once the last subscriber is gone.
        if !subscribers.send(event).await {
            self.0.remove(namespace);
        }
        true
    }

    fn remove(&mut self, namespace: &NamespaceId) {
        self.0.remove(namespace);
    }

    fn clear(&mut self) {
        self.0.clear();
    }
}

/// Content hashes queued for download, indexed both by hash and by namespace.
#[derive(Debug, Default)]
struct QueuedHashes {
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}

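/// Shared set of provider nodes per content hash, used as the downloader's content discovery.
///
/// A usage sketch under the assumption that `hash` and `node_id` are values obtained elsewhere
/// (they are placeholders, not defined in this module):
///
/// ```ignore
/// let providers = ProviderNodes::default();
/// providers
///     .0
///     .lock()
///     .expect("poisoned")
///     .entry(hash)
///     .or_default()
///     .insert(node_id);
/// // `find_providers` now yields `node_id` for requests of `hash`.
/// let stream = providers.find_providers(HashAndFormat::raw(hash));
/// ```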
#[derive(Debug, Clone, Default)]
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<EndpointId>>>>);

impl ContentDiscovery for ProviderNodes {
    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<EndpointId> {
        let nodes = self
            .0
            .lock()
            .expect("poisoned")
            .get(&hash.hash)
            .into_iter()
            .flatten()
            .cloned()
            .collect::<Vec<_>>();
        Box::pin(n0_future::stream::iter(nodes))
    }
}

impl QueuedHashes {
    fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
        self.by_hash.entry(hash).or_default().insert(namespace);
        self.by_namespace.entry(namespace).or_default().insert(hash);
    }

    /// Remove a hash from the set of queued hashes.
    ///
    /// Returns the list of namespaces that have no queued hashes left after the removal.
    fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
        let namespaces = self.by_hash.remove(hash).unwrap_or_default();
        let mut removed_namespaces = vec![];
        for namespace in namespaces {
            if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
                hashes.remove(hash);
                if hashes.is_empty() {
                    self.by_namespace.remove(&namespace);
                    removed_namespaces.push(namespace);
                }
            }
        }
        removed_namespaces
    }

    fn contains_hash(&self, hash: &Hash) -> bool {
        self.by_hash.contains_key(hash)
    }

    fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
        self.by_namespace.contains_key(namespace)
    }
}

#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);

impl Subscribers {
    fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
        self.0.push(sender)
    }

    async fn send(&mut self, event: Event) -> bool {
        let futs = self.0.iter().map(|sender| sender.send(event.clone()));
        let res = futures_buffered::join_all(futs).await;
        // Remove closed subscribers, iterating in reverse so that the indices stay valid.
        for (i, res) in res.into_iter().enumerate().rev() {
            if res.is_err() {
                self.0.remove(i);
            }
        }
        !self.0.is_empty()
    }
}

fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.peer.fmt_short().to_string(),
        Err(err) => err
            .peer()
            .map(|x| x.fmt_short().to_string())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.namespace.fmt_short(),
        Err(err) => err
            .namespace()
            .map(|x| x.fmt_short())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let (a_tx, a_rx) = async_channel::unbounded();
        let (b_tx, b_rx) = async_channel::unbounded();
        let mut subscribers = Subscribers::default();
        subscribers.subscribe(a_tx);
        subscribers.subscribe(b_tx);
        drop(a_rx);
        drop(b_rx);
        subscribers.send(Event::NeighborUp(pk)).await;
    }
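
    #[test]
    fn test_queued_hashes_remove() {
        // A small sketch exercising `QueuedHashes`. The namespace id and blob data are arbitrary
        // test values, and constructing the namespace id from raw bytes assumes
        // `NamespaceId: From<[u8; 32]>`.
        let namespace = NamespaceId::from([0u8; 32]);
        let h1 = Hash::new(b"one");
        let h2 = Hash::new(b"two");
        let mut queued = QueuedHashes::default();
        queued.insert(h1, namespace);
        queued.insert(h2, namespace);
        assert!(queued.contains_hash(&h1));
        assert!(queued.contains_namespace(&namespace));
        // Removing one of two queued hashes does not complete the namespace yet.
        assert!(queued.remove_hash(&h1).is_empty());
        // Removing the last queued hash reports the namespace as completed.
        assert_eq!(queued.remove_hash(&h2), vec![namespace]);
        assert!(!queued.contains_namespace(&namespace));
    }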
}