1use std::{
10 cmp::Ordering,
11 fmt::Debug,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14};
15
16use bytes::{Bytes, BytesMut};
17use ed25519_dalek::{Signature, SignatureError};
18use iroh_blobs::Hash;
19use n0_future::{
20 time::{Duration, SystemTime},
21 IterExt,
22};
23use serde::{Deserialize, Serialize};
24
25pub use crate::heads::AuthorHeads;
26use crate::{
27 keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
28 ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
29 store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
30};
31
32pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
36
37pub type PeerIdBytes = [u8; 32];
40
41pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;
44
45pub type ContentStatusCallback =
47 Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
48
49#[derive(derive_more::Debug, Clone)]
51pub enum Event {
52 LocalInsert {
54 namespace: NamespaceId,
56 entry: SignedEntry,
58 },
59 RemoteInsert {
61 namespace: NamespaceId,
63 entry: SignedEntry,
65 #[debug("{}", hex::encode(&from[..5]))]
68 from: PeerIdBytes,
69 should_download: bool,
71 remote_content_status: ContentStatus,
73 },
74}
75
76#[derive(derive_more::Debug, Clone)]
78pub enum InsertOrigin {
79 Local,
81 Sync {
83 #[debug("{}", hex::encode(&from[..5]))]
86 from: PeerIdBytes,
87 remote_content_status: ContentStatus,
89 },
90}
91
92#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
94pub enum ContentStatus {
95 Complete,
97 Incomplete,
99 Missing,
101}
102
103#[derive(Debug, Clone, Default)]
105pub struct SyncOutcome {
106 pub heads_received: AuthorHeads,
108 pub num_recv: usize,
110 pub num_sent: usize,
112}
113
114fn get_as_ptr<T>(value: &T) -> Option<usize> {
115 use std::mem;
116 if mem::size_of::<T>() == std::mem::size_of::<usize>()
117 && mem::align_of::<T>() == mem::align_of::<usize>()
118 {
119 unsafe { Some(mem::transmute_copy(value)) }
121 } else {
122 None
123 }
124}
125
126fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
127 get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
128}
129
130#[derive(Debug, Default)]
131struct Subscribers(Vec<async_channel::Sender<Event>>);
132impl Subscribers {
133 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
134 self.0.push(sender)
135 }
136 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
137 self.0.retain(|s| !same_channel(s, sender));
138 }
139 pub async fn send(&mut self, event: Event) {
140 self.0 = std::mem::take(&mut self.0)
141 .into_iter()
142 .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx))
143 .join_all()
144 .await
145 .into_iter()
146 .flatten()
147 .collect();
148 }
149 pub fn len(&self) -> usize {
150 self.0.len()
151 }
152 pub async fn send_with(&mut self, f: impl FnOnce() -> Event) {
153 if !self.0.is_empty() {
154 self.send(f()).await
155 }
156 }
157}
158
159#[derive(
161 Debug,
162 Clone,
163 Copy,
164 Serialize,
165 Deserialize,
166 num_enum::IntoPrimitive,
167 num_enum::TryFromPrimitive,
168 strum::Display,
169)]
170#[repr(u8)]
171#[strum(serialize_all = "snake_case")]
172pub enum CapabilityKind {
173 Write = 1,
175 Read = 2,
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
181pub enum Capability {
182 Write(NamespaceSecret),
184 Read(NamespaceId),
186}
187
188impl Capability {
189 pub fn id(&self) -> NamespaceId {
191 match self {
192 Capability::Write(secret) => secret.id(),
193 Capability::Read(id) => *id,
194 }
195 }
196
197 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
200 match self {
201 Capability::Write(secret) => Ok(secret),
202 Capability::Read(_) => Err(ReadOnly),
203 }
204 }
205
206 pub fn kind(&self) -> CapabilityKind {
208 match self {
209 Capability::Write(_) => CapabilityKind::Write,
210 Capability::Read(_) => CapabilityKind::Read,
211 }
212 }
213
214 pub fn raw(&self) -> (u8, [u8; 32]) {
216 let capability_repr: u8 = self.kind().into();
217 let bytes = match self {
218 Capability::Write(secret) => secret.to_bytes(),
219 Capability::Read(id) => id.to_bytes(),
220 };
221 (capability_repr, bytes)
222 }
223
224 pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
226 let kind: CapabilityKind = kind.try_into()?;
227 let capability = match kind {
228 CapabilityKind::Write => {
229 let secret = NamespaceSecret::from_bytes(bytes);
230 Capability::Write(secret)
231 }
232 CapabilityKind::Read => {
233 let id = NamespaceId::from(bytes);
234 Capability::Read(id)
235 }
236 };
237 Ok(capability)
238 }
239
240 pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
246 if other.id() != self.id() {
247 return Err(CapabilityError::NamespaceMismatch);
248 }
249
250 if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
252 let _ = std::mem::replace(self, other);
253 Ok(true)
254 } else {
255 Ok(false)
256 }
257 }
258}
259
260#[derive(Debug, thiserror::Error)]
262pub enum CapabilityError {
263 #[error("Namespaces are not the same")]
265 NamespaceMismatch,
266}
267
268#[derive(derive_more::Debug)]
270pub struct ReplicaInfo {
271 pub(crate) capability: Capability,
272 subscribers: Subscribers,
273 #[debug("ContentStatusCallback")]
274 content_status_cb: Option<ContentStatusCallback>,
275 closed: bool,
276}
277
278impl ReplicaInfo {
279 pub fn new(capability: Capability) -> Self {
281 Self {
282 capability,
283 subscribers: Default::default(),
284 content_status_cb: None,
286 closed: false,
287 }
288 }
289
290 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
296 self.subscribers.subscribe(sender)
297 }
298
299 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
305 self.subscribers.unsubscribe(sender)
306 }
307
308 pub fn subscribers_count(&self) -> usize {
310 self.subscribers.len()
311 }
312
313 pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
318 if self.content_status_cb.is_some() {
319 false
320 } else {
321 self.content_status_cb = Some(cb);
322 true
323 }
324 }
325
326 fn ensure_open(&self) -> Result<(), InsertError> {
327 if self.closed() {
328 Err(InsertError::Closed)
329 } else {
330 Ok(())
331 }
332 }
333
334 pub fn closed(&self) -> bool {
340 self.closed
341 }
342
343 pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
350 self.capability.merge(capability)
351 }
352}
353
354#[derive(derive_more::Debug)]
356pub struct Replica<'a, I = Box<ReplicaInfo>> {
357 pub(crate) store: StoreInstance<'a>,
358 pub(crate) info: I,
359}
360
361impl<'a, I> Replica<'a, I>
362where
363 I: Deref<Target = ReplicaInfo> + DerefMut,
364{
365 pub fn new(store: StoreInstance<'a>, info: I) -> Self {
367 Replica { info, store }
368 }
369
370 pub async fn insert(
378 &mut self,
379 key: impl AsRef<[u8]>,
380 author: &Author,
381 hash: Hash,
382 len: u64,
383 ) -> Result<usize, InsertError> {
384 if len == 0 || hash == Hash::EMPTY {
385 return Err(InsertError::EntryIsEmpty);
386 }
387 self.info.ensure_open()?;
388 let id = RecordIdentifier::new(self.id(), author.id(), key);
389 let record = Record::new_current(hash, len);
390 let entry = Entry::new(id, record);
391 let secret = self.secret_key()?;
392 let signed_entry = entry.sign(secret, author);
393 self.insert_entry(signed_entry, InsertOrigin::Local).await
394 }
395
396 pub async fn delete_prefix(
403 &mut self,
404 prefix: impl AsRef<[u8]>,
405 author: &Author,
406 ) -> Result<usize, InsertError> {
407 self.info.ensure_open()?;
408 let id = RecordIdentifier::new(self.id(), author.id(), prefix);
409 let entry = Entry::new_empty(id);
410 let signed_entry = entry.sign(self.secret_key()?, author);
411 self.insert_entry(signed_entry, InsertOrigin::Local).await
412 }
413
414 pub async fn insert_remote_entry(
422 &mut self,
423 entry: SignedEntry,
424 received_from: PeerIdBytes,
425 content_status: ContentStatus,
426 ) -> Result<usize, InsertError> {
427 self.info.ensure_open()?;
428 entry.validate_empty()?;
429 let origin = InsertOrigin::Sync {
430 from: received_from,
431 remote_content_status: content_status,
432 };
433 self.insert_entry(entry, origin).await
434 }
435
436 async fn insert_entry(
440 &mut self,
441 entry: SignedEntry,
442 origin: InsertOrigin,
443 ) -> Result<usize, InsertError> {
444 let namespace = self.id();
445
446 let store = &self.store;
447 validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
448
449 let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
450 tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
451
452 let removed_count = match outcome {
453 InsertOutcome::Inserted { removed } => removed,
454 InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
455 };
456
457 let insert_event = match origin {
458 InsertOrigin::Local => Event::LocalInsert { namespace, entry },
459 InsertOrigin::Sync {
460 from,
461 remote_content_status,
462 } => {
463 let download_policy = self
464 .store
465 .get_download_policy(&self.id())
466 .unwrap_or_default();
467 let should_download = download_policy.matches(entry.entry());
468 Event::RemoteInsert {
469 namespace,
470 entry,
471 from,
472 should_download,
473 remote_content_status,
474 }
475 }
476 };
477
478 self.info.subscribers.send(insert_event).await;
479
480 Ok(removed_count)
481 }
482
483 pub async fn hash_and_insert(
488 &mut self,
489 key: impl AsRef<[u8]>,
490 author: &Author,
491 data: impl AsRef<[u8]>,
492 ) -> Result<Hash, InsertError> {
493 self.info.ensure_open()?;
494 let len = data.as_ref().len() as u64;
495 let hash = Hash::new(data);
496 self.insert(key, author, hash, len).await?;
497 Ok(hash)
498 }
499
500 pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
502 RecordIdentifier::new(self.info.capability.id(), author.id(), key)
503 }
504
505 pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
507 self.info.ensure_open().map_err(anyhow::Error::from)?;
508 self.store.initial_message()
509 }
510
511 pub async fn sync_process_message(
515 &mut self,
516 message: crate::ranger::Message<SignedEntry>,
517 from_peer: PeerIdBytes,
518 state: &mut SyncOutcome,
519 ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
520 self.info.ensure_open()?;
521 let my_namespace = self.id();
522 let now = system_time_now();
523
524 state.num_recv += message.value_count();
526 for (entry, _content_status) in message.values() {
527 state
528 .heads_received
529 .insert(entry.author(), entry.timestamp());
530 }
531
532 let cb = self.info.content_status_cb.clone();
535 let download_policy = self
536 .store
537 .get_download_policy(&my_namespace)
538 .unwrap_or_default();
539 let reply = self
540 .store
541 .process_message(
542 &Default::default(),
543 message,
544 |store, entry, content_status| {
546 let origin = InsertOrigin::Sync {
547 from: from_peer,
548 remote_content_status: content_status,
549 };
550 validate_entry(now, store, my_namespace, entry, &origin).is_ok()
551 },
552 async |_store, entry, content_status| {
554 self.info
556 .subscribers
557 .send_with(|| {
558 let should_download = download_policy.matches(entry.entry());
559 Event::RemoteInsert {
560 from: from_peer,
561 namespace: my_namespace,
562 entry: entry.clone(),
563 should_download,
564 remote_content_status: content_status,
565 }
566 })
567 .await
568 },
569 async move |entry| {
571 if let Some(cb) = cb.as_ref() {
572 cb(entry.content_hash()).await
573 } else {
574 ContentStatus::Missing
575 }
576 },
577 )
578 .await?;
579
580 if let Some(ref reply) = reply {
582 state.num_sent += reply.value_count();
583 }
584
585 Ok(reply)
586 }
587
588 pub fn id(&self) -> NamespaceId {
590 self.info.capability.id()
591 }
592
593 pub fn capability(&self) -> &Capability {
595 &self.info.capability
596 }
597
598 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
601 self.info.capability.secret_key()
602 }
603}
604
605#[derive(Debug, thiserror::Error)]
607#[error("Replica allows read access only.")]
608pub struct ReadOnly;
609
610fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
618 now: u64,
619 store: &S,
620 expected_namespace: NamespaceId,
621 entry: &SignedEntry,
622 origin: &InsertOrigin,
623) -> Result<(), ValidationFailure> {
624 if entry.namespace() != expected_namespace {
626 return Err(ValidationFailure::InvalidNamespace);
627 }
628
629 if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
631 return Err(ValidationFailure::BadSignature);
632 }
633
634 if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
636 return Err(ValidationFailure::TooFarInTheFuture);
637 }
638 Ok(())
639}
640
641#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
643pub enum InsertError {
644 #[error("storage error")]
646 Store(anyhow::Error),
647 #[error("validation failure")]
649 Validation(#[from] ValidationFailure),
650 #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
652 NewerEntryExists,
653 #[error("Attempted to insert an empty entry")]
655 EntryIsEmpty,
656 #[error("Attempted to insert to read only replica")]
658 #[from(ReadOnly)]
659 ReadOnly,
660 #[error("replica is closed")]
662 Closed,
663}
664
665#[derive(thiserror::Error, Debug)]
667pub enum ValidationFailure {
668 #[error("Entry namespace does not match the current replica")]
670 InvalidNamespace,
671 #[error("Entry signature is invalid")]
673 BadSignature,
674 #[error("Entry timestamp is too far in the future.")]
676 TooFarInTheFuture,
677 #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
679 InvalidEmptyEntry,
680}
681
682#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
684pub struct SignedEntry {
685 signature: EntrySignature,
686 entry: Entry,
687}
688
689impl From<SignedEntry> for Entry {
690 fn from(value: SignedEntry) -> Self {
691 value.entry
692 }
693}
694
695impl PartialOrd for SignedEntry {
696 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
697 Some(self.cmp(other))
698 }
699}
700
701impl Ord for SignedEntry {
702 fn cmp(&self, other: &Self) -> Ordering {
703 self.entry.cmp(&other.entry)
704 }
705}
706
707impl PartialOrd for Entry {
708 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
709 Some(self.cmp(other))
710 }
711}
712
713impl Ord for Entry {
714 fn cmp(&self, other: &Self) -> Ordering {
715 self.id
716 .cmp(&other.id)
717 .then_with(|| self.record.cmp(&other.record))
718 }
719}
720
721impl SignedEntry {
722 pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
723 SignedEntry { signature, entry }
724 }
725
726 pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
728 let signature = EntrySignature::from_entry(&entry, namespace, author);
729 SignedEntry { signature, entry }
730 }
731
732 pub fn from_parts(
734 namespace: &NamespaceSecret,
735 author: &Author,
736 key: impl AsRef<[u8]>,
737 record: Record,
738 ) -> Self {
739 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
740 let entry = Entry::new(id, record);
741 Self::from_entry(entry, namespace, author)
742 }
743
744 pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
746 self.signature.verify(
747 &self.entry,
748 &self.entry.namespace().public_key(store)?,
749 &self.entry.author().public_key(store)?,
750 )
751 }
752
753 pub fn signature(&self) -> &EntrySignature {
755 &self.signature
756 }
757
758 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
760 self.entry().validate_empty()
761 }
762
763 pub fn entry(&self) -> &Entry {
765 &self.entry
766 }
767
768 pub fn content_hash(&self) -> Hash {
770 self.entry().content_hash()
771 }
772
773 pub fn content_len(&self) -> u64 {
775 self.entry().content_len()
776 }
777
778 pub fn author_bytes(&self) -> AuthorId {
780 self.entry().id().author()
781 }
782
783 pub fn key(&self) -> &[u8] {
785 self.entry().id().key()
786 }
787
788 pub fn timestamp(&self) -> u64 {
790 self.entry().timestamp()
791 }
792}
793
794impl RangeEntry for SignedEntry {
795 type Key = RecordIdentifier;
796 type Value = Record;
797
798 fn key(&self) -> &Self::Key {
799 &self.entry.id
800 }
801
802 fn value(&self) -> &Self::Value {
803 &self.entry.record
804 }
805
806 fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
807 let mut hasher = blake3::Hasher::new();
808 hasher.update(self.namespace().as_ref());
809 hasher.update(self.author_bytes().as_ref());
810 hasher.update(self.key());
811 hasher.update(&self.timestamp().to_be_bytes());
812 hasher.update(self.content_hash().as_bytes());
813 Fingerprint(hasher.finalize().into())
814 }
815}
816
817#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
819pub struct EntrySignature {
820 author_signature: Signature,
821 namespace_signature: Signature,
822}
823
824impl Debug for EntrySignature {
825 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826 f.debug_struct("EntrySignature")
827 .field(
828 "namespace_signature",
829 &hex::encode(self.namespace_signature.to_bytes()),
830 )
831 .field(
832 "author_signature",
833 &hex::encode(self.author_signature.to_bytes()),
834 )
835 .finish()
836 }
837}
838
839impl EntrySignature {
840 pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
842 let bytes = entry.to_vec();
845 let namespace_signature = namespace.sign(&bytes);
846 let author_signature = author.sign(&bytes);
847
848 EntrySignature {
849 author_signature,
850 namespace_signature,
851 }
852 }
853
854 pub fn verify(
857 &self,
858 entry: &Entry,
859 namespace: &NamespacePublicKey,
860 author: &AuthorPublicKey,
861 ) -> Result<(), SignatureError> {
862 let bytes = entry.to_vec();
863 namespace.verify(&bytes, &self.namespace_signature)?;
864 author.verify(&bytes, &self.author_signature)?;
865
866 Ok(())
867 }
868
869 pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
870 let namespace_signature = Signature::from_bytes(namespace_sig);
871 let author_signature = Signature::from_bytes(author_sig);
872
873 EntrySignature {
874 author_signature,
875 namespace_signature,
876 }
877 }
878
879 pub(crate) fn author(&self) -> &Signature {
880 &self.author_signature
881 }
882
883 pub(crate) fn namespace(&self) -> &Signature {
884 &self.namespace_signature
885 }
886}
887
888#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
894pub struct Entry {
895 id: RecordIdentifier,
896 record: Record,
897}
898
899impl Entry {
900 pub fn new(id: RecordIdentifier, record: Record) -> Self {
902 Entry { id, record }
903 }
904
905 pub fn new_empty(id: RecordIdentifier) -> Self {
907 Entry {
908 id,
909 record: Record::empty_current(),
910 }
911 }
912
913 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
915 match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
916 (true, true) => Ok(()),
917 (false, false) => Ok(()),
918 (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
919 (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
920 }
921 }
922
923 pub fn id(&self) -> &RecordIdentifier {
925 &self.id
926 }
927
928 pub fn namespace(&self) -> NamespaceId {
930 self.id.namespace()
931 }
932
933 pub fn author(&self) -> AuthorId {
935 self.id.author()
936 }
937
938 pub fn key(&self) -> &[u8] {
940 self.id.key()
941 }
942
943 pub fn record(&self) -> &Record {
945 &self.record
946 }
947
948 pub fn content_hash(&self) -> Hash {
950 self.record.hash
951 }
952
953 pub fn content_len(&self) -> u64 {
955 self.record.len
956 }
957
958 pub fn timestamp(&self) -> u64 {
960 self.record.timestamp
961 }
962
963 pub fn encode(&self, out: &mut Vec<u8>) {
965 self.id.encode(out);
966 self.record.encode(out);
967 }
968
969 pub fn to_vec(&self) -> Vec<u8> {
971 let mut out = Vec::new();
972 self.encode(&mut out);
973 out
974 }
975
976 pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
978 SignedEntry::from_entry(self, namespace, author)
979 }
980}
981
982const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
983const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
984const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
985
986#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
988pub struct RecordIdentifier(Bytes);
989
990impl Default for RecordIdentifier {
991 fn default() -> Self {
992 Self::new(NamespaceId::default(), AuthorId::default(), b"")
993 }
994}
995
996impl Debug for RecordIdentifier {
997 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
998 f.debug_struct("RecordIdentifier")
999 .field("namespace", &self.namespace())
1000 .field("author", &self.author())
1001 .field("key", &std::string::String::from_utf8_lossy(self.key()))
1002 .finish()
1003 }
1004}
1005
1006impl RangeKey for RecordIdentifier {
1007 #[cfg(test)]
1008 fn is_prefix_of(&self, other: &Self) -> bool {
1009 other.as_ref().starts_with(self.as_ref())
1010 }
1011}
1012
1013fn system_time_now() -> u64 {
1014 SystemTime::now()
1015 .duration_since(SystemTime::UNIX_EPOCH)
1016 .expect("time drift")
1017 .as_micros() as u64
1018}
1019
1020impl RecordIdentifier {
1021 pub fn new(
1023 namespace: impl Into<NamespaceId>,
1024 author: impl Into<AuthorId>,
1025 key: impl AsRef<[u8]>,
1026 ) -> Self {
1027 let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1028 bytes.extend_from_slice(namespace.into().as_bytes());
1029 bytes.extend_from_slice(author.into().as_bytes());
1030 bytes.extend_from_slice(key.as_ref());
1031 Self(bytes.freeze())
1032 }
1033
1034 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1036 out.extend_from_slice(&self.0);
1037 }
1038
1039 pub fn as_bytes(&self) -> Bytes {
1041 self.0.clone()
1042 }
1043
1044 pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1046 (
1047 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1048 self.0[AUTHOR_BYTES].try_into().unwrap(),
1049 &self.0[KEY_BYTES],
1050 )
1051 }
1052
1053 pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1055 (
1056 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1057 self.0[AUTHOR_BYTES].try_into().unwrap(),
1058 self.0.slice(KEY_BYTES),
1059 )
1060 }
1061
1062 pub fn key(&self) -> &[u8] {
1064 &self.0[KEY_BYTES]
1065 }
1066
1067 pub fn key_bytes(&self) -> Bytes {
1069 self.0.slice(KEY_BYTES)
1070 }
1071
1072 pub fn namespace(&self) -> NamespaceId {
1074 let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1075 value.into()
1076 }
1077
1078 pub fn author(&self) -> AuthorId {
1080 let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1081 value.into()
1082 }
1083}
1084
1085impl AsRef<[u8]> for RecordIdentifier {
1086 fn as_ref(&self) -> &[u8] {
1087 &self.0
1088 }
1089}
1090
1091impl Deref for SignedEntry {
1092 type Target = Entry;
1093 fn deref(&self) -> &Self::Target {
1094 &self.entry
1095 }
1096}
1097
1098impl Deref for Entry {
1099 type Target = Record;
1100 fn deref(&self) -> &Self::Target {
1101 &self.record
1102 }
1103}
1104
1105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1107pub struct Record {
1108 len: u64,
1110 hash: Hash,
1112 timestamp: u64,
1114}
1115
1116impl RangeValue for Record {}
1117
1118impl Ord for Record {
1122 fn cmp(&self, other: &Self) -> Ordering {
1123 self.timestamp
1124 .cmp(&other.timestamp)
1125 .then_with(|| self.hash.cmp(&other.hash))
1126 }
1127}
1128
1129impl PartialOrd for Record {
1130 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1131 Some(self.cmp(other))
1132 }
1133}
1134
1135impl Record {
1136 pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1138 debug_assert!(
1139 len != 0 || hash == Hash::EMPTY,
1140 "if `len` is 0 then `hash` must be the hash of the empty byte range"
1141 );
1142 Record {
1143 hash,
1144 len,
1145 timestamp,
1146 }
1147 }
1148
1149 pub fn empty(timestamp: u64) -> Self {
1151 Self::new(Hash::EMPTY, 0, timestamp)
1152 }
1153
1154 pub fn empty_current() -> Self {
1156 Self::new_current(Hash::EMPTY, 0)
1157 }
1158
1159 pub fn is_empty(&self) -> bool {
1161 self.hash == Hash::EMPTY
1162 }
1163
1164 pub fn new_current(hash: Hash, len: u64) -> Self {
1166 let timestamp = system_time_now();
1167 Self::new(hash, len, timestamp)
1168 }
1169
1170 pub fn content_len(&self) -> u64 {
1172 self.len
1173 }
1174
1175 pub fn content_hash(&self) -> Hash {
1177 self.hash
1178 }
1179
1180 pub fn timestamp(&self) -> u64 {
1182 self.timestamp
1183 }
1184
1185 #[cfg(test)]
1186 pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1187 let len = data.as_ref().len() as u64;
1188 let hash = Hash::new(data);
1189 Self::new_current(hash, len)
1190 }
1191
1192 #[cfg(test)]
1193 pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1194 let len = data.as_ref().len() as u64;
1195 let hash = Hash::new(data);
1196 Self::new(hash, len, timestamp)
1197 }
1198
1199 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1201 out.extend_from_slice(&self.len.to_be_bytes());
1202 out.extend_from_slice(self.hash.as_ref());
1203 out.extend_from_slice(&self.timestamp.to_be_bytes())
1204 }
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209 use std::collections::HashSet;
1210
1211 use anyhow::Result;
1212 use rand::SeedableRng;
1213
1214 use super::*;
1215 use crate::{
1216 actor::SyncHandle,
1217 ranger::{Range, Store as _},
1218 store::{OpenError, Query, SortBy, SortDirection, Store},
1219 };
1220
1221 #[tokio::test]
1222 async fn test_basics_memory() -> Result<()> {
1223 let store = store::Store::memory();
1224 test_basics(store).await?;
1225
1226 Ok(())
1227 }
1228
1229 #[tokio::test]
1230 #[cfg(feature = "fs-store")]
1231 async fn test_basics_fs() -> Result<()> {
1232 let dbfile = tempfile::NamedTempFile::new()?;
1233 let store = store::fs::Store::persistent(dbfile.path())?;
1234 test_basics(store).await?;
1235 Ok(())
1236 }
1237
1238 async fn test_basics(mut store: Store) -> Result<()> {
1239 let mut rng = rand::rng();
1240 let alice = Author::new(&mut rng);
1241 let bob = Author::new(&mut rng);
1242 let myspace = NamespaceSecret::new(&mut rng);
1243
1244 let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1245 let record = Record::current_from_data(b"this is my cool data");
1246 let entry = Entry::new(record_id, record);
1247 let signed_entry = entry.sign(&myspace, &alice);
1248 signed_entry.verify(&()).expect("failed to verify");
1249
1250 let mut my_replica = store.new_replica(myspace.clone())?;
1251 for i in 0..10 {
1252 my_replica
1253 .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice"))
1254 .await?;
1255 }
1256
1257 for i in 0..10 {
1258 let res = store
1259 .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1260 .unwrap();
1261 let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1262 assert_eq!(res.entry().record().content_len(), len);
1263 res.verify(&())?;
1264 }
1265
1266 let mut my_replica = store.new_replica(myspace.clone())?;
1268 my_replica
1269 .hash_and_insert("/cool/path", &alice, "round 1")
1270 .await?;
1271 let _entry = store
1272 .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1273 .unwrap();
1274 let mut my_replica = store.new_replica(myspace.clone())?;
1276 my_replica
1277 .hash_and_insert("/cool/path", &alice, "round 2")
1278 .await?;
1279 let _entry = store
1280 .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1281 .unwrap();
1282
1283 let entries: Vec<_> = store
1285 .get_many(myspace.id(), Query::author(alice.id()))?
1286 .collect::<Result<_>>()?;
1287 assert_eq!(entries.len(), 11);
1288
1289 let entries: Vec<_> = store
1291 .get_many(myspace.id(), Query::author(bob.id()))?
1292 .collect::<Result<_>>()?;
1293 assert_eq!(entries.len(), 0);
1294
1295 let entries: Vec<_> = store
1297 .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1298 .collect::<Result<_>>()?;
1299 assert_eq!(entries.len(), 1);
1300
1301 let entries: Vec<_> = store
1303 .get_many(myspace.id(), Query::all())?
1304 .collect::<Result<_>>()?;
1305 assert_eq!(entries.len(), 11);
1306
1307 let mut my_replica = store.new_replica(myspace.clone())?;
1309 let _entry = my_replica
1310 .hash_and_insert("/cool/path", &bob, "bob round 1")
1311 .await?;
1312
1313 let entries: Vec<_> = store
1315 .get_many(myspace.id(), Query::author(alice.id()))?
1316 .collect::<Result<_>>()?;
1317 assert_eq!(entries.len(), 11);
1318
1319 let entries: Vec<_> = store
1320 .get_many(myspace.id(), Query::author(bob.id()))?
1321 .collect::<Result<_>>()?;
1322 assert_eq!(entries.len(), 1);
1323
1324 let entries: Vec<_> = store
1326 .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1327 .collect::<Result<_>>()?;
1328 assert_eq!(entries.len(), 2);
1329
1330 let entries: Vec<_> = store
1332 .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1333 .collect::<Result<_>>()?;
1334 assert_eq!(entries.len(), 2);
1335
1336 let entries: Vec<_> = store
1338 .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1339 .collect::<Result<_>>()?;
1340 assert_eq!(entries.len(), 1);
1341
1342 let entries: Vec<_> = store
1343 .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1344 .collect::<Result<_>>()?;
1345 assert_eq!(entries.len(), 1);
1346
1347 let entries: Vec<_> = store
1349 .get_many(myspace.id(), Query::all())?
1350 .collect::<Result<_>>()?;
1351 assert_eq!(entries.len(), 12);
1352
1353 let mut my_replica = store.new_replica(myspace.clone())?;
1355 let entries_second: Vec<_> = my_replica
1356 .store
1357 .get_range(Range::new(
1358 RecordIdentifier::default(),
1359 RecordIdentifier::default(),
1360 ))?
1361 .collect::<Result<_, _>>()?;
1362
1363 assert_eq!(entries_second.len(), 12);
1364 assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1365
1366 test_lru_cache_like_behaviour(&mut store, myspace.id())?;
1367 store.flush()?;
1368 Ok(())
1369 }
1370
1371 fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1374 #[track_caller]
1376 fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1377 assert_eq!(
1378 expected_peers,
1379 &store
1380 .get_sync_peers(&namespace)
1381 .unwrap()
1382 .unwrap()
1383 .collect::<Vec<_>>(),
1384 "sync peers differ"
1385 );
1386 }
1387
1388 let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1389 let mut expected_peers = Vec::with_capacity(count);
1391 for i in 0..count as u8 {
1392 let peer = [i; 32];
1393 expected_peers.insert(0, peer);
1394 store.register_useful_peer(namespace, peer)?;
1395 }
1396 verify_peers(store, namespace, &expected_peers);
1397
1398 expected_peers.pop();
1400 let newer_peer = [count as u8; 32];
1401 expected_peers.insert(0, newer_peer);
1402 store.register_useful_peer(namespace, newer_peer)?;
1403 verify_peers(store, namespace, &expected_peers);
1404
1405 let refreshed_peer = expected_peers.remove(2);
1407 expected_peers.insert(0, refreshed_peer);
1408 store.register_useful_peer(namespace, refreshed_peer)?;
1409 verify_peers(store, namespace, &expected_peers);
1410 Ok(())
1411 }
1412
1413 #[tokio::test]
1414 async fn test_content_hashes_iterator_memory() -> Result<()> {
1415 let store = store::Store::memory();
1416 test_content_hashes_iterator(store).await
1417 }
1418
1419 #[tokio::test]
1420 #[cfg(feature = "fs-store")]
1421 async fn test_content_hashes_iterator_fs() -> Result<()> {
1422 let dbfile = tempfile::NamedTempFile::new()?;
1423 let store = store::fs::Store::persistent(dbfile.path())?;
1424 test_content_hashes_iterator(store).await
1425 }
1426
1427 async fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1428 let mut rng = rand::rng();
1429 let mut expected = HashSet::new();
1430 let n_replicas = 3;
1431 let n_entries = 4;
1432 for i in 0..n_replicas {
1433 let namespace = NamespaceSecret::new(&mut rng);
1434 let author = store.new_author(&mut rng)?;
1435 let mut replica = store.new_replica(namespace)?;
1436 for j in 0..n_entries {
1437 let key = format!("{j}");
1438 let data = format!("{i}:{j}");
1439 let hash = replica.hash_and_insert(key, &author, data).await?;
1440 expected.insert(hash);
1441 }
1442 }
1443 assert_eq!(expected.len(), n_replicas * n_entries);
1444 let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1445 assert_eq!(actual, expected);
1446 Ok(())
1447 }
1448
1449 #[test]
1450 fn test_multikey() {
1451 let mut rng = rand::rng();
1452
1453 let k = ["a", "c", "z"];
1454
1455 let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1456 n.sort_by_key(|n| n.id());
1457
1458 let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1459 a.sort_by_key(|a| a.id());
1460
1461 {
1463 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1464 let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1465 let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1466
1467 let range = Range::new(ri0.clone(), ri2.clone());
1468 assert!(range.contains(&ri0), "start");
1469 assert!(range.contains(&ri1), "inside");
1470 assert!(!range.contains(&ri2), "end");
1471
1472 assert!(ri0 < ri1);
1473 assert!(ri1 < ri2);
1474 }
1475
1476 {
1478 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1479 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1480 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1481
1482 let range = Range::new(ri0.clone(), ri2.clone());
1483 assert!(range.contains(&ri0), "start");
1484 assert!(range.contains(&ri1), "inside");
1485 assert!(!range.contains(&ri2), "end");
1486
1487 assert!(ri0 < ri1);
1488 assert!(ri1 < ri2);
1489 }
1490
1491 {
1493 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1494 let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1495 let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1496
1497 let range = Range::new(ri0.clone(), ri2.clone());
1498 assert!(range.contains(&ri0), "start");
1499 assert!(range.contains(&ri1), "inside");
1500 assert!(!range.contains(&ri2), "end");
1501
1502 assert!(ri0 < ri1);
1503 assert!(ri1 < ri2);
1504 }
1505
1506 {
1508 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1509 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1510 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1511
1512 let range = Range::new(ri0.clone(), ri2.clone());
1513 assert!(range.contains(&ri0), "start");
1514 assert!(range.contains(&ri1), "inside");
1515 assert!(!range.contains(&ri2), "end");
1516
1517 assert!(ri0 < ri1);
1518 assert!(ri1 < ri2);
1519 }
1520
1521 {
1523 let a0 = a[0].id();
1526 let a1 = a[1].id();
1527 let n0 = n[0].id();
1528 let n1 = n[1].id();
1529 let k0 = k[0];
1530 let k1 = k[1];
1531
1532 assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1533 assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1534 assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1535 assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1536 }
1537 }
1538
1539 #[tokio::test]
1540 async fn test_timestamps_memory() -> Result<()> {
1541 let store = store::Store::memory();
1542 test_timestamps(store).await?;
1543
1544 Ok(())
1545 }
1546
1547 #[tokio::test]
1548 #[cfg(feature = "fs-store")]
1549 async fn test_timestamps_fs() -> Result<()> {
1550 let dbfile = tempfile::NamedTempFile::new()?;
1551 let store = store::fs::Store::persistent(dbfile.path())?;
1552 test_timestamps(store).await?;
1553 Ok(())
1554 }
1555
1556 async fn test_timestamps(mut store: Store) -> Result<()> {
1557 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
1558 let namespace = NamespaceSecret::new(&mut rng);
1559 let _replica = store.new_replica(namespace.clone())?;
1560 let author = store.new_author(&mut rng)?;
1561 store.close_replica(namespace.id());
1562 let mut replica = store.open_replica(&namespace.id())?;
1563
1564 let key = b"hello";
1565 let value = b"world";
1566 let entry = {
1567 let timestamp = 2;
1568 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1569 let record = Record::from_data(value, timestamp);
1570 Entry::new(id, record).sign(&namespace, &author)
1571 };
1572
1573 replica
1574 .insert_entry(entry.clone(), InsertOrigin::Local)
1575 .await
1576 .unwrap();
1577 store.close_replica(namespace.id());
1578 let res = store
1579 .get_exact(namespace.id(), author.id(), key, false)?
1580 .unwrap();
1581 assert_eq!(res, entry);
1582
1583 let entry2 = {
1584 let timestamp = 1;
1585 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1586 let record = Record::from_data(value, timestamp);
1587 Entry::new(id, record).sign(&namespace, &author)
1588 };
1589
1590 let mut replica = store.open_replica(&namespace.id())?;
1591 let res = replica.insert_entry(entry2, InsertOrigin::Local).await;
1592 store.close_replica(namespace.id());
1593 assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1594 let res = store
1595 .get_exact(namespace.id(), author.id(), key, false)?
1596 .unwrap();
1597 assert_eq!(res, entry);
1598 store.flush()?;
1599 Ok(())
1600 }
1601
1602 #[tokio::test]
1603 async fn test_replica_sync_memory() -> Result<()> {
1604 let alice_store = store::Store::memory();
1605 let bob_store = store::Store::memory();
1606
1607 test_replica_sync(alice_store, bob_store).await?;
1608 Ok(())
1609 }
1610
1611 #[tokio::test]
1612 #[cfg(feature = "fs-store")]
1613 async fn test_replica_sync_fs() -> Result<()> {
1614 let alice_dbfile = tempfile::NamedTempFile::new()?;
1615 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1616 let bob_dbfile = tempfile::NamedTempFile::new()?;
1617 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1618 test_replica_sync(alice_store, bob_store).await?;
1619
1620 Ok(())
1621 }
1622
1623 async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1624 let alice_set = ["ape", "eel", "fox", "gnu"];
1625 let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1626
1627 let mut rng = rand::rng();
1628 let author = Author::new(&mut rng);
1629 let myspace = NamespaceSecret::new(&mut rng);
1630 let mut alice = alice_store.new_replica(myspace.clone())?;
1631 for el in &alice_set {
1632 alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1633 }
1634
1635 let mut bob = bob_store.new_replica(myspace.clone())?;
1636 for el in &bob_set {
1637 bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1638 }
1639
1640 let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
1641
1642 assert_eq!(alice_out.num_sent, 2);
1643 assert_eq!(bob_out.num_recv, 2);
1644 assert_eq!(alice_out.num_recv, 6);
1645 assert_eq!(bob_out.num_sent, 6);
1646
1647 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1648 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1649 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1650 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1651 alice_store.flush()?;
1652 bob_store.flush()?;
1653 Ok(())
1654 }
1655
1656 #[tokio::test]
1657 async fn test_replica_timestamp_sync_memory() -> Result<()> {
1658 let alice_store = store::Store::memory();
1659 let bob_store = store::Store::memory();
1660
1661 test_replica_timestamp_sync(alice_store, bob_store).await?;
1662 Ok(())
1663 }
1664
1665 #[tokio::test]
1666 #[cfg(feature = "fs-store")]
1667 async fn test_replica_timestamp_sync_fs() -> Result<()> {
1668 let alice_dbfile = tempfile::NamedTempFile::new()?;
1669 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1670 let bob_dbfile = tempfile::NamedTempFile::new()?;
1671 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1672 test_replica_timestamp_sync(alice_store, bob_store).await?;
1673
1674 Ok(())
1675 }
1676
1677 async fn test_replica_timestamp_sync(
1678 mut alice_store: Store,
1679 mut bob_store: Store,
1680 ) -> Result<()> {
1681 let mut rng = rand::rng();
1682 let author = Author::new(&mut rng);
1683 let namespace = NamespaceSecret::new(&mut rng);
1684 let mut alice = alice_store.new_replica(namespace.clone())?;
1685 let mut bob = bob_store.new_replica(namespace.clone())?;
1686
1687 let key = b"key";
1688 let alice_value = b"alice";
1689 let bob_value = b"bob";
1690 let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?;
1691 let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?;
1693 sync(&mut alice, &mut bob).await?;
1694 assert_eq!(
1695 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1696 Some(bob_hash)
1697 );
1698 assert_eq!(
1699 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1700 Some(bob_hash)
1701 );
1702
1703 let mut alice = alice_store.new_replica(namespace.clone())?;
1704 let mut bob = bob_store.new_replica(namespace.clone())?;
1705
1706 let alice_value_2 = b"alice2";
1707 let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?;
1709 let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?;
1710 sync(&mut alice, &mut bob).await?;
1711 assert_eq!(
1712 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1713 Some(alice_hash_2)
1714 );
1715 assert_eq!(
1716 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1717 Some(alice_hash_2)
1718 );
1719 alice_store.flush()?;
1720 bob_store.flush()?;
1721 Ok(())
1722 }
1723
1724 #[tokio::test]
1725 async fn test_future_timestamp() -> Result<()> {
1726 let mut rng = rand::rng();
1727 let mut store = store::Store::memory();
1728 let author = Author::new(&mut rng);
1729 let namespace = NamespaceSecret::new(&mut rng);
1730
1731 let mut replica = store.new_replica(namespace.clone())?;
1732 let key = b"hi";
1733 let t = system_time_now();
1734 let record = Record::from_data(b"1", t);
1735 let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1736 replica
1737 .insert_entry(entry0.clone(), InsertOrigin::Local)
1738 .await?;
1739
1740 assert_eq!(
1741 get_entry(&mut store, namespace.id(), author.id(), key)?,
1742 entry0
1743 );
1744
1745 let mut replica = store.new_replica(namespace.clone())?;
1746 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1747 let record = Record::from_data(b"2", t);
1748 let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1749 replica
1750 .insert_entry(entry1.clone(), InsertOrigin::Local)
1751 .await?;
1752 assert_eq!(
1753 get_entry(&mut store, namespace.id(), author.id(), key)?,
1754 entry1
1755 );
1756
1757 let mut replica = store.new_replica(namespace.clone())?;
1758 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1759 let record = Record::from_data(b"2", t);
1760 let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1761 replica
1762 .insert_entry(entry2.clone(), InsertOrigin::Local)
1763 .await?;
1764 assert_eq!(
1765 get_entry(&mut store, namespace.id(), author.id(), key)?,
1766 entry2
1767 );
1768
1769 let mut replica = store.new_replica(namespace.clone())?;
1770 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1771 let record = Record::from_data(b"2", t);
1772 let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1773 let res = replica.insert_entry(entry3, InsertOrigin::Local).await;
1774 assert!(matches!(
1775 res,
1776 Err(InsertError::Validation(
1777 ValidationFailure::TooFarInTheFuture
1778 ))
1779 ));
1780 assert_eq!(
1781 get_entry(&mut store, namespace.id(), author.id(), key)?,
1782 entry2
1783 );
1784 store.flush()?;
1785 Ok(())
1786 }
1787
1788 #[tokio::test]
1792 async fn test_future_timestamp_accepts_one_second_skew() -> Result<()> {
1793 let mut rng = rand::rng();
1794 let mut store = store::Store::memory();
1795 let author = Author::new(&mut rng);
1796 let namespace = NamespaceSecret::new(&mut rng);
1797 let mut replica = store.new_replica(namespace.clone())?;
1798 let key = b"skew";
1799 let skew_micros: u64 = Duration::from_secs(1).as_micros() as u64;
1800 let now = system_time_now();
1801 let record = Record::from_data(b"ahead", now + skew_micros);
1802 let entry = SignedEntry::from_parts(&namespace, &author, key, record);
1803 replica
1804 .insert_entry(entry.clone(), InsertOrigin::Local)
1805 .await?;
1806 assert_eq!(
1807 get_entry(&mut store, namespace.id(), author.id(), key)?,
1808 entry
1809 );
1810 store.flush()?;
1811 Ok(())
1812 }
1813
1814 #[tokio::test]
1815 async fn test_insert_empty() -> Result<()> {
1816 let mut store = store::Store::memory();
1817 let mut rng = rand::rng();
1818 let alice = Author::new(&mut rng);
1819 let myspace = NamespaceSecret::new(&mut rng);
1820 let mut replica = store.new_replica(myspace.clone())?;
1821 let hash = Hash::new(b"");
1822 let res = replica.insert(b"foo", &alice, hash, 0).await;
1823 assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1824 store.flush()?;
1825 Ok(())
1826 }
1827
1828 #[tokio::test]
1829 async fn test_prefix_delete_memory() -> Result<()> {
1830 let store = store::Store::memory();
1831 test_prefix_delete(store).await?;
1832 Ok(())
1833 }
1834
1835 #[tokio::test]
1836 #[cfg(feature = "fs-store")]
1837 async fn test_prefix_delete_fs() -> Result<()> {
1838 let dbfile = tempfile::NamedTempFile::new()?;
1839 let store = store::fs::Store::persistent(dbfile.path())?;
1840 test_prefix_delete(store).await?;
1841 Ok(())
1842 }
1843
1844 async fn test_prefix_delete(mut store: Store) -> Result<()> {
1845 let mut rng = rand::rng();
1846 let alice = Author::new(&mut rng);
1847 let myspace = NamespaceSecret::new(&mut rng);
1848 let mut replica = store.new_replica(myspace.clone())?;
1849 let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;
1850 let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?;
1851
1852 assert_eq!(
1854 get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1855 Some(hash1)
1856 );
1857 assert_eq!(
1858 get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1859 Some(hash2)
1860 );
1861
1862 let mut replica = store.new_replica(myspace.clone())?;
1864 let deleted = replica.delete_prefix(b"foo", &alice).await?;
1865 assert_eq!(deleted, 2);
1866 assert_eq!(
1867 store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1868 None
1869 );
1870 assert_eq!(
1871 store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1872 None
1873 );
1874 assert_eq!(
1875 store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1876 None
1877 );
1878 store.flush()?;
1879 Ok(())
1880 }
1881
1882 #[tokio::test]
1883 async fn test_replica_sync_delete_memory() -> Result<()> {
1884 let alice_store = store::Store::memory();
1885 let bob_store = store::Store::memory();
1886
1887 test_replica_sync_delete(alice_store, bob_store).await
1888 }
1889
1890 #[tokio::test]
1891 #[cfg(feature = "fs-store")]
1892 async fn test_replica_sync_delete_fs() -> Result<()> {
1893 let alice_dbfile = tempfile::NamedTempFile::new()?;
1894 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1895 let bob_dbfile = tempfile::NamedTempFile::new()?;
1896 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1897 test_replica_sync_delete(alice_store, bob_store).await
1898 }
1899
1900 async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1901 let alice_set = ["foot"];
1902 let bob_set = ["fool", "foo", "fog"];
1903
1904 let mut rng = rand::rng();
1905 let author = Author::new(&mut rng);
1906 let myspace = NamespaceSecret::new(&mut rng);
1907 let mut alice = alice_store.new_replica(myspace.clone())?;
1908 for el in &alice_set {
1909 alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1910 }
1911
1912 let mut bob = bob_store.new_replica(myspace.clone())?;
1913 for el in &bob_set {
1914 bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1915 }
1916
1917 sync(&mut alice, &mut bob).await?;
1918
1919 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1920 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1921 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1922 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1923
1924 let mut alice = alice_store.new_replica(myspace.clone())?;
1925 let mut bob = bob_store.new_replica(myspace.clone())?;
1926 alice.delete_prefix("foo", &author).await?;
1927 bob.hash_and_insert("fooz", &author, "fooz".as_bytes())
1928 .await?;
1929 sync(&mut alice, &mut bob).await?;
1930 check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1931 check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1932 alice_store.flush()?;
1933 bob_store.flush()?;
1934 Ok(())
1935 }
1936
1937 #[tokio::test]
1938 async fn test_replica_remove_memory() -> Result<()> {
1939 let alice_store = store::Store::memory();
1940 test_replica_remove(alice_store).await
1941 }
1942
1943 #[tokio::test]
1944 #[cfg(feature = "fs-store")]
1945 async fn test_replica_remove_fs() -> Result<()> {
1946 let alice_dbfile = tempfile::NamedTempFile::new()?;
1947 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1948 test_replica_remove(alice_store).await
1949 }
1950
1951 async fn test_replica_remove(mut store: Store) -> Result<()> {
1952 let mut rng = rand::rng();
1953 let namespace = NamespaceSecret::new(&mut rng);
1954 let author = Author::new(&mut rng);
1955 let mut replica = store.new_replica(namespace.clone())?;
1956
1957 let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?;
1959 let res = store
1960 .get_many(namespace.id(), Query::all())?
1961 .collect::<Vec<_>>();
1962 assert_eq!(res.len(), 1);
1963
1964 let res = store.remove_replica(&namespace.id());
1966 assert!(res.is_err());
1968 store.close_replica(namespace.id());
1969 store.remove_replica(&namespace.id())?;
1970 let res = store
1971 .get_many(namespace.id(), Query::all())?
1972 .collect::<Vec<_>>();
1973 assert_eq!(res.len(), 0);
1974
1975 let res = store.load_replica_info(&namespace.id());
1977 assert!(matches!(res, Err(OpenError::NotFound)));
1978
1979 let mut replica = store.new_replica(namespace.clone())?;
1981 replica.insert(b"foo", &author, hash, 3).await?;
1982 let res = store
1983 .get_many(namespace.id(), Query::all())?
1984 .collect::<Vec<_>>();
1985 assert_eq!(res.len(), 1);
1986 store.flush()?;
1987 Ok(())
1988 }
1989
1990 #[tokio::test]
1991 async fn test_replica_delete_edge_cases_memory() -> Result<()> {
1992 let store = store::Store::memory();
1993 test_replica_delete_edge_cases(store).await
1994 }
1995
1996 #[tokio::test]
1997 #[cfg(feature = "fs-store")]
1998 async fn test_replica_delete_edge_cases_fs() -> Result<()> {
1999 let dbfile = tempfile::NamedTempFile::new()?;
2000 let store = store::fs::Store::persistent(dbfile.path())?;
2001 test_replica_delete_edge_cases(store).await
2002 }
2003
2004 async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
2005 let mut rng = rand::rng();
2006 let author = Author::new(&mut rng);
2007 let namespace = NamespaceSecret::new(&mut rng);
2008
2009 let edgecases = [0u8, 1u8, 255u8];
2010 let prefixes = [0u8, 255u8];
2011 let hash = Hash::new(b"foo");
2012 let len = 3;
2013 for prefix in prefixes {
2014 let mut expected = vec![];
2015 let mut replica = store.new_replica(namespace.clone())?;
2016 for suffix in edgecases {
2017 let key = [prefix, suffix].to_vec();
2018 expected.push(key.clone());
2019 replica.insert(&key, &author, hash, len).await?;
2020 }
2021 assert_keys(&mut store, namespace.id(), expected);
2022 let mut replica = store.new_replica(namespace.clone())?;
2023 replica.delete_prefix([prefix], &author).await?;
2024 assert_keys(&mut store, namespace.id(), vec![]);
2025 }
2026
2027 let mut replica = store.new_replica(namespace.clone())?;
2028 let key = vec![1u8, 0u8];
2029 replica.insert(key, &author, hash, len).await?;
2030 let key = vec![1u8, 1u8];
2031 replica.insert(key, &author, hash, len).await?;
2032 let key = vec![1u8, 2u8];
2033 replica.insert(key, &author, hash, len).await?;
2034 let prefix = vec![1u8, 1u8];
2035 replica.delete_prefix(prefix, &author).await?;
2036 assert_keys(
2037 &mut store,
2038 namespace.id(),
2039 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2040 );
2041
2042 let mut replica = store.new_replica(namespace.clone())?;
2043 let key = vec![0u8, 255u8];
2044 replica.insert(key, &author, hash, len).await?;
2045 let key = vec![0u8, 0u8];
2046 replica.insert(key, &author, hash, len).await?;
2047 let prefix = vec![0u8];
2048 replica.delete_prefix(prefix, &author).await?;
2049 assert_keys(
2050 &mut store,
2051 namespace.id(),
2052 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2053 );
2054 store.flush()?;
2055 Ok(())
2056 }
2057
2058 #[tokio::test]
2059 async fn test_latest_iter_memory() -> Result<()> {
2060 let store = store::Store::memory();
2061 test_latest_iter(store).await
2062 }
2063
2064 #[tokio::test]
2065 #[cfg(feature = "fs-store")]
2066 async fn test_latest_iter_fs() -> Result<()> {
2067 let dbfile = tempfile::NamedTempFile::new()?;
2068 let store = store::fs::Store::persistent(dbfile.path())?;
2069 test_latest_iter(store).await
2070 }
2071
2072 async fn test_latest_iter(mut store: Store) -> Result<()> {
2073 let mut rng = rand::rng();
2074 let author0 = Author::new(&mut rng);
2075 let author1 = Author::new(&mut rng);
2076 let namespace = NamespaceSecret::new(&mut rng);
2077 let mut replica = store.new_replica(namespace.clone())?;
2078
2079 replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
2080 let latest = store
2081 .get_latest_for_each_author(namespace.id())?
2082 .collect::<Result<Vec<_>>>()?;
2083 assert_eq!(latest.len(), 1);
2084 assert_eq!(latest[0].2, b"a0.1".to_vec());
2085
2086 let mut replica = store.new_replica(namespace.clone())?;
2087 replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
2088 replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
2089 let latest = store
2090 .get_latest_for_each_author(namespace.id())?
2091 .collect::<Result<Vec<_>>>()?;
2092 let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2093 latest_keys.sort();
2094 assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2095 store.flush()?;
2096 Ok(())
2097 }
2098
2099 #[tokio::test]
2100 async fn test_replica_byte_keys_memory() -> Result<()> {
2101 let store = store::Store::memory();
2102
2103 test_replica_byte_keys(store).await?;
2104 Ok(())
2105 }
2106
2107 #[tokio::test]
2108 #[cfg(feature = "fs-store")]
2109 async fn test_replica_byte_keys_fs() -> Result<()> {
2110 let dbfile = tempfile::NamedTempFile::new()?;
2111 let store = store::fs::Store::persistent(dbfile.path())?;
2112 test_replica_byte_keys(store).await?;
2113
2114 Ok(())
2115 }
2116
2117 async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2118 let mut rng = rand::rng();
2119 let author = Author::new(&mut rng);
2120 let namespace = NamespaceSecret::new(&mut rng);
2121
2122 let hash = Hash::new(b"foo");
2123 let len = 3;
2124
2125 let key = vec![1u8, 0u8];
2126 let mut replica = store.new_replica(namespace.clone())?;
2127 replica.insert(key, &author, hash, len).await?;
2128 assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2129 let key = vec![1u8, 2u8];
2130 let mut replica = store.new_replica(namespace.clone())?;
2131 replica.insert(key, &author, hash, len).await?;
2132 assert_keys(
2133 &mut store,
2134 namespace.id(),
2135 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2136 );
2137
2138 let key = vec![0u8, 255u8];
2139 let mut replica = store.new_replica(namespace.clone())?;
2140 replica.insert(key, &author, hash, len).await?;
2141 assert_keys(
2142 &mut store,
2143 namespace.id(),
2144 vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2145 );
2146 store.flush()?;
2147 Ok(())
2148 }
2149
2150 #[tokio::test]
2151 async fn test_replica_capability_memory() -> Result<()> {
2152 let store = store::Store::memory();
2153 test_replica_capability(store).await
2154 }
2155
2156 #[tokio::test]
2157 #[cfg(feature = "fs-store")]
2158 async fn test_replica_capability_fs() -> Result<()> {
2159 let dbfile = tempfile::NamedTempFile::new()?;
2160 let store = store::fs::Store::persistent(dbfile.path())?;
2161 test_replica_capability(store).await
2162 }
2163
2164 #[allow(clippy::redundant_pattern_matching)]
2165 async fn test_replica_capability(mut store: Store) -> Result<()> {
2166 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2167 let author = store.new_author(&mut rng)?;
2168 let namespace = NamespaceSecret::new(&mut rng);
2169
2170 let capability = Capability::Read(namespace.id());
2172 store.import_namespace(capability)?;
2173 let mut replica = store.open_replica(&namespace.id())?;
2174 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2175 assert!(matches!(res, Err(InsertError::ReadOnly)));
2176
2177 let capability = Capability::Write(namespace.clone());
2179 store.import_namespace(capability)?;
2180 let mut replica = store.open_replica(&namespace.id())?;
2181 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2182 assert!(matches!(res, Ok(_)));
2183 store.close_replica(namespace.id());
2184 let mut replica = store.open_replica(&namespace.id())?;
2185 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2186 assert!(res.is_ok());
2187
2188 let capability = Capability::Read(namespace.id());
2190 store.import_namespace(capability)?;
2191 store.close_replica(namespace.id());
2192 let mut replica = store.open_replica(&namespace.id())?;
2193 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2194 assert!(res.is_ok());
2195 store.flush()?;
2196 Ok(())
2197 }
2198
2199 #[tokio::test]
2200 async fn test_actor_capability_memory() -> Result<()> {
2201 let store = store::Store::memory();
2202 test_actor_capability(store).await
2203 }
2204
2205 #[tokio::test]
2206 #[cfg(feature = "fs-store")]
2207 async fn test_actor_capability_fs() -> Result<()> {
2208 let dbfile = tempfile::NamedTempFile::new()?;
2209 let store = store::fs::Store::persistent(dbfile.path())?;
2210 test_actor_capability(store).await
2211 }
2212
2213 async fn test_actor_capability(store: Store) -> Result<()> {
2214 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2216 let author = Author::new(&mut rng);
2217 let handle = SyncHandle::spawn(store, None, "test".into());
2218 let author = handle.import_author(author).await?;
2219 let namespace = NamespaceSecret::new(&mut rng);
2220 let id = namespace.id();
2221
2222 let capability = Capability::Read(namespace.id());
2224 handle.import_namespace(capability).await?;
2225 handle.open(namespace.id(), Default::default()).await?;
2226 let res = handle
2227 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2228 .await;
2229 assert!(res.is_err());
2230
2231 let capability = Capability::Write(namespace.clone());
2233 handle.import_namespace(capability).await?;
2234 let res = handle
2235 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2236 .await;
2237 assert!(res.is_ok());
2238
2239 handle.close(namespace.id()).await?;
2241 let res = handle
2242 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2243 .await;
2244 assert!(res.is_err());
2245 handle.open(namespace.id(), Default::default()).await?;
2246 let res = handle
2247 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2248 .await;
2249 assert!(res.is_ok());
2250 Ok(())
2251 }
2252
2253 fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2254 let mut res = vec![];
2255 while let Ok(ev) = events.try_recv() {
2256 res.push(ev);
2257 }
2258 res
2259 }
2260
2261 #[tokio::test]
2264 async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2265 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2266 let mut store1 = store::Store::memory();
2267 let mut store2 = store::Store::memory();
2268 let peer1 = [1u8; 32];
2269 let peer2 = [2u8; 32];
2270 let mut state1 = SyncOutcome::default();
2271 let mut state2 = SyncOutcome::default();
2272
2273 let author = Author::new(&mut rng);
2274 let namespace = NamespaceSecret::new(&mut rng);
2275 let mut replica1 = store1.new_replica(namespace.clone())?;
2276 let mut replica2 = store2.new_replica(namespace.clone())?;
2277
2278 let (events1_sender, events1) = async_channel::bounded(32);
2279 let (events2_sender, events2) = async_channel::bounded(32);
2280
2281 replica1.info.subscribe(events1_sender);
2282 replica2.info.subscribe(events2_sender);
2283
2284 replica1.hash_and_insert(b"foo", &author, b"init").await?;
2285
2286 let from1 = replica1.sync_initial_message()?;
2287 let from2 = replica2
2288 .sync_process_message(from1, peer1, &mut state2)
2289 .await
2290 .unwrap()
2291 .unwrap();
2292 let from1 = replica1
2293 .sync_process_message(from2, peer2, &mut state1)
2294 .await
2295 .unwrap()
2296 .unwrap();
2297 replica2.hash_and_insert(b"foo", &author, b"update").await?;
2301 let from2 = replica2
2302 .sync_process_message(from1, peer1, &mut state2)
2303 .await
2304 .unwrap();
2305 assert!(from2.is_none());
2306 let events1 = drain(events1);
2307 let events2 = drain(events2);
2308 assert_eq!(events1.len(), 1);
2309 assert_eq!(events2.len(), 1);
2310 assert!(matches!(events1[0], Event::LocalInsert { .. }));
2311 assert!(matches!(events2[0], Event::LocalInsert { .. }));
2312 assert_eq!(state1.num_sent, 1);
2313 assert_eq!(state1.num_recv, 0);
2314 assert_eq!(state2.num_sent, 0);
2315 assert_eq!(state2.num_recv, 1);
2316 store1.flush()?;
2317 store2.flush()?;
2318 Ok(())
2319 }
2320
2321 #[tokio::test]
2322 async fn test_replica_queries_mem() -> Result<()> {
2323 let store = store::Store::memory();
2324
2325 test_replica_queries(store).await?;
2326 Ok(())
2327 }
2328
2329 #[tokio::test]
2330 #[cfg(feature = "fs-store")]
2331 async fn test_replica_queries_fs() -> Result<()> {
2332 let dbfile = tempfile::NamedTempFile::new()?;
2333 let store = store::fs::Store::persistent(dbfile.path())?;
2334 test_replica_queries(store).await?;
2335
2336 Ok(())
2337 }
2338
2339 async fn test_replica_queries(mut store: Store) -> Result<()> {
2340 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2341 let namespace = NamespaceSecret::new(&mut rng);
2342 let namespace_id = namespace.id();
2343
2344 let a1 = store.new_author(&mut rng)?;
2345 let a2 = store.new_author(&mut rng)?;
2346 let a3 = store.new_author(&mut rng)?;
2347 println!(
2348 "a1 {} a2 {} a3 {}",
2349 a1.id().fmt_short(),
2350 a2.id().fmt_short(),
2351 a3.id().fmt_short()
2352 );
2353
2354 let mut replica = store.new_replica(namespace.clone())?;
2355 replica.hash_and_insert("hi/world", &a2, "a2").await?;
2356 replica.hash_and_insert("hi/world", &a1, "a1").await?;
2357 replica.hash_and_insert("hi/moon", &a2, "a1").await?;
2358 replica.hash_and_insert("hi", &a3, "a3").await?;
2359
2360 struct QueryTester<'a> {
2361 store: &'a mut Store,
2362 namespace: NamespaceId,
2363 }
2364 impl QueryTester<'_> {
2365 fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2366 let query = query.into();
2367 let actual = self
2368 .store
2369 .get_many(self.namespace, query.clone())
2370 .unwrap()
2371 .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2372 .collect::<Result<Vec<_>>>()
2373 .unwrap();
2374 let expected = expected
2375 .into_iter()
2376 .map(|(key, author)| (key.to_string(), author.id()))
2377 .collect::<Vec<_>>();
2378 assert_eq!(actual, expected, "query: {query:#?}")
2379 }
2380 }
2381
2382 let mut qt = QueryTester {
2383 store: &mut store,
2384 namespace: namespace_id,
2385 };
2386
2387 qt.assert(
2388 Query::all(),
2389 vec![
2390 ("hi/world", &a1),
2391 ("hi/moon", &a2),
2392 ("hi/world", &a2),
2393 ("hi", &a3),
2394 ],
2395 );
2396
2397 qt.assert(
2398 Query::single_latest_per_key(),
2399 vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2400 );
2401
2402 qt.assert(
2403 Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2404 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2405 );
2406
2407 qt.assert(
2408 Query::single_latest_per_key().key_prefix("hi/"),
2409 vec![("hi/moon", &a2), ("hi/world", &a1)],
2410 );
2411
2412 qt.assert(
2413 Query::single_latest_per_key()
2414 .key_prefix("hi/")
2415 .sort_direction(SortDirection::Desc),
2416 vec![("hi/world", &a1), ("hi/moon", &a2)],
2417 );
2418
2419 qt.assert(
2420 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2421 vec![
2422 ("hi", &a3),
2423 ("hi/moon", &a2),
2424 ("hi/world", &a1),
2425 ("hi/world", &a2),
2426 ],
2427 );
2428
2429 qt.assert(
2430 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2431 vec![
2432 ("hi/world", &a2),
2433 ("hi/world", &a1),
2434 ("hi/moon", &a2),
2435 ("hi", &a3),
2436 ],
2437 );
2438
2439 qt.assert(
2440 Query::all().key_prefix("hi/"),
2441 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2442 );
2443
2444 qt.assert(
2445 Query::all().key_prefix("hi/").offset(1).limit(1),
2446 vec![("hi/moon", &a2)],
2447 );
2448
2449 qt.assert(
2450 Query::all()
2451 .key_prefix("hi/")
2452 .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2453 vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2454 );
2455
2456 qt.assert(
2457 Query::all()
2458 .key_prefix("hi/")
2459 .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2460 .offset(1)
2461 .limit(1),
2462 vec![("hi/world", &a1)],
2463 );
2464
2465 qt.assert(
2466 Query::all()
2467 .key_prefix("hi/")
2468 .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2469 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2470 );
2471
2472 qt.assert(
2473 Query::all()
2474 .key_prefix("hi/")
2475 .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2476 vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2477 );
2478
2479 qt.assert(
2480 Query::all()
2481 .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2482 .limit(2)
2483 .offset(1),
2484 vec![("hi/moon", &a2), ("hi/world", &a1)],
2485 );
2486
2487 let mut replica = store.new_replica(namespace)?;
2488 replica.delete_prefix("hi/world", &a2).await?;
2489 let mut qt = QueryTester {
2490 store: &mut store,
2491 namespace: namespace_id,
2492 };
2493
2494 qt.assert(
2495 Query::all(),
2496 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2497 );
2498
2499 qt.assert(
2500 Query::all().include_empty(),
2501 vec![
2502 ("hi/world", &a1),
2503 ("hi/moon", &a2),
2504 ("hi/world", &a2),
2505 ("hi", &a3),
2506 ],
2507 );
2508 store.flush()?;
2509 Ok(())
2510 }
2511
2512 #[test]
2513 fn test_dl_policies_mem() -> Result<()> {
2514 let mut store = store::Store::memory();
2515 test_dl_policies(&mut store)
2516 }
2517
2518 #[test]
2519 #[cfg(feature = "fs-store")]
2520 fn test_dl_policies_fs() -> Result<()> {
2521 let dbfile = tempfile::NamedTempFile::new()?;
2522 let mut store = store::fs::Store::persistent(dbfile.path())?;
2523 test_dl_policies(&mut store)
2524 }
2525
2526 fn test_dl_policies(store: &mut Store) -> Result<()> {
2527 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2528 let namespace = NamespaceSecret::new(&mut rng);
2529 let id = namespace.id();
2530
2531 let filter = store::FilterKind::Exact("foo".into());
2532 let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2533 store
2534 .set_download_policy(&id, policy.clone())
2535 .expect_err("document dos not exist");
2536
2537 store.new_replica(namespace)?;
2539
2540 store.set_download_policy(&id, policy.clone())?;
2541 let retrieved_policy = store.get_download_policy(&id)?;
2542 assert_eq!(retrieved_policy, policy);
2543 store.flush()?;
2544 Ok(())
2545 }
2546
2547 fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2548 expected.sort();
2549 assert_eq!(expected, get_keys_sorted(store, namespace));
2550 }
2551
2552 fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2553 let mut res = store
2554 .get_many(namespace, Query::all())
2555 .unwrap()
2556 .map(|e| e.map(|e| e.key().to_vec()))
2557 .collect::<Result<Vec<_>>>()
2558 .unwrap();
2559 res.sort();
2560 res
2561 }
2562
2563 fn get_entry(
2564 store: &mut Store,
2565 namespace: NamespaceId,
2566 author: AuthorId,
2567 key: &[u8],
2568 ) -> anyhow::Result<SignedEntry> {
2569 let entry = store
2570 .get_exact(namespace, author, key, true)?
2571 .ok_or_else(|| anyhow::anyhow!("not found"))?;
2572 Ok(entry)
2573 }
2574
2575 fn get_content_hash(
2576 store: &mut Store,
2577 namespace: NamespaceId,
2578 author: AuthorId,
2579 key: &[u8],
2580 ) -> anyhow::Result<Option<Hash>> {
2581 let hash = store
2582 .get_exact(namespace, author, key, false)?
2583 .map(|e| e.content_hash());
2584 Ok(hash)
2585 }
2586
2587 async fn sync<'a>(
2588 alice: &'a mut Replica<'a>,
2589 bob: &'a mut Replica<'a>,
2590 ) -> Result<(SyncOutcome, SyncOutcome)> {
2591 let alice_peer_id = [1u8; 32];
2592 let bob_peer_id = [2u8; 32];
2593 let mut alice_state = SyncOutcome::default();
2594 let mut bob_state = SyncOutcome::default();
2595 let mut next_to_bob = Some(alice.sync_initial_message()?);
2597 let mut rounds = 0;
2598 while let Some(msg) = next_to_bob.take() {
2599 assert!(rounds < 100, "too many rounds");
2600 rounds += 1;
2601 println!("round {rounds}");
2602 if let Some(msg) = bob
2603 .sync_process_message(msg, alice_peer_id, &mut bob_state)
2604 .await?
2605 {
2606 next_to_bob = alice
2607 .sync_process_message(msg, bob_peer_id, &mut alice_state)
2608 .await?
2609 }
2610 }
2611 assert_eq!(alice_state.num_sent, bob_state.num_recv);
2612 assert_eq!(alice_state.num_recv, bob_state.num_sent);
2613 Ok((alice_state, bob_state))
2614 }
2615
2616 fn check_entries(
2617 store: &mut Store,
2618 namespace: &NamespaceId,
2619 author: &Author,
2620 set: &[&str],
2621 ) -> Result<()> {
2622 for el in set {
2623 store.get_exact(*namespace, author.id(), el, false)?;
2624 }
2625 Ok(())
2626 }
2627}