1use std::{
4 cmp::Ordering,
5 collections::HashSet,
6 iter::{Chain, Flatten},
7 num::NonZeroU64,
8 ops::Bound,
9 path::Path,
10};
11
12use anyhow::{anyhow, Result};
13use ed25519_dalek::{SignatureError, VerifyingKey};
14use iroh_blobs::Hash;
15use rand_core::CryptoRngCore;
16use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable};
17use tracing::warn;
18
19use super::{
20 pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21 Query,
22};
23use crate::{
24 actor::MAX_COMMIT_DELAY,
25 keys::Author,
26 ranger::{Fingerprint, Range, RangeEntry},
27 sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28 AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29 ReplicaInfo,
30};
31
32mod bounds;
33mod migrate_v1_v2;
34mod migrations;
35mod query;
36mod ranges;
37pub(crate) mod tables;
38
39pub use self::ranges::RecordsRange;
40use self::{
41 bounds::{ByKeyBounds, RecordsBounds},
42 query::QueryIterator,
43 ranges::RangeExt,
44 tables::{
45 LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
46 RecordsValue, Tables, TransactionAndTables,
47 },
48};
49
/// Store that keeps its data in a redb [`Database`].
///
/// Besides the database handle it tracks the transaction that is currently kept open, the set
/// of namespaces registered as open, and an in-memory public key store.
#[derive(Debug)]
pub struct Store {
    /// The underlying redb database.
    db: Database,
    /// The transaction currently held open (none, a read snapshot, or a write transaction).
    transaction: CurrentTransaction,
    /// Namespaces inserted by `load_replica_info` and removed again by `close_replica`.
    open_replicas: HashSet<NamespaceId>,
    /// Public key store that the `PublicKeyStore` implementation delegates to.
    pubkeys: MemPublicKeyStore,
}
58
59impl Drop for Store {
60 fn drop(&mut self) {
61 if let Err(err) = self.flush() {
62 warn!("failed to trigger final flush: {:?}", err);
63 }
64 }
65}
66
/// Identity conversion, so code that is generic over `AsRef<Store>` also accepts a plain
/// [`Store`].
impl AsRef<Store> for Store {
    fn as_ref(&self) -> &Store {
        self
    }
}
72
/// Identity conversion, so code that is generic over `AsMut<Store>` also accepts a plain
/// [`Store`].
impl AsMut<Store> for Store {
    fn as_mut(&mut self) -> &mut Store {
        self
    }
}
78
/// The transaction a [`Store`] currently keeps open.
#[derive(derive_more::Debug, Default)]
#[allow(clippy::large_enum_variant)]
enum CurrentTransaction {
    /// No transaction is open. This is the default state.
    #[default]
    None,
    /// Read-only tables obtained from a read transaction.
    Read(ReadOnlyTables),
    /// A write transaction together with its tables.
    Write(TransactionAndTables),
}
87
88impl Store {
89 pub fn memory() -> Self {
91 Self::memory_impl().expect("failed to create memory store")
92 }
93
94 fn memory_impl() -> Result<Self> {
95 let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
96 Self::new_impl(db)
97 }
98
99 pub fn persistent(path: impl AsRef<Path>) -> Result<Self> {
103 let db = match Database::create(&path) {
104 Ok(db) => db,
105 Err(DatabaseError::UpgradeRequired(1)) => migrate_v1_v2::run(&path)?,
106 Err(err) => return Err(err.into()),
107 };
108 Self::new_impl(db)
109 }
110
111 fn new_impl(db: redb::Database) -> Result<Self> {
112 let write_tx = db.begin_write()?;
114 let _ = Tables::new(&write_tx)?;
115 write_tx.commit()?;
116
117 migrations::run_migrations(&db)?;
119
120 Ok(Store {
121 db,
122 transaction: Default::default(),
123 open_replicas: Default::default(),
124 pubkeys: Default::default(),
125 })
126 }
127
128 pub fn flush(&mut self) -> Result<()> {
132 if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
133 w.commit()?;
134 }
135 Ok(())
136 }
137
138 pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
143 let guard = &mut self.transaction;
144 let tables = match std::mem::take(guard) {
145 CurrentTransaction::None => {
146 let tx = self.db.begin_read()?;
147 ReadOnlyTables::new(tx)?
148 }
149 CurrentTransaction::Write(w) => {
150 w.commit()?;
151 let tx = self.db.begin_read()?;
152 ReadOnlyTables::new(tx)?
153 }
154 CurrentTransaction::Read(tables) => tables,
155 };
156 *guard = CurrentTransaction::Read(tables);
157 match &*guard {
158 CurrentTransaction::Read(ref tables) => Ok(tables),
159 _ => unreachable!(),
160 }
161 }
162
163 pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
171 self.flush()?;
173 assert!(matches!(self.transaction, CurrentTransaction::None));
174 let tx = self.db.begin_read()?;
175 let tables = ReadOnlyTables::new(tx)?;
176 Ok(tables)
177 }
178
179 fn tables(&mut self) -> Result<&Tables<'_>> {
190 let guard = &mut self.transaction;
191 let tables = match std::mem::take(guard) {
192 CurrentTransaction::None => {
193 let tx = self.db.begin_write()?;
194 TransactionAndTables::new(tx)?
195 }
196 CurrentTransaction::Write(w) => {
197 if w.since.elapsed() > MAX_COMMIT_DELAY {
198 tracing::debug!("committing transaction because it's too old");
199 w.commit()?;
200 let tx = self.db.begin_write()?;
201 TransactionAndTables::new(tx)?
202 } else {
203 w
204 }
205 }
206 CurrentTransaction::Read(_) => {
207 let tx = self.db.begin_write()?;
208 TransactionAndTables::new(tx)?
209 }
210 };
211 *guard = CurrentTransaction::Write(tables);
212 match guard {
213 CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
214 _ => unreachable!(),
215 }
216 }
217
218 fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
227 let guard = &mut self.transaction;
228 let tables = match std::mem::take(guard) {
229 CurrentTransaction::None => {
230 let tx = self.db.begin_write()?;
231 TransactionAndTables::new(tx)?
232 }
233 CurrentTransaction::Write(w) => {
234 if w.since.elapsed() > MAX_COMMIT_DELAY {
235 tracing::debug!("committing transaction because it's too old");
236 w.commit()?;
237 let tx = self.db.begin_write()?;
238 TransactionAndTables::new(tx)?
239 } else {
240 w
241 }
242 }
243 CurrentTransaction::Read(_) => {
244 let tx = self.db.begin_write()?;
245 TransactionAndTables::new(tx)?
246 }
247 };
248 *guard = CurrentTransaction::Write(tables);
249 let res = match &mut *guard {
250 CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
251 _ => unreachable!(),
252 };
253 Ok(res)
254 }
255}
256
/// Owning iterator over peer ids, as returned by `Store::get_sync_peers`.
type PeersIter = std::vec::IntoIter<PeerIdBytes>;
258
259impl Store {
260 pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
262 let id = namespace.id();
263 self.import_namespace(namespace.into())?;
264 self.open_replica(&id).map_err(Into::into)
265 }
266
267 pub fn new_author<R: CryptoRngCore + ?Sized>(&mut self, rng: &mut R) -> Result<Author> {
269 let author = Author::new(rng);
270 self.import_author(author.clone())?;
271 Ok(author)
272 }
273
274 pub fn has_news_for_us(
278 &mut self,
279 namespace: NamespaceId,
280 heads: &AuthorHeads,
281 ) -> Result<Option<NonZeroU64>> {
282 let our_heads = {
283 let latest = self.get_latest_for_each_author(namespace)?;
284 let mut heads = AuthorHeads::default();
285 for e in latest {
286 let (author, timestamp, _key) = e?;
287 heads.insert(author, timestamp);
288 }
289 heads
290 };
291 let has_news_for_us = heads.has_news_for(&our_heads);
292 Ok(has_news_for_us)
293 }
294
295 pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
299 let info = self.load_replica_info(namespace_id)?;
300 let instance = StoreInstance::new(*namespace_id, self);
301 Ok(Replica::new(instance, Box::new(info)))
302 }
303
304 pub fn load_replica_info(
306 &mut self,
307 namespace_id: &NamespaceId,
308 ) -> Result<ReplicaInfo, OpenError> {
309 let tables = self.tables()?;
310 let info = match tables.namespaces.get(namespace_id.as_bytes()) {
311 Ok(Some(db_value)) => {
312 let (raw_kind, raw_bytes) = db_value.value();
313 let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
314 ReplicaInfo::new(namespace)
315 }
316 Ok(None) => return Err(OpenError::NotFound),
317 Err(err) => return Err(OpenError::Other(err.into())),
318 };
319 self.open_replicas.insert(info.capability.id());
320 Ok(info)
321 }
322
    /// Removes `id` from the set of open replicas.
    ///
    /// Does nothing if the namespace was not registered as open.
    pub fn close_replica(&mut self, id: NamespaceId) {
        self.open_replicas.remove(&id);
    }
327
328 pub fn list_namespaces(
330 &mut self,
331 ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
332 let snapshot = self.snapshot()?;
333 let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
334 let iter = iter.map(|res| {
335 let capability = parse_capability(res?.1.value())?;
336 Ok((capability.id(), capability.kind()))
337 });
338 Ok(iter)
339 }
340
341 pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
343 let tables = self.tables()?;
344 let Some(author) = tables.authors.get(author_id.as_bytes())? else {
345 return Ok(None);
346 };
347 let author = Author::from_bytes(author.value());
348 Ok(Some(author))
349 }
350
351 pub fn import_author(&mut self, author: Author) -> Result<()> {
353 self.modify(|tables| {
354 tables
355 .authors
356 .insert(author.id().as_bytes(), &author.to_bytes())?;
357 Ok(())
358 })
359 }
360
361 pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
363 self.modify(|tables| {
364 tables.authors.remove(author.as_bytes())?;
365 Ok(())
366 })
367 }
368
369 pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
371 let tables = self.snapshot()?;
372 let iter = tables
373 .authors
374 .range::<&'static [u8; 32]>(..)?
375 .map(|res| match res {
376 Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
377 Err(err) => Err(err.into()),
378 });
379 Ok(iter)
380 }
381
382 pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
384 self.modify(|tables| {
385 let outcome = {
386 let (capability, outcome) = {
387 let existing = tables.namespaces.get(capability.id().as_bytes())?;
388 if let Some(existing) = existing {
389 let mut existing = parse_capability(existing.value())?;
390 let outcome = if existing.merge(capability)? {
391 ImportNamespaceOutcome::Upgraded
392 } else {
393 ImportNamespaceOutcome::NoChange
394 };
395 (existing, outcome)
396 } else {
397 (capability, ImportNamespaceOutcome::Inserted)
398 }
399 };
400 let id = capability.id().to_bytes();
401 let (kind, bytes) = capability.raw();
402 tables.namespaces.insert(&id, (kind, &bytes))?;
403 outcome
404 };
405 Ok(outcome)
406 })
407 }
408
409 pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
417 if self.open_replicas.contains(namespace) {
418 return Err(anyhow!("replica is not closed"));
419 }
420 self.modify(|tables| {
421 let bounds = RecordsBounds::namespace(*namespace);
422 tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
423 let bounds = ByKeyBounds::namespace(*namespace);
424 let _ = tables
425 .records_by_key
426 .retain_in(bounds.as_ref(), |_k, _v| false);
427 tables.namespaces.remove(namespace.as_bytes())?;
428 tables.namespace_peers.remove_all(namespace.as_bytes())?;
429 tables.download_policy.remove(namespace.as_bytes())?;
430 Ok(())
431 })
432 }
433
434 pub fn get_many(
436 &mut self,
437 namespace: NamespaceId,
438 query: impl Into<Query>,
439 ) -> Result<QueryIterator> {
440 let tables = self.snapshot_owned()?;
441 QueryIterator::new(tables, namespace, query.into())
442 }
443
444 pub fn get_exact(
446 &mut self,
447 namespace: NamespaceId,
448 author: AuthorId,
449 key: impl AsRef<[u8]>,
450 include_empty: bool,
451 ) -> Result<Option<SignedEntry>> {
452 get_exact(
453 &self.tables()?.records,
454 namespace,
455 author,
456 key,
457 include_empty,
458 )
459 }
460
461 pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
463 let tables = self.snapshot_owned()?;
464 ContentHashesIterator::all(&tables.records)
465 }
466
467 pub fn get_latest_for_each_author(
469 &mut self,
470 namespace: NamespaceId,
471 ) -> Result<LatestIterator<'_>> {
472 LatestIterator::new(&self.tables()?.latest_per_author, namespace)
473 }
474
    /// Records `peer` as useful for `namespace`, stamped with the current time.
    ///
    /// Peers are stored as `(nanos, peer)` values in the `namespace_peers` multimap. If the
    /// peer is already present its old value is replaced by one with the new timestamp. If it
    /// is new and the number of stored peers would exceed `PEERS_PER_DOC_CACHE_SIZE`, the
    /// first value yielded by the table (treated as the oldest) is removed.
    ///
    /// # Errors
    ///
    /// Fails with "document not created" if the namespace is unknown, if the system clock is
    /// before the Unix epoch, or if a table operation fails.
    pub fn register_useful_peer(
        &mut self,
        namespace: NamespaceId,
        peer: crate::PeerIdBytes,
    ) -> Result<()> {
        let peer = &peer;
        let namespace = namespace.as_bytes();
        // Nanoseconds since the Unix epoch; the `as u64` cast truncates the `u128` value.
        let nanos = std::time::UNIX_EPOCH
            .elapsed()
            .map(|duration| duration.as_nanos() as u64)?;
        self.modify(|tables| {
            // Peers are only tracked for namespaces that exist in the store.
            anyhow::ensure!(
                tables.namespaces.get(namespace)?.is_some(),
                "document not created"
            );

            let mut namespace_peers = tables.namespace_peers.get(namespace)?;

            // The first value is treated as the oldest entry.
            // NOTE(review): this presumes values iterate in ascending `nanos` order - confirm
            // against the table's value ordering.
            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
                let (oldest_nanos, &oldest_peer) = guard.value();
                (oldest_nanos, oldest_peer)
            });
            match maybe_oldest {
                None => {
                    // No peers stored yet. The iterator is dropped before the table is
                    // mutated.
                    drop(namespace_peers);
                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
                }
                Some((oldest_nanos, oldest_peer)) => {
                    let oldest_peer = &oldest_peer;

                    if oldest_peer == peer {
                        // The oldest entry is this very peer: replace it with a fresh
                        // timestamp.
                        drop(namespace_peers);
                        tables
                            .namespace_peers
                            .remove(namespace, (oldest_nanos, oldest_peer))?;
                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
                    } else {
                        // `len` counts stored peers; it starts at 1 for the oldest entry that
                        // was already consumed from the iterator.
                        let mut len = 1;
                        // Timestamp of an existing entry for `peer`, if one is found.
                        let mut prev_peer_nanos = None;

                        // The `for` loop consumes the iterator, so the table can be mutated
                        // afterwards.
                        for result in namespace_peers {
                            len += 1;
                            let guard = result?;
                            let (peer_nanos, peer_bytes) = guard.value();
                            if prev_peer_nanos.is_none() && peer_bytes == peer {
                                prev_peer_nanos = Some(peer_nanos)
                            }
                        }

                        match prev_peer_nanos {
                            Some(prev_nanos) => {
                                // Known peer: swap the old timestamp for the new one.
                                tables
                                    .namespace_peers
                                    .remove(namespace, (prev_nanos, peer))?;
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                            }
                            None => {
                                // New peer: insert it and evict the oldest entry if the cache
                                // size is now exceeded.
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                                len += 1;
                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
                                    tables
                                        .namespace_peers
                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
                                }
                            }
                        }
                    }
                }
            }
            Ok(())
        })
    }
561
562 pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
564 let tables = self.tables()?;
565 let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
566 for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
567 let (_nanos, &peer) = result?.value();
568 peers.push(peer);
569 }
570 if peers.is_empty() {
571 Ok(None)
572 } else {
573 Ok(Some(peers.into_iter()))
574 }
575 }
576
577 pub fn set_download_policy(
579 &mut self,
580 namespace: &NamespaceId,
581 policy: DownloadPolicy,
582 ) -> Result<()> {
583 self.modify(|tables| {
584 let namespace = namespace.as_bytes();
585
586 anyhow::ensure!(
588 tables.namespaces.get(&namespace)?.is_some(),
589 "document not created"
590 );
591
592 let value = postcard::to_stdvec(&policy)?;
593 tables.download_policy.insert(namespace, value.as_slice())?;
594 Ok(())
595 })
596 }
597
598 pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
600 let tables = self.tables()?;
601 let value = tables.download_policy.get(namespace.as_bytes())?;
602 Ok(match value {
603 None => DownloadPolicy::default(),
604 Some(value) => postcard::from_bytes(value.value())?,
605 })
606 }
607}
608
/// Public key lookups are delegated to the store's in-memory `pubkeys` store.
impl PublicKeyStore for Store {
    fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
        self.pubkeys.public_key(id)
    }
}
614
615fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
616 Capability::from_raw(raw_kind, raw_bytes)
617}
618
619fn get_exact(
620 record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
621 namespace: NamespaceId,
622 author: AuthorId,
623 key: impl AsRef<[u8]>,
624 include_empty: bool,
625) -> Result<Option<SignedEntry>> {
626 let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
627 let record = record_table.get(id)?;
628 Ok(record
629 .map(|r| into_entry(id, r.value()))
630 .filter(|entry| include_empty || !entry.is_empty()))
631}
632
/// A mutable borrow of a [`Store`] paired with the single namespace it operates on.
#[derive(Debug)]
pub struct StoreInstance<'a> {
    /// The namespace used to bound record lookups made through this instance.
    namespace: NamespaceId,
    /// The borrowed store that all operations are forwarded to.
    pub(crate) store: &'a mut Store,
}
639
impl<'a> StoreInstance<'a> {
    /// Creates an instance for `namespace` on top of `store`.
    ///
    /// This only stores both values; it does not check that the namespace exists.
    pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
        StoreInstance { namespace, store }
    }
}
645
/// Public key lookups are forwarded to the borrowed [`Store`].
impl PublicKeyStore for StoreInstance<'_> {
    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
        self.store.public_key(id)
    }
}
651
/// Download policy lookups are forwarded to the borrowed [`Store`].
impl super::DownloadPolicyStore for StoreInstance<'_> {
    // The `namespace` argument is used for the lookup, not the instance's own namespace.
    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
        self.store.get_download_policy(namespace)
    }
}
657
658impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
659 type Error = anyhow::Error;
660 type RangeIterator<'x>
661 = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
662 where
663 'a: 'x;
664 type ParentIterator<'x>
665 = ParentIterator
666 where
667 'a: 'x;
668
669 fn get_first(&mut self) -> Result<RecordIdentifier> {
671 let tables = self.store.as_mut().tables()?;
672 let bounds = RecordsBounds::namespace(self.namespace);
674 let mut records = tables.records.range(bounds.as_ref())?;
675
676 let Some(record) = records.next() else {
677 return Ok(RecordIdentifier::default());
678 };
679 let (compound_key, _value) = record?;
680 let (namespace_id, author_id, key) = compound_key.value();
681 let id = RecordIdentifier::new(namespace_id, author_id, key);
682 Ok(id)
683 }
684
685 #[cfg(test)]
686 fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
687 self.store
688 .as_mut()
689 .get_exact(id.namespace(), id.author(), id.key(), true)
690 }
691
692 #[cfg(test)]
693 fn len(&mut self) -> Result<usize> {
694 let tables = self.store.as_mut().tables()?;
695 let bounds = RecordsBounds::namespace(self.namespace);
696 let records = tables.records.range(bounds.as_ref())?;
697 Ok(records.count())
698 }
699
700 #[cfg(test)]
701 fn is_empty(&mut self) -> Result<bool> {
702 use redb::ReadableTableMetadata;
703 let tables = self.store.as_mut().tables()?;
704 Ok(tables.records.is_empty()?)
705 }
706
707 fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
708 let elements = self.get_range(range.clone())?;
710
711 let mut fp = Fingerprint::empty();
712 for el in elements {
713 let el = el?;
714 fp ^= el.as_fingerprint();
715 }
716
717 Ok(fp)
718 }
719
720 fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
721 let id = e.id();
722 self.store.as_mut().modify(|tables| {
723 let key = (
725 &id.namespace().to_bytes(),
726 &id.author().to_bytes(),
727 id.key(),
728 );
729 let hash = e.content_hash(); let value = (
731 e.timestamp(),
732 &e.signature().namespace().to_bytes(),
733 &e.signature().author().to_bytes(),
734 e.content_len(),
735 hash.as_bytes(),
736 );
737 tables.records.insert(key, value)?;
738
739 let key = (
741 &id.namespace().to_bytes(),
742 id.key(),
743 &id.author().to_bytes(),
744 );
745 tables.records_by_key.insert(key, ())?;
746
747 let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
749 let value = (e.timestamp(), e.id().key());
750 tables.latest_per_author.insert(key, value)?;
751 Ok(())
752 })
753 }
754
755 fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
756 let tables = self.store.as_mut().tables()?;
757 let iter = match range.x().cmp(range.y()) {
758 Ordering::Equal => {
760 let bounds = RecordsBounds::namespace(self.namespace);
762 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
763 chain_none(iter)
764 }
765 Ordering::Less => {
767 let start = Bound::Included(range.x().to_byte_tuple());
769 let end = Bound::Excluded(range.y().to_byte_tuple());
770 let bounds = RecordsBounds::new(start, end);
771 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
772 chain_none(iter)
773 }
774 Ordering::Greater => {
776 let end = Bound::Excluded(range.y().to_byte_tuple());
778 let bounds = RecordsBounds::from_start(&self.namespace, end);
779 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
780
781 let start = Bound::Included(range.x().to_byte_tuple());
783 let bounds = RecordsBounds::to_end(&self.namespace, start);
784 let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
785
786 iter.chain(Some(iter2).into_iter().flatten())
787 }
788 };
789 Ok(iter)
790 }
791
792 #[cfg(test)]
793 fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
794 self.store.as_mut().modify(|tables| {
795 let entry = {
796 let (namespace, author, key) = id.as_byte_tuple();
797 let id = (namespace, key, author);
798 tables.records_by_key.remove(id)?;
799 let id = (namespace, author, key);
800 let value = tables.records.remove(id)?;
801 value.map(|value| into_entry(id, value.value()))
802 };
803 Ok(entry)
804 })
805 }
806
807 #[cfg(test)]
808 fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
809 let tables = self.store.as_mut().tables()?;
810 let bounds = RecordsBounds::namespace(self.namespace);
811 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
812 Ok(chain_none(iter))
813 }
814
815 fn prefixes_of(
816 &mut self,
817 id: &RecordIdentifier,
818 ) -> Result<Self::ParentIterator<'_>, Self::Error> {
819 let tables = self.store.as_mut().tables()?;
820 ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
821 }
822
823 #[cfg(test)]
824 fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
825 let tables = self.store.as_mut().tables()?;
826 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
827 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
828 Ok(chain_none(iter))
829 }
830
831 fn remove_prefix_filtered(
832 &mut self,
833 id: &RecordIdentifier,
834 predicate: impl Fn(&Record) -> bool,
835 ) -> Result<usize> {
836 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
837 self.store.as_mut().modify(|tables| {
838 let cb = |_k: RecordsId, v: RecordsValue| {
839 let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
840 let record = Record::new(hash.into(), len, timestamp);
841
842 predicate(&record)
843 };
844 let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
845 let count = iter.count();
846 Ok(count)
847 })
848 }
849}
850
/// Wraps `iter` in the `Chain<_, Flatten<_>>` type with an empty second half.
///
/// This lets a single range be returned under the same type as two chained ranges.
fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
    iter: I,
) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
    let nothing: Option<I> = None;
    iter.chain(nothing.into_iter().flatten())
}
856
/// Iterator over the entries whose key is a prefix of a given key.
///
/// All entries are collected up front when the iterator is created; iterating only drains
/// the collected vector.
#[derive(Debug)]
pub struct ParentIterator {
    /// The pre-computed results.
    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
}
863
864impl ParentIterator {
865 fn new(
866 tables: &Tables,
867 namespace: NamespaceId,
868 author: AuthorId,
869 key: Vec<u8>,
870 ) -> anyhow::Result<Self> {
871 let parents = parents(&tables.records, namespace, author, key.clone());
872 Ok(Self {
873 inner: parents.into_iter(),
874 })
875 }
876}
877
878fn parents(
879 table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
880 namespace: NamespaceId,
881 author: AuthorId,
882 mut key: Vec<u8>,
883) -> Vec<anyhow::Result<SignedEntry>> {
884 let mut res = Vec::new();
885
886 while !key.is_empty() {
887 let entry = get_exact(table, namespace, author, &key, false);
888 key.pop();
889 match entry {
890 Err(err) => res.push(Err(err)),
891 Ok(Some(entry)) => res.push(Ok(entry)),
892 Ok(None) => continue,
893 }
894 }
895 res.reverse();
896 res
897}
898
impl Iterator for ParentIterator {
    type Item = Result<SignedEntry>;

    /// Yields the next pre-computed result.
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}
906
/// Iterator over the content hashes of all records in a records table.
#[derive(derive_more::Debug)]
pub struct ContentHashesIterator {
    /// The underlying records range; left out of the `Debug` output.
    #[debug(skip)]
    range: RecordsRange<'static>,
}
919
920impl ContentHashesIterator {
921 pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
923 let range = RecordsRange::all_static(table)?;
924 Ok(Self { range })
925 }
926}
927
928impl Iterator for ContentHashesIterator {
929 type Item = Result<Hash>;
930
931 fn next(&mut self) -> Option<Self::Item> {
932 let v = self.range.next()?;
933 Some(v.map(|e| e.content_hash()))
934 }
935}
936
/// Iterator over the `latest_per_author` rows of one namespace.
///
/// Wraps a redb range; its `Debug` output is the fixed string `LatestIterator`.
#[derive(derive_more::Debug)]
#[debug("LatestIterator")]
pub struct LatestIterator<'a>(
    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
);
943
944impl<'a> LatestIterator<'a> {
945 fn new(
946 latest_per_author: &'a impl ReadableTable<
947 LatestPerAuthorKey<'static>,
948 LatestPerAuthorValue<'static>,
949 >,
950 namespace: NamespaceId,
951 ) -> anyhow::Result<Self> {
952 let start = (namespace.as_bytes(), &[u8::MIN; 32]);
953 let end = (namespace.as_bytes(), &[u8::MAX; 32]);
954 let range = latest_per_author.range(start..=end)?;
955 Ok(Self(range))
956 }
957}
958
959impl Iterator for LatestIterator<'_> {
960 type Item = Result<(AuthorId, u64, Vec<u8>)>;
961
962 fn next(&mut self) -> Option<Self::Item> {
963 self.0.next_map(|key, value| {
964 let (_namespace, author) = key;
965 let (timestamp, key) = value;
966 (author.into(), timestamp, key.to_vec())
967 })
968 }
969}
970
971fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
972 let (namespace, author, key) = key;
973 let (timestamp, namespace_sig, author_sig, len, hash) = value;
974 let id = RecordIdentifier::new(namespace, author, key);
975 let record = Record::new(hash.into(), len, timestamp);
976 let entry = Entry::new(id, record);
977 let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
978 SignedEntry::new(entry_signature, entry)
979}
980
981#[cfg(test)]
982mod tests {
983 use super::{tables::LATEST_PER_AUTHOR_TABLE, *};
984 use crate::ranger::Store as _;
985
986 #[test]
987 fn test_ranges() -> Result<()> {
988 let dbfile = tempfile::NamedTempFile::new()?;
989 let mut store = Store::persistent(dbfile.path())?;
990
991 let author = store.new_author(&mut rand::thread_rng())?;
992 let namespace = NamespaceSecret::new(&mut rand::thread_rng());
993 let mut replica = store.new_replica(namespace.clone())?;
994
995 let key1 = vec![255, 255];
997 let key2 = vec![255, 255, 255];
998 replica.hash_and_insert(&key1, &author, b"v1")?;
999 replica.hash_and_insert(&key2, &author, b"v2")?;
1000 let res = store
1001 .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
1002 .collect::<Result<Vec<_>>>()?;
1003 assert_eq!(res.len(), 2);
1004 assert_eq!(
1005 res.into_iter()
1006 .map(|entry| entry.key().to_vec())
1007 .collect::<Vec<_>>(),
1008 vec![key1, key2]
1009 );
1010 Ok(())
1011 }
1012
    /// End-to-end check of authors, replicas, and entry insert / overwrite / query / remove.
    #[test]
    fn test_basics() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        // A fresh store has no authors.
        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
        assert!(authors.is_empty());

        let author = store.new_author(&mut rand::thread_rng())?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
        let _replica = store.new_replica(namespace.clone())?;
        store.close_replica(namespace.id());
        let replica = store.load_replica_info(&namespace.id())?;
        assert_eq!(replica.capability.id(), namespace.id());

        // The stored author round-trips byte for byte.
        let author_back = store.get_author(&author.id())?.unwrap();
        assert_eq!(author.to_bytes(), author_back.to_bytes(),);

        // Insert five entries through the ranger store interface.
        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
        }

        let all: Vec<_> = wrapper.all()?.collect();
        assert_eq!(all.len(), 5);

        // Write the same keys again with new content; the entry count must stay at five.
        let mut ids = Vec::new();
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(
                id.clone(),
                Record::current_from_data(format!("world-{i}-2")),
            );
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
            ids.push(id);
        }

        // Query everything.
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // Query by key prefix.
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::key_prefix("hello-"))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // Remove each entry and check that it is gone afterwards.
        for id in ids {
            let res = wrapper.get(&id)?;
            assert!(res.is_some());
            let out = wrapper.entry_remove(&id)?.unwrap();
            assert_eq!(out.entry().id(), &id);
            let res = wrapper.get(&id)?;
            assert!(res.is_none());
        }

        // Nothing is left in the namespace.
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 0);

        Ok(())
    }
1089
1090 fn copy_and_modify(
1091 source: &Path,
1092 modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
1093 ) -> Result<tempfile::NamedTempFile> {
1094 let dbfile = tempfile::NamedTempFile::new()?;
1095 std::fs::copy(source, dbfile.path())?;
1096 let db = Database::create(dbfile.path())?;
1097 let write_tx = db.begin_write()?;
1098 modify(&write_tx)?;
1099 write_tx.commit()?;
1100 drop(db);
1101 Ok(dbfile)
1102 }
1103
    /// Deleting the latest-per-author table and reopening the store must restore the same
    /// rows.
    #[test]
    fn test_migration_001_populate_latest_table() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());

        // Build a store with entries from two authors and record the expected rows.
        let expected = {
            let mut store = Store::persistent(dbfile.path())?;
            let author1 = store.new_author(&mut rand::thread_rng())?;
            let author2 = store.new_author(&mut rand::thread_rng())?;
            let mut replica = store.new_replica(namespace.clone())?;
            replica.hash_and_insert(b"k1", &author1, b"v1")?;
            replica.hash_and_insert(b"k2", &author2, b"v1")?;
            replica.hash_and_insert(b"k3", &author1, b"v1")?;

            let expected = store
                .get_latest_for_each_author(namespace.id())?
                .collect::<Result<Vec<_>>>()?;
            store.close_replica(namespace.id());
            // Commit pending writes and release the database before the file is copied.
            store.flush()?;
            drop(store);
            expected
        };
        // One row per author.
        assert_eq!(expected.len(), 2);

        // Simulate the pre-migration state by deleting the table from a copy.
        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
            Ok(())
        })?;

        // Opening the store runs the migrations.
        let mut store = Store::persistent(dbfile_before_migration.path())?;
        let actual = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(expected, actual);
        Ok(())
    }
1146
1147 #[test]
1148 fn test_migration_004_populate_by_key_index() -> Result<()> {
1149 use redb::ReadableTableMetadata;
1150 let dbfile = tempfile::NamedTempFile::new()?;
1151
1152 let mut store = Store::persistent(dbfile.path())?;
1153
1154 {
1156 let tables = store.tables()?;
1157 assert_eq!(tables.records_by_key.len()?, 0);
1158 }
1159
1160 Ok(())
1162 }
1163}