iroh_docs/engine/live.rs

1#![allow(missing_docs)]
2
3use std::{
4    collections::{HashMap, HashSet},
5    sync::Arc,
6    time::SystemTime,
7};
8
9use anyhow::{Context, Result};
10use futures_lite::FutureExt;
11use iroh::{
12    discovery::static_provider::StaticProvider, Endpoint, EndpointAddr, EndpointId, PublicKey,
13};
14use iroh_blobs::{
15    api::{
16        blobs::BlobStatus,
17        downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
18        Store,
19    },
20    Hash, HashAndFormat,
21};
22use iroh_gossip::net::Gossip;
23use serde::{Deserialize, Serialize};
24use tokio::{
25    sync::{self, mpsc, oneshot},
26    task::JoinSet,
27};
28use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
29
30// use super::gossip::{GossipActor, ToGossipActor};
31use super::state::{NamespaceStates, Origin, SyncReason};
32use crate::{
33    actor::{OpenOpts, SyncHandle},
34    engine::gossip::GossipState,
35    metrics::Metrics,
36    net::{
37        connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
38        SyncFinished,
39    },
40    AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
41};
42
43/// An iroh-docs operation
44///
45/// This is the message that is broadcast over iroh-gossip.
46#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
47pub enum Op {
48    /// A new entry was inserted into the document.
49    Put(SignedEntry),
50    /// A peer now has content available for a hash.
51    ContentReady(Hash),
52    /// We synced with another peer; here's the news.
53    SyncReport(SyncReport),
54}
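// Note: `Op` values are encoded with postcard before being broadcast on the document's
// gossip topic (see `broadcast_neighbors` and `on_replica_event` below), so an encoded
// `Op` needs to stay within gossip's maximum message size (the sync-report heads are
// explicitly capped to it when encoded).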
55
56/// Report of a successful sync with the new heads.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct SyncReport {
59    namespace: NamespaceId,
60    /// Encoded [`AuthorHeads`]
61    heads: Vec<u8>,
62}
63
64/// Messages to the sync actor
65#[derive(derive_more::Debug, strum::Display)]
66pub enum ToLiveActor {
67    StartSync {
68        namespace: NamespaceId,
69        peers: Vec<EndpointAddr>,
70        #[debug("oneshot::Sender")]
71        reply: sync::oneshot::Sender<anyhow::Result<()>>,
72    },
73    Leave {
74        namespace: NamespaceId,
75        kill_subscribers: bool,
76        #[debug("oneshot::Sender")]
77        reply: sync::oneshot::Sender<anyhow::Result<()>>,
78    },
79    Shutdown {
80        reply: sync::oneshot::Sender<()>,
81    },
82    Subscribe {
83        namespace: NamespaceId,
84        #[debug("sender")]
85        sender: async_channel::Sender<Event>,
86        #[debug("oneshot::Sender")]
87        reply: sync::oneshot::Sender<Result<()>>,
88    },
89    HandleConnection {
90        conn: iroh::endpoint::Connection,
91    },
92    AcceptSyncRequest {
93        namespace: NamespaceId,
94        peer: PublicKey,
95        #[debug("oneshot::Sender")]
96        reply: sync::oneshot::Sender<AcceptOutcome>,
97    },
98
99    IncomingSyncReport {
100        from: PublicKey,
101        report: SyncReport,
102    },
103    NeighborContentReady {
104        namespace: NamespaceId,
105        node: PublicKey,
106        hash: Hash,
107    },
108    NeighborUp {
109        namespace: NamespaceId,
110        peer: PublicKey,
111    },
112    NeighborDown {
113        namespace: NamespaceId,
114        peer: PublicKey,
115    },
116}
117
118/// Events reporting on the progress of live sync.
119#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
120pub enum Event {
121    /// The content of an entry was downloaded and is now available at the local node
122    ContentReady {
123        /// The content hash of the newly available entry content
124        hash: Hash,
125    },
126    /// We have a new neighbor in the swarm.
127    NeighborUp(PublicKey),
128    /// We lost a neighbor in the swarm.
129    NeighborDown(PublicKey),
130    /// A set-reconciliation sync finished.
131    SyncFinished(SyncEvent),
132    /// All pending content is now ready.
133    ///
134    /// This event is only emitted after a sync completed and `Self::SyncFinished` was emitted at
135    /// least once. It signals that all currently pending downloads have been completed.
136    ///
137    /// Receiving this event does not guarantee that all content in the document is available. If
138    /// blobs failed to download, this event will still be emitted once all download operations have completed.
139    PendingContentReady,
140}
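// Events are delivered to subscribers registered via `ToLiveActor::Subscribe`; each
// subscriber is an `async_channel::Sender<Event>` tracked in `SubscribersMap` below.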
141
142type SyncConnectRes = (
143    NamespaceId,
144    PublicKey,
145    SyncReason,
146    Result<SyncFinished, ConnectError>,
147);
148type SyncAcceptRes = Result<SyncFinished, AcceptError>;
149type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);
150
151// Currently peers might double-sync in both directions.
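/// The actor driving live sync: it joins the per-namespace gossip topics, starts and
/// accepts sync connections, queues content downloads, and forwards [`Event`]s to
/// subscribers.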
152pub struct LiveActor {
153    /// Receiver for actor messages.
154    inbox: mpsc::Receiver<ToLiveActor>,
155    sync: SyncHandle,
156    endpoint: Endpoint,
157    bao_store: Store,
158    downloader: Downloader,
159    static_provider: StaticProvider,
160    replica_events_tx: async_channel::Sender<crate::Event>,
161    replica_events_rx: async_channel::Receiver<crate::Event>,
162
163    /// Send messages to self.
164    /// Note: To prevent deadlocks, this must not be used directly in methods called from `Self::run`.
165    /// Only clone into newly spawned tasks.
166    sync_actor_tx: mpsc::Sender<ToLiveActor>,
167    gossip: GossipState,
168
169    /// Running sync futures (from connect).
170    running_sync_connect: JoinSet<SyncConnectRes>,
171    /// Running sync futures (from accept).
172    running_sync_accept: JoinSet<SyncAcceptRes>,
173    /// Running download futures.
174    download_tasks: JoinSet<DownloadRes>,
175    /// Content hashes which are wanted but not yet queued because no provider was found.
176    missing_hashes: HashSet<Hash>,
177    /// Content hashes queued in downloader.
178    queued_hashes: QueuedHashes,
179    /// Nodes known to have a hash
180    hash_providers: ProviderNodes,
181
182    /// Subscribers to actor events
183    subscribers: SubscribersMap,
184
185    /// Sync state per replica and peer
186    state: NamespaceStates,
187    metrics: Arc<Metrics>,
188}
189impl LiveActor {
190    /// Create the live actor.
191    #[allow(clippy::too_many_arguments)]
192    pub fn new(
193        sync: SyncHandle,
194        endpoint: Endpoint,
195        gossip: Gossip,
196        bao_store: Store,
197        downloader: Downloader,
198        inbox: mpsc::Receiver<ToLiveActor>,
199        sync_actor_tx: mpsc::Sender<ToLiveActor>,
200        metrics: Arc<Metrics>,
201    ) -> Self {
202        let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
203        let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
204        let static_provider = StaticProvider::new();
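        // Register the static provider with the endpoint's discovery, so that peer
        // addresses added later (see `join_peers`) are available when dialing.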
205        endpoint.discovery().add(static_provider.clone());
206        Self {
207            inbox,
208            sync,
209            replica_events_rx,
210            replica_events_tx,
211            endpoint,
212            static_provider,
213            gossip: gossip_state,
214            bao_store,
215            downloader,
216            sync_actor_tx,
217            running_sync_connect: Default::default(),
218            running_sync_accept: Default::default(),
219            subscribers: Default::default(),
220            download_tasks: Default::default(),
221            state: Default::default(),
222            missing_hashes: Default::default(),
223            queued_hashes: Default::default(),
224            hash_providers: Default::default(),
225            metrics,
226        }
227    }
228
229    /// Run the actor loop.
230    pub async fn run(mut self) -> Result<()> {
231        let shutdown_reply = self.run_inner().await;
232        if let Err(err) = self.shutdown().await {
233            error!(?err, "Error during shutdown");
234        }
235        drop(self);
236        match shutdown_reply {
237            Ok(reply) => {
238                reply.send(()).ok();
239                Ok(())
240            }
241            Err(err) => Err(err),
242        }
243    }
244
245    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
246        let mut i = 0;
247        loop {
248            i += 1;
249            trace!(?i, "tick wait");
250            self.metrics.doc_live_tick_main.inc();
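            // `biased` makes the select below poll its branches in declaration order, so
            // actor messages (including shutdown) are handled before other work.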
251            tokio::select! {
252                biased;
253                msg = self.inbox.recv() => {
254                    let msg = msg.context("to_actor closed")?;
255                    trace!(?i, %msg, "tick: to_actor");
256                    self.metrics.doc_live_tick_actor.inc();
257                    match msg {
258                        ToLiveActor::Shutdown { reply } => {
259                            break Ok(reply);
260                        }
261                        msg => {
262                            self.on_actor_message(msg).await.context("on_actor_message")?;
263                        }
264                    }
265                }
266                event = self.replica_events_rx.recv() => {
267                    trace!(?i, "tick: replica_event");
268                    self.metrics.doc_live_tick_replica_event.inc();
269                    let event = event.context("replica_events closed")?;
270                    if let Err(err) = self.on_replica_event(event).await {
271                        error!(?err, "Failed to process replica event");
272                    }
273                }
274                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
275                    trace!(?i, "tick: running_sync_connect");
276                    self.metrics.doc_live_tick_running_sync_connect.inc();
277                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
278                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;
279
280                }
281                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
282                    trace!(?i, "tick: running_sync_accept");
283                    self.metrics.doc_live_tick_running_sync_accept.inc();
284                    let res = res.context("running_sync_accept closed")?;
285                    self.on_sync_via_accept_finished(res).await;
286                }
287                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
288                    trace!(?i, "tick: pending_downloads");
289                    self.metrics.doc_live_tick_pending_downloads.inc();
290                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
291                    self.on_download_ready(namespace, hash, res).await;
292                }
293                res = self.gossip.progress(), if !self.gossip.is_empty() => {
294                    if let Err(error) = res {
295                        warn!(?error, "gossip state failed");
296                    }
297                }
298            }
299        }
300    }
301
302    async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
303        match msg {
304            ToLiveActor::Shutdown { .. } => {
305                unreachable!("handled in run");
306            }
307            ToLiveActor::IncomingSyncReport { from, report } => {
308                self.on_sync_report(from, report).await
309            }
310            ToLiveActor::NeighborUp { namespace, peer } => {
311                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
312                self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
313                self.subscribers
314                    .send(&namespace, Event::NeighborUp(peer))
315                    .await;
316            }
317            ToLiveActor::NeighborDown { namespace, peer } => {
318                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
319                self.subscribers
320                    .send(&namespace, Event::NeighborDown(peer))
321                    .await;
322            }
323            ToLiveActor::StartSync {
324                namespace,
325                peers,
326                reply,
327            } => {
328                let res = self.start_sync(namespace, peers).await;
329                reply.send(res).ok();
330            }
331            ToLiveActor::Leave {
332                namespace,
333                kill_subscribers,
334                reply,
335            } => {
336                let res = self.leave(namespace, kill_subscribers).await;
337                reply.send(res).ok();
338            }
339            ToLiveActor::Subscribe {
340                namespace,
341                sender,
342                reply,
343            } => {
344                self.subscribers.subscribe(namespace, sender);
345                reply.send(Ok(())).ok();
346            }
347            ToLiveActor::HandleConnection { conn } => {
348                self.handle_connection(conn).await;
349            }
350            ToLiveActor::AcceptSyncRequest {
351                namespace,
352                peer,
353                reply,
354            } => {
355                let outcome = self.accept_sync_request(namespace, peer);
356                reply.send(outcome).ok();
357            }
358            ToLiveActor::NeighborContentReady {
359                namespace,
360                node,
361                hash,
362            } => {
363                self.on_neighbor_content_ready(namespace, node, hash).await;
364            }
365        };
366        Ok(true)
367    }
368
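    /// Start an outgoing sync with `peer` for `namespace`, unless the per-namespace state
    /// says a sync with this peer is already in flight or the namespace is not syncing.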
369    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
370    fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
371        if !self.state.start_connect(&namespace, peer, reason) {
372            return;
373        }
374        let endpoint = self.endpoint.clone();
375        let sync = self.sync.clone();
376        let metrics = self.metrics.clone();
377        let fut = async move {
378            let res = connect_and_sync(
379                &endpoint,
380                &sync,
381                namespace,
382                EndpointAddr::new(peer),
383                Some(&metrics),
384            )
385            .await;
386            (namespace, peer, reason, res)
387        }
388        .instrument(Span::current());
389        self.running_sync_connect.spawn(fut);
390    }
391
392    async fn shutdown(&mut self) -> anyhow::Result<()> {
393        // cancel all subscriptions
394        self.subscribers.clear();
395        let (gossip_shutdown_res, _store) = tokio::join!(
396            // quit the gossip topics and task loops.
397            self.gossip.shutdown(),
398            // shutdown sync thread
399            self.sync.shutdown()
400        );
401        gossip_shutdown_res?;
402        // TODO: abort_all and join_next all JoinSets to catch panics
403        // (they are aborted on drop, but that swallows panics)
404        Ok(())
405    }
406
407    async fn start_sync(
408        &mut self,
409        namespace: NamespaceId,
410        mut peers: Vec<EndpointAddr>,
411    ) -> Result<()> {
412        debug!(?namespace, peers = peers.len(), "start sync");
413        // update state to allow sync
414        if !self.state.is_syncing(&namespace) {
415            let opts = OpenOpts::default()
416                .sync()
417                .subscribe(self.replica_events_tx.clone());
418            self.sync.open(namespace, opts).await?;
419            self.state.insert(namespace);
420        }
421        // add the peers stored for this document
422        match self.sync.get_sync_peers(namespace).await {
423            Ok(None) => {
424                // no peers for this document
425            }
426            Ok(Some(known_useful_peers)) => {
427                let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
428                    // peers are stored as bytes, don't fail the operation if they can't be
429                    // decoded: simply ignore the peer
430                    match PublicKey::from_bytes(&peer_id_bytes) {
431                        Ok(public_key) => Some(EndpointAddr::new(public_key)),
432                        Err(_signing_error) => {
433                            warn!("potential db corruption: peers per doc can't be decoded");
434                            None
435                        }
436                    }
437                });
438                peers.extend(as_node_addr);
439            }
440            Err(e) => {
441                // try to continue if peers per doc can't be read since they are not vital for sync
442                warn!(%e, "db error reading peers per document")
443            }
444        }
445        self.join_peers(namespace, peers).await?;
446        Ok(())
447    }
448
449    async fn leave(
450        &mut self,
451        namespace: NamespaceId,
452        kill_subscribers: bool,
453    ) -> anyhow::Result<()> {
454        // self.subscribers.remove(&namespace);
455        if self.state.remove(&namespace) {
456            self.sync.set_sync(namespace, false).await?;
457            self.sync
458                .unsubscribe(namespace, self.replica_events_tx.clone())
459                .await?;
460            self.sync.close(namespace).await?;
461            self.gossip.quit(&namespace);
462        }
463        if kill_subscribers {
464            self.subscribers.remove(&namespace);
465        }
466        Ok(())
467    }
468
469    async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
470        let mut peer_ids = Vec::new();
471
472        // add addresses of peers to our endpoint address book
473        for peer in peers.into_iter() {
474            let peer_id = peer.id;
475            // adding a node address without any addressing info fails with an error,
476            // but we still want to include those peers because node discovery might find addresses for them
477            if !peer.is_empty() {
478                self.static_provider.add_endpoint_info(peer);
479            }
480            peer_ids.push(peer_id);
481        }
482
483        // tell gossip to join
484        self.gossip.join(namespace, peer_ids.clone()).await?;
485
486        if !peer_ids.is_empty() {
487            // trigger initial sync with initial peers
488            for peer in peer_ids {
489                self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
490            }
491        }
492        Ok(())
493    }
494
495    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
496    async fn on_sync_via_connect_finished(
497        &mut self,
498        namespace: NamespaceId,
499        peer: PublicKey,
500        reason: SyncReason,
501        result: Result<SyncFinished, ConnectError>,
502    ) {
503        match result {
504            Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
505                debug!(?reason, "remote abort, already syncing");
506            }
507            res => {
508                self.on_sync_finished(
509                    namespace,
510                    peer,
511                    Origin::Connect(reason),
512                    res.map_err(Into::into),
513                )
514                .await
515            }
516        }
517    }
518
519    #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
520    async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
521        match res {
522            Ok(state) => {
523                self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
524                    .await
525            }
526            Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
527                // In case we aborted the sync: do nothing (our outgoing sync is in progress)
528                debug!(?reason, "aborted by us");
529            }
530            Err(err) => {
531                if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
532                    self.on_sync_finished(
533                        namespace,
534                        peer,
535                        Origin::Accept,
536                        Err(anyhow::Error::from(err)),
537                    )
538                    .await;
539                } else {
540                    debug!(?err, "failed before reading the first message");
541                }
542            }
543        }
544    }
545
546    async fn on_sync_finished(
547        &mut self,
548        namespace: NamespaceId,
549        peer: PublicKey,
550        origin: Origin,
551        result: Result<SyncFinished>,
552    ) {
553        match &result {
554            Err(ref err) => {
555                warn!(?origin, ?err, "sync failed");
556            }
557            Ok(ref details) => {
558                info!(
559                    sent = %details.outcome.num_sent,
560                    recv = %details.outcome.num_recv,
561                    t_connect = ?details.timings.connect,
562                    t_process = ?details.timings.process,
563                    "sync finished",
564                );
565
566                // register the peer as useful for the document
567                if let Err(e) = self
568                    .sync
569                    .register_useful_peer(namespace, *peer.as_bytes())
570                    .await
571                {
572                    debug!(%e, "failed to register peer for document")
573                }
574
575                // broadcast a sync report to our neighbors, but only if we received new entries.
576                if details.outcome.num_recv > 0 {
577                    info!("broadcast sync report to neighbors");
578                    match details
579                        .outcome
580                        .heads_received
581                        .encode(Some(self.gossip.max_message_size()))
582                    {
583                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
584                        Ok(heads) => {
585                            let report = SyncReport { namespace, heads };
586                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
587                                .await;
588                        }
589                    }
590                }
591            }
592        };
593
594        let result_for_event = match &result {
595            Ok(details) => Ok(details.into()),
596            Err(err) => Err(err.to_string()),
597        };
598
599        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
600            return;
601        };
602
603        let ev = SyncEvent {
604            peer,
605            origin,
606            result: result_for_event,
607            finished: SystemTime::now(),
608            started,
609        };
610        self.subscribers
611            .send(&namespace, Event::SyncFinished(ev))
612            .await;
613
614        // Check if there are queued pending content hashes for this namespace.
615        // If hashes are pending, mark this namespace to be eligible for a PendingContentReady event once all
616        // pending hashes have completed downloading.
617        // If no hashes are pending, emit the PendingContentReady event right away. The next
618        // PendingContentReady event may then only be emitted after the next sync completes.
619        if self.queued_hashes.contains_namespace(&namespace) {
620            self.state.set_may_emit_ready(&namespace, true);
621        } else {
622            self.subscribers
623                .send(&namespace, Event::PendingContentReady)
624                .await;
625            self.state.set_may_emit_ready(&namespace, false);
626        }
627
628        if resync {
629            self.sync_with_peer(namespace, peer, SyncReason::Resync);
630        }
631    }
632
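    /// Broadcast `op` to our direct gossip neighbors on the namespace's topic (does
    /// nothing if we are not currently syncing the namespace).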
633    async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
634        if !self.state.is_syncing(&namespace) {
635            return;
636        }
637
638        let msg = match postcard::to_stdvec(op) {
639            Ok(msg) => msg,
640            Err(err) => {
641                error!(?err, ?op, "Failed to serialize message:");
642                return;
643            }
644        };
645        // TODO: We should likely debounce and merge these neighbor announcements.
646        self.gossip
647            .broadcast_neighbors(&namespace, msg.into())
648            .await;
649    }
650
651    async fn on_download_ready(
652        &mut self,
653        namespace: NamespaceId,
654        hash: Hash,
655        res: Result<(), anyhow::Error>,
656    ) {
657        let completed_namespaces = self.queued_hashes.remove_hash(&hash);
658        debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
659        if res.is_ok() {
660            self.subscribers
661                .send(&namespace, Event::ContentReady { hash })
662                .await;
663            // Inform our neighbors that we have new content ready.
664            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
665                .await;
666        } else {
667            self.missing_hashes.insert(hash);
668        }
669        for namespace in completed_namespaces.iter() {
670            if let Some(true) = self.state.may_emit_ready(namespace) {
671                self.subscribers
672                    .send(namespace, Event::PendingContentReady)
673                    .await;
674            }
675        }
676    }
677
678    async fn on_neighbor_content_ready(
679        &mut self,
680        namespace: NamespaceId,
681        node: EndpointId,
682        hash: Hash,
683    ) {
684        self.start_download(namespace, hash, node, true).await;
685    }
686
687    #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
688    async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
689        let namespace = report.namespace;
690        if !self.state.is_syncing(&namespace) {
691            return;
692        }
693        let heads = match AuthorHeads::decode(&report.heads) {
694            Ok(heads) => heads,
695            Err(err) => {
696                warn!(?err, "failed to decode AuthorHeads");
697                return;
698            }
699        };
700        match self.sync.has_news_for_us(report.namespace, heads).await {
701            Ok(Some(updated_authors)) => {
702                info!(%updated_authors, "news reported: sync now");
703                self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
704            }
705            Ok(None) => {
706                debug!("no news reported: nothing to do");
707            }
708            Err(err) => {
709                warn!("sync actor error: {err:?}");
710            }
711        }
712    }
713
714    async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
715        match event {
716            crate::Event::LocalInsert { namespace, entry } => {
717                debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
718                // A new entry was inserted locally. Broadcast a gossip message.
719                if self.state.is_syncing(&namespace) {
720                    let op = Op::Put(entry.clone());
721                    let message = postcard::to_stdvec(&op)?.into();
722                    self.gossip.broadcast(&namespace, message).await;
723                }
724            }
725            crate::Event::RemoteInsert {
726                namespace,
727                entry,
728                from,
729                should_download,
730                remote_content_status,
731            } => {
732                debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
733                // A new entry was inserted from initial sync or gossip. Queue downloading the
734                // content.
735                if should_download {
736                    let hash = entry.content_hash();
737                    if matches!(remote_content_status, ContentStatus::Complete) {
738                        let node_id = PublicKey::from_bytes(&from)?;
739                        self.start_download(namespace, hash, node_id, false).await;
740                    } else {
741                        self.missing_hashes.insert(hash);
742                    }
743                }
744            }
745        }
746
747        Ok(())
748    }
749
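    /// Queue a download of `hash` for `namespace` from `node`.
    ///
    /// Does nothing if the blob is already complete locally. Otherwise the node is
    /// recorded as a provider for the hash; with `only_if_missing` set, a new download is
    /// only started if the hash was previously recorded as missing.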
750    async fn start_download(
751        &mut self,
752        namespace: NamespaceId,
753        hash: Hash,
754        node: PublicKey,
755        only_if_missing: bool,
756    ) {
757        let entry_status = self.bao_store.blobs().status(hash).await;
758        if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
759            self.missing_hashes.remove(&hash);
760            return;
761        }
762        self.hash_providers
763            .0
764            .lock()
765            .expect("poisoned")
766            .entry(hash)
767            .or_default()
768            .insert(node);
769        if self.queued_hashes.contains_hash(&hash) {
770            self.queued_hashes.insert(hash, namespace);
771        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
772            let req = DownloadRequest::new(
773                HashAndFormat::raw(hash),
774                self.hash_providers.clone(),
775                SplitStrategy::None,
776            );
777            let handle = self.downloader.download_with_opts(req);
778
779            self.queued_hashes.insert(hash, namespace);
780            self.missing_hashes.remove(&hash);
781            self.download_tasks
782                .spawn(async move { (namespace, hash, handle.await) });
783        }
784    }
785
786    #[instrument("accept", skip_all)]
787    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
788        let to_actor_tx = self.sync_actor_tx.clone();
789        let accept_request_cb = move |namespace, peer| {
790            let to_actor_tx = to_actor_tx.clone();
791            async move {
792                let (reply_tx, reply_rx) = oneshot::channel();
793                to_actor_tx
794                    .send(ToLiveActor::AcceptSyncRequest {
795                        namespace,
796                        peer,
797                        reply: reply_tx,
798                    })
799                    .await
800                    .ok();
801                match reply_rx.await {
802                    Ok(outcome) => outcome,
803                    Err(err) => {
804                        warn!(
805                            "accept request callback failed to retrieve reply from actor: {err:?}"
806                        );
807                        AcceptOutcome::Reject(AbortReason::InternalServerError)
808                    }
809                }
810            }
811            .boxed()
812        };
813        debug!("incoming connection");
814        let sync = self.sync.clone();
815        let metrics = self.metrics.clone();
816        self.running_sync_accept.spawn(
817            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
818                .instrument(Span::current()),
819        );
820    }
821
822    pub fn accept_sync_request(
823        &mut self,
824        namespace: NamespaceId,
825        peer: PublicKey,
826    ) -> AcceptOutcome {
827        self.state
828            .accept_request(&self.endpoint.id(), &namespace, peer)
829    }
830}
831
832/// Event emitted when a sync operation completes
833#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
834pub struct SyncEvent {
835    /// Peer we synced with
836    pub peer: PublicKey,
837    /// Origin of the sync exchange
838    pub origin: Origin,
839    /// Timestamp when the sync finished
840    pub finished: SystemTime,
841    /// Timestamp when the sync started
842    pub started: SystemTime,
843    /// Result of the sync operation
844    pub result: std::result::Result<SyncDetails, String>,
845}
846
847#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
848pub struct SyncDetails {
849    /// Number of entries received
850    pub entries_received: usize,
851    /// Number of entries sent
852    pub entries_sent: usize,
853}
854
855impl From<&SyncFinished> for SyncDetails {
856    fn from(value: &SyncFinished) -> Self {
857        Self {
858            entries_received: value.outcome.num_recv,
859            entries_sent: value.outcome.num_sent,
860        }
861    }
862}
863
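/// Maps each namespace to its event subscribers; a namespace's entry is dropped once all
/// of its subscribers have disconnected.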
864#[derive(Debug, Default)]
865struct SubscribersMap(HashMap<NamespaceId, Subscribers>);
866
867impl SubscribersMap {
868    fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
869        self.0.entry(namespace).or_default().subscribe(sender);
870    }
871
872    async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
873        debug!(namespace=%namespace.fmt_short(), %event, "emit event");
874        let Some(subscribers) = self.0.get_mut(namespace) else {
875            return false;
876        };
877
878        if !subscribers.send(event).await {
879            self.0.remove(namespace);
880        }
881        true
882    }
883
884    fn remove(&mut self, namespace: &NamespaceId) {
885        self.0.remove(namespace);
886    }
887
888    fn clear(&mut self) {
889        self.0.clear();
890    }
891}
892
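/// Hashes currently queued in the downloader, indexed both by hash and by namespace so
/// that we can tell when all pending downloads for a namespace have finished.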
893#[derive(Debug, Default)]
894struct QueuedHashes {
895    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
896    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
897}
898
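/// Nodes known to provide a given hash; used as the downloader's content discovery
/// source via the [`ContentDiscovery`] impl below.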
899#[derive(Debug, Clone, Default)]
900struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<EndpointId>>>>);
901
902impl ContentDiscovery for ProviderNodes {
903    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<EndpointId> {
904        let nodes = self
905            .0
906            .lock()
907            .expect("poisoned")
908            .get(&hash.hash)
909            .into_iter()
910            .flatten()
911            .cloned()
912            .collect::<Vec<_>>();
913        Box::pin(n0_future::stream::iter(nodes))
914    }
915}
916
917impl QueuedHashes {
918    fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
919        self.by_hash.entry(hash).or_default().insert(namespace);
920        self.by_namespace.entry(namespace).or_default().insert(hash);
921    }
922
923    /// Remove a hash from the set of queued hashes.
924    ///
925    /// Returns a list of namespaces that are now complete (have no queued hashes anymore).
926    fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
927        let namespaces = self.by_hash.remove(hash).unwrap_or_default();
928        let mut removed_namespaces = vec![];
929        for namespace in namespaces {
930            if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
931                hashes.remove(hash);
932                if hashes.is_empty() {
933                    self.by_namespace.remove(&namespace);
934                    removed_namespaces.push(namespace);
935                }
936            }
937        }
938        removed_namespaces
939    }
940
941    fn contains_hash(&self, hash: &Hash) -> bool {
942        self.by_hash.contains_key(hash)
943    }
944
945    fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
946        self.by_namespace.contains_key(namespace)
947    }
948}
949
950#[derive(Debug, Default)]
951struct Subscribers(Vec<async_channel::Sender<Event>>);
952
953impl Subscribers {
954    fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
955        self.0.push(sender)
956    }
957
958    async fn send(&mut self, event: Event) -> bool {
959        let futs = self.0.iter().map(|sender| sender.send(event.clone()));
960        let res = futures_buffered::join_all(futs).await;
961        // iterate in reverse so that removing an element does not shift the indices of elements not yet checked
962        for (i, res) in res.into_iter().enumerate().rev() {
963            if res.is_err() {
964                self.0.remove(i);
965            }
966        }
967        !self.0.is_empty()
968    }
969}
970
971fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
972    match res {
973        Ok(res) => res.peer.fmt_short().to_string(),
974        Err(err) => err
975            .peer()
976            .map(|x| x.fmt_short().to_string())
977            .unwrap_or_else(|| "unknown".to_string()),
978    }
979}
980
981fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
982    match res {
983        Ok(res) => res.namespace.fmt_short(),
984        Err(err) => err
985            .namespace()
986            .map(|x| x.fmt_short())
987            .unwrap_or_else(|| "unknown".to_string()),
988    }
989}
990
991#[cfg(test)]
992mod tests {
993    use super::*;
994
995    #[tokio::test]
996    async fn test_sync_remove() {
997        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
998        let (a_tx, a_rx) = async_channel::unbounded();
999        let (b_tx, b_rx) = async_channel::unbounded();
1000        let mut subscribers = Subscribers::default();
1001        subscribers.subscribe(a_tx);
1002        subscribers.subscribe(b_tx);
1003        drop(a_rx);
1004        drop(b_rx);
1005        assert!(!subscribers.send(Event::NeighborUp(pk)).await);
1006    }
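
    // A minimal, hedged sanity check added here (not part of the original test suite):
    // `Op` messages are serialized with postcard before being broadcast over gossip
    // (see `broadcast_neighbors`), so a postcard round-trip should preserve the variant
    // and its payload.
    #[test]
    fn test_op_postcard_roundtrip() {
        let hash = Hash::new(b"hello");
        let op = Op::ContentReady(hash);
        let bytes = postcard::to_stdvec(&op).expect("encoding should succeed");
        let decoded: Op = postcard::from_bytes(&bytes).expect("decoding should succeed");
        match decoded {
            Op::ContentReady(h) => assert_eq!(h, hash),
            other => panic!("unexpected variant: {other}"),
        }
    }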
1007}