1use std::{
4 collections::{hash_map, HashMap},
5 num::NonZeroU64,
6 sync::Arc,
7 time::Duration,
8};
9
10use anyhow::{anyhow, Context, Result};
11use bytes::Bytes;
12use futures_util::FutureExt;
13use iroh_blobs::Hash;
14use irpc::channel::mpsc;
15use n0_future::task::JoinSet;
16use serde::{Deserialize, Serialize};
17use tokio::sync::oneshot;
18#[cfg(wasm_browser)]
19use tracing::Instrument;
20use tracing::{debug, error, error_span, trace, warn};
21
22use crate::{
23 api::{
24 protocol::{AuthorListResponse, ListResponse},
25 RpcError, RpcResult,
26 },
27 metrics::Metrics,
28 ranger::Message,
29 store::{
30 fs::{ContentHashesIterator, StoreInstance},
31 DownloadPolicy, ImportNamespaceOutcome, Query, Store,
32 },
33 Author, AuthorHeads, AuthorId, Capability, ContentStatus, ContentStatusCallback, Event,
34 NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo, SignedEntry, SyncOutcome,
35};
36
37const ACTION_CAP: usize = 1024;
38pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
39
40#[derive(derive_more::Debug, derive_more::Display)]
41enum Action {
42 #[display("NewAuthor")]
43 ImportAuthor {
44 author: Author,
45 #[debug("reply")]
46 reply: oneshot::Sender<Result<AuthorId>>,
47 },
48 #[display("ExportAuthor")]
49 ExportAuthor {
50 author: AuthorId,
51 #[debug("reply")]
52 reply: oneshot::Sender<Result<Option<Author>>>,
53 },
54 #[display("DeleteAuthor")]
55 DeleteAuthor {
56 author: AuthorId,
57 #[debug("reply")]
58 reply: oneshot::Sender<Result<()>>,
59 },
60 #[display("NewReplica")]
61 ImportNamespace {
62 capability: Capability,
63 #[debug("reply")]
64 reply: oneshot::Sender<Result<NamespaceId>>,
65 },
66 #[display("ListAuthors")]
67 ListAuthors {
68 #[debug("reply")]
69 reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
70 },
71 #[display("ListReplicas")]
72 ListReplicas {
73 #[debug("reply")]
74 reply: mpsc::Sender<RpcResult<ListResponse>>,
75 },
76 #[display("ContentHashes")]
77 ContentHashes {
78 #[debug("reply")]
79 reply: oneshot::Sender<Result<ContentHashesIterator>>,
80 },
81 #[display("FlushStore")]
82 FlushStore {
83 #[debug("reply")]
84 reply: oneshot::Sender<Result<()>>,
85 },
86 #[display("Replica({}, {})", _0.fmt_short(), _1)]
87 Replica(NamespaceId, ReplicaAction),
88 #[display("Shutdown")]
89 Shutdown {
90 #[debug("reply")]
91 reply: Option<oneshot::Sender<Store>>,
92 },
93}
94
95#[derive(derive_more::Debug, strum::Display)]
96enum ReplicaAction {
97 Open {
98 #[debug("reply")]
99 reply: oneshot::Sender<Result<()>>,
100 opts: OpenOpts,
101 },
102 Close {
103 #[debug("reply")]
104 reply: oneshot::Sender<Result<bool>>,
105 },
106 GetState {
107 #[debug("reply")]
108 reply: oneshot::Sender<Result<OpenState>>,
109 },
110 SetSync {
111 sync: bool,
112 #[debug("reply")]
113 reply: oneshot::Sender<Result<()>>,
114 },
115 Subscribe {
116 sender: async_channel::Sender<Event>,
117 #[debug("reply")]
118 reply: oneshot::Sender<Result<()>>,
119 },
120 Unsubscribe {
121 sender: async_channel::Sender<Event>,
122 #[debug("reply")]
123 reply: oneshot::Sender<Result<()>>,
124 },
125 InsertLocal {
126 author: AuthorId,
127 key: Bytes,
128 hash: Hash,
129 len: u64,
130 #[debug("reply")]
131 reply: oneshot::Sender<Result<()>>,
132 },
133 DeletePrefix {
134 author: AuthorId,
135 key: Bytes,
136 #[debug("reply")]
137 reply: oneshot::Sender<Result<usize>>,
138 },
139 InsertRemote {
140 entry: SignedEntry,
141 from: PeerIdBytes,
142 content_status: ContentStatus,
143 #[debug("reply")]
144 reply: oneshot::Sender<Result<()>>,
145 },
146 SyncInitialMessage {
147 #[debug("reply")]
148 reply: oneshot::Sender<Result<Message<SignedEntry>>>,
149 },
150 SyncProcessMessage {
151 message: Message<SignedEntry>,
152 from: PeerIdBytes,
153 state: SyncOutcome,
154 #[debug("reply")]
155 reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
156 },
157 GetSyncPeers {
158 #[debug("reply")]
159 reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
160 },
161 RegisterUsefulPeer {
162 peer: PeerIdBytes,
163 #[debug("reply")]
164 reply: oneshot::Sender<Result<()>>,
165 },
166 GetExact {
167 author: AuthorId,
168 key: Bytes,
169 include_empty: bool,
170 reply: oneshot::Sender<Result<Option<SignedEntry>>>,
171 },
172 GetMany {
173 query: Query,
174 reply: mpsc::Sender<RpcResult<SignedEntry>>,
175 },
176 DropReplica {
177 reply: oneshot::Sender<Result<()>>,
178 },
179 ExportSecretKey {
180 reply: oneshot::Sender<Result<NamespaceSecret>>,
181 },
182 HasNewsForUs {
183 heads: AuthorHeads,
184 #[debug("reply")]
185 reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
186 },
187 SetDownloadPolicy {
188 policy: DownloadPolicy,
189 #[debug("reply")]
190 reply: oneshot::Sender<Result<()>>,
191 },
192 GetDownloadPolicy {
193 #[debug("reply")]
194 reply: oneshot::Sender<Result<DownloadPolicy>>,
195 },
196}
197
198#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
200pub struct OpenState {
201 pub sync: bool,
203 pub subscribers: usize,
205 pub handles: usize,
207}
208
209#[derive(Debug)]
210struct OpenReplica {
211 info: ReplicaInfo,
212 sync: bool,
213 handles: usize,
214}
215
216#[derive(Debug, Clone)]
229pub struct SyncHandle {
230 tx: async_channel::Sender<Action>,
231 #[cfg(wasm_browser)]
232 #[allow(unused)]
233 join_handle: Arc<Option<n0_future::task::JoinHandle<()>>>,
234 #[cfg(not(wasm_browser))]
235 join_handle: Arc<Option<std::thread::JoinHandle<()>>>,
236 metrics: Arc<Metrics>,
237}
238
239#[derive(Debug, Default)]
241pub struct OpenOpts {
242 pub sync: bool,
244 pub subscribe: Option<async_channel::Sender<Event>>,
246}
247
248impl OpenOpts {
249 pub fn sync(mut self) -> Self {
251 self.sync = true;
252 self
253 }
254 pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
256 self.subscribe = Some(subscribe);
257 self
258 }
259}
260
261#[allow(missing_docs)]
262impl SyncHandle {
263 pub fn spawn(
265 store: Store,
266 content_status_callback: Option<ContentStatusCallback>,
267 me: String,
268 ) -> SyncHandle {
269 let metrics = Arc::new(Metrics::default());
270 let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
271 let actor = Actor {
272 store,
273 states: Default::default(),
274 action_rx,
275 content_status_callback,
276 tasks: Default::default(),
277 metrics: metrics.clone(),
278 };
279
280 let span = error_span!("sync", %me);
281 #[cfg(wasm_browser)]
282 let join_handle = n0_future::task::spawn(actor.run_async().instrument(span));
283
284 #[cfg(not(wasm_browser))]
285 let join_handle = std::thread::Builder::new()
286 .name("sync-actor".to_string())
287 .spawn(move || {
288 let _enter = span.enter();
289
290 if let Err(err) = actor.run_in_thread() {
291 error!("Sync actor closed with error: {err:?}");
292 }
293 })
294 .expect("failed to spawn thread");
295
296 let join_handle = Arc::new(Some(join_handle));
297 SyncHandle {
298 tx: action_tx,
299 join_handle,
300 metrics,
301 }
302 }
303
304 pub fn metrics(&self) -> &Arc<Metrics> {
306 &self.metrics
307 }
308
309 pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
310 tracing::debug!("SyncHandle::open called");
311 let (reply, rx) = oneshot::channel();
312 let action = ReplicaAction::Open { reply, opts };
313 self.send_replica(namespace, action).await?;
314 rx.await?
315 }
316
317 pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
318 let (reply, rx) = oneshot::channel();
319 self.send_replica(namespace, ReplicaAction::Close { reply })
320 .await?;
321 rx.await?
322 }
323
324 pub async fn subscribe(
325 &self,
326 namespace: NamespaceId,
327 sender: async_channel::Sender<Event>,
328 ) -> Result<()> {
329 let (reply, rx) = oneshot::channel();
330 self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
331 .await?;
332 rx.await?
333 }
334
335 pub async fn unsubscribe(
336 &self,
337 namespace: NamespaceId,
338 sender: async_channel::Sender<Event>,
339 ) -> Result<()> {
340 let (reply, rx) = oneshot::channel();
341 self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
342 .await?;
343 rx.await?
344 }
345
346 pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
347 let (reply, rx) = oneshot::channel();
348 let action = ReplicaAction::SetSync { sync, reply };
349 self.send_replica(namespace, action).await?;
350 rx.await?
351 }
352
353 pub async fn insert_local(
354 &self,
355 namespace: NamespaceId,
356 author: AuthorId,
357 key: Bytes,
358 hash: Hash,
359 len: u64,
360 ) -> Result<()> {
361 let (reply, rx) = oneshot::channel();
362 let action = ReplicaAction::InsertLocal {
363 author,
364 key,
365 hash,
366 len,
367 reply,
368 };
369 self.send_replica(namespace, action).await?;
370 rx.await?
371 }
372
373 pub async fn delete_prefix(
374 &self,
375 namespace: NamespaceId,
376 author: AuthorId,
377 key: Bytes,
378 ) -> Result<usize> {
379 let (reply, rx) = oneshot::channel();
380 let action = ReplicaAction::DeletePrefix { author, key, reply };
381 self.send_replica(namespace, action).await?;
382 rx.await?
383 }
384
385 pub async fn insert_remote(
386 &self,
387 namespace: NamespaceId,
388 entry: SignedEntry,
389 from: PeerIdBytes,
390 content_status: ContentStatus,
391 ) -> Result<()> {
392 let (reply, rx) = oneshot::channel();
393 let action = ReplicaAction::InsertRemote {
394 entry,
395 from,
396 content_status,
397 reply,
398 };
399 self.send_replica(namespace, action).await?;
400 rx.await?
401 }
402
403 pub async fn sync_initial_message(
404 &self,
405 namespace: NamespaceId,
406 ) -> Result<Message<SignedEntry>> {
407 let (reply, rx) = oneshot::channel();
408 let action = ReplicaAction::SyncInitialMessage { reply };
409 self.send_replica(namespace, action).await?;
410 rx.await?
411 }
412
413 pub async fn sync_process_message(
414 &self,
415 namespace: NamespaceId,
416 message: Message<SignedEntry>,
417 from: PeerIdBytes,
418 state: SyncOutcome,
419 ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
420 let (reply, rx) = oneshot::channel();
421 let action = ReplicaAction::SyncProcessMessage {
422 reply,
423 message,
424 from,
425 state,
426 };
427 self.send_replica(namespace, action).await?;
428 rx.await?
429 }
430
431 pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
432 let (reply, rx) = oneshot::channel();
433 let action = ReplicaAction::GetSyncPeers { reply };
434 self.send_replica(namespace, action).await?;
435 rx.await?
436 }
437
438 pub async fn register_useful_peer(
439 &self,
440 namespace: NamespaceId,
441 peer: PeerIdBytes,
442 ) -> Result<()> {
443 let (reply, rx) = oneshot::channel();
444 let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
445 self.send_replica(namespace, action).await?;
446 rx.await?
447 }
448
449 pub async fn has_news_for_us(
450 &self,
451 namespace: NamespaceId,
452 heads: AuthorHeads,
453 ) -> Result<Option<NonZeroU64>> {
454 let (reply, rx) = oneshot::channel();
455 let action = ReplicaAction::HasNewsForUs { reply, heads };
456 self.send_replica(namespace, action).await?;
457 rx.await?
458 }
459
460 pub async fn get_many(
461 &self,
462 namespace: NamespaceId,
463 query: Query,
464 reply: mpsc::Sender<RpcResult<SignedEntry>>,
465 ) -> Result<()> {
466 let action = ReplicaAction::GetMany { query, reply };
467 self.send_replica(namespace, action).await?;
468 Ok(())
469 }
470
471 pub async fn get_exact(
472 &self,
473 namespace: NamespaceId,
474 author: AuthorId,
475 key: Bytes,
476 include_empty: bool,
477 ) -> Result<Option<SignedEntry>> {
478 let (reply, rx) = oneshot::channel();
479 let action = ReplicaAction::GetExact {
480 author,
481 key,
482 include_empty,
483 reply,
484 };
485 self.send_replica(namespace, action).await?;
486 rx.await?
487 }
488
489 pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
490 let (reply, rx) = oneshot::channel();
491 let action = ReplicaAction::DropReplica { reply };
492 self.send_replica(namespace, action).await?;
493 rx.await?
494 }
495
496 pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
497 let (reply, rx) = oneshot::channel();
498 let action = ReplicaAction::ExportSecretKey { reply };
499 self.send_replica(namespace, action).await?;
500 rx.await?
501 }
502
503 pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
504 let (reply, rx) = oneshot::channel();
505 let action = ReplicaAction::GetState { reply };
506 self.send_replica(namespace, action).await?;
507 rx.await?
508 }
509
510 pub async fn shutdown(&self) -> Result<Store> {
511 let (reply, rx) = oneshot::channel();
512 let action = Action::Shutdown { reply: Some(reply) };
513 self.send(action).await?;
514 let store = rx.await?;
515 Ok(store)
516 }
517
518 pub async fn list_authors(
519 &self,
520 reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
521 ) -> Result<()> {
522 self.send(Action::ListAuthors { reply }).await
523 }
524
525 pub async fn list_replicas(&self, reply: mpsc::Sender<RpcResult<ListResponse>>) -> Result<()> {
526 self.send(Action::ListReplicas { reply }).await
527 }
528
529 pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
530 let (reply, rx) = oneshot::channel();
531 self.send(Action::ImportAuthor { author, reply }).await?;
532 rx.await?
533 }
534
535 pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
536 let (reply, rx) = oneshot::channel();
537 self.send(Action::ExportAuthor { author, reply }).await?;
538 rx.await?
539 }
540
541 pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
542 let (reply, rx) = oneshot::channel();
543 self.send(Action::DeleteAuthor { author, reply }).await?;
544 rx.await?
545 }
546
547 pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
548 let (reply, rx) = oneshot::channel();
549 self.send(Action::ImportNamespace { capability, reply })
550 .await?;
551 rx.await?
552 }
553
554 pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
555 let (reply, rx) = oneshot::channel();
556 let action = ReplicaAction::GetDownloadPolicy { reply };
557 self.send_replica(namespace, action).await?;
558 rx.await?
559 }
560
561 pub async fn set_download_policy(
562 &self,
563 namespace: NamespaceId,
564 policy: DownloadPolicy,
565 ) -> Result<()> {
566 let (reply, rx) = oneshot::channel();
567 let action = ReplicaAction::SetDownloadPolicy { reply, policy };
568 self.send_replica(namespace, action).await?;
569 rx.await?
570 }
571
572 pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
573 let (reply, rx) = oneshot::channel();
574 self.send(Action::ContentHashes { reply }).await?;
575 rx.await?
576 }
577
578 pub async fn flush_store(&self) -> Result<()> {
588 let (reply, rx) = oneshot::channel();
589 self.send(Action::FlushStore { reply }).await?;
590 rx.await?
591 }
592
593 async fn send(&self, action: Action) -> Result<()> {
594 self.tx
595 .send(action)
596 .await
597 .context("sending to iroh_docs actor failed")?;
598 Ok(())
599 }
600 async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
601 self.send(Action::Replica(namespace, action)).await?;
602 Ok(())
603 }
604}
605
606impl Drop for SyncHandle {
607 fn drop(&mut self) {
608 #[allow(unused)]
610 if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
611 #[cfg(wasm_browser)]
612 {
613 let tx = self.tx.clone();
614 n0_future::task::spawn(async move {
615 tx.send(Action::Shutdown { reply: None }).await.ok();
616 });
617 }
618 #[cfg(not(wasm_browser))]
619 {
620 self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
624 let handle = handle.take().expect("this can only run once");
625
626 if let Err(err) = handle.join() {
627 warn!(?err, "Failed to join sync actor");
628 }
629 }
630 }
631 }
632}
633
634struct Actor {
635 store: Store,
636 states: OpenReplicas,
637 action_rx: async_channel::Receiver<Action>,
638 content_status_callback: Option<ContentStatusCallback>,
639 tasks: JoinSet<()>,
640 metrics: Arc<Metrics>,
641}
642
643impl Actor {
644 #[cfg(not(wasm_browser))]
645 fn run_in_thread(self) -> Result<()> {
646 let rt = tokio::runtime::Builder::new_current_thread()
647 .enable_time()
648 .build()?;
649 let local_set = tokio::task::LocalSet::new();
650 local_set.block_on(&rt, async move { self.run_async().await });
651 Ok(())
652 }
653
654 async fn run_async(mut self) {
655 let reply = loop {
656 let timeout = n0_future::time::sleep(MAX_COMMIT_DELAY);
657 tokio::pin!(timeout);
658 let action = tokio::select! {
659 _ = &mut timeout => {
660 if let Err(cause) = self.store.flush() {
661 error!(?cause, "failed to flush store");
662 }
663 continue;
664 }
665 action = self.action_rx.recv() => {
666 match action {
667 Ok(action) => action,
668 Err(async_channel::RecvError) => {
669 debug!("action channel disconnected");
670 break None;
671 }
672
673 }
674 }
675 };
676 trace!(%action, "tick");
677 self.metrics.actor_tick_main.inc();
678 match action {
679 Action::Shutdown { reply } => {
680 break reply;
681 }
682 action => {
683 if self.on_action(action).await.is_err() {
684 warn!("failed to send reply: receiver dropped");
685 }
686 }
687 }
688 };
689
690 if let Err(cause) = self.store.flush() {
691 warn!(?cause, "failed to flush store");
692 }
693 self.close_all();
694 self.tasks.abort_all();
695 debug!("docs actor shutdown");
696 if let Some(reply) = reply {
697 reply.send(self.store).ok();
698 }
699 }
700
701 async fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
702 match action {
703 Action::Shutdown { .. } => {
704 unreachable!("Shutdown is handled in run()")
705 }
706 Action::ImportAuthor { author, reply } => {
707 let id = author.id();
708 send_reply(reply, self.store.import_author(author).map(|_| id))
709 }
710 Action::ExportAuthor { author, reply } => {
711 send_reply(reply, self.store.get_author(&author))
712 }
713 Action::DeleteAuthor { author, reply } => {
714 send_reply(reply, self.store.delete_author(author))
715 }
716 Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
717 let id = capability.id();
718 let outcome = this.store.import_namespace(capability.clone())?;
719 if let ImportNamespaceOutcome::Upgraded = outcome {
720 if let Ok(state) = this.states.get_mut(&id) {
721 state.info.merge_capability(capability)?;
722 }
723 }
724 Ok(id)
725 }),
726 Action::ListAuthors { reply } => {
727 let iter = self
728 .store
729 .list_authors()
730 .map(|a| a.map(|a| a.map(|a| AuthorListResponse { author_id: a.id() })));
731 self.tasks
732 .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
733 Ok(())
734 }
735 Action::ListReplicas { reply } => {
736 let iter = self.store.list_namespaces();
737 let iter = iter.map(|inner| {
738 inner.map(|res| res.map(|(id, capability)| ListResponse { id, capability }))
739 });
740 self.tasks
741 .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
742 Ok(())
743 }
744 Action::ContentHashes { reply } => {
745 send_reply_with(reply, self, |this| this.store.content_hashes())
746 }
747 Action::FlushStore { reply } => send_reply(reply, self.store.flush()),
748 Action::Replica(namespace, action) => self.on_replica_action(namespace, action).await,
749 }
750 }
751
752 async fn on_replica_action(
753 &mut self,
754 namespace: NamespaceId,
755 action: ReplicaAction,
756 ) -> Result<(), SendReplyError> {
757 match action {
758 ReplicaAction::Open { reply, opts } => {
759 tracing::trace!("open in");
760 let res = self.open(namespace, opts);
761 tracing::trace!("open out");
762 send_reply(reply, res)
763 }
764 ReplicaAction::Close { reply } => {
765 let res = self.close(namespace);
766 reply.send(Ok(res)).ok();
768 Ok(())
769 }
770 ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
771 let state = this.states.get_mut(&namespace)?;
772 state.info.subscribe(sender);
773 Ok(())
774 }),
775 ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
776 let state = this.states.get_mut(&namespace)?;
777 state.info.unsubscribe(&sender);
778 drop(sender);
779 Ok(())
780 }),
781 ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
782 let state = this.states.get_mut(&namespace)?;
783 state.sync = sync;
784 Ok(())
785 }),
786 ReplicaAction::InsertLocal {
787 author,
788 key,
789 hash,
790 len,
791 reply,
792 } => {
793 send_reply_with_async(reply, self, async move |this| {
794 let author = get_author(&mut this.store, &author)?;
795 let mut replica = this.states.replica(namespace, &mut this.store)?;
796 replica.insert(&key, &author, hash, len).await?;
797 this.metrics.new_entries_local.inc();
798 this.metrics.new_entries_local_size.inc_by(len);
799 Ok(())
800 })
801 .await
802 }
803 ReplicaAction::DeletePrefix { author, key, reply } => {
804 send_reply_with_async(reply, self, async |this| {
805 let author = get_author(&mut this.store, &author)?;
806 let mut replica = this.states.replica(namespace, &mut this.store)?;
807 let res = replica.delete_prefix(&key, &author).await?;
808 Ok(res)
809 })
810 .await
811 }
812 ReplicaAction::InsertRemote {
813 entry,
814 from,
815 content_status,
816 reply,
817 } => {
818 send_reply_with_async(reply, self, async move |this| {
819 let mut replica = this
820 .states
821 .replica_if_syncing(&namespace, &mut this.store)?;
822 let len = entry.content_len();
823 replica
824 .insert_remote_entry(entry, from, content_status)
825 .await?;
826 this.metrics.new_entries_remote.inc();
827 this.metrics.new_entries_remote_size.inc_by(len);
828 Ok(())
829 })
830 .await
831 }
832
833 ReplicaAction::SyncInitialMessage { reply } => {
834 send_reply_with(reply, self, move |this| {
835 let mut replica = this
836 .states
837 .replica_if_syncing(&namespace, &mut this.store)?;
838 let res = replica.sync_initial_message()?;
839 Ok(res)
840 })
841 }
842 ReplicaAction::SyncProcessMessage {
843 message,
844 from,
845 mut state,
846 reply,
847 } => {
848 let res = async {
849 let mut replica = self
850 .states
851 .replica_if_syncing(&namespace, &mut self.store)?;
852 let res = replica
853 .sync_process_message(message, from, &mut state)
854 .await?;
855 Ok((res, state))
856 }
857 .await;
858 reply.send(res).map_err(send_reply_error)
859 }
860 ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
861 this.states.ensure_open(&namespace)?;
862 let peers = this.store.get_sync_peers(&namespace)?;
863 Ok(peers.map(|iter| iter.collect()))
864 }),
865 ReplicaAction::RegisterUsefulPeer { peer, reply } => {
866 let res = self.store.register_useful_peer(namespace, peer);
867 send_reply(reply, res)
868 }
869 ReplicaAction::GetExact {
870 author,
871 key,
872 include_empty,
873 reply,
874 } => send_reply_with(reply, self, move |this| {
875 this.states.ensure_open(&namespace)?;
876 this.store.get_exact(namespace, author, key, include_empty)
877 }),
878 ReplicaAction::GetMany { query, reply } => {
879 let iter = self
880 .states
881 .ensure_open(&namespace)
882 .and_then(|_| self.store.get_many(namespace, query));
883 self.tasks
884 .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
885 Ok(())
886 }
887 ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
888 this.close(namespace);
889 this.store.remove_replica(&namespace)
890 }),
891 ReplicaAction::ExportSecretKey { reply } => {
892 let res = self
893 .states
894 .get_mut(&namespace)
895 .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
896 send_reply(reply, res)
897 }
898 ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
899 let state = this.states.get_mut(&namespace)?;
900 let handles = state.handles;
901 let sync = state.sync;
902 let subscribers = state.info.subscribers_count();
903 Ok(OpenState {
904 handles,
905 sync,
906 subscribers,
907 })
908 }),
909 ReplicaAction::HasNewsForUs { heads, reply } => {
910 let res = self.store.has_news_for_us(namespace, &heads);
911 send_reply(reply, res)
912 }
913 ReplicaAction::SetDownloadPolicy { policy, reply } => {
914 send_reply(reply, self.store.set_download_policy(&namespace, policy))
915 }
916 ReplicaAction::GetDownloadPolicy { reply } => {
917 send_reply(reply, self.store.get_download_policy(&namespace))
918 }
919 }
920 }
921
922 fn close(&mut self, namespace: NamespaceId) -> bool {
923 let res = self.states.close(namespace);
924 if res {
925 self.store.close_replica(namespace);
926 }
927 res
928 }
929
930 fn close_all(&mut self) {
931 for id in self.states.close_all() {
932 self.store.close_replica(id);
933 }
934 }
935
936 fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
937 let open_cb = || {
938 let mut info = self.store.load_replica_info(&namespace)?;
939 if let Some(cb) = &self.content_status_callback {
940 info.set_content_status_callback(Arc::clone(cb));
941 }
942 Ok(info)
943 };
944 self.states.open_with(namespace, opts, open_cb)
945 }
946}
947
948#[derive(Default)]
949struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
950
951impl OpenReplicas {
952 fn replica<'a, 'b>(
953 &'a mut self,
954 namespace: NamespaceId,
955 store: &'b mut Store,
956 ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
957 let state = self.get_mut(&namespace)?;
958 Ok(Replica::new(
959 StoreInstance::new(state.info.capability.id(), store),
960 &mut state.info,
961 ))
962 }
963
964 fn replica_if_syncing<'a, 'b>(
965 &'a mut self,
966 namespace: &NamespaceId,
967 store: &'b mut Store,
968 ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
969 let state = self.get_mut(namespace)?;
970 anyhow::ensure!(state.sync, "sync is not enabled for replica");
971 Ok(Replica::new(
972 StoreInstance::new(state.info.capability.id(), store),
973 &mut state.info,
974 ))
975 }
976
977 fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
978 self.0.get_mut(namespace).context("replica not open")
979 }
980
981 fn is_open(&self, namespace: &NamespaceId) -> bool {
982 self.0.contains_key(namespace)
983 }
984
985 fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
986 match self.is_open(namespace) {
987 true => Ok(()),
988 false => Err(anyhow!("replica not open")),
989 }
990 }
991 fn open_with(
992 &mut self,
993 namespace: NamespaceId,
994 opts: OpenOpts,
995 mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
996 ) -> Result<()> {
997 match self.0.entry(namespace) {
998 hash_map::Entry::Vacant(e) => {
999 let mut info = open_cb()?;
1000 if let Some(sender) = opts.subscribe {
1001 info.subscribe(sender);
1002 }
1003 debug!(namespace = %namespace.fmt_short(), "open");
1004 let state = OpenReplica {
1005 info,
1006 sync: opts.sync,
1007 handles: 1,
1008 };
1009 e.insert(state);
1010 }
1011 hash_map::Entry::Occupied(mut e) => {
1012 let state = e.get_mut();
1013 state.handles += 1;
1014 state.sync = state.sync || opts.sync;
1015 if let Some(sender) = opts.subscribe {
1016 state.info.subscribe(sender);
1017 }
1018 }
1019 }
1020 Ok(())
1021 }
1022 fn close(&mut self, namespace: NamespaceId) -> bool {
1023 match self.0.entry(namespace) {
1024 hash_map::Entry::Vacant(_e) => {
1025 warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
1026 true
1027 }
1028 hash_map::Entry::Occupied(mut e) => {
1029 let state = e.get_mut();
1030 tracing::debug!("STATE {state:?}");
1031 state.handles = state.handles.wrapping_sub(1);
1032 if state.handles == 0 {
1033 let _ = e.remove_entry();
1034 debug!(namespace = %namespace.fmt_short(), "close");
1035 true
1036 } else {
1037 false
1038 }
1039 }
1040 }
1041 }
1042
1043 fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
1044 self.0.drain().map(|(n, _s)| n)
1045 }
1046}
1047
1048async fn iter_to_irpc<T: irpc::RpcMessage>(
1049 channel: mpsc::Sender<RpcResult<T>>,
1050 iter: Result<impl Iterator<Item = Result<T>>>,
1051) -> Result<(), SendReplyError> {
1052 match iter {
1053 Err(err) => channel
1054 .send(Err(RpcError::new(&*err)))
1055 .await
1056 .map_err(send_reply_error)?,
1057 Ok(iter) => {
1058 for item in iter {
1059 let item = item.map_err(|err| RpcError::new(&*err));
1060 channel.send(item).await.map_err(send_reply_error)?;
1061 }
1062 }
1063 }
1064 Ok(())
1065}
1066
1067fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
1068 store.get_author(id)?.context("author not found")
1069}
1070
1071#[derive(Debug)]
1072struct SendReplyError;
1073
1074fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
1075 sender.send(value).map_err(send_reply_error)
1076}
1077
1078fn send_reply_with<T>(
1079 sender: oneshot::Sender<Result<T>>,
1080 this: &mut Actor,
1081 f: impl FnOnce(&mut Actor) -> Result<T>,
1082) -> Result<(), SendReplyError> {
1083 sender.send(f(this)).map_err(send_reply_error)
1084}
1085
1086async fn send_reply_with_async<T>(
1087 sender: oneshot::Sender<Result<T>>,
1088 this: &mut Actor,
1089 f: impl AsyncFnOnce(&mut Actor) -> Result<T>,
1090) -> Result<(), SendReplyError> {
1091 sender.send(f(this).await).map_err(send_reply_error)
1092}
1093
1094fn send_reply_error<T>(_err: T) -> SendReplyError {
1095 SendReplyError
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100 use super::*;
1101 use crate::store;
1102 #[tokio::test]
1103 async fn open_close() -> anyhow::Result<()> {
1104 let store = store::Store::memory();
1105 let sync = SyncHandle::spawn(store, None, "foo".into());
1106 let namespace = NamespaceSecret::new(&mut rand::rng());
1107 let id = namespace.id();
1108 sync.import_namespace(namespace.into()).await?;
1109 sync.open(id, Default::default()).await?;
1110 let (tx, rx) = async_channel::bounded(10);
1111 sync.subscribe(id, tx).await?;
1112 sync.close(id).await?;
1113 assert!(rx.recv().await.is_err());
1114 Ok(())
1115 }
1116}