1use std::{
4 cmp::Ordering,
5 collections::HashSet,
6 iter::{Chain, Flatten},
7 num::NonZeroU64,
8 ops::Bound,
9 path::Path,
10};
11
12use anyhow::{anyhow, Result};
13use ed25519_dalek::{SignatureError, VerifyingKey};
14use iroh_blobs::Hash;
15use rand_core::CryptoRngCore;
16use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable};
17use tracing::{info, warn};
18
19use super::{
20 pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21 Query,
22};
23use crate::{
24 actor::MAX_COMMIT_DELAY,
25 keys::Author,
26 ranger::{Fingerprint, Range, RangeEntry},
27 sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28 AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29 ReplicaInfo,
30};
31
32mod bounds;
33mod migrations;
34mod query;
35mod ranges;
36pub(crate) mod tables;
37
38pub use self::ranges::RecordsRange;
39use self::{
40 bounds::{ByKeyBounds, RecordsBounds},
41 query::QueryIterator,
42 ranges::RangeExt,
43 tables::{
44 LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
45 RecordsValue, Tables, TransactionAndTables,
46 },
47};
48
49#[derive(Debug)]
51pub struct Store {
52 db: Database,
53 transaction: CurrentTransaction,
54 open_replicas: HashSet<NamespaceId>,
55 pubkeys: MemPublicKeyStore,
56}
57
58impl Drop for Store {
59 fn drop(&mut self) {
60 if let Err(err) = self.flush() {
61 warn!("failed to trigger final flush: {:?}", err);
62 }
63 }
64}
65
66impl AsRef<Store> for Store {
67 fn as_ref(&self) -> &Store {
68 self
69 }
70}
71
72impl AsMut<Store> for Store {
73 fn as_mut(&mut self) -> &mut Store {
74 self
75 }
76}
77
78#[derive(derive_more::Debug, Default)]
79#[allow(clippy::large_enum_variant)]
80enum CurrentTransaction {
81 #[default]
82 None,
83 Read(ReadOnlyTables),
84 Write(TransactionAndTables),
85}
86
87impl Store {
88 pub fn memory() -> Self {
90 Self::memory_impl().expect("failed to create memory store")
91 }
92
93 fn memory_impl() -> Result<Self> {
94 let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
95 Self::new_impl(db)
96 }
97
98 pub fn persistent(path: impl AsRef<Path>) -> Result<Self> {
102 let mut db = match Database::create(&path) {
103 Ok(db) => db,
104 Err(DatabaseError::UpgradeRequired(1)) => return Err(
105 anyhow!("Opening the database failed: Upgrading from old format is no longer supported. Use iroh-docs 0.92 to perform the upgrade, then upgrade to the latest release again.")
106 ),
107 Err(err) => return Err(err.into()),
108 };
109 match db.upgrade() {
110 Ok(true) => info!("Database was upgraded to redb v3 compatible format"),
111 Ok(false) => {}
112 Err(err) => warn!("Database upgrade to redb v3 compatible format failed: {err:#}"),
113 }
114 Self::new_impl(db)
115 }
116
117 fn new_impl(db: redb::Database) -> Result<Self> {
118 let write_tx = db.begin_write()?;
120 let _ = Tables::new(&write_tx)?;
121 write_tx.commit()?;
122
123 migrations::run_migrations(&db)?;
125
126 Ok(Store {
127 db,
128 transaction: Default::default(),
129 open_replicas: Default::default(),
130 pubkeys: Default::default(),
131 })
132 }
133
134 pub fn flush(&mut self) -> Result<()> {
138 if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
139 w.commit()?;
140 }
141 Ok(())
142 }
143
144 pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
149 let guard = &mut self.transaction;
150 let tables = match std::mem::take(guard) {
151 CurrentTransaction::None => {
152 let tx = self.db.begin_read()?;
153 ReadOnlyTables::new(tx)?
154 }
155 CurrentTransaction::Write(w) => {
156 w.commit()?;
157 let tx = self.db.begin_read()?;
158 ReadOnlyTables::new(tx)?
159 }
160 CurrentTransaction::Read(tables) => tables,
161 };
162 *guard = CurrentTransaction::Read(tables);
163 match &*guard {
164 CurrentTransaction::Read(ref tables) => Ok(tables),
165 _ => unreachable!(),
166 }
167 }
168
169 pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
177 self.flush()?;
179 assert!(matches!(self.transaction, CurrentTransaction::None));
180 let tx = self.db.begin_read()?;
181 let tables = ReadOnlyTables::new(tx)?;
182 Ok(tables)
183 }
184
185 fn tables(&mut self) -> Result<&Tables<'_>> {
196 let guard = &mut self.transaction;
197 let tables = match std::mem::take(guard) {
198 CurrentTransaction::None => {
199 let tx = self.db.begin_write()?;
200 TransactionAndTables::new(tx)?
201 }
202 CurrentTransaction::Write(w) => {
203 if w.since.elapsed() > MAX_COMMIT_DELAY {
204 tracing::debug!("committing transaction because it's too old");
205 w.commit()?;
206 let tx = self.db.begin_write()?;
207 TransactionAndTables::new(tx)?
208 } else {
209 w
210 }
211 }
212 CurrentTransaction::Read(_) => {
213 let tx = self.db.begin_write()?;
214 TransactionAndTables::new(tx)?
215 }
216 };
217 *guard = CurrentTransaction::Write(tables);
218 match guard {
219 CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
220 _ => unreachable!(),
221 }
222 }
223
224 fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
233 let guard = &mut self.transaction;
234 let tables = match std::mem::take(guard) {
235 CurrentTransaction::None => {
236 let tx = self.db.begin_write()?;
237 TransactionAndTables::new(tx)?
238 }
239 CurrentTransaction::Write(w) => {
240 if w.since.elapsed() > MAX_COMMIT_DELAY {
241 tracing::debug!("committing transaction because it's too old");
242 w.commit()?;
243 let tx = self.db.begin_write()?;
244 TransactionAndTables::new(tx)?
245 } else {
246 w
247 }
248 }
249 CurrentTransaction::Read(_) => {
250 let tx = self.db.begin_write()?;
251 TransactionAndTables::new(tx)?
252 }
253 };
254 *guard = CurrentTransaction::Write(tables);
255 let res = match &mut *guard {
256 CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
257 _ => unreachable!(),
258 };
259 Ok(res)
260 }
261}
262
263type PeersIter = std::vec::IntoIter<PeerIdBytes>;
264
265impl Store {
266 pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
268 let id = namespace.id();
269 self.import_namespace(namespace.into())?;
270 self.open_replica(&id).map_err(Into::into)
271 }
272
273 pub fn new_author<R: CryptoRngCore + ?Sized>(&mut self, rng: &mut R) -> Result<Author> {
275 let author = Author::new(rng);
276 self.import_author(author.clone())?;
277 Ok(author)
278 }
279
280 pub fn has_news_for_us(
284 &mut self,
285 namespace: NamespaceId,
286 heads: &AuthorHeads,
287 ) -> Result<Option<NonZeroU64>> {
288 let our_heads = {
289 let latest = self.get_latest_for_each_author(namespace)?;
290 let mut heads = AuthorHeads::default();
291 for e in latest {
292 let (author, timestamp, _key) = e?;
293 heads.insert(author, timestamp);
294 }
295 heads
296 };
297 let has_news_for_us = heads.has_news_for(&our_heads);
298 Ok(has_news_for_us)
299 }
300
301 pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
305 let info = self.load_replica_info(namespace_id)?;
306 let instance = StoreInstance::new(*namespace_id, self);
307 Ok(Replica::new(instance, Box::new(info)))
308 }
309
310 pub fn load_replica_info(
312 &mut self,
313 namespace_id: &NamespaceId,
314 ) -> Result<ReplicaInfo, OpenError> {
315 let tables = self.tables()?;
316 let info = match tables.namespaces.get(namespace_id.as_bytes()) {
317 Ok(Some(db_value)) => {
318 let (raw_kind, raw_bytes) = db_value.value();
319 let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
320 ReplicaInfo::new(namespace)
321 }
322 Ok(None) => return Err(OpenError::NotFound),
323 Err(err) => return Err(OpenError::Other(err.into())),
324 };
325 self.open_replicas.insert(info.capability.id());
326 Ok(info)
327 }
328
329 pub fn close_replica(&mut self, id: NamespaceId) {
331 self.open_replicas.remove(&id);
332 }
333
334 pub fn list_namespaces(
336 &mut self,
337 ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
338 let snapshot = self.snapshot()?;
339 let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
340 let iter = iter.map(|res| {
341 let capability = parse_capability(res?.1.value())?;
342 Ok((capability.id(), capability.kind()))
343 });
344 Ok(iter)
345 }
346
347 pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
349 let tables = self.tables()?;
350 let Some(author) = tables.authors.get(author_id.as_bytes())? else {
351 return Ok(None);
352 };
353 let author = Author::from_bytes(author.value());
354 Ok(Some(author))
355 }
356
357 pub fn import_author(&mut self, author: Author) -> Result<()> {
359 self.modify(|tables| {
360 tables
361 .authors
362 .insert(author.id().as_bytes(), &author.to_bytes())?;
363 Ok(())
364 })
365 }
366
367 pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
369 self.modify(|tables| {
370 tables.authors.remove(author.as_bytes())?;
371 Ok(())
372 })
373 }
374
375 pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
377 let tables = self.snapshot()?;
378 let iter = tables
379 .authors
380 .range::<&'static [u8; 32]>(..)?
381 .map(|res| match res {
382 Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
383 Err(err) => Err(err.into()),
384 });
385 Ok(iter)
386 }
387
388 pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
390 self.modify(|tables| {
391 let outcome = {
392 let (capability, outcome) = {
393 let existing = tables.namespaces.get(capability.id().as_bytes())?;
394 if let Some(existing) = existing {
395 let mut existing = parse_capability(existing.value())?;
396 let outcome = if existing.merge(capability)? {
397 ImportNamespaceOutcome::Upgraded
398 } else {
399 ImportNamespaceOutcome::NoChange
400 };
401 (existing, outcome)
402 } else {
403 (capability, ImportNamespaceOutcome::Inserted)
404 }
405 };
406 let id = capability.id().to_bytes();
407 let (kind, bytes) = capability.raw();
408 tables.namespaces.insert(&id, (kind, &bytes))?;
409 outcome
410 };
411 Ok(outcome)
412 })
413 }
414
415 pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
423 if self.open_replicas.contains(namespace) {
424 return Err(anyhow!("replica is not closed"));
425 }
426 self.modify(|tables| {
427 let bounds = RecordsBounds::namespace(*namespace);
428 tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
429 let bounds = ByKeyBounds::namespace(*namespace);
430 let _ = tables
431 .records_by_key
432 .retain_in(bounds.as_ref(), |_k, _v| false);
433 tables.namespaces.remove(namespace.as_bytes())?;
434 tables.namespace_peers.remove_all(namespace.as_bytes())?;
435 tables.download_policy.remove(namespace.as_bytes())?;
436 Ok(())
437 })
438 }
439
440 pub fn get_many(
442 &mut self,
443 namespace: NamespaceId,
444 query: impl Into<Query>,
445 ) -> Result<QueryIterator> {
446 let tables = self.snapshot_owned()?;
447 QueryIterator::new(tables, namespace, query.into())
448 }
449
450 pub fn get_exact(
452 &mut self,
453 namespace: NamespaceId,
454 author: AuthorId,
455 key: impl AsRef<[u8]>,
456 include_empty: bool,
457 ) -> Result<Option<SignedEntry>> {
458 get_exact(
459 &self.tables()?.records,
460 namespace,
461 author,
462 key,
463 include_empty,
464 )
465 }
466
467 pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
469 let tables = self.snapshot_owned()?;
470 ContentHashesIterator::all(&tables.records)
471 }
472
473 pub fn get_latest_for_each_author(
475 &mut self,
476 namespace: NamespaceId,
477 ) -> Result<LatestIterator<'_>> {
478 LatestIterator::new(&self.tables()?.latest_per_author, namespace)
479 }
480
481 pub fn register_useful_peer(
483 &mut self,
484 namespace: NamespaceId,
485 peer: crate::PeerIdBytes,
486 ) -> Result<()> {
487 let peer = &peer;
488 let namespace = namespace.as_bytes();
489 let nanos = std::time::UNIX_EPOCH
491 .elapsed()
492 .map(|duration| duration.as_nanos() as u64)?;
493 self.modify(|tables| {
494 anyhow::ensure!(
496 tables.namespaces.get(namespace)?.is_some(),
497 "document not created"
498 );
499
500 let mut namespace_peers = tables.namespace_peers.get(namespace)?;
501
502 let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
504 let (oldest_nanos, &oldest_peer) = guard.value();
505 (oldest_nanos, oldest_peer)
506 });
507 match maybe_oldest {
508 None => {
509 drop(namespace_peers);
512 tables.namespace_peers.insert(namespace, (nanos, peer))?;
513 }
514 Some((oldest_nanos, oldest_peer)) => {
515 let oldest_peer = &oldest_peer;
516
517 if oldest_peer == peer {
518 drop(namespace_peers);
521 tables
522 .namespace_peers
523 .remove(namespace, (oldest_nanos, oldest_peer))?;
524 tables.namespace_peers.insert(namespace, (nanos, peer))?;
525 } else {
526 let mut len = 1;
528 let mut prev_peer_nanos = None;
530
531 for result in namespace_peers {
532 len += 1;
533 let guard = result?;
534 let (peer_nanos, peer_bytes) = guard.value();
535 if prev_peer_nanos.is_none() && peer_bytes == peer {
536 prev_peer_nanos = Some(peer_nanos)
537 }
538 }
539
540 match prev_peer_nanos {
541 Some(prev_nanos) => {
542 tables
545 .namespace_peers
546 .remove(namespace, (prev_nanos, peer))?;
547 tables.namespace_peers.insert(namespace, (nanos, peer))?;
548 }
549 None => {
550 tables.namespace_peers.insert(namespace, (nanos, peer))?;
553 len += 1;
554 if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
555 tables
556 .namespace_peers
557 .remove(namespace, (oldest_nanos, oldest_peer))?;
558 }
559 }
560 }
561 }
562 }
563 }
564 Ok(())
565 })
566 }
567
568 pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
570 let tables = self.tables()?;
571 let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
572 for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
573 let (_nanos, &peer) = result?.value();
574 peers.push(peer);
575 }
576 if peers.is_empty() {
577 Ok(None)
578 } else {
579 Ok(Some(peers.into_iter()))
580 }
581 }
582
583 pub fn set_download_policy(
585 &mut self,
586 namespace: &NamespaceId,
587 policy: DownloadPolicy,
588 ) -> Result<()> {
589 self.modify(|tables| {
590 let namespace = namespace.as_bytes();
591
592 anyhow::ensure!(
594 tables.namespaces.get(&namespace)?.is_some(),
595 "document not created"
596 );
597
598 let value = postcard::to_stdvec(&policy)?;
599 tables.download_policy.insert(namespace, value.as_slice())?;
600 Ok(())
601 })
602 }
603
604 pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
606 let tables = self.tables()?;
607 let value = tables.download_policy.get(namespace.as_bytes())?;
608 Ok(match value {
609 None => DownloadPolicy::default(),
610 Some(value) => postcard::from_bytes(value.value())?,
611 })
612 }
613}
614
615impl PublicKeyStore for Store {
616 fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
617 self.pubkeys.public_key(id)
618 }
619}
620
621fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
622 Capability::from_raw(raw_kind, raw_bytes)
623}
624
625fn get_exact(
626 record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
627 namespace: NamespaceId,
628 author: AuthorId,
629 key: impl AsRef<[u8]>,
630 include_empty: bool,
631) -> Result<Option<SignedEntry>> {
632 let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
633 let record = record_table.get(id)?;
634 Ok(record
635 .map(|r| into_entry(id, r.value()))
636 .filter(|entry| include_empty || !entry.is_empty()))
637}
638
639#[derive(Debug)]
641pub struct StoreInstance<'a> {
642 namespace: NamespaceId,
643 pub(crate) store: &'a mut Store,
644}
645
646impl<'a> StoreInstance<'a> {
647 pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
648 StoreInstance { namespace, store }
649 }
650}
651
652impl PublicKeyStore for StoreInstance<'_> {
653 fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
654 self.store.public_key(id)
655 }
656}
657
658impl super::DownloadPolicyStore for StoreInstance<'_> {
659 fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
660 self.store.get_download_policy(namespace)
661 }
662}
663
664impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
665 type Error = anyhow::Error;
666 type RangeIterator<'x>
667 = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
668 where
669 'a: 'x;
670 type ParentIterator<'x>
671 = ParentIterator
672 where
673 'a: 'x;
674
675 fn get_first(&mut self) -> Result<RecordIdentifier> {
677 let tables = self.store.as_mut().tables()?;
678 let bounds = RecordsBounds::namespace(self.namespace);
680 let mut records = tables.records.range(bounds.as_ref())?;
681
682 let Some(record) = records.next() else {
683 return Ok(RecordIdentifier::default());
684 };
685 let (compound_key, _value) = record?;
686 let (namespace_id, author_id, key) = compound_key.value();
687 let id = RecordIdentifier::new(namespace_id, author_id, key);
688 Ok(id)
689 }
690
691 #[cfg(test)]
692 fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
693 self.store
694 .as_mut()
695 .get_exact(id.namespace(), id.author(), id.key(), true)
696 }
697
698 #[cfg(test)]
699 fn len(&mut self) -> Result<usize> {
700 let tables = self.store.as_mut().tables()?;
701 let bounds = RecordsBounds::namespace(self.namespace);
702 let records = tables.records.range(bounds.as_ref())?;
703 Ok(records.count())
704 }
705
706 #[cfg(test)]
707 fn is_empty(&mut self) -> Result<bool> {
708 use redb::ReadableTableMetadata;
709 let tables = self.store.as_mut().tables()?;
710 Ok(tables.records.is_empty()?)
711 }
712
713 fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
714 let elements = self.get_range(range.clone())?;
716
717 let mut fp = Fingerprint::empty();
718 for el in elements {
719 let el = el?;
720 fp ^= el.as_fingerprint();
721 }
722
723 Ok(fp)
724 }
725
726 fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
727 let id = e.id();
728 self.store.as_mut().modify(|tables| {
729 let key = (
731 &id.namespace().to_bytes(),
732 &id.author().to_bytes(),
733 id.key(),
734 );
735 let hash = e.content_hash(); let value = (
737 e.timestamp(),
738 &e.signature().namespace().to_bytes(),
739 &e.signature().author().to_bytes(),
740 e.content_len(),
741 hash.as_bytes(),
742 );
743 tables.records.insert(key, value)?;
744
745 let key = (
747 &id.namespace().to_bytes(),
748 id.key(),
749 &id.author().to_bytes(),
750 );
751 tables.records_by_key.insert(key, ())?;
752
753 let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
755 let value = (e.timestamp(), e.id().key());
756 tables.latest_per_author.insert(key, value)?;
757 Ok(())
758 })
759 }
760
761 fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
762 let tables = self.store.as_mut().tables()?;
763 let iter = match range.x().cmp(range.y()) {
764 Ordering::Equal => {
766 let bounds = RecordsBounds::namespace(self.namespace);
768 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
769 chain_none(iter)
770 }
771 Ordering::Less => {
773 let start = Bound::Included(range.x().to_byte_tuple());
775 let end = Bound::Excluded(range.y().to_byte_tuple());
776 let bounds = RecordsBounds::new(start, end);
777 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
778 chain_none(iter)
779 }
780 Ordering::Greater => {
782 let end = Bound::Excluded(range.y().to_byte_tuple());
784 let bounds = RecordsBounds::from_start(&self.namespace, end);
785 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
786
787 let start = Bound::Included(range.x().to_byte_tuple());
789 let bounds = RecordsBounds::to_end(&self.namespace, start);
790 let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
791
792 iter.chain(Some(iter2).into_iter().flatten())
793 }
794 };
795 Ok(iter)
796 }
797
798 #[cfg(test)]
799 fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
800 self.store.as_mut().modify(|tables| {
801 let entry = {
802 let (namespace, author, key) = id.as_byte_tuple();
803 let id = (namespace, key, author);
804 tables.records_by_key.remove(id)?;
805 let id = (namespace, author, key);
806 let value = tables.records.remove(id)?;
807 value.map(|value| into_entry(id, value.value()))
808 };
809 Ok(entry)
810 })
811 }
812
813 #[cfg(test)]
814 fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
815 let tables = self.store.as_mut().tables()?;
816 let bounds = RecordsBounds::namespace(self.namespace);
817 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
818 Ok(chain_none(iter))
819 }
820
821 fn prefixes_of(
822 &mut self,
823 id: &RecordIdentifier,
824 ) -> Result<Self::ParentIterator<'_>, Self::Error> {
825 let tables = self.store.as_mut().tables()?;
826 ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
827 }
828
829 #[cfg(test)]
830 fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
831 let tables = self.store.as_mut().tables()?;
832 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
833 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
834 Ok(chain_none(iter))
835 }
836
837 fn remove_prefix_filtered(
838 &mut self,
839 id: &RecordIdentifier,
840 predicate: impl Fn(&Record) -> bool,
841 ) -> Result<usize> {
842 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
843 self.store.as_mut().modify(|tables| {
844 let cb = |_k: RecordsId, v: RecordsValue| {
845 let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
846 let record = Record::new(hash.into(), len, timestamp);
847
848 predicate(&record)
849 };
850 let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
851 let count = iter.count();
852 Ok(count)
853 })
854 }
855}
856
857fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
858 iter: I,
859) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
860 iter.chain(None.into_iter().flatten())
861}
862
863#[derive(Debug)]
866pub struct ParentIterator {
867 inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
868}
869
870impl ParentIterator {
871 fn new(
872 tables: &Tables,
873 namespace: NamespaceId,
874 author: AuthorId,
875 key: Vec<u8>,
876 ) -> anyhow::Result<Self> {
877 let parents = parents(&tables.records, namespace, author, key.clone());
878 Ok(Self {
879 inner: parents.into_iter(),
880 })
881 }
882}
883
884fn parents(
885 table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
886 namespace: NamespaceId,
887 author: AuthorId,
888 mut key: Vec<u8>,
889) -> Vec<anyhow::Result<SignedEntry>> {
890 let mut res = Vec::new();
891
892 while !key.is_empty() {
893 let entry = get_exact(table, namespace, author, &key, false);
894 key.pop();
895 match entry {
896 Err(err) => res.push(Err(err)),
897 Ok(Some(entry)) => res.push(Ok(entry)),
898 Ok(None) => continue,
899 }
900 }
901 res.reverse();
902 res
903}
904
905impl Iterator for ParentIterator {
906 type Item = Result<SignedEntry>;
907
908 fn next(&mut self) -> Option<Self::Item> {
909 self.inner.next()
910 }
911}
912
913#[derive(derive_more::Debug)]
921pub struct ContentHashesIterator {
922 #[debug(skip)]
923 range: RecordsRange<'static>,
924}
925
926impl ContentHashesIterator {
927 pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
929 let range = RecordsRange::all_static(table)?;
930 Ok(Self { range })
931 }
932}
933
934impl Iterator for ContentHashesIterator {
935 type Item = Result<Hash>;
936
937 fn next(&mut self) -> Option<Self::Item> {
938 let v = self.range.next()?;
939 Some(v.map(|e| e.content_hash()))
940 }
941}
942
943#[derive(derive_more::Debug)]
945#[debug("LatestIterator")]
946pub struct LatestIterator<'a>(
947 redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
948);
949
950impl<'a> LatestIterator<'a> {
951 fn new(
952 latest_per_author: &'a impl ReadableTable<
953 LatestPerAuthorKey<'static>,
954 LatestPerAuthorValue<'static>,
955 >,
956 namespace: NamespaceId,
957 ) -> anyhow::Result<Self> {
958 let start = (namespace.as_bytes(), &[u8::MIN; 32]);
959 let end = (namespace.as_bytes(), &[u8::MAX; 32]);
960 let range = latest_per_author.range(start..=end)?;
961 Ok(Self(range))
962 }
963}
964
965impl Iterator for LatestIterator<'_> {
966 type Item = Result<(AuthorId, u64, Vec<u8>)>;
967
968 fn next(&mut self) -> Option<Self::Item> {
969 self.0.next_map(|key, value| {
970 let (_namespace, author) = key;
971 let (timestamp, key) = value;
972 (author.into(), timestamp, key.to_vec())
973 })
974 }
975}
976
977fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
978 let (namespace, author, key) = key;
979 let (timestamp, namespace_sig, author_sig, len, hash) = value;
980 let id = RecordIdentifier::new(namespace, author, key);
981 let record = Record::new(hash.into(), len, timestamp);
982 let entry = Entry::new(id, record);
983 let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
984 SignedEntry::new(entry_signature, entry)
985}
986
987#[cfg(test)]
988mod tests {
989 use super::{tables::LATEST_PER_AUTHOR_TABLE, *};
990 use crate::ranger::Store as _;
991
992 #[test]
993 fn test_ranges() -> Result<()> {
994 let dbfile = tempfile::NamedTempFile::new()?;
995 let mut store = Store::persistent(dbfile.path())?;
996
997 let author = store.new_author(&mut rand::thread_rng())?;
998 let namespace = NamespaceSecret::new(&mut rand::thread_rng());
999 let mut replica = store.new_replica(namespace.clone())?;
1000
1001 let key1 = vec![255, 255];
1003 let key2 = vec![255, 255, 255];
1004 replica.hash_and_insert(&key1, &author, b"v1")?;
1005 replica.hash_and_insert(&key2, &author, b"v2")?;
1006 let res = store
1007 .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
1008 .collect::<Result<Vec<_>>>()?;
1009 assert_eq!(res.len(), 2);
1010 assert_eq!(
1011 res.into_iter()
1012 .map(|entry| entry.key().to_vec())
1013 .collect::<Vec<_>>(),
1014 vec![key1, key2]
1015 );
1016 Ok(())
1017 }
1018
1019 #[test]
1020 fn test_basics() -> Result<()> {
1021 let dbfile = tempfile::NamedTempFile::new()?;
1022 let mut store = Store::persistent(dbfile.path())?;
1023
1024 let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
1025 assert!(authors.is_empty());
1026
1027 let author = store.new_author(&mut rand::thread_rng())?;
1028 let namespace = NamespaceSecret::new(&mut rand::thread_rng());
1029 let _replica = store.new_replica(namespace.clone())?;
1030 store.close_replica(namespace.id());
1031 let replica = store.load_replica_info(&namespace.id())?;
1032 assert_eq!(replica.capability.id(), namespace.id());
1033
1034 let author_back = store.get_author(&author.id())?.unwrap();
1035 assert_eq!(author.to_bytes(), author_back.to_bytes(),);
1036
1037 let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
1038 for i in 0..5 {
1039 let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1040 let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
1041 let entry = SignedEntry::from_entry(entry, &namespace, &author);
1042 wrapper.entry_put(entry)?;
1043 }
1044
1045 let all: Vec<_> = wrapper.all()?.collect();
1047 assert_eq!(all.len(), 5);
1048
1049 let mut ids = Vec::new();
1051 for i in 0..5 {
1052 let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1053 let entry = Entry::new(
1054 id.clone(),
1055 Record::current_from_data(format!("world-{i}-2")),
1056 );
1057 let entry = SignedEntry::from_entry(entry, &namespace, &author);
1058 wrapper.entry_put(entry)?;
1059 ids.push(id);
1060 }
1061
1062 let entries = wrapper
1064 .store
1065 .get_many(namespace.id(), Query::all())?
1066 .collect::<Result<Vec<_>>>()?;
1067 assert_eq!(entries.len(), 5);
1068
1069 let entries = wrapper
1071 .store
1072 .get_many(namespace.id(), Query::key_prefix("hello-"))?
1073 .collect::<Result<Vec<_>>>()?;
1074 assert_eq!(entries.len(), 5);
1075
1076 for id in ids {
1078 let res = wrapper.get(&id)?;
1079 assert!(res.is_some());
1080 let out = wrapper.entry_remove(&id)?.unwrap();
1081 assert_eq!(out.entry().id(), &id);
1082 let res = wrapper.get(&id)?;
1083 assert!(res.is_none());
1084 }
1085
1086 let entries = wrapper
1088 .store
1089 .get_many(namespace.id(), Query::all())?
1090 .collect::<Result<Vec<_>>>()?;
1091 assert_eq!(entries.len(), 0);
1092
1093 Ok(())
1094 }
1095
1096 fn copy_and_modify(
1097 source: &Path,
1098 modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
1099 ) -> Result<tempfile::NamedTempFile> {
1100 let dbfile = tempfile::NamedTempFile::new()?;
1101 std::fs::copy(source, dbfile.path())?;
1102 let db = Database::create(dbfile.path())?;
1103 let write_tx = db.begin_write()?;
1104 modify(&write_tx)?;
1105 write_tx.commit()?;
1106 drop(db);
1107 Ok(dbfile)
1108 }
1109
1110 #[test]
1111 fn test_migration_001_populate_latest_table() -> Result<()> {
1112 let dbfile = tempfile::NamedTempFile::new()?;
1113 let namespace = NamespaceSecret::new(&mut rand::thread_rng());
1114
1115 let expected = {
1117 let mut store = Store::persistent(dbfile.path())?;
1118 let author1 = store.new_author(&mut rand::thread_rng())?;
1119 let author2 = store.new_author(&mut rand::thread_rng())?;
1120 let mut replica = store.new_replica(namespace.clone())?;
1121 replica.hash_and_insert(b"k1", &author1, b"v1")?;
1122 replica.hash_and_insert(b"k2", &author2, b"v1")?;
1123 replica.hash_and_insert(b"k3", &author1, b"v1")?;
1124
1125 let expected = store
1126 .get_latest_for_each_author(namespace.id())?
1127 .collect::<Result<Vec<_>>>()?;
1128 store.close_replica(namespace.id());
1130 store.flush()?;
1132 drop(store);
1133 expected
1134 };
1135 assert_eq!(expected.len(), 2);
1136
1137 let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
1139 tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
1140 Ok(())
1141 })?;
1142
1143 let mut store = Store::persistent(dbfile_before_migration.path())?;
1145 let actual = store
1146 .get_latest_for_each_author(namespace.id())?
1147 .collect::<Result<Vec<_>>>()?;
1148
1149 assert_eq!(expected, actual);
1150 Ok(())
1151 }
1152
1153 #[test]
1154 fn test_migration_004_populate_by_key_index() -> Result<()> {
1155 use redb::ReadableTableMetadata;
1156 let dbfile = tempfile::NamedTempFile::new()?;
1157
1158 let mut store = Store::persistent(dbfile.path())?;
1159
1160 {
1162 let tables = store.tables()?;
1163 assert_eq!(tables.records_by_key.len()?, 0);
1164 }
1165
1166 Ok(())
1168 }
1169}