iroh_docs/engine/live.rs

#![allow(missing_docs)]

use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
};

use anyhow::{Context, Result};
use iroh::{address_lookup::memory::MemoryLookup, Endpoint, EndpointAddr, EndpointId, PublicKey};
use iroh_blobs::{
    api::{
        blobs::BlobStatus,
        downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
        Store,
    },
    Hash, HashAndFormat,
};
use iroh_gossip::net::Gossip;
use n0_future::{task::JoinSet, time::SystemTime, FutureExt};
use serde::{Deserialize, Serialize};
use tokio::sync::{self, mpsc, oneshot};
use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};

// use super::gossip::{GossipActor, ToGossipActor};
use super::state::{NamespaceStates, Origin, SyncReason};
use crate::{
    actor::{OpenOpts, SyncHandle},
    engine::gossip::GossipState,
    metrics::Metrics,
    net::{
        connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
        SyncFinished,
    },
    AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
};

/// An iroh-docs operation
///
/// This is the message that is broadcast over iroh-gossip.
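///
/// A minimal sketch of how an op is encoded for broadcast, mirroring the
/// postcard encoding and gossip broadcast the actor performs below:
///
/// ```ignore
/// let op = Op::ContentReady(hash);
/// let message = postcard::to_stdvec(&op)?;
/// gossip.broadcast(&namespace, message.into()).await;
/// ```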
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    /// A new entry was inserted into the document.
    Put(SignedEntry),
    /// A peer now has content available for a hash.
    ContentReady(Hash),
    /// We synced with another peer, here's the news.
    SyncReport(SyncReport),
}

/// Report of a successful sync with the new heads.
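///
/// A sketch of the round trip as performed by the actor below: heads are
/// encoded with a size limit on the sending side and decoded on receipt
/// (`max_message_size` stands in for the gossip limit):
///
/// ```ignore
/// let heads = outcome.heads_received.encode(Some(max_message_size))?;
/// let report = SyncReport { namespace, heads };
/// // ... and on the receiving side:
/// let heads = AuthorHeads::decode(&report.heads)?;
/// ```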
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    namespace: NamespaceId,
    /// Encoded [`AuthorHeads`]
    heads: Vec<u8>,
}

/// Messages to the sync actor
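///
/// A minimal sketch of driving the actor from the outside, assuming a
/// `tx: mpsc::Sender<ToLiveActor>` handle to its inbox:
///
/// ```ignore
/// let (reply, reply_rx) = tokio::sync::oneshot::channel();
/// tx.send(ToLiveActor::StartSync { namespace, peers, reply }).await?;
/// reply_rx.await??;
/// ```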
#[derive(derive_more::Debug, strum::Display)]
pub enum ToLiveActor {
    StartSync {
        namespace: NamespaceId,
        peers: Vec<EndpointAddr>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Leave {
        namespace: NamespaceId,
        kill_subscribers: bool,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Shutdown {
        reply: sync::oneshot::Sender<()>,
    },
    Subscribe {
        namespace: NamespaceId,
        #[debug("sender")]
        sender: async_channel::Sender<Event>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<Result<()>>,
    },
    HandleConnection {
        conn: iroh::endpoint::Connection,
    },
    AcceptSyncRequest {
        namespace: NamespaceId,
        peer: PublicKey,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<AcceptOutcome>,
    },

    IncomingSyncReport {
        from: PublicKey,
        report: SyncReport,
    },
    NeighborContentReady {
        namespace: NamespaceId,
        node: PublicKey,
        hash: Hash,
    },
    NeighborUp {
        namespace: NamespaceId,
        peer: PublicKey,
    },
    NeighborDown {
        namespace: NamespaceId,
        peer: PublicKey,
    },
}

/// Events reporting on the progress of live sync.
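///
/// A sketch of consuming events from a subscription channel (here `events_rx`
/// is the receiving end of the channel registered via [`ToLiveActor::Subscribe`]):
///
/// ```ignore
/// while let Ok(event) = events_rx.recv().await {
///     match event {
///         Event::SyncFinished(ev) => println!("synced with {}", ev.peer),
///         Event::PendingContentReady => break,
///         _ => {}
///     }
/// }
/// ```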
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    /// The content of an entry was downloaded and is now available at the local node
    ContentReady {
        /// The content hash of the newly available entry content
        hash: Hash,
    },
    /// We have a new neighbor in the swarm.
    NeighborUp(PublicKey),
    /// We lost a neighbor in the swarm.
    NeighborDown(PublicKey),
    /// A set-reconciliation sync finished.
    SyncFinished(SyncEvent),
    /// All pending content is now ready.
    ///
    /// This event is only emitted after a sync completed and `Self::SyncFinished` was emitted at
    /// least once. It signals that all currently pending downloads have been completed.
    ///
    /// Receiving this event does not guarantee that all content in the document is available. If
    /// blobs failed to download, this event will still be emitted after all operations completed.
    PendingContentReady,
}

type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);

// Currently peers might double-sync in both directions.
pub struct LiveActor {
    /// Receiver for actor messages.
    inbox: mpsc::Receiver<ToLiveActor>,
    sync: SyncHandle,
    endpoint: Endpoint,
    bao_store: Store,
    downloader: Downloader,
    memory_lookup: MemoryLookup,
    replica_events_tx: async_channel::Sender<crate::Event>,
    replica_events_rx: async_channel::Receiver<crate::Event>,

    /// Send messages to self.
    /// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
    /// Only clone into newly spawned tasks.
    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    gossip: GossipState,

    /// Running sync futures (from connect).
    running_sync_connect: JoinSet<SyncConnectRes>,
    /// Running sync futures (from accept).
    running_sync_accept: JoinSet<SyncAcceptRes>,
    /// Running download futures.
    download_tasks: JoinSet<DownloadRes>,
    /// Content hashes which are wanted but not yet queued because no provider was found.
    missing_hashes: HashSet<Hash>,
    /// Content hashes queued in downloader.
    queued_hashes: QueuedHashes,
    /// Nodes known to have a hash.
    hash_providers: ProviderNodes,

    /// Subscribers to actor events.
    subscribers: SubscribersMap,

    /// Sync state per replica and peer.
    state: NamespaceStates,
    metrics: Arc<Metrics>,
}

impl LiveActor {
    /// Create the live actor.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        sync: SyncHandle,
        endpoint: Endpoint,
        gossip: Gossip,
        bao_store: Store,
        downloader: Downloader,
        inbox: mpsc::Receiver<ToLiveActor>,
        sync_actor_tx: mpsc::Sender<ToLiveActor>,
        metrics: Arc<Metrics>,
    ) -> Result<Self> {
        let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
        let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
        let memory_lookup = MemoryLookup::new();
        endpoint.address_lookup()?.add(memory_lookup.clone());
        Ok(Self {
            inbox,
            sync,
            replica_events_rx,
            replica_events_tx,
            endpoint,
            memory_lookup,
            gossip: gossip_state,
            bao_store,
            downloader,
            sync_actor_tx,
            running_sync_connect: Default::default(),
            running_sync_accept: Default::default(),
            subscribers: Default::default(),
            download_tasks: Default::default(),
            state: Default::default(),
            missing_hashes: Default::default(),
            queued_hashes: Default::default(),
            hash_providers: Default::default(),
            metrics,
        })
    }

    /// Run the actor loop.
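    ///
    /// A minimal sketch of wiring up and spawning the actor; `sync_handle`,
    /// `store` and the channel capacity are placeholders for values the
    /// surrounding engine provides:
    ///
    /// ```ignore
    /// let (tx, inbox) = mpsc::channel(64);
    /// let actor = LiveActor::new(
    ///     sync_handle, endpoint, gossip, store, downloader, inbox, tx.clone(), metrics,
    /// )?;
    /// tokio::task::spawn(async move { actor.run().await });
    /// ```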
    pub async fn run(mut self) -> Result<()> {
        let shutdown_reply = self.run_inner().await;
        if let Err(err) = self.shutdown().await {
            error!(?err, "Error during shutdown");
        }
        drop(self);
        match shutdown_reply {
            Ok(reply) => {
                reply.send(()).ok();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            tokio::select! {
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;
                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }

    async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
        match msg {
            ToLiveActor::Shutdown { .. } => {
                unreachable!("handled in run");
            }
            ToLiveActor::IncomingSyncReport { from, report } => {
                self.on_sync_report(from, report).await
            }
            ToLiveActor::NeighborUp { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
                self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
                self.subscribers
                    .send(&namespace, Event::NeighborUp(peer))
                    .await;
            }
            ToLiveActor::NeighborDown { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
                self.subscribers
                    .send(&namespace, Event::NeighborDown(peer))
                    .await;
            }
            ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            } => {
                let res = self.start_sync(namespace, peers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            } => {
                let res = self.leave(namespace, kill_subscribers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Subscribe {
                namespace,
                sender,
                reply,
            } => {
                self.subscribers.subscribe(namespace, sender);
                reply.send(Ok(())).ok();
            }
            ToLiveActor::HandleConnection { conn } => {
                self.handle_connection(conn).await;
            }
            ToLiveActor::AcceptSyncRequest {
                namespace,
                peer,
                reply,
            } => {
                let outcome = self.accept_sync_request(namespace, peer);
                reply.send(outcome).ok();
            }
            ToLiveActor::NeighborContentReady {
                namespace,
                node,
                hash,
            } => {
                self.on_neighbor_content_ready(namespace, node, hash).await;
            }
        };
        Ok(true)
    }

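    /// Start an outgoing sync task for `namespace` with `peer`, unless the
    /// per-peer sync state rejects a new connect attempt (e.g. because a sync
    /// with this peer is already running).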
    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
        if !self.state.start_connect(&namespace, peer, reason) {
            return;
        }
        let endpoint = self.endpoint.clone();
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        let fut = async move {
            let res = connect_and_sync(
                &endpoint,
                &sync,
                namespace,
                EndpointAddr::new(peer),
                Some(&metrics),
            )
            .await;
            (namespace, peer, reason, res)
        }
        .instrument(Span::current());
        self.running_sync_connect.spawn(fut);
    }

    async fn shutdown(&mut self) -> anyhow::Result<()> {
        // cancel all subscriptions
        self.subscribers.clear();
        let (gossip_shutdown_res, _store) = tokio::join!(
            // quit the gossip topics and task loops.
            self.gossip.shutdown(),
            // shutdown sync thread
            self.sync.shutdown()
        );
        gossip_shutdown_res?;
        // TODO: abort_all and join_next all JoinSets to catch panics
        // (they are aborted on drop, but that swallows panics)
        Ok(())
    }

    async fn start_sync(
        &mut self,
        namespace: NamespaceId,
        mut peers: Vec<EndpointAddr>,
    ) -> Result<()> {
        debug!(?namespace, peers = peers.len(), "start sync");
        // update state to allow sync
        if !self.state.is_syncing(&namespace) {
            let opts = OpenOpts::default()
                .sync()
                .subscribe(self.replica_events_tx.clone());
            self.sync.open(namespace, opts).await?;
            self.state.insert(namespace);
        }
        // add the peers stored for this document
        match self.sync.get_sync_peers(namespace).await {
            Ok(None) => {
                // no peers for this document
            }
            Ok(Some(known_useful_peers)) => {
                let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
                    // peers are stored as bytes; don't fail the operation if they can't be
                    // decoded, simply ignore the peer
                    match PublicKey::from_bytes(&peer_id_bytes) {
                        Ok(public_key) => Some(EndpointAddr::new(public_key)),
                        Err(_signing_error) => {
                            warn!("potential db corruption: peers per doc can't be decoded");
                            None
                        }
                    }
                });
                peers.extend(as_node_addr);
            }
            Err(e) => {
                // try to continue if peers per doc can't be read, since they are not vital for sync
                warn!(%e, "db error reading peers per document")
            }
        }
        self.join_peers(namespace, peers).await?;
        Ok(())
    }

    async fn leave(
        &mut self,
        namespace: NamespaceId,
        kill_subscribers: bool,
    ) -> anyhow::Result<()> {
        // self.subscribers.remove(&namespace);
        if self.state.remove(&namespace) {
            self.sync.set_sync(namespace, false).await?;
            self.sync
                .unsubscribe(namespace, self.replica_events_tx.clone())
                .await?;
            self.sync.close(namespace).await?;
            self.gossip.quit(&namespace);
        }
        if kill_subscribers {
            self.subscribers.remove(&namespace);
        }
        Ok(())
    }

    async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
        let mut peer_ids = Vec::new();

        // add addresses of peers to our endpoint address book
        for peer in peers.into_iter() {
            let peer_id = peer.id;
            // An endpoint address without any addressing info cannot be stored, but we
            // still keep the peer id: endpoint address lookup might find addresses for
            // it later.
            if !peer.is_empty() {
                self.memory_lookup.add_endpoint_info(peer);
            }
            peer_ids.push(peer_id);
        }

        // tell gossip to join
        self.gossip.join(namespace, peer_ids.clone()).await?;

        if !peer_ids.is_empty() {
            // trigger initial sync with the initial peers
            for peer in peer_ids {
                self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
            }
        }
        Ok(())
    }

    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    async fn on_sync_via_connect_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        reason: SyncReason,
        result: Result<SyncFinished, ConnectError>,
    ) {
        match result {
            Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
                debug!(?reason, "remote abort, already syncing");
            }
            res => {
                self.on_sync_finished(
                    namespace,
                    peer,
                    Origin::Connect(reason),
                    res.map_err(Into::into),
                )
                .await
            }
        }
    }

    #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
    async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
        match res {
            Ok(state) => {
                self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
                    .await
            }
            Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
                // In case we aborted the sync: do nothing (our outgoing sync is in progress)
                debug!(?reason, "aborted by us");
            }
            Err(err) => {
                if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
                    self.on_sync_finished(
                        namespace,
                        peer,
                        Origin::Accept,
                        Err(anyhow::Error::from(err)),
                    )
                    .await;
                } else {
                    debug!(?err, "failed before reading the first message");
                }
            }
        }
    }

    async fn on_sync_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        origin: Origin,
        result: Result<SyncFinished>,
    ) {
        match &result {
            Err(ref err) => {
                warn!(?origin, ?err, "sync failed");
            }
            Ok(ref details) => {
                info!(
                    sent = %details.outcome.num_sent,
                    recv = %details.outcome.num_recv,
                    t_connect = ?details.timings.connect,
                    t_process = ?details.timings.process,
                    "sync finished",
                );

                // register the peer as useful for the document
                if let Err(e) = self
                    .sync
                    .register_useful_peer(namespace, *peer.as_bytes())
                    .await
                {
                    debug!(%e, "failed to register peer for document")
                }

                // broadcast a sync report to our neighbors, but only if we received new entries.
                if details.outcome.num_recv > 0 {
                    info!("broadcast sync report to neighbors");
                    match details
                        .outcome
                        .heads_received
                        .encode(Some(self.gossip.max_message_size()))
                    {
                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
                        Ok(heads) => {
                            let report = SyncReport { namespace, heads };
                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
                                .await;
                        }
                    }
                }
            }
        };

        let result_for_event = match &result {
            Ok(details) => Ok(details.into()),
            Err(err) => Err(err.to_string()),
        };

        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
            return;
        };

        let ev = SyncEvent {
            peer,
            origin,
            result: result_for_event,
            finished: SystemTime::now(),
            started,
        };
        self.subscribers
            .send(&namespace, Event::SyncFinished(ev))
            .await;

        // Check if there are queued pending content hashes for this namespace.
        // If hashes are pending, mark this namespace as eligible for a PendingContentReady event once all
        // pending hashes have completed downloading.
        // If no hashes are pending, emit the PendingContentReady event right away. The next
        // PendingContentReady event may then only be emitted after the next sync completes.
        if self.queued_hashes.contains_namespace(&namespace) {
            self.state.set_may_emit_ready(&namespace, true);
        } else {
            self.subscribers
                .send(&namespace, Event::PendingContentReady)
                .await;
            self.state.set_may_emit_ready(&namespace, false);
        }

        if resync {
            self.sync_with_peer(namespace, peer, SyncReason::Resync);
        }
    }

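    /// Serialize `op` with postcard and broadcast it to our direct gossip
    /// neighbors for `namespace`, if we are currently syncing that namespace.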
    async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
        if !self.state.is_syncing(&namespace) {
            return;
        }

        let msg = match postcard::to_stdvec(op) {
            Ok(msg) => msg,
            Err(err) => {
                error!(?err, ?op, "Failed to serialize message");
                return;
            }
        };
        // TODO: We should likely debounce and merge these neighbor announcements.
        self.gossip
            .broadcast_neighbors(&namespace, msg.into())
            .await;
    }

    async fn on_download_ready(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        res: Result<(), anyhow::Error>,
    ) {
        let completed_namespaces = self.queued_hashes.remove_hash(&hash);
        debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
        if res.is_ok() {
            self.subscribers
                .send(&namespace, Event::ContentReady { hash })
                .await;
            // Inform our neighbors that we have new content ready.
            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
                .await;
        } else {
            self.missing_hashes.insert(hash);
        }
        for namespace in completed_namespaces.iter() {
            if let Some(true) = self.state.may_emit_ready(namespace) {
                self.subscribers
                    .send(namespace, Event::PendingContentReady)
                    .await;
            }
        }
    }

    async fn on_neighbor_content_ready(
        &mut self,
        namespace: NamespaceId,
        node: EndpointId,
        hash: Hash,
    ) {
        self.start_download(namespace, hash, node, true).await;
    }

    #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
    async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
        let namespace = report.namespace;
        if !self.state.is_syncing(&namespace) {
            return;
        }
        let heads = match AuthorHeads::decode(&report.heads) {
            Ok(heads) => heads,
            Err(err) => {
                warn!(?err, "failed to decode AuthorHeads");
                return;
            }
        };
        match self.sync.has_news_for_us(report.namespace, heads).await {
            Ok(Some(updated_authors)) => {
                info!(%updated_authors, "news reported: sync now");
                self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
            }
            Ok(None) => {
                debug!("no news reported: nothing to do");
            }
            Err(err) => {
                warn!("sync actor error: {err:?}");
            }
        }
    }

    async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
        match event {
            crate::Event::LocalInsert { namespace, entry } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
                // A new entry was inserted locally. Broadcast a gossip message.
                if self.state.is_syncing(&namespace) {
                    let op = Op::Put(entry.clone());
                    let message = postcard::to_stdvec(&op)?.into();
                    self.gossip.broadcast(&namespace, message).await;
                }
            }
            crate::Event::RemoteInsert {
                namespace,
                entry,
                from,
                should_download,
                remote_content_status,
            } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
                // A new entry was inserted from initial sync or gossip. Queue downloading the
                // content.
                if should_download {
                    let hash = entry.content_hash();
                    if matches!(remote_content_status, ContentStatus::Complete) {
                        let node_id = PublicKey::from_bytes(&from)?;
                        self.start_download(namespace, hash, node_id, false).await;
                    } else {
                        self.missing_hashes.insert(hash);
                    }
                }
            }
        }

        Ok(())
    }

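    /// Remember `node` as a provider for `hash` and queue a download.
    ///
    /// If the blob is already complete in the local store, nothing is queued.
    /// With `only_if_missing` set, a new download is only started if the hash
    /// was previously recorded as missing.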
    async fn start_download(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        node: PublicKey,
        only_if_missing: bool,
    ) {
        let entry_status = self.bao_store.blobs().status(hash).await;
        if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
            self.missing_hashes.remove(&hash);
            return;
        }
        self.hash_providers
            .0
            .lock()
            .expect("poisoned")
            .entry(hash)
            .or_default()
            .insert(node);
        if self.queued_hashes.contains_hash(&hash) {
            self.queued_hashes.insert(hash, namespace);
        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
            let req = DownloadRequest::new(
                HashAndFormat::raw(hash),
                self.hash_providers.clone(),
                SplitStrategy::None,
            );
            let handle = self.downloader.download_with_opts(req);

            self.queued_hashes.insert(hash, namespace);
            self.missing_hashes.remove(&hash);
            self.download_tasks.spawn(async move {
                (
                    namespace,
                    hash,
                    handle.await.map_err(|e| anyhow::anyhow!(e)),
                )
            });
        }
    }

    #[instrument("accept", skip_all)]
    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
        let to_actor_tx = self.sync_actor_tx.clone();
        let accept_request_cb = move |namespace, peer| {
            let to_actor_tx = to_actor_tx.clone();
            async move {
                let (reply_tx, reply_rx) = oneshot::channel();
                to_actor_tx
                    .send(ToLiveActor::AcceptSyncRequest {
                        namespace,
                        peer,
                        reply: reply_tx,
                    })
                    .await
                    .ok();
                match reply_rx.await {
                    Ok(outcome) => outcome,
                    Err(err) => {
                        warn!(
                            "accept request callback failed to retrieve reply from actor: {err:?}"
                        );
                        AcceptOutcome::Reject(AbortReason::InternalServerError)
                    }
                }
            }
            .boxed()
        };
        debug!("incoming connection");
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        self.running_sync_accept.spawn(
            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
                .instrument(Span::current()),
        );
    }

    pub fn accept_sync_request(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
    ) -> AcceptOutcome {
        self.state
            .accept_request(&self.endpoint.id(), &namespace, peer)
    }
}

/// Event emitted when a sync operation completes
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    /// Peer we synced with
    pub peer: PublicKey,
    /// Origin of the sync exchange
    pub origin: Origin,
    /// Timestamp when the sync finished
    pub finished: SystemTime,
    /// Timestamp when the sync started
    pub started: SystemTime,
    /// Result of the sync operation
    pub result: std::result::Result<SyncDetails, String>,
}

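/// Stats about a completed sync operation.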
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries received
    pub entries_received: usize,
    /// Number of entries sent
    pub entries_sent: usize,
}

impl From<&SyncFinished> for SyncDetails {
    fn from(value: &SyncFinished) -> Self {
        Self {
            entries_received: value.outcome.num_recv,
            entries_sent: value.outcome.num_sent,
        }
    }
}

#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);

impl SubscribersMap {
    fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
        self.0.entry(namespace).or_default().subscribe(sender);
    }

    async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
        debug!(namespace=%namespace.fmt_short(), %event, "emit event");
        let Some(subscribers) = self.0.get_mut(namespace) else {
            return false;
        };

        if !subscribers.send(event).await {
            self.0.remove(namespace);
        }
        true
    }

    fn remove(&mut self, namespace: &NamespaceId) {
        self.0.remove(namespace);
    }

    fn clear(&mut self) {
        self.0.clear();
    }
}

#[derive(Debug, Default)]
struct QueuedHashes {
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}

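/// Nodes known to provide a given hash, used as the [`ContentDiscovery`] source
/// for the downloader.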
#[derive(Debug, Clone, Default)]
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<EndpointId>>>>);

impl ContentDiscovery for ProviderNodes {
    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<EndpointId> {
        let nodes = self
            .0
            .lock()
            .expect("poisoned")
            .get(&hash.hash)
            .into_iter()
            .flatten()
            .cloned()
            .collect::<Vec<_>>();
        Box::pin(n0_future::stream::iter(nodes))
    }
}

impl QueuedHashes {
    fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
        self.by_hash.entry(hash).or_default().insert(namespace);
        self.by_namespace.entry(namespace).or_default().insert(hash);
    }

    /// Remove a hash from the set of queued hashes.
    ///
    /// Returns a list of namespaces that are now complete (have no queued hashes anymore).
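    ///
    /// A small sketch of the bookkeeping: if `hash` was the last queued hash
    /// for a namespace, that namespace is reported as complete.
    ///
    /// ```ignore
    /// let mut queued = QueuedHashes::default();
    /// queued.insert(hash, namespace);
    /// assert_eq!(queued.remove_hash(&hash), vec![namespace]);
    /// ```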
    fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
        let namespaces = self.by_hash.remove(hash).unwrap_or_default();
        let mut removed_namespaces = vec![];
        for namespace in namespaces {
            if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
                hashes.remove(hash);
                if hashes.is_empty() {
                    self.by_namespace.remove(&namespace);
                    removed_namespaces.push(namespace);
                }
            }
        }
        removed_namespaces
    }

    fn contains_hash(&self, hash: &Hash) -> bool {
        self.by_hash.contains_key(hash)
    }

    fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
        self.by_namespace.contains_key(namespace)
    }
}

#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);

impl Subscribers {
    fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
        self.0.push(sender)
    }

    async fn send(&mut self, event: Event) -> bool {
        let futs = self.0.iter().map(|sender| sender.send(event.clone()));
        let res = futures_buffered::join_all(futs).await;
        // reverse the order so removing does not shift remaining indices
        for (i, res) in res.into_iter().enumerate().rev() {
            if res.is_err() {
                self.0.remove(i);
            }
        }
        !self.0.is_empty()
    }
}

fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.peer.fmt_short().to_string(),
        Err(err) => err
            .peer()
            .map(|x| x.fmt_short().to_string())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.namespace.fmt_short(),
        Err(err) => err
            .namespace()
            .map(|x| x.fmt_short())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let (a_tx, a_rx) = async_channel::unbounded();
        let (b_tx, b_rx) = async_channel::unbounded();
        let mut subscribers = Subscribers::default();
        subscribers.subscribe(a_tx);
        subscribers.subscribe(b_tx);
        drop(a_rx);
        drop(b_rx);
        // Both receivers are gone, so sending must drop both subscribers and
        // report that none remain.
        assert!(!subscribers.send(Event::NeighborUp(pk)).await);
    }
}