1use std::{
4 cmp::Ordering,
5 collections::HashSet,
6 iter::{Chain, Flatten},
7 num::NonZeroU64,
8 ops::Bound,
9};
10
11use anyhow::{anyhow, Result};
12use ed25519_dalek::{SignatureError, VerifyingKey};
13use iroh_blobs::Hash;
14use n0_future::time::SystemTime;
15use rand::CryptoRng;
16use redb::{Database, ReadableMultimapTable, ReadableTable};
17use tracing::warn;
18
19use super::{
20 pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21 Query,
22};
23use crate::{
24 actor::MAX_COMMIT_DELAY,
25 keys::Author,
26 ranger::{Fingerprint, Range, RangeEntry},
27 sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28 AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29 ReplicaInfo,
30};
31
32mod bounds;
33mod migrations;
34mod query;
35mod ranges;
36pub(crate) mod tables;
37
38pub use self::ranges::RecordsRange;
39use self::{
40 bounds::{ByKeyBounds, RecordsBounds},
41 query::QueryIterator,
42 ranges::RangeExt,
43 tables::{
44 LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
45 RecordsValue, Tables, TransactionAndTables,
46 },
47};
48
49#[derive(Debug)]
51pub struct Store {
52 db: Database,
53 transaction: CurrentTransaction,
54 open_replicas: HashSet<NamespaceId>,
55 pubkeys: MemPublicKeyStore,
56}
57
58impl Drop for Store {
59 fn drop(&mut self) {
60 if let Err(err) = self.flush() {
61 warn!("failed to trigger final flush: {:?}", err);
62 }
63 }
64}
65
66impl AsRef<Store> for Store {
67 fn as_ref(&self) -> &Store {
68 self
69 }
70}
71
72impl AsMut<Store> for Store {
73 fn as_mut(&mut self) -> &mut Store {
74 self
75 }
76}
77
78#[derive(derive_more::Debug, Default)]
79#[allow(clippy::large_enum_variant)]
80enum CurrentTransaction {
81 #[default]
82 None,
83 Read(ReadOnlyTables),
84 Write(TransactionAndTables),
85}
86
87impl Store {
88 pub fn memory() -> Self {
90 Self::memory_impl().expect("failed to create memory store")
91 }
92
93 fn memory_impl() -> Result<Self> {
94 let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
95 Self::new_impl(db)
96 }
97
98 #[cfg(feature = "fs-store")]
102 pub fn persistent(path: impl AsRef<std::path::Path>) -> Result<Self> {
103 let mut db = match Database::create(&path) {
104 Ok(db) => db,
105 Err(redb::DatabaseError::UpgradeRequired(1)) => return Err(
106 anyhow!("Opening the database failed: Upgrading from old format is no longer supported. Use iroh-docs 0.92 to perform the upgrade, then upgrade to the latest release again.")
107 ),
108 Err(err) => return Err(err.into()),
109 };
110 match db.upgrade() {
111 Ok(true) => tracing::info!("Database was upgraded to redb v3 compatible format"),
112 Ok(false) => {}
113 Err(err) => warn!("Database upgrade to redb v3 compatible format failed: {err:#}"),
114 }
115 Self::new_impl(db)
116 }
117
118 fn new_impl(db: redb::Database) -> Result<Self> {
119 let write_tx = db.begin_write()?;
121 let _ = Tables::new(&write_tx)?;
122 write_tx.commit()?;
123
124 migrations::run_migrations(&db)?;
126
127 Ok(Store {
128 db,
129 transaction: Default::default(),
130 open_replicas: Default::default(),
131 pubkeys: Default::default(),
132 })
133 }
134
135 pub fn flush(&mut self) -> Result<()> {
139 if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
140 w.commit()?;
141 }
142 Ok(())
143 }
144
145 pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
150 let guard = &mut self.transaction;
151 let tables = match std::mem::take(guard) {
152 CurrentTransaction::None => {
153 let tx = self.db.begin_read()?;
154 ReadOnlyTables::new(tx)?
155 }
156 CurrentTransaction::Write(w) => {
157 w.commit()?;
158 let tx = self.db.begin_read()?;
159 ReadOnlyTables::new(tx)?
160 }
161 CurrentTransaction::Read(tables) => tables,
162 };
163 *guard = CurrentTransaction::Read(tables);
164 match &*guard {
165 CurrentTransaction::Read(ref tables) => Ok(tables),
166 _ => unreachable!(),
167 }
168 }
169
170 pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
178 self.flush()?;
180 assert!(matches!(self.transaction, CurrentTransaction::None));
181 let tx = self.db.begin_read()?;
182 let tables = ReadOnlyTables::new(tx)?;
183 Ok(tables)
184 }
185
186 fn tables(&mut self) -> Result<&Tables<'_>> {
197 let guard = &mut self.transaction;
198 let tables = match std::mem::take(guard) {
199 CurrentTransaction::None => {
200 let tx = self.db.begin_write()?;
201 TransactionAndTables::new(tx)?
202 }
203 CurrentTransaction::Write(w) => {
204 if w.since.elapsed() > MAX_COMMIT_DELAY {
205 tracing::debug!("committing transaction because it's too old");
206 w.commit()?;
207 let tx = self.db.begin_write()?;
208 TransactionAndTables::new(tx)?
209 } else {
210 w
211 }
212 }
213 CurrentTransaction::Read(_) => {
214 let tx = self.db.begin_write()?;
215 TransactionAndTables::new(tx)?
216 }
217 };
218 *guard = CurrentTransaction::Write(tables);
219 match guard {
220 CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
221 _ => unreachable!(),
222 }
223 }
224
225 fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
234 let guard = &mut self.transaction;
235 let tables = match std::mem::take(guard) {
236 CurrentTransaction::None => {
237 let tx = self.db.begin_write()?;
238 TransactionAndTables::new(tx)?
239 }
240 CurrentTransaction::Write(w) => {
241 if w.since.elapsed() > MAX_COMMIT_DELAY {
242 tracing::debug!("committing transaction because it's too old");
243 w.commit()?;
244 let tx = self.db.begin_write()?;
245 TransactionAndTables::new(tx)?
246 } else {
247 w
248 }
249 }
250 CurrentTransaction::Read(_) => {
251 let tx = self.db.begin_write()?;
252 TransactionAndTables::new(tx)?
253 }
254 };
255 *guard = CurrentTransaction::Write(tables);
256 let res = match &mut *guard {
257 CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
258 _ => unreachable!(),
259 };
260 Ok(res)
261 }
262}
263
264type PeersIter = std::vec::IntoIter<PeerIdBytes>;
265
266impl Store {
267 pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
269 let id = namespace.id();
270 self.import_namespace(namespace.into())?;
271 self.open_replica(&id).map_err(Into::into)
272 }
273
274 pub fn new_author<R: CryptoRng>(&mut self, rng: &mut R) -> Result<Author> {
276 let author = Author::new(rng);
277 self.import_author(author.clone())?;
278 Ok(author)
279 }
280
281 pub fn has_news_for_us(
285 &mut self,
286 namespace: NamespaceId,
287 heads: &AuthorHeads,
288 ) -> Result<Option<NonZeroU64>> {
289 let our_heads = {
290 let latest = self.get_latest_for_each_author(namespace)?;
291 let mut heads = AuthorHeads::default();
292 for e in latest {
293 let (author, timestamp, _key) = e?;
294 heads.insert(author, timestamp);
295 }
296 heads
297 };
298 let has_news_for_us = heads.has_news_for(&our_heads);
299 Ok(has_news_for_us)
300 }
301
302 pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
306 let info = self.load_replica_info(namespace_id)?;
307 let instance = StoreInstance::new(*namespace_id, self);
308 Ok(Replica::new(instance, Box::new(info)))
309 }
310
311 pub fn load_replica_info(
313 &mut self,
314 namespace_id: &NamespaceId,
315 ) -> Result<ReplicaInfo, OpenError> {
316 let tables = self.tables()?;
317 let info = match tables.namespaces.get(namespace_id.as_bytes()) {
318 Ok(Some(db_value)) => {
319 let (raw_kind, raw_bytes) = db_value.value();
320 let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
321 ReplicaInfo::new(namespace)
322 }
323 Ok(None) => return Err(OpenError::NotFound),
324 Err(err) => return Err(OpenError::Other(err.into())),
325 };
326 self.open_replicas.insert(info.capability.id());
327 Ok(info)
328 }
329
330 pub fn close_replica(&mut self, id: NamespaceId) {
332 self.open_replicas.remove(&id);
333 }
334
335 pub fn list_namespaces(
337 &mut self,
338 ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
339 let snapshot = self.snapshot()?;
340 let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
341 let iter = iter.map(|res| {
342 let capability = parse_capability(res?.1.value())?;
343 Ok((capability.id(), capability.kind()))
344 });
345 Ok(iter)
346 }
347
348 pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
350 let tables = self.tables()?;
351 let Some(author) = tables.authors.get(author_id.as_bytes())? else {
352 return Ok(None);
353 };
354 let author = Author::from_bytes(author.value());
355 Ok(Some(author))
356 }
357
358 pub fn import_author(&mut self, author: Author) -> Result<()> {
360 self.modify(|tables| {
361 tables
362 .authors
363 .insert(author.id().as_bytes(), &author.to_bytes())?;
364 Ok(())
365 })
366 }
367
368 pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
370 self.modify(|tables| {
371 tables.authors.remove(author.as_bytes())?;
372 Ok(())
373 })
374 }
375
376 pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
378 let tables = self.snapshot()?;
379 let iter = tables
380 .authors
381 .range::<&'static [u8; 32]>(..)?
382 .map(|res| match res {
383 Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
384 Err(err) => Err(err.into()),
385 });
386 Ok(iter)
387 }
388
389 pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
391 self.modify(|tables| {
392 let outcome = {
393 let (capability, outcome) = {
394 let existing = tables.namespaces.get(capability.id().as_bytes())?;
395 if let Some(existing) = existing {
396 let mut existing = parse_capability(existing.value())?;
397 let outcome = if existing.merge(capability)? {
398 ImportNamespaceOutcome::Upgraded
399 } else {
400 ImportNamespaceOutcome::NoChange
401 };
402 (existing, outcome)
403 } else {
404 (capability, ImportNamespaceOutcome::Inserted)
405 }
406 };
407 let id = capability.id().to_bytes();
408 let (kind, bytes) = capability.raw();
409 tables.namespaces.insert(&id, (kind, &bytes))?;
410 outcome
411 };
412 Ok(outcome)
413 })
414 }
415
416 pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
424 if self.open_replicas.contains(namespace) {
425 return Err(anyhow!("replica is not closed"));
426 }
427 self.modify(|tables| {
428 let bounds = RecordsBounds::namespace(*namespace);
429 tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
430 let bounds = ByKeyBounds::namespace(*namespace);
431 let _ = tables
432 .records_by_key
433 .retain_in(bounds.as_ref(), |_k, _v| false);
434 tables.namespaces.remove(namespace.as_bytes())?;
435 tables.namespace_peers.remove_all(namespace.as_bytes())?;
436 tables.download_policy.remove(namespace.as_bytes())?;
437 Ok(())
438 })
439 }
440
441 pub fn get_many(
443 &mut self,
444 namespace: NamespaceId,
445 query: impl Into<Query>,
446 ) -> Result<QueryIterator> {
447 let tables = self.snapshot_owned()?;
448 QueryIterator::new(tables, namespace, query.into())
449 }
450
451 pub fn get_exact(
453 &mut self,
454 namespace: NamespaceId,
455 author: AuthorId,
456 key: impl AsRef<[u8]>,
457 include_empty: bool,
458 ) -> Result<Option<SignedEntry>> {
459 get_exact(
460 &self.tables()?.records,
461 namespace,
462 author,
463 key,
464 include_empty,
465 )
466 }
467
468 pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
470 let tables = self.snapshot_owned()?;
471 ContentHashesIterator::all(&tables.records)
472 }
473
474 pub fn get_latest_for_each_author(
476 &mut self,
477 namespace: NamespaceId,
478 ) -> Result<LatestIterator<'_>> {
479 LatestIterator::new(&self.tables()?.latest_per_author, namespace)
480 }
481
482 pub fn register_useful_peer(
484 &mut self,
485 namespace: NamespaceId,
486 peer: crate::PeerIdBytes,
487 ) -> Result<()> {
488 let peer = &peer;
489 let namespace = namespace.as_bytes();
490 let nanos = SystemTime::UNIX_EPOCH
492 .elapsed()
493 .map(|duration| duration.as_nanos() as u64)?;
494 self.modify(|tables| {
495 anyhow::ensure!(
497 tables.namespaces.get(namespace)?.is_some(),
498 "document not created"
499 );
500
501 let mut namespace_peers = tables.namespace_peers.get(namespace)?;
502
503 let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
505 let (oldest_nanos, &oldest_peer) = guard.value();
506 (oldest_nanos, oldest_peer)
507 });
508 match maybe_oldest {
509 None => {
510 drop(namespace_peers);
513 tables.namespace_peers.insert(namespace, (nanos, peer))?;
514 }
515 Some((oldest_nanos, oldest_peer)) => {
516 let oldest_peer = &oldest_peer;
517
518 if oldest_peer == peer {
519 drop(namespace_peers);
522 tables
523 .namespace_peers
524 .remove(namespace, (oldest_nanos, oldest_peer))?;
525 tables.namespace_peers.insert(namespace, (nanos, peer))?;
526 } else {
527 let mut len = 1;
529 let mut prev_peer_nanos = None;
531
532 for result in namespace_peers {
533 len += 1;
534 let guard = result?;
535 let (peer_nanos, peer_bytes) = guard.value();
536 if prev_peer_nanos.is_none() && peer_bytes == peer {
537 prev_peer_nanos = Some(peer_nanos)
538 }
539 }
540
541 match prev_peer_nanos {
542 Some(prev_nanos) => {
543 tables
546 .namespace_peers
547 .remove(namespace, (prev_nanos, peer))?;
548 tables.namespace_peers.insert(namespace, (nanos, peer))?;
549 }
550 None => {
551 tables.namespace_peers.insert(namespace, (nanos, peer))?;
554 len += 1;
555 if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
556 tables
557 .namespace_peers
558 .remove(namespace, (oldest_nanos, oldest_peer))?;
559 }
560 }
561 }
562 }
563 }
564 }
565 Ok(())
566 })
567 }
568
569 pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
571 let tables = self.tables()?;
572 let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
573 for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
574 let (_nanos, &peer) = result?.value();
575 peers.push(peer);
576 }
577 if peers.is_empty() {
578 Ok(None)
579 } else {
580 Ok(Some(peers.into_iter()))
581 }
582 }
583
584 pub fn set_download_policy(
586 &mut self,
587 namespace: &NamespaceId,
588 policy: DownloadPolicy,
589 ) -> Result<()> {
590 self.modify(|tables| {
591 let namespace = namespace.as_bytes();
592
593 anyhow::ensure!(
595 tables.namespaces.get(&namespace)?.is_some(),
596 "document not created"
597 );
598
599 let value = postcard::to_stdvec(&policy)?;
600 tables.download_policy.insert(namespace, value.as_slice())?;
601 Ok(())
602 })
603 }
604
605 pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
607 let tables = self.tables()?;
608 let value = tables.download_policy.get(namespace.as_bytes())?;
609 Ok(match value {
610 None => DownloadPolicy::default(),
611 Some(value) => postcard::from_bytes(value.value())?,
612 })
613 }
614}
615
616impl PublicKeyStore for Store {
617 fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
618 self.pubkeys.public_key(id)
619 }
620}
621
622fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
623 Capability::from_raw(raw_kind, raw_bytes)
624}
625
626fn get_exact(
627 record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
628 namespace: NamespaceId,
629 author: AuthorId,
630 key: impl AsRef<[u8]>,
631 include_empty: bool,
632) -> Result<Option<SignedEntry>> {
633 let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
634 let record = record_table.get(id)?;
635 Ok(record
636 .map(|r| into_entry(id, r.value()))
637 .filter(|entry| include_empty || !entry.is_empty()))
638}
639
640#[derive(Debug)]
642pub struct StoreInstance<'a> {
643 namespace: NamespaceId,
644 pub(crate) store: &'a mut Store,
645}
646
647impl<'a> StoreInstance<'a> {
648 pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
649 StoreInstance { namespace, store }
650 }
651}
652
653impl PublicKeyStore for StoreInstance<'_> {
654 fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
655 self.store.public_key(id)
656 }
657}
658
659impl super::DownloadPolicyStore for StoreInstance<'_> {
660 fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
661 self.store.get_download_policy(namespace)
662 }
663}
664
665impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
666 type Error = anyhow::Error;
667 type RangeIterator<'x>
668 = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
669 where
670 'a: 'x;
671 type ParentIterator<'x>
672 = ParentIterator
673 where
674 'a: 'x;
675
676 fn get_first(&mut self) -> Result<RecordIdentifier> {
678 let tables = self.store.as_mut().tables()?;
679 let bounds = RecordsBounds::namespace(self.namespace);
681 let mut records = tables.records.range(bounds.as_ref())?;
682
683 let Some(record) = records.next() else {
684 return Ok(RecordIdentifier::default());
685 };
686 let (compound_key, _value) = record?;
687 let (namespace_id, author_id, key) = compound_key.value();
688 let id = RecordIdentifier::new(namespace_id, author_id, key);
689 Ok(id)
690 }
691
692 #[cfg(test)]
693 fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
694 self.store
695 .as_mut()
696 .get_exact(id.namespace(), id.author(), id.key(), true)
697 }
698
699 #[cfg(test)]
700 fn len(&mut self) -> Result<usize> {
701 let tables = self.store.as_mut().tables()?;
702 let bounds = RecordsBounds::namespace(self.namespace);
703 let records = tables.records.range(bounds.as_ref())?;
704 Ok(records.count())
705 }
706
707 #[cfg(test)]
708 fn is_empty(&mut self) -> Result<bool> {
709 use redb::ReadableTableMetadata;
710 let tables = self.store.as_mut().tables()?;
711 Ok(tables.records.is_empty()?)
712 }
713
714 fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
715 let elements = self.get_range(range.clone())?;
717
718 let mut fp = Fingerprint::empty();
719 for el in elements {
720 let el = el?;
721 fp ^= el.as_fingerprint();
722 }
723
724 Ok(fp)
725 }
726
727 fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
728 let id = e.id();
729 self.store.as_mut().modify(|tables| {
730 let key = (
732 &id.namespace().to_bytes(),
733 &id.author().to_bytes(),
734 id.key(),
735 );
736 let hash = e.content_hash(); let value = (
738 e.timestamp(),
739 &e.signature().namespace().to_bytes(),
740 &e.signature().author().to_bytes(),
741 e.content_len(),
742 hash.as_bytes(),
743 );
744 tables.records.insert(key, value)?;
745
746 let key = (
748 &id.namespace().to_bytes(),
749 id.key(),
750 &id.author().to_bytes(),
751 );
752 tables.records_by_key.insert(key, ())?;
753
754 let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
756 let value = (e.timestamp(), e.id().key());
757 tables.latest_per_author.insert(key, value)?;
758 Ok(())
759 })
760 }
761
762 fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
763 let tables = self.store.as_mut().tables()?;
764 let iter = match range.x().cmp(range.y()) {
765 Ordering::Equal => {
767 let bounds = RecordsBounds::namespace(self.namespace);
769 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
770 chain_none(iter)
771 }
772 Ordering::Less => {
774 let start = Bound::Included(range.x().to_byte_tuple());
776 let end = Bound::Excluded(range.y().to_byte_tuple());
777 let bounds = RecordsBounds::new(start, end);
778 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
779 chain_none(iter)
780 }
781 Ordering::Greater => {
783 let end = Bound::Excluded(range.y().to_byte_tuple());
785 let bounds = RecordsBounds::from_start(&self.namespace, end);
786 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
787
788 let start = Bound::Included(range.x().to_byte_tuple());
790 let bounds = RecordsBounds::to_end(&self.namespace, start);
791 let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
792
793 iter.chain(Some(iter2).into_iter().flatten())
794 }
795 };
796 Ok(iter)
797 }
798
799 #[cfg(test)]
800 fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
801 self.store.as_mut().modify(|tables| {
802 let entry = {
803 let (namespace, author, key) = id.as_byte_tuple();
804 let id = (namespace, key, author);
805 tables.records_by_key.remove(id)?;
806 let id = (namespace, author, key);
807 let value = tables.records.remove(id)?;
808 value.map(|value| into_entry(id, value.value()))
809 };
810 Ok(entry)
811 })
812 }
813
814 #[cfg(test)]
815 fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
816 let tables = self.store.as_mut().tables()?;
817 let bounds = RecordsBounds::namespace(self.namespace);
818 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
819 Ok(chain_none(iter))
820 }
821
822 fn prefixes_of(
823 &mut self,
824 id: &RecordIdentifier,
825 ) -> Result<Self::ParentIterator<'_>, Self::Error> {
826 let tables = self.store.as_mut().tables()?;
827 ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
828 }
829
830 #[cfg(test)]
831 fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
832 let tables = self.store.as_mut().tables()?;
833 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
834 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
835 Ok(chain_none(iter))
836 }
837
838 fn remove_prefix_filtered(
839 &mut self,
840 id: &RecordIdentifier,
841 predicate: impl Fn(&Record) -> bool,
842 ) -> Result<usize> {
843 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
844 self.store.as_mut().modify(|tables| {
845 let cb = |_k: RecordsId, v: RecordsValue| {
846 let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
847 let record = Record::new(hash.into(), len, timestamp);
848
849 predicate(&record)
850 };
851 let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
852 let count = iter.count();
853 Ok(count)
854 })
855 }
856}
857
858fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
859 iter: I,
860) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
861 iter.chain(None.into_iter().flatten())
862}
863
864#[derive(Debug)]
867pub struct ParentIterator {
868 inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
869}
870
871impl ParentIterator {
872 fn new(
873 tables: &Tables,
874 namespace: NamespaceId,
875 author: AuthorId,
876 key: Vec<u8>,
877 ) -> anyhow::Result<Self> {
878 let parents = parents(&tables.records, namespace, author, key.clone());
879 Ok(Self {
880 inner: parents.into_iter(),
881 })
882 }
883}
884
885fn parents(
886 table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
887 namespace: NamespaceId,
888 author: AuthorId,
889 mut key: Vec<u8>,
890) -> Vec<anyhow::Result<SignedEntry>> {
891 let mut res = Vec::new();
892
893 while !key.is_empty() {
894 let entry = get_exact(table, namespace, author, &key, false);
895 key.pop();
896 match entry {
897 Err(err) => res.push(Err(err)),
898 Ok(Some(entry)) => res.push(Ok(entry)),
899 Ok(None) => continue,
900 }
901 }
902 res.reverse();
903 res
904}
905
906impl Iterator for ParentIterator {
907 type Item = Result<SignedEntry>;
908
909 fn next(&mut self) -> Option<Self::Item> {
910 self.inner.next()
911 }
912}
913
914#[derive(derive_more::Debug)]
922pub struct ContentHashesIterator {
923 #[debug(skip)]
924 range: RecordsRange<'static>,
925}
926
927impl ContentHashesIterator {
928 pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
930 let range = RecordsRange::all_static(table)?;
931 Ok(Self { range })
932 }
933}
934
935impl Iterator for ContentHashesIterator {
936 type Item = Result<Hash>;
937
938 fn next(&mut self) -> Option<Self::Item> {
939 let v = self.range.next()?;
940 Some(v.map(|e| e.content_hash()))
941 }
942}
943
944#[derive(derive_more::Debug)]
946#[debug("LatestIterator")]
947pub struct LatestIterator<'a>(
948 redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
949);
950
951impl<'a> LatestIterator<'a> {
952 fn new(
953 latest_per_author: &'a impl ReadableTable<
954 LatestPerAuthorKey<'static>,
955 LatestPerAuthorValue<'static>,
956 >,
957 namespace: NamespaceId,
958 ) -> anyhow::Result<Self> {
959 let start = (namespace.as_bytes(), &[u8::MIN; 32]);
960 let end = (namespace.as_bytes(), &[u8::MAX; 32]);
961 let range = latest_per_author.range(start..=end)?;
962 Ok(Self(range))
963 }
964}
965
966impl Iterator for LatestIterator<'_> {
967 type Item = Result<(AuthorId, u64, Vec<u8>)>;
968
969 fn next(&mut self) -> Option<Self::Item> {
970 self.0.next_map(|key, value| {
971 let (_namespace, author) = key;
972 let (timestamp, key) = value;
973 (author.into(), timestamp, key.to_vec())
974 })
975 }
976}
977
978fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
979 let (namespace, author, key) = key;
980 let (timestamp, namespace_sig, author_sig, len, hash) = value;
981 let id = RecordIdentifier::new(namespace, author, key);
982 let record = Record::new(hash.into(), len, timestamp);
983 let entry = Entry::new(id, record);
984 let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
985 SignedEntry::new(entry_signature, entry)
986}
987
988#[cfg(all(test, feature = "fs-store"))]
989mod tests {
990 use super::*;
991 use crate::ranger::Store as _;
992
993 #[tokio::test]
994 async fn test_ranges() -> Result<()> {
995 let dbfile = tempfile::NamedTempFile::new()?;
996 let mut store = Store::persistent(dbfile.path())?;
997
998 let author = store.new_author(&mut rand::rng())?;
999 let namespace = NamespaceSecret::new(&mut rand::rng());
1000 let mut replica = store.new_replica(namespace.clone())?;
1001
1002 let key1 = vec![255, 255];
1004 let key2 = vec![255, 255, 255];
1005 replica.hash_and_insert(&key1, &author, b"v1").await?;
1006 replica.hash_and_insert(&key2, &author, b"v2").await?;
1007 let res = store
1008 .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
1009 .collect::<Result<Vec<_>>>()?;
1010 assert_eq!(res.len(), 2);
1011 assert_eq!(
1012 res.into_iter()
1013 .map(|entry| entry.key().to_vec())
1014 .collect::<Vec<_>>(),
1015 vec![key1, key2]
1016 );
1017 Ok(())
1018 }
1019
1020 #[test]
1021 fn test_basics() -> Result<()> {
1022 let dbfile = tempfile::NamedTempFile::new()?;
1023 let mut store = Store::persistent(dbfile.path())?;
1024
1025 let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
1026 assert!(authors.is_empty());
1027
1028 let author = store.new_author(&mut rand::rng())?;
1029 let namespace = NamespaceSecret::new(&mut rand::rng());
1030 let _replica = store.new_replica(namespace.clone())?;
1031 store.close_replica(namespace.id());
1032 let replica = store.load_replica_info(&namespace.id())?;
1033 assert_eq!(replica.capability.id(), namespace.id());
1034
1035 let author_back = store.get_author(&author.id())?.unwrap();
1036 assert_eq!(author.to_bytes(), author_back.to_bytes(),);
1037
1038 let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
1039 for i in 0..5 {
1040 let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1041 let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
1042 let entry = SignedEntry::from_entry(entry, &namespace, &author);
1043 wrapper.entry_put(entry)?;
1044 }
1045
1046 let all: Vec<_> = wrapper.all()?.collect();
1048 assert_eq!(all.len(), 5);
1049
1050 let mut ids = Vec::new();
1052 for i in 0..5 {
1053 let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1054 let entry = Entry::new(
1055 id.clone(),
1056 Record::current_from_data(format!("world-{i}-2")),
1057 );
1058 let entry = SignedEntry::from_entry(entry, &namespace, &author);
1059 wrapper.entry_put(entry)?;
1060 ids.push(id);
1061 }
1062
1063 let entries = wrapper
1065 .store
1066 .get_many(namespace.id(), Query::all())?
1067 .collect::<Result<Vec<_>>>()?;
1068 assert_eq!(entries.len(), 5);
1069
1070 let entries = wrapper
1072 .store
1073 .get_many(namespace.id(), Query::key_prefix("hello-"))?
1074 .collect::<Result<Vec<_>>>()?;
1075 assert_eq!(entries.len(), 5);
1076
1077 for id in ids {
1079 let res = wrapper.get(&id)?;
1080 assert!(res.is_some());
1081 let out = wrapper.entry_remove(&id)?.unwrap();
1082 assert_eq!(out.entry().id(), &id);
1083 let res = wrapper.get(&id)?;
1084 assert!(res.is_none());
1085 }
1086
1087 let entries = wrapper
1089 .store
1090 .get_many(namespace.id(), Query::all())?
1091 .collect::<Result<Vec<_>>>()?;
1092 assert_eq!(entries.len(), 0);
1093
1094 Ok(())
1095 }
1096
1097 fn copy_and_modify(
1098 source: &std::path::Path,
1099 modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
1100 ) -> Result<tempfile::NamedTempFile> {
1101 let dbfile = tempfile::NamedTempFile::new()?;
1102 std::fs::copy(source, dbfile.path())?;
1103 let db = Database::create(dbfile.path())?;
1104 let write_tx = db.begin_write()?;
1105 modify(&write_tx)?;
1106 write_tx.commit()?;
1107 drop(db);
1108 Ok(dbfile)
1109 }
1110
1111 #[tokio::test]
1112 #[cfg(feature = "fs-store")]
1113 async fn test_migration_001_populate_latest_table() -> Result<()> {
1114 use super::tables::LATEST_PER_AUTHOR_TABLE;
1115 let dbfile = tempfile::NamedTempFile::new()?;
1116 let namespace = NamespaceSecret::new(&mut rand::rng());
1117
1118 let expected = {
1120 let mut store = Store::persistent(dbfile.path())?;
1121 let author1 = store.new_author(&mut rand::rng())?;
1122 let author2 = store.new_author(&mut rand::rng())?;
1123 let mut replica = store.new_replica(namespace.clone())?;
1124 replica.hash_and_insert(b"k1", &author1, b"v1").await?;
1125 replica.hash_and_insert(b"k2", &author2, b"v1").await?;
1126 replica.hash_and_insert(b"k3", &author1, b"v1").await?;
1127
1128 let expected = store
1129 .get_latest_for_each_author(namespace.id())?
1130 .collect::<Result<Vec<_>>>()?;
1131 store.close_replica(namespace.id());
1133 store.flush()?;
1135 drop(store);
1136 expected
1137 };
1138 assert_eq!(expected.len(), 2);
1139
1140 let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
1142 tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
1143 Ok(())
1144 })?;
1145
1146 let mut store = Store::persistent(dbfile_before_migration.path())?;
1148 let actual = store
1149 .get_latest_for_each_author(namespace.id())?
1150 .collect::<Result<Vec<_>>>()?;
1151
1152 assert_eq!(expected, actual);
1153 Ok(())
1154 }
1155
1156 #[test]
1157 #[cfg(feature = "fs-store")]
1158 fn test_migration_004_populate_by_key_index() -> Result<()> {
1159 use redb::ReadableTableMetadata;
1160 let dbfile = tempfile::NamedTempFile::new()?;
1161
1162 let mut store = Store::persistent(dbfile.path())?;
1163
1164 {
1166 let tables = store.tables()?;
1167 assert_eq!(tables.records_by_key.len()?, 0);
1168 }
1169
1170 Ok(())
1172 }
1173}