iroh_docs/
actor.rs

1//! This contains an actor spawned on a separate thread to process replica and store operations.
2
3use std::{
4    collections::{hash_map, HashMap},
5    num::NonZeroU64,
6    sync::Arc,
7};
8
9use anyhow::{anyhow, Context, Result};
10use bytes::Bytes;
11use futures_util::FutureExt;
12use iroh_blobs::Hash;
13use irpc::channel::mpsc;
14use n0_future::{task::JoinSet, time::Duration};
15use serde::{Deserialize, Serialize};
16use tokio::sync::oneshot;
17#[cfg(wasm_browser)]
18use tracing::Instrument;
19use tracing::{debug, error, error_span, trace, warn};
20
21use crate::{
22    api::{
23        protocol::{AuthorListResponse, ListResponse},
24        RpcError, RpcResult,
25    },
26    metrics::Metrics,
27    ranger::Message,
28    store::{
29        fs::{ContentHashesIterator, StoreInstance},
30        DownloadPolicy, ImportNamespaceOutcome, Query, Store,
31    },
32    Author, AuthorHeads, AuthorId, Capability, ContentStatus, ContentStatusCallback, Event,
33    NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo, SignedEntry, SyncOutcome,
34};
35
/// Capacity of the bounded channel that carries [`Action`]s from the handles to the actor.
const ACTION_CAP: usize = 1024;
/// How long the actor loop waits for a new action before it flushes the store.
pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
38
39#[derive(derive_more::Debug, derive_more::Display)]
40enum Action {
41    #[display("NewAuthor")]
42    ImportAuthor {
43        author: Author,
44        #[debug("reply")]
45        reply: oneshot::Sender<Result<AuthorId>>,
46    },
47    #[display("ExportAuthor")]
48    ExportAuthor {
49        author: AuthorId,
50        #[debug("reply")]
51        reply: oneshot::Sender<Result<Option<Author>>>,
52    },
53    #[display("DeleteAuthor")]
54    DeleteAuthor {
55        author: AuthorId,
56        #[debug("reply")]
57        reply: oneshot::Sender<Result<()>>,
58    },
59    #[display("NewReplica")]
60    ImportNamespace {
61        capability: Capability,
62        #[debug("reply")]
63        reply: oneshot::Sender<Result<NamespaceId>>,
64    },
65    #[display("ListAuthors")]
66    ListAuthors {
67        #[debug("reply")]
68        reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
69    },
70    #[display("ListReplicas")]
71    ListReplicas {
72        #[debug("reply")]
73        reply: mpsc::Sender<RpcResult<ListResponse>>,
74    },
75    #[display("ContentHashes")]
76    ContentHashes {
77        #[debug("reply")]
78        reply: oneshot::Sender<Result<ContentHashesIterator>>,
79    },
80    #[display("FlushStore")]
81    FlushStore {
82        #[debug("reply")]
83        reply: oneshot::Sender<Result<()>>,
84    },
85    #[display("Replica({}, {})", _0.fmt_short(), _1)]
86    Replica(NamespaceId, ReplicaAction),
87    #[display("Shutdown")]
88    Shutdown {
89        #[debug("reply")]
90        reply: Option<oneshot::Sender<Store>>,
91    },
92}
93
94#[derive(derive_more::Debug, strum::Display)]
95enum ReplicaAction {
96    Open {
97        #[debug("reply")]
98        reply: oneshot::Sender<Result<()>>,
99        opts: OpenOpts,
100    },
101    Close {
102        #[debug("reply")]
103        reply: oneshot::Sender<Result<bool>>,
104    },
105    GetState {
106        #[debug("reply")]
107        reply: oneshot::Sender<Result<OpenState>>,
108    },
109    SetSync {
110        sync: bool,
111        #[debug("reply")]
112        reply: oneshot::Sender<Result<()>>,
113    },
114    Subscribe {
115        sender: async_channel::Sender<Event>,
116        #[debug("reply")]
117        reply: oneshot::Sender<Result<()>>,
118    },
119    Unsubscribe {
120        sender: async_channel::Sender<Event>,
121        #[debug("reply")]
122        reply: oneshot::Sender<Result<()>>,
123    },
124    InsertLocal {
125        author: AuthorId,
126        key: Bytes,
127        hash: Hash,
128        len: u64,
129        #[debug("reply")]
130        reply: oneshot::Sender<Result<()>>,
131    },
132    DeletePrefix {
133        author: AuthorId,
134        key: Bytes,
135        #[debug("reply")]
136        reply: oneshot::Sender<Result<usize>>,
137    },
138    InsertRemote {
139        entry: SignedEntry,
140        from: PeerIdBytes,
141        content_status: ContentStatus,
142        #[debug("reply")]
143        reply: oneshot::Sender<Result<()>>,
144    },
145    SyncInitialMessage {
146        #[debug("reply")]
147        reply: oneshot::Sender<Result<Message<SignedEntry>>>,
148    },
149    SyncProcessMessage {
150        message: Message<SignedEntry>,
151        from: PeerIdBytes,
152        state: SyncOutcome,
153        #[debug("reply")]
154        reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
155    },
156    GetSyncPeers {
157        #[debug("reply")]
158        reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
159    },
160    RegisterUsefulPeer {
161        peer: PeerIdBytes,
162        #[debug("reply")]
163        reply: oneshot::Sender<Result<()>>,
164    },
165    GetExact {
166        author: AuthorId,
167        key: Bytes,
168        include_empty: bool,
169        reply: oneshot::Sender<Result<Option<SignedEntry>>>,
170    },
171    GetMany {
172        query: Query,
173        reply: mpsc::Sender<RpcResult<SignedEntry>>,
174    },
175    DropReplica {
176        reply: oneshot::Sender<Result<()>>,
177    },
178    ExportSecretKey {
179        reply: oneshot::Sender<Result<NamespaceSecret>>,
180    },
181    HasNewsForUs {
182        heads: AuthorHeads,
183        #[debug("reply")]
184        reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
185    },
186    SetDownloadPolicy {
187        policy: DownloadPolicy,
188        #[debug("reply")]
189        reply: oneshot::Sender<Result<()>>,
190    },
191    GetDownloadPolicy {
192        #[debug("reply")]
193        reply: oneshot::Sender<Result<DownloadPolicy>>,
194    },
195}
196
/// The state for an open replica.
///
/// This is what the actor replies with to a `GetState` request.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct OpenState {
    /// Whether to accept sync requests for this replica.
    pub sync: bool,
    /// How many event subscriptions are open.
    pub subscribers: usize,
    /// By how many handles the replica is currently held open.
    pub handles: usize,
}
207
/// Actor-side bookkeeping for a replica that is currently open.
#[derive(Debug)]
struct OpenReplica {
    /// Replica info; holds the capability and the event subscribers.
    info: ReplicaInfo,
    /// Sync flag, reported as [`OpenState::sync`].
    sync: bool,
    /// Handle count, reported as [`OpenState::handles`].
    handles: usize,
}
214
/// The [`SyncHandle`] controls an actor thread which executes replica and store operations.
///
/// The [`SyncHandle`] exposes async methods which all send messages into the actor thread, usually
/// returning something via a return channel. The actor thread itself is a regular [`std::thread`]
/// which processes incoming messages sequentially.
///
/// The handle is cheaply cloneable. Once the last clone is dropped, the actor thread is joined.
/// The thread will finish processing all messages in the channel queue, and then exit.
/// To prevent this last drop from blocking the calling thread, you can call [`SyncHandle::shutdown`]
/// and await its result before dropping the last [`SyncHandle`]. This ensures that
/// waiting for the actor to finish happens in an async context, and therefore that the final
/// [`SyncHandle::drop`] will not block.
#[derive(Debug, Clone)]
pub struct SyncHandle {
    /// Sending half of the bounded action channel read by the actor.
    tx: async_channel::Sender<Action>,
    /// Join handle of the spawned actor task (browser wasm builds).
    #[cfg(wasm_browser)]
    #[allow(unused)]
    join_handle: Arc<Option<n0_future::task::JoinHandle<()>>>,
    /// Join handle of the actor thread; taken and joined when the last clone is dropped.
    #[cfg(not(wasm_browser))]
    join_handle: Arc<Option<std::thread::JoinHandle<()>>>,
    /// Metrics shared with the actor.
    metrics: Arc<Metrics>,
}
237
/// Options when opening a replica.
///
/// The default is no sync and no event subscription.
#[derive(Debug, Default)]
pub struct OpenOpts {
    /// Set to true to set sync state to true.
    pub sync: bool,
    /// Optionally subscribe to replica events.
    pub subscribe: Option<async_channel::Sender<Event>>,
}
246
247impl OpenOpts {
248    /// Set sync state to true.
249    pub fn sync(mut self) -> Self {
250        self.sync = true;
251        self
252    }
253    /// Subscribe to replica events.
254    pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
255        self.subscribe = Some(subscribe);
256        self
257    }
258}
259
260#[allow(missing_docs)]
261impl SyncHandle {
262    /// Spawn a sync actor and return a handle.
263    pub fn spawn(
264        store: Store,
265        content_status_callback: Option<ContentStatusCallback>,
266        me: String,
267    ) -> SyncHandle {
268        let metrics = Arc::new(Metrics::default());
269        let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
270        let actor = Actor {
271            store,
272            states: Default::default(),
273            action_rx,
274            content_status_callback,
275            tasks: Default::default(),
276            metrics: metrics.clone(),
277        };
278
279        let span = error_span!("sync", %me);
280        #[cfg(wasm_browser)]
281        let join_handle = n0_future::task::spawn(actor.run_async().instrument(span));
282
283        #[cfg(not(wasm_browser))]
284        let join_handle = std::thread::Builder::new()
285            .name("sync-actor".to_string())
286            .spawn(move || {
287                let _enter = span.enter();
288
289                if let Err(err) = actor.run_in_thread() {
290                    error!("Sync actor closed with error: {err:?}");
291                }
292            })
293            .expect("failed to spawn thread");
294
295        let join_handle = Arc::new(Some(join_handle));
296        SyncHandle {
297            tx: action_tx,
298            join_handle,
299            metrics,
300        }
301    }
302
303    /// Returns the metrics collected in this sync actor.
304    pub fn metrics(&self) -> &Arc<Metrics> {
305        &self.metrics
306    }
307
308    pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
309        tracing::debug!("SyncHandle::open called");
310        let (reply, rx) = oneshot::channel();
311        let action = ReplicaAction::Open { reply, opts };
312        self.send_replica(namespace, action).await?;
313        rx.await?
314    }
315
316    pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
317        let (reply, rx) = oneshot::channel();
318        self.send_replica(namespace, ReplicaAction::Close { reply })
319            .await?;
320        rx.await?
321    }
322
323    pub async fn subscribe(
324        &self,
325        namespace: NamespaceId,
326        sender: async_channel::Sender<Event>,
327    ) -> Result<()> {
328        let (reply, rx) = oneshot::channel();
329        self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
330            .await?;
331        rx.await?
332    }
333
334    pub async fn unsubscribe(
335        &self,
336        namespace: NamespaceId,
337        sender: async_channel::Sender<Event>,
338    ) -> Result<()> {
339        let (reply, rx) = oneshot::channel();
340        self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
341            .await?;
342        rx.await?
343    }
344
345    pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
346        let (reply, rx) = oneshot::channel();
347        let action = ReplicaAction::SetSync { sync, reply };
348        self.send_replica(namespace, action).await?;
349        rx.await?
350    }
351
352    pub async fn insert_local(
353        &self,
354        namespace: NamespaceId,
355        author: AuthorId,
356        key: Bytes,
357        hash: Hash,
358        len: u64,
359    ) -> Result<()> {
360        let (reply, rx) = oneshot::channel();
361        let action = ReplicaAction::InsertLocal {
362            author,
363            key,
364            hash,
365            len,
366            reply,
367        };
368        self.send_replica(namespace, action).await?;
369        rx.await?
370    }
371
372    pub async fn delete_prefix(
373        &self,
374        namespace: NamespaceId,
375        author: AuthorId,
376        key: Bytes,
377    ) -> Result<usize> {
378        let (reply, rx) = oneshot::channel();
379        let action = ReplicaAction::DeletePrefix { author, key, reply };
380        self.send_replica(namespace, action).await?;
381        rx.await?
382    }
383
384    pub async fn insert_remote(
385        &self,
386        namespace: NamespaceId,
387        entry: SignedEntry,
388        from: PeerIdBytes,
389        content_status: ContentStatus,
390    ) -> Result<()> {
391        let (reply, rx) = oneshot::channel();
392        let action = ReplicaAction::InsertRemote {
393            entry,
394            from,
395            content_status,
396            reply,
397        };
398        self.send_replica(namespace, action).await?;
399        rx.await?
400    }
401
402    pub async fn sync_initial_message(
403        &self,
404        namespace: NamespaceId,
405    ) -> Result<Message<SignedEntry>> {
406        let (reply, rx) = oneshot::channel();
407        let action = ReplicaAction::SyncInitialMessage { reply };
408        self.send_replica(namespace, action).await?;
409        rx.await?
410    }
411
412    pub async fn sync_process_message(
413        &self,
414        namespace: NamespaceId,
415        message: Message<SignedEntry>,
416        from: PeerIdBytes,
417        state: SyncOutcome,
418    ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
419        let (reply, rx) = oneshot::channel();
420        let action = ReplicaAction::SyncProcessMessage {
421            reply,
422            message,
423            from,
424            state,
425        };
426        self.send_replica(namespace, action).await?;
427        rx.await?
428    }
429
430    pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
431        let (reply, rx) = oneshot::channel();
432        let action = ReplicaAction::GetSyncPeers { reply };
433        self.send_replica(namespace, action).await?;
434        rx.await?
435    }
436
437    pub async fn register_useful_peer(
438        &self,
439        namespace: NamespaceId,
440        peer: PeerIdBytes,
441    ) -> Result<()> {
442        let (reply, rx) = oneshot::channel();
443        let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
444        self.send_replica(namespace, action).await?;
445        rx.await?
446    }
447
448    pub async fn has_news_for_us(
449        &self,
450        namespace: NamespaceId,
451        heads: AuthorHeads,
452    ) -> Result<Option<NonZeroU64>> {
453        let (reply, rx) = oneshot::channel();
454        let action = ReplicaAction::HasNewsForUs { reply, heads };
455        self.send_replica(namespace, action).await?;
456        rx.await?
457    }
458
459    pub async fn get_many(
460        &self,
461        namespace: NamespaceId,
462        query: Query,
463        reply: mpsc::Sender<RpcResult<SignedEntry>>,
464    ) -> Result<()> {
465        let action = ReplicaAction::GetMany { query, reply };
466        self.send_replica(namespace, action).await?;
467        Ok(())
468    }
469
470    pub async fn get_exact(
471        &self,
472        namespace: NamespaceId,
473        author: AuthorId,
474        key: Bytes,
475        include_empty: bool,
476    ) -> Result<Option<SignedEntry>> {
477        let (reply, rx) = oneshot::channel();
478        let action = ReplicaAction::GetExact {
479            author,
480            key,
481            include_empty,
482            reply,
483        };
484        self.send_replica(namespace, action).await?;
485        rx.await?
486    }
487
488    pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
489        let (reply, rx) = oneshot::channel();
490        let action = ReplicaAction::DropReplica { reply };
491        self.send_replica(namespace, action).await?;
492        rx.await?
493    }
494
495    pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
496        let (reply, rx) = oneshot::channel();
497        let action = ReplicaAction::ExportSecretKey { reply };
498        self.send_replica(namespace, action).await?;
499        rx.await?
500    }
501
502    pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
503        let (reply, rx) = oneshot::channel();
504        let action = ReplicaAction::GetState { reply };
505        self.send_replica(namespace, action).await?;
506        rx.await?
507    }
508
509    pub async fn shutdown(&self) -> Result<Store> {
510        let (reply, rx) = oneshot::channel();
511        let action = Action::Shutdown { reply: Some(reply) };
512        self.send(action).await?;
513        let store = rx.await?;
514        Ok(store)
515    }
516
517    pub async fn list_authors(
518        &self,
519        reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
520    ) -> Result<()> {
521        self.send(Action::ListAuthors { reply }).await
522    }
523
524    pub async fn list_replicas(&self, reply: mpsc::Sender<RpcResult<ListResponse>>) -> Result<()> {
525        self.send(Action::ListReplicas { reply }).await
526    }
527
528    pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
529        let (reply, rx) = oneshot::channel();
530        self.send(Action::ImportAuthor { author, reply }).await?;
531        rx.await?
532    }
533
534    pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
535        let (reply, rx) = oneshot::channel();
536        self.send(Action::ExportAuthor { author, reply }).await?;
537        rx.await?
538    }
539
540    pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
541        let (reply, rx) = oneshot::channel();
542        self.send(Action::DeleteAuthor { author, reply }).await?;
543        rx.await?
544    }
545
546    pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
547        let (reply, rx) = oneshot::channel();
548        self.send(Action::ImportNamespace { capability, reply })
549            .await?;
550        rx.await?
551    }
552
553    pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
554        let (reply, rx) = oneshot::channel();
555        let action = ReplicaAction::GetDownloadPolicy { reply };
556        self.send_replica(namespace, action).await?;
557        rx.await?
558    }
559
560    pub async fn set_download_policy(
561        &self,
562        namespace: NamespaceId,
563        policy: DownloadPolicy,
564    ) -> Result<()> {
565        let (reply, rx) = oneshot::channel();
566        let action = ReplicaAction::SetDownloadPolicy { reply, policy };
567        self.send_replica(namespace, action).await?;
568        rx.await?
569    }
570
571    pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
572        let (reply, rx) = oneshot::channel();
573        self.send(Action::ContentHashes { reply }).await?;
574        rx.await?
575    }
576
577    /// Makes sure that all pending database operations are persisted to disk.
578    ///
579    /// Otherwise, database operations are batched into bigger transactions for speed.
580    /// Use this if you need to make sure something is written to the database
581    /// before another operation, e.g. to make sure sudden process exits don't corrupt
582    /// your application state.
583    ///
584    /// It's not necessary to call this function before shutdown, as `shutdown` will
585    /// trigger a flush on its own.
586    pub async fn flush_store(&self) -> Result<()> {
587        let (reply, rx) = oneshot::channel();
588        self.send(Action::FlushStore { reply }).await?;
589        rx.await?
590    }
591
592    async fn send(&self, action: Action) -> Result<()> {
593        self.tx
594            .send(action)
595            .await
596            .context("sending to iroh_docs actor failed")?;
597        Ok(())
598    }
599    async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
600        self.send(Action::Replica(namespace, action)).await?;
601        Ok(())
602    }
603}
604
605impl Drop for SyncHandle {
606    fn drop(&mut self) {
607        // this means we're dropping the last reference
608        #[allow(unused)]
609        if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
610            #[cfg(wasm_browser)]
611            {
612                let tx = self.tx.clone();
613                n0_future::task::spawn(async move {
614                    tx.send(Action::Shutdown { reply: None }).await.ok();
615                });
616            }
617            #[cfg(not(wasm_browser))]
618            {
619                // this call is the reason tx can not be a tokio mpsc channel.
620                // we have no control about where drop is called, yet tokio send_blocking panics
621                // when called from inside a tokio runtime.
622                self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
623                let handle = handle.take().expect("this can only run once");
624
625                if let Err(err) = handle.join() {
626                    warn!(?err, "Failed to join sync actor");
627                }
628            }
629        }
630    }
631}
632
/// The actor that owns the store and processes [`Action`]s sequentially.
struct Actor {
    /// The document store; handed back to the caller on an explicit shutdown.
    store: Store,
    /// Bookkeeping for the replicas that are currently open.
    states: OpenReplicas,
    /// Receiving half of the action channel fed by the [`SyncHandle`]s.
    action_rx: async_channel::Receiver<Action>,
    /// Optional content status callback supplied at spawn time.
    content_status_callback: Option<ContentStatusCallback>,
    /// Local tasks that stream list/query results; aborted on shutdown.
    tasks: JoinSet<()>,
    /// Metrics shared with the [`SyncHandle`]s.
    metrics: Arc<Metrics>,
}
641
642impl Actor {
643    #[cfg(not(wasm_browser))]
644    fn run_in_thread(self) -> Result<()> {
645        let rt = tokio::runtime::Builder::new_current_thread()
646            .enable_time()
647            .build()?;
648        let local_set = tokio::task::LocalSet::new();
649        local_set.block_on(&rt, async move { self.run_async().await });
650        Ok(())
651    }
652
653    async fn run_async(mut self) {
654        let reply = loop {
655            let timeout = n0_future::time::sleep(MAX_COMMIT_DELAY);
656            tokio::pin!(timeout);
657            let action = tokio::select! {
658                _ = &mut timeout => {
659                    if let Err(cause) = self.store.flush() {
660                        error!(?cause, "failed to flush store");
661                    }
662                    continue;
663                }
664                action = self.action_rx.recv() => {
665                    match action {
666                        Ok(action) => action,
667                        Err(async_channel::RecvError) => {
668                            debug!("action channel disconnected");
669                            break None;
670                        }
671
672                    }
673                }
674            };
675            trace!(%action, "tick");
676            self.metrics.actor_tick_main.inc();
677            match action {
678                Action::Shutdown { reply } => {
679                    break reply;
680                }
681                action => {
682                    if self.on_action(action).await.is_err() {
683                        warn!("failed to send reply: receiver dropped");
684                    }
685                }
686            }
687        };
688
689        if let Err(cause) = self.store.flush() {
690            warn!(?cause, "failed to flush store");
691        }
692        self.close_all();
693        self.tasks.abort_all();
694        debug!("docs actor shutdown");
695        if let Some(reply) = reply {
696            reply.send(self.store).ok();
697        }
698    }
699
700    async fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
701        match action {
702            Action::Shutdown { .. } => {
703                unreachable!("Shutdown is handled in run()")
704            }
705            Action::ImportAuthor { author, reply } => {
706                let id = author.id();
707                send_reply(reply, self.store.import_author(author).map(|_| id))
708            }
709            Action::ExportAuthor { author, reply } => {
710                send_reply(reply, self.store.get_author(&author))
711            }
712            Action::DeleteAuthor { author, reply } => {
713                send_reply(reply, self.store.delete_author(author))
714            }
715            Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
716                let id = capability.id();
717                let outcome = this.store.import_namespace(capability.clone())?;
718                if let ImportNamespaceOutcome::Upgraded = outcome {
719                    if let Ok(state) = this.states.get_mut(&id) {
720                        state.info.merge_capability(capability)?;
721                    }
722                }
723                Ok(id)
724            }),
725            Action::ListAuthors { reply } => {
726                let iter = self
727                    .store
728                    .list_authors()
729                    .map(|a| a.map(|a| a.map(|a| AuthorListResponse { author_id: a.id() })));
730                self.tasks
731                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
732                Ok(())
733            }
734            Action::ListReplicas { reply } => {
735                let iter = self.store.list_namespaces();
736                let iter = iter.map(|inner| {
737                    inner.map(|res| res.map(|(id, capability)| ListResponse { id, capability }))
738                });
739                self.tasks
740                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
741                Ok(())
742            }
743            Action::ContentHashes { reply } => {
744                send_reply_with(reply, self, |this| this.store.content_hashes())
745            }
746            Action::FlushStore { reply } => send_reply(reply, self.store.flush()),
747            Action::Replica(namespace, action) => self.on_replica_action(namespace, action).await,
748        }
749    }
750
751    async fn on_replica_action(
752        &mut self,
753        namespace: NamespaceId,
754        action: ReplicaAction,
755    ) -> Result<(), SendReplyError> {
756        match action {
757            ReplicaAction::Open { reply, opts } => {
758                tracing::trace!("open in");
759                let res = self.open(namespace, opts);
760                tracing::trace!("open out");
761                send_reply(reply, res)
762            }
763            ReplicaAction::Close { reply } => {
764                let res = self.close(namespace);
765                // ignore errors when no receiver is present for close
766                reply.send(Ok(res)).ok();
767                Ok(())
768            }
769            ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
770                let state = this.states.get_mut(&namespace)?;
771                state.info.subscribe(sender);
772                Ok(())
773            }),
774            ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
775                let state = this.states.get_mut(&namespace)?;
776                state.info.unsubscribe(&sender);
777                drop(sender);
778                Ok(())
779            }),
780            ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
781                let state = this.states.get_mut(&namespace)?;
782                state.sync = sync;
783                Ok(())
784            }),
785            ReplicaAction::InsertLocal {
786                author,
787                key,
788                hash,
789                len,
790                reply,
791            } => {
792                send_reply_with_async(reply, self, async move |this| {
793                    let author = get_author(&mut this.store, &author)?;
794                    let mut replica = this.states.replica(namespace, &mut this.store)?;
795                    replica.insert(&key, &author, hash, len).await?;
796                    this.metrics.new_entries_local.inc();
797                    this.metrics.new_entries_local_size.inc_by(len);
798                    Ok(())
799                })
800                .await
801            }
802            ReplicaAction::DeletePrefix { author, key, reply } => {
803                send_reply_with_async(reply, self, async |this| {
804                    let author = get_author(&mut this.store, &author)?;
805                    let mut replica = this.states.replica(namespace, &mut this.store)?;
806                    let res = replica.delete_prefix(&key, &author).await?;
807                    Ok(res)
808                })
809                .await
810            }
811            ReplicaAction::InsertRemote {
812                entry,
813                from,
814                content_status,
815                reply,
816            } => {
817                send_reply_with_async(reply, self, async move |this| {
818                    let mut replica = this
819                        .states
820                        .replica_if_syncing(&namespace, &mut this.store)?;
821                    let len = entry.content_len();
822                    replica
823                        .insert_remote_entry(entry, from, content_status)
824                        .await?;
825                    this.metrics.new_entries_remote.inc();
826                    this.metrics.new_entries_remote_size.inc_by(len);
827                    Ok(())
828                })
829                .await
830            }
831
832            ReplicaAction::SyncInitialMessage { reply } => {
833                send_reply_with(reply, self, move |this| {
834                    let mut replica = this
835                        .states
836                        .replica_if_syncing(&namespace, &mut this.store)?;
837                    let res = replica.sync_initial_message()?;
838                    Ok(res)
839                })
840            }
841            ReplicaAction::SyncProcessMessage {
842                message,
843                from,
844                mut state,
845                reply,
846            } => {
847                let res = async {
848                    let mut replica = self
849                        .states
850                        .replica_if_syncing(&namespace, &mut self.store)?;
851                    let res = replica
852                        .sync_process_message(message, from, &mut state)
853                        .await?;
854                    Ok((res, state))
855                }
856                .await;
857                reply.send(res).map_err(send_reply_error)
858            }
859            ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
860                this.states.ensure_open(&namespace)?;
861                let peers = this.store.get_sync_peers(&namespace)?;
862                Ok(peers.map(|iter| iter.collect()))
863            }),
864            ReplicaAction::RegisterUsefulPeer { peer, reply } => {
865                let res = self.store.register_useful_peer(namespace, peer);
866                send_reply(reply, res)
867            }
868            ReplicaAction::GetExact {
869                author,
870                key,
871                include_empty,
872                reply,
873            } => send_reply_with(reply, self, move |this| {
874                this.states.ensure_open(&namespace)?;
875                this.store.get_exact(namespace, author, key, include_empty)
876            }),
877            ReplicaAction::GetMany { query, reply } => {
878                let iter = self
879                    .states
880                    .ensure_open(&namespace)
881                    .and_then(|_| self.store.get_many(namespace, query));
882                self.tasks
883                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
884                Ok(())
885            }
886            ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
887                this.close(namespace);
888                this.store.remove_replica(&namespace)
889            }),
890            ReplicaAction::ExportSecretKey { reply } => {
891                let res = self
892                    .states
893                    .get_mut(&namespace)
894                    .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
895                send_reply(reply, res)
896            }
897            ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
898                let state = this.states.get_mut(&namespace)?;
899                let handles = state.handles;
900                let sync = state.sync;
901                let subscribers = state.info.subscribers_count();
902                Ok(OpenState {
903                    handles,
904                    sync,
905                    subscribers,
906                })
907            }),
908            ReplicaAction::HasNewsForUs { heads, reply } => {
909                let res = self.store.has_news_for_us(namespace, &heads);
910                send_reply(reply, res)
911            }
912            ReplicaAction::SetDownloadPolicy { policy, reply } => {
913                send_reply(reply, self.store.set_download_policy(&namespace, policy))
914            }
915            ReplicaAction::GetDownloadPolicy { reply } => {
916                send_reply(reply, self.store.get_download_policy(&namespace))
917            }
918        }
919    }
920
921    fn close(&mut self, namespace: NamespaceId) -> bool {
922        let res = self.states.close(namespace);
923        if res {
924            self.store.close_replica(namespace);
925        }
926        res
927    }
928
929    fn close_all(&mut self) {
930        for id in self.states.close_all() {
931            self.store.close_replica(id);
932        }
933    }
934
935    fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
936        let open_cb = || {
937            let mut info = self.store.load_replica_info(&namespace)?;
938            if let Some(cb) = &self.content_status_callback {
939                info.set_content_status_callback(Arc::clone(cb));
940            }
941            Ok(info)
942        };
943        self.states.open_with(namespace, opts, open_cb)
944    }
945}
946
/// Map of the currently open replicas, keyed by namespace id.
///
/// An entry exists for exactly as long as the replica is considered open.
#[derive(Default)]
struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
949
950impl OpenReplicas {
951    fn replica<'a, 'b>(
952        &'a mut self,
953        namespace: NamespaceId,
954        store: &'b mut Store,
955    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
956        let state = self.get_mut(&namespace)?;
957        Ok(Replica::new(
958            StoreInstance::new(state.info.capability.id(), store),
959            &mut state.info,
960        ))
961    }
962
963    fn replica_if_syncing<'a, 'b>(
964        &'a mut self,
965        namespace: &NamespaceId,
966        store: &'b mut Store,
967    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
968        let state = self.get_mut(namespace)?;
969        anyhow::ensure!(state.sync, "sync is not enabled for replica");
970        Ok(Replica::new(
971            StoreInstance::new(state.info.capability.id(), store),
972            &mut state.info,
973        ))
974    }
975
976    fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
977        self.0.get_mut(namespace).context("replica not open")
978    }
979
980    fn is_open(&self, namespace: &NamespaceId) -> bool {
981        self.0.contains_key(namespace)
982    }
983
984    fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
985        match self.is_open(namespace) {
986            true => Ok(()),
987            false => Err(anyhow!("replica not open")),
988        }
989    }
990    fn open_with(
991        &mut self,
992        namespace: NamespaceId,
993        opts: OpenOpts,
994        mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
995    ) -> Result<()> {
996        match self.0.entry(namespace) {
997            hash_map::Entry::Vacant(e) => {
998                let mut info = open_cb()?;
999                if let Some(sender) = opts.subscribe {
1000                    info.subscribe(sender);
1001                }
1002                debug!(namespace = %namespace.fmt_short(), "open");
1003                let state = OpenReplica {
1004                    info,
1005                    sync: opts.sync,
1006                    handles: 1,
1007                };
1008                e.insert(state);
1009            }
1010            hash_map::Entry::Occupied(mut e) => {
1011                let state = e.get_mut();
1012                state.handles += 1;
1013                state.sync = state.sync || opts.sync;
1014                if let Some(sender) = opts.subscribe {
1015                    state.info.subscribe(sender);
1016                }
1017            }
1018        }
1019        Ok(())
1020    }
1021    fn close(&mut self, namespace: NamespaceId) -> bool {
1022        match self.0.entry(namespace) {
1023            hash_map::Entry::Vacant(_e) => {
1024                warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
1025                true
1026            }
1027            hash_map::Entry::Occupied(mut e) => {
1028                let state = e.get_mut();
1029                tracing::debug!("STATE {state:?}");
1030                state.handles = state.handles.wrapping_sub(1);
1031                if state.handles == 0 {
1032                    let _ = e.remove_entry();
1033                    debug!(namespace = %namespace.fmt_short(), "close");
1034                    true
1035                } else {
1036                    false
1037                }
1038            }
1039        }
1040    }
1041
1042    fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
1043        self.0.drain().map(|(n, _s)| n)
1044    }
1045}
1046
1047async fn iter_to_irpc<T: irpc::RpcMessage>(
1048    channel: mpsc::Sender<RpcResult<T>>,
1049    iter: Result<impl Iterator<Item = Result<T>>>,
1050) -> Result<(), SendReplyError> {
1051    match iter {
1052        Err(err) => channel
1053            .send(Err(RpcError::new(&*err)))
1054            .await
1055            .map_err(send_reply_error)?,
1056        Ok(iter) => {
1057            for item in iter {
1058                let item = item.map_err(|err| RpcError::new(&*err));
1059                channel.send(item).await.map_err(send_reply_error)?;
1060            }
1061        }
1062    }
1063    Ok(())
1064}
1065
1066fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
1067    store.get_author(id)?.context("author not found")
1068}
1069
/// Error returned by the reply helpers when sending on a reply channel fails.
///
/// Carries no details: the underlying send error is discarded.
#[derive(Debug)]
struct SendReplyError;
1072
1073fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
1074    sender.send(value).map_err(send_reply_error)
1075}
1076
1077fn send_reply_with<T>(
1078    sender: oneshot::Sender<Result<T>>,
1079    this: &mut Actor,
1080    f: impl FnOnce(&mut Actor) -> Result<T>,
1081) -> Result<(), SendReplyError> {
1082    sender.send(f(this)).map_err(send_reply_error)
1083}
1084
1085async fn send_reply_with_async<T>(
1086    sender: oneshot::Sender<Result<T>>,
1087    this: &mut Actor,
1088    f: impl AsyncFnOnce(&mut Actor) -> Result<T>,
1089) -> Result<(), SendReplyError> {
1090    sender.send(f(this).await).map_err(send_reply_error)
1091}
1092
/// Converts any send failure into a [`SendReplyError`], discarding the
/// original error value. Intended for use with `map_err`.
fn send_reply_error<T>(_err: T) -> SendReplyError {
    SendReplyError
}
1096
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store;

    /// Opens a replica, subscribes to it, closes it, and checks that the
    /// subscription channel no longer yields events afterwards.
    #[tokio::test]
    async fn open_close() -> anyhow::Result<()> {
        let handle = SyncHandle::spawn(store::Store::memory(), None, "foo".into());

        // Import a fresh namespace and open it.
        let secret = NamespaceSecret::new(&mut rand::rng());
        let namespace_id = secret.id();
        handle.import_namespace(secret.into()).await?;
        handle.open(namespace_id, Default::default()).await?;

        // Subscribe, then close the replica again.
        let (events_tx, events_rx) = async_channel::bounded(10);
        handle.subscribe(namespace_id, events_tx).await?;
        handle.close(namespace_id).await?;

        // After the close, receiving must fail rather than yield an event.
        assert!(events_rx.recv().await.is_err());
        Ok(())
    }
}