1use std::{
10 cmp::Ordering,
11 fmt::Debug,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14 time::{Duration, SystemTime},
15};
16
17use bytes::{Bytes, BytesMut};
18use ed25519_dalek::{Signature, SignatureError};
19use iroh_blobs::Hash;
20use serde::{Deserialize, Serialize};
21
22pub use crate::heads::AuthorHeads;
23use crate::{
24 keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
25 ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
26 store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
27};
28
/// Message type exchanged while syncing: the ranger message specialised to
/// [`SignedEntry`] values.
pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
33
/// Raw 32-byte peer identifier, used in this module to record which peer an
/// entry was received from.
pub type PeerIdBytes = [u8; 32];
37
/// Maximum distance into the future, relative to the local wall clock, that an
/// entry timestamp may lie and still pass validation: 10 minutes.
///
/// Entry timestamps and `system_time_now()` are expressed in microseconds since
/// the Unix epoch, and this value is added directly to such a timestamp, so it
/// has to be expressed in microseconds as well. Computing it in milliseconds
/// would shrink the accepted window to 0.6 seconds.
pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;
41
/// Shared callback that, given a content [`Hash`], returns a boxed future
/// resolving to the [`ContentStatus`] of that content.
///
/// The callback must be `Send + Sync + 'static` so it can be stored in an
/// [`Arc`] and cloned freely.
pub type ContentStatusCallback =
    Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
45
/// Event sent to the subscribers of a replica after an entry was inserted.
#[derive(Debug, Clone)]
pub enum Event {
    /// An entry was inserted with [`InsertOrigin::Local`].
    LocalInsert {
        /// Namespace of the replica the entry was inserted into.
        namespace: NamespaceId,
        /// The entry that was inserted.
        entry: SignedEntry,
    },
    /// An entry received from a remote peer was inserted.
    RemoteInsert {
        /// Namespace of the replica the entry was inserted into.
        namespace: NamespaceId,
        /// The entry that was inserted.
        entry: SignedEntry,
        /// The peer the entry was received from.
        from: PeerIdBytes,
        /// Whether the replica's download policy matches this entry.
        should_download: bool,
        /// Content status for this entry as passed along with the remote insert.
        remote_content_status: ContentStatus,
    },
}
70
/// Describes where an entry that is being inserted came from.
#[derive(Debug, Clone)]
pub enum InsertOrigin {
    /// The entry was created and signed by this replica.
    Local,
    /// The entry was received from another peer.
    Sync {
        /// The peer the entry was received from.
        from: PeerIdBytes,
        /// Content status for the entry as supplied together with it.
        remote_content_status: ContentStatus,
    },
}
84
/// Availability of the content an entry refers to.
///
/// NOTE(review): the exact meaning of each variant is defined by whoever
/// produces the status (see [`ContentStatusCallback`]); the descriptions below
/// follow the variant names — confirm against the content store.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum ContentStatus {
    /// The content is completely available.
    Complete,
    /// The content is only partially available.
    Incomplete,
    /// The content is not available. This is also the status used during sync
    /// when no content status callback is configured.
    Missing,
}
95
/// Running statistics about a sync session, updated by
/// `Replica::sync_process_message`.
#[derive(Debug, Clone, Default)]
pub struct SyncOutcome {
    /// Author and timestamp of every entry value seen in received messages.
    pub heads_received: AuthorHeads,
    /// Total number of entry values received so far.
    pub num_recv: usize,
    /// Total number of entry values sent in replies so far.
    pub num_sent: usize,
}
106
/// Reinterprets `value` as a `usize` when `T` has exactly the size and
/// alignment of a `usize`; returns `None` for every other layout.
///
/// This is used to compare pointer-like handles by their raw bit pattern.
fn get_as_ptr<T>(value: &T) -> Option<usize> {
    use std::mem::{align_of, size_of, transmute_copy};

    let layout_matches =
        size_of::<T>() == size_of::<usize>() && align_of::<T>() == align_of::<usize>();
    if !layout_matches {
        return None;
    }
    // SAFETY: `T` and `usize` were just checked to have identical size and
    // alignment, so reading a `usize` out of `value` stays in bounds.
    // NOTE(review): this additionally assumes every byte of `T` is initialised
    // (no padding) — confirm for the types this is called with.
    let bits = unsafe { transmute_copy::<T, usize>(value) };
    Some(bits)
}
118
119fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
120 get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
121}
122
/// The set of channel senders that receive every [`Event`] of a replica.
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);
125impl Subscribers {
126 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
127 self.0.push(sender)
128 }
129 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
130 self.0.retain(|s| !same_channel(s, sender));
131 }
132 pub fn send(&mut self, event: Event) {
133 self.0
134 .retain(|sender| sender.send_blocking(event.clone()).is_ok())
135 }
136 pub fn len(&self) -> usize {
137 self.0.len()
138 }
139 pub fn send_with(&mut self, f: impl FnOnce() -> Event) {
140 if !self.0.is_empty() {
141 self.send(f())
142 }
143 }
144}
145
/// Discriminant of a [`Capability`], convertible to and from its `u8`
/// representation and displayed in `snake_case`.
#[derive(
    Debug,
    Clone,
    Copy,
    Serialize,
    Deserialize,
    num_enum::IntoPrimitive,
    num_enum::TryFromPrimitive,
    strum::Display,
)]
#[repr(u8)]
#[strum(serialize_all = "snake_case")]
pub enum CapabilityKind {
    /// Kind of [`Capability::Write`]; encoded as `1`.
    Write = 1,
    /// Kind of [`Capability::Read`]; encoded as `2`.
    Read = 2,
}
165
/// Access capability for a namespace.
#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
pub enum Capability {
    /// Write access, represented by the namespace secret key.
    Write(NamespaceSecret),
    /// Read-only access, represented by the namespace id alone.
    Read(NamespaceId),
}
174
175impl Capability {
176 pub fn id(&self) -> NamespaceId {
178 match self {
179 Capability::Write(secret) => secret.id(),
180 Capability::Read(id) => *id,
181 }
182 }
183
184 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
187 match self {
188 Capability::Write(secret) => Ok(secret),
189 Capability::Read(_) => Err(ReadOnly),
190 }
191 }
192
193 pub fn kind(&self) -> CapabilityKind {
195 match self {
196 Capability::Write(_) => CapabilityKind::Write,
197 Capability::Read(_) => CapabilityKind::Read,
198 }
199 }
200
201 pub fn raw(&self) -> (u8, [u8; 32]) {
203 let capability_repr: u8 = self.kind().into();
204 let bytes = match self {
205 Capability::Write(secret) => secret.to_bytes(),
206 Capability::Read(id) => id.to_bytes(),
207 };
208 (capability_repr, bytes)
209 }
210
211 pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
213 let kind: CapabilityKind = kind.try_into()?;
214 let capability = match kind {
215 CapabilityKind::Write => {
216 let secret = NamespaceSecret::from_bytes(bytes);
217 Capability::Write(secret)
218 }
219 CapabilityKind::Read => {
220 let id = NamespaceId::from(bytes);
221 Capability::Read(id)
222 }
223 };
224 Ok(capability)
225 }
226
227 pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
233 if other.id() != self.id() {
234 return Err(CapabilityError::NamespaceMismatch);
235 }
236
237 if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
239 let _ = std::mem::replace(self, other);
240 Ok(true)
241 } else {
242 Ok(false)
243 }
244 }
245}
246
/// Errors returned when merging two capabilities.
#[derive(Debug, thiserror::Error)]
pub enum CapabilityError {
    /// The two capabilities refer to different namespaces.
    #[error("Namespaces are not the same")]
    NamespaceMismatch,
}
254
/// In-memory state of a replica: its capability, its event subscribers, an
/// optional content status callback and a closed flag.
#[derive(derive_more::Debug)]
pub struct ReplicaInfo {
    /// Capability (read or write) held for the replica's namespace.
    pub(crate) capability: Capability,
    /// Channels that receive insert events.
    subscribers: Subscribers,
    /// Callback used during sync to look up the local status of an entry's content.
    #[debug("ContentStatusCallback")]
    content_status_cb: Option<ContentStatusCallback>,
    /// When `true`, inserts and sync operations fail with `InsertError::Closed`.
    closed: bool,
}
264
265impl ReplicaInfo {
266 pub fn new(capability: Capability) -> Self {
268 Self {
269 capability,
270 subscribers: Default::default(),
271 content_status_cb: None,
273 closed: false,
274 }
275 }
276
277 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
283 self.subscribers.subscribe(sender)
284 }
285
286 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
292 self.subscribers.unsubscribe(sender)
293 }
294
295 pub fn subscribers_count(&self) -> usize {
297 self.subscribers.len()
298 }
299
300 pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
305 if self.content_status_cb.is_some() {
306 false
307 } else {
308 self.content_status_cb = Some(cb);
309 true
310 }
311 }
312
313 fn ensure_open(&self) -> Result<(), InsertError> {
314 if self.closed() {
315 Err(InsertError::Closed)
316 } else {
317 Ok(())
318 }
319 }
320
321 pub fn closed(&self) -> bool {
327 self.closed
328 }
329
330 pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
337 self.capability.merge(capability)
338 }
339}
340
/// A replica: a handle that combines a store instance with the replica's
/// [`ReplicaInfo`].
///
/// `I` is the holder of the [`ReplicaInfo`]; it defaults to an owned
/// `Box<ReplicaInfo>`.
#[derive(derive_more::Debug)]
pub struct Replica<'a, I = Box<ReplicaInfo>> {
    /// Store instance that entries are read from and written to.
    pub(crate) store: StoreInstance<'a>,
    /// Capability, subscribers and other in-memory state of the replica.
    pub(crate) info: I,
}
347
impl<'a, I> Replica<'a, I>
where
    I: Deref<Target = ReplicaInfo> + DerefMut,
{
    /// Creates a replica handle from a store instance and its info.
    pub fn new(store: StoreInstance<'a>, info: I) -> Self {
        Replica { info, store }
    }

    /// Inserts a new record for `key`, signed by `author` and the namespace
    /// secret, timestamped with the current time.
    ///
    /// `hash` and `len` describe the content of the record.
    ///
    /// Returns the `removed` count reported by the store for this insertion.
    ///
    /// # Errors
    ///
    /// * [`InsertError::EntryIsEmpty`] if `len` is 0 or `hash` is the empty hash.
    /// * [`InsertError::Closed`] if the replica is closed.
    /// * [`InsertError::ReadOnly`] if the replica holds no namespace secret.
    /// * Any error of the internal insert path (validation, storage, newer
    ///   entry exists).
    pub fn insert(
        &mut self,
        key: impl AsRef<[u8]>,
        author: &Author,
        hash: Hash,
        len: u64,
    ) -> Result<usize, InsertError> {
        // Empty entries are reserved for deletions, see `delete_prefix`.
        if len == 0 || hash == Hash::EMPTY {
            return Err(InsertError::EntryIsEmpty);
        }
        self.info.ensure_open()?;
        let id = RecordIdentifier::new(self.id(), author.id(), key);
        let record = Record::new_current(hash, len);
        let entry = Entry::new(id, record);
        let secret = self.secret_key()?;
        let signed_entry = entry.sign(secret, author);
        self.insert_entry(signed_entry, InsertOrigin::Local)
    }

    /// Inserts an empty entry at `prefix`, signed by `author` and the
    /// namespace secret.
    ///
    /// Returns the `removed` count reported by the store for this insertion.
    ///
    /// # Errors
    ///
    /// Fails if the replica is closed or read-only, or if the internal insert
    /// path fails.
    pub fn delete_prefix(
        &mut self,
        prefix: impl AsRef<[u8]>,
        author: &Author,
    ) -> Result<usize, InsertError> {
        self.info.ensure_open()?;
        let id = RecordIdentifier::new(self.id(), author.id(), prefix);
        let entry = Entry::new_empty(id);
        let signed_entry = entry.sign(self.secret_key()?, author);
        self.insert_entry(signed_entry, InsertOrigin::Local)
    }

    /// Inserts an entry that was received from the peer `received_from`.
    ///
    /// The entry must be consistent regarding emptiness (see
    /// [`SignedEntry::validate_empty`]) and is then validated and stored with
    /// a [`InsertOrigin::Sync`] origin.
    ///
    /// Returns the `removed` count reported by the store for this insertion.
    pub fn insert_remote_entry(
        &mut self,
        entry: SignedEntry,
        received_from: PeerIdBytes,
        content_status: ContentStatus,
    ) -> Result<usize, InsertError> {
        self.info.ensure_open()?;
        entry.validate_empty()?;
        let origin = InsertOrigin::Sync {
            from: received_from,
            remote_content_status: content_status,
        };
        self.insert_entry(entry, origin)
    }

    /// Validates `entry`, stores it, and notifies subscribers.
    ///
    /// Returns the `removed` count of the store's insert outcome, or
    /// [`InsertError::NewerEntryExists`] if the store did not insert the entry.
    fn insert_entry(
        &mut self,
        entry: SignedEntry,
        origin: InsertOrigin,
    ) -> Result<usize, InsertError> {
        let namespace = self.id();

        let store = &self.store;
        validate_entry(system_time_now(), store, namespace, &entry, &origin)?;

        let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
        tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");

        let removed_count = match outcome {
            InsertOutcome::Inserted { removed } => removed,
            InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
        };

        let insert_event = match origin {
            InsertOrigin::Local => Event::LocalInsert { namespace, entry },
            InsertOrigin::Sync {
                from,
                remote_content_status,
            } => {
                // A failure to load the download policy falls back to the default policy.
                let download_policy = self
                    .store
                    .get_download_policy(&self.id())
                    .unwrap_or_default();
                let should_download = download_policy.matches(entry.entry());
                Event::RemoteInsert {
                    namespace,
                    entry,
                    from,
                    should_download,
                    remote_content_status,
                }
            }
        };

        self.info.subscribers.send(insert_event);

        Ok(removed_count)
    }

    /// Hashes `data` and inserts a record for `key` that refers to it.
    ///
    /// Only the hash and length are recorded; the data itself is not stored
    /// by this method. Returns the hash of `data`.
    pub fn hash_and_insert(
        &mut self,
        key: impl AsRef<[u8]>,
        author: &Author,
        data: impl AsRef<[u8]>,
    ) -> Result<Hash, InsertError> {
        self.info.ensure_open()?;
        let len = data.as_ref().len() as u64;
        let hash = Hash::new(data);
        self.insert(key, author, hash, len)?;
        Ok(hash)
    }

    /// Builds the [`RecordIdentifier`] for `key` and `author` in this
    /// replica's namespace.
    pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
        RecordIdentifier::new(self.info.capability.id(), author.id(), key)
    }

    /// Returns the store's initial sync message.
    ///
    /// # Errors
    ///
    /// Fails if the replica is closed or the store cannot produce the message.
    pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
        self.info.ensure_open().map_err(anyhow::Error::from)?;
        self.store.initial_message()
    }

    /// Processes a sync `message` received from `from_peer` and returns the
    /// reply to send back, if any.
    ///
    /// `state` is updated with the number of values received and sent and
    /// with the author/timestamp of every received entry.
    ///
    /// # Errors
    ///
    /// Fails if the replica is closed or the store fails to process the message.
    pub async fn sync_process_message(
        &mut self,
        message: crate::ranger::Message<SignedEntry>,
        from_peer: PeerIdBytes,
        state: &mut SyncOutcome,
    ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
        self.info.ensure_open()?;
        let my_namespace = self.id();
        let now = system_time_now();

        // Update the sync statistics with everything contained in the message.
        state.num_recv += message.value_count();
        for (entry, _content_status) in message.values() {
            state
                .heads_received
                .insert(entry.author(), entry.timestamp());
        }

        // Clone the callback so the closure below can own it.
        let cb = self.info.content_status_cb.clone();
        // A failure to load the download policy falls back to the default policy.
        let download_policy = self
            .store
            .get_download_policy(&my_namespace)
            .unwrap_or_default();
        let reply = self
            .store
            .process_message(
                &Default::default(),
                message,
                // Validation callback: only entries passing `validate_entry` are accepted.
                |store, entry, content_status| {
                    let origin = InsertOrigin::Sync {
                        from: from_peer,
                        remote_content_status: content_status,
                    };
                    validate_entry(now, store, my_namespace, entry, &origin).is_ok()
                },
                // Insert callback: notify subscribers; the event is only built
                // when there is at least one subscriber.
                |_store, entry, content_status| {
                    self.info.subscribers.send_with(|| {
                        let should_download = download_policy.matches(entry.entry());
                        Event::RemoteInsert {
                            from: from_peer,
                            namespace: my_namespace,
                            entry: entry.clone(),
                            should_download,
                            remote_content_status: content_status,
                        }
                    })
                },
                // Content status callback: ask the configured callback, or
                // report `Missing` when none is set.
                move |entry| {
                    let cb = cb.clone();
                    Box::pin(async move {
                        if let Some(cb) = cb.as_ref() {
                            cb(entry.content_hash()).await
                        } else {
                            ContentStatus::Missing
                        }
                    })
                },
            )
            .await?;

        if let Some(ref reply) = reply {
            state.num_sent += reply.value_count();
        }

        Ok(reply)
    }

    /// Returns the id of this replica's namespace.
    pub fn id(&self) -> NamespaceId {
        self.info.capability.id()
    }

    /// Returns the capability held for this replica.
    pub fn capability(&self) -> &Capability {
        &self.info.capability
    }

    /// Returns the namespace secret key, or [`ReadOnly`] if the replica only
    /// holds a read capability.
    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
        self.info.capability.secret_key()
    }
}
591
/// Error returned when an operation needs the namespace secret but only a
/// read capability is available.
#[derive(Debug, thiserror::Error)]
#[error("Replica allows read access only.")]
pub struct ReadOnly;
596
597fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
605 now: u64,
606 store: &S,
607 expected_namespace: NamespaceId,
608 entry: &SignedEntry,
609 origin: &InsertOrigin,
610) -> Result<(), ValidationFailure> {
611 if entry.namespace() != expected_namespace {
613 return Err(ValidationFailure::InvalidNamespace);
614 }
615
616 if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
618 return Err(ValidationFailure::BadSignature);
619 }
620
621 if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
623 return Err(ValidationFailure::TooFarInTheFuture);
624 }
625 Ok(())
626}
627
/// Errors that can occur when inserting an entry into a replica.
#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
pub enum InsertError {
    /// The underlying store failed.
    #[error("storage error")]
    Store(anyhow::Error),
    /// The entry did not pass validation.
    #[error("validation failure")]
    Validation(#[from] ValidationFailure),
    /// The store did not insert the entry because a newer one exists.
    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
    NewerEntryExists,
    /// An insert was attempted with length 0 or the empty hash.
    #[error("Attempted to insert an empty entry")]
    EntryIsEmpty,
    /// The replica has no write capability.
    #[error("Attempted to insert to read only replica")]
    #[from(ReadOnly)]
    ReadOnly,
    /// The replica is closed.
    #[error("replica is closed")]
    Closed,
}
651
/// Reasons why an entry fails validation.
#[derive(thiserror::Error, Debug)]
pub enum ValidationFailure {
    /// The entry's namespace differs from the replica's namespace.
    #[error("Entry namespace does not match the current replica")]
    InvalidNamespace,
    /// Signature verification of the entry failed.
    #[error("Entry signature is invalid")]
    BadSignature,
    /// The entry's timestamp exceeds the current time by more than
    /// `MAX_TIMESTAMP_FUTURE_SHIFT`.
    #[error("Entry timestamp is too far in the future.")]
    TooFarInTheFuture,
    /// Exactly one of "length is 0" and "hash is the empty hash" holds.
    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
    InvalidEmptyEntry,
}
668
/// An [`Entry`] together with its namespace and author signatures.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignedEntry {
    /// Signatures over the encoded entry.
    signature: EntrySignature,
    /// The entry that was signed.
    entry: Entry,
}
675
676impl From<SignedEntry> for Entry {
677 fn from(value: SignedEntry) -> Self {
678 value.entry
679 }
680}
681
682impl PartialOrd for SignedEntry {
683 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
684 Some(self.cmp(other))
685 }
686}
687
688impl Ord for SignedEntry {
689 fn cmp(&self, other: &Self) -> Ordering {
690 self.entry.cmp(&other.entry)
691 }
692}
693
694impl PartialOrd for Entry {
695 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
696 Some(self.cmp(other))
697 }
698}
699
700impl Ord for Entry {
701 fn cmp(&self, other: &Self) -> Ordering {
702 self.id
703 .cmp(&other.id)
704 .then_with(|| self.record.cmp(&other.record))
705 }
706}
707
708impl SignedEntry {
709 pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
710 SignedEntry { signature, entry }
711 }
712
713 pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
715 let signature = EntrySignature::from_entry(&entry, namespace, author);
716 SignedEntry { signature, entry }
717 }
718
719 pub fn from_parts(
721 namespace: &NamespaceSecret,
722 author: &Author,
723 key: impl AsRef<[u8]>,
724 record: Record,
725 ) -> Self {
726 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
727 let entry = Entry::new(id, record);
728 Self::from_entry(entry, namespace, author)
729 }
730
731 pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
733 self.signature.verify(
734 &self.entry,
735 &self.entry.namespace().public_key(store)?,
736 &self.entry.author().public_key(store)?,
737 )
738 }
739
740 pub fn signature(&self) -> &EntrySignature {
742 &self.signature
743 }
744
745 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
747 self.entry().validate_empty()
748 }
749
750 pub fn entry(&self) -> &Entry {
752 &self.entry
753 }
754
755 pub fn content_hash(&self) -> Hash {
757 self.entry().content_hash()
758 }
759
760 pub fn content_len(&self) -> u64 {
762 self.entry().content_len()
763 }
764
765 pub fn author_bytes(&self) -> AuthorId {
767 self.entry().id().author()
768 }
769
770 pub fn key(&self) -> &[u8] {
772 self.entry().id().key()
773 }
774
775 pub fn timestamp(&self) -> u64 {
777 self.entry().timestamp()
778 }
779}
780
781impl RangeEntry for SignedEntry {
782 type Key = RecordIdentifier;
783 type Value = Record;
784
785 fn key(&self) -> &Self::Key {
786 &self.entry.id
787 }
788
789 fn value(&self) -> &Self::Value {
790 &self.entry.record
791 }
792
793 fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
794 let mut hasher = blake3::Hasher::new();
795 hasher.update(self.namespace().as_ref());
796 hasher.update(self.author_bytes().as_ref());
797 hasher.update(self.key());
798 hasher.update(&self.timestamp().to_be_bytes());
799 hasher.update(self.content_hash().as_bytes());
800 Fingerprint(hasher.finalize().into())
801 }
802}
803
/// The two signatures carried by a [`SignedEntry`]: one made with the author
/// key and one made with the namespace key, both over the encoded entry.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntrySignature {
    /// Signature produced by the author.
    author_signature: Signature,
    /// Signature produced with the namespace secret.
    namespace_signature: Signature,
}
810
811impl Debug for EntrySignature {
812 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
813 f.debug_struct("EntrySignature")
814 .field(
815 "namespace_signature",
816 &hex::encode(self.namespace_signature.to_bytes()),
817 )
818 .field(
819 "author_signature",
820 &hex::encode(self.author_signature.to_bytes()),
821 )
822 .finish()
823 }
824}
825
826impl EntrySignature {
827 pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
829 let bytes = entry.to_vec();
832 let namespace_signature = namespace.sign(&bytes);
833 let author_signature = author.sign(&bytes);
834
835 EntrySignature {
836 author_signature,
837 namespace_signature,
838 }
839 }
840
841 pub fn verify(
844 &self,
845 entry: &Entry,
846 namespace: &NamespacePublicKey,
847 author: &AuthorPublicKey,
848 ) -> Result<(), SignatureError> {
849 let bytes = entry.to_vec();
850 namespace.verify(&bytes, &self.namespace_signature)?;
851 author.verify(&bytes, &self.author_signature)?;
852
853 Ok(())
854 }
855
856 pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
857 let namespace_signature = Signature::from_bytes(namespace_sig);
858 let author_signature = Signature::from_bytes(author_sig);
859
860 EntrySignature {
861 author_signature,
862 namespace_signature,
863 }
864 }
865
866 pub(crate) fn author(&self) -> &Signature {
867 &self.author_signature
868 }
869
870 pub(crate) fn namespace(&self) -> &Signature {
871 &self.namespace_signature
872 }
873}
874
/// A single unsigned entry: a [`RecordIdentifier`] (namespace, author, key)
/// paired with the [`Record`] stored under it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Entry {
    /// Identifies where the record lives: namespace, author and key.
    id: RecordIdentifier,
    /// Content hash, content length and timestamp.
    record: Record,
}
885
886impl Entry {
887 pub fn new(id: RecordIdentifier, record: Record) -> Self {
889 Entry { id, record }
890 }
891
892 pub fn new_empty(id: RecordIdentifier) -> Self {
894 Entry {
895 id,
896 record: Record::empty_current(),
897 }
898 }
899
900 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
902 match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
903 (true, true) => Ok(()),
904 (false, false) => Ok(()),
905 (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
906 (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
907 }
908 }
909
910 pub fn id(&self) -> &RecordIdentifier {
912 &self.id
913 }
914
915 pub fn namespace(&self) -> NamespaceId {
917 self.id.namespace()
918 }
919
920 pub fn author(&self) -> AuthorId {
922 self.id.author()
923 }
924
925 pub fn key(&self) -> &[u8] {
927 self.id.key()
928 }
929
930 pub fn record(&self) -> &Record {
932 &self.record
933 }
934
935 pub fn content_hash(&self) -> Hash {
937 self.record.hash
938 }
939
940 pub fn content_len(&self) -> u64 {
942 self.record.len
943 }
944
945 pub fn timestamp(&self) -> u64 {
947 self.record.timestamp
948 }
949
950 pub fn encode(&self, out: &mut Vec<u8>) {
952 self.id.encode(out);
953 self.record.encode(out);
954 }
955
956 pub fn to_vec(&self) -> Vec<u8> {
958 let mut out = Vec::new();
959 self.encode(&mut out);
960 out
961 }
962
963 pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
965 SignedEntry::from_entry(self, namespace, author)
966 }
967}
968
// Layout of the byte string held by a `RecordIdentifier`:
// 32 bytes namespace id, 32 bytes author id, then the key.

/// Byte range of the namespace id.
const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
/// Byte range of the author id.
const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
/// Byte range of the key (everything after namespace and author).
const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
972
/// Identifier of a record: namespace id, author id and key, stored as one
/// contiguous byte string (in that order).
///
/// The derived ordering compares that byte string, i.e. namespace first,
/// then author, then key.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct RecordIdentifier(Bytes);
976
977impl Default for RecordIdentifier {
978 fn default() -> Self {
979 Self::new(NamespaceId::default(), AuthorId::default(), b"")
980 }
981}
982
983impl Debug for RecordIdentifier {
984 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
985 f.debug_struct("RecordIdentifier")
986 .field("namespace", &self.namespace())
987 .field("author", &self.author())
988 .field("key", &std::string::String::from_utf8_lossy(self.key()))
989 .finish()
990 }
991}
992
993impl RangeKey for RecordIdentifier {
994 #[cfg(test)]
995 fn is_prefix_of(&self, other: &Self) -> bool {
996 other.as_ref().starts_with(self.as_ref())
997 }
998}
999
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// # Panics
///
/// Panics with "time drift" if the system clock is set before the Unix epoch.
fn system_time_now() -> u64 {
    let now = SystemTime::now();
    let since_epoch = now
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("time drift");
    since_epoch.as_micros() as u64
}
1006
1007impl RecordIdentifier {
1008 pub fn new(
1010 namespace: impl Into<NamespaceId>,
1011 author: impl Into<AuthorId>,
1012 key: impl AsRef<[u8]>,
1013 ) -> Self {
1014 let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1015 bytes.extend_from_slice(namespace.into().as_bytes());
1016 bytes.extend_from_slice(author.into().as_bytes());
1017 bytes.extend_from_slice(key.as_ref());
1018 Self(bytes.freeze())
1019 }
1020
1021 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1023 out.extend_from_slice(&self.0);
1024 }
1025
1026 pub fn as_bytes(&self) -> Bytes {
1028 self.0.clone()
1029 }
1030
1031 pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1033 (
1034 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1035 self.0[AUTHOR_BYTES].try_into().unwrap(),
1036 &self.0[KEY_BYTES],
1037 )
1038 }
1039
1040 pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1042 (
1043 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1044 self.0[AUTHOR_BYTES].try_into().unwrap(),
1045 self.0.slice(KEY_BYTES),
1046 )
1047 }
1048
1049 pub fn key(&self) -> &[u8] {
1051 &self.0[KEY_BYTES]
1052 }
1053
1054 pub fn key_bytes(&self) -> Bytes {
1056 self.0.slice(KEY_BYTES)
1057 }
1058
1059 pub fn namespace(&self) -> NamespaceId {
1061 let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1062 value.into()
1063 }
1064
1065 pub fn author(&self) -> AuthorId {
1067 let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1068 value.into()
1069 }
1070}
1071
1072impl AsRef<[u8]> for RecordIdentifier {
1073 fn as_ref(&self) -> &[u8] {
1074 &self.0
1075 }
1076}
1077
1078impl Deref for SignedEntry {
1079 type Target = Entry;
1080 fn deref(&self) -> &Self::Target {
1081 &self.entry
1082 }
1083}
1084
1085impl Deref for Entry {
1086 type Target = Record;
1087 fn deref(&self) -> &Self::Target {
1088 &self.record
1089 }
1090}
1091
/// The data stored under a [`RecordIdentifier`]: the length and hash of the
/// content plus a timestamp.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Record {
    /// Length of the content.
    len: u64,
    /// Hash of the content.
    hash: Hash,
    /// Timestamp of the record; `Record::new_current` uses microseconds
    /// since the Unix epoch.
    timestamp: u64,
}
1102
// Marker impl: lets `Record` be used as the value type of ranger entries.
impl RangeValue for Record {}
1104
1105impl Ord for Record {
1109 fn cmp(&self, other: &Self) -> Ordering {
1110 self.timestamp
1111 .cmp(&other.timestamp)
1112 .then_with(|| self.hash.cmp(&other.hash))
1113 }
1114}
1115
1116impl PartialOrd for Record {
1117 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1118 Some(self.cmp(other))
1119 }
1120}
1121
1122impl Record {
1123 pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1125 debug_assert!(
1126 len != 0 || hash == Hash::EMPTY,
1127 "if `len` is 0 then `hash` must be the hash of the empty byte range"
1128 );
1129 Record {
1130 hash,
1131 len,
1132 timestamp,
1133 }
1134 }
1135
1136 pub fn empty(timestamp: u64) -> Self {
1138 Self::new(Hash::EMPTY, 0, timestamp)
1139 }
1140
1141 pub fn empty_current() -> Self {
1143 Self::new_current(Hash::EMPTY, 0)
1144 }
1145
1146 pub fn is_empty(&self) -> bool {
1148 self.hash == Hash::EMPTY
1149 }
1150
1151 pub fn new_current(hash: Hash, len: u64) -> Self {
1153 let timestamp = system_time_now();
1154 Self::new(hash, len, timestamp)
1155 }
1156
1157 pub fn content_len(&self) -> u64 {
1159 self.len
1160 }
1161
1162 pub fn content_hash(&self) -> Hash {
1164 self.hash
1165 }
1166
1167 pub fn timestamp(&self) -> u64 {
1169 self.timestamp
1170 }
1171
1172 #[cfg(test)]
1173 pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1174 let len = data.as_ref().len() as u64;
1175 let hash = Hash::new(data);
1176 Self::new_current(hash, len)
1177 }
1178
1179 #[cfg(test)]
1180 pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1181 let len = data.as_ref().len() as u64;
1182 let hash = Hash::new(data);
1183 Self::new(hash, len, timestamp)
1184 }
1185
1186 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1188 out.extend_from_slice(&self.len.to_be_bytes());
1189 out.extend_from_slice(self.hash.as_ref());
1190 out.extend_from_slice(&self.timestamp.to_be_bytes())
1191 }
1192}
1193
1194#[cfg(test)]
1195mod tests {
1196 use std::collections::HashSet;
1197
1198 use anyhow::Result;
1199 use rand_core::SeedableRng;
1200
1201 use super::*;
1202 use crate::{
1203 actor::SyncHandle,
1204 ranger::{Range, Store as _},
1205 store::{OpenError, Query, SortBy, SortDirection, Store},
1206 };
1207
/// Runs the basic replica test suite against an in-memory store.
#[test]
fn test_basics_memory() -> Result<()> {
    test_basics(store::Store::memory())
}
1215
/// Runs the basic replica test suite against a persistent store backed by a
/// temporary file.
#[test]
fn test_basics_fs() -> Result<()> {
    let db_file = tempfile::NamedTempFile::new()?;
    let persistent = store::fs::Store::persistent(db_file.path())?;
    test_basics(persistent)
}
1223
/// Exercises signing, inserting and querying entries on the given store.
fn test_basics(mut store: Store) -> Result<()> {
    let mut rng = rand::thread_rng();
    let alice = Author::new(&mut rng);
    let bob = Author::new(&mut rng);
    let myspace = NamespaceSecret::new(&mut rng);

    // A manually signed entry must verify.
    let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
    let record = Record::current_from_data(b"this is my cool data");
    let entry = Entry::new(record_id, record);
    let signed_entry = entry.sign(&myspace, &alice);
    signed_entry.verify(&()).expect("failed to verify");

    // Insert ten entries authored by alice.
    let mut my_replica = store.new_replica(myspace.clone())?;
    for i in 0..10 {
        my_replica.hash_and_insert(
            format!("/{i}"),
            &alice,
            format!("{i}: hello from alice"),
        )?;
    }

    // Each of them can be read back with the expected length and a valid signature.
    for i in 0..10 {
        let res = store
            .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
            .unwrap();
        let len = format!("{i}: hello from alice").as_bytes().len() as u64;
        assert_eq!(res.entry().record().content_len(), len);
        res.verify(&())?;
    }

    // Insert at the same key twice.
    let mut my_replica = store.new_replica(myspace.clone())?;
    // Round 1.
    my_replica.hash_and_insert("/cool/path", &alice, "round 1")?;
    let _entry = store
        .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
        .unwrap();
    let mut my_replica = store.new_replica(myspace.clone())?;
    // Round 2.
    my_replica.hash_and_insert("/cool/path", &alice, "round 2")?;
    let _entry = store
        .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
        .unwrap();

    // By author: ten numbered entries plus one at "/cool/path".
    let entries: Vec<_> = store
        .get_many(myspace.id(), Query::author(alice.id()))?
        .collect::<Result<_>>()?;
    assert_eq!(entries.len(), 11);

    // Bob has not written anything yet.
    let entries: Vec<_> = store
        .get_many(myspace.id(), Query::author(bob.id()))?
        .collect::<Result<_>>()?;
    assert_eq!(entries.len(), 0);

    // By exact key.
    let entries: Vec<_> = store
        .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
        .collect::<Result<_>>()?;
    assert_eq!(entries.len(), 1);

    // Everything.
    let entries: Vec<_> = store
        .get_many(myspace.id(), Query::all())?
        .collect::<Result<_>>()?;
    assert_eq!(entries.len(), 11);

    // Insert at the same key, but as a different author.
    let mut my_replica = store.new_replica(myspace.clone())?;
    let _entry = my_replica.hash_and_insert("/cool/path", &bob, "bob round 1")?;

    // Alice's entries are unaffected by bob's insert.
    let entries: Vec<_> = store
        .get_many(myspace.id(), Query::author(alice.id()))?
        .collect::<Result<_>>()?;
    assert_eq!(entries.len(), 11);

    let entries: Vec<_> = store
        .get_many(myspace.id(), Query::author(bob.id()))?
        .collect::<Result<_>>()?;
    assert_eq!(entries.len(), 1);

    // By exact key: one entry per author.
    let entries: Vec<_> = store
        .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
        .collect::<Result<_>>()?;
    assert_eq!(entries.len(), 2);

    // By key prefix.
    let entries: Vec<_> = store
        .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
        .collect::<Result<_>>()?;
    assert_eq!(entries.len(), 2);

    // By author combined with key prefix.
    let entries: Vec<_> = store
        .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
        .collect::<Result<_>>()?;
    assert_eq!(entries.len(), 1);

    let entries: Vec<_> = store
        .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
        .collect::<Result<_>>()?;
    assert_eq!(entries.len(), 1);

    // Everything.
    let entries: Vec<_> = store
        .get_many(myspace.id(), Query::all())?
        .collect::<Result<_>>()?;
    assert_eq!(entries.len(), 12);

    // A range from the default identifier to itself yields the same entries.
    let mut my_replica = store.new_replica(myspace.clone())?;
    let entries_second: Vec<_> = my_replica
        .store
        .get_range(Range::new(
            RecordIdentifier::default(),
            RecordIdentifier::default(),
        ))?
        .collect::<Result<_, _>>()?;

    assert_eq!(entries_second.len(), 12);
    assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());

    test_lru_cache_like_behaviour(&mut store, myspace.id())?;
    store.flush()?;
    Ok(())
}
1352
/// Checks that the per-namespace list of useful sync peers behaves like an
/// LRU cache: most recently registered first, oldest evicted when full.
fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
    /// Asserts that the store's sync peers for `namespace` equal `expected_peers`.
    #[track_caller]
    fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
        assert_eq!(
            expected_peers,
            &store
                .get_sync_peers(&namespace)
                .unwrap()
                .unwrap()
                .collect::<Vec<_>>(),
            "sync peers differ"
        );
    }

    let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
    let mut expected_peers = Vec::with_capacity(count);
    // Fill the cache completely; the newest peer is expected at the front.
    for i in 0..count as u8 {
        let peer = [i; 32];
        expected_peers.insert(0, peer);
        store.register_useful_peer(namespace, peer)?;
    }
    verify_peers(store, namespace, &expected_peers);

    // One more peer: the oldest one (at the back) is expected to be evicted.
    expected_peers.pop();
    let newer_peer = [count as u8; 32];
    expected_peers.insert(0, newer_peer);
    store.register_useful_peer(namespace, newer_peer)?;
    verify_peers(store, namespace, &expected_peers);

    // Registering an existing peer again is expected to move it to the front.
    let refreshed_peer = expected_peers.remove(2);
    expected_peers.insert(0, refreshed_peer);
    store.register_useful_peer(namespace, refreshed_peer)?;
    verify_peers(store, namespace, &expected_peers);
    Ok(())
}
1394
/// Runs the content hashes iterator test against an in-memory store.
#[test]
fn test_content_hashes_iterator_memory() -> Result<()> {
    test_content_hashes_iterator(store::Store::memory())
}
1400
/// Runs the content hashes iterator test against a persistent store backed
/// by a temporary file.
#[test]
fn test_content_hashes_iterator_fs() -> Result<()> {
    let db_file = tempfile::NamedTempFile::new()?;
    let persistent = store::fs::Store::persistent(db_file.path())?;
    test_content_hashes_iterator(persistent)
}
1407
/// Inserts entries with distinct content into several replicas and checks
/// that `Store::content_hashes` yields exactly the set of inserted hashes.
fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
    let mut rng = rand::thread_rng();
    let mut expected = HashSet::new();
    let n_replicas = 3;
    let n_entries = 4;
    for i in 0..n_replicas {
        let namespace = NamespaceSecret::new(&mut rng);
        let author = store.new_author(&mut rng)?;
        let mut replica = store.new_replica(namespace)?;
        for j in 0..n_entries {
            let key = format!("{j}");
            // The data is unique per (replica, entry) pair, so all hashes differ.
            let data = format!("{i}:{j}");
            let hash = replica.hash_and_insert(key, &author, data)?;
            expected.insert(hash);
        }
    }
    assert_eq!(expected.len(), n_replicas * n_entries);
    let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
    assert_eq!(actual, expected);
    Ok(())
}
1429
/// Checks ordering of `RecordIdentifier`s and half-open `Range` membership when
/// namespace, author and key vary individually and in combination.
#[test]
fn test_multikey() {
    let mut rng = rand::thread_rng();

    let keys = ["a", "c", "z"];

    // Sort by id so that index order equals identifier order.
    let mut namespaces: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
    namespaces.sort_by_key(|ns| ns.id());

    let mut authors: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
    authors.sort_by_key(|author| author.id());

    // For an ascending triple, the range [lo, hi) contains `lo` and `mid` but not `hi`.
    let check_triple = |lo: RecordIdentifier, mid: RecordIdentifier, hi: RecordIdentifier| {
        let range = Range::new(lo.clone(), hi.clone());
        assert!(range.contains(&lo), "start");
        assert!(range.contains(&mid), "inside");
        assert!(!range.contains(&hi), "end");

        assert!(lo < mid);
        assert!(mid < hi);
    };

    // Same namespace and author, ascending keys.
    check_triple(
        RecordIdentifier::new(namespaces[0].id(), authors[0].id(), keys[0]),
        RecordIdentifier::new(namespaces[0].id(), authors[0].id(), keys[1]),
        RecordIdentifier::new(namespaces[0].id(), authors[0].id(), keys[2]),
    );

    // Ascending namespaces (and keys), same author.
    check_triple(
        RecordIdentifier::new(namespaces[0].id(), authors[0].id(), keys[0]),
        RecordIdentifier::new(namespaces[1].id(), authors[0].id(), keys[1]),
        RecordIdentifier::new(namespaces[2].id(), authors[0].id(), keys[2]),
    );

    // Same namespace and key, ascending authors.
    check_triple(
        RecordIdentifier::new(namespaces[0].id(), authors[0].id(), keys[0]),
        RecordIdentifier::new(namespaces[0].id(), authors[1].id(), keys[0]),
        RecordIdentifier::new(namespaces[0].id(), authors[2].id(), keys[0]),
    );

    // Ascending namespaces and keys, same author (same inputs as the second group).
    check_triple(
        RecordIdentifier::new(namespaces[0].id(), authors[0].id(), keys[0]),
        RecordIdentifier::new(namespaces[1].id(), authors[0].id(), keys[1]),
        RecordIdentifier::new(namespaces[2].id(), authors[0].id(), keys[2]),
    );

    // Mixed comparisons.
    let (author0, author1) = (authors[0].id(), authors[1].id());
    let (ns0, ns1) = (namespaces[0].id(), namespaces[1].id());
    let (key0, key1) = (keys[0], keys[1]);

    assert!(RecordIdentifier::new(ns0, author0, key0) < RecordIdentifier::new(ns1, author1, key1));
    assert!(RecordIdentifier::new(ns0, author0, key1) < RecordIdentifier::new(ns1, author0, key0));
    assert!(RecordIdentifier::new(ns0, author1, key0) < RecordIdentifier::new(ns0, author1, key1));
    assert!(RecordIdentifier::new(ns1, author1, key0) < RecordIdentifier::new(ns1, author1, key1));
}
1519
/// Runs the timestamp ordering checks against a store created with `Store::memory()`.
#[test]
fn test_timestamps_memory() -> Result<()> {
    test_timestamps(store::Store::memory())
}
1527
/// Runs the timestamp ordering checks against a store persisted to a temporary file.
#[test]
fn test_timestamps_fs() -> Result<()> {
    let tmp = tempfile::NamedTempFile::new()?;
    test_timestamps(store::fs::Store::persistent(tmp.path())?)
}
1535
1536 fn test_timestamps(mut store: Store) -> Result<()> {
1537 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1538 let namespace = NamespaceSecret::new(&mut rng);
1539 let _replica = store.new_replica(namespace.clone())?;
1540 let author = store.new_author(&mut rng)?;
1541 store.close_replica(namespace.id());
1542 let mut replica = store.open_replica(&namespace.id())?;
1543
1544 let key = b"hello";
1545 let value = b"world";
1546 let entry = {
1547 let timestamp = 2;
1548 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1549 let record = Record::from_data(value, timestamp);
1550 Entry::new(id, record).sign(&namespace, &author)
1551 };
1552
1553 replica
1554 .insert_entry(entry.clone(), InsertOrigin::Local)
1555 .unwrap();
1556 store.close_replica(namespace.id());
1557 let res = store
1558 .get_exact(namespace.id(), author.id(), key, false)?
1559 .unwrap();
1560 assert_eq!(res, entry);
1561
1562 let entry2 = {
1563 let timestamp = 1;
1564 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1565 let record = Record::from_data(value, timestamp);
1566 Entry::new(id, record).sign(&namespace, &author)
1567 };
1568
1569 let mut replica = store.open_replica(&namespace.id())?;
1570 let res = replica.insert_entry(entry2, InsertOrigin::Local);
1571 store.close_replica(namespace.id());
1572 assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1573 let res = store
1574 .get_exact(namespace.id(), author.id(), key, false)?
1575 .unwrap();
1576 assert_eq!(res, entry);
1577 store.flush()?;
1578 Ok(())
1579 }
1580
1581 #[tokio::test]
1582 async fn test_replica_sync_memory() -> Result<()> {
1583 let alice_store = store::Store::memory();
1584 let bob_store = store::Store::memory();
1585
1586 test_replica_sync(alice_store, bob_store).await?;
1587 Ok(())
1588 }
1589
1590 #[tokio::test]
1591 async fn test_replica_sync_fs() -> Result<()> {
1592 let alice_dbfile = tempfile::NamedTempFile::new()?;
1593 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1594 let bob_dbfile = tempfile::NamedTempFile::new()?;
1595 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1596 test_replica_sync(alice_store, bob_store).await?;
1597
1598 Ok(())
1599 }
1600
1601 async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1602 let alice_set = ["ape", "eel", "fox", "gnu"];
1603 let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1604
1605 let mut rng = rand::thread_rng();
1606 let author = Author::new(&mut rng);
1607 let myspace = NamespaceSecret::new(&mut rng);
1608 let mut alice = alice_store.new_replica(myspace.clone())?;
1609 for el in &alice_set {
1610 alice.hash_and_insert(el, &author, el.as_bytes())?;
1611 }
1612
1613 let mut bob = bob_store.new_replica(myspace.clone())?;
1614 for el in &bob_set {
1615 bob.hash_and_insert(el, &author, el.as_bytes())?;
1616 }
1617
1618 let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
1619
1620 assert_eq!(alice_out.num_sent, 2);
1621 assert_eq!(bob_out.num_recv, 2);
1622 assert_eq!(alice_out.num_recv, 6);
1623 assert_eq!(bob_out.num_sent, 6);
1624
1625 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1626 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1627 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1628 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1629 alice_store.flush()?;
1630 bob_store.flush()?;
1631 Ok(())
1632 }
1633
1634 #[tokio::test]
1635 async fn test_replica_timestamp_sync_memory() -> Result<()> {
1636 let alice_store = store::Store::memory();
1637 let bob_store = store::Store::memory();
1638
1639 test_replica_timestamp_sync(alice_store, bob_store).await?;
1640 Ok(())
1641 }
1642
1643 #[tokio::test]
1644 async fn test_replica_timestamp_sync_fs() -> Result<()> {
1645 let alice_dbfile = tempfile::NamedTempFile::new()?;
1646 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1647 let bob_dbfile = tempfile::NamedTempFile::new()?;
1648 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1649 test_replica_timestamp_sync(alice_store, bob_store).await?;
1650
1651 Ok(())
1652 }
1653
1654 async fn test_replica_timestamp_sync(
1655 mut alice_store: Store,
1656 mut bob_store: Store,
1657 ) -> Result<()> {
1658 let mut rng = rand::thread_rng();
1659 let author = Author::new(&mut rng);
1660 let namespace = NamespaceSecret::new(&mut rng);
1661 let mut alice = alice_store.new_replica(namespace.clone())?;
1662 let mut bob = bob_store.new_replica(namespace.clone())?;
1663
1664 let key = b"key";
1665 let alice_value = b"alice";
1666 let bob_value = b"bob";
1667 let _alice_hash = alice.hash_and_insert(key, &author, alice_value)?;
1668 let bob_hash = bob.hash_and_insert(key, &author, bob_value)?;
1670 sync(&mut alice, &mut bob).await?;
1671 assert_eq!(
1672 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1673 Some(bob_hash)
1674 );
1675 assert_eq!(
1676 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1677 Some(bob_hash)
1678 );
1679
1680 let mut alice = alice_store.new_replica(namespace.clone())?;
1681 let mut bob = bob_store.new_replica(namespace.clone())?;
1682
1683 let alice_value_2 = b"alice2";
1684 let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value)?;
1686 let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2)?;
1687 sync(&mut alice, &mut bob).await?;
1688 assert_eq!(
1689 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1690 Some(alice_hash_2)
1691 );
1692 assert_eq!(
1693 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1694 Some(alice_hash_2)
1695 );
1696 alice_store.flush()?;
1697 bob_store.flush()?;
1698 Ok(())
1699 }
1700
/// Checks the acceptance window for entry timestamps: entries up to
/// `MAX_TIMESTAMP_FUTURE_SHIFT` ahead of the current time are stored, while an
/// entry further ahead is rejected with `ValidationFailure::TooFarInTheFuture`
/// and leaves the previously stored entry in place.
#[test]
fn test_future_timestamp() -> Result<()> {
    let mut rng = rand::thread_rng();
    let mut store = store::Store::memory();
    let author = Author::new(&mut rng);
    let namespace = NamespaceSecret::new(&mut rng);
    let key = b"hi";

    // Builds a signed entry for `key` with the given payload and timestamp.
    let signed = |payload: &[u8], timestamp: u64| {
        SignedEntry::from_parts(&namespace, &author, key, Record::from_data(payload, timestamp))
    };

    // Current time: accepted.
    let mut replica = store.new_replica(namespace.clone())?;
    let now_entry = signed(b"1", system_time_now());
    replica.insert_entry(now_entry.clone(), InsertOrigin::Local)?;
    assert_eq!(
        get_entry(&mut store, namespace.id(), author.id(), key)?,
        now_entry
    );

    // Slightly inside the allowed future window: accepted.
    let mut replica = store.new_replica(namespace.clone())?;
    let inside_entry = signed(b"2", system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000);
    replica.insert_entry(inside_entry.clone(), InsertOrigin::Local)?;
    assert_eq!(
        get_entry(&mut store, namespace.id(), author.id(), key)?,
        inside_entry
    );

    // Exactly `MAX_TIMESTAMP_FUTURE_SHIFT` ahead of the sampled time: accepted.
    let mut replica = store.new_replica(namespace.clone())?;
    let edge_entry = signed(b"2", system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT);
    replica.insert_entry(edge_entry.clone(), InsertOrigin::Local)?;
    assert_eq!(
        get_entry(&mut store, namespace.id(), author.id(), key)?,
        edge_entry
    );

    // Beyond the window: rejected, stored entry unchanged.
    let mut replica = store.new_replica(namespace.clone())?;
    let beyond_entry = signed(b"2", system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000);
    let outcome = replica.insert_entry(beyond_entry, InsertOrigin::Local);
    assert!(matches!(
        outcome,
        Err(InsertError::Validation(
            ValidationFailure::TooFarInTheFuture
        ))
    ));
    assert_eq!(
        get_entry(&mut store, namespace.id(), author.id(), key)?,
        edge_entry
    );
    store.flush()?;
    Ok(())
}
1758
/// Checks that inserting the hash of empty content with length 0 is rejected
/// with `InsertError::EntryIsEmpty`.
#[test]
fn test_insert_empty() -> Result<()> {
    let mut store = store::Store::memory();
    let mut rng = rand::thread_rng();
    let alice = Author::new(&mut rng);
    let myspace = NamespaceSecret::new(&mut rng);
    let mut replica = store.new_replica(myspace.clone())?;
    let empty_hash = Hash::new(b"");
    let outcome = replica.insert(b"foo", &alice, empty_hash, 0);
    assert!(matches!(outcome, Err(InsertError::EntryIsEmpty)));
    store.flush()?;
    Ok(())
}
1772
/// Runs the prefix-delete checks against a store created with `Store::memory()`.
#[test]
fn test_prefix_delete_memory() -> Result<()> {
    test_prefix_delete(store::Store::memory())
}
1779
/// Runs the prefix-delete checks against a store persisted to a temporary file.
#[test]
fn test_prefix_delete_fs() -> Result<()> {
    let tmp = tempfile::NamedTempFile::new()?;
    test_prefix_delete(store::fs::Store::persistent(tmp.path())?)
}
1787
1788 fn test_prefix_delete(mut store: Store) -> Result<()> {
1789 let mut rng = rand::thread_rng();
1790 let alice = Author::new(&mut rng);
1791 let myspace = NamespaceSecret::new(&mut rng);
1792 let mut replica = store.new_replica(myspace.clone())?;
1793 let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello")?;
1794 let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world")?;
1795
1796 assert_eq!(
1798 get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1799 Some(hash1)
1800 );
1801 assert_eq!(
1802 get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1803 Some(hash2)
1804 );
1805
1806 let mut replica = store.new_replica(myspace.clone())?;
1808 let deleted = replica.delete_prefix(b"foo", &alice)?;
1809 assert_eq!(deleted, 2);
1810 assert_eq!(
1811 store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1812 None
1813 );
1814 assert_eq!(
1815 store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1816 None
1817 );
1818 assert_eq!(
1819 store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1820 None
1821 );
1822 store.flush()?;
1823 Ok(())
1824 }
1825
1826 #[tokio::test]
1827 async fn test_replica_sync_delete_memory() -> Result<()> {
1828 let alice_store = store::Store::memory();
1829 let bob_store = store::Store::memory();
1830
1831 test_replica_sync_delete(alice_store, bob_store).await
1832 }
1833
1834 #[tokio::test]
1835 async fn test_replica_sync_delete_fs() -> Result<()> {
1836 let alice_dbfile = tempfile::NamedTempFile::new()?;
1837 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1838 let bob_dbfile = tempfile::NamedTempFile::new()?;
1839 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1840 test_replica_sync_delete(alice_store, bob_store).await
1841 }
1842
1843 async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1844 let alice_set = ["foot"];
1845 let bob_set = ["fool", "foo", "fog"];
1846
1847 let mut rng = rand::thread_rng();
1848 let author = Author::new(&mut rng);
1849 let myspace = NamespaceSecret::new(&mut rng);
1850 let mut alice = alice_store.new_replica(myspace.clone())?;
1851 for el in &alice_set {
1852 alice.hash_and_insert(el, &author, el.as_bytes())?;
1853 }
1854
1855 let mut bob = bob_store.new_replica(myspace.clone())?;
1856 for el in &bob_set {
1857 bob.hash_and_insert(el, &author, el.as_bytes())?;
1858 }
1859
1860 sync(&mut alice, &mut bob).await?;
1861
1862 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1863 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1864 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1865 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1866
1867 let mut alice = alice_store.new_replica(myspace.clone())?;
1868 let mut bob = bob_store.new_replica(myspace.clone())?;
1869 alice.delete_prefix("foo", &author)?;
1870 bob.hash_and_insert("fooz", &author, "fooz".as_bytes())?;
1871 sync(&mut alice, &mut bob).await?;
1872 check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1873 check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1874 alice_store.flush()?;
1875 bob_store.flush()?;
1876 Ok(())
1877 }
1878
/// Runs the replica removal checks against a store created with `Store::memory()`.
#[test]
fn test_replica_remove_memory() -> Result<()> {
    test_replica_remove(store::Store::memory())
}
1884
/// Runs the replica removal checks against a store persisted to a temporary file.
#[test]
fn test_replica_remove_fs() -> Result<()> {
    let tmp = tempfile::NamedTempFile::new()?;
    test_replica_remove(store::fs::Store::persistent(tmp.path())?)
}
1891
1892 fn test_replica_remove(mut store: Store) -> Result<()> {
1893 let mut rng = rand::thread_rng();
1894 let namespace = NamespaceSecret::new(&mut rng);
1895 let author = Author::new(&mut rng);
1896 let mut replica = store.new_replica(namespace.clone())?;
1897
1898 let hash = replica.hash_and_insert(b"foo", &author, b"bar")?;
1900 let res = store
1901 .get_many(namespace.id(), Query::all())?
1902 .collect::<Vec<_>>();
1903 assert_eq!(res.len(), 1);
1904
1905 let res = store.remove_replica(&namespace.id());
1907 assert!(res.is_err());
1909 store.close_replica(namespace.id());
1910 store.remove_replica(&namespace.id())?;
1911 let res = store
1912 .get_many(namespace.id(), Query::all())?
1913 .collect::<Vec<_>>();
1914 assert_eq!(res.len(), 0);
1915
1916 let res = store.load_replica_info(&namespace.id());
1918 assert!(matches!(res, Err(OpenError::NotFound)));
1919
1920 let mut replica = store.new_replica(namespace.clone())?;
1922 replica.insert(b"foo", &author, hash, 3)?;
1923 let res = store
1924 .get_many(namespace.id(), Query::all())?
1925 .collect::<Vec<_>>();
1926 assert_eq!(res.len(), 1);
1927 store.flush()?;
1928 Ok(())
1929 }
1930
/// Runs the prefix-delete edge cases against a store created with `Store::memory()`.
#[test]
fn test_replica_delete_edge_cases_memory() -> Result<()> {
    test_replica_delete_edge_cases(store::Store::memory())
}
1936
/// Runs the prefix-delete edge cases against a store persisted to a temporary file.
#[test]
fn test_replica_delete_edge_cases_fs() -> Result<()> {
    let tmp = tempfile::NamedTempFile::new()?;
    test_replica_delete_edge_cases(store::fs::Store::persistent(tmp.path())?)
}
1943
1944 fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
1945 let mut rng = rand::thread_rng();
1946 let author = Author::new(&mut rng);
1947 let namespace = NamespaceSecret::new(&mut rng);
1948
1949 let edgecases = [0u8, 1u8, 255u8];
1950 let prefixes = [0u8, 255u8];
1951 let hash = Hash::new(b"foo");
1952 let len = 3;
1953 for prefix in prefixes {
1954 let mut expected = vec![];
1955 let mut replica = store.new_replica(namespace.clone())?;
1956 for suffix in edgecases {
1957 let key = [prefix, suffix].to_vec();
1958 expected.push(key.clone());
1959 replica.insert(&key, &author, hash, len)?;
1960 }
1961 assert_keys(&mut store, namespace.id(), expected);
1962 let mut replica = store.new_replica(namespace.clone())?;
1963 replica.delete_prefix([prefix], &author)?;
1964 assert_keys(&mut store, namespace.id(), vec![]);
1965 }
1966
1967 let mut replica = store.new_replica(namespace.clone())?;
1968 let key = vec![1u8, 0u8];
1969 replica.insert(key, &author, hash, len)?;
1970 let key = vec![1u8, 1u8];
1971 replica.insert(key, &author, hash, len)?;
1972 let key = vec![1u8, 2u8];
1973 replica.insert(key, &author, hash, len)?;
1974 let prefix = vec![1u8, 1u8];
1975 replica.delete_prefix(prefix, &author)?;
1976 assert_keys(
1977 &mut store,
1978 namespace.id(),
1979 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1980 );
1981
1982 let mut replica = store.new_replica(namespace.clone())?;
1983 let key = vec![0u8, 255u8];
1984 replica.insert(key, &author, hash, len)?;
1985 let key = vec![0u8, 0u8];
1986 replica.insert(key, &author, hash, len)?;
1987 let prefix = vec![0u8];
1988 replica.delete_prefix(prefix, &author)?;
1989 assert_keys(
1990 &mut store,
1991 namespace.id(),
1992 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1993 );
1994 store.flush()?;
1995 Ok(())
1996 }
1997
/// Runs the latest-per-author checks against a store created with `Store::memory()`.
#[test]
fn test_latest_iter_memory() -> Result<()> {
    test_latest_iter(store::Store::memory())
}
2003
/// Runs the latest-per-author checks against a store persisted to a temporary file.
#[test]
fn test_latest_iter_fs() -> Result<()> {
    let tmp = tempfile::NamedTempFile::new()?;
    test_latest_iter(store::fs::Store::persistent(tmp.path())?)
}
2010
2011 fn test_latest_iter(mut store: Store) -> Result<()> {
2012 let mut rng = rand::thread_rng();
2013 let author0 = Author::new(&mut rng);
2014 let author1 = Author::new(&mut rng);
2015 let namespace = NamespaceSecret::new(&mut rng);
2016 let mut replica = store.new_replica(namespace.clone())?;
2017
2018 replica.hash_and_insert(b"a0.1", &author0, b"hi")?;
2019 let latest = store
2020 .get_latest_for_each_author(namespace.id())?
2021 .collect::<Result<Vec<_>>>()?;
2022 assert_eq!(latest.len(), 1);
2023 assert_eq!(latest[0].2, b"a0.1".to_vec());
2024
2025 let mut replica = store.new_replica(namespace.clone())?;
2026 replica.hash_and_insert(b"a1.1", &author1, b"hi")?;
2027 replica.hash_and_insert(b"a0.2", &author0, b"hi")?;
2028 let latest = store
2029 .get_latest_for_each_author(namespace.id())?
2030 .collect::<Result<Vec<_>>>()?;
2031 let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2032 latest_keys.sort();
2033 assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2034 store.flush()?;
2035 Ok(())
2036 }
2037
/// Runs the byte-key checks against a store created with `Store::memory()`.
#[test]
fn test_replica_byte_keys_memory() -> Result<()> {
    test_replica_byte_keys(store::Store::memory())
}
2045
/// Runs the byte-key checks against a store persisted to a temporary file.
#[test]
fn test_replica_byte_keys_fs() -> Result<()> {
    let tmp = tempfile::NamedTempFile::new()?;
    test_replica_byte_keys(store::fs::Store::persistent(tmp.path())?)
}
2054
2055 fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2056 let mut rng = rand::thread_rng();
2057 let author = Author::new(&mut rng);
2058 let namespace = NamespaceSecret::new(&mut rng);
2059
2060 let hash = Hash::new(b"foo");
2061 let len = 3;
2062
2063 let key = vec![1u8, 0u8];
2064 let mut replica = store.new_replica(namespace.clone())?;
2065 replica.insert(key, &author, hash, len)?;
2066 assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2067 let key = vec![1u8, 2u8];
2068 let mut replica = store.new_replica(namespace.clone())?;
2069 replica.insert(key, &author, hash, len)?;
2070 assert_keys(
2071 &mut store,
2072 namespace.id(),
2073 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2074 );
2075
2076 let key = vec![0u8, 255u8];
2077 let mut replica = store.new_replica(namespace.clone())?;
2078 replica.insert(key, &author, hash, len)?;
2079 assert_keys(
2080 &mut store,
2081 namespace.id(),
2082 vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2083 );
2084 store.flush()?;
2085 Ok(())
2086 }
2087
/// Runs the replica capability checks against a store created with `Store::memory()`.
#[test]
fn test_replica_capability_memory() -> Result<()> {
    test_replica_capability(store::Store::memory())
}
2093
/// Runs the replica capability checks against a store persisted to a temporary file.
#[test]
fn test_replica_capability_fs() -> Result<()> {
    let tmp = tempfile::NamedTempFile::new()?;
    test_replica_capability(store::fs::Store::persistent(tmp.path())?)
}
2100
2101 #[allow(clippy::redundant_pattern_matching)]
2102 fn test_replica_capability(mut store: Store) -> Result<()> {
2103 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2104 let author = store.new_author(&mut rng)?;
2105 let namespace = NamespaceSecret::new(&mut rng);
2106
2107 let capability = Capability::Read(namespace.id());
2109 store.import_namespace(capability)?;
2110 let mut replica = store.open_replica(&namespace.id())?;
2111 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2112 assert!(matches!(res, Err(InsertError::ReadOnly)));
2113
2114 let capability = Capability::Write(namespace.clone());
2116 store.import_namespace(capability)?;
2117 let mut replica = store.open_replica(&namespace.id())?;
2118 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2119 assert!(matches!(res, Ok(_)));
2120 store.close_replica(namespace.id());
2121 let mut replica = store.open_replica(&namespace.id())?;
2122 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2123 assert!(res.is_ok());
2124
2125 let capability = Capability::Read(namespace.id());
2127 store.import_namespace(capability)?;
2128 store.close_replica(namespace.id());
2129 let mut replica = store.open_replica(&namespace.id())?;
2130 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2131 assert!(res.is_ok());
2132 store.flush()?;
2133 Ok(())
2134 }
2135
2136 #[tokio::test]
2137 async fn test_actor_capability_memory() -> Result<()> {
2138 let store = store::Store::memory();
2139 test_actor_capability(store).await
2140 }
2141
2142 #[tokio::test]
2143 async fn test_actor_capability_fs() -> Result<()> {
2144 let dbfile = tempfile::NamedTempFile::new()?;
2145 let store = store::fs::Store::persistent(dbfile.path())?;
2146 test_actor_capability(store).await
2147 }
2148
2149 async fn test_actor_capability(store: Store) -> Result<()> {
2150 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2152 let author = Author::new(&mut rng);
2153 let handle = SyncHandle::spawn(store, None, "test".into());
2154 let author = handle.import_author(author).await?;
2155 let namespace = NamespaceSecret::new(&mut rng);
2156 let id = namespace.id();
2157
2158 let capability = Capability::Read(namespace.id());
2160 handle.import_namespace(capability).await?;
2161 handle.open(namespace.id(), Default::default()).await?;
2162 let res = handle
2163 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2164 .await;
2165 assert!(res.is_err());
2166
2167 let capability = Capability::Write(namespace.clone());
2169 handle.import_namespace(capability).await?;
2170 let res = handle
2171 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2172 .await;
2173 assert!(res.is_ok());
2174
2175 handle.close(namespace.id()).await?;
2177 let res = handle
2178 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2179 .await;
2180 assert!(res.is_err());
2181 handle.open(namespace.id(), Default::default()).await?;
2182 let res = handle
2183 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2184 .await;
2185 assert!(res.is_ok());
2186 Ok(())
2187 }
2188
2189 fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2190 let mut res = vec![];
2191 while let Ok(ev) = events.try_recv() {
2192 res.push(ev);
2193 }
2194 res
2195 }
2196
2197 #[tokio::test]
2200 async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2201 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2202 let mut store1 = store::Store::memory();
2203 let mut store2 = store::Store::memory();
2204 let peer1 = [1u8; 32];
2205 let peer2 = [2u8; 32];
2206 let mut state1 = SyncOutcome::default();
2207 let mut state2 = SyncOutcome::default();
2208
2209 let author = Author::new(&mut rng);
2210 let namespace = NamespaceSecret::new(&mut rng);
2211 let mut replica1 = store1.new_replica(namespace.clone())?;
2212 let mut replica2 = store2.new_replica(namespace.clone())?;
2213
2214 let (events1_sender, events1) = async_channel::bounded(32);
2215 let (events2_sender, events2) = async_channel::bounded(32);
2216
2217 replica1.info.subscribe(events1_sender);
2218 replica2.info.subscribe(events2_sender);
2219
2220 replica1.hash_and_insert(b"foo", &author, b"init")?;
2221
2222 let from1 = replica1.sync_initial_message()?;
2223 let from2 = replica2
2224 .sync_process_message(from1, peer1, &mut state2)
2225 .await
2226 .unwrap()
2227 .unwrap();
2228 let from1 = replica1
2229 .sync_process_message(from2, peer2, &mut state1)
2230 .await
2231 .unwrap()
2232 .unwrap();
2233 replica2.hash_and_insert(b"foo", &author, b"update")?;
2237 let from2 = replica2
2238 .sync_process_message(from1, peer1, &mut state2)
2239 .await
2240 .unwrap();
2241 assert!(from2.is_none());
2242 let events1 = drain(events1);
2243 let events2 = drain(events2);
2244 assert_eq!(events1.len(), 1);
2245 assert_eq!(events2.len(), 1);
2246 assert!(matches!(events1[0], Event::LocalInsert { .. }));
2247 assert!(matches!(events2[0], Event::LocalInsert { .. }));
2248 assert_eq!(state1.num_sent, 1);
2249 assert_eq!(state1.num_recv, 0);
2250 assert_eq!(state2.num_sent, 0);
2251 assert_eq!(state2.num_recv, 1);
2252 store1.flush()?;
2253 store2.flush()?;
2254 Ok(())
2255 }
2256
/// Runs the query checks against a store created with `Store::memory()`.
#[test]
fn test_replica_queries_mem() -> Result<()> {
    test_replica_queries(store::Store::memory())
}
2264
/// Runs the query checks against a store persisted to a temporary file.
#[test]
fn test_replica_queries_fs() -> Result<()> {
    let tmp = tempfile::NamedTempFile::new()?;
    test_replica_queries(store::fs::Store::persistent(tmp.path())?)
}
2273
2274 fn test_replica_queries(mut store: Store) -> Result<()> {
2275 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2276 let namespace = NamespaceSecret::new(&mut rng);
2277 let namespace_id = namespace.id();
2278
2279 let a1 = store.new_author(&mut rng)?;
2280 let a2 = store.new_author(&mut rng)?;
2281 let a3 = store.new_author(&mut rng)?;
2282 println!(
2283 "a1 {} a2 {} a3 {}",
2284 a1.id().fmt_short(),
2285 a2.id().fmt_short(),
2286 a3.id().fmt_short()
2287 );
2288
2289 let mut replica = store.new_replica(namespace.clone())?;
2290 replica.hash_and_insert("hi/world", &a2, "a2")?;
2291 replica.hash_and_insert("hi/world", &a1, "a1")?;
2292 replica.hash_and_insert("hi/moon", &a2, "a1")?;
2293 replica.hash_and_insert("hi", &a3, "a3")?;
2294
2295 struct QueryTester<'a> {
2296 store: &'a mut Store,
2297 namespace: NamespaceId,
2298 }
2299 impl QueryTester<'_> {
2300 fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2301 let query = query.into();
2302 let actual = self
2303 .store
2304 .get_many(self.namespace, query.clone())
2305 .unwrap()
2306 .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2307 .collect::<Result<Vec<_>>>()
2308 .unwrap();
2309 let expected = expected
2310 .into_iter()
2311 .map(|(key, author)| (key.to_string(), author.id()))
2312 .collect::<Vec<_>>();
2313 assert_eq!(actual, expected, "query: {query:#?}")
2314 }
2315 }
2316
2317 let mut qt = QueryTester {
2318 store: &mut store,
2319 namespace: namespace_id,
2320 };
2321
2322 qt.assert(
2323 Query::all(),
2324 vec![
2325 ("hi/world", &a1),
2326 ("hi/moon", &a2),
2327 ("hi/world", &a2),
2328 ("hi", &a3),
2329 ],
2330 );
2331
2332 qt.assert(
2333 Query::single_latest_per_key(),
2334 vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2335 );
2336
2337 qt.assert(
2338 Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2339 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2340 );
2341
2342 qt.assert(
2343 Query::single_latest_per_key().key_prefix("hi/"),
2344 vec![("hi/moon", &a2), ("hi/world", &a1)],
2345 );
2346
2347 qt.assert(
2348 Query::single_latest_per_key()
2349 .key_prefix("hi/")
2350 .sort_direction(SortDirection::Desc),
2351 vec![("hi/world", &a1), ("hi/moon", &a2)],
2352 );
2353
2354 qt.assert(
2355 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2356 vec![
2357 ("hi", &a3),
2358 ("hi/moon", &a2),
2359 ("hi/world", &a1),
2360 ("hi/world", &a2),
2361 ],
2362 );
2363
2364 qt.assert(
2365 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2366 vec![
2367 ("hi/world", &a2),
2368 ("hi/world", &a1),
2369 ("hi/moon", &a2),
2370 ("hi", &a3),
2371 ],
2372 );
2373
2374 qt.assert(
2375 Query::all().key_prefix("hi/"),
2376 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2377 );
2378
2379 qt.assert(
2380 Query::all().key_prefix("hi/").offset(1).limit(1),
2381 vec![("hi/moon", &a2)],
2382 );
2383
2384 qt.assert(
2385 Query::all()
2386 .key_prefix("hi/")
2387 .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2388 vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2389 );
2390
2391 qt.assert(
2392 Query::all()
2393 .key_prefix("hi/")
2394 .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2395 .offset(1)
2396 .limit(1),
2397 vec![("hi/world", &a1)],
2398 );
2399
2400 qt.assert(
2401 Query::all()
2402 .key_prefix("hi/")
2403 .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2404 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2405 );
2406
2407 qt.assert(
2408 Query::all()
2409 .key_prefix("hi/")
2410 .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2411 vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2412 );
2413
2414 qt.assert(
2415 Query::all()
2416 .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2417 .limit(2)
2418 .offset(1),
2419 vec![("hi/moon", &a2), ("hi/world", &a1)],
2420 );
2421
2422 let mut replica = store.new_replica(namespace)?;
2423 replica.delete_prefix("hi/world", &a2)?;
2424 let mut qt = QueryTester {
2425 store: &mut store,
2426 namespace: namespace_id,
2427 };
2428
2429 qt.assert(
2430 Query::all(),
2431 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2432 );
2433
2434 qt.assert(
2435 Query::all().include_empty(),
2436 vec![
2437 ("hi/world", &a1),
2438 ("hi/moon", &a2),
2439 ("hi/world", &a2),
2440 ("hi", &a3),
2441 ],
2442 );
2443 store.flush()?;
2444 Ok(())
2445 }
2446
/// Runs the shared download-policy checks against the in-memory store.
#[test]
fn test_dl_policies_mem() -> Result<()> {
    test_dl_policies(&mut store::Store::memory())
}
2452
/// Runs the shared download-policy checks against a store persisted at a temporary file path.
#[test]
fn test_dl_policies_fs() -> Result<()> {
    // Bound first so the temp file handle outlives the store opened on its path.
    let db_file = tempfile::NamedTempFile::new()?;
    let db_path = db_file.path();
    let mut fs_store = store::fs::Store::persistent(db_path)?;
    test_dl_policies(&mut fs_store)
}
2459
2460 fn test_dl_policies(store: &mut Store) -> Result<()> {
2461 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2462 let namespace = NamespaceSecret::new(&mut rng);
2463 let id = namespace.id();
2464
2465 let filter = store::FilterKind::Exact("foo".into());
2466 let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2467 store
2468 .set_download_policy(&id, policy.clone())
2469 .expect_err("document dos not exist");
2470
2471 store.new_replica(namespace)?;
2473
2474 store.set_download_policy(&id, policy.clone())?;
2475 let retrieved_policy = store.get_download_policy(&id)?;
2476 assert_eq!(retrieved_policy, policy);
2477 store.flush()?;
2478 Ok(())
2479 }
2480
2481 fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2482 expected.sort();
2483 assert_eq!(expected, get_keys_sorted(store, namespace));
2484 }
2485
2486 fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2487 let mut res = store
2488 .get_many(namespace, Query::all())
2489 .unwrap()
2490 .map(|e| e.map(|e| e.key().to_vec()))
2491 .collect::<Result<Vec<_>>>()
2492 .unwrap();
2493 res.sort();
2494 res
2495 }
2496
2497 fn get_entry(
2498 store: &mut Store,
2499 namespace: NamespaceId,
2500 author: AuthorId,
2501 key: &[u8],
2502 ) -> anyhow::Result<SignedEntry> {
2503 let entry = store
2504 .get_exact(namespace, author, key, true)?
2505 .ok_or_else(|| anyhow::anyhow!("not found"))?;
2506 Ok(entry)
2507 }
2508
2509 fn get_content_hash(
2510 store: &mut Store,
2511 namespace: NamespaceId,
2512 author: AuthorId,
2513 key: &[u8],
2514 ) -> anyhow::Result<Option<Hash>> {
2515 let hash = store
2516 .get_exact(namespace, author, key, false)?
2517 .map(|e| e.content_hash());
2518 Ok(hash)
2519 }
2520
2521 async fn sync<'a>(
2522 alice: &'a mut Replica<'a>,
2523 bob: &'a mut Replica<'a>,
2524 ) -> Result<(SyncOutcome, SyncOutcome)> {
2525 let alice_peer_id = [1u8; 32];
2526 let bob_peer_id = [2u8; 32];
2527 let mut alice_state = SyncOutcome::default();
2528 let mut bob_state = SyncOutcome::default();
2529 let mut next_to_bob = Some(alice.sync_initial_message()?);
2531 let mut rounds = 0;
2532 while let Some(msg) = next_to_bob.take() {
2533 assert!(rounds < 100, "too many rounds");
2534 rounds += 1;
2535 println!("round {rounds}");
2536 if let Some(msg) = bob
2537 .sync_process_message(msg, alice_peer_id, &mut bob_state)
2538 .await?
2539 {
2540 next_to_bob = alice
2541 .sync_process_message(msg, bob_peer_id, &mut alice_state)
2542 .await?
2543 }
2544 }
2545 assert_eq!(alice_state.num_sent, bob_state.num_recv);
2546 assert_eq!(alice_state.num_recv, bob_state.num_sent);
2547 Ok((alice_state, bob_state))
2548 }
2549
2550 fn check_entries(
2551 store: &mut Store,
2552 namespace: &NamespaceId,
2553 author: &Author,
2554 set: &[&str],
2555 ) -> Result<()> {
2556 for el in set {
2557 store.get_exact(*namespace, author.id(), el, false)?;
2558 }
2559 Ok(())
2560 }
2561}