1use std::sync::{Arc, RwLock};
6
7use anyhow::{bail, Result};
8use iroh::{Endpoint, EndpointAddr, PublicKey};
9use iroh_blobs::{
10 api::{blobs::BlobStatus, downloader::Downloader, Store},
11 store::{ProtectCb, ProtectOutcome},
12 Hash,
13};
14use iroh_gossip::net::Gossip;
15use n0_future::{task::AbortOnDropHandle, Stream, StreamExt};
16use serde::{Deserialize, Serialize};
17use tokio::sync::{mpsc, oneshot};
18use tracing::{debug, error, error_span, Instrument};
19
20use self::live::{LiveActor, ToLiveActor};
21pub use self::{
22 live::SyncEvent,
23 state::{Origin, SyncReason},
24};
25use crate::{
26 actor::SyncHandle, metrics::Metrics, Author, AuthorId, ContentStatus, ContentStatusCallback,
27 Entry, NamespaceId,
28};
29
30mod gossip;
31mod live;
32mod state;
33
/// Capacity of the channel for the [`ToLiveActor`] messages.
const ACTOR_CHANNEL_CAP: usize = 64;
/// Capacity for the channels for [`Engine::subscribe`].
const SUBSCRIBE_CHANNEL_CAP: usize = 256;
38
/// The sync engine coordinates the live-sync actor, the gossip swarm, the docs
/// store and the blob store for a set of replicas.
///
/// Create an engine with [`Engine::spawn`]; it shuts its background tasks down
/// when dropped (both task handles abort on drop).
#[derive(derive_more::Debug)]
pub struct Engine {
    /// The [`Endpoint`] this engine operates on.
    pub endpoint: Endpoint,
    /// Handle to the docs-store actor.
    pub sync: SyncHandle,
    /// The default author for this engine, loaded from [`DefaultAuthorStorage`] at spawn.
    pub default_author: DefaultAuthor,
    // Channel for sending commands to the [`LiveActor`].
    to_live_actor: mpsc::Sender<ToLiveActor>,
    #[allow(dead_code)]
    // Keeps the live actor task alive; aborted when the engine is dropped.
    actor_handle: AbortOnDropHandle<()>,
    #[debug("ContentStatusCallback")]
    // Callback that resolves the local availability of a blob by hash.
    content_status_cb: ContentStatusCallback,
    // The blob store backing document content.
    blob_store: iroh_blobs::api::Store,
    // Background task answering GC protect requests; aborted on drop.
    _gc_protect_task: AbortOnDropHandle<()>,
}
57
impl Engine {
    /// Start the sync engine.
    ///
    /// Spawns the docs-store actor ([`SyncHandle`]), the [`LiveActor`] task for live sync
    /// coordination, and — if a [`ProtectCallbackHandler`] is provided — a background task that
    /// answers garbage-collection protect requests with the set of content hashes known to the
    /// docs store. Also loads (or creates) the default author from `default_author_storage`.
    ///
    /// Returns an error if the live actor cannot be constructed or the default author cannot be
    /// loaded; in the latter case the docs-store actor is shut down before returning.
    pub async fn spawn(
        endpoint: Endpoint,
        gossip: Gossip,
        replica_store: crate::store::Store,
        bao_store: iroh_blobs::api::Store,
        downloader: Downloader,
        default_author_storage: DefaultAuthorStorage,
        protect_cb: Option<ProtectCallbackHandler>,
    ) -> anyhow::Result<Self> {
        let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
        let me = endpoint.id().fmt_short().to_string();

        // Resolve the local availability of a blob by querying the blob store.
        let content_status_cb: ContentStatusCallback = {
            let blobs = bao_store.blobs().clone();
            Arc::new(move |hash: iroh_blobs::Hash| {
                let blobs = blobs.clone();
                Box::pin(async move {
                    let blob_status = blobs.status(hash).await;
                    entry_to_content_status(blob_status)
                })
            })
        };
        let sync = SyncHandle::spawn(replica_store, Some(content_status_cb.clone()), me.clone());

        // GC protect task: for each protect request received via the handler, reply with a
        // channel receiver and then stream all content hashes from the docs store into it.
        // Exits immediately if no protect handler was provided.
        let sync2 = sync.clone();
        let gc_protect_task = AbortOnDropHandle::new(n0_future::task::spawn(async move {
            let Some(mut protect_handler) = protect_cb else {
                return;
            };
            while let Some(reply_tx) = protect_handler.0.recv().await {
                let (tx, rx) = mpsc::channel(64);
                // Requester went away before we could reply; skip this request.
                if let Err(_err) = reply_tx.send(rx) {
                    continue;
                }
                let hashes = match sync2.content_hashes().await {
                    Ok(hashes) => hashes,
                    Err(err) => {
                        debug!("protect task: getting content hashes failed with {err:#}");
                        // Forward the error so the requester can abort its GC run.
                        if let Err(_err) = tx.send(Err(err)).await {
                            debug!("protect task: failed to forward error");
                        }
                        continue;
                    }
                };
                for hash in hashes {
                    if let Err(_err) = tx.send(hash).await {
                        debug!("protect task: failed to forward hash");
                        break;
                    }
                }
            }
        }));

        let actor = LiveActor::new(
            sync.clone(),
            endpoint.clone(),
            gossip.clone(),
            bao_store.clone(),
            downloader,
            to_live_actor_recv,
            live_actor_tx.clone(),
            sync.metrics().clone(),
        )?;
        let actor_handle = n0_future::task::spawn(
            async move {
                if let Err(err) = actor.run().await {
                    error!("sync actor failed: {err:?}");
                }
            }
            .instrument(error_span!("sync", %me)),
        );

        let default_author = match DefaultAuthor::load(default_author_storage, &sync).await {
            Ok(author) => author,
            Err(err) => {
                // Best-effort cleanup: shut down the docs-store actor before bailing.
                let _store = sync.shutdown().await.ok();
                return Err(err);
            }
        };

        Ok(Self {
            endpoint,
            sync,
            to_live_actor: live_actor_tx,
            actor_handle: AbortOnDropHandle::new(actor_handle),
            content_status_cb,
            default_author,
            blob_store: bao_store,
            _gc_protect_task: gc_protect_task,
        })
    }

    /// Returns the blob store this engine was spawned with.
    pub fn blob_store(&self) -> &Store {
        &self.blob_store
    }

    /// Returns the metrics collected by the docs-store actor.
    pub fn metrics(&self) -> &Arc<Metrics> {
        self.sync.metrics()
    }

    /// Starts the live sync for the given namespace with the given peers.
    ///
    /// Sends a [`ToLiveActor::StartSync`] command to the live actor and waits for its reply.
    pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            })
            .await?;
        reply_rx.await??;
        Ok(())
    }

    /// Stops the live sync for the given namespace.
    ///
    /// The `kill_subscribers` flag is forwarded to the live actor; presumably it also closes
    /// the event subscription streams for this namespace — confirm in `live.rs`.
    pub async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            })
            .await?;
        reply_rx.await??;
        Ok(())
    }

    /// Subscribes to events for the given namespace.
    ///
    /// Returns a single stream that merges two sources: replica events from the docs-store
    /// actor (converted via [`LiveEvent::from_replica_event`], which resolves content status)
    /// and events from the live actor (converted via `From<live::Event>`).
    pub async fn subscribe(
        &self,
        namespace: NamespaceId,
    ) -> Result<impl Stream<Item = Result<LiveEvent>> + Unpin + 'static> {
        let content_status_cb = self.content_status_cb.clone();

        // Stream a: replica events from the docs-store actor.
        let a = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            self.sync.subscribe(namespace, s).await?;
            Box::pin(r).then(move |ev| {
                let content_status_cb = content_status_cb.clone();
                Box::pin(async move { LiveEvent::from_replica_event(ev, &content_status_cb).await })
            })
        };

        // Stream b: events from the live actor.
        let b = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            let r = Box::pin(r);
            let (reply, reply_rx) = oneshot::channel();
            self.to_live_actor
                .send(ToLiveActor::Subscribe {
                    namespace,
                    sender: s,
                    reply,
                })
                .await?;
            reply_rx.await??;
            r.map(|event| Ok(LiveEvent::from(event)))
        };

        Ok(a.or(b))
    }

    /// Hands an incoming endpoint connection to the live actor for processing.
    pub async fn handle_connection(&self, conn: iroh::endpoint::Connection) -> anyhow::Result<()> {
        self.to_live_actor
            .send(ToLiveActor::HandleConnection { conn })
            .await?;
        Ok(())
    }

    /// Shuts down the live actor and waits for it to acknowledge the shutdown.
    pub async fn shutdown(&self) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Shutdown { reply })
            .await?;
        reply_rx.await?;
        Ok(())
    }
}
257
258fn entry_to_content_status(entry: irpc::Result<BlobStatus>) -> ContentStatus {
260 match entry {
261 Ok(BlobStatus::Complete { .. }) => ContentStatus::Complete,
262 Ok(BlobStatus::Partial { .. }) => ContentStatus::Incomplete,
263 Ok(BlobStatus::NotFound) => ContentStatus::Missing,
264 Err(cause) => {
265 tracing::warn!("Error while checking entry status: {cause:?}");
266 ContentStatus::Missing
267 }
268 }
269}
270
/// Events emitted from the subscription streams returned by [`Engine::subscribe`].
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum LiveEvent {
    /// An entry was inserted locally.
    InsertLocal {
        /// The inserted entry.
        entry: Entry,
    },
    /// An entry was received from a remote peer.
    InsertRemote {
        /// The peer the entry came from.
        from: PublicKey,
        /// The inserted entry.
        entry: Entry,
        /// Whether the entry's content is available locally (resolved at event time).
        content_status: ContentStatus,
    },
    /// Content became available locally (forwarded from the live actor's `ContentReady` event).
    ContentReady {
        /// The hash of the now-available content.
        hash: Hash,
    },
    /// Forwarded from the live actor's `PendingContentReady` event.
    PendingContentReady,
    /// A peer was added as a gossip neighbor.
    NeighborUp(PublicKey),
    /// A peer was removed as a gossip neighbor.
    NeighborDown(PublicKey),
    /// A sync run finished; carries details in the [`SyncEvent`].
    SyncFinished(SyncEvent),
}
310
311impl From<live::Event> for LiveEvent {
312 fn from(ev: live::Event) -> Self {
313 match ev {
314 live::Event::ContentReady { hash } => Self::ContentReady { hash },
315 live::Event::NeighborUp(peer) => Self::NeighborUp(peer),
316 live::Event::NeighborDown(peer) => Self::NeighborDown(peer),
317 live::Event::SyncFinished(ev) => Self::SyncFinished(ev),
318 live::Event::PendingContentReady => Self::PendingContentReady,
319 }
320 }
321}
322
impl LiveEvent {
    /// Converts a replica event from the docs store into a [`LiveEvent`].
    ///
    /// For remote inserts, the local availability of the content is resolved via
    /// `content_status_cb`. Fails if the `from` bytes of a remote insert are not a
    /// valid [`PublicKey`].
    async fn from_replica_event(
        ev: crate::Event,
        content_status_cb: &ContentStatusCallback,
    ) -> Result<Self> {
        Ok(match ev {
            crate::Event::LocalInsert { entry, .. } => Self::InsertLocal {
                entry: entry.into(),
            },
            crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
                // Struct fields are evaluated in order: read the content hash here,
                // before `entry` is moved by the `entry.into()` conversion below.
                content_status: content_status_cb(entry.content_hash()).await,
                entry: entry.into(),
                from: PublicKey::from_bytes(&from)?,
            },
        })
    }
}
340
/// Where to persist the default author.
#[derive(Debug)]
pub enum DefaultAuthorStorage {
    /// In-memory only: a fresh default author is created on every [`Self::load`].
    Mem,
    /// Persistent file storage: the default author id is read from and written to this path.
    #[cfg(feature = "fs-store")]
    Persistent(std::path::PathBuf),
}
356
impl DefaultAuthorStorage {
    /// Loads the default author, creating and persisting a new one if none is stored yet.
    ///
    /// Newly created authors are imported into `docs_store`.
    ///
    /// # Errors
    ///
    /// For [`Self::Persistent`], fails if the file exists but cannot be read or parsed, or
    /// if the author id read from the file is missing from the docs store (the error message
    /// tells the user how to recover).
    pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
        match self {
            Self::Mem => {
                // No persistence: generate a fresh author on every load.
                let author = Author::new(&mut rand::rng());
                let author_id = author.id();
                docs_store.import_author(author).await?;
                Ok(author_id)
            }
            #[cfg(feature = "fs-store")]
            Self::Persistent(ref path) => {
                use std::str::FromStr;

                use anyhow::Context;
                if path.exists() {
                    let data = tokio::fs::read_to_string(path).await.with_context(|| {
                        format!(
                            "Failed to read the default author file at `{}`",
                            path.to_string_lossy()
                        )
                    })?;
                    let author_id = AuthorId::from_str(&data).with_context(|| {
                        format!(
                            "Failed to parse the default author from `{}`",
                            path.to_string_lossy()
                        )
                    })?;
                    // The file only stores the id; the author itself must exist in the store.
                    if docs_store.export_author(author_id).await?.is_none() {
                        bail!(
                            "The default author is missing from the docs store. To recover, delete the file `{}`. Then iroh will create a new default author.",
                            path.to_string_lossy()
                        )
                    }
                    Ok(author_id)
                } else {
                    let author = Author::new(&mut rand::rng());
                    let author_id = author.id();
                    docs_store.import_author(author).await?;
                    // Flush the store before persisting the id, so the file never references
                    // an author that was not fully written to the docs store.
                    docs_store.flush_store().await?;
                    self.persist(author_id).await?;
                    Ok(author_id)
                }
            }
        }
    }

    /// Saves the given author id to this storage; a no-op for [`Self::Mem`].
    pub async fn persist(&self, #[allow(unused)] author_id: AuthorId) -> anyhow::Result<()> {
        match self {
            Self::Mem => {
                // Nothing to persist for in-memory storage.
            }
            #[cfg(feature = "fs-store")]
            Self::Persistent(ref path) => {
                use anyhow::Context;
                tokio::fs::write(path, author_id.to_string())
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to write the default author to `{}`",
                            path.to_string_lossy()
                        )
                    })?;
            }
        }
        Ok(())
    }
}
434
/// The default author of a docs engine, cached in memory and backed by a
/// [`DefaultAuthorStorage`].
#[derive(Debug)]
pub struct DefaultAuthor {
    // Current default author id; behind an RwLock so `get` works with `&self`.
    value: RwLock<AuthorId>,
    // Backing storage used to persist changes made via `set`.
    storage: DefaultAuthorStorage,
}
441
impl DefaultAuthor {
    /// Loads the default author from `storage` (creating one if needed) and caches it.
    pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
        let value = storage.load(docs_store).await?;
        Ok(Self {
            value: RwLock::new(value),
            storage,
        })
    }

    /// Returns the current default author id.
    pub fn get(&self) -> AuthorId {
        *self.value.read().unwrap()
    }

    /// Sets the default author.
    ///
    /// Fails if the author does not exist in `docs_store` or if persisting fails; the
    /// in-memory value is only updated after persistence succeeded.
    pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
        if docs_store.export_author(author_id).await?.is_none() {
            bail!("The author does not exist");
        }
        self.storage.persist(author_id).await?;
        *self.value.write().unwrap() = author_id;
        Ok(())
    }
}
469
/// Sender half of the protect-callback channel: each protect request sends a oneshot
/// through which the handler replies with a receiver of protected hashes.
#[derive(Debug)]
struct ProtectCallbackSender(mpsc::Sender<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>);
472
/// Receiver half of the protect-callback channel, consumed by [`Engine::spawn`] to answer
/// garbage-collection protect requests with the docs store's content hashes.
#[derive(Debug)]
pub struct ProtectCallbackHandler(
    pub(crate) mpsc::Receiver<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>,
);
480
481impl ProtectCallbackHandler {
482 pub fn new() -> (Self, ProtectCb) {
493 let (tx, rx) = mpsc::channel(4);
494 let cb = ProtectCallbackSender(tx).into_cb();
495 let handler = ProtectCallbackHandler(rx);
496 (handler, cb)
497 }
498}
499
500impl ProtectCallbackSender {
501 fn into_cb(self) -> ProtectCb {
502 let start_tx = self.0.clone();
503 Arc::new(move |live| {
504 let start_tx = start_tx.clone();
505 Box::pin(async move {
506 let (tx, rx) = oneshot::channel();
507 if let Err(_err) = start_tx.send(tx).await {
508 tracing::warn!(
509 "Failed to get protected hashes from docs: ProtectCallback receiver dropped"
510 );
511 return ProtectOutcome::Abort;
512 }
513 let mut rx = match rx.await {
514 Ok(rx) => rx,
515 Err(_err) => {
516 tracing::warn!(
517 "Failed to get protected hashes from docs: ProtectCallback sender dropped"
518 );
519 return ProtectOutcome::Abort;
520 }
521 };
522 while let Some(res) = rx.recv().await {
523 match res {
524 Err(err) => {
525 tracing::warn!("Getting protected hashes produces error: {err:#}");
526 return ProtectOutcome::Abort;
527 }
528 Ok(hash) => {
529 live.insert(hash);
530 }
531 }
532 }
533 ProtectOutcome::Continue
534 })
535 })
536 }
537}