1use std::{
4 collections::{hash_map, HashMap},
5 num::NonZeroU64,
6 sync::Arc,
7};
8
9use anyhow::{anyhow, Context, Result};
10use bytes::Bytes;
11use iroh_blobs::Hash;
12use irpc::channel::mpsc;
13use n0_future::{task::JoinSet, time::Duration, TryFutureExt};
14use serde::{Deserialize, Serialize};
15use tokio::sync::oneshot;
16#[cfg(wasm_browser)]
17use tracing::Instrument;
18use tracing::{debug, error, error_span, trace, warn};
19
20use crate::{
21 api::{
22 protocol::{AuthorListResponse, ListResponse},
23 RpcError, RpcResult,
24 },
25 metrics::Metrics,
26 ranger::Message,
27 store::{
28 fs::{ContentHashesIterator, StoreInstance},
29 DownloadPolicy, ImportNamespaceOutcome, Query, Store,
30 },
31 Author, AuthorHeads, AuthorId, Capability, ContentStatus, ContentStatusCallback, Event,
32 NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo, SignedEntry, SyncOutcome,
33};
34
35const ACTION_CAP: usize = 1024;
36pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
37
/// Messages sent from a [`SyncHandle`] frontend to the docs actor loop.
///
/// Each variant carries a reply channel over which the actor returns the
/// result. The `Display` impl yields the short names used by the actor's
/// per-action `trace!(%action, "tick")` log line; `#[debug("reply")]`
/// renders reply channels as the literal string `"reply"` in `Debug` output.
#[derive(derive_more::Debug, derive_more::Display)]
enum Action {
    /// Import an author keypair into the store, replying with its id.
    #[display("NewAuthor")]
    ImportAuthor {
        author: Author,
        #[debug("reply")]
        reply: oneshot::Sender<Result<AuthorId>>,
    },
    /// Export an author keypair from the store, if present.
    #[display("ExportAuthor")]
    ExportAuthor {
        author: AuthorId,
        #[debug("reply")]
        reply: oneshot::Sender<Result<Option<Author>>>,
    },
    /// Delete an author from the store.
    #[display("DeleteAuthor")]
    DeleteAuthor {
        author: AuthorId,
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
    },
    /// Import a namespace capability, replying with its id.
    #[display("NewReplica")]
    ImportNamespace {
        capability: Capability,
        #[debug("reply")]
        reply: oneshot::Sender<Result<NamespaceId>>,
    },
    /// Stream the list of stored authors over `reply`.
    #[display("ListAuthors")]
    ListAuthors {
        #[debug("reply")]
        reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
    },
    /// Stream the list of stored replicas over `reply`.
    #[display("ListReplicas")]
    ListReplicas {
        #[debug("reply")]
        reply: mpsc::Sender<RpcResult<ListResponse>>,
    },
    /// Reply with an iterator over all content hashes in the store.
    #[display("ContentHashes")]
    ContentHashes {
        #[debug("reply")]
        reply: oneshot::Sender<Result<ContentHashesIterator>>,
    },
    /// Flush pending writes to the store.
    #[display("FlushStore")]
    FlushStore {
        #[debug("reply")]
        reply: oneshot::Sender<Result<()>>,
    },
    /// An action scoped to a single replica (namespace).
    #[display("Replica({}, {})", _0.fmt_short(), _1)]
    Replica(NamespaceId, ReplicaAction),
    /// Stop the actor; optionally hand the owned store back to the caller.
    #[display("Shutdown")]
    Shutdown {
        #[debug("reply")]
        reply: Option<oneshot::Sender<Store>>,
    },
    /// Test-only: report the current length of the actor's task `JoinSet`.
    #[cfg(test)]
    #[display("DebugTasksLen")]
    DebugTasksLen {
        #[debug("reply")]
        reply: oneshot::Sender<usize>,
    },
}
98
99#[derive(derive_more::Debug, strum::Display)]
100enum ReplicaAction {
101 Open {
102 #[debug("reply")]
103 reply: oneshot::Sender<Result<()>>,
104 opts: OpenOpts,
105 },
106 Close {
107 #[debug("reply")]
108 reply: oneshot::Sender<Result<bool>>,
109 },
110 GetState {
111 #[debug("reply")]
112 reply: oneshot::Sender<Result<OpenState>>,
113 },
114 SetSync {
115 sync: bool,
116 #[debug("reply")]
117 reply: oneshot::Sender<Result<()>>,
118 },
119 Subscribe {
120 sender: async_channel::Sender<Event>,
121 #[debug("reply")]
122 reply: oneshot::Sender<Result<()>>,
123 },
124 Unsubscribe {
125 sender: async_channel::Sender<Event>,
126 #[debug("reply")]
127 reply: oneshot::Sender<Result<()>>,
128 },
129 InsertLocal {
130 author: AuthorId,
131 key: Bytes,
132 hash: Hash,
133 len: u64,
134 #[debug("reply")]
135 reply: oneshot::Sender<Result<()>>,
136 },
137 DeletePrefix {
138 author: AuthorId,
139 key: Bytes,
140 #[debug("reply")]
141 reply: oneshot::Sender<Result<usize>>,
142 },
143 InsertRemote {
144 entry: SignedEntry,
145 from: PeerIdBytes,
146 content_status: ContentStatus,
147 #[debug("reply")]
148 reply: oneshot::Sender<Result<()>>,
149 },
150 SyncInitialMessage {
151 #[debug("reply")]
152 reply: oneshot::Sender<Result<Message<SignedEntry>>>,
153 },
154 SyncProcessMessage {
155 message: Message<SignedEntry>,
156 from: PeerIdBytes,
157 state: SyncOutcome,
158 #[debug("reply")]
159 reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
160 },
161 GetSyncPeers {
162 #[debug("reply")]
163 reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
164 },
165 RegisterUsefulPeer {
166 peer: PeerIdBytes,
167 #[debug("reply")]
168 reply: oneshot::Sender<Result<()>>,
169 },
170 GetExact {
171 author: AuthorId,
172 key: Bytes,
173 include_empty: bool,
174 reply: oneshot::Sender<Result<Option<SignedEntry>>>,
175 },
176 GetMany {
177 query: Query,
178 reply: mpsc::Sender<RpcResult<SignedEntry>>,
179 },
180 DropReplica {
181 reply: oneshot::Sender<Result<()>>,
182 },
183 ExportSecretKey {
184 reply: oneshot::Sender<Result<NamespaceSecret>>,
185 },
186 HasNewsForUs {
187 heads: AuthorHeads,
188 #[debug("reply")]
189 reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
190 },
191 SetDownloadPolicy {
192 policy: DownloadPolicy,
193 #[debug("reply")]
194 reply: oneshot::Sender<Result<()>>,
195 },
196 GetDownloadPolicy {
197 #[debug("reply")]
198 reply: oneshot::Sender<Result<DownloadPolicy>>,
199 },
200}
201
/// Snapshot of an open replica's state, as returned by [`SyncHandle::get_state`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct OpenState {
    /// Whether sync is enabled for this replica.
    pub sync: bool,
    /// Number of registered event subscribers.
    pub subscribers: usize,
    /// Number of open handles (opens minus closes).
    pub handles: usize,
}
212
/// Actor-internal bookkeeping for a replica that is currently open.
#[derive(Debug)]
struct OpenReplica {
    /// Replica metadata (capability, subscribers, content-status callback).
    info: ReplicaInfo,
    /// Whether sync is enabled for this replica.
    sync: bool,
    /// Open-handle count; the replica is dropped from the map when it reaches 0.
    handles: usize,
}
219
/// Cheaply cloneable handle to the docs actor.
///
/// Sends [`Action`]s over a bounded channel to the actor loop. The join
/// handle is shared between clones; the last clone to be dropped shuts the
/// actor down (see the `Drop` impl).
#[derive(Debug, Clone)]
pub struct SyncHandle {
    // Channel of actions into the actor loop.
    tx: async_channel::Sender<Action>,
    // On wasm the actor is a spawned task; on native targets it runs on a
    // dedicated thread, hence the different join-handle types.
    #[cfg(wasm_browser)]
    #[allow(unused)]
    join_handle: Arc<Option<n0_future::task::JoinHandle<()>>>,
    #[cfg(not(wasm_browser))]
    join_handle: Arc<Option<std::thread::JoinHandle<()>>>,
    metrics: Arc<Metrics>,
}
242
/// Options for opening a replica via [`SyncHandle::open`].
#[derive(Debug, Default)]
pub struct OpenOpts {
    /// Enable sync for the replica.
    pub sync: bool,
    /// Subscribe to events on the replica.
    pub subscribe: Option<async_channel::Sender<Event>>,
}
251
252impl OpenOpts {
253 pub fn sync(mut self) -> Self {
255 self.sync = true;
256 self
257 }
258 pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
260 self.subscribe = Some(subscribe);
261 self
262 }
263}
264
265#[allow(missing_docs)]
266impl SyncHandle {
267 pub fn spawn(
269 store: Store,
270 content_status_callback: Option<ContentStatusCallback>,
271 me: String,
272 ) -> SyncHandle {
273 let metrics = Arc::new(Metrics::default());
274 let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
275 let actor = Actor {
276 store,
277 states: Default::default(),
278 action_rx,
279 content_status_callback,
280 tasks: Default::default(),
281 metrics: metrics.clone(),
282 };
283
284 let span = error_span!("sync", %me);
285 #[cfg(wasm_browser)]
286 let join_handle = n0_future::task::spawn(actor.run_async().instrument(span));
287
288 #[cfg(not(wasm_browser))]
289 let join_handle = std::thread::Builder::new()
290 .name("sync-actor".to_string())
291 .spawn(move || {
292 let _enter = span.enter();
293
294 if let Err(err) = actor.run_in_thread() {
295 error!("Sync actor closed with error: {err:?}");
296 }
297 })
298 .expect("failed to spawn thread");
299
300 let join_handle = Arc::new(Some(join_handle));
301 SyncHandle {
302 tx: action_tx,
303 join_handle,
304 metrics,
305 }
306 }
307
308 pub fn metrics(&self) -> &Arc<Metrics> {
310 &self.metrics
311 }
312
313 pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
314 tracing::debug!("SyncHandle::open called");
315 let (reply, rx) = oneshot::channel();
316 let action = ReplicaAction::Open { reply, opts };
317 self.send_replica(namespace, action).await?;
318 rx.await?
319 }
320
321 pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
322 let (reply, rx) = oneshot::channel();
323 self.send_replica(namespace, ReplicaAction::Close { reply })
324 .await?;
325 rx.await?
326 }
327
328 pub async fn subscribe(
329 &self,
330 namespace: NamespaceId,
331 sender: async_channel::Sender<Event>,
332 ) -> Result<()> {
333 let (reply, rx) = oneshot::channel();
334 self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
335 .await?;
336 rx.await?
337 }
338
339 pub async fn unsubscribe(
340 &self,
341 namespace: NamespaceId,
342 sender: async_channel::Sender<Event>,
343 ) -> Result<()> {
344 let (reply, rx) = oneshot::channel();
345 self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
346 .await?;
347 rx.await?
348 }
349
350 pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
351 let (reply, rx) = oneshot::channel();
352 let action = ReplicaAction::SetSync { sync, reply };
353 self.send_replica(namespace, action).await?;
354 rx.await?
355 }
356
357 pub async fn insert_local(
358 &self,
359 namespace: NamespaceId,
360 author: AuthorId,
361 key: Bytes,
362 hash: Hash,
363 len: u64,
364 ) -> Result<()> {
365 let (reply, rx) = oneshot::channel();
366 let action = ReplicaAction::InsertLocal {
367 author,
368 key,
369 hash,
370 len,
371 reply,
372 };
373 self.send_replica(namespace, action).await?;
374 rx.await?
375 }
376
377 pub async fn delete_prefix(
378 &self,
379 namespace: NamespaceId,
380 author: AuthorId,
381 key: Bytes,
382 ) -> Result<usize> {
383 let (reply, rx) = oneshot::channel();
384 let action = ReplicaAction::DeletePrefix { author, key, reply };
385 self.send_replica(namespace, action).await?;
386 rx.await?
387 }
388
389 pub async fn insert_remote(
390 &self,
391 namespace: NamespaceId,
392 entry: SignedEntry,
393 from: PeerIdBytes,
394 content_status: ContentStatus,
395 ) -> Result<()> {
396 let (reply, rx) = oneshot::channel();
397 let action = ReplicaAction::InsertRemote {
398 entry,
399 from,
400 content_status,
401 reply,
402 };
403 self.send_replica(namespace, action).await?;
404 rx.await?
405 }
406
407 pub async fn sync_initial_message(
408 &self,
409 namespace: NamespaceId,
410 ) -> Result<Message<SignedEntry>> {
411 let (reply, rx) = oneshot::channel();
412 let action = ReplicaAction::SyncInitialMessage { reply };
413 self.send_replica(namespace, action).await?;
414 rx.await?
415 }
416
417 pub async fn sync_process_message(
418 &self,
419 namespace: NamespaceId,
420 message: Message<SignedEntry>,
421 from: PeerIdBytes,
422 state: SyncOutcome,
423 ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
424 let (reply, rx) = oneshot::channel();
425 let action = ReplicaAction::SyncProcessMessage {
426 reply,
427 message,
428 from,
429 state,
430 };
431 self.send_replica(namespace, action).await?;
432 rx.await?
433 }
434
435 pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
436 let (reply, rx) = oneshot::channel();
437 let action = ReplicaAction::GetSyncPeers { reply };
438 self.send_replica(namespace, action).await?;
439 rx.await?
440 }
441
442 pub async fn register_useful_peer(
443 &self,
444 namespace: NamespaceId,
445 peer: PeerIdBytes,
446 ) -> Result<()> {
447 let (reply, rx) = oneshot::channel();
448 let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
449 self.send_replica(namespace, action).await?;
450 rx.await?
451 }
452
453 pub async fn has_news_for_us(
454 &self,
455 namespace: NamespaceId,
456 heads: AuthorHeads,
457 ) -> Result<Option<NonZeroU64>> {
458 let (reply, rx) = oneshot::channel();
459 let action = ReplicaAction::HasNewsForUs { reply, heads };
460 self.send_replica(namespace, action).await?;
461 rx.await?
462 }
463
464 pub async fn get_many(
465 &self,
466 namespace: NamespaceId,
467 query: Query,
468 reply: mpsc::Sender<RpcResult<SignedEntry>>,
469 ) -> Result<()> {
470 let action = ReplicaAction::GetMany { query, reply };
471 self.send_replica(namespace, action).await?;
472 Ok(())
473 }
474
475 pub async fn get_exact(
476 &self,
477 namespace: NamespaceId,
478 author: AuthorId,
479 key: Bytes,
480 include_empty: bool,
481 ) -> Result<Option<SignedEntry>> {
482 let (reply, rx) = oneshot::channel();
483 let action = ReplicaAction::GetExact {
484 author,
485 key,
486 include_empty,
487 reply,
488 };
489 self.send_replica(namespace, action).await?;
490 rx.await?
491 }
492
493 pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
494 let (reply, rx) = oneshot::channel();
495 let action = ReplicaAction::DropReplica { reply };
496 self.send_replica(namespace, action).await?;
497 rx.await?
498 }
499
500 pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
501 let (reply, rx) = oneshot::channel();
502 let action = ReplicaAction::ExportSecretKey { reply };
503 self.send_replica(namespace, action).await?;
504 rx.await?
505 }
506
507 pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
508 let (reply, rx) = oneshot::channel();
509 let action = ReplicaAction::GetState { reply };
510 self.send_replica(namespace, action).await?;
511 rx.await?
512 }
513
514 pub async fn shutdown(&self) -> Result<Store> {
515 let (reply, rx) = oneshot::channel();
516 let action = Action::Shutdown { reply: Some(reply) };
517 self.send(action).await?;
518 let store = rx.await?;
519 Ok(store)
520 }
521
522 #[cfg(test)]
523 async fn debug_tasks_len(&self) -> Result<usize> {
524 let (reply, rx) = oneshot::channel();
525 self.send(Action::DebugTasksLen { reply }).await?;
526 Ok(rx.await?)
527 }
528
529 pub async fn list_authors(
530 &self,
531 reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
532 ) -> Result<()> {
533 self.send(Action::ListAuthors { reply }).await
534 }
535
536 pub async fn list_replicas(&self, reply: mpsc::Sender<RpcResult<ListResponse>>) -> Result<()> {
537 self.send(Action::ListReplicas { reply }).await
538 }
539
540 pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
541 let (reply, rx) = oneshot::channel();
542 self.send(Action::ImportAuthor { author, reply }).await?;
543 rx.await?
544 }
545
546 pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
547 let (reply, rx) = oneshot::channel();
548 self.send(Action::ExportAuthor { author, reply }).await?;
549 rx.await?
550 }
551
552 pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
553 let (reply, rx) = oneshot::channel();
554 self.send(Action::DeleteAuthor { author, reply }).await?;
555 rx.await?
556 }
557
558 pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
559 let (reply, rx) = oneshot::channel();
560 self.send(Action::ImportNamespace { capability, reply })
561 .await?;
562 rx.await?
563 }
564
565 pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
566 let (reply, rx) = oneshot::channel();
567 let action = ReplicaAction::GetDownloadPolicy { reply };
568 self.send_replica(namespace, action).await?;
569 rx.await?
570 }
571
572 pub async fn set_download_policy(
573 &self,
574 namespace: NamespaceId,
575 policy: DownloadPolicy,
576 ) -> Result<()> {
577 let (reply, rx) = oneshot::channel();
578 let action = ReplicaAction::SetDownloadPolicy { reply, policy };
579 self.send_replica(namespace, action).await?;
580 rx.await?
581 }
582
583 pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
584 let (reply, rx) = oneshot::channel();
585 self.send(Action::ContentHashes { reply }).await?;
586 rx.await?
587 }
588
589 pub async fn flush_store(&self) -> Result<()> {
599 let (reply, rx) = oneshot::channel();
600 self.send(Action::FlushStore { reply }).await?;
601 rx.await?
602 }
603
604 async fn send(&self, action: Action) -> Result<()> {
605 self.tx
606 .send(action)
607 .await
608 .context("sending to iroh_docs actor failed")?;
609 Ok(())
610 }
611 async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
612 self.send(Action::Replica(namespace, action)).await?;
613 Ok(())
614 }
615}
616
impl Drop for SyncHandle {
    fn drop(&mut self) {
        // Only the last clone of the handle shuts the actor down:
        // `Arc::get_mut` succeeds only when no other clones exist.
        #[allow(unused)]
        if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
            #[cfg(wasm_browser)]
            {
                // Cannot block on wasm: fire-and-forget a shutdown message.
                let tx = self.tx.clone();
                n0_future::task::spawn(async move {
                    tx.send(Action::Shutdown { reply: None }).await.ok();
                });
            }
            #[cfg(not(wasm_browser))]
            {
                // Request shutdown synchronously, then join the actor thread
                // so the store is flushed before we return.
                self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
                let handle = handle.take().expect("this can only run once");

                if let Err(err) = handle.join() {
                    warn!(?err, "Failed to join sync actor");
                }
            }
        }
    }
}
644
/// The docs actor: owns the store and all open replicas, and processes
/// [`Action`]s received from [`SyncHandle`] frontends.
struct Actor {
    store: Store,
    /// Currently open replicas.
    states: OpenReplicas,
    /// Incoming actions from all `SyncHandle` clones.
    action_rx: async_channel::Receiver<Action>,
    /// Callback installed on replicas when they are opened.
    content_status_callback: Option<ContentStatusCallback>,
    /// Reply-streamer tasks (ListAuthors/ListReplicas/GetMany); reaped in
    /// `run_async`.
    tasks: JoinSet<()>,
    metrics: Arc<Metrics>,
}
653
impl Actor {
    /// Run the actor to completion on a fresh current-thread tokio runtime.
    ///
    /// Native-only counterpart to spawning `run_async` directly on wasm.
    #[cfg(not(wasm_browser))]
    fn run_in_thread(self) -> Result<()> {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()?;
        // A LocalSet is needed because the loop spawns tasks via
        // `tasks.spawn_local`.
        let local_set = tokio::task::LocalSet::new();
        local_set.block_on(&rt, async move { self.run_async().await });
        Ok(())
    }

    /// Main actor loop: process actions until shutdown or until every
    /// sender is dropped, then flush and close everything.
    async fn run_async(mut self) {
        let reply = loop {
            // Flush the store periodically so pending writes are not delayed
            // longer than MAX_COMMIT_DELAY while the actor is idle.
            let timeout = n0_future::time::sleep(MAX_COMMIT_DELAY);
            tokio::pin!(timeout);
            let action = tokio::select! {
                _ = &mut timeout => {
                    if let Err(cause) = self.store.flush() {
                        error!(?cause, "failed to flush store");
                    }
                    continue;
                }
                // Reap finished reply-streamer tasks so the JoinSet does not
                // grow without bound.
                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
                    if let Err(err) = res {
                        if !err.is_cancelled() {
                            warn!(?err, "actor reply-streamer task panicked");
                        }
                    }
                    continue;
                }
                action = self.action_rx.recv() => {
                    match action {
                        Ok(action) => action,
                        Err(async_channel::RecvError) => {
                            // All handles dropped: shut down without a reply.
                            debug!("action channel disconnected");
                            break None;
                        }

                    }
                }
            };
            trace!(%action, "tick");
            self.metrics.actor_tick_main.inc();
            match action {
                Action::Shutdown { reply } => {
                    break reply;
                }
                action => {
                    if self.on_action(action).await.is_err() {
                        warn!("failed to send reply: receiver dropped");
                    }
                }
            }
        };

        // Final flush and cleanup before (optionally) handing the store back.
        if let Err(cause) = self.store.flush() {
            warn!(?cause, "failed to flush store");
        }
        self.close_all();
        self.tasks.abort_all();
        debug!("docs actor shutdown");
        if let Some(reply) = reply {
            reply.send(self.store).ok();
        }
    }

    /// Handle a single non-shutdown action.
    ///
    /// `Err(SendReplyError)` only signals that a reply receiver was dropped;
    /// operational errors travel back through the reply channel itself.
    async fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
        match action {
            Action::Shutdown { .. } => {
                unreachable!("Shutdown is handled in run()")
            }
            #[cfg(test)]
            Action::DebugTasksLen { reply } => send_reply(reply, self.tasks.len()),
            Action::ImportAuthor { author, reply } => {
                let id = author.id();
                send_reply(reply, self.store.import_author(author).map(|_| id))
            }
            Action::ExportAuthor { author, reply } => {
                send_reply(reply, self.store.get_author(&author))
            }
            Action::DeleteAuthor { author, reply } => {
                send_reply(reply, self.store.delete_author(author))
            }
            Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
                let id = capability.id();
                let outcome = this.store.import_namespace(capability.clone())?;
                // If the stored capability was upgraded by this import, an
                // already-open replica must learn about the new capability too.
                if let ImportNamespaceOutcome::Upgraded = outcome {
                    if let Ok(state) = this.states.get_mut(&id) {
                        state.info.merge_capability(capability)?;
                    }
                }
                Ok(id)
            }),
            Action::ListAuthors { reply } => {
                let iter = self
                    .store
                    .list_authors()
                    .map(|a| a.map(|a| a.map(|a| AuthorListResponse { author_id: a.id() })));
                // Stream results from a task so the actor loop is not blocked.
                self.tasks.spawn_local(async move {
                    iter_to_irpc(reply, iter).await.ok();
                });
                Ok(())
            }
            Action::ListReplicas { reply } => {
                let iter = self.store.list_namespaces();
                let iter = iter.map(|inner| {
                    inner.map(|res| res.map(|(id, capability)| ListResponse { id, capability }))
                });
                self.tasks.spawn_local(async move {
                    iter_to_irpc(reply, iter).await.ok();
                });
                Ok(())
            }
            Action::ContentHashes { reply } => {
                send_reply_with(reply, self, |this| this.store.content_hashes())
            }
            Action::FlushStore { reply } => send_reply(reply, self.store.flush()),
            Action::Replica(namespace, action) => self.on_replica_action(namespace, action).await,
        }
    }

    /// Handle an action scoped to a single replica.
    async fn on_replica_action(
        &mut self,
        namespace: NamespaceId,
        action: ReplicaAction,
    ) -> Result<(), SendReplyError> {
        match action {
            ReplicaAction::Open { reply, opts } => {
                tracing::trace!("open in");
                let res = self.open(namespace, opts);
                tracing::trace!("open out");
                send_reply(reply, res)
            }
            ReplicaAction::Close { reply } => {
                let res = self.close(namespace);
                // Send the reply last; the replica is already closed either way.
                reply.send(Ok(res)).ok();
                Ok(())
            }
            ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
                let state = this.states.get_mut(&namespace)?;
                state.info.subscribe(sender);
                Ok(())
            }),
            ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
                let state = this.states.get_mut(&namespace)?;
                state.info.unsubscribe(&sender);
                drop(sender);
                Ok(())
            }),
            ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
                let state = this.states.get_mut(&namespace)?;
                state.sync = sync;
                Ok(())
            }),
            ReplicaAction::InsertLocal {
                author,
                key,
                hash,
                len,
                reply,
            } => {
                send_reply_with_async(reply, self, async move |this| {
                    let author = get_author(&mut this.store, &author)?;
                    let mut replica = this.states.replica(namespace, &mut this.store)?;
                    replica.insert(&key, &author, hash, len).await?;
                    this.metrics.new_entries_local.inc();
                    this.metrics.new_entries_local_size.inc_by(len);
                    Ok(())
                })
                .await
            }
            ReplicaAction::DeletePrefix { author, key, reply } => {
                send_reply_with_async(reply, self, async |this| {
                    let author = get_author(&mut this.store, &author)?;
                    let mut replica = this.states.replica(namespace, &mut this.store)?;
                    let res = replica.delete_prefix(&key, &author).await?;
                    Ok(res)
                })
                .await
            }
            ReplicaAction::InsertRemote {
                entry,
                from,
                content_status,
                reply,
            } => {
                send_reply_with_async(reply, self, async move |this| {
                    // Remote inserts require sync to be enabled for the replica.
                    let mut replica = this
                        .states
                        .replica_if_syncing(&namespace, &mut this.store)?;
                    let len = entry.content_len();
                    replica
                        .insert_remote_entry(entry, from, content_status)
                        .await?;
                    this.metrics.new_entries_remote.inc();
                    this.metrics.new_entries_remote_size.inc_by(len);
                    Ok(())
                })
                .await
            }

            ReplicaAction::SyncInitialMessage { reply } => {
                send_reply_with(reply, self, move |this| {
                    let mut replica = this
                        .states
                        .replica_if_syncing(&namespace, &mut this.store)?;
                    let res = replica.sync_initial_message()?;
                    Ok(res)
                })
            }
            ReplicaAction::SyncProcessMessage {
                message,
                from,
                mut state,
                reply,
            } => {
                let res = async {
                    let mut replica = self
                        .states
                        .replica_if_syncing(&namespace, &mut self.store)?;
                    let res = replica
                        .sync_process_message(message, from, &mut state)
                        .await?;
                    Ok((res, state))
                }
                .await;
                reply.send(res).map_err(send_reply_error)
            }
            ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
                this.states.ensure_open(&namespace)?;
                let peers = this.store.get_sync_peers(&namespace)?;
                Ok(peers.map(|iter| iter.collect()))
            }),
            ReplicaAction::RegisterUsefulPeer { peer, reply } => {
                let res = self.store.register_useful_peer(namespace, peer);
                send_reply(reply, res)
            }
            ReplicaAction::GetExact {
                author,
                key,
                include_empty,
                reply,
            } => send_reply_with(reply, self, move |this| {
                this.states.ensure_open(&namespace)?;
                this.store.get_exact(namespace, author, key, include_empty)
            }),
            ReplicaAction::GetMany { query, reply } => {
                let iter = self
                    .states
                    .ensure_open(&namespace)
                    .and_then(|_| self.store.get_many(namespace, query));
                // Stream results from a task; send failures are ignored
                // (the receiver may legitimately drop early).
                self.tasks
                    .spawn_local(iter_to_irpc(reply, iter).map_ok_or_else(|_| (), |_| ()));
                Ok(())
            }
            ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
                this.close(namespace);
                this.store.remove_replica(&namespace)
            }),
            ReplicaAction::ExportSecretKey { reply } => {
                let res = self
                    .states
                    .get_mut(&namespace)
                    .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
                send_reply(reply, res)
            }
            ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
                let state = this.states.get_mut(&namespace)?;
                let handles = state.handles;
                let sync = state.sync;
                let subscribers = state.info.subscribers_count();
                Ok(OpenState {
                    handles,
                    sync,
                    subscribers,
                })
            }),
            ReplicaAction::HasNewsForUs { heads, reply } => {
                let res = self.store.has_news_for_us(namespace, &heads);
                send_reply(reply, res)
            }
            ReplicaAction::SetDownloadPolicy { policy, reply } => {
                send_reply(reply, self.store.set_download_policy(&namespace, policy))
            }
            ReplicaAction::GetDownloadPolicy { reply } => {
                send_reply(reply, self.store.get_download_policy(&namespace))
            }
        }
    }

    /// Release one handle; close the store-side replica once fully closed.
    fn close(&mut self, namespace: NamespaceId) -> bool {
        let res = self.states.close(namespace);
        if res {
            self.store.close_replica(namespace);
        }
        res
    }

    /// Close every open replica (used during shutdown).
    fn close_all(&mut self) {
        for id in self.states.close_all() {
            self.store.close_replica(id);
        }
    }

    /// Open a replica: load its info from the store on first open,
    /// installing the content-status callback if one is configured.
    fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
        let open_cb = || {
            let mut info = self.store.load_replica_info(&namespace)?;
            if let Some(cb) = &self.content_status_callback {
                info.set_content_status_callback(Arc::clone(cb));
            }
            Ok(info)
        };
        self.states.open_with(namespace, opts, open_cb)
    }
}
970
/// Map of currently open replicas, keyed by namespace id.
#[derive(Default)]
struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
973
impl OpenReplicas {
    /// Construct a [`Replica`] for `namespace` backed by `store`.
    ///
    /// Fails if the replica is not open.
    fn replica<'a, 'b>(
        &'a mut self,
        namespace: NamespaceId,
        store: &'b mut Store,
    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
        let state = self.get_mut(&namespace)?;
        Ok(Replica::new(
            StoreInstance::new(state.info.capability.id(), store),
            &mut state.info,
        ))
    }

    /// Like [`Self::replica`], but additionally fails when sync is disabled.
    fn replica_if_syncing<'a, 'b>(
        &'a mut self,
        namespace: &NamespaceId,
        store: &'b mut Store,
    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
        let state = self.get_mut(namespace)?;
        anyhow::ensure!(state.sync, "sync is not enabled for replica");
        Ok(Replica::new(
            StoreInstance::new(state.info.capability.id(), store),
            &mut state.info,
        ))
    }

    /// Mutable access to an open replica's state; errors if not open.
    fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
        self.0.get_mut(namespace).context("replica not open")
    }

    /// Whether the replica is currently open.
    fn is_open(&self, namespace: &NamespaceId) -> bool {
        self.0.contains_key(namespace)
    }

    /// Error unless the replica is currently open.
    fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
        match self.is_open(namespace) {
            true => Ok(()),
            false => Err(anyhow!("replica not open")),
        }
    }

    /// Open a replica (loading it via `open_cb` on first open), or increment
    /// the handle count of an already-open one and apply `opts`.
    fn open_with(
        &mut self,
        namespace: NamespaceId,
        opts: OpenOpts,
        mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
    ) -> Result<()> {
        match self.0.entry(namespace) {
            hash_map::Entry::Vacant(e) => {
                let mut info = open_cb()?;
                if let Some(sender) = opts.subscribe {
                    info.subscribe(sender);
                }
                debug!(namespace = %namespace.fmt_short(), "open");
                let state = OpenReplica {
                    info,
                    sync: opts.sync,
                    handles: 1,
                };
                e.insert(state);
            }
            hash_map::Entry::Occupied(mut e) => {
                let state = e.get_mut();
                state.handles += 1;
                // Once any opener requested sync, it stays enabled.
                state.sync = state.sync || opts.sync;
                if let Some(sender) = opts.subscribe {
                    state.info.subscribe(sender);
                }
            }
        }
        Ok(())
    }

    /// Release one handle; returns `true` when the replica is fully closed
    /// (last handle released, or it was not open in the first place).
    fn close(&mut self, namespace: NamespaceId) -> bool {
        match self.0.entry(namespace) {
            hash_map::Entry::Vacant(_e) => {
                warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
                true
            }
            hash_map::Entry::Occupied(mut e) => {
                let state = e.get_mut();
                // NOTE(review): leftover debug logging — consider removing.
                tracing::debug!("STATE {state:?}");
                // Entries in the map always have handles >= 1 (a zero count
                // removes the entry below), so this cannot underflow;
                // wrapping_sub merely avoids the panic path.
                state.handles = state.handles.wrapping_sub(1);
                if state.handles == 0 {
                    let _ = e.remove_entry();
                    debug!(namespace = %namespace.fmt_short(), "close");
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Drain all open replicas, yielding their namespace ids.
    fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
        self.0.drain().map(|(n, _s)| n)
    }
}
1070
1071async fn iter_to_irpc<T: irpc::RpcMessage>(
1072 channel: mpsc::Sender<RpcResult<T>>,
1073 iter: Result<impl Iterator<Item = Result<T>>>,
1074) -> Result<(), SendReplyError> {
1075 match iter {
1076 Err(err) => channel
1077 .send(Err(RpcError::new(&*err)))
1078 .await
1079 .map_err(send_reply_error)?,
1080 Ok(iter) => {
1081 for item in iter {
1082 let item = item.map_err(|err| RpcError::new(&*err));
1083 channel.send(item).await.map_err(send_reply_error)?;
1084 }
1085 }
1086 }
1087 Ok(())
1088}
1089
1090fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
1091 store.get_author(id)?.context("author not found")
1092}
1093
/// Error signalling that a reply could not be delivered because the
/// receiving end of the reply channel was dropped.
#[derive(Debug)]
struct SendReplyError;
1096
1097fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
1098 sender.send(value).map_err(send_reply_error)
1099}
1100
1101fn send_reply_with<T>(
1102 sender: oneshot::Sender<Result<T>>,
1103 this: &mut Actor,
1104 f: impl FnOnce(&mut Actor) -> Result<T>,
1105) -> Result<(), SendReplyError> {
1106 sender.send(f(this)).map_err(send_reply_error)
1107}
1108
1109async fn send_reply_with_async<T>(
1110 sender: oneshot::Sender<Result<T>>,
1111 this: &mut Actor,
1112 f: impl AsyncFnOnce(&mut Actor) -> Result<T>,
1113) -> Result<(), SendReplyError> {
1114 sender.send(f(this).await).map_err(send_reply_error)
1115}
1116
1117fn send_reply_error<T>(_err: T) -> SendReplyError {
1118 SendReplyError
1119}
1120
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store;

    /// Open a replica, subscribe, then close it: closing the last handle
    /// must drop the replica state and with it our subscription sender.
    #[tokio::test]
    async fn open_close() -> anyhow::Result<()> {
        let store = store::Store::memory();
        let sync = SyncHandle::spawn(store, None, "foo".into());
        let namespace = NamespaceSecret::new(&mut rand::rng());
        let id = namespace.id();
        sync.import_namespace(namespace.into()).await?;
        sync.open(id, Default::default()).await?;
        let (tx, rx) = async_channel::bounded(10);
        sync.subscribe(id, tx).await?;
        sync.close(id).await?;
        // The receiver observes a closed channel once the replica is gone.
        assert!(rx.recv().await.is_err());
        Ok(())
    }

    /// Regression test: finished reply-streamer tasks must be reaped from
    /// `Actor.tasks` (the `join_next` arm in `run_async`); otherwise the
    /// JoinSet grows without bound.
    #[tokio::test]
    async fn actor_tasks_joinset_drain() -> anyhow::Result<()> {
        let store = store::Store::memory();
        let sync = SyncHandle::spawn(store, None, "drain".into());

        let namespace = NamespaceSecret::new(&mut rand::rng());
        let id = namespace.id();
        sync.import_namespace(namespace.into()).await?;
        sync.open(id, Default::default()).await?;

        const ITERATIONS: usize = 1000;

        // Each of these calls spawns one streamer task inside the actor.
        for _ in 0..ITERATIONS {
            let (tx, mut rx) = mpsc::channel(64);
            sync.list_authors(tx).await?;
            while rx.recv().await?.is_some() {}
        }

        for _ in 0..ITERATIONS {
            let (tx, mut rx) = mpsc::channel(64);
            sync.list_replicas(tx).await?;
            while rx.recv().await?.is_some() {}
        }

        for _ in 0..ITERATIONS {
            let (tx, mut rx) = mpsc::channel(64);
            sync.get_many(id, store::Query::all().into(), tx).await?;
            while rx.recv().await?.is_some() {}
        }

        // Poll until the actor has reaped (almost) all finished tasks.
        let mut last = sync.debug_tasks_len().await?;
        let deadline = std::time::Instant::now() + Duration::from_secs(10);
        while last > 16 && std::time::Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(50)).await;
            last = sync.debug_tasks_len().await?;
        }

        assert!(
            last <= 16,
            "residual Actor.tasks JoinSet len = {last}, expected <= 16 \
            (was the join_next arm in run_async lost? streamer tasks \
            for ListAuthors / ListReplicas / GetMany are not being reaped)"
        );

        sync.close(id).await?;
        Ok(())
    }
}
1195}