iroh_docs/
sync.rs

1//! API for iroh-docs replicas
2
3// Names and concepts are roughly based on Willow's design at the moment:
4//
5// https://hackmd.io/DTtck8QOQm6tZaQBBtTf7w
6//
7// This is going to change!
8
9use std::{
10    cmp::Ordering,
11    fmt::Debug,
12    ops::{Deref, DerefMut},
13    sync::Arc,
14};
15
16use bytes::{Bytes, BytesMut};
17use ed25519_dalek::{Signature, SignatureError};
18use iroh_blobs::Hash;
19use n0_future::{
20    time::{Duration, SystemTime},
21    IterExt,
22};
23use serde::{Deserialize, Serialize};
24
25pub use crate::heads::AuthorHeads;
26use crate::{
27    keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
28    ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
29    store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
30};
31
32/// Protocol message for the set reconciliation protocol.
33///
34/// Can be serialized to bytes with [serde] to transfer between peers.
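///
/// A minimal sketch of moving a message across the wire; the choice of
/// `postcard` as the serde format and the `replica` / `reply_bytes` values are
/// assumptions for illustration, not something this type mandates:
///
/// ```ignore
/// let message: ProtocolMessage = replica.sync_initial_message()?;
/// // Encode with any serde-compatible format, e.g. postcard.
/// let bytes = postcard::to_stdvec(&message)?;
/// // ... send `bytes` to the peer, receive their reply bytes ...
/// let reply: ProtocolMessage = postcard::from_bytes(&reply_bytes)?;
/// ```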
35pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
36
37/// Byte representation of a `PeerId` from `iroh-net`.
38// TODO: PeerId is in iroh-net which iroh-docs doesn't depend on. Add iroh-base crate with `PeerId`.
39pub type PeerIdBytes = [u8; 32];
40
41/// Max time in the future from our wall clock time that we accept entries for.
42/// Value is 10 minutes, expressed in microseconds to match entry timestamps.
43pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;
44
45/// Callback that may be set on a replica to determine the availability status for a content hash.
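///
/// A minimal sketch of a callback that reports every hash as missing; it assumes
/// the boxed future type behaves like a pinned, `Send` boxed future:
///
/// ```ignore
/// let cb: ContentStatusCallback = Arc::new(|_hash: Hash| {
///     // A real callback would look the hash up in a local blob store.
///     Box::pin(async move { ContentStatus::Missing })
/// });
/// ```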
46pub type ContentStatusCallback =
47    Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
48
49/// Event emitted by sync when entries are added.
50#[derive(Debug, Clone)]
51pub enum Event {
52    /// A local entry has been added.
53    LocalInsert {
54        /// Document in which the entry was inserted.
55        namespace: NamespaceId,
56        /// Inserted entry.
57        entry: SignedEntry,
58    },
59    /// A remote entry has been added.
60    RemoteInsert {
61        /// Document in which the entry was inserted.
62        namespace: NamespaceId,
63        /// Inserted entry.
64        entry: SignedEntry,
65        /// Peer that provided the inserted entry.
66        from: PeerIdBytes,
67        /// Whether download policies require the content to be downloaded.
68        should_download: bool,
69        /// [`ContentStatus`] for this entry in the remote's replica.
70        remote_content_status: ContentStatus,
71    },
72}
73
74/// Whether an entry was inserted locally or by a remote peer.
75#[derive(Debug, Clone)]
76pub enum InsertOrigin {
77    /// The entry was inserted locally.
78    Local,
79    /// The entry was received from the remote node identified by [`PeerIdBytes`].
80    Sync {
81        /// The peer from which we received this entry.
82        from: PeerIdBytes,
83        /// Whether the peer claims to have the content blob for this entry.
84        remote_content_status: ContentStatus,
85    },
86}
87
88/// Availability status of the content on a node.
89#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
90pub enum ContentStatus {
91    /// The content is completely available.
92    Complete,
93    /// The content is partially available.
94    Incomplete,
95    /// The content is missing.
96    Missing,
97}
98
99/// Outcome of a sync operation.
100#[derive(Debug, Clone, Default)]
101pub struct SyncOutcome {
102    /// Timestamp of the latest entry for each author in the set we received.
103    pub heads_received: AuthorHeads,
104    /// Number of entries we received.
105    pub num_recv: usize,
106    /// Number of entries we sent.
107    pub num_sent: usize,
108}
109
110fn get_as_ptr<T>(value: &T) -> Option<usize> {
111    use std::mem;
112    if mem::size_of::<T>() == std::mem::size_of::<usize>()
113        && mem::align_of::<T>() == mem::align_of::<usize>()
114    {
115        // SAFETY: size and alignment of `T` match `usize` (checked above), so copying the bits is sound.
116        unsafe { Some(mem::transmute_copy(value)) }
117    } else {
118        None
119    }
120}
121
122fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
123    get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
124}
125
126#[derive(Debug, Default)]
127struct Subscribers(Vec<async_channel::Sender<Event>>);
128impl Subscribers {
129    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
130        self.0.push(sender)
131    }
132    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
133        self.0.retain(|s| !same_channel(s, sender));
134    }
135    pub async fn send(&mut self, event: Event) {
136        self.0 = std::mem::take(&mut self.0)
137            .into_iter()
138            .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx))
139            .join_all()
140            .await
141            .into_iter()
142            .flatten()
143            .collect();
144    }
145    pub fn len(&self) -> usize {
146        self.0.len()
147    }
148    pub async fn send_with(&mut self, f: impl FnOnce() -> Event) {
149        if !self.0.is_empty() {
150            self.send(f()).await
151        }
152    }
153}
154
155/// Kind of capability of the namespace.
156#[derive(
157    Debug,
158    Clone,
159    Copy,
160    Serialize,
161    Deserialize,
162    num_enum::IntoPrimitive,
163    num_enum::TryFromPrimitive,
164    strum::Display,
165)]
166#[repr(u8)]
167#[strum(serialize_all = "snake_case")]
168pub enum CapabilityKind {
169    /// A writable replica.
170    Write = 1,
171    /// A readable replica.
172    Read = 2,
173}
174
175/// The capability of the namespace.
176#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
177pub enum Capability {
178    /// Write access to the namespace.
179    Write(NamespaceSecret),
180    /// Read only access to the namespace.
181    Read(NamespaceId),
182}
183
184impl Capability {
185    /// Get the [`NamespaceId`] for this [`Capability`].
186    pub fn id(&self) -> NamespaceId {
187        match self {
188            Capability::Write(secret) => secret.id(),
189            Capability::Read(id) => *id,
190        }
191    }
192
193    /// Get the [`NamespaceSecret`] of this [`Capability`].
194    /// Will fail if the [`Capability`] is read only.
195    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
196        match self {
197            Capability::Write(secret) => Ok(secret),
198            Capability::Read(_) => Err(ReadOnly),
199        }
200    }
201
202    /// Get the kind of capability.
203    pub fn kind(&self) -> CapabilityKind {
204        match self {
205            Capability::Write(_) => CapabilityKind::Write,
206            Capability::Read(_) => CapabilityKind::Read,
207        }
208    }
209
210    /// Get the raw representation of this namespace capability.
211    pub fn raw(&self) -> (u8, [u8; 32]) {
212        let capability_repr: u8 = self.kind().into();
213        let bytes = match self {
214            Capability::Write(secret) => secret.to_bytes(),
215            Capability::Read(id) => id.to_bytes(),
216        };
217        (capability_repr, bytes)
218    }
219
220    /// Create a [`Capability`] from its raw representation.
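    ///
    /// A minimal sketch of a raw round trip (`rand::rng()` mirrors this module's tests):
    ///
    /// ```ignore
    /// let cap = Capability::Write(NamespaceSecret::new(&mut rand::rng()));
    /// let (kind, bytes) = cap.raw();
    /// let restored = Capability::from_raw(kind, &bytes)?;
    /// assert_eq!(restored.id(), cap.id());
    /// ```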
221    pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
222        let kind: CapabilityKind = kind.try_into()?;
223        let capability = match kind {
224            CapabilityKind::Write => {
225                let secret = NamespaceSecret::from_bytes(bytes);
226                Capability::Write(secret)
227            }
228            CapabilityKind::Read => {
229                let id = NamespaceId::from(bytes);
230                Capability::Read(id)
231            }
232        };
233        Ok(capability)
234    }
235
236    /// Merge this capability with another capability.
237    ///
238    /// Will return an error if `other` is not a capability for the same namespace.
239    ///
240    /// Returns `true` if the capability was changed, `false` otherwise.
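    ///
    /// A minimal sketch of upgrading a read capability to a write capability for
    /// the same namespace:
    ///
    /// ```ignore
    /// let secret = NamespaceSecret::new(&mut rand::rng());
    /// let mut cap = Capability::Read(secret.id());
    /// let changed = cap.merge(Capability::Write(secret))?;
    /// assert!(changed);
    /// assert!(matches!(cap, Capability::Write(_)));
    /// ```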
241    pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
242        if other.id() != self.id() {
243            return Err(CapabilityError::NamespaceMismatch);
244        }
245
246        // the only capability upgrade is from read-only (self) to writable (other)
247        if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
248            let _ = std::mem::replace(self, other);
249            Ok(true)
250        } else {
251            Ok(false)
252        }
253    }
254}
255
256/// Errors for capability operations
257#[derive(Debug, thiserror::Error)]
258pub enum CapabilityError {
259    /// Namespaces are not the same
260    #[error("Namespaces are not the same")]
261    NamespaceMismatch,
262}
263
264/// In-memory information about an open replica.
265#[derive(derive_more::Debug)]
266pub struct ReplicaInfo {
267    pub(crate) capability: Capability,
268    subscribers: Subscribers,
269    #[debug("ContentStatusCallback")]
270    content_status_cb: Option<ContentStatusCallback>,
271    closed: bool,
272}
273
274impl ReplicaInfo {
275    /// Create a new replica.
276    pub fn new(capability: Capability) -> Self {
277        Self {
278            capability,
279            subscribers: Default::default(),
281            content_status_cb: None,
282            closed: false,
283        }
284    }
285
286    /// Subscribe to insert events.
287    ///
288    /// When subscribing to a replica, make sure to continuously receive events from the
289    /// corresponding [`async_channel::Receiver`]. Otherwise, local and remote inserts will
290    /// block while waiting for the events to be picked up.
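    ///
    /// A minimal sketch of subscribing and draining events; the channel capacity
    /// and the use of `tokio::spawn` are arbitrary choices for illustration:
    ///
    /// ```ignore
    /// let (tx, rx) = async_channel::bounded(64);
    /// info.subscribe(tx); // `info` is a mutable `ReplicaInfo`
    /// tokio::spawn(async move {
    ///     while let Ok(event) = rx.recv().await {
    ///         match event {
    ///             Event::LocalInsert { namespace, entry } => { /* ... */ }
    ///             Event::RemoteInsert { entry, should_download, .. } => { /* ... */ }
    ///         }
    ///     }
    /// });
    /// ```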
291    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
292        self.subscribers.subscribe(sender)
293    }
294
295    /// Explicitly unsubscribe a sender.
296    ///
297    /// Simply dropping the receiver is fine too. If you cloned a single sender to subscribe to
298    /// multiple replicas, you can use this method to explicitly unsubscribe the sender from
299    /// this replica without having to drop the receiver.
300    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
301        self.subscribers.unsubscribe(sender)
302    }
303
304    /// Get the number of current event subscribers.
305    pub fn subscribers_count(&self) -> usize {
306        self.subscribers.len()
307    }
308
309    /// Set the content status callback.
310    ///
311    /// Only one callback can be active at a time. If a previous callback was registered, this
312    /// will return `false`.
313    pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
314        if self.content_status_cb.is_some() {
315            false
316        } else {
317            self.content_status_cb = Some(cb);
318            true
319        }
320    }
321
322    fn ensure_open(&self) -> Result<(), InsertError> {
323        if self.closed() {
324            Err(InsertError::Closed)
325        } else {
326            Ok(())
327        }
328    }
329
330    /// Returns true if the replica is closed.
331    ///
332    /// If a replica is closed, no further operations can be performed. A replica cannot be closed
333    /// manually, it must be closed via [`store::Store::close_replica`] or
334    /// [`store::Store::remove_replica`]
335    pub fn closed(&self) -> bool {
336        self.closed
337    }
338
339    /// Merge a capability.
340    ///
341    /// The capability must refer to the same namespace, otherwise an error will be returned.
342    ///
343    /// This will upgrade the replica's capability when passing a `Capability::Write`.
344    /// It is a no-op if `capability` is a `Capability::Read`.
345    pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
346        self.capability.merge(capability)
347    }
348}
349
350/// Local representation of a mutable, synchronizable key-value store.
351#[derive(derive_more::Debug)]
352pub struct Replica<'a, I = Box<ReplicaInfo>> {
353    pub(crate) store: StoreInstance<'a>,
354    pub(crate) info: I,
355}
356
357impl<'a, I> Replica<'a, I>
358where
359    I: Deref<Target = ReplicaInfo> + DerefMut,
360{
361    /// Create a new replica.
362    pub fn new(store: StoreInstance<'a>, info: I) -> Self {
363        Replica { info, store }
364    }
365
366    /// Insert a new record at the given key.
367    ///
368    /// The entry will be signed by the provided `author`.
369    /// The `len` must be the byte length of the data identified by `hash`.
370    ///
371    /// Returns the number of entries removed as a consequence of this insertion,
372    /// or an error if either the entry failed to validate or a store operation failed.
373    pub async fn insert(
374        &mut self,
375        key: impl AsRef<[u8]>,
376        author: &Author,
377        hash: Hash,
378        len: u64,
379    ) -> Result<usize, InsertError> {
380        if len == 0 || hash == Hash::EMPTY {
381            return Err(InsertError::EntryIsEmpty);
382        }
383        self.info.ensure_open()?;
384        let id = RecordIdentifier::new(self.id(), author.id(), key);
385        let record = Record::new_current(hash, len);
386        let entry = Entry::new(id, record);
387        let secret = self.secret_key()?;
388        let signed_entry = entry.sign(secret, author);
389        self.insert_entry(signed_entry, InsertOrigin::Local).await
390    }
391
392    /// Delete entries that match the given `author` and key `prefix`.
393    ///
394    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
395    /// entries whose key starts with or is equal to the given `prefix`.
396    ///
397    /// Returns the number of entries deleted.
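    ///
    /// A minimal sketch (the `replica` and `author` values are assumed to exist):
    ///
    /// ```ignore
    /// replica.hash_and_insert("/docs/a", &author, "a").await?;
    /// replica.hash_and_insert("/docs/b", &author, "b").await?;
    /// // Clears every entry by `author` whose key starts with "/docs".
    /// let removed = replica.delete_prefix("/docs", &author).await?;
    /// assert_eq!(removed, 2);
    /// ```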
398    pub async fn delete_prefix(
399        &mut self,
400        prefix: impl AsRef<[u8]>,
401        author: &Author,
402    ) -> Result<usize, InsertError> {
403        self.info.ensure_open()?;
404        let id = RecordIdentifier::new(self.id(), author.id(), prefix);
405        let entry = Entry::new_empty(id);
406        let signed_entry = entry.sign(self.secret_key()?, author);
407        self.insert_entry(signed_entry, InsertOrigin::Local).await
408    }
409
410    /// Insert an entry into this replica which was received from a remote peer.
411    ///
412    /// This will verify both the namespace and author signatures of the entry, emit an `on_insert`
413    /// event, and insert the entry into the replica store.
414    ///
415    /// Returns the number of entries removed as a consequence of this insertion,
416    /// or an error if the entry failed to validate or if a store operation failed.
417    pub async fn insert_remote_entry(
418        &mut self,
419        entry: SignedEntry,
420        received_from: PeerIdBytes,
421        content_status: ContentStatus,
422    ) -> Result<usize, InsertError> {
423        self.info.ensure_open()?;
424        entry.validate_empty()?;
425        let origin = InsertOrigin::Sync {
426            from: received_from,
427            remote_content_status: content_status,
428        };
429        self.insert_entry(entry, origin).await
430    }
431
432    /// Insert a signed entry into the database.
433    ///
434    /// Returns the number of entries removed as a consequence of this insertion.
435    async fn insert_entry(
436        &mut self,
437        entry: SignedEntry,
438        origin: InsertOrigin,
439    ) -> Result<usize, InsertError> {
440        let namespace = self.id();
441
442        let store = &self.store;
443        validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
444
445        let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
446        tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
447
448        let removed_count = match outcome {
449            InsertOutcome::Inserted { removed } => removed,
450            InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
451        };
452
453        let insert_event = match origin {
454            InsertOrigin::Local => Event::LocalInsert { namespace, entry },
455            InsertOrigin::Sync {
456                from,
457                remote_content_status,
458            } => {
459                let download_policy = self
460                    .store
461                    .get_download_policy(&self.id())
462                    .unwrap_or_default();
463                let should_download = download_policy.matches(entry.entry());
464                Event::RemoteInsert {
465                    namespace,
466                    entry,
467                    from,
468                    should_download,
469                    remote_content_status,
470                }
471            }
472        };
473
474        self.info.subscribers.send(insert_event).await;
475
476        Ok(removed_count)
477    }
478
479    /// Hashes the given data and inserts it.
480    ///
481    /// This does not store the content, just the record of it.
482    /// Returns the calculated hash.
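    ///
    /// A minimal sketch mirroring this module's tests (`store`, `namespace`, and
    /// `author` are assumed to exist):
    ///
    /// ```ignore
    /// let mut replica = store.new_replica(namespace.clone())?;
    /// let hash = replica
    ///     .hash_and_insert("/my/key", &author, "hello world")
    ///     .await?;
    /// // Only the record is stored here; the content behind `hash` lives elsewhere.
    /// ```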
483    pub async fn hash_and_insert(
484        &mut self,
485        key: impl AsRef<[u8]>,
486        author: &Author,
487        data: impl AsRef<[u8]>,
488    ) -> Result<Hash, InsertError> {
489        self.info.ensure_open()?;
490        let len = data.as_ref().len() as u64;
491        let hash = Hash::new(data);
492        self.insert(key, author, hash, len).await?;
493        Ok(hash)
494    }
495
496    /// Get the identifier for an entry in this replica.
497    pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
498        RecordIdentifier::new(self.info.capability.id(), author.id(), key)
499    }
500
501    /// Create the initial message for the set reconciliation flow with a remote peer.
502    pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
503        self.info.ensure_open().map_err(anyhow::Error::from)?;
504        self.store.initial_message()
505    }
506
507    /// Process a set reconciliation message from a remote peer.
508    ///
509    /// Returns the next message to be sent to the peer, if any.
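    ///
    /// A minimal sketch of driving reconciliation to completion between two
    /// replicas held locally; in practice the messages travel over the network,
    /// and `alice_peer` / `bob_peer` are hypothetical [`PeerIdBytes`] values:
    ///
    /// ```ignore
    /// let mut alice_state = SyncOutcome::default();
    /// let mut bob_state = SyncOutcome::default();
    /// let mut next = Some(alice.sync_initial_message()?);
    /// while let Some(msg) = next {
    ///     // Bob processes Alice's message ...
    ///     let reply = bob.sync_process_message(msg, alice_peer, &mut bob_state).await?;
    ///     // ... and Alice processes Bob's reply, until one side has nothing more to send.
    ///     next = match reply {
    ///         Some(msg) => alice.sync_process_message(msg, bob_peer, &mut alice_state).await?,
    ///         None => None,
    ///     };
    /// }
    /// ```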
510    pub async fn sync_process_message(
511        &mut self,
512        message: crate::ranger::Message<SignedEntry>,
513        from_peer: PeerIdBytes,
514        state: &mut SyncOutcome,
515    ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
516        self.info.ensure_open()?;
517        let my_namespace = self.id();
518        let now = system_time_now();
519
520        // update state with incoming data.
521        state.num_recv += message.value_count();
522        for (entry, _content_status) in message.values() {
523            state
524                .heads_received
525                .insert(entry.author(), entry.timestamp());
526        }
527
530        let cb = self.info.content_status_cb.clone();
531        let download_policy = self
532            .store
533            .get_download_policy(&my_namespace)
534            .unwrap_or_default();
535        let reply = self
536            .store
537            .process_message(
538                &Default::default(),
539                message,
540                // validate callback: validate incoming entries, and send to on_insert channel
541                |store, entry, content_status| {
542                    let origin = InsertOrigin::Sync {
543                        from: from_peer,
544                        remote_content_status: content_status,
545                    };
546                    validate_entry(now, store, my_namespace, entry, &origin).is_ok()
547                },
548                // on_insert callback: is called when an entry was actually inserted in the store
549                async |_store, entry, content_status| {
550                    // We use `send_with` to only clone the entry if we have active subscriptions.
551                    self.info
552                        .subscribers
553                        .send_with(|| {
554                            let should_download = download_policy.matches(entry.entry());
555                            Event::RemoteInsert {
556                                from: from_peer,
557                                namespace: my_namespace,
558                                entry: entry.clone(),
559                                should_download,
560                                remote_content_status: content_status,
561                            }
562                        })
563                        .await
564                },
565                // content_status callback: get content status for outgoing entries
566                async move |entry| {
567                    if let Some(cb) = cb.as_ref() {
568                        cb(entry.content_hash()).await
569                    } else {
570                        ContentStatus::Missing
571                    }
572                },
573            )
574            .await?;
575
576        // update state with outgoing data.
577        if let Some(ref reply) = reply {
578            state.num_sent += reply.value_count();
579        }
580
581        Ok(reply)
582    }
583
584    /// Get the namespace identifier for this [`Replica`].
585    pub fn id(&self) -> NamespaceId {
586        self.info.capability.id()
587    }
588
589    /// Get the [`Capability`] of this [`Replica`].
590    pub fn capability(&self) -> &Capability {
591        &self.info.capability
592    }
593
594    /// Get the [`NamespaceSecret`] key for this replica. Will fail if
595    /// the replica is read only.
596    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
597        self.info.capability.secret_key()
598    }
599}
600
601/// Error that occurs trying to access the [`NamespaceSecret`] of a read-only [`Capability`].
602#[derive(Debug, thiserror::Error)]
603#[error("Replica allows read access only.")]
604pub struct ReadOnly;
605
606/// Validate whether a [`SignedEntry`] is fit to be inserted.
607///
608/// This validates that
609/// * the entry's author and namespace signatures are correct
610/// * the entry's namespace matches the current replica
611/// * the entry's timestamp is not more than 10 minutes in the future of our system time
612/// * the entry is newer than an existing entry for the same key and author, if such exists.
613fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
614    now: u64,
615    store: &S,
616    expected_namespace: NamespaceId,
617    entry: &SignedEntry,
618    origin: &InsertOrigin,
619) -> Result<(), ValidationFailure> {
620    // Verify the namespace
621    if entry.namespace() != expected_namespace {
622        return Err(ValidationFailure::InvalidNamespace);
623    }
624
625    // Verify signature for non-local entries.
626    if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
627        return Err(ValidationFailure::BadSignature);
628    }
629
630    // Verify that the timestamp of the entry is not too far in the future.
631    if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
632        return Err(ValidationFailure::TooFarInTheFuture);
633    }
634    Ok(())
635}
636
637/// Error emitted when inserting entries into a [`Replica`] failed
638#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
639pub enum InsertError {
640    /// Storage error
641    #[error("storage error")]
642    Store(anyhow::Error),
643    /// Validation failure
644    #[error("validation failure")]
645    Validation(#[from] ValidationFailure),
646    /// A newer entry exists for either this entry's key or a prefix of the key.
647    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
648    NewerEntryExists,
649    /// Attempted to insert an empty entry.
650    #[error("Attempted to insert an empty entry")]
651    EntryIsEmpty,
652    /// Replica is read only.
653    #[error("Attempted to insert to read only replica")]
654    #[from(ReadOnly)]
655    ReadOnly,
656    /// The replica is closed, no operations may be performed.
657    #[error("replica is closed")]
658    Closed,
659}
660
661/// Reason why entry validation failed
662#[derive(thiserror::Error, Debug)]
663pub enum ValidationFailure {
664    /// Entry namespace does not match the current replica.
665    #[error("Entry namespace does not match the current replica")]
666    InvalidNamespace,
667    /// Entry signature is invalid.
668    #[error("Entry signature is invalid")]
669    BadSignature,
670    /// Entry timestamp is too far in the future.
671    #[error("Entry timestamp is too far in the future.")]
672    TooFarInTheFuture,
673    /// Entry has length 0 but not the empty hash, or the empty hash but not length 0.
674    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
675    InvalidEmptyEntry,
676}
677
678/// A signed entry.
679#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
680pub struct SignedEntry {
681    signature: EntrySignature,
682    entry: Entry,
683}
684
685impl From<SignedEntry> for Entry {
686    fn from(value: SignedEntry) -> Self {
687        value.entry
688    }
689}
690
691impl PartialOrd for SignedEntry {
692    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
693        Some(self.cmp(other))
694    }
695}
696
697impl Ord for SignedEntry {
698    fn cmp(&self, other: &Self) -> Ordering {
699        self.entry.cmp(&other.entry)
700    }
701}
702
703impl PartialOrd for Entry {
704    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
705        Some(self.cmp(other))
706    }
707}
708
709impl Ord for Entry {
710    fn cmp(&self, other: &Self) -> Ordering {
711        self.id
712            .cmp(&other.id)
713            .then_with(|| self.record.cmp(&other.record))
714    }
715}
716
717impl SignedEntry {
718    pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
719        SignedEntry { signature, entry }
720    }
721
722    /// Create a new signed entry by signing an entry with the `namespace` and `author`.
723    pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
724        let signature = EntrySignature::from_entry(&entry, namespace, author);
725        SignedEntry { signature, entry }
726    }
727
728    /// Create a new signed entry from its parts.
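    ///
    /// A minimal sketch mirroring this module's tests:
    ///
    /// ```ignore
    /// let mut rng = rand::rng();
    /// let namespace = NamespaceSecret::new(&mut rng);
    /// let author = Author::new(&mut rng);
    /// let record = Record::new_current(Hash::new(b"hello"), 5);
    /// let signed = SignedEntry::from_parts(&namespace, &author, "/my/key", record);
    /// signed.verify(&()).expect("signatures are valid");
    /// ```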
729    pub fn from_parts(
730        namespace: &NamespaceSecret,
731        author: &Author,
732        key: impl AsRef<[u8]>,
733        record: Record,
734    ) -> Self {
735        let id = RecordIdentifier::new(namespace.id(), author.id(), key);
736        let entry = Entry::new(id, record);
737        Self::from_entry(entry, namespace, author)
738    }
739
740    /// Verify the signatures on this entry.
741    pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
742        self.signature.verify(
743            &self.entry,
744            &self.entry.namespace().public_key(store)?,
745            &self.entry.author().public_key(store)?,
746        )
747    }
748
749    /// Get the signature.
750    pub fn signature(&self) -> &EntrySignature {
751        &self.signature
752    }
753
754    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
755    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
756        self.entry().validate_empty()
757    }
758
759    /// Get the [`Entry`].
760    pub fn entry(&self) -> &Entry {
761        &self.entry
762    }
763
764    /// Get the content [`struct@Hash`] of the entry.
765    pub fn content_hash(&self) -> Hash {
766        self.entry().content_hash()
767    }
768
769    /// Get the content length of the entry.
770    pub fn content_len(&self) -> u64 {
771        self.entry().content_len()
772    }
773
774    /// Get the author bytes of this entry.
775    pub fn author_bytes(&self) -> AuthorId {
776        self.entry().id().author()
777    }
778
779    /// Get the key of the entry.
780    pub fn key(&self) -> &[u8] {
781        self.entry().id().key()
782    }
783
784    /// Get the timestamp of the entry.
785    pub fn timestamp(&self) -> u64 {
786        self.entry().timestamp()
787    }
788}
789
790impl RangeEntry for SignedEntry {
791    type Key = RecordIdentifier;
792    type Value = Record;
793
794    fn key(&self) -> &Self::Key {
795        &self.entry.id
796    }
797
798    fn value(&self) -> &Self::Value {
799        &self.entry.record
800    }
801
802    fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
803        let mut hasher = blake3::Hasher::new();
804        hasher.update(self.namespace().as_ref());
805        hasher.update(self.author_bytes().as_ref());
806        hasher.update(self.key());
807        hasher.update(&self.timestamp().to_be_bytes());
808        hasher.update(self.content_hash().as_bytes());
809        Fingerprint(hasher.finalize().into())
810    }
811}
812
813/// Signature over an entry.
814#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
815pub struct EntrySignature {
816    author_signature: Signature,
817    namespace_signature: Signature,
818}
819
820impl Debug for EntrySignature {
821    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822        f.debug_struct("EntrySignature")
823            .field(
824                "namespace_signature",
825                &hex::encode(self.namespace_signature.to_bytes()),
826            )
827            .field(
828                "author_signature",
829                &hex::encode(self.author_signature.to_bytes()),
830            )
831            .finish()
832    }
833}
834
835impl EntrySignature {
836    /// Create a new signature by signing an entry with the `namespace` and `author`.
837    pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
838        // TODO: this should probably include a namespace prefix, i.e. a
839        // namespace in the cryptographic sense.
840        let bytes = entry.to_vec();
841        let namespace_signature = namespace.sign(&bytes);
842        let author_signature = author.sign(&bytes);
843
844        EntrySignature {
845            author_signature,
846            namespace_signature,
847        }
848    }
849
850    /// Verify that this signature was created by signing the `entry` with the
851    /// secret keys of the specified `author` and `namespace`.
852    pub fn verify(
853        &self,
854        entry: &Entry,
855        namespace: &NamespacePublicKey,
856        author: &AuthorPublicKey,
857    ) -> Result<(), SignatureError> {
858        let bytes = entry.to_vec();
859        namespace.verify(&bytes, &self.namespace_signature)?;
860        author.verify(&bytes, &self.author_signature)?;
861
862        Ok(())
863    }
864
865    pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
866        let namespace_signature = Signature::from_bytes(namespace_sig);
867        let author_signature = Signature::from_bytes(author_sig);
868
869        EntrySignature {
870            author_signature,
871            namespace_signature,
872        }
873    }
874
875    pub(crate) fn author(&self) -> &Signature {
876        &self.author_signature
877    }
878
879    pub(crate) fn namespace(&self) -> &Signature {
880        &self.namespace_signature
881    }
882}
883
884/// A single entry in a [`Replica`]
885///
886/// An entry is identified by a key, its [`Author`], and the [`Replica`]'s
887/// [`NamespaceSecret`]. Its value is the [32-byte BLAKE3 hash](iroh_blobs::Hash)
888/// of the entry's content data, the size of this content data, and a timestamp.
889#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
890pub struct Entry {
891    id: RecordIdentifier,
892    record: Record,
893}
894
895impl Entry {
896    /// Create a new entry
897    pub fn new(id: RecordIdentifier, record: Record) -> Self {
898        Entry { id, record }
899    }
900
901    /// Create a new empty entry with the current timestamp.
902    pub fn new_empty(id: RecordIdentifier) -> Self {
903        Entry {
904            id,
905            record: Record::empty_current(),
906        }
907    }
908
909    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
910    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
911        match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
912            (true, true) => Ok(()),
913            (false, false) => Ok(()),
914            (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
915            (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
916        }
917    }
918
919    /// Get the [`RecordIdentifier`] for this entry.
920    pub fn id(&self) -> &RecordIdentifier {
921        &self.id
922    }
923
924    /// Get the [`NamespaceId`] of this entry.
925    pub fn namespace(&self) -> NamespaceId {
926        self.id.namespace()
927    }
928
929    /// Get the [`AuthorId`] of this entry.
930    pub fn author(&self) -> AuthorId {
931        self.id.author()
932    }
933
934    /// Get the key of this entry.
935    pub fn key(&self) -> &[u8] {
936        self.id.key()
937    }
938
939    /// Get the [`Record`] contained in this entry.
940    pub fn record(&self) -> &Record {
941        &self.record
942    }
943
944    /// Get the content hash of the record.
945    pub fn content_hash(&self) -> Hash {
946        self.record.hash
947    }
948
949    /// Get the content length of the record.
950    pub fn content_len(&self) -> u64 {
951        self.record.len
952    }
953
954    /// Get the timestamp of the record.
955    pub fn timestamp(&self) -> u64 {
956        self.record.timestamp
957    }
958
959    /// Serialize this entry into its canonical byte representation used for signing.
960    pub fn encode(&self, out: &mut Vec<u8>) {
961        self.id.encode(out);
962        self.record.encode(out);
963    }
964
965    /// Serialize this entry into a new vector with its canonical byte representation.
966    pub fn to_vec(&self) -> Vec<u8> {
967        let mut out = Vec::new();
968        self.encode(&mut out);
969        out
970    }
971
972    /// Sign this entry with a [`NamespaceSecret`] and [`Author`].
973    pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
974        SignedEntry::from_entry(self, namespace, author)
975    }
976}
977
978const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
979const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
980const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
981
982/// The identifier of a record.
983#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
984pub struct RecordIdentifier(Bytes);
985
986impl Default for RecordIdentifier {
987    fn default() -> Self {
988        Self::new(NamespaceId::default(), AuthorId::default(), b"")
989    }
990}
991
992impl Debug for RecordIdentifier {
993    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
994        f.debug_struct("RecordIdentifier")
995            .field("namespace", &self.namespace())
996            .field("author", &self.author())
997            .field("key", &std::string::String::from_utf8_lossy(self.key()))
998            .finish()
999    }
1000}
1001
1002impl RangeKey for RecordIdentifier {
1003    #[cfg(test)]
1004    fn is_prefix_of(&self, other: &Self) -> bool {
1005        other.as_ref().starts_with(self.as_ref())
1006    }
1007}
1008
1009fn system_time_now() -> u64 {
1010    SystemTime::now()
1011        .duration_since(SystemTime::UNIX_EPOCH)
1012        .expect("time drift")
1013        .as_micros() as u64
1014}
1015
1016impl RecordIdentifier {
1017    /// Create a new [`RecordIdentifier`].
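    ///
    /// A minimal sketch of how the identifier concatenates namespace, author, and
    /// key bytes (`namespace` and `author` are assumed to exist):
    ///
    /// ```ignore
    /// let id = RecordIdentifier::new(namespace.id(), author.id(), "/my/key");
    /// assert_eq!(id.namespace(), namespace.id());
    /// assert_eq!(id.author(), author.id());
    /// assert_eq!(id.key(), b"/my/key");
    /// ```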
1018    pub fn new(
1019        namespace: impl Into<NamespaceId>,
1020        author: impl Into<AuthorId>,
1021        key: impl AsRef<[u8]>,
1022    ) -> Self {
1023        let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1024        bytes.extend_from_slice(namespace.into().as_bytes());
1025        bytes.extend_from_slice(author.into().as_bytes());
1026        bytes.extend_from_slice(key.as_ref());
1027        Self(bytes.freeze())
1028    }
1029
1030    /// Serialize this [`RecordIdentifier`] into a mutable byte array.
1031    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1032        out.extend_from_slice(&self.0);
1033    }
1034
1035    /// Get this [`RecordIdentifier`] as [Bytes].
1036    pub fn as_bytes(&self) -> Bytes {
1037        self.0.clone()
1038    }
1039
1040    /// Get this [`RecordIdentifier`] as a tuple of byte slices.
1041    pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1042        (
1043            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1044            self.0[AUTHOR_BYTES].try_into().unwrap(),
1045            &self.0[KEY_BYTES],
1046        )
1047    }
1048
1049    /// Get this [`RecordIdentifier`] as a tuple of bytes.
1050    pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1051        (
1052            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1053            self.0[AUTHOR_BYTES].try_into().unwrap(),
1054            self.0.slice(KEY_BYTES),
1055        )
1056    }
1057
1058    /// Get the key of this record.
1059    pub fn key(&self) -> &[u8] {
1060        &self.0[KEY_BYTES]
1061    }
1062
1063    /// Get the key of this record as [`Bytes`].
1064    pub fn key_bytes(&self) -> Bytes {
1065        self.0.slice(KEY_BYTES)
1066    }
1067
1068    /// Get the [`NamespaceId`] of this record as byte array.
1069    pub fn namespace(&self) -> NamespaceId {
1070        let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1071        value.into()
1072    }
1073
1074    /// Get the [`AuthorId`] of this record as byte array.
1075    pub fn author(&self) -> AuthorId {
1076        let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1077        value.into()
1078    }
1079}
1080
1081impl AsRef<[u8]> for RecordIdentifier {
1082    fn as_ref(&self) -> &[u8] {
1083        &self.0
1084    }
1085}
1086
1087impl Deref for SignedEntry {
1088    type Target = Entry;
1089    fn deref(&self) -> &Self::Target {
1090        &self.entry
1091    }
1092}
1093
1094impl Deref for Entry {
1095    type Target = Record;
1096    fn deref(&self) -> &Self::Target {
1097        &self.record
1098    }
1099}
1100
1101/// The data part of an entry in a [`Replica`].
1102#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1103pub struct Record {
1104    /// Length of the data referenced by `hash`.
1105    len: u64,
1106    /// Hash of the content data.
1107    hash: Hash,
1108    /// Record creation timestamp. Counted as micros since the Unix epoch.
1109    timestamp: u64,
1110}
1111
1112impl RangeValue for Record {}
1113
1114/// Ordering for entry values.
1115///
1116/// Compares first the timestamp, then the content hash.
1117impl Ord for Record {
1118    fn cmp(&self, other: &Self) -> Ordering {
1119        self.timestamp
1120            .cmp(&other.timestamp)
1121            .then_with(|| self.hash.cmp(&other.hash))
1122    }
1123}
1124
1125impl PartialOrd for Record {
1126    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1127        Some(self.cmp(other))
1128    }
1129}
1130
1131impl Record {
1132    /// Create a new record.
1133    pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1134        debug_assert!(
1135            len != 0 || hash == Hash::EMPTY,
1136            "if `len` is 0 then `hash` must be the hash of the empty byte range"
1137        );
1138        Record {
1139            hash,
1140            len,
1141            timestamp,
1142        }
1143    }
1144
1145    /// Create a tombstone record (empty content).
1146    pub fn empty(timestamp: u64) -> Self {
1147        Self::new(Hash::EMPTY, 0, timestamp)
1148    }
1149
1150    /// Create a tombstone record with the timestamp set to now.
1151    pub fn empty_current() -> Self {
1152        Self::new_current(Hash::EMPTY, 0)
1153    }
1154
1155    /// Return `true` if the entry is empty.
1156    pub fn is_empty(&self) -> bool {
1157        self.hash == Hash::EMPTY
1158    }
1159
1160    /// Create a new [`Record`] with the timestamp set to now.
1161    pub fn new_current(hash: Hash, len: u64) -> Self {
1162        let timestamp = system_time_now();
1163        Self::new(hash, len, timestamp)
1164    }
1165
1166    /// Get the length of the data addressed by this record's content hash.
1167    pub fn content_len(&self) -> u64 {
1168        self.len
1169    }
1170
1171    /// Get the [`struct@Hash`] of the content data of this record.
1172    pub fn content_hash(&self) -> Hash {
1173        self.hash
1174    }
1175
1176    /// Get the timestamp of this record.
1177    pub fn timestamp(&self) -> u64 {
1178        self.timestamp
1179    }
1180
1181    #[cfg(test)]
1182    pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1183        let len = data.as_ref().len() as u64;
1184        let hash = Hash::new(data);
1185        Self::new_current(hash, len)
1186    }
1187
1188    #[cfg(test)]
1189    pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1190        let len = data.as_ref().len() as u64;
1191        let hash = Hash::new(data);
1192        Self::new(hash, len, timestamp)
1193    }
1194
1195    /// Serialize this record into a mutable byte array.
1196    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1197        out.extend_from_slice(&self.len.to_be_bytes());
1198        out.extend_from_slice(self.hash.as_ref());
1199        out.extend_from_slice(&self.timestamp.to_be_bytes())
1200    }
1201}
1202
1203#[cfg(test)]
1204mod tests {
1205    use std::collections::HashSet;
1206
1207    use anyhow::Result;
1208    use rand::SeedableRng;
1209
1210    use super::*;
1211    use crate::{
1212        actor::SyncHandle,
1213        ranger::{Range, Store as _},
1214        store::{OpenError, Query, SortBy, SortDirection, Store},
1215    };
1216
1217    #[tokio::test]
1218    async fn test_basics_memory() -> Result<()> {
1219        let store = store::Store::memory();
1220        test_basics(store).await?;
1221
1222        Ok(())
1223    }
1224
1225    #[tokio::test]
1226    #[cfg(feature = "fs-store")]
1227    async fn test_basics_fs() -> Result<()> {
1228        let dbfile = tempfile::NamedTempFile::new()?;
1229        let store = store::fs::Store::persistent(dbfile.path())?;
1230        test_basics(store).await?;
1231        Ok(())
1232    }
1233
1234    async fn test_basics(mut store: Store) -> Result<()> {
1235        let mut rng = rand::rng();
1236        let alice = Author::new(&mut rng);
1237        let bob = Author::new(&mut rng);
1238        let myspace = NamespaceSecret::new(&mut rng);
1239
1240        let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1241        let record = Record::current_from_data(b"this is my cool data");
1242        let entry = Entry::new(record_id, record);
1243        let signed_entry = entry.sign(&myspace, &alice);
1244        signed_entry.verify(&()).expect("failed to verify");
1245
1246        let mut my_replica = store.new_replica(myspace.clone())?;
1247        for i in 0..10 {
1248            my_replica
1249                .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice"))
1250                .await?;
1251        }
1252
1253        for i in 0..10 {
1254            let res = store
1255                .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1256                .unwrap();
1257            let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1258            assert_eq!(res.entry().record().content_len(), len);
1259            res.verify(&())?;
1260        }
1261
1262        // Test multiple records for the same key
1263        let mut my_replica = store.new_replica(myspace.clone())?;
1264        my_replica
1265            .hash_and_insert("/cool/path", &alice, "round 1")
1266            .await?;
1267        let _entry = store
1268            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1269            .unwrap();
1270        // Second
1271        let mut my_replica = store.new_replica(myspace.clone())?;
1272        my_replica
1273            .hash_and_insert("/cool/path", &alice, "round 2")
1274            .await?;
1275        let _entry = store
1276            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1277            .unwrap();
1278
1279        // Get All by author
1280        let entries: Vec<_> = store
1281            .get_many(myspace.id(), Query::author(alice.id()))?
1282            .collect::<Result<_>>()?;
1283        assert_eq!(entries.len(), 11);
1284
1285        // Get All by author
1286        let entries: Vec<_> = store
1287            .get_many(myspace.id(), Query::author(bob.id()))?
1288            .collect::<Result<_>>()?;
1289        assert_eq!(entries.len(), 0);
1290
1291        // Get All by key
1292        let entries: Vec<_> = store
1293            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1294            .collect::<Result<_>>()?;
1295        assert_eq!(entries.len(), 1);
1296
1297        // Get All
1298        let entries: Vec<_> = store
1299            .get_many(myspace.id(), Query::all())?
1300            .collect::<Result<_>>()?;
1301        assert_eq!(entries.len(), 11);
1302
1303        // insert record from different author
1304        let mut my_replica = store.new_replica(myspace.clone())?;
1305        let _entry = my_replica
1306            .hash_and_insert("/cool/path", &bob, "bob round 1")
1307            .await?;
1308
1309        // Get All by author
1310        let entries: Vec<_> = store
1311            .get_many(myspace.id(), Query::author(alice.id()))?
1312            .collect::<Result<_>>()?;
1313        assert_eq!(entries.len(), 11);
1314
1315        let entries: Vec<_> = store
1316            .get_many(myspace.id(), Query::author(bob.id()))?
1317            .collect::<Result<_>>()?;
1318        assert_eq!(entries.len(), 1);
1319
1320        // Get All by key
1321        let entries: Vec<_> = store
1322            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1323            .collect::<Result<_>>()?;
1324        assert_eq!(entries.len(), 2);
1325
1326        // Get all by prefix
1327        let entries: Vec<_> = store
1328            .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1329            .collect::<Result<_>>()?;
1330        assert_eq!(entries.len(), 2);
1331
1332        // Get All by author and prefix
1333        let entries: Vec<_> = store
1334            .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1335            .collect::<Result<_>>()?;
1336        assert_eq!(entries.len(), 1);
1337
1338        let entries: Vec<_> = store
1339            .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1340            .collect::<Result<_>>()?;
1341        assert_eq!(entries.len(), 1);
1342
1343        // Get All
1344        let entries: Vec<_> = store
1345            .get_many(myspace.id(), Query::all())?
1346            .collect::<Result<_>>()?;
1347        assert_eq!(entries.len(), 12);
1348
1349        // Get Range of all should return all latest
1350        let mut my_replica = store.new_replica(myspace.clone())?;
1351        let entries_second: Vec<_> = my_replica
1352            .store
1353            .get_range(Range::new(
1354                RecordIdentifier::default(),
1355                RecordIdentifier::default(),
1356            ))?
1357            .collect::<Result<_, _>>()?;
1358
1359        assert_eq!(entries_second.len(), 12);
1360        assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1361
1362        test_lru_cache_like_behaviour(&mut store, myspace.id())?;
1363        store.flush()?;
1364        Ok(())
1365    }
1366
1367    /// Test that [`Store::register_useful_peer`] behaves like an LRU cache of size
1368    /// [`super::store::PEERS_PER_DOC_CACHE_SIZE`].
1369    fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1370        /// Helper to verify the store returns the expected peers for the namespace.
1371        #[track_caller]
1372        fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1373            assert_eq!(
1374                expected_peers,
1375                &store
1376                    .get_sync_peers(&namespace)
1377                    .unwrap()
1378                    .unwrap()
1379                    .collect::<Vec<_>>(),
1380                "sync peers differ"
1381            );
1382        }
1383
1384        let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1385        // expected peers: newest peers are to the front, oldest to the back
1386        let mut expected_peers = Vec::with_capacity(count);
1387        for i in 0..count as u8 {
1388            let peer = [i; 32];
1389            expected_peers.insert(0, peer);
1390            store.register_useful_peer(namespace, peer)?;
1391        }
1392        verify_peers(store, namespace, &expected_peers);
1393
1394        // one more peer should evict the last peer
1395        expected_peers.pop();
1396        let newer_peer = [count as u8; 32];
1397        expected_peers.insert(0, newer_peer);
1398        store.register_useful_peer(namespace, newer_peer)?;
1399        verify_peers(store, namespace, &expected_peers);
1400
1401        // move one existing peer up
1402        let refreshed_peer = expected_peers.remove(2);
1403        expected_peers.insert(0, refreshed_peer);
1404        store.register_useful_peer(namespace, refreshed_peer)?;
1405        verify_peers(store, namespace, &expected_peers);
1406        Ok(())
1407    }
1408
1409    #[tokio::test]
1410    async fn test_content_hashes_iterator_memory() -> Result<()> {
1411        let store = store::Store::memory();
1412        test_content_hashes_iterator(store).await
1413    }
1414
1415    #[tokio::test]
1416    #[cfg(feature = "fs-store")]
1417    async fn test_content_hashes_iterator_fs() -> Result<()> {
1418        let dbfile = tempfile::NamedTempFile::new()?;
1419        let store = store::fs::Store::persistent(dbfile.path())?;
1420        test_content_hashes_iterator(store).await
1421    }
1422
1423    async fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1424        let mut rng = rand::rng();
1425        let mut expected = HashSet::new();
1426        let n_replicas = 3;
1427        let n_entries = 4;
1428        for i in 0..n_replicas {
1429            let namespace = NamespaceSecret::new(&mut rng);
1430            let author = store.new_author(&mut rng)?;
1431            let mut replica = store.new_replica(namespace)?;
1432            for j in 0..n_entries {
1433                let key = format!("{j}");
1434                let data = format!("{i}:{j}");
1435                let hash = replica.hash_and_insert(key, &author, data).await?;
1436                expected.insert(hash);
1437            }
1438        }
1439        assert_eq!(expected.len(), n_replicas * n_entries);
1440        let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1441        assert_eq!(actual, expected);
1442        Ok(())
1443    }
1444
1445    #[test]
1446    fn test_multikey() {
1447        let mut rng = rand::rng();
1448
1449        let k = ["a", "c", "z"];
1450
1451        let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1452        n.sort_by_key(|n| n.id());
1453
1454        let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1455        a.sort_by_key(|a| a.id());
1456
1457        // Just key
1458        {
1459            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1460            let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1461            let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1462
1463            let range = Range::new(ri0.clone(), ri2.clone());
1464            assert!(range.contains(&ri0), "start");
1465            assert!(range.contains(&ri1), "inside");
1466            assert!(!range.contains(&ri2), "end");
1467
1468            assert!(ri0 < ri1);
1469            assert!(ri1 < ri2);
1470        }
1471
1472        // Just namespace
1473        {
1474            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1475            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1476            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1477
1478            let range = Range::new(ri0.clone(), ri2.clone());
1479            assert!(range.contains(&ri0), "start");
1480            assert!(range.contains(&ri1), "inside");
1481            assert!(!range.contains(&ri2), "end");
1482
1483            assert!(ri0 < ri1);
1484            assert!(ri1 < ri2);
1485        }
1486
1487        // Just author
1488        {
1489            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1490            let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1491            let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1492
1493            let range = Range::new(ri0.clone(), ri2.clone());
1494            assert!(range.contains(&ri0), "start");
1495            assert!(range.contains(&ri1), "inside");
1496            assert!(!range.contains(&ri2), "end");
1497
1498            assert!(ri0 < ri1);
1499            assert!(ri1 < ri2);
1500        }
1501
1502        // Just key and namespace
1503        {
1504            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1505            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1506            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1507
1508            let range = Range::new(ri0.clone(), ri2.clone());
1509            assert!(range.contains(&ri0), "start");
1510            assert!(range.contains(&ri1), "inside");
1511            assert!(!range.contains(&ri2), "end");
1512
1513            assert!(ri0 < ri1);
1514            assert!(ri1 < ri2);
1515        }
1516
1517        // Mixed
1518        {
1519            // Ord should prioritize namespace - author - key
1520
1521            let a0 = a[0].id();
1522            let a1 = a[1].id();
1523            let n0 = n[0].id();
1524            let n1 = n[1].id();
1525            let k0 = k[0];
1526            let k1 = k[1];
1527
1528            assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1529            assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1530            assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1531            assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1532        }
1533    }
1534
1535    #[tokio::test]
1536    async fn test_timestamps_memory() -> Result<()> {
1537        let store = store::Store::memory();
1538        test_timestamps(store).await?;
1539
1540        Ok(())
1541    }
1542
1543    #[tokio::test]
1544    #[cfg(feature = "fs-store")]
1545    async fn test_timestamps_fs() -> Result<()> {
1546        let dbfile = tempfile::NamedTempFile::new()?;
1547        let store = store::fs::Store::persistent(dbfile.path())?;
1548        test_timestamps(store).await?;
1549        Ok(())
1550    }
1551
1552    async fn test_timestamps(mut store: Store) -> Result<()> {
1553        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1554        let namespace = NamespaceSecret::new(&mut rng);
1555        let _replica = store.new_replica(namespace.clone())?;
1556        let author = store.new_author(&mut rng)?;
1557        store.close_replica(namespace.id());
1558        let mut replica = store.open_replica(&namespace.id())?;
1559
1560        let key = b"hello";
1561        let value = b"world";
1562        let entry = {
1563            let timestamp = 2;
1564            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1565            let record = Record::from_data(value, timestamp);
1566            Entry::new(id, record).sign(&namespace, &author)
1567        };
1568
1569        replica
1570            .insert_entry(entry.clone(), InsertOrigin::Local)
1571            .await
1572            .unwrap();
1573        store.close_replica(namespace.id());
1574        let res = store
1575            .get_exact(namespace.id(), author.id(), key, false)?
1576            .unwrap();
1577        assert_eq!(res, entry);
1578
1579        let entry2 = {
1580            let timestamp = 1;
1581            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1582            let record = Record::from_data(value, timestamp);
1583            Entry::new(id, record).sign(&namespace, &author)
1584        };
1585
1586        let mut replica = store.open_replica(&namespace.id())?;
1587        let res = replica.insert_entry(entry2, InsertOrigin::Local).await;
1588        store.close_replica(namespace.id());
1589        assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1590        let res = store
1591            .get_exact(namespace.id(), author.id(), key, false)?
1592            .unwrap();
1593        assert_eq!(res, entry);
1594        store.flush()?;
1595        Ok(())
1596    }
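    // Added sketch (ours, not part of the original suite): the complementary direction of the
    // last-write-wins rule exercised in `test_timestamps` above. It reuses only calls that
    // already appear in this module; the test name is our own.
    #[tokio::test]
    async fn test_timestamps_newer_wins_sketch() -> Result<()> {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let mut store = store::Store::memory();
        let namespace = NamespaceSecret::new(&mut rng);
        let author = store.new_author(&mut rng)?;
        let mut replica = store.new_replica(namespace.clone())?;

        let key = b"hello";
        let id = RecordIdentifier::new(namespace.id(), author.id(), key);

        // First write at timestamp 2.
        let older = Entry::new(id.clone(), Record::from_data(b"old", 2)).sign(&namespace, &author);
        replica.insert_entry(older, InsertOrigin::Local).await?;

        // A strictly newer timestamp for the same (namespace, author, key) should replace it.
        let newer = Entry::new(id, Record::from_data(b"new", 3)).sign(&namespace, &author);
        replica.insert_entry(newer.clone(), InsertOrigin::Local).await?;

        let res = store
            .get_exact(namespace.id(), author.id(), key, false)?
            .unwrap();
        assert_eq!(res, newer);
        store.flush()?;
        Ok(())
    }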
1597
1598    #[tokio::test]
1599    async fn test_replica_sync_memory() -> Result<()> {
1600        let alice_store = store::Store::memory();
1601        let bob_store = store::Store::memory();
1602
1603        test_replica_sync(alice_store, bob_store).await?;
1604        Ok(())
1605    }
1606
1607    #[tokio::test]
1608    #[cfg(feature = "fs-store")]
1609    async fn test_replica_sync_fs() -> Result<()> {
1610        let alice_dbfile = tempfile::NamedTempFile::new()?;
1611        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1612        let bob_dbfile = tempfile::NamedTempFile::new()?;
1613        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1614        test_replica_sync(alice_store, bob_store).await?;
1615
1616        Ok(())
1617    }
1618
1619    async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1620        let alice_set = ["ape", "eel", "fox", "gnu"];
1621        let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1622
1623        let mut rng = rand::rng();
1624        let author = Author::new(&mut rng);
1625        let myspace = NamespaceSecret::new(&mut rng);
1626        let mut alice = alice_store.new_replica(myspace.clone())?;
1627        for el in &alice_set {
1628            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1629        }
1630
1631        let mut bob = bob_store.new_replica(myspace.clone())?;
1632        for el in &bob_set {
1633            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1634        }
1635
1636        let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
1637
1638        assert_eq!(alice_out.num_sent, 2);
1639        assert_eq!(bob_out.num_recv, 2);
1640        assert_eq!(alice_out.num_recv, 6);
1641        assert_eq!(bob_out.num_sent, 6);
1642
1643        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1644        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1645        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1646        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1647        alice_store.flush()?;
1648        bob_store.flush()?;
1649        Ok(())
1650    }
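    // Added sketch (ours, not from the original suite): after the reconciliation in
    // `test_replica_sync`, a second sync round over fresh replica handles should leave both
    // stores unchanged. The exact message counts depend on how ranges are split, so this
    // only re-checks the resulting entry sets.
    #[tokio::test]
    async fn test_replica_resync_sketch() -> Result<()> {
        let mut alice_store = store::Store::memory();
        let mut bob_store = store::Store::memory();
        let alice_set = ["ape", "eel"];
        let bob_set = ["bee", "eel"];

        let mut rng = rand::rng();
        let author = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);
        let mut alice = alice_store.new_replica(myspace.clone())?;
        for el in &alice_set {
            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
        }
        let mut bob = bob_store.new_replica(myspace.clone())?;
        for el in &bob_set {
            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
        }

        // First round brings both sides to the union of the two sets.
        sync(&mut alice, &mut bob).await?;

        // Second round on fresh handles; the stored sets must stay the same afterwards.
        let mut alice = alice_store.new_replica(myspace.clone())?;
        let mut bob = bob_store.new_replica(myspace.clone())?;
        sync(&mut alice, &mut bob).await?;

        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
        alice_store.flush()?;
        bob_store.flush()?;
        Ok(())
    }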
1651
1652    #[tokio::test]
1653    async fn test_replica_timestamp_sync_memory() -> Result<()> {
1654        let alice_store = store::Store::memory();
1655        let bob_store = store::Store::memory();
1656
1657        test_replica_timestamp_sync(alice_store, bob_store).await?;
1658        Ok(())
1659    }
1660
1661    #[tokio::test]
1662    #[cfg(feature = "fs-store")]
1663    async fn test_replica_timestamp_sync_fs() -> Result<()> {
1664        let alice_dbfile = tempfile::NamedTempFile::new()?;
1665        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1666        let bob_dbfile = tempfile::NamedTempFile::new()?;
1667        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1668        test_replica_timestamp_sync(alice_store, bob_store).await?;
1669
1670        Ok(())
1671    }
1672
1673    async fn test_replica_timestamp_sync(
1674        mut alice_store: Store,
1675        mut bob_store: Store,
1676    ) -> Result<()> {
1677        let mut rng = rand::rng();
1678        let author = Author::new(&mut rng);
1679        let namespace = NamespaceSecret::new(&mut rng);
1680        let mut alice = alice_store.new_replica(namespace.clone())?;
1681        let mut bob = bob_store.new_replica(namespace.clone())?;
1682
1683        let key = b"key";
1684        let alice_value = b"alice";
1685        let bob_value = b"bob";
1686        let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?;
1687        // system time increased - sync should overwrite
1688        let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?;
1689        sync(&mut alice, &mut bob).await?;
1690        assert_eq!(
1691            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1692            Some(bob_hash)
1693        );
1694        assert_eq!(
1695            get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
1696            Some(bob_hash)
1697        );
1698
1699        let mut alice = alice_store.new_replica(namespace.clone())?;
1700        let mut bob = bob_store.new_replica(namespace.clone())?;
1701
1702        let alice_value_2 = b"alice2";
1703        // bob writes again, but alice writes last - her newer timestamp should win the sync
1704        let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?;
1705        let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?;
1706        sync(&mut alice, &mut bob).await?;
1707        assert_eq!(
1708            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1709            Some(alice_hash_2)
1710        );
1711        assert_eq!(
1712            get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
1713            Some(alice_hash_2)
1714        );
1715        alice_store.flush()?;
1716        bob_store.flush()?;
1717        Ok(())
1718    }
1719
1720    #[tokio::test]
1721    async fn test_future_timestamp() -> Result<()> {
1722        let mut rng = rand::rng();
1723        let mut store = store::Store::memory();
1724        let author = Author::new(&mut rng);
1725        let namespace = NamespaceSecret::new(&mut rng);
1726
1727        let mut replica = store.new_replica(namespace.clone())?;
1728        let key = b"hi";
1729        let t = system_time_now();
1730        let record = Record::from_data(b"1", t);
1731        let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1732        replica
1733            .insert_entry(entry0.clone(), InsertOrigin::Local)
1734            .await?;
1735
1736        assert_eq!(
1737            get_entry(&mut store, namespace.id(), author.id(), key)?,
1738            entry0
1739        );
1740
1741        let mut replica = store.new_replica(namespace.clone())?;
1742        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1743        let record = Record::from_data(b"2", t);
1744        let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1745        replica
1746            .insert_entry(entry1.clone(), InsertOrigin::Local)
1747            .await?;
1748        assert_eq!(
1749            get_entry(&mut store, namespace.id(), author.id(), key)?,
1750            entry1
1751        );
1752
1753        let mut replica = store.new_replica(namespace.clone())?;
1754        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1755        let record = Record::from_data(b"2", t);
1756        let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1757        replica
1758            .insert_entry(entry2.clone(), InsertOrigin::Local)
1759            .await?;
1760        assert_eq!(
1761            get_entry(&mut store, namespace.id(), author.id(), key)?,
1762            entry2
1763        );
1764
1765        let mut replica = store.new_replica(namespace.clone())?;
1766        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1767        let record = Record::from_data(b"2", t);
1768        let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1769        let res = replica.insert_entry(entry3, InsertOrigin::Local).await;
1770        assert!(matches!(
1771            res,
1772            Err(InsertError::Validation(
1773                ValidationFailure::TooFarInTheFuture
1774            ))
1775        ));
1776        assert_eq!(
1777            get_entry(&mut store, namespace.id(), author.id(), key)?,
1778            entry2
1779        );
1780        store.flush()?;
1781        Ok(())
1782    }
1783
1784    #[tokio::test]
1785    async fn test_insert_empty() -> Result<()> {
1786        let mut store = store::Store::memory();
1787        let mut rng = rand::rng();
1788        let alice = Author::new(&mut rng);
1789        let myspace = NamespaceSecret::new(&mut rng);
1790        let mut replica = store.new_replica(myspace.clone())?;
1791        let hash = Hash::new(b"");
1792        let res = replica.insert(b"foo", &alice, hash, 0).await;
1793        assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1794        store.flush()?;
1795        Ok(())
1796    }
1797
1798    #[tokio::test]
1799    async fn test_prefix_delete_memory() -> Result<()> {
1800        let store = store::Store::memory();
1801        test_prefix_delete(store).await?;
1802        Ok(())
1803    }
1804
1805    #[tokio::test]
1806    #[cfg(feature = "fs-store")]
1807    async fn test_prefix_delete_fs() -> Result<()> {
1808        let dbfile = tempfile::NamedTempFile::new()?;
1809        let store = store::fs::Store::persistent(dbfile.path())?;
1810        test_prefix_delete(store).await?;
1811        Ok(())
1812    }
1813
1814    async fn test_prefix_delete(mut store: Store) -> Result<()> {
1815        let mut rng = rand::rng();
1816        let alice = Author::new(&mut rng);
1817        let myspace = NamespaceSecret::new(&mut rng);
1818        let mut replica = store.new_replica(myspace.clone())?;
1819        let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;
1820        let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?;
1821
1822        // sanity checks
1823        assert_eq!(
1824            get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1825            Some(hash1)
1826        );
1827        assert_eq!(
1828            get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1829            Some(hash2)
1830        );
1831
1832        // delete
1833        let mut replica = store.new_replica(myspace.clone())?;
1834        let deleted = replica.delete_prefix(b"foo", &alice).await?;
1835        assert_eq!(deleted, 2);
1836        assert_eq!(
1837            store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1838            None
1839        );
1840        assert_eq!(
1841            store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1842            None
1843        );
1844        assert_eq!(
1845            store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1846            None
1847        );
1848        store.flush()?;
1849        Ok(())
1850    }
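    // Added sketch (ours, not part of the original suite): deleting a prefix that matches no
    // stored keys should report zero removed entries and leave existing entries untouched.
    // Only calls already used in `test_prefix_delete` above are reused.
    #[tokio::test]
    async fn test_prefix_delete_no_match_sketch() -> Result<()> {
        let mut store = store::Store::memory();
        let mut rng = rand::rng();
        let alice = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);
        let mut replica = store.new_replica(myspace.clone())?;
        let hash = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;

        // No stored key starts with "baz", so we'd expect nothing to be removed.
        let deleted = replica.delete_prefix(b"baz", &alice).await?;
        assert_eq!(deleted, 0);
        assert_eq!(
            get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
            Some(hash)
        );
        store.flush()?;
        Ok(())
    }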
1851
1852    #[tokio::test]
1853    async fn test_replica_sync_delete_memory() -> Result<()> {
1854        let alice_store = store::Store::memory();
1855        let bob_store = store::Store::memory();
1856
1857        test_replica_sync_delete(alice_store, bob_store).await
1858    }
1859
1860    #[tokio::test]
1861    #[cfg(feature = "fs-store")]
1862    async fn test_replica_sync_delete_fs() -> Result<()> {
1863        let alice_dbfile = tempfile::NamedTempFile::new()?;
1864        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1865        let bob_dbfile = tempfile::NamedTempFile::new()?;
1866        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1867        test_replica_sync_delete(alice_store, bob_store).await
1868    }
1869
1870    async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1871        let alice_set = ["foot"];
1872        let bob_set = ["fool", "foo", "fog"];
1873
1874        let mut rng = rand::rng();
1875        let author = Author::new(&mut rng);
1876        let myspace = NamespaceSecret::new(&mut rng);
1877        let mut alice = alice_store.new_replica(myspace.clone())?;
1878        for el in &alice_set {
1879            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1880        }
1881
1882        let mut bob = bob_store.new_replica(myspace.clone())?;
1883        for el in &bob_set {
1884            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1885        }
1886
1887        sync(&mut alice, &mut bob).await?;
1888
1889        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1890        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1891        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1892        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1893
1894        let mut alice = alice_store.new_replica(myspace.clone())?;
1895        let mut bob = bob_store.new_replica(myspace.clone())?;
1896        alice.delete_prefix("foo", &author).await?;
1897        bob.hash_and_insert("fooz", &author, "fooz".as_bytes())
1898            .await?;
1899        sync(&mut alice, &mut bob).await?;
1900        check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1901        check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1902        alice_store.flush()?;
1903        bob_store.flush()?;
1904        Ok(())
1905    }
1906
1907    #[tokio::test]
1908    async fn test_replica_remove_memory() -> Result<()> {
1909        let alice_store = store::Store::memory();
1910        test_replica_remove(alice_store).await
1911    }
1912
1913    #[tokio::test]
1914    #[cfg(feature = "fs-store")]
1915    async fn test_replica_remove_fs() -> Result<()> {
1916        let alice_dbfile = tempfile::NamedTempFile::new()?;
1917        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1918        test_replica_remove(alice_store).await
1919    }
1920
1921    async fn test_replica_remove(mut store: Store) -> Result<()> {
1922        let mut rng = rand::rng();
1923        let namespace = NamespaceSecret::new(&mut rng);
1924        let author = Author::new(&mut rng);
1925        let mut replica = store.new_replica(namespace.clone())?;
1926
1927        // insert entry
1928        let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?;
1929        let res = store
1930            .get_many(namespace.id(), Query::all())?
1931            .collect::<Vec<_>>();
1932        assert_eq!(res.len(), 1);
1933
1934        // remove replica
1935        let res = store.remove_replica(&namespace.id());
1936        // may not remove replica while still open
1937        assert!(res.is_err());
1938        store.close_replica(namespace.id());
1939        store.remove_replica(&namespace.id())?;
1940        let res = store
1941            .get_many(namespace.id(), Query::all())?
1942            .collect::<Vec<_>>();
1943        assert_eq!(res.len(), 0);
1944
1945        // may not reopen removed replica
1946        let res = store.load_replica_info(&namespace.id());
1947        assert!(matches!(res, Err(OpenError::NotFound)));
1948
1949        // may recreate replica
1950        let mut replica = store.new_replica(namespace.clone())?;
1951        replica.insert(b"foo", &author, hash, 3).await?;
1952        let res = store
1953            .get_many(namespace.id(), Query::all())?
1954            .collect::<Vec<_>>();
1955        assert_eq!(res.len(), 1);
1956        store.flush()?;
1957        Ok(())
1958    }
1959
1960    #[tokio::test]
1961    async fn test_replica_delete_edge_cases_memory() -> Result<()> {
1962        let store = store::Store::memory();
1963        test_replica_delete_edge_cases(store).await
1964    }
1965
1966    #[tokio::test]
1967    #[cfg(feature = "fs-store")]
1968    async fn test_replica_delete_edge_cases_fs() -> Result<()> {
1969        let dbfile = tempfile::NamedTempFile::new()?;
1970        let store = store::fs::Store::persistent(dbfile.path())?;
1971        test_replica_delete_edge_cases(store).await
1972    }
1973
1974    async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
1975        let mut rng = rand::rng();
1976        let author = Author::new(&mut rng);
1977        let namespace = NamespaceSecret::new(&mut rng);
1978
1979        let edgecases = [0u8, 1u8, 255u8];
1980        let prefixes = [0u8, 255u8];
1981        let hash = Hash::new(b"foo");
1982        let len = 3;
1983        for prefix in prefixes {
1984            let mut expected = vec![];
1985            let mut replica = store.new_replica(namespace.clone())?;
1986            for suffix in edgecases {
1987                let key = [prefix, suffix].to_vec();
1988                expected.push(key.clone());
1989                replica.insert(&key, &author, hash, len).await?;
1990            }
1991            assert_keys(&mut store, namespace.id(), expected);
1992            let mut replica = store.new_replica(namespace.clone())?;
1993            replica.delete_prefix([prefix], &author).await?;
1994            assert_keys(&mut store, namespace.id(), vec![]);
1995        }
1996
1997        let mut replica = store.new_replica(namespace.clone())?;
1998        let key = vec![1u8, 0u8];
1999        replica.insert(key, &author, hash, len).await?;
2000        let key = vec![1u8, 1u8];
2001        replica.insert(key, &author, hash, len).await?;
2002        let key = vec![1u8, 2u8];
2003        replica.insert(key, &author, hash, len).await?;
2004        let prefix = vec![1u8, 1u8];
2005        replica.delete_prefix(prefix, &author).await?;
2006        assert_keys(
2007            &mut store,
2008            namespace.id(),
2009            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2010        );
2011
2012        let mut replica = store.new_replica(namespace.clone())?;
2013        let key = vec![0u8, 255u8];
2014        replica.insert(key, &author, hash, len).await?;
2015        let key = vec![0u8, 0u8];
2016        replica.insert(key, &author, hash, len).await?;
2017        let prefix = vec![0u8];
2018        replica.delete_prefix(prefix, &author).await?;
2019        assert_keys(
2020            &mut store,
2021            namespace.id(),
2022            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2023        );
2024        store.flush()?;
2025        Ok(())
2026    }
2027
2028    #[tokio::test]
2029    async fn test_latest_iter_memory() -> Result<()> {
2030        let store = store::Store::memory();
2031        test_latest_iter(store).await
2032    }
2033
2034    #[tokio::test]
2035    #[cfg(feature = "fs-store")]
2036    async fn test_latest_iter_fs() -> Result<()> {
2037        let dbfile = tempfile::NamedTempFile::new()?;
2038        let store = store::fs::Store::persistent(dbfile.path())?;
2039        test_latest_iter(store).await
2040    }
2041
2042    async fn test_latest_iter(mut store: Store) -> Result<()> {
2043        let mut rng = rand::rng();
2044        let author0 = Author::new(&mut rng);
2045        let author1 = Author::new(&mut rng);
2046        let namespace = NamespaceSecret::new(&mut rng);
2047        let mut replica = store.new_replica(namespace.clone())?;
2048
2049        replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
2050        let latest = store
2051            .get_latest_for_each_author(namespace.id())?
2052            .collect::<Result<Vec<_>>>()?;
2053        assert_eq!(latest.len(), 1);
2054        assert_eq!(latest[0].2, b"a0.1".to_vec());
2055
2056        let mut replica = store.new_replica(namespace.clone())?;
2057        replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
2058        replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
2059        let latest = store
2060            .get_latest_for_each_author(namespace.id())?
2061            .collect::<Result<Vec<_>>>()?;
2062        let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2063        latest_keys.sort();
2064        assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2065        store.flush()?;
2066        Ok(())
2067    }
2068
2069    #[tokio::test]
2070    async fn test_replica_byte_keys_memory() -> Result<()> {
2071        let store = store::Store::memory();
2072
2073        test_replica_byte_keys(store).await?;
2074        Ok(())
2075    }
2076
2077    #[tokio::test]
2078    #[cfg(feature = "fs-store")]
2079    async fn test_replica_byte_keys_fs() -> Result<()> {
2080        let dbfile = tempfile::NamedTempFile::new()?;
2081        let store = store::fs::Store::persistent(dbfile.path())?;
2082        test_replica_byte_keys(store).await?;
2083
2084        Ok(())
2085    }
2086
2087    async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2088        let mut rng = rand::rng();
2089        let author = Author::new(&mut rng);
2090        let namespace = NamespaceSecret::new(&mut rng);
2091
2092        let hash = Hash::new(b"foo");
2093        let len = 3;
2094
2095        let key = vec![1u8, 0u8];
2096        let mut replica = store.new_replica(namespace.clone())?;
2097        replica.insert(key, &author, hash, len).await?;
2098        assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2099        let key = vec![1u8, 2u8];
2100        let mut replica = store.new_replica(namespace.clone())?;
2101        replica.insert(key, &author, hash, len).await?;
2102        assert_keys(
2103            &mut store,
2104            namespace.id(),
2105            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2106        );
2107
2108        let key = vec![0u8, 255u8];
2109        let mut replica = store.new_replica(namespace.clone())?;
2110        replica.insert(key, &author, hash, len).await?;
2111        assert_keys(
2112            &mut store,
2113            namespace.id(),
2114            vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2115        );
2116        store.flush()?;
2117        Ok(())
2118    }
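    // Added sketch (ours, not from the original suite): arbitrary byte keys should round-trip
    // through `get_exact` as well, not only show up in `get_many` listings as above.
    #[tokio::test]
    async fn test_replica_byte_key_get_exact_sketch() -> Result<()> {
        let mut store = store::Store::memory();
        let mut rng = rand::rng();
        let author = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);

        let key = vec![0u8, 255u8];
        let mut replica = store.new_replica(namespace.clone())?;
        let hash = replica.hash_and_insert(&key, &author, b"raw bytes").await?;
        assert_eq!(
            get_content_hash(&mut store, namespace.id(), author.id(), &key)?,
            Some(hash)
        );
        store.flush()?;
        Ok(())
    }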
2119
2120    #[tokio::test]
2121    async fn test_replica_capability_memory() -> Result<()> {
2122        let store = store::Store::memory();
2123        test_replica_capability(store).await
2124    }
2125
2126    #[tokio::test]
2127    #[cfg(feature = "fs-store")]
2128    async fn test_replica_capability_fs() -> Result<()> {
2129        let dbfile = tempfile::NamedTempFile::new()?;
2130        let store = store::fs::Store::persistent(dbfile.path())?;
2131        test_replica_capability(store).await
2132    }
2133
2134    #[allow(clippy::redundant_pattern_matching)]
2135    async fn test_replica_capability(mut store: Store) -> Result<()> {
2136        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2137        let author = store.new_author(&mut rng)?;
2138        let namespace = NamespaceSecret::new(&mut rng);
2139
2140        // import read capability - insert must fail
2141        let capability = Capability::Read(namespace.id());
2142        store.import_namespace(capability)?;
2143        let mut replica = store.open_replica(&namespace.id())?;
2144        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2145        assert!(matches!(res, Err(InsertError::ReadOnly)));
2146
2147        // import write capability - insert must succeed
2148        let capability = Capability::Write(namespace.clone());
2149        store.import_namespace(capability)?;
2150        let mut replica = store.open_replica(&namespace.id())?;
2151        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2152        assert!(matches!(res, Ok(_)));
2153        store.close_replica(namespace.id());
2154        let mut replica = store.open_replica(&namespace.id())?;
2155        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2156        assert!(res.is_ok());
2157
2158        // import read capability again - insert must still succeed
2159        let capability = Capability::Read(namespace.id());
2160        store.import_namespace(capability)?;
2161        store.close_replica(namespace.id());
2162        let mut replica = store.open_replica(&namespace.id())?;
2163        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2164        assert!(res.is_ok());
2165        store.flush()?;
2166        Ok(())
2167    }
2168
2169    #[tokio::test]
2170    async fn test_actor_capability_memory() -> Result<()> {
2171        let store = store::Store::memory();
2172        test_actor_capability(store).await
2173    }
2174
2175    #[tokio::test]
2176    #[cfg(feature = "fs-store")]
2177    async fn test_actor_capability_fs() -> Result<()> {
2178        let dbfile = tempfile::NamedTempFile::new()?;
2179        let store = store::fs::Store::persistent(dbfile.path())?;
2180        test_actor_capability(store).await
2181    }
2182
2183    async fn test_actor_capability(store: Store) -> Result<()> {
2184        // test with actor
2185        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2186        let author = Author::new(&mut rng);
2187        let handle = SyncHandle::spawn(store, None, "test".into());
2188        let author = handle.import_author(author).await?;
2189        let namespace = NamespaceSecret::new(&mut rng);
2190        let id = namespace.id();
2191
2192        // import read capability - insert must fail
2193        let capability = Capability::Read(namespace.id());
2194        handle.import_namespace(capability).await?;
2195        handle.open(namespace.id(), Default::default()).await?;
2196        let res = handle
2197            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2198            .await;
2199        assert!(res.is_err());
2200
2201        // import write capability - insert must succeed
2202        let capability = Capability::Write(namespace.clone());
2203        handle.import_namespace(capability).await?;
2204        let res = handle
2205            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2206            .await;
2207        assert!(res.is_ok());
2208
2209        // close and reopen - insert must fail while closed and succeed again after reopening
2210        handle.close(namespace.id()).await?;
2211        let res = handle
2212            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2213            .await;
2214        assert!(res.is_err());
2215        handle.open(namespace.id(), Default::default()).await?;
2216        let res = handle
2217            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2218            .await;
2219        assert!(res.is_ok());
2220        Ok(())
2221    }
2222
2223    fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2224        let mut res = vec![];
2225        while let Ok(ev) = events.try_recv() {
2226            res.push(ev);
2227        }
2228        res
2229    }
2230
2231    /// This tests that no events are emitted for entries received during sync which are obsolete
2232    /// (too old) by the time they are actually inserted in the store.
2233    #[tokio::test]
2234    async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2235        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2236        let mut store1 = store::Store::memory();
2237        let mut store2 = store::Store::memory();
2238        let peer1 = [1u8; 32];
2239        let peer2 = [2u8; 32];
2240        let mut state1 = SyncOutcome::default();
2241        let mut state2 = SyncOutcome::default();
2242
2243        let author = Author::new(&mut rng);
2244        let namespace = NamespaceSecret::new(&mut rng);
2245        let mut replica1 = store1.new_replica(namespace.clone())?;
2246        let mut replica2 = store2.new_replica(namespace.clone())?;
2247
2248        let (events1_sender, events1) = async_channel::bounded(32);
2249        let (events2_sender, events2) = async_channel::bounded(32);
2250
2251        replica1.info.subscribe(events1_sender);
2252        replica2.info.subscribe(events2_sender);
2253
2254        replica1.hash_and_insert(b"foo", &author, b"init").await?;
2255
2256        let from1 = replica1.sync_initial_message()?;
2257        let from2 = replica2
2258            .sync_process_message(from1, peer1, &mut state2)
2259            .await
2260            .unwrap()
2261            .unwrap();
2262        let from1 = replica1
2263            .sync_process_message(from2, peer2, &mut state1)
2264            .await
2265            .unwrap()
2266            .unwrap();
2267        // Now we will receive the entry from replica1. We insert a newer entry while the
2268        // sync is already running, so the entry from replica1 will be rejected. We make
2269        // sure that no RemoteInsert event is emitted for this entry.
2270        replica2.hash_and_insert(b"foo", &author, b"update").await?;
2271        let from2 = replica2
2272            .sync_process_message(from1, peer1, &mut state2)
2273            .await
2274            .unwrap();
2275        assert!(from2.is_none());
2276        let events1 = drain(events1);
2277        let events2 = drain(events2);
2278        assert_eq!(events1.len(), 1);
2279        assert_eq!(events2.len(), 1);
2280        assert!(matches!(events1[0], Event::LocalInsert { .. }));
2281        assert!(matches!(events2[0], Event::LocalInsert { .. }));
2282        assert_eq!(state1.num_sent, 1);
2283        assert_eq!(state1.num_recv, 0);
2284        assert_eq!(state2.num_sent, 0);
2285        assert_eq!(state2.num_recv, 1);
2286        store1.flush()?;
2287        store2.flush()?;
2288        Ok(())
2289    }
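    // Added positive-case sketch (ours) complementing the test above: when a remote entry is
    // accepted during sync, we'd expect the receiving side to emit a `RemoteInsert` event.
    // Only calls and helpers already used in this module are reused.
    #[tokio::test]
    async fn test_replica_remote_insert_event_sketch() -> Result<()> {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let mut store1 = store::Store::memory();
        let mut store2 = store::Store::memory();

        let author = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);
        let mut replica1 = store1.new_replica(namespace.clone())?;
        let mut replica2 = store2.new_replica(namespace.clone())?;

        let (events2_sender, events2) = async_channel::bounded(32);
        replica2.info.subscribe(events2_sender);

        // Only replica1 has an entry, so replica2 should receive and accept it during sync.
        replica1.hash_and_insert(b"foo", &author, b"init").await?;
        sync(&mut replica1, &mut replica2).await?;

        let events2 = drain(events2);
        assert!(events2
            .iter()
            .any(|ev| matches!(ev, Event::RemoteInsert { .. })));
        store1.flush()?;
        store2.flush()?;
        Ok(())
    }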
2290
2291    #[tokio::test]
2292    async fn test_replica_queries_mem() -> Result<()> {
2293        let store = store::Store::memory();
2294
2295        test_replica_queries(store).await?;
2296        Ok(())
2297    }
2298
2299    #[tokio::test]
2300    #[cfg(feature = "fs-store")]
2301    async fn test_replica_queries_fs() -> Result<()> {
2302        let dbfile = tempfile::NamedTempFile::new()?;
2303        let store = store::fs::Store::persistent(dbfile.path())?;
2304        test_replica_queries(store).await?;
2305
2306        Ok(())
2307    }
2308
2309    async fn test_replica_queries(mut store: Store) -> Result<()> {
2310        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2311        let namespace = NamespaceSecret::new(&mut rng);
2312        let namespace_id = namespace.id();
2313
2314        let a1 = store.new_author(&mut rng)?;
2315        let a2 = store.new_author(&mut rng)?;
2316        let a3 = store.new_author(&mut rng)?;
2317        println!(
2318            "a1 {} a2 {} a3 {}",
2319            a1.id().fmt_short(),
2320            a2.id().fmt_short(),
2321            a3.id().fmt_short()
2322        );
2323
2324        let mut replica = store.new_replica(namespace.clone())?;
2325        replica.hash_and_insert("hi/world", &a2, "a2").await?;
2326        replica.hash_and_insert("hi/world", &a1, "a1").await?;
2327        replica.hash_and_insert("hi/moon", &a2, "a1").await?;
2328        replica.hash_and_insert("hi", &a3, "a3").await?;
2329
2330        struct QueryTester<'a> {
2331            store: &'a mut Store,
2332            namespace: NamespaceId,
2333        }
2334        impl QueryTester<'_> {
2335            fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2336                let query = query.into();
2337                let actual = self
2338                    .store
2339                    .get_many(self.namespace, query.clone())
2340                    .unwrap()
2341                    .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2342                    .collect::<Result<Vec<_>>>()
2343                    .unwrap();
2344                let expected = expected
2345                    .into_iter()
2346                    .map(|(key, author)| (key.to_string(), author.id()))
2347                    .collect::<Vec<_>>();
2348                assert_eq!(actual, expected, "query: {query:#?}")
2349            }
2350        }
2351
2352        let mut qt = QueryTester {
2353            store: &mut store,
2354            namespace: namespace_id,
2355        };
2356
2357        qt.assert(
2358            Query::all(),
2359            vec![
2360                ("hi/world", &a1),
2361                ("hi/moon", &a2),
2362                ("hi/world", &a2),
2363                ("hi", &a3),
2364            ],
2365        );
2366
2367        qt.assert(
2368            Query::single_latest_per_key(),
2369            vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2370        );
2371
2372        qt.assert(
2373            Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2374            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2375        );
2376
2377        qt.assert(
2378            Query::single_latest_per_key().key_prefix("hi/"),
2379            vec![("hi/moon", &a2), ("hi/world", &a1)],
2380        );
2381
2382        qt.assert(
2383            Query::single_latest_per_key()
2384                .key_prefix("hi/")
2385                .sort_direction(SortDirection::Desc),
2386            vec![("hi/world", &a1), ("hi/moon", &a2)],
2387        );
2388
2389        qt.assert(
2390            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2391            vec![
2392                ("hi", &a3),
2393                ("hi/moon", &a2),
2394                ("hi/world", &a1),
2395                ("hi/world", &a2),
2396            ],
2397        );
2398
2399        qt.assert(
2400            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2401            vec![
2402                ("hi/world", &a2),
2403                ("hi/world", &a1),
2404                ("hi/moon", &a2),
2405                ("hi", &a3),
2406            ],
2407        );
2408
2409        qt.assert(
2410            Query::all().key_prefix("hi/"),
2411            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2412        );
2413
2414        qt.assert(
2415            Query::all().key_prefix("hi/").offset(1).limit(1),
2416            vec![("hi/moon", &a2)],
2417        );
2418
2419        qt.assert(
2420            Query::all()
2421                .key_prefix("hi/")
2422                .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2423            vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2424        );
2425
2426        qt.assert(
2427            Query::all()
2428                .key_prefix("hi/")
2429                .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2430                .offset(1)
2431                .limit(1),
2432            vec![("hi/world", &a1)],
2433        );
2434
2435        qt.assert(
2436            Query::all()
2437                .key_prefix("hi/")
2438                .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2439            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2440        );
2441
2442        qt.assert(
2443            Query::all()
2444                .key_prefix("hi/")
2445                .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2446            vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2447        );
2448
2449        qt.assert(
2450            Query::all()
2451                .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2452                .limit(2)
2453                .offset(1),
2454            vec![("hi/moon", &a2), ("hi/world", &a1)],
2455        );
2456
2457        let mut replica = store.new_replica(namespace)?;
2458        replica.delete_prefix("hi/world", &a2).await?;
2459        let mut qt = QueryTester {
2460            store: &mut store,
2461            namespace: namespace_id,
2462        };
2463
2464        qt.assert(
2465            Query::all(),
2466            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2467        );
2468
2469        qt.assert(
2470            Query::all().include_empty(),
2471            vec![
2472                ("hi/world", &a1),
2473                ("hi/moon", &a2),
2474                ("hi/world", &a2),
2475                ("hi", &a3),
2476            ],
2477        );
2478        store.flush()?;
2479        Ok(())
2480    }
2481
2482    #[test]
2483    fn test_dl_policies_mem() -> Result<()> {
2484        let mut store = store::Store::memory();
2485        test_dl_policies(&mut store)
2486    }
2487
2488    #[test]
2489    #[cfg(feature = "fs-store")]
2490    fn test_dl_policies_fs() -> Result<()> {
2491        let dbfile = tempfile::NamedTempFile::new()?;
2492        let mut store = store::fs::Store::persistent(dbfile.path())?;
2493        test_dl_policies(&mut store)
2494    }
2495
2496    fn test_dl_policies(store: &mut Store) -> Result<()> {
2497        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2498        let namespace = NamespaceSecret::new(&mut rng);
2499        let id = namespace.id();
2500
2501        let filter = store::FilterKind::Exact("foo".into());
2502        let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2503        store
2504            .set_download_policy(&id, policy.clone())
2505            .expect_err("document does not exist");
2506
2507        // now create the document
2508        store.new_replica(namespace)?;
2509
2510        store.set_download_policy(&id, policy.clone())?;
2511        let retrieved_policy = store.get_download_policy(&id)?;
2512        assert_eq!(retrieved_policy, policy);
2513        store.flush()?;
2514        Ok(())
2515    }
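    // Added sketch (ours, not from the original tests): we assume `set_download_policy`
    // simply replaces the previously stored policy for a namespace; only calls already used
    // in `test_dl_policies` above are reused.
    #[test]
    fn test_dl_policies_overwrite_sketch() -> Result<()> {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let mut store = store::Store::memory();
        let namespace = NamespaceSecret::new(&mut rng);
        let id = namespace.id();
        store.new_replica(namespace)?;

        let first =
            store::DownloadPolicy::NothingExcept(vec![store::FilterKind::Exact("foo".into())]);
        store.set_download_policy(&id, first)?;

        // A later call should overwrite the earlier policy.
        let second =
            store::DownloadPolicy::NothingExcept(vec![store::FilterKind::Exact("bar".into())]);
        store.set_download_policy(&id, second.clone())?;
        assert_eq!(store.get_download_policy(&id)?, second);
        store.flush()?;
        Ok(())
    }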
2516
2517    fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2518        expected.sort();
2519        assert_eq!(expected, get_keys_sorted(store, namespace));
2520    }
2521
2522    fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2523        let mut res = store
2524            .get_many(namespace, Query::all())
2525            .unwrap()
2526            .map(|e| e.map(|e| e.key().to_vec()))
2527            .collect::<Result<Vec<_>>>()
2528            .unwrap();
2529        res.sort();
2530        res
2531    }
2532
2533    fn get_entry(
2534        store: &mut Store,
2535        namespace: NamespaceId,
2536        author: AuthorId,
2537        key: &[u8],
2538    ) -> anyhow::Result<SignedEntry> {
2539        let entry = store
2540            .get_exact(namespace, author, key, true)?
2541            .ok_or_else(|| anyhow::anyhow!("not found"))?;
2542        Ok(entry)
2543    }
2544
2545    fn get_content_hash(
2546        store: &mut Store,
2547        namespace: NamespaceId,
2548        author: AuthorId,
2549        key: &[u8],
2550    ) -> anyhow::Result<Option<Hash>> {
2551        let hash = store
2552            .get_exact(namespace, author, key, false)?
2553            .map(|e| e.content_hash());
2554        Ok(hash)
2555    }
2556
2557    async fn sync<'a>(
2558        alice: &'a mut Replica<'a>,
2559        bob: &'a mut Replica<'a>,
2560    ) -> Result<(SyncOutcome, SyncOutcome)> {
2561        let alice_peer_id = [1u8; 32];
2562        let bob_peer_id = [2u8; 32];
2563        let mut alice_state = SyncOutcome::default();
2564        let mut bob_state = SyncOutcome::default();
2565        // Sync alice - bob
2566        let mut next_to_bob = Some(alice.sync_initial_message()?);
2567        let mut rounds = 0;
2568        while let Some(msg) = next_to_bob.take() {
2569            assert!(rounds < 100, "too many rounds");
2570            rounds += 1;
2571            println!("round {rounds}");
2572            if let Some(msg) = bob
2573                .sync_process_message(msg, alice_peer_id, &mut bob_state)
2574                .await?
2575            {
2576                next_to_bob = alice
2577                    .sync_process_message(msg, bob_peer_id, &mut alice_state)
2578                    .await?
2579            }
2580        }
2581        assert_eq!(alice_state.num_sent, bob_state.num_recv);
2582        assert_eq!(alice_state.num_recv, bob_state.num_sent);
2583        Ok((alice_state, bob_state))
2584    }
2585
2586    fn check_entries(
2587        store: &mut Store,
2588        namespace: &NamespaceId,
2589        author: &Author,
2590        set: &[&str],
2591    ) -> Result<()> {
2592        for el in set {
2593            assert!(
2594                store.get_exact(*namespace, author.id(), el, false)?.is_some(),
2595                "missing entry for {el}"
2596            );
2594        }
2595        Ok(())
2596    }
2597}