iroh_docs/store/
fs.rs

1//! On disk storage for replicas.
2
3use std::{
4    cmp::Ordering,
5    collections::HashSet,
6    iter::{Chain, Flatten},
7    num::NonZeroU64,
8    ops::Bound,
9};
10
11use anyhow::{anyhow, Result};
12use ed25519_dalek::{SignatureError, VerifyingKey};
13use iroh_blobs::Hash;
14use n0_future::time::SystemTime;
15use rand::CryptoRng;
16use redb::{Database, ReadableDatabase, ReadableMultimapTable, ReadableTable};
17use tracing::warn;
18
19use super::{
20    pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21    Query,
22};
23use crate::{
24    actor::MAX_COMMIT_DELAY,
25    keys::Author,
26    ranger::{Fingerprint, Range, RangeEntry},
27    sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28    AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29    ReplicaInfo,
30};
31
32mod bounds;
33mod migrations;
34mod query;
35mod ranges;
36pub(crate) mod tables;
37
38pub use self::ranges::RecordsRange;
39use self::{
40    bounds::{ByKeyBounds, RecordsBounds},
41    query::QueryIterator,
42    ranges::RangeExt,
43    tables::{
44        LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
45        RecordsValue, Tables, TransactionAndTables,
46    },
47};
48
/// Manages the replicas and authors for an instance.
#[derive(Debug)]
pub struct Store {
    // The underlying redb database handle.
    db: Database,
    // The lazily opened read or write transaction; see [`CurrentTransaction`].
    transaction: CurrentTransaction,
    // Namespaces of replicas that are currently open; an open replica must not be removed.
    open_replicas: HashSet<NamespaceId>,
    // In-memory cache for parsed/verified public keys.
    pubkeys: MemPublicKeyStore,
}
57
58impl Drop for Store {
59    fn drop(&mut self) {
60        if let Err(err) = self.flush() {
61            warn!("failed to trigger final flush: {:?}", err);
62        }
63    }
64}
65
66impl AsRef<Store> for Store {
67    fn as_ref(&self) -> &Store {
68        self
69    }
70}
71
72impl AsMut<Store> for Store {
73    fn as_mut(&mut self) -> &mut Store {
74        self
75    }
76}
77
/// The transaction currently held by a [`Store`].
///
/// Transactions are opened lazily and kept open until flushed, committed,
/// or replaced by a transaction of the other kind.
#[derive(derive_more::Debug, Default)]
#[allow(clippy::large_enum_variant)]
enum CurrentTransaction {
    /// No transaction is open.
    #[default]
    None,
    /// An open read transaction together with its table handles.
    Read(ReadOnlyTables),
    /// An open write transaction together with its table handles.
    Write(TransactionAndTables),
}
86
87impl Store {
88    /// Create a new store in memory.
89    pub fn memory() -> Self {
90        Self::memory_impl().expect("failed to create memory store")
91    }
92
93    fn memory_impl() -> Result<Self> {
94        let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
95        Self::new_impl(db)
96    }
97
98    /// Create or open a store from a `path` to a database file.
99    ///
100    /// The file will be created if it does not exist, otherwise it will be opened.
101    #[cfg(feature = "fs-store")]
102    pub fn persistent(path: impl AsRef<std::path::Path>) -> Result<Self> {
103        let db = match Database::create(&path) {
104            Ok(db) => db,
105            Err(redb::DatabaseError::UpgradeRequired(v)) => {
106                return Err(anyhow!(
107                    "Opening the database failed: Upgrading from redb {v} longer supported. Use and older redb version first."
108                ));
109            }
110            Err(err) => return Err(err.into()),
111        };
112        Self::new_impl(db)
113    }
114
115    fn new_impl(db: redb::Database) -> Result<Self> {
116        // Setup all tables
117        let write_tx = db.begin_write()?;
118        let _ = Tables::new(&write_tx)?;
119        write_tx.commit()?;
120
121        // Run database migrations
122        migrations::run_migrations(&db)?;
123
124        Ok(Store {
125            db,
126            transaction: Default::default(),
127            open_replicas: Default::default(),
128            pubkeys: Default::default(),
129        })
130    }
131
132    /// Flush the current transaction, if any.
133    ///
134    /// This is the cheapest way to ensure that the data is persisted.
135    pub fn flush(&mut self) -> Result<()> {
136        if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
137            w.commit()?;
138        }
139        Ok(())
140    }
141
142    /// Get a read-only snapshot of the database.
143    ///
144    /// This has the side effect of committing any open write transaction,
145    /// so it can be used as a way to ensure that the data is persisted.
146    pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
147        let guard = &mut self.transaction;
148        let tables = match std::mem::take(guard) {
149            CurrentTransaction::None => {
150                let tx = self.db.begin_read()?;
151                ReadOnlyTables::new(tx)?
152            }
153            CurrentTransaction::Write(w) => {
154                w.commit()?;
155                let tx = self.db.begin_read()?;
156                ReadOnlyTables::new(tx)?
157            }
158            CurrentTransaction::Read(tables) => tables,
159        };
160        *guard = CurrentTransaction::Read(tables);
161        match &*guard {
162            CurrentTransaction::Read(ref tables) => Ok(tables),
163            _ => unreachable!(),
164        }
165    }
166
167    /// Get an owned read-only snapshot of the database.
168    ///
169    /// This will open a new read transaction. The read transaction won't be reused for other
170    /// reads.
171    ///
172    /// This has the side effect of committing any open write transaction,
173    /// so it can be used as a way to ensure that the data is persisted.
174    pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
175        // make sure the current transaction is committed
176        self.flush()?;
177        assert!(matches!(self.transaction, CurrentTransaction::None));
178        let tx = self.db.begin_read()?;
179        let tables = ReadOnlyTables::new(tx)?;
180        Ok(tables)
181    }
182
183    /// Get access to the tables to read from them.
184    ///
185    /// The underlying transaction is a write transaction, but with a non-mut
186    /// reference to the tables you can not write.
187    ///
188    /// There is no guarantee that this will be an independent transaction.
189    /// You just get readonly access to the current state of the database.
190    ///
191    /// As such, there is also no guarantee that the data you see is
192    /// already persisted.
193    fn tables(&mut self) -> Result<&Tables<'_>> {
194        let guard = &mut self.transaction;
195        let tables = match std::mem::take(guard) {
196            CurrentTransaction::None => {
197                let tx = self.db.begin_write()?;
198                TransactionAndTables::new(tx)?
199            }
200            CurrentTransaction::Write(w) => {
201                if w.since.elapsed() > MAX_COMMIT_DELAY {
202                    tracing::debug!("committing transaction because it's too old");
203                    w.commit()?;
204                    let tx = self.db.begin_write()?;
205                    TransactionAndTables::new(tx)?
206                } else {
207                    w
208                }
209            }
210            CurrentTransaction::Read(_) => {
211                let tx = self.db.begin_write()?;
212                TransactionAndTables::new(tx)?
213            }
214        };
215        *guard = CurrentTransaction::Write(tables);
216        match guard {
217            CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
218            _ => unreachable!(),
219        }
220    }
221
222    /// Get exclusive write access to the tables in the current transaction.
223    ///
224    /// There is no guarantee that this will be an independent transaction.
225    /// As such, there is also no guarantee that the data you see or write
226    /// will be persisted.
227    ///
228    /// To ensure that the data is persisted, acquire a snapshot of the database
229    /// or call flush.
230    fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
231        let guard = &mut self.transaction;
232        let tables = match std::mem::take(guard) {
233            CurrentTransaction::None => {
234                let tx = self.db.begin_write()?;
235                TransactionAndTables::new(tx)?
236            }
237            CurrentTransaction::Write(w) => {
238                if w.since.elapsed() > MAX_COMMIT_DELAY {
239                    tracing::debug!("committing transaction because it's too old");
240                    w.commit()?;
241                    let tx = self.db.begin_write()?;
242                    TransactionAndTables::new(tx)?
243                } else {
244                    w
245                }
246            }
247            CurrentTransaction::Read(_) => {
248                let tx = self.db.begin_write()?;
249                TransactionAndTables::new(tx)?
250            }
251        };
252        *guard = CurrentTransaction::Write(tables);
253        let res = match &mut *guard {
254            CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
255            _ => unreachable!(),
256        };
257        Ok(res)
258    }
259}
260
/// Owned iterator over the cached sync peers of a document.
type PeersIter = std::vec::IntoIter<PeerIdBytes>;
262
impl Store {
    /// Create a new replica for `namespace` and persist in this store.
    pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
        let id = namespace.id();
        self.import_namespace(namespace.into())?;
        self.open_replica(&id).map_err(Into::into)
    }

    /// Create a new author key and persist it in the store.
    pub fn new_author<R: CryptoRng>(&mut self, rng: &mut R) -> Result<Author> {
        let author = Author::new(rng);
        self.import_author(author.clone())?;
        Ok(author)
    }

    /// Check if a [`AuthorHeads`] contains entry timestamps that we do not have locally.
    ///
    /// Returns the number of authors that the other peer has updates for.
    pub fn has_news_for_us(
        &mut self,
        namespace: NamespaceId,
        heads: &AuthorHeads,
    ) -> Result<Option<NonZeroU64>> {
        // Collect our own latest timestamp per author for this namespace.
        let our_heads = {
            let latest = self.get_latest_for_each_author(namespace)?;
            let mut heads = AuthorHeads::default();
            for e in latest {
                let (author, timestamp, _key) = e?;
                heads.insert(author, timestamp);
            }
            heads
        };
        let has_news_for_us = heads.has_news_for(&our_heads);
        Ok(has_news_for_us)
    }

    /// Open a replica from this store.
    ///
    /// This just calls load_replica_info and then creates a new replica with the info.
    pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
        let info = self.load_replica_info(namespace_id)?;
        let instance = StoreInstance::new(*namespace_id, self);
        Ok(Replica::new(instance, Box::new(info)))
    }

    /// Load the replica info from the store.
    pub fn load_replica_info(
        &mut self,
        namespace_id: &NamespaceId,
    ) -> Result<ReplicaInfo, OpenError> {
        let tables = self.tables()?;
        let info = match tables.namespaces.get(namespace_id.as_bytes()) {
            Ok(Some(db_value)) => {
                let (raw_kind, raw_bytes) = db_value.value();
                let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
                ReplicaInfo::new(namespace)
            }
            Ok(None) => return Err(OpenError::NotFound),
            Err(err) => return Err(OpenError::Other(err.into())),
        };
        // Mark the replica as open; `remove_replica` refuses open replicas.
        self.open_replicas.insert(info.capability.id());
        Ok(info)
    }

    /// Close a replica.
    pub fn close_replica(&mut self, id: NamespaceId) {
        self.open_replicas.remove(&id);
    }

    /// List all replica namespaces in this store.
    pub fn list_namespaces(
        &mut self,
    ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
        let snapshot = self.snapshot()?;
        // Full scan over the namespaces table.
        let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
        let iter = iter.map(|res| {
            let capability = parse_capability(res?.1.value())?;
            Ok((capability.id(), capability.kind()))
        });
        Ok(iter)
    }

    /// Get an author key from the store.
    pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
        let tables = self.tables()?;
        let Some(author) = tables.authors.get(author_id.as_bytes())? else {
            return Ok(None);
        };
        let author = Author::from_bytes(author.value());
        Ok(Some(author))
    }

    /// Import an author key pair.
    pub fn import_author(&mut self, author: Author) -> Result<()> {
        self.modify(|tables| {
            tables
                .authors
                .insert(author.id().as_bytes(), &author.to_bytes())?;
            Ok(())
        })
    }

    /// Delete an author.
    pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
        self.modify(|tables| {
            tables.authors.remove(author.as_bytes())?;
            Ok(())
        })
    }

    /// List all author keys in this store.
    pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
        let tables = self.snapshot()?;
        let iter = tables
            .authors
            .range::<&'static [u8; 32]>(..)?
            .map(|res| match res {
                Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
                Err(err) => Err(err.into()),
            });
        Ok(iter)
    }

    /// Import a new replica namespace.
    pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
        self.modify(|tables| {
            let outcome = {
                let (capability, outcome) = {
                    let existing = tables.namespaces.get(capability.id().as_bytes())?;
                    if let Some(existing) = existing {
                        // Merge with the stored capability; this may upgrade
                        // e.g. a read capability to a write capability.
                        let mut existing = parse_capability(existing.value())?;
                        let outcome = if existing.merge(capability)? {
                            ImportNamespaceOutcome::Upgraded
                        } else {
                            ImportNamespaceOutcome::NoChange
                        };
                        (existing, outcome)
                    } else {
                        (capability, ImportNamespaceOutcome::Inserted)
                    }
                };
                let id = capability.id().to_bytes();
                let (kind, bytes) = capability.raw();
                tables.namespaces.insert(&id, (kind, &bytes))?;
                outcome
            };
            Ok(outcome)
        })
    }

    /// Remove a replica.
    ///
    /// Completely removes a replica and deletes both the namespace private key and all document
    /// entries.
    ///
    /// Note that a replica has to be closed before it can be removed. The store has to enforce
    /// that a replica cannot be removed while it is still open.
    pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
        if self.open_replicas.contains(namespace) {
            return Err(anyhow!("replica is not closed"));
        }
        self.modify(|tables| {
            // Delete all records of this namespace, the by-key index,
            // the stored capability, cached peers, and the download policy.
            let bounds = RecordsBounds::namespace(*namespace);
            tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
            let bounds = ByKeyBounds::namespace(*namespace);
            let _ = tables
                .records_by_key
                .retain_in(bounds.as_ref(), |_k, _v| false);
            tables.namespaces.remove(namespace.as_bytes())?;
            tables.namespace_peers.remove_all(namespace.as_bytes())?;
            tables.download_policy.remove(namespace.as_bytes())?;
            Ok(())
        })
    }

    /// Get an iterator over entries of a replica.
    pub fn get_many(
        &mut self,
        namespace: NamespaceId,
        query: impl Into<Query>,
    ) -> Result<QueryIterator> {
        let tables = self.snapshot_owned()?;
        QueryIterator::new(tables, namespace, query.into())
    }

    /// Get an entry by key and author.
    pub fn get_exact(
        &mut self,
        namespace: NamespaceId,
        author: AuthorId,
        key: impl AsRef<[u8]>,
        include_empty: bool,
    ) -> Result<Option<SignedEntry>> {
        get_exact(
            &self.tables()?.records,
            namespace,
            author,
            key,
            include_empty,
        )
    }

    /// Get all content hashes of all replicas in the store.
    pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
        let tables = self.snapshot_owned()?;
        ContentHashesIterator::all(&tables.records)
    }

    /// Get the latest entry for each author in a namespace.
    pub fn get_latest_for_each_author(
        &mut self,
        namespace: NamespaceId,
    ) -> Result<LatestIterator<'_>> {
        LatestIterator::new(&self.tables()?.latest_per_author, namespace)
    }

    /// Register a peer that has been useful to sync a document.
    ///
    /// Keeps at most `PEERS_PER_DOC_CACHE_SIZE` peers per namespace,
    /// evicting the oldest entry when the cache is full.
    pub fn register_useful_peer(
        &mut self,
        namespace: NamespaceId,
        peer: crate::PeerIdBytes,
    ) -> Result<()> {
        let peer = &peer;
        let namespace = namespace.as_bytes();
        // calculate nanos since UNIX_EPOCH for a time measurement
        let nanos = SystemTime::UNIX_EPOCH
            .elapsed()
            .map(|duration| duration.as_nanos() as u64)?;
        self.modify(|tables| {
            // ensure the document exists
            anyhow::ensure!(
                tables.namespaces.get(namespace)?.is_some(),
                "document not created"
            );

            let mut namespace_peers = tables.namespace_peers.get(namespace)?;

            // get the oldest entry since it's candidate for removal
            // (multimap values are ordered, so the first one has the lowest nanos)
            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
                let (oldest_nanos, &oldest_peer) = guard.value();
                (oldest_nanos, oldest_peer)
            });
            match maybe_oldest {
                None => {
                    // the table is empty so the peer can be inserted without further checks since
                    // super::PEERS_PER_DOC_CACHE_SIZE is non zero
                    drop(namespace_peers);
                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
                }
                Some((oldest_nanos, oldest_peer)) => {
                    let oldest_peer = &oldest_peer;

                    if oldest_peer == peer {
                        // oldest peer is the current one, so replacing the entry for the peer will
                        // maintain the size
                        drop(namespace_peers);
                        tables
                            .namespace_peers
                            .remove(namespace, (oldest_nanos, oldest_peer))?;
                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
                    } else {
                        // calculate the len in the same loop since calling `len` is another fallible operation
                        let mut len = 1;
                        // find any previous entry for the same peer to remove it
                        let mut prev_peer_nanos = None;

                        for result in namespace_peers {
                            len += 1;
                            let guard = result?;
                            let (peer_nanos, peer_bytes) = guard.value();
                            if prev_peer_nanos.is_none() && peer_bytes == peer {
                                prev_peer_nanos = Some(peer_nanos)
                            }
                        }

                        match prev_peer_nanos {
                            Some(prev_nanos) => {
                                // the peer was already present, so we can remove the old entry and
                                // insert the new one without checking the size
                                tables
                                    .namespace_peers
                                    .remove(namespace, (prev_nanos, peer))?;
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                            }
                            None => {
                                // the peer is new and the table is non empty, add it and check the
                                // size to decide if the oldest peer should be evicted
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                                len += 1;
                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
                                    tables
                                        .namespace_peers
                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
                                }
                            }
                        }
                    }
                }
            }
            Ok(())
        })
    }

    /// Get the peers that have been useful for a document.
    pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
        let tables = self.tables()?;
        let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
        // Values are ordered by (nanos, peer); iterate in reverse so the most
        // recently useful peers come first.
        for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
            let (_nanos, &peer) = result?.value();
            peers.push(peer);
        }
        if peers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(peers.into_iter()))
        }
    }

    /// Set the download policy for a namespace.
    pub fn set_download_policy(
        &mut self,
        namespace: &NamespaceId,
        policy: DownloadPolicy,
    ) -> Result<()> {
        self.modify(|tables| {
            let namespace = namespace.as_bytes();

            // ensure the document exists
            anyhow::ensure!(
                tables.namespaces.get(&namespace)?.is_some(),
                "document not created"
            );

            // Policies are stored postcard-encoded.
            let value = postcard::to_stdvec(&policy)?;
            tables.download_policy.insert(namespace, value.as_slice())?;
            Ok(())
        })
    }

    /// Get the download policy for a namespace.
    pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
        let tables = self.tables()?;
        let value = tables.download_policy.get(namespace.as_bytes())?;
        Ok(match value {
            // No stored policy means the default policy applies.
            None => DownloadPolicy::default(),
            Some(value) => postcard::from_bytes(value.value())?,
        })
    }
}
612
613impl PublicKeyStore for Store {
614    fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
615        self.pubkeys.public_key(id)
616    }
617}
618
619fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
620    Capability::from_raw(raw_kind, raw_bytes)
621}
622
623fn get_exact(
624    record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
625    namespace: NamespaceId,
626    author: AuthorId,
627    key: impl AsRef<[u8]>,
628    include_empty: bool,
629) -> Result<Option<SignedEntry>> {
630    let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
631    let record = record_table.get(id)?;
632    Ok(record
633        .map(|r| into_entry(id, r.value()))
634        .filter(|entry| include_empty || !entry.is_empty()))
635}
636
/// A wrapper around [`Store`] for a specific [`NamespaceId`]
#[derive(Debug)]
pub struct StoreInstance<'a> {
    // The namespace all operations on this instance are scoped to.
    namespace: NamespaceId,
    // Exclusive borrow of the underlying store for the instance's lifetime.
    pub(crate) store: &'a mut Store,
}
643
644impl<'a> StoreInstance<'a> {
645    pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
646        StoreInstance { namespace, store }
647    }
648}
649
650impl PublicKeyStore for StoreInstance<'_> {
651    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
652        self.store.public_key(id)
653    }
654}
655
656impl super::DownloadPolicyStore for StoreInstance<'_> {
657    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
658        self.store.get_download_policy(namespace)
659    }
660}
661
impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
    type Error = anyhow::Error;
    // A range result is always a chain of two (possibly empty) record ranges,
    // so wrap-around ranges and plain ranges share one iterator type.
    type RangeIterator<'x>
        = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
    where
        'a: 'x;
    type ParentIterator<'x>
        = ParentIterator
    where
        'a: 'x;

    /// Get the first key (or the default if none is available).
    fn get_first(&mut self) -> Result<RecordIdentifier> {
        let tables = self.store.as_mut().tables()?;
        // TODO: verify this fetches all keys with this namespace
        let bounds = RecordsBounds::namespace(self.namespace);
        let mut records = tables.records.range(bounds.as_ref())?;

        let Some(record) = records.next() else {
            return Ok(RecordIdentifier::default());
        };
        let (compound_key, _value) = record?;
        let (namespace_id, author_id, key) = compound_key.value();
        let id = RecordIdentifier::new(namespace_id, author_id, key);
        Ok(id)
    }

    #[cfg(test)]
    fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
        self.store
            .as_mut()
            .get_exact(id.namespace(), id.author(), id.key(), true)
    }

    #[cfg(test)]
    fn len(&mut self) -> Result<usize> {
        let tables = self.store.as_mut().tables()?;
        let bounds = RecordsBounds::namespace(self.namespace);
        let records = tables.records.range(bounds.as_ref())?;
        Ok(records.count())
    }

    #[cfg(test)]
    fn is_empty(&mut self) -> Result<bool> {
        use redb::ReadableTableMetadata;
        let tables = self.store.as_mut().tables()?;
        Ok(tables.records.is_empty()?)
    }

    /// XOR-fold the fingerprints of all entries in `range`.
    fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
        // TODO: optimize
        let elements = self.get_range(range.clone())?;

        let mut fp = Fingerprint::empty();
        for el in elements {
            let el = el?;
            fp ^= el.as_fingerprint();
        }

        Ok(fp)
    }

    /// Insert an entry into the records table and both secondary indexes
    /// (by-key index and latest-per-author table).
    fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
        let id = e.id();
        self.store.as_mut().modify(|tables| {
            // insert into record table
            let key = (
                &id.namespace().to_bytes(),
                &id.author().to_bytes(),
                id.key(),
            );
            let hash = e.content_hash(); // let binding is needed
            let value = (
                e.timestamp(),
                &e.signature().namespace().to_bytes(),
                &e.signature().author().to_bytes(),
                e.content_len(),
                hash.as_bytes(),
            );
            tables.records.insert(key, value)?;

            // insert into by key index table
            let key = (
                &id.namespace().to_bytes(),
                id.key(),
                &id.author().to_bytes(),
            );
            tables.records_by_key.insert(key, ())?;

            // insert into latest table
            let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
            let value = (e.timestamp(), e.id().key());
            tables.latest_per_author.insert(key, value)?;
            Ok(())
        })
    }

    /// Iterate the entries of a set-reconciliation range, which may wrap
    /// around the end of the key space (x > y).
    fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
        let tables = self.store.as_mut().tables()?;
        let iter = match range.x().cmp(range.y()) {
            // identity range: iter1 = all, iter2 = none
            Ordering::Equal => {
                // iterator for all entries in replica
                let bounds = RecordsBounds::namespace(self.namespace);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
                chain_none(iter)
            }
            // regular range: iter1 = x <= t < y, iter2 = none
            Ordering::Less => {
                // iterator for entries from range.x to range.y
                let start = Bound::Included(range.x().to_byte_tuple());
                let end = Bound::Excluded(range.y().to_byte_tuple());
                let bounds = RecordsBounds::new(start, end);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
                chain_none(iter)
            }
            // split range: iter1 = start <= t < y, iter2 = x <= t <= end
            Ordering::Greater => {
                // iterator for entries from start to range.y
                let end = Bound::Excluded(range.y().to_byte_tuple());
                let bounds = RecordsBounds::from_start(&self.namespace, end);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;

                // iterator for entries from range.x to end
                let start = Bound::Included(range.x().to_byte_tuple());
                let bounds = RecordsBounds::to_end(&self.namespace, start);
                let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;

                iter.chain(Some(iter2).into_iter().flatten())
            }
        };
        Ok(iter)
    }

    #[cfg(test)]
    fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
        self.store.as_mut().modify(|tables| {
            let entry = {
                let (namespace, author, key) = id.as_byte_tuple();
                // Remove from the by-key index first, then from the records table.
                let id = (namespace, key, author);
                tables.records_by_key.remove(id)?;
                let id = (namespace, author, key);
                let value = tables.records.remove(id)?;
                value.map(|value| into_entry(id, value.value()))
            };
            Ok(entry)
        })
    }

    #[cfg(test)]
    fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
        let tables = self.store.as_mut().tables()?;
        let bounds = RecordsBounds::namespace(self.namespace);
        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
        Ok(chain_none(iter))
    }

    /// Iterate all entries whose key is a prefix of `id`'s key (same
    /// namespace and author).
    fn prefixes_of(
        &mut self,
        id: &RecordIdentifier,
    ) -> Result<Self::ParentIterator<'_>, Self::Error> {
        let tables = self.store.as_mut().tables()?;
        ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
    }

    #[cfg(test)]
    fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
        let tables = self.store.as_mut().tables()?;
        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
        Ok(chain_none(iter))
    }

    /// Remove all entries prefixed by `id` for which `predicate` returns true.
    /// Returns the number of removed entries.
    fn remove_prefix_filtered(
        &mut self,
        id: &RecordIdentifier,
        predicate: impl Fn(&Record) -> bool,
    ) -> Result<usize> {
        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
        self.store.as_mut().modify(|tables| {
            let cb = |_k: RecordsId, v: RecordsValue| {
                let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
                let record = Record::new(hash.into(), len, timestamp);

                predicate(&record)
            };
            // `count` drives the draining iterator, deleting each matched entry.
            let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
            let count = iter.count();
            Ok(count)
        })
    }
}
854
/// Adapts `iter` into the `Chain<…, Flatten<…>>` shape shared by the range
/// iterator type aliases, without actually appending any elements.
fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
    iter: I,
) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
    // An empty `Option<I>` flattens to an empty iterator of the right type.
    let nothing: Option<I> = None;
    iter.chain(nothing.into_iter().flatten())
}
860
/// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which
/// is a prefix of the key passed to the iterator.
#[derive(Debug)]
pub struct ParentIterator {
    // All matching parent entries are collected eagerly at construction time;
    // this just replays them (shortest prefix first).
    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
}
867
868impl ParentIterator {
869    fn new(
870        tables: &Tables,
871        namespace: NamespaceId,
872        author: AuthorId,
873        key: Vec<u8>,
874    ) -> anyhow::Result<Self> {
875        let parents = parents(&tables.records, namespace, author, key.clone());
876        Ok(Self {
877            inner: parents.into_iter(),
878        })
879    }
880}
881
882fn parents(
883    table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
884    namespace: NamespaceId,
885    author: AuthorId,
886    mut key: Vec<u8>,
887) -> Vec<anyhow::Result<SignedEntry>> {
888    let mut res = Vec::new();
889
890    while !key.is_empty() {
891        let entry = get_exact(table, namespace, author, &key, false);
892        key.pop();
893        match entry {
894            Err(err) => res.push(Err(err)),
895            Ok(Some(entry)) => res.push(Ok(entry)),
896            Ok(None) => continue,
897        }
898    }
899    res.reverse();
900    res
901}
902
903impl Iterator for ParentIterator {
904    type Item = Result<SignedEntry>;
905
906    fn next(&mut self) -> Option<Self::Item> {
907        self.inner.next()
908    }
909}
910
/// Iterator for all content hashes
///
/// Note that you might get duplicate hashes.
///
/// This represents a snapshot of the database at the time of creation: the
/// iterator keeps a database snapshot open until it is dropped, and holds a
/// copy of a redb::ReadOnlyTable so that it is self-contained.
#[derive(derive_more::Debug)]
pub struct ContentHashesIterator {
    // Owned 'static range so the iterator does not borrow the table.
    #[debug(skip)]
    range: RecordsRange<'static>,
}
923
924impl ContentHashesIterator {
925    /// Create a new iterator over all content hashes.
926    pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
927        let range = RecordsRange::all_static(table)?;
928        Ok(Self { range })
929    }
930}
931
932impl Iterator for ContentHashesIterator {
933    type Item = Result<Hash>;
934
935    fn next(&mut self) -> Option<Self::Item> {
936        let v = self.range.next()?;
937        Some(v.map(|e| e.content_hash()))
938    }
939}
940
/// Iterator over the latest entry per author.
#[derive(derive_more::Debug)]
#[debug("LatestIterator")]
pub struct LatestIterator<'a>(
    // Raw redb range over the latest-per-author table, restricted to a single
    // namespace by the constructor.
    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
);
947
948impl<'a> LatestIterator<'a> {
949    fn new(
950        latest_per_author: &'a impl ReadableTable<
951            LatestPerAuthorKey<'static>,
952            LatestPerAuthorValue<'static>,
953        >,
954        namespace: NamespaceId,
955    ) -> anyhow::Result<Self> {
956        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
957        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
958        let range = latest_per_author.range(start..=end)?;
959        Ok(Self(range))
960    }
961}
962
963impl Iterator for LatestIterator<'_> {
964    type Item = Result<(AuthorId, u64, Vec<u8>)>;
965
966    fn next(&mut self) -> Option<Self::Item> {
967        self.0.next_map(|key, value| {
968            let (_namespace, author) = key;
969            let (timestamp, key) = value;
970            (author.into(), timestamp, key.to_vec())
971        })
972    }
973}
974
975fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
976    let (namespace, author, key) = key;
977    let (timestamp, namespace_sig, author_sig, len, hash) = value;
978    let id = RecordIdentifier::new(namespace, author, key);
979    let record = Record::new(hash.into(), len, timestamp);
980    let entry = Entry::new(id, record);
981    let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
982    SignedEntry::new(entry_signature, entry)
983}
984
#[cfg(all(test, feature = "fs-store"))]
mod tests {
    use super::*;
    use crate::ranger::Store as _;

    /// Checks that author + key-prefix queries include keys made entirely of
    /// 0xff bytes (the upper bound of a prefix range is tricky there).
    #[tokio::test]
    async fn test_ranges() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let author = store.new_author(&mut rand::rng())?;
        let namespace = NamespaceSecret::new(&mut rand::rng());
        let mut replica = store.new_replica(namespace.clone())?;

        // test author prefix relation for all-255 keys
        let key1 = vec![255, 255];
        let key2 = vec![255, 255, 255];
        replica.hash_and_insert(&key1, &author, b"v1").await?;
        replica.hash_and_insert(&key2, &author, b"v2").await?;
        // both keys must be returned by the prefix query, in key order
        let res = store
            .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(res.len(), 2);
        assert_eq!(
            res.into_iter()
                .map(|entry| entry.key().to_vec())
                .collect::<Vec<_>>(),
            vec![key1, key2]
        );
        Ok(())
    }

    /// End-to-end smoke test of author/replica management plus put, query and
    /// remove through the `ranger::Store` wrapper (`StoreInstance`).
    #[test]
    fn test_basics() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        // a fresh store has no authors
        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
        assert!(authors.is_empty());

        let author = store.new_author(&mut rand::rng())?;
        let namespace = NamespaceSecret::new(&mut rand::rng());
        let _replica = store.new_replica(namespace.clone())?;
        store.close_replica(namespace.id());
        // replica info is still loadable after closing the replica
        let replica = store.load_replica_info(&namespace.id())?;
        assert_eq!(replica.capability.id(), namespace.id());

        // the stored author round-trips byte-for-byte
        let author_back = store.get_author(&author.id())?.unwrap();
        assert_eq!(author.to_bytes(), author_back.to_bytes(),);

        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
        }

        // all
        let all: Vec<_> = wrapper.all()?.collect();
        assert_eq!(all.len(), 5);

        // add a second version
        let mut ids = Vec::new();
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(
                id.clone(),
                Record::current_from_data(format!("world-{i}-2")),
            );
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
            ids.push(id);
        }

        // get all — still 5 entries, the second versions replaced the first
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // get all prefix
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::key_prefix("hello-"))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // delete and get
        for id in ids {
            let res = wrapper.get(&id)?;
            assert!(res.is_some());
            let out = wrapper.entry_remove(&id)?.unwrap();
            assert_eq!(out.entry().id(), &id);
            let res = wrapper.get(&id)?;
            assert!(res.is_none());
        }

        // get latest — everything was removed above, so no entries remain
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 0);

        Ok(())
    }

    /// Copies the database file at `source` to a fresh temp file and applies
    /// `modify` to the copy in a single write transaction. Used to fabricate
    /// old on-disk states for the migration tests below.
    fn copy_and_modify(
        source: &std::path::Path,
        modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
    ) -> Result<tempfile::NamedTempFile> {
        let dbfile = tempfile::NamedTempFile::new()?;
        std::fs::copy(source, dbfile.path())?;
        let db = Database::create(dbfile.path())?;
        let write_tx = db.begin_write()?;
        modify(&write_tx)?;
        write_tx.commit()?;
        drop(db);
        Ok(dbfile)
    }

    /// Checks that opening a database whose latest-per-author table is missing
    /// repopulates it (migration 001) with the same contents as before.
    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn test_migration_001_populate_latest_table() -> Result<()> {
        use super::tables::LATEST_PER_AUTHOR_TABLE;
        let dbfile = tempfile::NamedTempFile::new()?;
        let namespace = NamespaceSecret::new(&mut rand::rng());

        // create a store and add some data
        let expected = {
            let mut store = Store::persistent(dbfile.path())?;
            let author1 = store.new_author(&mut rand::rng())?;
            let author2 = store.new_author(&mut rand::rng())?;
            let mut replica = store.new_replica(namespace.clone())?;
            replica.hash_and_insert(b"k1", &author1, b"v1").await?;
            replica.hash_and_insert(b"k2", &author2, b"v1").await?;
            replica.hash_and_insert(b"k3", &author1, b"v1").await?;

            let expected = store
                .get_latest_for_each_author(namespace.id())?
                .collect::<Result<Vec<_>>>()?;
            // drop everything to clear file locks.
            store.close_replica(namespace.id());
            // flush the store to disk
            store.flush()?;
            drop(store);
            expected
        };
        // two authors -> two "latest" entries (author1's inserts collapse to one)
        assert_eq!(expected.len(), 2);

        // create a copy of our db file with the latest table deleted.
        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
            Ok(())
        })?;

        // open the copied db file, which will run the migration.
        let mut store = Store::persistent(dbfile_before_migration.path())?;
        let actual = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(expected, actual);
        Ok(())
    }

    /// Checks that migration 004 creates the by-key index table when a store
    /// is opened.
    #[test]
    #[cfg(feature = "fs-store")]
    fn test_migration_004_populate_by_key_index() -> Result<()> {
        use redb::ReadableTableMetadata;
        let dbfile = tempfile::NamedTempFile::new()?;

        let mut store = Store::persistent(dbfile.path())?;

        // check that the new table is there, even if empty
        {
            let tables = store.tables()?;
            assert_eq!(tables.records_by_key.len()?, 0);
        }

        // TODO: write test checking that the indexing is done correctly
        Ok(())
    }
}