1use std::{
4 collections::{hash_map, HashMap},
5 num::NonZeroU64,
6 sync::Arc,
7};
8
9use anyhow::{anyhow, Context, Result};
10use bytes::Bytes;
11use futures_util::FutureExt;
12use iroh_blobs::Hash;
13use irpc::channel::mpsc;
14use n0_future::{task::JoinSet, time::Duration};
15use serde::{Deserialize, Serialize};
16use tokio::sync::oneshot;
17#[cfg(wasm_browser)]
18use tracing::Instrument;
19use tracing::{debug, error, error_span, trace, warn};
20
21use crate::{
22 api::{
23 protocol::{AuthorListResponse, ListResponse},
24 RpcError, RpcResult,
25 },
26 metrics::Metrics,
27 ranger::Message,
28 store::{
29 fs::{ContentHashesIterator, StoreInstance},
30 DownloadPolicy, ImportNamespaceOutcome, Query, Store,
31 },
32 Author, AuthorHeads, AuthorId, Capability, ContentStatus, ContentStatusCallback, Event,
33 NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo, SignedEntry, SyncOutcome,
34};
35
/// Capacity of the bounded channel that carries [`Action`]s from handles to the actor.
const ACTION_CAP: usize = 1024;
/// How long the actor waits for the next action before it flushes the store.
///
/// The wait is restarted whenever an action is received, so this bounds the idle
/// time before a flush rather than the time between two flushes.
pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
38
/// Messages accepted by the sync actor.
///
/// Apart from [`Action::Replica`], every variant carries a `reply` channel on which the
/// actor sends the outcome. The `Display` output is what the actor logs for each action.
#[derive(derive_more::Debug, derive_more::Display)]
enum Action {
    /// Import an author into the store; the reply carries the author's id.
    // NOTE(review): the display label is "NewAuthor" although the variant is `ImportAuthor`.
    #[display("NewAuthor")]
    ImportAuthor {
        author: Author,
        #[debug("reply")]
        reply: oneshot::Sender<Result<AuthorId>>,
    },
    /// Look up an author by id; the reply is `None` when the store has no such author.
    #[display("ExportAuthor")]
    ExportAuthor {
        author: AuthorId,
        #[debug("reply")]
        reply: oneshot::Sender<Result<Option<Author>>>,
    },
    /// Delete an author from the store.
    #[display("DeleteAuthor")]
    DeleteAuthor {
        author: AuthorId,
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
    },
    /// Import a namespace capability; the reply carries the namespace id.
    // NOTE(review): the display label is "NewReplica" although the variant is `ImportNamespace`.
    #[display("NewReplica")]
    ImportNamespace {
        capability: Capability,
        #[debug("reply")]
        reply: oneshot::Sender<Result<NamespaceId>>,
    },
    /// Stream the ids of all authors in the store to `reply`.
    #[display("ListAuthors")]
    ListAuthors {
        #[debug("reply")]
        reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
    },
    /// Stream all namespaces in the store, with their capabilities, to `reply`.
    #[display("ListReplicas")]
    ListReplicas {
        #[debug("reply")]
        reply: mpsc::Sender<RpcResult<ListResponse>>,
    },
    /// Request an iterator over the content hashes known to the store.
    #[display("ContentHashes")]
    ContentHashes {
        #[debug("reply")]
        reply: oneshot::Sender<Result<ContentHashesIterator>>,
    },
    /// Flush the store now.
    #[display("FlushStore")]
    FlushStore {
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
    },
    /// An action that targets a single replica, identified by its namespace id.
    #[display("Replica({}, {})", _0.fmt_short(), _1)]
    Replica(NamespaceId, ReplicaAction),
    /// Stop the actor. If `reply` is set, the store is handed back through it.
    #[display("Shutdown")]
    Shutdown {
        #[debug("reply")]
        reply: Option<oneshot::Sender<Store>>,
    },
}
93
/// Actions that operate on a single replica.
///
/// These are always wrapped in [`Action::Replica`] together with the namespace id of the
/// replica they apply to.
#[derive(derive_more::Debug, strum::Display)]
enum ReplicaAction {
    /// Open the replica, or add another handle to it if it is already open.
    Open {
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
        opts: OpenOpts,
    },
    /// Release one handle to the replica; the reply is the `bool` returned by the actor's
    /// close routine.
    Close {
        #[debug("reply")]
        reply: oneshot::Sender<Result<bool>>,
    },
    /// Report the current [`OpenState`] of the replica.
    GetState {
        #[debug("reply")]
        reply: oneshot::Sender<Result<OpenState>>,
    },
    /// Set the sync flag of the open replica.
    SetSync {
        sync: bool,
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
    },
    /// Register `sender` as a subscriber for events of the open replica.
    Subscribe {
        sender: async_channel::Sender<Event>,
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
    },
    /// Remove `sender` from the subscribers of the open replica.
    Unsubscribe {
        sender: async_channel::Sender<Event>,
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
    },
    /// Insert an entry authored locally by `author`.
    InsertLocal {
        author: AuthorId,
        key: Bytes,
        hash: Hash,
        len: u64,
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
    },
    /// Run the replica's `delete_prefix` for `key` on behalf of `author`; the reply carries
    /// the `usize` it returns.
    DeletePrefix {
        author: AuthorId,
        key: Bytes,
        #[debug("reply")]
        reply: oneshot::Sender<Result<usize>>,
    },
    /// Insert an entry received from the peer `from`. Requires the sync flag to be set.
    InsertRemote {
        entry: SignedEntry,
        from: PeerIdBytes,
        content_status: ContentStatus,
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
    },
    /// Produce the initial sync message. Requires the sync flag to be set.
    SyncInitialMessage {
        #[debug("reply")]
        reply: oneshot::Sender<Result<Message<SignedEntry>>>,
    },
    /// Process a sync message from the peer `from`. Requires the sync flag to be set.
    ///
    /// The reply carries the optional response message and the updated `state`.
    SyncProcessMessage {
        message: Message<SignedEntry>,
        from: PeerIdBytes,
        state: SyncOutcome,
        #[debug("reply")]
        reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
    },
    /// Fetch the sync peers the store has recorded for the open replica.
    GetSyncPeers {
        #[debug("reply")]
        reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
    },
    /// Record `peer` as a useful peer for this namespace in the store.
    RegisterUsefulPeer {
        peer: PeerIdBytes,
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
    },
    /// Look up a single entry by author and key in the open replica.
    GetExact {
        author: AuthorId,
        key: Bytes,
        include_empty: bool,
        reply: oneshot::Sender<Result<Option<SignedEntry>>>,
    },
    /// Stream all entries of the open replica that match `query` to `reply`.
    GetMany {
        query: Query,
        reply: mpsc::Sender<RpcResult<SignedEntry>>,
    },
    /// Close the replica and remove it from the store.
    DropReplica {
        reply: oneshot::Sender<Result<()>>,
    },
    /// Return the secret key of the open replica's capability.
    ExportSecretKey {
        reply: oneshot::Sender<Result<NamespaceSecret>>,
    },
    /// Ask the store's `has_news_for_us` about `heads` for this namespace.
    HasNewsForUs {
        heads: AuthorHeads,
        #[debug("reply")]
        reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
    },
    /// Store a new download policy for this namespace.
    SetDownloadPolicy {
        policy: DownloadPolicy,
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
    },
    /// Read the download policy of this namespace.
    GetDownloadPolicy {
        #[debug("reply")]
        reply: oneshot::Sender<Result<DownloadPolicy>>,
    },
}
196
/// Snapshot of the state of an open replica, as reported by [`SyncHandle::get_state`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct OpenState {
    /// Whether the sync flag is set for the replica.
    pub sync: bool,
    /// Number of event subscribers registered on the replica.
    pub subscribers: usize,
    /// Number of handles currently held on the replica (opens not yet matched by a close).
    pub handles: usize,
}
207
/// Actor-side bookkeeping for one open replica.
#[derive(Debug)]
struct OpenReplica {
    /// Replica info loaded from the store when the replica was opened.
    info: ReplicaInfo,
    /// Sync flag; sync-related actions are rejected while this is `false`.
    sync: bool,
    /// Number of outstanding opens; the replica is removed when this reaches zero.
    handles: usize,
}
214
/// Cloneable handle to the sync actor.
///
/// All operations are sent to the actor as messages over a bounded channel. When the last
/// clone is dropped, the actor is asked to shut down (see the `Drop` implementation).
#[derive(Debug, Clone)]
pub struct SyncHandle {
    /// Sending side of the actor's action channel.
    tx: async_channel::Sender<Action>,
    /// Join handle of the actor task (browser builds).
    #[cfg(wasm_browser)]
    #[allow(unused)]
    join_handle: Arc<Option<n0_future::task::JoinHandle<()>>>,
    /// Join handle of the actor thread; shared between clones so that only the last
    /// handle to be dropped joins the thread.
    #[cfg(not(wasm_browser))]
    join_handle: Arc<Option<std::thread::JoinHandle<()>>>,
    /// Metrics shared with the actor.
    metrics: Arc<Metrics>,
}
237
/// Options for opening a replica.
#[derive(Debug, Default)]
pub struct OpenOpts {
    /// Set the sync flag on the replica when opening it.
    pub sync: bool,
    /// Optional channel to subscribe to the replica's events as part of opening it.
    pub subscribe: Option<async_channel::Sender<Event>>,
}
246
247impl OpenOpts {
248 pub fn sync(mut self) -> Self {
250 self.sync = true;
251 self
252 }
253 pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
255 self.subscribe = Some(subscribe);
256 self
257 }
258}
259
260#[allow(missing_docs)]
261impl SyncHandle {
262 pub fn spawn(
264 store: Store,
265 content_status_callback: Option<ContentStatusCallback>,
266 me: String,
267 ) -> SyncHandle {
268 let metrics = Arc::new(Metrics::default());
269 let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
270 let actor = Actor {
271 store,
272 states: Default::default(),
273 action_rx,
274 content_status_callback,
275 tasks: Default::default(),
276 metrics: metrics.clone(),
277 };
278
279 let span = error_span!("sync", %me);
280 #[cfg(wasm_browser)]
281 let join_handle = n0_future::task::spawn(actor.run_async().instrument(span));
282
283 #[cfg(not(wasm_browser))]
284 let join_handle = std::thread::Builder::new()
285 .name("sync-actor".to_string())
286 .spawn(move || {
287 let _enter = span.enter();
288
289 if let Err(err) = actor.run_in_thread() {
290 error!("Sync actor closed with error: {err:?}");
291 }
292 })
293 .expect("failed to spawn thread");
294
295 let join_handle = Arc::new(Some(join_handle));
296 SyncHandle {
297 tx: action_tx,
298 join_handle,
299 metrics,
300 }
301 }
302
303 pub fn metrics(&self) -> &Arc<Metrics> {
305 &self.metrics
306 }
307
308 pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
309 tracing::debug!("SyncHandle::open called");
310 let (reply, rx) = oneshot::channel();
311 let action = ReplicaAction::Open { reply, opts };
312 self.send_replica(namespace, action).await?;
313 rx.await?
314 }
315
316 pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
317 let (reply, rx) = oneshot::channel();
318 self.send_replica(namespace, ReplicaAction::Close { reply })
319 .await?;
320 rx.await?
321 }
322
323 pub async fn subscribe(
324 &self,
325 namespace: NamespaceId,
326 sender: async_channel::Sender<Event>,
327 ) -> Result<()> {
328 let (reply, rx) = oneshot::channel();
329 self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
330 .await?;
331 rx.await?
332 }
333
334 pub async fn unsubscribe(
335 &self,
336 namespace: NamespaceId,
337 sender: async_channel::Sender<Event>,
338 ) -> Result<()> {
339 let (reply, rx) = oneshot::channel();
340 self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
341 .await?;
342 rx.await?
343 }
344
345 pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
346 let (reply, rx) = oneshot::channel();
347 let action = ReplicaAction::SetSync { sync, reply };
348 self.send_replica(namespace, action).await?;
349 rx.await?
350 }
351
352 pub async fn insert_local(
353 &self,
354 namespace: NamespaceId,
355 author: AuthorId,
356 key: Bytes,
357 hash: Hash,
358 len: u64,
359 ) -> Result<()> {
360 let (reply, rx) = oneshot::channel();
361 let action = ReplicaAction::InsertLocal {
362 author,
363 key,
364 hash,
365 len,
366 reply,
367 };
368 self.send_replica(namespace, action).await?;
369 rx.await?
370 }
371
372 pub async fn delete_prefix(
373 &self,
374 namespace: NamespaceId,
375 author: AuthorId,
376 key: Bytes,
377 ) -> Result<usize> {
378 let (reply, rx) = oneshot::channel();
379 let action = ReplicaAction::DeletePrefix { author, key, reply };
380 self.send_replica(namespace, action).await?;
381 rx.await?
382 }
383
384 pub async fn insert_remote(
385 &self,
386 namespace: NamespaceId,
387 entry: SignedEntry,
388 from: PeerIdBytes,
389 content_status: ContentStatus,
390 ) -> Result<()> {
391 let (reply, rx) = oneshot::channel();
392 let action = ReplicaAction::InsertRemote {
393 entry,
394 from,
395 content_status,
396 reply,
397 };
398 self.send_replica(namespace, action).await?;
399 rx.await?
400 }
401
402 pub async fn sync_initial_message(
403 &self,
404 namespace: NamespaceId,
405 ) -> Result<Message<SignedEntry>> {
406 let (reply, rx) = oneshot::channel();
407 let action = ReplicaAction::SyncInitialMessage { reply };
408 self.send_replica(namespace, action).await?;
409 rx.await?
410 }
411
412 pub async fn sync_process_message(
413 &self,
414 namespace: NamespaceId,
415 message: Message<SignedEntry>,
416 from: PeerIdBytes,
417 state: SyncOutcome,
418 ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
419 let (reply, rx) = oneshot::channel();
420 let action = ReplicaAction::SyncProcessMessage {
421 reply,
422 message,
423 from,
424 state,
425 };
426 self.send_replica(namespace, action).await?;
427 rx.await?
428 }
429
430 pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
431 let (reply, rx) = oneshot::channel();
432 let action = ReplicaAction::GetSyncPeers { reply };
433 self.send_replica(namespace, action).await?;
434 rx.await?
435 }
436
437 pub async fn register_useful_peer(
438 &self,
439 namespace: NamespaceId,
440 peer: PeerIdBytes,
441 ) -> Result<()> {
442 let (reply, rx) = oneshot::channel();
443 let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
444 self.send_replica(namespace, action).await?;
445 rx.await?
446 }
447
448 pub async fn has_news_for_us(
449 &self,
450 namespace: NamespaceId,
451 heads: AuthorHeads,
452 ) -> Result<Option<NonZeroU64>> {
453 let (reply, rx) = oneshot::channel();
454 let action = ReplicaAction::HasNewsForUs { reply, heads };
455 self.send_replica(namespace, action).await?;
456 rx.await?
457 }
458
459 pub async fn get_many(
460 &self,
461 namespace: NamespaceId,
462 query: Query,
463 reply: mpsc::Sender<RpcResult<SignedEntry>>,
464 ) -> Result<()> {
465 let action = ReplicaAction::GetMany { query, reply };
466 self.send_replica(namespace, action).await?;
467 Ok(())
468 }
469
470 pub async fn get_exact(
471 &self,
472 namespace: NamespaceId,
473 author: AuthorId,
474 key: Bytes,
475 include_empty: bool,
476 ) -> Result<Option<SignedEntry>> {
477 let (reply, rx) = oneshot::channel();
478 let action = ReplicaAction::GetExact {
479 author,
480 key,
481 include_empty,
482 reply,
483 };
484 self.send_replica(namespace, action).await?;
485 rx.await?
486 }
487
488 pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
489 let (reply, rx) = oneshot::channel();
490 let action = ReplicaAction::DropReplica { reply };
491 self.send_replica(namespace, action).await?;
492 rx.await?
493 }
494
495 pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
496 let (reply, rx) = oneshot::channel();
497 let action = ReplicaAction::ExportSecretKey { reply };
498 self.send_replica(namespace, action).await?;
499 rx.await?
500 }
501
502 pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
503 let (reply, rx) = oneshot::channel();
504 let action = ReplicaAction::GetState { reply };
505 self.send_replica(namespace, action).await?;
506 rx.await?
507 }
508
509 pub async fn shutdown(&self) -> Result<Store> {
510 let (reply, rx) = oneshot::channel();
511 let action = Action::Shutdown { reply: Some(reply) };
512 self.send(action).await?;
513 let store = rx.await?;
514 Ok(store)
515 }
516
517 pub async fn list_authors(
518 &self,
519 reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
520 ) -> Result<()> {
521 self.send(Action::ListAuthors { reply }).await
522 }
523
524 pub async fn list_replicas(&self, reply: mpsc::Sender<RpcResult<ListResponse>>) -> Result<()> {
525 self.send(Action::ListReplicas { reply }).await
526 }
527
528 pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
529 let (reply, rx) = oneshot::channel();
530 self.send(Action::ImportAuthor { author, reply }).await?;
531 rx.await?
532 }
533
534 pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
535 let (reply, rx) = oneshot::channel();
536 self.send(Action::ExportAuthor { author, reply }).await?;
537 rx.await?
538 }
539
540 pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
541 let (reply, rx) = oneshot::channel();
542 self.send(Action::DeleteAuthor { author, reply }).await?;
543 rx.await?
544 }
545
546 pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
547 let (reply, rx) = oneshot::channel();
548 self.send(Action::ImportNamespace { capability, reply })
549 .await?;
550 rx.await?
551 }
552
553 pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
554 let (reply, rx) = oneshot::channel();
555 let action = ReplicaAction::GetDownloadPolicy { reply };
556 self.send_replica(namespace, action).await?;
557 rx.await?
558 }
559
560 pub async fn set_download_policy(
561 &self,
562 namespace: NamespaceId,
563 policy: DownloadPolicy,
564 ) -> Result<()> {
565 let (reply, rx) = oneshot::channel();
566 let action = ReplicaAction::SetDownloadPolicy { reply, policy };
567 self.send_replica(namespace, action).await?;
568 rx.await?
569 }
570
571 pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
572 let (reply, rx) = oneshot::channel();
573 self.send(Action::ContentHashes { reply }).await?;
574 rx.await?
575 }
576
577 pub async fn flush_store(&self) -> Result<()> {
587 let (reply, rx) = oneshot::channel();
588 self.send(Action::FlushStore { reply }).await?;
589 rx.await?
590 }
591
592 async fn send(&self, action: Action) -> Result<()> {
593 self.tx
594 .send(action)
595 .await
596 .context("sending to iroh_docs actor failed")?;
597 Ok(())
598 }
599 async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
600 self.send(Action::Replica(namespace, action)).await?;
601 Ok(())
602 }
603}
604
/// Shuts the actor down when the last clone of the handle is dropped.
impl Drop for SyncHandle {
    fn drop(&mut self) {
        // `Arc::get_mut` only succeeds when no other `Arc` to the join handle exists,
        // i.e. when this is the last remaining clone of the `SyncHandle`.
        #[allow(unused)]
        if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
            #[cfg(wasm_browser)]
            {
                // `drop` cannot await, so the shutdown message is sent from a spawned task
                // and the actor task is not joined.
                let tx = self.tx.clone();
                n0_future::task::spawn(async move {
                    tx.send(Action::Shutdown { reply: None }).await.ok();
                });
            }
            #[cfg(not(wasm_browser))]
            {
                // `drop` is synchronous, so the shutdown message is sent with a blocking
                // send; a send error (channel already closed) is ignored.
                self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
                let handle = handle.take().expect("this can only run once");

                // Wait for the actor thread to finish its shutdown sequence.
                if let Err(err) = handle.join() {
                    warn!(?err, "Failed to join sync actor");
                }
            }
        }
    }
}
632
/// The sync actor: owns the store and serves [`Action`]s received from [`SyncHandle`]s.
struct Actor {
    /// The document store; all store access goes through the actor.
    store: Store,
    /// Replicas that are currently open, keyed by namespace id.
    states: OpenReplicas,
    /// Receiving side of the action channel.
    action_rx: async_channel::Receiver<Action>,
    /// Optional callback installed on every replica when it is opened.
    content_status_callback: Option<ContentStatusCallback>,
    /// Background tasks that stream iterator results to reply channels.
    tasks: JoinSet<()>,
    /// Metrics shared with the handles.
    metrics: Arc<Metrics>,
}
641
impl Actor {
    /// Runs the actor to completion on the current thread.
    ///
    /// Builds a current-thread tokio runtime (with the time driver enabled) and drives
    /// [`Self::run_async`] inside a `LocalSet`, which is what allows the actor to use
    /// `spawn_local` for its streaming tasks.
    ///
    /// # Errors
    ///
    /// Returns an error if the runtime cannot be built.
    #[cfg(not(wasm_browser))]
    fn run_in_thread(self) -> Result<()> {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()?;
        let local_set = tokio::task::LocalSet::new();
        local_set.block_on(&rt, async move { self.run_async().await });
        Ok(())
    }

    /// Main loop of the actor.
    ///
    /// Processes actions until a `Shutdown` action arrives or the action channel is
    /// closed, then flushes the store, closes all replicas, aborts the streaming tasks
    /// and, if the shutdown carried a reply channel, hands the store back through it.
    async fn run_async(mut self) {
        let reply = loop {
            // A fresh timer is created on every iteration, so the timed flush below only
            // fires after `MAX_COMMIT_DELAY` has passed without any action arriving.
            let timeout = n0_future::time::sleep(MAX_COMMIT_DELAY);
            tokio::pin!(timeout);
            let action = tokio::select! {
                _ = &mut timeout => {
                    if let Err(cause) = self.store.flush() {
                        error!(?cause, "failed to flush store");
                    }
                    continue;
                }
                action = self.action_rx.recv() => {
                    match action {
                        Ok(action) => action,
                        // The channel is closed: there is nobody left to reply to.
                        Err(async_channel::RecvError) => {
                            debug!("action channel disconnected");
                            break None;
                        }

                    }
                }
            };
            trace!(%action, "tick");
            self.metrics.actor_tick_main.inc();
            match action {
                Action::Shutdown { reply } => {
                    break reply;
                }
                action => {
                    // An error here only means the requester dropped its reply receiver.
                    if self.on_action(action).await.is_err() {
                        warn!("failed to send reply: receiver dropped");
                    }
                }
            }
        };

        // Shutdown sequence: flush, close every replica, stop streaming tasks.
        if let Err(cause) = self.store.flush() {
            warn!(?cause, "failed to flush store");
        }
        self.close_all();
        self.tasks.abort_all();
        debug!("docs actor shutdown");
        if let Some(reply) = reply {
            reply.send(self.store).ok();
        }
    }

    /// Handles a single action and sends its reply.
    ///
    /// Returns `Err(SendReplyError)` when the reply could not be delivered. Streaming
    /// actions (`ListAuthors`, `ListReplicas`) spawn a local task and return `Ok(())`
    /// immediately.
    ///
    /// # Panics
    ///
    /// Panics if called with `Action::Shutdown`, which the main loop handles itself.
    async fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
        match action {
            Action::Shutdown { .. } => {
                unreachable!("Shutdown is handled in run()")
            }
            Action::ImportAuthor { author, reply } => {
                let id = author.id();
                send_reply(reply, self.store.import_author(author).map(|_| id))
            }
            Action::ExportAuthor { author, reply } => {
                send_reply(reply, self.store.get_author(&author))
            }
            Action::DeleteAuthor { author, reply } => {
                send_reply(reply, self.store.delete_author(author))
            }
            Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
                let id = capability.id();
                let outcome = this.store.import_namespace(capability.clone())?;
                // If the stored capability was upgraded and the replica is currently open,
                // merge the new capability into the in-memory replica info as well.
                if let ImportNamespaceOutcome::Upgraded = outcome {
                    if let Ok(state) = this.states.get_mut(&id) {
                        state.info.merge_capability(capability)?;
                    }
                }
                Ok(id)
            }),
            Action::ListAuthors { reply } => {
                let iter = self
                    .store
                    .list_authors()
                    .map(|a| a.map(|a| a.map(|a| AuthorListResponse { author_id: a.id() })));
                self.tasks
                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
                Ok(())
            }
            Action::ListReplicas { reply } => {
                let iter = self.store.list_namespaces();
                let iter = iter.map(|inner| {
                    inner.map(|res| res.map(|(id, capability)| ListResponse { id, capability }))
                });
                self.tasks
                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
                Ok(())
            }
            Action::ContentHashes { reply } => {
                send_reply_with(reply, self, |this| this.store.content_hashes())
            }
            Action::FlushStore { reply } => send_reply(reply, self.store.flush()),
            Action::Replica(namespace, action) => self.on_replica_action(namespace, action).await,
        }
    }

    /// Handles an action that targets the replica identified by `namespace`.
    ///
    /// Returns `Err(SendReplyError)` when the reply could not be delivered, except for
    /// `Close`, where a missing receiver is ignored, and `GetMany`, which streams its
    /// results from a spawned local task.
    async fn on_replica_action(
        &mut self,
        namespace: NamespaceId,
        action: ReplicaAction,
    ) -> Result<(), SendReplyError> {
        match action {
            ReplicaAction::Open { reply, opts } => {
                tracing::trace!("open in");
                let res = self.open(namespace, opts);
                tracing::trace!("open out");
                send_reply(reply, res)
            }
            ReplicaAction::Close { reply } => {
                let res = self.close(namespace);
                // A dropped receiver is deliberately not treated as an error for close.
                reply.send(Ok(res)).ok();
                Ok(())
            }
            ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
                let state = this.states.get_mut(&namespace)?;
                state.info.subscribe(sender);
                Ok(())
            }),
            ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
                let state = this.states.get_mut(&namespace)?;
                state.info.unsubscribe(&sender);
                drop(sender);
                Ok(())
            }),
            ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
                let state = this.states.get_mut(&namespace)?;
                state.sync = sync;
                Ok(())
            }),
            ReplicaAction::InsertLocal {
                author,
                key,
                hash,
                len,
                reply,
            } => {
                send_reply_with_async(reply, self, async move |this| {
                    let author = get_author(&mut this.store, &author)?;
                    let mut replica = this.states.replica(namespace, &mut this.store)?;
                    replica.insert(&key, &author, hash, len).await?;
                    // Metrics are only updated once the insert has succeeded.
                    this.metrics.new_entries_local.inc();
                    this.metrics.new_entries_local_size.inc_by(len);
                    Ok(())
                })
                .await
            }
            ReplicaAction::DeletePrefix { author, key, reply } => {
                send_reply_with_async(reply, self, async |this| {
                    let author = get_author(&mut this.store, &author)?;
                    let mut replica = this.states.replica(namespace, &mut this.store)?;
                    let res = replica.delete_prefix(&key, &author).await?;
                    Ok(res)
                })
                .await
            }
            ReplicaAction::InsertRemote {
                entry,
                from,
                content_status,
                reply,
            } => {
                send_reply_with_async(reply, self, async move |this| {
                    let mut replica = this
                        .states
                        .replica_if_syncing(&namespace, &mut this.store)?;
                    // Read the length before `entry` is moved into the insert call.
                    let len = entry.content_len();
                    replica
                        .insert_remote_entry(entry, from, content_status)
                        .await?;
                    this.metrics.new_entries_remote.inc();
                    this.metrics.new_entries_remote_size.inc_by(len);
                    Ok(())
                })
                .await
            }

            ReplicaAction::SyncInitialMessage { reply } => {
                send_reply_with(reply, self, move |this| {
                    let mut replica = this
                        .states
                        .replica_if_syncing(&namespace, &mut this.store)?;
                    let res = replica.sync_initial_message()?;
                    Ok(res)
                })
            }
            ReplicaAction::SyncProcessMessage {
                message,
                from,
                mut state,
                reply,
            } => {
                let res = async {
                    let mut replica = self
                        .states
                        .replica_if_syncing(&namespace, &mut self.store)?;
                    let res = replica
                        .sync_process_message(message, from, &mut state)
                        .await?;
                    // The updated sync state is returned to the caller along with the reply.
                    Ok((res, state))
                }
                .await;
                reply.send(res).map_err(send_reply_error)
            }
            ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
                this.states.ensure_open(&namespace)?;
                let peers = this.store.get_sync_peers(&namespace)?;
                Ok(peers.map(|iter| iter.collect()))
            }),
            ReplicaAction::RegisterUsefulPeer { peer, reply } => {
                let res = self.store.register_useful_peer(namespace, peer);
                send_reply(reply, res)
            }
            ReplicaAction::GetExact {
                author,
                key,
                include_empty,
                reply,
            } => send_reply_with(reply, self, move |this| {
                this.states.ensure_open(&namespace)?;
                this.store.get_exact(namespace, author, key, include_empty)
            }),
            ReplicaAction::GetMany { query, reply } => {
                // If the replica is not open, the error is forwarded through the stream.
                let iter = self
                    .states
                    .ensure_open(&namespace)
                    .and_then(|_| self.store.get_many(namespace, query));
                self.tasks
                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
                Ok(())
            }
            ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
                // Releases one handle via `close` before removing the replica from the store.
                this.close(namespace);
                this.store.remove_replica(&namespace)
            }),
            ReplicaAction::ExportSecretKey { reply } => {
                let res = self
                    .states
                    .get_mut(&namespace)
                    .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
                send_reply(reply, res)
            }
            ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
                let state = this.states.get_mut(&namespace)?;
                let handles = state.handles;
                let sync = state.sync;
                let subscribers = state.info.subscribers_count();
                Ok(OpenState {
                    handles,
                    sync,
                    subscribers,
                })
            }),
            ReplicaAction::HasNewsForUs { heads, reply } => {
                let res = self.store.has_news_for_us(namespace, &heads);
                send_reply(reply, res)
            }
            ReplicaAction::SetDownloadPolicy { policy, reply } => {
                send_reply(reply, self.store.set_download_policy(&namespace, policy))
            }
            ReplicaAction::GetDownloadPolicy { reply } => {
                send_reply(reply, self.store.get_download_policy(&namespace))
            }
        }
    }

    /// Releases one handle on `namespace`.
    ///
    /// When `OpenReplicas::close` reports `true`, the replica is also closed in the
    /// store. Returns that same flag.
    fn close(&mut self, namespace: NamespaceId) -> bool {
        let res = self.states.close(namespace);
        if res {
            self.store.close_replica(namespace);
        }
        res
    }

    /// Closes every open replica, both in the actor state and in the store.
    fn close_all(&mut self) {
        for id in self.states.close_all() {
            self.store.close_replica(id);
        }
    }

    /// Opens `namespace` with the given options.
    ///
    /// The callback passed to `open_with` loads the replica info from the store and
    /// installs the content status callback, if one is configured.
    fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
        let open_cb = || {
            let mut info = self.store.load_replica_info(&namespace)?;
            if let Some(cb) = &self.content_status_callback {
                info.set_content_status_callback(Arc::clone(cb));
            }
            Ok(info)
        };
        self.states.open_with(namespace, opts, open_cb)
    }
}
946
/// The set of currently open replicas, keyed by namespace id.
#[derive(Default)]
struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
949
950impl OpenReplicas {
951 fn replica<'a, 'b>(
952 &'a mut self,
953 namespace: NamespaceId,
954 store: &'b mut Store,
955 ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
956 let state = self.get_mut(&namespace)?;
957 Ok(Replica::new(
958 StoreInstance::new(state.info.capability.id(), store),
959 &mut state.info,
960 ))
961 }
962
963 fn replica_if_syncing<'a, 'b>(
964 &'a mut self,
965 namespace: &NamespaceId,
966 store: &'b mut Store,
967 ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
968 let state = self.get_mut(namespace)?;
969 anyhow::ensure!(state.sync, "sync is not enabled for replica");
970 Ok(Replica::new(
971 StoreInstance::new(state.info.capability.id(), store),
972 &mut state.info,
973 ))
974 }
975
976 fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
977 self.0.get_mut(namespace).context("replica not open")
978 }
979
980 fn is_open(&self, namespace: &NamespaceId) -> bool {
981 self.0.contains_key(namespace)
982 }
983
984 fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
985 match self.is_open(namespace) {
986 true => Ok(()),
987 false => Err(anyhow!("replica not open")),
988 }
989 }
990 fn open_with(
991 &mut self,
992 namespace: NamespaceId,
993 opts: OpenOpts,
994 mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
995 ) -> Result<()> {
996 match self.0.entry(namespace) {
997 hash_map::Entry::Vacant(e) => {
998 let mut info = open_cb()?;
999 if let Some(sender) = opts.subscribe {
1000 info.subscribe(sender);
1001 }
1002 debug!(namespace = %namespace.fmt_short(), "open");
1003 let state = OpenReplica {
1004 info,
1005 sync: opts.sync,
1006 handles: 1,
1007 };
1008 e.insert(state);
1009 }
1010 hash_map::Entry::Occupied(mut e) => {
1011 let state = e.get_mut();
1012 state.handles += 1;
1013 state.sync = state.sync || opts.sync;
1014 if let Some(sender) = opts.subscribe {
1015 state.info.subscribe(sender);
1016 }
1017 }
1018 }
1019 Ok(())
1020 }
1021 fn close(&mut self, namespace: NamespaceId) -> bool {
1022 match self.0.entry(namespace) {
1023 hash_map::Entry::Vacant(_e) => {
1024 warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
1025 true
1026 }
1027 hash_map::Entry::Occupied(mut e) => {
1028 let state = e.get_mut();
1029 tracing::debug!("STATE {state:?}");
1030 state.handles = state.handles.wrapping_sub(1);
1031 if state.handles == 0 {
1032 let _ = e.remove_entry();
1033 debug!(namespace = %namespace.fmt_short(), "close");
1034 true
1035 } else {
1036 false
1037 }
1038 }
1039 }
1040 }
1041
1042 fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
1043 self.0.drain().map(|(n, _s)| n)
1044 }
1045}
1046
1047async fn iter_to_irpc<T: irpc::RpcMessage>(
1048 channel: mpsc::Sender<RpcResult<T>>,
1049 iter: Result<impl Iterator<Item = Result<T>>>,
1050) -> Result<(), SendReplyError> {
1051 match iter {
1052 Err(err) => channel
1053 .send(Err(RpcError::new(&*err)))
1054 .await
1055 .map_err(send_reply_error)?,
1056 Ok(iter) => {
1057 for item in iter {
1058 let item = item.map_err(|err| RpcError::new(&*err));
1059 channel.send(item).await.map_err(send_reply_error)?;
1060 }
1061 }
1062 }
1063 Ok(())
1064}
1065
1066fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
1067 store.get_author(id)?.context("author not found")
1068}
1069
/// Marker error: a reply could not be delivered to the requester.
#[derive(Debug)]
struct SendReplyError;
1072
1073fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
1074 sender.send(value).map_err(send_reply_error)
1075}
1076
1077fn send_reply_with<T>(
1078 sender: oneshot::Sender<Result<T>>,
1079 this: &mut Actor,
1080 f: impl FnOnce(&mut Actor) -> Result<T>,
1081) -> Result<(), SendReplyError> {
1082 sender.send(f(this)).map_err(send_reply_error)
1083}
1084
1085async fn send_reply_with_async<T>(
1086 sender: oneshot::Sender<Result<T>>,
1087 this: &mut Actor,
1088 f: impl AsyncFnOnce(&mut Actor) -> Result<T>,
1089) -> Result<(), SendReplyError> {
1090 sender.send(f(this).await).map_err(send_reply_error)
1091}
1092
/// Maps any send error to [`SendReplyError`], discarding the original error value.
///
/// Generic so it can be passed to `map_err` for every channel type used in this file.
fn send_reply_error<T>(_err: T) -> SendReplyError {
    SendReplyError
}
1096
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store;

    /// Closing the only handle on a replica must drop its subscribers, which the test
    /// observes as the event receiver reporting a closed channel.
    #[tokio::test]
    async fn open_close() -> anyhow::Result<()> {
        let handle = SyncHandle::spawn(store::Store::memory(), None, "foo".into());

        let secret = NamespaceSecret::new(&mut rand::rng());
        let id = secret.id();
        handle.import_namespace(secret.into()).await?;
        handle.open(id, Default::default()).await?;

        let (events_tx, events_rx) = async_channel::bounded(10);
        handle.subscribe(id, events_tx).await?;
        handle.close(id).await?;

        assert!(events_rx.recv().await.is_err());
        Ok(())
    }
}
1115}