iroh_docs/
actor.rs

//! This module contains an actor, spawned on a separate thread (or as an async task on `wasm_browser` targets), that processes replica and store operations.
2
3use std::{
4    collections::{hash_map, HashMap},
5    num::NonZeroU64,
6    sync::Arc,
7};
8
9use anyhow::{anyhow, Context, Result};
10use bytes::Bytes;
11use iroh_blobs::Hash;
12use irpc::channel::mpsc;
13use n0_future::{task::JoinSet, time::Duration, TryFutureExt};
14use serde::{Deserialize, Serialize};
15use tokio::sync::oneshot;
16#[cfg(wasm_browser)]
17use tracing::Instrument;
18use tracing::{debug, error, error_span, trace, warn};
19
20use crate::{
21    api::{
22        protocol::{AuthorListResponse, ListResponse},
23        RpcError, RpcResult,
24    },
25    metrics::Metrics,
26    ranger::Message,
27    store::{
28        fs::{ContentHashesIterator, StoreInstance},
29        DownloadPolicy, ImportNamespaceOutcome, Query, Store,
30    },
31    Author, AuthorHeads, AuthorId, Capability, ContentStatus, ContentStatusCallback, Event,
32    NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo, SignedEntry, SyncOutcome,
33};
34
/// Capacity of the bounded channel that carries [`Action`]s from handles to the actor.
const ACTION_CAP: usize = 1_024;
/// Delay after which pending store writes should be committed; the actor loop flushes the
/// store once it has been idle for this long.
pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
37
38#[derive(derive_more::Debug, derive_more::Display)]
39enum Action {
40    #[display("NewAuthor")]
41    ImportAuthor {
42        author: Author,
43        #[debug("reply")]
44        reply: oneshot::Sender<Result<AuthorId>>,
45    },
46    #[display("ExportAuthor")]
47    ExportAuthor {
48        author: AuthorId,
49        #[debug("reply")]
50        reply: oneshot::Sender<Result<Option<Author>>>,
51    },
52    #[display("DeleteAuthor")]
53    DeleteAuthor {
54        author: AuthorId,
55        #[debug("reply")]
56        reply: oneshot::Sender<Result<()>>,
57    },
58    #[display("NewReplica")]
59    ImportNamespace {
60        capability: Capability,
61        #[debug("reply")]
62        reply: oneshot::Sender<Result<NamespaceId>>,
63    },
64    #[display("ListAuthors")]
65    ListAuthors {
66        #[debug("reply")]
67        reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
68    },
69    #[display("ListReplicas")]
70    ListReplicas {
71        #[debug("reply")]
72        reply: mpsc::Sender<RpcResult<ListResponse>>,
73    },
74    #[display("ContentHashes")]
75    ContentHashes {
76        #[debug("reply")]
77        reply: oneshot::Sender<Result<ContentHashesIterator>>,
78    },
79    #[display("FlushStore")]
80    FlushStore {
81        #[debug("reply")]
82        reply: oneshot::Sender<Result<()>>,
83    },
84    #[display("Replica({}, {})", _0.fmt_short(), _1)]
85    Replica(NamespaceId, ReplicaAction),
86    #[display("Shutdown")]
87    Shutdown {
88        #[debug("reply")]
89        reply: Option<oneshot::Sender<Store>>,
90    },
91    #[cfg(test)]
92    #[display("DebugTasksLen")]
93    DebugTasksLen {
94        #[debug("reply")]
95        reply: oneshot::Sender<usize>,
96    },
97}
98
99#[derive(derive_more::Debug, strum::Display)]
100enum ReplicaAction {
101    Open {
102        #[debug("reply")]
103        reply: oneshot::Sender<Result<()>>,
104        opts: OpenOpts,
105    },
106    Close {
107        #[debug("reply")]
108        reply: oneshot::Sender<Result<bool>>,
109    },
110    GetState {
111        #[debug("reply")]
112        reply: oneshot::Sender<Result<OpenState>>,
113    },
114    SetSync {
115        sync: bool,
116        #[debug("reply")]
117        reply: oneshot::Sender<Result<()>>,
118    },
119    Subscribe {
120        sender: async_channel::Sender<Event>,
121        #[debug("reply")]
122        reply: oneshot::Sender<Result<()>>,
123    },
124    Unsubscribe {
125        sender: async_channel::Sender<Event>,
126        #[debug("reply")]
127        reply: oneshot::Sender<Result<()>>,
128    },
129    InsertLocal {
130        author: AuthorId,
131        key: Bytes,
132        hash: Hash,
133        len: u64,
134        #[debug("reply")]
135        reply: oneshot::Sender<Result<()>>,
136    },
137    DeletePrefix {
138        author: AuthorId,
139        key: Bytes,
140        #[debug("reply")]
141        reply: oneshot::Sender<Result<usize>>,
142    },
143    InsertRemote {
144        entry: SignedEntry,
145        from: PeerIdBytes,
146        content_status: ContentStatus,
147        #[debug("reply")]
148        reply: oneshot::Sender<Result<()>>,
149    },
150    SyncInitialMessage {
151        #[debug("reply")]
152        reply: oneshot::Sender<Result<Message<SignedEntry>>>,
153    },
154    SyncProcessMessage {
155        message: Message<SignedEntry>,
156        from: PeerIdBytes,
157        state: SyncOutcome,
158        #[debug("reply")]
159        reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
160    },
161    GetSyncPeers {
162        #[debug("reply")]
163        reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
164    },
165    RegisterUsefulPeer {
166        peer: PeerIdBytes,
167        #[debug("reply")]
168        reply: oneshot::Sender<Result<()>>,
169    },
170    GetExact {
171        author: AuthorId,
172        key: Bytes,
173        include_empty: bool,
174        reply: oneshot::Sender<Result<Option<SignedEntry>>>,
175    },
176    GetMany {
177        query: Query,
178        reply: mpsc::Sender<RpcResult<SignedEntry>>,
179    },
180    DropReplica {
181        reply: oneshot::Sender<Result<()>>,
182    },
183    ExportSecretKey {
184        reply: oneshot::Sender<Result<NamespaceSecret>>,
185    },
186    HasNewsForUs {
187        heads: AuthorHeads,
188        #[debug("reply")]
189        reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
190    },
191    SetDownloadPolicy {
192        policy: DownloadPolicy,
193        #[debug("reply")]
194        reply: oneshot::Sender<Result<()>>,
195    },
196    GetDownloadPolicy {
197        #[debug("reply")]
198        reply: oneshot::Sender<Result<DownloadPolicy>>,
199    },
200}
201
202/// The state for an open replica.
203#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
204pub struct OpenState {
205    /// Whether to accept sync requests for this replica.
206    pub sync: bool,
207    /// How many event subscriptions are open
208    pub subscribers: usize,
209    /// By how many handles the replica is currently held open
210    pub handles: usize,
211}
212
213#[derive(Debug)]
214struct OpenReplica {
215    info: ReplicaInfo,
216    sync: bool,
217    handles: usize,
218}
219
220/// The [`SyncHandle`] controls an actor thread which executes replica and store operations.
221///
222/// The [`SyncHandle`] exposes async methods which all send messages into the actor thread, usually
223/// returning something via a return channel. The actor thread itself is a regular [`std::thread`]
224/// which processes incoming messages sequentially.
225///
226/// The handle is cheaply cloneable. Once the last clone is dropped, the actor thread is joined.
227/// The thread will finish processing all messages in the channel queue, and then exit.
228/// To prevent this last drop from blocking the calling thread, you can call [`SyncHandle::shutdown`]
229/// and await its result before dropping the last [`SyncHandle`]. This ensures that
230/// waiting for the actor to finish happens in an async context, and therefore that the final
231/// [`SyncHandle::drop`] will not block.
232#[derive(Debug, Clone)]
233pub struct SyncHandle {
234    tx: async_channel::Sender<Action>,
235    #[cfg(wasm_browser)]
236    #[allow(unused)]
237    join_handle: Arc<Option<n0_future::task::JoinHandle<()>>>,
238    #[cfg(not(wasm_browser))]
239    join_handle: Arc<Option<std::thread::JoinHandle<()>>>,
240    metrics: Arc<Metrics>,
241}
242
243/// Options when opening a replica.
244#[derive(Debug, Default)]
245pub struct OpenOpts {
246    /// Set to true to set sync state to true.
247    pub sync: bool,
248    /// Optionally subscribe to replica events.
249    pub subscribe: Option<async_channel::Sender<Event>>,
250}
251
252impl OpenOpts {
253    /// Set sync state to true.
254    pub fn sync(mut self) -> Self {
255        self.sync = true;
256        self
257    }
258    /// Subscribe to replica events.
259    pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
260        self.subscribe = Some(subscribe);
261        self
262    }
263}
264
265#[allow(missing_docs)]
266impl SyncHandle {
267    /// Spawn a sync actor and return a handle.
268    pub fn spawn(
269        store: Store,
270        content_status_callback: Option<ContentStatusCallback>,
271        me: String,
272    ) -> SyncHandle {
273        let metrics = Arc::new(Metrics::default());
274        let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
275        let actor = Actor {
276            store,
277            states: Default::default(),
278            action_rx,
279            content_status_callback,
280            tasks: Default::default(),
281            metrics: metrics.clone(),
282        };
283
284        let span = error_span!("sync", %me);
285        #[cfg(wasm_browser)]
286        let join_handle = n0_future::task::spawn(actor.run_async().instrument(span));
287
288        #[cfg(not(wasm_browser))]
289        let join_handle = std::thread::Builder::new()
290            .name("sync-actor".to_string())
291            .spawn(move || {
292                let _enter = span.enter();
293
294                if let Err(err) = actor.run_in_thread() {
295                    error!("Sync actor closed with error: {err:?}");
296                }
297            })
298            .expect("failed to spawn thread");
299
300        let join_handle = Arc::new(Some(join_handle));
301        SyncHandle {
302            tx: action_tx,
303            join_handle,
304            metrics,
305        }
306    }
307
308    /// Returns the metrics collected in this sync actor.
309    pub fn metrics(&self) -> &Arc<Metrics> {
310        &self.metrics
311    }
312
313    pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
314        tracing::debug!("SyncHandle::open called");
315        let (reply, rx) = oneshot::channel();
316        let action = ReplicaAction::Open { reply, opts };
317        self.send_replica(namespace, action).await?;
318        rx.await?
319    }
320
321    pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
322        let (reply, rx) = oneshot::channel();
323        self.send_replica(namespace, ReplicaAction::Close { reply })
324            .await?;
325        rx.await?
326    }
327
328    pub async fn subscribe(
329        &self,
330        namespace: NamespaceId,
331        sender: async_channel::Sender<Event>,
332    ) -> Result<()> {
333        let (reply, rx) = oneshot::channel();
334        self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
335            .await?;
336        rx.await?
337    }
338
339    pub async fn unsubscribe(
340        &self,
341        namespace: NamespaceId,
342        sender: async_channel::Sender<Event>,
343    ) -> Result<()> {
344        let (reply, rx) = oneshot::channel();
345        self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
346            .await?;
347        rx.await?
348    }
349
350    pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
351        let (reply, rx) = oneshot::channel();
352        let action = ReplicaAction::SetSync { sync, reply };
353        self.send_replica(namespace, action).await?;
354        rx.await?
355    }
356
357    pub async fn insert_local(
358        &self,
359        namespace: NamespaceId,
360        author: AuthorId,
361        key: Bytes,
362        hash: Hash,
363        len: u64,
364    ) -> Result<()> {
365        let (reply, rx) = oneshot::channel();
366        let action = ReplicaAction::InsertLocal {
367            author,
368            key,
369            hash,
370            len,
371            reply,
372        };
373        self.send_replica(namespace, action).await?;
374        rx.await?
375    }
376
377    pub async fn delete_prefix(
378        &self,
379        namespace: NamespaceId,
380        author: AuthorId,
381        key: Bytes,
382    ) -> Result<usize> {
383        let (reply, rx) = oneshot::channel();
384        let action = ReplicaAction::DeletePrefix { author, key, reply };
385        self.send_replica(namespace, action).await?;
386        rx.await?
387    }
388
389    pub async fn insert_remote(
390        &self,
391        namespace: NamespaceId,
392        entry: SignedEntry,
393        from: PeerIdBytes,
394        content_status: ContentStatus,
395    ) -> Result<()> {
396        let (reply, rx) = oneshot::channel();
397        let action = ReplicaAction::InsertRemote {
398            entry,
399            from,
400            content_status,
401            reply,
402        };
403        self.send_replica(namespace, action).await?;
404        rx.await?
405    }
406
407    pub async fn sync_initial_message(
408        &self,
409        namespace: NamespaceId,
410    ) -> Result<Message<SignedEntry>> {
411        let (reply, rx) = oneshot::channel();
412        let action = ReplicaAction::SyncInitialMessage { reply };
413        self.send_replica(namespace, action).await?;
414        rx.await?
415    }
416
417    pub async fn sync_process_message(
418        &self,
419        namespace: NamespaceId,
420        message: Message<SignedEntry>,
421        from: PeerIdBytes,
422        state: SyncOutcome,
423    ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
424        let (reply, rx) = oneshot::channel();
425        let action = ReplicaAction::SyncProcessMessage {
426            reply,
427            message,
428            from,
429            state,
430        };
431        self.send_replica(namespace, action).await?;
432        rx.await?
433    }
434
435    pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
436        let (reply, rx) = oneshot::channel();
437        let action = ReplicaAction::GetSyncPeers { reply };
438        self.send_replica(namespace, action).await?;
439        rx.await?
440    }
441
442    pub async fn register_useful_peer(
443        &self,
444        namespace: NamespaceId,
445        peer: PeerIdBytes,
446    ) -> Result<()> {
447        let (reply, rx) = oneshot::channel();
448        let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
449        self.send_replica(namespace, action).await?;
450        rx.await?
451    }
452
453    pub async fn has_news_for_us(
454        &self,
455        namespace: NamespaceId,
456        heads: AuthorHeads,
457    ) -> Result<Option<NonZeroU64>> {
458        let (reply, rx) = oneshot::channel();
459        let action = ReplicaAction::HasNewsForUs { reply, heads };
460        self.send_replica(namespace, action).await?;
461        rx.await?
462    }
463
464    pub async fn get_many(
465        &self,
466        namespace: NamespaceId,
467        query: Query,
468        reply: mpsc::Sender<RpcResult<SignedEntry>>,
469    ) -> Result<()> {
470        let action = ReplicaAction::GetMany { query, reply };
471        self.send_replica(namespace, action).await?;
472        Ok(())
473    }
474
475    pub async fn get_exact(
476        &self,
477        namespace: NamespaceId,
478        author: AuthorId,
479        key: Bytes,
480        include_empty: bool,
481    ) -> Result<Option<SignedEntry>> {
482        let (reply, rx) = oneshot::channel();
483        let action = ReplicaAction::GetExact {
484            author,
485            key,
486            include_empty,
487            reply,
488        };
489        self.send_replica(namespace, action).await?;
490        rx.await?
491    }
492
493    pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
494        let (reply, rx) = oneshot::channel();
495        let action = ReplicaAction::DropReplica { reply };
496        self.send_replica(namespace, action).await?;
497        rx.await?
498    }
499
500    pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
501        let (reply, rx) = oneshot::channel();
502        let action = ReplicaAction::ExportSecretKey { reply };
503        self.send_replica(namespace, action).await?;
504        rx.await?
505    }
506
507    pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
508        let (reply, rx) = oneshot::channel();
509        let action = ReplicaAction::GetState { reply };
510        self.send_replica(namespace, action).await?;
511        rx.await?
512    }
513
514    pub async fn shutdown(&self) -> Result<Store> {
515        let (reply, rx) = oneshot::channel();
516        let action = Action::Shutdown { reply: Some(reply) };
517        self.send(action).await?;
518        let store = rx.await?;
519        Ok(store)
520    }
521
522    #[cfg(test)]
523    async fn debug_tasks_len(&self) -> Result<usize> {
524        let (reply, rx) = oneshot::channel();
525        self.send(Action::DebugTasksLen { reply }).await?;
526        Ok(rx.await?)
527    }
528
529    pub async fn list_authors(
530        &self,
531        reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
532    ) -> Result<()> {
533        self.send(Action::ListAuthors { reply }).await
534    }
535
536    pub async fn list_replicas(&self, reply: mpsc::Sender<RpcResult<ListResponse>>) -> Result<()> {
537        self.send(Action::ListReplicas { reply }).await
538    }
539
540    pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
541        let (reply, rx) = oneshot::channel();
542        self.send(Action::ImportAuthor { author, reply }).await?;
543        rx.await?
544    }
545
546    pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
547        let (reply, rx) = oneshot::channel();
548        self.send(Action::ExportAuthor { author, reply }).await?;
549        rx.await?
550    }
551
552    pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
553        let (reply, rx) = oneshot::channel();
554        self.send(Action::DeleteAuthor { author, reply }).await?;
555        rx.await?
556    }
557
558    pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
559        let (reply, rx) = oneshot::channel();
560        self.send(Action::ImportNamespace { capability, reply })
561            .await?;
562        rx.await?
563    }
564
565    pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
566        let (reply, rx) = oneshot::channel();
567        let action = ReplicaAction::GetDownloadPolicy { reply };
568        self.send_replica(namespace, action).await?;
569        rx.await?
570    }
571
572    pub async fn set_download_policy(
573        &self,
574        namespace: NamespaceId,
575        policy: DownloadPolicy,
576    ) -> Result<()> {
577        let (reply, rx) = oneshot::channel();
578        let action = ReplicaAction::SetDownloadPolicy { reply, policy };
579        self.send_replica(namespace, action).await?;
580        rx.await?
581    }
582
583    pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
584        let (reply, rx) = oneshot::channel();
585        self.send(Action::ContentHashes { reply }).await?;
586        rx.await?
587    }
588
589    /// Makes sure that all pending database operations are persisted to disk.
590    ///
591    /// Otherwise, database operations are batched into bigger transactions for speed.
592    /// Use this if you need to make sure something is written to the database
593    /// before another operation, e.g. to make sure sudden process exits don't corrupt
594    /// your application state.
595    ///
596    /// It's not necessary to call this function before shutdown, as `shutdown` will
597    /// trigger a flush on its own.
598    pub async fn flush_store(&self) -> Result<()> {
599        let (reply, rx) = oneshot::channel();
600        self.send(Action::FlushStore { reply }).await?;
601        rx.await?
602    }
603
604    async fn send(&self, action: Action) -> Result<()> {
605        self.tx
606            .send(action)
607            .await
608            .context("sending to iroh_docs actor failed")?;
609        Ok(())
610    }
611    async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
612        self.send(Action::Replica(namespace, action)).await?;
613        Ok(())
614    }
615}
616
617impl Drop for SyncHandle {
618    fn drop(&mut self) {
619        // this means we're dropping the last reference
620        #[allow(unused)]
621        if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
622            #[cfg(wasm_browser)]
623            {
624                let tx = self.tx.clone();
625                n0_future::task::spawn(async move {
626                    tx.send(Action::Shutdown { reply: None }).await.ok();
627                });
628            }
629            #[cfg(not(wasm_browser))]
630            {
631                // this call is the reason tx can not be a tokio mpsc channel.
632                // we have no control about where drop is called, yet tokio send_blocking panics
633                // when called from inside a tokio runtime.
634                self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
635                let handle = handle.take().expect("this can only run once");
636
637                if let Err(err) = handle.join() {
638                    warn!(?err, "Failed to join sync actor");
639                }
640            }
641        }
642    }
643}
644
645struct Actor {
646    store: Store,
647    states: OpenReplicas,
648    action_rx: async_channel::Receiver<Action>,
649    content_status_callback: Option<ContentStatusCallback>,
650    tasks: JoinSet<()>,
651    metrics: Arc<Metrics>,
652}
653
654impl Actor {
655    #[cfg(not(wasm_browser))]
656    fn run_in_thread(self) -> Result<()> {
657        let rt = tokio::runtime::Builder::new_current_thread()
658            .enable_time()
659            .build()?;
660        let local_set = tokio::task::LocalSet::new();
661        local_set.block_on(&rt, async move { self.run_async().await });
662        Ok(())
663    }
664
665    async fn run_async(mut self) {
666        let reply = loop {
667            let timeout = n0_future::time::sleep(MAX_COMMIT_DELAY);
668            tokio::pin!(timeout);
669            let action = tokio::select! {
670                _ = &mut timeout => {
671                    if let Err(cause) = self.store.flush() {
672                        error!(?cause, "failed to flush store");
673                    }
674                    continue;
675                }
676                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
677                    if let Err(err) = res {
678                        if !err.is_cancelled() {
679                            warn!(?err, "actor reply-streamer task panicked");
680                        }
681                    }
682                    continue;
683                }
684                action = self.action_rx.recv() => {
685                    match action {
686                        Ok(action) => action,
687                        Err(async_channel::RecvError) => {
688                            debug!("action channel disconnected");
689                            break None;
690                        }
691
692                    }
693                }
694            };
695            trace!(%action, "tick");
696            self.metrics.actor_tick_main.inc();
697            match action {
698                Action::Shutdown { reply } => {
699                    break reply;
700                }
701                action => {
702                    if self.on_action(action).await.is_err() {
703                        warn!("failed to send reply: receiver dropped");
704                    }
705                }
706            }
707        };
708
709        if let Err(cause) = self.store.flush() {
710            warn!(?cause, "failed to flush store");
711        }
712        self.close_all();
713        self.tasks.abort_all();
714        debug!("docs actor shutdown");
715        if let Some(reply) = reply {
716            reply.send(self.store).ok();
717        }
718    }
719
720    async fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
721        match action {
722            Action::Shutdown { .. } => {
723                unreachable!("Shutdown is handled in run()")
724            }
725            #[cfg(test)]
726            Action::DebugTasksLen { reply } => send_reply(reply, self.tasks.len()),
727            Action::ImportAuthor { author, reply } => {
728                let id = author.id();
729                send_reply(reply, self.store.import_author(author).map(|_| id))
730            }
731            Action::ExportAuthor { author, reply } => {
732                send_reply(reply, self.store.get_author(&author))
733            }
734            Action::DeleteAuthor { author, reply } => {
735                send_reply(reply, self.store.delete_author(author))
736            }
737            Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
738                let id = capability.id();
739                let outcome = this.store.import_namespace(capability.clone())?;
740                if let ImportNamespaceOutcome::Upgraded = outcome {
741                    if let Ok(state) = this.states.get_mut(&id) {
742                        state.info.merge_capability(capability)?;
743                    }
744                }
745                Ok(id)
746            }),
747            Action::ListAuthors { reply } => {
748                let iter = self
749                    .store
750                    .list_authors()
751                    .map(|a| a.map(|a| a.map(|a| AuthorListResponse { author_id: a.id() })));
752                self.tasks.spawn_local(async move {
753                    iter_to_irpc(reply, iter).await.ok();
754                });
755                Ok(())
756            }
757            Action::ListReplicas { reply } => {
758                let iter = self.store.list_namespaces();
759                let iter = iter.map(|inner| {
760                    inner.map(|res| res.map(|(id, capability)| ListResponse { id, capability }))
761                });
762                self.tasks.spawn_local(async move {
763                    iter_to_irpc(reply, iter).await.ok();
764                });
765                Ok(())
766            }
767            Action::ContentHashes { reply } => {
768                send_reply_with(reply, self, |this| this.store.content_hashes())
769            }
770            Action::FlushStore { reply } => send_reply(reply, self.store.flush()),
771            Action::Replica(namespace, action) => self.on_replica_action(namespace, action).await,
772        }
773    }
774
775    async fn on_replica_action(
776        &mut self,
777        namespace: NamespaceId,
778        action: ReplicaAction,
779    ) -> Result<(), SendReplyError> {
780        match action {
781            ReplicaAction::Open { reply, opts } => {
782                tracing::trace!("open in");
783                let res = self.open(namespace, opts);
784                tracing::trace!("open out");
785                send_reply(reply, res)
786            }
787            ReplicaAction::Close { reply } => {
788                let res = self.close(namespace);
789                // ignore errors when no receiver is present for close
790                reply.send(Ok(res)).ok();
791                Ok(())
792            }
793            ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
794                let state = this.states.get_mut(&namespace)?;
795                state.info.subscribe(sender);
796                Ok(())
797            }),
798            ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
799                let state = this.states.get_mut(&namespace)?;
800                state.info.unsubscribe(&sender);
801                drop(sender);
802                Ok(())
803            }),
804            ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
805                let state = this.states.get_mut(&namespace)?;
806                state.sync = sync;
807                Ok(())
808            }),
809            ReplicaAction::InsertLocal {
810                author,
811                key,
812                hash,
813                len,
814                reply,
815            } => {
816                send_reply_with_async(reply, self, async move |this| {
817                    let author = get_author(&mut this.store, &author)?;
818                    let mut replica = this.states.replica(namespace, &mut this.store)?;
819                    replica.insert(&key, &author, hash, len).await?;
820                    this.metrics.new_entries_local.inc();
821                    this.metrics.new_entries_local_size.inc_by(len);
822                    Ok(())
823                })
824                .await
825            }
826            ReplicaAction::DeletePrefix { author, key, reply } => {
827                send_reply_with_async(reply, self, async |this| {
828                    let author = get_author(&mut this.store, &author)?;
829                    let mut replica = this.states.replica(namespace, &mut this.store)?;
830                    let res = replica.delete_prefix(&key, &author).await?;
831                    Ok(res)
832                })
833                .await
834            }
835            ReplicaAction::InsertRemote {
836                entry,
837                from,
838                content_status,
839                reply,
840            } => {
841                send_reply_with_async(reply, self, async move |this| {
842                    let mut replica = this
843                        .states
844                        .replica_if_syncing(&namespace, &mut this.store)?;
845                    let len = entry.content_len();
846                    replica
847                        .insert_remote_entry(entry, from, content_status)
848                        .await?;
849                    this.metrics.new_entries_remote.inc();
850                    this.metrics.new_entries_remote_size.inc_by(len);
851                    Ok(())
852                })
853                .await
854            }
855
856            ReplicaAction::SyncInitialMessage { reply } => {
857                send_reply_with(reply, self, move |this| {
858                    let mut replica = this
859                        .states
860                        .replica_if_syncing(&namespace, &mut this.store)?;
861                    let res = replica.sync_initial_message()?;
862                    Ok(res)
863                })
864            }
865            ReplicaAction::SyncProcessMessage {
866                message,
867                from,
868                mut state,
869                reply,
870            } => {
871                let res = async {
872                    let mut replica = self
873                        .states
874                        .replica_if_syncing(&namespace, &mut self.store)?;
875                    let res = replica
876                        .sync_process_message(message, from, &mut state)
877                        .await?;
878                    Ok((res, state))
879                }
880                .await;
881                reply.send(res).map_err(send_reply_error)
882            }
883            ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
884                this.states.ensure_open(&namespace)?;
885                let peers = this.store.get_sync_peers(&namespace)?;
886                Ok(peers.map(|iter| iter.collect()))
887            }),
888            ReplicaAction::RegisterUsefulPeer { peer, reply } => {
889                let res = self.store.register_useful_peer(namespace, peer);
890                send_reply(reply, res)
891            }
892            ReplicaAction::GetExact {
893                author,
894                key,
895                include_empty,
896                reply,
897            } => send_reply_with(reply, self, move |this| {
898                this.states.ensure_open(&namespace)?;
899                this.store.get_exact(namespace, author, key, include_empty)
900            }),
901            ReplicaAction::GetMany { query, reply } => {
902                let iter = self
903                    .states
904                    .ensure_open(&namespace)
905                    .and_then(|_| self.store.get_many(namespace, query));
906                self.tasks
907                    .spawn_local(iter_to_irpc(reply, iter).map_ok_or_else(|_| (), |_| ()));
908                Ok(())
909            }
910            ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
911                this.close(namespace);
912                this.store.remove_replica(&namespace)
913            }),
914            ReplicaAction::ExportSecretKey { reply } => {
915                let res = self
916                    .states
917                    .get_mut(&namespace)
918                    .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
919                send_reply(reply, res)
920            }
921            ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
922                let state = this.states.get_mut(&namespace)?;
923                let handles = state.handles;
924                let sync = state.sync;
925                let subscribers = state.info.subscribers_count();
926                Ok(OpenState {
927                    handles,
928                    sync,
929                    subscribers,
930                })
931            }),
932            ReplicaAction::HasNewsForUs { heads, reply } => {
933                let res = self.store.has_news_for_us(namespace, &heads);
934                send_reply(reply, res)
935            }
936            ReplicaAction::SetDownloadPolicy { policy, reply } => {
937                send_reply(reply, self.store.set_download_policy(&namespace, policy))
938            }
939            ReplicaAction::GetDownloadPolicy { reply } => {
940                send_reply(reply, self.store.get_download_policy(&namespace))
941            }
942        }
943    }
944
945    fn close(&mut self, namespace: NamespaceId) -> bool {
946        let res = self.states.close(namespace);
947        if res {
948            self.store.close_replica(namespace);
949        }
950        res
951    }
952
953    fn close_all(&mut self) {
954        for id in self.states.close_all() {
955            self.store.close_replica(id);
956        }
957    }
958
959    fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
960        let open_cb = || {
961            let mut info = self.store.load_replica_info(&namespace)?;
962            if let Some(cb) = &self.content_status_callback {
963                info.set_content_status_callback(Arc::clone(cb));
964            }
965            Ok(info)
966        };
967        self.states.open_with(namespace, opts, open_cb)
968    }
969}
970
971#[derive(Default)]
972struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
973
974impl OpenReplicas {
975    fn replica<'a, 'b>(
976        &'a mut self,
977        namespace: NamespaceId,
978        store: &'b mut Store,
979    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
980        let state = self.get_mut(&namespace)?;
981        Ok(Replica::new(
982            StoreInstance::new(state.info.capability.id(), store),
983            &mut state.info,
984        ))
985    }
986
987    fn replica_if_syncing<'a, 'b>(
988        &'a mut self,
989        namespace: &NamespaceId,
990        store: &'b mut Store,
991    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
992        let state = self.get_mut(namespace)?;
993        anyhow::ensure!(state.sync, "sync is not enabled for replica");
994        Ok(Replica::new(
995            StoreInstance::new(state.info.capability.id(), store),
996            &mut state.info,
997        ))
998    }
999
1000    fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
1001        self.0.get_mut(namespace).context("replica not open")
1002    }
1003
1004    fn is_open(&self, namespace: &NamespaceId) -> bool {
1005        self.0.contains_key(namespace)
1006    }
1007
1008    fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
1009        match self.is_open(namespace) {
1010            true => Ok(()),
1011            false => Err(anyhow!("replica not open")),
1012        }
1013    }
1014    fn open_with(
1015        &mut self,
1016        namespace: NamespaceId,
1017        opts: OpenOpts,
1018        mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
1019    ) -> Result<()> {
1020        match self.0.entry(namespace) {
1021            hash_map::Entry::Vacant(e) => {
1022                let mut info = open_cb()?;
1023                if let Some(sender) = opts.subscribe {
1024                    info.subscribe(sender);
1025                }
1026                debug!(namespace = %namespace.fmt_short(), "open");
1027                let state = OpenReplica {
1028                    info,
1029                    sync: opts.sync,
1030                    handles: 1,
1031                };
1032                e.insert(state);
1033            }
1034            hash_map::Entry::Occupied(mut e) => {
1035                let state = e.get_mut();
1036                state.handles += 1;
1037                state.sync = state.sync || opts.sync;
1038                if let Some(sender) = opts.subscribe {
1039                    state.info.subscribe(sender);
1040                }
1041            }
1042        }
1043        Ok(())
1044    }
1045    fn close(&mut self, namespace: NamespaceId) -> bool {
1046        match self.0.entry(namespace) {
1047            hash_map::Entry::Vacant(_e) => {
1048                warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
1049                true
1050            }
1051            hash_map::Entry::Occupied(mut e) => {
1052                let state = e.get_mut();
1053                tracing::debug!("STATE {state:?}");
1054                state.handles = state.handles.wrapping_sub(1);
1055                if state.handles == 0 {
1056                    let _ = e.remove_entry();
1057                    debug!(namespace = %namespace.fmt_short(), "close");
1058                    true
1059                } else {
1060                    false
1061                }
1062            }
1063        }
1064    }
1065
1066    fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
1067        self.0.drain().map(|(n, _s)| n)
1068    }
1069}
1070
1071async fn iter_to_irpc<T: irpc::RpcMessage>(
1072    channel: mpsc::Sender<RpcResult<T>>,
1073    iter: Result<impl Iterator<Item = Result<T>>>,
1074) -> Result<(), SendReplyError> {
1075    match iter {
1076        Err(err) => channel
1077            .send(Err(RpcError::new(&*err)))
1078            .await
1079            .map_err(send_reply_error)?,
1080        Ok(iter) => {
1081            for item in iter {
1082                let item = item.map_err(|err| RpcError::new(&*err));
1083                channel.send(item).await.map_err(send_reply_error)?;
1084            }
1085        }
1086    }
1087    Ok(())
1088}
1089
1090fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
1091    store.get_author(id)?.context("author not found")
1092}
1093
/// Marker error signalling that a reply could not be delivered on its channel (for example,
/// because the receiving side was dropped). The original send error is discarded.
#[derive(Debug)]
struct SendReplyError;
1096
1097fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
1098    sender.send(value).map_err(send_reply_error)
1099}
1100
1101fn send_reply_with<T>(
1102    sender: oneshot::Sender<Result<T>>,
1103    this: &mut Actor,
1104    f: impl FnOnce(&mut Actor) -> Result<T>,
1105) -> Result<(), SendReplyError> {
1106    sender.send(f(this)).map_err(send_reply_error)
1107}
1108
1109async fn send_reply_with_async<T>(
1110    sender: oneshot::Sender<Result<T>>,
1111    this: &mut Actor,
1112    f: impl AsyncFnOnce(&mut Actor) -> Result<T>,
1113) -> Result<(), SendReplyError> {
1114    sender.send(f(this).await).map_err(send_reply_error)
1115}
1116
/// Maps any channel send error to [`SendReplyError`], discarding the original error (and, for
/// oneshot channels, the value that could not be sent). Intended for use with `map_err`.
fn send_reply_error<T>(_err: T) -> SendReplyError {
    SendReplyError
}
1120
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store;
    /// Opens a replica, subscribes to it, and closes it again.
    ///
    /// After the close, the test expects the subscription channel to be closed, so
    /// `rx.recv()` must return an error.
    #[tokio::test]
    async fn open_close() -> anyhow::Result<()> {
        let store = store::Store::memory();
        let sync = SyncHandle::spawn(store, None, "foo".into());
        let namespace = NamespaceSecret::new(&mut rand::rng());
        let id = namespace.id();
        sync.import_namespace(namespace.into()).await?;
        sync.open(id, Default::default()).await?;
        let (tx, rx) = async_channel::bounded(10);
        sync.subscribe(id, tx).await?;
        sync.close(id).await?;
        // The subscriber's sender side must be gone once the replica is closed.
        assert!(rx.recv().await.is_err());
        Ok(())
    }

    /// Tests that streamer tasks spawned into `Actor.tasks` are reaped
    /// once they complete.
    ///
    /// The three streaming actions (`ListAuthors`, `ListReplicas`, and
    /// `ReplicaAction::GetMany`) each `spawn_local` a task into
    /// `Actor.tasks` to drive their reply channel. The actor must
    /// `join_next` those tasks once they finish, otherwise the
    /// `JoinSet` grows without bound for the lifetime of the actor.
    #[tokio::test]
    async fn actor_tasks_joinset_drain() -> anyhow::Result<()> {
        let store = store::Store::memory();
        let sync = SyncHandle::spawn(store, None, "drain".into());

        let namespace = NamespaceSecret::new(&mut rand::rng());
        let id = namespace.id();
        sync.import_namespace(namespace.into()).await?;
        sync.open(id, Default::default()).await?;

        // Number of requests issued per streaming action; large enough that
        // unreaped tasks would clearly exceed the threshold asserted below.
        const ITERATIONS: usize = 1000;

        // Each loop issues one streaming request and drains its reply channel
        // to completion, so every spawned streamer task can finish.
        for _ in 0..ITERATIONS {
            let (tx, mut rx) = mpsc::channel(64);
            sync.list_authors(tx).await?;
            while rx.recv().await?.is_some() {}
        }

        for _ in 0..ITERATIONS {
            let (tx, mut rx) = mpsc::channel(64);
            sync.list_replicas(tx).await?;
            while rx.recv().await?.is_some() {}
        }

        for _ in 0..ITERATIONS {
            let (tx, mut rx) = mpsc::channel(64);
            sync.get_many(id, store::Query::all().into(), tx).await?;
            while rx.recv().await?.is_some() {}
        }

        // Poll the JoinSet size every 50 ms until it drops to the threshold
        // or a 10 s deadline passes. A small residue (<= 16) is tolerated,
        // presumably as slack for tasks that have finished but are not yet
        // joined.
        let mut last = sync.debug_tasks_len().await?;
        let deadline = std::time::Instant::now() + Duration::from_secs(10);
        while last > 16 && std::time::Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(50)).await;
            last = sync.debug_tasks_len().await?;
        }

        assert!(
            last <= 16,
            "residual Actor.tasks JoinSet len = {last}, expected <= 16 \
             (was the join_next arm in run_async lost? streamer tasks \
             for ListAuthors / ListReplicas / GetMany are not being reaped)"
        );

        sync.close(id).await?;
        Ok(())
    }
}