iroh_docs/
actor.rs

1//! This contains an actor spawned on a separate thread to process replica and store operations.
2
3use std::{
4    collections::{hash_map, HashMap},
5    num::NonZeroU64,
6    sync::Arc,
7    time::Duration,
8};
9
10use anyhow::{anyhow, Context, Result};
11use bytes::Bytes;
12use futures_util::FutureExt;
13use iroh_blobs::Hash;
14use irpc::channel::mpsc;
15use n0_future::task::JoinSet;
16use serde::{Deserialize, Serialize};
17use tokio::sync::oneshot;
18#[cfg(wasm_browser)]
19use tracing::Instrument;
20use tracing::{debug, error, error_span, trace, warn};
21
22use crate::{
23    api::{
24        protocol::{AuthorListResponse, ListResponse},
25        RpcError, RpcResult,
26    },
27    metrics::Metrics,
28    ranger::Message,
29    store::{
30        fs::{ContentHashesIterator, StoreInstance},
31        DownloadPolicy, ImportNamespaceOutcome, Query, Store,
32    },
33    Author, AuthorHeads, AuthorId, Capability, ContentStatus, ContentStatusCallback, Event,
34    NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo, SignedEntry, SyncOutcome,
35};
36
/// Capacity of the bounded channel that carries actions from handles to the actor.
const ACTION_CAP: usize = 1024;
/// Duration of the timer in the actor loop whose expiry triggers a store flush.
pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
39
40#[derive(derive_more::Debug, derive_more::Display)]
41enum Action {
42    #[display("NewAuthor")]
43    ImportAuthor {
44        author: Author,
45        #[debug("reply")]
46        reply: oneshot::Sender<Result<AuthorId>>,
47    },
48    #[display("ExportAuthor")]
49    ExportAuthor {
50        author: AuthorId,
51        #[debug("reply")]
52        reply: oneshot::Sender<Result<Option<Author>>>,
53    },
54    #[display("DeleteAuthor")]
55    DeleteAuthor {
56        author: AuthorId,
57        #[debug("reply")]
58        reply: oneshot::Sender<Result<()>>,
59    },
60    #[display("NewReplica")]
61    ImportNamespace {
62        capability: Capability,
63        #[debug("reply")]
64        reply: oneshot::Sender<Result<NamespaceId>>,
65    },
66    #[display("ListAuthors")]
67    ListAuthors {
68        #[debug("reply")]
69        reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
70    },
71    #[display("ListReplicas")]
72    ListReplicas {
73        #[debug("reply")]
74        reply: mpsc::Sender<RpcResult<ListResponse>>,
75    },
76    #[display("ContentHashes")]
77    ContentHashes {
78        #[debug("reply")]
79        reply: oneshot::Sender<Result<ContentHashesIterator>>,
80    },
81    #[display("FlushStore")]
82    FlushStore {
83        #[debug("reply")]
84        reply: oneshot::Sender<Result<()>>,
85    },
86    #[display("Replica({}, {})", _0.fmt_short(), _1)]
87    Replica(NamespaceId, ReplicaAction),
88    #[display("Shutdown")]
89    Shutdown {
90        #[debug("reply")]
91        reply: Option<oneshot::Sender<Store>>,
92    },
93}
94
95#[derive(derive_more::Debug, strum::Display)]
96enum ReplicaAction {
97    Open {
98        #[debug("reply")]
99        reply: oneshot::Sender<Result<()>>,
100        opts: OpenOpts,
101    },
102    Close {
103        #[debug("reply")]
104        reply: oneshot::Sender<Result<bool>>,
105    },
106    GetState {
107        #[debug("reply")]
108        reply: oneshot::Sender<Result<OpenState>>,
109    },
110    SetSync {
111        sync: bool,
112        #[debug("reply")]
113        reply: oneshot::Sender<Result<()>>,
114    },
115    Subscribe {
116        sender: async_channel::Sender<Event>,
117        #[debug("reply")]
118        reply: oneshot::Sender<Result<()>>,
119    },
120    Unsubscribe {
121        sender: async_channel::Sender<Event>,
122        #[debug("reply")]
123        reply: oneshot::Sender<Result<()>>,
124    },
125    InsertLocal {
126        author: AuthorId,
127        key: Bytes,
128        hash: Hash,
129        len: u64,
130        #[debug("reply")]
131        reply: oneshot::Sender<Result<()>>,
132    },
133    DeletePrefix {
134        author: AuthorId,
135        key: Bytes,
136        #[debug("reply")]
137        reply: oneshot::Sender<Result<usize>>,
138    },
139    InsertRemote {
140        entry: SignedEntry,
141        from: PeerIdBytes,
142        content_status: ContentStatus,
143        #[debug("reply")]
144        reply: oneshot::Sender<Result<()>>,
145    },
146    SyncInitialMessage {
147        #[debug("reply")]
148        reply: oneshot::Sender<Result<Message<SignedEntry>>>,
149    },
150    SyncProcessMessage {
151        message: Message<SignedEntry>,
152        from: PeerIdBytes,
153        state: SyncOutcome,
154        #[debug("reply")]
155        reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
156    },
157    GetSyncPeers {
158        #[debug("reply")]
159        reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
160    },
161    RegisterUsefulPeer {
162        peer: PeerIdBytes,
163        #[debug("reply")]
164        reply: oneshot::Sender<Result<()>>,
165    },
166    GetExact {
167        author: AuthorId,
168        key: Bytes,
169        include_empty: bool,
170        reply: oneshot::Sender<Result<Option<SignedEntry>>>,
171    },
172    GetMany {
173        query: Query,
174        reply: mpsc::Sender<RpcResult<SignedEntry>>,
175    },
176    DropReplica {
177        reply: oneshot::Sender<Result<()>>,
178    },
179    ExportSecretKey {
180        reply: oneshot::Sender<Result<NamespaceSecret>>,
181    },
182    HasNewsForUs {
183        heads: AuthorHeads,
184        #[debug("reply")]
185        reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
186    },
187    SetDownloadPolicy {
188        policy: DownloadPolicy,
189        #[debug("reply")]
190        reply: oneshot::Sender<Result<()>>,
191    },
192    GetDownloadPolicy {
193        #[debug("reply")]
194        reply: oneshot::Sender<Result<DownloadPolicy>>,
195    },
196}
197
198/// The state for an open replica.
199#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
200pub struct OpenState {
201    /// Whether to accept sync requests for this replica.
202    pub sync: bool,
203    /// How many event subscriptions are open
204    pub subscribers: usize,
205    /// By how many handles the replica is currently held open
206    pub handles: usize,
207}
208
209#[derive(Debug)]
210struct OpenReplica {
211    info: ReplicaInfo,
212    sync: bool,
213    handles: usize,
214}
215
216/// The [`SyncHandle`] controls an actor thread which executes replica and store operations.
217///
218/// The [`SyncHandle`] exposes async methods which all send messages into the actor thread, usually
219/// returning something via a return channel. The actor thread itself is a regular [`std::thread`]
220/// which processes incoming messages sequentially.
221///
222/// The handle is cheaply cloneable. Once the last clone is dropped, the actor thread is joined.
223/// The thread will finish processing all messages in the channel queue, and then exit.
224/// To prevent this last drop from blocking the calling thread, you can call [`SyncHandle::shutdown`]
225/// and await its result before dropping the last [`SyncHandle`]. This ensures that
226/// waiting for the actor to finish happens in an async context, and therefore that the final
227/// [`SyncHandle::drop`] will not block.
228#[derive(Debug, Clone)]
229pub struct SyncHandle {
230    tx: async_channel::Sender<Action>,
231    #[cfg(wasm_browser)]
232    #[allow(unused)]
233    join_handle: Arc<Option<n0_future::task::JoinHandle<()>>>,
234    #[cfg(not(wasm_browser))]
235    join_handle: Arc<Option<std::thread::JoinHandle<()>>>,
236    metrics: Arc<Metrics>,
237}
238
239/// Options when opening a replica.
240#[derive(Debug, Default)]
241pub struct OpenOpts {
242    /// Set to true to set sync state to true.
243    pub sync: bool,
244    /// Optionally subscribe to replica events.
245    pub subscribe: Option<async_channel::Sender<Event>>,
246}
247
248impl OpenOpts {
249    /// Set sync state to true.
250    pub fn sync(mut self) -> Self {
251        self.sync = true;
252        self
253    }
254    /// Subscribe to replica events.
255    pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
256        self.subscribe = Some(subscribe);
257        self
258    }
259}
260
261#[allow(missing_docs)]
262impl SyncHandle {
263    /// Spawn a sync actor and return a handle.
264    pub fn spawn(
265        store: Store,
266        content_status_callback: Option<ContentStatusCallback>,
267        me: String,
268    ) -> SyncHandle {
269        let metrics = Arc::new(Metrics::default());
270        let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
271        let actor = Actor {
272            store,
273            states: Default::default(),
274            action_rx,
275            content_status_callback,
276            tasks: Default::default(),
277            metrics: metrics.clone(),
278        };
279
280        let span = error_span!("sync", %me);
281        #[cfg(wasm_browser)]
282        let join_handle = n0_future::task::spawn(actor.run_async().instrument(span));
283
284        #[cfg(not(wasm_browser))]
285        let join_handle = std::thread::Builder::new()
286            .name("sync-actor".to_string())
287            .spawn(move || {
288                let _enter = span.enter();
289
290                if let Err(err) = actor.run_in_thread() {
291                    error!("Sync actor closed with error: {err:?}");
292                }
293            })
294            .expect("failed to spawn thread");
295
296        let join_handle = Arc::new(Some(join_handle));
297        SyncHandle {
298            tx: action_tx,
299            join_handle,
300            metrics,
301        }
302    }
303
304    /// Returns the metrics collected in this sync actor.
305    pub fn metrics(&self) -> &Arc<Metrics> {
306        &self.metrics
307    }
308
309    pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
310        tracing::debug!("SyncHandle::open called");
311        let (reply, rx) = oneshot::channel();
312        let action = ReplicaAction::Open { reply, opts };
313        self.send_replica(namespace, action).await?;
314        rx.await?
315    }
316
317    pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
318        let (reply, rx) = oneshot::channel();
319        self.send_replica(namespace, ReplicaAction::Close { reply })
320            .await?;
321        rx.await?
322    }
323
324    pub async fn subscribe(
325        &self,
326        namespace: NamespaceId,
327        sender: async_channel::Sender<Event>,
328    ) -> Result<()> {
329        let (reply, rx) = oneshot::channel();
330        self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
331            .await?;
332        rx.await?
333    }
334
335    pub async fn unsubscribe(
336        &self,
337        namespace: NamespaceId,
338        sender: async_channel::Sender<Event>,
339    ) -> Result<()> {
340        let (reply, rx) = oneshot::channel();
341        self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
342            .await?;
343        rx.await?
344    }
345
346    pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
347        let (reply, rx) = oneshot::channel();
348        let action = ReplicaAction::SetSync { sync, reply };
349        self.send_replica(namespace, action).await?;
350        rx.await?
351    }
352
353    pub async fn insert_local(
354        &self,
355        namespace: NamespaceId,
356        author: AuthorId,
357        key: Bytes,
358        hash: Hash,
359        len: u64,
360    ) -> Result<()> {
361        let (reply, rx) = oneshot::channel();
362        let action = ReplicaAction::InsertLocal {
363            author,
364            key,
365            hash,
366            len,
367            reply,
368        };
369        self.send_replica(namespace, action).await?;
370        rx.await?
371    }
372
373    pub async fn delete_prefix(
374        &self,
375        namespace: NamespaceId,
376        author: AuthorId,
377        key: Bytes,
378    ) -> Result<usize> {
379        let (reply, rx) = oneshot::channel();
380        let action = ReplicaAction::DeletePrefix { author, key, reply };
381        self.send_replica(namespace, action).await?;
382        rx.await?
383    }
384
385    pub async fn insert_remote(
386        &self,
387        namespace: NamespaceId,
388        entry: SignedEntry,
389        from: PeerIdBytes,
390        content_status: ContentStatus,
391    ) -> Result<()> {
392        let (reply, rx) = oneshot::channel();
393        let action = ReplicaAction::InsertRemote {
394            entry,
395            from,
396            content_status,
397            reply,
398        };
399        self.send_replica(namespace, action).await?;
400        rx.await?
401    }
402
403    pub async fn sync_initial_message(
404        &self,
405        namespace: NamespaceId,
406    ) -> Result<Message<SignedEntry>> {
407        let (reply, rx) = oneshot::channel();
408        let action = ReplicaAction::SyncInitialMessage { reply };
409        self.send_replica(namespace, action).await?;
410        rx.await?
411    }
412
413    pub async fn sync_process_message(
414        &self,
415        namespace: NamespaceId,
416        message: Message<SignedEntry>,
417        from: PeerIdBytes,
418        state: SyncOutcome,
419    ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
420        let (reply, rx) = oneshot::channel();
421        let action = ReplicaAction::SyncProcessMessage {
422            reply,
423            message,
424            from,
425            state,
426        };
427        self.send_replica(namespace, action).await?;
428        rx.await?
429    }
430
431    pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
432        let (reply, rx) = oneshot::channel();
433        let action = ReplicaAction::GetSyncPeers { reply };
434        self.send_replica(namespace, action).await?;
435        rx.await?
436    }
437
438    pub async fn register_useful_peer(
439        &self,
440        namespace: NamespaceId,
441        peer: PeerIdBytes,
442    ) -> Result<()> {
443        let (reply, rx) = oneshot::channel();
444        let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
445        self.send_replica(namespace, action).await?;
446        rx.await?
447    }
448
449    pub async fn has_news_for_us(
450        &self,
451        namespace: NamespaceId,
452        heads: AuthorHeads,
453    ) -> Result<Option<NonZeroU64>> {
454        let (reply, rx) = oneshot::channel();
455        let action = ReplicaAction::HasNewsForUs { reply, heads };
456        self.send_replica(namespace, action).await?;
457        rx.await?
458    }
459
460    pub async fn get_many(
461        &self,
462        namespace: NamespaceId,
463        query: Query,
464        reply: mpsc::Sender<RpcResult<SignedEntry>>,
465    ) -> Result<()> {
466        let action = ReplicaAction::GetMany { query, reply };
467        self.send_replica(namespace, action).await?;
468        Ok(())
469    }
470
471    pub async fn get_exact(
472        &self,
473        namespace: NamespaceId,
474        author: AuthorId,
475        key: Bytes,
476        include_empty: bool,
477    ) -> Result<Option<SignedEntry>> {
478        let (reply, rx) = oneshot::channel();
479        let action = ReplicaAction::GetExact {
480            author,
481            key,
482            include_empty,
483            reply,
484        };
485        self.send_replica(namespace, action).await?;
486        rx.await?
487    }
488
489    pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
490        let (reply, rx) = oneshot::channel();
491        let action = ReplicaAction::DropReplica { reply };
492        self.send_replica(namespace, action).await?;
493        rx.await?
494    }
495
496    pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
497        let (reply, rx) = oneshot::channel();
498        let action = ReplicaAction::ExportSecretKey { reply };
499        self.send_replica(namespace, action).await?;
500        rx.await?
501    }
502
503    pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
504        let (reply, rx) = oneshot::channel();
505        let action = ReplicaAction::GetState { reply };
506        self.send_replica(namespace, action).await?;
507        rx.await?
508    }
509
510    pub async fn shutdown(&self) -> Result<Store> {
511        let (reply, rx) = oneshot::channel();
512        let action = Action::Shutdown { reply: Some(reply) };
513        self.send(action).await?;
514        let store = rx.await?;
515        Ok(store)
516    }
517
518    pub async fn list_authors(
519        &self,
520        reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
521    ) -> Result<()> {
522        self.send(Action::ListAuthors { reply }).await
523    }
524
525    pub async fn list_replicas(&self, reply: mpsc::Sender<RpcResult<ListResponse>>) -> Result<()> {
526        self.send(Action::ListReplicas { reply }).await
527    }
528
529    pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
530        let (reply, rx) = oneshot::channel();
531        self.send(Action::ImportAuthor { author, reply }).await?;
532        rx.await?
533    }
534
535    pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
536        let (reply, rx) = oneshot::channel();
537        self.send(Action::ExportAuthor { author, reply }).await?;
538        rx.await?
539    }
540
541    pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
542        let (reply, rx) = oneshot::channel();
543        self.send(Action::DeleteAuthor { author, reply }).await?;
544        rx.await?
545    }
546
547    pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
548        let (reply, rx) = oneshot::channel();
549        self.send(Action::ImportNamespace { capability, reply })
550            .await?;
551        rx.await?
552    }
553
554    pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
555        let (reply, rx) = oneshot::channel();
556        let action = ReplicaAction::GetDownloadPolicy { reply };
557        self.send_replica(namespace, action).await?;
558        rx.await?
559    }
560
561    pub async fn set_download_policy(
562        &self,
563        namespace: NamespaceId,
564        policy: DownloadPolicy,
565    ) -> Result<()> {
566        let (reply, rx) = oneshot::channel();
567        let action = ReplicaAction::SetDownloadPolicy { reply, policy };
568        self.send_replica(namespace, action).await?;
569        rx.await?
570    }
571
572    pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
573        let (reply, rx) = oneshot::channel();
574        self.send(Action::ContentHashes { reply }).await?;
575        rx.await?
576    }
577
578    /// Makes sure that all pending database operations are persisted to disk.
579    ///
580    /// Otherwise, database operations are batched into bigger transactions for speed.
581    /// Use this if you need to make sure something is written to the database
582    /// before another operation, e.g. to make sure sudden process exits don't corrupt
583    /// your application state.
584    ///
585    /// It's not necessary to call this function before shutdown, as `shutdown` will
586    /// trigger a flush on its own.
587    pub async fn flush_store(&self) -> Result<()> {
588        let (reply, rx) = oneshot::channel();
589        self.send(Action::FlushStore { reply }).await?;
590        rx.await?
591    }
592
593    async fn send(&self, action: Action) -> Result<()> {
594        self.tx
595            .send(action)
596            .await
597            .context("sending to iroh_docs actor failed")?;
598        Ok(())
599    }
600    async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
601        self.send(Action::Replica(namespace, action)).await?;
602        Ok(())
603    }
604}
605
606impl Drop for SyncHandle {
607    fn drop(&mut self) {
608        // this means we're dropping the last reference
609        #[allow(unused)]
610        if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
611            #[cfg(wasm_browser)]
612            {
613                let tx = self.tx.clone();
614                n0_future::task::spawn(async move {
615                    tx.send(Action::Shutdown { reply: None }).await.ok();
616                });
617            }
618            #[cfg(not(wasm_browser))]
619            {
620                // this call is the reason tx can not be a tokio mpsc channel.
621                // we have no control about where drop is called, yet tokio send_blocking panics
622                // when called from inside a tokio runtime.
623                self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
624                let handle = handle.take().expect("this can only run once");
625
626                if let Err(err) = handle.join() {
627                    warn!(?err, "Failed to join sync actor");
628                }
629            }
630        }
631    }
632}
633
634struct Actor {
635    store: Store,
636    states: OpenReplicas,
637    action_rx: async_channel::Receiver<Action>,
638    content_status_callback: Option<ContentStatusCallback>,
639    tasks: JoinSet<()>,
640    metrics: Arc<Metrics>,
641}
642
643impl Actor {
644    #[cfg(not(wasm_browser))]
645    fn run_in_thread(self) -> Result<()> {
646        let rt = tokio::runtime::Builder::new_current_thread()
647            .enable_time()
648            .build()?;
649        let local_set = tokio::task::LocalSet::new();
650        local_set.block_on(&rt, async move { self.run_async().await });
651        Ok(())
652    }
653
654    async fn run_async(mut self) {
655        let reply = loop {
656            let timeout = n0_future::time::sleep(MAX_COMMIT_DELAY);
657            tokio::pin!(timeout);
658            let action = tokio::select! {
659                _ = &mut timeout => {
660                    if let Err(cause) = self.store.flush() {
661                        error!(?cause, "failed to flush store");
662                    }
663                    continue;
664                }
665                action = self.action_rx.recv() => {
666                    match action {
667                        Ok(action) => action,
668                        Err(async_channel::RecvError) => {
669                            debug!("action channel disconnected");
670                            break None;
671                        }
672
673                    }
674                }
675            };
676            trace!(%action, "tick");
677            self.metrics.actor_tick_main.inc();
678            match action {
679                Action::Shutdown { reply } => {
680                    break reply;
681                }
682                action => {
683                    if self.on_action(action).await.is_err() {
684                        warn!("failed to send reply: receiver dropped");
685                    }
686                }
687            }
688        };
689
690        if let Err(cause) = self.store.flush() {
691            warn!(?cause, "failed to flush store");
692        }
693        self.close_all();
694        self.tasks.abort_all();
695        debug!("docs actor shutdown");
696        if let Some(reply) = reply {
697            reply.send(self.store).ok();
698        }
699    }
700
701    async fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
702        match action {
703            Action::Shutdown { .. } => {
704                unreachable!("Shutdown is handled in run()")
705            }
706            Action::ImportAuthor { author, reply } => {
707                let id = author.id();
708                send_reply(reply, self.store.import_author(author).map(|_| id))
709            }
710            Action::ExportAuthor { author, reply } => {
711                send_reply(reply, self.store.get_author(&author))
712            }
713            Action::DeleteAuthor { author, reply } => {
714                send_reply(reply, self.store.delete_author(author))
715            }
716            Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
717                let id = capability.id();
718                let outcome = this.store.import_namespace(capability.clone())?;
719                if let ImportNamespaceOutcome::Upgraded = outcome {
720                    if let Ok(state) = this.states.get_mut(&id) {
721                        state.info.merge_capability(capability)?;
722                    }
723                }
724                Ok(id)
725            }),
726            Action::ListAuthors { reply } => {
727                let iter = self
728                    .store
729                    .list_authors()
730                    .map(|a| a.map(|a| a.map(|a| AuthorListResponse { author_id: a.id() })));
731                self.tasks
732                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
733                Ok(())
734            }
735            Action::ListReplicas { reply } => {
736                let iter = self.store.list_namespaces();
737                let iter = iter.map(|inner| {
738                    inner.map(|res| res.map(|(id, capability)| ListResponse { id, capability }))
739                });
740                self.tasks
741                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
742                Ok(())
743            }
744            Action::ContentHashes { reply } => {
745                send_reply_with(reply, self, |this| this.store.content_hashes())
746            }
747            Action::FlushStore { reply } => send_reply(reply, self.store.flush()),
748            Action::Replica(namespace, action) => self.on_replica_action(namespace, action).await,
749        }
750    }
751
752    async fn on_replica_action(
753        &mut self,
754        namespace: NamespaceId,
755        action: ReplicaAction,
756    ) -> Result<(), SendReplyError> {
757        match action {
758            ReplicaAction::Open { reply, opts } => {
759                tracing::trace!("open in");
760                let res = self.open(namespace, opts);
761                tracing::trace!("open out");
762                send_reply(reply, res)
763            }
764            ReplicaAction::Close { reply } => {
765                let res = self.close(namespace);
766                // ignore errors when no receiver is present for close
767                reply.send(Ok(res)).ok();
768                Ok(())
769            }
770            ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
771                let state = this.states.get_mut(&namespace)?;
772                state.info.subscribe(sender);
773                Ok(())
774            }),
775            ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
776                let state = this.states.get_mut(&namespace)?;
777                state.info.unsubscribe(&sender);
778                drop(sender);
779                Ok(())
780            }),
781            ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
782                let state = this.states.get_mut(&namespace)?;
783                state.sync = sync;
784                Ok(())
785            }),
786            ReplicaAction::InsertLocal {
787                author,
788                key,
789                hash,
790                len,
791                reply,
792            } => {
793                send_reply_with_async(reply, self, async move |this| {
794                    let author = get_author(&mut this.store, &author)?;
795                    let mut replica = this.states.replica(namespace, &mut this.store)?;
796                    replica.insert(&key, &author, hash, len).await?;
797                    this.metrics.new_entries_local.inc();
798                    this.metrics.new_entries_local_size.inc_by(len);
799                    Ok(())
800                })
801                .await
802            }
803            ReplicaAction::DeletePrefix { author, key, reply } => {
804                send_reply_with_async(reply, self, async |this| {
805                    let author = get_author(&mut this.store, &author)?;
806                    let mut replica = this.states.replica(namespace, &mut this.store)?;
807                    let res = replica.delete_prefix(&key, &author).await?;
808                    Ok(res)
809                })
810                .await
811            }
812            ReplicaAction::InsertRemote {
813                entry,
814                from,
815                content_status,
816                reply,
817            } => {
818                send_reply_with_async(reply, self, async move |this| {
819                    let mut replica = this
820                        .states
821                        .replica_if_syncing(&namespace, &mut this.store)?;
822                    let len = entry.content_len();
823                    replica
824                        .insert_remote_entry(entry, from, content_status)
825                        .await?;
826                    this.metrics.new_entries_remote.inc();
827                    this.metrics.new_entries_remote_size.inc_by(len);
828                    Ok(())
829                })
830                .await
831            }
832
833            ReplicaAction::SyncInitialMessage { reply } => {
834                send_reply_with(reply, self, move |this| {
835                    let mut replica = this
836                        .states
837                        .replica_if_syncing(&namespace, &mut this.store)?;
838                    let res = replica.sync_initial_message()?;
839                    Ok(res)
840                })
841            }
842            ReplicaAction::SyncProcessMessage {
843                message,
844                from,
845                mut state,
846                reply,
847            } => {
848                let res = async {
849                    let mut replica = self
850                        .states
851                        .replica_if_syncing(&namespace, &mut self.store)?;
852                    let res = replica
853                        .sync_process_message(message, from, &mut state)
854                        .await?;
855                    Ok((res, state))
856                }
857                .await;
858                reply.send(res).map_err(send_reply_error)
859            }
860            ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
861                this.states.ensure_open(&namespace)?;
862                let peers = this.store.get_sync_peers(&namespace)?;
863                Ok(peers.map(|iter| iter.collect()))
864            }),
865            ReplicaAction::RegisterUsefulPeer { peer, reply } => {
866                let res = self.store.register_useful_peer(namespace, peer);
867                send_reply(reply, res)
868            }
869            ReplicaAction::GetExact {
870                author,
871                key,
872                include_empty,
873                reply,
874            } => send_reply_with(reply, self, move |this| {
875                this.states.ensure_open(&namespace)?;
876                this.store.get_exact(namespace, author, key, include_empty)
877            }),
878            ReplicaAction::GetMany { query, reply } => {
879                let iter = self
880                    .states
881                    .ensure_open(&namespace)
882                    .and_then(|_| self.store.get_many(namespace, query));
883                self.tasks
884                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
885                Ok(())
886            }
887            ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
888                this.close(namespace);
889                this.store.remove_replica(&namespace)
890            }),
891            ReplicaAction::ExportSecretKey { reply } => {
892                let res = self
893                    .states
894                    .get_mut(&namespace)
895                    .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
896                send_reply(reply, res)
897            }
898            ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
899                let state = this.states.get_mut(&namespace)?;
900                let handles = state.handles;
901                let sync = state.sync;
902                let subscribers = state.info.subscribers_count();
903                Ok(OpenState {
904                    handles,
905                    sync,
906                    subscribers,
907                })
908            }),
909            ReplicaAction::HasNewsForUs { heads, reply } => {
910                let res = self.store.has_news_for_us(namespace, &heads);
911                send_reply(reply, res)
912            }
913            ReplicaAction::SetDownloadPolicy { policy, reply } => {
914                send_reply(reply, self.store.set_download_policy(&namespace, policy))
915            }
916            ReplicaAction::GetDownloadPolicy { reply } => {
917                send_reply(reply, self.store.get_download_policy(&namespace))
918            }
919        }
920    }
921
922    fn close(&mut self, namespace: NamespaceId) -> bool {
923        let res = self.states.close(namespace);
924        if res {
925            self.store.close_replica(namespace);
926        }
927        res
928    }
929
930    fn close_all(&mut self) {
931        for id in self.states.close_all() {
932            self.store.close_replica(id);
933        }
934    }
935
936    fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
937        let open_cb = || {
938            let mut info = self.store.load_replica_info(&namespace)?;
939            if let Some(cb) = &self.content_status_callback {
940                info.set_content_status_callback(Arc::clone(cb));
941            }
942            Ok(info)
943        };
944        self.states.open_with(namespace, opts, open_cb)
945    }
946}
947
948#[derive(Default)]
949struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
950
951impl OpenReplicas {
952    fn replica<'a, 'b>(
953        &'a mut self,
954        namespace: NamespaceId,
955        store: &'b mut Store,
956    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
957        let state = self.get_mut(&namespace)?;
958        Ok(Replica::new(
959            StoreInstance::new(state.info.capability.id(), store),
960            &mut state.info,
961        ))
962    }
963
964    fn replica_if_syncing<'a, 'b>(
965        &'a mut self,
966        namespace: &NamespaceId,
967        store: &'b mut Store,
968    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
969        let state = self.get_mut(namespace)?;
970        anyhow::ensure!(state.sync, "sync is not enabled for replica");
971        Ok(Replica::new(
972            StoreInstance::new(state.info.capability.id(), store),
973            &mut state.info,
974        ))
975    }
976
977    fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
978        self.0.get_mut(namespace).context("replica not open")
979    }
980
981    fn is_open(&self, namespace: &NamespaceId) -> bool {
982        self.0.contains_key(namespace)
983    }
984
985    fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
986        match self.is_open(namespace) {
987            true => Ok(()),
988            false => Err(anyhow!("replica not open")),
989        }
990    }
991    fn open_with(
992        &mut self,
993        namespace: NamespaceId,
994        opts: OpenOpts,
995        mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
996    ) -> Result<()> {
997        match self.0.entry(namespace) {
998            hash_map::Entry::Vacant(e) => {
999                let mut info = open_cb()?;
1000                if let Some(sender) = opts.subscribe {
1001                    info.subscribe(sender);
1002                }
1003                debug!(namespace = %namespace.fmt_short(), "open");
1004                let state = OpenReplica {
1005                    info,
1006                    sync: opts.sync,
1007                    handles: 1,
1008                };
1009                e.insert(state);
1010            }
1011            hash_map::Entry::Occupied(mut e) => {
1012                let state = e.get_mut();
1013                state.handles += 1;
1014                state.sync = state.sync || opts.sync;
1015                if let Some(sender) = opts.subscribe {
1016                    state.info.subscribe(sender);
1017                }
1018            }
1019        }
1020        Ok(())
1021    }
1022    fn close(&mut self, namespace: NamespaceId) -> bool {
1023        match self.0.entry(namespace) {
1024            hash_map::Entry::Vacant(_e) => {
1025                warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
1026                true
1027            }
1028            hash_map::Entry::Occupied(mut e) => {
1029                let state = e.get_mut();
1030                tracing::debug!("STATE {state:?}");
1031                state.handles = state.handles.wrapping_sub(1);
1032                if state.handles == 0 {
1033                    let _ = e.remove_entry();
1034                    debug!(namespace = %namespace.fmt_short(), "close");
1035                    true
1036                } else {
1037                    false
1038                }
1039            }
1040        }
1041    }
1042
1043    fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
1044        self.0.drain().map(|(n, _s)| n)
1045    }
1046}
1047
1048async fn iter_to_irpc<T: irpc::RpcMessage>(
1049    channel: mpsc::Sender<RpcResult<T>>,
1050    iter: Result<impl Iterator<Item = Result<T>>>,
1051) -> Result<(), SendReplyError> {
1052    match iter {
1053        Err(err) => channel
1054            .send(Err(RpcError::new(&*err)))
1055            .await
1056            .map_err(send_reply_error)?,
1057        Ok(iter) => {
1058            for item in iter {
1059                let item = item.map_err(|err| RpcError::new(&*err));
1060                channel.send(item).await.map_err(send_reply_error)?;
1061            }
1062        }
1063    }
1064    Ok(())
1065}
1066
1067fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
1068    store.get_author(id)?.context("author not found")
1069}
1070
/// Marker error returned by the reply helpers in this module when sending a
/// reply fails. It carries no further information about the failure.
#[derive(Debug)]
struct SendReplyError;
1073
1074fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
1075    sender.send(value).map_err(send_reply_error)
1076}
1077
1078fn send_reply_with<T>(
1079    sender: oneshot::Sender<Result<T>>,
1080    this: &mut Actor,
1081    f: impl FnOnce(&mut Actor) -> Result<T>,
1082) -> Result<(), SendReplyError> {
1083    sender.send(f(this)).map_err(send_reply_error)
1084}
1085
1086async fn send_reply_with_async<T>(
1087    sender: oneshot::Sender<Result<T>>,
1088    this: &mut Actor,
1089    f: impl AsyncFnOnce(&mut Actor) -> Result<T>,
1090) -> Result<(), SendReplyError> {
1091    sender.send(f(this).await).map_err(send_reply_error)
1092}
1093
1094fn send_reply_error<T>(_err: T) -> SendReplyError {
1095    SendReplyError
1096}
1097
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store;

    /// Imports and opens a namespace, subscribes to it, closes it again and
    /// then checks that receiving on the subscription channel fails.
    #[tokio::test]
    async fn open_close() -> anyhow::Result<()> {
        let handle = SyncHandle::spawn(store::Store::memory(), None, "foo".into());

        let secret = NamespaceSecret::new(&mut rand::rng());
        let namespace_id = secret.id();
        handle.import_namespace(secret.into()).await?;
        handle.open(namespace_id, Default::default()).await?;

        let (event_tx, event_rx) = async_channel::bounded(10);
        handle.subscribe(namespace_id, event_tx).await?;
        handle.close(namespace_id).await?;

        // After the close, the receiving side must no longer yield events.
        assert!(event_rx.recv().await.is_err());
        Ok(())
    }
}