iroh_docs/store/
fs.rs

1//! On disk storage for replicas.
2
3use std::{
4    cmp::Ordering,
5    collections::HashSet,
6    iter::{Chain, Flatten},
7    num::NonZeroU64,
8    ops::Bound,
9    path::Path,
10};
11
12use anyhow::{anyhow, Result};
13use ed25519_dalek::{SignatureError, VerifyingKey};
14use iroh_blobs::Hash;
15use rand_core::CryptoRngCore;
16use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable};
17use tracing::warn;
18
19use super::{
20    pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21    Query,
22};
23use crate::{
24    actor::MAX_COMMIT_DELAY,
25    keys::Author,
26    ranger::{Fingerprint, Range, RangeEntry},
27    sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28    AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29    ReplicaInfo,
30};
31
32mod bounds;
33mod migrate_v1_v2;
34mod migrations;
35mod query;
36mod ranges;
37pub(crate) mod tables;
38
39pub use self::ranges::RecordsRange;
40use self::{
41    bounds::{ByKeyBounds, RecordsBounds},
42    query::QueryIterator,
43    ranges::RangeExt,
44    tables::{
45        LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
46        RecordsValue, Tables, TransactionAndTables,
47    },
48};
49
/// Manages the replicas and authors for an instance.
#[derive(Debug)]
pub struct Store {
    /// Handle to the underlying redb database.
    db: Database,
    /// The transaction that is currently kept open between calls, if any.
    transaction: CurrentTransaction,
    /// Namespaces for which replica info was loaded and that were not closed since.
    ///
    /// `remove_replica` refuses to remove a namespace contained in this set.
    open_replicas: HashSet<NamespaceId>,
    /// In-memory key store that the `PublicKeyStore` impl of this type delegates to.
    pubkeys: MemPublicKeyStore,
}
58
59impl Drop for Store {
60    fn drop(&mut self) {
61        if let Err(err) = self.flush() {
62            warn!("failed to trigger final flush: {:?}", err);
63        }
64    }
65}
66
67impl AsRef<Store> for Store {
68    fn as_ref(&self) -> &Store {
69        self
70    }
71}
72
73impl AsMut<Store> for Store {
74    fn as_mut(&mut self) -> &mut Store {
75        self
76    }
77}
78
/// The transaction a [`Store`] currently keeps open between calls.
///
/// The default is [`CurrentTransaction::None`], which is also what `std::mem::take` leaves
/// behind when the store takes the current transaction out of its slot.
#[derive(derive_more::Debug, Default)]
#[allow(clippy::large_enum_variant)]
enum CurrentTransaction {
    /// No transaction is open.
    #[default]
    None,
    /// A read-only view of the tables, as set up by `Store::snapshot`.
    Read(ReadOnlyTables),
    /// An open write transaction together with its tables, as set up by `Store::tables` and
    /// `Store::modify`.
    Write(TransactionAndTables),
}
87
88impl Store {
89    /// Create a new store in memory.
90    pub fn memory() -> Self {
91        Self::memory_impl().expect("failed to create memory store")
92    }
93
94    fn memory_impl() -> Result<Self> {
95        let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
96        Self::new_impl(db)
97    }
98
99    /// Create or open a store from a `path` to a database file.
100    ///
101    /// The file will be created if it does not exist, otherwise it will be opened.
102    pub fn persistent(path: impl AsRef<Path>) -> Result<Self> {
103        let db = match Database::create(&path) {
104            Ok(db) => db,
105            Err(DatabaseError::UpgradeRequired(1)) => migrate_v1_v2::run(&path)?,
106            Err(err) => return Err(err.into()),
107        };
108        Self::new_impl(db)
109    }
110
111    fn new_impl(db: redb::Database) -> Result<Self> {
112        // Setup all tables
113        let write_tx = db.begin_write()?;
114        let _ = Tables::new(&write_tx)?;
115        write_tx.commit()?;
116
117        // Run database migrations
118        migrations::run_migrations(&db)?;
119
120        Ok(Store {
121            db,
122            transaction: Default::default(),
123            open_replicas: Default::default(),
124            pubkeys: Default::default(),
125        })
126    }
127
128    /// Flush the current transaction, if any.
129    ///
130    /// This is the cheapest way to ensure that the data is persisted.
131    pub fn flush(&mut self) -> Result<()> {
132        if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
133            w.commit()?;
134        }
135        Ok(())
136    }
137
138    /// Get a read-only snapshot of the database.
139    ///
140    /// This has the side effect of committing any open write transaction,
141    /// so it can be used as a way to ensure that the data is persisted.
142    pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
143        let guard = &mut self.transaction;
144        let tables = match std::mem::take(guard) {
145            CurrentTransaction::None => {
146                let tx = self.db.begin_read()?;
147                ReadOnlyTables::new(tx)?
148            }
149            CurrentTransaction::Write(w) => {
150                w.commit()?;
151                let tx = self.db.begin_read()?;
152                ReadOnlyTables::new(tx)?
153            }
154            CurrentTransaction::Read(tables) => tables,
155        };
156        *guard = CurrentTransaction::Read(tables);
157        match &*guard {
158            CurrentTransaction::Read(ref tables) => Ok(tables),
159            _ => unreachable!(),
160        }
161    }
162
163    /// Get an owned read-only snapshot of the database.
164    ///
165    /// This will open a new read transaction. The read transaction won't be reused for other
166    /// reads.
167    ///
168    /// This has the side effect of committing any open write transaction,
169    /// so it can be used as a way to ensure that the data is persisted.
170    pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
171        // make sure the current transaction is committed
172        self.flush()?;
173        assert!(matches!(self.transaction, CurrentTransaction::None));
174        let tx = self.db.begin_read()?;
175        let tables = ReadOnlyTables::new(tx)?;
176        Ok(tables)
177    }
178
179    /// Get access to the tables to read from them.
180    ///
181    /// The underlying transaction is a write transaction, but with a non-mut
182    /// reference to the tables you can not write.
183    ///
184    /// There is no guarantee that this will be an independent transaction.
185    /// You just get readonly access to the current state of the database.
186    ///
187    /// As such, there is also no guarantee that the data you see is
188    /// already persisted.
189    fn tables(&mut self) -> Result<&Tables<'_>> {
190        let guard = &mut self.transaction;
191        let tables = match std::mem::take(guard) {
192            CurrentTransaction::None => {
193                let tx = self.db.begin_write()?;
194                TransactionAndTables::new(tx)?
195            }
196            CurrentTransaction::Write(w) => {
197                if w.since.elapsed() > MAX_COMMIT_DELAY {
198                    tracing::debug!("committing transaction because it's too old");
199                    w.commit()?;
200                    let tx = self.db.begin_write()?;
201                    TransactionAndTables::new(tx)?
202                } else {
203                    w
204                }
205            }
206            CurrentTransaction::Read(_) => {
207                let tx = self.db.begin_write()?;
208                TransactionAndTables::new(tx)?
209            }
210        };
211        *guard = CurrentTransaction::Write(tables);
212        match guard {
213            CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
214            _ => unreachable!(),
215        }
216    }
217
218    /// Get exclusive write access to the tables in the current transaction.
219    ///
220    /// There is no guarantee that this will be an independent transaction.
221    /// As such, there is also no guarantee that the data you see or write
222    /// will be persisted.
223    ///
224    /// To ensure that the data is persisted, acquire a snapshot of the database
225    /// or call flush.
226    fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
227        let guard = &mut self.transaction;
228        let tables = match std::mem::take(guard) {
229            CurrentTransaction::None => {
230                let tx = self.db.begin_write()?;
231                TransactionAndTables::new(tx)?
232            }
233            CurrentTransaction::Write(w) => {
234                if w.since.elapsed() > MAX_COMMIT_DELAY {
235                    tracing::debug!("committing transaction because it's too old");
236                    w.commit()?;
237                    let tx = self.db.begin_write()?;
238                    TransactionAndTables::new(tx)?
239                } else {
240                    w
241                }
242            }
243            CurrentTransaction::Read(_) => {
244                let tx = self.db.begin_write()?;
245                TransactionAndTables::new(tx)?
246            }
247        };
248        *guard = CurrentTransaction::Write(tables);
249        let res = match &mut *guard {
250            CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
251            _ => unreachable!(),
252        };
253        Ok(res)
254    }
255}
256
/// Owning iterator over peer ids, as returned by `Store::get_sync_peers`.
type PeersIter = std::vec::IntoIter<PeerIdBytes>;
258
259impl Store {
260    /// Create a new replica for `namespace` and persist in this store.
261    pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
262        let id = namespace.id();
263        self.import_namespace(namespace.into())?;
264        self.open_replica(&id).map_err(Into::into)
265    }
266
267    /// Create a new author key and persist it in the store.
268    pub fn new_author<R: CryptoRngCore + ?Sized>(&mut self, rng: &mut R) -> Result<Author> {
269        let author = Author::new(rng);
270        self.import_author(author.clone())?;
271        Ok(author)
272    }
273
274    /// Check if a [`AuthorHeads`] contains entry timestamps that we do not have locally.
275    ///
276    /// Returns the number of authors that the other peer has updates for.
277    pub fn has_news_for_us(
278        &mut self,
279        namespace: NamespaceId,
280        heads: &AuthorHeads,
281    ) -> Result<Option<NonZeroU64>> {
282        let our_heads = {
283            let latest = self.get_latest_for_each_author(namespace)?;
284            let mut heads = AuthorHeads::default();
285            for e in latest {
286                let (author, timestamp, _key) = e?;
287                heads.insert(author, timestamp);
288            }
289            heads
290        };
291        let has_news_for_us = heads.has_news_for(&our_heads);
292        Ok(has_news_for_us)
293    }
294
295    /// Open a replica from this store.
296    ///
297    /// This just calls load_replica_info and then creates a new replica with the info.
298    pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
299        let info = self.load_replica_info(namespace_id)?;
300        let instance = StoreInstance::new(*namespace_id, self);
301        Ok(Replica::new(instance, Box::new(info)))
302    }
303
304    /// Load the replica info from the store.
305    pub fn load_replica_info(
306        &mut self,
307        namespace_id: &NamespaceId,
308    ) -> Result<ReplicaInfo, OpenError> {
309        let tables = self.tables()?;
310        let info = match tables.namespaces.get(namespace_id.as_bytes()) {
311            Ok(Some(db_value)) => {
312                let (raw_kind, raw_bytes) = db_value.value();
313                let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
314                ReplicaInfo::new(namespace)
315            }
316            Ok(None) => return Err(OpenError::NotFound),
317            Err(err) => return Err(OpenError::Other(err.into())),
318        };
319        self.open_replicas.insert(info.capability.id());
320        Ok(info)
321    }
322
323    /// Close a replica.
324    pub fn close_replica(&mut self, id: NamespaceId) {
325        self.open_replicas.remove(&id);
326    }
327
328    /// List all replica namespaces in this store.
329    pub fn list_namespaces(
330        &mut self,
331    ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
332        let snapshot = self.snapshot()?;
333        let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
334        let iter = iter.map(|res| {
335            let capability = parse_capability(res?.1.value())?;
336            Ok((capability.id(), capability.kind()))
337        });
338        Ok(iter)
339    }
340
341    /// Get an author key from the store.
342    pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
343        let tables = self.tables()?;
344        let Some(author) = tables.authors.get(author_id.as_bytes())? else {
345            return Ok(None);
346        };
347        let author = Author::from_bytes(author.value());
348        Ok(Some(author))
349    }
350
351    /// Import an author key pair.
352    pub fn import_author(&mut self, author: Author) -> Result<()> {
353        self.modify(|tables| {
354            tables
355                .authors
356                .insert(author.id().as_bytes(), &author.to_bytes())?;
357            Ok(())
358        })
359    }
360
361    /// Delete an author.
362    pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
363        self.modify(|tables| {
364            tables.authors.remove(author.as_bytes())?;
365            Ok(())
366        })
367    }
368
369    /// List all author keys in this store.
370    pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
371        let tables = self.snapshot()?;
372        let iter = tables
373            .authors
374            .range::<&'static [u8; 32]>(..)?
375            .map(|res| match res {
376                Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
377                Err(err) => Err(err.into()),
378            });
379        Ok(iter)
380    }
381
382    /// Import a new replica namespace.
383    pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
384        self.modify(|tables| {
385            let outcome = {
386                let (capability, outcome) = {
387                    let existing = tables.namespaces.get(capability.id().as_bytes())?;
388                    if let Some(existing) = existing {
389                        let mut existing = parse_capability(existing.value())?;
390                        let outcome = if existing.merge(capability)? {
391                            ImportNamespaceOutcome::Upgraded
392                        } else {
393                            ImportNamespaceOutcome::NoChange
394                        };
395                        (existing, outcome)
396                    } else {
397                        (capability, ImportNamespaceOutcome::Inserted)
398                    }
399                };
400                let id = capability.id().to_bytes();
401                let (kind, bytes) = capability.raw();
402                tables.namespaces.insert(&id, (kind, &bytes))?;
403                outcome
404            };
405            Ok(outcome)
406        })
407    }
408
409    /// Remove a replica.
410    ///
411    /// Completely removes a replica and deletes both the namespace private key and all document
412    /// entries.
413    ///
414    /// Note that a replica has to be closed before it can be removed. The store has to enforce
415    /// that a replica cannot be removed while it is still open.
416    pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
417        if self.open_replicas.contains(namespace) {
418            return Err(anyhow!("replica is not closed"));
419        }
420        self.modify(|tables| {
421            let bounds = RecordsBounds::namespace(*namespace);
422            tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
423            let bounds = ByKeyBounds::namespace(*namespace);
424            let _ = tables
425                .records_by_key
426                .retain_in(bounds.as_ref(), |_k, _v| false);
427            tables.namespaces.remove(namespace.as_bytes())?;
428            tables.namespace_peers.remove_all(namespace.as_bytes())?;
429            tables.download_policy.remove(namespace.as_bytes())?;
430            Ok(())
431        })
432    }
433
434    /// Get an iterator over entries of a replica.
435    pub fn get_many(
436        &mut self,
437        namespace: NamespaceId,
438        query: impl Into<Query>,
439    ) -> Result<QueryIterator> {
440        let tables = self.snapshot_owned()?;
441        QueryIterator::new(tables, namespace, query.into())
442    }
443
444    /// Get an entry by key and author.
445    pub fn get_exact(
446        &mut self,
447        namespace: NamespaceId,
448        author: AuthorId,
449        key: impl AsRef<[u8]>,
450        include_empty: bool,
451    ) -> Result<Option<SignedEntry>> {
452        get_exact(
453            &self.tables()?.records,
454            namespace,
455            author,
456            key,
457            include_empty,
458        )
459    }
460
461    /// Get all content hashes of all replicas in the store.
462    pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
463        let tables = self.snapshot_owned()?;
464        ContentHashesIterator::all(&tables.records)
465    }
466
467    /// Get the latest entry for each author in a namespace.
468    pub fn get_latest_for_each_author(
469        &mut self,
470        namespace: NamespaceId,
471    ) -> Result<LatestIterator<'_>> {
472        LatestIterator::new(&self.tables()?.latest_per_author, namespace)
473    }
474
475    /// Register a peer that has been useful to sync a document.
476    pub fn register_useful_peer(
477        &mut self,
478        namespace: NamespaceId,
479        peer: crate::PeerIdBytes,
480    ) -> Result<()> {
481        let peer = &peer;
482        let namespace = namespace.as_bytes();
483        // calculate nanos since UNIX_EPOCH for a time measurement
484        let nanos = std::time::UNIX_EPOCH
485            .elapsed()
486            .map(|duration| duration.as_nanos() as u64)?;
487        self.modify(|tables| {
488            // ensure the document exists
489            anyhow::ensure!(
490                tables.namespaces.get(namespace)?.is_some(),
491                "document not created"
492            );
493
494            let mut namespace_peers = tables.namespace_peers.get(namespace)?;
495
496            // get the oldest entry since it's candidate for removal
497            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
498                let (oldest_nanos, &oldest_peer) = guard.value();
499                (oldest_nanos, oldest_peer)
500            });
501            match maybe_oldest {
502                None => {
503                    // the table is empty so the peer can be inserted without further checks since
504                    // super::PEERS_PER_DOC_CACHE_SIZE is non zero
505                    drop(namespace_peers);
506                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
507                }
508                Some((oldest_nanos, oldest_peer)) => {
509                    let oldest_peer = &oldest_peer;
510
511                    if oldest_peer == peer {
512                        // oldest peer is the current one, so replacing the entry for the peer will
513                        // maintain the size
514                        drop(namespace_peers);
515                        tables
516                            .namespace_peers
517                            .remove(namespace, (oldest_nanos, oldest_peer))?;
518                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
519                    } else {
520                        // calculate the len in the same loop since calling `len` is another fallible operation
521                        let mut len = 1;
522                        // find any previous entry for the same peer to remove it
523                        let mut prev_peer_nanos = None;
524
525                        for result in namespace_peers {
526                            len += 1;
527                            let guard = result?;
528                            let (peer_nanos, peer_bytes) = guard.value();
529                            if prev_peer_nanos.is_none() && peer_bytes == peer {
530                                prev_peer_nanos = Some(peer_nanos)
531                            }
532                        }
533
534                        match prev_peer_nanos {
535                            Some(prev_nanos) => {
536                                // the peer was already present, so we can remove the old entry and
537                                // insert the new one without checking the size
538                                tables
539                                    .namespace_peers
540                                    .remove(namespace, (prev_nanos, peer))?;
541                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
542                            }
543                            None => {
544                                // the peer is new and the table is non empty, add it and check the
545                                // size to decide if the oldest peer should be evicted
546                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
547                                len += 1;
548                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
549                                    tables
550                                        .namespace_peers
551                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
552                                }
553                            }
554                        }
555                    }
556                }
557            }
558            Ok(())
559        })
560    }
561
562    /// Get the peers that have been useful for a document.
563    pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
564        let tables = self.tables()?;
565        let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
566        for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
567            let (_nanos, &peer) = result?.value();
568            peers.push(peer);
569        }
570        if peers.is_empty() {
571            Ok(None)
572        } else {
573            Ok(Some(peers.into_iter()))
574        }
575    }
576
577    /// Set the download policy for a namespace.
578    pub fn set_download_policy(
579        &mut self,
580        namespace: &NamespaceId,
581        policy: DownloadPolicy,
582    ) -> Result<()> {
583        self.modify(|tables| {
584            let namespace = namespace.as_bytes();
585
586            // ensure the document exists
587            anyhow::ensure!(
588                tables.namespaces.get(&namespace)?.is_some(),
589                "document not created"
590            );
591
592            let value = postcard::to_stdvec(&policy)?;
593            tables.download_policy.insert(namespace, value.as_slice())?;
594            Ok(())
595        })
596    }
597
598    /// Get the download policy for a namespace.
599    pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
600        let tables = self.tables()?;
601        let value = tables.download_policy.get(namespace.as_bytes())?;
602        Ok(match value {
603            None => DownloadPolicy::default(),
604            Some(value) => postcard::from_bytes(value.value())?,
605        })
606    }
607}
608
/// Public key lookups are delegated to the store's `pubkeys` field.
impl PublicKeyStore for Store {
    fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
        self.pubkeys.public_key(id)
    }
}
614
615fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
616    Capability::from_raw(raw_kind, raw_bytes)
617}
618
619fn get_exact(
620    record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
621    namespace: NamespaceId,
622    author: AuthorId,
623    key: impl AsRef<[u8]>,
624    include_empty: bool,
625) -> Result<Option<SignedEntry>> {
626    let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
627    let record = record_table.get(id)?;
628    Ok(record
629        .map(|r| into_entry(id, r.value()))
630        .filter(|entry| include_empty || !entry.is_empty()))
631}
632
/// A wrapper around [`Store`] for a specific [`NamespaceId`]
#[derive(Debug)]
pub struct StoreInstance<'a> {
    /// The namespace that operations of this instance are scoped to.
    namespace: NamespaceId,
    /// Exclusive borrow of the store that holds the data.
    pub(crate) store: &'a mut Store,
}
639
640impl<'a> StoreInstance<'a> {
641    pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
642        StoreInstance { namespace, store }
643    }
644}
645
646impl PublicKeyStore for StoreInstance<'_> {
647    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
648        self.store.public_key(id)
649    }
650}
651
652impl super::DownloadPolicyStore for StoreInstance<'_> {
653    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
654        self.store.get_download_policy(namespace)
655    }
656}
657
658impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
659    type Error = anyhow::Error;
660    type RangeIterator<'x>
661        = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
662    where
663        'a: 'x;
664    type ParentIterator<'x>
665        = ParentIterator
666    where
667        'a: 'x;
668
669    /// Get a the first key (or the default if none is available).
670    fn get_first(&mut self) -> Result<RecordIdentifier> {
671        let tables = self.store.as_mut().tables()?;
672        // TODO: verify this fetches all keys with this namespace
673        let bounds = RecordsBounds::namespace(self.namespace);
674        let mut records = tables.records.range(bounds.as_ref())?;
675
676        let Some(record) = records.next() else {
677            return Ok(RecordIdentifier::default());
678        };
679        let (compound_key, _value) = record?;
680        let (namespace_id, author_id, key) = compound_key.value();
681        let id = RecordIdentifier::new(namespace_id, author_id, key);
682        Ok(id)
683    }
684
685    #[cfg(test)]
686    fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
687        self.store
688            .as_mut()
689            .get_exact(id.namespace(), id.author(), id.key(), true)
690    }
691
692    #[cfg(test)]
693    fn len(&mut self) -> Result<usize> {
694        let tables = self.store.as_mut().tables()?;
695        let bounds = RecordsBounds::namespace(self.namespace);
696        let records = tables.records.range(bounds.as_ref())?;
697        Ok(records.count())
698    }
699
700    #[cfg(test)]
701    fn is_empty(&mut self) -> Result<bool> {
702        use redb::ReadableTableMetadata;
703        let tables = self.store.as_mut().tables()?;
704        Ok(tables.records.is_empty()?)
705    }
706
707    fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
708        // TODO: optimize
709        let elements = self.get_range(range.clone())?;
710
711        let mut fp = Fingerprint::empty();
712        for el in elements {
713            let el = el?;
714            fp ^= el.as_fingerprint();
715        }
716
717        Ok(fp)
718    }
719
720    fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
721        let id = e.id();
722        self.store.as_mut().modify(|tables| {
723            // insert into record table
724            let key = (
725                &id.namespace().to_bytes(),
726                &id.author().to_bytes(),
727                id.key(),
728            );
729            let hash = e.content_hash(); // let binding is needed
730            let value = (
731                e.timestamp(),
732                &e.signature().namespace().to_bytes(),
733                &e.signature().author().to_bytes(),
734                e.content_len(),
735                hash.as_bytes(),
736            );
737            tables.records.insert(key, value)?;
738
739            // insert into by key index table
740            let key = (
741                &id.namespace().to_bytes(),
742                id.key(),
743                &id.author().to_bytes(),
744            );
745            tables.records_by_key.insert(key, ())?;
746
747            // insert into latest table
748            let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
749            let value = (e.timestamp(), e.id().key());
750            tables.latest_per_author.insert(key, value)?;
751            Ok(())
752        })
753    }
754
755    fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
756        let tables = self.store.as_mut().tables()?;
757        let iter = match range.x().cmp(range.y()) {
758            // identity range: iter1 = all, iter2 = none
759            Ordering::Equal => {
760                // iterator for all entries in replica
761                let bounds = RecordsBounds::namespace(self.namespace);
762                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
763                chain_none(iter)
764            }
765            // regular range: iter1 = x <= t < y, iter2 = none
766            Ordering::Less => {
767                // iterator for entries from range.x to range.y
768                let start = Bound::Included(range.x().to_byte_tuple());
769                let end = Bound::Excluded(range.y().to_byte_tuple());
770                let bounds = RecordsBounds::new(start, end);
771                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
772                chain_none(iter)
773            }
774            // split range: iter1 = start <= t < y, iter2 = x <= t <= end
775            Ordering::Greater => {
776                // iterator for entries from start to range.y
777                let end = Bound::Excluded(range.y().to_byte_tuple());
778                let bounds = RecordsBounds::from_start(&self.namespace, end);
779                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
780
781                // iterator for entries from range.x to end
782                let start = Bound::Included(range.x().to_byte_tuple());
783                let bounds = RecordsBounds::to_end(&self.namespace, start);
784                let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
785
786                iter.chain(Some(iter2).into_iter().flatten())
787            }
788        };
789        Ok(iter)
790    }
791
792    #[cfg(test)]
793    fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
794        self.store.as_mut().modify(|tables| {
795            let entry = {
796                let (namespace, author, key) = id.as_byte_tuple();
797                let id = (namespace, key, author);
798                tables.records_by_key.remove(id)?;
799                let id = (namespace, author, key);
800                let value = tables.records.remove(id)?;
801                value.map(|value| into_entry(id, value.value()))
802            };
803            Ok(entry)
804        })
805    }
806
807    #[cfg(test)]
808    fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
809        let tables = self.store.as_mut().tables()?;
810        let bounds = RecordsBounds::namespace(self.namespace);
811        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
812        Ok(chain_none(iter))
813    }
814
815    fn prefixes_of(
816        &mut self,
817        id: &RecordIdentifier,
818    ) -> Result<Self::ParentIterator<'_>, Self::Error> {
819        let tables = self.store.as_mut().tables()?;
820        ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
821    }
822
823    #[cfg(test)]
824    fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
825        let tables = self.store.as_mut().tables()?;
826        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
827        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
828        Ok(chain_none(iter))
829    }
830
831    fn remove_prefix_filtered(
832        &mut self,
833        id: &RecordIdentifier,
834        predicate: impl Fn(&Record) -> bool,
835    ) -> Result<usize> {
836        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
837        self.store.as_mut().modify(|tables| {
838            let cb = |_k: RecordsId, v: RecordsValue| {
839                let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
840                let record = Record::new(hash.into(), len, timestamp);
841
842                predicate(&record)
843            };
844            let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
845            let count = iter.count();
846            Ok(count)
847        })
848    }
849}
850
/// Wraps `iter` in a [`Chain`] whose second half never yields anything.
///
/// This gives a single iterator the same concrete type as two chained iterators of the same
/// kind, so both cases can be returned from one function.
fn chain_none<'a, I, T>(iter: I) -> Chain<I, Flatten<std::option::IntoIter<I>>>
where
    I: Iterator<Item = T> + 'a,
{
    let no_second_iter: Option<I> = None;
    iter.chain(no_second_iter.into_iter().flatten())
}
856
857/// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which
858/// is a prefix of the key passed to the iterator.
859#[derive(Debug)]
860pub struct ParentIterator {
861    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
862}
863
864impl ParentIterator {
865    fn new(
866        tables: &Tables,
867        namespace: NamespaceId,
868        author: AuthorId,
869        key: Vec<u8>,
870    ) -> anyhow::Result<Self> {
871        let parents = parents(&tables.records, namespace, author, key.clone());
872        Ok(Self {
873            inner: parents.into_iter(),
874        })
875    }
876}
877
878fn parents(
879    table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
880    namespace: NamespaceId,
881    author: AuthorId,
882    mut key: Vec<u8>,
883) -> Vec<anyhow::Result<SignedEntry>> {
884    let mut res = Vec::new();
885
886    while !key.is_empty() {
887        let entry = get_exact(table, namespace, author, &key, false);
888        key.pop();
889        match entry {
890            Err(err) => res.push(Err(err)),
891            Ok(Some(entry)) => res.push(Ok(entry)),
892            Ok(None) => continue,
893        }
894    }
895    res.reverse();
896    res
897}
898
899impl Iterator for ParentIterator {
900    type Item = Result<SignedEntry>;
901
902    fn next(&mut self) -> Option<Self::Item> {
903        self.inner.next()
904    }
905}
906
907/// Iterator for all content hashes
908///
909/// Note that you might get duplicate hashes. Also, the iterator will keep
910/// a database snapshot open until it is dropped.
911///
912/// Also, this represents a snapshot of the database at the time of creation.
913/// It needs a copy of a redb::ReadOnlyTable to be self-contained.
914#[derive(derive_more::Debug)]
915pub struct ContentHashesIterator {
916    #[debug(skip)]
917    range: RecordsRange<'static>,
918}
919
920impl ContentHashesIterator {
921    /// Create a new iterator over all content hashes.
922    pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
923        let range = RecordsRange::all_static(table)?;
924        Ok(Self { range })
925    }
926}
927
928impl Iterator for ContentHashesIterator {
929    type Item = Result<Hash>;
930
931    fn next(&mut self) -> Option<Self::Item> {
932        let v = self.range.next()?;
933        Some(v.map(|e| e.content_hash()))
934    }
935}
936
937/// Iterator over the latest entry per author.
938#[derive(derive_more::Debug)]
939#[debug("LatestIterator")]
940pub struct LatestIterator<'a>(
941    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
942);
943
944impl<'a> LatestIterator<'a> {
945    fn new(
946        latest_per_author: &'a impl ReadableTable<
947            LatestPerAuthorKey<'static>,
948            LatestPerAuthorValue<'static>,
949        >,
950        namespace: NamespaceId,
951    ) -> anyhow::Result<Self> {
952        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
953        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
954        let range = latest_per_author.range(start..=end)?;
955        Ok(Self(range))
956    }
957}
958
959impl Iterator for LatestIterator<'_> {
960    type Item = Result<(AuthorId, u64, Vec<u8>)>;
961
962    fn next(&mut self) -> Option<Self::Item> {
963        self.0.next_map(|key, value| {
964            let (_namespace, author) = key;
965            let (timestamp, key) = value;
966            (author.into(), timestamp, key.to_vec())
967        })
968    }
969}
970
971fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
972    let (namespace, author, key) = key;
973    let (timestamp, namespace_sig, author_sig, len, hash) = value;
974    let id = RecordIdentifier::new(namespace, author, key);
975    let record = Record::new(hash.into(), len, timestamp);
976    let entry = Entry::new(id, record);
977    let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
978    SignedEntry::new(entry_signature, entry)
979}
980
981#[cfg(test)]
982mod tests {
983    use super::{tables::LATEST_PER_AUTHOR_TABLE, *};
984    use crate::ranger::Store as _;
985
986    #[test]
987    fn test_ranges() -> Result<()> {
988        let dbfile = tempfile::NamedTempFile::new()?;
989        let mut store = Store::persistent(dbfile.path())?;
990
991        let author = store.new_author(&mut rand::thread_rng())?;
992        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
993        let mut replica = store.new_replica(namespace.clone())?;
994
995        // test author prefix relation for all-255 keys
996        let key1 = vec![255, 255];
997        let key2 = vec![255, 255, 255];
998        replica.hash_and_insert(&key1, &author, b"v1")?;
999        replica.hash_and_insert(&key2, &author, b"v2")?;
1000        let res = store
1001            .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
1002            .collect::<Result<Vec<_>>>()?;
1003        assert_eq!(res.len(), 2);
1004        assert_eq!(
1005            res.into_iter()
1006                .map(|entry| entry.key().to_vec())
1007                .collect::<Vec<_>>(),
1008            vec![key1, key2]
1009        );
1010        Ok(())
1011    }
1012
1013    #[test]
1014    fn test_basics() -> Result<()> {
1015        let dbfile = tempfile::NamedTempFile::new()?;
1016        let mut store = Store::persistent(dbfile.path())?;
1017
1018        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
1019        assert!(authors.is_empty());
1020
1021        let author = store.new_author(&mut rand::thread_rng())?;
1022        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
1023        let _replica = store.new_replica(namespace.clone())?;
1024        store.close_replica(namespace.id());
1025        let replica = store.load_replica_info(&namespace.id())?;
1026        assert_eq!(replica.capability.id(), namespace.id());
1027
1028        let author_back = store.get_author(&author.id())?.unwrap();
1029        assert_eq!(author.to_bytes(), author_back.to_bytes(),);
1030
1031        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
1032        for i in 0..5 {
1033            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1034            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
1035            let entry = SignedEntry::from_entry(entry, &namespace, &author);
1036            wrapper.entry_put(entry)?;
1037        }
1038
1039        // all
1040        let all: Vec<_> = wrapper.all()?.collect();
1041        assert_eq!(all.len(), 5);
1042
1043        // add a second version
1044        let mut ids = Vec::new();
1045        for i in 0..5 {
1046            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1047            let entry = Entry::new(
1048                id.clone(),
1049                Record::current_from_data(format!("world-{i}-2")),
1050            );
1051            let entry = SignedEntry::from_entry(entry, &namespace, &author);
1052            wrapper.entry_put(entry)?;
1053            ids.push(id);
1054        }
1055
1056        // get all
1057        let entries = wrapper
1058            .store
1059            .get_many(namespace.id(), Query::all())?
1060            .collect::<Result<Vec<_>>>()?;
1061        assert_eq!(entries.len(), 5);
1062
1063        // get all prefix
1064        let entries = wrapper
1065            .store
1066            .get_many(namespace.id(), Query::key_prefix("hello-"))?
1067            .collect::<Result<Vec<_>>>()?;
1068        assert_eq!(entries.len(), 5);
1069
1070        // delete and get
1071        for id in ids {
1072            let res = wrapper.get(&id)?;
1073            assert!(res.is_some());
1074            let out = wrapper.entry_remove(&id)?.unwrap();
1075            assert_eq!(out.entry().id(), &id);
1076            let res = wrapper.get(&id)?;
1077            assert!(res.is_none());
1078        }
1079
1080        // get latest
1081        let entries = wrapper
1082            .store
1083            .get_many(namespace.id(), Query::all())?
1084            .collect::<Result<Vec<_>>>()?;
1085        assert_eq!(entries.len(), 0);
1086
1087        Ok(())
1088    }
1089
1090    fn copy_and_modify(
1091        source: &Path,
1092        modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
1093    ) -> Result<tempfile::NamedTempFile> {
1094        let dbfile = tempfile::NamedTempFile::new()?;
1095        std::fs::copy(source, dbfile.path())?;
1096        let db = Database::create(dbfile.path())?;
1097        let write_tx = db.begin_write()?;
1098        modify(&write_tx)?;
1099        write_tx.commit()?;
1100        drop(db);
1101        Ok(dbfile)
1102    }
1103
1104    #[test]
1105    fn test_migration_001_populate_latest_table() -> Result<()> {
1106        let dbfile = tempfile::NamedTempFile::new()?;
1107        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
1108
1109        // create a store and add some data
1110        let expected = {
1111            let mut store = Store::persistent(dbfile.path())?;
1112            let author1 = store.new_author(&mut rand::thread_rng())?;
1113            let author2 = store.new_author(&mut rand::thread_rng())?;
1114            let mut replica = store.new_replica(namespace.clone())?;
1115            replica.hash_and_insert(b"k1", &author1, b"v1")?;
1116            replica.hash_and_insert(b"k2", &author2, b"v1")?;
1117            replica.hash_and_insert(b"k3", &author1, b"v1")?;
1118
1119            let expected = store
1120                .get_latest_for_each_author(namespace.id())?
1121                .collect::<Result<Vec<_>>>()?;
1122            // drop everything to clear file locks.
1123            store.close_replica(namespace.id());
1124            // flush the store to disk
1125            store.flush()?;
1126            drop(store);
1127            expected
1128        };
1129        assert_eq!(expected.len(), 2);
1130
1131        // create a copy of our db file with the latest table deleted.
1132        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
1133            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
1134            Ok(())
1135        })?;
1136
1137        // open the copied db file, which will run the migration.
1138        let mut store = Store::persistent(dbfile_before_migration.path())?;
1139        let actual = store
1140            .get_latest_for_each_author(namespace.id())?
1141            .collect::<Result<Vec<_>>>()?;
1142
1143        assert_eq!(expected, actual);
1144        Ok(())
1145    }
1146
1147    #[test]
1148    fn test_migration_004_populate_by_key_index() -> Result<()> {
1149        use redb::ReadableTableMetadata;
1150        let dbfile = tempfile::NamedTempFile::new()?;
1151
1152        let mut store = Store::persistent(dbfile.path())?;
1153
1154        // check that the new table is there, even if empty
1155        {
1156            let tables = store.tables()?;
1157            assert_eq!(tables.records_by_key.len()?, 0);
1158        }
1159
1160        // TODO: write test checking that the indexing is done correctly
1161        Ok(())
1162    }
1163}