// iroh_docs/sync.rs

1//! API for iroh-docs replicas
2
// Names and concepts are roughly based on Willow's design at the moment:
4//
5// https://hackmd.io/DTtck8QOQm6tZaQBBtTf7w
6//
7// This is going to change!
8
9use std::{
10    cmp::Ordering,
11    fmt::Debug,
12    ops::{Deref, DerefMut},
13    sync::Arc,
14};
15
16use bytes::{Bytes, BytesMut};
17use ed25519_dalek::{Signature, SignatureError};
18use iroh_blobs::Hash;
19use n0_future::{
20    time::{Duration, SystemTime},
21    IterExt,
22};
23use serde::{Deserialize, Serialize};
24
25pub use crate::heads::AuthorHeads;
26use crate::{
27    keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
28    ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
29    store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
30};
31
/// Protocol message for the set reconciliation protocol.
///
/// Can be serialized to bytes with [serde] to transfer between peers.
pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;

/// Byte representation of an iroh `EndpointId`.
// TODO: Consider `iroh::EndpointId` instead of raw bytes (`iroh` re-exports it from `iroh-base`).
pub type PeerIdBytes = [u8; 32];

/// Max time in the future from our wall clock time that we accept entries for.
/// Value is 10 minutes, expressed in microseconds (`10 * 60 * 1_000_000`);
/// compared against entry timestamps in [`validate_entry`].
pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;

/// Callback that may be set on a replica to determine the availability status for a content hash.
// Returns a boxed future so implementations can perform async lookups
// (e.g. querying a blob store); must be `Send + Sync` to be shared across tasks.
pub type ContentStatusCallback =
    Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
48
/// Event emitted by sync when entries are added.
///
/// Delivered to all channels registered via [`ReplicaInfo::subscribe`].
#[derive(derive_more::Debug, Clone)]
pub enum Event {
    /// A local entry has been added.
    LocalInsert {
        /// Document in which the entry was inserted.
        namespace: NamespaceId,
        /// Inserted entry.
        entry: SignedEntry,
    },
    /// A remote entry has been added.
    RemoteInsert {
        /// Document in which the entry was inserted.
        namespace: NamespaceId,
        /// Inserted entry.
        entry: SignedEntry,
        /// Peer that provided the inserted entry.
        /// Debug matches [`iroh::PublicKey::fmt_short`] (first 5 bytes, lower hex).
        #[debug("{}", hex::encode(&from[..5]))]
        from: PeerIdBytes,
        /// Whether download policies require the content to be downloaded.
        should_download: bool,
        /// [`ContentStatus`] for this entry in the remote's replica.
        remote_content_status: ContentStatus,
    },
}
75
/// Whether an entry was inserted locally or by a remote peer.
#[derive(derive_more::Debug, Clone)]
pub enum InsertOrigin {
    /// The entry was inserted locally.
    Local,
    /// The entry was received from the remote node identified by [`PeerIdBytes`].
    Sync {
        /// The peer from which we received this entry.
        /// Debug matches [`iroh::PublicKey::fmt_short`] (first 5 bytes, lower hex).
        #[debug("{}", hex::encode(&from[..5]))]
        from: PeerIdBytes,
        /// Whether the peer claims to have the content blob for this entry.
        remote_content_status: ContentStatus,
    },
}
91
/// Whether the content status is available on a node.
///
/// Exchanged during set reconciliation so peers learn whether the content
/// blob referenced by an entry can be fetched from the sender.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum ContentStatus {
    /// The content is completely available.
    Complete,
    /// The content is partially available.
    Incomplete,
    /// The content is missing.
    Missing,
}
102
/// Outcome of a sync operation.
///
/// Accumulated across calls to [`Replica::sync_process_message`] for a single
/// sync session.
#[derive(Debug, Clone, Default)]
pub struct SyncOutcome {
    /// Timestamp of the latest entry for each author in the set we received.
    pub heads_received: AuthorHeads,
    /// Number of entries we received.
    pub num_recv: usize,
    /// Number of entries we sent.
    pub num_sent: usize,
}
113
/// Reinterpret a pointer-sized, pointer-aligned value as a `usize`.
///
/// Returns `None` when `T` does not have exactly the size and alignment of
/// `usize`. Used by [`same_channel`] to compare `Arc`-backed channel senders
/// by identity: for such types, the copied bits are the heap pointer.
fn get_as_ptr<T>(value: &T) -> Option<usize> {
    use std::mem;
    if mem::size_of::<T>() == mem::size_of::<usize>()
        && mem::align_of::<T>() == mem::align_of::<usize>()
    {
        // SAFETY: `T` was just checked to have the same size and alignment as
        // `usize`, and every bit pattern is a valid `usize`, so copying the
        // value's bytes into a `usize` is well-defined.
        unsafe { Some(mem::transmute_copy(value)) }
    } else {
        None
    }
}
125
126fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
127    get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
128}
129
/// The set of event subscribers of a single replica.
///
/// Holds the [`async_channel::Sender`]s registered via [`ReplicaInfo::subscribe`].
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);
impl Subscribers {
    /// Register a new subscriber.
    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
        self.0.push(sender)
    }
    /// Remove every registered sender that refers to the same channel as
    /// `sender` (identity comparison via [`same_channel`], not value equality).
    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
        self.0.retain(|s| !same_channel(s, sender));
    }
    /// Send `event` to all subscribers, awaiting all sends concurrently.
    ///
    /// Senders whose send failed (i.e. their channel was closed) are dropped
    /// from the subscriber list; the rest are kept for the next event.
    pub async fn send(&mut self, event: Event) {
        self.0 = std::mem::take(&mut self.0)
            .into_iter()
            .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx))
            .join_all()
            .await
            .into_iter()
            .flatten()
            .collect();
    }
    /// Number of currently registered subscribers.
    pub fn len(&self) -> usize {
        self.0.len()
    }
    /// Build an event with `f` and send it — but only if at least one
    /// subscriber is registered, avoiding the event construction otherwise.
    pub async fn send_with(&mut self, f: impl FnOnce() -> Event) {
        if !self.0.is_empty() {
            self.send(f()).await
        }
    }
}
158
/// Kind of capability of the namespace.
///
/// The `u8` discriminants (`Write = 1`, `Read = 2`) are the raw representation
/// used by [`Capability::raw`] and [`Capability::from_raw`].
#[derive(
    Debug,
    Clone,
    Copy,
    Serialize,
    Deserialize,
    num_enum::IntoPrimitive,
    num_enum::TryFromPrimitive,
    strum::Display,
)]
#[repr(u8)]
#[strum(serialize_all = "snake_case")]
pub enum CapabilityKind {
    /// A writable replica.
    Write = 1,
    /// A readable replica.
    Read = 2,
}
178
/// The capability of the namespace.
///
/// A write capability carries the full [`NamespaceSecret`]; a read capability
/// only carries the public [`NamespaceId`].
#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
pub enum Capability {
    /// Write access to the namespace.
    Write(NamespaceSecret),
    /// Read only access to the namespace.
    Read(NamespaceId),
}
187
188impl Capability {
189    /// Get the [`NamespaceId`] for this [`Capability`].
190    pub fn id(&self) -> NamespaceId {
191        match self {
192            Capability::Write(secret) => secret.id(),
193            Capability::Read(id) => *id,
194        }
195    }
196
197    /// Get the [`NamespaceSecret`] of this [`Capability`].
198    /// Will fail if the [`Capability`] is read only.
199    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
200        match self {
201            Capability::Write(secret) => Ok(secret),
202            Capability::Read(_) => Err(ReadOnly),
203        }
204    }
205
206    /// Get the kind of capability.
207    pub fn kind(&self) -> CapabilityKind {
208        match self {
209            Capability::Write(_) => CapabilityKind::Write,
210            Capability::Read(_) => CapabilityKind::Read,
211        }
212    }
213
214    /// Get the raw representation of this namespace capability.
215    pub fn raw(&self) -> (u8, [u8; 32]) {
216        let capability_repr: u8 = self.kind().into();
217        let bytes = match self {
218            Capability::Write(secret) => secret.to_bytes(),
219            Capability::Read(id) => id.to_bytes(),
220        };
221        (capability_repr, bytes)
222    }
223
224    /// Create a [`Capability`] from its raw representation.
225    pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
226        let kind: CapabilityKind = kind.try_into()?;
227        let capability = match kind {
228            CapabilityKind::Write => {
229                let secret = NamespaceSecret::from_bytes(bytes);
230                Capability::Write(secret)
231            }
232            CapabilityKind::Read => {
233                let id = NamespaceId::from(bytes);
234                Capability::Read(id)
235            }
236        };
237        Ok(capability)
238    }
239
240    /// Merge this capability with another capability.
241    ///
242    /// Will return an error if `other` is not a capability for the same namespace.
243    ///
244    /// Returns `true` if the capability was changed, `false` otherwise.
245    pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
246        if other.id() != self.id() {
247            return Err(CapabilityError::NamespaceMismatch);
248        }
249
250        // the only capability upgrade is from read-only (self) to writable (other)
251        if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
252            let _ = std::mem::replace(self, other);
253            Ok(true)
254        } else {
255            Ok(false)
256        }
257    }
258}
259
/// Errors for capability operations.
///
/// Returned by [`Capability::merge`] / [`ReplicaInfo::merge_capability`].
#[derive(Debug, thiserror::Error)]
pub enum CapabilityError {
    /// Namespaces are not the same
    #[error("Namespaces are not the same")]
    NamespaceMismatch,
}
267
/// In memory information about an open replica.
#[derive(derive_more::Debug)]
pub struct ReplicaInfo {
    /// Capability (read or write) granting access to the namespace.
    pub(crate) capability: Capability,
    /// Event subscribers registered via [`Self::subscribe`].
    subscribers: Subscribers,
    /// Optional callback used to determine content availability for outgoing entries.
    #[debug("ContentStatusCallback")]
    content_status_cb: Option<ContentStatusCallback>,
    /// Set when the replica is closed; all further operations fail with `InsertError::Closed`.
    closed: bool,
}
277
278impl ReplicaInfo {
279    /// Create a new replica.
280    pub fn new(capability: Capability) -> Self {
281        Self {
282            capability,
283            subscribers: Default::default(),
284            // on_insert_sender: RwLock::new(None),
285            content_status_cb: None,
286            closed: false,
287        }
288    }
289
290    /// Subscribe to insert events.
291    ///
292    /// When subscribing to a replica, you must ensure that the corresponding [`async_channel::Receiver`] is
293    /// received from in a loop. If not receiving, local and remote inserts will hang waiting for
294    /// the receiver to be received from.
295    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
296        self.subscribers.subscribe(sender)
297    }
298
299    /// Explicitly unsubscribe a sender.
300    ///
301    /// Simply dropping the receiver is fine too. If you cloned a single sender to subscribe to
302    /// multiple replicas, you can use this method to explicitly unsubscribe the sender from
303    /// this replica without having to drop the receiver.
304    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
305        self.subscribers.unsubscribe(sender)
306    }
307
308    /// Get the number of current event subscribers.
309    pub fn subscribers_count(&self) -> usize {
310        self.subscribers.len()
311    }
312
313    /// Set the content status callback.
314    ///
315    /// Only one callback can be active at a time. If a previous callback was registered, this
316    /// will return `false`.
317    pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
318        if self.content_status_cb.is_some() {
319            false
320        } else {
321            self.content_status_cb = Some(cb);
322            true
323        }
324    }
325
326    fn ensure_open(&self) -> Result<(), InsertError> {
327        if self.closed() {
328            Err(InsertError::Closed)
329        } else {
330            Ok(())
331        }
332    }
333
334    /// Returns true if the replica is closed.
335    ///
336    /// If a replica is closed, no further operations can be performed. A replica cannot be closed
337    /// manually, it must be closed via [`store::Store::close_replica`] or
338    /// [`store::Store::remove_replica`]
339    pub fn closed(&self) -> bool {
340        self.closed
341    }
342
343    /// Merge a capability.
344    ///
345    /// The capability must refer to the the same namespace, otherwise an error will be returned.
346    ///
347    /// This will upgrade the replica's capability when passing a `Capability::Write`.
348    /// It is a no-op if `capability` is a Capability::Read`.
349    pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
350        self.capability.merge(capability)
351    }
352}
353
/// Local representation of a mutable, synchronizable key-value store.
///
/// Combines a store instance with the replica's in-memory [`ReplicaInfo`].
/// Generic over `I` so the info can be held in any `DerefMut`-capable wrapper
/// (owned `Box<ReplicaInfo>` by default).
#[derive(derive_more::Debug)]
pub struct Replica<'a, I = Box<ReplicaInfo>> {
    pub(crate) store: StoreInstance<'a>,
    pub(crate) info: I,
}
360
impl<'a, I> Replica<'a, I>
where
    I: Deref<Target = ReplicaInfo> + DerefMut,
{
    /// Create a new replica from a store instance and replica info.
    pub fn new(store: StoreInstance<'a>, info: I) -> Self {
        Replica { info, store }
    }

    /// Insert a new record at the given key.
    ///
    /// The entry will be signed by the provided `author`.
    /// The `len` must be the byte length of the data identified by `hash`.
    ///
    /// Returns the number of entries removed as a consequence of this insertion,
    /// or an error either if the entry failed to validate or if a store operation failed.
    pub async fn insert(
        &mut self,
        key: impl AsRef<[u8]>,
        author: &Author,
        hash: Hash,
        len: u64,
    ) -> Result<usize, InsertError> {
        // Empty entries cannot be inserted directly; deletions go through
        // `delete_prefix`, which creates the empty entry itself.
        if len == 0 || hash == Hash::EMPTY {
            return Err(InsertError::EntryIsEmpty);
        }
        self.info.ensure_open()?;
        let id = RecordIdentifier::new(self.id(), author.id(), key);
        let record = Record::new_current(hash, len);
        let entry = Entry::new(id, record);
        // Fails with `ReadOnly` if this replica only has a read capability.
        let secret = self.secret_key()?;
        let signed_entry = entry.sign(secret, author);
        self.insert_entry(signed_entry, InsertOrigin::Local).await
    }

    /// Delete entries that match the given `author` and key `prefix`.
    ///
    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
    /// entries whose key starts with or is equal to the given `prefix`.
    ///
    /// Returns the number of entries deleted.
    pub async fn delete_prefix(
        &mut self,
        prefix: impl AsRef<[u8]>,
        author: &Author,
    ) -> Result<usize, InsertError> {
        self.info.ensure_open()?;
        let id = RecordIdentifier::new(self.id(), author.id(), prefix);
        let entry = Entry::new_empty(id);
        let signed_entry = entry.sign(self.secret_key()?, author);
        self.insert_entry(signed_entry, InsertOrigin::Local).await
    }

    /// Insert an entry into this replica which was received from a remote peer.
    ///
    /// This will verify both the namespace and author signatures of the entry, emit an `on_insert`
    /// event, and insert the entry into the replica store.
    ///
    /// Returns the number of entries removed as a consequence of this insertion,
    /// or an error if the entry failed to validate or if a store operation failed.
    pub async fn insert_remote_entry(
        &mut self,
        entry: SignedEntry,
        received_from: PeerIdBytes,
        content_status: ContentStatus,
    ) -> Result<usize, InsertError> {
        self.info.ensure_open()?;
        // Remote entries must be consistent about emptiness: empty hash iff zero length.
        entry.validate_empty()?;
        let origin = InsertOrigin::Sync {
            from: received_from,
            remote_content_status: content_status,
        };
        self.insert_entry(entry, origin).await
    }

    /// Insert a signed entry into the database.
    ///
    /// Validates the entry, writes it to the store, and notifies subscribers.
    ///
    /// Returns the number of entries removed as a consequence of this insertion.
    async fn insert_entry(
        &mut self,
        entry: SignedEntry,
        origin: InsertOrigin,
    ) -> Result<usize, InsertError> {
        let namespace = self.id();

        let store = &self.store;
        validate_entry(system_time_now(), store, namespace, &entry, &origin)?;

        let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
        tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");

        // The store refuses the entry when a newer one exists for this key
        // (or a prefix of it); surface that as a dedicated error.
        let removed_count = match outcome {
            InsertOutcome::Inserted { removed } => removed,
            InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
        };

        let insert_event = match origin {
            InsertOrigin::Local => Event::LocalInsert { namespace, entry },
            InsertOrigin::Sync {
                from,
                remote_content_status,
            } => {
                // Evaluate the namespace's download policy so listeners know
                // whether to fetch the content blob for this entry.
                let download_policy = self
                    .store
                    .get_download_policy(&self.id())
                    .unwrap_or_default();
                let should_download = download_policy.matches(entry.entry());
                Event::RemoteInsert {
                    namespace,
                    entry,
                    from,
                    should_download,
                    remote_content_status,
                }
            }
        };

        self.info.subscribers.send(insert_event).await;

        Ok(removed_count)
    }

    /// Hashes the given data and inserts it.
    ///
    /// This does not store the content, just the record of it.
    /// Returns the calculated hash.
    pub async fn hash_and_insert(
        &mut self,
        key: impl AsRef<[u8]>,
        author: &Author,
        data: impl AsRef<[u8]>,
    ) -> Result<Hash, InsertError> {
        self.info.ensure_open()?;
        let len = data.as_ref().len() as u64;
        let hash = Hash::new(data);
        self.insert(key, author, hash, len).await?;
        Ok(hash)
    }

    /// Get the identifier for an entry in this replica.
    pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
        RecordIdentifier::new(self.info.capability.id(), author.id(), key)
    }

    /// Create the initial message for the set reconciliation flow with a remote peer.
    pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
        self.info.ensure_open().map_err(anyhow::Error::from)?;
        self.store.initial_message()
    }

    /// Process a set reconciliation message from a remote peer.
    ///
    /// Updates `state` with the received/sent entry counts and author heads.
    ///
    /// Returns the next message to be sent to the peer, if any.
    pub async fn sync_process_message(
        &mut self,
        message: crate::ranger::Message<SignedEntry>,
        from_peer: PeerIdBytes,
        state: &mut SyncOutcome,
    ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
        self.info.ensure_open()?;
        let my_namespace = self.id();
        let now = system_time_now();

        // update state with incoming data.
        state.num_recv += message.value_count();
        for (entry, _content_status) in message.values() {
            state
                .heads_received
                .insert(entry.author(), entry.timestamp());
        }

        // Clone the callback handle so it can be moved into the
        // `content_status` closure passed to `process_message` below.
        let cb = self.info.content_status_cb.clone();
        let download_policy = self
            .store
            .get_download_policy(&my_namespace)
            .unwrap_or_default();
        let reply = self
            .store
            .process_message(
                &Default::default(),
                message,
                // validate callback: validate incoming entries, and send to on_insert channel
                |store, entry, content_status| {
                    let origin = InsertOrigin::Sync {
                        from: from_peer,
                        remote_content_status: content_status,
                    };
                    validate_entry(now, store, my_namespace, entry, &origin).is_ok()
                },
                // on_insert callback: is called when an entry was actually inserted in the store
                async |_store, entry, content_status| {
                    // We use `send_with` to only clone the entry if we have active subscriptions.
                    self.info
                        .subscribers
                        .send_with(|| {
                            let should_download = download_policy.matches(entry.entry());
                            Event::RemoteInsert {
                                from: from_peer,
                                namespace: my_namespace,
                                entry: entry.clone(),
                                should_download,
                                remote_content_status: content_status,
                            }
                        })
                        .await
                },
                // content_status callback: get content status for outgoing entries
                async move |entry| {
                    if let Some(cb) = cb.as_ref() {
                        cb(entry.content_hash()).await
                    } else {
                        ContentStatus::Missing
                    }
                },
            )
            .await?;

        // update state with outgoing data.
        if let Some(ref reply) = reply {
            state.num_sent += reply.value_count();
        }

        Ok(reply)
    }

    /// Get the namespace identifier for this [`Replica`].
    pub fn id(&self) -> NamespaceId {
        self.info.capability.id()
    }

    /// Get the [`Capability`] of this [`Replica`].
    pub fn capability(&self) -> &Capability {
        &self.info.capability
    }

    /// Get the byte representation of the [`NamespaceSecret`] key for this replica. Will fail if
    /// the replica is read only.
    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
        self.info.capability.secret_key()
    }
}
604
/// Error that occurs trying to access the [`NamespaceSecret`] of a read-only [`Capability`].
///
/// Converts into [`InsertError::ReadOnly`] when raised during an insert.
#[derive(Debug, thiserror::Error)]
#[error("Replica allows read access only.")]
pub struct ReadOnly;
609
610/// Validate a [`SignedEntry`] if it's fit to be inserted.
611///
612/// This validates that
613/// * the entry's author and namespace signatures are correct
614/// * the entry's namespace matches the current replica
615/// * the entry's timestamp is not more than 10 minutes in the future of our system time
616/// * the entry is newer than an existing entry for the same key and author, if such exists.
617fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
618    now: u64,
619    store: &S,
620    expected_namespace: NamespaceId,
621    entry: &SignedEntry,
622    origin: &InsertOrigin,
623) -> Result<(), ValidationFailure> {
624    // Verify the namespace
625    if entry.namespace() != expected_namespace {
626        return Err(ValidationFailure::InvalidNamespace);
627    }
628
629    // Verify signature for non-local entries.
630    if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
631        return Err(ValidationFailure::BadSignature);
632    }
633
634    // Verify that the timestamp of the entry is not too far in the future.
635    if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
636        return Err(ValidationFailure::TooFarInTheFuture);
637    }
638    Ok(())
639}
640
/// Error emitted when inserting entries into a [`Replica`] failed.
#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
pub enum InsertError {
    /// Storage error
    #[error("storage error")]
    Store(anyhow::Error),
    /// Validation failure
    #[error("validation failure")]
    Validation(#[from] ValidationFailure),
    /// A newer entry exists for either this entry's key or a prefix of the key.
    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
    NewerEntryExists,
    /// Attempted to insert an empty entry.
    #[error("Attempted to insert an empty entry")]
    EntryIsEmpty,
    /// Replica is read only.
    // `#[from(ReadOnly)]` converts the `ReadOnly` unit error into this variant.
    #[error("Attempted to insert to read only replica")]
    #[from(ReadOnly)]
    ReadOnly,
    /// The replica is closed, no operations may be performed.
    #[error("replica is closed")]
    Closed,
}
664
/// Reason why entry validation failed.
///
/// Produced by [`validate_entry`] and [`Entry::validate_empty`].
#[derive(thiserror::Error, Debug)]
pub enum ValidationFailure {
    /// Entry namespace does not match the current replica.
    #[error("Entry namespace does not match the current replica")]
    InvalidNamespace,
    /// Entry signature is invalid.
    #[error("Entry signature is invalid")]
    BadSignature,
    /// Entry timestamp is too far in the future.
    #[error("Entry timestamp is too far in the future.")]
    TooFarInTheFuture,
    /// Entry has length 0 but not the empty hash, or the empty hash but not length 0.
    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
    InvalidEmptyEntry,
}
681
/// A signed entry.
///
/// Bundles an [`Entry`] with the [`EntrySignature`] created from the author's
/// and the namespace's secret keys.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignedEntry {
    signature: EntrySignature,
    entry: Entry,
}
688
689impl From<SignedEntry> for Entry {
690    fn from(value: SignedEntry) -> Self {
691        value.entry
692    }
693}
694
impl PartialOrd for SignedEntry {
    // Delegate to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
700
impl Ord for SignedEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Order by the inner entry only; the signature is ignored.
        // NOTE(review): `Eq` (derived) also compares the signature — this stays
        // consistent only if equal entries always carry identical signatures
        // (deterministic signing). Confirm against the signing scheme.
        self.entry.cmp(&other.entry)
    }
}
706
impl PartialOrd for Entry {
    // Delegate to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
712
713impl Ord for Entry {
714    fn cmp(&self, other: &Self) -> Ordering {
715        self.id
716            .cmp(&other.id)
717            .then_with(|| self.record.cmp(&other.record))
718    }
719}
720
721impl SignedEntry {
722    pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
723        SignedEntry { signature, entry }
724    }
725
726    /// Create a new signed entry by signing an entry with the `namespace` and `author`.
727    pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
728        let signature = EntrySignature::from_entry(&entry, namespace, author);
729        SignedEntry { signature, entry }
730    }
731
732    /// Create a new signed entries from its parts.
733    pub fn from_parts(
734        namespace: &NamespaceSecret,
735        author: &Author,
736        key: impl AsRef<[u8]>,
737        record: Record,
738    ) -> Self {
739        let id = RecordIdentifier::new(namespace.id(), author.id(), key);
740        let entry = Entry::new(id, record);
741        Self::from_entry(entry, namespace, author)
742    }
743
744    /// Verify the signatures on this entry.
745    pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
746        self.signature.verify(
747            &self.entry,
748            &self.entry.namespace().public_key(store)?,
749            &self.entry.author().public_key(store)?,
750        )
751    }
752
753    /// Get the signature.
754    pub fn signature(&self) -> &EntrySignature {
755        &self.signature
756    }
757
758    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
759    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
760        self.entry().validate_empty()
761    }
762
763    /// Get the [`Entry`].
764    pub fn entry(&self) -> &Entry {
765        &self.entry
766    }
767
768    /// Get the content [`struct@Hash`] of the entry.
769    pub fn content_hash(&self) -> Hash {
770        self.entry().content_hash()
771    }
772
773    /// Get the content length of the entry.
774    pub fn content_len(&self) -> u64 {
775        self.entry().content_len()
776    }
777
778    /// Get the author bytes of this entry.
779    pub fn author_bytes(&self) -> AuthorId {
780        self.entry().id().author()
781    }
782
783    /// Get the key of the entry.
784    pub fn key(&self) -> &[u8] {
785        self.entry().id().key()
786    }
787
788    /// Get the timestamp of the entry.
789    pub fn timestamp(&self) -> u64 {
790        self.entry().timestamp()
791    }
792}
793
794impl RangeEntry for SignedEntry {
795    type Key = RecordIdentifier;
796    type Value = Record;
797
798    fn key(&self) -> &Self::Key {
799        &self.entry.id
800    }
801
802    fn value(&self) -> &Self::Value {
803        &self.entry.record
804    }
805
806    fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
807        let mut hasher = blake3::Hasher::new();
808        hasher.update(self.namespace().as_ref());
809        hasher.update(self.author_bytes().as_ref());
810        hasher.update(self.key());
811        hasher.update(&self.timestamp().to_be_bytes());
812        hasher.update(self.content_hash().as_bytes());
813        Fingerprint(hasher.finalize().into())
814    }
815}
816
/// Signature over an entry.
///
/// Contains both the author's and the namespace's ed25519 signature over the
/// serialized entry bytes.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntrySignature {
    author_signature: Signature,
    namespace_signature: Signature,
}
823
824impl Debug for EntrySignature {
825    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826        f.debug_struct("EntrySignature")
827            .field(
828                "namespace_signature",
829                &hex::encode(self.namespace_signature.to_bytes()),
830            )
831            .field(
832                "author_signature",
833                &hex::encode(self.author_signature.to_bytes()),
834            )
835            .finish()
836    }
837}
838
839impl EntrySignature {
840    /// Create a new signature by signing an entry with the `namespace` and `author`.
841    pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
842        // TODO: this should probably include a namespace prefix
843        // namespace in the cryptographic sense.
844        let bytes = entry.to_vec();
845        let namespace_signature = namespace.sign(&bytes);
846        let author_signature = author.sign(&bytes);
847
848        EntrySignature {
849            author_signature,
850            namespace_signature,
851        }
852    }
853
854    /// Verify that this signature was created by signing the `entry` with the
855    /// secret keys of the specified `author` and `namespace`.
856    pub fn verify(
857        &self,
858        entry: &Entry,
859        namespace: &NamespacePublicKey,
860        author: &AuthorPublicKey,
861    ) -> Result<(), SignatureError> {
862        let bytes = entry.to_vec();
863        namespace.verify(&bytes, &self.namespace_signature)?;
864        author.verify(&bytes, &self.author_signature)?;
865
866        Ok(())
867    }
868
869    pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
870        let namespace_signature = Signature::from_bytes(namespace_sig);
871        let author_signature = Signature::from_bytes(author_sig);
872
873        EntrySignature {
874            author_signature,
875            namespace_signature,
876        }
877    }
878
879    pub(crate) fn author(&self) -> &Signature {
880        &self.author_signature
881    }
882
883    pub(crate) fn namespace(&self) -> &Signature {
884        &self.namespace_signature
885    }
886}
887
/// A single entry in a [`Replica`]
///
/// An entry is identified by a key, its [`Author`], and the [`Replica`]'s
/// [`NamespaceSecret`]. Its value is the [32-byte BLAKE3 hash](iroh_blobs::Hash)
/// of the entry's content data, the size of this content data, and a timestamp.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Entry {
    // Packed (namespace, author, key) identifier of this entry.
    id: RecordIdentifier,
    // Value part: content hash, content length, and timestamp.
    record: Record,
}
898
899impl Entry {
900    /// Create a new entry
901    pub fn new(id: RecordIdentifier, record: Record) -> Self {
902        Entry { id, record }
903    }
904
905    /// Create a new empty entry with the current timestamp.
906    pub fn new_empty(id: RecordIdentifier) -> Self {
907        Entry {
908            id,
909            record: Record::empty_current(),
910        }
911    }
912
913    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
914    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
915        match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
916            (true, true) => Ok(()),
917            (false, false) => Ok(()),
918            (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
919            (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
920        }
921    }
922
923    /// Get the [`RecordIdentifier`] for this entry.
924    pub fn id(&self) -> &RecordIdentifier {
925        &self.id
926    }
927
928    /// Get the [`NamespaceId`] of this entry.
929    pub fn namespace(&self) -> NamespaceId {
930        self.id.namespace()
931    }
932
933    /// Get the [`AuthorId`] of this entry.
934    pub fn author(&self) -> AuthorId {
935        self.id.author()
936    }
937
938    /// Get the key of this entry.
939    pub fn key(&self) -> &[u8] {
940        self.id.key()
941    }
942
943    /// Get the [`Record`] contained in this entry.
944    pub fn record(&self) -> &Record {
945        &self.record
946    }
947
948    /// Get the content hash of the record.
949    pub fn content_hash(&self) -> Hash {
950        self.record.hash
951    }
952
953    /// Get the content length of the record.
954    pub fn content_len(&self) -> u64 {
955        self.record.len
956    }
957
958    /// Get the timestamp of the record.
959    pub fn timestamp(&self) -> u64 {
960        self.record.timestamp
961    }
962
963    /// Serialize this entry into its canonical byte representation used for signing.
964    pub fn encode(&self, out: &mut Vec<u8>) {
965        self.id.encode(out);
966        self.record.encode(out);
967    }
968
969    /// Serialize this entry into a new vector with its canonical byte representation.
970    pub fn to_vec(&self) -> Vec<u8> {
971        let mut out = Vec::new();
972        self.encode(&mut out);
973        out
974    }
975
976    /// Sign this entry with a [`NamespaceSecret`] and [`Author`].
977    pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
978        SignedEntry::from_entry(self, namespace, author)
979    }
980}
981
// Byte layout of the packed `RecordIdentifier` buffer:
// 32 bytes namespace id, 32 bytes author id, then the variable-length key.
const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
985
/// The identifier of a record.
///
/// Stored as a single packed byte string: `namespace (32) | author (32) | key (..)`
/// (see `NAMESPACE_BYTES`/`AUTHOR_BYTES`/`KEY_BYTES`), so the derived
/// `Ord` compares namespace first, then author, then key.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct RecordIdentifier(Bytes);
989
impl Default for RecordIdentifier {
    fn default() -> Self {
        // Default namespace/author ids with an empty key — used e.g. as an
        // open range bound when querying all entries (see tests).
        Self::new(NamespaceId::default(), AuthorId::default(), b"")
    }
}
995
996impl Debug for RecordIdentifier {
997    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
998        f.debug_struct("RecordIdentifier")
999            .field("namespace", &self.namespace())
1000            .field("author", &self.author())
1001            .field("key", &std::string::String::from_utf8_lossy(self.key()))
1002            .finish()
1003    }
1004}
1005
impl RangeKey for RecordIdentifier {
    #[cfg(test)]
    fn is_prefix_of(&self, other: &Self) -> bool {
        // Byte-wise prefix check over the packed namespace|author|key buffer.
        other.as_ref().starts_with(self.as_ref())
    }
}
1012
/// Current wall-clock time as microseconds since the Unix epoch, truncated to `u64`.
fn system_time_now() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        // Only fails if the clock is set before the Unix epoch.
        .expect("time drift");
    since_epoch.as_micros() as u64
}
1019
1020impl RecordIdentifier {
1021    /// Create a new [`RecordIdentifier`].
1022    pub fn new(
1023        namespace: impl Into<NamespaceId>,
1024        author: impl Into<AuthorId>,
1025        key: impl AsRef<[u8]>,
1026    ) -> Self {
1027        let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1028        bytes.extend_from_slice(namespace.into().as_bytes());
1029        bytes.extend_from_slice(author.into().as_bytes());
1030        bytes.extend_from_slice(key.as_ref());
1031        Self(bytes.freeze())
1032    }
1033
1034    /// Serialize this [`RecordIdentifier`] into a mutable byte array.
1035    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1036        out.extend_from_slice(&self.0);
1037    }
1038
1039    /// Get this [`RecordIdentifier`] as [Bytes].
1040    pub fn as_bytes(&self) -> Bytes {
1041        self.0.clone()
1042    }
1043
1044    /// Get this [`RecordIdentifier`] as a tuple of byte slices.
1045    pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1046        (
1047            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1048            self.0[AUTHOR_BYTES].try_into().unwrap(),
1049            &self.0[KEY_BYTES],
1050        )
1051    }
1052
1053    /// Get this [`RecordIdentifier`] as a tuple of bytes.
1054    pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1055        (
1056            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1057            self.0[AUTHOR_BYTES].try_into().unwrap(),
1058            self.0.slice(KEY_BYTES),
1059        )
1060    }
1061
1062    /// Get the key of this record.
1063    pub fn key(&self) -> &[u8] {
1064        &self.0[KEY_BYTES]
1065    }
1066
1067    /// Get the key of this record as [`Bytes`].
1068    pub fn key_bytes(&self) -> Bytes {
1069        self.0.slice(KEY_BYTES)
1070    }
1071
1072    /// Get the [`NamespaceId`] of this record as byte array.
1073    pub fn namespace(&self) -> NamespaceId {
1074        let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1075        value.into()
1076    }
1077
1078    /// Get the [`AuthorId`] of this record as byte array.
1079    pub fn author(&self) -> AuthorId {
1080        let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1081        value.into()
1082    }
1083}
1084
impl AsRef<[u8]> for RecordIdentifier {
    // Exposes the full packed namespace|author|key byte string.
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
1090
// Allow calling `Entry` accessors (key, timestamp, …) directly on a `SignedEntry`.
impl Deref for SignedEntry {
    type Target = Entry;
    fn deref(&self) -> &Self::Target {
        &self.entry
    }
}
1097
// Allow calling `Record` accessors directly on an `Entry`. Together with the
// impl above, `SignedEntry` derefs through `Entry` down to `Record`.
impl Deref for Entry {
    type Target = Record;
    fn deref(&self) -> &Self::Target {
        &self.record
    }
}
1104
/// The data part of an entry in a [`Replica`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Record {
    /// Length of the data referenced by `hash`.
    len: u64,
    /// Hash of the content data.
    hash: Hash,
    /// Record creation timestamp. Counted as micros since the Unix epoch.
    timestamp: u64,
}

// Marker impl: `Record` is usable as the value type in the range-based
// set-reconciliation machinery (`crate::ranger`).
impl RangeValue for Record {}
1117
1118/// Ordering for entry values.
1119///
1120/// Compares first the timestamp, then the content hash.
1121impl Ord for Record {
1122    fn cmp(&self, other: &Self) -> Ordering {
1123        self.timestamp
1124            .cmp(&other.timestamp)
1125            .then_with(|| self.hash.cmp(&other.hash))
1126    }
1127}
1128
1129impl PartialOrd for Record {
1130    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1131        Some(self.cmp(other))
1132    }
1133}
1134
impl Record {
    /// Create a new record.
    ///
    /// Invariant: a zero `len` requires the empty hash (checked in debug
    /// builds only — release builds accept the arguments as given).
    pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
        debug_assert!(
            len != 0 || hash == Hash::EMPTY,
            "if `len` is 0 then `hash` must be the hash of the empty byte range"
        );
        Record {
            hash,
            len,
            timestamp,
        }
    }

    /// Create a tombstone record (empty content)
    pub fn empty(timestamp: u64) -> Self {
        Self::new(Hash::EMPTY, 0, timestamp)
    }

    /// Create a tombstone record with the timestamp set to now.
    pub fn empty_current() -> Self {
        Self::new_current(Hash::EMPTY, 0)
    }

    /// Return `true` if the entry is empty.
    pub fn is_empty(&self) -> bool {
        // Only the hash is consulted; `Entry::validate_empty` checks that
        // hash and length agree.
        self.hash == Hash::EMPTY
    }

    /// Create a new [`Record`] with the timestamp set to now.
    pub fn new_current(hash: Hash, len: u64) -> Self {
        let timestamp = system_time_now();
        Self::new(hash, len, timestamp)
    }

    /// Get the length of the data addressed by this record's content hash.
    pub fn content_len(&self) -> u64 {
        self.len
    }

    /// Get the [`struct@Hash`] of the content data of this record.
    pub fn content_hash(&self) -> Hash {
        self.hash
    }

    /// Get the timestamp of this record.
    pub fn timestamp(&self) -> u64 {
        self.timestamp
    }

    // Test helper: build a record from raw data, timestamped now.
    #[cfg(test)]
    pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
        let len = data.as_ref().len() as u64;
        let hash = Hash::new(data);
        Self::new_current(hash, len)
    }

    // Test helper: build a record from raw data with an explicit timestamp.
    #[cfg(test)]
    pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
        let len = data.as_ref().len() as u64;
        let hash = Hash::new(data);
        Self::new(hash, len, timestamp)
    }

    /// Serialize this record into a mutable byte array.
    ///
    /// Big-endian length, then hash, then big-endian timestamp. This is part
    /// of the canonical signed encoding (see `Entry::encode`); the field order
    /// must not change.
    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
        out.extend_from_slice(&self.len.to_be_bytes());
        out.extend_from_slice(self.hash.as_ref());
        out.extend_from_slice(&self.timestamp.to_be_bytes())
    }
}
1206
1207#[cfg(test)]
1208mod tests {
1209    use std::collections::HashSet;
1210
1211    use anyhow::Result;
1212    use rand::SeedableRng;
1213
1214    use super::*;
1215    use crate::{
1216        actor::SyncHandle,
1217        ranger::{Range, Store as _},
1218        store::{OpenError, Query, SortBy, SortDirection, Store},
1219    };
1220
    #[tokio::test]
    async fn test_basics_memory() -> Result<()> {
        // Run the shared basics suite against the in-memory store backend.
        let store = store::Store::memory();
        test_basics(store).await?;

        Ok(())
    }
1228
    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn test_basics_fs() -> Result<()> {
        // Run the shared basics suite against the persistent (file-backed) store.
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_basics(store).await?;
        Ok(())
    }
1237
    /// Shared suite: sign/verify, inserts, overwrites, and all query shapes
    /// (by author, by exact key, by prefix, all, and by range) on one backend.
    async fn test_basics(mut store: Store) -> Result<()> {
        let mut rng = rand::rng();
        let alice = Author::new(&mut rng);
        let bob = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);

        // Manually build, sign, and verify a single entry.
        let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
        let record = Record::current_from_data(b"this is my cool data");
        let entry = Entry::new(record_id, record);
        let signed_entry = entry.sign(&myspace, &alice);
        signed_entry.verify(&()).expect("failed to verify");

        // Insert ten entries under distinct keys and read each one back.
        let mut my_replica = store.new_replica(myspace.clone())?;
        for i in 0..10 {
            my_replica
                .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice"))
                .await?;
        }

        for i in 0..10 {
            let res = store
                .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
                .unwrap();
            let len = format!("{i}: hello from alice").as_bytes().len() as u64;
            assert_eq!(res.entry().record().content_len(), len);
            res.verify(&())?;
        }

        // Test multiple records for the same key
        let mut my_replica = store.new_replica(myspace.clone())?;
        my_replica
            .hash_and_insert("/cool/path", &alice, "round 1")
            .await?;
        let _entry = store
            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
            .unwrap();
        // Second
        let mut my_replica = store.new_replica(myspace.clone())?;
        my_replica
            .hash_and_insert("/cool/path", &alice, "round 2")
            .await?;
        let _entry = store
            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
            .unwrap();

        // Get All by author — 10 keyed entries plus the latest "/cool/path".
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);

        // Get All by author
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 0);

        // Get All by key
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        // Get All
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::all())?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);

        // insert record from different author
        let mut my_replica = store.new_replica(myspace.clone())?;
        let _entry = my_replica
            .hash_and_insert("/cool/path", &bob, "bob round 1")
            .await?;

        // Get All by author
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        // Get All by key — same key now has one entry per author.
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 2);

        // Get all by prefix
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 2);

        // Get All by author and prefix
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        // Get All
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::all())?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 12);

        // Get Range of all should return all latest
        // (default..default is the open range covering every identifier).
        let mut my_replica = store.new_replica(myspace.clone())?;
        let entries_second: Vec<_> = my_replica
            .store
            .get_range(Range::new(
                RecordIdentifier::default(),
                RecordIdentifier::default(),
            ))?
            .collect::<Result<_, _>>()?;

        assert_eq!(entries_second.len(), 12);
        assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());

        test_lru_cache_like_behaviour(&mut store, myspace.id())?;
        store.flush()?;
        Ok(())
    }
1370
    /// Test that [`Store::register_useful_peer`] behaves like a LRUCache of size
    /// [`super::store::PEERS_PER_DOC_CACHE_SIZE`].
    fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
        /// Helper to verify the store returns the expected peers for the namespace.
        #[track_caller]
        fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
            assert_eq!(
                expected_peers,
                &store
                    .get_sync_peers(&namespace)
                    .unwrap()
                    .unwrap()
                    .collect::<Vec<_>>(),
                "sync peers differ"
            );
        }

        let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
        // expected peers: newest peers are to the front, oldest to the back
        let mut expected_peers = Vec::with_capacity(count);
        // Fill the cache exactly to capacity; peer ids are the byte `i` repeated.
        for i in 0..count as u8 {
            let peer = [i; 32];
            expected_peers.insert(0, peer);
            store.register_useful_peer(namespace, peer)?;
        }
        verify_peers(store, namespace, &expected_peers);

        // one more peer should evict the last peer
        expected_peers.pop();
        let newer_peer = [count as u8; 32];
        expected_peers.insert(0, newer_peer);
        store.register_useful_peer(namespace, newer_peer)?;
        verify_peers(store, namespace, &expected_peers);

        // move one existing peer up
        // (re-registering refreshes recency instead of duplicating the entry)
        let refreshed_peer = expected_peers.remove(2);
        expected_peers.insert(0, refreshed_peer);
        store.register_useful_peer(namespace, refreshed_peer)?;
        verify_peers(store, namespace, &expected_peers);
        Ok(())
    }
1412
    #[tokio::test]
    async fn test_content_hashes_iterator_memory() -> Result<()> {
        // Content-hash iteration against the in-memory backend.
        let store = store::Store::memory();
        test_content_hashes_iterator(store).await
    }
1418
    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn test_content_hashes_iterator_fs() -> Result<()> {
        // Content-hash iteration against the persistent backend.
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_content_hashes_iterator(store).await
    }
1426
    /// `Store::content_hashes` must yield exactly the hashes of every entry
    /// inserted across all replicas, with no duplicates or omissions.
    async fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
        let mut rng = rand::rng();
        let mut expected = HashSet::new();
        let n_replicas = 3;
        let n_entries = 4;
        for i in 0..n_replicas {
            let namespace = NamespaceSecret::new(&mut rng);
            let author = store.new_author(&mut rng)?;
            let mut replica = store.new_replica(namespace)?;
            for j in 0..n_entries {
                let key = format!("{j}");
                // Data includes the replica index so every hash is distinct.
                let data = format!("{i}:{j}");
                let hash = replica.hash_and_insert(key, &author, data).await?;
                expected.insert(hash);
            }
        }
        assert_eq!(expected.len(), n_replicas * n_entries);
        let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
        assert_eq!(actual, expected);
        Ok(())
    }
1448
    /// `RecordIdentifier` ordering and range containment must vary correctly
    /// along each component (key, namespace, author) and combinations thereof.
    #[test]
    fn test_multikey() {
        let mut rng = rand::rng();

        let k = ["a", "c", "z"];

        // Pre-sort namespaces and authors so index order matches id order below.
        let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
        n.sort_by_key(|n| n.id());

        let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
        a.sort_by_key(|a| a.id());

        // Just key
        {
            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
            let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
            let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);

            // Ranges are half-open: start inclusive, end exclusive.
            let range = Range::new(ri0.clone(), ri2.clone());
            assert!(range.contains(&ri0), "start");
            assert!(range.contains(&ri1), "inside");
            assert!(!range.contains(&ri2), "end");

            assert!(ri0 < ri1);
            assert!(ri1 < ri2);
        }

        // Just namespace
        {
            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);

            let range = Range::new(ri0.clone(), ri2.clone());
            assert!(range.contains(&ri0), "start");
            assert!(range.contains(&ri1), "inside");
            assert!(!range.contains(&ri2), "end");

            assert!(ri0 < ri1);
            assert!(ri1 < ri2);
        }

        // Just author
        {
            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
            let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
            let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);

            let range = Range::new(ri0.clone(), ri2.clone());
            assert!(range.contains(&ri0), "start");
            assert!(range.contains(&ri1), "inside");
            assert!(!range.contains(&ri2), "end");

            assert!(ri0 < ri1);
            assert!(ri1 < ri2);
        }

        // Just key and namespace
        {
            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);

            let range = Range::new(ri0.clone(), ri2.clone());
            assert!(range.contains(&ri0), "start");
            assert!(range.contains(&ri1), "inside");
            assert!(!range.contains(&ri2), "end");

            assert!(ri0 < ri1);
            assert!(ri1 < ri2);
        }

        // Mixed
        {
            // Ord should prioritize namespace - author - key

            let a0 = a[0].id();
            let a1 = a[1].id();
            let n0 = n[0].id();
            let n1 = n[1].id();
            let k0 = k[0];
            let k1 = k[1];

            assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
            assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
            assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
            assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
        }
    }
1538
    #[tokio::test]
    async fn test_timestamps_memory() -> Result<()> {
        // Timestamp conflict-resolution suite on the in-memory backend.
        let store = store::Store::memory();
        test_timestamps(store).await?;

        Ok(())
    }
1546
    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn test_timestamps_fs() -> Result<()> {
        // Timestamp conflict-resolution suite on the persistent backend.
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_timestamps(store).await?;
        Ok(())
    }
1555
    /// An insert with an older timestamp for the same (namespace, author, key)
    /// must be rejected with `InsertError::NewerEntryExists` and leave the
    /// stored entry untouched.
    async fn test_timestamps(mut store: Store) -> Result<()> {
        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let namespace = NamespaceSecret::new(&mut rng);
        let _replica = store.new_replica(namespace.clone())?;
        let author = store.new_author(&mut rng)?;
        // Close and reopen to exercise the replica open path too.
        store.close_replica(namespace.id());
        let mut replica = store.open_replica(&namespace.id())?;

        let key = b"hello";
        let value = b"world";
        let entry = {
            let timestamp = 2;
            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
            let record = Record::from_data(value, timestamp);
            Entry::new(id, record).sign(&namespace, &author)
        };

        replica
            .insert_entry(entry.clone(), InsertOrigin::Local)
            .await
            .unwrap();
        store.close_replica(namespace.id());
        let res = store
            .get_exact(namespace.id(), author.id(), key, false)?
            .unwrap();
        assert_eq!(res, entry);

        // Same key, strictly older timestamp (1 < 2) — must be rejected.
        let entry2 = {
            let timestamp = 1;
            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
            let record = Record::from_data(value, timestamp);
            Entry::new(id, record).sign(&namespace, &author)
        };

        let mut replica = store.open_replica(&namespace.id())?;
        let res = replica.insert_entry(entry2, InsertOrigin::Local).await;
        store.close_replica(namespace.id());
        assert!(matches!(res, Err(InsertError::NewerEntryExists)));
        let res = store
            .get_exact(namespace.id(), author.id(), key, false)?
            .unwrap();
        // The original (newer) entry is still what the store returns.
        assert_eq!(res, entry);
        store.flush()?;
        Ok(())
    }
1601
    #[tokio::test]
    async fn test_replica_sync_memory() -> Result<()> {
        // Replica reconciliation suite with two in-memory stores.
        let alice_store = store::Store::memory();
        let bob_store = store::Store::memory();

        test_replica_sync(alice_store, bob_store).await?;
        Ok(())
    }
1610
    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn test_replica_sync_fs() -> Result<()> {
        // Replica reconciliation suite with two persistent stores.
        let alice_dbfile = tempfile::NamedTempFile::new()?;
        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
        let bob_dbfile = tempfile::NamedTempFile::new()?;
        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
        test_replica_sync(alice_store, bob_store).await?;

        Ok(())
    }
1622
    /// After set reconciliation, both stores must contain the union of both
    /// entry sets, and the message counts must match the expected exchange.
    async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
        // "eel" and "fox" are shared between the two sets.
        let alice_set = ["ape", "eel", "fox", "gnu"];
        let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];

        let mut rng = rand::rng();
        let author = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);
        let mut alice = alice_store.new_replica(myspace.clone())?;
        for el in &alice_set {
            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
        }

        let mut bob = bob_store.new_replica(myspace.clone())?;
        for el in &bob_set {
            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
        }

        let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;

        // Alice sends the 2 entries bob lacks; bob's side transfers 6 entries
        // to alice (counts reflect the reconciliation protocol's exchange).
        assert_eq!(alice_out.num_sent, 2);
        assert_eq!(bob_out.num_recv, 2);
        assert_eq!(alice_out.num_recv, 6);
        assert_eq!(bob_out.num_sent, 6);

        // Both stores now hold both sets.
        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
        alice_store.flush()?;
        bob_store.flush()?;
        Ok(())
    }
1655
    #[tokio::test]
    async fn test_replica_timestamp_sync_memory() -> Result<()> {
        // Timestamp-based conflict resolution during sync, in-memory backend.
        let alice_store = store::Store::memory();
        let bob_store = store::Store::memory();

        test_replica_timestamp_sync(alice_store, bob_store).await?;
        Ok(())
    }
1664
    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn test_replica_timestamp_sync_fs() -> Result<()> {
        // Timestamp-based conflict resolution during sync, persistent backend.
        let alice_dbfile = tempfile::NamedTempFile::new()?;
        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
        let bob_dbfile = tempfile::NamedTempFile::new()?;
        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
        test_replica_timestamp_sync(alice_store, bob_store).await?;

        Ok(())
    }
1676
1677    async fn test_replica_timestamp_sync(
1678        mut alice_store: Store,
1679        mut bob_store: Store,
1680    ) -> Result<()> {
1681        let mut rng = rand::rng();
1682        let author = Author::new(&mut rng);
1683        let namespace = NamespaceSecret::new(&mut rng);
1684        let mut alice = alice_store.new_replica(namespace.clone())?;
1685        let mut bob = bob_store.new_replica(namespace.clone())?;
1686
1687        let key = b"key";
1688        let alice_value = b"alice";
1689        let bob_value = b"bob";
1690        let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?;
1691        // system time increased - sync should overwrite
1692        let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?;
1693        sync(&mut alice, &mut bob).await?;
1694        assert_eq!(
1695            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1696            Some(bob_hash)
1697        );
1698        assert_eq!(
1699            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1700            Some(bob_hash)
1701        );
1702
1703        let mut alice = alice_store.new_replica(namespace.clone())?;
1704        let mut bob = bob_store.new_replica(namespace.clone())?;
1705
1706        let alice_value_2 = b"alice2";
1707        // system time increased - sync should overwrite
1708        let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?;
1709        let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?;
1710        sync(&mut alice, &mut bob).await?;
1711        assert_eq!(
1712            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1713            Some(alice_hash_2)
1714        );
1715        assert_eq!(
1716            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1717            Some(alice_hash_2)
1718        );
1719        alice_store.flush()?;
1720        bob_store.flush()?;
1721        Ok(())
1722    }
1723
    /// Entries timestamped up to and including `now + MAX_TIMESTAMP_FUTURE_SHIFT`
    /// are accepted; anything beyond that is rejected with
    /// `ValidationFailure::TooFarInTheFuture`.
    #[tokio::test]
    async fn test_future_timestamp() -> Result<()> {
        let mut rng = rand::rng();
        let mut store = store::Store::memory();
        let author = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);

        // Timestamp exactly "now": accepted.
        let mut replica = store.new_replica(namespace.clone())?;
        let key = b"hi";
        let t = system_time_now();
        let record = Record::from_data(b"1", t);
        let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
        replica
            .insert_entry(entry0.clone(), InsertOrigin::Local)
            .await?;

        assert_eq!(
            get_entry(&mut store, namespace.id(), author.id(), key)?,
            entry0
        );

        // Just inside the allowed future window: accepted.
        let mut replica = store.new_replica(namespace.clone())?;
        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
        let record = Record::from_data(b"2", t);
        let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
        replica
            .insert_entry(entry1.clone(), InsertOrigin::Local)
            .await?;
        assert_eq!(
            get_entry(&mut store, namespace.id(), author.id(), key)?,
            entry1
        );

        // Exactly at the boundary: still accepted (the limit is inclusive).
        let mut replica = store.new_replica(namespace.clone())?;
        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
        let record = Record::from_data(b"2", t);
        let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
        replica
            .insert_entry(entry2.clone(), InsertOrigin::Local)
            .await?;
        assert_eq!(
            get_entry(&mut store, namespace.id(), author.id(), key)?,
            entry2
        );

        // Past the boundary: rejected, and the stored entry is unchanged.
        let mut replica = store.new_replica(namespace.clone())?;
        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
        let record = Record::from_data(b"2", t);
        let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
        let res = replica.insert_entry(entry3, InsertOrigin::Local).await;
        assert!(matches!(
            res,
            Err(InsertError::Validation(
                ValidationFailure::TooFarInTheFuture
            ))
        ));
        assert_eq!(
            get_entry(&mut store, namespace.id(), author.id(), key)?,
            entry2
        );
        store.flush()?;
        Ok(())
    }
1787
1788    /// Regression: `MAX_TIMESTAMP_FUTURE_SHIFT` must be on the same unit scale as
1789    /// `system_time_now()` (Unix epoch **microseconds**). A millisecond-scaled constant
1790    /// only allows ~0.6s of future slack and would reject this insert.
1791    #[tokio::test]
1792    async fn test_future_timestamp_accepts_one_second_skew() -> Result<()> {
1793        let mut rng = rand::rng();
1794        let mut store = store::Store::memory();
1795        let author = Author::new(&mut rng);
1796        let namespace = NamespaceSecret::new(&mut rng);
1797        let mut replica = store.new_replica(namespace.clone())?;
1798        let key = b"skew";
1799        let skew_micros: u64 = Duration::from_secs(1).as_micros() as u64;
1800        let now = system_time_now();
1801        let record = Record::from_data(b"ahead", now + skew_micros);
1802        let entry = SignedEntry::from_parts(&namespace, &author, key, record);
1803        replica
1804            .insert_entry(entry.clone(), InsertOrigin::Local)
1805            .await?;
1806        assert_eq!(
1807            get_entry(&mut store, namespace.id(), author.id(), key)?,
1808            entry
1809        );
1810        store.flush()?;
1811        Ok(())
1812    }
1813
1814    #[tokio::test]
1815    async fn test_insert_empty() -> Result<()> {
1816        let mut store = store::Store::memory();
1817        let mut rng = rand::rng();
1818        let alice = Author::new(&mut rng);
1819        let myspace = NamespaceSecret::new(&mut rng);
1820        let mut replica = store.new_replica(myspace.clone())?;
1821        let hash = Hash::new(b"");
1822        let res = replica.insert(b"foo", &alice, hash, 0).await;
1823        assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1824        store.flush()?;
1825        Ok(())
1826    }
1827
1828    #[tokio::test]
1829    async fn test_prefix_delete_memory() -> Result<()> {
1830        let store = store::Store::memory();
1831        test_prefix_delete(store).await?;
1832        Ok(())
1833    }
1834
1835    #[tokio::test]
1836    #[cfg(feature = "fs-store")]
1837    async fn test_prefix_delete_fs() -> Result<()> {
1838        let dbfile = tempfile::NamedTempFile::new()?;
1839        let store = store::fs::Store::persistent(dbfile.path())?;
1840        test_prefix_delete(store).await?;
1841        Ok(())
1842    }
1843
    /// Inserts two entries sharing the prefix `foo`, then verifies that
    /// `delete_prefix(b"foo", ..)` removes both and that neither key (nor the
    /// bare prefix) resolves to an entry afterwards.
    async fn test_prefix_delete(mut store: Store) -> Result<()> {
        let mut rng = rand::rng();
        let alice = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);
        let mut replica = store.new_replica(myspace.clone())?;
        let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;
        let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?;

        // sanity checks: both entries are stored with the expected content hashes
        assert_eq!(
            get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
            Some(hash1)
        );
        assert_eq!(
            get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
            Some(hash2)
        );

        // delete everything under the prefix "foo" — both entries must be counted
        let mut replica = store.new_replica(myspace.clone())?;
        let deleted = replica.delete_prefix(b"foo", &alice).await?;
        assert_eq!(deleted, 2);
        // Neither prefixed key nor the prefix itself resolves to an entry anymore.
        assert_eq!(
            store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
            None
        );
        assert_eq!(
            store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
            None
        );
        assert_eq!(
            store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
            None
        );
        store.flush()?;
        Ok(())
    }
1881
1882    #[tokio::test]
1883    async fn test_replica_sync_delete_memory() -> Result<()> {
1884        let alice_store = store::Store::memory();
1885        let bob_store = store::Store::memory();
1886
1887        test_replica_sync_delete(alice_store, bob_store).await
1888    }
1889
1890    #[tokio::test]
1891    #[cfg(feature = "fs-store")]
1892    async fn test_replica_sync_delete_fs() -> Result<()> {
1893        let alice_dbfile = tempfile::NamedTempFile::new()?;
1894        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1895        let bob_dbfile = tempfile::NamedTempFile::new()?;
1896        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1897        test_replica_sync_delete(alice_store, bob_store).await
1898    }
1899
    /// Syncs two replicas to a common state, then has one side delete a prefix
    /// while the other inserts a new key under that prefix; after a second sync
    /// both stores must converge on the same surviving entries.
    async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
        let alice_set = ["foot"];
        let bob_set = ["fool", "foo", "fog"];

        let mut rng = rand::rng();
        let author = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);
        let mut alice = alice_store.new_replica(myspace.clone())?;
        for el in &alice_set {
            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
        }

        let mut bob = bob_store.new_replica(myspace.clone())?;
        for el in &bob_set {
            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
        }

        sync(&mut alice, &mut bob).await?;

        // After the first sync, both stores hold the union of both sets.
        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;

        // Alice deletes the "foo" prefix while bob inserts "fooz"; after syncing
        // again, both sides must agree on exactly ["fog", "fooz"].
        let mut alice = alice_store.new_replica(myspace.clone())?;
        let mut bob = bob_store.new_replica(myspace.clone())?;
        alice.delete_prefix("foo", &author).await?;
        bob.hash_and_insert("fooz", &author, "fooz".as_bytes())
            .await?;
        sync(&mut alice, &mut bob).await?;
        check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
        check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
        alice_store.flush()?;
        bob_store.flush()?;
        Ok(())
    }
1936
1937    #[tokio::test]
1938    async fn test_replica_remove_memory() -> Result<()> {
1939        let alice_store = store::Store::memory();
1940        test_replica_remove(alice_store).await
1941    }
1942
1943    #[tokio::test]
1944    #[cfg(feature = "fs-store")]
1945    async fn test_replica_remove_fs() -> Result<()> {
1946        let alice_dbfile = tempfile::NamedTempFile::new()?;
1947        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1948        test_replica_remove(alice_store).await
1949    }
1950
    /// A replica must be closed before it can be removed; after removal its
    /// entries are gone, reopening fails with `OpenError::NotFound`, and the
    /// same namespace can be recreated from scratch.
    async fn test_replica_remove(mut store: Store) -> Result<()> {
        let mut rng = rand::rng();
        let namespace = NamespaceSecret::new(&mut rng);
        let author = Author::new(&mut rng);
        let mut replica = store.new_replica(namespace.clone())?;

        // insert entry
        let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?;
        let res = store
            .get_many(namespace.id(), Query::all())?
            .collect::<Vec<_>>();
        assert_eq!(res.len(), 1);

        // removing must fail while the replica is still open
        let res = store.remove_replica(&namespace.id());
        assert!(res.is_err());
        // after closing, removal succeeds and all entries are gone
        store.close_replica(namespace.id());
        store.remove_replica(&namespace.id())?;
        let res = store
            .get_many(namespace.id(), Query::all())?
            .collect::<Vec<_>>();
        assert_eq!(res.len(), 0);

        // may not reopen removed replica
        let res = store.load_replica_info(&namespace.id());
        assert!(matches!(res, Err(OpenError::NotFound)));

        // may recreate replica under the same namespace and write to it again
        let mut replica = store.new_replica(namespace.clone())?;
        replica.insert(b"foo", &author, hash, 3).await?;
        let res = store
            .get_many(namespace.id(), Query::all())?
            .collect::<Vec<_>>();
        assert_eq!(res.len(), 1);
        store.flush()?;
        Ok(())
    }
1989
1990    #[tokio::test]
1991    async fn test_replica_delete_edge_cases_memory() -> Result<()> {
1992        let store = store::Store::memory();
1993        test_replica_delete_edge_cases(store).await
1994    }
1995
1996    #[tokio::test]
1997    #[cfg(feature = "fs-store")]
1998    async fn test_replica_delete_edge_cases_fs() -> Result<()> {
1999        let dbfile = tempfile::NamedTempFile::new()?;
2000        let store = store::fs::Store::persistent(dbfile.path())?;
2001        test_replica_delete_edge_cases(store).await
2002    }
2003
    /// Exercises `delete_prefix` with boundary byte values (0x00, 0x01, 0xff)
    /// as prefixes and suffixes, and checks that deletion is strictly
    /// prefix-based: sibling keys outside the prefix must survive.
    async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
        let mut rng = rand::rng();
        let author = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);

        // Boundary byte values that are easy to get wrong in range computations.
        let edgecases = [0u8, 1u8, 255u8];
        let prefixes = [0u8, 255u8];
        let hash = Hash::new(b"foo");
        let len = 3;
        for prefix in prefixes {
            let mut expected = vec![];
            let mut replica = store.new_replica(namespace.clone())?;
            for suffix in edgecases {
                let key = [prefix, suffix].to_vec();
                expected.push(key.clone());
                replica.insert(&key, &author, hash, len).await?;
            }
            assert_keys(&mut store, namespace.id(), expected);
            // Deleting by the one-byte prefix must remove every key inserted above.
            let mut replica = store.new_replica(namespace.clone())?;
            replica.delete_prefix([prefix], &author).await?;
            assert_keys(&mut store, namespace.id(), vec![]);
        }

        // Deleting prefix [1, 1] must leave siblings [1, 0] and [1, 2] untouched.
        let mut replica = store.new_replica(namespace.clone())?;
        let key = vec![1u8, 0u8];
        replica.insert(key, &author, hash, len).await?;
        let key = vec![1u8, 1u8];
        replica.insert(key, &author, hash, len).await?;
        let key = vec![1u8, 2u8];
        replica.insert(key, &author, hash, len).await?;
        let prefix = vec![1u8, 1u8];
        replica.delete_prefix(prefix, &author).await?;
        assert_keys(
            &mut store,
            namespace.id(),
            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
        );

        // Deleting prefix [0] removes [0, 0] and [0, 255] but none of the [1, _] keys.
        let mut replica = store.new_replica(namespace.clone())?;
        let key = vec![0u8, 255u8];
        replica.insert(key, &author, hash, len).await?;
        let key = vec![0u8, 0u8];
        replica.insert(key, &author, hash, len).await?;
        let prefix = vec![0u8];
        replica.delete_prefix(prefix, &author).await?;
        assert_keys(
            &mut store,
            namespace.id(),
            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
        );
        store.flush()?;
        Ok(())
    }
2057
2058    #[tokio::test]
2059    async fn test_latest_iter_memory() -> Result<()> {
2060        let store = store::Store::memory();
2061        test_latest_iter(store).await
2062    }
2063
2064    #[tokio::test]
2065    #[cfg(feature = "fs-store")]
2066    async fn test_latest_iter_fs() -> Result<()> {
2067        let dbfile = tempfile::NamedTempFile::new()?;
2068        let store = store::fs::Store::persistent(dbfile.path())?;
2069        test_latest_iter(store).await
2070    }
2071
    /// `get_latest_for_each_author` must yield exactly one entry per author —
    /// the most recent write — regardless of how many entries each author made.
    async fn test_latest_iter(mut store: Store) -> Result<()> {
        let mut rng = rand::rng();
        let author0 = Author::new(&mut rng);
        let author1 = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);
        let mut replica = store.new_replica(namespace.clone())?;

        // Single write by author0 -> exactly one "latest" entry, for key a0.1.
        replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
        let latest = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(latest.len(), 1);
        assert_eq!(latest[0].2, b"a0.1".to_vec());

        // author1 writes once, author0 writes again: latest-per-author is now
        // a0.2 (superseding a0.1) plus a1.1.
        let mut replica = store.new_replica(namespace.clone())?;
        replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
        replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
        let latest = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;
        // Sort the keys — the iterator's author order is not asserted here.
        let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
        latest_keys.sort();
        assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
        store.flush()?;
        Ok(())
    }
2098
2099    #[tokio::test]
2100    async fn test_replica_byte_keys_memory() -> Result<()> {
2101        let store = store::Store::memory();
2102
2103        test_replica_byte_keys(store).await?;
2104        Ok(())
2105    }
2106
2107    #[tokio::test]
2108    #[cfg(feature = "fs-store")]
2109    async fn test_replica_byte_keys_fs() -> Result<()> {
2110        let dbfile = tempfile::NamedTempFile::new()?;
2111        let store = store::fs::Store::persistent(dbfile.path())?;
2112        test_replica_byte_keys(store).await?;
2113
2114        Ok(())
2115    }
2116
    /// Keys are arbitrary byte strings, not UTF-8: inserts with raw byte keys
    /// (including 0x00 and 0xff bytes) must all be stored and listed.
    async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
        let mut rng = rand::rng();
        let author = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);

        let hash = Hash::new(b"foo");
        let len = 3;

        let key = vec![1u8, 0u8];
        let mut replica = store.new_replica(namespace.clone())?;
        replica.insert(key, &author, hash, len).await?;
        assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
        let key = vec![1u8, 2u8];
        let mut replica = store.new_replica(namespace.clone())?;
        replica.insert(key, &author, hash, len).await?;
        assert_keys(
            &mut store,
            namespace.id(),
            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
        );

        // A key that sorts before the existing ones must not disturb them.
        let key = vec![0u8, 255u8];
        let mut replica = store.new_replica(namespace.clone())?;
        replica.insert(key, &author, hash, len).await?;
        assert_keys(
            &mut store,
            namespace.id(),
            vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
        );
        store.flush()?;
        Ok(())
    }
2149
2150    #[tokio::test]
2151    async fn test_replica_capability_memory() -> Result<()> {
2152        let store = store::Store::memory();
2153        test_replica_capability(store).await
2154    }
2155
2156    #[tokio::test]
2157    #[cfg(feature = "fs-store")]
2158    async fn test_replica_capability_fs() -> Result<()> {
2159        let dbfile = tempfile::NamedTempFile::new()?;
2160        let store = store::fs::Store::persistent(dbfile.path())?;
2161        test_replica_capability(store).await
2162    }
2163
    /// Write access follows the strongest capability imported so far: a read
    /// capability makes inserts fail with `ReadOnly`, importing a write
    /// capability enables them (persisting across close/reopen), and importing
    /// a read capability afterwards must not downgrade write access.
    #[allow(clippy::redundant_pattern_matching)]
    async fn test_replica_capability(mut store: Store) -> Result<()> {
        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let author = store.new_author(&mut rng)?;
        let namespace = NamespaceSecret::new(&mut rng);

        // import read capability - insert must fail
        let capability = Capability::Read(namespace.id());
        store.import_namespace(capability)?;
        let mut replica = store.open_replica(&namespace.id())?;
        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
        assert!(matches!(res, Err(InsertError::ReadOnly)));

        // import write capability - insert must succeed
        let capability = Capability::Write(namespace.clone());
        store.import_namespace(capability)?;
        let mut replica = store.open_replica(&namespace.id())?;
        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
        assert!(matches!(res, Ok(_)));
        // the upgrade to write access must survive a close/reopen cycle
        store.close_replica(namespace.id());
        let mut replica = store.open_replica(&namespace.id())?;
        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
        assert!(res.is_ok());

        // import read capability again - insert must still succeed
        let capability = Capability::Read(namespace.id());
        store.import_namespace(capability)?;
        store.close_replica(namespace.id());
        let mut replica = store.open_replica(&namespace.id())?;
        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
        assert!(res.is_ok());
        store.flush()?;
        Ok(())
    }
2198
2199    #[tokio::test]
2200    async fn test_actor_capability_memory() -> Result<()> {
2201        let store = store::Store::memory();
2202        test_actor_capability(store).await
2203    }
2204
2205    #[tokio::test]
2206    #[cfg(feature = "fs-store")]
2207    async fn test_actor_capability_fs() -> Result<()> {
2208        let dbfile = tempfile::NamedTempFile::new()?;
2209        let store = store::fs::Store::persistent(dbfile.path())?;
2210        test_actor_capability(store).await
2211    }
2212
    /// Same capability semantics as `test_replica_capability`, but driven
    /// through the `SyncHandle` actor API instead of direct replica access.
    async fn test_actor_capability(store: Store) -> Result<()> {
        // test with actor
        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let author = Author::new(&mut rng);
        let handle = SyncHandle::spawn(store, None, "test".into());
        let author = handle.import_author(author).await?;
        let namespace = NamespaceSecret::new(&mut rng);
        let id = namespace.id();

        // import read capability - insert must fail
        let capability = Capability::Read(namespace.id());
        handle.import_namespace(capability).await?;
        handle.open(namespace.id(), Default::default()).await?;
        let res = handle
            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
            .await;
        assert!(res.is_err());

        // import write capability - insert must succeed
        let capability = Capability::Write(namespace.clone());
        handle.import_namespace(capability).await?;
        let res = handle
            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
            .await;
        assert!(res.is_ok());

        // insert fails while the document is closed; reopening restores write access
        handle.close(namespace.id()).await?;
        let res = handle
            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
            .await;
        assert!(res.is_err());
        handle.open(namespace.id(), Default::default()).await?;
        let res = handle
            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
            .await;
        assert!(res.is_ok());
        Ok(())
    }
2252
2253    fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2254        let mut res = vec![];
2255        while let Ok(ev) = events.try_recv() {
2256            res.push(ev);
2257        }
2258        res
2259    }
2260
    /// This tests that no events are emitted for entries received during sync which are obsolete
    /// (too old) by the time they are actually inserted in the store.
    #[tokio::test]
    async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let mut store1 = store::Store::memory();
        let mut store2 = store::Store::memory();
        let peer1 = [1u8; 32];
        let peer2 = [2u8; 32];
        let mut state1 = SyncOutcome::default();
        let mut state2 = SyncOutcome::default();

        let author = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);
        let mut replica1 = store1.new_replica(namespace.clone())?;
        let mut replica2 = store2.new_replica(namespace.clone())?;

        // Subscribe to events on both replicas so we can inspect what was emitted.
        let (events1_sender, events1) = async_channel::bounded(32);
        let (events2_sender, events2) = async_channel::bounded(32);

        replica1.info.subscribe(events1_sender);
        replica2.info.subscribe(events2_sender);

        replica1.hash_and_insert(b"foo", &author, b"init").await?;

        // Drive the sync handshake manually, one protocol message at a time.
        let from1 = replica1.sync_initial_message()?;
        let from2 = replica2
            .sync_process_message(from1, peer1, &mut state2)
            .await
            .unwrap()
            .unwrap();
        let from1 = replica1
            .sync_process_message(from2, peer2, &mut state1)
            .await
            .unwrap()
            .unwrap();
        // now we will receive the entry from replica1. we will insert a newer entry now, while the
        // sync is already running. this means the entry from replica1 will be rejected. we make
        // sure that no InsertRemote event is emitted for this entry.
        replica2.hash_and_insert(b"foo", &author, b"update").await?;
        let from2 = replica2
            .sync_process_message(from1, peer1, &mut state2)
            .await
            .unwrap();
        assert!(from2.is_none());
        // Each side must have observed exactly its own LocalInsert and nothing else.
        let events1 = drain(events1);
        let events2 = drain(events2);
        assert_eq!(events1.len(), 1);
        assert_eq!(events2.len(), 1);
        assert!(matches!(events1[0], Event::LocalInsert { .. }));
        assert!(matches!(events2[0], Event::LocalInsert { .. }));
        assert_eq!(state1.num_sent, 1);
        assert_eq!(state1.num_recv, 0);
        assert_eq!(state2.num_sent, 0);
        assert_eq!(state2.num_recv, 1);
        store1.flush()?;
        store2.flush()?;
        Ok(())
    }
2320
2321    #[tokio::test]
2322    async fn test_replica_queries_mem() -> Result<()> {
2323        let store = store::Store::memory();
2324
2325        test_replica_queries(store).await?;
2326        Ok(())
2327    }
2328
2329    #[tokio::test]
2330    #[cfg(feature = "fs-store")]
2331    async fn test_replica_queries_fs() -> Result<()> {
2332        let dbfile = tempfile::NamedTempFile::new()?;
2333        let store = store::fs::Store::persistent(dbfile.path())?;
2334        test_replica_queries(store).await?;
2335
2336        Ok(())
2337    }
2338
    /// Exercises the query API against a small fixed dataset: sorting (by
    /// key/author, asc/desc), key-prefix filtering, offset/limit paging,
    /// `single_latest_per_key`, and visibility of deleted (empty) entries
    /// via `include_empty`.
    async fn test_replica_queries(mut store: Store) -> Result<()> {
        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
        let namespace = NamespaceSecret::new(&mut rng);
        let namespace_id = namespace.id();

        let a1 = store.new_author(&mut rng)?;
        let a2 = store.new_author(&mut rng)?;
        let a3 = store.new_author(&mut rng)?;
        println!(
            "a1 {} a2 {} a3 {}",
            a1.id().fmt_short(),
            a2.id().fmt_short(),
            a3.id().fmt_short()
        );

        // Dataset: "hi/world" written by a2 then a1, "hi/moon" by a2, "hi" by a3.
        let mut replica = store.new_replica(namespace.clone())?;
        replica.hash_and_insert("hi/world", &a2, "a2").await?;
        replica.hash_and_insert("hi/world", &a1, "a1").await?;
        replica.hash_and_insert("hi/moon", &a2, "a1").await?;
        replica.hash_and_insert("hi", &a3, "a3").await?;

        // Small harness: runs a query and compares the (key, author) pairs in order.
        struct QueryTester<'a> {
            store: &'a mut Store,
            namespace: NamespaceId,
        }
        impl QueryTester<'_> {
            fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
                let query = query.into();
                let actual = self
                    .store
                    .get_many(self.namespace, query.clone())
                    .unwrap()
                    .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
                    .collect::<Result<Vec<_>>>()
                    .unwrap();
                let expected = expected
                    .into_iter()
                    .map(|(key, author)| (key.to_string(), author.id()))
                    .collect::<Vec<_>>();
                assert_eq!(actual, expected, "query: {query:#?}")
            }
        }

        let mut qt = QueryTester {
            store: &mut store,
            namespace: namespace_id,
        };

        qt.assert(
            Query::all(),
            vec![
                ("hi/world", &a1),
                ("hi/moon", &a2),
                ("hi/world", &a2),
                ("hi", &a3),
            ],
        );

        // single_latest_per_key: one entry per key, the most recent write wins.
        qt.assert(
            Query::single_latest_per_key(),
            vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
        );

        qt.assert(
            Query::single_latest_per_key().sort_direction(SortDirection::Desc),
            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
        );

        qt.assert(
            Query::single_latest_per_key().key_prefix("hi/"),
            vec![("hi/moon", &a2), ("hi/world", &a1)],
        );

        qt.assert(
            Query::single_latest_per_key()
                .key_prefix("hi/")
                .sort_direction(SortDirection::Desc),
            vec![("hi/world", &a1), ("hi/moon", &a2)],
        );

        // Sorting by (key, author) and (author, key), both directions.
        qt.assert(
            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
            vec![
                ("hi", &a3),
                ("hi/moon", &a2),
                ("hi/world", &a1),
                ("hi/world", &a2),
            ],
        );

        qt.assert(
            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
            vec![
                ("hi/world", &a2),
                ("hi/world", &a1),
                ("hi/moon", &a2),
                ("hi", &a3),
            ],
        );

        qt.assert(
            Query::all().key_prefix("hi/"),
            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
        );

        // Paging: offset/limit apply after filtering and sorting.
        qt.assert(
            Query::all().key_prefix("hi/").offset(1).limit(1),
            vec![("hi/moon", &a2)],
        );

        qt.assert(
            Query::all()
                .key_prefix("hi/")
                .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
            vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
        );

        qt.assert(
            Query::all()
                .key_prefix("hi/")
                .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
                .offset(1)
                .limit(1),
            vec![("hi/world", &a1)],
        );

        qt.assert(
            Query::all()
                .key_prefix("hi/")
                .sort_by(SortBy::AuthorKey, SortDirection::Asc),
            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
        );

        qt.assert(
            Query::all()
                .key_prefix("hi/")
                .sort_by(SortBy::AuthorKey, SortDirection::Desc),
            vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
        );

        qt.assert(
            Query::all()
                .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
                .limit(2)
                .offset(1),
            vec![("hi/moon", &a2), ("hi/world", &a1)],
        );

        // Delete a2's "hi/world": default queries hide the resulting empty
        // entry, while include_empty() makes it visible again.
        let mut replica = store.new_replica(namespace)?;
        replica.delete_prefix("hi/world", &a2).await?;
        let mut qt = QueryTester {
            store: &mut store,
            namespace: namespace_id,
        };

        qt.assert(
            Query::all(),
            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
        );

        qt.assert(
            Query::all().include_empty(),
            vec![
                ("hi/world", &a1),
                ("hi/moon", &a2),
                ("hi/world", &a2),
                ("hi", &a3),
            ],
        );
        store.flush()?;
        Ok(())
    }
2511
2512    #[test]
2513    fn test_dl_policies_mem() -> Result<()> {
2514        let mut store = store::Store::memory();
2515        test_dl_policies(&mut store)
2516    }
2517
2518    #[test]
2519    #[cfg(feature = "fs-store")]
2520    fn test_dl_policies_fs() -> Result<()> {
2521        let dbfile = tempfile::NamedTempFile::new()?;
2522        let mut store = store::fs::Store::persistent(dbfile.path())?;
2523        test_dl_policies(&mut store)
2524    }
2525
2526    fn test_dl_policies(store: &mut Store) -> Result<()> {
2527        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2528        let namespace = NamespaceSecret::new(&mut rng);
2529        let id = namespace.id();
2530
2531        let filter = store::FilterKind::Exact("foo".into());
2532        let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2533        store
2534            .set_download_policy(&id, policy.clone())
2535            .expect_err("document dos not exist");
2536
2537        // now create the document
2538        store.new_replica(namespace)?;
2539
2540        store.set_download_policy(&id, policy.clone())?;
2541        let retrieved_policy = store.get_download_policy(&id)?;
2542        assert_eq!(retrieved_policy, policy);
2543        store.flush()?;
2544        Ok(())
2545    }
2546
2547    fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2548        expected.sort();
2549        assert_eq!(expected, get_keys_sorted(store, namespace));
2550    }
2551
2552    fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2553        let mut res = store
2554            .get_many(namespace, Query::all())
2555            .unwrap()
2556            .map(|e| e.map(|e| e.key().to_vec()))
2557            .collect::<Result<Vec<_>>>()
2558            .unwrap();
2559        res.sort();
2560        res
2561    }
2562
2563    fn get_entry(
2564        store: &mut Store,
2565        namespace: NamespaceId,
2566        author: AuthorId,
2567        key: &[u8],
2568    ) -> anyhow::Result<SignedEntry> {
2569        let entry = store
2570            .get_exact(namespace, author, key, true)?
2571            .ok_or_else(|| anyhow::anyhow!("not found"))?;
2572        Ok(entry)
2573    }
2574
2575    fn get_content_hash(
2576        store: &mut Store,
2577        namespace: NamespaceId,
2578        author: AuthorId,
2579        key: &[u8],
2580    ) -> anyhow::Result<Option<Hash>> {
2581        let hash = store
2582            .get_exact(namespace, author, key, false)?
2583            .map(|e| e.content_hash());
2584        Ok(hash)
2585    }
2586
2587    async fn sync<'a>(
2588        alice: &'a mut Replica<'a>,
2589        bob: &'a mut Replica<'a>,
2590    ) -> Result<(SyncOutcome, SyncOutcome)> {
2591        let alice_peer_id = [1u8; 32];
2592        let bob_peer_id = [2u8; 32];
2593        let mut alice_state = SyncOutcome::default();
2594        let mut bob_state = SyncOutcome::default();
2595        // Sync alice - bob
2596        let mut next_to_bob = Some(alice.sync_initial_message()?);
2597        let mut rounds = 0;
2598        while let Some(msg) = next_to_bob.take() {
2599            assert!(rounds < 100, "too many rounds");
2600            rounds += 1;
2601            println!("round {rounds}");
2602            if let Some(msg) = bob
2603                .sync_process_message(msg, alice_peer_id, &mut bob_state)
2604                .await?
2605            {
2606                next_to_bob = alice
2607                    .sync_process_message(msg, bob_peer_id, &mut alice_state)
2608                    .await?
2609            }
2610        }
2611        assert_eq!(alice_state.num_sent, bob_state.num_recv);
2612        assert_eq!(alice_state.num_recv, bob_state.num_sent);
2613        Ok((alice_state, bob_state))
2614    }
2615
2616    fn check_entries(
2617        store: &mut Store,
2618        namespace: &NamespaceId,
2619        author: &Author,
2620        set: &[&str],
2621    ) -> Result<()> {
2622        for el in set {
2623            store.get_exact(*namespace, author.id(), el, false)?;
2624        }
2625        Ok(())
2626    }
2627}