1use std::{
10 cmp::Ordering,
11 fmt::Debug,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14 time::Duration,
15};
16
17use bytes::{Bytes, BytesMut};
18use ed25519_dalek::{Signature, SignatureError};
19use iroh_blobs::Hash;
20use n0_future::{time::SystemTime, IterExt};
21use serde::{Deserialize, Serialize};
22
23pub use crate::heads::AuthorHeads;
24use crate::{
25 keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
26 ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
27 store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
28};
29
30pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
34
35pub type PeerIdBytes = [u8; 32];
38
39pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_millis() as u64;
42
43pub type ContentStatusCallback =
45 Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
46
47#[derive(Debug, Clone)]
49pub enum Event {
50 LocalInsert {
52 namespace: NamespaceId,
54 entry: SignedEntry,
56 },
57 RemoteInsert {
59 namespace: NamespaceId,
61 entry: SignedEntry,
63 from: PeerIdBytes,
65 should_download: bool,
67 remote_content_status: ContentStatus,
69 },
70}
71
72#[derive(Debug, Clone)]
74pub enum InsertOrigin {
75 Local,
77 Sync {
79 from: PeerIdBytes,
81 remote_content_status: ContentStatus,
83 },
84}
85
86#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
88pub enum ContentStatus {
89 Complete,
91 Incomplete,
93 Missing,
95}
96
97#[derive(Debug, Clone, Default)]
99pub struct SyncOutcome {
100 pub heads_received: AuthorHeads,
102 pub num_recv: usize,
104 pub num_sent: usize,
106}
107
108fn get_as_ptr<T>(value: &T) -> Option<usize> {
109 use std::mem;
110 if mem::size_of::<T>() == std::mem::size_of::<usize>()
111 && mem::align_of::<T>() == mem::align_of::<usize>()
112 {
113 unsafe { Some(mem::transmute_copy(value)) }
115 } else {
116 None
117 }
118}
119
120fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
121 get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
122}
123
124#[derive(Debug, Default)]
125struct Subscribers(Vec<async_channel::Sender<Event>>);
126impl Subscribers {
127 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
128 self.0.push(sender)
129 }
130 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
131 self.0.retain(|s| !same_channel(s, sender));
132 }
133 pub async fn send(&mut self, event: Event) {
134 self.0 = std::mem::take(&mut self.0)
135 .into_iter()
136 .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx))
137 .join_all()
138 .await
139 .into_iter()
140 .flatten()
141 .collect();
142 }
143 pub fn len(&self) -> usize {
144 self.0.len()
145 }
146 pub async fn send_with(&mut self, f: impl FnOnce() -> Event) {
147 if !self.0.is_empty() {
148 self.send(f()).await
149 }
150 }
151}
152
153#[derive(
155 Debug,
156 Clone,
157 Copy,
158 Serialize,
159 Deserialize,
160 num_enum::IntoPrimitive,
161 num_enum::TryFromPrimitive,
162 strum::Display,
163)]
164#[repr(u8)]
165#[strum(serialize_all = "snake_case")]
166pub enum CapabilityKind {
167 Write = 1,
169 Read = 2,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
175pub enum Capability {
176 Write(NamespaceSecret),
178 Read(NamespaceId),
180}
181
182impl Capability {
183 pub fn id(&self) -> NamespaceId {
185 match self {
186 Capability::Write(secret) => secret.id(),
187 Capability::Read(id) => *id,
188 }
189 }
190
191 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
194 match self {
195 Capability::Write(secret) => Ok(secret),
196 Capability::Read(_) => Err(ReadOnly),
197 }
198 }
199
200 pub fn kind(&self) -> CapabilityKind {
202 match self {
203 Capability::Write(_) => CapabilityKind::Write,
204 Capability::Read(_) => CapabilityKind::Read,
205 }
206 }
207
208 pub fn raw(&self) -> (u8, [u8; 32]) {
210 let capability_repr: u8 = self.kind().into();
211 let bytes = match self {
212 Capability::Write(secret) => secret.to_bytes(),
213 Capability::Read(id) => id.to_bytes(),
214 };
215 (capability_repr, bytes)
216 }
217
218 pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
220 let kind: CapabilityKind = kind.try_into()?;
221 let capability = match kind {
222 CapabilityKind::Write => {
223 let secret = NamespaceSecret::from_bytes(bytes);
224 Capability::Write(secret)
225 }
226 CapabilityKind::Read => {
227 let id = NamespaceId::from(bytes);
228 Capability::Read(id)
229 }
230 };
231 Ok(capability)
232 }
233
234 pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
240 if other.id() != self.id() {
241 return Err(CapabilityError::NamespaceMismatch);
242 }
243
244 if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
246 let _ = std::mem::replace(self, other);
247 Ok(true)
248 } else {
249 Ok(false)
250 }
251 }
252}
253
254#[derive(Debug, thiserror::Error)]
256pub enum CapabilityError {
257 #[error("Namespaces are not the same")]
259 NamespaceMismatch,
260}
261
262#[derive(derive_more::Debug)]
264pub struct ReplicaInfo {
265 pub(crate) capability: Capability,
266 subscribers: Subscribers,
267 #[debug("ContentStatusCallback")]
268 content_status_cb: Option<ContentStatusCallback>,
269 closed: bool,
270}
271
272impl ReplicaInfo {
273 pub fn new(capability: Capability) -> Self {
275 Self {
276 capability,
277 subscribers: Default::default(),
278 content_status_cb: None,
280 closed: false,
281 }
282 }
283
284 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
290 self.subscribers.subscribe(sender)
291 }
292
293 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
299 self.subscribers.unsubscribe(sender)
300 }
301
302 pub fn subscribers_count(&self) -> usize {
304 self.subscribers.len()
305 }
306
307 pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
312 if self.content_status_cb.is_some() {
313 false
314 } else {
315 self.content_status_cb = Some(cb);
316 true
317 }
318 }
319
320 fn ensure_open(&self) -> Result<(), InsertError> {
321 if self.closed() {
322 Err(InsertError::Closed)
323 } else {
324 Ok(())
325 }
326 }
327
328 pub fn closed(&self) -> bool {
334 self.closed
335 }
336
337 pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
344 self.capability.merge(capability)
345 }
346}
347
348#[derive(derive_more::Debug)]
350pub struct Replica<'a, I = Box<ReplicaInfo>> {
351 pub(crate) store: StoreInstance<'a>,
352 pub(crate) info: I,
353}
354
355impl<'a, I> Replica<'a, I>
356where
357 I: Deref<Target = ReplicaInfo> + DerefMut,
358{
359 pub fn new(store: StoreInstance<'a>, info: I) -> Self {
361 Replica { info, store }
362 }
363
364 pub async fn insert(
372 &mut self,
373 key: impl AsRef<[u8]>,
374 author: &Author,
375 hash: Hash,
376 len: u64,
377 ) -> Result<usize, InsertError> {
378 if len == 0 || hash == Hash::EMPTY {
379 return Err(InsertError::EntryIsEmpty);
380 }
381 self.info.ensure_open()?;
382 let id = RecordIdentifier::new(self.id(), author.id(), key);
383 let record = Record::new_current(hash, len);
384 let entry = Entry::new(id, record);
385 let secret = self.secret_key()?;
386 let signed_entry = entry.sign(secret, author);
387 self.insert_entry(signed_entry, InsertOrigin::Local).await
388 }
389
390 pub async fn delete_prefix(
397 &mut self,
398 prefix: impl AsRef<[u8]>,
399 author: &Author,
400 ) -> Result<usize, InsertError> {
401 self.info.ensure_open()?;
402 let id = RecordIdentifier::new(self.id(), author.id(), prefix);
403 let entry = Entry::new_empty(id);
404 let signed_entry = entry.sign(self.secret_key()?, author);
405 self.insert_entry(signed_entry, InsertOrigin::Local).await
406 }
407
408 pub async fn insert_remote_entry(
416 &mut self,
417 entry: SignedEntry,
418 received_from: PeerIdBytes,
419 content_status: ContentStatus,
420 ) -> Result<usize, InsertError> {
421 self.info.ensure_open()?;
422 entry.validate_empty()?;
423 let origin = InsertOrigin::Sync {
424 from: received_from,
425 remote_content_status: content_status,
426 };
427 self.insert_entry(entry, origin).await
428 }
429
430 async fn insert_entry(
434 &mut self,
435 entry: SignedEntry,
436 origin: InsertOrigin,
437 ) -> Result<usize, InsertError> {
438 let namespace = self.id();
439
440 let store = &self.store;
441 validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
442
443 let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
444 tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
445
446 let removed_count = match outcome {
447 InsertOutcome::Inserted { removed } => removed,
448 InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
449 };
450
451 let insert_event = match origin {
452 InsertOrigin::Local => Event::LocalInsert { namespace, entry },
453 InsertOrigin::Sync {
454 from,
455 remote_content_status,
456 } => {
457 let download_policy = self
458 .store
459 .get_download_policy(&self.id())
460 .unwrap_or_default();
461 let should_download = download_policy.matches(entry.entry());
462 Event::RemoteInsert {
463 namespace,
464 entry,
465 from,
466 should_download,
467 remote_content_status,
468 }
469 }
470 };
471
472 self.info.subscribers.send(insert_event).await;
473
474 Ok(removed_count)
475 }
476
477 pub async fn hash_and_insert(
482 &mut self,
483 key: impl AsRef<[u8]>,
484 author: &Author,
485 data: impl AsRef<[u8]>,
486 ) -> Result<Hash, InsertError> {
487 self.info.ensure_open()?;
488 let len = data.as_ref().len() as u64;
489 let hash = Hash::new(data);
490 self.insert(key, author, hash, len).await?;
491 Ok(hash)
492 }
493
494 pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
496 RecordIdentifier::new(self.info.capability.id(), author.id(), key)
497 }
498
499 pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
501 self.info.ensure_open().map_err(anyhow::Error::from)?;
502 self.store.initial_message()
503 }
504
505 pub async fn sync_process_message(
509 &mut self,
510 message: crate::ranger::Message<SignedEntry>,
511 from_peer: PeerIdBytes,
512 state: &mut SyncOutcome,
513 ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
514 self.info.ensure_open()?;
515 let my_namespace = self.id();
516 let now = system_time_now();
517
518 state.num_recv += message.value_count();
520 for (entry, _content_status) in message.values() {
521 state
522 .heads_received
523 .insert(entry.author(), entry.timestamp());
524 }
525
526 let cb = self.info.content_status_cb.clone();
529 let download_policy = self
530 .store
531 .get_download_policy(&my_namespace)
532 .unwrap_or_default();
533 let reply = self
534 .store
535 .process_message(
536 &Default::default(),
537 message,
538 |store, entry, content_status| {
540 let origin = InsertOrigin::Sync {
541 from: from_peer,
542 remote_content_status: content_status,
543 };
544 validate_entry(now, store, my_namespace, entry, &origin).is_ok()
545 },
546 async |_store, entry, content_status| {
548 self.info
550 .subscribers
551 .send_with(|| {
552 let should_download = download_policy.matches(entry.entry());
553 Event::RemoteInsert {
554 from: from_peer,
555 namespace: my_namespace,
556 entry: entry.clone(),
557 should_download,
558 remote_content_status: content_status,
559 }
560 })
561 .await
562 },
563 async move |entry| {
565 if let Some(cb) = cb.as_ref() {
566 cb(entry.content_hash()).await
567 } else {
568 ContentStatus::Missing
569 }
570 },
571 )
572 .await?;
573
574 if let Some(ref reply) = reply {
576 state.num_sent += reply.value_count();
577 }
578
579 Ok(reply)
580 }
581
582 pub fn id(&self) -> NamespaceId {
584 self.info.capability.id()
585 }
586
587 pub fn capability(&self) -> &Capability {
589 &self.info.capability
590 }
591
592 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
595 self.info.capability.secret_key()
596 }
597}
598
599#[derive(Debug, thiserror::Error)]
601#[error("Replica allows read access only.")]
602pub struct ReadOnly;
603
604fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
612 now: u64,
613 store: &S,
614 expected_namespace: NamespaceId,
615 entry: &SignedEntry,
616 origin: &InsertOrigin,
617) -> Result<(), ValidationFailure> {
618 if entry.namespace() != expected_namespace {
620 return Err(ValidationFailure::InvalidNamespace);
621 }
622
623 if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
625 return Err(ValidationFailure::BadSignature);
626 }
627
628 if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
630 return Err(ValidationFailure::TooFarInTheFuture);
631 }
632 Ok(())
633}
634
635#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
637pub enum InsertError {
638 #[error("storage error")]
640 Store(anyhow::Error),
641 #[error("validation failure")]
643 Validation(#[from] ValidationFailure),
644 #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
646 NewerEntryExists,
647 #[error("Attempted to insert an empty entry")]
649 EntryIsEmpty,
650 #[error("Attempted to insert to read only replica")]
652 #[from(ReadOnly)]
653 ReadOnly,
654 #[error("replica is closed")]
656 Closed,
657}
658
659#[derive(thiserror::Error, Debug)]
661pub enum ValidationFailure {
662 #[error("Entry namespace does not match the current replica")]
664 InvalidNamespace,
665 #[error("Entry signature is invalid")]
667 BadSignature,
668 #[error("Entry timestamp is too far in the future.")]
670 TooFarInTheFuture,
671 #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
673 InvalidEmptyEntry,
674}
675
676#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
678pub struct SignedEntry {
679 signature: EntrySignature,
680 entry: Entry,
681}
682
683impl From<SignedEntry> for Entry {
684 fn from(value: SignedEntry) -> Self {
685 value.entry
686 }
687}
688
689impl PartialOrd for SignedEntry {
690 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
691 Some(self.cmp(other))
692 }
693}
694
695impl Ord for SignedEntry {
696 fn cmp(&self, other: &Self) -> Ordering {
697 self.entry.cmp(&other.entry)
698 }
699}
700
701impl PartialOrd for Entry {
702 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
703 Some(self.cmp(other))
704 }
705}
706
707impl Ord for Entry {
708 fn cmp(&self, other: &Self) -> Ordering {
709 self.id
710 .cmp(&other.id)
711 .then_with(|| self.record.cmp(&other.record))
712 }
713}
714
715impl SignedEntry {
716 pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
717 SignedEntry { signature, entry }
718 }
719
720 pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
722 let signature = EntrySignature::from_entry(&entry, namespace, author);
723 SignedEntry { signature, entry }
724 }
725
726 pub fn from_parts(
728 namespace: &NamespaceSecret,
729 author: &Author,
730 key: impl AsRef<[u8]>,
731 record: Record,
732 ) -> Self {
733 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
734 let entry = Entry::new(id, record);
735 Self::from_entry(entry, namespace, author)
736 }
737
738 pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
740 self.signature.verify(
741 &self.entry,
742 &self.entry.namespace().public_key(store)?,
743 &self.entry.author().public_key(store)?,
744 )
745 }
746
747 pub fn signature(&self) -> &EntrySignature {
749 &self.signature
750 }
751
752 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
754 self.entry().validate_empty()
755 }
756
757 pub fn entry(&self) -> &Entry {
759 &self.entry
760 }
761
762 pub fn content_hash(&self) -> Hash {
764 self.entry().content_hash()
765 }
766
767 pub fn content_len(&self) -> u64 {
769 self.entry().content_len()
770 }
771
772 pub fn author_bytes(&self) -> AuthorId {
774 self.entry().id().author()
775 }
776
777 pub fn key(&self) -> &[u8] {
779 self.entry().id().key()
780 }
781
782 pub fn timestamp(&self) -> u64 {
784 self.entry().timestamp()
785 }
786}
787
788impl RangeEntry for SignedEntry {
789 type Key = RecordIdentifier;
790 type Value = Record;
791
792 fn key(&self) -> &Self::Key {
793 &self.entry.id
794 }
795
796 fn value(&self) -> &Self::Value {
797 &self.entry.record
798 }
799
800 fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
801 let mut hasher = blake3::Hasher::new();
802 hasher.update(self.namespace().as_ref());
803 hasher.update(self.author_bytes().as_ref());
804 hasher.update(self.key());
805 hasher.update(&self.timestamp().to_be_bytes());
806 hasher.update(self.content_hash().as_bytes());
807 Fingerprint(hasher.finalize().into())
808 }
809}
810
811#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
813pub struct EntrySignature {
814 author_signature: Signature,
815 namespace_signature: Signature,
816}
817
818impl Debug for EntrySignature {
819 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
820 f.debug_struct("EntrySignature")
821 .field(
822 "namespace_signature",
823 &hex::encode(self.namespace_signature.to_bytes()),
824 )
825 .field(
826 "author_signature",
827 &hex::encode(self.author_signature.to_bytes()),
828 )
829 .finish()
830 }
831}
832
833impl EntrySignature {
834 pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
836 let bytes = entry.to_vec();
839 let namespace_signature = namespace.sign(&bytes);
840 let author_signature = author.sign(&bytes);
841
842 EntrySignature {
843 author_signature,
844 namespace_signature,
845 }
846 }
847
848 pub fn verify(
851 &self,
852 entry: &Entry,
853 namespace: &NamespacePublicKey,
854 author: &AuthorPublicKey,
855 ) -> Result<(), SignatureError> {
856 let bytes = entry.to_vec();
857 namespace.verify(&bytes, &self.namespace_signature)?;
858 author.verify(&bytes, &self.author_signature)?;
859
860 Ok(())
861 }
862
863 pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
864 let namespace_signature = Signature::from_bytes(namespace_sig);
865 let author_signature = Signature::from_bytes(author_sig);
866
867 EntrySignature {
868 author_signature,
869 namespace_signature,
870 }
871 }
872
873 pub(crate) fn author(&self) -> &Signature {
874 &self.author_signature
875 }
876
877 pub(crate) fn namespace(&self) -> &Signature {
878 &self.namespace_signature
879 }
880}
881
882#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
888pub struct Entry {
889 id: RecordIdentifier,
890 record: Record,
891}
892
893impl Entry {
894 pub fn new(id: RecordIdentifier, record: Record) -> Self {
896 Entry { id, record }
897 }
898
899 pub fn new_empty(id: RecordIdentifier) -> Self {
901 Entry {
902 id,
903 record: Record::empty_current(),
904 }
905 }
906
907 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
909 match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
910 (true, true) => Ok(()),
911 (false, false) => Ok(()),
912 (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
913 (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
914 }
915 }
916
917 pub fn id(&self) -> &RecordIdentifier {
919 &self.id
920 }
921
922 pub fn namespace(&self) -> NamespaceId {
924 self.id.namespace()
925 }
926
927 pub fn author(&self) -> AuthorId {
929 self.id.author()
930 }
931
932 pub fn key(&self) -> &[u8] {
934 self.id.key()
935 }
936
937 pub fn record(&self) -> &Record {
939 &self.record
940 }
941
942 pub fn content_hash(&self) -> Hash {
944 self.record.hash
945 }
946
947 pub fn content_len(&self) -> u64 {
949 self.record.len
950 }
951
952 pub fn timestamp(&self) -> u64 {
954 self.record.timestamp
955 }
956
957 pub fn encode(&self, out: &mut Vec<u8>) {
959 self.id.encode(out);
960 self.record.encode(out);
961 }
962
963 pub fn to_vec(&self) -> Vec<u8> {
965 let mut out = Vec::new();
966 self.encode(&mut out);
967 out
968 }
969
970 pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
972 SignedEntry::from_entry(self, namespace, author)
973 }
974}
975
976const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
977const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
978const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
979
980#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
982pub struct RecordIdentifier(Bytes);
983
984impl Default for RecordIdentifier {
985 fn default() -> Self {
986 Self::new(NamespaceId::default(), AuthorId::default(), b"")
987 }
988}
989
990impl Debug for RecordIdentifier {
991 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
992 f.debug_struct("RecordIdentifier")
993 .field("namespace", &self.namespace())
994 .field("author", &self.author())
995 .field("key", &std::string::String::from_utf8_lossy(self.key()))
996 .finish()
997 }
998}
999
1000impl RangeKey for RecordIdentifier {
1001 #[cfg(test)]
1002 fn is_prefix_of(&self, other: &Self) -> bool {
1003 other.as_ref().starts_with(self.as_ref())
1004 }
1005}
1006
1007fn system_time_now() -> u64 {
1008 SystemTime::now()
1009 .duration_since(SystemTime::UNIX_EPOCH)
1010 .expect("time drift")
1011 .as_micros() as u64
1012}
1013
1014impl RecordIdentifier {
1015 pub fn new(
1017 namespace: impl Into<NamespaceId>,
1018 author: impl Into<AuthorId>,
1019 key: impl AsRef<[u8]>,
1020 ) -> Self {
1021 let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1022 bytes.extend_from_slice(namespace.into().as_bytes());
1023 bytes.extend_from_slice(author.into().as_bytes());
1024 bytes.extend_from_slice(key.as_ref());
1025 Self(bytes.freeze())
1026 }
1027
1028 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1030 out.extend_from_slice(&self.0);
1031 }
1032
1033 pub fn as_bytes(&self) -> Bytes {
1035 self.0.clone()
1036 }
1037
1038 pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1040 (
1041 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1042 self.0[AUTHOR_BYTES].try_into().unwrap(),
1043 &self.0[KEY_BYTES],
1044 )
1045 }
1046
1047 pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1049 (
1050 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1051 self.0[AUTHOR_BYTES].try_into().unwrap(),
1052 self.0.slice(KEY_BYTES),
1053 )
1054 }
1055
1056 pub fn key(&self) -> &[u8] {
1058 &self.0[KEY_BYTES]
1059 }
1060
1061 pub fn key_bytes(&self) -> Bytes {
1063 self.0.slice(KEY_BYTES)
1064 }
1065
1066 pub fn namespace(&self) -> NamespaceId {
1068 let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1069 value.into()
1070 }
1071
1072 pub fn author(&self) -> AuthorId {
1074 let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1075 value.into()
1076 }
1077}
1078
1079impl AsRef<[u8]> for RecordIdentifier {
1080 fn as_ref(&self) -> &[u8] {
1081 &self.0
1082 }
1083}
1084
1085impl Deref for SignedEntry {
1086 type Target = Entry;
1087 fn deref(&self) -> &Self::Target {
1088 &self.entry
1089 }
1090}
1091
1092impl Deref for Entry {
1093 type Target = Record;
1094 fn deref(&self) -> &Self::Target {
1095 &self.record
1096 }
1097}
1098
1099#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1101pub struct Record {
1102 len: u64,
1104 hash: Hash,
1106 timestamp: u64,
1108}
1109
1110impl RangeValue for Record {}
1111
1112impl Ord for Record {
1116 fn cmp(&self, other: &Self) -> Ordering {
1117 self.timestamp
1118 .cmp(&other.timestamp)
1119 .then_with(|| self.hash.cmp(&other.hash))
1120 }
1121}
1122
1123impl PartialOrd for Record {
1124 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1125 Some(self.cmp(other))
1126 }
1127}
1128
1129impl Record {
1130 pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1132 debug_assert!(
1133 len != 0 || hash == Hash::EMPTY,
1134 "if `len` is 0 then `hash` must be the hash of the empty byte range"
1135 );
1136 Record {
1137 hash,
1138 len,
1139 timestamp,
1140 }
1141 }
1142
1143 pub fn empty(timestamp: u64) -> Self {
1145 Self::new(Hash::EMPTY, 0, timestamp)
1146 }
1147
1148 pub fn empty_current() -> Self {
1150 Self::new_current(Hash::EMPTY, 0)
1151 }
1152
1153 pub fn is_empty(&self) -> bool {
1155 self.hash == Hash::EMPTY
1156 }
1157
1158 pub fn new_current(hash: Hash, len: u64) -> Self {
1160 let timestamp = system_time_now();
1161 Self::new(hash, len, timestamp)
1162 }
1163
1164 pub fn content_len(&self) -> u64 {
1166 self.len
1167 }
1168
1169 pub fn content_hash(&self) -> Hash {
1171 self.hash
1172 }
1173
1174 pub fn timestamp(&self) -> u64 {
1176 self.timestamp
1177 }
1178
1179 #[cfg(test)]
1180 pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1181 let len = data.as_ref().len() as u64;
1182 let hash = Hash::new(data);
1183 Self::new_current(hash, len)
1184 }
1185
1186 #[cfg(test)]
1187 pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1188 let len = data.as_ref().len() as u64;
1189 let hash = Hash::new(data);
1190 Self::new(hash, len, timestamp)
1191 }
1192
1193 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1195 out.extend_from_slice(&self.len.to_be_bytes());
1196 out.extend_from_slice(self.hash.as_ref());
1197 out.extend_from_slice(&self.timestamp.to_be_bytes())
1198 }
1199}
1200
1201#[cfg(test)]
1202mod tests {
1203 use std::collections::HashSet;
1204
1205 use anyhow::Result;
1206 use rand::SeedableRng;
1207
1208 use super::*;
1209 use crate::{
1210 actor::SyncHandle,
1211 ranger::{Range, Store as _},
1212 store::{OpenError, Query, SortBy, SortDirection, Store},
1213 };
1214
1215 #[tokio::test]
1216 async fn test_basics_memory() -> Result<()> {
1217 let store = store::Store::memory();
1218 test_basics(store).await?;
1219
1220 Ok(())
1221 }
1222
1223 #[tokio::test]
1224 #[cfg(feature = "fs-store")]
1225 async fn test_basics_fs() -> Result<()> {
1226 let dbfile = tempfile::NamedTempFile::new()?;
1227 let store = store::fs::Store::persistent(dbfile.path())?;
1228 test_basics(store).await?;
1229 Ok(())
1230 }
1231
1232 async fn test_basics(mut store: Store) -> Result<()> {
1233 let mut rng = rand::rng();
1234 let alice = Author::new(&mut rng);
1235 let bob = Author::new(&mut rng);
1236 let myspace = NamespaceSecret::new(&mut rng);
1237
1238 let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1239 let record = Record::current_from_data(b"this is my cool data");
1240 let entry = Entry::new(record_id, record);
1241 let signed_entry = entry.sign(&myspace, &alice);
1242 signed_entry.verify(&()).expect("failed to verify");
1243
1244 let mut my_replica = store.new_replica(myspace.clone())?;
1245 for i in 0..10 {
1246 my_replica
1247 .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice"))
1248 .await?;
1249 }
1250
1251 for i in 0..10 {
1252 let res = store
1253 .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1254 .unwrap();
1255 let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1256 assert_eq!(res.entry().record().content_len(), len);
1257 res.verify(&())?;
1258 }
1259
1260 let mut my_replica = store.new_replica(myspace.clone())?;
1262 my_replica
1263 .hash_and_insert("/cool/path", &alice, "round 1")
1264 .await?;
1265 let _entry = store
1266 .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1267 .unwrap();
1268 let mut my_replica = store.new_replica(myspace.clone())?;
1270 my_replica
1271 .hash_and_insert("/cool/path", &alice, "round 2")
1272 .await?;
1273 let _entry = store
1274 .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1275 .unwrap();
1276
1277 let entries: Vec<_> = store
1279 .get_many(myspace.id(), Query::author(alice.id()))?
1280 .collect::<Result<_>>()?;
1281 assert_eq!(entries.len(), 11);
1282
1283 let entries: Vec<_> = store
1285 .get_many(myspace.id(), Query::author(bob.id()))?
1286 .collect::<Result<_>>()?;
1287 assert_eq!(entries.len(), 0);
1288
1289 let entries: Vec<_> = store
1291 .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1292 .collect::<Result<_>>()?;
1293 assert_eq!(entries.len(), 1);
1294
1295 let entries: Vec<_> = store
1297 .get_many(myspace.id(), Query::all())?
1298 .collect::<Result<_>>()?;
1299 assert_eq!(entries.len(), 11);
1300
1301 let mut my_replica = store.new_replica(myspace.clone())?;
1303 let _entry = my_replica
1304 .hash_and_insert("/cool/path", &bob, "bob round 1")
1305 .await?;
1306
1307 let entries: Vec<_> = store
1309 .get_many(myspace.id(), Query::author(alice.id()))?
1310 .collect::<Result<_>>()?;
1311 assert_eq!(entries.len(), 11);
1312
1313 let entries: Vec<_> = store
1314 .get_many(myspace.id(), Query::author(bob.id()))?
1315 .collect::<Result<_>>()?;
1316 assert_eq!(entries.len(), 1);
1317
1318 let entries: Vec<_> = store
1320 .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1321 .collect::<Result<_>>()?;
1322 assert_eq!(entries.len(), 2);
1323
1324 let entries: Vec<_> = store
1326 .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1327 .collect::<Result<_>>()?;
1328 assert_eq!(entries.len(), 2);
1329
1330 let entries: Vec<_> = store
1332 .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1333 .collect::<Result<_>>()?;
1334 assert_eq!(entries.len(), 1);
1335
1336 let entries: Vec<_> = store
1337 .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1338 .collect::<Result<_>>()?;
1339 assert_eq!(entries.len(), 1);
1340
1341 let entries: Vec<_> = store
1343 .get_many(myspace.id(), Query::all())?
1344 .collect::<Result<_>>()?;
1345 assert_eq!(entries.len(), 12);
1346
1347 let mut my_replica = store.new_replica(myspace.clone())?;
1349 let entries_second: Vec<_> = my_replica
1350 .store
1351 .get_range(Range::new(
1352 RecordIdentifier::default(),
1353 RecordIdentifier::default(),
1354 ))?
1355 .collect::<Result<_, _>>()?;
1356
1357 assert_eq!(entries_second.len(), 12);
1358 assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1359
1360 test_lru_cache_like_behaviour(&mut store, myspace.id())?;
1361 store.flush()?;
1362 Ok(())
1363 }
1364
1365 fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1368 #[track_caller]
1370 fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1371 assert_eq!(
1372 expected_peers,
1373 &store
1374 .get_sync_peers(&namespace)
1375 .unwrap()
1376 .unwrap()
1377 .collect::<Vec<_>>(),
1378 "sync peers differ"
1379 );
1380 }
1381
1382 let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1383 let mut expected_peers = Vec::with_capacity(count);
1385 for i in 0..count as u8 {
1386 let peer = [i; 32];
1387 expected_peers.insert(0, peer);
1388 store.register_useful_peer(namespace, peer)?;
1389 }
1390 verify_peers(store, namespace, &expected_peers);
1391
1392 expected_peers.pop();
1394 let newer_peer = [count as u8; 32];
1395 expected_peers.insert(0, newer_peer);
1396 store.register_useful_peer(namespace, newer_peer)?;
1397 verify_peers(store, namespace, &expected_peers);
1398
1399 let refreshed_peer = expected_peers.remove(2);
1401 expected_peers.insert(0, refreshed_peer);
1402 store.register_useful_peer(namespace, refreshed_peer)?;
1403 verify_peers(store, namespace, &expected_peers);
1404 Ok(())
1405 }
1406
1407 #[tokio::test]
1408 async fn test_content_hashes_iterator_memory() -> Result<()> {
1409 let store = store::Store::memory();
1410 test_content_hashes_iterator(store).await
1411 }
1412
1413 #[tokio::test]
1414 #[cfg(feature = "fs-store")]
1415 async fn test_content_hashes_iterator_fs() -> Result<()> {
1416 let dbfile = tempfile::NamedTempFile::new()?;
1417 let store = store::fs::Store::persistent(dbfile.path())?;
1418 test_content_hashes_iterator(store).await
1419 }
1420
1421 async fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1422 let mut rng = rand::rng();
1423 let mut expected = HashSet::new();
1424 let n_replicas = 3;
1425 let n_entries = 4;
1426 for i in 0..n_replicas {
1427 let namespace = NamespaceSecret::new(&mut rng);
1428 let author = store.new_author(&mut rng)?;
1429 let mut replica = store.new_replica(namespace)?;
1430 for j in 0..n_entries {
1431 let key = format!("{j}");
1432 let data = format!("{i}:{j}");
1433 let hash = replica.hash_and_insert(key, &author, data).await?;
1434 expected.insert(hash);
1435 }
1436 }
1437 assert_eq!(expected.len(), n_replicas * n_entries);
1438 let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1439 assert_eq!(actual, expected);
1440 Ok(())
1441 }
1442
1443 #[test]
1444 fn test_multikey() {
1445 let mut rng = rand::rng();
1446
1447 let k = ["a", "c", "z"];
1448
1449 let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1450 n.sort_by_key(|n| n.id());
1451
1452 let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1453 a.sort_by_key(|a| a.id());
1454
1455 {
1457 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1458 let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1459 let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1460
1461 let range = Range::new(ri0.clone(), ri2.clone());
1462 assert!(range.contains(&ri0), "start");
1463 assert!(range.contains(&ri1), "inside");
1464 assert!(!range.contains(&ri2), "end");
1465
1466 assert!(ri0 < ri1);
1467 assert!(ri1 < ri2);
1468 }
1469
1470 {
1472 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1473 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1474 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1475
1476 let range = Range::new(ri0.clone(), ri2.clone());
1477 assert!(range.contains(&ri0), "start");
1478 assert!(range.contains(&ri1), "inside");
1479 assert!(!range.contains(&ri2), "end");
1480
1481 assert!(ri0 < ri1);
1482 assert!(ri1 < ri2);
1483 }
1484
1485 {
1487 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1488 let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1489 let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1490
1491 let range = Range::new(ri0.clone(), ri2.clone());
1492 assert!(range.contains(&ri0), "start");
1493 assert!(range.contains(&ri1), "inside");
1494 assert!(!range.contains(&ri2), "end");
1495
1496 assert!(ri0 < ri1);
1497 assert!(ri1 < ri2);
1498 }
1499
1500 {
1502 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1503 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1504 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1505
1506 let range = Range::new(ri0.clone(), ri2.clone());
1507 assert!(range.contains(&ri0), "start");
1508 assert!(range.contains(&ri1), "inside");
1509 assert!(!range.contains(&ri2), "end");
1510
1511 assert!(ri0 < ri1);
1512 assert!(ri1 < ri2);
1513 }
1514
1515 {
1517 let a0 = a[0].id();
1520 let a1 = a[1].id();
1521 let n0 = n[0].id();
1522 let n1 = n[1].id();
1523 let k0 = k[0];
1524 let k1 = k[1];
1525
1526 assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1527 assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1528 assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1529 assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1530 }
1531 }
1532
1533 #[tokio::test]
1534 async fn test_timestamps_memory() -> Result<()> {
1535 let store = store::Store::memory();
1536 test_timestamps(store).await?;
1537
1538 Ok(())
1539 }
1540
1541 #[tokio::test]
1542 #[cfg(feature = "fs-store")]
1543 async fn test_timestamps_fs() -> Result<()> {
1544 let dbfile = tempfile::NamedTempFile::new()?;
1545 let store = store::fs::Store::persistent(dbfile.path())?;
1546 test_timestamps(store).await?;
1547 Ok(())
1548 }
1549
1550 async fn test_timestamps(mut store: Store) -> Result<()> {
1551 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1552 let namespace = NamespaceSecret::new(&mut rng);
1553 let _replica = store.new_replica(namespace.clone())?;
1554 let author = store.new_author(&mut rng)?;
1555 store.close_replica(namespace.id());
1556 let mut replica = store.open_replica(&namespace.id())?;
1557
1558 let key = b"hello";
1559 let value = b"world";
1560 let entry = {
1561 let timestamp = 2;
1562 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1563 let record = Record::from_data(value, timestamp);
1564 Entry::new(id, record).sign(&namespace, &author)
1565 };
1566
1567 replica
1568 .insert_entry(entry.clone(), InsertOrigin::Local)
1569 .await
1570 .unwrap();
1571 store.close_replica(namespace.id());
1572 let res = store
1573 .get_exact(namespace.id(), author.id(), key, false)?
1574 .unwrap();
1575 assert_eq!(res, entry);
1576
1577 let entry2 = {
1578 let timestamp = 1;
1579 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1580 let record = Record::from_data(value, timestamp);
1581 Entry::new(id, record).sign(&namespace, &author)
1582 };
1583
1584 let mut replica = store.open_replica(&namespace.id())?;
1585 let res = replica.insert_entry(entry2, InsertOrigin::Local).await;
1586 store.close_replica(namespace.id());
1587 assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1588 let res = store
1589 .get_exact(namespace.id(), author.id(), key, false)?
1590 .unwrap();
1591 assert_eq!(res, entry);
1592 store.flush()?;
1593 Ok(())
1594 }
1595
1596 #[tokio::test]
1597 async fn test_replica_sync_memory() -> Result<()> {
1598 let alice_store = store::Store::memory();
1599 let bob_store = store::Store::memory();
1600
1601 test_replica_sync(alice_store, bob_store).await?;
1602 Ok(())
1603 }
1604
1605 #[tokio::test]
1606 #[cfg(feature = "fs-store")]
1607 async fn test_replica_sync_fs() -> Result<()> {
1608 let alice_dbfile = tempfile::NamedTempFile::new()?;
1609 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1610 let bob_dbfile = tempfile::NamedTempFile::new()?;
1611 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1612 test_replica_sync(alice_store, bob_store).await?;
1613
1614 Ok(())
1615 }
1616
1617 async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1618 let alice_set = ["ape", "eel", "fox", "gnu"];
1619 let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1620
1621 let mut rng = rand::rng();
1622 let author = Author::new(&mut rng);
1623 let myspace = NamespaceSecret::new(&mut rng);
1624 let mut alice = alice_store.new_replica(myspace.clone())?;
1625 for el in &alice_set {
1626 alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1627 }
1628
1629 let mut bob = bob_store.new_replica(myspace.clone())?;
1630 for el in &bob_set {
1631 bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1632 }
1633
1634 let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
1635
1636 assert_eq!(alice_out.num_sent, 2);
1637 assert_eq!(bob_out.num_recv, 2);
1638 assert_eq!(alice_out.num_recv, 6);
1639 assert_eq!(bob_out.num_sent, 6);
1640
1641 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1642 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1643 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1644 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1645 alice_store.flush()?;
1646 bob_store.flush()?;
1647 Ok(())
1648 }
1649
1650 #[tokio::test]
1651 async fn test_replica_timestamp_sync_memory() -> Result<()> {
1652 let alice_store = store::Store::memory();
1653 let bob_store = store::Store::memory();
1654
1655 test_replica_timestamp_sync(alice_store, bob_store).await?;
1656 Ok(())
1657 }
1658
1659 #[tokio::test]
1660 #[cfg(feature = "fs-store")]
1661 async fn test_replica_timestamp_sync_fs() -> Result<()> {
1662 let alice_dbfile = tempfile::NamedTempFile::new()?;
1663 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1664 let bob_dbfile = tempfile::NamedTempFile::new()?;
1665 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1666 test_replica_timestamp_sync(alice_store, bob_store).await?;
1667
1668 Ok(())
1669 }
1670
1671 async fn test_replica_timestamp_sync(
1672 mut alice_store: Store,
1673 mut bob_store: Store,
1674 ) -> Result<()> {
1675 let mut rng = rand::rng();
1676 let author = Author::new(&mut rng);
1677 let namespace = NamespaceSecret::new(&mut rng);
1678 let mut alice = alice_store.new_replica(namespace.clone())?;
1679 let mut bob = bob_store.new_replica(namespace.clone())?;
1680
1681 let key = b"key";
1682 let alice_value = b"alice";
1683 let bob_value = b"bob";
1684 let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?;
1685 let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?;
1687 sync(&mut alice, &mut bob).await?;
1688 assert_eq!(
1689 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1690 Some(bob_hash)
1691 );
1692 assert_eq!(
1693 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1694 Some(bob_hash)
1695 );
1696
1697 let mut alice = alice_store.new_replica(namespace.clone())?;
1698 let mut bob = bob_store.new_replica(namespace.clone())?;
1699
1700 let alice_value_2 = b"alice2";
1701 let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?;
1703 let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?;
1704 sync(&mut alice, &mut bob).await?;
1705 assert_eq!(
1706 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1707 Some(alice_hash_2)
1708 );
1709 assert_eq!(
1710 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1711 Some(alice_hash_2)
1712 );
1713 alice_store.flush()?;
1714 bob_store.flush()?;
1715 Ok(())
1716 }
1717
1718 #[tokio::test]
1719 async fn test_future_timestamp() -> Result<()> {
1720 let mut rng = rand::rng();
1721 let mut store = store::Store::memory();
1722 let author = Author::new(&mut rng);
1723 let namespace = NamespaceSecret::new(&mut rng);
1724
1725 let mut replica = store.new_replica(namespace.clone())?;
1726 let key = b"hi";
1727 let t = system_time_now();
1728 let record = Record::from_data(b"1", t);
1729 let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1730 replica
1731 .insert_entry(entry0.clone(), InsertOrigin::Local)
1732 .await?;
1733
1734 assert_eq!(
1735 get_entry(&mut store, namespace.id(), author.id(), key)?,
1736 entry0
1737 );
1738
1739 let mut replica = store.new_replica(namespace.clone())?;
1740 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1741 let record = Record::from_data(b"2", t);
1742 let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1743 replica
1744 .insert_entry(entry1.clone(), InsertOrigin::Local)
1745 .await?;
1746 assert_eq!(
1747 get_entry(&mut store, namespace.id(), author.id(), key)?,
1748 entry1
1749 );
1750
1751 let mut replica = store.new_replica(namespace.clone())?;
1752 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1753 let record = Record::from_data(b"2", t);
1754 let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1755 replica
1756 .insert_entry(entry2.clone(), InsertOrigin::Local)
1757 .await?;
1758 assert_eq!(
1759 get_entry(&mut store, namespace.id(), author.id(), key)?,
1760 entry2
1761 );
1762
1763 let mut replica = store.new_replica(namespace.clone())?;
1764 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1765 let record = Record::from_data(b"2", t);
1766 let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1767 let res = replica.insert_entry(entry3, InsertOrigin::Local).await;
1768 assert!(matches!(
1769 res,
1770 Err(InsertError::Validation(
1771 ValidationFailure::TooFarInTheFuture
1772 ))
1773 ));
1774 assert_eq!(
1775 get_entry(&mut store, namespace.id(), author.id(), key)?,
1776 entry2
1777 );
1778 store.flush()?;
1779 Ok(())
1780 }
1781
1782 #[tokio::test]
1783 async fn test_insert_empty() -> Result<()> {
1784 let mut store = store::Store::memory();
1785 let mut rng = rand::rng();
1786 let alice = Author::new(&mut rng);
1787 let myspace = NamespaceSecret::new(&mut rng);
1788 let mut replica = store.new_replica(myspace.clone())?;
1789 let hash = Hash::new(b"");
1790 let res = replica.insert(b"foo", &alice, hash, 0).await;
1791 assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1792 store.flush()?;
1793 Ok(())
1794 }
1795
1796 #[tokio::test]
1797 async fn test_prefix_delete_memory() -> Result<()> {
1798 let store = store::Store::memory();
1799 test_prefix_delete(store).await?;
1800 Ok(())
1801 }
1802
1803 #[tokio::test]
1804 #[cfg(feature = "fs-store")]
1805 async fn test_prefix_delete_fs() -> Result<()> {
1806 let dbfile = tempfile::NamedTempFile::new()?;
1807 let store = store::fs::Store::persistent(dbfile.path())?;
1808 test_prefix_delete(store).await?;
1809 Ok(())
1810 }
1811
1812 async fn test_prefix_delete(mut store: Store) -> Result<()> {
1813 let mut rng = rand::rng();
1814 let alice = Author::new(&mut rng);
1815 let myspace = NamespaceSecret::new(&mut rng);
1816 let mut replica = store.new_replica(myspace.clone())?;
1817 let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;
1818 let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?;
1819
1820 assert_eq!(
1822 get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1823 Some(hash1)
1824 );
1825 assert_eq!(
1826 get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1827 Some(hash2)
1828 );
1829
1830 let mut replica = store.new_replica(myspace.clone())?;
1832 let deleted = replica.delete_prefix(b"foo", &alice).await?;
1833 assert_eq!(deleted, 2);
1834 assert_eq!(
1835 store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1836 None
1837 );
1838 assert_eq!(
1839 store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1840 None
1841 );
1842 assert_eq!(
1843 store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1844 None
1845 );
1846 store.flush()?;
1847 Ok(())
1848 }
1849
1850 #[tokio::test]
1851 async fn test_replica_sync_delete_memory() -> Result<()> {
1852 let alice_store = store::Store::memory();
1853 let bob_store = store::Store::memory();
1854
1855 test_replica_sync_delete(alice_store, bob_store).await
1856 }
1857
1858 #[tokio::test]
1859 #[cfg(feature = "fs-store")]
1860 async fn test_replica_sync_delete_fs() -> Result<()> {
1861 let alice_dbfile = tempfile::NamedTempFile::new()?;
1862 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1863 let bob_dbfile = tempfile::NamedTempFile::new()?;
1864 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1865 test_replica_sync_delete(alice_store, bob_store).await
1866 }
1867
1868 async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1869 let alice_set = ["foot"];
1870 let bob_set = ["fool", "foo", "fog"];
1871
1872 let mut rng = rand::rng();
1873 let author = Author::new(&mut rng);
1874 let myspace = NamespaceSecret::new(&mut rng);
1875 let mut alice = alice_store.new_replica(myspace.clone())?;
1876 for el in &alice_set {
1877 alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1878 }
1879
1880 let mut bob = bob_store.new_replica(myspace.clone())?;
1881 for el in &bob_set {
1882 bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1883 }
1884
1885 sync(&mut alice, &mut bob).await?;
1886
1887 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1888 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1889 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1890 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1891
1892 let mut alice = alice_store.new_replica(myspace.clone())?;
1893 let mut bob = bob_store.new_replica(myspace.clone())?;
1894 alice.delete_prefix("foo", &author).await?;
1895 bob.hash_and_insert("fooz", &author, "fooz".as_bytes())
1896 .await?;
1897 sync(&mut alice, &mut bob).await?;
1898 check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1899 check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1900 alice_store.flush()?;
1901 bob_store.flush()?;
1902 Ok(())
1903 }
1904
1905 #[tokio::test]
1906 async fn test_replica_remove_memory() -> Result<()> {
1907 let alice_store = store::Store::memory();
1908 test_replica_remove(alice_store).await
1909 }
1910
1911 #[tokio::test]
1912 #[cfg(feature = "fs-store")]
1913 async fn test_replica_remove_fs() -> Result<()> {
1914 let alice_dbfile = tempfile::NamedTempFile::new()?;
1915 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1916 test_replica_remove(alice_store).await
1917 }
1918
1919 async fn test_replica_remove(mut store: Store) -> Result<()> {
1920 let mut rng = rand::rng();
1921 let namespace = NamespaceSecret::new(&mut rng);
1922 let author = Author::new(&mut rng);
1923 let mut replica = store.new_replica(namespace.clone())?;
1924
1925 let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?;
1927 let res = store
1928 .get_many(namespace.id(), Query::all())?
1929 .collect::<Vec<_>>();
1930 assert_eq!(res.len(), 1);
1931
1932 let res = store.remove_replica(&namespace.id());
1934 assert!(res.is_err());
1936 store.close_replica(namespace.id());
1937 store.remove_replica(&namespace.id())?;
1938 let res = store
1939 .get_many(namespace.id(), Query::all())?
1940 .collect::<Vec<_>>();
1941 assert_eq!(res.len(), 0);
1942
1943 let res = store.load_replica_info(&namespace.id());
1945 assert!(matches!(res, Err(OpenError::NotFound)));
1946
1947 let mut replica = store.new_replica(namespace.clone())?;
1949 replica.insert(b"foo", &author, hash, 3).await?;
1950 let res = store
1951 .get_many(namespace.id(), Query::all())?
1952 .collect::<Vec<_>>();
1953 assert_eq!(res.len(), 1);
1954 store.flush()?;
1955 Ok(())
1956 }
1957
1958 #[tokio::test]
1959 async fn test_replica_delete_edge_cases_memory() -> Result<()> {
1960 let store = store::Store::memory();
1961 test_replica_delete_edge_cases(store).await
1962 }
1963
1964 #[tokio::test]
1965 #[cfg(feature = "fs-store")]
1966 async fn test_replica_delete_edge_cases_fs() -> Result<()> {
1967 let dbfile = tempfile::NamedTempFile::new()?;
1968 let store = store::fs::Store::persistent(dbfile.path())?;
1969 test_replica_delete_edge_cases(store).await
1970 }
1971
1972 async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
1973 let mut rng = rand::rng();
1974 let author = Author::new(&mut rng);
1975 let namespace = NamespaceSecret::new(&mut rng);
1976
1977 let edgecases = [0u8, 1u8, 255u8];
1978 let prefixes = [0u8, 255u8];
1979 let hash = Hash::new(b"foo");
1980 let len = 3;
1981 for prefix in prefixes {
1982 let mut expected = vec![];
1983 let mut replica = store.new_replica(namespace.clone())?;
1984 for suffix in edgecases {
1985 let key = [prefix, suffix].to_vec();
1986 expected.push(key.clone());
1987 replica.insert(&key, &author, hash, len).await?;
1988 }
1989 assert_keys(&mut store, namespace.id(), expected);
1990 let mut replica = store.new_replica(namespace.clone())?;
1991 replica.delete_prefix([prefix], &author).await?;
1992 assert_keys(&mut store, namespace.id(), vec![]);
1993 }
1994
1995 let mut replica = store.new_replica(namespace.clone())?;
1996 let key = vec![1u8, 0u8];
1997 replica.insert(key, &author, hash, len).await?;
1998 let key = vec![1u8, 1u8];
1999 replica.insert(key, &author, hash, len).await?;
2000 let key = vec![1u8, 2u8];
2001 replica.insert(key, &author, hash, len).await?;
2002 let prefix = vec![1u8, 1u8];
2003 replica.delete_prefix(prefix, &author).await?;
2004 assert_keys(
2005 &mut store,
2006 namespace.id(),
2007 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2008 );
2009
2010 let mut replica = store.new_replica(namespace.clone())?;
2011 let key = vec![0u8, 255u8];
2012 replica.insert(key, &author, hash, len).await?;
2013 let key = vec![0u8, 0u8];
2014 replica.insert(key, &author, hash, len).await?;
2015 let prefix = vec![0u8];
2016 replica.delete_prefix(prefix, &author).await?;
2017 assert_keys(
2018 &mut store,
2019 namespace.id(),
2020 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2021 );
2022 store.flush()?;
2023 Ok(())
2024 }
2025
2026 #[tokio::test]
2027 async fn test_latest_iter_memory() -> Result<()> {
2028 let store = store::Store::memory();
2029 test_latest_iter(store).await
2030 }
2031
2032 #[tokio::test]
2033 #[cfg(feature = "fs-store")]
2034 async fn test_latest_iter_fs() -> Result<()> {
2035 let dbfile = tempfile::NamedTempFile::new()?;
2036 let store = store::fs::Store::persistent(dbfile.path())?;
2037 test_latest_iter(store).await
2038 }
2039
2040 async fn test_latest_iter(mut store: Store) -> Result<()> {
2041 let mut rng = rand::rng();
2042 let author0 = Author::new(&mut rng);
2043 let author1 = Author::new(&mut rng);
2044 let namespace = NamespaceSecret::new(&mut rng);
2045 let mut replica = store.new_replica(namespace.clone())?;
2046
2047 replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
2048 let latest = store
2049 .get_latest_for_each_author(namespace.id())?
2050 .collect::<Result<Vec<_>>>()?;
2051 assert_eq!(latest.len(), 1);
2052 assert_eq!(latest[0].2, b"a0.1".to_vec());
2053
2054 let mut replica = store.new_replica(namespace.clone())?;
2055 replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
2056 replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
2057 let latest = store
2058 .get_latest_for_each_author(namespace.id())?
2059 .collect::<Result<Vec<_>>>()?;
2060 let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2061 latest_keys.sort();
2062 assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2063 store.flush()?;
2064 Ok(())
2065 }
2066
2067 #[tokio::test]
2068 async fn test_replica_byte_keys_memory() -> Result<()> {
2069 let store = store::Store::memory();
2070
2071 test_replica_byte_keys(store).await?;
2072 Ok(())
2073 }
2074
2075 #[tokio::test]
2076 #[cfg(feature = "fs-store")]
2077 async fn test_replica_byte_keys_fs() -> Result<()> {
2078 let dbfile = tempfile::NamedTempFile::new()?;
2079 let store = store::fs::Store::persistent(dbfile.path())?;
2080 test_replica_byte_keys(store).await?;
2081
2082 Ok(())
2083 }
2084
2085 async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2086 let mut rng = rand::rng();
2087 let author = Author::new(&mut rng);
2088 let namespace = NamespaceSecret::new(&mut rng);
2089
2090 let hash = Hash::new(b"foo");
2091 let len = 3;
2092
2093 let key = vec![1u8, 0u8];
2094 let mut replica = store.new_replica(namespace.clone())?;
2095 replica.insert(key, &author, hash, len).await?;
2096 assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2097 let key = vec![1u8, 2u8];
2098 let mut replica = store.new_replica(namespace.clone())?;
2099 replica.insert(key, &author, hash, len).await?;
2100 assert_keys(
2101 &mut store,
2102 namespace.id(),
2103 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2104 );
2105
2106 let key = vec![0u8, 255u8];
2107 let mut replica = store.new_replica(namespace.clone())?;
2108 replica.insert(key, &author, hash, len).await?;
2109 assert_keys(
2110 &mut store,
2111 namespace.id(),
2112 vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2113 );
2114 store.flush()?;
2115 Ok(())
2116 }
2117
2118 #[tokio::test]
2119 async fn test_replica_capability_memory() -> Result<()> {
2120 let store = store::Store::memory();
2121 test_replica_capability(store).await
2122 }
2123
2124 #[tokio::test]
2125 #[cfg(feature = "fs-store")]
2126 async fn test_replica_capability_fs() -> Result<()> {
2127 let dbfile = tempfile::NamedTempFile::new()?;
2128 let store = store::fs::Store::persistent(dbfile.path())?;
2129 test_replica_capability(store).await
2130 }
2131
2132 #[allow(clippy::redundant_pattern_matching)]
2133 async fn test_replica_capability(mut store: Store) -> Result<()> {
2134 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2135 let author = store.new_author(&mut rng)?;
2136 let namespace = NamespaceSecret::new(&mut rng);
2137
2138 let capability = Capability::Read(namespace.id());
2140 store.import_namespace(capability)?;
2141 let mut replica = store.open_replica(&namespace.id())?;
2142 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2143 assert!(matches!(res, Err(InsertError::ReadOnly)));
2144
2145 let capability = Capability::Write(namespace.clone());
2147 store.import_namespace(capability)?;
2148 let mut replica = store.open_replica(&namespace.id())?;
2149 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2150 assert!(matches!(res, Ok(_)));
2151 store.close_replica(namespace.id());
2152 let mut replica = store.open_replica(&namespace.id())?;
2153 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2154 assert!(res.is_ok());
2155
2156 let capability = Capability::Read(namespace.id());
2158 store.import_namespace(capability)?;
2159 store.close_replica(namespace.id());
2160 let mut replica = store.open_replica(&namespace.id())?;
2161 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2162 assert!(res.is_ok());
2163 store.flush()?;
2164 Ok(())
2165 }
2166
2167 #[tokio::test]
2168 async fn test_actor_capability_memory() -> Result<()> {
2169 let store = store::Store::memory();
2170 test_actor_capability(store).await
2171 }
2172
2173 #[tokio::test]
2174 #[cfg(feature = "fs-store")]
2175 async fn test_actor_capability_fs() -> Result<()> {
2176 let dbfile = tempfile::NamedTempFile::new()?;
2177 let store = store::fs::Store::persistent(dbfile.path())?;
2178 test_actor_capability(store).await
2179 }
2180
2181 async fn test_actor_capability(store: Store) -> Result<()> {
2182 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2184 let author = Author::new(&mut rng);
2185 let handle = SyncHandle::spawn(store, None, "test".into());
2186 let author = handle.import_author(author).await?;
2187 let namespace = NamespaceSecret::new(&mut rng);
2188 let id = namespace.id();
2189
2190 let capability = Capability::Read(namespace.id());
2192 handle.import_namespace(capability).await?;
2193 handle.open(namespace.id(), Default::default()).await?;
2194 let res = handle
2195 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2196 .await;
2197 assert!(res.is_err());
2198
2199 let capability = Capability::Write(namespace.clone());
2201 handle.import_namespace(capability).await?;
2202 let res = handle
2203 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2204 .await;
2205 assert!(res.is_ok());
2206
2207 handle.close(namespace.id()).await?;
2209 let res = handle
2210 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2211 .await;
2212 assert!(res.is_err());
2213 handle.open(namespace.id(), Default::default()).await?;
2214 let res = handle
2215 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2216 .await;
2217 assert!(res.is_ok());
2218 Ok(())
2219 }
2220
2221 fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2222 let mut res = vec![];
2223 while let Ok(ev) = events.try_recv() {
2224 res.push(ev);
2225 }
2226 res
2227 }
2228
2229 #[tokio::test]
2232 async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2233 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2234 let mut store1 = store::Store::memory();
2235 let mut store2 = store::Store::memory();
2236 let peer1 = [1u8; 32];
2237 let peer2 = [2u8; 32];
2238 let mut state1 = SyncOutcome::default();
2239 let mut state2 = SyncOutcome::default();
2240
2241 let author = Author::new(&mut rng);
2242 let namespace = NamespaceSecret::new(&mut rng);
2243 let mut replica1 = store1.new_replica(namespace.clone())?;
2244 let mut replica2 = store2.new_replica(namespace.clone())?;
2245
2246 let (events1_sender, events1) = async_channel::bounded(32);
2247 let (events2_sender, events2) = async_channel::bounded(32);
2248
2249 replica1.info.subscribe(events1_sender);
2250 replica2.info.subscribe(events2_sender);
2251
2252 replica1.hash_and_insert(b"foo", &author, b"init").await?;
2253
2254 let from1 = replica1.sync_initial_message()?;
2255 let from2 = replica2
2256 .sync_process_message(from1, peer1, &mut state2)
2257 .await
2258 .unwrap()
2259 .unwrap();
2260 let from1 = replica1
2261 .sync_process_message(from2, peer2, &mut state1)
2262 .await
2263 .unwrap()
2264 .unwrap();
2265 replica2.hash_and_insert(b"foo", &author, b"update").await?;
2269 let from2 = replica2
2270 .sync_process_message(from1, peer1, &mut state2)
2271 .await
2272 .unwrap();
2273 assert!(from2.is_none());
2274 let events1 = drain(events1);
2275 let events2 = drain(events2);
2276 assert_eq!(events1.len(), 1);
2277 assert_eq!(events2.len(), 1);
2278 assert!(matches!(events1[0], Event::LocalInsert { .. }));
2279 assert!(matches!(events2[0], Event::LocalInsert { .. }));
2280 assert_eq!(state1.num_sent, 1);
2281 assert_eq!(state1.num_recv, 0);
2282 assert_eq!(state2.num_sent, 0);
2283 assert_eq!(state2.num_recv, 1);
2284 store1.flush()?;
2285 store2.flush()?;
2286 Ok(())
2287 }
2288
2289 #[tokio::test]
2290 async fn test_replica_queries_mem() -> Result<()> {
2291 let store = store::Store::memory();
2292
2293 test_replica_queries(store).await?;
2294 Ok(())
2295 }
2296
2297 #[tokio::test]
2298 #[cfg(feature = "fs-store")]
2299 async fn test_replica_queries_fs() -> Result<()> {
2300 let dbfile = tempfile::NamedTempFile::new()?;
2301 let store = store::fs::Store::persistent(dbfile.path())?;
2302 test_replica_queries(store).await?;
2303
2304 Ok(())
2305 }
2306
2307 async fn test_replica_queries(mut store: Store) -> Result<()> {
2308 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2309 let namespace = NamespaceSecret::new(&mut rng);
2310 let namespace_id = namespace.id();
2311
2312 let a1 = store.new_author(&mut rng)?;
2313 let a2 = store.new_author(&mut rng)?;
2314 let a3 = store.new_author(&mut rng)?;
2315 println!(
2316 "a1 {} a2 {} a3 {}",
2317 a1.id().fmt_short(),
2318 a2.id().fmt_short(),
2319 a3.id().fmt_short()
2320 );
2321
2322 let mut replica = store.new_replica(namespace.clone())?;
2323 replica.hash_and_insert("hi/world", &a2, "a2").await?;
2324 replica.hash_and_insert("hi/world", &a1, "a1").await?;
2325 replica.hash_and_insert("hi/moon", &a2, "a1").await?;
2326 replica.hash_and_insert("hi", &a3, "a3").await?;
2327
2328 struct QueryTester<'a> {
2329 store: &'a mut Store,
2330 namespace: NamespaceId,
2331 }
2332 impl QueryTester<'_> {
2333 fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2334 let query = query.into();
2335 let actual = self
2336 .store
2337 .get_many(self.namespace, query.clone())
2338 .unwrap()
2339 .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2340 .collect::<Result<Vec<_>>>()
2341 .unwrap();
2342 let expected = expected
2343 .into_iter()
2344 .map(|(key, author)| (key.to_string(), author.id()))
2345 .collect::<Vec<_>>();
2346 assert_eq!(actual, expected, "query: {query:#?}")
2347 }
2348 }
2349
2350 let mut qt = QueryTester {
2351 store: &mut store,
2352 namespace: namespace_id,
2353 };
2354
2355 qt.assert(
2356 Query::all(),
2357 vec![
2358 ("hi/world", &a1),
2359 ("hi/moon", &a2),
2360 ("hi/world", &a2),
2361 ("hi", &a3),
2362 ],
2363 );
2364
2365 qt.assert(
2366 Query::single_latest_per_key(),
2367 vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2368 );
2369
2370 qt.assert(
2371 Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2372 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2373 );
2374
2375 qt.assert(
2376 Query::single_latest_per_key().key_prefix("hi/"),
2377 vec![("hi/moon", &a2), ("hi/world", &a1)],
2378 );
2379
2380 qt.assert(
2381 Query::single_latest_per_key()
2382 .key_prefix("hi/")
2383 .sort_direction(SortDirection::Desc),
2384 vec![("hi/world", &a1), ("hi/moon", &a2)],
2385 );
2386
2387 qt.assert(
2388 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2389 vec![
2390 ("hi", &a3),
2391 ("hi/moon", &a2),
2392 ("hi/world", &a1),
2393 ("hi/world", &a2),
2394 ],
2395 );
2396
2397 qt.assert(
2398 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2399 vec![
2400 ("hi/world", &a2),
2401 ("hi/world", &a1),
2402 ("hi/moon", &a2),
2403 ("hi", &a3),
2404 ],
2405 );
2406
2407 qt.assert(
2408 Query::all().key_prefix("hi/"),
2409 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2410 );
2411
2412 qt.assert(
2413 Query::all().key_prefix("hi/").offset(1).limit(1),
2414 vec![("hi/moon", &a2)],
2415 );
2416
2417 qt.assert(
2418 Query::all()
2419 .key_prefix("hi/")
2420 .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2421 vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2422 );
2423
2424 qt.assert(
2425 Query::all()
2426 .key_prefix("hi/")
2427 .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2428 .offset(1)
2429 .limit(1),
2430 vec![("hi/world", &a1)],
2431 );
2432
2433 qt.assert(
2434 Query::all()
2435 .key_prefix("hi/")
2436 .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2437 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2438 );
2439
2440 qt.assert(
2441 Query::all()
2442 .key_prefix("hi/")
2443 .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2444 vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2445 );
2446
2447 qt.assert(
2448 Query::all()
2449 .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2450 .limit(2)
2451 .offset(1),
2452 vec![("hi/moon", &a2), ("hi/world", &a1)],
2453 );
2454
2455 let mut replica = store.new_replica(namespace)?;
2456 replica.delete_prefix("hi/world", &a2).await?;
2457 let mut qt = QueryTester {
2458 store: &mut store,
2459 namespace: namespace_id,
2460 };
2461
2462 qt.assert(
2463 Query::all(),
2464 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2465 );
2466
2467 qt.assert(
2468 Query::all().include_empty(),
2469 vec![
2470 ("hi/world", &a1),
2471 ("hi/moon", &a2),
2472 ("hi/world", &a2),
2473 ("hi", &a3),
2474 ],
2475 );
2476 store.flush()?;
2477 Ok(())
2478 }
2479
2480 #[test]
2481 fn test_dl_policies_mem() -> Result<()> {
2482 let mut store = store::Store::memory();
2483 test_dl_policies(&mut store)
2484 }
2485
2486 #[test]
2487 #[cfg(feature = "fs-store")]
2488 fn test_dl_policies_fs() -> Result<()> {
2489 let dbfile = tempfile::NamedTempFile::new()?;
2490 let mut store = store::fs::Store::persistent(dbfile.path())?;
2491 test_dl_policies(&mut store)
2492 }
2493
2494 fn test_dl_policies(store: &mut Store) -> Result<()> {
2495 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2496 let namespace = NamespaceSecret::new(&mut rng);
2497 let id = namespace.id();
2498
2499 let filter = store::FilterKind::Exact("foo".into());
2500 let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2501 store
2502 .set_download_policy(&id, policy.clone())
2503 .expect_err("document dos not exist");
2504
2505 store.new_replica(namespace)?;
2507
2508 store.set_download_policy(&id, policy.clone())?;
2509 let retrieved_policy = store.get_download_policy(&id)?;
2510 assert_eq!(retrieved_policy, policy);
2511 store.flush()?;
2512 Ok(())
2513 }
2514
2515 fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2516 expected.sort();
2517 assert_eq!(expected, get_keys_sorted(store, namespace));
2518 }
2519
2520 fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2521 let mut res = store
2522 .get_many(namespace, Query::all())
2523 .unwrap()
2524 .map(|e| e.map(|e| e.key().to_vec()))
2525 .collect::<Result<Vec<_>>>()
2526 .unwrap();
2527 res.sort();
2528 res
2529 }
2530
2531 fn get_entry(
2532 store: &mut Store,
2533 namespace: NamespaceId,
2534 author: AuthorId,
2535 key: &[u8],
2536 ) -> anyhow::Result<SignedEntry> {
2537 let entry = store
2538 .get_exact(namespace, author, key, true)?
2539 .ok_or_else(|| anyhow::anyhow!("not found"))?;
2540 Ok(entry)
2541 }
2542
2543 fn get_content_hash(
2544 store: &mut Store,
2545 namespace: NamespaceId,
2546 author: AuthorId,
2547 key: &[u8],
2548 ) -> anyhow::Result<Option<Hash>> {
2549 let hash = store
2550 .get_exact(namespace, author, key, false)?
2551 .map(|e| e.content_hash());
2552 Ok(hash)
2553 }
2554
2555 async fn sync<'a>(
2556 alice: &'a mut Replica<'a>,
2557 bob: &'a mut Replica<'a>,
2558 ) -> Result<(SyncOutcome, SyncOutcome)> {
2559 let alice_peer_id = [1u8; 32];
2560 let bob_peer_id = [2u8; 32];
2561 let mut alice_state = SyncOutcome::default();
2562 let mut bob_state = SyncOutcome::default();
2563 let mut next_to_bob = Some(alice.sync_initial_message()?);
2565 let mut rounds = 0;
2566 while let Some(msg) = next_to_bob.take() {
2567 assert!(rounds < 100, "too many rounds");
2568 rounds += 1;
2569 println!("round {rounds}");
2570 if let Some(msg) = bob
2571 .sync_process_message(msg, alice_peer_id, &mut bob_state)
2572 .await?
2573 {
2574 next_to_bob = alice
2575 .sync_process_message(msg, bob_peer_id, &mut alice_state)
2576 .await?
2577 }
2578 }
2579 assert_eq!(alice_state.num_sent, bob_state.num_recv);
2580 assert_eq!(alice_state.num_recv, bob_state.num_sent);
2581 Ok((alice_state, bob_state))
2582 }
2583
2584 fn check_entries(
2585 store: &mut Store,
2586 namespace: &NamespaceId,
2587 author: &Author,
2588 set: &[&str],
2589 ) -> Result<()> {
2590 for el in set {
2591 store.get_exact(*namespace, author.id(), el, false)?;
2592 }
2593 Ok(())
2594 }
2595}