iroh_docs/store/
fs.rs

1//! On disk storage for replicas.
2
3use std::{
4    cmp::Ordering,
5    collections::HashSet,
6    iter::{Chain, Flatten},
7    num::NonZeroU64,
8    ops::Bound,
9    path::Path,
10};
11
12use anyhow::{anyhow, Result};
13use ed25519_dalek::{SignatureError, VerifyingKey};
14use iroh_blobs::Hash;
15use rand_core::CryptoRngCore;
16use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable, ReadableTableMetadata};
17use tracing::warn;
18
19use super::{
20    pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21    Query,
22};
23use crate::{
24    actor::MAX_COMMIT_DELAY,
25    keys::Author,
26    ranger::{Fingerprint, Range, RangeEntry},
27    sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28    AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29    ReplicaInfo,
30};
31
32mod bounds;
33mod migrate_v1_v2;
34mod migrations;
35mod query;
36mod ranges;
37pub(crate) mod tables;
38
39pub use self::ranges::RecordsRange;
40use self::{
41    bounds::{ByKeyBounds, RecordsBounds},
42    query::QueryIterator,
43    ranges::RangeExt,
44    tables::{
45        LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
46        RecordsValue, Tables, TransactionAndTables,
47    },
48};
49
/// Manages the replicas and authors for an instance.
///
/// All table access goes through a single cached transaction (see
/// `CurrentTransaction`), which is opened on demand and committed by
/// [`Store::flush`], [`Store::snapshot`] or when the store is dropped.
#[derive(Debug)]
pub struct Store {
    /// The underlying redb database.
    db: Database,
    /// The transaction that is currently cached for reuse, if any.
    transaction: CurrentTransaction,
    /// Namespaces whose replica info has been loaded and that were not closed since.
    open_replicas: HashSet<NamespaceId>,
    /// Backing store for public key lookups (see the `PublicKeyStore` impl).
    pubkeys: MemPublicKeyStore,
}
58
59impl Drop for Store {
60    fn drop(&mut self) {
61        if let Err(err) = self.flush() {
62            warn!("failed to trigger final flush: {:?}", err);
63        }
64    }
65}
66
67impl AsRef<Store> for Store {
68    fn as_ref(&self) -> &Store {
69        self
70    }
71}
72
73impl AsMut<Store> for Store {
74    fn as_mut(&mut self) -> &mut Store {
75        self
76    }
77}
78
/// The transaction currently cached by a [`Store`].
///
/// The default is `None`, which is also what `std::mem::take` leaves behind
/// while a transaction is being inspected or replaced.
#[derive(derive_more::Debug, Default)]
#[allow(clippy::large_enum_variant)]
enum CurrentTransaction {
    /// No transaction is open.
    #[default]
    None,
    /// A read transaction together with its opened tables.
    Read(ReadOnlyTables),
    /// A write transaction together with its opened tables.
    Write(TransactionAndTables),
}
87
88impl Store {
89    /// Create a new store in memory.
90    pub fn memory() -> Self {
91        Self::memory_impl().expect("failed to create memory store")
92    }
93
94    fn memory_impl() -> Result<Self> {
95        let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
96        Self::new_impl(db)
97    }
98
99    /// Create or open a store from a `path` to a database file.
100    ///
101    /// The file will be created if it does not exist, otherwise it will be opened.
102    pub fn persistent(path: impl AsRef<Path>) -> Result<Self> {
103        let db = match Database::create(&path) {
104            Ok(db) => db,
105            Err(DatabaseError::UpgradeRequired(1)) => migrate_v1_v2::run(&path)?,
106            Err(err) => return Err(err.into()),
107        };
108        Self::new_impl(db)
109    }
110
111    fn new_impl(db: redb::Database) -> Result<Self> {
112        // Setup all tables
113        let write_tx = db.begin_write()?;
114        let _ = Tables::new(&write_tx)?;
115        write_tx.commit()?;
116
117        // Run database migrations
118        migrations::run_migrations(&db)?;
119
120        Ok(Store {
121            db,
122            transaction: Default::default(),
123            open_replicas: Default::default(),
124            pubkeys: Default::default(),
125        })
126    }
127
128    /// Flush the current transaction, if any.
129    ///
130    /// This is the cheapest way to ensure that the data is persisted.
131    pub fn flush(&mut self) -> Result<()> {
132        if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
133            w.commit()?;
134        }
135        Ok(())
136    }
137
    /// Get a read-only snapshot of the database.
    ///
    /// This has the side effect of committing any open write transaction,
    /// so it can be used as a way to ensure that the data is persisted.
    ///
    /// A cached read transaction is reused, so repeated calls return the same
    /// snapshot until the cached transaction is replaced or flushed.
    pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
        let guard = &mut self.transaction;
        // Take the cached transaction out of the slot. Until it is re-assigned below
        // the slot holds `None`, so an early return via `?` leaves nothing cached.
        let tables = match std::mem::take(guard) {
            CurrentTransaction::None => {
                let tx = self.db.begin_read()?;
                ReadOnlyTables::new(tx)?
            }
            CurrentTransaction::Write(w) => {
                // Commit pending writes first so that the new read transaction sees them.
                w.commit()?;
                let tx = self.db.begin_read()?;
                ReadOnlyTables::new(tx)?
            }
            // Reuse the read transaction that is already open.
            CurrentTransaction::Read(tables) => tables,
        };
        *guard = CurrentTransaction::Read(tables);
        // Hand out a reference into the slot that was just filled.
        match &*guard {
            CurrentTransaction::Read(ref tables) => Ok(tables),
            _ => unreachable!(),
        }
    }
162
163    /// Get an owned read-only snapshot of the database.
164    ///
165    /// This will open a new read transaction. The read transaction won't be reused for other
166    /// reads.
167    ///
168    /// This has the side effect of committing any open write transaction,
169    /// so it can be used as a way to ensure that the data is persisted.
170    pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
171        // make sure the current transaction is committed
172        self.flush()?;
173        assert!(matches!(self.transaction, CurrentTransaction::None));
174        let tx = self.db.begin_read()?;
175        let tables = ReadOnlyTables::new(tx)?;
176        Ok(tables)
177    }
178
    /// Get access to the tables to read from them.
    ///
    /// The underlying transaction is a write transaction, but with a non-mut
    /// reference to the tables you can not write.
    ///
    /// There is no guarantee that this will be an independent transaction.
    /// You just get readonly access to the current state of the database.
    ///
    /// As such, there is also no guarantee that the data you see is
    /// already persisted.
    ///
    /// An open write transaction is reused unless it is older than
    /// `MAX_COMMIT_DELAY`, in which case it is committed and replaced.
    fn tables(&mut self) -> Result<&Tables> {
        let guard = &mut self.transaction;
        // Take the cached transaction out of the slot. Until it is re-assigned below
        // the slot holds `None`, so an early return via `?` leaves nothing cached.
        let tables = match std::mem::take(guard) {
            CurrentTransaction::None => {
                let tx = self.db.begin_write()?;
                TransactionAndTables::new(tx)?
            }
            CurrentTransaction::Write(w) => {
                if w.since.elapsed() > MAX_COMMIT_DELAY {
                    tracing::debug!("committing transaction because it's too old");
                    w.commit()?;
                    let tx = self.db.begin_write()?;
                    TransactionAndTables::new(tx)?
                } else {
                    w
                }
            }
            // A cached read transaction is dropped and replaced by a write transaction.
            CurrentTransaction::Read(_) => {
                let tx = self.db.begin_write()?;
                TransactionAndTables::new(tx)?
            }
        };
        *guard = CurrentTransaction::Write(tables);
        // Hand out a shared reference to the tables of the transaction just stored.
        match guard {
            CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
            _ => unreachable!(),
        }
    }
217
    /// Get exclusive write access to the tables in the current transaction.
    ///
    /// There is no guarantee that this will be an independent transaction.
    /// As such, there is also no guarantee that the data you see or write
    /// will be persisted.
    ///
    /// To ensure that the data is persisted, acquire a snapshot of the database
    /// or call flush.
    ///
    /// The closure `f` runs against the cached write transaction and its result is
    /// returned. The transaction is not committed here, and it stays cached even
    /// when `f` returns an error.
    fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
        let guard = &mut self.transaction;
        // Take the cached transaction out of the slot. Until it is re-assigned below
        // the slot holds `None`, so an early return via `?` leaves nothing cached.
        let tables = match std::mem::take(guard) {
            CurrentTransaction::None => {
                let tx = self.db.begin_write()?;
                TransactionAndTables::new(tx)?
            }
            CurrentTransaction::Write(w) => {
                // Reuse the open write transaction unless it has been open for too long.
                if w.since.elapsed() > MAX_COMMIT_DELAY {
                    tracing::debug!("committing transaction because it's too old");
                    w.commit()?;
                    let tx = self.db.begin_write()?;
                    TransactionAndTables::new(tx)?
                } else {
                    w
                }
            }
            // A cached read transaction is dropped and replaced by a write transaction.
            CurrentTransaction::Read(_) => {
                let tx = self.db.begin_write()?;
                TransactionAndTables::new(tx)?
            }
        };
        *guard = CurrentTransaction::Write(tables);
        let res = match &mut *guard {
            CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
            _ => unreachable!(),
        };
        Ok(res)
    }
255}
256
/// Owning iterator over peer ids, as returned by [`Store::get_sync_peers`].
type PeersIter = std::vec::IntoIter<PeerIdBytes>;
258
259impl Store {
260    /// Create a new replica for `namespace` and persist in this store.
261    pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica> {
262        let id = namespace.id();
263        self.import_namespace(namespace.into())?;
264        self.open_replica(&id).map_err(Into::into)
265    }
266
267    /// Create a new author key and persist it in the store.
268    pub fn new_author<R: CryptoRngCore + ?Sized>(&mut self, rng: &mut R) -> Result<Author> {
269        let author = Author::new(rng);
270        self.import_author(author.clone())?;
271        Ok(author)
272    }
273
274    /// Check if a [`AuthorHeads`] contains entry timestamps that we do not have locally.
275    ///
276    /// Returns the number of authors that the other peer has updates for.
277    pub fn has_news_for_us(
278        &mut self,
279        namespace: NamespaceId,
280        heads: &AuthorHeads,
281    ) -> Result<Option<NonZeroU64>> {
282        let our_heads = {
283            let latest = self.get_latest_for_each_author(namespace)?;
284            let mut heads = AuthorHeads::default();
285            for e in latest {
286                let (author, timestamp, _key) = e?;
287                heads.insert(author, timestamp);
288            }
289            heads
290        };
291        let has_news_for_us = heads.has_news_for(&our_heads);
292        Ok(has_news_for_us)
293    }
294
295    /// Open a replica from this store.
296    ///
297    /// This just calls load_replica_info and then creates a new replica with the info.
298    pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica, OpenError> {
299        let info = self.load_replica_info(namespace_id)?;
300        let instance = StoreInstance::new(*namespace_id, self);
301        Ok(Replica::new(instance, Box::new(info)))
302    }
303
304    /// Load the replica info from the store.
305    pub fn load_replica_info(
306        &mut self,
307        namespace_id: &NamespaceId,
308    ) -> Result<ReplicaInfo, OpenError> {
309        let tables = self.tables()?;
310        let info = match tables.namespaces.get(namespace_id.as_bytes()) {
311            Ok(Some(db_value)) => {
312                let (raw_kind, raw_bytes) = db_value.value();
313                let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
314                ReplicaInfo::new(namespace)
315            }
316            Ok(None) => return Err(OpenError::NotFound),
317            Err(err) => return Err(OpenError::Other(err.into())),
318        };
319        self.open_replicas.insert(info.capability.id());
320        Ok(info)
321    }
322
323    /// Close a replica.
324    pub fn close_replica(&mut self, id: NamespaceId) {
325        self.open_replicas.remove(&id);
326    }
327
328    /// List all replica namespaces in this store.
329    pub fn list_namespaces(
330        &mut self,
331    ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
332        let snapshot = self.snapshot()?;
333        let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
334        let iter = iter.map(|res| {
335            let capability = parse_capability(res?.1.value())?;
336            Ok((capability.id(), capability.kind()))
337        });
338        Ok(iter)
339    }
340
341    /// Get an author key from the store.
342    pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
343        let tables = self.tables()?;
344        let Some(author) = tables.authors.get(author_id.as_bytes())? else {
345            return Ok(None);
346        };
347        let author = Author::from_bytes(author.value());
348        Ok(Some(author))
349    }
350
351    /// Import an author key pair.
352    pub fn import_author(&mut self, author: Author) -> Result<()> {
353        self.modify(|tables| {
354            tables
355                .authors
356                .insert(author.id().as_bytes(), &author.to_bytes())?;
357            Ok(())
358        })
359    }
360
361    /// Delete an author.
362    pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
363        self.modify(|tables| {
364            tables.authors.remove(author.as_bytes())?;
365            Ok(())
366        })
367    }
368
369    /// List all author keys in this store.
370    pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
371        let tables = self.snapshot()?;
372        let iter = tables
373            .authors
374            .range::<&'static [u8; 32]>(..)?
375            .map(|res| match res {
376                Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
377                Err(err) => Err(err.into()),
378            });
379        Ok(iter)
380    }
381
382    /// Import a new replica namespace.
383    pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
384        self.modify(|tables| {
385            let outcome = {
386                let (capability, outcome) = {
387                    let existing = tables.namespaces.get(capability.id().as_bytes())?;
388                    if let Some(existing) = existing {
389                        let mut existing = parse_capability(existing.value())?;
390                        let outcome = if existing.merge(capability)? {
391                            ImportNamespaceOutcome::Upgraded
392                        } else {
393                            ImportNamespaceOutcome::NoChange
394                        };
395                        (existing, outcome)
396                    } else {
397                        (capability, ImportNamespaceOutcome::Inserted)
398                    }
399                };
400                let id = capability.id().to_bytes();
401                let (kind, bytes) = capability.raw();
402                tables.namespaces.insert(&id, (kind, &bytes))?;
403                outcome
404            };
405            Ok(outcome)
406        })
407    }
408
409    /// Remove a replica.
410    ///
411    /// Completely removes a replica and deletes both the namespace private key and all document
412    /// entries.
413    ///
414    /// Note that a replica has to be closed before it can be removed. The store has to enforce
415    /// that a replica cannot be removed while it is still open.
416    pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
417        if self.open_replicas.contains(namespace) {
418            return Err(anyhow!("replica is not closed"));
419        }
420        self.modify(|tables| {
421            let bounds = RecordsBounds::namespace(*namespace);
422            tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
423            let bounds = ByKeyBounds::namespace(*namespace);
424            let _ = tables
425                .records_by_key
426                .retain_in(bounds.as_ref(), |_k, _v| false);
427            tables.namespaces.remove(namespace.as_bytes())?;
428            tables.namespace_peers.remove_all(namespace.as_bytes())?;
429            tables.download_policy.remove(namespace.as_bytes())?;
430            Ok(())
431        })
432    }
433
434    /// Get an iterator over entries of a replica.
435    pub fn get_many(
436        &mut self,
437        namespace: NamespaceId,
438        query: impl Into<Query>,
439    ) -> Result<QueryIterator> {
440        let tables = self.snapshot_owned()?;
441        QueryIterator::new(tables, namespace, query.into())
442    }
443
444    /// Get an entry by key and author.
445    pub fn get_exact(
446        &mut self,
447        namespace: NamespaceId,
448        author: AuthorId,
449        key: impl AsRef<[u8]>,
450        include_empty: bool,
451    ) -> Result<Option<SignedEntry>> {
452        get_exact(
453            &self.tables()?.records,
454            namespace,
455            author,
456            key,
457            include_empty,
458        )
459    }
460
461    /// Get all content hashes of all replicas in the store.
462    pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
463        let tables = self.snapshot_owned()?;
464        ContentHashesIterator::all(&tables.records)
465    }
466
467    /// Get the latest entry for each author in a namespace.
468    pub fn get_latest_for_each_author(&mut self, namespace: NamespaceId) -> Result<LatestIterator> {
469        LatestIterator::new(&self.tables()?.latest_per_author, namespace)
470    }
471
    /// Register a peer that has been useful to sync a document.
    ///
    /// Peers are stored per namespace as `(nanos, peer)` pairs, where `nanos` is the
    /// time of registration. Registering a peer that is already present replaces its
    /// old pair. Registering a new peer evicts the oldest one once more than
    /// `PEERS_PER_DOC_CACHE_SIZE` peers would be stored.
    ///
    /// # Errors
    ///
    /// Fails with "document not created" if the namespace is unknown to the store,
    /// if the system time cannot be expressed relative to `UNIX_EPOCH`, or on any
    /// storage error.
    pub fn register_useful_peer(
        &mut self,
        namespace: NamespaceId,
        peer: crate::PeerIdBytes,
    ) -> Result<()> {
        let peer = &peer;
        let namespace = namespace.as_bytes();
        // calculate nanos since UNIX_EPOCH for a time measurement
        // NOTE(review): `as u64` truncates the u128 nanosecond count — presumably fine
        // for realistic clock values, confirm.
        let nanos = std::time::UNIX_EPOCH
            .elapsed()
            .map(|duration| duration.as_nanos() as u64)?;
        self.modify(|tables| {
            // ensure the document exists
            anyhow::ensure!(
                tables.namespaces.get(namespace)?.is_some(),
                "document not created"
            );

            let mut namespace_peers = tables.namespace_peers.get(namespace)?;

            // get the oldest entry since it's candidate for removal
            // (assumes the values are iterated in ascending `nanos` order — TODO confirm)
            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
                let (oldest_nanos, &oldest_peer) = guard.value();
                (oldest_nanos, oldest_peer)
            });
            match maybe_oldest {
                None => {
                    // the table is empty so the peer can be inserted without further checks since
                    // super::PEERS_PER_DOC_CACHE_SIZE is non zero
                    // the iterator is dropped explicitly before the table is modified
                    drop(namespace_peers);
                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
                }
                Some((oldest_nanos, oldest_peer)) => {
                    let oldest_peer = &oldest_peer;

                    if oldest_peer == peer {
                        // oldest peer is the current one, so replacing the entry for the peer will
                        // maintain the size
                        drop(namespace_peers);
                        tables
                            .namespace_peers
                            .remove(namespace, (oldest_nanos, oldest_peer))?;
                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
                    } else {
                        // calculate the len in the same loop since calling `len` is another fallible operation
                        // (starts at 1 because the oldest entry was already consumed above)
                        let mut len = 1;
                        // find any previous entry for the same peer to remove it
                        let mut prev_peer_nanos = None;

                        // consuming the iterator here also releases it before the
                        // table is modified below
                        for result in namespace_peers {
                            len += 1;
                            let guard = result?;
                            let (peer_nanos, peer_bytes) = guard.value();
                            if prev_peer_nanos.is_none() && peer_bytes == peer {
                                prev_peer_nanos = Some(peer_nanos)
                            }
                        }

                        match prev_peer_nanos {
                            Some(prev_nanos) => {
                                // the peer was already present, so we can remove the old entry and
                                // insert the new one without checking the size
                                tables
                                    .namespace_peers
                                    .remove(namespace, (prev_nanos, peer))?;
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                            }
                            None => {
                                // the peer is new and the table is non empty, add it and check the
                                // size to decide if the oldest peer should be evicted
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                                len += 1;
                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
                                    tables
                                        .namespace_peers
                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
                                }
                            }
                        }
                    }
                }
            }
            Ok(())
        })
    }
558
559    /// Get the peers that have been useful for a document.
560    pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
561        let tables = self.tables()?;
562        let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
563        for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
564            let (_nanos, &peer) = result?.value();
565            peers.push(peer);
566        }
567        if peers.is_empty() {
568            Ok(None)
569        } else {
570            Ok(Some(peers.into_iter()))
571        }
572    }
573
574    /// Set the download policy for a namespace.
575    pub fn set_download_policy(
576        &mut self,
577        namespace: &NamespaceId,
578        policy: DownloadPolicy,
579    ) -> Result<()> {
580        self.modify(|tables| {
581            let namespace = namespace.as_bytes();
582
583            // ensure the document exists
584            anyhow::ensure!(
585                tables.namespaces.get(&namespace)?.is_some(),
586                "document not created"
587            );
588
589            let value = postcard::to_stdvec(&policy)?;
590            tables.download_policy.insert(namespace, value.as_slice())?;
591            Ok(())
592        })
593    }
594
595    /// Get the download policy for a namespace.
596    pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
597        let tables = self.tables()?;
598        let value = tables.download_policy.get(namespace.as_bytes())?;
599        Ok(match value {
600            None => DownloadPolicy::default(),
601            Some(value) => postcard::from_bytes(value.value())?,
602        })
603    }
604}
605
606impl PublicKeyStore for Store {
607    fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
608        self.pubkeys.public_key(id)
609    }
610}
611
612fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
613    Capability::from_raw(raw_kind, raw_bytes)
614}
615
616fn get_exact(
617    record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
618    namespace: NamespaceId,
619    author: AuthorId,
620    key: impl AsRef<[u8]>,
621    include_empty: bool,
622) -> Result<Option<SignedEntry>> {
623    let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
624    let record = record_table.get(id)?;
625    Ok(record
626        .map(|r| into_entry(id, r.value()))
627        .filter(|entry| include_empty || !entry.is_empty()))
628}
629
/// A wrapper around [`Store`] for a specific [`NamespaceId`]
///
/// This is the type that implements the `ranger::Store` trait for a replica.
#[derive(Debug)]
pub struct StoreInstance<'a> {
    /// The namespace this instance is scoped to.
    namespace: NamespaceId,
    /// The store that all operations are forwarded to.
    pub(crate) store: &'a mut Store,
}
636
637impl<'a> StoreInstance<'a> {
638    pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
639        StoreInstance { namespace, store }
640    }
641}
642
643impl PublicKeyStore for StoreInstance<'_> {
644    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
645        self.store.public_key(id)
646    }
647}
648
649impl super::DownloadPolicyStore for StoreInstance<'_> {
650    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
651        self.store.get_download_policy(namespace)
652    }
653}
654
655impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
656    type Error = anyhow::Error;
657    type RangeIterator<'x>
658        = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
659    where
660        'a: 'x;
661    type ParentIterator<'x>
662        = ParentIterator
663    where
664        'a: 'x;
665
666    /// Get a the first key (or the default if none is available).
667    fn get_first(&mut self) -> Result<RecordIdentifier> {
668        let tables = self.store.as_mut().tables()?;
669        // TODO: verify this fetches all keys with this namespace
670        let bounds = RecordsBounds::namespace(self.namespace);
671        let mut records = tables.records.range(bounds.as_ref())?;
672
673        let Some(record) = records.next() else {
674            return Ok(RecordIdentifier::default());
675        };
676        let (compound_key, _value) = record?;
677        let (namespace_id, author_id, key) = compound_key.value();
678        let id = RecordIdentifier::new(namespace_id, author_id, key);
679        Ok(id)
680    }
681
682    fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
683        self.store
684            .as_mut()
685            .get_exact(id.namespace(), id.author(), id.key(), true)
686    }
687
688    fn len(&mut self) -> Result<usize> {
689        let tables = self.store.as_mut().tables()?;
690        let bounds = RecordsBounds::namespace(self.namespace);
691        let records = tables.records.range(bounds.as_ref())?;
692        Ok(records.count())
693    }
694
695    fn is_empty(&mut self) -> Result<bool> {
696        let tables = self.store.as_mut().tables()?;
697        Ok(tables.records.is_empty()?)
698    }
699
700    fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
701        // TODO: optimize
702        let elements = self.get_range(range.clone())?;
703
704        let mut fp = Fingerprint::empty();
705        for el in elements {
706            let el = el?;
707            fp ^= el.as_fingerprint();
708        }
709
710        Ok(fp)
711    }
712
713    fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
714        let id = e.id();
715        self.store.as_mut().modify(|tables| {
716            // insert into record table
717            let key = (
718                &id.namespace().to_bytes(),
719                &id.author().to_bytes(),
720                id.key(),
721            );
722            let hash = e.content_hash(); // let binding is needed
723            let value = (
724                e.timestamp(),
725                &e.signature().namespace().to_bytes(),
726                &e.signature().author().to_bytes(),
727                e.content_len(),
728                hash.as_bytes(),
729            );
730            tables.records.insert(key, value)?;
731
732            // insert into by key index table
733            let key = (
734                &id.namespace().to_bytes(),
735                id.key(),
736                &id.author().to_bytes(),
737            );
738            tables.records_by_key.insert(key, ())?;
739
740            // insert into latest table
741            let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
742            let value = (e.timestamp(), e.id().key());
743            tables.latest_per_author.insert(key, value)?;
744            Ok(())
745        })
746    }
747
    /// Get an iterator over all entries of this namespace that fall into `range`.
    ///
    /// The range is interpreted by comparing its bounds `x` and `y`:
    /// - `x == y`: all entries of the namespace,
    /// - `x < y`: entries `t` with `x <= t < y`,
    /// - `x > y`: the range wraps around, yielding the entries before `y` followed
    ///   by the entries from `x` onwards.
    ///
    /// The returned iterator is always a chain of two parts; for the first two
    /// cases the second part is empty.
    fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
        let tables = self.store.as_mut().tables()?;
        let iter = match range.x().cmp(range.y()) {
            // identity range: iter1 = all, iter2 = none
            Ordering::Equal => {
                // iterator for all entries in replica
                let bounds = RecordsBounds::namespace(self.namespace);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
                chain_none(iter)
            }
            // regular range: iter1 = x <= t < y, iter2 = none
            Ordering::Less => {
                // iterator for entries from range.x to range.y
                let start = Bound::Included(range.x().to_byte_tuple());
                let end = Bound::Excluded(range.y().to_byte_tuple());
                let bounds = RecordsBounds::new(start, end);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
                chain_none(iter)
            }
            // split range: iter1 = start <= t < y, iter2 = x <= t <= end
            Ordering::Greater => {
                // iterator for entries from start to range.y
                let end = Bound::Excluded(range.y().to_byte_tuple());
                let bounds = RecordsBounds::from_start(&self.namespace, end);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;

                // iterator for entries from range.x to end
                let start = Bound::Included(range.x().to_byte_tuple());
                let bounds = RecordsBounds::to_end(&self.namespace, start);
                let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;

                // same iterator type as `chain_none`, but with a populated second half
                iter.chain(Some(iter2).into_iter().flatten())
            }
        };
        Ok(iter)
    }
784
785    fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
786        self.store.as_mut().modify(|tables| {
787            let entry = {
788                let (namespace, author, key) = id.as_byte_tuple();
789                let id = (namespace, key, author);
790                tables.records_by_key.remove(id)?;
791                let id = (namespace, author, key);
792                let value = tables.records.remove(id)?;
793                value.map(|value| into_entry(id, value.value()))
794            };
795            Ok(entry)
796        })
797    }
798
799    fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
800        let tables = self.store.as_mut().tables()?;
801        let bounds = RecordsBounds::namespace(self.namespace);
802        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
803        Ok(chain_none(iter))
804    }
805
806    fn prefixes_of(
807        &mut self,
808        id: &RecordIdentifier,
809    ) -> Result<Self::ParentIterator<'_>, Self::Error> {
810        let tables = self.store.as_mut().tables()?;
811        ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
812    }
813
814    fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
815        let tables = self.store.as_mut().tables()?;
816        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
817        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
818        Ok(chain_none(iter))
819    }
820
821    fn remove_prefix_filtered(
822        &mut self,
823        id: &RecordIdentifier,
824        predicate: impl Fn(&Record) -> bool,
825    ) -> Result<usize> {
826        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
827        self.store.as_mut().modify(|tables| {
828            let cb = |_k: RecordsId, v: RecordsValue| {
829                let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
830                let record = Record::new(hash.into(), len, timestamp);
831
832                predicate(&record)
833            };
834            let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
835            let count = iter.count();
836            Ok(count)
837        })
838    }
839}
840
/// Wraps `iter` in a `Chain` whose second half is always empty.
///
/// This gives a single iterator the same concrete type as two chained iterators of type `I`,
/// while yielding exactly the items of `iter`.
fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
    iter: I,
) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
    let no_tail: Option<I> = None;
    iter.chain(no_tail.into_iter().flatten())
}
846
847/// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which
848/// is a prefix of the key passed to the iterator.
849#[derive(Debug)]
850pub struct ParentIterator {
851    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
852}
853
854impl ParentIterator {
855    fn new(
856        tables: &Tables,
857        namespace: NamespaceId,
858        author: AuthorId,
859        key: Vec<u8>,
860    ) -> anyhow::Result<Self> {
861        let parents = parents(&tables.records, namespace, author, key.clone());
862        Ok(Self {
863            inner: parents.into_iter(),
864        })
865    }
866}
867
868fn parents(
869    table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
870    namespace: NamespaceId,
871    author: AuthorId,
872    mut key: Vec<u8>,
873) -> Vec<anyhow::Result<SignedEntry>> {
874    let mut res = Vec::new();
875
876    while !key.is_empty() {
877        let entry = get_exact(table, namespace, author, &key, false);
878        key.pop();
879        match entry {
880            Err(err) => res.push(Err(err)),
881            Ok(Some(entry)) => res.push(Ok(entry)),
882            Ok(None) => continue,
883        }
884    }
885    res.reverse();
886    res
887}
888
889impl Iterator for ParentIterator {
890    type Item = Result<SignedEntry>;
891
892    fn next(&mut self) -> Option<Self::Item> {
893        self.inner.next()
894    }
895}
896
897/// Iterator for all content hashes
898///
899/// Note that you might get duplicate hashes. Also, the iterator will keep
900/// a database snapshot open until it is dropped.
901///
902/// Also, this represents a snapshot of the database at the time of creation.
903/// It needs a copy of a redb::ReadOnlyTable to be self-contained.
904#[derive(derive_more::Debug)]
905pub struct ContentHashesIterator {
906    #[debug(skip)]
907    range: RecordsRange<'static>,
908}
909
910impl ContentHashesIterator {
911    /// Create a new iterator over all content hashes.
912    pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
913        let range = RecordsRange::all_static(table)?;
914        Ok(Self { range })
915    }
916}
917
918impl Iterator for ContentHashesIterator {
919    type Item = Result<Hash>;
920
921    fn next(&mut self) -> Option<Self::Item> {
922        let v = self.range.next()?;
923        Some(v.map(|e| e.content_hash()))
924    }
925}
926
927/// Iterator over the latest entry per author.
928#[derive(derive_more::Debug)]
929#[debug("LatestIterator")]
930pub struct LatestIterator<'a>(
931    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
932);
933
934impl<'a> LatestIterator<'a> {
935    fn new(
936        latest_per_author: &'a impl ReadableTable<
937            LatestPerAuthorKey<'static>,
938            LatestPerAuthorValue<'static>,
939        >,
940        namespace: NamespaceId,
941    ) -> anyhow::Result<Self> {
942        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
943        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
944        let range = latest_per_author.range(start..=end)?;
945        Ok(Self(range))
946    }
947}
948
949impl Iterator for LatestIterator<'_> {
950    type Item = Result<(AuthorId, u64, Vec<u8>)>;
951
952    fn next(&mut self) -> Option<Self::Item> {
953        self.0.next_map(|key, value| {
954            let (_namespace, author) = key;
955            let (timestamp, key) = value;
956            (author.into(), timestamp, key.to_vec())
957        })
958    }
959}
960
961fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
962    let (namespace, author, key) = key;
963    let (timestamp, namespace_sig, author_sig, len, hash) = value;
964    let id = RecordIdentifier::new(namespace, author, key);
965    let record = Record::new(hash.into(), len, timestamp);
966    let entry = Entry::new(id, record);
967    let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
968    SignedEntry::new(entry_signature, entry)
969}
970
971#[cfg(test)]
972mod tests {
973    use super::{tables::LATEST_PER_AUTHOR_TABLE, *};
974    use crate::ranger::Store as _;
975
976    #[test]
977    fn test_ranges() -> Result<()> {
978        let dbfile = tempfile::NamedTempFile::new()?;
979        let mut store = Store::persistent(dbfile.path())?;
980
981        let author = store.new_author(&mut rand::thread_rng())?;
982        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
983        let mut replica = store.new_replica(namespace.clone())?;
984
985        // test author prefix relation for all-255 keys
986        let key1 = vec![255, 255];
987        let key2 = vec![255, 255, 255];
988        replica.hash_and_insert(&key1, &author, b"v1")?;
989        replica.hash_and_insert(&key2, &author, b"v2")?;
990        let res = store
991            .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
992            .collect::<Result<Vec<_>>>()?;
993        assert_eq!(res.len(), 2);
994        assert_eq!(
995            res.into_iter()
996                .map(|entry| entry.key().to_vec())
997                .collect::<Vec<_>>(),
998            vec![key1, key2]
999        );
1000        Ok(())
1001    }
1002
1003    #[test]
1004    fn test_basics() -> Result<()> {
1005        let dbfile = tempfile::NamedTempFile::new()?;
1006        let mut store = Store::persistent(dbfile.path())?;
1007
1008        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
1009        assert!(authors.is_empty());
1010
1011        let author = store.new_author(&mut rand::thread_rng())?;
1012        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
1013        let _replica = store.new_replica(namespace.clone())?;
1014        store.close_replica(namespace.id());
1015        let replica = store.load_replica_info(&namespace.id())?;
1016        assert_eq!(replica.capability.id(), namespace.id());
1017
1018        let author_back = store.get_author(&author.id())?.unwrap();
1019        assert_eq!(author.to_bytes(), author_back.to_bytes(),);
1020
1021        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
1022        for i in 0..5 {
1023            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1024            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
1025            let entry = SignedEntry::from_entry(entry, &namespace, &author);
1026            wrapper.entry_put(entry)?;
1027        }
1028
1029        // all
1030        let all: Vec<_> = wrapper.all()?.collect();
1031        assert_eq!(all.len(), 5);
1032
1033        // add a second version
1034        let mut ids = Vec::new();
1035        for i in 0..5 {
1036            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1037            let entry = Entry::new(
1038                id.clone(),
1039                Record::current_from_data(format!("world-{i}-2")),
1040            );
1041            let entry = SignedEntry::from_entry(entry, &namespace, &author);
1042            wrapper.entry_put(entry)?;
1043            ids.push(id);
1044        }
1045
1046        // get all
1047        let entries = wrapper
1048            .store
1049            .get_many(namespace.id(), Query::all())?
1050            .collect::<Result<Vec<_>>>()?;
1051        assert_eq!(entries.len(), 5);
1052
1053        // get all prefix
1054        let entries = wrapper
1055            .store
1056            .get_many(namespace.id(), Query::key_prefix("hello-"))?
1057            .collect::<Result<Vec<_>>>()?;
1058        assert_eq!(entries.len(), 5);
1059
1060        // delete and get
1061        for id in ids {
1062            let res = wrapper.get(&id)?;
1063            assert!(res.is_some());
1064            let out = wrapper.entry_remove(&id)?.unwrap();
1065            assert_eq!(out.entry().id(), &id);
1066            let res = wrapper.get(&id)?;
1067            assert!(res.is_none());
1068        }
1069
1070        // get latest
1071        let entries = wrapper
1072            .store
1073            .get_many(namespace.id(), Query::all())?
1074            .collect::<Result<Vec<_>>>()?;
1075        assert_eq!(entries.len(), 0);
1076
1077        Ok(())
1078    }
1079
1080    fn copy_and_modify(
1081        source: &Path,
1082        modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
1083    ) -> Result<tempfile::NamedTempFile> {
1084        let dbfile = tempfile::NamedTempFile::new()?;
1085        std::fs::copy(source, dbfile.path())?;
1086        let db = Database::create(dbfile.path())?;
1087        let write_tx = db.begin_write()?;
1088        modify(&write_tx)?;
1089        write_tx.commit()?;
1090        drop(db);
1091        Ok(dbfile)
1092    }
1093
1094    #[test]
1095    fn test_migration_001_populate_latest_table() -> Result<()> {
1096        let dbfile = tempfile::NamedTempFile::new()?;
1097        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
1098
1099        // create a store and add some data
1100        let expected = {
1101            let mut store = Store::persistent(dbfile.path())?;
1102            let author1 = store.new_author(&mut rand::thread_rng())?;
1103            let author2 = store.new_author(&mut rand::thread_rng())?;
1104            let mut replica = store.new_replica(namespace.clone())?;
1105            replica.hash_and_insert(b"k1", &author1, b"v1")?;
1106            replica.hash_and_insert(b"k2", &author2, b"v1")?;
1107            replica.hash_and_insert(b"k3", &author1, b"v1")?;
1108
1109            let expected = store
1110                .get_latest_for_each_author(namespace.id())?
1111                .collect::<Result<Vec<_>>>()?;
1112            // drop everything to clear file locks.
1113            store.close_replica(namespace.id());
1114            // flush the store to disk
1115            store.flush()?;
1116            drop(store);
1117            expected
1118        };
1119        assert_eq!(expected.len(), 2);
1120
1121        // create a copy of our db file with the latest table deleted.
1122        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
1123            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
1124            Ok(())
1125        })?;
1126
1127        // open the copied db file, which will run the migration.
1128        let mut store = Store::persistent(dbfile_before_migration.path())?;
1129        let actual = store
1130            .get_latest_for_each_author(namespace.id())?
1131            .collect::<Result<Vec<_>>>()?;
1132
1133        assert_eq!(expected, actual);
1134        Ok(())
1135    }
1136
1137    #[test]
1138    fn test_migration_004_populate_by_key_index() -> Result<()> {
1139        let dbfile = tempfile::NamedTempFile::new()?;
1140
1141        let mut store = Store::persistent(dbfile.path())?;
1142
1143        // check that the new table is there, even if empty
1144        {
1145            let tables = store.tables()?;
1146            assert_eq!(tables.records_by_key.len()?, 0);
1147        }
1148
1149        // TODO: write test checking that the indexing is done correctly
1150        Ok(())
1151    }
1152}