1use std::sync::{Arc, RwLock};
6
7use anyhow::{bail, Result};
8use futures_lite::{Stream, StreamExt};
9use iroh::{Endpoint, EndpointAddr, PublicKey};
10use iroh_blobs::{
11 api::{blobs::BlobStatus, downloader::Downloader, Store},
12 store::{ProtectCb, ProtectOutcome},
13 Hash,
14};
15use iroh_gossip::net::Gossip;
16use n0_future::task::AbortOnDropHandle;
17use serde::{Deserialize, Serialize};
18use tokio::sync::{mpsc, oneshot};
19use tracing::{debug, error, error_span, Instrument};
20
21use self::live::{LiveActor, ToLiveActor};
22pub use self::{
23 live::SyncEvent,
24 state::{Origin, SyncReason},
25};
26use crate::{
27 actor::SyncHandle, metrics::Metrics, Author, AuthorId, ContentStatus, ContentStatusCallback,
28 Entry, NamespaceId,
29};
30
31mod gossip;
32mod live;
33mod state;
34
/// Capacity of the channel that carries [`ToLiveActor`] messages to the live actor.
const ACTOR_CHANNEL_CAP: usize = 64;
/// Capacity of each event channel created by [`Engine::subscribe`].
const SUBSCRIBE_CHANNEL_CAP: usize = 256;
39
/// The docs engine.
///
/// Owns the handle to the sync actor, the channel to the live actor, and the background
/// tasks started by [`Engine::spawn`]. Both task handles abort their task when dropped.
#[derive(derive_more::Debug)]
pub struct Engine {
    /// The [`Endpoint`] this engine was spawned with.
    pub endpoint: Endpoint,
    /// Handle to the sync actor that owns the replica store.
    pub sync: SyncHandle,
    /// The default author of this engine.
    pub default_author: DefaultAuthor,
    /// Sender for messages to the live actor.
    to_live_actor: mpsc::Sender<ToLiveActor>,
    /// Handle of the live actor task; never read, kept only so the task is aborted on drop.
    #[allow(dead_code)]
    actor_handle: AbortOnDropHandle<()>,
    /// Callback that reports the local availability of a blob by hash.
    #[debug("ContentStatusCallback")]
    content_status_cb: ContentStatusCallback,
    /// The blob store this engine was spawned with.
    blob_store: iroh_blobs::api::Store,
    /// Handle of the task that answers protect requests; kept so the task is aborted on drop.
    _gc_protect_task: AbortOnDropHandle<()>,
}
58
59impl Engine {
60 pub async fn spawn(
65 endpoint: Endpoint,
66 gossip: Gossip,
67 replica_store: crate::store::Store,
68 bao_store: iroh_blobs::api::Store,
69 downloader: Downloader,
70 default_author_storage: DefaultAuthorStorage,
71 protect_cb: Option<ProtectCallbackHandler>,
72 ) -> anyhow::Result<Self> {
73 let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
74 let me = endpoint.id().fmt_short().to_string();
75
76 let content_status_cb: ContentStatusCallback = {
77 let blobs = bao_store.blobs().clone();
78 Arc::new(move |hash: iroh_blobs::Hash| {
79 let blobs = blobs.clone();
80 Box::pin(async move {
81 let blob_status = blobs.status(hash).await;
82 entry_to_content_status(blob_status)
83 })
84 })
85 };
86 let sync = SyncHandle::spawn(replica_store, Some(content_status_cb.clone()), me.clone());
87
88 let sync2 = sync.clone();
89 let gc_protect_task = AbortOnDropHandle::new(n0_future::task::spawn(async move {
90 let Some(mut protect_handler) = protect_cb else {
91 return;
92 };
93 while let Some(reply_tx) = protect_handler.0.recv().await {
94 let (tx, rx) = mpsc::channel(64);
95 if let Err(_err) = reply_tx.send(rx) {
96 continue;
97 }
98 let hashes = match sync2.content_hashes().await {
99 Ok(hashes) => hashes,
100 Err(err) => {
101 debug!("protect task: getting content hashes failed with {err:#}");
102 if let Err(_err) = tx.send(Err(err)).await {
103 debug!("protect task: failed to forward error");
104 }
105 continue;
106 }
107 };
108 for hash in hashes {
109 if let Err(_err) = tx.send(hash).await {
110 debug!("protect task: failed to forward hash");
111 break;
112 }
113 }
114 }
115 }));
116
117 let actor = LiveActor::new(
118 sync.clone(),
119 endpoint.clone(),
120 gossip.clone(),
121 bao_store.clone(),
122 downloader,
123 to_live_actor_recv,
124 live_actor_tx.clone(),
125 sync.metrics().clone(),
126 );
127 let actor_handle = n0_future::task::spawn(
128 async move {
129 if let Err(err) = actor.run().await {
130 error!("sync actor failed: {err:?}");
131 }
132 }
133 .instrument(error_span!("sync", %me)),
134 );
135
136 let default_author = match DefaultAuthor::load(default_author_storage, &sync).await {
137 Ok(author) => author,
138 Err(err) => {
139 let _store = sync.shutdown().await.ok();
142 return Err(err);
143 }
144 };
145
146 Ok(Self {
147 endpoint,
148 sync,
149 to_live_actor: live_actor_tx,
150 actor_handle: AbortOnDropHandle::new(actor_handle),
151 content_status_cb,
152 default_author,
153 blob_store: bao_store,
154 _gc_protect_task: gc_protect_task,
155 })
156 }
157
/// Returns the blob store this engine was spawned with.
pub fn blob_store(&self) -> &Store {
    &self.blob_store
}
162
/// Returns the metrics exposed by the sync handle.
pub fn metrics(&self) -> &Arc<Metrics> {
    self.sync.metrics()
}
167
168 pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
173 let (reply, reply_rx) = oneshot::channel();
174 self.to_live_actor
175 .send(ToLiveActor::StartSync {
176 namespace,
177 peers,
178 reply,
179 })
180 .await?;
181 reply_rx.await??;
182 Ok(())
183 }
184
185 pub async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> {
190 let (reply, reply_rx) = oneshot::channel();
191 self.to_live_actor
192 .send(ToLiveActor::Leave {
193 namespace,
194 kill_subscribers,
195 reply,
196 })
197 .await?;
198 reply_rx.await??;
199 Ok(())
200 }
201
202 pub async fn subscribe(
204 &self,
205 namespace: NamespaceId,
206 ) -> Result<impl Stream<Item = Result<LiveEvent>> + Unpin + 'static> {
207 let content_status_cb = self.content_status_cb.clone();
210
211 let a = {
213 let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
214 self.sync.subscribe(namespace, s).await?;
215 Box::pin(r).then(move |ev| {
216 let content_status_cb = content_status_cb.clone();
217 Box::pin(async move { LiveEvent::from_replica_event(ev, &content_status_cb).await })
218 })
219 };
220
221 let b = {
223 let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
224 let r = Box::pin(r);
225 let (reply, reply_rx) = oneshot::channel();
226 self.to_live_actor
227 .send(ToLiveActor::Subscribe {
228 namespace,
229 sender: s,
230 reply,
231 })
232 .await?;
233 reply_rx.await??;
234 r.map(|event| Ok(LiveEvent::from(event)))
235 };
236
237 Ok(a.or(b))
238 }
239
240 pub async fn handle_connection(&self, conn: iroh::endpoint::Connection) -> anyhow::Result<()> {
242 self.to_live_actor
243 .send(ToLiveActor::HandleConnection { conn })
244 .await?;
245 Ok(())
246 }
247
248 pub async fn shutdown(&self) -> Result<()> {
250 let (reply, reply_rx) = oneshot::channel();
251 self.to_live_actor
252 .send(ToLiveActor::Shutdown { reply })
253 .await?;
254 reply_rx.await?;
255 Ok(())
256 }
257}
258
259fn entry_to_content_status(entry: irpc::Result<BlobStatus>) -> ContentStatus {
261 match entry {
262 Ok(BlobStatus::Complete { .. }) => ContentStatus::Complete,
263 Ok(BlobStatus::Partial { .. }) => ContentStatus::Incomplete,
264 Ok(BlobStatus::NotFound) => ContentStatus::Missing,
265 Err(cause) => {
266 tracing::warn!("Error while checking entry status: {cause:?}");
267 ContentStatus::Missing
268 }
269 }
270}
271
/// Events yielded by the stream returned from [`Engine::subscribe`].
///
/// `InsertLocal` and `InsertRemote` are built from replica events; all other variants are
/// converted one-to-one from the live actor's events.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum LiveEvent {
    /// An entry was inserted locally.
    InsertLocal {
        /// The inserted entry.
        entry: Entry,
    },
    /// An entry was inserted by a remote peer.
    InsertRemote {
        /// The peer the entry was received from.
        from: PublicKey,
        /// The inserted entry.
        entry: Entry,
        /// Status of the entry's content as reported by the content status callback at the
        /// time the event was converted.
        content_status: ContentStatus,
    },
    /// Content became ready.
    ContentReady {
        /// The hash of the content.
        hash: Hash,
    },
    /// Forwarded from the live actor; presumably signals that pending content downloads
    /// have finished. TODO confirm against the `live` module.
    PendingContentReady,
    /// A neighbor came up.
    NeighborUp(PublicKey),
    /// A neighbor went down.
    NeighborDown(PublicKey),
    /// A sync finished; details are in the [`SyncEvent`].
    SyncFinished(SyncEvent),
}
311
312impl From<live::Event> for LiveEvent {
313 fn from(ev: live::Event) -> Self {
314 match ev {
315 live::Event::ContentReady { hash } => Self::ContentReady { hash },
316 live::Event::NeighborUp(peer) => Self::NeighborUp(peer),
317 live::Event::NeighborDown(peer) => Self::NeighborDown(peer),
318 live::Event::SyncFinished(ev) => Self::SyncFinished(ev),
319 live::Event::PendingContentReady => Self::PendingContentReady,
320 }
321 }
322}
323
324impl LiveEvent {
325 async fn from_replica_event(
326 ev: crate::Event,
327 content_status_cb: &ContentStatusCallback,
328 ) -> Result<Self> {
329 Ok(match ev {
330 crate::Event::LocalInsert { entry, .. } => Self::InsertLocal {
331 entry: entry.into(),
332 },
333 crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
334 content_status: content_status_cb(entry.content_hash()).await,
335 entry: entry.into(),
336 from: PublicKey::from_bytes(&from)?,
337 },
338 })
339 }
340}
341
/// Where the id of the default author is kept.
#[derive(Debug)]
pub enum DefaultAuthorStorage {
    /// Nothing is stored; a new author is created on every [`Self::load`].
    Mem,
    /// The author id is stored as text in the file at the given path.
    #[cfg(feature = "fs-store")]
    Persistent(std::path::PathBuf),
}
357
358impl DefaultAuthorStorage {
359 pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
366 match self {
367 Self::Mem => {
368 let author = Author::new(&mut rand::rng());
369 let author_id = author.id();
370 docs_store.import_author(author).await?;
371 Ok(author_id)
372 }
373 #[cfg(feature = "fs-store")]
374 Self::Persistent(ref path) => {
375 use std::str::FromStr;
376
377 use anyhow::Context;
378 if path.exists() {
379 let data = tokio::fs::read_to_string(path).await.with_context(|| {
380 format!(
381 "Failed to read the default author file at `{}`",
382 path.to_string_lossy()
383 )
384 })?;
385 let author_id = AuthorId::from_str(&data).with_context(|| {
386 format!(
387 "Failed to parse the default author from `{}`",
388 path.to_string_lossy()
389 )
390 })?;
391 if docs_store.export_author(author_id).await?.is_none() {
392 bail!("The default author is missing from the docs store. To recover, delete the file `{}`. Then iroh will create a new default author.", path.to_string_lossy())
393 }
394 Ok(author_id)
395 } else {
396 let author = Author::new(&mut rand::rng());
397 let author_id = author.id();
398 docs_store.import_author(author).await?;
399 docs_store.flush_store().await?;
403 self.persist(author_id).await?;
404 Ok(author_id)
405 }
406 }
407 }
408 }
409
410 pub async fn persist(&self, #[allow(unused)] author_id: AuthorId) -> anyhow::Result<()> {
412 match self {
413 Self::Mem => {
414 }
416 #[cfg(feature = "fs-store")]
417 Self::Persistent(ref path) => {
418 use anyhow::Context;
419 tokio::fs::write(path, author_id.to_string())
420 .await
421 .with_context(|| {
422 format!(
423 "Failed to write the default author to `{}`",
424 path.to_string_lossy()
425 )
426 })?;
427 }
428 }
429 Ok(())
430 }
431}
432
/// The current default author together with the storage it is persisted to.
#[derive(Debug)]
pub struct DefaultAuthor {
    /// The current default author id; replaced by [`DefaultAuthor::set`].
    value: RwLock<AuthorId>,
    /// Where the default author id is loaded from and persisted to.
    storage: DefaultAuthorStorage,
}
439
440impl DefaultAuthor {
441 pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
445 let value = storage.load(docs_store).await?;
446 Ok(Self {
447 value: RwLock::new(value),
448 storage,
449 })
450 }
451
452 pub fn get(&self) -> AuthorId {
454 *self.value.read().unwrap()
455 }
456
457 pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
459 if docs_store.export_author(author_id).await?.is_none() {
460 bail!("The author does not exist");
461 }
462 self.storage.persist(author_id).await?;
463 *self.value.write().unwrap() = author_id;
464 Ok(())
465 }
466}
467
/// Sending half of the protect request channel.
///
/// Each request is a oneshot sender through which the handler side returns a receiver of
/// hashes (or an error).
#[derive(Debug)]
struct ProtectCallbackSender(mpsc::Sender<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>);
470
/// Receiving half of the protect request channel.
///
/// Created together with a [`ProtectCb`] by [`ProtectCallbackHandler::new`] and consumed by
/// [`Engine::spawn`], which answers every incoming request with a channel of content hashes.
#[derive(Debug)]
pub struct ProtectCallbackHandler(
    pub(crate) mpsc::Receiver<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>,
);
478
479impl ProtectCallbackHandler {
480 pub fn new() -> (Self, ProtectCb) {
491 let (tx, rx) = mpsc::channel(4);
492 let cb = ProtectCallbackSender(tx).into_cb();
493 let handler = ProtectCallbackHandler(rx);
494 (handler, cb)
495 }
496}
497
498impl ProtectCallbackSender {
499 fn into_cb(self) -> ProtectCb {
500 let start_tx = self.0.clone();
501 Arc::new(move |live| {
502 let start_tx = start_tx.clone();
503 Box::pin(async move {
504 let (tx, rx) = oneshot::channel();
505 if let Err(_err) = start_tx.send(tx).await {
506 tracing::warn!("Failed to get protected hashes from docs: ProtectCallback receiver dropped");
507 return ProtectOutcome::Abort;
508 }
509 let mut rx = match rx.await {
510 Ok(rx) => rx,
511 Err(_err) => {
512 tracing::warn!("Failed to get protected hashes from docs: ProtectCallback sender dropped");
513 return ProtectOutcome::Abort;
514 }
515 };
516 while let Some(res) = rx.recv().await {
517 match res {
518 Err(err) => {
519 tracing::warn!("Getting protected hashes produces error: {err:#}");
520 return ProtectOutcome::Abort;
521 }
522 Ok(hash) => {
523 live.insert(hash);
524 }
525 }
526 }
527 ProtectOutcome::Continue
528 })
529 })
530 }
531}