iroh_docs/store/
fs.rs

1//! On disk storage for replicas.
2
3use std::{
4    cmp::Ordering,
5    collections::HashSet,
6    iter::{Chain, Flatten},
7    num::NonZeroU64,
8    ops::Bound,
9    path::Path,
10};
11
12use anyhow::{anyhow, Result};
13use ed25519_dalek::{SignatureError, VerifyingKey};
14use iroh_blobs::Hash;
15use rand_core::CryptoRngCore;
16use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable};
17use tracing::{info, warn};
18
19use super::{
20    pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21    Query,
22};
23use crate::{
24    actor::MAX_COMMIT_DELAY,
25    keys::Author,
26    ranger::{Fingerprint, Range, RangeEntry},
27    sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28    AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29    ReplicaInfo,
30};
31
32mod bounds;
33mod migrations;
34mod query;
35mod ranges;
36pub(crate) mod tables;
37
38pub use self::ranges::RecordsRange;
39use self::{
40    bounds::{ByKeyBounds, RecordsBounds},
41    query::QueryIterator,
42    ranges::RangeExt,
43    tables::{
44        LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
45        RecordsValue, Tables, TransactionAndTables,
46    },
47};
48
/// Manages the replicas and authors for an instance.
///
/// Reads and writes share a single transaction stored in `transaction`, which is
/// opened on demand and reused across calls; see `Store::flush` and
/// `Store::snapshot` for how it gets committed.
#[derive(Debug)]
pub struct Store {
    /// The underlying redb database.
    db: Database,
    /// The currently open read or write transaction, if any.
    transaction: CurrentTransaction,
    /// Namespaces of replicas that are currently open; `remove_replica` refuses to
    /// remove any namespace contained here.
    open_replicas: HashSet<NamespaceId>,
    /// Public key store that answers `PublicKeyStore::public_key` for this store.
    pubkeys: MemPublicKeyStore,
}
57
58impl Drop for Store {
59    fn drop(&mut self) {
60        if let Err(err) = self.flush() {
61            warn!("failed to trigger final flush: {:?}", err);
62        }
63    }
64}
65
66impl AsRef<Store> for Store {
67    fn as_ref(&self) -> &Store {
68        self
69    }
70}
71
72impl AsMut<Store> for Store {
73    fn as_mut(&mut self) -> &mut Store {
74        self
75    }
76}
77
/// The transaction currently held open by a `Store`.
#[derive(derive_more::Debug, Default)]
// NOTE(review): presumably allowed to avoid an extra allocation per transaction; confirm.
#[allow(clippy::large_enum_variant)]
enum CurrentTransaction {
    /// No transaction is open.
    #[default]
    None,
    /// A read-only snapshot; only created by `Store::snapshot`.
    Read(ReadOnlyTables),
    /// A write transaction together with its opened tables.
    Write(TransactionAndTables),
}
86
87impl Store {
88    /// Create a new store in memory.
89    pub fn memory() -> Self {
90        Self::memory_impl().expect("failed to create memory store")
91    }
92
93    fn memory_impl() -> Result<Self> {
94        let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
95        Self::new_impl(db)
96    }
97
98    /// Create or open a store from a `path` to a database file.
99    ///
100    /// The file will be created if it does not exist, otherwise it will be opened.
101    pub fn persistent(path: impl AsRef<Path>) -> Result<Self> {
102        let mut db = match Database::create(&path) {
103            Ok(db) => db,
104            Err(DatabaseError::UpgradeRequired(1)) => return Err(
105                anyhow!("Opening the database failed: Upgrading from old format is no longer supported. Use iroh-docs 0.92 to perform the upgrade, then upgrade to the latest release again.")
106            ),
107            Err(err) => return Err(err.into()),
108        };
109        match db.upgrade() {
110            Ok(true) => info!("Database was upgraded to redb v3 compatible format"),
111            Ok(false) => {}
112            Err(err) => warn!("Database upgrade to redb v3 compatible format failed: {err:#}"),
113        }
114        Self::new_impl(db)
115    }
116
117    fn new_impl(db: redb::Database) -> Result<Self> {
118        // Setup all tables
119        let write_tx = db.begin_write()?;
120        let _ = Tables::new(&write_tx)?;
121        write_tx.commit()?;
122
123        // Run database migrations
124        migrations::run_migrations(&db)?;
125
126        Ok(Store {
127            db,
128            transaction: Default::default(),
129            open_replicas: Default::default(),
130            pubkeys: Default::default(),
131        })
132    }
133
134    /// Flush the current transaction, if any.
135    ///
136    /// This is the cheapest way to ensure that the data is persisted.
137    pub fn flush(&mut self) -> Result<()> {
138        if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
139            w.commit()?;
140        }
141        Ok(())
142    }
143
144    /// Get a read-only snapshot of the database.
145    ///
146    /// This has the side effect of committing any open write transaction,
147    /// so it can be used as a way to ensure that the data is persisted.
148    pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
149        let guard = &mut self.transaction;
150        let tables = match std::mem::take(guard) {
151            CurrentTransaction::None => {
152                let tx = self.db.begin_read()?;
153                ReadOnlyTables::new(tx)?
154            }
155            CurrentTransaction::Write(w) => {
156                w.commit()?;
157                let tx = self.db.begin_read()?;
158                ReadOnlyTables::new(tx)?
159            }
160            CurrentTransaction::Read(tables) => tables,
161        };
162        *guard = CurrentTransaction::Read(tables);
163        match &*guard {
164            CurrentTransaction::Read(ref tables) => Ok(tables),
165            _ => unreachable!(),
166        }
167    }
168
169    /// Get an owned read-only snapshot of the database.
170    ///
171    /// This will open a new read transaction. The read transaction won't be reused for other
172    /// reads.
173    ///
174    /// This has the side effect of committing any open write transaction,
175    /// so it can be used as a way to ensure that the data is persisted.
176    pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
177        // make sure the current transaction is committed
178        self.flush()?;
179        assert!(matches!(self.transaction, CurrentTransaction::None));
180        let tx = self.db.begin_read()?;
181        let tables = ReadOnlyTables::new(tx)?;
182        Ok(tables)
183    }
184
185    /// Get access to the tables to read from them.
186    ///
187    /// The underlying transaction is a write transaction, but with a non-mut
188    /// reference to the tables you can not write.
189    ///
190    /// There is no guarantee that this will be an independent transaction.
191    /// You just get readonly access to the current state of the database.
192    ///
193    /// As such, there is also no guarantee that the data you see is
194    /// already persisted.
195    fn tables(&mut self) -> Result<&Tables<'_>> {
196        let guard = &mut self.transaction;
197        let tables = match std::mem::take(guard) {
198            CurrentTransaction::None => {
199                let tx = self.db.begin_write()?;
200                TransactionAndTables::new(tx)?
201            }
202            CurrentTransaction::Write(w) => {
203                if w.since.elapsed() > MAX_COMMIT_DELAY {
204                    tracing::debug!("committing transaction because it's too old");
205                    w.commit()?;
206                    let tx = self.db.begin_write()?;
207                    TransactionAndTables::new(tx)?
208                } else {
209                    w
210                }
211            }
212            CurrentTransaction::Read(_) => {
213                let tx = self.db.begin_write()?;
214                TransactionAndTables::new(tx)?
215            }
216        };
217        *guard = CurrentTransaction::Write(tables);
218        match guard {
219            CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
220            _ => unreachable!(),
221        }
222    }
223
224    /// Get exclusive write access to the tables in the current transaction.
225    ///
226    /// There is no guarantee that this will be an independent transaction.
227    /// As such, there is also no guarantee that the data you see or write
228    /// will be persisted.
229    ///
230    /// To ensure that the data is persisted, acquire a snapshot of the database
231    /// or call flush.
232    fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
233        let guard = &mut self.transaction;
234        let tables = match std::mem::take(guard) {
235            CurrentTransaction::None => {
236                let tx = self.db.begin_write()?;
237                TransactionAndTables::new(tx)?
238            }
239            CurrentTransaction::Write(w) => {
240                if w.since.elapsed() > MAX_COMMIT_DELAY {
241                    tracing::debug!("committing transaction because it's too old");
242                    w.commit()?;
243                    let tx = self.db.begin_write()?;
244                    TransactionAndTables::new(tx)?
245                } else {
246                    w
247                }
248            }
249            CurrentTransaction::Read(_) => {
250                let tx = self.db.begin_write()?;
251                TransactionAndTables::new(tx)?
252            }
253        };
254        *guard = CurrentTransaction::Write(tables);
255        let res = match &mut *guard {
256            CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
257            _ => unreachable!(),
258        };
259        Ok(res)
260    }
261}
262
/// Iterator over the peer ids returned by `Store::get_sync_peers`.
type PeersIter = std::vec::IntoIter<PeerIdBytes>;
264
265impl Store {
266    /// Create a new replica for `namespace` and persist in this store.
267    pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
268        let id = namespace.id();
269        self.import_namespace(namespace.into())?;
270        self.open_replica(&id).map_err(Into::into)
271    }
272
273    /// Create a new author key and persist it in the store.
274    pub fn new_author<R: CryptoRngCore + ?Sized>(&mut self, rng: &mut R) -> Result<Author> {
275        let author = Author::new(rng);
276        self.import_author(author.clone())?;
277        Ok(author)
278    }
279
280    /// Check if a [`AuthorHeads`] contains entry timestamps that we do not have locally.
281    ///
282    /// Returns the number of authors that the other peer has updates for.
283    pub fn has_news_for_us(
284        &mut self,
285        namespace: NamespaceId,
286        heads: &AuthorHeads,
287    ) -> Result<Option<NonZeroU64>> {
288        let our_heads = {
289            let latest = self.get_latest_for_each_author(namespace)?;
290            let mut heads = AuthorHeads::default();
291            for e in latest {
292                let (author, timestamp, _key) = e?;
293                heads.insert(author, timestamp);
294            }
295            heads
296        };
297        let has_news_for_us = heads.has_news_for(&our_heads);
298        Ok(has_news_for_us)
299    }
300
301    /// Open a replica from this store.
302    ///
303    /// This just calls load_replica_info and then creates a new replica with the info.
304    pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
305        let info = self.load_replica_info(namespace_id)?;
306        let instance = StoreInstance::new(*namespace_id, self);
307        Ok(Replica::new(instance, Box::new(info)))
308    }
309
310    /// Load the replica info from the store.
311    pub fn load_replica_info(
312        &mut self,
313        namespace_id: &NamespaceId,
314    ) -> Result<ReplicaInfo, OpenError> {
315        let tables = self.tables()?;
316        let info = match tables.namespaces.get(namespace_id.as_bytes()) {
317            Ok(Some(db_value)) => {
318                let (raw_kind, raw_bytes) = db_value.value();
319                let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
320                ReplicaInfo::new(namespace)
321            }
322            Ok(None) => return Err(OpenError::NotFound),
323            Err(err) => return Err(OpenError::Other(err.into())),
324        };
325        self.open_replicas.insert(info.capability.id());
326        Ok(info)
327    }
328
329    /// Close a replica.
330    pub fn close_replica(&mut self, id: NamespaceId) {
331        self.open_replicas.remove(&id);
332    }
333
334    /// List all replica namespaces in this store.
335    pub fn list_namespaces(
336        &mut self,
337    ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
338        let snapshot = self.snapshot()?;
339        let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
340        let iter = iter.map(|res| {
341            let capability = parse_capability(res?.1.value())?;
342            Ok((capability.id(), capability.kind()))
343        });
344        Ok(iter)
345    }
346
347    /// Get an author key from the store.
348    pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
349        let tables = self.tables()?;
350        let Some(author) = tables.authors.get(author_id.as_bytes())? else {
351            return Ok(None);
352        };
353        let author = Author::from_bytes(author.value());
354        Ok(Some(author))
355    }
356
357    /// Import an author key pair.
358    pub fn import_author(&mut self, author: Author) -> Result<()> {
359        self.modify(|tables| {
360            tables
361                .authors
362                .insert(author.id().as_bytes(), &author.to_bytes())?;
363            Ok(())
364        })
365    }
366
367    /// Delete an author.
368    pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
369        self.modify(|tables| {
370            tables.authors.remove(author.as_bytes())?;
371            Ok(())
372        })
373    }
374
375    /// List all author keys in this store.
376    pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
377        let tables = self.snapshot()?;
378        let iter = tables
379            .authors
380            .range::<&'static [u8; 32]>(..)?
381            .map(|res| match res {
382                Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
383                Err(err) => Err(err.into()),
384            });
385        Ok(iter)
386    }
387
388    /// Import a new replica namespace.
389    pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
390        self.modify(|tables| {
391            let outcome = {
392                let (capability, outcome) = {
393                    let existing = tables.namespaces.get(capability.id().as_bytes())?;
394                    if let Some(existing) = existing {
395                        let mut existing = parse_capability(existing.value())?;
396                        let outcome = if existing.merge(capability)? {
397                            ImportNamespaceOutcome::Upgraded
398                        } else {
399                            ImportNamespaceOutcome::NoChange
400                        };
401                        (existing, outcome)
402                    } else {
403                        (capability, ImportNamespaceOutcome::Inserted)
404                    }
405                };
406                let id = capability.id().to_bytes();
407                let (kind, bytes) = capability.raw();
408                tables.namespaces.insert(&id, (kind, &bytes))?;
409                outcome
410            };
411            Ok(outcome)
412        })
413    }
414
415    /// Remove a replica.
416    ///
417    /// Completely removes a replica and deletes both the namespace private key and all document
418    /// entries.
419    ///
420    /// Note that a replica has to be closed before it can be removed. The store has to enforce
421    /// that a replica cannot be removed while it is still open.
422    pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
423        if self.open_replicas.contains(namespace) {
424            return Err(anyhow!("replica is not closed"));
425        }
426        self.modify(|tables| {
427            let bounds = RecordsBounds::namespace(*namespace);
428            tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
429            let bounds = ByKeyBounds::namespace(*namespace);
430            let _ = tables
431                .records_by_key
432                .retain_in(bounds.as_ref(), |_k, _v| false);
433            tables.namespaces.remove(namespace.as_bytes())?;
434            tables.namespace_peers.remove_all(namespace.as_bytes())?;
435            tables.download_policy.remove(namespace.as_bytes())?;
436            Ok(())
437        })
438    }
439
440    /// Get an iterator over entries of a replica.
441    pub fn get_many(
442        &mut self,
443        namespace: NamespaceId,
444        query: impl Into<Query>,
445    ) -> Result<QueryIterator> {
446        let tables = self.snapshot_owned()?;
447        QueryIterator::new(tables, namespace, query.into())
448    }
449
450    /// Get an entry by key and author.
451    pub fn get_exact(
452        &mut self,
453        namespace: NamespaceId,
454        author: AuthorId,
455        key: impl AsRef<[u8]>,
456        include_empty: bool,
457    ) -> Result<Option<SignedEntry>> {
458        get_exact(
459            &self.tables()?.records,
460            namespace,
461            author,
462            key,
463            include_empty,
464        )
465    }
466
467    /// Get all content hashes of all replicas in the store.
468    pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
469        let tables = self.snapshot_owned()?;
470        ContentHashesIterator::all(&tables.records)
471    }
472
473    /// Get the latest entry for each author in a namespace.
474    pub fn get_latest_for_each_author(
475        &mut self,
476        namespace: NamespaceId,
477    ) -> Result<LatestIterator<'_>> {
478        LatestIterator::new(&self.tables()?.latest_per_author, namespace)
479    }
480
481    /// Register a peer that has been useful to sync a document.
482    pub fn register_useful_peer(
483        &mut self,
484        namespace: NamespaceId,
485        peer: crate::PeerIdBytes,
486    ) -> Result<()> {
487        let peer = &peer;
488        let namespace = namespace.as_bytes();
489        // calculate nanos since UNIX_EPOCH for a time measurement
490        let nanos = std::time::UNIX_EPOCH
491            .elapsed()
492            .map(|duration| duration.as_nanos() as u64)?;
493        self.modify(|tables| {
494            // ensure the document exists
495            anyhow::ensure!(
496                tables.namespaces.get(namespace)?.is_some(),
497                "document not created"
498            );
499
500            let mut namespace_peers = tables.namespace_peers.get(namespace)?;
501
502            // get the oldest entry since it's candidate for removal
503            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
504                let (oldest_nanos, &oldest_peer) = guard.value();
505                (oldest_nanos, oldest_peer)
506            });
507            match maybe_oldest {
508                None => {
509                    // the table is empty so the peer can be inserted without further checks since
510                    // super::PEERS_PER_DOC_CACHE_SIZE is non zero
511                    drop(namespace_peers);
512                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
513                }
514                Some((oldest_nanos, oldest_peer)) => {
515                    let oldest_peer = &oldest_peer;
516
517                    if oldest_peer == peer {
518                        // oldest peer is the current one, so replacing the entry for the peer will
519                        // maintain the size
520                        drop(namespace_peers);
521                        tables
522                            .namespace_peers
523                            .remove(namespace, (oldest_nanos, oldest_peer))?;
524                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
525                    } else {
526                        // calculate the len in the same loop since calling `len` is another fallible operation
527                        let mut len = 1;
528                        // find any previous entry for the same peer to remove it
529                        let mut prev_peer_nanos = None;
530
531                        for result in namespace_peers {
532                            len += 1;
533                            let guard = result?;
534                            let (peer_nanos, peer_bytes) = guard.value();
535                            if prev_peer_nanos.is_none() && peer_bytes == peer {
536                                prev_peer_nanos = Some(peer_nanos)
537                            }
538                        }
539
540                        match prev_peer_nanos {
541                            Some(prev_nanos) => {
542                                // the peer was already present, so we can remove the old entry and
543                                // insert the new one without checking the size
544                                tables
545                                    .namespace_peers
546                                    .remove(namespace, (prev_nanos, peer))?;
547                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
548                            }
549                            None => {
550                                // the peer is new and the table is non empty, add it and check the
551                                // size to decide if the oldest peer should be evicted
552                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
553                                len += 1;
554                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
555                                    tables
556                                        .namespace_peers
557                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
558                                }
559                            }
560                        }
561                    }
562                }
563            }
564            Ok(())
565        })
566    }
567
568    /// Get the peers that have been useful for a document.
569    pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
570        let tables = self.tables()?;
571        let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
572        for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
573            let (_nanos, &peer) = result?.value();
574            peers.push(peer);
575        }
576        if peers.is_empty() {
577            Ok(None)
578        } else {
579            Ok(Some(peers.into_iter()))
580        }
581    }
582
583    /// Set the download policy for a namespace.
584    pub fn set_download_policy(
585        &mut self,
586        namespace: &NamespaceId,
587        policy: DownloadPolicy,
588    ) -> Result<()> {
589        self.modify(|tables| {
590            let namespace = namespace.as_bytes();
591
592            // ensure the document exists
593            anyhow::ensure!(
594                tables.namespaces.get(&namespace)?.is_some(),
595                "document not created"
596            );
597
598            let value = postcard::to_stdvec(&policy)?;
599            tables.download_policy.insert(namespace, value.as_slice())?;
600            Ok(())
601        })
602    }
603
604    /// Get the download policy for a namespace.
605    pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
606        let tables = self.tables()?;
607        let value = tables.download_policy.get(namespace.as_bytes())?;
608        Ok(match value {
609            None => DownloadPolicy::default(),
610            Some(value) => postcard::from_bytes(value.value())?,
611        })
612    }
613}
614
615impl PublicKeyStore for Store {
616    fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
617        self.pubkeys.public_key(id)
618    }
619}
620
621fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
622    Capability::from_raw(raw_kind, raw_bytes)
623}
624
625fn get_exact(
626    record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
627    namespace: NamespaceId,
628    author: AuthorId,
629    key: impl AsRef<[u8]>,
630    include_empty: bool,
631) -> Result<Option<SignedEntry>> {
632    let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
633    let record = record_table.get(id)?;
634    Ok(record
635        .map(|r| into_entry(id, r.value()))
636        .filter(|entry| include_empty || !entry.is_empty()))
637}
638
/// A wrapper around [`Store`] for a specific [`NamespaceId`]
///
/// Holds a mutable borrow of the store for its whole lifetime. The namespace-wide
/// operations of its `ranger::Store` implementation (e.g. `get_first`, `get_range`)
/// use `namespace` for their bounds.
#[derive(Debug)]
pub struct StoreInstance<'a> {
    /// The namespace this instance is scoped to.
    namespace: NamespaceId,
    /// The wrapped store.
    pub(crate) store: &'a mut Store,
}
645
646impl<'a> StoreInstance<'a> {
647    pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
648        StoreInstance { namespace, store }
649    }
650}
651
652impl PublicKeyStore for StoreInstance<'_> {
653    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
654        self.store.public_key(id)
655    }
656}
657
658impl super::DownloadPolicyStore for StoreInstance<'_> {
659    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
660        self.store.get_download_policy(namespace)
661    }
662}
663
664impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
665    type Error = anyhow::Error;
666    type RangeIterator<'x>
667        = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
668    where
669        'a: 'x;
670    type ParentIterator<'x>
671        = ParentIterator
672    where
673        'a: 'x;
674
675    /// Get a the first key (or the default if none is available).
676    fn get_first(&mut self) -> Result<RecordIdentifier> {
677        let tables = self.store.as_mut().tables()?;
678        // TODO: verify this fetches all keys with this namespace
679        let bounds = RecordsBounds::namespace(self.namespace);
680        let mut records = tables.records.range(bounds.as_ref())?;
681
682        let Some(record) = records.next() else {
683            return Ok(RecordIdentifier::default());
684        };
685        let (compound_key, _value) = record?;
686        let (namespace_id, author_id, key) = compound_key.value();
687        let id = RecordIdentifier::new(namespace_id, author_id, key);
688        Ok(id)
689    }
690
691    #[cfg(test)]
692    fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
693        self.store
694            .as_mut()
695            .get_exact(id.namespace(), id.author(), id.key(), true)
696    }
697
698    #[cfg(test)]
699    fn len(&mut self) -> Result<usize> {
700        let tables = self.store.as_mut().tables()?;
701        let bounds = RecordsBounds::namespace(self.namespace);
702        let records = tables.records.range(bounds.as_ref())?;
703        Ok(records.count())
704    }
705
706    #[cfg(test)]
707    fn is_empty(&mut self) -> Result<bool> {
708        use redb::ReadableTableMetadata;
709        let tables = self.store.as_mut().tables()?;
710        Ok(tables.records.is_empty()?)
711    }
712
713    fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
714        // TODO: optimize
715        let elements = self.get_range(range.clone())?;
716
717        let mut fp = Fingerprint::empty();
718        for el in elements {
719            let el = el?;
720            fp ^= el.as_fingerprint();
721        }
722
723        Ok(fp)
724    }
725
726    fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
727        let id = e.id();
728        self.store.as_mut().modify(|tables| {
729            // insert into record table
730            let key = (
731                &id.namespace().to_bytes(),
732                &id.author().to_bytes(),
733                id.key(),
734            );
735            let hash = e.content_hash(); // let binding is needed
736            let value = (
737                e.timestamp(),
738                &e.signature().namespace().to_bytes(),
739                &e.signature().author().to_bytes(),
740                e.content_len(),
741                hash.as_bytes(),
742            );
743            tables.records.insert(key, value)?;
744
745            // insert into by key index table
746            let key = (
747                &id.namespace().to_bytes(),
748                id.key(),
749                &id.author().to_bytes(),
750            );
751            tables.records_by_key.insert(key, ())?;
752
753            // insert into latest table
754            let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
755            let value = (e.timestamp(), e.id().key());
756            tables.latest_per_author.insert(key, value)?;
757            Ok(())
758        })
759    }
760
761    fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
762        let tables = self.store.as_mut().tables()?;
763        let iter = match range.x().cmp(range.y()) {
764            // identity range: iter1 = all, iter2 = none
765            Ordering::Equal => {
766                // iterator for all entries in replica
767                let bounds = RecordsBounds::namespace(self.namespace);
768                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
769                chain_none(iter)
770            }
771            // regular range: iter1 = x <= t < y, iter2 = none
772            Ordering::Less => {
773                // iterator for entries from range.x to range.y
774                let start = Bound::Included(range.x().to_byte_tuple());
775                let end = Bound::Excluded(range.y().to_byte_tuple());
776                let bounds = RecordsBounds::new(start, end);
777                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
778                chain_none(iter)
779            }
780            // split range: iter1 = start <= t < y, iter2 = x <= t <= end
781            Ordering::Greater => {
782                // iterator for entries from start to range.y
783                let end = Bound::Excluded(range.y().to_byte_tuple());
784                let bounds = RecordsBounds::from_start(&self.namespace, end);
785                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
786
787                // iterator for entries from range.x to end
788                let start = Bound::Included(range.x().to_byte_tuple());
789                let bounds = RecordsBounds::to_end(&self.namespace, start);
790                let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
791
792                iter.chain(Some(iter2).into_iter().flatten())
793            }
794        };
795        Ok(iter)
796    }
797
798    #[cfg(test)]
799    fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
800        self.store.as_mut().modify(|tables| {
801            let entry = {
802                let (namespace, author, key) = id.as_byte_tuple();
803                let id = (namespace, key, author);
804                tables.records_by_key.remove(id)?;
805                let id = (namespace, author, key);
806                let value = tables.records.remove(id)?;
807                value.map(|value| into_entry(id, value.value()))
808            };
809            Ok(entry)
810        })
811    }
812
813    #[cfg(test)]
814    fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
815        let tables = self.store.as_mut().tables()?;
816        let bounds = RecordsBounds::namespace(self.namespace);
817        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
818        Ok(chain_none(iter))
819    }
820
821    fn prefixes_of(
822        &mut self,
823        id: &RecordIdentifier,
824    ) -> Result<Self::ParentIterator<'_>, Self::Error> {
825        let tables = self.store.as_mut().tables()?;
826        ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
827    }
828
829    #[cfg(test)]
830    fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
831        let tables = self.store.as_mut().tables()?;
832        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
833        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
834        Ok(chain_none(iter))
835    }
836
837    fn remove_prefix_filtered(
838        &mut self,
839        id: &RecordIdentifier,
840        predicate: impl Fn(&Record) -> bool,
841    ) -> Result<usize> {
842        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
843        self.store.as_mut().modify(|tables| {
844            let cb = |_k: RecordsId, v: RecordsValue| {
845                let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
846                let record = Record::new(hash.into(), len, timestamp);
847
848                predicate(&record)
849            };
850            let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
851            let count = iter.count();
852            Ok(count)
853        })
854    }
855}
856
/// Wraps `iter` in the same `Chain<I, Flatten<option::IntoIter<I>>>` shape that is
/// used for split (wrap-around) ranges, but with an always-empty second half.
/// This lets single ranges and split ranges share one concrete iterator type.
fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
    iter: I,
) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
    let empty_tail: Option<I> = None;
    iter.chain(empty_tail.into_iter().flatten())
}
862
863/// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which
864/// is a prefix of the key passed to the iterator.
865#[derive(Debug)]
866pub struct ParentIterator {
867    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
868}
869
870impl ParentIterator {
871    fn new(
872        tables: &Tables,
873        namespace: NamespaceId,
874        author: AuthorId,
875        key: Vec<u8>,
876    ) -> anyhow::Result<Self> {
877        let parents = parents(&tables.records, namespace, author, key.clone());
878        Ok(Self {
879            inner: parents.into_iter(),
880        })
881    }
882}
883
884fn parents(
885    table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
886    namespace: NamespaceId,
887    author: AuthorId,
888    mut key: Vec<u8>,
889) -> Vec<anyhow::Result<SignedEntry>> {
890    let mut res = Vec::new();
891
892    while !key.is_empty() {
893        let entry = get_exact(table, namespace, author, &key, false);
894        key.pop();
895        match entry {
896            Err(err) => res.push(Err(err)),
897            Ok(Some(entry)) => res.push(Ok(entry)),
898            Ok(None) => continue,
899        }
900    }
901    res.reverse();
902    res
903}
904
905impl Iterator for ParentIterator {
906    type Item = Result<SignedEntry>;
907
908    fn next(&mut self) -> Option<Self::Item> {
909        self.inner.next()
910    }
911}
912
913/// Iterator for all content hashes
914///
915/// Note that you might get duplicate hashes. Also, the iterator will keep
916/// a database snapshot open until it is dropped.
917///
918/// Also, this represents a snapshot of the database at the time of creation.
919/// It needs a copy of a redb::ReadOnlyTable to be self-contained.
920#[derive(derive_more::Debug)]
921pub struct ContentHashesIterator {
922    #[debug(skip)]
923    range: RecordsRange<'static>,
924}
925
926impl ContentHashesIterator {
927    /// Create a new iterator over all content hashes.
928    pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
929        let range = RecordsRange::all_static(table)?;
930        Ok(Self { range })
931    }
932}
933
934impl Iterator for ContentHashesIterator {
935    type Item = Result<Hash>;
936
937    fn next(&mut self) -> Option<Self::Item> {
938        let v = self.range.next()?;
939        Some(v.map(|e| e.content_hash()))
940    }
941}
942
943/// Iterator over the latest entry per author.
944#[derive(derive_more::Debug)]
945#[debug("LatestIterator")]
946pub struct LatestIterator<'a>(
947    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
948);
949
950impl<'a> LatestIterator<'a> {
951    fn new(
952        latest_per_author: &'a impl ReadableTable<
953            LatestPerAuthorKey<'static>,
954            LatestPerAuthorValue<'static>,
955        >,
956        namespace: NamespaceId,
957    ) -> anyhow::Result<Self> {
958        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
959        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
960        let range = latest_per_author.range(start..=end)?;
961        Ok(Self(range))
962    }
963}
964
965impl Iterator for LatestIterator<'_> {
966    type Item = Result<(AuthorId, u64, Vec<u8>)>;
967
968    fn next(&mut self) -> Option<Self::Item> {
969        self.0.next_map(|key, value| {
970            let (_namespace, author) = key;
971            let (timestamp, key) = value;
972            (author.into(), timestamp, key.to_vec())
973        })
974    }
975}
976
977fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
978    let (namespace, author, key) = key;
979    let (timestamp, namespace_sig, author_sig, len, hash) = value;
980    let id = RecordIdentifier::new(namespace, author, key);
981    let record = Record::new(hash.into(), len, timestamp);
982    let entry = Entry::new(id, record);
983    let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
984    SignedEntry::new(entry_signature, entry)
985}
986
// Tests for the on-disk (redb) store: range queries, basic CRUD through
// `StoreInstance`, and the database migrations.
#[cfg(test)]
mod tests {
    use super::{tables::LATEST_PER_AUTHOR_TABLE, *};
    // Anonymous import: only needed to bring the `ranger::Store` trait methods into scope.
    use crate::ranger::Store as _;

    // Checks that a key-prefix query returns every key under the prefix, in key order,
    // when the keys consist only of 0xFF bytes.
    #[test]
    fn test_ranges() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let author = store.new_author(&mut rand::thread_rng())?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
        let mut replica = store.new_replica(namespace.clone())?;

        // test author prefix relation for all-255 keys
        // (presumably an edge case because an all-0xFF prefix has no simple successor
        // to use as an exclusive upper bound — NOTE(review): confirm against bounds.rs)
        let key1 = vec![255, 255];
        let key2 = vec![255, 255, 255];
        replica.hash_and_insert(&key1, &author, b"v1")?;
        replica.hash_and_insert(&key2, &author, b"v2")?;
        let res = store
            .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(res.len(), 2);
        assert_eq!(
            res.into_iter()
                .map(|entry| entry.key().to_vec())
                .collect::<Vec<_>>(),
            vec![key1, key2]
        );
        Ok(())
    }

    // Exercises author/replica creation and lookup, plus entry insert, overwrite,
    // query, and removal.
    #[test]
    fn test_basics() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        // a fresh store has no authors
        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
        assert!(authors.is_empty());

        // create an author and a replica, close the replica, then load its info back
        let author = store.new_author(&mut rand::thread_rng())?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
        let _replica = store.new_replica(namespace.clone())?;
        store.close_replica(namespace.id());
        let replica = store.load_replica_info(&namespace.id())?;
        assert_eq!(replica.capability.id(), namespace.id());

        let author_back = store.get_author(&author.id())?.unwrap();
        assert_eq!(author.to_bytes(), author_back.to_bytes(),);

        // insert five entries with distinct keys
        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
        }

        // all
        let all: Vec<_> = wrapper.all()?.collect();
        assert_eq!(all.len(), 5);

        // add a second version for each of the same five keys
        let mut ids = Vec::new();
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(
                id.clone(),
                Record::current_from_data(format!("world-{i}-2")),
            );
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
            ids.push(id);
        }

        // get all: still five entries, one per key
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // get all prefix: every key starts with "hello-"
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::key_prefix("hello-"))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // delete and get: each entry exists before removal and is gone afterwards
        for id in ids {
            let res = wrapper.get(&id)?;
            assert!(res.is_some());
            let out = wrapper.entry_remove(&id)?.unwrap();
            assert_eq!(out.entry().id(), &id);
            let res = wrapper.get(&id)?;
            assert!(res.is_none());
        }

        // after removing every entry, a query for all entries returns nothing
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 0);

        Ok(())
    }

    /// Copies the database file at `source` to a new temporary file, opens the copy,
    /// and runs `modify` inside a single write transaction that is then committed.
    /// The database handle is dropped before returning the temporary file.
    fn copy_and_modify(
        source: &Path,
        modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
    ) -> Result<tempfile::NamedTempFile> {
        let dbfile = tempfile::NamedTempFile::new()?;
        std::fs::copy(source, dbfile.path())?;
        let db = Database::create(dbfile.path())?;
        let write_tx = db.begin_write()?;
        modify(&write_tx)?;
        write_tx.commit()?;
        drop(db);
        Ok(dbfile)
    }

    // Builds a store with data, deletes the latest-per-author table from a copy of the
    // database, and checks that opening the copy recreates the same latest-per-author data.
    #[test]
    fn test_migration_001_populate_latest_table() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());

        // create a store and add some data
        let expected = {
            let mut store = Store::persistent(dbfile.path())?;
            let author1 = store.new_author(&mut rand::thread_rng())?;
            let author2 = store.new_author(&mut rand::thread_rng())?;
            let mut replica = store.new_replica(namespace.clone())?;
            replica.hash_and_insert(b"k1", &author1, b"v1")?;
            replica.hash_and_insert(b"k2", &author2, b"v1")?;
            replica.hash_and_insert(b"k3", &author1, b"v1")?;

            let expected = store
                .get_latest_for_each_author(namespace.id())?
                .collect::<Result<Vec<_>>>()?;
            // drop everything to clear file locks.
            store.close_replica(namespace.id());
            // flush the store to disk
            store.flush()?;
            drop(store);
            expected
        };
        // two authors wrote entries, so two latest rows are expected
        assert_eq!(expected.len(), 2);

        // create a copy of our db file with the latest table deleted.
        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
            Ok(())
        })?;

        // open the copied db file, which will run the migration.
        let mut store = Store::persistent(dbfile_before_migration.path())?;
        let actual = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(expected, actual);
        Ok(())
    }

    // Currently only checks that the by-key index table exists (and is empty)
    // on a freshly created store.
    #[test]
    fn test_migration_004_populate_by_key_index() -> Result<()> {
        use redb::ReadableTableMetadata;
        let dbfile = tempfile::NamedTempFile::new()?;

        let mut store = Store::persistent(dbfile.path())?;

        // check that the new table is there, even if empty
        {
            let tables = store.tables()?;
            assert_eq!(tables.records_by_key.len()?, 0);
        }

        // TODO: write test checking that the indexing is done correctly
        Ok(())
    }
}