iroh_docs/
sync.rs

1//! API for iroh-docs replicas
2
3// Names and concepts are roughly based on Willows design at the moment:
4//
5// https://hackmd.io/DTtck8QOQm6tZaQBBtTf7w
6//
7// This is going to change!
8
9use std::{
10    cmp::Ordering,
11    fmt::Debug,
12    ops::{Deref, DerefMut},
13    sync::Arc,
14    time::{Duration, SystemTime},
15};
16
17use bytes::{Bytes, BytesMut};
18use ed25519_dalek::{Signature, SignatureError};
19use iroh_blobs::Hash;
20use serde::{Deserialize, Serialize};
21
22pub use crate::heads::AuthorHeads;
23use crate::{
24    keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
25    ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
26    store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
27};
28
29/// Protocol message for the set reconciliation protocol.
30///
31/// Can be serialized to bytes with [serde] to transfer between peers.
32pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
33
34/// Byte representation of a `PeerId` from `iroh-net`.
35// TODO: PeerId is in iroh-net which iroh-docs doesn't depend on. Add iroh-base crate with `PeerId`.
36pub type PeerIdBytes = [u8; 32];
37
38/// Max time in the future from our wall clock time that we accept entries for.
39/// Value is 10 minutes.
40pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_millis() as u64;
41
42/// Callback that may be set on a replica to determine the availability status for a content hash.
43pub type ContentStatusCallback =
44    Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
45
46/// Event emitted by sync when entries are added.
47#[derive(Debug, Clone)]
48pub enum Event {
49    /// A local entry has been added.
50    LocalInsert {
51        /// Document in which the entry was inserted.
52        namespace: NamespaceId,
53        /// Inserted entry.
54        entry: SignedEntry,
55    },
56    /// A remote entry has been added.
57    RemoteInsert {
58        /// Document in which the entry was inserted.
59        namespace: NamespaceId,
60        /// Inserted entry.
61        entry: SignedEntry,
62        /// Peer that provided the inserted entry.
63        from: PeerIdBytes,
64        /// Whether download policies require the content to be downloaded.
65        should_download: bool,
66        /// [`ContentStatus`] for this entry in the remote's replica.
67        remote_content_status: ContentStatus,
68    },
69}
70
71/// Whether an entry was inserted locally or by a remote peer.
72#[derive(Debug, Clone)]
73pub enum InsertOrigin {
74    /// The entry was inserted locally.
75    Local,
76    /// The entry was received from the remote node identified by [`PeerIdBytes`].
77    Sync {
78        /// The peer from which we received this entry.
79        from: PeerIdBytes,
80        /// Whether the peer claims to have the content blob for this entry.
81        remote_content_status: ContentStatus,
82    },
83}
84
85/// Whether the content status is available on a node.
86#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
87pub enum ContentStatus {
88    /// The content is completely available.
89    Complete,
90    /// The content is partially available.
91    Incomplete,
92    /// The content is missing.
93    Missing,
94}
95
96/// Outcome of a sync operation.
97#[derive(Debug, Clone, Default)]
98pub struct SyncOutcome {
99    /// Timestamp of the latest entry for each author in the set we received.
100    pub heads_received: AuthorHeads,
101    /// Number of entries we received.
102    pub num_recv: usize,
103    /// Number of entries we sent.
104    pub num_sent: usize,
105}
106
107fn get_as_ptr<T>(value: &T) -> Option<usize> {
108    use std::mem;
109    if mem::size_of::<T>() == std::mem::size_of::<usize>()
110        && mem::align_of::<T>() == mem::align_of::<usize>()
111    {
112        // Safe only if size and alignment requirements are met
113        unsafe { Some(mem::transmute_copy(value)) }
114    } else {
115        None
116    }
117}
118
119fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
120    get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
121}
122
123#[derive(Debug, Default)]
124struct Subscribers(Vec<async_channel::Sender<Event>>);
125impl Subscribers {
126    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
127        self.0.push(sender)
128    }
129    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
130        self.0.retain(|s| !same_channel(s, sender));
131    }
132    pub fn send(&mut self, event: Event) {
133        self.0
134            .retain(|sender| sender.send_blocking(event.clone()).is_ok())
135    }
136    pub fn len(&self) -> usize {
137        self.0.len()
138    }
139    pub fn send_with(&mut self, f: impl FnOnce() -> Event) {
140        if !self.0.is_empty() {
141            self.send(f())
142        }
143    }
144}
145
146/// Kind of capability of the namespace.
147#[derive(
148    Debug,
149    Clone,
150    Copy,
151    Serialize,
152    Deserialize,
153    num_enum::IntoPrimitive,
154    num_enum::TryFromPrimitive,
155    strum::Display,
156)]
157#[repr(u8)]
158#[strum(serialize_all = "snake_case")]
159pub enum CapabilityKind {
160    /// A writable replica.
161    Write = 1,
162    /// A readable replica.
163    Read = 2,
164}
165
166/// The capability of the namespace.
167#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
168pub enum Capability {
169    /// Write access to the namespace.
170    Write(NamespaceSecret),
171    /// Read only access to the namespace.
172    Read(NamespaceId),
173}
174
175impl Capability {
176    /// Get the [`NamespaceId`] for this [`Capability`].
177    pub fn id(&self) -> NamespaceId {
178        match self {
179            Capability::Write(secret) => secret.id(),
180            Capability::Read(id) => *id,
181        }
182    }
183
184    /// Get the [`NamespaceSecret`] of this [`Capability`].
185    /// Will fail if the [`Capability`] is read only.
186    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
187        match self {
188            Capability::Write(secret) => Ok(secret),
189            Capability::Read(_) => Err(ReadOnly),
190        }
191    }
192
193    /// Get the kind of capability.
194    pub fn kind(&self) -> CapabilityKind {
195        match self {
196            Capability::Write(_) => CapabilityKind::Write,
197            Capability::Read(_) => CapabilityKind::Read,
198        }
199    }
200
201    /// Get the raw representation of this namespace capability.
202    pub fn raw(&self) -> (u8, [u8; 32]) {
203        let capability_repr: u8 = self.kind().into();
204        let bytes = match self {
205            Capability::Write(secret) => secret.to_bytes(),
206            Capability::Read(id) => id.to_bytes(),
207        };
208        (capability_repr, bytes)
209    }
210
211    /// Create a [`Capability`] from its raw representation.
212    pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
213        let kind: CapabilityKind = kind.try_into()?;
214        let capability = match kind {
215            CapabilityKind::Write => {
216                let secret = NamespaceSecret::from_bytes(bytes);
217                Capability::Write(secret)
218            }
219            CapabilityKind::Read => {
220                let id = NamespaceId::from(bytes);
221                Capability::Read(id)
222            }
223        };
224        Ok(capability)
225    }
226
227    /// Merge this capability with another capability.
228    ///
229    /// Will return an error if `other` is not a capability for the same namespace.
230    ///
231    /// Returns `true` if the capability was changed, `false` otherwise.
232    pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
233        if other.id() != self.id() {
234            return Err(CapabilityError::NamespaceMismatch);
235        }
236
237        // the only capability upgrade is from read-only (self) to writable (other)
238        if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
239            let _ = std::mem::replace(self, other);
240            Ok(true)
241        } else {
242            Ok(false)
243        }
244    }
245}
246
247/// Errors for capability operations
248#[derive(Debug, thiserror::Error)]
249pub enum CapabilityError {
250    /// Namespaces are not the same
251    #[error("Namespaces are not the same")]
252    NamespaceMismatch,
253}
254
255/// In memory information about an open replica.
256#[derive(derive_more::Debug)]
257pub struct ReplicaInfo {
258    pub(crate) capability: Capability,
259    subscribers: Subscribers,
260    #[debug("ContentStatusCallback")]
261    content_status_cb: Option<ContentStatusCallback>,
262    closed: bool,
263}
264
265impl ReplicaInfo {
266    /// Create a new replica.
267    pub fn new(capability: Capability) -> Self {
268        Self {
269            capability,
270            subscribers: Default::default(),
271            // on_insert_sender: RwLock::new(None),
272            content_status_cb: None,
273            closed: false,
274        }
275    }
276
277    /// Subscribe to insert events.
278    ///
279    /// When subscribing to a replica, you must ensure that the corresponding [`async_channel::Receiver`] is
280    /// received from in a loop. If not receiving, local and remote inserts will hang waiting for
281    /// the receiver to be received from.
282    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
283        self.subscribers.subscribe(sender)
284    }
285
286    /// Explicitly unsubscribe a sender.
287    ///
288    /// Simply dropping the receiver is fine too. If you cloned a single sender to subscribe to
289    /// multiple replicas, you can use this method to explicitly unsubscribe the sender from
290    /// this replica without having to drop the receiver.
291    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
292        self.subscribers.unsubscribe(sender)
293    }
294
295    /// Get the number of current event subscribers.
296    pub fn subscribers_count(&self) -> usize {
297        self.subscribers.len()
298    }
299
300    /// Set the content status callback.
301    ///
302    /// Only one callback can be active at a time. If a previous callback was registered, this
303    /// will return `false`.
304    pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
305        if self.content_status_cb.is_some() {
306            false
307        } else {
308            self.content_status_cb = Some(cb);
309            true
310        }
311    }
312
313    fn ensure_open(&self) -> Result<(), InsertError> {
314        if self.closed() {
315            Err(InsertError::Closed)
316        } else {
317            Ok(())
318        }
319    }
320
321    /// Returns true if the replica is closed.
322    ///
323    /// If a replica is closed, no further operations can be performed. A replica cannot be closed
324    /// manually, it must be closed via [`store::Store::close_replica`] or
325    /// [`store::Store::remove_replica`]
326    pub fn closed(&self) -> bool {
327        self.closed
328    }
329
330    /// Merge a capability.
331    ///
332    /// The capability must refer to the the same namespace, otherwise an error will be returned.
333    ///
334    /// This will upgrade the replica's capability when passing a `Capability::Write`.
335    /// It is a no-op if `capability` is a Capability::Read`.
336    pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
337        self.capability.merge(capability)
338    }
339}
340
341/// Local representation of a mutable, synchronizable key-value store.
342#[derive(derive_more::Debug)]
343pub struct Replica<'a, I = Box<ReplicaInfo>> {
344    pub(crate) store: StoreInstance<'a>,
345    pub(crate) info: I,
346}
347
348impl<'a, I> Replica<'a, I>
349where
350    I: Deref<Target = ReplicaInfo> + DerefMut,
351{
352    /// Create a new replica.
353    pub fn new(store: StoreInstance<'a>, info: I) -> Self {
354        Replica { info, store }
355    }
356
357    /// Insert a new record at the given key.
358    ///
359    /// The entry will by signed by the provided `author`.
360    /// The `len` must be the byte length of the data identified by `hash`.
361    ///
362    /// Returns the number of entries removed as a consequence of this insertion,
363    /// or an error either if the entry failed to validate or if a store operation failed.
364    pub fn insert(
365        &mut self,
366        key: impl AsRef<[u8]>,
367        author: &Author,
368        hash: Hash,
369        len: u64,
370    ) -> Result<usize, InsertError> {
371        if len == 0 || hash == Hash::EMPTY {
372            return Err(InsertError::EntryIsEmpty);
373        }
374        self.info.ensure_open()?;
375        let id = RecordIdentifier::new(self.id(), author.id(), key);
376        let record = Record::new_current(hash, len);
377        let entry = Entry::new(id, record);
378        let secret = self.secret_key()?;
379        let signed_entry = entry.sign(secret, author);
380        self.insert_entry(signed_entry, InsertOrigin::Local)
381    }
382
383    /// Delete entries that match the given `author` and key `prefix`.
384    ///
385    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
386    /// entries whose key starts with or is equal to the given `prefix`.
387    ///
388    /// Returns the number of entries deleted.
389    pub fn delete_prefix(
390        &mut self,
391        prefix: impl AsRef<[u8]>,
392        author: &Author,
393    ) -> Result<usize, InsertError> {
394        self.info.ensure_open()?;
395        let id = RecordIdentifier::new(self.id(), author.id(), prefix);
396        let entry = Entry::new_empty(id);
397        let signed_entry = entry.sign(self.secret_key()?, author);
398        self.insert_entry(signed_entry, InsertOrigin::Local)
399    }
400
401    /// Insert an entry into this replica which was received from a remote peer.
402    ///
403    /// This will verify both the namespace and author signatures of the entry, emit an `on_insert`
404    /// event, and insert the entry into the replica store.
405    ///
406    /// Returns the number of entries removed as a consequence of this insertion,
407    /// or an error if the entry failed to validate or if a store operation failed.
408    pub fn insert_remote_entry(
409        &mut self,
410        entry: SignedEntry,
411        received_from: PeerIdBytes,
412        content_status: ContentStatus,
413    ) -> Result<usize, InsertError> {
414        self.info.ensure_open()?;
415        entry.validate_empty()?;
416        let origin = InsertOrigin::Sync {
417            from: received_from,
418            remote_content_status: content_status,
419        };
420        self.insert_entry(entry, origin)
421    }
422
423    /// Insert a signed entry into the database.
424    ///
425    /// Returns the number of entries removed as a consequence of this insertion.
426    fn insert_entry(
427        &mut self,
428        entry: SignedEntry,
429        origin: InsertOrigin,
430    ) -> Result<usize, InsertError> {
431        let namespace = self.id();
432
433        let store = &self.store;
434        validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
435
436        let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
437        tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
438
439        let removed_count = match outcome {
440            InsertOutcome::Inserted { removed } => removed,
441            InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
442        };
443
444        let insert_event = match origin {
445            InsertOrigin::Local => Event::LocalInsert { namespace, entry },
446            InsertOrigin::Sync {
447                from,
448                remote_content_status,
449            } => {
450                let download_policy = self
451                    .store
452                    .get_download_policy(&self.id())
453                    .unwrap_or_default();
454                let should_download = download_policy.matches(entry.entry());
455                Event::RemoteInsert {
456                    namespace,
457                    entry,
458                    from,
459                    should_download,
460                    remote_content_status,
461                }
462            }
463        };
464
465        self.info.subscribers.send(insert_event);
466
467        Ok(removed_count)
468    }
469
470    /// Hashes the given data and inserts it.
471    ///
472    /// This does not store the content, just the record of it.
473    /// Returns the calculated hash.
474    pub fn hash_and_insert(
475        &mut self,
476        key: impl AsRef<[u8]>,
477        author: &Author,
478        data: impl AsRef<[u8]>,
479    ) -> Result<Hash, InsertError> {
480        self.info.ensure_open()?;
481        let len = data.as_ref().len() as u64;
482        let hash = Hash::new(data);
483        self.insert(key, author, hash, len)?;
484        Ok(hash)
485    }
486
487    /// Get the identifier for an entry in this replica.
488    pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
489        RecordIdentifier::new(self.info.capability.id(), author.id(), key)
490    }
491
492    /// Create the initial message for the set reconciliation flow with a remote peer.
493    pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
494        self.info.ensure_open().map_err(anyhow::Error::from)?;
495        self.store.initial_message()
496    }
497
498    /// Process a set reconciliation message from a remote peer.
499    ///
500    /// Returns the next message to be sent to the peer, if any.
501    pub async fn sync_process_message(
502        &mut self,
503        message: crate::ranger::Message<SignedEntry>,
504        from_peer: PeerIdBytes,
505        state: &mut SyncOutcome,
506    ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
507        self.info.ensure_open()?;
508        let my_namespace = self.id();
509        let now = system_time_now();
510
511        // update state with incoming data.
512        state.num_recv += message.value_count();
513        for (entry, _content_status) in message.values() {
514            state
515                .heads_received
516                .insert(entry.author(), entry.timestamp());
517        }
518
519        // let subscribers = std::rc::Rc::new(&mut self.subscribers);
520        // l
521        let cb = self.info.content_status_cb.clone();
522        let download_policy = self
523            .store
524            .get_download_policy(&my_namespace)
525            .unwrap_or_default();
526        let reply = self
527            .store
528            .process_message(
529                &Default::default(),
530                message,
531                // validate callback: validate incoming entries, and send to on_insert channel
532                |store, entry, content_status| {
533                    let origin = InsertOrigin::Sync {
534                        from: from_peer,
535                        remote_content_status: content_status,
536                    };
537                    validate_entry(now, store, my_namespace, entry, &origin).is_ok()
538                },
539                // on_insert callback: is called when an entry was actually inserted in the store
540                |_store, entry, content_status| {
541                    // We use `send_with` to only clone the entry if we have active subscriptions.
542                    self.info.subscribers.send_with(|| {
543                        let should_download = download_policy.matches(entry.entry());
544                        Event::RemoteInsert {
545                            from: from_peer,
546                            namespace: my_namespace,
547                            entry: entry.clone(),
548                            should_download,
549                            remote_content_status: content_status,
550                        }
551                    })
552                },
553                // content_status callback: get content status for outgoing entries
554                move |entry| {
555                    let cb = cb.clone();
556                    Box::pin(async move {
557                        if let Some(cb) = cb.as_ref() {
558                            cb(entry.content_hash()).await
559                        } else {
560                            ContentStatus::Missing
561                        }
562                    })
563                },
564            )
565            .await?;
566
567        // update state with outgoing data.
568        if let Some(ref reply) = reply {
569            state.num_sent += reply.value_count();
570        }
571
572        Ok(reply)
573    }
574
575    /// Get the namespace identifier for this [`Replica`].
576    pub fn id(&self) -> NamespaceId {
577        self.info.capability.id()
578    }
579
580    /// Get the [`Capability`] of this [`Replica`].
581    pub fn capability(&self) -> &Capability {
582        &self.info.capability
583    }
584
585    /// Get the byte representation of the [`NamespaceSecret`] key for this replica. Will fail if
586    /// the replica is read only
587    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
588        self.info.capability.secret_key()
589    }
590}
591
592/// Error that occurs trying to access the [`NamespaceSecret`] of a read-only [`Capability`].
593#[derive(Debug, thiserror::Error)]
594#[error("Replica allows read access only.")]
595pub struct ReadOnly;
596
597/// Validate a [`SignedEntry`] if it's fit to be inserted.
598///
599/// This validates that
600/// * the entry's author and namespace signatures are correct
601/// * the entry's namespace matches the current replica
602/// * the entry's timestamp is not more than 10 minutes in the future of our system time
603/// * the entry is newer than an existing entry for the same key and author, if such exists.
604fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
605    now: u64,
606    store: &S,
607    expected_namespace: NamespaceId,
608    entry: &SignedEntry,
609    origin: &InsertOrigin,
610) -> Result<(), ValidationFailure> {
611    // Verify the namespace
612    if entry.namespace() != expected_namespace {
613        return Err(ValidationFailure::InvalidNamespace);
614    }
615
616    // Verify signature for non-local entries.
617    if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
618        return Err(ValidationFailure::BadSignature);
619    }
620
621    // Verify that the timestamp of the entry is not too far in the future.
622    if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
623        return Err(ValidationFailure::TooFarInTheFuture);
624    }
625    Ok(())
626}
627
628/// Error emitted when inserting entries into a [`Replica`] failed
629#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
630pub enum InsertError {
631    /// Storage error
632    #[error("storage error")]
633    Store(anyhow::Error),
634    /// Validation failure
635    #[error("validation failure")]
636    Validation(#[from] ValidationFailure),
637    /// A newer entry exists for either this entry's key or a prefix of the key.
638    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
639    NewerEntryExists,
640    /// Attempted to insert an empty entry.
641    #[error("Attempted to insert an empty entry")]
642    EntryIsEmpty,
643    /// Replica is read only.
644    #[error("Attempted to insert to read only replica")]
645    #[from(ReadOnly)]
646    ReadOnly,
647    /// The replica is closed, no operations may be performed.
648    #[error("replica is closed")]
649    Closed,
650}
651
652/// Reason why entry validation failed
653#[derive(thiserror::Error, Debug)]
654pub enum ValidationFailure {
655    /// Entry namespace does not match the current replica.
656    #[error("Entry namespace does not match the current replica")]
657    InvalidNamespace,
658    /// Entry signature is invalid.
659    #[error("Entry signature is invalid")]
660    BadSignature,
661    /// Entry timestamp is too far in the future.
662    #[error("Entry timestamp is too far in the future.")]
663    TooFarInTheFuture,
664    /// Entry has length 0 but not the empty hash, or the empty hash but not length 0.
665    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
666    InvalidEmptyEntry,
667}
668
669/// A signed entry.
670#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
671pub struct SignedEntry {
672    signature: EntrySignature,
673    entry: Entry,
674}
675
676impl From<SignedEntry> for Entry {
677    fn from(value: SignedEntry) -> Self {
678        value.entry
679    }
680}
681
682impl PartialOrd for SignedEntry {
683    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
684        Some(self.cmp(other))
685    }
686}
687
688impl Ord for SignedEntry {
689    fn cmp(&self, other: &Self) -> Ordering {
690        self.entry.cmp(&other.entry)
691    }
692}
693
694impl PartialOrd for Entry {
695    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
696        Some(self.cmp(other))
697    }
698}
699
700impl Ord for Entry {
701    fn cmp(&self, other: &Self) -> Ordering {
702        self.id
703            .cmp(&other.id)
704            .then_with(|| self.record.cmp(&other.record))
705    }
706}
707
708impl SignedEntry {
709    pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
710        SignedEntry { signature, entry }
711    }
712
713    /// Create a new signed entry by signing an entry with the `namespace` and `author`.
714    pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
715        let signature = EntrySignature::from_entry(&entry, namespace, author);
716        SignedEntry { signature, entry }
717    }
718
719    /// Create a new signed entries from its parts.
720    pub fn from_parts(
721        namespace: &NamespaceSecret,
722        author: &Author,
723        key: impl AsRef<[u8]>,
724        record: Record,
725    ) -> Self {
726        let id = RecordIdentifier::new(namespace.id(), author.id(), key);
727        let entry = Entry::new(id, record);
728        Self::from_entry(entry, namespace, author)
729    }
730
731    /// Verify the signatures on this entry.
732    pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
733        self.signature.verify(
734            &self.entry,
735            &self.entry.namespace().public_key(store)?,
736            &self.entry.author().public_key(store)?,
737        )
738    }
739
740    /// Get the signature.
741    pub fn signature(&self) -> &EntrySignature {
742        &self.signature
743    }
744
745    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
746    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
747        self.entry().validate_empty()
748    }
749
750    /// Get the [`Entry`].
751    pub fn entry(&self) -> &Entry {
752        &self.entry
753    }
754
755    /// Get the content [`struct@Hash`] of the entry.
756    pub fn content_hash(&self) -> Hash {
757        self.entry().content_hash()
758    }
759
760    /// Get the content length of the entry.
761    pub fn content_len(&self) -> u64 {
762        self.entry().content_len()
763    }
764
765    /// Get the author bytes of this entry.
766    pub fn author_bytes(&self) -> AuthorId {
767        self.entry().id().author()
768    }
769
770    /// Get the key of the entry.
771    pub fn key(&self) -> &[u8] {
772        self.entry().id().key()
773    }
774
775    /// Get the timestamp of the entry.
776    pub fn timestamp(&self) -> u64 {
777        self.entry().timestamp()
778    }
779}
780
781impl RangeEntry for SignedEntry {
782    type Key = RecordIdentifier;
783    type Value = Record;
784
785    fn key(&self) -> &Self::Key {
786        &self.entry.id
787    }
788
789    fn value(&self) -> &Self::Value {
790        &self.entry.record
791    }
792
793    fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
794        let mut hasher = blake3::Hasher::new();
795        hasher.update(self.namespace().as_ref());
796        hasher.update(self.author_bytes().as_ref());
797        hasher.update(self.key());
798        hasher.update(&self.timestamp().to_be_bytes());
799        hasher.update(self.content_hash().as_bytes());
800        Fingerprint(hasher.finalize().into())
801    }
802}
803
804/// Signature over an entry.
805#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
806pub struct EntrySignature {
807    author_signature: Signature,
808    namespace_signature: Signature,
809}
810
811impl Debug for EntrySignature {
812    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
813        f.debug_struct("EntrySignature")
814            .field(
815                "namespace_signature",
816                &hex::encode(self.namespace_signature.to_bytes()),
817            )
818            .field(
819                "author_signature",
820                &hex::encode(self.author_signature.to_bytes()),
821            )
822            .finish()
823    }
824}
825
826impl EntrySignature {
827    /// Create a new signature by signing an entry with the `namespace` and `author`.
828    pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
829        // TODO: this should probably include a namespace prefix
830        // namespace in the cryptographic sense.
831        let bytes = entry.to_vec();
832        let namespace_signature = namespace.sign(&bytes);
833        let author_signature = author.sign(&bytes);
834
835        EntrySignature {
836            author_signature,
837            namespace_signature,
838        }
839    }
840
841    /// Verify that this signature was created by signing the `entry` with the
842    /// secret keys of the specified `author` and `namespace`.
843    pub fn verify(
844        &self,
845        entry: &Entry,
846        namespace: &NamespacePublicKey,
847        author: &AuthorPublicKey,
848    ) -> Result<(), SignatureError> {
849        let bytes = entry.to_vec();
850        namespace.verify(&bytes, &self.namespace_signature)?;
851        author.verify(&bytes, &self.author_signature)?;
852
853        Ok(())
854    }
855
856    pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
857        let namespace_signature = Signature::from_bytes(namespace_sig);
858        let author_signature = Signature::from_bytes(author_sig);
859
860        EntrySignature {
861            author_signature,
862            namespace_signature,
863        }
864    }
865
866    pub(crate) fn author(&self) -> &Signature {
867        &self.author_signature
868    }
869
870    pub(crate) fn namespace(&self) -> &Signature {
871        &self.namespace_signature
872    }
873}
874
875/// A single entry in a [`Replica`]
876///
877/// An entry is identified by a key, its [`Author`], and the [`Replica`]'s
878/// [`NamespaceSecret`]. Its value is the [32-byte BLAKE3 hash](iroh_blobs::Hash)
879/// of the entry's content data, the size of this content data, and a timestamp.
880#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
881pub struct Entry {
882    id: RecordIdentifier,
883    record: Record,
884}
885
886impl Entry {
887    /// Create a new entry
888    pub fn new(id: RecordIdentifier, record: Record) -> Self {
889        Entry { id, record }
890    }
891
892    /// Create a new empty entry with the current timestamp.
893    pub fn new_empty(id: RecordIdentifier) -> Self {
894        Entry {
895            id,
896            record: Record::empty_current(),
897        }
898    }
899
900    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
901    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
902        match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
903            (true, true) => Ok(()),
904            (false, false) => Ok(()),
905            (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
906            (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
907        }
908    }
909
910    /// Get the [`RecordIdentifier`] for this entry.
911    pub fn id(&self) -> &RecordIdentifier {
912        &self.id
913    }
914
915    /// Get the [`NamespaceId`] of this entry.
916    pub fn namespace(&self) -> NamespaceId {
917        self.id.namespace()
918    }
919
920    /// Get the [`AuthorId`] of this entry.
921    pub fn author(&self) -> AuthorId {
922        self.id.author()
923    }
924
925    /// Get the key of this entry.
926    pub fn key(&self) -> &[u8] {
927        self.id.key()
928    }
929
930    /// Get the [`Record`] contained in this entry.
931    pub fn record(&self) -> &Record {
932        &self.record
933    }
934
935    /// Get the content hash of the record.
936    pub fn content_hash(&self) -> Hash {
937        self.record.hash
938    }
939
940    /// Get the content length of the record.
941    pub fn content_len(&self) -> u64 {
942        self.record.len
943    }
944
945    /// Get the timestamp of the record.
946    pub fn timestamp(&self) -> u64 {
947        self.record.timestamp
948    }
949
950    /// Serialize this entry into its canonical byte representation used for signing.
951    pub fn encode(&self, out: &mut Vec<u8>) {
952        self.id.encode(out);
953        self.record.encode(out);
954    }
955
956    /// Serialize this entry into a new vector with its canonical byte representation.
957    pub fn to_vec(&self) -> Vec<u8> {
958        let mut out = Vec::new();
959        self.encode(&mut out);
960        out
961    }
962
963    /// Sign this entry with a [`NamespaceSecret`] and [`Author`].
964    pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
965        SignedEntry::from_entry(self, namespace, author)
966    }
967}
968
969const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
970const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
971const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
972
973/// The identifier of a record.
974#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
975pub struct RecordIdentifier(Bytes);
976
977impl Default for RecordIdentifier {
978    fn default() -> Self {
979        Self::new(NamespaceId::default(), AuthorId::default(), b"")
980    }
981}
982
983impl Debug for RecordIdentifier {
984    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
985        f.debug_struct("RecordIdentifier")
986            .field("namespace", &self.namespace())
987            .field("author", &self.author())
988            .field("key", &std::string::String::from_utf8_lossy(self.key()))
989            .finish()
990    }
991}
992
993impl RangeKey for RecordIdentifier {
994    #[cfg(test)]
995    fn is_prefix_of(&self, other: &Self) -> bool {
996        other.as_ref().starts_with(self.as_ref())
997    }
998}
999
1000fn system_time_now() -> u64 {
1001    SystemTime::now()
1002        .duration_since(SystemTime::UNIX_EPOCH)
1003        .expect("time drift")
1004        .as_micros() as u64
1005}
1006
1007impl RecordIdentifier {
1008    /// Create a new [`RecordIdentifier`].
1009    pub fn new(
1010        namespace: impl Into<NamespaceId>,
1011        author: impl Into<AuthorId>,
1012        key: impl AsRef<[u8]>,
1013    ) -> Self {
1014        let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1015        bytes.extend_from_slice(namespace.into().as_bytes());
1016        bytes.extend_from_slice(author.into().as_bytes());
1017        bytes.extend_from_slice(key.as_ref());
1018        Self(bytes.freeze())
1019    }
1020
1021    /// Serialize this [`RecordIdentifier`] into a mutable byte array.
1022    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1023        out.extend_from_slice(&self.0);
1024    }
1025
1026    /// Get this [`RecordIdentifier`] as [Bytes].
1027    pub fn as_bytes(&self) -> Bytes {
1028        self.0.clone()
1029    }
1030
1031    /// Get this [`RecordIdentifier`] as a tuple of byte slices.
1032    pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1033        (
1034            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1035            self.0[AUTHOR_BYTES].try_into().unwrap(),
1036            &self.0[KEY_BYTES],
1037        )
1038    }
1039
1040    /// Get this [`RecordIdentifier`] as a tuple of bytes.
1041    pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1042        (
1043            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1044            self.0[AUTHOR_BYTES].try_into().unwrap(),
1045            self.0.slice(KEY_BYTES),
1046        )
1047    }
1048
1049    /// Get the key of this record.
1050    pub fn key(&self) -> &[u8] {
1051        &self.0[KEY_BYTES]
1052    }
1053
1054    /// Get the key of this record as [`Bytes`].
1055    pub fn key_bytes(&self) -> Bytes {
1056        self.0.slice(KEY_BYTES)
1057    }
1058
1059    /// Get the [`NamespaceId`] of this record as byte array.
1060    pub fn namespace(&self) -> NamespaceId {
1061        let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1062        value.into()
1063    }
1064
1065    /// Get the [`AuthorId`] of this record as byte array.
1066    pub fn author(&self) -> AuthorId {
1067        let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1068        value.into()
1069    }
1070}
1071
1072impl AsRef<[u8]> for RecordIdentifier {
1073    fn as_ref(&self) -> &[u8] {
1074        &self.0
1075    }
1076}
1077
1078impl Deref for SignedEntry {
1079    type Target = Entry;
1080    fn deref(&self) -> &Self::Target {
1081        &self.entry
1082    }
1083}
1084
1085impl Deref for Entry {
1086    type Target = Record;
1087    fn deref(&self) -> &Self::Target {
1088        &self.record
1089    }
1090}
1091
1092/// The data part of an entry in a [`Replica`].
1093#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1094pub struct Record {
1095    /// Length of the data referenced by `hash`.
1096    len: u64,
1097    /// Hash of the content data.
1098    hash: Hash,
1099    /// Record creation timestamp. Counted as micros since the Unix epoch.
1100    timestamp: u64,
1101}
1102
1103impl RangeValue for Record {}
1104
1105/// Ordering for entry values.
1106///
1107/// Compares first the timestamp, then the content hash.
1108impl Ord for Record {
1109    fn cmp(&self, other: &Self) -> Ordering {
1110        self.timestamp
1111            .cmp(&other.timestamp)
1112            .then_with(|| self.hash.cmp(&other.hash))
1113    }
1114}
1115
1116impl PartialOrd for Record {
1117    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1118        Some(self.cmp(other))
1119    }
1120}
1121
1122impl Record {
1123    /// Create a new record.
1124    pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1125        debug_assert!(
1126            len != 0 || hash == Hash::EMPTY,
1127            "if `len` is 0 then `hash` must be the hash of the empty byte range"
1128        );
1129        Record {
1130            hash,
1131            len,
1132            timestamp,
1133        }
1134    }
1135
1136    /// Create a tombstone record (empty content)
1137    pub fn empty(timestamp: u64) -> Self {
1138        Self::new(Hash::EMPTY, 0, timestamp)
1139    }
1140
1141    /// Create a tombstone record with the timestamp set to now.
1142    pub fn empty_current() -> Self {
1143        Self::new_current(Hash::EMPTY, 0)
1144    }
1145
1146    /// Return `true` if the entry is empty.
1147    pub fn is_empty(&self) -> bool {
1148        self.hash == Hash::EMPTY
1149    }
1150
1151    /// Create a new [`Record`] with the timestamp set to now.
1152    pub fn new_current(hash: Hash, len: u64) -> Self {
1153        let timestamp = system_time_now();
1154        Self::new(hash, len, timestamp)
1155    }
1156
1157    /// Get the length of the data addressed by this record's content hash.
1158    pub fn content_len(&self) -> u64 {
1159        self.len
1160    }
1161
1162    /// Get the [`struct@Hash`] of the content data of this record.
1163    pub fn content_hash(&self) -> Hash {
1164        self.hash
1165    }
1166
1167    /// Get the timestamp of this record.
1168    pub fn timestamp(&self) -> u64 {
1169        self.timestamp
1170    }
1171
1172    #[cfg(test)]
1173    pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1174        let len = data.as_ref().len() as u64;
1175        let hash = Hash::new(data);
1176        Self::new_current(hash, len)
1177    }
1178
1179    #[cfg(test)]
1180    pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1181        let len = data.as_ref().len() as u64;
1182        let hash = Hash::new(data);
1183        Self::new(hash, len, timestamp)
1184    }
1185
1186    /// Serialize this record into a mutable byte array.
1187    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1188        out.extend_from_slice(&self.len.to_be_bytes());
1189        out.extend_from_slice(self.hash.as_ref());
1190        out.extend_from_slice(&self.timestamp.to_be_bytes())
1191    }
1192}
1193
1194#[cfg(test)]
1195mod tests {
1196    use std::collections::HashSet;
1197
1198    use anyhow::Result;
1199    use rand_core::SeedableRng;
1200
1201    use super::*;
1202    use crate::{
1203        actor::SyncHandle,
1204        ranger::{Range, Store as _},
1205        store::{OpenError, Query, SortBy, SortDirection, Store},
1206    };
1207
1208    #[test]
1209    fn test_basics_memory() -> Result<()> {
1210        let store = store::Store::memory();
1211        test_basics(store)?;
1212
1213        Ok(())
1214    }
1215
1216    #[test]
1217    fn test_basics_fs() -> Result<()> {
1218        let dbfile = tempfile::NamedTempFile::new()?;
1219        let store = store::fs::Store::persistent(dbfile.path())?;
1220        test_basics(store)?;
1221        Ok(())
1222    }
1223
1224    fn test_basics(mut store: Store) -> Result<()> {
1225        let mut rng = rand::thread_rng();
1226        let alice = Author::new(&mut rng);
1227        let bob = Author::new(&mut rng);
1228        let myspace = NamespaceSecret::new(&mut rng);
1229
1230        let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1231        let record = Record::current_from_data(b"this is my cool data");
1232        let entry = Entry::new(record_id, record);
1233        let signed_entry = entry.sign(&myspace, &alice);
1234        signed_entry.verify(&()).expect("failed to verify");
1235
1236        let mut my_replica = store.new_replica(myspace.clone())?;
1237        for i in 0..10 {
1238            my_replica.hash_and_insert(
1239                format!("/{i}"),
1240                &alice,
1241                format!("{i}: hello from alice"),
1242            )?;
1243        }
1244
1245        for i in 0..10 {
1246            let res = store
1247                .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1248                .unwrap();
1249            let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1250            assert_eq!(res.entry().record().content_len(), len);
1251            res.verify(&())?;
1252        }
1253
1254        // Test multiple records for the same key
1255        let mut my_replica = store.new_replica(myspace.clone())?;
1256        my_replica.hash_and_insert("/cool/path", &alice, "round 1")?;
1257        let _entry = store
1258            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1259            .unwrap();
1260        // Second
1261        let mut my_replica = store.new_replica(myspace.clone())?;
1262        my_replica.hash_and_insert("/cool/path", &alice, "round 2")?;
1263        let _entry = store
1264            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1265            .unwrap();
1266
1267        // Get All by author
1268        let entries: Vec<_> = store
1269            .get_many(myspace.id(), Query::author(alice.id()))?
1270            .collect::<Result<_>>()?;
1271        assert_eq!(entries.len(), 11);
1272
1273        // Get All by author
1274        let entries: Vec<_> = store
1275            .get_many(myspace.id(), Query::author(bob.id()))?
1276            .collect::<Result<_>>()?;
1277        assert_eq!(entries.len(), 0);
1278
1279        // Get All by key
1280        let entries: Vec<_> = store
1281            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1282            .collect::<Result<_>>()?;
1283        assert_eq!(entries.len(), 1);
1284
1285        // Get All
1286        let entries: Vec<_> = store
1287            .get_many(myspace.id(), Query::all())?
1288            .collect::<Result<_>>()?;
1289        assert_eq!(entries.len(), 11);
1290
1291        // insert record from different author
1292        let mut my_replica = store.new_replica(myspace.clone())?;
1293        let _entry = my_replica.hash_and_insert("/cool/path", &bob, "bob round 1")?;
1294
1295        // Get All by author
1296        let entries: Vec<_> = store
1297            .get_many(myspace.id(), Query::author(alice.id()))?
1298            .collect::<Result<_>>()?;
1299        assert_eq!(entries.len(), 11);
1300
1301        let entries: Vec<_> = store
1302            .get_many(myspace.id(), Query::author(bob.id()))?
1303            .collect::<Result<_>>()?;
1304        assert_eq!(entries.len(), 1);
1305
1306        // Get All by key
1307        let entries: Vec<_> = store
1308            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1309            .collect::<Result<_>>()?;
1310        assert_eq!(entries.len(), 2);
1311
1312        // Get all by prefix
1313        let entries: Vec<_> = store
1314            .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1315            .collect::<Result<_>>()?;
1316        assert_eq!(entries.len(), 2);
1317
1318        // Get All by author and prefix
1319        let entries: Vec<_> = store
1320            .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1321            .collect::<Result<_>>()?;
1322        assert_eq!(entries.len(), 1);
1323
1324        let entries: Vec<_> = store
1325            .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1326            .collect::<Result<_>>()?;
1327        assert_eq!(entries.len(), 1);
1328
1329        // Get All
1330        let entries: Vec<_> = store
1331            .get_many(myspace.id(), Query::all())?
1332            .collect::<Result<_>>()?;
1333        assert_eq!(entries.len(), 12);
1334
1335        // Get Range of all should return all latest
1336        let mut my_replica = store.new_replica(myspace.clone())?;
1337        let entries_second: Vec<_> = my_replica
1338            .store
1339            .get_range(Range::new(
1340                RecordIdentifier::default(),
1341                RecordIdentifier::default(),
1342            ))?
1343            .collect::<Result<_, _>>()?;
1344
1345        assert_eq!(entries_second.len(), 12);
1346        assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1347
1348        test_lru_cache_like_behaviour(&mut store, myspace.id())?;
1349        store.flush()?;
1350        Ok(())
1351    }
1352
1353    /// Test that [`Store::register_useful_peer`] behaves like a LRUCache of size
1354    /// [`super::store::PEERS_PER_DOC_CACHE_SIZE`].
1355    fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1356        /// Helper to verify the store returns the expected peers for the namespace.
1357        #[track_caller]
1358        fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1359            assert_eq!(
1360                expected_peers,
1361                &store
1362                    .get_sync_peers(&namespace)
1363                    .unwrap()
1364                    .unwrap()
1365                    .collect::<Vec<_>>(),
1366                "sync peers differ"
1367            );
1368        }
1369
1370        let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1371        // expected peers: newest peers are to the front, oldest to the back
1372        let mut expected_peers = Vec::with_capacity(count);
1373        for i in 0..count as u8 {
1374            let peer = [i; 32];
1375            expected_peers.insert(0, peer);
1376            store.register_useful_peer(namespace, peer)?;
1377        }
1378        verify_peers(store, namespace, &expected_peers);
1379
1380        // one more peer should evict the last peer
1381        expected_peers.pop();
1382        let newer_peer = [count as u8; 32];
1383        expected_peers.insert(0, newer_peer);
1384        store.register_useful_peer(namespace, newer_peer)?;
1385        verify_peers(store, namespace, &expected_peers);
1386
1387        // move one existing peer up
1388        let refreshed_peer = expected_peers.remove(2);
1389        expected_peers.insert(0, refreshed_peer);
1390        store.register_useful_peer(namespace, refreshed_peer)?;
1391        verify_peers(store, namespace, &expected_peers);
1392        Ok(())
1393    }
1394
1395    #[test]
1396    fn test_content_hashes_iterator_memory() -> Result<()> {
1397        let store = store::Store::memory();
1398        test_content_hashes_iterator(store)
1399    }
1400
1401    #[test]
1402    fn test_content_hashes_iterator_fs() -> Result<()> {
1403        let dbfile = tempfile::NamedTempFile::new()?;
1404        let store = store::fs::Store::persistent(dbfile.path())?;
1405        test_content_hashes_iterator(store)
1406    }
1407
1408    fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1409        let mut rng = rand::thread_rng();
1410        let mut expected = HashSet::new();
1411        let n_replicas = 3;
1412        let n_entries = 4;
1413        for i in 0..n_replicas {
1414            let namespace = NamespaceSecret::new(&mut rng);
1415            let author = store.new_author(&mut rng)?;
1416            let mut replica = store.new_replica(namespace)?;
1417            for j in 0..n_entries {
1418                let key = format!("{j}");
1419                let data = format!("{i}:{j}");
1420                let hash = replica.hash_and_insert(key, &author, data)?;
1421                expected.insert(hash);
1422            }
1423        }
1424        assert_eq!(expected.len(), n_replicas * n_entries);
1425        let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1426        assert_eq!(actual, expected);
1427        Ok(())
1428    }
1429
1430    #[test]
1431    fn test_multikey() {
1432        let mut rng = rand::thread_rng();
1433
1434        let k = ["a", "c", "z"];
1435
1436        let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1437        n.sort_by_key(|n| n.id());
1438
1439        let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1440        a.sort_by_key(|a| a.id());
1441
1442        // Just key
1443        {
1444            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1445            let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1446            let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1447
1448            let range = Range::new(ri0.clone(), ri2.clone());
1449            assert!(range.contains(&ri0), "start");
1450            assert!(range.contains(&ri1), "inside");
1451            assert!(!range.contains(&ri2), "end");
1452
1453            assert!(ri0 < ri1);
1454            assert!(ri1 < ri2);
1455        }
1456
1457        // Just namespace
1458        {
1459            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1460            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1461            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1462
1463            let range = Range::new(ri0.clone(), ri2.clone());
1464            assert!(range.contains(&ri0), "start");
1465            assert!(range.contains(&ri1), "inside");
1466            assert!(!range.contains(&ri2), "end");
1467
1468            assert!(ri0 < ri1);
1469            assert!(ri1 < ri2);
1470        }
1471
1472        // Just author
1473        {
1474            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1475            let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1476            let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1477
1478            let range = Range::new(ri0.clone(), ri2.clone());
1479            assert!(range.contains(&ri0), "start");
1480            assert!(range.contains(&ri1), "inside");
1481            assert!(!range.contains(&ri2), "end");
1482
1483            assert!(ri0 < ri1);
1484            assert!(ri1 < ri2);
1485        }
1486
1487        // Just key and namespace
1488        {
1489            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1490            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1491            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1492
1493            let range = Range::new(ri0.clone(), ri2.clone());
1494            assert!(range.contains(&ri0), "start");
1495            assert!(range.contains(&ri1), "inside");
1496            assert!(!range.contains(&ri2), "end");
1497
1498            assert!(ri0 < ri1);
1499            assert!(ri1 < ri2);
1500        }
1501
1502        // Mixed
1503        {
1504            // Ord should prioritize namespace - author - key
1505
1506            let a0 = a[0].id();
1507            let a1 = a[1].id();
1508            let n0 = n[0].id();
1509            let n1 = n[1].id();
1510            let k0 = k[0];
1511            let k1 = k[1];
1512
1513            assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1514            assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1515            assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1516            assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1517        }
1518    }
1519
1520    #[test]
1521    fn test_timestamps_memory() -> Result<()> {
1522        let store = store::Store::memory();
1523        test_timestamps(store)?;
1524
1525        Ok(())
1526    }
1527
1528    #[test]
1529    fn test_timestamps_fs() -> Result<()> {
1530        let dbfile = tempfile::NamedTempFile::new()?;
1531        let store = store::fs::Store::persistent(dbfile.path())?;
1532        test_timestamps(store)?;
1533        Ok(())
1534    }
1535
1536    fn test_timestamps(mut store: Store) -> Result<()> {
1537        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1538        let namespace = NamespaceSecret::new(&mut rng);
1539        let _replica = store.new_replica(namespace.clone())?;
1540        let author = store.new_author(&mut rng)?;
1541        store.close_replica(namespace.id());
1542        let mut replica = store.open_replica(&namespace.id())?;
1543
1544        let key = b"hello";
1545        let value = b"world";
1546        let entry = {
1547            let timestamp = 2;
1548            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1549            let record = Record::from_data(value, timestamp);
1550            Entry::new(id, record).sign(&namespace, &author)
1551        };
1552
1553        replica
1554            .insert_entry(entry.clone(), InsertOrigin::Local)
1555            .unwrap();
1556        store.close_replica(namespace.id());
1557        let res = store
1558            .get_exact(namespace.id(), author.id(), key, false)?
1559            .unwrap();
1560        assert_eq!(res, entry);
1561
1562        let entry2 = {
1563            let timestamp = 1;
1564            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1565            let record = Record::from_data(value, timestamp);
1566            Entry::new(id, record).sign(&namespace, &author)
1567        };
1568
1569        let mut replica = store.open_replica(&namespace.id())?;
1570        let res = replica.insert_entry(entry2, InsertOrigin::Local);
1571        store.close_replica(namespace.id());
1572        assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1573        let res = store
1574            .get_exact(namespace.id(), author.id(), key, false)?
1575            .unwrap();
1576        assert_eq!(res, entry);
1577        store.flush()?;
1578        Ok(())
1579    }
1580
1581    #[tokio::test]
1582    async fn test_replica_sync_memory() -> Result<()> {
1583        let alice_store = store::Store::memory();
1584        let bob_store = store::Store::memory();
1585
1586        test_replica_sync(alice_store, bob_store).await?;
1587        Ok(())
1588    }
1589
1590    #[tokio::test]
1591    async fn test_replica_sync_fs() -> Result<()> {
1592        let alice_dbfile = tempfile::NamedTempFile::new()?;
1593        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1594        let bob_dbfile = tempfile::NamedTempFile::new()?;
1595        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1596        test_replica_sync(alice_store, bob_store).await?;
1597
1598        Ok(())
1599    }
1600
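    /// Syncs two replicas with partially overlapping key sets and checks that both
    /// stores end up with entries for the union of both sets.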
1601    async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1602        let alice_set = ["ape", "eel", "fox", "gnu"];
1603        let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1604
1605        let mut rng = rand::thread_rng();
1606        let author = Author::new(&mut rng);
1607        let myspace = NamespaceSecret::new(&mut rng);
1608        let mut alice = alice_store.new_replica(myspace.clone())?;
1609        for el in &alice_set {
1610            alice.hash_and_insert(el, &author, el.as_bytes())?;
1611        }
1612
1613        let mut bob = bob_store.new_replica(myspace.clone())?;
1614        for el in &bob_set {
1615            bob.hash_and_insert(el, &author, el.as_bytes())?;
1616        }
1617
1618        let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
1619
1620        assert_eq!(alice_out.num_sent, 2);
1621        assert_eq!(bob_out.num_recv, 2);
1622        assert_eq!(alice_out.num_recv, 6);
1623        assert_eq!(bob_out.num_sent, 6);
1624
1625        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1626        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1627        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1628        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1629        alice_store.flush()?;
1630        bob_store.flush()?;
1631        Ok(())
1632    }
1633
1634    #[tokio::test]
1635    async fn test_replica_timestamp_sync_memory() -> Result<()> {
1636        let alice_store = store::Store::memory();
1637        let bob_store = store::Store::memory();
1638
1639        test_replica_timestamp_sync(alice_store, bob_store).await?;
1640        Ok(())
1641    }
1642
1643    #[tokio::test]
1644    async fn test_replica_timestamp_sync_fs() -> Result<()> {
1645        let alice_dbfile = tempfile::NamedTempFile::new()?;
1646        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1647        let bob_dbfile = tempfile::NamedTempFile::new()?;
1648        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1649        test_replica_timestamp_sync(alice_store, bob_store).await?;
1650
1651        Ok(())
1652    }
1653
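    /// For writes to the same key by the same author, the entry with the newer
    /// timestamp should win on both sides after a sync.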
1654    async fn test_replica_timestamp_sync(
1655        mut alice_store: Store,
1656        mut bob_store: Store,
1657    ) -> Result<()> {
1658        let mut rng = rand::thread_rng();
1659        let author = Author::new(&mut rng);
1660        let namespace = NamespaceSecret::new(&mut rng);
1661        let mut alice = alice_store.new_replica(namespace.clone())?;
1662        let mut bob = bob_store.new_replica(namespace.clone())?;
1663
1664        let key = b"key";
1665        let alice_value = b"alice";
1666        let bob_value = b"bob";
1667        let _alice_hash = alice.hash_and_insert(key, &author, alice_value)?;
1668        // system time increased - sync should overwrite
1669        let bob_hash = bob.hash_and_insert(key, &author, bob_value)?;
1670        sync(&mut alice, &mut bob).await?;
1671        assert_eq!(
1672            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1673            Some(bob_hash)
1674        );
1675        assert_eq!(
1676            get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
1677            Some(bob_hash)
1678        );
1679
1680        let mut alice = alice_store.new_replica(namespace.clone())?;
1681        let mut bob = bob_store.new_replica(namespace.clone())?;
1682
1683        let alice_value_2 = b"alice2";
1684        // system time increased - alice's later entry should win after sync
1685        let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value)?;
1686        let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2)?;
1687        sync(&mut alice, &mut bob).await?;
1688        assert_eq!(
1689            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1690            Some(alice_hash_2)
1691        );
1692        assert_eq!(
1693            get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
1694            Some(alice_hash_2)
1695        );
1696        alice_store.flush()?;
1697        bob_store.flush()?;
1698        Ok(())
1699    }
1700
1701    #[test]
1702    fn test_future_timestamp() -> Result<()> {
1703        let mut rng = rand::thread_rng();
1704        let mut store = store::Store::memory();
1705        let author = Author::new(&mut rng);
1706        let namespace = NamespaceSecret::new(&mut rng);
1707
1708        let mut replica = store.new_replica(namespace.clone())?;
1709        let key = b"hi";
1710        let t = system_time_now();
1711        let record = Record::from_data(b"1", t);
1712        let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1713        replica.insert_entry(entry0.clone(), InsertOrigin::Local)?;
1714
1715        assert_eq!(
1716            get_entry(&mut store, namespace.id(), author.id(), key)?,
1717            entry0
1718        );
1719
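        // timestamp within the allowed future shift - must be accepted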
1720        let mut replica = store.new_replica(namespace.clone())?;
1721        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1722        let record = Record::from_data(b"2", t);
1723        let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1724        replica.insert_entry(entry1.clone(), InsertOrigin::Local)?;
1725        assert_eq!(
1726            get_entry(&mut store, namespace.id(), author.id(), key)?,
1727            entry1
1728        );
1729
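        // timestamp exactly at the maximum allowed future shift - still accepted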
1730        let mut replica = store.new_replica(namespace.clone())?;
1731        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1732        let record = Record::from_data(b"2", t);
1733        let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1734        replica.insert_entry(entry2.clone(), InsertOrigin::Local)?;
1735        assert_eq!(
1736            get_entry(&mut store, namespace.id(), author.id(), key)?,
1737            entry2
1738        );
1739
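        // timestamp beyond the maximum allowed future shift - must be rejected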
1740        let mut replica = store.new_replica(namespace.clone())?;
1741        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1742        let record = Record::from_data(b"2", t);
1743        let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1744        let res = replica.insert_entry(entry3, InsertOrigin::Local);
1745        assert!(matches!(
1746            res,
1747            Err(InsertError::Validation(
1748                ValidationFailure::TooFarInTheFuture
1749            ))
1750        ));
1751        assert_eq!(
1752            get_entry(&mut store, namespace.id(), author.id(), key)?,
1753            entry2
1754        );
1755        store.flush()?;
1756        Ok(())
1757    }
1758
1759    #[test]
1760    fn test_insert_empty() -> Result<()> {
1761        let mut store = store::Store::memory();
1762        let mut rng = rand::thread_rng();
1763        let alice = Author::new(&mut rng);
1764        let myspace = NamespaceSecret::new(&mut rng);
1765        let mut replica = store.new_replica(myspace.clone())?;
1766        let hash = Hash::new(b"");
1767        let res = replica.insert(b"foo", &alice, hash, 0);
1768        assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1769        store.flush()?;
1770        Ok(())
1771    }
1772
1773    #[test]
1774    fn test_prefix_delete_memory() -> Result<()> {
1775        let store = store::Store::memory();
1776        test_prefix_delete(store)?;
1777        Ok(())
1778    }
1779
1780    #[test]
1781    fn test_prefix_delete_fs() -> Result<()> {
1782        let dbfile = tempfile::NamedTempFile::new()?;
1783        let store = store::fs::Store::persistent(dbfile.path())?;
1784        test_prefix_delete(store)?;
1785        Ok(())
1786    }
1787
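    /// Inserts two entries under a shared prefix and checks that a single prefix
    /// deletion removes both without leaving an entry for the prefix key itself.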
1788    fn test_prefix_delete(mut store: Store) -> Result<()> {
1789        let mut rng = rand::thread_rng();
1790        let alice = Author::new(&mut rng);
1791        let myspace = NamespaceSecret::new(&mut rng);
1792        let mut replica = store.new_replica(myspace.clone())?;
1793        let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello")?;
1794        let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world")?;
1795
1796        // sanity checks
1797        assert_eq!(
1798            get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1799            Some(hash1)
1800        );
1801        assert_eq!(
1802            get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1803            Some(hash2)
1804        );
1805
1806        // delete
1807        let mut replica = store.new_replica(myspace.clone())?;
1808        let deleted = replica.delete_prefix(b"foo", &alice)?;
1809        assert_eq!(deleted, 2);
1810        assert_eq!(
1811            store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1812            None
1813        );
1814        assert_eq!(
1815            store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1816            None
1817        );
1818        assert_eq!(
1819            store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1820            None
1821        );
1822        store.flush()?;
1823        Ok(())
1824    }
1825
1826    #[tokio::test]
1827    async fn test_replica_sync_delete_memory() -> Result<()> {
1828        let alice_store = store::Store::memory();
1829        let bob_store = store::Store::memory();
1830
1831        test_replica_sync_delete(alice_store, bob_store).await
1832    }
1833
1834    #[tokio::test]
1835    async fn test_replica_sync_delete_fs() -> Result<()> {
1836        let alice_dbfile = tempfile::NamedTempFile::new()?;
1837        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1838        let bob_dbfile = tempfile::NamedTempFile::new()?;
1839        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1840        test_replica_sync_delete(alice_store, bob_store).await
1841    }
1842
1843    async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1844        let alice_set = ["foot"];
1845        let bob_set = ["fool", "foo", "fog"];
1846
1847        let mut rng = rand::thread_rng();
1848        let author = Author::new(&mut rng);
1849        let myspace = NamespaceSecret::new(&mut rng);
1850        let mut alice = alice_store.new_replica(myspace.clone())?;
1851        for el in &alice_set {
1852            alice.hash_and_insert(el, &author, el.as_bytes())?;
1853        }
1854
1855        let mut bob = bob_store.new_replica(myspace.clone())?;
1856        for el in &bob_set {
1857            bob.hash_and_insert(el, &author, el.as_bytes())?;
1858        }
1859
1860        sync(&mut alice, &mut bob).await?;
1861
1862        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1863        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1864        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1865        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1866
1867        let mut alice = alice_store.new_replica(myspace.clone())?;
1868        let mut bob = bob_store.new_replica(myspace.clone())?;
1869        alice.delete_prefix("foo", &author)?;
1870        bob.hash_and_insert("fooz", &author, "fooz".as_bytes())?;
1871        sync(&mut alice, &mut bob).await?;
1872        check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1873        check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1874        alice_store.flush()?;
1875        bob_store.flush()?;
1876        Ok(())
1877    }
1878
1879    #[test]
1880    fn test_replica_remove_memory() -> Result<()> {
1881        let alice_store = store::Store::memory();
1882        test_replica_remove(alice_store)
1883    }
1884
1885    #[test]
1886    fn test_replica_remove_fs() -> Result<()> {
1887        let alice_dbfile = tempfile::NamedTempFile::new()?;
1888        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1889        test_replica_remove(alice_store)
1890    }
1891
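    /// A replica may only be removed once it is closed; afterwards it cannot be
    /// reopened, but it can be recreated from the namespace secret.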
1892    fn test_replica_remove(mut store: Store) -> Result<()> {
1893        let mut rng = rand::thread_rng();
1894        let namespace = NamespaceSecret::new(&mut rng);
1895        let author = Author::new(&mut rng);
1896        let mut replica = store.new_replica(namespace.clone())?;
1897
1898        // insert entry
1899        let hash = replica.hash_and_insert(b"foo", &author, b"bar")?;
1900        let res = store
1901            .get_many(namespace.id(), Query::all())?
1902            .collect::<Vec<_>>();
1903        assert_eq!(res.len(), 1);
1904
1905        // remove replica
1906        let res = store.remove_replica(&namespace.id());
1907        // may not remove a replica while it is still open
1908        assert!(res.is_err());
1909        store.close_replica(namespace.id());
1910        store.remove_replica(&namespace.id())?;
1911        let res = store
1912            .get_many(namespace.id(), Query::all())?
1913            .collect::<Vec<_>>();
1914        assert_eq!(res.len(), 0);
1915
1916        // may not reopen removed replica
1917        let res = store.load_replica_info(&namespace.id());
1918        assert!(matches!(res, Err(OpenError::NotFound)));
1919
1920        // may recreate replica
1921        let mut replica = store.new_replica(namespace.clone())?;
1922        replica.insert(b"foo", &author, hash, 3)?;
1923        let res = store
1924            .get_many(namespace.id(), Query::all())?
1925            .collect::<Vec<_>>();
1926        assert_eq!(res.len(), 1);
1927        store.flush()?;
1928        Ok(())
1929    }
1930
1931    #[test]
1932    fn test_replica_delete_edge_cases_memory() -> Result<()> {
1933        let store = store::Store::memory();
1934        test_replica_delete_edge_cases(store)
1935    }
1936
1937    #[test]
1938    fn test_replica_delete_edge_cases_fs() -> Result<()> {
1939        let dbfile = tempfile::NamedTempFile::new()?;
1940        let store = store::fs::Store::persistent(dbfile.path())?;
1941        test_replica_delete_edge_cases(store)
1942    }
1943
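    /// Exercises prefix deletion with keys built from edge-case byte values
    /// (0x00, 0x01, 0xff) to check range boundary handling in the store.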
1944    fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
1945        let mut rng = rand::thread_rng();
1946        let author = Author::new(&mut rng);
1947        let namespace = NamespaceSecret::new(&mut rng);
1948
1949        let edgecases = [0u8, 1u8, 255u8];
1950        let prefixes = [0u8, 255u8];
1951        let hash = Hash::new(b"foo");
1952        let len = 3;
1953        for prefix in prefixes {
1954            let mut expected = vec![];
1955            let mut replica = store.new_replica(namespace.clone())?;
1956            for suffix in edgecases {
1957                let key = [prefix, suffix].to_vec();
1958                expected.push(key.clone());
1959                replica.insert(&key, &author, hash, len)?;
1960            }
1961            assert_keys(&mut store, namespace.id(), expected);
1962            let mut replica = store.new_replica(namespace.clone())?;
1963            replica.delete_prefix([prefix], &author)?;
1964            assert_keys(&mut store, namespace.id(), vec![]);
1965        }
1966
1967        let mut replica = store.new_replica(namespace.clone())?;
1968        let key = vec![1u8, 0u8];
1969        replica.insert(key, &author, hash, len)?;
1970        let key = vec![1u8, 1u8];
1971        replica.insert(key, &author, hash, len)?;
1972        let key = vec![1u8, 2u8];
1973        replica.insert(key, &author, hash, len)?;
1974        let prefix = vec![1u8, 1u8];
1975        replica.delete_prefix(prefix, &author)?;
1976        assert_keys(
1977            &mut store,
1978            namespace.id(),
1979            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1980        );
1981
1982        let mut replica = store.new_replica(namespace.clone())?;
1983        let key = vec![0u8, 255u8];
1984        replica.insert(key, &author, hash, len)?;
1985        let key = vec![0u8, 0u8];
1986        replica.insert(key, &author, hash, len)?;
1987        let prefix = vec![0u8];
1988        replica.delete_prefix(prefix, &author)?;
1989        assert_keys(
1990            &mut store,
1991            namespace.id(),
1992            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1993        );
1994        store.flush()?;
1995        Ok(())
1996    }
1997
1998    #[test]
1999    fn test_latest_iter_memory() -> Result<()> {
2000        let store = store::Store::memory();
2001        test_latest_iter(store)
2002    }
2003
2004    #[test]
2005    fn test_latest_iter_fs() -> Result<()> {
2006        let dbfile = tempfile::NamedTempFile::new()?;
2007        let store = store::fs::Store::persistent(dbfile.path())?;
2008        test_latest_iter(store)
2009    }
2010
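    /// `get_latest_for_each_author` should return exactly one entry per author:
    /// the one with the latest timestamp.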
2011    fn test_latest_iter(mut store: Store) -> Result<()> {
2012        let mut rng = rand::thread_rng();
2013        let author0 = Author::new(&mut rng);
2014        let author1 = Author::new(&mut rng);
2015        let namespace = NamespaceSecret::new(&mut rng);
2016        let mut replica = store.new_replica(namespace.clone())?;
2017
2018        replica.hash_and_insert(b"a0.1", &author0, b"hi")?;
2019        let latest = store
2020            .get_latest_for_each_author(namespace.id())?
2021            .collect::<Result<Vec<_>>>()?;
2022        assert_eq!(latest.len(), 1);
2023        assert_eq!(latest[0].2, b"a0.1".to_vec());
2024
2025        let mut replica = store.new_replica(namespace.clone())?;
2026        replica.hash_and_insert(b"a1.1", &author1, b"hi")?;
2027        replica.hash_and_insert(b"a0.2", &author0, b"hi")?;
2028        let latest = store
2029            .get_latest_for_each_author(namespace.id())?
2030            .collect::<Result<Vec<_>>>()?;
2031        let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2032        latest_keys.sort();
2033        assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2034        store.flush()?;
2035        Ok(())
2036    }
2037
2038    #[test]
2039    fn test_replica_byte_keys_memory() -> Result<()> {
2040        let store = store::Store::memory();
2041
2042        test_replica_byte_keys(store)?;
2043        Ok(())
2044    }
2045
2046    #[test]
2047    fn test_replica_byte_keys_fs() -> Result<()> {
2048        let dbfile = tempfile::NamedTempFile::new()?;
2049        let store = store::fs::Store::persistent(dbfile.path())?;
2050        test_replica_byte_keys(store)?;
2051
2052        Ok(())
2053    }
2054
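    /// Keys are arbitrary byte strings, not just UTF-8 text; non-textual keys must
    /// be inserted and listed correctly.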
2055    fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2056        let mut rng = rand::thread_rng();
2057        let author = Author::new(&mut rng);
2058        let namespace = NamespaceSecret::new(&mut rng);
2059
2060        let hash = Hash::new(b"foo");
2061        let len = 3;
2062
2063        let key = vec![1u8, 0u8];
2064        let mut replica = store.new_replica(namespace.clone())?;
2065        replica.insert(key, &author, hash, len)?;
2066        assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2067        let key = vec![1u8, 2u8];
2068        let mut replica = store.new_replica(namespace.clone())?;
2069        replica.insert(key, &author, hash, len)?;
2070        assert_keys(
2071            &mut store,
2072            namespace.id(),
2073            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2074        );
2075
2076        let key = vec![0u8, 255u8];
2077        let mut replica = store.new_replica(namespace.clone())?;
2078        replica.insert(key, &author, hash, len)?;
2079        assert_keys(
2080            &mut store,
2081            namespace.id(),
2082            vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2083        );
2084        store.flush()?;
2085        Ok(())
2086    }
2087
2088    #[test]
2089    fn test_replica_capability_memory() -> Result<()> {
2090        let store = store::Store::memory();
2091        test_replica_capability(store)
2092    }
2093
2094    #[test]
2095    fn test_replica_capability_fs() -> Result<()> {
2096        let dbfile = tempfile::NamedTempFile::new()?;
2097        let store = store::fs::Store::persistent(dbfile.path())?;
2098        test_replica_capability(store)
2099    }
2100
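    /// Importing a read capability yields a read-only replica, importing the write
    /// capability enables inserts, and re-importing a read capability must not
    /// downgrade an existing write capability.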
2101    #[allow(clippy::redundant_pattern_matching)]
2102    fn test_replica_capability(mut store: Store) -> Result<()> {
2103        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2104        let author = store.new_author(&mut rng)?;
2105        let namespace = NamespaceSecret::new(&mut rng);
2106
2107        // import read capability - insert must fail
2108        let capability = Capability::Read(namespace.id());
2109        store.import_namespace(capability)?;
2110        let mut replica = store.open_replica(&namespace.id())?;
2111        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2112        assert!(matches!(res, Err(InsertError::ReadOnly)));
2113
2114        // import write capability - insert must succeed
2115        let capability = Capability::Write(namespace.clone());
2116        store.import_namespace(capability)?;
2117        let mut replica = store.open_replica(&namespace.id())?;
2118        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2119        assert!(matches!(res, Ok(_)));
2120        store.close_replica(namespace.id());
2121        let mut replica = store.open_replica(&namespace.id())?;
2122        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2123        assert!(res.is_ok());
2124
2125        // import read capability again - insert must still succeed
2126        let capability = Capability::Read(namespace.id());
2127        store.import_namespace(capability)?;
2128        store.close_replica(namespace.id());
2129        let mut replica = store.open_replica(&namespace.id())?;
2130        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2131        assert!(res.is_ok());
2132        store.flush()?;
2133        Ok(())
2134    }
2135
2136    #[tokio::test]
2137    async fn test_actor_capability_memory() -> Result<()> {
2138        let store = store::Store::memory();
2139        test_actor_capability(store).await
2140    }
2141
2142    #[tokio::test]
2143    async fn test_actor_capability_fs() -> Result<()> {
2144        let dbfile = tempfile::NamedTempFile::new()?;
2145        let store = store::fs::Store::persistent(dbfile.path())?;
2146        test_actor_capability(store).await
2147    }
2148
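    /// Same capability behaviour as above, exercised through the `SyncHandle` actor API.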
2149    async fn test_actor_capability(store: Store) -> Result<()> {
2150        // test with actor
2151        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2152        let author = Author::new(&mut rng);
2153        let handle = SyncHandle::spawn(store, None, "test".into());
2154        let author = handle.import_author(author).await?;
2155        let namespace = NamespaceSecret::new(&mut rng);
2156        let id = namespace.id();
2157
2158        // import read capability - insert must fail
2159        let capability = Capability::Read(namespace.id());
2160        handle.import_namespace(capability).await?;
2161        handle.open(namespace.id(), Default::default()).await?;
2162        let res = handle
2163            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2164            .await;
2165        assert!(res.is_err());
2166
2167        // import write capability - insert must succeed
2168        let capability = Capability::Write(namespace.clone());
2169        handle.import_namespace(capability).await?;
2170        let res = handle
2171            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2172            .await;
2173        assert!(res.is_ok());
2174
2175        // while closed, insert must fail; after reopening, it must succeed again
2176        handle.close(namespace.id()).await?;
2177        let res = handle
2178            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2179            .await;
2180        assert!(res.is_err());
2181        handle.open(namespace.id(), Default::default()).await?;
2182        let res = handle
2183            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2184            .await;
2185        assert!(res.is_ok());
2186        Ok(())
2187    }
2188
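    /// Collects all events currently queued in the channel without blocking.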
2189    fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2190        let mut res = vec![];
2191        while let Ok(ev) = events.try_recv() {
2192            res.push(ev);
2193        }
2194        res
2195    }
2196
2197    /// This tests that no events are emitted for entries received during sync which are obsolete
2198    /// (too old) by the time they are actually inserted in the store.
2199    #[tokio::test]
2200    async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2201        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2202        let mut store1 = store::Store::memory();
2203        let mut store2 = store::Store::memory();
2204        let peer1 = [1u8; 32];
2205        let peer2 = [2u8; 32];
2206        let mut state1 = SyncOutcome::default();
2207        let mut state2 = SyncOutcome::default();
2208
2209        let author = Author::new(&mut rng);
2210        let namespace = NamespaceSecret::new(&mut rng);
2211        let mut replica1 = store1.new_replica(namespace.clone())?;
2212        let mut replica2 = store2.new_replica(namespace.clone())?;
2213
2214        let (events1_sender, events1) = async_channel::bounded(32);
2215        let (events2_sender, events2) = async_channel::bounded(32);
2216
2217        replica1.info.subscribe(events1_sender);
2218        replica2.info.subscribe(events2_sender);
2219
2220        replica1.hash_and_insert(b"foo", &author, b"init")?;
2221
2222        let from1 = replica1.sync_initial_message()?;
2223        let from2 = replica2
2224            .sync_process_message(from1, peer1, &mut state2)
2225            .await
2226            .unwrap()
2227            .unwrap();
2228        let from1 = replica1
2229            .sync_process_message(from2, peer2, &mut state1)
2230            .await
2231            .unwrap()
2232            .unwrap();
2233        // now we will receive the entry from replica1. we insert a newer entry while the
2234        // sync is already running, which means the entry from replica1 will be rejected. we make
2235        // sure that no RemoteInsert event is emitted for this entry.
2236        replica2.hash_and_insert(b"foo", &author, b"update")?;
2237        let from2 = replica2
2238            .sync_process_message(from1, peer1, &mut state2)
2239            .await
2240            .unwrap();
2241        assert!(from2.is_none());
2242        let events1 = drain(events1);
2243        let events2 = drain(events2);
2244        assert_eq!(events1.len(), 1);
2245        assert_eq!(events2.len(), 1);
2246        assert!(matches!(events1[0], Event::LocalInsert { .. }));
2247        assert!(matches!(events2[0], Event::LocalInsert { .. }));
2248        assert_eq!(state1.num_sent, 1);
2249        assert_eq!(state1.num_recv, 0);
2250        assert_eq!(state2.num_sent, 0);
2251        assert_eq!(state2.num_recv, 1);
2252        store1.flush()?;
2253        store2.flush()?;
2254        Ok(())
2255    }
2256
2257    #[test]
2258    fn test_replica_queries_mem() -> Result<()> {
2259        let store = store::Store::memory();
2260
2261        test_replica_queries(store)?;
2262        Ok(())
2263    }
2264
2265    #[test]
2266    fn test_replica_queries_fs() -> Result<()> {
2267        let dbfile = tempfile::NamedTempFile::new()?;
2268        let store = store::fs::Store::persistent(dbfile.path())?;
2269        test_replica_queries(store)?;
2270
2271        Ok(())
2272    }
2273
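    /// Exercises `Query` combinations (sorting, key prefix, offset/limit, single
    /// latest per key, include_empty) against a small fixed data set.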
2274    fn test_replica_queries(mut store: Store) -> Result<()> {
2275        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2276        let namespace = NamespaceSecret::new(&mut rng);
2277        let namespace_id = namespace.id();
2278
2279        let a1 = store.new_author(&mut rng)?;
2280        let a2 = store.new_author(&mut rng)?;
2281        let a3 = store.new_author(&mut rng)?;
2282        println!(
2283            "a1 {} a2 {} a3 {}",
2284            a1.id().fmt_short(),
2285            a2.id().fmt_short(),
2286            a3.id().fmt_short()
2287        );
2288
2289        let mut replica = store.new_replica(namespace.clone())?;
2290        replica.hash_and_insert("hi/world", &a2, "a2")?;
2291        replica.hash_and_insert("hi/world", &a1, "a1")?;
2292        replica.hash_and_insert("hi/moon", &a2, "a1")?;
2293        replica.hash_and_insert("hi", &a3, "a3")?;
2294
2295        struct QueryTester<'a> {
2296            store: &'a mut Store,
2297            namespace: NamespaceId,
2298        }
2299        impl QueryTester<'_> {
2300            fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2301                let query = query.into();
2302                let actual = self
2303                    .store
2304                    .get_many(self.namespace, query.clone())
2305                    .unwrap()
2306                    .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2307                    .collect::<Result<Vec<_>>>()
2308                    .unwrap();
2309                let expected = expected
2310                    .into_iter()
2311                    .map(|(key, author)| (key.to_string(), author.id()))
2312                    .collect::<Vec<_>>();
2313                assert_eq!(actual, expected, "query: {query:#?}")
2314            }
2315        }
2316
2317        let mut qt = QueryTester {
2318            store: &mut store,
2319            namespace: namespace_id,
2320        };
2321
2322        qt.assert(
2323            Query::all(),
2324            vec![
2325                ("hi/world", &a1),
2326                ("hi/moon", &a2),
2327                ("hi/world", &a2),
2328                ("hi", &a3),
2329            ],
2330        );
2331
2332        qt.assert(
2333            Query::single_latest_per_key(),
2334            vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2335        );
2336
2337        qt.assert(
2338            Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2339            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2340        );
2341
2342        qt.assert(
2343            Query::single_latest_per_key().key_prefix("hi/"),
2344            vec![("hi/moon", &a2), ("hi/world", &a1)],
2345        );
2346
2347        qt.assert(
2348            Query::single_latest_per_key()
2349                .key_prefix("hi/")
2350                .sort_direction(SortDirection::Desc),
2351            vec![("hi/world", &a1), ("hi/moon", &a2)],
2352        );
2353
2354        qt.assert(
2355            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2356            vec![
2357                ("hi", &a3),
2358                ("hi/moon", &a2),
2359                ("hi/world", &a1),
2360                ("hi/world", &a2),
2361            ],
2362        );
2363
2364        qt.assert(
2365            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2366            vec![
2367                ("hi/world", &a2),
2368                ("hi/world", &a1),
2369                ("hi/moon", &a2),
2370                ("hi", &a3),
2371            ],
2372        );
2373
2374        qt.assert(
2375            Query::all().key_prefix("hi/"),
2376            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2377        );
2378
2379        qt.assert(
2380            Query::all().key_prefix("hi/").offset(1).limit(1),
2381            vec![("hi/moon", &a2)],
2382        );
2383
2384        qt.assert(
2385            Query::all()
2386                .key_prefix("hi/")
2387                .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2388            vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2389        );
2390
2391        qt.assert(
2392            Query::all()
2393                .key_prefix("hi/")
2394                .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2395                .offset(1)
2396                .limit(1),
2397            vec![("hi/world", &a1)],
2398        );
2399
2400        qt.assert(
2401            Query::all()
2402                .key_prefix("hi/")
2403                .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2404            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2405        );
2406
2407        qt.assert(
2408            Query::all()
2409                .key_prefix("hi/")
2410                .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2411            vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2412        );
2413
2414        qt.assert(
2415            Query::all()
2416                .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2417                .limit(2)
2418                .offset(1),
2419            vec![("hi/moon", &a2), ("hi/world", &a1)],
2420        );
2421
2422        let mut replica = store.new_replica(namespace)?;
2423        replica.delete_prefix("hi/world", &a2)?;
2424        let mut qt = QueryTester {
2425            store: &mut store,
2426            namespace: namespace_id,
2427        };
2428
2429        qt.assert(
2430            Query::all(),
2431            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2432        );
2433
2434        qt.assert(
2435            Query::all().include_empty(),
2436            vec![
2437                ("hi/world", &a1),
2438                ("hi/moon", &a2),
2439                ("hi/world", &a2),
2440                ("hi", &a3),
2441            ],
2442        );
2443        store.flush()?;
2444        Ok(())
2445    }
2446
2447    #[test]
2448    fn test_dl_policies_mem() -> Result<()> {
2449        let mut store = store::Store::memory();
2450        test_dl_policies(&mut store)
2451    }
2452
2453    #[test]
2454    fn test_dl_policies_fs() -> Result<()> {
2455        let dbfile = tempfile::NamedTempFile::new()?;
2456        let mut store = store::fs::Store::persistent(dbfile.path())?;
2457        test_dl_policies(&mut store)
2458    }
2459
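    /// Download policies can only be set for documents that exist; once set, the
    /// policy must round-trip through the store unchanged.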
2460    fn test_dl_policies(store: &mut Store) -> Result<()> {
2461        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2462        let namespace = NamespaceSecret::new(&mut rng);
2463        let id = namespace.id();
2464
2465        let filter = store::FilterKind::Exact("foo".into());
2466        let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2467        store
2468            .set_download_policy(&id, policy.clone())
2469            .expect_err("document does not exist");
2470
2471        // now create the document
2472        store.new_replica(namespace)?;
2473
2474        store.set_download_policy(&id, policy.clone())?;
2475        let retrieved_policy = store.get_download_policy(&id)?;
2476        assert_eq!(retrieved_policy, policy);
2477        store.flush()?;
2478        Ok(())
2479    }
2480
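    /// Asserts that the store contains exactly the `expected` keys, ignoring order.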
2481    fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2482        expected.sort();
2483        assert_eq!(expected, get_keys_sorted(store, namespace));
2484    }
2485
2486    fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2487        let mut res = store
2488            .get_many(namespace, Query::all())
2489            .unwrap()
2490            .map(|e| e.map(|e| e.key().to_vec()))
2491            .collect::<Result<Vec<_>>>()
2492            .unwrap();
2493        res.sort();
2494        res
2495    }
2496
2497    fn get_entry(
2498        store: &mut Store,
2499        namespace: NamespaceId,
2500        author: AuthorId,
2501        key: &[u8],
2502    ) -> anyhow::Result<SignedEntry> {
2503        let entry = store
2504            .get_exact(namespace, author, key, true)?
2505            .ok_or_else(|| anyhow::anyhow!("not found"))?;
2506        Ok(entry)
2507    }
2508
2509    fn get_content_hash(
2510        store: &mut Store,
2511        namespace: NamespaceId,
2512        author: AuthorId,
2513        key: &[u8],
2514    ) -> anyhow::Result<Option<Hash>> {
2515        let hash = store
2516            .get_exact(namespace, author, key, false)?
2517            .map(|e| e.content_hash());
2518        Ok(hash)
2519    }
2520
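    /// Drives the set-reconciliation protocol to completion by exchanging messages
    /// between the two replicas until one side has nothing left to send, e.g.:
    ///
    /// `let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;`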
2521    async fn sync<'a>(
2522        alice: &'a mut Replica<'a>,
2523        bob: &'a mut Replica<'a>,
2524    ) -> Result<(SyncOutcome, SyncOutcome)> {
2525        let alice_peer_id = [1u8; 32];
2526        let bob_peer_id = [2u8; 32];
2527        let mut alice_state = SyncOutcome::default();
2528        let mut bob_state = SyncOutcome::default();
2529        // Sync alice - bob
2530        let mut next_to_bob = Some(alice.sync_initial_message()?);
2531        let mut rounds = 0;
2532        while let Some(msg) = next_to_bob.take() {
2533            assert!(rounds < 100, "too many rounds");
2534            rounds += 1;
2535            println!("round {rounds}");
2536            if let Some(msg) = bob
2537                .sync_process_message(msg, alice_peer_id, &mut bob_state)
2538                .await?
2539            {
2540                next_to_bob = alice
2541                    .sync_process_message(msg, bob_peer_id, &mut alice_state)
2542                    .await?
2543            }
2544        }
2545        assert_eq!(alice_state.num_sent, bob_state.num_recv);
2546        assert_eq!(alice_state.num_recv, bob_state.num_sent);
2547        Ok((alice_state, bob_state))
2548    }
2549
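    /// Asserts that an entry exists in the store for every key in `set`.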
2550    fn check_entries(
2551        store: &mut Store,
2552        namespace: &NamespaceId,
2553        author: &Author,
2554        set: &[&str],
2555    ) -> Result<()> {
2556        for el in set {
2557            assert!(store.get_exact(*namespace, author.id(), el, false)?.is_some(), "missing entry for {el:?}");
2558        }
2559        Ok(())
2560    }
2561}