// iroh_docs/engine.rs

//! Handlers and actors for live syncing replicas.
2//!
3//! [`crate::Replica`] is also called documents here.
4
5use std::sync::{Arc, RwLock};
6
7use anyhow::{bail, Result};
8use iroh::{Endpoint, EndpointAddr, PublicKey};
9use iroh_blobs::{
10    api::{blobs::BlobStatus, downloader::Downloader, Store},
11    store::{ProtectCb, ProtectOutcome},
12    Hash,
13};
14use iroh_gossip::net::Gossip;
15use n0_future::{task::AbortOnDropHandle, Stream, StreamExt};
16use serde::{Deserialize, Serialize};
17use tokio::sync::{mpsc, oneshot};
18use tracing::{debug, error, error_span, Instrument};
19
20use self::live::{LiveActor, ToLiveActor};
21pub use self::{
22    live::SyncEvent,
23    state::{Origin, SyncReason},
24};
25use crate::{
26    actor::SyncHandle, metrics::Metrics, Author, AuthorId, ContentStatus, ContentStatusCallback,
27    Entry, NamespaceId,
28};
29
// Gossip integration for document swarms (module contents not visible here).
mod gossip;
// The live sync actor: exports [`LiveActor`], [`ToLiveActor`], [`SyncEvent`].
mod live;
// Sync state tracking: exports [`Origin`] and [`SyncReason`].
mod state;
33
/// Capacity of the channel for the [`ToLiveActor`] messages.
const ACTOR_CHANNEL_CAP: usize = 64;
/// Capacity for the channels for [`Engine::subscribe`].
///
/// Applies to both per-subscription channels: replica events and live-actor events.
const SUBSCRIBE_CHANNEL_CAP: usize = 256;
38
/// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with
/// peers and a gossip swarm for each syncing document.
#[derive(derive_more::Debug)]
pub struct Engine {
    /// [`Endpoint`] used by the engine.
    pub endpoint: Endpoint,
    /// Handle to the actor thread.
    pub sync: SyncHandle,
    /// The persistent default author for this engine.
    pub default_author: DefaultAuthor,
    /// Channel for sending messages to the live sync actor.
    to_live_actor: mpsc::Sender<ToLiveActor>,
    /// Handle to the live actor task; aborted when the engine is dropped.
    #[allow(dead_code)]
    actor_handle: AbortOnDropHandle<()>,
    /// Callback reporting whether an entry's content is present in the blob store.
    #[debug("ContentStatusCallback")]
    content_status_cb: ContentStatusCallback,
    /// The blob store holding document content.
    blob_store: iroh_blobs::api::Store,
    /// Task answering GC protection requests from the blob store; aborted on drop.
    _gc_protect_task: AbortOnDropHandle<()>,
}
57
impl Engine {
    /// Start the sync engine.
    ///
    /// This will spawn two tokio tasks for the live sync coordination and gossip actors, and a
    /// thread for the actor interacting with doc storage.
    pub async fn spawn(
        endpoint: Endpoint,
        gossip: Gossip,
        replica_store: crate::store::Store,
        bao_store: iroh_blobs::api::Store,
        downloader: Downloader,
        default_author_storage: DefaultAuthorStorage,
        protect_cb: Option<ProtectCallbackHandler>,
    ) -> anyhow::Result<Self> {
        // Channel over which all public methods talk to the live actor.
        let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
        // Short form of our endpoint id, used for tracing spans.
        let me = endpoint.id().fmt_short().to_string();

        // Callback used by the sync actor to check whether the content for a
        // given hash is available in the local blob store.
        let content_status_cb: ContentStatusCallback = {
            let blobs = bao_store.blobs().clone();
            Arc::new(move |hash: iroh_blobs::Hash| {
                let blobs = blobs.clone();
                Box::pin(async move {
                    let blob_status = blobs.status(hash).await;
                    entry_to_content_status(blob_status)
                })
            })
        };
        let sync = SyncHandle::spawn(replica_store, Some(content_status_cb.clone()), me.clone());

        // Task answering GC protection requests: for each request it replies with a
        // receiver and then streams all content hashes referenced by any document, so
        // the garbage collector will not delete their blobs. The loop (and task) ends
        // when the protect handler side is dropped.
        let sync2 = sync.clone();
        let gc_protect_task = AbortOnDropHandle::new(n0_future::task::spawn(async move {
            let Some(mut protect_handler) = protect_cb else {
                // No protect handler configured: nothing to do.
                return;
            };
            while let Some(reply_tx) = protect_handler.0.recv().await {
                let (tx, rx) = mpsc::channel(64);
                if let Err(_err) = reply_tx.send(rx) {
                    // Requester went away before we could reply; wait for the next request.
                    continue;
                }
                let hashes = match sync2.content_hashes().await {
                    Ok(hashes) => hashes,
                    Err(err) => {
                        debug!("protect task: getting content hashes failed with {err:#}");
                        if let Err(_err) = tx.send(Err(err)).await {
                            debug!("protect task: failed to forward error");
                        }
                        continue;
                    }
                };
                for hash in hashes {
                    if let Err(_err) = tx.send(hash).await {
                        debug!("protect task: failed to forward hash");
                        break;
                    }
                }
            }
        }));

        let actor = LiveActor::new(
            sync.clone(),
            endpoint.clone(),
            gossip.clone(),
            bao_store.clone(),
            downloader,
            to_live_actor_recv,
            live_actor_tx.clone(),
            sync.metrics().clone(),
        )?;
        let actor_handle = n0_future::task::spawn(
            async move {
                if let Err(err) = actor.run().await {
                    error!("sync actor failed: {err:?}");
                }
            }
            .instrument(error_span!("sync", %me)),
        );

        let default_author = match DefaultAuthor::load(default_author_storage, &sync).await {
            Ok(author) => author,
            Err(err) => {
                // If loading the default author failed, make sure to shutdown the sync actor before
                // returning.
                let _store = sync.shutdown().await.ok();
                return Err(err);
            }
        };

        Ok(Self {
            endpoint,
            sync,
            to_live_actor: live_actor_tx,
            actor_handle: AbortOnDropHandle::new(actor_handle),
            content_status_cb,
            default_author,
            blob_store: bao_store,
            _gc_protect_task: gc_protect_task,
        })
    }

    /// Get the blob store.
    pub fn blob_store(&self) -> &Store {
        &self.blob_store
    }

    /// Returns the metrics tracked for this engine.
    pub fn metrics(&self) -> &Arc<Metrics> {
        self.sync.metrics()
    }

    /// Start to sync a document.
    ///
    /// If `peers` is non-empty, it will both do an initial set-reconciliation sync with each peer,
    /// and join an iroh-gossip swarm with these peers to receive and broadcast document updates.
    pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            })
            .await?;
        // First `?` is the closed-channel error, second the actor's own result.
        reply_rx.await??;
        Ok(())
    }

    /// Stop the live sync for a document and leave the gossip swarm.
    ///
    /// If `kill_subscribers` is true, all existing event subscribers will be dropped. This means
    /// they will receive `None` and no further events in case of rejoining the document.
    pub async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            })
            .await?;
        reply_rx.await??;
        Ok(())
    }

    /// Subscribe to replica and sync progress events.
    ///
    /// Returns the merge of two event streams: entry-insert events from the replica
    /// (with content status resolved) and progress events from the live actor.
    pub async fn subscribe(
        &self,
        namespace: NamespaceId,
    ) -> Result<impl Stream<Item = Result<LiveEvent>> + Unpin + 'static> {
        // Create a future that sends channel senders to the respective actors.
        // We clone `self` so that the future does not capture any lifetimes.
        let content_status_cb = self.content_status_cb.clone();

        // Subscribe to insert events from the replica.
        let a = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            self.sync.subscribe(namespace, s).await?;
            Box::pin(r).then(move |ev| {
                let content_status_cb = content_status_cb.clone();
                Box::pin(async move { LiveEvent::from_replica_event(ev, &content_status_cb).await })
            })
        };

        // Subscribe to events from the [`live::Actor`].
        let b = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            let r = Box::pin(r);
            let (reply, reply_rx) = oneshot::channel();
            self.to_live_actor
                .send(ToLiveActor::Subscribe {
                    namespace,
                    sender: s,
                    reply,
                })
                .await?;
            reply_rx.await??;
            r.map(|event| Ok(LiveEvent::from(event)))
        };

        // Interleave both streams into a single one.
        Ok(a.or(b))
    }

    /// Handle an incoming iroh-docs connection.
    pub async fn handle_connection(&self, conn: iroh::endpoint::Connection) -> anyhow::Result<()> {
        self.to_live_actor
            .send(ToLiveActor::HandleConnection { conn })
            .await?;
        Ok(())
    }

    /// Shutdown the engine.
    pub async fn shutdown(&self) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Shutdown { reply })
            .await?;
        // Wait for the actor to acknowledge the shutdown.
        reply_rx.await?;
        Ok(())
    }
}
257
258/// Converts an [`BlobStatus`] into a ['ContentStatus'].
259fn entry_to_content_status(entry: irpc::Result<BlobStatus>) -> ContentStatus {
260    match entry {
261        Ok(BlobStatus::Complete { .. }) => ContentStatus::Complete,
262        Ok(BlobStatus::Partial { .. }) => ContentStatus::Incomplete,
263        Ok(BlobStatus::NotFound) => ContentStatus::Missing,
264        Err(cause) => {
265            tracing::warn!("Error while checking entry status: {cause:?}");
266            ContentStatus::Missing
267        }
268    }
269}
270
/// Events informing about actions of the live sync progress.
///
/// Emitted on the stream returned by [`Engine::subscribe`].
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum LiveEvent {
    /// A local insertion.
    InsertLocal {
        /// The inserted entry.
        entry: Entry,
    },
    /// Received a remote insert.
    InsertRemote {
        /// The peer that sent us the entry.
        from: PublicKey,
        /// The inserted entry.
        entry: Entry,
        /// If the content is available at the local node
        content_status: ContentStatus,
    },
    /// The content of an entry was downloaded and is now available at the local node
    ContentReady {
        /// The content hash of the newly available entry content
        hash: Hash,
    },
    /// All pending content is now ready.
    ///
    /// This event signals that all queued content downloads from the last sync run have either
    /// completed or failed.
    ///
    /// It will only be emitted after a [`Self::SyncFinished`] event, never before.
    ///
    /// Receiving this event does not guarantee that all content in the document is available. If
    /// blobs failed to download, this event will still be emitted after all operations completed.
    PendingContentReady,
    /// We have a new neighbor in the swarm.
    NeighborUp(PublicKey),
    /// We lost a neighbor in the swarm.
    NeighborDown(PublicKey),
    /// A set-reconciliation sync finished.
    SyncFinished(SyncEvent),
}
310
311impl From<live::Event> for LiveEvent {
312    fn from(ev: live::Event) -> Self {
313        match ev {
314            live::Event::ContentReady { hash } => Self::ContentReady { hash },
315            live::Event::NeighborUp(peer) => Self::NeighborUp(peer),
316            live::Event::NeighborDown(peer) => Self::NeighborDown(peer),
317            live::Event::SyncFinished(ev) => Self::SyncFinished(ev),
318            live::Event::PendingContentReady => Self::PendingContentReady,
319        }
320    }
321}
322
323impl LiveEvent {
324    async fn from_replica_event(
325        ev: crate::Event,
326        content_status_cb: &ContentStatusCallback,
327    ) -> Result<Self> {
328        Ok(match ev {
329            crate::Event::LocalInsert { entry, .. } => Self::InsertLocal {
330                entry: entry.into(),
331            },
332            crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
333                content_status: content_status_cb(entry.content_hash()).await,
334                entry: entry.into(),
335                from: PublicKey::from_bytes(&from)?,
336            },
337        })
338    }
339}
340
/// Where to persist the default author.
///
/// If set to `Mem`, a new author will be created in the docs store before spawning the sync
/// engine. Changing the default author will not be persisted.
///
/// If set to `Persistent`, the default author will be loaded from and persisted to the specified
/// path (as hex encoded string of the author's public key).
#[derive(Debug)]
pub enum DefaultAuthorStorage {
    /// Memory storage.
    Mem,
    /// File based persistent storage.
    ///
    /// The path points to the file holding the default author id.
    #[cfg(feature = "fs-store")]
    Persistent(std::path::PathBuf),
}
356
impl DefaultAuthorStorage {
    /// Load the default author from the storage.
    ///
    /// Will create and save a new author if the storage is empty.
    ///
    /// Returns an error if the author can't be parsed or if the author does not exist in the docs
    /// store.
    pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
        match self {
            Self::Mem => {
                // In-memory mode: always create a fresh author and import it into the store.
                let author = Author::new(&mut rand::rng());
                let author_id = author.id();
                docs_store.import_author(author).await?;
                Ok(author_id)
            }
            #[cfg(feature = "fs-store")]
            Self::Persistent(ref path) => {
                use std::str::FromStr;

                use anyhow::Context;
                if path.exists() {
                    // The file stores the author id as a hex string.
                    let data = tokio::fs::read_to_string(path).await.with_context(|| {
                        format!(
                            "Failed to read the default author file at `{}`",
                            path.to_string_lossy()
                        )
                    })?;
                    let author_id = AuthorId::from_str(&data).with_context(|| {
                        format!(
                            "Failed to parse the default author from `{}`",
                            path.to_string_lossy()
                        )
                    })?;
                    // The id file is only a pointer: the author key itself must
                    // exist in the docs store.
                    if docs_store.export_author(author_id).await?.is_none() {
                        bail!(
                            "The default author is missing from the docs store. To recover, delete the file `{}`. Then iroh will create a new default author.",
                            path.to_string_lossy()
                        )
                    }
                    Ok(author_id)
                } else {
                    // No id file yet: create a new author and persist it.
                    let author = Author::new(&mut rand::rng());
                    let author_id = author.id();
                    docs_store.import_author(author).await?;
                    // Make sure to write the default author to the store
                    // *before* we write the default author ID file.
                    // Otherwise the default author ID file is effectively a dangling reference.
                    docs_store.flush_store().await?;
                    self.persist(author_id).await?;
                    Ok(author_id)
                }
            }
        }
    }

    /// Save a new default author.
    pub async fn persist(&self, #[allow(unused)] author_id: AuthorId) -> anyhow::Result<()> {
        match self {
            Self::Mem => {
                // persistence is not possible for the mem storage so this is a noop.
            }
            #[cfg(feature = "fs-store")]
            Self::Persistent(ref path) => {
                use anyhow::Context;
                // Write the author id (its string form) to the id file.
                tokio::fs::write(path, author_id.to_string())
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to write the default author to `{}`",
                            path.to_string_lossy()
                        )
                    })?;
            }
        }
        Ok(())
    }
}
434
/// Persistent default author for a docs engine.
#[derive(Debug)]
pub struct DefaultAuthor {
    /// Cached author id; behind a lock so [`Self::set`] can race safely with [`Self::get`].
    value: RwLock<AuthorId>,
    /// Backing storage used to persist changes to the default author.
    storage: DefaultAuthorStorage,
}
441
impl DefaultAuthor {
    /// Load the default author from storage.
    ///
    /// If the storage is empty creates a new author and persists it.
    pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
        let value = storage.load(docs_store).await?;
        Ok(Self {
            value: RwLock::new(value),
            storage,
        })
    }

    /// Get the current default author.
    pub fn get(&self) -> AuthorId {
        *self.value.read().unwrap()
    }

    /// Set the default author.
    ///
    /// Fails if the author does not exist in the docs store or if persisting fails;
    /// in either case the previous default author stays in effect.
    pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
        // Refuse to point the default at an author the store does not know.
        if docs_store.export_author(author_id).await?.is_none() {
            bail!("The author does not exist");
        }
        // Persist first so the in-memory value is only updated on success.
        self.storage.persist(author_id).await?;
        *self.value.write().unwrap() = author_id;
        Ok(())
    }
}
469
/// Sending side of the blob-protection handshake.
///
/// Each message is a oneshot sender over which the [`ProtectCallbackHandler`] side replies
/// with a channel receiver that streams the protected hashes (or an error).
#[derive(Debug)]
struct ProtectCallbackSender(mpsc::Sender<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>);
472
/// The handler for a blobs protection callback.
///
/// See [`ProtectCallbackHandler::new`].
#[derive(Debug)]
pub struct ProtectCallbackHandler(
    // Receives one request per GC run; the docs engine replies with a stream of hashes to protect.
    pub(crate) mpsc::Receiver<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>,
);
480
481impl ProtectCallbackHandler {
482    /// Creates a callback and handler to manage blob protection.
483    ///
484    /// The returned [`ProtectCb`] must be passed set in the [`GcConfig`] of the [`iroh_blobs`] store where
485    /// the blobs for hashes in documents are persisted. The [`ProtectCallbackHandler`] must be passed to
486    /// [`Builder::protect_handler`] (or [`Engine::spawn`]). This will then ensure that hashes referenced
487    /// in docs will not be deleted from the blobs store, and will be garbage collected if they no longer appear
488    /// in any doc.
489    ///
490    /// [`Builder::protect_handler`]: crate::protocol::Builder::protect_handler
491    /// [`GcConfig`]: iroh_blobs::store::GcConfig
492    pub fn new() -> (Self, ProtectCb) {
493        let (tx, rx) = mpsc::channel(4);
494        let cb = ProtectCallbackSender(tx).into_cb();
495        let handler = ProtectCallbackHandler(rx);
496        (handler, cb)
497    }
498}
499
500impl ProtectCallbackSender {
501    fn into_cb(self) -> ProtectCb {
502        let start_tx = self.0.clone();
503        Arc::new(move |live| {
504            let start_tx = start_tx.clone();
505            Box::pin(async move {
506                let (tx, rx) = oneshot::channel();
507                if let Err(_err) = start_tx.send(tx).await {
508                    tracing::warn!(
509                        "Failed to get protected hashes from docs: ProtectCallback receiver dropped"
510                    );
511                    return ProtectOutcome::Abort;
512                }
513                let mut rx = match rx.await {
514                    Ok(rx) => rx,
515                    Err(_err) => {
516                        tracing::warn!(
517                            "Failed to get protected hashes from docs: ProtectCallback sender dropped"
518                        );
519                        return ProtectOutcome::Abort;
520                    }
521                };
522                while let Some(res) = rx.recv().await {
523                    match res {
524                        Err(err) => {
525                            tracing::warn!("Getting protected hashes produces error: {err:#}");
526                            return ProtectOutcome::Abort;
527                        }
528                        Ok(hash) => {
529                            live.insert(hash);
530                        }
531                    }
532                }
533                ProtectOutcome::Continue
534            })
535        })
536    }
537}