iroh_docs/store/
fs.rs

1//! On disk storage for replicas.
2
3use std::{
4    cmp::Ordering,
5    collections::HashSet,
6    iter::{Chain, Flatten},
7    num::NonZeroU64,
8    ops::Bound,
9};
10
11use anyhow::{anyhow, Result};
12use ed25519_dalek::{SignatureError, VerifyingKey};
13use iroh_blobs::Hash;
14use n0_future::time::SystemTime;
15use rand::CryptoRng;
16use redb::{Database, ReadableMultimapTable, ReadableTable};
17use tracing::warn;
18
19use super::{
20    pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21    Query,
22};
23use crate::{
24    actor::MAX_COMMIT_DELAY,
25    keys::Author,
26    ranger::{Fingerprint, Range, RangeEntry},
27    sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28    AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29    ReplicaInfo,
30};
31
32mod bounds;
33mod migrations;
34mod query;
35mod ranges;
36pub(crate) mod tables;
37
38pub use self::ranges::RecordsRange;
39use self::{
40    bounds::{ByKeyBounds, RecordsBounds},
41    query::QueryIterator,
42    ranges::RangeExt,
43    tables::{
44        LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
45        RecordsValue, Tables, TransactionAndTables,
46    },
47};
48
49/// Manages the replicas and authors for an instance.
50#[derive(Debug)]
51pub struct Store {
52    db: Database,
53    transaction: CurrentTransaction,
54    open_replicas: HashSet<NamespaceId>,
55    pubkeys: MemPublicKeyStore,
56}
57
58impl Drop for Store {
59    fn drop(&mut self) {
60        if let Err(err) = self.flush() {
61            warn!("failed to trigger final flush: {:?}", err);
62        }
63    }
64}
65
66impl AsRef<Store> for Store {
67    fn as_ref(&self) -> &Store {
68        self
69    }
70}
71
72impl AsMut<Store> for Store {
73    fn as_mut(&mut self) -> &mut Store {
74        self
75    }
76}
77
78#[derive(derive_more::Debug, Default)]
79#[allow(clippy::large_enum_variant)]
80enum CurrentTransaction {
81    #[default]
82    None,
83    Read(ReadOnlyTables),
84    Write(TransactionAndTables),
85}
86
87impl Store {
88    /// Create a new store in memory.
89    pub fn memory() -> Self {
90        Self::memory_impl().expect("failed to create memory store")
91    }
92
93    fn memory_impl() -> Result<Self> {
94        let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
95        Self::new_impl(db)
96    }
97
98    /// Create or open a store from a `path` to a database file.
99    ///
100    /// The file will be created if it does not exist, otherwise it will be opened.
101    #[cfg(feature = "fs-store")]
102    pub fn persistent(path: impl AsRef<std::path::Path>) -> Result<Self> {
103        let mut db = match Database::create(&path) {
104            Ok(db) => db,
105            Err(redb::DatabaseError::UpgradeRequired(1)) => return Err(
106                anyhow!("Opening the database failed: Upgrading from old format is no longer supported. Use iroh-docs 0.92 to perform the upgrade, then upgrade to the latest release again.")
107            ),
108            Err(err) => return Err(err.into()),
109        };
110        match db.upgrade() {
111            Ok(true) => tracing::info!("Database was upgraded to redb v3 compatible format"),
112            Ok(false) => {}
113            Err(err) => warn!("Database upgrade to redb v3 compatible format failed: {err:#}"),
114        }
115        Self::new_impl(db)
116    }
117
118    fn new_impl(db: redb::Database) -> Result<Self> {
119        // Setup all tables
120        let write_tx = db.begin_write()?;
121        let _ = Tables::new(&write_tx)?;
122        write_tx.commit()?;
123
124        // Run database migrations
125        migrations::run_migrations(&db)?;
126
127        Ok(Store {
128            db,
129            transaction: Default::default(),
130            open_replicas: Default::default(),
131            pubkeys: Default::default(),
132        })
133    }
134
135    /// Flush the current transaction, if any.
136    ///
137    /// This is the cheapest way to ensure that the data is persisted.
138    pub fn flush(&mut self) -> Result<()> {
139        if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
140            w.commit()?;
141        }
142        Ok(())
143    }
144
145    /// Get a read-only snapshot of the database.
146    ///
147    /// This has the side effect of committing any open write transaction,
148    /// so it can be used as a way to ensure that the data is persisted.
149    pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
150        let guard = &mut self.transaction;
151        let tables = match std::mem::take(guard) {
152            CurrentTransaction::None => {
153                let tx = self.db.begin_read()?;
154                ReadOnlyTables::new(tx)?
155            }
156            CurrentTransaction::Write(w) => {
157                w.commit()?;
158                let tx = self.db.begin_read()?;
159                ReadOnlyTables::new(tx)?
160            }
161            CurrentTransaction::Read(tables) => tables,
162        };
163        *guard = CurrentTransaction::Read(tables);
164        match &*guard {
165            CurrentTransaction::Read(ref tables) => Ok(tables),
166            _ => unreachable!(),
167        }
168    }
169
170    /// Get an owned read-only snapshot of the database.
171    ///
172    /// This will open a new read transaction. The read transaction won't be reused for other
173    /// reads.
174    ///
175    /// This has the side effect of committing any open write transaction,
176    /// so it can be used as a way to ensure that the data is persisted.
177    pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
178        // make sure the current transaction is committed
179        self.flush()?;
180        assert!(matches!(self.transaction, CurrentTransaction::None));
181        let tx = self.db.begin_read()?;
182        let tables = ReadOnlyTables::new(tx)?;
183        Ok(tables)
184    }
185
186    /// Get access to the tables to read from them.
187    ///
188    /// The underlying transaction is a write transaction, but with a non-mut
189    /// reference to the tables you can not write.
190    ///
191    /// There is no guarantee that this will be an independent transaction.
192    /// You just get readonly access to the current state of the database.
193    ///
194    /// As such, there is also no guarantee that the data you see is
195    /// already persisted.
196    fn tables(&mut self) -> Result<&Tables<'_>> {
197        let guard = &mut self.transaction;
198        let tables = match std::mem::take(guard) {
199            CurrentTransaction::None => {
200                let tx = self.db.begin_write()?;
201                TransactionAndTables::new(tx)?
202            }
203            CurrentTransaction::Write(w) => {
204                if w.since.elapsed() > MAX_COMMIT_DELAY {
205                    tracing::debug!("committing transaction because it's too old");
206                    w.commit()?;
207                    let tx = self.db.begin_write()?;
208                    TransactionAndTables::new(tx)?
209                } else {
210                    w
211                }
212            }
213            CurrentTransaction::Read(_) => {
214                let tx = self.db.begin_write()?;
215                TransactionAndTables::new(tx)?
216            }
217        };
218        *guard = CurrentTransaction::Write(tables);
219        match guard {
220            CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
221            _ => unreachable!(),
222        }
223    }
224
225    /// Get exclusive write access to the tables in the current transaction.
226    ///
227    /// There is no guarantee that this will be an independent transaction.
228    /// As such, there is also no guarantee that the data you see or write
229    /// will be persisted.
230    ///
231    /// To ensure that the data is persisted, acquire a snapshot of the database
232    /// or call flush.
233    fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
234        let guard = &mut self.transaction;
235        let tables = match std::mem::take(guard) {
236            CurrentTransaction::None => {
237                let tx = self.db.begin_write()?;
238                TransactionAndTables::new(tx)?
239            }
240            CurrentTransaction::Write(w) => {
241                if w.since.elapsed() > MAX_COMMIT_DELAY {
242                    tracing::debug!("committing transaction because it's too old");
243                    w.commit()?;
244                    let tx = self.db.begin_write()?;
245                    TransactionAndTables::new(tx)?
246                } else {
247                    w
248                }
249            }
250            CurrentTransaction::Read(_) => {
251                let tx = self.db.begin_write()?;
252                TransactionAndTables::new(tx)?
253            }
254        };
255        *guard = CurrentTransaction::Write(tables);
256        let res = match &mut *guard {
257            CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
258            _ => unreachable!(),
259        };
260        Ok(res)
261    }
262}
263
264type PeersIter = std::vec::IntoIter<PeerIdBytes>;
265
266impl Store {
267    /// Create a new replica for `namespace` and persist in this store.
268    pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
269        let id = namespace.id();
270        self.import_namespace(namespace.into())?;
271        self.open_replica(&id).map_err(Into::into)
272    }
273
274    /// Create a new author key and persist it in the store.
275    pub fn new_author<R: CryptoRng>(&mut self, rng: &mut R) -> Result<Author> {
276        let author = Author::new(rng);
277        self.import_author(author.clone())?;
278        Ok(author)
279    }
280
281    /// Check if a [`AuthorHeads`] contains entry timestamps that we do not have locally.
282    ///
283    /// Returns the number of authors that the other peer has updates for.
284    pub fn has_news_for_us(
285        &mut self,
286        namespace: NamespaceId,
287        heads: &AuthorHeads,
288    ) -> Result<Option<NonZeroU64>> {
289        let our_heads = {
290            let latest = self.get_latest_for_each_author(namespace)?;
291            let mut heads = AuthorHeads::default();
292            for e in latest {
293                let (author, timestamp, _key) = e?;
294                heads.insert(author, timestamp);
295            }
296            heads
297        };
298        let has_news_for_us = heads.has_news_for(&our_heads);
299        Ok(has_news_for_us)
300    }
301
302    /// Open a replica from this store.
303    ///
304    /// This just calls load_replica_info and then creates a new replica with the info.
305    pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
306        let info = self.load_replica_info(namespace_id)?;
307        let instance = StoreInstance::new(*namespace_id, self);
308        Ok(Replica::new(instance, Box::new(info)))
309    }
310
311    /// Load the replica info from the store.
312    pub fn load_replica_info(
313        &mut self,
314        namespace_id: &NamespaceId,
315    ) -> Result<ReplicaInfo, OpenError> {
316        let tables = self.tables()?;
317        let info = match tables.namespaces.get(namespace_id.as_bytes()) {
318            Ok(Some(db_value)) => {
319                let (raw_kind, raw_bytes) = db_value.value();
320                let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
321                ReplicaInfo::new(namespace)
322            }
323            Ok(None) => return Err(OpenError::NotFound),
324            Err(err) => return Err(OpenError::Other(err.into())),
325        };
326        self.open_replicas.insert(info.capability.id());
327        Ok(info)
328    }
329
330    /// Close a replica.
331    pub fn close_replica(&mut self, id: NamespaceId) {
332        self.open_replicas.remove(&id);
333    }
334
335    /// List all replica namespaces in this store.
336    pub fn list_namespaces(
337        &mut self,
338    ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
339        let snapshot = self.snapshot()?;
340        let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
341        let iter = iter.map(|res| {
342            let capability = parse_capability(res?.1.value())?;
343            Ok((capability.id(), capability.kind()))
344        });
345        Ok(iter)
346    }
347
348    /// Get an author key from the store.
349    pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
350        let tables = self.tables()?;
351        let Some(author) = tables.authors.get(author_id.as_bytes())? else {
352            return Ok(None);
353        };
354        let author = Author::from_bytes(author.value());
355        Ok(Some(author))
356    }
357
358    /// Import an author key pair.
359    pub fn import_author(&mut self, author: Author) -> Result<()> {
360        self.modify(|tables| {
361            tables
362                .authors
363                .insert(author.id().as_bytes(), &author.to_bytes())?;
364            Ok(())
365        })
366    }
367
368    /// Delete an author.
369    pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
370        self.modify(|tables| {
371            tables.authors.remove(author.as_bytes())?;
372            Ok(())
373        })
374    }
375
376    /// List all author keys in this store.
377    pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
378        let tables = self.snapshot()?;
379        let iter = tables
380            .authors
381            .range::<&'static [u8; 32]>(..)?
382            .map(|res| match res {
383                Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
384                Err(err) => Err(err.into()),
385            });
386        Ok(iter)
387    }
388
389    /// Import a new replica namespace.
390    pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
391        self.modify(|tables| {
392            let outcome = {
393                let (capability, outcome) = {
394                    let existing = tables.namespaces.get(capability.id().as_bytes())?;
395                    if let Some(existing) = existing {
396                        let mut existing = parse_capability(existing.value())?;
397                        let outcome = if existing.merge(capability)? {
398                            ImportNamespaceOutcome::Upgraded
399                        } else {
400                            ImportNamespaceOutcome::NoChange
401                        };
402                        (existing, outcome)
403                    } else {
404                        (capability, ImportNamespaceOutcome::Inserted)
405                    }
406                };
407                let id = capability.id().to_bytes();
408                let (kind, bytes) = capability.raw();
409                tables.namespaces.insert(&id, (kind, &bytes))?;
410                outcome
411            };
412            Ok(outcome)
413        })
414    }
415
416    /// Remove a replica.
417    ///
418    /// Completely removes a replica and deletes both the namespace private key and all document
419    /// entries.
420    ///
421    /// Note that a replica has to be closed before it can be removed. The store has to enforce
422    /// that a replica cannot be removed while it is still open.
423    pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
424        if self.open_replicas.contains(namespace) {
425            return Err(anyhow!("replica is not closed"));
426        }
427        self.modify(|tables| {
428            let bounds = RecordsBounds::namespace(*namespace);
429            tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
430            let bounds = ByKeyBounds::namespace(*namespace);
431            let _ = tables
432                .records_by_key
433                .retain_in(bounds.as_ref(), |_k, _v| false);
434            tables.namespaces.remove(namespace.as_bytes())?;
435            tables.namespace_peers.remove_all(namespace.as_bytes())?;
436            tables.download_policy.remove(namespace.as_bytes())?;
437            Ok(())
438        })
439    }
440
441    /// Get an iterator over entries of a replica.
442    pub fn get_many(
443        &mut self,
444        namespace: NamespaceId,
445        query: impl Into<Query>,
446    ) -> Result<QueryIterator> {
447        let tables = self.snapshot_owned()?;
448        QueryIterator::new(tables, namespace, query.into())
449    }
450
451    /// Get an entry by key and author.
452    pub fn get_exact(
453        &mut self,
454        namespace: NamespaceId,
455        author: AuthorId,
456        key: impl AsRef<[u8]>,
457        include_empty: bool,
458    ) -> Result<Option<SignedEntry>> {
459        get_exact(
460            &self.tables()?.records,
461            namespace,
462            author,
463            key,
464            include_empty,
465        )
466    }
467
468    /// Get all content hashes of all replicas in the store.
469    pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
470        let tables = self.snapshot_owned()?;
471        ContentHashesIterator::all(&tables.records)
472    }
473
474    /// Get the latest entry for each author in a namespace.
475    pub fn get_latest_for_each_author(
476        &mut self,
477        namespace: NamespaceId,
478    ) -> Result<LatestIterator<'_>> {
479        LatestIterator::new(&self.tables()?.latest_per_author, namespace)
480    }
481
482    /// Register a peer that has been useful to sync a document.
483    pub fn register_useful_peer(
484        &mut self,
485        namespace: NamespaceId,
486        peer: crate::PeerIdBytes,
487    ) -> Result<()> {
488        let peer = &peer;
489        let namespace = namespace.as_bytes();
490        // calculate nanos since UNIX_EPOCH for a time measurement
491        let nanos = SystemTime::UNIX_EPOCH
492            .elapsed()
493            .map(|duration| duration.as_nanos() as u64)?;
494        self.modify(|tables| {
495            // ensure the document exists
496            anyhow::ensure!(
497                tables.namespaces.get(namespace)?.is_some(),
498                "document not created"
499            );
500
501            let mut namespace_peers = tables.namespace_peers.get(namespace)?;
502
503            // get the oldest entry since it's candidate for removal
504            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
505                let (oldest_nanos, &oldest_peer) = guard.value();
506                (oldest_nanos, oldest_peer)
507            });
508            match maybe_oldest {
509                None => {
510                    // the table is empty so the peer can be inserted without further checks since
511                    // super::PEERS_PER_DOC_CACHE_SIZE is non zero
512                    drop(namespace_peers);
513                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
514                }
515                Some((oldest_nanos, oldest_peer)) => {
516                    let oldest_peer = &oldest_peer;
517
518                    if oldest_peer == peer {
519                        // oldest peer is the current one, so replacing the entry for the peer will
520                        // maintain the size
521                        drop(namespace_peers);
522                        tables
523                            .namespace_peers
524                            .remove(namespace, (oldest_nanos, oldest_peer))?;
525                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
526                    } else {
527                        // calculate the len in the same loop since calling `len` is another fallible operation
528                        let mut len = 1;
529                        // find any previous entry for the same peer to remove it
530                        let mut prev_peer_nanos = None;
531
532                        for result in namespace_peers {
533                            len += 1;
534                            let guard = result?;
535                            let (peer_nanos, peer_bytes) = guard.value();
536                            if prev_peer_nanos.is_none() && peer_bytes == peer {
537                                prev_peer_nanos = Some(peer_nanos)
538                            }
539                        }
540
541                        match prev_peer_nanos {
542                            Some(prev_nanos) => {
543                                // the peer was already present, so we can remove the old entry and
544                                // insert the new one without checking the size
545                                tables
546                                    .namespace_peers
547                                    .remove(namespace, (prev_nanos, peer))?;
548                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
549                            }
550                            None => {
551                                // the peer is new and the table is non empty, add it and check the
552                                // size to decide if the oldest peer should be evicted
553                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
554                                len += 1;
555                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
556                                    tables
557                                        .namespace_peers
558                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
559                                }
560                            }
561                        }
562                    }
563                }
564            }
565            Ok(())
566        })
567    }
568
569    /// Get the peers that have been useful for a document.
570    pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
571        let tables = self.tables()?;
572        let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
573        for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
574            let (_nanos, &peer) = result?.value();
575            peers.push(peer);
576        }
577        if peers.is_empty() {
578            Ok(None)
579        } else {
580            Ok(Some(peers.into_iter()))
581        }
582    }
583
584    /// Set the download policy for a namespace.
585    pub fn set_download_policy(
586        &mut self,
587        namespace: &NamespaceId,
588        policy: DownloadPolicy,
589    ) -> Result<()> {
590        self.modify(|tables| {
591            let namespace = namespace.as_bytes();
592
593            // ensure the document exists
594            anyhow::ensure!(
595                tables.namespaces.get(&namespace)?.is_some(),
596                "document not created"
597            );
598
599            let value = postcard::to_stdvec(&policy)?;
600            tables.download_policy.insert(namespace, value.as_slice())?;
601            Ok(())
602        })
603    }
604
605    /// Get the download policy for a namespace.
606    pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
607        let tables = self.tables()?;
608        let value = tables.download_policy.get(namespace.as_bytes())?;
609        Ok(match value {
610            None => DownloadPolicy::default(),
611            Some(value) => postcard::from_bytes(value.value())?,
612        })
613    }
614}
615
616impl PublicKeyStore for Store {
617    fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
618        self.pubkeys.public_key(id)
619    }
620}
621
622fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
623    Capability::from_raw(raw_kind, raw_bytes)
624}
625
626fn get_exact(
627    record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
628    namespace: NamespaceId,
629    author: AuthorId,
630    key: impl AsRef<[u8]>,
631    include_empty: bool,
632) -> Result<Option<SignedEntry>> {
633    let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
634    let record = record_table.get(id)?;
635    Ok(record
636        .map(|r| into_entry(id, r.value()))
637        .filter(|entry| include_empty || !entry.is_empty()))
638}
639
640/// A wrapper around [`Store`] for a specific [`NamespaceId`]
641#[derive(Debug)]
642pub struct StoreInstance<'a> {
643    namespace: NamespaceId,
644    pub(crate) store: &'a mut Store,
645}
646
647impl<'a> StoreInstance<'a> {
648    pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
649        StoreInstance { namespace, store }
650    }
651}
652
653impl PublicKeyStore for StoreInstance<'_> {
654    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
655        self.store.public_key(id)
656    }
657}
658
659impl super::DownloadPolicyStore for StoreInstance<'_> {
660    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
661        self.store.get_download_policy(namespace)
662    }
663}
664
665impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
666    type Error = anyhow::Error;
667    type RangeIterator<'x>
668        = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
669    where
670        'a: 'x;
671    type ParentIterator<'x>
672        = ParentIterator
673    where
674        'a: 'x;
675
676    /// Get a the first key (or the default if none is available).
677    fn get_first(&mut self) -> Result<RecordIdentifier> {
678        let tables = self.store.as_mut().tables()?;
679        // TODO: verify this fetches all keys with this namespace
680        let bounds = RecordsBounds::namespace(self.namespace);
681        let mut records = tables.records.range(bounds.as_ref())?;
682
683        let Some(record) = records.next() else {
684            return Ok(RecordIdentifier::default());
685        };
686        let (compound_key, _value) = record?;
687        let (namespace_id, author_id, key) = compound_key.value();
688        let id = RecordIdentifier::new(namespace_id, author_id, key);
689        Ok(id)
690    }
691
692    #[cfg(test)]
693    fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
694        self.store
695            .as_mut()
696            .get_exact(id.namespace(), id.author(), id.key(), true)
697    }
698
699    #[cfg(test)]
700    fn len(&mut self) -> Result<usize> {
701        let tables = self.store.as_mut().tables()?;
702        let bounds = RecordsBounds::namespace(self.namespace);
703        let records = tables.records.range(bounds.as_ref())?;
704        Ok(records.count())
705    }
706
707    #[cfg(test)]
708    fn is_empty(&mut self) -> Result<bool> {
709        use redb::ReadableTableMetadata;
710        let tables = self.store.as_mut().tables()?;
711        Ok(tables.records.is_empty()?)
712    }
713
714    fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
715        // TODO: optimize
716        let elements = self.get_range(range.clone())?;
717
718        let mut fp = Fingerprint::empty();
719        for el in elements {
720            let el = el?;
721            fp ^= el.as_fingerprint();
722        }
723
724        Ok(fp)
725    }
726
727    fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
728        let id = e.id();
729        self.store.as_mut().modify(|tables| {
730            // insert into record table
731            let key = (
732                &id.namespace().to_bytes(),
733                &id.author().to_bytes(),
734                id.key(),
735            );
736            let hash = e.content_hash(); // let binding is needed
737            let value = (
738                e.timestamp(),
739                &e.signature().namespace().to_bytes(),
740                &e.signature().author().to_bytes(),
741                e.content_len(),
742                hash.as_bytes(),
743            );
744            tables.records.insert(key, value)?;
745
746            // insert into by key index table
747            let key = (
748                &id.namespace().to_bytes(),
749                id.key(),
750                &id.author().to_bytes(),
751            );
752            tables.records_by_key.insert(key, ())?;
753
754            // insert into latest table
755            let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
756            let value = (e.timestamp(), e.id().key());
757            tables.latest_per_author.insert(key, value)?;
758            Ok(())
759        })
760    }
761
762    fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
763        let tables = self.store.as_mut().tables()?;
764        let iter = match range.x().cmp(range.y()) {
765            // identity range: iter1 = all, iter2 = none
766            Ordering::Equal => {
767                // iterator for all entries in replica
768                let bounds = RecordsBounds::namespace(self.namespace);
769                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
770                chain_none(iter)
771            }
772            // regular range: iter1 = x <= t < y, iter2 = none
773            Ordering::Less => {
774                // iterator for entries from range.x to range.y
775                let start = Bound::Included(range.x().to_byte_tuple());
776                let end = Bound::Excluded(range.y().to_byte_tuple());
777                let bounds = RecordsBounds::new(start, end);
778                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
779                chain_none(iter)
780            }
781            // split range: iter1 = start <= t < y, iter2 = x <= t <= end
782            Ordering::Greater => {
783                // iterator for entries from start to range.y
784                let end = Bound::Excluded(range.y().to_byte_tuple());
785                let bounds = RecordsBounds::from_start(&self.namespace, end);
786                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
787
788                // iterator for entries from range.x to end
789                let start = Bound::Included(range.x().to_byte_tuple());
790                let bounds = RecordsBounds::to_end(&self.namespace, start);
791                let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
792
793                iter.chain(Some(iter2).into_iter().flatten())
794            }
795        };
796        Ok(iter)
797    }
798
799    #[cfg(test)]
800    fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
801        self.store.as_mut().modify(|tables| {
802            let entry = {
803                let (namespace, author, key) = id.as_byte_tuple();
804                let id = (namespace, key, author);
805                tables.records_by_key.remove(id)?;
806                let id = (namespace, author, key);
807                let value = tables.records.remove(id)?;
808                value.map(|value| into_entry(id, value.value()))
809            };
810            Ok(entry)
811        })
812    }
813
/// Returns an iterator over the records within the bounds of this instance's namespace
/// (test-only helper).
#[cfg(test)]
fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
    let records = &self.store.as_mut().tables()?.records;
    let namespace_bounds = RecordsBounds::namespace(self.namespace);
    let namespace_records = RecordsRange::with_bounds(records, namespace_bounds)?;
    // A single range still has to be returned in the chained iterator shape.
    Ok(chain_none(namespace_records))
}
821
822    fn prefixes_of(
823        &mut self,
824        id: &RecordIdentifier,
825    ) -> Result<Self::ParentIterator<'_>, Self::Error> {
826        let tables = self.store.as_mut().tables()?;
827        ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
828    }
829
/// Returns an iterator over the records inside the `RecordsBounds::author_prefix` range
/// built from `id`'s namespace, author and key bytes (test-only helper).
#[cfg(test)]
fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
    let records = &self.store.as_mut().tables()?.records;
    let prefix_bounds =
        RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
    let matches = RecordsRange::with_bounds(records, prefix_bounds)?;
    // A single range still has to be returned in the chained iterator shape.
    Ok(chain_none(matches))
}
837
838    fn remove_prefix_filtered(
839        &mut self,
840        id: &RecordIdentifier,
841        predicate: impl Fn(&Record) -> bool,
842    ) -> Result<usize> {
843        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
844        self.store.as_mut().modify(|tables| {
845            let cb = |_k: RecordsId, v: RecordsValue| {
846                let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
847                let record = Record::new(hash.into(), len, timestamp);
848
849                predicate(&record)
850            };
851            let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
852            let count = iter.count();
853            Ok(count)
854        })
855    }
856}
857
/// Wraps `iter` in a [`Chain`] whose second half is always empty.
///
/// The result yields exactly the items of `iter`. The chained tail is a flattened `None`,
/// giving a single iterator the same type as `first.chain(Some(second).into_iter().flatten())`.
fn chain_none<'a, I, T>(iter: I) -> Chain<I, Flatten<std::option::IntoIter<I>>>
where
    I: Iterator<Item = T> + 'a,
{
    let empty_tail: Option<I> = None;
    iter.chain(empty_tail.into_iter().flatten())
}
863
864/// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which
865/// is a prefix of the key passed to the iterator.
866#[derive(Debug)]
867pub struct ParentIterator {
868    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
869}
870
871impl ParentIterator {
872    fn new(
873        tables: &Tables,
874        namespace: NamespaceId,
875        author: AuthorId,
876        key: Vec<u8>,
877    ) -> anyhow::Result<Self> {
878        let parents = parents(&tables.records, namespace, author, key.clone());
879        Ok(Self {
880            inner: parents.into_iter(),
881        })
882    }
883}
884
885fn parents(
886    table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
887    namespace: NamespaceId,
888    author: AuthorId,
889    mut key: Vec<u8>,
890) -> Vec<anyhow::Result<SignedEntry>> {
891    let mut res = Vec::new();
892
893    while !key.is_empty() {
894        let entry = get_exact(table, namespace, author, &key, false);
895        key.pop();
896        match entry {
897            Err(err) => res.push(Err(err)),
898            Ok(Some(entry)) => res.push(Ok(entry)),
899            Ok(None) => continue,
900        }
901    }
902    res.reverse();
903    res
904}
905
906impl Iterator for ParentIterator {
907    type Item = Result<SignedEntry>;
908
909    fn next(&mut self) -> Option<Self::Item> {
910        self.inner.next()
911    }
912}
913
914/// Iterator for all content hashes
915///
916/// Note that you might get duplicate hashes. Also, the iterator will keep
917/// a database snapshot open until it is dropped.
918///
919/// Also, this represents a snapshot of the database at the time of creation.
920/// It needs a copy of a redb::ReadOnlyTable to be self-contained.
921#[derive(derive_more::Debug)]
922pub struct ContentHashesIterator {
923    #[debug(skip)]
924    range: RecordsRange<'static>,
925}
926
927impl ContentHashesIterator {
928    /// Create a new iterator over all content hashes.
929    pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
930        let range = RecordsRange::all_static(table)?;
931        Ok(Self { range })
932    }
933}
934
935impl Iterator for ContentHashesIterator {
936    type Item = Result<Hash>;
937
938    fn next(&mut self) -> Option<Self::Item> {
939        let v = self.range.next()?;
940        Some(v.map(|e| e.content_hash()))
941    }
942}
943
944/// Iterator over the latest entry per author.
945#[derive(derive_more::Debug)]
946#[debug("LatestIterator")]
947pub struct LatestIterator<'a>(
948    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
949);
950
951impl<'a> LatestIterator<'a> {
952    fn new(
953        latest_per_author: &'a impl ReadableTable<
954            LatestPerAuthorKey<'static>,
955            LatestPerAuthorValue<'static>,
956        >,
957        namespace: NamespaceId,
958    ) -> anyhow::Result<Self> {
959        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
960        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
961        let range = latest_per_author.range(start..=end)?;
962        Ok(Self(range))
963    }
964}
965
966impl Iterator for LatestIterator<'_> {
967    type Item = Result<(AuthorId, u64, Vec<u8>)>;
968
969    fn next(&mut self) -> Option<Self::Item> {
970        self.0.next_map(|key, value| {
971            let (_namespace, author) = key;
972            let (timestamp, key) = value;
973            (author.into(), timestamp, key.to_vec())
974        })
975    }
976}
977
978fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
979    let (namespace, author, key) = key;
980    let (timestamp, namespace_sig, author_sig, len, hash) = value;
981    let id = RecordIdentifier::new(namespace, author, key);
982    let record = Record::new(hash.into(), len, timestamp);
983    let entry = Entry::new(id, record);
984    let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
985    SignedEntry::new(entry_signature, entry)
986}
987
#[cfg(all(test, feature = "fs-store"))]
mod tests {
    use super::*;
    use crate::ranger::Store as _;

    /// An author + key-prefix query must return keys that consist only of `0xFF` bytes,
    /// in key order.
    #[tokio::test]
    async fn test_ranges() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let author = store.new_author(&mut rand::rng())?;
        let namespace = NamespaceSecret::new(&mut rand::rng());
        let mut replica = store.new_replica(namespace.clone())?;

        // test author prefix relation for all-255 keys
        let key1 = vec![255, 255];
        let key2 = vec![255, 255, 255];
        replica.hash_and_insert(&key1, &author, b"v1").await?;
        replica.hash_and_insert(&key2, &author, b"v2").await?;
        let res = store
            .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(res.len(), 2);
        assert_eq!(
            res.into_iter()
                .map(|entry| entry.key().to_vec())
                .collect::<Vec<_>>(),
            vec![key1, key2]
        );
        Ok(())
    }

    /// Round-trips authors, replicas and entries through a persistent store: insert,
    /// overwrite, query and remove.
    #[test]
    fn test_basics() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
        assert!(authors.is_empty());

        let author = store.new_author(&mut rand::rng())?;
        let namespace = NamespaceSecret::new(&mut rand::rng());
        let _replica = store.new_replica(namespace.clone())?;
        store.close_replica(namespace.id());
        let replica = store.load_replica_info(&namespace.id())?;
        assert_eq!(replica.capability.id(), namespace.id());

        let author_back = store.get_author(&author.id())?.unwrap();
        assert_eq!(author.to_bytes(), author_back.to_bytes());

        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
        }

        // all: fail on read errors instead of counting them as entries
        let all = wrapper.all()?.collect::<Result<Vec<_>>>()?;
        assert_eq!(all.len(), 5);

        // add a second version
        let mut ids = Vec::new();
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(
                id.clone(),
                Record::current_from_data(format!("world-{i}-2")),
            );
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
            ids.push(id);
        }

        // get all: the second version replaced the first, so the count is unchanged
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // get all prefix
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::key_prefix("hello-"))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // delete and get
        for id in ids {
            let res = wrapper.get(&id)?;
            assert!(res.is_some());
            let out = wrapper.entry_remove(&id)?.unwrap();
            assert_eq!(out.entry().id(), &id);
            let res = wrapper.get(&id)?;
            assert!(res.is_none());
        }

        // nothing is left after removing every entry
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 0);

        Ok(())
    }

    /// Copies the database file at `source` into a fresh temp file, applies `modify` inside a
    /// single committed write transaction and returns the temp file.
    fn copy_and_modify(
        source: &std::path::Path,
        modify: impl FnOnce(&redb::WriteTransaction) -> Result<()>,
    ) -> Result<tempfile::NamedTempFile> {
        let dbfile = tempfile::NamedTempFile::new()?;
        std::fs::copy(source, dbfile.path())?;
        let db = Database::create(dbfile.path())?;
        let write_tx = db.begin_write()?;
        modify(&write_tx)?;
        write_tx.commit()?;
        // close the database before handing the file to the caller
        drop(db);
        Ok(dbfile)
    }

    /// Deleting the latest-per-author table and reopening the store must repopulate it with
    /// the same rows.
    #[tokio::test]
    async fn test_migration_001_populate_latest_table() -> Result<()> {
        use super::tables::LATEST_PER_AUTHOR_TABLE;
        let dbfile = tempfile::NamedTempFile::new()?;
        let namespace = NamespaceSecret::new(&mut rand::rng());

        // create a store and add some data
        let expected = {
            let mut store = Store::persistent(dbfile.path())?;
            let author1 = store.new_author(&mut rand::rng())?;
            let author2 = store.new_author(&mut rand::rng())?;
            let mut replica = store.new_replica(namespace.clone())?;
            replica.hash_and_insert(b"k1", &author1, b"v1").await?;
            replica.hash_and_insert(b"k2", &author2, b"v1").await?;
            replica.hash_and_insert(b"k3", &author1, b"v1").await?;

            let expected = store
                .get_latest_for_each_author(namespace.id())?
                .collect::<Result<Vec<_>>>()?;
            // drop everything to clear file locks.
            store.close_replica(namespace.id());
            // flush the store to disk
            store.flush()?;
            drop(store);
            expected
        };
        assert_eq!(expected.len(), 2);

        // create a copy of our db file with the latest table deleted.
        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
            Ok(())
        })?;

        // open the copied db file, which will run the migration.
        let mut store = Store::persistent(dbfile_before_migration.path())?;
        let actual = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(expected, actual);
        Ok(())
    }

    /// A freshly created store must expose an (empty) `records_by_key` table.
    #[test]
    fn test_migration_004_populate_by_key_index() -> Result<()> {
        use redb::ReadableTableMetadata;
        let dbfile = tempfile::NamedTempFile::new()?;

        let mut store = Store::persistent(dbfile.path())?;

        // check that the new table is there, even if empty
        {
            let tables = store.tables()?;
            assert_eq!(tables.records_by_key.len()?, 0);
        }

        // TODO: write test checking that the indexing is done correctly
        Ok(())
    }
}