iroh_docs/
sync.rs

1//! API for iroh-docs replicas
2
3// Names and concepts are roughly based on Willow's design at the moment:
4//
5// https://hackmd.io/DTtck8QOQm6tZaQBBtTf7w
6//
7// This is going to change!
8
9use std::{
10    cmp::Ordering,
11    fmt::Debug,
12    ops::{Deref, DerefMut},
13    sync::Arc,
14    time::Duration,
15};
16
17use bytes::{Bytes, BytesMut};
18use ed25519_dalek::{Signature, SignatureError};
19use iroh_blobs::Hash;
20use n0_future::{time::SystemTime, IterExt};
21use serde::{Deserialize, Serialize};
22
23pub use crate::heads::AuthorHeads;
24use crate::{
25    keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
26    ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
27    store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
28};
29
30/// Message type for the set reconciliation protocol.
31///
32/// Can be serialized to bytes with [serde] to transfer between peers.
33pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
34
35/// Byte representation of a `PeerId` from `iroh-net`.
36// TODO: PeerId is in iroh-net which iroh-docs doesn't depend on. Add iroh-base crate with `PeerId`.
37pub type PeerIdBytes = [u8; 32];
38
39/// Max time in the future from our wall clock time that we accept entries for.
40/// Value is 10 minutes (in microseconds, matching entry timestamps).
41pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;
42
43/// Callback that may be set on a replica to determine the availability status for a content hash.
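///
/// A minimal sketch of such a callback, one that reports every hash as missing (a real
/// callback would look the hash up in local blob storage):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let cb: ContentStatusCallback = Arc::new(|_hash: Hash| {
///     let fut: n0_future::boxed::BoxFuture<ContentStatus> =
///         Box::pin(async move { ContentStatus::Missing });
///     fut
/// });
/// ```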
44pub type ContentStatusCallback =
45    Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
46
47/// Event emitted when entries are added to a replica, either locally or via sync.
48#[derive(Debug, Clone)]
49pub enum Event {
50    /// A local entry has been added.
51    LocalInsert {
52        /// Document in which the entry was inserted.
53        namespace: NamespaceId,
54        /// Inserted entry.
55        entry: SignedEntry,
56    },
57    /// A remote entry has been added.
58    RemoteInsert {
59        /// Document in which the entry was inserted.
60        namespace: NamespaceId,
61        /// Inserted entry.
62        entry: SignedEntry,
63        /// Peer that provided the inserted entry.
64        from: PeerIdBytes,
65        /// Whether download policies require the content to be downloaded.
66        should_download: bool,
67        /// [`ContentStatus`] for this entry in the remote's replica.
68        remote_content_status: ContentStatus,
69    },
70}
71
72/// Whether an entry was inserted locally or by a remote peer.
73#[derive(Debug, Clone)]
74pub enum InsertOrigin {
75    /// The entry was inserted locally.
76    Local,
77    /// The entry was received from the remote node identified by [`PeerIdBytes`].
78    Sync {
79        /// The peer from which we received this entry.
80        from: PeerIdBytes,
81        /// Whether the peer claims to have the content blob for this entry.
82        remote_content_status: ContentStatus,
83    },
84}
85
86/// Availability of a content blob on a node.
87#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
88pub enum ContentStatus {
89    /// The content is completely available.
90    Complete,
91    /// The content is partially available.
92    Incomplete,
93    /// The content is missing.
94    Missing,
95}
96
97/// Outcome of a sync operation.
98#[derive(Debug, Clone, Default)]
99pub struct SyncOutcome {
100    /// Timestamp of the latest entry for each author in the set we received.
101    pub heads_received: AuthorHeads,
102    /// Number of entries we received.
103    pub num_recv: usize,
104    /// Number of entries we sent.
105    pub num_sent: usize,
106}
107
108fn get_as_ptr<T>(value: &T) -> Option<usize> {
109    use std::mem;
110    if mem::size_of::<T>() == std::mem::size_of::<usize>()
111        && mem::align_of::<T>() == mem::align_of::<usize>()
112    {
113        // SAFETY: size and alignment of `T` were checked above to match `usize`
114        unsafe { Some(mem::transmute_copy(value)) }
115    } else {
116        None
117    }
118}
119
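/// Returns `true` if two senders belong to the same channel, compared via the pointer value of
/// their inner shared state (see [`get_as_ptr`]).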
120fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
121    get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
122}
123
124#[derive(Debug, Default)]
125struct Subscribers(Vec<async_channel::Sender<Event>>);
126impl Subscribers {
127    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
128        self.0.push(sender)
129    }
130    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
131        self.0.retain(|s| !same_channel(s, sender));
132    }
133    pub async fn send(&mut self, event: Event) {
134        self.0 = std::mem::take(&mut self.0)
135            .into_iter()
136            .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx))
137            .join_all()
138            .await
139            .into_iter()
140            .flatten()
141            .collect();
142    }
143    pub fn len(&self) -> usize {
144        self.0.len()
145    }
146    pub async fn send_with(&mut self, f: impl FnOnce() -> Event) {
147        if !self.0.is_empty() {
148            self.send(f()).await
149        }
150    }
151}
152
153/// Kind of capability of the namespace.
154#[derive(
155    Debug,
156    Clone,
157    Copy,
158    Serialize,
159    Deserialize,
160    num_enum::IntoPrimitive,
161    num_enum::TryFromPrimitive,
162    strum::Display,
163)]
164#[repr(u8)]
165#[strum(serialize_all = "snake_case")]
166pub enum CapabilityKind {
167    /// A writable replica.
168    Write = 1,
169    /// A readable replica.
170    Read = 2,
171}
172
173/// The capability of the namespace.
174#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
175pub enum Capability {
176    /// Write access to the namespace.
177    Write(NamespaceSecret),
178    /// Read only access to the namespace.
179    Read(NamespaceId),
180}
181
182impl Capability {
183    /// Get the [`NamespaceId`] for this [`Capability`].
184    pub fn id(&self) -> NamespaceId {
185        match self {
186            Capability::Write(secret) => secret.id(),
187            Capability::Read(id) => *id,
188        }
189    }
190
191    /// Get the [`NamespaceSecret`] of this [`Capability`].
192    /// Will fail if the [`Capability`] is read only.
193    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
194        match self {
195            Capability::Write(secret) => Ok(secret),
196            Capability::Read(_) => Err(ReadOnly),
197        }
198    }
199
200    /// Get the kind of capability.
201    pub fn kind(&self) -> CapabilityKind {
202        match self {
203            Capability::Write(_) => CapabilityKind::Write,
204            Capability::Read(_) => CapabilityKind::Read,
205        }
206    }
207
208    /// Get the raw representation of this namespace capability.
209    pub fn raw(&self) -> (u8, [u8; 32]) {
210        let capability_repr: u8 = self.kind().into();
211        let bytes = match self {
212            Capability::Write(secret) => secret.to_bytes(),
213            Capability::Read(id) => id.to_bytes(),
214        };
215        (capability_repr, bytes)
216    }
217
218    /// Create a [`Capability`] from its raw representation.
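    ///
    /// A round-trip sketch with [`Capability::raw`] (assuming some `namespace_id: NamespaceId`):
    ///
    /// ```ignore
    /// let cap = Capability::Read(namespace_id);
    /// let (kind, bytes) = cap.raw();
    /// let restored = Capability::from_raw(kind, &bytes)?;
    /// assert_eq!(restored.id(), cap.id());
    /// ```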
219    pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
220        let kind: CapabilityKind = kind.try_into()?;
221        let capability = match kind {
222            CapabilityKind::Write => {
223                let secret = NamespaceSecret::from_bytes(bytes);
224                Capability::Write(secret)
225            }
226            CapabilityKind::Read => {
227                let id = NamespaceId::from(bytes);
228                Capability::Read(id)
229            }
230        };
231        Ok(capability)
232    }
233
234    /// Merge this capability with another capability.
235    ///
236    /// Will return an error if `other` is not a capability for the same namespace.
237    ///
238    /// Returns `true` if the capability was changed, `false` otherwise.
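    ///
    /// A sketch of upgrading a read capability to a write capability (assuming a
    /// `secret: NamespaceSecret` for the same namespace):
    ///
    /// ```ignore
    /// let mut cap = Capability::Read(secret.id());
    /// let changed = cap.merge(Capability::Write(secret))?;
    /// assert!(changed);
    /// assert!(matches!(cap, Capability::Write(_)));
    /// ```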
239    pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
240        if other.id() != self.id() {
241            return Err(CapabilityError::NamespaceMismatch);
242        }
243
244        // the only capability upgrade is from read-only (self) to writable (other)
245        if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
246            let _ = std::mem::replace(self, other);
247            Ok(true)
248        } else {
249            Ok(false)
250        }
251    }
252}
253
254/// Errors for capability operations
255#[derive(Debug, thiserror::Error)]
256pub enum CapabilityError {
257    /// Namespaces are not the same
258    #[error("Namespaces are not the same")]
259    NamespaceMismatch,
260}
261
262/// In-memory information about an open replica.
263#[derive(derive_more::Debug)]
264pub struct ReplicaInfo {
265    pub(crate) capability: Capability,
266    subscribers: Subscribers,
267    #[debug("ContentStatusCallback")]
268    content_status_cb: Option<ContentStatusCallback>,
269    closed: bool,
270}
271
272impl ReplicaInfo {
273    /// Create a new replica.
274    pub fn new(capability: Capability) -> Self {
275        Self {
276            capability,
277            subscribers: Default::default(),
278            // on_insert_sender: RwLock::new(None),
279            content_status_cb: None,
280            closed: false,
281        }
282    }
283
284    /// Subscribe to insert events.
285    ///
286    /// When subscribing to a replica, you must ensure that the corresponding [`async_channel::Receiver`] is
287    /// drained in a loop. If events are not consumed, local and remote inserts will block while
288    /// waiting for space on the channel.
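    ///
    /// A draining sketch (assuming an open replica `info` and a tokio runtime):
    ///
    /// ```ignore
    /// let (tx, rx) = async_channel::bounded(64);
    /// info.subscribe(tx);
    /// tokio::spawn(async move {
    ///     while let Ok(event) = rx.recv().await {
    ///         match event {
    ///             Event::LocalInsert { entry, .. } => println!("local insert: {:?}", entry.key()),
    ///             Event::RemoteInsert { entry, .. } => println!("remote insert: {:?}", entry.key()),
    ///         }
    ///     }
    /// });
    /// ```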
289    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
290        self.subscribers.subscribe(sender)
291    }
292
293    /// Explicitly unsubscribe a sender.
294    ///
295    /// Simply dropping the receiver is fine too. If you cloned a single sender to subscribe to
296    /// multiple replicas, you can use this method to explicitly unsubscribe the sender from
297    /// this replica without having to drop the receiver.
298    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
299        self.subscribers.unsubscribe(sender)
300    }
301
302    /// Get the number of current event subscribers.
303    pub fn subscribers_count(&self) -> usize {
304        self.subscribers.len()
305    }
306
307    /// Set the content status callback.
308    ///
309    /// Only one callback can be active at a time. If a previous callback was registered, this
310    /// will return `false`.
311    pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
312        if self.content_status_cb.is_some() {
313            false
314        } else {
315            self.content_status_cb = Some(cb);
316            true
317        }
318    }
319
320    fn ensure_open(&self) -> Result<(), InsertError> {
321        if self.closed() {
322            Err(InsertError::Closed)
323        } else {
324            Ok(())
325        }
326    }
327
328    /// Returns true if the replica is closed.
329    ///
330    /// If a replica is closed, no further operations can be performed. A replica cannot be closed
331    /// manually; it must be closed via [`store::Store::close_replica`] or
332    /// [`store::Store::remove_replica`].
333    pub fn closed(&self) -> bool {
334        self.closed
335    }
336
337    /// Merge a capability.
338    ///
339    /// The capability must refer to the same namespace, otherwise an error will be returned.
340    ///
341    /// This will upgrade the replica's capability when passing a `Capability::Write`.
342    /// It is a no-op if `capability` is a `Capability::Read`.
343    pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
344        self.capability.merge(capability)
345    }
346}
347
348/// Local representation of a mutable, synchronizable key-value store.
349#[derive(derive_more::Debug)]
350pub struct Replica<'a, I = Box<ReplicaInfo>> {
351    pub(crate) store: StoreInstance<'a>,
352    pub(crate) info: I,
353}
354
355impl<'a, I> Replica<'a, I>
356where
357    I: Deref<Target = ReplicaInfo> + DerefMut,
358{
359    /// Create a new replica.
360    pub fn new(store: StoreInstance<'a>, info: I) -> Self {
361        Replica { info, store }
362    }
363
364    /// Insert a new record at the given key.
365    ///
366    /// The entry will be signed by the provided `author`.
367    /// The `len` must be the byte length of the data identified by `hash`.
368    ///
369    /// Returns the number of entries removed as a consequence of this insertion,
370    /// or an error if the entry failed to validate or a store operation failed.
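    ///
    /// A usage sketch (assuming an open writable `replica`, an `author: Author`, and content
    /// that is stored elsewhere under `hash` with byte length `len`):
    ///
    /// ```ignore
    /// let removed = replica.insert(b"path/to/key", &author, hash, len).await?;
    /// ```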
371    pub async fn insert(
372        &mut self,
373        key: impl AsRef<[u8]>,
374        author: &Author,
375        hash: Hash,
376        len: u64,
377    ) -> Result<usize, InsertError> {
378        if len == 0 || hash == Hash::EMPTY {
379            return Err(InsertError::EntryIsEmpty);
380        }
381        self.info.ensure_open()?;
382        let id = RecordIdentifier::new(self.id(), author.id(), key);
383        let record = Record::new_current(hash, len);
384        let entry = Entry::new(id, record);
385        let secret = self.secret_key()?;
386        let signed_entry = entry.sign(secret, author);
387        self.insert_entry(signed_entry, InsertOrigin::Local).await
388    }
389
390    /// Delete entries that match the given `author` and key `prefix`.
391    ///
392    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
393    /// entries whose key starts with or is equal to the given `prefix`.
394    ///
395    /// Returns the number of entries deleted.
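    ///
    /// A usage sketch (assuming an open writable `replica` and an `author: Author`):
    ///
    /// ```ignore
    /// let deleted = replica.delete_prefix(b"path/to", &author).await?;
    /// ```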
396    pub async fn delete_prefix(
397        &mut self,
398        prefix: impl AsRef<[u8]>,
399        author: &Author,
400    ) -> Result<usize, InsertError> {
401        self.info.ensure_open()?;
402        let id = RecordIdentifier::new(self.id(), author.id(), prefix);
403        let entry = Entry::new_empty(id);
404        let signed_entry = entry.sign(self.secret_key()?, author);
405        self.insert_entry(signed_entry, InsertOrigin::Local).await
406    }
407
408    /// Insert an entry into this replica which was received from a remote peer.
409    ///
410    /// This will verify both the namespace and author signatures of the entry, emit an `on_insert`
411    /// event, and insert the entry into the replica store.
412    ///
413    /// Returns the number of entries removed as a consequence of this insertion,
414    /// or an error if the entry failed to validate or if a store operation failed.
415    pub async fn insert_remote_entry(
416        &mut self,
417        entry: SignedEntry,
418        received_from: PeerIdBytes,
419        content_status: ContentStatus,
420    ) -> Result<usize, InsertError> {
421        self.info.ensure_open()?;
422        entry.validate_empty()?;
423        let origin = InsertOrigin::Sync {
424            from: received_from,
425            remote_content_status: content_status,
426        };
427        self.insert_entry(entry, origin).await
428    }
429
430    /// Insert a signed entry into the database.
431    ///
432    /// Returns the number of entries removed as a consequence of this insertion.
433    async fn insert_entry(
434        &mut self,
435        entry: SignedEntry,
436        origin: InsertOrigin,
437    ) -> Result<usize, InsertError> {
438        let namespace = self.id();
439
440        let store = &self.store;
441        validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
442
443        let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
444        tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
445
446        let removed_count = match outcome {
447            InsertOutcome::Inserted { removed } => removed,
448            InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
449        };
450
451        let insert_event = match origin {
452            InsertOrigin::Local => Event::LocalInsert { namespace, entry },
453            InsertOrigin::Sync {
454                from,
455                remote_content_status,
456            } => {
457                let download_policy = self
458                    .store
459                    .get_download_policy(&self.id())
460                    .unwrap_or_default();
461                let should_download = download_policy.matches(entry.entry());
462                Event::RemoteInsert {
463                    namespace,
464                    entry,
465                    from,
466                    should_download,
467                    remote_content_status,
468                }
469            }
470        };
471
472        self.info.subscribers.send(insert_event).await;
473
474        Ok(removed_count)
475    }
476
477    /// Hashes the given data and inserts it.
478    ///
479    /// This does not store the content, just the record of it.
480    /// Returns the calculated hash.
481    pub async fn hash_and_insert(
482        &mut self,
483        key: impl AsRef<[u8]>,
484        author: &Author,
485        data: impl AsRef<[u8]>,
486    ) -> Result<Hash, InsertError> {
487        self.info.ensure_open()?;
488        let len = data.as_ref().len() as u64;
489        let hash = Hash::new(data);
490        self.insert(key, author, hash, len).await?;
491        Ok(hash)
492    }
493
494    /// Get the identifier for an entry in this replica.
495    pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
496        RecordIdentifier::new(self.info.capability.id(), author.id(), key)
497    }
498
499    /// Create the initial message for the set reconciliation flow with a remote peer.
500    pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
501        self.info.ensure_open().map_err(anyhow::Error::from)?;
502        self.store.initial_message()
503    }
504
505    /// Process a set reconciliation message from a remote peer.
506    ///
507    /// Returns the next message to be sent to the peer, if any.
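    ///
    /// A sketch of driving a full reconciliation between two local replicas `alice` and `bob`
    /// (with placeholder peer ids `alice_peer` and `bob_peer`):
    ///
    /// ```ignore
    /// let mut alice_state = SyncOutcome::default();
    /// let mut bob_state = SyncOutcome::default();
    /// let mut next = Some(alice.sync_initial_message()?);
    /// while let Some(msg) = next {
    ///     let reply = bob.sync_process_message(msg, alice_peer, &mut bob_state).await?;
    ///     next = match reply {
    ///         Some(reply) => alice.sync_process_message(reply, bob_peer, &mut alice_state).await?,
    ///         None => None,
    ///     };
    /// }
    /// ```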
508    pub async fn sync_process_message(
509        &mut self,
510        message: crate::ranger::Message<SignedEntry>,
511        from_peer: PeerIdBytes,
512        state: &mut SyncOutcome,
513    ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
514        self.info.ensure_open()?;
515        let my_namespace = self.id();
516        let now = system_time_now();
517
518        // update state with incoming data.
519        state.num_recv += message.value_count();
520        for (entry, _content_status) in message.values() {
521            state
522                .heads_received
523                .insert(entry.author(), entry.timestamp());
524        }
525
528        let cb = self.info.content_status_cb.clone();
529        let download_policy = self
530            .store
531            .get_download_policy(&my_namespace)
532            .unwrap_or_default();
533        let reply = self
534            .store
535            .process_message(
536                &Default::default(),
537                message,
538                // validate callback: validate incoming entries before they are inserted
539                |store, entry, content_status| {
540                    let origin = InsertOrigin::Sync {
541                        from: from_peer,
542                        remote_content_status: content_status,
543                    };
544                    validate_entry(now, store, my_namespace, entry, &origin).is_ok()
545                },
546                // on_insert callback: is called when an entry was actually inserted in the store
547                async |_store, entry, content_status| {
548                    // We use `send_with` to only clone the entry if we have active subscriptions.
549                    self.info
550                        .subscribers
551                        .send_with(|| {
552                            let should_download = download_policy.matches(entry.entry());
553                            Event::RemoteInsert {
554                                from: from_peer,
555                                namespace: my_namespace,
556                                entry: entry.clone(),
557                                should_download,
558                                remote_content_status: content_status,
559                            }
560                        })
561                        .await
562                },
563                // content_status callback: get content status for outgoing entries
564                async move |entry| {
565                    if let Some(cb) = cb.as_ref() {
566                        cb(entry.content_hash()).await
567                    } else {
568                        ContentStatus::Missing
569                    }
570                },
571            )
572            .await?;
573
574        // update state with outgoing data.
575        if let Some(ref reply) = reply {
576            state.num_sent += reply.value_count();
577        }
578
579        Ok(reply)
580    }
581
582    /// Get the namespace identifier for this [`Replica`].
583    pub fn id(&self) -> NamespaceId {
584        self.info.capability.id()
585    }
586
587    /// Get the [`Capability`] of this [`Replica`].
588    pub fn capability(&self) -> &Capability {
589        &self.info.capability
590    }
591
592    /// Get the [`NamespaceSecret`] key for this replica. Will fail if
593    /// the replica is read only.
594    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
595        self.info.capability.secret_key()
596    }
597}
598
599/// Error that occurs trying to access the [`NamespaceSecret`] of a read-only [`Capability`].
600#[derive(Debug, thiserror::Error)]
601#[error("Replica allows read access only.")]
602pub struct ReadOnly;
603
604/// Validate whether a [`SignedEntry`] is fit to be inserted.
605///
606/// This validates that
607/// * the entry's author and namespace signatures are correct (verified for entries received via sync)
608/// * the entry's namespace matches the current replica
609/// * the entry's timestamp is not more than 10 minutes in the future of our system time
611fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
612    now: u64,
613    store: &S,
614    expected_namespace: NamespaceId,
615    entry: &SignedEntry,
616    origin: &InsertOrigin,
617) -> Result<(), ValidationFailure> {
618    // Verify the namespace
619    if entry.namespace() != expected_namespace {
620        return Err(ValidationFailure::InvalidNamespace);
621    }
622
623    // Verify signature for non-local entries.
624    if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
625        return Err(ValidationFailure::BadSignature);
626    }
627
628    // Verify that the timestamp of the entry is not too far in the future.
629    if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
630        return Err(ValidationFailure::TooFarInTheFuture);
631    }
632    Ok(())
633}
634
635/// Error emitted when inserting entries into a [`Replica`] failed
636#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
637pub enum InsertError {
638    /// Storage error
639    #[error("storage error")]
640    Store(anyhow::Error),
641    /// Validation failure
642    #[error("validation failure")]
643    Validation(#[from] ValidationFailure),
644    /// A newer entry exists for either this entry's key or a prefix of the key.
645    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
646    NewerEntryExists,
647    /// Attempted to insert an empty entry.
648    #[error("Attempted to insert an empty entry")]
649    EntryIsEmpty,
650    /// Replica is read only.
651    #[error("Attempted to insert to read only replica")]
652    #[from(ReadOnly)]
653    ReadOnly,
654    /// The replica is closed, no operations may be performed.
655    #[error("replica is closed")]
656    Closed,
657}
658
659/// Reason why entry validation failed
660#[derive(thiserror::Error, Debug)]
661pub enum ValidationFailure {
662    /// Entry namespace does not match the current replica.
663    #[error("Entry namespace does not match the current replica")]
664    InvalidNamespace,
665    /// Entry signature is invalid.
666    #[error("Entry signature is invalid")]
667    BadSignature,
668    /// Entry timestamp is too far in the future.
669    #[error("Entry timestamp is too far in the future.")]
670    TooFarInTheFuture,
671    /// Entry has length 0 but not the empty hash, or the empty hash but not length 0.
672    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
673    InvalidEmptyEntry,
674}
675
676/// A signed entry.
677#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
678pub struct SignedEntry {
679    signature: EntrySignature,
680    entry: Entry,
681}
682
683impl From<SignedEntry> for Entry {
684    fn from(value: SignedEntry) -> Self {
685        value.entry
686    }
687}
688
689impl PartialOrd for SignedEntry {
690    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
691        Some(self.cmp(other))
692    }
693}
694
695impl Ord for SignedEntry {
696    fn cmp(&self, other: &Self) -> Ordering {
697        self.entry.cmp(&other.entry)
698    }
699}
700
701impl PartialOrd for Entry {
702    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
703        Some(self.cmp(other))
704    }
705}
706
707impl Ord for Entry {
708    fn cmp(&self, other: &Self) -> Ordering {
709        self.id
710            .cmp(&other.id)
711            .then_with(|| self.record.cmp(&other.record))
712    }
713}
714
715impl SignedEntry {
716    pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
717        SignedEntry { signature, entry }
718    }
719
720    /// Create a new signed entry by signing an entry with the `namespace` and `author`.
721    pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
722        let signature = EntrySignature::from_entry(&entry, namespace, author);
723        SignedEntry { signature, entry }
724    }
725
726    /// Create a new signed entry from its parts.
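    ///
    /// A sketch of building and verifying a signed entry (assuming a `namespace: NamespaceSecret`,
    /// an `author: Author`, and a content `hash`/`len`):
    ///
    /// ```ignore
    /// let signed = SignedEntry::from_parts(&namespace, &author, b"my/key", Record::new_current(hash, len));
    /// signed.verify(&())?;
    /// ```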
727    pub fn from_parts(
728        namespace: &NamespaceSecret,
729        author: &Author,
730        key: impl AsRef<[u8]>,
731        record: Record,
732    ) -> Self {
733        let id = RecordIdentifier::new(namespace.id(), author.id(), key);
734        let entry = Entry::new(id, record);
735        Self::from_entry(entry, namespace, author)
736    }
737
738    /// Verify the signatures on this entry.
739    pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
740        self.signature.verify(
741            &self.entry,
742            &self.entry.namespace().public_key(store)?,
743            &self.entry.author().public_key(store)?,
744        )
745    }
746
747    /// Get the signature.
748    pub fn signature(&self) -> &EntrySignature {
749        &self.signature
750    }
751
752    /// Validate that the entry has the empty hash if and only if its length is 0.
753    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
754        self.entry().validate_empty()
755    }
756
757    /// Get the [`Entry`].
758    pub fn entry(&self) -> &Entry {
759        &self.entry
760    }
761
762    /// Get the content [`struct@Hash`] of the entry.
763    pub fn content_hash(&self) -> Hash {
764        self.entry().content_hash()
765    }
766
767    /// Get the content length of the entry.
768    pub fn content_len(&self) -> u64 {
769        self.entry().content_len()
770    }
771
772    /// Get the author bytes of this entry.
773    pub fn author_bytes(&self) -> AuthorId {
774        self.entry().id().author()
775    }
776
777    /// Get the key of the entry.
778    pub fn key(&self) -> &[u8] {
779        self.entry().id().key()
780    }
781
782    /// Get the timestamp of the entry.
783    pub fn timestamp(&self) -> u64 {
784        self.entry().timestamp()
785    }
786}
787
788impl RangeEntry for SignedEntry {
789    type Key = RecordIdentifier;
790    type Value = Record;
791
792    fn key(&self) -> &Self::Key {
793        &self.entry.id
794    }
795
796    fn value(&self) -> &Self::Value {
797        &self.entry.record
798    }
799
800    fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
801        let mut hasher = blake3::Hasher::new();
802        hasher.update(self.namespace().as_ref());
803        hasher.update(self.author_bytes().as_ref());
804        hasher.update(self.key());
805        hasher.update(&self.timestamp().to_be_bytes());
806        hasher.update(self.content_hash().as_bytes());
807        Fingerprint(hasher.finalize().into())
808    }
809}
810
811/// Signature over an entry.
812#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
813pub struct EntrySignature {
814    author_signature: Signature,
815    namespace_signature: Signature,
816}
817
818impl Debug for EntrySignature {
819    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
820        f.debug_struct("EntrySignature")
821            .field(
822                "namespace_signature",
823                &hex::encode(self.namespace_signature.to_bytes()),
824            )
825            .field(
826                "author_signature",
827                &hex::encode(self.author_signature.to_bytes()),
828            )
829            .finish()
830    }
831}
832
833impl EntrySignature {
834    /// Create a new signature by signing an entry with the `namespace` and `author`.
835    pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
836        // TODO: this should probably include a namespace prefix,
837        // meaning a namespace in the cryptographic (domain separation) sense.
838        let bytes = entry.to_vec();
839        let namespace_signature = namespace.sign(&bytes);
840        let author_signature = author.sign(&bytes);
841
842        EntrySignature {
843            author_signature,
844            namespace_signature,
845        }
846    }
847
848    /// Verify that this signature was created by signing the `entry` with the
849    /// secret keys of the specified `author` and `namespace`.
850    pub fn verify(
851        &self,
852        entry: &Entry,
853        namespace: &NamespacePublicKey,
854        author: &AuthorPublicKey,
855    ) -> Result<(), SignatureError> {
856        let bytes = entry.to_vec();
857        namespace.verify(&bytes, &self.namespace_signature)?;
858        author.verify(&bytes, &self.author_signature)?;
859
860        Ok(())
861    }
862
863    pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
864        let namespace_signature = Signature::from_bytes(namespace_sig);
865        let author_signature = Signature::from_bytes(author_sig);
866
867        EntrySignature {
868            author_signature,
869            namespace_signature,
870        }
871    }
872
873    pub(crate) fn author(&self) -> &Signature {
874        &self.author_signature
875    }
876
877    pub(crate) fn namespace(&self) -> &Signature {
878        &self.namespace_signature
879    }
880}
881
882/// A single entry in a [`Replica`]
883///
884/// An entry is identified by a key, its [`Author`], and the [`Replica`]'s
885/// [`NamespaceSecret`]. Its value is the [32-byte BLAKE3 hash](iroh_blobs::Hash)
886/// of the entry's content data, the size of this content data, and a timestamp.
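///
/// A construction sketch (assuming a `namespace: NamespaceSecret`, an `author: Author`, and a
/// content `hash`/`len`):
///
/// ```ignore
/// let id = RecordIdentifier::new(namespace.id(), author.id(), b"my/key");
/// let entry = Entry::new(id, Record::new_current(hash, len));
/// let signed = entry.sign(&namespace, &author);
/// ```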
887#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
888pub struct Entry {
889    id: RecordIdentifier,
890    record: Record,
891}
892
893impl Entry {
894    /// Create a new entry
895    pub fn new(id: RecordIdentifier, record: Record) -> Self {
896        Entry { id, record }
897    }
898
899    /// Create a new empty entry with the current timestamp.
900    pub fn new_empty(id: RecordIdentifier) -> Self {
901        Entry {
902            id,
903            record: Record::empty_current(),
904        }
905    }
906
907    /// Validate that the entry has the empty hash if and only if its length is 0.
908    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
909        match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
910            (true, true) => Ok(()),
911            (false, false) => Ok(()),
912            (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
913            (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
914        }
915    }
916
917    /// Get the [`RecordIdentifier`] for this entry.
918    pub fn id(&self) -> &RecordIdentifier {
919        &self.id
920    }
921
922    /// Get the [`NamespaceId`] of this entry.
923    pub fn namespace(&self) -> NamespaceId {
924        self.id.namespace()
925    }
926
927    /// Get the [`AuthorId`] of this entry.
928    pub fn author(&self) -> AuthorId {
929        self.id.author()
930    }
931
932    /// Get the key of this entry.
933    pub fn key(&self) -> &[u8] {
934        self.id.key()
935    }
936
937    /// Get the [`Record`] contained in this entry.
938    pub fn record(&self) -> &Record {
939        &self.record
940    }
941
942    /// Get the content hash of the record.
943    pub fn content_hash(&self) -> Hash {
944        self.record.hash
945    }
946
947    /// Get the content length of the record.
948    pub fn content_len(&self) -> u64 {
949        self.record.len
950    }
951
952    /// Get the timestamp of the record.
953    pub fn timestamp(&self) -> u64 {
954        self.record.timestamp
955    }
956
957    /// Serialize this entry into its canonical byte representation used for signing.
958    pub fn encode(&self, out: &mut Vec<u8>) {
959        self.id.encode(out);
960        self.record.encode(out);
961    }
962
963    /// Serialize this entry into a new vector with its canonical byte representation.
964    pub fn to_vec(&self) -> Vec<u8> {
965        let mut out = Vec::new();
966        self.encode(&mut out);
967        out
968    }
969
970    /// Sign this entry with a [`NamespaceSecret`] and [`Author`].
971    pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
972        SignedEntry::from_entry(self, namespace, author)
973    }
974}
975
976const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
977const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
978const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
979
980/// The identifier of a record.
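///
/// A record identifier concatenates the namespace id, the author id, and the key, in that order.
/// A minimal sketch (assuming a `namespace_id: NamespaceId` and an `author_id: AuthorId`):
///
/// ```ignore
/// let id = RecordIdentifier::new(namespace_id, author_id, b"my/key");
/// assert_eq!(id.key(), b"my/key");
/// assert_eq!(id.namespace(), namespace_id);
/// ```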
981#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
982pub struct RecordIdentifier(Bytes);
983
984impl Default for RecordIdentifier {
985    fn default() -> Self {
986        Self::new(NamespaceId::default(), AuthorId::default(), b"")
987    }
988}
989
990impl Debug for RecordIdentifier {
991    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
992        f.debug_struct("RecordIdentifier")
993            .field("namespace", &self.namespace())
994            .field("author", &self.author())
995            .field("key", &std::string::String::from_utf8_lossy(self.key()))
996            .finish()
997    }
998}
999
1000impl RangeKey for RecordIdentifier {
1001    #[cfg(test)]
1002    fn is_prefix_of(&self, other: &Self) -> bool {
1003        other.as_ref().starts_with(self.as_ref())
1004    }
1005}
1006
1007fn system_time_now() -> u64 {
1008    SystemTime::now()
1009        .duration_since(SystemTime::UNIX_EPOCH)
1010        .expect("time drift")
1011        .as_micros() as u64
1012}
1013
1014impl RecordIdentifier {
1015    /// Create a new [`RecordIdentifier`].
1016    pub fn new(
1017        namespace: impl Into<NamespaceId>,
1018        author: impl Into<AuthorId>,
1019        key: impl AsRef<[u8]>,
1020    ) -> Self {
1021        let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1022        bytes.extend_from_slice(namespace.into().as_bytes());
1023        bytes.extend_from_slice(author.into().as_bytes());
1024        bytes.extend_from_slice(key.as_ref());
1025        Self(bytes.freeze())
1026    }
1027
1028    /// Serialize this [`RecordIdentifier`] into a mutable byte array.
1029    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1030        out.extend_from_slice(&self.0);
1031    }
1032
1033    /// Get this [`RecordIdentifier`] as [Bytes].
1034    pub fn as_bytes(&self) -> Bytes {
1035        self.0.clone()
1036    }
1037
1038    /// Get this [`RecordIdentifier`] as a tuple of byte slices.
1039    pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1040        (
1041            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1042            self.0[AUTHOR_BYTES].try_into().unwrap(),
1043            &self.0[KEY_BYTES],
1044        )
1045    }
1046
1047    /// Get this [`RecordIdentifier`] as a tuple of bytes.
1048    pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1049        (
1050            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1051            self.0[AUTHOR_BYTES].try_into().unwrap(),
1052            self.0.slice(KEY_BYTES),
1053        )
1054    }
1055
1056    /// Get the key of this record.
1057    pub fn key(&self) -> &[u8] {
1058        &self.0[KEY_BYTES]
1059    }
1060
1061    /// Get the key of this record as [`Bytes`].
1062    pub fn key_bytes(&self) -> Bytes {
1063        self.0.slice(KEY_BYTES)
1064    }
1065
1066    /// Get the [`NamespaceId`] of this record.
1067    pub fn namespace(&self) -> NamespaceId {
1068        let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1069        value.into()
1070    }
1071
1072    /// Get the [`AuthorId`] of this record.
1073    pub fn author(&self) -> AuthorId {
1074        let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1075        value.into()
1076    }
1077}
1078
1079impl AsRef<[u8]> for RecordIdentifier {
1080    fn as_ref(&self) -> &[u8] {
1081        &self.0
1082    }
1083}
1084
1085impl Deref for SignedEntry {
1086    type Target = Entry;
1087    fn deref(&self) -> &Self::Target {
1088        &self.entry
1089    }
1090}
1091
1092impl Deref for Entry {
1093    type Target = Record;
1094    fn deref(&self) -> &Self::Target {
1095        &self.record
1096    }
1097}
1098
1099/// The data part of an entry in a [`Replica`].
1100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1101pub struct Record {
1102    /// Length of the data referenced by `hash`.
1103    len: u64,
1104    /// Hash of the content data.
1105    hash: Hash,
1106    /// Record creation timestamp. Counted as micros since the Unix epoch.
1107    timestamp: u64,
1108}
1109
1110impl RangeValue for Record {}
1111
1112/// Ordering for entry values.
1113///
1114/// Compares first the timestamp, then the content hash.
1115impl Ord for Record {
1116    fn cmp(&self, other: &Self) -> Ordering {
1117        self.timestamp
1118            .cmp(&other.timestamp)
1119            .then_with(|| self.hash.cmp(&other.hash))
1120    }
1121}
1122
1123impl PartialOrd for Record {
1124    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1125        Some(self.cmp(other))
1126    }
1127}
1128
1129impl Record {
1130    /// Create a new record.
1131    pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1132        debug_assert!(
1133            len != 0 || hash == Hash::EMPTY,
1134            "if `len` is 0 then `hash` must be the hash of the empty byte range"
1135        );
1136        Record {
1137            hash,
1138            len,
1139            timestamp,
1140        }
1141    }
1142
1143    /// Create a tombstone record (empty content)
1144    pub fn empty(timestamp: u64) -> Self {
1145        Self::new(Hash::EMPTY, 0, timestamp)
1146    }
1147
1148    /// Create a tombstone record with the timestamp set to now.
1149    pub fn empty_current() -> Self {
1150        Self::new_current(Hash::EMPTY, 0)
1151    }
1152
1153    /// Return `true` if the entry is empty.
1154    pub fn is_empty(&self) -> bool {
1155        self.hash == Hash::EMPTY
1156    }
1157
1158    /// Create a new [`Record`] with the timestamp set to now.
1159    pub fn new_current(hash: Hash, len: u64) -> Self {
1160        let timestamp = system_time_now();
1161        Self::new(hash, len, timestamp)
1162    }
1163
1164    /// Get the length of the data addressed by this record's content hash.
1165    pub fn content_len(&self) -> u64 {
1166        self.len
1167    }
1168
1169    /// Get the [`struct@Hash`] of the content data of this record.
1170    pub fn content_hash(&self) -> Hash {
1171        self.hash
1172    }
1173
1174    /// Get the timestamp of this record.
1175    pub fn timestamp(&self) -> u64 {
1176        self.timestamp
1177    }
1178
1179    #[cfg(test)]
1180    pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1181        let len = data.as_ref().len() as u64;
1182        let hash = Hash::new(data);
1183        Self::new_current(hash, len)
1184    }
1185
1186    #[cfg(test)]
1187    pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1188        let len = data.as_ref().len() as u64;
1189        let hash = Hash::new(data);
1190        Self::new(hash, len, timestamp)
1191    }
1192
1193    /// Serialize this record into a mutable byte array.
1194    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1195        out.extend_from_slice(&self.len.to_be_bytes());
1196        out.extend_from_slice(self.hash.as_ref());
1197        out.extend_from_slice(&self.timestamp.to_be_bytes())
1198    }
1199}
1200
1201#[cfg(test)]
1202mod tests {
1203    use std::collections::HashSet;
1204
1205    use anyhow::Result;
1206    use rand::SeedableRng;
1207
1208    use super::*;
1209    use crate::{
1210        actor::SyncHandle,
1211        ranger::{Range, Store as _},
1212        store::{OpenError, Query, SortBy, SortDirection, Store},
1213    };
1214
1215    #[tokio::test]
1216    async fn test_basics_memory() -> Result<()> {
1217        let store = store::Store::memory();
1218        test_basics(store).await?;
1219
1220        Ok(())
1221    }
1222
1223    #[tokio::test]
1224    #[cfg(feature = "fs-store")]
1225    async fn test_basics_fs() -> Result<()> {
1226        let dbfile = tempfile::NamedTempFile::new()?;
1227        let store = store::fs::Store::persistent(dbfile.path())?;
1228        test_basics(store).await?;
1229        Ok(())
1230    }
1231
1232    async fn test_basics(mut store: Store) -> Result<()> {
1233        let mut rng = rand::rng();
1234        let alice = Author::new(&mut rng);
1235        let bob = Author::new(&mut rng);
1236        let myspace = NamespaceSecret::new(&mut rng);
1237
1238        let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1239        let record = Record::current_from_data(b"this is my cool data");
1240        let entry = Entry::new(record_id, record);
1241        let signed_entry = entry.sign(&myspace, &alice);
1242        signed_entry.verify(&()).expect("failed to verify");
1243
1244        let mut my_replica = store.new_replica(myspace.clone())?;
1245        for i in 0..10 {
1246            my_replica
1247                .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice"))
1248                .await?;
1249        }
1250
1251        for i in 0..10 {
1252            let res = store
1253                .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1254                .unwrap();
1255            let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1256            assert_eq!(res.entry().record().content_len(), len);
1257            res.verify(&())?;
1258        }
1259
1260        // Test multiple records for the same key
1261        let mut my_replica = store.new_replica(myspace.clone())?;
1262        my_replica
1263            .hash_and_insert("/cool/path", &alice, "round 1")
1264            .await?;
1265        let _entry = store
1266            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1267            .unwrap();
1268        // Second
1269        let mut my_replica = store.new_replica(myspace.clone())?;
1270        my_replica
1271            .hash_and_insert("/cool/path", &alice, "round 2")
1272            .await?;
1273        let _entry = store
1274            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1275            .unwrap();
1276
1277        // Get All by author
1278        let entries: Vec<_> = store
1279            .get_many(myspace.id(), Query::author(alice.id()))?
1280            .collect::<Result<_>>()?;
1281        assert_eq!(entries.len(), 11);
1282
1283        // Get All by author
1284        let entries: Vec<_> = store
1285            .get_many(myspace.id(), Query::author(bob.id()))?
1286            .collect::<Result<_>>()?;
1287        assert_eq!(entries.len(), 0);
1288
1289        // Get All by key
1290        let entries: Vec<_> = store
1291            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1292            .collect::<Result<_>>()?;
1293        assert_eq!(entries.len(), 1);
1294
1295        // Get All
1296        let entries: Vec<_> = store
1297            .get_many(myspace.id(), Query::all())?
1298            .collect::<Result<_>>()?;
1299        assert_eq!(entries.len(), 11);
1300
1301        // insert record from different author
1302        let mut my_replica = store.new_replica(myspace.clone())?;
1303        let _entry = my_replica
1304            .hash_and_insert("/cool/path", &bob, "bob round 1")
1305            .await?;
1306
1307        // Get All by author
1308        let entries: Vec<_> = store
1309            .get_many(myspace.id(), Query::author(alice.id()))?
1310            .collect::<Result<_>>()?;
1311        assert_eq!(entries.len(), 11);
1312
1313        let entries: Vec<_> = store
1314            .get_many(myspace.id(), Query::author(bob.id()))?
1315            .collect::<Result<_>>()?;
1316        assert_eq!(entries.len(), 1);
1317
1318        // Get All by key
1319        let entries: Vec<_> = store
1320            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1321            .collect::<Result<_>>()?;
1322        assert_eq!(entries.len(), 2);
1323
1324        // Get all by prefix
1325        let entries: Vec<_> = store
1326            .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1327            .collect::<Result<_>>()?;
1328        assert_eq!(entries.len(), 2);
1329
1330        // Get All by author and prefix
1331        let entries: Vec<_> = store
1332            .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1333            .collect::<Result<_>>()?;
1334        assert_eq!(entries.len(), 1);
1335
1336        let entries: Vec<_> = store
1337            .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1338            .collect::<Result<_>>()?;
1339        assert_eq!(entries.len(), 1);
1340
1341        // Get All
1342        let entries: Vec<_> = store
1343            .get_many(myspace.id(), Query::all())?
1344            .collect::<Result<_>>()?;
1345        assert_eq!(entries.len(), 12);
1346
1347        // Get Range of all should return all latest
1348        let mut my_replica = store.new_replica(myspace.clone())?;
1349        let entries_second: Vec<_> = my_replica
1350            .store
1351            .get_range(Range::new(
1352                RecordIdentifier::default(),
1353                RecordIdentifier::default(),
1354            ))?
1355            .collect::<Result<_, _>>()?;
1356
1357        assert_eq!(entries_second.len(), 12);
1358        assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1359
1360        test_lru_cache_like_behaviour(&mut store, myspace.id())?;
1361        store.flush()?;
1362        Ok(())
1363    }
1364
1365    /// Test that [`Store::register_useful_peer`] behaves like an LRU cache of size
1366    /// [`super::store::PEERS_PER_DOC_CACHE_SIZE`].
1367    fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1368        /// Helper to verify the store returns the expected peers for the namespace.
1369        #[track_caller]
1370        fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1371            assert_eq!(
1372                expected_peers,
1373                &store
1374                    .get_sync_peers(&namespace)
1375                    .unwrap()
1376                    .unwrap()
1377                    .collect::<Vec<_>>(),
1378                "sync peers differ"
1379            );
1380        }
1381
1382        let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1383        // expected peers: newest peers are to the front, oldest to the back
1384        let mut expected_peers = Vec::with_capacity(count);
1385        for i in 0..count as u8 {
1386            let peer = [i; 32];
1387            expected_peers.insert(0, peer);
1388            store.register_useful_peer(namespace, peer)?;
1389        }
1390        verify_peers(store, namespace, &expected_peers);
1391
1392        // one more peer should evict the last peer
1393        expected_peers.pop();
1394        let newer_peer = [count as u8; 32];
1395        expected_peers.insert(0, newer_peer);
1396        store.register_useful_peer(namespace, newer_peer)?;
1397        verify_peers(store, namespace, &expected_peers);
1398
1399        // move one existing peer up
1400        let refreshed_peer = expected_peers.remove(2);
1401        expected_peers.insert(0, refreshed_peer);
1402        store.register_useful_peer(namespace, refreshed_peer)?;
1403        verify_peers(store, namespace, &expected_peers);
1404        Ok(())
1405    }
1406
1407    #[tokio::test]
1408    async fn test_content_hashes_iterator_memory() -> Result<()> {
1409        let store = store::Store::memory();
1410        test_content_hashes_iterator(store).await
1411    }
1412
1413    #[tokio::test]
1414    #[cfg(feature = "fs-store")]
1415    async fn test_content_hashes_iterator_fs() -> Result<()> {
1416        let dbfile = tempfile::NamedTempFile::new()?;
1417        let store = store::fs::Store::persistent(dbfile.path())?;
1418        test_content_hashes_iterator(store).await
1419    }
1420
1421    async fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1422        let mut rng = rand::rng();
1423        let mut expected = HashSet::new();
1424        let n_replicas = 3;
1425        let n_entries = 4;
1426        for i in 0..n_replicas {
1427            let namespace = NamespaceSecret::new(&mut rng);
1428            let author = store.new_author(&mut rng)?;
1429            let mut replica = store.new_replica(namespace)?;
1430            for j in 0..n_entries {
1431                let key = format!("{j}");
1432                let data = format!("{i}:{j}");
1433                let hash = replica.hash_and_insert(key, &author, data).await?;
1434                expected.insert(hash);
1435            }
1436        }
1437        assert_eq!(expected.len(), n_replicas * n_entries);
1438        let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1439        assert_eq!(actual, expected);
1440        Ok(())
1441    }
1442
1443    #[test]
1444    fn test_multikey() {
1445        let mut rng = rand::rng();
1446
1447        let k = ["a", "c", "z"];
1448
1449        let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1450        n.sort_by_key(|n| n.id());
1451
1452        let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1453        a.sort_by_key(|a| a.id());
1454
1455        // Just key
1456        {
1457            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1458            let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1459            let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1460
1461            let range = Range::new(ri0.clone(), ri2.clone());
1462            assert!(range.contains(&ri0), "start");
1463            assert!(range.contains(&ri1), "inside");
1464            assert!(!range.contains(&ri2), "end");
1465
1466            assert!(ri0 < ri1);
1467            assert!(ri1 < ri2);
1468        }
1469
1470        // Just namespace
1471        {
1472            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1473            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1474            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1475
1476            let range = Range::new(ri0.clone(), ri2.clone());
1477            assert!(range.contains(&ri0), "start");
1478            assert!(range.contains(&ri1), "inside");
1479            assert!(!range.contains(&ri2), "end");
1480
1481            assert!(ri0 < ri1);
1482            assert!(ri1 < ri2);
1483        }
1484
1485        // Just author
1486        {
1487            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1488            let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1489            let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1490
1491            let range = Range::new(ri0.clone(), ri2.clone());
1492            assert!(range.contains(&ri0), "start");
1493            assert!(range.contains(&ri1), "inside");
1494            assert!(!range.contains(&ri2), "end");
1495
1496            assert!(ri0 < ri1);
1497            assert!(ri1 < ri2);
1498        }
1499
1500        // Just key and namespace
1501        {
1502            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1503            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1504            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1505
1506            let range = Range::new(ri0.clone(), ri2.clone());
1507            assert!(range.contains(&ri0), "start");
1508            assert!(range.contains(&ri1), "inside");
1509            assert!(!range.contains(&ri2), "end");
1510
1511            assert!(ri0 < ri1);
1512            assert!(ri1 < ri2);
1513        }
1514
1515        // Mixed
1516        {
1517            // Ord should prioritize namespace - author - key
1518
1519            let a0 = a[0].id();
1520            let a1 = a[1].id();
1521            let n0 = n[0].id();
1522            let n1 = n[1].id();
1523            let k0 = k[0];
1524            let k1 = k[1];
1525
1526            assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1527            assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1528            assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1529            assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1530        }
1531    }
1532
1533    #[tokio::test]
1534    async fn test_timestamps_memory() -> Result<()> {
1535        let store = store::Store::memory();
1536        test_timestamps(store).await?;
1537
1538        Ok(())
1539    }
1540
1541    #[tokio::test]
1542    #[cfg(feature = "fs-store")]
1543    async fn test_timestamps_fs() -> Result<()> {
1544        let dbfile = tempfile::NamedTempFile::new()?;
1545        let store = store::fs::Store::persistent(dbfile.path())?;
1546        test_timestamps(store).await?;
1547        Ok(())
1548    }
1549
1550    async fn test_timestamps(mut store: Store) -> Result<()> {
1551        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1552        let namespace = NamespaceSecret::new(&mut rng);
1553        let _replica = store.new_replica(namespace.clone())?;
1554        let author = store.new_author(&mut rng)?;
1555        store.close_replica(namespace.id());
1556        let mut replica = store.open_replica(&namespace.id())?;
1557
1558        let key = b"hello";
1559        let value = b"world";
1560        let entry = {
1561            let timestamp = 2;
1562            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1563            let record = Record::from_data(value, timestamp);
1564            Entry::new(id, record).sign(&namespace, &author)
1565        };
1566
1567        replica
1568            .insert_entry(entry.clone(), InsertOrigin::Local)
1569            .await
1570            .unwrap();
1571        store.close_replica(namespace.id());
1572        let res = store
1573            .get_exact(namespace.id(), author.id(), key, false)?
1574            .unwrap();
1575        assert_eq!(res, entry);
1576
1577        let entry2 = {
1578            let timestamp = 1;
1579            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1580            let record = Record::from_data(value, timestamp);
1581            Entry::new(id, record).sign(&namespace, &author)
1582        };
1583
1584        let mut replica = store.open_replica(&namespace.id())?;
1585        let res = replica.insert_entry(entry2, InsertOrigin::Local).await;
1586        store.close_replica(namespace.id());
1587        assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1588        let res = store
1589            .get_exact(namespace.id(), author.id(), key, false)?
1590            .unwrap();
1591        assert_eq!(res, entry);
1592        store.flush()?;
1593        Ok(())
1594    }
1595
1596    #[tokio::test]
1597    async fn test_replica_sync_memory() -> Result<()> {
1598        let alice_store = store::Store::memory();
1599        let bob_store = store::Store::memory();
1600
1601        test_replica_sync(alice_store, bob_store).await?;
1602        Ok(())
1603    }
1604
1605    #[tokio::test]
1606    #[cfg(feature = "fs-store")]
1607    async fn test_replica_sync_fs() -> Result<()> {
1608        let alice_dbfile = tempfile::NamedTempFile::new()?;
1609        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1610        let bob_dbfile = tempfile::NamedTempFile::new()?;
1611        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1612        test_replica_sync(alice_store, bob_store).await?;
1613
1614        Ok(())
1615    }
1616
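    /// Shared body for `test_replica_sync_*`: fills two replicas of the same namespace with
    /// overlapping key sets, runs set reconciliation between them, and checks that both stores
    /// end up containing the union of both sets.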
1617    async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1618        let alice_set = ["ape", "eel", "fox", "gnu"];
1619        let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1620
1621        let mut rng = rand::rng();
1622        let author = Author::new(&mut rng);
1623        let myspace = NamespaceSecret::new(&mut rng);
1624        let mut alice = alice_store.new_replica(myspace.clone())?;
1625        for el in &alice_set {
1626            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1627        }
1628
1629        let mut bob = bob_store.new_replica(myspace.clone())?;
1630        for el in &bob_set {
1631            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1632        }
1633
1634        let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
1635
1636        assert_eq!(alice_out.num_sent, 2);
1637        assert_eq!(bob_out.num_recv, 2);
1638        assert_eq!(alice_out.num_recv, 6);
1639        assert_eq!(bob_out.num_sent, 6);
1640
1641        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1642        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1643        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1644        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1645        alice_store.flush()?;
1646        bob_store.flush()?;
1647        Ok(())
1648    }
1649
1650    #[tokio::test]
1651    async fn test_replica_timestamp_sync_memory() -> Result<()> {
1652        let alice_store = store::Store::memory();
1653        let bob_store = store::Store::memory();
1654
1655        test_replica_timestamp_sync(alice_store, bob_store).await?;
1656        Ok(())
1657    }
1658
1659    #[tokio::test]
1660    #[cfg(feature = "fs-store")]
1661    async fn test_replica_timestamp_sync_fs() -> Result<()> {
1662        let alice_dbfile = tempfile::NamedTempFile::new()?;
1663        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1664        let bob_dbfile = tempfile::NamedTempFile::new()?;
1665        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1666        test_replica_timestamp_sync(alice_store, bob_store).await?;
1667
1668        Ok(())
1669    }
1670
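    /// Shared body for `test_replica_timestamp_sync_*`: both peers write to the same key;
    /// after sync the entry with the newer timestamp wins, regardless of which peer wrote it.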
1671    async fn test_replica_timestamp_sync(
1672        mut alice_store: Store,
1673        mut bob_store: Store,
1674    ) -> Result<()> {
1675        let mut rng = rand::rng();
1676        let author = Author::new(&mut rng);
1677        let namespace = NamespaceSecret::new(&mut rng);
1678        let mut alice = alice_store.new_replica(namespace.clone())?;
1679        let mut bob = bob_store.new_replica(namespace.clone())?;
1680
1681        let key = b"key";
1682        let alice_value = b"alice";
1683        let bob_value = b"bob";
1684        let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?;
1685        // bob writes after alice, so bob's entry has a newer timestamp - sync should overwrite alice's value
1686        let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?;
1687        sync(&mut alice, &mut bob).await?;
1688        assert_eq!(
1689            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1690            Some(bob_hash)
1691        );
1692        assert_eq!(
1693            get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
1694            Some(bob_hash)
1695        );
1696
1697        let mut alice = alice_store.new_replica(namespace.clone())?;
1698        let mut bob = bob_store.new_replica(namespace.clone())?;
1699
1700        let alice_value_2 = b"alice2";
1701        // alice writes after bob, so alice's entry has a newer timestamp - sync should overwrite bob's value
1702        let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?;
1703        let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?;
1704        sync(&mut alice, &mut bob).await?;
1705        assert_eq!(
1706            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1707            Some(alice_hash_2)
1708        );
1709        assert_eq!(
1710            get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
1711            Some(alice_hash_2)
1712        );
1713        alice_store.flush()?;
1714        bob_store.flush()?;
1715        Ok(())
1716    }
1717
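    /// Entries with timestamps up to `MAX_TIMESTAMP_FUTURE_SHIFT` ahead of the local clock are
    /// accepted; anything further in the future is rejected with
    /// `ValidationFailure::TooFarInTheFuture`.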
1718    #[tokio::test]
1719    async fn test_future_timestamp() -> Result<()> {
1720        let mut rng = rand::rng();
1721        let mut store = store::Store::memory();
1722        let author = Author::new(&mut rng);
1723        let namespace = NamespaceSecret::new(&mut rng);
1724
1725        let mut replica = store.new_replica(namespace.clone())?;
1726        let key = b"hi";
1727        let t = system_time_now();
1728        let record = Record::from_data(b"1", t);
1729        let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1730        replica
1731            .insert_entry(entry0.clone(), InsertOrigin::Local)
1732            .await?;
1733
1734        assert_eq!(
1735            get_entry(&mut store, namespace.id(), author.id(), key)?,
1736            entry0
1737        );
1738
1739        let mut replica = store.new_replica(namespace.clone())?;
1740        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1741        let record = Record::from_data(b"2", t);
1742        let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1743        replica
1744            .insert_entry(entry1.clone(), InsertOrigin::Local)
1745            .await?;
1746        assert_eq!(
1747            get_entry(&mut store, namespace.id(), author.id(), key)?,
1748            entry1
1749        );
1750
1751        let mut replica = store.new_replica(namespace.clone())?;
1752        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1753        let record = Record::from_data(b"2", t);
1754        let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1755        replica
1756            .insert_entry(entry2.clone(), InsertOrigin::Local)
1757            .await?;
1758        assert_eq!(
1759            get_entry(&mut store, namespace.id(), author.id(), key)?,
1760            entry2
1761        );
1762
1763        let mut replica = store.new_replica(namespace.clone())?;
1764        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1765        let record = Record::from_data(b"2", t);
1766        let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1767        let res = replica.insert_entry(entry3, InsertOrigin::Local).await;
1768        assert!(matches!(
1769            res,
1770            Err(InsertError::Validation(
1771                ValidationFailure::TooFarInTheFuture
1772            ))
1773        ));
1774        assert_eq!(
1775            get_entry(&mut store, namespace.id(), author.id(), key)?,
1776            entry2
1777        );
1778        store.flush()?;
1779        Ok(())
1780    }
1781
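    /// Inserting an entry with empty (zero-length) content must fail with
    /// `InsertError::EntryIsEmpty`.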
1782    #[tokio::test]
1783    async fn test_insert_empty() -> Result<()> {
1784        let mut store = store::Store::memory();
1785        let mut rng = rand::rng();
1786        let alice = Author::new(&mut rng);
1787        let myspace = NamespaceSecret::new(&mut rng);
1788        let mut replica = store.new_replica(myspace.clone())?;
1789        let hash = Hash::new(b"");
1790        let res = replica.insert(b"foo", &alice, hash, 0).await;
1791        assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1792        store.flush()?;
1793        Ok(())
1794    }
1795
1796    #[tokio::test]
1797    async fn test_prefix_delete_memory() -> Result<()> {
1798        let store = store::Store::memory();
1799        test_prefix_delete(store).await?;
1800        Ok(())
1801    }
1802
1803    #[tokio::test]
1804    #[cfg(feature = "fs-store")]
1805    async fn test_prefix_delete_fs() -> Result<()> {
1806        let dbfile = tempfile::NamedTempFile::new()?;
1807        let store = store::fs::Store::persistent(dbfile.path())?;
1808        test_prefix_delete(store).await?;
1809        Ok(())
1810    }
1811
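    /// Shared body for `test_prefix_delete_*`: `delete_prefix` removes every entry whose key
    /// starts with the given prefix ("foobar" and "fooboo" for prefix "foo") and leaves no
    /// visible entry at the prefix key itself.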
1812    async fn test_prefix_delete(mut store: Store) -> Result<()> {
1813        let mut rng = rand::rng();
1814        let alice = Author::new(&mut rng);
1815        let myspace = NamespaceSecret::new(&mut rng);
1816        let mut replica = store.new_replica(myspace.clone())?;
1817        let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;
1818        let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?;
1819
1820        // sanity checks
1821        assert_eq!(
1822            get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1823            Some(hash1)
1824        );
1825        assert_eq!(
1826            get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1827            Some(hash2)
1828        );
1829
1830        // delete
1831        let mut replica = store.new_replica(myspace.clone())?;
1832        let deleted = replica.delete_prefix(b"foo", &alice).await?;
1833        assert_eq!(deleted, 2);
1834        assert_eq!(
1835            store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1836            None
1837        );
1838        assert_eq!(
1839            store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1840            None
1841        );
1842        assert_eq!(
1843            store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1844            None
1845        );
1846        store.flush()?;
1847        Ok(())
1848    }
1849
1850    #[tokio::test]
1851    async fn test_replica_sync_delete_memory() -> Result<()> {
1852        let alice_store = store::Store::memory();
1853        let bob_store = store::Store::memory();
1854
1855        test_replica_sync_delete(alice_store, bob_store).await
1856    }
1857
1858    #[tokio::test]
1859    #[cfg(feature = "fs-store")]
1860    async fn test_replica_sync_delete_fs() -> Result<()> {
1861        let alice_dbfile = tempfile::NamedTempFile::new()?;
1862        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1863        let bob_dbfile = tempfile::NamedTempFile::new()?;
1864        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1865        test_replica_sync_delete(alice_store, bob_store).await
1866    }
1867
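    /// Shared body for `test_replica_sync_delete_*`: prefix deletions propagate through sync.
    /// After alice deletes the "foo" prefix and bob inserts "fooz", both stores are checked for
    /// "fog" and "fooz".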
1868    async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1869        let alice_set = ["foot"];
1870        let bob_set = ["fool", "foo", "fog"];
1871
1872        let mut rng = rand::rng();
1873        let author = Author::new(&mut rng);
1874        let myspace = NamespaceSecret::new(&mut rng);
1875        let mut alice = alice_store.new_replica(myspace.clone())?;
1876        for el in &alice_set {
1877            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1878        }
1879
1880        let mut bob = bob_store.new_replica(myspace.clone())?;
1881        for el in &bob_set {
1882            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1883        }
1884
1885        sync(&mut alice, &mut bob).await?;
1886
1887        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1888        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1889        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1890        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1891
1892        let mut alice = alice_store.new_replica(myspace.clone())?;
1893        let mut bob = bob_store.new_replica(myspace.clone())?;
1894        alice.delete_prefix("foo", &author).await?;
1895        bob.hash_and_insert("fooz", &author, "fooz".as_bytes())
1896            .await?;
1897        sync(&mut alice, &mut bob).await?;
1898        check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1899        check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1900        alice_store.flush()?;
1901        bob_store.flush()?;
1902        Ok(())
1903    }
1904
1905    #[tokio::test]
1906    async fn test_replica_remove_memory() -> Result<()> {
1907        let alice_store = store::Store::memory();
1908        test_replica_remove(alice_store).await
1909    }
1910
1911    #[tokio::test]
1912    #[cfg(feature = "fs-store")]
1913    async fn test_replica_remove_fs() -> Result<()> {
1914        let alice_dbfile = tempfile::NamedTempFile::new()?;
1915        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1916        test_replica_remove(alice_store).await
1917    }
1918
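    /// Shared body for `test_replica_remove_*`: a replica can only be removed once it is
    /// closed, cannot be reopened afterwards, and can be recreated from the namespace secret.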
1919    async fn test_replica_remove(mut store: Store) -> Result<()> {
1920        let mut rng = rand::rng();
1921        let namespace = NamespaceSecret::new(&mut rng);
1922        let author = Author::new(&mut rng);
1923        let mut replica = store.new_replica(namespace.clone())?;
1924
1925        // insert entry
1926        let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?;
1927        let res = store
1928            .get_many(namespace.id(), Query::all())?
1929            .collect::<Vec<_>>();
1930        assert_eq!(res.len(), 1);
1931
1932        // remove replica
1933        let res = store.remove_replica(&namespace.id());
1934        // removing the replica must fail while it is still open
1935        assert!(res.is_err());
1936        store.close_replica(namespace.id());
1937        store.remove_replica(&namespace.id())?;
1938        let res = store
1939            .get_many(namespace.id(), Query::all())?
1940            .collect::<Vec<_>>();
1941        assert_eq!(res.len(), 0);
1942
1943        // may not reopen removed replica
1944        let res = store.load_replica_info(&namespace.id());
1945        assert!(matches!(res, Err(OpenError::NotFound)));
1946
1947        // may recreate replica
1948        let mut replica = store.new_replica(namespace.clone())?;
1949        replica.insert(b"foo", &author, hash, 3).await?;
1950        let res = store
1951            .get_many(namespace.id(), Query::all())?
1952            .collect::<Vec<_>>();
1953        assert_eq!(res.len(), 1);
1954        store.flush()?;
1955        Ok(())
1956    }
1957
1958    #[tokio::test]
1959    async fn test_replica_delete_edge_cases_memory() -> Result<()> {
1960        let store = store::Store::memory();
1961        test_replica_delete_edge_cases(store).await
1962    }
1963
1964    #[tokio::test]
1965    #[cfg(feature = "fs-store")]
1966    async fn test_replica_delete_edge_cases_fs() -> Result<()> {
1967        let dbfile = tempfile::NamedTempFile::new()?;
1968        let store = store::fs::Store::persistent(dbfile.path())?;
1969        test_replica_delete_edge_cases(store).await
1970    }
1971
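    /// Shared body for `test_replica_delete_edge_cases_*`: exercises insertion and prefix
    /// deletion with keys made of boundary byte values (0x00, 0x01, 0xff).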
1972    async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
1973        let mut rng = rand::rng();
1974        let author = Author::new(&mut rng);
1975        let namespace = NamespaceSecret::new(&mut rng);
1976
1977        let edgecases = [0u8, 1u8, 255u8];
1978        let prefixes = [0u8, 255u8];
1979        let hash = Hash::new(b"foo");
1980        let len = 3;
1981        for prefix in prefixes {
1982            let mut expected = vec![];
1983            let mut replica = store.new_replica(namespace.clone())?;
1984            for suffix in edgecases {
1985                let key = [prefix, suffix].to_vec();
1986                expected.push(key.clone());
1987                replica.insert(&key, &author, hash, len).await?;
1988            }
1989            assert_keys(&mut store, namespace.id(), expected);
1990            let mut replica = store.new_replica(namespace.clone())?;
1991            replica.delete_prefix([prefix], &author).await?;
1992            assert_keys(&mut store, namespace.id(), vec![]);
1993        }
1994
1995        let mut replica = store.new_replica(namespace.clone())?;
1996        let key = vec![1u8, 0u8];
1997        replica.insert(key, &author, hash, len).await?;
1998        let key = vec![1u8, 1u8];
1999        replica.insert(key, &author, hash, len).await?;
2000        let key = vec![1u8, 2u8];
2001        replica.insert(key, &author, hash, len).await?;
2002        let prefix = vec![1u8, 1u8];
2003        replica.delete_prefix(prefix, &author).await?;
2004        assert_keys(
2005            &mut store,
2006            namespace.id(),
2007            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2008        );
2009
2010        let mut replica = store.new_replica(namespace.clone())?;
2011        let key = vec![0u8, 255u8];
2012        replica.insert(key, &author, hash, len).await?;
2013        let key = vec![0u8, 0u8];
2014        replica.insert(key, &author, hash, len).await?;
2015        let prefix = vec![0u8];
2016        replica.delete_prefix(prefix, &author).await?;
2017        assert_keys(
2018            &mut store,
2019            namespace.id(),
2020            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2021        );
2022        store.flush()?;
2023        Ok(())
2024    }
2025
2026    #[tokio::test]
2027    async fn test_latest_iter_memory() -> Result<()> {
2028        let store = store::Store::memory();
2029        test_latest_iter(store).await
2030    }
2031
2032    #[tokio::test]
2033    #[cfg(feature = "fs-store")]
2034    async fn test_latest_iter_fs() -> Result<()> {
2035        let dbfile = tempfile::NamedTempFile::new()?;
2036        let store = store::fs::Store::persistent(dbfile.path())?;
2037        test_latest_iter(store).await
2038    }
2039
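    /// Shared body for `test_latest_iter_*`: `get_latest_for_each_author` yields exactly one
    /// entry per author, namely the one written most recently.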
2040    async fn test_latest_iter(mut store: Store) -> Result<()> {
2041        let mut rng = rand::rng();
2042        let author0 = Author::new(&mut rng);
2043        let author1 = Author::new(&mut rng);
2044        let namespace = NamespaceSecret::new(&mut rng);
2045        let mut replica = store.new_replica(namespace.clone())?;
2046
2047        replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
2048        let latest = store
2049            .get_latest_for_each_author(namespace.id())?
2050            .collect::<Result<Vec<_>>>()?;
2051        assert_eq!(latest.len(), 1);
2052        assert_eq!(latest[0].2, b"a0.1".to_vec());
2053
2054        let mut replica = store.new_replica(namespace.clone())?;
2055        replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
2056        replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
2057        let latest = store
2058            .get_latest_for_each_author(namespace.id())?
2059            .collect::<Result<Vec<_>>>()?;
2060        let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2061        latest_keys.sort();
2062        assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2063        store.flush()?;
2064        Ok(())
2065    }
2066
2067    #[tokio::test]
2068    async fn test_replica_byte_keys_memory() -> Result<()> {
2069        let store = store::Store::memory();
2070
2071        test_replica_byte_keys(store).await?;
2072        Ok(())
2073    }
2074
2075    #[tokio::test]
2076    #[cfg(feature = "fs-store")]
2077    async fn test_replica_byte_keys_fs() -> Result<()> {
2078        let dbfile = tempfile::NamedTempFile::new()?;
2079        let store = store::fs::Store::persistent(dbfile.path())?;
2080        test_replica_byte_keys(store).await?;
2081
2082        Ok(())
2083    }
2084
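    /// Shared body for `test_replica_byte_keys_*`: keys are arbitrary byte strings (including
    /// non-UTF-8 bytes like 0xff) and are stored and listed back correctly.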
2085    async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2086        let mut rng = rand::rng();
2087        let author = Author::new(&mut rng);
2088        let namespace = NamespaceSecret::new(&mut rng);
2089
2090        let hash = Hash::new(b"foo");
2091        let len = 3;
2092
2093        let key = vec![1u8, 0u8];
2094        let mut replica = store.new_replica(namespace.clone())?;
2095        replica.insert(key, &author, hash, len).await?;
2096        assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2097        let key = vec![1u8, 2u8];
2098        let mut replica = store.new_replica(namespace.clone())?;
2099        replica.insert(key, &author, hash, len).await?;
2100        assert_keys(
2101            &mut store,
2102            namespace.id(),
2103            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2104        );
2105
2106        let key = vec![0u8, 255u8];
2107        let mut replica = store.new_replica(namespace.clone())?;
2108        replica.insert(key, &author, hash, len).await?;
2109        assert_keys(
2110            &mut store,
2111            namespace.id(),
2112            vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2113        );
2114        store.flush()?;
2115        Ok(())
2116    }
2117
2118    #[tokio::test]
2119    async fn test_replica_capability_memory() -> Result<()> {
2120        let store = store::Store::memory();
2121        test_replica_capability(store).await
2122    }
2123
2124    #[tokio::test]
2125    #[cfg(feature = "fs-store")]
2126    async fn test_replica_capability_fs() -> Result<()> {
2127        let dbfile = tempfile::NamedTempFile::new()?;
2128        let store = store::fs::Store::persistent(dbfile.path())?;
2129        test_replica_capability(store).await
2130    }
2131
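    /// Shared body for `test_replica_capability_*`: a read capability only permits reading,
    /// a write capability permits inserting, and importing a read capability on top of an
    /// existing write capability does not downgrade it.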
2132    #[allow(clippy::redundant_pattern_matching)]
2133    async fn test_replica_capability(mut store: Store) -> Result<()> {
2134        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2135        let author = store.new_author(&mut rng)?;
2136        let namespace = NamespaceSecret::new(&mut rng);
2137
2138        // import read capability - insert must fail
2139        let capability = Capability::Read(namespace.id());
2140        store.import_namespace(capability)?;
2141        let mut replica = store.open_replica(&namespace.id())?;
2142        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2143        assert!(matches!(res, Err(InsertError::ReadOnly)));
2144
2145        // import write capability - insert must succeed
2146        let capability = Capability::Write(namespace.clone());
2147        store.import_namespace(capability)?;
2148        let mut replica = store.open_replica(&namespace.id())?;
2149        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2150        assert!(matches!(res, Ok(_)));
2151        store.close_replica(namespace.id());
2152        let mut replica = store.open_replica(&namespace.id())?;
2153        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2154        assert!(res.is_ok());
2155
2156        // import read capability again - must not downgrade the write capability; insert must still succeed
2157        let capability = Capability::Read(namespace.id());
2158        store.import_namespace(capability)?;
2159        store.close_replica(namespace.id());
2160        let mut replica = store.open_replica(&namespace.id())?;
2161        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2162        assert!(res.is_ok());
2163        store.flush()?;
2164        Ok(())
2165    }
2166
2167    #[tokio::test]
2168    async fn test_actor_capability_memory() -> Result<()> {
2169        let store = store::Store::memory();
2170        test_actor_capability(store).await
2171    }
2172
2173    #[tokio::test]
2174    #[cfg(feature = "fs-store")]
2175    async fn test_actor_capability_fs() -> Result<()> {
2176        let dbfile = tempfile::NamedTempFile::new()?;
2177        let store = store::fs::Store::persistent(dbfile.path())?;
2178        test_actor_capability(store).await
2179    }
2180
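    /// Capability checks driven through the `SyncHandle` actor: inserts fail with only a read
    /// capability or while the document is closed, and succeed once a write capability is
    /// imported and the document is open.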
2181    async fn test_actor_capability(store: Store) -> Result<()> {
2182        // test with actor
2183        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2184        let author = Author::new(&mut rng);
2185        let handle = SyncHandle::spawn(store, None, "test".into());
2186        let author = handle.import_author(author).await?;
2187        let namespace = NamespaceSecret::new(&mut rng);
2188        let id = namespace.id();
2189
2190        // import read capability - insert must fail
2191        let capability = Capability::Read(namespace.id());
2192        handle.import_namespace(capability).await?;
2193        handle.open(namespace.id(), Default::default()).await?;
2194        let res = handle
2195            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2196            .await;
2197        assert!(res.is_err());
2198
2199        // import write capability - insert must succeed
2200        let capability = Capability::Write(namespace.clone());
2201        handle.import_namespace(capability).await?;
2202        let res = handle
2203            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2204            .await;
2205        assert!(res.is_ok());
2206
2207        // while closed, insert must fail; after reopening it must succeed again
2208        handle.close(namespace.id()).await?;
2209        let res = handle
2210            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2211            .await;
2212        assert!(res.is_err());
2213        handle.open(namespace.id(), Default::default()).await?;
2214        let res = handle
2215            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2216            .await;
2217        assert!(res.is_ok());
2218        Ok(())
2219    }
2220
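    /// Drains all events currently buffered in the channel without waiting for new ones.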
2221    fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2222        let mut res = vec![];
2223        while let Ok(ev) = events.try_recv() {
2224            res.push(ev);
2225        }
2226        res
2227    }
2228
2229    /// Tests that no events are emitted for entries received during sync that are already
2230    /// obsolete (too old) by the time they are actually inserted into the store.
2231    #[tokio::test]
2232    async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2233        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2234        let mut store1 = store::Store::memory();
2235        let mut store2 = store::Store::memory();
2236        let peer1 = [1u8; 32];
2237        let peer2 = [2u8; 32];
2238        let mut state1 = SyncOutcome::default();
2239        let mut state2 = SyncOutcome::default();
2240
2241        let author = Author::new(&mut rng);
2242        let namespace = NamespaceSecret::new(&mut rng);
2243        let mut replica1 = store1.new_replica(namespace.clone())?;
2244        let mut replica2 = store2.new_replica(namespace.clone())?;
2245
2246        let (events1_sender, events1) = async_channel::bounded(32);
2247        let (events2_sender, events2) = async_channel::bounded(32);
2248
2249        replica1.info.subscribe(events1_sender);
2250        replica2.info.subscribe(events2_sender);
2251
2252        replica1.hash_and_insert(b"foo", &author, b"init").await?;
2253
2254        let from1 = replica1.sync_initial_message()?;
2255        let from2 = replica2
2256            .sync_process_message(from1, peer1, &mut state2)
2257            .await
2258            .unwrap()
2259            .unwrap();
2260        let from1 = replica1
2261            .sync_process_message(from2, peer2, &mut state1)
2262            .await
2263            .unwrap()
2264            .unwrap();
2265        // Now we will receive the entry from replica1. We insert a newer entry while the
2266        // sync is already running, so the entry from replica1 will be rejected. Make sure
2267        // that no RemoteInsert event is emitted for this entry.
2268        replica2.hash_and_insert(b"foo", &author, b"update").await?;
2269        let from2 = replica2
2270            .sync_process_message(from1, peer1, &mut state2)
2271            .await
2272            .unwrap();
2273        assert!(from2.is_none());
2274        let events1 = drain(events1);
2275        let events2 = drain(events2);
2276        assert_eq!(events1.len(), 1);
2277        assert_eq!(events2.len(), 1);
2278        assert!(matches!(events1[0], Event::LocalInsert { .. }));
2279        assert!(matches!(events2[0], Event::LocalInsert { .. }));
2280        assert_eq!(state1.num_sent, 1);
2281        assert_eq!(state1.num_recv, 0);
2282        assert_eq!(state2.num_sent, 0);
2283        assert_eq!(state2.num_recv, 1);
2284        store1.flush()?;
2285        store2.flush()?;
2286        Ok(())
2287    }
2288
2289    #[tokio::test]
2290    async fn test_replica_queries_mem() -> Result<()> {
2291        let store = store::Store::memory();
2292
2293        test_replica_queries(store).await?;
2294        Ok(())
2295    }
2296
2297    #[tokio::test]
2298    #[cfg(feature = "fs-store")]
2299    async fn test_replica_queries_fs() -> Result<()> {
2300        let dbfile = tempfile::NamedTempFile::new()?;
2301        let store = store::fs::Store::persistent(dbfile.path())?;
2302        test_replica_queries(store).await?;
2303
2304        Ok(())
2305    }
2306
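    /// Shared body for `test_replica_queries_*`: exercises the query API (sorting, key
    /// prefixes, offset/limit, latest-per-key, and empty-entry filtering) against a small
    /// fixed data set.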
2307    async fn test_replica_queries(mut store: Store) -> Result<()> {
2308        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2309        let namespace = NamespaceSecret::new(&mut rng);
2310        let namespace_id = namespace.id();
2311
2312        let a1 = store.new_author(&mut rng)?;
2313        let a2 = store.new_author(&mut rng)?;
2314        let a3 = store.new_author(&mut rng)?;
2315        println!(
2316            "a1 {} a2 {} a3 {}",
2317            a1.id().fmt_short(),
2318            a2.id().fmt_short(),
2319            a3.id().fmt_short()
2320        );
2321
2322        let mut replica = store.new_replica(namespace.clone())?;
2323        replica.hash_and_insert("hi/world", &a2, "a2").await?;
2324        replica.hash_and_insert("hi/world", &a1, "a1").await?;
2325        replica.hash_and_insert("hi/moon", &a2, "a1").await?;
2326        replica.hash_and_insert("hi", &a3, "a3").await?;
2327
2328        struct QueryTester<'a> {
2329            store: &'a mut Store,
2330            namespace: NamespaceId,
2331        }
2332        impl QueryTester<'_> {
2333            fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2334                let query = query.into();
2335                let actual = self
2336                    .store
2337                    .get_many(self.namespace, query.clone())
2338                    .unwrap()
2339                    .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2340                    .collect::<Result<Vec<_>>>()
2341                    .unwrap();
2342                let expected = expected
2343                    .into_iter()
2344                    .map(|(key, author)| (key.to_string(), author.id()))
2345                    .collect::<Vec<_>>();
2346                assert_eq!(actual, expected, "query: {query:#?}")
2347            }
2348        }
2349
2350        let mut qt = QueryTester {
2351            store: &mut store,
2352            namespace: namespace_id,
2353        };
2354
2355        qt.assert(
2356            Query::all(),
2357            vec![
2358                ("hi/world", &a1),
2359                ("hi/moon", &a2),
2360                ("hi/world", &a2),
2361                ("hi", &a3),
2362            ],
2363        );
2364
2365        qt.assert(
2366            Query::single_latest_per_key(),
2367            vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2368        );
2369
2370        qt.assert(
2371            Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2372            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2373        );
2374
2375        qt.assert(
2376            Query::single_latest_per_key().key_prefix("hi/"),
2377            vec![("hi/moon", &a2), ("hi/world", &a1)],
2378        );
2379
2380        qt.assert(
2381            Query::single_latest_per_key()
2382                .key_prefix("hi/")
2383                .sort_direction(SortDirection::Desc),
2384            vec![("hi/world", &a1), ("hi/moon", &a2)],
2385        );
2386
2387        qt.assert(
2388            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2389            vec![
2390                ("hi", &a3),
2391                ("hi/moon", &a2),
2392                ("hi/world", &a1),
2393                ("hi/world", &a2),
2394            ],
2395        );
2396
2397        qt.assert(
2398            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2399            vec![
2400                ("hi/world", &a2),
2401                ("hi/world", &a1),
2402                ("hi/moon", &a2),
2403                ("hi", &a3),
2404            ],
2405        );
2406
2407        qt.assert(
2408            Query::all().key_prefix("hi/"),
2409            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2410        );
2411
2412        qt.assert(
2413            Query::all().key_prefix("hi/").offset(1).limit(1),
2414            vec![("hi/moon", &a2)],
2415        );
2416
2417        qt.assert(
2418            Query::all()
2419                .key_prefix("hi/")
2420                .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2421            vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2422        );
2423
2424        qt.assert(
2425            Query::all()
2426                .key_prefix("hi/")
2427                .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2428                .offset(1)
2429                .limit(1),
2430            vec![("hi/world", &a1)],
2431        );
2432
2433        qt.assert(
2434            Query::all()
2435                .key_prefix("hi/")
2436                .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2437            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2438        );
2439
2440        qt.assert(
2441            Query::all()
2442                .key_prefix("hi/")
2443                .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2444            vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2445        );
2446
2447        qt.assert(
2448            Query::all()
2449                .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2450                .limit(2)
2451                .offset(1),
2452            vec![("hi/moon", &a2), ("hi/world", &a1)],
2453        );
2454
2455        let mut replica = store.new_replica(namespace)?;
2456        replica.delete_prefix("hi/world", &a2).await?;
2457        let mut qt = QueryTester {
2458            store: &mut store,
2459            namespace: namespace_id,
2460        };
2461
2462        qt.assert(
2463            Query::all(),
2464            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2465        );
2466
2467        qt.assert(
2468            Query::all().include_empty(),
2469            vec![
2470                ("hi/world", &a1),
2471                ("hi/moon", &a2),
2472                ("hi/world", &a2),
2473                ("hi", &a3),
2474            ],
2475        );
2476        store.flush()?;
2477        Ok(())
2478    }
2479
2480    #[test]
2481    fn test_dl_policies_mem() -> Result<()> {
2482        let mut store = store::Store::memory();
2483        test_dl_policies(&mut store)
2484    }
2485
2486    #[test]
2487    #[cfg(feature = "fs-store")]
2488    fn test_dl_policies_fs() -> Result<()> {
2489        let dbfile = tempfile::NamedTempFile::new()?;
2490        let mut store = store::fs::Store::persistent(dbfile.path())?;
2491        test_dl_policies(&mut store)
2492    }
2493
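    /// Shared body for `test_dl_policies_*`: setting a download policy fails for a document
    /// that does not exist; once the document is created, the policy round-trips through the
    /// store unchanged.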
2494    fn test_dl_policies(store: &mut Store) -> Result<()> {
2495        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2496        let namespace = NamespaceSecret::new(&mut rng);
2497        let id = namespace.id();
2498
2499        let filter = store::FilterKind::Exact("foo".into());
2500        let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2501        store
2502            .set_download_policy(&id, policy.clone())
2503            .expect_err("document does not exist");
2504
2505        // now create the document
2506        store.new_replica(namespace)?;
2507
2508        store.set_download_policy(&id, policy.clone())?;
2509        let retrieved_policy = store.get_download_policy(&id)?;
2510        assert_eq!(retrieved_policy, policy);
2511        store.flush()?;
2512        Ok(())
2513    }
2514
2515    fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2516        expected.sort();
2517        assert_eq!(expected, get_keys_sorted(store, namespace));
2518    }
2519
2520    fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2521        let mut res = store
2522            .get_many(namespace, Query::all())
2523            .unwrap()
2524            .map(|e| e.map(|e| e.key().to_vec()))
2525            .collect::<Result<Vec<_>>>()
2526            .unwrap();
2527        res.sort();
2528        res
2529    }
2530
2531    fn get_entry(
2532        store: &mut Store,
2533        namespace: NamespaceId,
2534        author: AuthorId,
2535        key: &[u8],
2536    ) -> anyhow::Result<SignedEntry> {
2537        let entry = store
2538            .get_exact(namespace, author, key, true)?
2539            .ok_or_else(|| anyhow::anyhow!("not found"))?;
2540        Ok(entry)
2541    }
2542
2543    fn get_content_hash(
2544        store: &mut Store,
2545        namespace: NamespaceId,
2546        author: AuthorId,
2547        key: &[u8],
2548    ) -> anyhow::Result<Option<Hash>> {
2549        let hash = store
2550            .get_exact(namespace, author, key, false)?
2551            .map(|e| e.content_hash());
2552        Ok(hash)
2553    }
2554
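    /// Drives the set-reconciliation protocol between `alice` and `bob` to completion
    /// (asserting it finishes within 100 rounds) and returns the `SyncOutcome` recorded on
    /// each side.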
2555    async fn sync<'a>(
2556        alice: &'a mut Replica<'a>,
2557        bob: &'a mut Replica<'a>,
2558    ) -> Result<(SyncOutcome, SyncOutcome)> {
2559        let alice_peer_id = [1u8; 32];
2560        let bob_peer_id = [2u8; 32];
2561        let mut alice_state = SyncOutcome::default();
2562        let mut bob_state = SyncOutcome::default();
2563        // Sync alice - bob
2564        let mut next_to_bob = Some(alice.sync_initial_message()?);
2565        let mut rounds = 0;
2566        while let Some(msg) = next_to_bob.take() {
2567            assert!(rounds < 100, "too many rounds");
2568            rounds += 1;
2569            println!("round {rounds}");
2570            if let Some(msg) = bob
2571                .sync_process_message(msg, alice_peer_id, &mut bob_state)
2572                .await?
2573            {
2574                next_to_bob = alice
2575                    .sync_process_message(msg, bob_peer_id, &mut alice_state)
2576                    .await?
2577            }
2578        }
2579        assert_eq!(alice_state.num_sent, bob_state.num_recv);
2580        assert_eq!(alice_state.num_recv, bob_state.num_sent);
2581        Ok((alice_state, bob_state))
2582    }
2583
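    /// Checks that `store` contains an entry for every key in `set` written by `author`.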
2584    fn check_entries(
2585        store: &mut Store,
2586        namespace: &NamespaceId,
2587        author: &Author,
2588        set: &[&str],
2589    ) -> Result<()> {
2590        for el in set {
2591            store.get_exact(*namespace, author.id(), el, false)?.ok_or_else(|| anyhow::anyhow!("missing entry for key {el:?}"))?;
2592        }
2593        Ok(())
2594    }
2595}