1use std::{
4 cmp::Ordering,
5 collections::HashSet,
6 iter::{Chain, Flatten},
7 num::NonZeroU64,
8 ops::Bound,
9};
10
11use anyhow::{anyhow, Result};
12use ed25519_dalek::{SignatureError, VerifyingKey};
13use iroh_blobs::Hash;
14use n0_future::time::SystemTime;
15use rand::CryptoRng;
16use redb::{Database, ReadableDatabase, ReadableMultimapTable, ReadableTable};
17use tracing::warn;
18
19use super::{
20 pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21 Query,
22};
23use crate::{
24 actor::MAX_COMMIT_DELAY,
25 keys::Author,
26 ranger::{Fingerprint, Range, RangeEntry},
27 sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28 AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29 ReplicaInfo,
30};
31
32mod bounds;
33mod migrations;
34mod query;
35mod ranges;
36pub(crate) mod tables;
37
38pub use self::ranges::RecordsRange;
39use self::{
40 bounds::{ByKeyBounds, RecordsBounds},
41 query::QueryIterator,
42 ranges::RangeExt,
43 tables::{
44 LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
45 RecordsValue, Tables, TransactionAndTables,
46 },
47};
48
/// Document store backed by a single [`redb`] database.
///
/// Holds the open database, the lazily-managed current transaction, the set
/// of currently open replicas, and an in-memory cache of parsed public keys.
#[derive(Debug)]
pub struct Store {
    db: Database,
    // The active transaction (if any); switched between read and write mode on demand.
    transaction: CurrentTransaction,
    // Namespaces with an open replica; `remove_replica` refuses these until closed.
    open_replicas: HashSet<NamespaceId>,
    // Cache of parsed ed25519 verifying keys.
    pubkeys: MemPublicKeyStore,
}
57
58impl Drop for Store {
59 fn drop(&mut self) {
60 if let Err(err) = self.flush() {
61 warn!("failed to trigger final flush: {:?}", err);
62 }
63 }
64}
65
// Identity conversion, so APIs generic over `AsRef<Store>` accept a `Store` directly.
impl AsRef<Store> for Store {
    fn as_ref(&self) -> &Store {
        self
    }
}
71
// Identity conversion, so APIs generic over `AsMut<Store>` accept a `Store` directly.
impl AsMut<Store> for Store {
    fn as_mut(&mut self) -> &mut Store {
        self
    }
}
77
/// The current transaction state of the [`Store`].
///
/// At most one redb transaction is open at a time; it is created lazily and
/// promoted/demoted between read and write mode as operations require.
#[derive(derive_more::Debug, Default)]
#[allow(clippy::large_enum_variant)]
enum CurrentTransaction {
    /// No transaction is open.
    #[default]
    None,
    /// An open read transaction together with its opened tables.
    Read(ReadOnlyTables),
    /// An open write transaction together with its opened tables.
    Write(TransactionAndTables),
}
86
87impl Store {
88 pub fn memory() -> Self {
90 Self::memory_impl().expect("failed to create memory store")
91 }
92
93 fn memory_impl() -> Result<Self> {
94 let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
95 Self::new_impl(db)
96 }
97
98 #[cfg(feature = "fs-store")]
102 pub fn persistent(path: impl AsRef<std::path::Path>) -> Result<Self> {
103 let db = match Database::create(&path) {
104 Ok(db) => db,
105 Err(redb::DatabaseError::UpgradeRequired(v)) => {
106 return Err(anyhow!(
107 "Opening the database failed: Upgrading from redb {v} longer supported. Use and older redb version first."
108 ));
109 }
110 Err(err) => return Err(err.into()),
111 };
112 Self::new_impl(db)
113 }
114
115 fn new_impl(db: redb::Database) -> Result<Self> {
116 let write_tx = db.begin_write()?;
118 let _ = Tables::new(&write_tx)?;
119 write_tx.commit()?;
120
121 migrations::run_migrations(&db)?;
123
124 Ok(Store {
125 db,
126 transaction: Default::default(),
127 open_replicas: Default::default(),
128 pubkeys: Default::default(),
129 })
130 }
131
132 pub fn flush(&mut self) -> Result<()> {
136 if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
137 w.commit()?;
138 }
139 Ok(())
140 }
141
142 pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
147 let guard = &mut self.transaction;
148 let tables = match std::mem::take(guard) {
149 CurrentTransaction::None => {
150 let tx = self.db.begin_read()?;
151 ReadOnlyTables::new(tx)?
152 }
153 CurrentTransaction::Write(w) => {
154 w.commit()?;
155 let tx = self.db.begin_read()?;
156 ReadOnlyTables::new(tx)?
157 }
158 CurrentTransaction::Read(tables) => tables,
159 };
160 *guard = CurrentTransaction::Read(tables);
161 match &*guard {
162 CurrentTransaction::Read(ref tables) => Ok(tables),
163 _ => unreachable!(),
164 }
165 }
166
167 pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
175 self.flush()?;
177 assert!(matches!(self.transaction, CurrentTransaction::None));
178 let tx = self.db.begin_read()?;
179 let tables = ReadOnlyTables::new(tx)?;
180 Ok(tables)
181 }
182
183 fn tables(&mut self) -> Result<&Tables<'_>> {
194 let guard = &mut self.transaction;
195 let tables = match std::mem::take(guard) {
196 CurrentTransaction::None => {
197 let tx = self.db.begin_write()?;
198 TransactionAndTables::new(tx)?
199 }
200 CurrentTransaction::Write(w) => {
201 if w.since.elapsed() > MAX_COMMIT_DELAY {
202 tracing::debug!("committing transaction because it's too old");
203 w.commit()?;
204 let tx = self.db.begin_write()?;
205 TransactionAndTables::new(tx)?
206 } else {
207 w
208 }
209 }
210 CurrentTransaction::Read(_) => {
211 let tx = self.db.begin_write()?;
212 TransactionAndTables::new(tx)?
213 }
214 };
215 *guard = CurrentTransaction::Write(tables);
216 match guard {
217 CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
218 _ => unreachable!(),
219 }
220 }
221
222 fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
231 let guard = &mut self.transaction;
232 let tables = match std::mem::take(guard) {
233 CurrentTransaction::None => {
234 let tx = self.db.begin_write()?;
235 TransactionAndTables::new(tx)?
236 }
237 CurrentTransaction::Write(w) => {
238 if w.since.elapsed() > MAX_COMMIT_DELAY {
239 tracing::debug!("committing transaction because it's too old");
240 w.commit()?;
241 let tx = self.db.begin_write()?;
242 TransactionAndTables::new(tx)?
243 } else {
244 w
245 }
246 }
247 CurrentTransaction::Read(_) => {
248 let tx = self.db.begin_write()?;
249 TransactionAndTables::new(tx)?
250 }
251 };
252 *guard = CurrentTransaction::Write(tables);
253 let res = match &mut *guard {
254 CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
255 _ => unreachable!(),
256 };
257 Ok(res)
258 }
259}
260
/// Iterator over the cached sync peers of a document.
type PeersIter = std::vec::IntoIter<PeerIdBytes>;
262
impl Store {
    /// Create and persist a new replica for `namespace`, and open it.
    pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
        let id = namespace.id();
        self.import_namespace(namespace.into())?;
        self.open_replica(&id).map_err(Into::into)
    }

    /// Create and persist a new author key.
    pub fn new_author<R: CryptoRng>(&mut self, rng: &mut R) -> Result<Author> {
        let author = Author::new(rng);
        self.import_author(author.clone())?;
        Ok(author)
    }

    /// Check whether the peer's `heads` contain entries newer than ours for
    /// `namespace`, returning the result of [`AuthorHeads::has_news_for`].
    pub fn has_news_for_us(
        &mut self,
        namespace: NamespaceId,
        heads: &AuthorHeads,
    ) -> Result<Option<NonZeroU64>> {
        // Build our own per-author head timestamps from the latest-per-author table.
        let our_heads = {
            let latest = self.get_latest_for_each_author(namespace)?;
            let mut heads = AuthorHeads::default();
            for e in latest {
                let (author, timestamp, _key) = e?;
                heads.insert(author, timestamp);
            }
            heads
        };
        let has_news_for_us = heads.has_news_for(&our_heads);
        Ok(has_news_for_us)
    }

    /// Open a replica from this store, marking it as open.
    pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
        let info = self.load_replica_info(namespace_id)?;
        let instance = StoreInstance::new(*namespace_id, self);
        Ok(Replica::new(instance, Box::new(info)))
    }

    /// Load the stored capability for `namespace_id` and mark the replica as open.
    ///
    /// Returns [`OpenError::NotFound`] if no namespace with this id is stored.
    pub fn load_replica_info(
        &mut self,
        namespace_id: &NamespaceId,
    ) -> Result<ReplicaInfo, OpenError> {
        let tables = self.tables()?;
        let info = match tables.namespaces.get(namespace_id.as_bytes()) {
            Ok(Some(db_value)) => {
                let (raw_kind, raw_bytes) = db_value.value();
                let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
                ReplicaInfo::new(namespace)
            }
            Ok(None) => return Err(OpenError::NotFound),
            Err(err) => return Err(OpenError::Other(err.into())),
        };
        self.open_replicas.insert(info.capability.id());
        Ok(info)
    }

    /// Mark a replica as closed, allowing it to be removed.
    pub fn close_replica(&mut self, id: NamespaceId) {
        self.open_replicas.remove(&id);
    }

    /// List all stored namespaces together with their capability kind.
    pub fn list_namespaces(
        &mut self,
    ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
        let snapshot = self.snapshot()?;
        let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
        let iter = iter.map(|res| {
            let capability = parse_capability(res?.1.value())?;
            Ok((capability.id(), capability.kind()))
        });
        Ok(iter)
    }

    /// Get an author key from the store, if present.
    pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
        let tables = self.tables()?;
        let Some(author) = tables.authors.get(author_id.as_bytes())? else {
            return Ok(None);
        };
        let author = Author::from_bytes(author.value());
        Ok(Some(author))
    }

    /// Persist an author key pair.
    pub fn import_author(&mut self, author: Author) -> Result<()> {
        self.modify(|tables| {
            tables
                .authors
                .insert(author.id().as_bytes(), &author.to_bytes())?;
            Ok(())
        })
    }

    /// Delete an author key from the store.
    pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
        self.modify(|tables| {
            tables.authors.remove(author.as_bytes())?;
            Ok(())
        })
    }

    /// List all author keys in the store.
    pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
        let tables = self.snapshot()?;
        let iter = tables
            .authors
            .range::<&'static [u8; 32]>(..)?
            .map(|res| match res {
                Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
                Err(err) => Err(err.into()),
            });
        Ok(iter)
    }

    /// Import a namespace capability, merging with an existing one if present.
    ///
    /// Returns whether the capability was inserted, upgraded an existing one,
    /// or changed nothing.
    pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
        self.modify(|tables| {
            let outcome = {
                let (capability, outcome) = {
                    let existing = tables.namespaces.get(capability.id().as_bytes())?;
                    if let Some(existing) = existing {
                        // A capability for this namespace exists: merge keeps
                        // the stronger of the two.
                        let mut existing = parse_capability(existing.value())?;
                        let outcome = if existing.merge(capability)? {
                            ImportNamespaceOutcome::Upgraded
                        } else {
                            ImportNamespaceOutcome::NoChange
                        };
                        (existing, outcome)
                    } else {
                        (capability, ImportNamespaceOutcome::Inserted)
                    }
                };
                let id = capability.id().to_bytes();
                let (kind, bytes) = capability.raw();
                tables.namespaces.insert(&id, (kind, &bytes))?;
                outcome
            };
            Ok(outcome)
        })
    }

    /// Remove a replica and all of its data (records, indexes, peers, policy).
    ///
    /// # Errors
    ///
    /// Fails if the replica is currently open.
    pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
        if self.open_replicas.contains(namespace) {
            return Err(anyhow!("replica is not closed"));
        }
        self.modify(|tables| {
            let bounds = RecordsBounds::namespace(*namespace);
            tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
            let bounds = ByKeyBounds::namespace(*namespace);
            let _ = tables
                .records_by_key
                .retain_in(bounds.as_ref(), |_k, _v| false);
            tables.namespaces.remove(namespace.as_bytes())?;
            tables.namespace_peers.remove_all(namespace.as_bytes())?;
            tables.download_policy.remove(namespace.as_bytes())?;
            Ok(())
        })
    }

    /// Get an iterator over the entries of a namespace matching `query`.
    pub fn get_many(
        &mut self,
        namespace: NamespaceId,
        query: impl Into<Query>,
    ) -> Result<QueryIterator> {
        let tables = self.snapshot_owned()?;
        QueryIterator::new(tables, namespace, query.into())
    }

    /// Get the entry for an exact (namespace, author, key) triple, if any.
    ///
    /// Empty (deletion marker) entries are only returned if `include_empty` is set.
    pub fn get_exact(
        &mut self,
        namespace: NamespaceId,
        author: AuthorId,
        key: impl AsRef<[u8]>,
        include_empty: bool,
    ) -> Result<Option<SignedEntry>> {
        get_exact(
            &self.tables()?.records,
            namespace,
            author,
            key,
            include_empty,
        )
    }

    /// Get the content hashes of all entries across all namespaces.
    pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
        let tables = self.snapshot_owned()?;
        ContentHashesIterator::all(&tables.records)
    }

    /// Get the latest entry (timestamp and key) for each author in a namespace.
    pub fn get_latest_for_each_author(
        &mut self,
        namespace: NamespaceId,
    ) -> Result<LatestIterator<'_>> {
        LatestIterator::new(&self.tables()?.latest_per_author, namespace)
    }

    /// Register a peer as useful for syncing `namespace`.
    ///
    /// Maintains a bounded cache of peers per namespace, keyed by insertion
    /// timestamp: re-registering a known peer refreshes its timestamp, and
    /// inserting a new peer beyond the capacity evicts the oldest one.
    ///
    /// # Errors
    ///
    /// Fails if the document does not exist.
    pub fn register_useful_peer(
        &mut self,
        namespace: NamespaceId,
        peer: crate::PeerIdBytes,
    ) -> Result<()> {
        let peer = &peer;
        let namespace = namespace.as_bytes();
        let nanos = SystemTime::UNIX_EPOCH
            .elapsed()
            .map(|duration| duration.as_nanos() as u64)?;
        self.modify(|tables| {
            anyhow::ensure!(
                tables.namespaces.get(namespace)?.is_some(),
                "document not created"
            );

            let mut namespace_peers = tables.namespace_peers.get(namespace)?;

            // The multimap yields values in ascending order, so the first one
            // carries the smallest timestamp, i.e. the oldest peer.
            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
                let (oldest_nanos, &oldest_peer) = guard.value();
                (oldest_nanos, oldest_peer)
            });
            match maybe_oldest {
                None => {
                    // No peers cached yet for this namespace: just insert.
                    drop(namespace_peers);
                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
                }
                Some((oldest_nanos, oldest_peer)) => {
                    let oldest_peer = &oldest_peer;

                    if oldest_peer == peer {
                        // The registered peer is the oldest entry: refresh its timestamp.
                        drop(namespace_peers);
                        tables
                            .namespace_peers
                            .remove(namespace, (oldest_nanos, oldest_peer))?;
                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
                    } else {
                        // Scan the remaining entries for the peer, counting the
                        // cache size along the way.
                        let mut len = 1;
                        let mut prev_peer_nanos = None;

                        for result in namespace_peers {
                            len += 1;
                            let guard = result?;
                            let (peer_nanos, peer_bytes) = guard.value();
                            if prev_peer_nanos.is_none() && peer_bytes == peer {
                                prev_peer_nanos = Some(peer_nanos)
                            }
                        }

                        match prev_peer_nanos {
                            Some(prev_nanos) => {
                                // Peer already cached: refresh its timestamp.
                                tables
                                    .namespace_peers
                                    .remove(namespace, (prev_nanos, peer))?;
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                            }
                            None => {
                                // New peer: insert, and evict the oldest entry
                                // if the cache exceeds its capacity.
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                                len += 1;
                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
                                    tables
                                        .namespace_peers
                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
                                }
                            }
                        }
                    }
                }
            }
            Ok(())
        })
    }

    /// Get the cached sync peers for `namespace`, most recently registered first.
    pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
        let tables = self.tables()?;
        let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
        // Values come out ordered by ascending timestamp; reverse for newest first.
        for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
            let (_nanos, &peer) = result?.value();
            peers.push(peer);
        }
        if peers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(peers.into_iter()))
        }
    }

    /// Set the download policy for `namespace`.
    ///
    /// # Errors
    ///
    /// Fails if the document does not exist.
    pub fn set_download_policy(
        &mut self,
        namespace: &NamespaceId,
        policy: DownloadPolicy,
    ) -> Result<()> {
        self.modify(|tables| {
            let namespace = namespace.as_bytes();

            anyhow::ensure!(
                tables.namespaces.get(&namespace)?.is_some(),
                "document not created"
            );

            let value = postcard::to_stdvec(&policy)?;
            tables.download_policy.insert(namespace, value.as_slice())?;
            Ok(())
        })
    }

    /// Get the download policy for `namespace`, or the default if none is stored.
    pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
        let tables = self.tables()?;
        let value = tables.download_policy.get(namespace.as_bytes())?;
        Ok(match value {
            None => DownloadPolicy::default(),
            Some(value) => postcard::from_bytes(value.value())?,
        })
    }
}
612
// Delegates public key lookup and parsing to the in-memory key cache.
impl PublicKeyStore for Store {
    fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
        self.pubkeys.public_key(id)
    }
}
618
619fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
620 Capability::from_raw(raw_kind, raw_bytes)
621}
622
623fn get_exact(
624 record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
625 namespace: NamespaceId,
626 author: AuthorId,
627 key: impl AsRef<[u8]>,
628 include_empty: bool,
629) -> Result<Option<SignedEntry>> {
630 let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
631 let record = record_table.get(id)?;
632 Ok(record
633 .map(|r| into_entry(id, r.value()))
634 .filter(|entry| include_empty || !entry.is_empty()))
635}
636
/// A [`Store`] scoped to a single namespace, used as the backing store for
/// range-based set reconciliation (see the `crate::ranger::Store` impl below).
#[derive(Debug)]
pub struct StoreInstance<'a> {
    namespace: NamespaceId,
    pub(crate) store: &'a mut Store,
}
643
644impl<'a> StoreInstance<'a> {
645 pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
646 StoreInstance { namespace, store }
647 }
648}
649
// Delegates public key lookup to the underlying store's key cache.
impl PublicKeyStore for StoreInstance<'_> {
    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
        self.store.public_key(id)
    }
}
655
// Delegates download policy lookup to the underlying store.
impl super::DownloadPolicyStore for StoreInstance<'_> {
    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
        self.store.get_download_policy(namespace)
    }
}
661
impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
    type Error = anyhow::Error;
    // Either one contiguous table range, or — for wrap-around ranges — a
    // chain of two ranges. `chain_none` produces the single-range shape.
    type RangeIterator<'x>
        = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
    where
        'a: 'x;
    type ParentIterator<'x>
        = ParentIterator
    where
        'a: 'x;

    /// Get the first record of this namespace in table order, or a default
    /// identifier if the namespace has no records.
    fn get_first(&mut self) -> Result<RecordIdentifier> {
        let tables = self.store.as_mut().tables()?;
        // Restrict the scan to records of this namespace only.
        let bounds = RecordsBounds::namespace(self.namespace);
        let mut records = tables.records.range(bounds.as_ref())?;

        let Some(record) = records.next() else {
            return Ok(RecordIdentifier::default());
        };
        let (compound_key, _value) = record?;
        let (namespace_id, author_id, key) = compound_key.value();
        let id = RecordIdentifier::new(namespace_id, author_id, key);
        Ok(id)
    }

    #[cfg(test)]
    fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
        self.store
            .as_mut()
            .get_exact(id.namespace(), id.author(), id.key(), true)
    }

    #[cfg(test)]
    fn len(&mut self) -> Result<usize> {
        let tables = self.store.as_mut().tables()?;
        let bounds = RecordsBounds::namespace(self.namespace);
        let records = tables.records.range(bounds.as_ref())?;
        Ok(records.count())
    }

    #[cfg(test)]
    fn is_empty(&mut self) -> Result<bool> {
        use redb::ReadableTableMetadata;
        let tables = self.store.as_mut().tables()?;
        Ok(tables.records.is_empty()?)
    }

    /// XOR-fold the fingerprints of all entries within `range`.
    fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
        let elements = self.get_range(range.clone())?;

        let mut fp = Fingerprint::empty();
        for el in elements {
            let el = el?;
            fp ^= el.as_fingerprint();
        }

        Ok(fp)
    }

    /// Insert a signed entry, also updating the by-key index and the
    /// per-author latest-entry table.
    fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
        let id = e.id();
        self.store.as_mut().modify(|tables| {
            // Insert into the main records table, keyed by (namespace, author, key).
            let key = (
                &id.namespace().to_bytes(),
                &id.author().to_bytes(),
                id.key(),
            );
            let hash = e.content_hash(); let value = (
                e.timestamp(),
                &e.signature().namespace().to_bytes(),
                &e.signature().author().to_bytes(),
                e.content_len(),
                hash.as_bytes(),
            );
            tables.records.insert(key, value)?;

            // Insert into the by-key index, keyed by (namespace, key, author).
            let key = (
                &id.namespace().to_bytes(),
                id.key(),
                &id.author().to_bytes(),
            );
            tables.records_by_key.insert(key, ())?;

            // Track the latest entry per (namespace, author).
            let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
            let value = (e.timestamp(), e.id().key());
            tables.latest_per_author.insert(key, value)?;
            Ok(())
        })
    }

    /// Get the entries of this namespace within `range`.
    fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
        let tables = self.store.as_mut().tables()?;
        let iter = match range.x().cmp(range.y()) {
            // Equal bounds means "everything" in ranger semantics.
            Ordering::Equal => {
                let bounds = RecordsBounds::namespace(self.namespace);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
                chain_none(iter)
            }
            // Regular range: from x (inclusive) to y (exclusive).
            Ordering::Less => {
                let start = Bound::Included(range.x().to_byte_tuple());
                let end = Bound::Excluded(range.y().to_byte_tuple());
                let bounds = RecordsBounds::new(start, end);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
                chain_none(iter)
            }
            // Wrap-around range: namespace start up to y, then x to namespace end.
            Ordering::Greater => {
                let end = Bound::Excluded(range.y().to_byte_tuple());
                let bounds = RecordsBounds::from_start(&self.namespace, end);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;

                let start = Bound::Included(range.x().to_byte_tuple());
                let bounds = RecordsBounds::to_end(&self.namespace, start);
                let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;

                iter.chain(Some(iter2).into_iter().flatten())
            }
        };
        Ok(iter)
    }

    #[cfg(test)]
    fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
        self.store.as_mut().modify(|tables| {
            let entry = {
                let (namespace, author, key) = id.as_byte_tuple();
                // Remove from both the by-key index and the main records table.
                let id = (namespace, key, author);
                tables.records_by_key.remove(id)?;
                let id = (namespace, author, key);
                let value = tables.records.remove(id)?;
                value.map(|value| into_entry(id, value.value()))
            };
            Ok(entry)
        })
    }

    #[cfg(test)]
    fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
        let tables = self.store.as_mut().tables()?;
        let bounds = RecordsBounds::namespace(self.namespace);
        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
        Ok(chain_none(iter))
    }

    /// Get all entries whose key is a prefix of `id`'s key (same namespace and author).
    fn prefixes_of(
        &mut self,
        id: &RecordIdentifier,
    ) -> Result<Self::ParentIterator<'_>, Self::Error> {
        let tables = self.store.as_mut().tables()?;
        ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
    }

    #[cfg(test)]
    fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
        let tables = self.store.as_mut().tables()?;
        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
        Ok(chain_none(iter))
    }

    /// Remove all entries prefixed by `id`'s key for which `predicate` returns
    /// true, returning the number of removed entries.
    fn remove_prefix_filtered(
        &mut self,
        id: &RecordIdentifier,
        predicate: impl Fn(&Record) -> bool,
    ) -> Result<usize> {
        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
        self.store.as_mut().modify(|tables| {
            let cb = |_k: RecordsId, v: RecordsValue| {
                let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
                let record = Record::new(hash.into(), len, timestamp);

                predicate(&record)
            };
            // `extract_from_if` is lazy; count() drives the actual removal.
            let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
            let count = iter.count();
            Ok(count)
        })
    }
}
854
/// Wrap `iter` in the same `Chain<_, Flatten<_>>` type that the two-part
/// (wrap-around) range case produces, but with an empty second half.
fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
    iter: I,
) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
    let tail: Option<I> = None;
    iter.chain(tail.into_iter().flatten())
}
860
/// Iterator over all entries whose key is a prefix ("parent") of a given key.
///
/// The entries are collected eagerly at construction time (their number is
/// bounded by the key length) and yielded shortest prefix first.
#[derive(Debug)]
pub struct ParentIterator {
    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
}
867
868impl ParentIterator {
869 fn new(
870 tables: &Tables,
871 namespace: NamespaceId,
872 author: AuthorId,
873 key: Vec<u8>,
874 ) -> anyhow::Result<Self> {
875 let parents = parents(&tables.records, namespace, author, key.clone());
876 Ok(Self {
877 inner: parents.into_iter(),
878 })
879 }
880}
881
882fn parents(
883 table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
884 namespace: NamespaceId,
885 author: AuthorId,
886 mut key: Vec<u8>,
887) -> Vec<anyhow::Result<SignedEntry>> {
888 let mut res = Vec::new();
889
890 while !key.is_empty() {
891 let entry = get_exact(table, namespace, author, &key, false);
892 key.pop();
893 match entry {
894 Err(err) => res.push(Err(err)),
895 Ok(Some(entry)) => res.push(Ok(entry)),
896 Ok(None) => continue,
897 }
898 }
899 res.reverse();
900 res
901}
902
impl Iterator for ParentIterator {
    type Item = Result<SignedEntry>;

    // Yields the pre-collected parent entries, shortest prefix first.
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}
910
/// Iterator over the content hashes of all entries in the records table.
#[derive(derive_more::Debug)]
pub struct ContentHashesIterator {
    #[debug(skip)]
    range: RecordsRange<'static>,
}
923
924impl ContentHashesIterator {
925 pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
927 let range = RecordsRange::all_static(table)?;
928 Ok(Self { range })
929 }
930}
931
932impl Iterator for ContentHashesIterator {
933 type Item = Result<Hash>;
934
935 fn next(&mut self) -> Option<Self::Item> {
936 let v = self.range.next()?;
937 Some(v.map(|e| e.content_hash()))
938 }
939}
940
/// Iterator over the latest entry (timestamp and key) for each author of one namespace.
#[derive(derive_more::Debug)]
#[debug("LatestIterator")]
pub struct LatestIterator<'a>(
    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
);
947
impl<'a> LatestIterator<'a> {
    /// Create an iterator over the latest-per-author table restricted to `namespace`.
    fn new(
        latest_per_author: &'a impl ReadableTable<
            LatestPerAuthorKey<'static>,
            LatestPerAuthorValue<'static>,
        >,
        namespace: NamespaceId,
    ) -> anyhow::Result<Self> {
        // Span all possible author ids (32 bytes, 0x00..=0xff) within the namespace.
        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
        let range = latest_per_author.range(start..=end)?;
        Ok(Self(range))
    }
}
962
impl Iterator for LatestIterator<'_> {
    type Item = Result<(AuthorId, u64, Vec<u8>)>;

    // Yields (author, timestamp, key) for each author in the namespace.
    fn next(&mut self) -> Option<Self::Item> {
        self.0.next_map(|key, value| {
            let (_namespace, author) = key;
            let (timestamp, key) = value;
            (author.into(), timestamp, key.to_vec())
        })
    }
}
974
975fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
976 let (namespace, author, key) = key;
977 let (timestamp, namespace_sig, author_sig, len, hash) = value;
978 let id = RecordIdentifier::new(namespace, author, key);
979 let record = Record::new(hash.into(), len, timestamp);
980 let entry = Entry::new(id, record);
981 let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
982 SignedEntry::new(entry_signature, entry)
983}
984
#[cfg(all(test, feature = "fs-store"))]
mod tests {
    use super::*;
    use crate::ranger::Store as _;

    // Prefix queries must return entries in key order.
    #[tokio::test]
    async fn test_ranges() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let author = store.new_author(&mut rand::rng())?;
        let namespace = NamespaceSecret::new(&mut rand::rng());
        let mut replica = store.new_replica(namespace.clone())?;

        // Keys at the upper edge of the byte range to exercise bound handling.
        let key1 = vec![255, 255];
        let key2 = vec![255, 255, 255];
        replica.hash_and_insert(&key1, &author, b"v1").await?;
        replica.hash_and_insert(&key2, &author, b"v2").await?;
        let res = store
            .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(res.len(), 2);
        assert_eq!(
            res.into_iter()
                .map(|entry| entry.key().to_vec())
                .collect::<Vec<_>>(),
            vec![key1, key2]
        );
        Ok(())
    }

    // End-to-end smoke test: authors, replicas, insert/overwrite/query/remove.
    #[test]
    fn test_basics() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
        assert!(authors.is_empty());

        let author = store.new_author(&mut rand::rng())?;
        let namespace = NamespaceSecret::new(&mut rand::rng());
        let _replica = store.new_replica(namespace.clone())?;
        store.close_replica(namespace.id());
        let replica = store.load_replica_info(&namespace.id())?;
        assert_eq!(replica.capability.id(), namespace.id());

        let author_back = store.get_author(&author.id())?.unwrap();
        assert_eq!(author.to_bytes(), author_back.to_bytes(),);

        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
        }

        // All five entries are visible through the ranger interface.
        let all: Vec<_> = wrapper.all()?.collect();
        assert_eq!(all.len(), 5);

        // Overwriting the same ids must not create additional entries.
        let mut ids = Vec::new();
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(
                id.clone(),
                Record::current_from_data(format!("world-{i}-2")),
            );
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
            ids.push(id);
        }

        // Get all.
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // Get by prefix.
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::key_prefix("hello-"))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // Remove each entry and verify it is gone.
        for id in ids {
            let res = wrapper.get(&id)?;
            assert!(res.is_some());
            let out = wrapper.entry_remove(&id)?.unwrap();
            assert_eq!(out.entry().id(), &id);
            let res = wrapper.get(&id)?;
            assert!(res.is_none());
        }

        // The store must be empty afterwards.
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 0);

        Ok(())
    }

    // Copy a database file and apply `modify` in a write transaction, to
    // simulate databases created by older versions.
    fn copy_and_modify(
        source: &std::path::Path,
        modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
    ) -> Result<tempfile::NamedTempFile> {
        let dbfile = tempfile::NamedTempFile::new()?;
        std::fs::copy(source, dbfile.path())?;
        let db = Database::create(dbfile.path())?;
        let write_tx = db.begin_write()?;
        modify(&write_tx)?;
        write_tx.commit()?;
        drop(db);
        Ok(dbfile)
    }

    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn test_migration_001_populate_latest_table() -> Result<()> {
        use super::tables::LATEST_PER_AUTHOR_TABLE;
        let dbfile = tempfile::NamedTempFile::new()?;
        let namespace = NamespaceSecret::new(&mut rand::rng());

        // Create a store and remember the latest-per-author state.
        let expected = {
            let mut store = Store::persistent(dbfile.path())?;
            let author1 = store.new_author(&mut rand::rng())?;
            let author2 = store.new_author(&mut rand::rng())?;
            let mut replica = store.new_replica(namespace.clone())?;
            replica.hash_and_insert(b"k1", &author1, b"v1").await?;
            replica.hash_and_insert(b"k2", &author2, b"v1").await?;
            replica.hash_and_insert(b"k3", &author1, b"v1").await?;

            let expected = store
                .get_latest_for_each_author(namespace.id())?
                .collect::<Result<Vec<_>>>()?;
            store.close_replica(namespace.id());
            store.flush()?;
            drop(store);
            expected
        };
        assert_eq!(expected.len(), 2);

        // Simulate a pre-migration database by deleting the latest table.
        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
            Ok(())
        })?;

        // Opening the store must run the migration and repopulate the table.
        let mut store = Store::persistent(dbfile_before_migration.path())?;
        let actual = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    #[cfg(feature = "fs-store")]
    fn test_migration_004_populate_by_key_index() -> Result<()> {
        use redb::ReadableTableMetadata;
        let dbfile = tempfile::NamedTempFile::new()?;

        let mut store = Store::persistent(dbfile.path())?;

        // NOTE(review): this only checks that the by-key index exists and is
        // empty on a fresh store; the actual migration path (repopulating it
        // from existing records) is not exercised here.
        {
            let tables = store.tables()?;
            assert_eq!(tables.records_by_key.len()?, 0);
        }

        Ok(())
    }
}