iroh_docs/engine/live.rs

#![allow(missing_docs)]

use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
};

use anyhow::{Context, Result};
use futures_lite::FutureExt;
use iroh::{
    discovery::static_provider::StaticProvider, Endpoint, EndpointAddr, EndpointId, PublicKey,
};
use iroh_blobs::{
    api::{
        blobs::BlobStatus,
        downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
        Store,
    },
    Hash, HashAndFormat,
};
use iroh_gossip::net::Gossip;
use n0_future::{task::JoinSet, time::SystemTime};
use serde::{Deserialize, Serialize};
use tokio::sync::{self, mpsc, oneshot};
use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};

// use super::gossip::{GossipActor, ToGossipActor};
use super::state::{NamespaceStates, Origin, SyncReason};
use crate::{
    actor::{OpenOpts, SyncHandle},
    engine::gossip::GossipState,
    metrics::Metrics,
    net::{
        connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
        SyncFinished,
    },
    AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
};

/// An iroh-docs operation
///
/// This is the message that is broadcast over iroh-gossip.
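///
/// # Example
///
/// A minimal sketch, not taken from the crate's docs, of how an `Op` is encoded before
/// broadcast: the actor serializes ops with `postcard` (see `broadcast_neighbors` below),
/// and the hash here is purely illustrative.
///
/// ```ignore
/// let op = Op::ContentReady(Hash::new(b"example content"));
/// let bytes = postcard::to_stdvec(&op)?;
/// // `bytes` is what gets handed to iroh-gossip for broadcast.
/// let decoded: Op = postcard::from_bytes(&bytes)?;
/// ```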
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    /// A new entry was inserted into the document.
    Put(SignedEntry),
    /// A peer now has content available for a hash.
    ContentReady(Hash),
    /// We synced with another peer, here's the news.
    SyncReport(SyncReport),
}

/// Report of a successful sync with the new heads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    namespace: NamespaceId,
    /// Encoded [`AuthorHeads`]
    heads: Vec<u8>,
}

/// Messages to the sync actor
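///
/// # Example
///
/// A sketch, with names and setup assumed rather than taken from the crate's docs, of
/// driving the actor from the outside, where `tx` is the `mpsc::Sender<ToLiveActor>`
/// handed to `LiveActor::new`:
///
/// ```ignore
/// let (reply, reply_rx) = tokio::sync::oneshot::channel();
/// tx.send(ToLiveActor::StartSync { namespace, peers: vec![], reply }).await?;
/// // First `?` is the oneshot recv error, second is the sync result itself.
/// reply_rx.await??;
/// ```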
#[derive(derive_more::Debug, strum::Display)]
pub enum ToLiveActor {
    StartSync {
        namespace: NamespaceId,
        peers: Vec<EndpointAddr>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Leave {
        namespace: NamespaceId,
        kill_subscribers: bool,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Shutdown {
        reply: sync::oneshot::Sender<()>,
    },
    Subscribe {
        namespace: NamespaceId,
        #[debug("sender")]
        sender: async_channel::Sender<Event>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<Result<()>>,
    },
    HandleConnection {
        conn: iroh::endpoint::Connection,
    },
    AcceptSyncRequest {
        namespace: NamespaceId,
        peer: PublicKey,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<AcceptOutcome>,
    },

    IncomingSyncReport {
        from: PublicKey,
        report: SyncReport,
    },
    NeighborContentReady {
        namespace: NamespaceId,
        node: PublicKey,
        hash: Hash,
    },
    NeighborUp {
        namespace: NamespaceId,
        peer: PublicKey,
    },
    NeighborDown {
        namespace: NamespaceId,
        peer: PublicKey,
    },
}

/// Events informing about actions of the live sync progress.
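///
/// # Example
///
/// A sketch, with channel setup assumed rather than taken from the crate's docs, of
/// draining events from a subscription channel registered via `ToLiveActor::Subscribe`:
///
/// ```ignore
/// let (tx, rx) = async_channel::bounded(64);
/// // ... send ToLiveActor::Subscribe { namespace, sender: tx, reply } to the actor ...
/// while let Ok(event) = rx.recv().await {
///     match event {
///         Event::ContentReady { hash } => println!("content ready: {hash}"),
///         Event::SyncFinished(ev) => println!("synced with {}", ev.peer.fmt_short()),
///         _ => {}
///     }
/// }
/// ```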
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    /// The content of an entry was downloaded and is now available at the local node
    ContentReady {
        /// The content hash of the newly available entry content
        hash: Hash,
    },
    /// We have a new neighbor in the swarm.
    NeighborUp(PublicKey),
    /// We lost a neighbor in the swarm.
    NeighborDown(PublicKey),
    /// A set-reconciliation sync finished.
    SyncFinished(SyncEvent),
    /// All pending content is now ready.
    ///
    /// This event is only emitted after a sync completed and `Self::SyncFinished` was emitted at
    /// least once. It signals that all currently pending downloads have been completed.
    ///
    /// Receiving this event does not guarantee that all content in the document is available. If
    /// blobs failed to download, this event will still be emitted after all operations completed.
    PendingContentReady,
}

type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);

// Currently peers might double-sync in both directions.
pub struct LiveActor {
    /// Receiver for actor messages.
    inbox: mpsc::Receiver<ToLiveActor>,
    sync: SyncHandle,
    endpoint: Endpoint,
    bao_store: Store,
    downloader: Downloader,
    static_provider: StaticProvider,
    replica_events_tx: async_channel::Sender<crate::Event>,
    replica_events_rx: async_channel::Receiver<crate::Event>,

    /// Send messages to self.
    /// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
    /// Only clone into newly spawned tasks.
    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    gossip: GossipState,

    /// Running sync futures (from connect).
    running_sync_connect: JoinSet<SyncConnectRes>,
    /// Running sync futures (from accept).
    running_sync_accept: JoinSet<SyncAcceptRes>,
    /// Running download futures.
    download_tasks: JoinSet<DownloadRes>,
    /// Content hashes which are wanted but not yet queued because no provider was found.
    missing_hashes: HashSet<Hash>,
    /// Content hashes queued in downloader.
    queued_hashes: QueuedHashes,
    /// Nodes known to have a hash
    hash_providers: ProviderNodes,

    /// Subscribers to actor events
    subscribers: SubscribersMap,

    /// Sync state per replica and peer
    state: NamespaceStates,
    metrics: Arc<Metrics>,
}

impl LiveActor {
    /// Create the live actor.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        sync: SyncHandle,
        endpoint: Endpoint,
        gossip: Gossip,
        bao_store: Store,
        downloader: Downloader,
        inbox: mpsc::Receiver<ToLiveActor>,
        sync_actor_tx: mpsc::Sender<ToLiveActor>,
        metrics: Arc<Metrics>,
    ) -> Self {
        let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
        let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
        let static_provider = StaticProvider::new();
        endpoint.discovery().add(static_provider.clone());
        Self {
            inbox,
            sync,
            replica_events_rx,
            replica_events_tx,
            endpoint,
            static_provider,
            gossip: gossip_state,
            bao_store,
            downloader,
            sync_actor_tx,
            running_sync_connect: Default::default(),
            running_sync_accept: Default::default(),
            subscribers: Default::default(),
            download_tasks: Default::default(),
            state: Default::default(),
            missing_hashes: Default::default(),
            queued_hashes: Default::default(),
            hash_providers: Default::default(),
            metrics,
        }
    }

    /// Run the actor loop.
    pub async fn run(mut self) -> Result<()> {
        let shutdown_reply = self.run_inner().await;
        if let Err(err) = self.shutdown().await {
            error!(?err, "Error during shutdown");
        }
        drop(self);
        match shutdown_reply {
            Ok(reply) => {
                reply.send(()).ok();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            tokio::select! {
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;
                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }

    async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
        match msg {
            ToLiveActor::Shutdown { .. } => {
                unreachable!("handled in run");
            }
            ToLiveActor::IncomingSyncReport { from, report } => {
                self.on_sync_report(from, report).await
            }
            ToLiveActor::NeighborUp { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
                self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
                self.subscribers
                    .send(&namespace, Event::NeighborUp(peer))
                    .await;
            }
            ToLiveActor::NeighborDown { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
                self.subscribers
                    .send(&namespace, Event::NeighborDown(peer))
                    .await;
            }
            ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            } => {
                let res = self.start_sync(namespace, peers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            } => {
                let res = self.leave(namespace, kill_subscribers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Subscribe {
                namespace,
                sender,
                reply,
            } => {
                self.subscribers.subscribe(namespace, sender);
                reply.send(Ok(())).ok();
            }
            ToLiveActor::HandleConnection { conn } => {
                self.handle_connection(conn).await;
            }
            ToLiveActor::AcceptSyncRequest {
                namespace,
                peer,
                reply,
            } => {
                let outcome = self.accept_sync_request(namespace, peer);
                reply.send(outcome).ok();
            }
            ToLiveActor::NeighborContentReady {
                namespace,
                node,
                hash,
            } => {
                self.on_neighbor_content_ready(namespace, node, hash).await;
            }
        };
        Ok(true)
    }

    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
        if !self.state.start_connect(&namespace, peer, reason) {
            return;
        }
        let endpoint = self.endpoint.clone();
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        let fut = async move {
            let res = connect_and_sync(
                &endpoint,
                &sync,
                namespace,
                EndpointAddr::new(peer),
                Some(&metrics),
            )
            .await;
            (namespace, peer, reason, res)
        }
        .instrument(Span::current());
        self.running_sync_connect.spawn(fut);
    }

    async fn shutdown(&mut self) -> anyhow::Result<()> {
        // cancel all subscriptions
        self.subscribers.clear();
        let (gossip_shutdown_res, _store) = tokio::join!(
            // quit the gossip topics and task loops.
            self.gossip.shutdown(),
            // shutdown sync thread
            self.sync.shutdown()
        );
        gossip_shutdown_res?;
        // TODO: abort_all and join_next all JoinSets to catch panics
        // (they are aborted on drop, but that swallows panics)
        Ok(())
    }

    async fn start_sync(
        &mut self,
        namespace: NamespaceId,
        mut peers: Vec<EndpointAddr>,
    ) -> Result<()> {
        debug!(?namespace, peers = peers.len(), "start sync");
        // update state to allow sync
        if !self.state.is_syncing(&namespace) {
            let opts = OpenOpts::default()
                .sync()
                .subscribe(self.replica_events_tx.clone());
            self.sync.open(namespace, opts).await?;
            self.state.insert(namespace);
        }
        // add the peers stored for this document
        match self.sync.get_sync_peers(namespace).await {
            Ok(None) => {
                // no peers for this document
            }
            Ok(Some(known_useful_peers)) => {
                let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
                    // peers are stored as bytes, don't fail the operation if they can't be
                    // decoded: simply ignore the peer
                    match PublicKey::from_bytes(&peer_id_bytes) {
                        Ok(public_key) => Some(EndpointAddr::new(public_key)),
                        Err(_signing_error) => {
                            warn!("potential db corruption: peers per doc can't be decoded");
                            None
                        }
                    }
                });
                peers.extend(as_node_addr);
            }
            Err(e) => {
                // try to continue if peers per doc can't be read since they are not vital for sync
                warn!(%e, "db error reading peers per document")
            }
        }
        self.join_peers(namespace, peers).await?;
        Ok(())
    }

    async fn leave(
        &mut self,
        namespace: NamespaceId,
        kill_subscribers: bool,
    ) -> anyhow::Result<()> {
        // self.subscribers.remove(&namespace);
        if self.state.remove(&namespace) {
            self.sync.set_sync(namespace, false).await?;
            self.sync
                .unsubscribe(namespace, self.replica_events_tx.clone())
                .await?;
            self.sync.close(namespace).await?;
            self.gossip.quit(&namespace);
        }
        if kill_subscribers {
            self.subscribers.remove(&namespace);
        }
        Ok(())
    }

    async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
        let mut peer_ids = Vec::new();

        // add addresses of peers to our endpoint address book
        for peer in peers.into_iter() {
            let peer_id = peer.id;
            // adding a node address without any addressing info fails with an error,
            // but we still want to include those peers because node discovery might find addresses for them
            if !peer.is_empty() {
                self.static_provider.add_endpoint_info(peer);
            }
            peer_ids.push(peer_id);
        }

        // tell gossip to join
        self.gossip.join(namespace, peer_ids.clone()).await?;

        if !peer_ids.is_empty() {
            // trigger initial sync with initial peers
            for peer in peer_ids {
                self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
            }
        }
        Ok(())
    }

    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    async fn on_sync_via_connect_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        reason: SyncReason,
        result: Result<SyncFinished, ConnectError>,
    ) {
        match result {
            Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
                debug!(?reason, "remote abort, already syncing");
            }
            res => {
                self.on_sync_finished(
                    namespace,
                    peer,
                    Origin::Connect(reason),
                    res.map_err(Into::into),
                )
                .await
            }
        }
    }

    #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
    async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
        match res {
            Ok(state) => {
                self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
                    .await
            }
            Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
                // In case we aborted the sync: do nothing (our outgoing sync is in progress)
                debug!(?reason, "aborted by us");
            }
            Err(err) => {
                if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
                    self.on_sync_finished(
                        namespace,
                        peer,
                        Origin::Accept,
                        Err(anyhow::Error::from(err)),
                    )
                    .await;
                } else {
                    debug!(?err, "failed before reading the first message");
                }
            }
        }
    }

    async fn on_sync_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        origin: Origin,
        result: Result<SyncFinished>,
    ) {
        match &result {
            Err(ref err) => {
                warn!(?origin, ?err, "sync failed");
            }
            Ok(ref details) => {
                info!(
                    sent = %details.outcome.num_sent,
                    recv = %details.outcome.num_recv,
                    t_connect = ?details.timings.connect,
                    t_process = ?details.timings.process,
                    "sync finished",
                );

                // register the peer as useful for the document
                if let Err(e) = self
                    .sync
                    .register_useful_peer(namespace, *peer.as_bytes())
                    .await
                {
                    debug!(%e, "failed to register peer for document")
                }

                // broadcast a sync report to our neighbors, but only if we received new entries.
                if details.outcome.num_recv > 0 {
                    info!("broadcast sync report to neighbors");
                    match details
                        .outcome
                        .heads_received
                        .encode(Some(self.gossip.max_message_size()))
                    {
                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
                        Ok(heads) => {
                            let report = SyncReport { namespace, heads };
                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
                                .await;
                        }
                    }
                }
            }
        };

        let result_for_event = match &result {
            Ok(details) => Ok(details.into()),
            Err(err) => Err(err.to_string()),
        };

        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
            return;
        };

        let ev = SyncEvent {
            peer,
            origin,
            result: result_for_event,
            finished: SystemTime::now(),
            started,
        };
        self.subscribers
            .send(&namespace, Event::SyncFinished(ev))
            .await;

        // Check if there are queued pending content hashes for this namespace.
        // If hashes are pending, mark this namespace as eligible for a PendingContentReady event once all
        // pending hashes have completed downloading.
        // If no hashes are pending, emit the PendingContentReady event right away. The next
        // PendingContentReady event may then only be emitted after the next sync completes.
        if self.queued_hashes.contains_namespace(&namespace) {
            self.state.set_may_emit_ready(&namespace, true);
        } else {
            self.subscribers
                .send(&namespace, Event::PendingContentReady)
                .await;
            self.state.set_may_emit_ready(&namespace, false);
        }

        if resync {
            self.sync_with_peer(namespace, peer, SyncReason::Resync);
        }
    }

    async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
        if !self.state.is_syncing(&namespace) {
            return;
        }

        let msg = match postcard::to_stdvec(op) {
            Ok(msg) => msg,
            Err(err) => {
                error!(?err, ?op, "Failed to serialize message");
                return;
            }
        };
        // TODO: We should likely debounce and merge these neighbor announcements.
        self.gossip
            .broadcast_neighbors(&namespace, msg.into())
            .await;
    }

    async fn on_download_ready(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        res: Result<(), anyhow::Error>,
    ) {
        let completed_namespaces = self.queued_hashes.remove_hash(&hash);
        debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
        if res.is_ok() {
            self.subscribers
                .send(&namespace, Event::ContentReady { hash })
                .await;
            // Inform our neighbors that we have new content ready.
            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
                .await;
        } else {
            self.missing_hashes.insert(hash);
        }
        for namespace in completed_namespaces.iter() {
            if let Some(true) = self.state.may_emit_ready(namespace) {
                self.subscribers
                    .send(namespace, Event::PendingContentReady)
                    .await;
            }
        }
    }

    async fn on_neighbor_content_ready(
        &mut self,
        namespace: NamespaceId,
        node: EndpointId,
        hash: Hash,
    ) {
        self.start_download(namespace, hash, node, true).await;
    }

    #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
    async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
        let namespace = report.namespace;
        if !self.state.is_syncing(&namespace) {
            return;
        }
        let heads = match AuthorHeads::decode(&report.heads) {
            Ok(heads) => heads,
            Err(err) => {
                warn!(?err, "failed to decode AuthorHeads");
                return;
            }
        };
        match self.sync.has_news_for_us(report.namespace, heads).await {
            Ok(Some(updated_authors)) => {
                info!(%updated_authors, "news reported: sync now");
                self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
            }
            Ok(None) => {
                debug!("no news reported: nothing to do");
            }
            Err(err) => {
                warn!("sync actor error: {err:?}");
            }
        }
    }

    async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
        match event {
            crate::Event::LocalInsert { namespace, entry } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
                // A new entry was inserted locally. Broadcast a gossip message.
                if self.state.is_syncing(&namespace) {
                    let op = Op::Put(entry.clone());
                    let message = postcard::to_stdvec(&op)?.into();
                    self.gossip.broadcast(&namespace, message).await;
                }
            }
            crate::Event::RemoteInsert {
                namespace,
                entry,
                from,
                should_download,
                remote_content_status,
            } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
                // A new entry was inserted from initial sync or gossip. Queue downloading the
                // content.
                if should_download {
                    let hash = entry.content_hash();
                    if matches!(remote_content_status, ContentStatus::Complete) {
                        let node_id = PublicKey::from_bytes(&from)?;
                        self.start_download(namespace, hash, node_id, false).await;
                    } else {
                        self.missing_hashes.insert(hash);
                    }
                }
            }
        }

        Ok(())
    }

    async fn start_download(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        node: PublicKey,
        only_if_missing: bool,
    ) {
        let entry_status = self.bao_store.blobs().status(hash).await;
        if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
            self.missing_hashes.remove(&hash);
            return;
        }
        self.hash_providers
            .0
            .lock()
            .expect("poisoned")
            .entry(hash)
            .or_default()
            .insert(node);
        if self.queued_hashes.contains_hash(&hash) {
            self.queued_hashes.insert(hash, namespace);
        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
            let req = DownloadRequest::new(
                HashAndFormat::raw(hash),
                self.hash_providers.clone(),
                SplitStrategy::None,
            );
            let handle = self.downloader.download_with_opts(req);

            self.queued_hashes.insert(hash, namespace);
            self.missing_hashes.remove(&hash);
            self.download_tasks
                .spawn(async move { (namespace, hash, handle.await) });
        }
    }

    #[instrument("accept", skip_all)]
    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
        let to_actor_tx = self.sync_actor_tx.clone();
        let accept_request_cb = move |namespace, peer| {
            let to_actor_tx = to_actor_tx.clone();
            async move {
                let (reply_tx, reply_rx) = oneshot::channel();
                to_actor_tx
                    .send(ToLiveActor::AcceptSyncRequest {
                        namespace,
                        peer,
                        reply: reply_tx,
                    })
                    .await
                    .ok();
                match reply_rx.await {
                    Ok(outcome) => outcome,
                    Err(err) => {
                        warn!(
                            "accept request callback failed to retrieve reply from actor: {err:?}"
                        );
                        AcceptOutcome::Reject(AbortReason::InternalServerError)
                    }
                }
            }
            .boxed()
        };
        debug!("incoming connection");
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        self.running_sync_accept.spawn(
            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
                .instrument(Span::current()),
        );
    }

    pub fn accept_sync_request(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
    ) -> AcceptOutcome {
        self.state
            .accept_request(&self.endpoint.id(), &namespace, peer)
    }
}

/// Event emitted when a sync operation completes
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    /// Peer we synced with
    pub peer: PublicKey,
    /// Origin of the sync exchange
    pub origin: Origin,
    /// Timestamp when the sync finished
    pub finished: SystemTime,
    /// Timestamp when the sync started
    pub started: SystemTime,
    /// Result of the sync operation
    pub result: std::result::Result<SyncDetails, String>,
}

/// Details of a finished sync operation.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries received
    pub entries_received: usize,
    /// Number of entries sent
    pub entries_sent: usize,
}

impl From<&SyncFinished> for SyncDetails {
    fn from(value: &SyncFinished) -> Self {
        Self {
            entries_received: value.outcome.num_recv,
            entries_sent: value.outcome.num_sent,
        }
    }
}

/// Per-namespace map of event subscribers.
#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);

impl SubscribersMap {
    fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
        self.0.entry(namespace).or_default().subscribe(sender);
    }

    async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
        debug!(namespace=%namespace.fmt_short(), %event, "emit event");
        let Some(subscribers) = self.0.get_mut(namespace) else {
            return false;
        };

        if !subscribers.send(event).await {
            self.0.remove(namespace);
        }
        true
    }

    fn remove(&mut self, namespace: &NamespaceId) {
        self.0.remove(namespace);
    }

    fn clear(&mut self) {
        self.0.clear();
    }
}

/// Content hashes queued in the downloader, tracked both by hash and by namespace.
#[derive(Debug, Default)]
struct QueuedHashes {
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}

/// Nodes known to provide a hash, used as the [`ContentDiscovery`] source for the downloader.
#[derive(Debug, Clone, Default)]
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<EndpointId>>>>);

impl ContentDiscovery for ProviderNodes {
    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<EndpointId> {
        let nodes = self
            .0
            .lock()
            .expect("poisoned")
            .get(&hash.hash)
            .into_iter()
            .flatten()
            .cloned()
            .collect::<Vec<_>>();
        Box::pin(n0_future::stream::iter(nodes))
    }
}

impl QueuedHashes {
    fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
        self.by_hash.entry(hash).or_default().insert(namespace);
        self.by_namespace.entry(namespace).or_default().insert(hash);
    }

    /// Remove a hash from the set of queued hashes.
    ///
    /// Returns a list of namespaces that are now complete (have no queued hashes anymore).
    fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
        let namespaces = self.by_hash.remove(hash).unwrap_or_default();
        let mut removed_namespaces = vec![];
        for namespace in namespaces {
            if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
                hashes.remove(hash);
                if hashes.is_empty() {
                    self.by_namespace.remove(&namespace);
                    removed_namespaces.push(namespace);
                }
            }
        }
        removed_namespaces
    }

    fn contains_hash(&self, hash: &Hash) -> bool {
        self.by_hash.contains_key(hash)
    }

    fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
        self.by_namespace.contains_key(namespace)
    }
}

#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);

impl Subscribers {
    fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
        self.0.push(sender)
    }

    async fn send(&mut self, event: Event) -> bool {
        let futs = self.0.iter().map(|sender| sender.send(event.clone()));
        let res = futures_buffered::join_all(futs).await;
        // reverse the order so removing does not shift remaining indices
        for (i, res) in res.into_iter().enumerate().rev() {
            if res.is_err() {
                self.0.remove(i);
            }
        }
        !self.0.is_empty()
    }
}

fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.peer.fmt_short().to_string(),
        Err(err) => err
            .peer()
            .map(|x| x.fmt_short().to_string())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.namespace.fmt_short(),
        Err(err) => err
            .namespace()
            .map(|x| x.fmt_short())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let (a_tx, a_rx) = async_channel::unbounded();
        let (b_tx, b_rx) = async_channel::unbounded();
        let mut subscribers = Subscribers::default();
        subscribers.subscribe(a_tx);
        subscribers.subscribe(b_tx);
        drop(a_rx);
        drop(b_rx);
        subscribers.send(Event::NeighborUp(pk)).await;
    }
}