iroh_docs/engine/live.rs

#![allow(missing_docs)]

use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::SystemTime,
};

use anyhow::{Context, Result};
use futures_lite::FutureExt;
use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
use iroh_blobs::{
    api::{
        blobs::BlobStatus,
        downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
        Store,
    },
    Hash, HashAndFormat,
};
use iroh_gossip::net::Gossip;
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{self, mpsc, oneshot},
    task::JoinSet,
};
use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};

// use super::gossip::{GossipActor, ToGossipActor};
use super::state::{NamespaceStates, Origin, SyncReason};
use crate::{
    actor::{OpenOpts, SyncHandle},
    engine::gossip::GossipState,
    metrics::Metrics,
    net::{
        connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
        SyncFinished,
    },
    AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
};

/// Name used for logging when new node addresses are added from the docs engine.
const SOURCE_NAME: &str = "docs_engine";

/// An iroh-docs operation
///
/// This is the message that is broadcast over iroh-gossip.
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    /// A new entry was inserted into the document.
    Put(SignedEntry),
    /// A peer now has content available for a hash.
    ContentReady(Hash),
    /// We synced with another peer, here's the news.
    SyncReport(SyncReport),
}

/// Report of a successful sync with the new heads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    namespace: NamespaceId,
    /// Encoded [`AuthorHeads`]
    heads: Vec<u8>,
}

/// Messages to the sync actor
#[derive(derive_more::Debug, strum::Display)]
pub enum ToLiveActor {
    StartSync {
        namespace: NamespaceId,
        peers: Vec<NodeAddr>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Leave {
        namespace: NamespaceId,
        kill_subscribers: bool,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Shutdown {
        reply: sync::oneshot::Sender<()>,
    },
    Subscribe {
        namespace: NamespaceId,
        #[debug("sender")]
        sender: async_channel::Sender<Event>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<Result<()>>,
    },
    HandleConnection {
        conn: iroh::endpoint::Connection,
    },
    AcceptSyncRequest {
        namespace: NamespaceId,
        peer: PublicKey,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<AcceptOutcome>,
    },

    IncomingSyncReport {
        from: PublicKey,
        report: SyncReport,
    },
    NeighborContentReady {
        namespace: NamespaceId,
        node: PublicKey,
        hash: Hash,
    },
    NeighborUp {
        namespace: NamespaceId,
        peer: PublicKey,
    },
    NeighborDown {
        namespace: NamespaceId,
        peer: PublicKey,
    },
}

/// Events informing about the progress of the live sync.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    /// The content of an entry was downloaded and is now available at the local node
    ContentReady {
        /// The content hash of the newly available entry content
        hash: Hash,
    },
    /// We have a new neighbor in the swarm.
    NeighborUp(PublicKey),
    /// We lost a neighbor in the swarm.
    NeighborDown(PublicKey),
    /// A set-reconciliation sync finished.
    SyncFinished(SyncEvent),
    /// All pending content is now ready.
    ///
    /// This event is only emitted after a sync completed and `Self::SyncFinished` was emitted at
    /// least once. It signals that all currently pending downloads have been completed.
    ///
    /// Receiving this event does not guarantee that all content in the document is available. If
    /// blobs failed to download, this event will still be emitted after all operations completed.
    PendingContentReady,
}

type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);

// Currently peers might double-sync in both directions.
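/// The actor that drives live sync for open documents.
///
/// It processes [`ToLiveActor`] messages and replica events, runs sync connections and
/// content downloads, and forwards [`Event`]s to subscribers.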
pub struct LiveActor {
    /// Receiver for actor messages.
    inbox: mpsc::Receiver<ToLiveActor>,
    sync: SyncHandle,
    endpoint: Endpoint,
    bao_store: Store,
    downloader: Downloader,
    replica_events_tx: async_channel::Sender<crate::Event>,
    replica_events_rx: async_channel::Receiver<crate::Event>,

    /// Send messages to self.
    /// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
    /// Only clone into newly spawned tasks.
    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    gossip: GossipState,

    /// Running sync futures (from connect).
    running_sync_connect: JoinSet<SyncConnectRes>,
    /// Running sync futures (from accept).
    running_sync_accept: JoinSet<SyncAcceptRes>,
    /// Running download futures.
    download_tasks: JoinSet<DownloadRes>,
    /// Content hashes which are wanted but not yet queued because no provider was found.
    missing_hashes: HashSet<Hash>,
    /// Content hashes queued in downloader.
    queued_hashes: QueuedHashes,
    /// Nodes known to have a hash
    hash_providers: ProviderNodes,

    /// Subscribers to actor events
    subscribers: SubscribersMap,

    /// Sync state per replica and peer
    state: NamespaceStates,
    metrics: Arc<Metrics>,
}

impl LiveActor {
    /// Create the live actor.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        sync: SyncHandle,
        endpoint: Endpoint,
        gossip: Gossip,
        bao_store: Store,
        downloader: Downloader,
        inbox: mpsc::Receiver<ToLiveActor>,
        sync_actor_tx: mpsc::Sender<ToLiveActor>,
        metrics: Arc<Metrics>,
    ) -> Self {
        let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
        let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
        Self {
            inbox,
            sync,
            replica_events_rx,
            replica_events_tx,
            endpoint,
            gossip: gossip_state,
            bao_store,
            downloader,
            sync_actor_tx,
            running_sync_connect: Default::default(),
            running_sync_accept: Default::default(),
            subscribers: Default::default(),
            download_tasks: Default::default(),
            state: Default::default(),
            missing_hashes: Default::default(),
            queued_hashes: Default::default(),
            hash_providers: Default::default(),
            metrics,
        }
    }

    /// Run the actor loop.
    pub async fn run(mut self) -> Result<()> {
        let shutdown_reply = self.run_inner().await;
        if let Err(err) = self.shutdown().await {
            error!(?err, "Error during shutdown");
        }
        drop(self);
        match shutdown_reply {
            Ok(reply) => {
                reply.send(()).ok();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            tokio::select! {
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;
                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }

    async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
        match msg {
            ToLiveActor::Shutdown { .. } => {
                unreachable!("handled in run");
            }
            ToLiveActor::IncomingSyncReport { from, report } => {
                self.on_sync_report(from, report).await
            }
            ToLiveActor::NeighborUp { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
                self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
                self.subscribers
                    .send(&namespace, Event::NeighborUp(peer))
                    .await;
            }
            ToLiveActor::NeighborDown { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
                self.subscribers
                    .send(&namespace, Event::NeighborDown(peer))
                    .await;
            }
            ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            } => {
                let res = self.start_sync(namespace, peers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            } => {
                let res = self.leave(namespace, kill_subscribers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Subscribe {
                namespace,
                sender,
                reply,
            } => {
                self.subscribers.subscribe(namespace, sender);
                reply.send(Ok(())).ok();
            }
            ToLiveActor::HandleConnection { conn } => {
                self.handle_connection(conn).await;
            }
            ToLiveActor::AcceptSyncRequest {
                namespace,
                peer,
                reply,
            } => {
                let outcome = self.accept_sync_request(namespace, peer);
                reply.send(outcome).ok();
            }
            ToLiveActor::NeighborContentReady {
                namespace,
                node,
                hash,
            } => {
                self.on_neighbor_content_ready(namespace, node, hash).await;
            }
        };
        Ok(true)
    }

    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
        if !self.state.start_connect(&namespace, peer, reason) {
            return;
        }
        let endpoint = self.endpoint.clone();
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        let fut = async move {
            let res = connect_and_sync(
                &endpoint,
                &sync,
                namespace,
                NodeAddr::new(peer),
                Some(&metrics),
            )
            .await;
            (namespace, peer, reason, res)
        }
        .instrument(Span::current());
        self.running_sync_connect.spawn(fut);
    }

    async fn shutdown(&mut self) -> anyhow::Result<()> {
        // cancel all subscriptions
        self.subscribers.clear();
        let (gossip_shutdown_res, _store) = tokio::join!(
            // quit the gossip topics and task loops.
            self.gossip.shutdown(),
            // shutdown sync thread
            self.sync.shutdown()
        );
        gossip_shutdown_res?;
        // TODO: abort_all and join_next all JoinSets to catch panics
        // (they are aborted on drop, but that swallows panics)
        Ok(())
    }

    async fn start_sync(&mut self, namespace: NamespaceId, mut peers: Vec<NodeAddr>) -> Result<()> {
        debug!(?namespace, peers = peers.len(), "start sync");
        // update state to allow sync
        if !self.state.is_syncing(&namespace) {
            let opts = OpenOpts::default()
                .sync()
                .subscribe(self.replica_events_tx.clone());
            self.sync.open(namespace, opts).await?;
            self.state.insert(namespace);
        }
        // add the peers stored for this document
        match self.sync.get_sync_peers(namespace).await {
            Ok(None) => {
                // no peers for this document
            }
            Ok(Some(known_useful_peers)) => {
                let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
                    // peers are stored as bytes, don't fail the operation if they can't be
                    // decoded: simply ignore the peer
                    match PublicKey::from_bytes(&peer_id_bytes) {
                        Ok(public_key) => Some(NodeAddr::new(public_key)),
                        Err(_signing_error) => {
                            warn!("potential db corruption: peers per doc can't be decoded");
                            None
                        }
                    }
                });
                peers.extend(as_node_addr);
            }
            Err(e) => {
                // try to continue if peers per doc can't be read since they are not vital for sync
                warn!(%e, "db error reading peers per document")
            }
        }
        self.join_peers(namespace, peers).await?;
        Ok(())
    }

    async fn leave(
        &mut self,
        namespace: NamespaceId,
        kill_subscribers: bool,
    ) -> anyhow::Result<()> {
        // self.subscribers.remove(&namespace);
        if self.state.remove(&namespace) {
            self.sync.set_sync(namespace, false).await?;
            self.sync
                .unsubscribe(namespace, self.replica_events_tx.clone())
                .await?;
            self.sync.close(namespace).await?;
            self.gossip.quit(&namespace);
        }
        if kill_subscribers {
            self.subscribers.remove(&namespace);
        }
        Ok(())
    }

    async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
        let mut peer_ids = Vec::new();

        // add addresses of peers to our endpoint address book
        for peer in peers.into_iter() {
            let peer_id = peer.node_id;
            // adding a node address without any addressing info fails with an error,
            // but we still want to include those peers because node discovery might find addresses for them
            if peer.is_empty() {
                peer_ids.push(peer_id)
            } else {
                match self.endpoint.add_node_addr_with_source(peer, SOURCE_NAME) {
                    Ok(()) => {
                        peer_ids.push(peer_id);
                    }
                    Err(err) => {
                        warn!(peer = %peer_id.fmt_short(), "failed to add known addrs: {err:?}");
                    }
                }
            }
        }

        // tell gossip to join
        self.gossip.join(namespace, peer_ids.clone()).await?;

        if !peer_ids.is_empty() {
            // trigger initial sync with initial peers
            for peer in peer_ids {
                self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
            }
        }
        Ok(())
    }

    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    async fn on_sync_via_connect_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        reason: SyncReason,
        result: Result<SyncFinished, ConnectError>,
    ) {
        match result {
            Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
                debug!(?reason, "remote abort, already syncing");
            }
            res => {
                self.on_sync_finished(
                    namespace,
                    peer,
                    Origin::Connect(reason),
                    res.map_err(Into::into),
                )
                .await
            }
        }
    }

    #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
    async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
        match res {
            Ok(state) => {
                self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
                    .await
            }
            Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
                // In case we aborted the sync: do nothing (our outgoing sync is in progress)
                debug!(?reason, "aborted by us");
            }
            Err(err) => {
                if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
                    self.on_sync_finished(
                        namespace,
                        peer,
                        Origin::Accept,
                        Err(anyhow::Error::from(err)),
                    )
                    .await;
                } else {
                    debug!(?err, "failed before reading the first message");
                }
            }
        }
    }

    async fn on_sync_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        origin: Origin,
        result: Result<SyncFinished>,
    ) {
        match &result {
            Err(ref err) => {
                warn!(?origin, ?err, "sync failed");
            }
            Ok(ref details) => {
                info!(
                    sent = %details.outcome.num_sent,
                    recv = %details.outcome.num_recv,
                    t_connect = ?details.timings.connect,
                    t_process = ?details.timings.process,
                    "sync finished",
                );

                // register the peer as useful for the document
                if let Err(e) = self
                    .sync
                    .register_useful_peer(namespace, *peer.as_bytes())
                    .await
                {
                    debug!(%e, "failed to register peer for document")
                }

                // broadcast a sync report to our neighbors, but only if we received new entries.
                if details.outcome.num_recv > 0 {
                    info!("broadcast sync report to neighbors");
                    match details
                        .outcome
                        .heads_received
                        .encode(Some(self.gossip.max_message_size()))
                    {
                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
                        Ok(heads) => {
                            let report = SyncReport { namespace, heads };
                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
                                .await;
                        }
                    }
                }
            }
        };

        let result_for_event = match &result {
            Ok(details) => Ok(details.into()),
            Err(err) => Err(err.to_string()),
        };

        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
            return;
        };

        let ev = SyncEvent {
            peer,
            origin,
            result: result_for_event,
            finished: SystemTime::now(),
            started,
        };
        self.subscribers
            .send(&namespace, Event::SyncFinished(ev))
            .await;

        // Check if there are queued pending content hashes for this namespace.
        // If hashes are pending, mark this namespace to be eligible for a PendingContentReady event once all
        // pending hashes have completed downloading.
        // If no hashes are pending, emit the PendingContentReady event right away. The next
        // PendingContentReady event may then only be emitted after the next sync completes.
        if self.queued_hashes.contains_namespace(&namespace) {
            self.state.set_may_emit_ready(&namespace, true);
        } else {
            self.subscribers
                .send(&namespace, Event::PendingContentReady)
                .await;
            self.state.set_may_emit_ready(&namespace, false);
        }

        if resync {
            self.sync_with_peer(namespace, peer, SyncReason::Resync);
        }
    }

    async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
        if !self.state.is_syncing(&namespace) {
            return;
        }

        let msg = match postcard::to_stdvec(op) {
            Ok(msg) => msg,
            Err(err) => {
                error!(?err, ?op, "Failed to serialize message");
                return;
            }
        };
        // TODO: We should likely debounce and merge these neighbor announcements.
        self.gossip
            .broadcast_neighbors(&namespace, msg.into())
            .await;
    }

    async fn on_download_ready(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        res: Result<(), anyhow::Error>,
    ) {
        let completed_namespaces = self.queued_hashes.remove_hash(&hash);
        debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
        if res.is_ok() {
            self.subscribers
                .send(&namespace, Event::ContentReady { hash })
                .await;
            // Inform our neighbors that we have new content ready.
            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
                .await;
        } else {
            self.missing_hashes.insert(hash);
        }
        for namespace in completed_namespaces.iter() {
            if let Some(true) = self.state.may_emit_ready(namespace) {
                self.subscribers
                    .send(namespace, Event::PendingContentReady)
                    .await;
            }
        }
    }

    async fn on_neighbor_content_ready(
        &mut self,
        namespace: NamespaceId,
        node: NodeId,
        hash: Hash,
    ) {
        self.start_download(namespace, hash, node, true).await;
    }

    #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
    async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
        let namespace = report.namespace;
        if !self.state.is_syncing(&namespace) {
            return;
        }
        let heads = match AuthorHeads::decode(&report.heads) {
            Ok(heads) => heads,
            Err(err) => {
                warn!(?err, "failed to decode AuthorHeads");
                return;
            }
        };
        match self.sync.has_news_for_us(report.namespace, heads).await {
            Ok(Some(updated_authors)) => {
                info!(%updated_authors, "news reported: sync now");
                self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
            }
            Ok(None) => {
                debug!("no news reported: nothing to do");
            }
            Err(err) => {
                warn!("sync actor error: {err:?}");
            }
        }
    }

    async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
        match event {
            crate::Event::LocalInsert { namespace, entry } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
                // A new entry was inserted locally. Broadcast a gossip message.
                if self.state.is_syncing(&namespace) {
                    let op = Op::Put(entry.clone());
                    let message = postcard::to_stdvec(&op)?.into();
                    self.gossip.broadcast(&namespace, message).await;
                }
            }
            crate::Event::RemoteInsert {
                namespace,
                entry,
                from,
                should_download,
                remote_content_status,
            } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
                // A new entry was inserted from initial sync or gossip. Queue downloading the
                // content.
                if should_download {
                    let hash = entry.content_hash();
                    if matches!(remote_content_status, ContentStatus::Complete) {
                        let node_id = PublicKey::from_bytes(&from)?;
                        self.start_download(namespace, hash, node_id, false).await;
                    } else {
                        self.missing_hashes.insert(hash);
                    }
                }
            }
        }

        Ok(())
    }

    async fn start_download(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        node: PublicKey,
        only_if_missing: bool,
    ) {
        let entry_status = self.bao_store.blobs().status(hash).await;
        if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
            self.missing_hashes.remove(&hash);
            return;
        }
        self.hash_providers
            .0
            .lock()
            .expect("poisoned")
            .entry(hash)
            .or_default()
            .insert(node);
        if self.queued_hashes.contains_hash(&hash) {
            self.queued_hashes.insert(hash, namespace);
        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
            let req = DownloadRequest::new(
                HashAndFormat::raw(hash),
                self.hash_providers.clone(),
                SplitStrategy::None,
            );
            let handle = self.downloader.download_with_opts(req);

            self.queued_hashes.insert(hash, namespace);
            self.missing_hashes.remove(&hash);
            self.download_tasks
                .spawn(async move { (namespace, hash, handle.await) });
        }
    }

    #[instrument("accept", skip_all)]
    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
        let to_actor_tx = self.sync_actor_tx.clone();
        let accept_request_cb = move |namespace, peer| {
            let to_actor_tx = to_actor_tx.clone();
            async move {
                let (reply_tx, reply_rx) = oneshot::channel();
                to_actor_tx
                    .send(ToLiveActor::AcceptSyncRequest {
                        namespace,
                        peer,
                        reply: reply_tx,
                    })
                    .await
                    .ok();
                match reply_rx.await {
                    Ok(outcome) => outcome,
                    Err(err) => {
                        warn!(
                            "accept request callback failed to retrieve reply from actor: {err:?}"
                        );
                        AcceptOutcome::Reject(AbortReason::InternalServerError)
                    }
                }
            }
            .boxed()
        };
        debug!("incoming connection");
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        self.running_sync_accept.spawn(
            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
                .instrument(Span::current()),
        );
    }

    pub fn accept_sync_request(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
    ) -> AcceptOutcome {
        self.state
            .accept_request(&self.endpoint.node_id(), &namespace, peer)
    }
}

/// Event emitted when a sync operation completes
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    /// Peer we synced with
    pub peer: PublicKey,
    /// Origin of the sync exchange
    pub origin: Origin,
    /// Timestamp when the sync finished
    pub finished: SystemTime,
    /// Timestamp when the sync started
    pub started: SystemTime,
    /// Result of the sync operation
    pub result: std::result::Result<SyncDetails, String>,
}

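/// Details of a completed sync operation.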
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries received
    pub entries_received: usize,
    /// Number of entries sent
    pub entries_sent: usize,
}

impl From<&SyncFinished> for SyncDetails {
    fn from(value: &SyncFinished) -> Self {
        Self {
            entries_received: value.outcome.num_recv,
            entries_sent: value.outcome.num_sent,
        }
    }
}

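/// Event subscribers, keyed by namespace.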
#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);

impl SubscribersMap {
    fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
        self.0.entry(namespace).or_default().subscribe(sender);
    }

    async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
        debug!(namespace=%namespace.fmt_short(), %event, "emit event");
        let Some(subscribers) = self.0.get_mut(namespace) else {
            return false;
        };

        if !subscribers.send(event).await {
            self.0.remove(namespace);
        }
        true
    }

    fn remove(&mut self, namespace: &NamespaceId) {
        self.0.remove(namespace);
    }

    fn clear(&mut self) {
        self.0.clear();
    }
}

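/// Content hashes queued in the downloader, tracked both by hash and by namespace.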
#[derive(Debug, Default)]
struct QueuedHashes {
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}

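/// Nodes known to have a hash, used as a [`ContentDiscovery`] source for the downloader.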
#[derive(Debug, Clone, Default)]
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<NodeId>>>>);

impl ContentDiscovery for ProviderNodes {
    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<NodeId> {
        let nodes = self
            .0
            .lock()
            .expect("poisoned")
            .get(&hash.hash)
            .into_iter()
            .flatten()
            .cloned()
            .collect::<Vec<_>>();
        Box::pin(n0_future::stream::iter(nodes))
    }
}

impl QueuedHashes {
    fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
        self.by_hash.entry(hash).or_default().insert(namespace);
        self.by_namespace.entry(namespace).or_default().insert(hash);
    }

    /// Remove a hash from the set of queued hashes.
    ///
    /// Returns a list of namespaces that are now complete (have no queued hashes anymore).
    fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
        let namespaces = self.by_hash.remove(hash).unwrap_or_default();
        let mut removed_namespaces = vec![];
        for namespace in namespaces {
            if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
                hashes.remove(hash);
                if hashes.is_empty() {
                    self.by_namespace.remove(&namespace);
                    removed_namespaces.push(namespace);
                }
            }
        }
        removed_namespaces
    }

    fn contains_hash(&self, hash: &Hash) -> bool {
        self.by_hash.contains_key(hash)
    }

    fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
        self.by_namespace.contains_key(namespace)
    }
}

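/// The list of senders subscribed to events for a single namespace; closed senders are removed on send.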
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);

impl Subscribers {
    fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
        self.0.push(sender)
    }

    async fn send(&mut self, event: Event) -> bool {
        let futs = self.0.iter().map(|sender| sender.send(event.clone()));
        let res = futures_buffered::join_all(futs).await;
        // reverse the order so removing does not shift remaining indices
        for (i, res) in res.into_iter().enumerate().rev() {
            if res.is_err() {
                self.0.remove(i);
            }
        }
        !self.0.is_empty()
    }
}

fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.peer.fmt_short(),
        Err(err) => err
            .peer()
            .map(|x| x.fmt_short())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.namespace.fmt_short(),
        Err(err) => err
            .namespace()
            .map(|x| x.fmt_short())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let (a_tx, a_rx) = async_channel::unbounded();
        let (b_tx, b_rx) = async_channel::unbounded();
        let mut subscribers = Subscribers::default();
        subscribers.subscribe(a_tx);
        subscribers.subscribe(b_tx);
        drop(a_rx);
        drop(b_rx);
        subscribers.send(Event::NeighborUp(pk)).await;
    }
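
    // Additional sketches exercising the in-file helper types. They assume that
    // `NamespaceId` can be constructed from raw bytes via `From<[u8; 32]>`, as with the
    // other key types in this crate; the hashes and namespaces below are placeholders.

    #[test]
    fn test_queued_hashes_bookkeeping() {
        let hash_a = Hash::new(b"a");
        let hash_b = Hash::new(b"b");
        let ns = NamespaceId::from([1u8; 32]);
        let mut queued = QueuedHashes::default();

        queued.insert(hash_a, ns);
        queued.insert(hash_b, ns);
        assert!(queued.contains_hash(&hash_a));
        assert!(queued.contains_namespace(&ns));

        // Removing one hash does not complete the namespace yet.
        assert!(queued.remove_hash(&hash_a).is_empty());
        // Removing the last queued hash reports the namespace as complete.
        assert_eq!(queued.remove_hash(&hash_b), vec![ns]);
        assert!(!queued.contains_namespace(&ns));
    }

    #[test]
    fn test_op_content_ready_roundtrip() {
        // `Op` messages are broadcast over gossip as postcard bytes (see `broadcast_neighbors`).
        let hash = Hash::new(b"content");
        let op = Op::ContentReady(hash);
        let bytes = postcard::to_stdvec(&op).unwrap();
        match postcard::from_bytes::<Op>(&bytes).unwrap() {
            Op::ContentReady(h) => assert_eq!(h, hash),
            other => panic!("unexpected op: {other}"),
        }
    }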
}