1use std::{
10 cmp::Ordering,
11 fmt::Debug,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14};
15
16use bytes::{Bytes, BytesMut};
17use ed25519_dalek::{Signature, SignatureError};
18use iroh_blobs::Hash;
19use n0_future::{
20 time::{Duration, SystemTime},
21 IterExt,
22};
23use serde::{Deserialize, Serialize};
24
25pub use crate::heads::AuthorHeads;
26use crate::{
27 keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
28 ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
29 store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
30};
31
/// Range-sync message specialised to [`SignedEntry`] values.
///
/// This is the message type returned by `Replica::sync_initial_message` and
/// consumed/produced by `Replica::sync_process_message`.
pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
36
/// Identifier of a peer as 32 raw bytes.
///
/// Used for the `from` fields of [`Event::RemoteInsert`] and [`InsertOrigin::Sync`].
pub type PeerIdBytes = [u8; 32];
40
/// Maximum distance into the future, relative to the local wall clock, that an entry
/// timestamp may have and still be accepted by `validate_entry`.
///
/// The value is 10 minutes. It is expressed in **microseconds** because it is added to
/// the result of `system_time_now()`, which counts microseconds since the Unix epoch, and
/// compared against entry timestamps produced by the same function.
pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;
44
/// Shared async callback that maps a content [`Hash`] to its [`ContentStatus`].
///
/// `Replica::sync_process_message` invokes it with the content hash of an entry; when no
/// callback is configured that method falls back to [`ContentStatus::Missing`].
pub type ContentStatusCallback =
    Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
48
/// Event sent to the subscribers of a replica when an entry has been inserted.
#[derive(Debug, Clone)]
pub enum Event {
    /// An entry was inserted with [`InsertOrigin::Local`].
    LocalInsert {
        /// Namespace of the replica the entry was inserted into.
        namespace: NamespaceId,
        /// The inserted entry.
        entry: SignedEntry,
    },
    /// An entry that was received from a peer was inserted.
    RemoteInsert {
        /// Namespace of the replica the entry was inserted into.
        namespace: NamespaceId,
        /// The inserted entry.
        entry: SignedEntry,
        /// Peer the entry was received from.
        from: PeerIdBytes,
        /// Whether the entry matches the replica's download policy.
        should_download: bool,
        /// Content status that accompanied the entry when it was received.
        remote_content_status: ContentStatus,
    },
}
73
/// Describes where an entry being inserted into a replica came from.
///
/// `validate_entry` only verifies signatures for origins other than [`InsertOrigin::Local`].
#[derive(Debug, Clone)]
pub enum InsertOrigin {
    /// The entry was created through the local insert API (`insert` / `delete_prefix`).
    Local,
    /// The entry was received from a peer.
    Sync {
        /// Peer the entry was received from.
        from: PeerIdBytes,
        /// Content status that accompanied the entry.
        remote_content_status: ContentStatus,
    },
}
87
/// Availability status of the content an entry refers to.
///
/// NOTE(review): the precise meaning of each variant is defined by whoever provides the
/// [`ContentStatusCallback`]; the descriptions below follow the variant names only.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum ContentStatus {
    /// Presumably: the content is fully available.
    Complete,
    /// Presumably: only part of the content is available.
    Incomplete,
    /// The content is not available; also used when no status callback is configured.
    Missing,
}
98
/// Running statistics of a sync session, updated by `Replica::sync_process_message`.
#[derive(Debug, Clone, Default)]
pub struct SyncOutcome {
    /// Author/timestamp pairs of every entry value seen in received messages.
    pub heads_received: AuthorHeads,
    /// Number of entry values received (sum of `value_count()` of incoming messages).
    pub num_recv: usize,
    /// Number of entry values sent (sum of `value_count()` of reply messages).
    pub num_sent: usize,
}
109
/// Reinterprets `value` as a `usize` if `T` has exactly the size and alignment of `usize`.
///
/// Returns `None` for every other layout. For pointer-like wrappers this yields the
/// wrapped address, which allows identity comparisons of opaque handle types.
fn get_as_ptr<T>(value: &T) -> Option<usize> {
    use std::mem::{align_of, size_of, transmute_copy};

    let layout_matches =
        size_of::<T>() == size_of::<usize>() && align_of::<T>() == align_of::<usize>();
    if !layout_matches {
        return None;
    }
    // SAFETY: `T` and `usize` have identical size and alignment (checked above), so
    // reading `size_of::<usize>()` bytes from `value` stays in bounds; the bits are only
    // reinterpreted as an integer.
    Some(unsafe { transmute_copy::<T, usize>(value) })
}
121
122fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
123 get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
124}
125
126#[derive(Debug, Default)]
127struct Subscribers(Vec<async_channel::Sender<Event>>);
128impl Subscribers {
129 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
130 self.0.push(sender)
131 }
132 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
133 self.0.retain(|s| !same_channel(s, sender));
134 }
135 pub async fn send(&mut self, event: Event) {
136 self.0 = std::mem::take(&mut self.0)
137 .into_iter()
138 .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx))
139 .join_all()
140 .await
141 .into_iter()
142 .flatten()
143 .collect();
144 }
145 pub fn len(&self) -> usize {
146 self.0.len()
147 }
148 pub async fn send_with(&mut self, f: impl FnOnce() -> Event) {
149 if !self.0.is_empty() {
150 self.send(f()).await
151 }
152 }
153}
154
/// Kind of a [`Capability`], without its payload.
///
/// The `u8` discriminants are what `Capability::raw` emits and `Capability::from_raw`
/// parses; the `strum` display form is the snake_case variant name.
#[derive(
    Debug,
    Clone,
    Copy,
    Serialize,
    Deserialize,
    num_enum::IntoPrimitive,
    num_enum::TryFromPrimitive,
    strum::Display,
)]
#[repr(u8)]
#[strum(serialize_all = "snake_case")]
pub enum CapabilityKind {
    /// Write capability (backed by a [`NamespaceSecret`]).
    Write = 1,
    /// Read capability (backed by a [`NamespaceId`] only).
    Read = 2,
}
174
/// Access capability for a namespace.
#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
pub enum Capability {
    /// Holds the namespace secret; `secret_key()` succeeds for this variant.
    Write(NamespaceSecret),
    /// Holds only the namespace id; `secret_key()` returns [`ReadOnly`].
    Read(NamespaceId),
}
183
184impl Capability {
185 pub fn id(&self) -> NamespaceId {
187 match self {
188 Capability::Write(secret) => secret.id(),
189 Capability::Read(id) => *id,
190 }
191 }
192
193 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
196 match self {
197 Capability::Write(secret) => Ok(secret),
198 Capability::Read(_) => Err(ReadOnly),
199 }
200 }
201
202 pub fn kind(&self) -> CapabilityKind {
204 match self {
205 Capability::Write(_) => CapabilityKind::Write,
206 Capability::Read(_) => CapabilityKind::Read,
207 }
208 }
209
210 pub fn raw(&self) -> (u8, [u8; 32]) {
212 let capability_repr: u8 = self.kind().into();
213 let bytes = match self {
214 Capability::Write(secret) => secret.to_bytes(),
215 Capability::Read(id) => id.to_bytes(),
216 };
217 (capability_repr, bytes)
218 }
219
220 pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
222 let kind: CapabilityKind = kind.try_into()?;
223 let capability = match kind {
224 CapabilityKind::Write => {
225 let secret = NamespaceSecret::from_bytes(bytes);
226 Capability::Write(secret)
227 }
228 CapabilityKind::Read => {
229 let id = NamespaceId::from(bytes);
230 Capability::Read(id)
231 }
232 };
233 Ok(capability)
234 }
235
236 pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
242 if other.id() != self.id() {
243 return Err(CapabilityError::NamespaceMismatch);
244 }
245
246 if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
248 let _ = std::mem::replace(self, other);
249 Ok(true)
250 } else {
251 Ok(false)
252 }
253 }
254}
255
/// Errors returned by `Capability::merge`.
#[derive(Debug, thiserror::Error)]
pub enum CapabilityError {
    /// The two capabilities refer to different namespaces.
    #[error("Namespaces are not the same")]
    NamespaceMismatch,
}
263
/// In-memory state attached to a replica: capability, subscribers and status flags.
#[derive(derive_more::Debug)]
pub struct ReplicaInfo {
    /// Capability (read or write) held for the replica's namespace.
    pub(crate) capability: Capability,
    /// Channels that receive an [`Event`] for inserted entries.
    subscribers: Subscribers,
    /// Optional callback used to look up the [`ContentStatus`] of a hash during sync.
    #[debug("ContentStatusCallback")]
    content_status_cb: Option<ContentStatusCallback>,
    /// When `true`, insert and sync operations fail with `InsertError::Closed`.
    /// Initialised to `false`; it is not set anywhere in the code visible here.
    closed: bool,
}
273
274impl ReplicaInfo {
275 pub fn new(capability: Capability) -> Self {
277 Self {
278 capability,
279 subscribers: Default::default(),
280 content_status_cb: None,
282 closed: false,
283 }
284 }
285
286 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
292 self.subscribers.subscribe(sender)
293 }
294
295 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
301 self.subscribers.unsubscribe(sender)
302 }
303
304 pub fn subscribers_count(&self) -> usize {
306 self.subscribers.len()
307 }
308
309 pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
314 if self.content_status_cb.is_some() {
315 false
316 } else {
317 self.content_status_cb = Some(cb);
318 true
319 }
320 }
321
322 fn ensure_open(&self) -> Result<(), InsertError> {
323 if self.closed() {
324 Err(InsertError::Closed)
325 } else {
326 Ok(())
327 }
328 }
329
330 pub fn closed(&self) -> bool {
336 self.closed
337 }
338
339 pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
346 self.capability.merge(capability)
347 }
348}
349
/// Handle for working with one replica: a store instance paired with its [`ReplicaInfo`].
///
/// `I` is any (mutably) dereferenceable holder of the [`ReplicaInfo`]; it defaults to
/// `Box<ReplicaInfo>`.
#[derive(derive_more::Debug)]
pub struct Replica<'a, I = Box<ReplicaInfo>> {
    /// Store access used for reading and writing entries.
    pub(crate) store: StoreInstance<'a>,
    /// Capability, subscribers and status of this replica.
    pub(crate) info: I,
}
356
357impl<'a, I> Replica<'a, I>
358where
359 I: Deref<Target = ReplicaInfo> + DerefMut,
360{
361 pub fn new(store: StoreInstance<'a>, info: I) -> Self {
363 Replica { info, store }
364 }
365
366 pub async fn insert(
374 &mut self,
375 key: impl AsRef<[u8]>,
376 author: &Author,
377 hash: Hash,
378 len: u64,
379 ) -> Result<usize, InsertError> {
380 if len == 0 || hash == Hash::EMPTY {
381 return Err(InsertError::EntryIsEmpty);
382 }
383 self.info.ensure_open()?;
384 let id = RecordIdentifier::new(self.id(), author.id(), key);
385 let record = Record::new_current(hash, len);
386 let entry = Entry::new(id, record);
387 let secret = self.secret_key()?;
388 let signed_entry = entry.sign(secret, author);
389 self.insert_entry(signed_entry, InsertOrigin::Local).await
390 }
391
392 pub async fn delete_prefix(
399 &mut self,
400 prefix: impl AsRef<[u8]>,
401 author: &Author,
402 ) -> Result<usize, InsertError> {
403 self.info.ensure_open()?;
404 let id = RecordIdentifier::new(self.id(), author.id(), prefix);
405 let entry = Entry::new_empty(id);
406 let signed_entry = entry.sign(self.secret_key()?, author);
407 self.insert_entry(signed_entry, InsertOrigin::Local).await
408 }
409
410 pub async fn insert_remote_entry(
418 &mut self,
419 entry: SignedEntry,
420 received_from: PeerIdBytes,
421 content_status: ContentStatus,
422 ) -> Result<usize, InsertError> {
423 self.info.ensure_open()?;
424 entry.validate_empty()?;
425 let origin = InsertOrigin::Sync {
426 from: received_from,
427 remote_content_status: content_status,
428 };
429 self.insert_entry(entry, origin).await
430 }
431
432 async fn insert_entry(
436 &mut self,
437 entry: SignedEntry,
438 origin: InsertOrigin,
439 ) -> Result<usize, InsertError> {
440 let namespace = self.id();
441
442 let store = &self.store;
443 validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
444
445 let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
446 tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
447
448 let removed_count = match outcome {
449 InsertOutcome::Inserted { removed } => removed,
450 InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
451 };
452
453 let insert_event = match origin {
454 InsertOrigin::Local => Event::LocalInsert { namespace, entry },
455 InsertOrigin::Sync {
456 from,
457 remote_content_status,
458 } => {
459 let download_policy = self
460 .store
461 .get_download_policy(&self.id())
462 .unwrap_or_default();
463 let should_download = download_policy.matches(entry.entry());
464 Event::RemoteInsert {
465 namespace,
466 entry,
467 from,
468 should_download,
469 remote_content_status,
470 }
471 }
472 };
473
474 self.info.subscribers.send(insert_event).await;
475
476 Ok(removed_count)
477 }
478
479 pub async fn hash_and_insert(
484 &mut self,
485 key: impl AsRef<[u8]>,
486 author: &Author,
487 data: impl AsRef<[u8]>,
488 ) -> Result<Hash, InsertError> {
489 self.info.ensure_open()?;
490 let len = data.as_ref().len() as u64;
491 let hash = Hash::new(data);
492 self.insert(key, author, hash, len).await?;
493 Ok(hash)
494 }
495
496 pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
498 RecordIdentifier::new(self.info.capability.id(), author.id(), key)
499 }
500
501 pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
503 self.info.ensure_open().map_err(anyhow::Error::from)?;
504 self.store.initial_message()
505 }
506
507 pub async fn sync_process_message(
511 &mut self,
512 message: crate::ranger::Message<SignedEntry>,
513 from_peer: PeerIdBytes,
514 state: &mut SyncOutcome,
515 ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
516 self.info.ensure_open()?;
517 let my_namespace = self.id();
518 let now = system_time_now();
519
520 state.num_recv += message.value_count();
522 for (entry, _content_status) in message.values() {
523 state
524 .heads_received
525 .insert(entry.author(), entry.timestamp());
526 }
527
528 let cb = self.info.content_status_cb.clone();
531 let download_policy = self
532 .store
533 .get_download_policy(&my_namespace)
534 .unwrap_or_default();
535 let reply = self
536 .store
537 .process_message(
538 &Default::default(),
539 message,
540 |store, entry, content_status| {
542 let origin = InsertOrigin::Sync {
543 from: from_peer,
544 remote_content_status: content_status,
545 };
546 validate_entry(now, store, my_namespace, entry, &origin).is_ok()
547 },
548 async |_store, entry, content_status| {
550 self.info
552 .subscribers
553 .send_with(|| {
554 let should_download = download_policy.matches(entry.entry());
555 Event::RemoteInsert {
556 from: from_peer,
557 namespace: my_namespace,
558 entry: entry.clone(),
559 should_download,
560 remote_content_status: content_status,
561 }
562 })
563 .await
564 },
565 async move |entry| {
567 if let Some(cb) = cb.as_ref() {
568 cb(entry.content_hash()).await
569 } else {
570 ContentStatus::Missing
571 }
572 },
573 )
574 .await?;
575
576 if let Some(ref reply) = reply {
578 state.num_sent += reply.value_count();
579 }
580
581 Ok(reply)
582 }
583
584 pub fn id(&self) -> NamespaceId {
586 self.info.capability.id()
587 }
588
589 pub fn capability(&self) -> &Capability {
591 &self.info.capability
592 }
593
594 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
597 self.info.capability.secret_key()
598 }
599}
600
/// Error returned when a namespace secret is requested from a read-only capability.
#[derive(Debug, thiserror::Error)]
#[error("Replica allows read access only.")]
pub struct ReadOnly;
605
606fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
614 now: u64,
615 store: &S,
616 expected_namespace: NamespaceId,
617 entry: &SignedEntry,
618 origin: &InsertOrigin,
619) -> Result<(), ValidationFailure> {
620 if entry.namespace() != expected_namespace {
622 return Err(ValidationFailure::InvalidNamespace);
623 }
624
625 if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
627 return Err(ValidationFailure::BadSignature);
628 }
629
630 if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
632 return Err(ValidationFailure::TooFarInTheFuture);
633 }
634 Ok(())
635}
636
/// Errors that can occur when inserting an entry into a replica.
#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
pub enum InsertError {
    /// The underlying store failed.
    #[error("storage error")]
    Store(anyhow::Error),
    /// The entry did not pass validation.
    #[error("validation failure")]
    Validation(#[from] ValidationFailure),
    /// The store did not insert the entry because a newer one already exists.
    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
    NewerEntryExists,
    /// `Replica::insert` was called with length 0 or the empty hash.
    #[error("Attempted to insert an empty entry")]
    EntryIsEmpty,
    /// The replica has no namespace secret; converted from [`struct@ReadOnly`].
    #[error("Attempted to insert to read only replica")]
    #[from(ReadOnly)]
    ReadOnly,
    /// The replica is closed.
    #[error("replica is closed")]
    Closed,
}
660
/// Reasons why an entry is rejected by validation.
#[derive(thiserror::Error, Debug)]
pub enum ValidationFailure {
    /// The entry's namespace differs from the namespace of the replica.
    #[error("Entry namespace does not match the current replica")]
    InvalidNamespace,
    /// Signature verification of the entry failed.
    #[error("Entry signature is invalid")]
    BadSignature,
    /// The entry's timestamp is later than the local time plus the allowed shift.
    #[error("Entry timestamp is too far in the future.")]
    TooFarInTheFuture,
    /// Exactly one of "hash is the empty hash" and "length is 0" holds.
    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
    InvalidEmptyEntry,
}
677
/// An [`Entry`] together with its namespace and author signatures.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignedEntry {
    /// Signatures over the encoded entry.
    signature: EntrySignature,
    /// The signed entry.
    entry: Entry,
}
684
685impl From<SignedEntry> for Entry {
686 fn from(value: SignedEntry) -> Self {
687 value.entry
688 }
689}
690
691impl PartialOrd for SignedEntry {
692 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
693 Some(self.cmp(other))
694 }
695}
696
697impl Ord for SignedEntry {
698 fn cmp(&self, other: &Self) -> Ordering {
699 self.entry.cmp(&other.entry)
700 }
701}
702
703impl PartialOrd for Entry {
704 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
705 Some(self.cmp(other))
706 }
707}
708
709impl Ord for Entry {
710 fn cmp(&self, other: &Self) -> Ordering {
711 self.id
712 .cmp(&other.id)
713 .then_with(|| self.record.cmp(&other.record))
714 }
715}
716
717impl SignedEntry {
718 pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
719 SignedEntry { signature, entry }
720 }
721
722 pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
724 let signature = EntrySignature::from_entry(&entry, namespace, author);
725 SignedEntry { signature, entry }
726 }
727
728 pub fn from_parts(
730 namespace: &NamespaceSecret,
731 author: &Author,
732 key: impl AsRef<[u8]>,
733 record: Record,
734 ) -> Self {
735 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
736 let entry = Entry::new(id, record);
737 Self::from_entry(entry, namespace, author)
738 }
739
740 pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
742 self.signature.verify(
743 &self.entry,
744 &self.entry.namespace().public_key(store)?,
745 &self.entry.author().public_key(store)?,
746 )
747 }
748
749 pub fn signature(&self) -> &EntrySignature {
751 &self.signature
752 }
753
754 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
756 self.entry().validate_empty()
757 }
758
759 pub fn entry(&self) -> &Entry {
761 &self.entry
762 }
763
764 pub fn content_hash(&self) -> Hash {
766 self.entry().content_hash()
767 }
768
769 pub fn content_len(&self) -> u64 {
771 self.entry().content_len()
772 }
773
774 pub fn author_bytes(&self) -> AuthorId {
776 self.entry().id().author()
777 }
778
779 pub fn key(&self) -> &[u8] {
781 self.entry().id().key()
782 }
783
784 pub fn timestamp(&self) -> u64 {
786 self.entry().timestamp()
787 }
788}
789
790impl RangeEntry for SignedEntry {
791 type Key = RecordIdentifier;
792 type Value = Record;
793
794 fn key(&self) -> &Self::Key {
795 &self.entry.id
796 }
797
798 fn value(&self) -> &Self::Value {
799 &self.entry.record
800 }
801
802 fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
803 let mut hasher = blake3::Hasher::new();
804 hasher.update(self.namespace().as_ref());
805 hasher.update(self.author_bytes().as_ref());
806 hasher.update(self.key());
807 hasher.update(&self.timestamp().to_be_bytes());
808 hasher.update(self.content_hash().as_bytes());
809 Fingerprint(hasher.finalize().into())
810 }
811}
812
/// The two signatures over an encoded [`Entry`]: one by the author, one by the namespace.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntrySignature {
    /// Signature created with the author key.
    author_signature: Signature,
    /// Signature created with the namespace secret.
    namespace_signature: Signature,
}
819
820impl Debug for EntrySignature {
821 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822 f.debug_struct("EntrySignature")
823 .field(
824 "namespace_signature",
825 &hex::encode(self.namespace_signature.to_bytes()),
826 )
827 .field(
828 "author_signature",
829 &hex::encode(self.author_signature.to_bytes()),
830 )
831 .finish()
832 }
833}
834
835impl EntrySignature {
836 pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
838 let bytes = entry.to_vec();
841 let namespace_signature = namespace.sign(&bytes);
842 let author_signature = author.sign(&bytes);
843
844 EntrySignature {
845 author_signature,
846 namespace_signature,
847 }
848 }
849
850 pub fn verify(
853 &self,
854 entry: &Entry,
855 namespace: &NamespacePublicKey,
856 author: &AuthorPublicKey,
857 ) -> Result<(), SignatureError> {
858 let bytes = entry.to_vec();
859 namespace.verify(&bytes, &self.namespace_signature)?;
860 author.verify(&bytes, &self.author_signature)?;
861
862 Ok(())
863 }
864
865 pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
866 let namespace_signature = Signature::from_bytes(namespace_sig);
867 let author_signature = Signature::from_bytes(author_sig);
868
869 EntrySignature {
870 author_signature,
871 namespace_signature,
872 }
873 }
874
875 pub(crate) fn author(&self) -> &Signature {
876 &self.author_signature
877 }
878
879 pub(crate) fn namespace(&self) -> &Signature {
880 &self.namespace_signature
881 }
882}
883
/// A single unsigned entry: a [`RecordIdentifier`] plus the [`Record`] stored under it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Entry {
    /// Namespace, author and key of the entry.
    id: RecordIdentifier,
    /// Content hash, content length and timestamp of the entry.
    record: Record,
}
894
895impl Entry {
896 pub fn new(id: RecordIdentifier, record: Record) -> Self {
898 Entry { id, record }
899 }
900
901 pub fn new_empty(id: RecordIdentifier) -> Self {
903 Entry {
904 id,
905 record: Record::empty_current(),
906 }
907 }
908
909 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
911 match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
912 (true, true) => Ok(()),
913 (false, false) => Ok(()),
914 (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
915 (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
916 }
917 }
918
919 pub fn id(&self) -> &RecordIdentifier {
921 &self.id
922 }
923
924 pub fn namespace(&self) -> NamespaceId {
926 self.id.namespace()
927 }
928
929 pub fn author(&self) -> AuthorId {
931 self.id.author()
932 }
933
934 pub fn key(&self) -> &[u8] {
936 self.id.key()
937 }
938
939 pub fn record(&self) -> &Record {
941 &self.record
942 }
943
944 pub fn content_hash(&self) -> Hash {
946 self.record.hash
947 }
948
949 pub fn content_len(&self) -> u64 {
951 self.record.len
952 }
953
954 pub fn timestamp(&self) -> u64 {
956 self.record.timestamp
957 }
958
959 pub fn encode(&self, out: &mut Vec<u8>) {
961 self.id.encode(out);
962 self.record.encode(out);
963 }
964
965 pub fn to_vec(&self) -> Vec<u8> {
967 let mut out = Vec::new();
968 self.encode(&mut out);
969 out
970 }
971
972 pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
974 SignedEntry::from_entry(self, namespace, author)
975 }
976}
977
// Byte layout of a `RecordIdentifier`, matching the order in which
// `RecordIdentifier::new` concatenates its parts.

/// Bytes holding the namespace id.
const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
/// Bytes holding the author id.
const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
/// Bytes holding the key (everything after the two ids).
const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
981
/// Identifier of a record: namespace id, author id and key in one contiguous byte buffer.
///
/// The derived ordering is the lexicographic ordering of those bytes.
// NOTE(review): the accessors slice at fixed offsets and panic on buffers shorter than
// 64 bytes; `new` always produces at least 64 bytes, deserialised values are not checked
// in the code visible here.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct RecordIdentifier(Bytes);
985
986impl Default for RecordIdentifier {
987 fn default() -> Self {
988 Self::new(NamespaceId::default(), AuthorId::default(), b"")
989 }
990}
991
992impl Debug for RecordIdentifier {
993 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
994 f.debug_struct("RecordIdentifier")
995 .field("namespace", &self.namespace())
996 .field("author", &self.author())
997 .field("key", &std::string::String::from_utf8_lossy(self.key()))
998 .finish()
999 }
1000}
1001
1002impl RangeKey for RecordIdentifier {
1003 #[cfg(test)]
1004 fn is_prefix_of(&self, other: &Self) -> bool {
1005 other.as_ref().starts_with(self.as_ref())
1006 }
1007}
1008
/// Returns the current system time as microseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock is set to a time before the Unix epoch.
fn system_time_now() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("time drift");
    since_epoch.as_micros() as u64
}
1015
1016impl RecordIdentifier {
1017 pub fn new(
1019 namespace: impl Into<NamespaceId>,
1020 author: impl Into<AuthorId>,
1021 key: impl AsRef<[u8]>,
1022 ) -> Self {
1023 let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1024 bytes.extend_from_slice(namespace.into().as_bytes());
1025 bytes.extend_from_slice(author.into().as_bytes());
1026 bytes.extend_from_slice(key.as_ref());
1027 Self(bytes.freeze())
1028 }
1029
1030 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1032 out.extend_from_slice(&self.0);
1033 }
1034
1035 pub fn as_bytes(&self) -> Bytes {
1037 self.0.clone()
1038 }
1039
1040 pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1042 (
1043 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1044 self.0[AUTHOR_BYTES].try_into().unwrap(),
1045 &self.0[KEY_BYTES],
1046 )
1047 }
1048
1049 pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1051 (
1052 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1053 self.0[AUTHOR_BYTES].try_into().unwrap(),
1054 self.0.slice(KEY_BYTES),
1055 )
1056 }
1057
1058 pub fn key(&self) -> &[u8] {
1060 &self.0[KEY_BYTES]
1061 }
1062
1063 pub fn key_bytes(&self) -> Bytes {
1065 self.0.slice(KEY_BYTES)
1066 }
1067
1068 pub fn namespace(&self) -> NamespaceId {
1070 let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1071 value.into()
1072 }
1073
1074 pub fn author(&self) -> AuthorId {
1076 let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1077 value.into()
1078 }
1079}
1080
1081impl AsRef<[u8]> for RecordIdentifier {
1082 fn as_ref(&self) -> &[u8] {
1083 &self.0
1084 }
1085}
1086
1087impl Deref for SignedEntry {
1088 type Target = Entry;
1089 fn deref(&self) -> &Self::Target {
1090 &self.entry
1091 }
1092}
1093
1094impl Deref for Entry {
1095 type Target = Record;
1096 fn deref(&self) -> &Self::Target {
1097 &self.record
1098 }
1099}
1100
/// The value part of an entry: what content it points to and when it was created.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Record {
    /// Length of the content.
    len: u64,
    /// Hash of the content.
    hash: Hash,
    /// Creation time; records created via `new_current` use microseconds since the Unix
    /// epoch (see `system_time_now`).
    timestamp: u64,
}
1111
/// Marks [`Record`] as the value type for range sync; no methods are overridden.
impl RangeValue for Record {}
1113
1114impl Ord for Record {
1118 fn cmp(&self, other: &Self) -> Ordering {
1119 self.timestamp
1120 .cmp(&other.timestamp)
1121 .then_with(|| self.hash.cmp(&other.hash))
1122 }
1123}
1124
1125impl PartialOrd for Record {
1126 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1127 Some(self.cmp(other))
1128 }
1129}
1130
1131impl Record {
1132 pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1134 debug_assert!(
1135 len != 0 || hash == Hash::EMPTY,
1136 "if `len` is 0 then `hash` must be the hash of the empty byte range"
1137 );
1138 Record {
1139 hash,
1140 len,
1141 timestamp,
1142 }
1143 }
1144
1145 pub fn empty(timestamp: u64) -> Self {
1147 Self::new(Hash::EMPTY, 0, timestamp)
1148 }
1149
1150 pub fn empty_current() -> Self {
1152 Self::new_current(Hash::EMPTY, 0)
1153 }
1154
1155 pub fn is_empty(&self) -> bool {
1157 self.hash == Hash::EMPTY
1158 }
1159
1160 pub fn new_current(hash: Hash, len: u64) -> Self {
1162 let timestamp = system_time_now();
1163 Self::new(hash, len, timestamp)
1164 }
1165
1166 pub fn content_len(&self) -> u64 {
1168 self.len
1169 }
1170
1171 pub fn content_hash(&self) -> Hash {
1173 self.hash
1174 }
1175
1176 pub fn timestamp(&self) -> u64 {
1178 self.timestamp
1179 }
1180
1181 #[cfg(test)]
1182 pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1183 let len = data.as_ref().len() as u64;
1184 let hash = Hash::new(data);
1185 Self::new_current(hash, len)
1186 }
1187
1188 #[cfg(test)]
1189 pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1190 let len = data.as_ref().len() as u64;
1191 let hash = Hash::new(data);
1192 Self::new(hash, len, timestamp)
1193 }
1194
1195 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1197 out.extend_from_slice(&self.len.to_be_bytes());
1198 out.extend_from_slice(self.hash.as_ref());
1199 out.extend_from_slice(&self.timestamp.to_be_bytes())
1200 }
1201}
1202
1203#[cfg(test)]
1204mod tests {
1205 use std::collections::HashSet;
1206
1207 use anyhow::Result;
1208 use rand::SeedableRng;
1209
1210 use super::*;
1211 use crate::{
1212 actor::SyncHandle,
1213 ranger::{Range, Store as _},
1214 store::{OpenError, Query, SortBy, SortDirection, Store},
1215 };
1216
1217 #[tokio::test]
1218 async fn test_basics_memory() -> Result<()> {
1219 let store = store::Store::memory();
1220 test_basics(store).await?;
1221
1222 Ok(())
1223 }
1224
1225 #[tokio::test]
1226 #[cfg(feature = "fs-store")]
1227 async fn test_basics_fs() -> Result<()> {
1228 let dbfile = tempfile::NamedTempFile::new()?;
1229 let store = store::fs::Store::persistent(dbfile.path())?;
1230 test_basics(store).await?;
1231 Ok(())
1232 }
1233
    /// Exercises signing, inserting and querying entries on the given store.
    ///
    /// Inserts ten entries plus `/cool/path` for alice, overwrites `/cool/path`, adds the
    /// same key for bob and checks the result counts of the different query kinds after
    /// each step. Finishes with the sync-peer cache checks and a store flush.
    async fn test_basics(mut store: Store) -> Result<()> {
        let mut rng = rand::rng();
        let alice = Author::new(&mut rng);
        let bob = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);

        // A freshly signed entry must verify without a key store.
        let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
        let record = Record::current_from_data(b"this is my cool data");
        let entry = Entry::new(record_id, record);
        let signed_entry = entry.sign(&myspace, &alice);
        signed_entry.verify(&()).expect("failed to verify");

        // Ten entries by alice under the keys /0 .. /9.
        let mut my_replica = store.new_replica(myspace.clone())?;
        for i in 0..10 {
            my_replica
                .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice"))
                .await?;
        }

        // Each of them can be read back with the expected content length and verifies.
        for i in 0..10 {
            let res = store
                .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
                .unwrap();
            let len = format!("{i}: hello from alice").as_bytes().len() as u64;
            assert_eq!(res.entry().record().content_len(), len);
            res.verify(&())?;
        }

        // Insert the same key twice; the key must be readable after each insert.
        let mut my_replica = store.new_replica(myspace.clone())?;
        my_replica
            .hash_and_insert("/cool/path", &alice, "round 1")
            .await?;
        let _entry = store
            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
            .unwrap();
        let mut my_replica = store.new_replica(myspace.clone())?;
        my_replica
            .hash_and_insert("/cool/path", &alice, "round 2")
            .await?;
        let _entry = store
            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
            .unwrap();

        // 11 = the ten numbered entries plus a single entry for /cool/path.
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);

        // Bob has not written anything yet.
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 0);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::all())?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);

        // Bob writes to the same key; entries of different authors coexist.
        let mut my_replica = store.new_replica(myspace.clone())?;
        let _entry = my_replica
            .hash_and_insert("/cool/path", &bob, "bob round 1")
            .await?;

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        // The key now has one entry per author.
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 2);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 2);

        // Author and prefix filters combined.
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::all())?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 12);

        // A range whose start equals its end must yield the same entries as `Query::all`.
        let mut my_replica = store.new_replica(myspace.clone())?;
        let entries_second: Vec<_> = my_replica
            .store
            .get_range(Range::new(
                RecordIdentifier::default(),
                RecordIdentifier::default(),
            ))?
            .collect::<Result<_, _>>()?;

        assert_eq!(entries_second.len(), 12);
        assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());

        test_lru_cache_like_behaviour(&mut store, myspace.id())?;
        store.flush()?;
        Ok(())
    }
1366
    /// Checks that the per-namespace list of sync peers is capped at
    /// `PEERS_PER_DOC_CACHE_SIZE` entries and ordered from most to least recently
    /// registered.
    fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
        /// Asserts that the store reports exactly `expected_peers`, in that order.
        #[track_caller]
        fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
            assert_eq!(
                expected_peers,
                &store
                    .get_sync_peers(&namespace)
                    .unwrap()
                    .unwrap()
                    .collect::<Vec<_>>(),
                "sync peers differ"
            );
        }

        let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
        let mut expected_peers = Vec::with_capacity(count);
        // Fill the cache up to its capacity; the newest peer is expected at the front.
        for i in 0..count as u8 {
            let peer = [i; 32];
            expected_peers.insert(0, peer);
            store.register_useful_peer(namespace, peer)?;
        }
        verify_peers(store, namespace, &expected_peers);

        // One more peer than fits: the peer registered first (last in the list) must go.
        expected_peers.pop();
        let newer_peer = [count as u8; 32];
        expected_peers.insert(0, newer_peer);
        store.register_useful_peer(namespace, newer_peer)?;
        verify_peers(store, namespace, &expected_peers);

        // Registering a known peer again moves it to the front without evicting anyone.
        let refreshed_peer = expected_peers.remove(2);
        expected_peers.insert(0, refreshed_peer);
        store.register_useful_peer(namespace, refreshed_peer)?;
        verify_peers(store, namespace, &expected_peers);
        Ok(())
    }
1408
1409 #[tokio::test]
1410 async fn test_content_hashes_iterator_memory() -> Result<()> {
1411 let store = store::Store::memory();
1412 test_content_hashes_iterator(store).await
1413 }
1414
1415 #[tokio::test]
1416 #[cfg(feature = "fs-store")]
1417 async fn test_content_hashes_iterator_fs() -> Result<()> {
1418 let dbfile = tempfile::NamedTempFile::new()?;
1419 let store = store::fs::Store::persistent(dbfile.path())?;
1420 test_content_hashes_iterator(store).await
1421 }
1422
1423 async fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1424 let mut rng = rand::rng();
1425 let mut expected = HashSet::new();
1426 let n_replicas = 3;
1427 let n_entries = 4;
1428 for i in 0..n_replicas {
1429 let namespace = NamespaceSecret::new(&mut rng);
1430 let author = store.new_author(&mut rng)?;
1431 let mut replica = store.new_replica(namespace)?;
1432 for j in 0..n_entries {
1433 let key = format!("{j}");
1434 let data = format!("{i}:{j}");
1435 let hash = replica.hash_and_insert(key, &author, data).await?;
1436 expected.insert(hash);
1437 }
1438 }
1439 assert_eq!(expected.len(), n_replicas * n_entries);
1440 let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1441 assert_eq!(actual, expected);
1442 Ok(())
1443 }
1444
/// Checks the ordering of [`RecordIdentifier`]s and that a [`Range`] built
/// from two identifiers includes its start and excludes its end.
///
/// Namespaces and authors are sorted by id up front, so a higher index always
/// means a larger id.
#[test]
fn test_multikey() {
    /// Asserts `lo < mid < hi` and that the range `lo..hi` contains `lo` and
    /// `mid` but not `hi`.
    #[track_caller]
    fn assert_ordered(lo: RecordIdentifier, mid: RecordIdentifier, hi: RecordIdentifier) {
        let range = Range::new(lo.clone(), hi.clone());
        assert!(range.contains(&lo), "start");
        assert!(range.contains(&mid), "inside");
        assert!(!range.contains(&hi), "end");

        assert!(lo < mid);
        assert!(mid < hi);
    }

    let mut rng = rand::rng();

    let k = ["a", "c", "z"];

    let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
    n.sort_by_key(|n| n.id());

    let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
    a.sort_by_key(|a| a.id());

    // Only the key differs.
    assert_ordered(
        RecordIdentifier::new(n[0].id(), a[0].id(), k[0]),
        RecordIdentifier::new(n[0].id(), a[0].id(), k[1]),
        RecordIdentifier::new(n[0].id(), a[0].id(), k[2]),
    );

    // Only the namespace differs. This case used to be an exact copy of the
    // "key and namespace" case below, so the namespace was never varied alone.
    assert_ordered(
        RecordIdentifier::new(n[0].id(), a[0].id(), k[0]),
        RecordIdentifier::new(n[1].id(), a[0].id(), k[0]),
        RecordIdentifier::new(n[2].id(), a[0].id(), k[0]),
    );

    // Only the author differs.
    assert_ordered(
        RecordIdentifier::new(n[0].id(), a[0].id(), k[0]),
        RecordIdentifier::new(n[0].id(), a[1].id(), k[0]),
        RecordIdentifier::new(n[0].id(), a[2].id(), k[0]),
    );

    // Key and namespace differ together.
    assert_ordered(
        RecordIdentifier::new(n[0].id(), a[0].id(), k[0]),
        RecordIdentifier::new(n[1].id(), a[0].id(), k[1]),
        RecordIdentifier::new(n[2].id(), a[0].id(), k[2]),
    );

    // Mixed comparisons.
    {
        let a0 = a[0].id();
        let a1 = a[1].id();
        let n0 = n[0].id();
        let n1 = n[1].id();
        let k0 = k[0];
        let k1 = k[1];

        assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
        // A smaller namespace wins even when its key is larger.
        assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
        assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
        assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
    }
}
1534
1535 #[tokio::test]
1536 async fn test_timestamps_memory() -> Result<()> {
1537 let store = store::Store::memory();
1538 test_timestamps(store).await?;
1539
1540 Ok(())
1541 }
1542
/// Runs the timestamp test against a store persisted in a temp file.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_timestamps_fs() -> Result<()> {
    // The temp file handle must stay alive for the duration of the test.
    let db_file = tempfile::NamedTempFile::new()?;
    let persistent = store::fs::Store::persistent(db_file.path())?;
    test_timestamps(persistent).await
}
1551
1552 async fn test_timestamps(mut store: Store) -> Result<()> {
1553 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1554 let namespace = NamespaceSecret::new(&mut rng);
1555 let _replica = store.new_replica(namespace.clone())?;
1556 let author = store.new_author(&mut rng)?;
1557 store.close_replica(namespace.id());
1558 let mut replica = store.open_replica(&namespace.id())?;
1559
1560 let key = b"hello";
1561 let value = b"world";
1562 let entry = {
1563 let timestamp = 2;
1564 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1565 let record = Record::from_data(value, timestamp);
1566 Entry::new(id, record).sign(&namespace, &author)
1567 };
1568
1569 replica
1570 .insert_entry(entry.clone(), InsertOrigin::Local)
1571 .await
1572 .unwrap();
1573 store.close_replica(namespace.id());
1574 let res = store
1575 .get_exact(namespace.id(), author.id(), key, false)?
1576 .unwrap();
1577 assert_eq!(res, entry);
1578
1579 let entry2 = {
1580 let timestamp = 1;
1581 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1582 let record = Record::from_data(value, timestamp);
1583 Entry::new(id, record).sign(&namespace, &author)
1584 };
1585
1586 let mut replica = store.open_replica(&namespace.id())?;
1587 let res = replica.insert_entry(entry2, InsertOrigin::Local).await;
1588 store.close_replica(namespace.id());
1589 assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1590 let res = store
1591 .get_exact(namespace.id(), author.id(), key, false)?
1592 .unwrap();
1593 assert_eq!(res, entry);
1594 store.flush()?;
1595 Ok(())
1596 }
1597
1598 #[tokio::test]
1599 async fn test_replica_sync_memory() -> Result<()> {
1600 let alice_store = store::Store::memory();
1601 let bob_store = store::Store::memory();
1602
1603 test_replica_sync(alice_store, bob_store).await?;
1604 Ok(())
1605 }
1606
/// Runs the replica sync test with two stores persisted in temp files.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_replica_sync_fs() -> Result<()> {
    // Both temp file handles must stay alive for the duration of the test.
    let alice_file = tempfile::NamedTempFile::new()?;
    let alice = store::fs::Store::persistent(alice_file.path())?;
    let bob_file = tempfile::NamedTempFile::new()?;
    let bob = store::fs::Store::persistent(bob_file.path())?;
    test_replica_sync(alice, bob).await
}
1618
1619 async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1620 let alice_set = ["ape", "eel", "fox", "gnu"];
1621 let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1622
1623 let mut rng = rand::rng();
1624 let author = Author::new(&mut rng);
1625 let myspace = NamespaceSecret::new(&mut rng);
1626 let mut alice = alice_store.new_replica(myspace.clone())?;
1627 for el in &alice_set {
1628 alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1629 }
1630
1631 let mut bob = bob_store.new_replica(myspace.clone())?;
1632 for el in &bob_set {
1633 bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1634 }
1635
1636 let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
1637
1638 assert_eq!(alice_out.num_sent, 2);
1639 assert_eq!(bob_out.num_recv, 2);
1640 assert_eq!(alice_out.num_recv, 6);
1641 assert_eq!(bob_out.num_sent, 6);
1642
1643 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1644 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1645 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1646 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1647 alice_store.flush()?;
1648 bob_store.flush()?;
1649 Ok(())
1650 }
1651
1652 #[tokio::test]
1653 async fn test_replica_timestamp_sync_memory() -> Result<()> {
1654 let alice_store = store::Store::memory();
1655 let bob_store = store::Store::memory();
1656
1657 test_replica_timestamp_sync(alice_store, bob_store).await?;
1658 Ok(())
1659 }
1660
/// Runs the timestamp sync test with two stores persisted in temp files.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_replica_timestamp_sync_fs() -> Result<()> {
    // Both temp file handles must stay alive for the duration of the test.
    let alice_file = tempfile::NamedTempFile::new()?;
    let alice = store::fs::Store::persistent(alice_file.path())?;
    let bob_file = tempfile::NamedTempFile::new()?;
    let bob = store::fs::Store::persistent(bob_file.path())?;
    test_replica_timestamp_sync(alice, bob).await
}
1672
1673 async fn test_replica_timestamp_sync(
1674 mut alice_store: Store,
1675 mut bob_store: Store,
1676 ) -> Result<()> {
1677 let mut rng = rand::rng();
1678 let author = Author::new(&mut rng);
1679 let namespace = NamespaceSecret::new(&mut rng);
1680 let mut alice = alice_store.new_replica(namespace.clone())?;
1681 let mut bob = bob_store.new_replica(namespace.clone())?;
1682
1683 let key = b"key";
1684 let alice_value = b"alice";
1685 let bob_value = b"bob";
1686 let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?;
1687 let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?;
1689 sync(&mut alice, &mut bob).await?;
1690 assert_eq!(
1691 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1692 Some(bob_hash)
1693 );
1694 assert_eq!(
1695 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1696 Some(bob_hash)
1697 );
1698
1699 let mut alice = alice_store.new_replica(namespace.clone())?;
1700 let mut bob = bob_store.new_replica(namespace.clone())?;
1701
1702 let alice_value_2 = b"alice2";
1703 let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?;
1705 let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?;
1706 sync(&mut alice, &mut bob).await?;
1707 assert_eq!(
1708 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1709 Some(alice_hash_2)
1710 );
1711 assert_eq!(
1712 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1713 Some(alice_hash_2)
1714 );
1715 alice_store.flush()?;
1716 bob_store.flush()?;
1717 Ok(())
1718 }
1719
1720 #[tokio::test]
1721 async fn test_future_timestamp() -> Result<()> {
1722 let mut rng = rand::rng();
1723 let mut store = store::Store::memory();
1724 let author = Author::new(&mut rng);
1725 let namespace = NamespaceSecret::new(&mut rng);
1726
1727 let mut replica = store.new_replica(namespace.clone())?;
1728 let key = b"hi";
1729 let t = system_time_now();
1730 let record = Record::from_data(b"1", t);
1731 let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1732 replica
1733 .insert_entry(entry0.clone(), InsertOrigin::Local)
1734 .await?;
1735
1736 assert_eq!(
1737 get_entry(&mut store, namespace.id(), author.id(), key)?,
1738 entry0
1739 );
1740
1741 let mut replica = store.new_replica(namespace.clone())?;
1742 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1743 let record = Record::from_data(b"2", t);
1744 let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1745 replica
1746 .insert_entry(entry1.clone(), InsertOrigin::Local)
1747 .await?;
1748 assert_eq!(
1749 get_entry(&mut store, namespace.id(), author.id(), key)?,
1750 entry1
1751 );
1752
1753 let mut replica = store.new_replica(namespace.clone())?;
1754 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1755 let record = Record::from_data(b"2", t);
1756 let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1757 replica
1758 .insert_entry(entry2.clone(), InsertOrigin::Local)
1759 .await?;
1760 assert_eq!(
1761 get_entry(&mut store, namespace.id(), author.id(), key)?,
1762 entry2
1763 );
1764
1765 let mut replica = store.new_replica(namespace.clone())?;
1766 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1767 let record = Record::from_data(b"2", t);
1768 let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1769 let res = replica.insert_entry(entry3, InsertOrigin::Local).await;
1770 assert!(matches!(
1771 res,
1772 Err(InsertError::Validation(
1773 ValidationFailure::TooFarInTheFuture
1774 ))
1775 ));
1776 assert_eq!(
1777 get_entry(&mut store, namespace.id(), author.id(), key)?,
1778 entry2
1779 );
1780 store.flush()?;
1781 Ok(())
1782 }
1783
1784 #[tokio::test]
1785 async fn test_insert_empty() -> Result<()> {
1786 let mut store = store::Store::memory();
1787 let mut rng = rand::rng();
1788 let alice = Author::new(&mut rng);
1789 let myspace = NamespaceSecret::new(&mut rng);
1790 let mut replica = store.new_replica(myspace.clone())?;
1791 let hash = Hash::new(b"");
1792 let res = replica.insert(b"foo", &alice, hash, 0).await;
1793 assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1794 store.flush()?;
1795 Ok(())
1796 }
1797
1798 #[tokio::test]
1799 async fn test_prefix_delete_memory() -> Result<()> {
1800 let store = store::Store::memory();
1801 test_prefix_delete(store).await?;
1802 Ok(())
1803 }
1804
/// Runs the prefix deletion test against a store persisted in a temp file.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_prefix_delete_fs() -> Result<()> {
    // The temp file handle must stay alive for the duration of the test.
    let db_file = tempfile::NamedTempFile::new()?;
    let persistent = store::fs::Store::persistent(db_file.path())?;
    test_prefix_delete(persistent).await
}
1813
1814 async fn test_prefix_delete(mut store: Store) -> Result<()> {
1815 let mut rng = rand::rng();
1816 let alice = Author::new(&mut rng);
1817 let myspace = NamespaceSecret::new(&mut rng);
1818 let mut replica = store.new_replica(myspace.clone())?;
1819 let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;
1820 let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?;
1821
1822 assert_eq!(
1824 get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1825 Some(hash1)
1826 );
1827 assert_eq!(
1828 get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1829 Some(hash2)
1830 );
1831
1832 let mut replica = store.new_replica(myspace.clone())?;
1834 let deleted = replica.delete_prefix(b"foo", &alice).await?;
1835 assert_eq!(deleted, 2);
1836 assert_eq!(
1837 store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1838 None
1839 );
1840 assert_eq!(
1841 store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1842 None
1843 );
1844 assert_eq!(
1845 store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1846 None
1847 );
1848 store.flush()?;
1849 Ok(())
1850 }
1851
1852 #[tokio::test]
1853 async fn test_replica_sync_delete_memory() -> Result<()> {
1854 let alice_store = store::Store::memory();
1855 let bob_store = store::Store::memory();
1856
1857 test_replica_sync_delete(alice_store, bob_store).await
1858 }
1859
/// Runs the sync-with-deletion test with two stores persisted in temp files.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_replica_sync_delete_fs() -> Result<()> {
    // Both temp file handles must stay alive for the duration of the test.
    let alice_file = tempfile::NamedTempFile::new()?;
    let alice = store::fs::Store::persistent(alice_file.path())?;
    let bob_file = tempfile::NamedTempFile::new()?;
    let bob = store::fs::Store::persistent(bob_file.path())?;
    test_replica_sync_delete(alice, bob).await
}
1869
1870 async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1871 let alice_set = ["foot"];
1872 let bob_set = ["fool", "foo", "fog"];
1873
1874 let mut rng = rand::rng();
1875 let author = Author::new(&mut rng);
1876 let myspace = NamespaceSecret::new(&mut rng);
1877 let mut alice = alice_store.new_replica(myspace.clone())?;
1878 for el in &alice_set {
1879 alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1880 }
1881
1882 let mut bob = bob_store.new_replica(myspace.clone())?;
1883 for el in &bob_set {
1884 bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1885 }
1886
1887 sync(&mut alice, &mut bob).await?;
1888
1889 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1890 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1891 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1892 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1893
1894 let mut alice = alice_store.new_replica(myspace.clone())?;
1895 let mut bob = bob_store.new_replica(myspace.clone())?;
1896 alice.delete_prefix("foo", &author).await?;
1897 bob.hash_and_insert("fooz", &author, "fooz".as_bytes())
1898 .await?;
1899 sync(&mut alice, &mut bob).await?;
1900 check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1901 check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1902 alice_store.flush()?;
1903 bob_store.flush()?;
1904 Ok(())
1905 }
1906
1907 #[tokio::test]
1908 async fn test_replica_remove_memory() -> Result<()> {
1909 let alice_store = store::Store::memory();
1910 test_replica_remove(alice_store).await
1911 }
1912
/// Runs the replica removal test against a store persisted in a temp file.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_replica_remove_fs() -> Result<()> {
    // The temp file handle must stay alive for the duration of the test.
    let db_file = tempfile::NamedTempFile::new()?;
    let persistent = store::fs::Store::persistent(db_file.path())?;
    test_replica_remove(persistent).await
}
1920
1921 async fn test_replica_remove(mut store: Store) -> Result<()> {
1922 let mut rng = rand::rng();
1923 let namespace = NamespaceSecret::new(&mut rng);
1924 let author = Author::new(&mut rng);
1925 let mut replica = store.new_replica(namespace.clone())?;
1926
1927 let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?;
1929 let res = store
1930 .get_many(namespace.id(), Query::all())?
1931 .collect::<Vec<_>>();
1932 assert_eq!(res.len(), 1);
1933
1934 let res = store.remove_replica(&namespace.id());
1936 assert!(res.is_err());
1938 store.close_replica(namespace.id());
1939 store.remove_replica(&namespace.id())?;
1940 let res = store
1941 .get_many(namespace.id(), Query::all())?
1942 .collect::<Vec<_>>();
1943 assert_eq!(res.len(), 0);
1944
1945 let res = store.load_replica_info(&namespace.id());
1947 assert!(matches!(res, Err(OpenError::NotFound)));
1948
1949 let mut replica = store.new_replica(namespace.clone())?;
1951 replica.insert(b"foo", &author, hash, 3).await?;
1952 let res = store
1953 .get_many(namespace.id(), Query::all())?
1954 .collect::<Vec<_>>();
1955 assert_eq!(res.len(), 1);
1956 store.flush()?;
1957 Ok(())
1958 }
1959
1960 #[tokio::test]
1961 async fn test_replica_delete_edge_cases_memory() -> Result<()> {
1962 let store = store::Store::memory();
1963 test_replica_delete_edge_cases(store).await
1964 }
1965
/// Runs the prefix deletion edge-case test against a store persisted in a temp file.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_replica_delete_edge_cases_fs() -> Result<()> {
    // The temp file handle must stay alive for the duration of the test.
    let db_file = tempfile::NamedTempFile::new()?;
    let persistent = store::fs::Store::persistent(db_file.path())?;
    test_replica_delete_edge_cases(persistent).await
}
1973
1974 async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
1975 let mut rng = rand::rng();
1976 let author = Author::new(&mut rng);
1977 let namespace = NamespaceSecret::new(&mut rng);
1978
1979 let edgecases = [0u8, 1u8, 255u8];
1980 let prefixes = [0u8, 255u8];
1981 let hash = Hash::new(b"foo");
1982 let len = 3;
1983 for prefix in prefixes {
1984 let mut expected = vec![];
1985 let mut replica = store.new_replica(namespace.clone())?;
1986 for suffix in edgecases {
1987 let key = [prefix, suffix].to_vec();
1988 expected.push(key.clone());
1989 replica.insert(&key, &author, hash, len).await?;
1990 }
1991 assert_keys(&mut store, namespace.id(), expected);
1992 let mut replica = store.new_replica(namespace.clone())?;
1993 replica.delete_prefix([prefix], &author).await?;
1994 assert_keys(&mut store, namespace.id(), vec![]);
1995 }
1996
1997 let mut replica = store.new_replica(namespace.clone())?;
1998 let key = vec![1u8, 0u8];
1999 replica.insert(key, &author, hash, len).await?;
2000 let key = vec![1u8, 1u8];
2001 replica.insert(key, &author, hash, len).await?;
2002 let key = vec![1u8, 2u8];
2003 replica.insert(key, &author, hash, len).await?;
2004 let prefix = vec![1u8, 1u8];
2005 replica.delete_prefix(prefix, &author).await?;
2006 assert_keys(
2007 &mut store,
2008 namespace.id(),
2009 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2010 );
2011
2012 let mut replica = store.new_replica(namespace.clone())?;
2013 let key = vec![0u8, 255u8];
2014 replica.insert(key, &author, hash, len).await?;
2015 let key = vec![0u8, 0u8];
2016 replica.insert(key, &author, hash, len).await?;
2017 let prefix = vec![0u8];
2018 replica.delete_prefix(prefix, &author).await?;
2019 assert_keys(
2020 &mut store,
2021 namespace.id(),
2022 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2023 );
2024 store.flush()?;
2025 Ok(())
2026 }
2027
2028 #[tokio::test]
2029 async fn test_latest_iter_memory() -> Result<()> {
2030 let store = store::Store::memory();
2031 test_latest_iter(store).await
2032 }
2033
/// Runs the latest-entry-per-author test against a store persisted in a temp file.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_latest_iter_fs() -> Result<()> {
    // The temp file handle must stay alive for the duration of the test.
    let db_file = tempfile::NamedTempFile::new()?;
    let persistent = store::fs::Store::persistent(db_file.path())?;
    test_latest_iter(persistent).await
}
2041
2042 async fn test_latest_iter(mut store: Store) -> Result<()> {
2043 let mut rng = rand::rng();
2044 let author0 = Author::new(&mut rng);
2045 let author1 = Author::new(&mut rng);
2046 let namespace = NamespaceSecret::new(&mut rng);
2047 let mut replica = store.new_replica(namespace.clone())?;
2048
2049 replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
2050 let latest = store
2051 .get_latest_for_each_author(namespace.id())?
2052 .collect::<Result<Vec<_>>>()?;
2053 assert_eq!(latest.len(), 1);
2054 assert_eq!(latest[0].2, b"a0.1".to_vec());
2055
2056 let mut replica = store.new_replica(namespace.clone())?;
2057 replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
2058 replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
2059 let latest = store
2060 .get_latest_for_each_author(namespace.id())?
2061 .collect::<Result<Vec<_>>>()?;
2062 let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2063 latest_keys.sort();
2064 assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2065 store.flush()?;
2066 Ok(())
2067 }
2068
2069 #[tokio::test]
2070 async fn test_replica_byte_keys_memory() -> Result<()> {
2071 let store = store::Store::memory();
2072
2073 test_replica_byte_keys(store).await?;
2074 Ok(())
2075 }
2076
/// Runs the byte-key test against a store persisted in a temp file.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_replica_byte_keys_fs() -> Result<()> {
    // The temp file handle must stay alive for the duration of the test.
    let db_file = tempfile::NamedTempFile::new()?;
    let persistent = store::fs::Store::persistent(db_file.path())?;
    test_replica_byte_keys(persistent).await
}
2086
2087 async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2088 let mut rng = rand::rng();
2089 let author = Author::new(&mut rng);
2090 let namespace = NamespaceSecret::new(&mut rng);
2091
2092 let hash = Hash::new(b"foo");
2093 let len = 3;
2094
2095 let key = vec![1u8, 0u8];
2096 let mut replica = store.new_replica(namespace.clone())?;
2097 replica.insert(key, &author, hash, len).await?;
2098 assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2099 let key = vec![1u8, 2u8];
2100 let mut replica = store.new_replica(namespace.clone())?;
2101 replica.insert(key, &author, hash, len).await?;
2102 assert_keys(
2103 &mut store,
2104 namespace.id(),
2105 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2106 );
2107
2108 let key = vec![0u8, 255u8];
2109 let mut replica = store.new_replica(namespace.clone())?;
2110 replica.insert(key, &author, hash, len).await?;
2111 assert_keys(
2112 &mut store,
2113 namespace.id(),
2114 vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2115 );
2116 store.flush()?;
2117 Ok(())
2118 }
2119
2120 #[tokio::test]
2121 async fn test_replica_capability_memory() -> Result<()> {
2122 let store = store::Store::memory();
2123 test_replica_capability(store).await
2124 }
2125
/// Runs the replica capability test against a store persisted in a temp file.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_replica_capability_fs() -> Result<()> {
    // The temp file handle must stay alive for the duration of the test.
    let db_file = tempfile::NamedTempFile::new()?;
    let persistent = store::fs::Store::persistent(db_file.path())?;
    test_replica_capability(persistent).await
}
2133
2134 #[allow(clippy::redundant_pattern_matching)]
2135 async fn test_replica_capability(mut store: Store) -> Result<()> {
2136 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2137 let author = store.new_author(&mut rng)?;
2138 let namespace = NamespaceSecret::new(&mut rng);
2139
2140 let capability = Capability::Read(namespace.id());
2142 store.import_namespace(capability)?;
2143 let mut replica = store.open_replica(&namespace.id())?;
2144 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2145 assert!(matches!(res, Err(InsertError::ReadOnly)));
2146
2147 let capability = Capability::Write(namespace.clone());
2149 store.import_namespace(capability)?;
2150 let mut replica = store.open_replica(&namespace.id())?;
2151 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2152 assert!(matches!(res, Ok(_)));
2153 store.close_replica(namespace.id());
2154 let mut replica = store.open_replica(&namespace.id())?;
2155 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2156 assert!(res.is_ok());
2157
2158 let capability = Capability::Read(namespace.id());
2160 store.import_namespace(capability)?;
2161 store.close_replica(namespace.id());
2162 let mut replica = store.open_replica(&namespace.id())?;
2163 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2164 assert!(res.is_ok());
2165 store.flush()?;
2166 Ok(())
2167 }
2168
2169 #[tokio::test]
2170 async fn test_actor_capability_memory() -> Result<()> {
2171 let store = store::Store::memory();
2172 test_actor_capability(store).await
2173 }
2174
/// Runs the actor capability test against a store persisted in a temp file.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_actor_capability_fs() -> Result<()> {
    // The temp file handle must stay alive for the duration of the test.
    let db_file = tempfile::NamedTempFile::new()?;
    let persistent = store::fs::Store::persistent(db_file.path())?;
    test_actor_capability(persistent).await
}
2182
2183 async fn test_actor_capability(store: Store) -> Result<()> {
2184 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2186 let author = Author::new(&mut rng);
2187 let handle = SyncHandle::spawn(store, None, "test".into());
2188 let author = handle.import_author(author).await?;
2189 let namespace = NamespaceSecret::new(&mut rng);
2190 let id = namespace.id();
2191
2192 let capability = Capability::Read(namespace.id());
2194 handle.import_namespace(capability).await?;
2195 handle.open(namespace.id(), Default::default()).await?;
2196 let res = handle
2197 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2198 .await;
2199 assert!(res.is_err());
2200
2201 let capability = Capability::Write(namespace.clone());
2203 handle.import_namespace(capability).await?;
2204 let res = handle
2205 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2206 .await;
2207 assert!(res.is_ok());
2208
2209 handle.close(namespace.id()).await?;
2211 let res = handle
2212 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2213 .await;
2214 assert!(res.is_err());
2215 handle.open(namespace.id(), Default::default()).await?;
2216 let res = handle
2217 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2218 .await;
2219 assert!(res.is_ok());
2220 Ok(())
2221 }
2222
2223 fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2224 let mut res = vec![];
2225 while let Ok(ev) = events.try_recv() {
2226 res.push(ev);
2227 }
2228 res
2229 }
2230
2231 #[tokio::test]
2234 async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2235 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2236 let mut store1 = store::Store::memory();
2237 let mut store2 = store::Store::memory();
2238 let peer1 = [1u8; 32];
2239 let peer2 = [2u8; 32];
2240 let mut state1 = SyncOutcome::default();
2241 let mut state2 = SyncOutcome::default();
2242
2243 let author = Author::new(&mut rng);
2244 let namespace = NamespaceSecret::new(&mut rng);
2245 let mut replica1 = store1.new_replica(namespace.clone())?;
2246 let mut replica2 = store2.new_replica(namespace.clone())?;
2247
2248 let (events1_sender, events1) = async_channel::bounded(32);
2249 let (events2_sender, events2) = async_channel::bounded(32);
2250
2251 replica1.info.subscribe(events1_sender);
2252 replica2.info.subscribe(events2_sender);
2253
2254 replica1.hash_and_insert(b"foo", &author, b"init").await?;
2255
2256 let from1 = replica1.sync_initial_message()?;
2257 let from2 = replica2
2258 .sync_process_message(from1, peer1, &mut state2)
2259 .await
2260 .unwrap()
2261 .unwrap();
2262 let from1 = replica1
2263 .sync_process_message(from2, peer2, &mut state1)
2264 .await
2265 .unwrap()
2266 .unwrap();
2267 replica2.hash_and_insert(b"foo", &author, b"update").await?;
2271 let from2 = replica2
2272 .sync_process_message(from1, peer1, &mut state2)
2273 .await
2274 .unwrap();
2275 assert!(from2.is_none());
2276 let events1 = drain(events1);
2277 let events2 = drain(events2);
2278 assert_eq!(events1.len(), 1);
2279 assert_eq!(events2.len(), 1);
2280 assert!(matches!(events1[0], Event::LocalInsert { .. }));
2281 assert!(matches!(events2[0], Event::LocalInsert { .. }));
2282 assert_eq!(state1.num_sent, 1);
2283 assert_eq!(state1.num_recv, 0);
2284 assert_eq!(state2.num_sent, 0);
2285 assert_eq!(state2.num_recv, 1);
2286 store1.flush()?;
2287 store2.flush()?;
2288 Ok(())
2289 }
2290
2291 #[tokio::test]
2292 async fn test_replica_queries_mem() -> Result<()> {
2293 let store = store::Store::memory();
2294
2295 test_replica_queries(store).await?;
2296 Ok(())
2297 }
2298
/// Runs the query test against a store persisted in a temp file.
#[tokio::test]
#[cfg(feature = "fs-store")]
async fn test_replica_queries_fs() -> Result<()> {
    // The temp file handle must stay alive for the duration of the test.
    let db_file = tempfile::NamedTempFile::new()?;
    let persistent = store::fs::Store::persistent(db_file.path())?;
    test_replica_queries(persistent).await
}
2308
2309 async fn test_replica_queries(mut store: Store) -> Result<()> {
2310 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2311 let namespace = NamespaceSecret::new(&mut rng);
2312 let namespace_id = namespace.id();
2313
2314 let a1 = store.new_author(&mut rng)?;
2315 let a2 = store.new_author(&mut rng)?;
2316 let a3 = store.new_author(&mut rng)?;
2317 println!(
2318 "a1 {} a2 {} a3 {}",
2319 a1.id().fmt_short(),
2320 a2.id().fmt_short(),
2321 a3.id().fmt_short()
2322 );
2323
2324 let mut replica = store.new_replica(namespace.clone())?;
2325 replica.hash_and_insert("hi/world", &a2, "a2").await?;
2326 replica.hash_and_insert("hi/world", &a1, "a1").await?;
2327 replica.hash_and_insert("hi/moon", &a2, "a1").await?;
2328 replica.hash_and_insert("hi", &a3, "a3").await?;
2329
2330 struct QueryTester<'a> {
2331 store: &'a mut Store,
2332 namespace: NamespaceId,
2333 }
2334 impl QueryTester<'_> {
2335 fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2336 let query = query.into();
2337 let actual = self
2338 .store
2339 .get_many(self.namespace, query.clone())
2340 .unwrap()
2341 .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2342 .collect::<Result<Vec<_>>>()
2343 .unwrap();
2344 let expected = expected
2345 .into_iter()
2346 .map(|(key, author)| (key.to_string(), author.id()))
2347 .collect::<Vec<_>>();
2348 assert_eq!(actual, expected, "query: {query:#?}")
2349 }
2350 }
2351
2352 let mut qt = QueryTester {
2353 store: &mut store,
2354 namespace: namespace_id,
2355 };
2356
2357 qt.assert(
2358 Query::all(),
2359 vec![
2360 ("hi/world", &a1),
2361 ("hi/moon", &a2),
2362 ("hi/world", &a2),
2363 ("hi", &a3),
2364 ],
2365 );
2366
2367 qt.assert(
2368 Query::single_latest_per_key(),
2369 vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2370 );
2371
2372 qt.assert(
2373 Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2374 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2375 );
2376
2377 qt.assert(
2378 Query::single_latest_per_key().key_prefix("hi/"),
2379 vec![("hi/moon", &a2), ("hi/world", &a1)],
2380 );
2381
2382 qt.assert(
2383 Query::single_latest_per_key()
2384 .key_prefix("hi/")
2385 .sort_direction(SortDirection::Desc),
2386 vec![("hi/world", &a1), ("hi/moon", &a2)],
2387 );
2388
2389 qt.assert(
2390 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2391 vec![
2392 ("hi", &a3),
2393 ("hi/moon", &a2),
2394 ("hi/world", &a1),
2395 ("hi/world", &a2),
2396 ],
2397 );
2398
2399 qt.assert(
2400 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2401 vec![
2402 ("hi/world", &a2),
2403 ("hi/world", &a1),
2404 ("hi/moon", &a2),
2405 ("hi", &a3),
2406 ],
2407 );
2408
2409 qt.assert(
2410 Query::all().key_prefix("hi/"),
2411 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2412 );
2413
2414 qt.assert(
2415 Query::all().key_prefix("hi/").offset(1).limit(1),
2416 vec![("hi/moon", &a2)],
2417 );
2418
2419 qt.assert(
2420 Query::all()
2421 .key_prefix("hi/")
2422 .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2423 vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2424 );
2425
2426 qt.assert(
2427 Query::all()
2428 .key_prefix("hi/")
2429 .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2430 .offset(1)
2431 .limit(1),
2432 vec![("hi/world", &a1)],
2433 );
2434
2435 qt.assert(
2436 Query::all()
2437 .key_prefix("hi/")
2438 .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2439 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2440 );
2441
2442 qt.assert(
2443 Query::all()
2444 .key_prefix("hi/")
2445 .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2446 vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2447 );
2448
2449 qt.assert(
2450 Query::all()
2451 .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2452 .limit(2)
2453 .offset(1),
2454 vec![("hi/moon", &a2), ("hi/world", &a1)],
2455 );
2456
2457 let mut replica = store.new_replica(namespace)?;
2458 replica.delete_prefix("hi/world", &a2).await?;
2459 let mut qt = QueryTester {
2460 store: &mut store,
2461 namespace: namespace_id,
2462 };
2463
2464 qt.assert(
2465 Query::all(),
2466 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2467 );
2468
2469 qt.assert(
2470 Query::all().include_empty(),
2471 vec![
2472 ("hi/world", &a1),
2473 ("hi/moon", &a2),
2474 ("hi/world", &a2),
2475 ("hi", &a3),
2476 ],
2477 );
2478 store.flush()?;
2479 Ok(())
2480 }
2481
/// Runs the shared download-policy checks against an in-memory store.
#[test]
fn test_dl_policies_mem() -> Result<()> {
    let mut mem_store = store::Store::memory();
    let outcome = test_dl_policies(&mut mem_store);
    outcome
}
2487
/// Runs the shared download-policy checks against a store persisted at the path
/// of a freshly created temporary file.
#[test]
#[cfg(feature = "fs-store")]
fn test_dl_policies_fs() -> Result<()> {
    // The temp file handle stays in a local so it lives for the whole test.
    let db_file = tempfile::NamedTempFile::new()?;
    let db_path = db_file.path();
    let mut fs_store = store::fs::Store::persistent(db_path)?;
    test_dl_policies(&mut fs_store)
}
2495
2496 fn test_dl_policies(store: &mut Store) -> Result<()> {
2497 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2498 let namespace = NamespaceSecret::new(&mut rng);
2499 let id = namespace.id();
2500
2501 let filter = store::FilterKind::Exact("foo".into());
2502 let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2503 store
2504 .set_download_policy(&id, policy.clone())
2505 .expect_err("document dos not exist");
2506
2507 store.new_replica(namespace)?;
2509
2510 store.set_download_policy(&id, policy.clone())?;
2511 let retrieved_policy = store.get_download_policy(&id)?;
2512 assert_eq!(retrieved_policy, policy);
2513 store.flush()?;
2514 Ok(())
2515 }
2516
2517 fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2518 expected.sort();
2519 assert_eq!(expected, get_keys_sorted(store, namespace));
2520 }
2521
2522 fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2523 let mut res = store
2524 .get_many(namespace, Query::all())
2525 .unwrap()
2526 .map(|e| e.map(|e| e.key().to_vec()))
2527 .collect::<Result<Vec<_>>>()
2528 .unwrap();
2529 res.sort();
2530 res
2531 }
2532
2533 fn get_entry(
2534 store: &mut Store,
2535 namespace: NamespaceId,
2536 author: AuthorId,
2537 key: &[u8],
2538 ) -> anyhow::Result<SignedEntry> {
2539 let entry = store
2540 .get_exact(namespace, author, key, true)?
2541 .ok_or_else(|| anyhow::anyhow!("not found"))?;
2542 Ok(entry)
2543 }
2544
2545 fn get_content_hash(
2546 store: &mut Store,
2547 namespace: NamespaceId,
2548 author: AuthorId,
2549 key: &[u8],
2550 ) -> anyhow::Result<Option<Hash>> {
2551 let hash = store
2552 .get_exact(namespace, author, key, false)?
2553 .map(|e| e.content_hash());
2554 Ok(hash)
2555 }
2556
2557 async fn sync<'a>(
2558 alice: &'a mut Replica<'a>,
2559 bob: &'a mut Replica<'a>,
2560 ) -> Result<(SyncOutcome, SyncOutcome)> {
2561 let alice_peer_id = [1u8; 32];
2562 let bob_peer_id = [2u8; 32];
2563 let mut alice_state = SyncOutcome::default();
2564 let mut bob_state = SyncOutcome::default();
2565 let mut next_to_bob = Some(alice.sync_initial_message()?);
2567 let mut rounds = 0;
2568 while let Some(msg) = next_to_bob.take() {
2569 assert!(rounds < 100, "too many rounds");
2570 rounds += 1;
2571 println!("round {rounds}");
2572 if let Some(msg) = bob
2573 .sync_process_message(msg, alice_peer_id, &mut bob_state)
2574 .await?
2575 {
2576 next_to_bob = alice
2577 .sync_process_message(msg, bob_peer_id, &mut alice_state)
2578 .await?
2579 }
2580 }
2581 assert_eq!(alice_state.num_sent, bob_state.num_recv);
2582 assert_eq!(alice_state.num_recv, bob_state.num_sent);
2583 Ok((alice_state, bob_state))
2584 }
2585
2586 fn check_entries(
2587 store: &mut Store,
2588 namespace: &NamespaceId,
2589 author: &Author,
2590 set: &[&str],
2591 ) -> Result<()> {
2592 for el in set {
2593 store.get_exact(*namespace, author.id(), el, false)?;
2594 }
2595 Ok(())
2596 }
2597}