iroh_docs/
engine.rs

//! Handlers and actors for live syncing replicas.
//!
//! A [`crate::Replica`] is also called a document here.

use std::sync::{Arc, RwLock};

use anyhow::{bail, Result};
use futures_lite::{Stream, StreamExt};
use iroh::{Endpoint, EndpointAddr, PublicKey};
use iroh_blobs::{
    api::{blobs::BlobStatus, downloader::Downloader, Store},
    store::{ProtectCb, ProtectOutcome},
    Hash,
};
use iroh_gossip::net::Gossip;
use n0_future::task::AbortOnDropHandle;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, error_span, Instrument};

use self::live::{LiveActor, ToLiveActor};
pub use self::{
    live::SyncEvent,
    state::{Origin, SyncReason},
};
use crate::{
    actor::SyncHandle, metrics::Metrics, Author, AuthorId, ContentStatus, ContentStatusCallback,
    Entry, NamespaceId,
};

mod gossip;
mod live;
mod state;

/// Capacity of the channel for the [`ToLiveActor`] messages.
const ACTOR_CHANNEL_CAP: usize = 64;
/// Capacity for the channels for [`Engine::subscribe`].
const SUBSCRIBE_CHANNEL_CAP: usize = 256;

/// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with
/// peers and a gossip swarm for each syncing document.
#[derive(derive_more::Debug)]
pub struct Engine {
    /// [`Endpoint`] used by the engine.
    pub endpoint: Endpoint,
    /// Handle to the actor thread.
    pub sync: SyncHandle,
    /// The persistent default author for this engine.
    pub default_author: DefaultAuthor,
    to_live_actor: mpsc::Sender<ToLiveActor>,
    #[allow(dead_code)]
    actor_handle: AbortOnDropHandle<()>,
    #[debug("ContentStatusCallback")]
    content_status_cb: ContentStatusCallback,
    blob_store: iroh_blobs::api::Store,
    _gc_protect_task: AbortOnDropHandle<()>,
}

impl Engine {
    /// Start the sync engine.
    ///
    /// This will spawn two tokio tasks for the live sync coordination and gossip actors, and a
    /// thread for the actor interacting with doc storage.
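    ///
    /// A minimal sketch of spawning an engine; constructing the endpoint, gossip instance,
    /// stores and downloader is elided here and depends on the application setup:
    ///
    /// ```rust,ignore
    /// // All inputs below are assumed to be created by the surrounding application.
    /// let engine = Engine::spawn(
    ///     endpoint,      // iroh::Endpoint
    ///     gossip,        // iroh_gossip::net::Gossip
    ///     replica_store, // crate::store::Store
    ///     blob_store,    // iroh_blobs::api::Store
    ///     downloader,    // iroh_blobs::api::downloader::Downloader
    ///     DefaultAuthorStorage::Mem,
    ///     None, // no GC protection handler
    /// )
    /// .await?;
    /// ```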
    pub async fn spawn(
        endpoint: Endpoint,
        gossip: Gossip,
        replica_store: crate::store::Store,
        bao_store: iroh_blobs::api::Store,
        downloader: Downloader,
        default_author_storage: DefaultAuthorStorage,
        protect_cb: Option<ProtectCallbackHandler>,
    ) -> anyhow::Result<Self> {
        let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
        let me = endpoint.id().fmt_short().to_string();

        let content_status_cb: ContentStatusCallback = {
            let blobs = bao_store.blobs().clone();
            Arc::new(move |hash: iroh_blobs::Hash| {
                let blobs = blobs.clone();
                Box::pin(async move {
                    let blob_status = blobs.status(hash).await;
                    entry_to_content_status(blob_status)
                })
            })
        };
        let sync = SyncHandle::spawn(replica_store, Some(content_status_cb.clone()), me.clone());

        let sync2 = sync.clone();
        let gc_protect_task = AbortOnDropHandle::new(n0_future::task::spawn(async move {
            let Some(mut protect_handler) = protect_cb else {
                return;
            };
            while let Some(reply_tx) = protect_handler.0.recv().await {
                let (tx, rx) = mpsc::channel(64);
                if let Err(_err) = reply_tx.send(rx) {
                    continue;
                }
                let hashes = match sync2.content_hashes().await {
                    Ok(hashes) => hashes,
                    Err(err) => {
                        debug!("protect task: getting content hashes failed with {err:#}");
                        if let Err(_err) = tx.send(Err(err)).await {
                            debug!("protect task: failed to forward error");
                        }
                        continue;
                    }
                };
                for hash in hashes {
                    if let Err(_err) = tx.send(hash).await {
                        debug!("protect task: failed to forward hash");
                        break;
                    }
                }
            }
        }));

        let actor = LiveActor::new(
            sync.clone(),
            endpoint.clone(),
            gossip.clone(),
            bao_store.clone(),
            downloader,
            to_live_actor_recv,
            live_actor_tx.clone(),
            sync.metrics().clone(),
        );
        let actor_handle = n0_future::task::spawn(
            async move {
                if let Err(err) = actor.run().await {
                    error!("sync actor failed: {err:?}");
                }
            }
            .instrument(error_span!("sync", %me)),
        );

        let default_author = match DefaultAuthor::load(default_author_storage, &sync).await {
            Ok(author) => author,
            Err(err) => {
                // If loading the default author failed, make sure to shutdown the sync actor before
                // returning.
                let _store = sync.shutdown().await.ok();
                return Err(err);
            }
        };

        Ok(Self {
            endpoint,
            sync,
            to_live_actor: live_actor_tx,
            actor_handle: AbortOnDropHandle::new(actor_handle),
            content_status_cb,
            default_author,
            blob_store: bao_store,
            _gc_protect_task: gc_protect_task,
        })
    }

    /// Get the blob store.
    pub fn blob_store(&self) -> &Store {
        &self.blob_store
    }

    /// Returns the metrics tracked for this engine.
    pub fn metrics(&self) -> &Arc<Metrics> {
        self.sync.metrics()
    }

    /// Start to sync a document.
    ///
    /// If `peers` is non-empty, it will both do an initial set-reconciliation sync with each peer,
    /// and join an iroh-gossip swarm with these peers to receive and broadcast document updates.
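    ///
    /// A short sketch, assuming an `engine` spawned as above, a document `namespace`, and a
    /// known peer address `peer_addr`:
    ///
    /// ```rust,ignore
    /// // Start live sync for the document and reconcile with the given peer.
    /// engine.start_sync(namespace, vec![peer_addr]).await?;
    /// ```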
    pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            })
            .await?;
        reply_rx.await??;
        Ok(())
    }

    /// Stop the live sync for a document and leave the gossip swarm.
    ///
    /// If `kill_subscribers` is true, all existing event subscribers will be dropped. They will
    /// receive `None` and no further events, even if the document is joined again later.
    pub async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            })
            .await?;
        reply_rx.await??;
        Ok(())
    }


    /// Subscribe to replica and sync progress events.
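    ///
    /// A sketch of consuming the returned stream, assuming an `engine` with live sync started
    /// for `namespace`:
    ///
    /// ```rust,ignore
    /// use futures_lite::StreamExt;
    ///
    /// let mut events = engine.subscribe(namespace).await?;
    /// while let Some(event) = events.next().await {
    ///     match event? {
    ///         LiveEvent::InsertRemote { from, entry, .. } => {
    ///             println!("{from} inserted {entry:?}");
    ///         }
    ///         LiveEvent::SyncFinished(ev) => println!("sync finished: {ev:?}"),
    ///         other => println!("event: {other}"),
    ///     }
    /// }
    /// ```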
    pub async fn subscribe(
        &self,
        namespace: NamespaceId,
    ) -> Result<impl Stream<Item = Result<LiveEvent>> + Unpin + 'static> {
        // Clone the content status callback here so that the returned stream does not
        // capture a lifetime on `self`.
        let content_status_cb = self.content_status_cb.clone();

        // Subscribe to insert events from the replica.
        let a = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            self.sync.subscribe(namespace, s).await?;
            Box::pin(r).then(move |ev| {
                let content_status_cb = content_status_cb.clone();
                Box::pin(async move { LiveEvent::from_replica_event(ev, &content_status_cb).await })
            })
        };

        // Subscribe to events from the [`live::Actor`].
        let b = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            let r = Box::pin(r);
            let (reply, reply_rx) = oneshot::channel();
            self.to_live_actor
                .send(ToLiveActor::Subscribe {
                    namespace,
                    sender: s,
                    reply,
                })
                .await?;
            reply_rx.await??;
            r.map(|event| Ok(LiveEvent::from(event)))
        };

        Ok(a.or(b))
    }

    /// Handle an incoming iroh-docs connection.
    pub async fn handle_connection(&self, conn: iroh::endpoint::Connection) -> anyhow::Result<()> {
        self.to_live_actor
            .send(ToLiveActor::HandleConnection { conn })
            .await?;
        Ok(())
    }

    /// Shutdown the engine.
    pub async fn shutdown(&self) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Shutdown { reply })
            .await?;
        reply_rx.await?;
        Ok(())
    }
}

/// Converts a [`BlobStatus`] into a [`ContentStatus`].
fn entry_to_content_status(entry: irpc::Result<BlobStatus>) -> ContentStatus {
    match entry {
        Ok(BlobStatus::Complete { .. }) => ContentStatus::Complete,
        Ok(BlobStatus::Partial { .. }) => ContentStatus::Incomplete,
        Ok(BlobStatus::NotFound) => ContentStatus::Missing,
        Err(cause) => {
            tracing::warn!("Error while checking entry status: {cause:?}");
            ContentStatus::Missing
        }
    }
}

/// Events informing about progress of the live sync.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum LiveEvent {
    /// A local insertion.
    InsertLocal {
        /// The inserted entry.
        entry: Entry,
    },
    /// Received a remote insert.
    InsertRemote {
        /// The peer that sent us the entry.
        from: PublicKey,
        /// The inserted entry.
        entry: Entry,
        /// Whether the content is available at the local node.
        content_status: ContentStatus,
    },
    /// The content of an entry was downloaded and is now available at the local node.
    ContentReady {
        /// The content hash of the newly available entry content.
        hash: Hash,
    },
    /// All pending content is now ready.
    ///
    /// This event signals that all queued content downloads from the last sync run have either
    /// completed or failed.
    ///
    /// It will only be emitted after a [`Self::SyncFinished`] event, never before.
    ///
    /// Receiving this event does not guarantee that all content in the document is available. If
    /// blobs failed to download, this event will still be emitted after all operations completed.
    PendingContentReady,
    /// We have a new neighbor in the swarm.
    NeighborUp(PublicKey),
    /// We lost a neighbor in the swarm.
    NeighborDown(PublicKey),
    /// A set-reconciliation sync finished.
    SyncFinished(SyncEvent),
}

impl From<live::Event> for LiveEvent {
    fn from(ev: live::Event) -> Self {
        match ev {
            live::Event::ContentReady { hash } => Self::ContentReady { hash },
            live::Event::NeighborUp(peer) => Self::NeighborUp(peer),
            live::Event::NeighborDown(peer) => Self::NeighborDown(peer),
            live::Event::SyncFinished(ev) => Self::SyncFinished(ev),
            live::Event::PendingContentReady => Self::PendingContentReady,
        }
    }
}

impl LiveEvent {
    async fn from_replica_event(
        ev: crate::Event,
        content_status_cb: &ContentStatusCallback,
    ) -> Result<Self> {
        Ok(match ev {
            crate::Event::LocalInsert { entry, .. } => Self::InsertLocal {
                entry: entry.into(),
            },
            crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
                content_status: content_status_cb(entry.content_hash()).await,
                entry: entry.into(),
                from: PublicKey::from_bytes(&from)?,
            },
        })
    }
}

/// Where to persist the default author.
///
/// If set to `Mem`, a new author will be created in the docs store before spawning the sync
/// engine. Changes to the default author will not be persisted.
///
/// If set to `Persistent`, the default author will be loaded from and persisted to the specified
/// path (as a hex-encoded string of the author's public key).
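///
/// A sketch of choosing a storage mode; the path below is only an example:
///
/// ```rust,ignore
/// // In-memory: a fresh default author is created on every start.
/// let storage = DefaultAuthorStorage::Mem;
///
/// // Persistent (requires the `fs-store` feature): the author ID is kept in a file.
/// let storage = DefaultAuthorStorage::Persistent("/tmp/default-author".into());
/// ```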
#[derive(Debug)]
pub enum DefaultAuthorStorage {
    /// Memory storage.
    Mem,
    /// File based persistent storage.
    #[cfg(feature = "fs-store")]
    Persistent(std::path::PathBuf),
}

impl DefaultAuthorStorage {
    /// Load the default author from the storage.
    ///
    /// Will create and save a new author if the storage is empty.
    ///
    /// Returns an error if the author can't be parsed or if the author does not exist in the docs
    /// store.
    pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
        match self {
            Self::Mem => {
                let author = Author::new(&mut rand::rng());
                let author_id = author.id();
                docs_store.import_author(author).await?;
                Ok(author_id)
            }
            #[cfg(feature = "fs-store")]
            Self::Persistent(ref path) => {
                use std::str::FromStr;

                use anyhow::Context;
                if path.exists() {
                    let data = tokio::fs::read_to_string(path).await.with_context(|| {
                        format!(
                            "Failed to read the default author file at `{}`",
                            path.to_string_lossy()
                        )
                    })?;
                    let author_id = AuthorId::from_str(&data).with_context(|| {
                        format!(
                            "Failed to parse the default author from `{}`",
                            path.to_string_lossy()
                        )
                    })?;
                    if docs_store.export_author(author_id).await?.is_none() {
                        bail!("The default author is missing from the docs store. To recover, delete the file `{}`. Then iroh will create a new default author.", path.to_string_lossy())
                    }
                    Ok(author_id)
                } else {
                    let author = Author::new(&mut rand::rng());
                    let author_id = author.id();
                    docs_store.import_author(author).await?;
                    // Make sure to write the default author to the store
                    // *before* we write the default author ID file.
                    // Otherwise the default author ID file is effectively a dangling reference.
                    docs_store.flush_store().await?;
                    self.persist(author_id).await?;
                    Ok(author_id)
                }
            }
        }
    }

    /// Save a new default author.
    pub async fn persist(&self, #[allow(unused)] author_id: AuthorId) -> anyhow::Result<()> {
        match self {
            Self::Mem => {
                // Persistence is not possible for the mem storage, so this is a no-op.
            }
            #[cfg(feature = "fs-store")]
            Self::Persistent(ref path) => {
                use anyhow::Context;
                tokio::fs::write(path, author_id.to_string())
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to write the default author to `{}`",
                            path.to_string_lossy()
                        )
                    })?;
            }
        }
        Ok(())
    }
}

/// Persistent default author for a docs engine.
#[derive(Debug)]
pub struct DefaultAuthor {
    value: RwLock<AuthorId>,
    storage: DefaultAuthorStorage,
}

impl DefaultAuthor {
    /// Load the default author from storage.
    ///
    /// If the storage is empty, a new author is created and persisted.
    pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
        let value = storage.load(docs_store).await?;
        Ok(Self {
            value: RwLock::new(value),
            storage,
        })
    }

    /// Get the current default author.
    pub fn get(&self) -> AuthorId {
        *self.value.read().unwrap()
    }

    /// Set the default author.
    pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
        if docs_store.export_author(author_id).await?.is_none() {
            bail!("The author does not exist");
        }
        self.storage.persist(author_id).await?;
        *self.value.write().unwrap() = author_id;
        Ok(())
    }
}

#[derive(Debug)]
struct ProtectCallbackSender(mpsc::Sender<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>);

/// The handler for a blobs protection callback.
///
/// See [`ProtectCallbackHandler::new`].
#[derive(Debug)]
pub struct ProtectCallbackHandler(
    pub(crate) mpsc::Receiver<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>,
);

impl ProtectCallbackHandler {
    /// Creates a callback and handler to manage blob protection.
    ///
    /// The returned [`ProtectCb`] must be set in the [`GcConfig`] of the [`iroh_blobs`] store where
    /// the blobs for hashes in documents are persisted. The [`ProtectCallbackHandler`] must be passed to
    /// [`Builder::protect_handler`] (or [`Engine::spawn`]). This ensures that hashes referenced in docs
    /// are not deleted from the blobs store, and that they are garbage collected once they no longer
    /// appear in any doc.
    ///
    /// [`Builder::protect_handler`]: crate::protocol::Builder::protect_handler
    /// [`GcConfig`]: iroh_blobs::store::GcConfig
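    ///
    /// A wiring sketch, assuming a blobs store with garbage collection enabled (the exact GC
    /// setup depends on the `iroh_blobs` configuration in use):
    ///
    /// ```rust,ignore
    /// let (protect_handler, protect_cb) = ProtectCallbackHandler::new();
    /// // Register `protect_cb` with the blobs store's GC configuration, then hand the
    /// // handler to the docs engine so it can answer protection requests.
    /// let engine = Engine::spawn(
    ///     endpoint,
    ///     gossip,
    ///     replica_store,
    ///     blob_store,
    ///     downloader,
    ///     DefaultAuthorStorage::Mem,
    ///     Some(protect_handler),
    /// )
    /// .await?;
    /// ```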
    pub fn new() -> (Self, ProtectCb) {
        let (tx, rx) = mpsc::channel(4);
        let cb = ProtectCallbackSender(tx).into_cb();
        let handler = ProtectCallbackHandler(rx);
        (handler, cb)
    }
}

impl ProtectCallbackSender {
    fn into_cb(self) -> ProtectCb {
        let start_tx = self.0.clone();
        Arc::new(move |live| {
            let start_tx = start_tx.clone();
            Box::pin(async move {
                let (tx, rx) = oneshot::channel();
                if let Err(_err) = start_tx.send(tx).await {
                    tracing::warn!("Failed to get protected hashes from docs: ProtectCallback receiver dropped");
                    return ProtectOutcome::Abort;
                }
                let mut rx = match rx.await {
                    Ok(rx) => rx,
                    Err(_err) => {
                        tracing::warn!("Failed to get protected hashes from docs: ProtectCallback sender dropped");
                        return ProtectOutcome::Abort;
                    }
                };
                while let Some(res) = rx.recv().await {
                    match res {
                        Err(err) => {
                            tracing::warn!("Getting protected hashes produced an error: {err:#}");
                            return ProtectOutcome::Abort;
                        }
                        Ok(hash) => {
                            live.insert(hash);
                        }
                    }
                }
                ProtectOutcome::Continue
            })
        })
    }
}