iroh_docs/store/
fs.rs

1//! On disk storage for replicas.
2
3use std::{
4    cmp::Ordering,
5    collections::HashSet,
6    iter::{Chain, Flatten},
7    num::NonZeroU64,
8    ops::Bound,
9    path::Path,
10};
11
12use anyhow::{anyhow, Result};
13use ed25519_dalek::{SignatureError, VerifyingKey};
14use iroh_blobs::Hash;
15use rand_core::CryptoRngCore;
16use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable, ReadableTableMetadata};
17use tracing::warn;
18
19use super::{
20    pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21    Query,
22};
23use crate::{
24    actor::MAX_COMMIT_DELAY,
25    keys::Author,
26    ranger::{Fingerprint, Range, RangeEntry},
27    sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28    AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29    ReplicaInfo,
30};
31
32mod bounds;
33mod migrate_v1_v2;
34mod migrations;
35mod query;
36mod ranges;
37pub(crate) mod tables;
38
39pub use self::ranges::RecordsRange;
40use self::{
41    bounds::{ByKeyBounds, RecordsBounds},
42    query::QueryIterator,
43    ranges::RangeExt,
44    tables::{
45        LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
46        RecordsValue, Tables, TransactionAndTables,
47    },
48};
49
/// Manages the replicas and authors for an instance.
#[derive(Debug)]
pub struct Store {
    /// The underlying redb database.
    db: Database,
    /// The currently cached read or write transaction, if any.
    transaction: CurrentTransaction,
    /// Namespaces recorded as open by `load_replica_info` and cleared again by
    /// `close_replica`; `remove_replica` refuses to remove these.
    open_replicas: HashSet<NamespaceId>,
    /// In-memory public key store that backs the `PublicKeyStore` impl.
    pubkeys: MemPublicKeyStore,
}
58
59impl Drop for Store {
60    fn drop(&mut self) {
61        if let Err(err) = self.flush() {
62            warn!("failed to trigger final flush: {:?}", err);
63        }
64    }
65}
66
67impl AsRef<Store> for Store {
68    fn as_ref(&self) -> &Store {
69        self
70    }
71}
72
73impl AsMut<Store> for Store {
74    fn as_mut(&mut self) -> &mut Store {
75        self
76    }
77}
78
/// The transaction currently cached by a [`Store`].
#[derive(derive_more::Debug, Default)]
#[allow(clippy::large_enum_variant)]
enum CurrentTransaction {
    /// No transaction is open.
    #[default]
    None,
    /// A read transaction together with its read-only tables, as installed by `Store::snapshot`.
    Read(ReadOnlyTables),
    /// A write transaction together with its tables, as installed by `Store::tables` and
    /// `Store::modify`.
    Write(TransactionAndTables),
}
87
88impl Store {
    /// Create a new store in memory.
    ///
    /// # Panics
    ///
    /// Panics if the in-memory database cannot be created or initialized.
    pub fn memory() -> Self {
        Self::memory_impl().expect("failed to create memory store")
    }
93
94    fn memory_impl() -> Result<Self> {
95        let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
96        Self::new_impl(db)
97    }
98
99    /// Create or open a store from a `path` to a database file.
100    ///
101    /// The file will be created if it does not exist, otherwise it will be opened.
102    pub fn persistent(path: impl AsRef<Path>) -> Result<Self> {
103        let db = match Database::create(&path) {
104            Ok(db) => db,
105            Err(DatabaseError::UpgradeRequired(1)) => migrate_v1_v2::run(&path)?,
106            Err(err) => return Err(err.into()),
107        };
108        Self::new_impl(db)
109    }
110
111    fn new_impl(db: redb::Database) -> Result<Self> {
112        // Setup all tables
113        let write_tx = db.begin_write()?;
114        let _ = Tables::new(&write_tx)?;
115        write_tx.commit()?;
116
117        // Run database migrations
118        migrations::run_migrations(&db)?;
119
120        Ok(Store {
121            db,
122            transaction: Default::default(),
123            open_replicas: Default::default(),
124            pubkeys: Default::default(),
125        })
126    }
127
128    /// Flush the current transaction, if any.
129    ///
130    /// This is the cheapest way to ensure that the data is persisted.
131    pub fn flush(&mut self) -> Result<()> {
132        if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
133            w.commit()?;
134        }
135        Ok(())
136    }
137
138    /// Get a read-only snapshot of the database.
139    ///
140    /// This has the side effect of committing any open write transaction,
141    /// so it can be used as a way to ensure that the data is persisted.
142    pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
143        let guard = &mut self.transaction;
144        let tables = match std::mem::take(guard) {
145            CurrentTransaction::None => {
146                let tx = self.db.begin_read()?;
147                ReadOnlyTables::new(tx)?
148            }
149            CurrentTransaction::Write(w) => {
150                w.commit()?;
151                let tx = self.db.begin_read()?;
152                ReadOnlyTables::new(tx)?
153            }
154            CurrentTransaction::Read(tables) => tables,
155        };
156        *guard = CurrentTransaction::Read(tables);
157        match &*guard {
158            CurrentTransaction::Read(ref tables) => Ok(tables),
159            _ => unreachable!(),
160        }
161    }
162
163    /// Get an owned read-only snapshot of the database.
164    ///
165    /// This will open a new read transaction. The read transaction won't be reused for other
166    /// reads.
167    ///
168    /// This has the side effect of committing any open write transaction,
169    /// so it can be used as a way to ensure that the data is persisted.
170    pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
171        // make sure the current transaction is committed
172        self.flush()?;
173        assert!(matches!(self.transaction, CurrentTransaction::None));
174        let tx = self.db.begin_read()?;
175        let tables = ReadOnlyTables::new(tx)?;
176        Ok(tables)
177    }
178
179    /// Get access to the tables to read from them.
180    ///
181    /// The underlying transaction is a write transaction, but with a non-mut
182    /// reference to the tables you can not write.
183    ///
184    /// There is no guarantee that this will be an independent transaction.
185    /// You just get readonly access to the current state of the database.
186    ///
187    /// As such, there is also no guarantee that the data you see is
188    /// already persisted.
189    fn tables(&mut self) -> Result<&Tables<'_>> {
190        let guard = &mut self.transaction;
191        let tables = match std::mem::take(guard) {
192            CurrentTransaction::None => {
193                let tx = self.db.begin_write()?;
194                TransactionAndTables::new(tx)?
195            }
196            CurrentTransaction::Write(w) => {
197                if w.since.elapsed() > MAX_COMMIT_DELAY {
198                    tracing::debug!("committing transaction because it's too old");
199                    w.commit()?;
200                    let tx = self.db.begin_write()?;
201                    TransactionAndTables::new(tx)?
202                } else {
203                    w
204                }
205            }
206            CurrentTransaction::Read(_) => {
207                let tx = self.db.begin_write()?;
208                TransactionAndTables::new(tx)?
209            }
210        };
211        *guard = CurrentTransaction::Write(tables);
212        match guard {
213            CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
214            _ => unreachable!(),
215        }
216    }
217
218    /// Get exclusive write access to the tables in the current transaction.
219    ///
220    /// There is no guarantee that this will be an independent transaction.
221    /// As such, there is also no guarantee that the data you see or write
222    /// will be persisted.
223    ///
224    /// To ensure that the data is persisted, acquire a snapshot of the database
225    /// or call flush.
226    fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
227        let guard = &mut self.transaction;
228        let tables = match std::mem::take(guard) {
229            CurrentTransaction::None => {
230                let tx = self.db.begin_write()?;
231                TransactionAndTables::new(tx)?
232            }
233            CurrentTransaction::Write(w) => {
234                if w.since.elapsed() > MAX_COMMIT_DELAY {
235                    tracing::debug!("committing transaction because it's too old");
236                    w.commit()?;
237                    let tx = self.db.begin_write()?;
238                    TransactionAndTables::new(tx)?
239                } else {
240                    w
241                }
242            }
243            CurrentTransaction::Read(_) => {
244                let tx = self.db.begin_write()?;
245                TransactionAndTables::new(tx)?
246            }
247        };
248        *guard = CurrentTransaction::Write(tables);
249        let res = match &mut *guard {
250            CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
251            _ => unreachable!(),
252        };
253        Ok(res)
254    }
255}
256
/// Owned iterator over peer ids, as returned by `Store::get_sync_peers`.
type PeersIter = std::vec::IntoIter<PeerIdBytes>;
258
259impl Store {
260    /// Create a new replica for `namespace` and persist in this store.
261    pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
262        let id = namespace.id();
263        self.import_namespace(namespace.into())?;
264        self.open_replica(&id).map_err(Into::into)
265    }
266
267    /// Create a new author key and persist it in the store.
268    pub fn new_author<R: CryptoRngCore + ?Sized>(&mut self, rng: &mut R) -> Result<Author> {
269        let author = Author::new(rng);
270        self.import_author(author.clone())?;
271        Ok(author)
272    }
273
274    /// Check if a [`AuthorHeads`] contains entry timestamps that we do not have locally.
275    ///
276    /// Returns the number of authors that the other peer has updates for.
277    pub fn has_news_for_us(
278        &mut self,
279        namespace: NamespaceId,
280        heads: &AuthorHeads,
281    ) -> Result<Option<NonZeroU64>> {
282        let our_heads = {
283            let latest = self.get_latest_for_each_author(namespace)?;
284            let mut heads = AuthorHeads::default();
285            for e in latest {
286                let (author, timestamp, _key) = e?;
287                heads.insert(author, timestamp);
288            }
289            heads
290        };
291        let has_news_for_us = heads.has_news_for(&our_heads);
292        Ok(has_news_for_us)
293    }
294
295    /// Open a replica from this store.
296    ///
297    /// This just calls load_replica_info and then creates a new replica with the info.
298    pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
299        let info = self.load_replica_info(namespace_id)?;
300        let instance = StoreInstance::new(*namespace_id, self);
301        Ok(Replica::new(instance, Box::new(info)))
302    }
303
304    /// Load the replica info from the store.
305    pub fn load_replica_info(
306        &mut self,
307        namespace_id: &NamespaceId,
308    ) -> Result<ReplicaInfo, OpenError> {
309        let tables = self.tables()?;
310        let info = match tables.namespaces.get(namespace_id.as_bytes()) {
311            Ok(Some(db_value)) => {
312                let (raw_kind, raw_bytes) = db_value.value();
313                let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
314                ReplicaInfo::new(namespace)
315            }
316            Ok(None) => return Err(OpenError::NotFound),
317            Err(err) => return Err(OpenError::Other(err.into())),
318        };
319        self.open_replicas.insert(info.capability.id());
320        Ok(info)
321    }
322
    /// Close a replica.
    ///
    /// Removes `id` from the set of open replicas. Closing a replica that is not
    /// recorded as open does nothing.
    pub fn close_replica(&mut self, id: NamespaceId) {
        self.open_replicas.remove(&id);
    }
327
328    /// List all replica namespaces in this store.
329    pub fn list_namespaces(
330        &mut self,
331    ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
332        let snapshot = self.snapshot()?;
333        let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
334        let iter = iter.map(|res| {
335            let capability = parse_capability(res?.1.value())?;
336            Ok((capability.id(), capability.kind()))
337        });
338        Ok(iter)
339    }
340
341    /// Get an author key from the store.
342    pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
343        let tables = self.tables()?;
344        let Some(author) = tables.authors.get(author_id.as_bytes())? else {
345            return Ok(None);
346        };
347        let author = Author::from_bytes(author.value());
348        Ok(Some(author))
349    }
350
351    /// Import an author key pair.
352    pub fn import_author(&mut self, author: Author) -> Result<()> {
353        self.modify(|tables| {
354            tables
355                .authors
356                .insert(author.id().as_bytes(), &author.to_bytes())?;
357            Ok(())
358        })
359    }
360
    /// Delete an author.
    ///
    /// Removes the entry stored under `author` from the authors table. Whether an
    /// entry existed is not reported to the caller.
    pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
        self.modify(|tables| {
            tables.authors.remove(author.as_bytes())?;
            Ok(())
        })
    }
368
369    /// List all author keys in this store.
370    pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
371        let tables = self.snapshot()?;
372        let iter = tables
373            .authors
374            .range::<&'static [u8; 32]>(..)?
375            .map(|res| match res {
376                Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
377                Err(err) => Err(err.into()),
378            });
379        Ok(iter)
380    }
381
382    /// Import a new replica namespace.
383    pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
384        self.modify(|tables| {
385            let outcome = {
386                let (capability, outcome) = {
387                    let existing = tables.namespaces.get(capability.id().as_bytes())?;
388                    if let Some(existing) = existing {
389                        let mut existing = parse_capability(existing.value())?;
390                        let outcome = if existing.merge(capability)? {
391                            ImportNamespaceOutcome::Upgraded
392                        } else {
393                            ImportNamespaceOutcome::NoChange
394                        };
395                        (existing, outcome)
396                    } else {
397                        (capability, ImportNamespaceOutcome::Inserted)
398                    }
399                };
400                let id = capability.id().to_bytes();
401                let (kind, bytes) = capability.raw();
402                tables.namespaces.insert(&id, (kind, &bytes))?;
403                outcome
404            };
405            Ok(outcome)
406        })
407    }
408
409    /// Remove a replica.
410    ///
411    /// Completely removes a replica and deletes both the namespace private key and all document
412    /// entries.
413    ///
414    /// Note that a replica has to be closed before it can be removed. The store has to enforce
415    /// that a replica cannot be removed while it is still open.
416    pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
417        if self.open_replicas.contains(namespace) {
418            return Err(anyhow!("replica is not closed"));
419        }
420        self.modify(|tables| {
421            let bounds = RecordsBounds::namespace(*namespace);
422            tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
423            let bounds = ByKeyBounds::namespace(*namespace);
424            let _ = tables
425                .records_by_key
426                .retain_in(bounds.as_ref(), |_k, _v| false);
427            tables.namespaces.remove(namespace.as_bytes())?;
428            tables.namespace_peers.remove_all(namespace.as_bytes())?;
429            tables.download_policy.remove(namespace.as_bytes())?;
430            Ok(())
431        })
432    }
433
434    /// Get an iterator over entries of a replica.
435    pub fn get_many(
436        &mut self,
437        namespace: NamespaceId,
438        query: impl Into<Query>,
439    ) -> Result<QueryIterator> {
440        let tables = self.snapshot_owned()?;
441        QueryIterator::new(tables, namespace, query.into())
442    }
443
444    /// Get an entry by key and author.
445    pub fn get_exact(
446        &mut self,
447        namespace: NamespaceId,
448        author: AuthorId,
449        key: impl AsRef<[u8]>,
450        include_empty: bool,
451    ) -> Result<Option<SignedEntry>> {
452        get_exact(
453            &self.tables()?.records,
454            namespace,
455            author,
456            key,
457            include_empty,
458        )
459    }
460
461    /// Get all content hashes of all replicas in the store.
462    pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
463        let tables = self.snapshot_owned()?;
464        ContentHashesIterator::all(&tables.records)
465    }
466
467    /// Get the latest entry for each author in a namespace.
468    pub fn get_latest_for_each_author(
469        &mut self,
470        namespace: NamespaceId,
471    ) -> Result<LatestIterator<'_>> {
472        LatestIterator::new(&self.tables()?.latest_per_author, namespace)
473    }
474
    /// Register a peer that has been useful to sync a document.
    ///
    /// The peer is stored for `namespace` together with the current time in
    /// nanoseconds since the UNIX epoch. A peer that is already registered gets its
    /// old entry replaced, so each peer is stored once. When adding a new peer makes
    /// the number of entries exceed `super::PEERS_PER_DOC_CACHE_SIZE`, the entry
    /// that was read first from the table is removed.
    ///
    /// # Errors
    ///
    /// Fails with "document not created" if the namespace is unknown, if the system
    /// clock is before the UNIX epoch, or if a table operation fails.
    pub fn register_useful_peer(
        &mut self,
        namespace: NamespaceId,
        peer: crate::PeerIdBytes,
    ) -> Result<()> {
        let peer = &peer;
        let namespace = namespace.as_bytes();
        // calculate nanos since UNIX_EPOCH for a time measurement
        let nanos = std::time::UNIX_EPOCH
            .elapsed()
            .map(|duration| duration.as_nanos() as u64)?;
        self.modify(|tables| {
            // ensure the document exists
            anyhow::ensure!(
                tables.namespaces.get(namespace)?.is_some(),
                "document not created"
            );

            let mut namespace_peers = tables.namespace_peers.get(namespace)?;

            // get the oldest entry since it's candidate for removal
            // NOTE(review): this relies on the multimap yielding values ordered by
            // (nanos, peer), so that the first value is the oldest — confirm against
            // the table definition.
            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
                let (oldest_nanos, &oldest_peer) = guard.value();
                (oldest_nanos, oldest_peer)
            });
            match maybe_oldest {
                None => {
                    // the table is empty so the peer can be inserted without further checks since
                    // super::PEERS_PER_DOC_CACHE_SIZE is non zero
                    // the read iterator is dropped first to release its borrow of the table
                    drop(namespace_peers);
                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
                }
                Some((oldest_nanos, oldest_peer)) => {
                    let oldest_peer = &oldest_peer;

                    if oldest_peer == peer {
                        // oldest peer is the current one, so replacing the entry for the peer will
                        // maintain the size
                        drop(namespace_peers);
                        tables
                            .namespace_peers
                            .remove(namespace, (oldest_nanos, oldest_peer))?;
                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
                    } else {
                        // calculate the len in the same loop since calling `len` is another fallible operation
                        // starts at 1 because the oldest entry has already been consumed above
                        let mut len = 1;
                        // find any previous entry for the same peer to remove it
                        let mut prev_peer_nanos = None;

                        // the loop consumes the iterator, which also ends its borrow of the table
                        for result in namespace_peers {
                            len += 1;
                            let guard = result?;
                            let (peer_nanos, peer_bytes) = guard.value();
                            if prev_peer_nanos.is_none() && peer_bytes == peer {
                                prev_peer_nanos = Some(peer_nanos)
                            }
                        }

                        match prev_peer_nanos {
                            Some(prev_nanos) => {
                                // the peer was already present, so we can remove the old entry and
                                // insert the new one without checking the size
                                tables
                                    .namespace_peers
                                    .remove(namespace, (prev_nanos, peer))?;
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                            }
                            None => {
                                // the peer is new and the table is non empty, add it and check the
                                // size to decide if the oldest peer should be evicted
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                                len += 1;
                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
                                    tables
                                        .namespace_peers
                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
                                }
                            }
                        }
                    }
                }
            }
            Ok(())
        })
    }
561
562    /// Get the peers that have been useful for a document.
563    pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
564        let tables = self.tables()?;
565        let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
566        for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
567            let (_nanos, &peer) = result?.value();
568            peers.push(peer);
569        }
570        if peers.is_empty() {
571            Ok(None)
572        } else {
573            Ok(Some(peers.into_iter()))
574        }
575    }
576
577    /// Set the download policy for a namespace.
578    pub fn set_download_policy(
579        &mut self,
580        namespace: &NamespaceId,
581        policy: DownloadPolicy,
582    ) -> Result<()> {
583        self.modify(|tables| {
584            let namespace = namespace.as_bytes();
585
586            // ensure the document exists
587            anyhow::ensure!(
588                tables.namespaces.get(&namespace)?.is_some(),
589                "document not created"
590            );
591
592            let value = postcard::to_stdvec(&policy)?;
593            tables.download_policy.insert(namespace, value.as_slice())?;
594            Ok(())
595        })
596    }
597
598    /// Get the download policy for a namespace.
599    pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
600        let tables = self.tables()?;
601        let value = tables.download_policy.get(namespace.as_bytes())?;
602        Ok(match value {
603            None => DownloadPolicy::default(),
604            Some(value) => postcard::from_bytes(value.value())?,
605        })
606    }
607}
608
impl PublicKeyStore for Store {
    /// Looks up the verifying key for `id` by delegating to the store's in-memory
    /// public key store.
    fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
        self.pubkeys.public_key(id)
    }
}
614
615fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
616    Capability::from_raw(raw_kind, raw_bytes)
617}
618
619fn get_exact(
620    record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
621    namespace: NamespaceId,
622    author: AuthorId,
623    key: impl AsRef<[u8]>,
624    include_empty: bool,
625) -> Result<Option<SignedEntry>> {
626    let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
627    let record = record_table.get(id)?;
628    Ok(record
629        .map(|r| into_entry(id, r.value()))
630        .filter(|entry| include_empty || !entry.is_empty()))
631}
632
/// A wrapper around [`Store`] for a specific [`NamespaceId`]
#[derive(Debug)]
pub struct StoreInstance<'a> {
    /// The namespace this instance operates on.
    namespace: NamespaceId,
    /// The wrapped store, borrowed exclusively for the lifetime of the instance.
    pub(crate) store: &'a mut Store,
}
639
640impl<'a> StoreInstance<'a> {
641    pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
642        StoreInstance { namespace, store }
643    }
644}
645
646impl PublicKeyStore for StoreInstance<'_> {
647    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
648        self.store.public_key(id)
649    }
650}
651
impl super::DownloadPolicyStore for StoreInstance<'_> {
    /// Returns the download policy of `namespace` by delegating to the wrapped store.
    ///
    /// The lookup uses the `namespace` argument, not the namespace of this instance.
    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
        self.store.get_download_policy(namespace)
    }
}
657
658impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
659    type Error = anyhow::Error;
660    type RangeIterator<'x>
661        = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
662    where
663        'a: 'x;
664    type ParentIterator<'x>
665        = ParentIterator
666    where
667        'a: 'x;
668
669    /// Get a the first key (or the default if none is available).
670    fn get_first(&mut self) -> Result<RecordIdentifier> {
671        let tables = self.store.as_mut().tables()?;
672        // TODO: verify this fetches all keys with this namespace
673        let bounds = RecordsBounds::namespace(self.namespace);
674        let mut records = tables.records.range(bounds.as_ref())?;
675
676        let Some(record) = records.next() else {
677            return Ok(RecordIdentifier::default());
678        };
679        let (compound_key, _value) = record?;
680        let (namespace_id, author_id, key) = compound_key.value();
681        let id = RecordIdentifier::new(namespace_id, author_id, key);
682        Ok(id)
683    }
684
685    fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
686        self.store
687            .as_mut()
688            .get_exact(id.namespace(), id.author(), id.key(), true)
689    }
690
691    fn len(&mut self) -> Result<usize> {
692        let tables = self.store.as_mut().tables()?;
693        let bounds = RecordsBounds::namespace(self.namespace);
694        let records = tables.records.range(bounds.as_ref())?;
695        Ok(records.count())
696    }
697
698    fn is_empty(&mut self) -> Result<bool> {
699        let tables = self.store.as_mut().tables()?;
700        Ok(tables.records.is_empty()?)
701    }
702
703    fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
704        // TODO: optimize
705        let elements = self.get_range(range.clone())?;
706
707        let mut fp = Fingerprint::empty();
708        for el in elements {
709            let el = el?;
710            fp ^= el.as_fingerprint();
711        }
712
713        Ok(fp)
714    }
715
716    fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
717        let id = e.id();
718        self.store.as_mut().modify(|tables| {
719            // insert into record table
720            let key = (
721                &id.namespace().to_bytes(),
722                &id.author().to_bytes(),
723                id.key(),
724            );
725            let hash = e.content_hash(); // let binding is needed
726            let value = (
727                e.timestamp(),
728                &e.signature().namespace().to_bytes(),
729                &e.signature().author().to_bytes(),
730                e.content_len(),
731                hash.as_bytes(),
732            );
733            tables.records.insert(key, value)?;
734
735            // insert into by key index table
736            let key = (
737                &id.namespace().to_bytes(),
738                id.key(),
739                &id.author().to_bytes(),
740            );
741            tables.records_by_key.insert(key, ())?;
742
743            // insert into latest table
744            let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
745            let value = (e.timestamp(), e.id().key());
746            tables.latest_per_author.insert(key, value)?;
747            Ok(())
748        })
749    }
750
751    fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
752        let tables = self.store.as_mut().tables()?;
753        let iter = match range.x().cmp(range.y()) {
754            // identity range: iter1 = all, iter2 = none
755            Ordering::Equal => {
756                // iterator for all entries in replica
757                let bounds = RecordsBounds::namespace(self.namespace);
758                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
759                chain_none(iter)
760            }
761            // regular range: iter1 = x <= t < y, iter2 = none
762            Ordering::Less => {
763                // iterator for entries from range.x to range.y
764                let start = Bound::Included(range.x().to_byte_tuple());
765                let end = Bound::Excluded(range.y().to_byte_tuple());
766                let bounds = RecordsBounds::new(start, end);
767                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
768                chain_none(iter)
769            }
770            // split range: iter1 = start <= t < y, iter2 = x <= t <= end
771            Ordering::Greater => {
772                // iterator for entries from start to range.y
773                let end = Bound::Excluded(range.y().to_byte_tuple());
774                let bounds = RecordsBounds::from_start(&self.namespace, end);
775                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
776
777                // iterator for entries from range.x to end
778                let start = Bound::Included(range.x().to_byte_tuple());
779                let bounds = RecordsBounds::to_end(&self.namespace, start);
780                let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
781
782                iter.chain(Some(iter2).into_iter().flatten())
783            }
784        };
785        Ok(iter)
786    }
787
788    fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
789        self.store.as_mut().modify(|tables| {
790            let entry = {
791                let (namespace, author, key) = id.as_byte_tuple();
792                let id = (namespace, key, author);
793                tables.records_by_key.remove(id)?;
794                let id = (namespace, author, key);
795                let value = tables.records.remove(id)?;
796                value.map(|value| into_entry(id, value.value()))
797            };
798            Ok(entry)
799        })
800    }
801
802    fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
803        let tables = self.store.as_mut().tables()?;
804        let bounds = RecordsBounds::namespace(self.namespace);
805        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
806        Ok(chain_none(iter))
807    }
808
809    fn prefixes_of(
810        &mut self,
811        id: &RecordIdentifier,
812    ) -> Result<Self::ParentIterator<'_>, Self::Error> {
813        let tables = self.store.as_mut().tables()?;
814        ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
815    }
816
817    fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
818        let tables = self.store.as_mut().tables()?;
819        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
820        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
821        Ok(chain_none(iter))
822    }
823
824    fn remove_prefix_filtered(
825        &mut self,
826        id: &RecordIdentifier,
827        predicate: impl Fn(&Record) -> bool,
828    ) -> Result<usize> {
829        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
830        self.store.as_mut().modify(|tables| {
831            let cb = |_k: RecordsId, v: RecordsValue| {
832                let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
833                let record = Record::new(hash.into(), len, timestamp);
834
835                predicate(&record)
836            };
837            let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
838            let count = iter.count();
839            Ok(count)
840        })
841    }
842}
843
/// Wraps `iter` in a [`Chain`] whose second half is always empty.
///
/// This lets a single iterator be returned where the type of two chained iterators of
/// the same kind is expected; the result yields exactly the items of `iter`.
fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
    iter: I,
) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
    // An absent second iterator: flattening `None` produces no items.
    let absent: Option<I> = None;
    let empty_tail = absent.into_iter().flatten();
    iter.chain(empty_tail)
}
849
/// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which
/// is a prefix of the key passed to the iterator.
///
/// The matching entries are looked up when the iterator is constructed and buffered in a
/// vector, ordered from the shortest matching key to the longest. Iterating only drains
/// that buffer.
#[derive(Debug)]
pub struct ParentIterator {
    /// Buffered lookup results, including any errors hit while reading the table.
    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
}
856
857impl ParentIterator {
858    fn new(
859        tables: &Tables,
860        namespace: NamespaceId,
861        author: AuthorId,
862        key: Vec<u8>,
863    ) -> anyhow::Result<Self> {
864        let parents = parents(&tables.records, namespace, author, key.clone());
865        Ok(Self {
866            inner: parents.into_iter(),
867        })
868    }
869}
870
871fn parents(
872    table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
873    namespace: NamespaceId,
874    author: AuthorId,
875    mut key: Vec<u8>,
876) -> Vec<anyhow::Result<SignedEntry>> {
877    let mut res = Vec::new();
878
879    while !key.is_empty() {
880        let entry = get_exact(table, namespace, author, &key, false);
881        key.pop();
882        match entry {
883            Err(err) => res.push(Err(err)),
884            Ok(Some(entry)) => res.push(Ok(entry)),
885            Ok(None) => continue,
886        }
887    }
888    res.reverse();
889    res
890}
891
892impl Iterator for ParentIterator {
893    type Item = Result<SignedEntry>;
894
895    fn next(&mut self) -> Option<Self::Item> {
896        self.inner.next()
897    }
898}
899
/// Iterator over the content hashes of all entries in the records table.
///
/// The same hash may be yielded more than once.
///
/// The iterator represents a snapshot of the database at the time of creation and keeps
/// that snapshot open until it is dropped. It needs a copy of a `redb::ReadOnlyTable` to
/// be self-contained.
#[derive(derive_more::Debug)]
pub struct ContentHashesIterator {
    /// Range over the records table; left out of the `Debug` output.
    #[debug(skip)]
    range: RecordsRange<'static>,
}
912
913impl ContentHashesIterator {
914    /// Create a new iterator over all content hashes.
915    pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
916        let range = RecordsRange::all_static(table)?;
917        Ok(Self { range })
918    }
919}
920
921impl Iterator for ContentHashesIterator {
922    type Item = Result<Hash>;
923
924    fn next(&mut self) -> Option<Self::Item> {
925        let v = self.range.next()?;
926        Some(v.map(|e| e.content_hash()))
927    }
928}
929
/// Iterator over the latest entry per author.
///
/// Wraps a redb range over the latest-per-author table and yields one
/// `(author, timestamp, key)` tuple per row. Its `Debug` output is the fixed string
/// `LatestIterator`.
#[derive(derive_more::Debug)]
#[debug("LatestIterator")]
pub struct LatestIterator<'a>(
    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
);
936
937impl<'a> LatestIterator<'a> {
938    fn new(
939        latest_per_author: &'a impl ReadableTable<
940            LatestPerAuthorKey<'static>,
941            LatestPerAuthorValue<'static>,
942        >,
943        namespace: NamespaceId,
944    ) -> anyhow::Result<Self> {
945        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
946        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
947        let range = latest_per_author.range(start..=end)?;
948        Ok(Self(range))
949    }
950}
951
952impl Iterator for LatestIterator<'_> {
953    type Item = Result<(AuthorId, u64, Vec<u8>)>;
954
955    fn next(&mut self) -> Option<Self::Item> {
956        self.0.next_map(|key, value| {
957            let (_namespace, author) = key;
958            let (timestamp, key) = value;
959            (author.into(), timestamp, key.to_vec())
960        })
961    }
962}
963
964fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
965    let (namespace, author, key) = key;
966    let (timestamp, namespace_sig, author_sig, len, hash) = value;
967    let id = RecordIdentifier::new(namespace, author, key);
968    let record = Record::new(hash.into(), len, timestamp);
969    let entry = Entry::new(id, record);
970    let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
971    SignedEntry::new(entry_signature, entry)
972}
973
#[cfg(test)]
mod tests {
    use super::{tables::LATEST_PER_AUTHOR_TABLE, *};
    use crate::ranger::Store as _;

    /// Checks that an author + key-prefix query finds keys made up entirely of `255`
    /// bytes and returns them in key order.
    #[test]
    fn test_ranges() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let author = store.new_author(&mut rand::thread_rng())?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
        let mut replica = store.new_replica(namespace.clone())?;

        // test author prefix relation for all-255 keys
        let key1 = vec![255, 255];
        let key2 = vec![255, 255, 255];
        replica.hash_and_insert(&key1, &author, b"v1")?;
        replica.hash_and_insert(&key2, &author, b"v2")?;
        let res = store
            .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(res.len(), 2);
        assert_eq!(
            res.into_iter()
                .map(|entry| entry.key().to_vec())
                .collect::<Vec<_>>(),
            vec![key1, key2]
        );
        Ok(())
    }

    /// Exercises the basic store operations: creating an author and a replica, putting
    /// entries, overwriting them, querying them and removing them again.
    #[test]
    fn test_basics() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        // a fresh store has no authors
        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
        assert!(authors.is_empty());

        let author = store.new_author(&mut rand::thread_rng())?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
        let _replica = store.new_replica(namespace.clone())?;
        store.close_replica(namespace.id());
        let replica = store.load_replica_info(&namespace.id())?;
        assert_eq!(replica.capability.id(), namespace.id());

        // the author can be read back unchanged
        let author_back = store.get_author(&author.id())?.unwrap();
        assert_eq!(author.to_bytes(), author_back.to_bytes(),);

        // insert five entries with distinct keys
        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
        }

        // all
        let all: Vec<_> = wrapper.all()?.collect();
        assert_eq!(all.len(), 5);

        // add a second version
        let mut ids = Vec::new();
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(
                id.clone(),
                Record::current_from_data(format!("world-{i}-2")),
            );
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
            ids.push(id);
        }

        // get all: the second versions reuse the same ids, so there are still five entries
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // get all prefix
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::key_prefix("hello-"))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // delete and get
        for id in ids {
            let res = wrapper.get(&id)?;
            assert!(res.is_some());
            let out = wrapper.entry_remove(&id)?.unwrap();
            assert_eq!(out.entry().id(), &id);
            let res = wrapper.get(&id)?;
            assert!(res.is_none());
        }

        // every id was removed above, so the query must come back empty
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 0);

        Ok(())
    }

    /// Copies the database file at `source` to a new temporary file, runs `modify`
    /// inside a write transaction on the copy, commits it and returns the copy.
    fn copy_and_modify(
        source: &Path,
        modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
    ) -> Result<tempfile::NamedTempFile> {
        let dbfile = tempfile::NamedTempFile::new()?;
        std::fs::copy(source, dbfile.path())?;
        let db = Database::create(dbfile.path())?;
        let write_tx = db.begin_write()?;
        modify(&write_tx)?;
        write_tx.commit()?;
        // close the database handle before handing the file back to the caller
        drop(db);
        Ok(dbfile)
    }

    /// Deletes the latest-per-author table from a copy of a populated database and
    /// checks that reopening the copy yields the same latest-per-author data again.
    #[test]
    fn test_migration_001_populate_latest_table() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());

        // create a store and add some data
        let expected = {
            let mut store = Store::persistent(dbfile.path())?;
            let author1 = store.new_author(&mut rand::thread_rng())?;
            let author2 = store.new_author(&mut rand::thread_rng())?;
            let mut replica = store.new_replica(namespace.clone())?;
            replica.hash_and_insert(b"k1", &author1, b"v1")?;
            replica.hash_and_insert(b"k2", &author2, b"v1")?;
            replica.hash_and_insert(b"k3", &author1, b"v1")?;

            let expected = store
                .get_latest_for_each_author(namespace.id())?
                .collect::<Result<Vec<_>>>()?;
            // drop everything to clear file locks.
            store.close_replica(namespace.id());
            // flush the store to disk
            store.flush()?;
            drop(store);
            expected
        };
        // two authors wrote entries, so there are two "latest" rows
        assert_eq!(expected.len(), 2);

        // create a copy of our db file with the latest table deleted.
        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
            Ok(())
        })?;

        // open the copied db file, which will run the migration.
        let mut store = Store::persistent(dbfile_before_migration.path())?;
        let actual = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(expected, actual);
        Ok(())
    }

    /// Checks that a freshly created store has an empty `records_by_key` table.
    #[test]
    fn test_migration_004_populate_by_key_index() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;

        let mut store = Store::persistent(dbfile.path())?;

        // check that the new table is there, even if empty
        {
            let tables = store.tables()?;
            assert_eq!(tables.records_by_key.len()?, 0);
        }

        // TODO: write test checking that the indexing is done correctly
        Ok(())
    }
}