use std::{
cmp::Ordering,
collections::HashSet,
iter::{Chain, Flatten},
num::NonZeroU64,
ops::Bound,
path::Path,
};
use anyhow::{anyhow, Result};
use ed25519_dalek::{SignatureError, VerifyingKey};
use iroh_base::hash::Hash;
use rand_core::CryptoRngCore;
use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable, ReadableTableMetadata};
use super::{
pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
Query,
};
use crate::{
actor::MAX_COMMIT_DELAY,
keys::Author,
ranger::{Fingerprint, Range, RangeEntry},
sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
ReplicaInfo,
};
mod bounds;
mod migrate_v1_v2;
mod migrations;
mod query;
mod ranges;
pub(crate) mod tables;
pub use self::ranges::RecordsRange;
use self::{
bounds::{ByKeyBounds, RecordsBounds},
query::QueryIterator,
ranges::RangeExt,
tables::{
LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
RecordsValue, Tables, TransactionAndTables,
},
};
/// Manages the replicas and authors of a document store backed by a redb database.
#[derive(Debug)]
pub struct Store {
    // The backing redb database holding all tables.
    db: Database,
    // Lazily opened transaction; switched between read and write mode as needed.
    transaction: CurrentTransaction,
    // Namespaces with an open `Replica`; `remove_replica` refuses these.
    open_replicas: HashSet<NamespaceId>,
    // In-memory cache for verified public keys.
    pubkeys: MemPublicKeyStore,
}

impl AsRef<Store> for Store {
    fn as_ref(&self) -> &Store {
        self
    }
}

impl AsMut<Store> for Store {
    fn as_mut(&mut self) -> &mut Store {
        self
    }
}

/// The state of the store's current database transaction, if any.
#[derive(derive_more::Debug, Default)]
enum CurrentTransaction {
    /// No transaction is open.
    #[default]
    None,
    /// An open read transaction with its read-only table handles.
    Read(ReadOnlyTables),
    /// An open write transaction with its table handles.
    Write(TransactionAndTables),
}
impl Store {
    /// Create a new store backed by an in-memory database.
    ///
    /// # Panics
    ///
    /// Panics if the in-memory database cannot be initialized.
    pub fn memory() -> Self {
        Self::memory_impl().expect("failed to create memory store")
    }

    fn memory_impl() -> Result<Self> {
        let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
        Self::new_impl(db)
    }

    /// Create or open a store from a `path` to a database file.
    ///
    /// The file is created if it does not exist. A database at the old
    /// version 1 layout is migrated to version 2 transparently.
    pub fn persistent(path: impl AsRef<Path>) -> Result<Self> {
        let db = match Database::create(&path) {
            Ok(db) => db,
            Err(DatabaseError::UpgradeRequired(1)) => migrate_v1_v2::run(&path)?,
            Err(err) => return Err(err.into()),
        };
        Self::new_impl(db)
    }

    fn new_impl(db: redb::Database) -> Result<Self> {
        // Ensure all tables exist before any reads happen.
        let write_tx = db.begin_write()?;
        let _ = Tables::new(&write_tx)?;
        write_tx.commit()?;
        // Run any pending data migrations.
        migrations::run_migrations(&db)?;
        Ok(Store {
            db,
            transaction: Default::default(),
            open_replicas: Default::default(),
            pubkeys: Default::default(),
        })
    }

    /// Commit the pending write transaction, if any.
    ///
    /// A no-op when no write transaction is open; an open read
    /// transaction is dropped.
    pub fn flush(&mut self) -> Result<()> {
        if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
            w.commit()?;
        }
        Ok(())
    }

    /// Get a borrowed read-only snapshot of the database.
    ///
    /// A pending write transaction is committed first so the snapshot
    /// observes its changes.
    pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
        let guard = &mut self.transaction;
        let tables = match std::mem::take(guard) {
            CurrentTransaction::None => {
                let tx = self.db.begin_read()?;
                ReadOnlyTables::new(tx)?
            }
            CurrentTransaction::Write(w) => {
                // Commit pending writes so they are visible to the read transaction.
                w.commit()?;
                let tx = self.db.begin_read()?;
                ReadOnlyTables::new(tx)?
            }
            CurrentTransaction::Read(tables) => tables,
        };
        *guard = CurrentTransaction::Read(tables);
        match &*guard {
            CurrentTransaction::Read(ref tables) => Ok(tables),
            _ => unreachable!(),
        }
    }

    /// Get an owned read-only snapshot of the database.
    ///
    /// The snapshot does not reflect subsequent changes made through this store.
    pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
        // Close any open transaction so the snapshot has the latest data.
        self.flush()?;
        assert!(matches!(self.transaction, CurrentTransaction::None));
        let tx = self.db.begin_read()?;
        let tables = ReadOnlyTables::new(tx)?;
        Ok(tables)
    }

    /// Ensure the current transaction is a write transaction.
    ///
    /// Drops an open read transaction, commits and restarts a write
    /// transaction older than [`MAX_COMMIT_DELAY`], and otherwise keeps the
    /// existing write transaction. Afterwards `self.transaction` is
    /// guaranteed to be [`CurrentTransaction::Write`].
    fn acquire_write(&mut self) -> Result<()> {
        let guard = &mut self.transaction;
        let tables = match std::mem::take(guard) {
            // No usable write transaction: start a fresh one.
            CurrentTransaction::None | CurrentTransaction::Read(_) => {
                let tx = self.db.begin_write()?;
                TransactionAndTables::new(tx)?
            }
            CurrentTransaction::Write(w) => {
                if w.since.elapsed() > MAX_COMMIT_DELAY {
                    tracing::debug!("committing transaction because it's too old");
                    w.commit()?;
                    let tx = self.db.begin_write()?;
                    TransactionAndTables::new(tx)?
                } else {
                    w
                }
            }
        };
        *guard = CurrentTransaction::Write(tables);
        Ok(())
    }

    /// Get access to the tables of the current write transaction, opening a
    /// write transaction if needed.
    fn tables(&mut self) -> Result<&Tables> {
        self.acquire_write()?;
        match &mut self.transaction {
            CurrentTransaction::Write(tables) => Ok(tables.tables()),
            _ => unreachable!("acquire_write leaves a write transaction"),
        }
    }

    /// Run `f` with mutable access to the tables of the current write
    /// transaction, opening a write transaction if needed.
    fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
        self.acquire_write()?;
        match &mut self.transaction {
            CurrentTransaction::Write(tables) => Ok(tables.with_tables_mut(f)?),
            _ => unreachable!("acquire_write leaves a write transaction"),
        }
    }
}
/// Iterator over the peer ids cached for a namespace.
type PeersIter = std::vec::IntoIter<PeerIdBytes>;

impl Store {
    /// Create a new replica for `namespace`, persist its capability, and open it.
    pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica> {
        let id = namespace.id();
        self.import_namespace(namespace.into())?;
        self.open_replica(&id).map_err(Into::into)
    }

    /// Generate a new author key and persist it in the store.
    pub fn new_author<R: CryptoRngCore + ?Sized>(&mut self, rng: &mut R) -> Result<Author> {
        let author = Author::new(rng);
        self.import_author(author.clone())?;
        Ok(author)
    }

    /// Check whether the remote `heads` contain updates we do not have locally
    /// for `namespace`.
    ///
    /// Builds our own latest-timestamp-per-author map and compares it against
    /// the remote heads; `Some` indicates the remote has news for us.
    pub fn has_news_for_us(
        &mut self,
        namespace: NamespaceId,
        heads: &AuthorHeads,
    ) -> Result<Option<NonZeroU64>> {
        let our_heads = {
            let latest = self.get_latest_for_each_author(namespace)?;
            let mut heads = AuthorHeads::default();
            for e in latest {
                let (author, timestamp, _key) = e?;
                heads.insert(author, timestamp);
            }
            heads
        };
        let has_news_for_us = heads.has_news_for(&our_heads);
        Ok(has_news_for_us)
    }

    /// Open a replica by id, loading its info and binding it to this store.
    pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica, OpenError> {
        let info = self.load_replica_info(namespace_id)?;
        let instance = StoreInstance::new(*namespace_id, self);
        Ok(Replica::new(instance, Box::new(info)))
    }

    /// Load the stored capability for `namespace_id` and mark the replica as open.
    ///
    /// Returns [`OpenError::NotFound`] if the namespace is not in the store.
    pub fn load_replica_info(
        &mut self,
        namespace_id: &NamespaceId,
    ) -> Result<ReplicaInfo, OpenError> {
        let tables = self.tables()?;
        let info = match tables.namespaces.get(namespace_id.as_bytes()) {
            Ok(Some(db_value)) => {
                let (raw_kind, raw_bytes) = db_value.value();
                let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
                ReplicaInfo::new(namespace)
            }
            Ok(None) => return Err(OpenError::NotFound),
            Err(err) => return Err(OpenError::Other(err.into())),
        };
        // Track that this replica is open so it cannot be removed concurrently.
        self.open_replicas.insert(info.capability.id());
        Ok(info)
    }

    /// Mark a replica as closed, allowing it to be removed again.
    pub fn close_replica(&mut self, id: NamespaceId) {
        self.open_replicas.remove(&id);
    }

    /// List all namespaces in the store with their capability kind.
    pub fn list_namespaces(
        &mut self,
    ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
        let snapshot = self.snapshot()?;
        let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
        let iter = iter.map(|res| {
            let capability = parse_capability(res?.1.value())?;
            Ok((capability.id(), capability.kind()))
        });
        Ok(iter)
    }

    /// Get an author key from the store, or `None` if it is unknown.
    pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
        let tables = self.tables()?;
        let Some(author) = tables.authors.get(author_id.as_bytes())? else {
            return Ok(None);
        };
        let author = Author::from_bytes(author.value());
        Ok(Some(author))
    }

    /// Persist an author key in the store.
    pub fn import_author(&mut self, author: Author) -> Result<()> {
        self.modify(|tables| {
            tables
                .authors
                .insert(author.id().as_bytes(), &author.to_bytes())?;
            Ok(())
        })
    }

    /// Remove an author key from the store.
    ///
    /// Only the key is removed; entries written by this author are untouched.
    pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
        self.modify(|tables| {
            tables.authors.remove(author.as_bytes())?;
            Ok(())
        })
    }

    /// List all author keys in the store.
    pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
        let tables = self.snapshot()?;
        let iter = tables
            .authors
            .range::<&'static [u8; 32]>(..)?
            .map(|res| match res {
                Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
                Err(err) => Err(err.into()),
            });
        Ok(iter)
    }

    /// Import a capability, merging it with an existing one for the same namespace.
    ///
    /// If a capability for the namespace exists and the new one grants more,
    /// the stored capability is upgraded; otherwise nothing changes.
    pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
        self.modify(|tables| {
            let outcome = {
                let (capability, outcome) = {
                    let existing = tables.namespaces.get(capability.id().as_bytes())?;
                    if let Some(existing) = existing {
                        let mut existing = parse_capability(existing.value())?;
                        let outcome = if existing.merge(capability)? {
                            ImportNamespaceOutcome::Upgraded
                        } else {
                            ImportNamespaceOutcome::NoChange
                        };
                        (existing, outcome)
                    } else {
                        (capability, ImportNamespaceOutcome::Inserted)
                    }
                };
                let id = capability.id().to_bytes();
                let (kind, bytes) = capability.raw();
                tables.namespaces.insert(&id, (kind, &bytes))?;
                outcome
            };
            Ok(outcome)
        })
    }

    /// Remove a replica and all its data from the store.
    ///
    /// Fails if the replica is currently open (see [`Self::close_replica`]).
    pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
        if self.open_replicas.contains(namespace) {
            return Err(anyhow!("replica is not closed"));
        }
        self.modify(|tables| {
            let bounds = RecordsBounds::namespace(*namespace);
            tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
            let bounds = ByKeyBounds::namespace(*namespace);
            // NOTE(review): the result is ignored here, unlike the `records`
            // retain above which propagates errors — confirm whether failures
            // of the by-key index cleanup should propagate as well.
            let _ = tables
                .records_by_key
                .retain_in(bounds.as_ref(), |_k, _v| false);
            tables.namespaces.remove(namespace.as_bytes())?;
            tables.namespace_peers.remove_all(namespace.as_bytes())?;
            tables.download_policy.remove(namespace.as_bytes())?;
            Ok(())
        })
    }

    /// Get an iterator over the entries of a replica matching `query`.
    pub fn get_many(
        &mut self,
        namespace: NamespaceId,
        query: impl Into<Query>,
    ) -> Result<QueryIterator> {
        let tables = self.snapshot_owned()?;
        QueryIterator::new(tables, namespace, query.into())
    }

    /// Get the entry for the exact (namespace, author, key) triple, if any.
    ///
    /// Empty entries are filtered out unless `include_empty` is set.
    pub fn get_exact(
        &mut self,
        namespace: NamespaceId,
        author: AuthorId,
        key: impl AsRef<[u8]>,
        include_empty: bool,
    ) -> Result<Option<SignedEntry>> {
        get_exact(
            &self.tables()?.records,
            namespace,
            author,
            key,
            include_empty,
        )
    }

    /// Get an iterator over the content hashes of all entries in the store.
    pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
        let tables = self.snapshot_owned()?;
        ContentHashesIterator::all(&tables.records)
    }

    /// Get the latest (timestamp, key) written by each author in `namespace`.
    pub fn get_latest_for_each_author(&mut self, namespace: NamespaceId) -> Result<LatestIterator> {
        LatestIterator::new(&self.tables()?.latest_per_author, namespace)
    }

    /// Register a peer as useful for syncing `namespace`.
    ///
    /// Peers are stored in a multimap keyed by namespace with
    /// (timestamp, peer) values, capped at `PEERS_PER_DOC_CACHE_SIZE`
    /// entries; re-registering a peer refreshes its timestamp and the oldest
    /// peer is evicted when the cap is exceeded.
    pub fn register_useful_peer(
        &mut self,
        namespace: NamespaceId,
        peer: crate::PeerIdBytes,
    ) -> Result<()> {
        let peer = &peer;
        let namespace = namespace.as_bytes();
        // Current time as nanoseconds since the unix epoch; used as the sort key.
        let nanos = std::time::UNIX_EPOCH
            .elapsed()
            .map(|duration| duration.as_nanos() as u64)?;
        self.modify(|tables| {
            anyhow::ensure!(
                tables.namespaces.get(namespace)?.is_some(),
                "document not created"
            );
            let mut namespace_peers = tables.namespace_peers.get(namespace)?;
            // Multimap values iterate in sorted order, so the first value
            // carries the smallest (oldest) timestamp.
            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
                let (oldest_nanos, &oldest_peer) = guard.value();
                (oldest_nanos, oldest_peer)
            });
            match maybe_oldest {
                None => {
                    // No peers registered yet: insert directly.
                    drop(namespace_peers);
                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
                }
                Some((oldest_nanos, oldest_peer)) => {
                    let oldest_peer = &oldest_peer;
                    if oldest_peer == peer {
                        // The registered peer is the oldest entry: refresh its timestamp.
                        drop(namespace_peers);
                        tables
                            .namespace_peers
                            .remove(namespace, (oldest_nanos, oldest_peer))?;
                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
                    } else {
                        // Scan the remaining peers to find a previous entry for
                        // this peer and to count the cache size.
                        let mut len = 1;
                        let mut prev_peer_nanos = None;
                        for result in namespace_peers {
                            len += 1;
                            let guard = result?;
                            let (peer_nanos, peer_bytes) = guard.value();
                            if prev_peer_nanos.is_none() && peer_bytes == peer {
                                prev_peer_nanos = Some(peer_nanos)
                            }
                        }
                        match prev_peer_nanos {
                            Some(prev_nanos) => {
                                // Known peer: replace its old entry with a fresh timestamp.
                                tables
                                    .namespace_peers
                                    .remove(namespace, (prev_nanos, peer))?;
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                            }
                            None => {
                                // New peer: insert, then evict the oldest if over capacity.
                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
                                len += 1;
                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
                                    tables
                                        .namespace_peers
                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
                                }
                            }
                        }
                    }
                }
            }
            Ok(())
        })
    }

    /// Get the peers cached for syncing `namespace`, most recently registered first.
    pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
        let tables = self.tables()?;
        let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
        // Reverse iteration yields the newest (largest timestamp) entries first.
        for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
            let (_nanos, &peer) = result?.value();
            peers.push(peer);
        }
        if peers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(peers.into_iter()))
        }
    }

    /// Set the download policy for `namespace`.
    ///
    /// Fails if the namespace does not exist in the store.
    pub fn set_download_policy(
        &mut self,
        namespace: &NamespaceId,
        policy: DownloadPolicy,
    ) -> Result<()> {
        self.modify(|tables| {
            let namespace = namespace.as_bytes();
            anyhow::ensure!(
                tables.namespaces.get(&namespace)?.is_some(),
                "document not created"
            );
            let value = postcard::to_stdvec(&policy)?;
            tables.download_policy.insert(namespace, value.as_slice())?;
            Ok(())
        })
    }

    /// Get the download policy for `namespace`, falling back to the default
    /// when none is stored.
    pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
        let tables = self.tables()?;
        let value = tables.download_policy.get(namespace.as_bytes())?;
        Ok(match value {
            None => DownloadPolicy::default(),
            Some(value) => postcard::from_bytes(value.value())?,
        })
    }
}
impl PublicKeyStore for Store {
    // Delegates to the in-memory public key cache.
    fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
        self.pubkeys.public_key(id)
    }
}

/// Parse a `(kind, bytes)` pair from the namespaces table into a [`Capability`].
fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
    Capability::from_raw(raw_kind, raw_bytes)
}
/// Look up the entry stored for the exact (namespace, author, key) triple.
///
/// Returns `None` when no record exists, and also when a record exists but is
/// empty and `include_empty` is not set.
fn get_exact(
    record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
    namespace: NamespaceId,
    author: AuthorId,
    key: impl AsRef<[u8]>,
    include_empty: bool,
) -> Result<Option<SignedEntry>> {
    let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
    match record_table.get(id)? {
        None => Ok(None),
        Some(guard) => {
            let entry = into_entry(id, guard.value());
            // Filter out empty entries unless the caller asked for them.
            if include_empty || !entry.is_empty() {
                Ok(Some(entry))
            } else {
                Ok(None)
            }
        }
    }
}
/// A [`Store`] scoped to a single namespace, used as the storage backend for a
/// [`Replica`] and for range-based sync.
#[derive(Debug)]
pub struct StoreInstance<'a> {
    // The namespace all operations are scoped to.
    namespace: NamespaceId,
    pub(crate) store: &'a mut Store,
}

impl<'a> StoreInstance<'a> {
    pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
        StoreInstance { namespace, store }
    }
}

impl<'a> PublicKeyStore for StoreInstance<'a> {
    // Delegates to the underlying store's key cache.
    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
        self.store.public_key(id)
    }
}

impl<'a> super::DownloadPolicyStore for StoreInstance<'a> {
    // Delegates to the underlying store.
    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
        self.store.get_download_policy(namespace)
    }
}
impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
    type Error = anyhow::Error;
    // A range iterator optionally chained with a second range, used for
    // wrap-around ranges (see `get_range`). For plain ranges the second half
    // is `None` (see `chain_none`).
    type RangeIterator<'x>
        = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
    where
        'a: 'x;
    type ParentIterator<'x>
        = ParentIterator
    where
        'a: 'x;

    /// Get the first record identifier in this namespace, or a default id if
    /// the namespace holds no records.
    fn get_first(&mut self) -> Result<RecordIdentifier> {
        let tables = self.store.as_mut().tables()?;
        // TODO: verify this fetches all keys with this namespace
        let bounds = RecordsBounds::namespace(self.namespace);
        let mut records = tables.records.range(bounds.as_ref())?;
        let Some(record) = records.next() else {
            return Ok(RecordIdentifier::default());
        };
        let (compound_key, _value) = record?;
        let (namespace_id, author_id, key) = compound_key.value();
        let id = RecordIdentifier::new(namespace_id, author_id, key);
        Ok(id)
    }

    /// Get the entry for `id`, including empty entries.
    fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
        self.store
            .as_mut()
            .get_exact(id.namespace(), id.author(), id.key(), true)
    }

    /// Count the records in this namespace.
    fn len(&mut self) -> Result<usize> {
        let tables = self.store.as_mut().tables()?;
        let bounds = RecordsBounds::namespace(self.namespace);
        let records = tables.records.range(bounds.as_ref())?;
        Ok(records.count())
    }

    fn is_empty(&mut self) -> Result<bool> {
        // NOTE(review): this checks emptiness of the entire records table, not
        // only this namespace (unlike `len` above) — confirm this is intended.
        let tables = self.store.as_mut().tables()?;
        Ok(tables.records.is_empty()?)
    }

    /// XOR-combine the fingerprints of all entries in `range`.
    fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
        // TODO: optimize
        let elements = self.get_range(range.clone())?;
        let mut fp = Fingerprint::empty();
        for el in elements {
            let el = el?;
            fp ^= el.as_fingerprint();
        }
        Ok(fp)
    }

    /// Insert an entry, updating the main records table, the by-key index and
    /// the latest-per-author table in one transaction.
    fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
        let id = e.id();
        self.store.as_mut().modify(|tables| {
            // Insert into the main records table, keyed by (namespace, author, key).
            let key = (
                &id.namespace().to_bytes(),
                &id.author().to_bytes(),
                id.key(),
            );
            let hash = e.content_hash(); let value = (
                e.timestamp(),
                &e.signature().namespace().to_bytes(),
                &e.signature().author().to_bytes(),
                e.content_len(),
                hash.as_bytes(),
            );
            tables.records.insert(key, value)?;

            // Mirror into the by-key index, keyed by (namespace, key, author).
            let key = (
                &id.namespace().to_bytes(),
                id.key(),
                &id.author().to_bytes(),
            );
            tables.records_by_key.insert(key, ())?;

            // Remember the latest (timestamp, key) per (namespace, author).
            let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
            let value = (e.timestamp(), e.id().key());
            tables.latest_per_author.insert(key, value)?;
            Ok(())
        })
    }

    /// Get an iterator over the entries in `range`.
    ///
    /// Handles the three range shapes of range-based set reconciliation:
    /// equal bounds (everything), x < y (a contiguous span), and x > y
    /// (a wrap-around range split into two spans).
    fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
        let tables = self.store.as_mut().tables()?;
        let iter = match range.x().cmp(range.y()) {
            // An empty range means "everything" in this namespace.
            Ordering::Equal => {
                let bounds = RecordsBounds::namespace(self.namespace);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
                chain_none(iter)
            }
            // Regular range: [x, y).
            Ordering::Less => {
                let start = Bound::Included(range.x().to_byte_tuple());
                let end = Bound::Excluded(range.y().to_byte_tuple());
                let bounds = RecordsBounds::new(start, end);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
                chain_none(iter)
            }
            // Wrap-around range: [namespace start, y) chained with [x, namespace end].
            Ordering::Greater => {
                let end = Bound::Excluded(range.y().to_byte_tuple());
                let bounds = RecordsBounds::from_start(&self.namespace, end);
                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;

                let start = Bound::Included(range.x().to_byte_tuple());
                let bounds = RecordsBounds::to_end(&self.namespace, start);
                let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;

                iter.chain(Some(iter2).into_iter().flatten())
            }
        };
        Ok(iter)
    }

    /// Remove the entry for `id` from the records table and the by-key index,
    /// returning the removed entry if it existed.
    fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
        self.store.as_mut().modify(|tables| {
            let entry = {
                let (namespace, author, key) = id.as_byte_tuple();
                // The by-key index uses (namespace, key, author) ordering.
                let id = (namespace, key, author);
                tables.records_by_key.remove(id)?;
                let id = (namespace, author, key);
                let value = tables.records.remove(id)?;
                value.map(|value| into_entry(id, value.value()))
            };
            Ok(entry)
        })
    }

    /// Get an iterator over all entries in this namespace.
    fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
        let tables = self.store.as_mut().tables()?;
        let bounds = RecordsBounds::namespace(self.namespace);
        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
        Ok(chain_none(iter))
    }

    /// Get an iterator over all entries whose key is a prefix of `id`'s key
    /// (same namespace and author).
    fn prefixes_of(
        &mut self,
        id: &RecordIdentifier,
    ) -> Result<Self::ParentIterator<'_>, Self::Error> {
        let tables = self.store.as_mut().tables()?;
        ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
    }

    /// Get an iterator over all entries whose key starts with `id`'s key
    /// (same namespace and author).
    fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
        let tables = self.store.as_mut().tables()?;
        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
        Ok(chain_none(iter))
    }

    /// Remove all entries prefixed by `id`'s key for which `predicate` returns
    /// true, returning the number of removed entries.
    fn remove_prefix_filtered(
        &mut self,
        id: &RecordIdentifier,
        predicate: impl Fn(&Record) -> bool,
    ) -> Result<usize> {
        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
        self.store.as_mut().modify(|tables| {
            let cb = |_k: RecordsId, v: RecordsValue| {
                let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
                let record = Record::new(hash.into(), len, timestamp);
                predicate(&record)
            };
            let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
            // Draining the iterator performs the actual removals.
            let count = iter.count();
            Ok(count)
        })
    }
}
/// Lift a single iterator into the chained two-part iterator shape used by
/// [`StoreInstance`]'s range queries, with an always-empty second half.
fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
    iter: I,
) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
    let tail: Option<I> = None;
    iter.chain(tail.into_iter().flatten())
}
/// Iterator over all entries whose key is a strict or non-strict prefix of a
/// queried key (same namespace and author).
///
/// All matching entries are collected eagerly on construction, so the iterator
/// itself does not borrow the store.
#[derive(Debug)]
pub struct ParentIterator {
    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
}

impl ParentIterator {
    fn new(
        tables: &Tables,
        namespace: NamespaceId,
        author: AuthorId,
        key: Vec<u8>,
    ) -> anyhow::Result<Self> {
        // NOTE(review): the `key.clone()` looks unnecessary since `key` is
        // owned and unused afterwards — candidate cleanup.
        let parents = parents(&tables.records, namespace, author, key.clone());
        Ok(Self {
            inner: parents.into_iter(),
        })
    }
}
/// Collect the entries for every prefix of `key` (including `key` itself),
/// shortest prefix first. Lookup errors are preserved in order; missing and
/// empty entries are skipped.
fn parents(
    table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
    namespace: NamespaceId,
    author: AuthorId,
    mut key: Vec<u8>,
) -> Vec<anyhow::Result<SignedEntry>> {
    let mut collected = Vec::new();
    loop {
        if key.is_empty() {
            break;
        }
        // Look up the current prefix, then shorten it by one byte.
        let lookup = get_exact(table, namespace, author, &key, false);
        key.pop();
        match lookup {
            Ok(Some(entry)) => collected.push(Ok(entry)),
            Ok(None) => {}
            Err(err) => collected.push(Err(err)),
        }
    }
    // Results were gathered longest-first; flip to shortest-first order.
    collected.reverse();
    collected
}
impl Iterator for ParentIterator {
    type Item = Result<SignedEntry>;

    // Forwards to the eagerly collected results.
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}

/// Iterator over the content hashes of all entries in the records table.
#[derive(derive_more::Debug)]
pub struct ContentHashesIterator {
    #[debug(skip)]
    range: RecordsRange<'static>,
}

impl ContentHashesIterator {
    /// Create a new iterator over all records in `table`.
    pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
        let range = RecordsRange::all_static(table)?;
        Ok(Self { range })
    }
}
impl Iterator for ContentHashesIterator {
    type Item = Result<Hash>;

    /// Yields the content hash of each record, forwarding storage errors.
    fn next(&mut self) -> Option<Self::Item> {
        match self.range.next() {
            None => None,
            Some(Ok(entry)) => Some(Ok(entry.content_hash())),
            Some(Err(err)) => Some(Err(err)),
        }
    }
}
/// Iterator over the latest entry per author in a namespace.
#[derive(derive_more::Debug)]
#[debug("LatestIterator")]
pub struct LatestIterator<'a>(
    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
);

impl<'a> LatestIterator<'a> {
    fn new(
        latest_per_author: &'a impl ReadableTable<
            LatestPerAuthorKey<'static>,
            LatestPerAuthorValue<'static>,
        >,
        namespace: NamespaceId,
    ) -> anyhow::Result<Self> {
        // Cover every possible author id (all-zero to all-0xff) within the namespace.
        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
        let range = latest_per_author.range(start..=end)?;
        Ok(Self(range))
    }
}

impl<'a> Iterator for LatestIterator<'a> {
    type Item = Result<(AuthorId, u64, Vec<u8>)>;

    // Maps each (namespace, author) -> (timestamp, key) row to (author, timestamp, key).
    fn next(&mut self) -> Option<Self::Item> {
        self.0.next_map(|key, value| {
            let (_namespace, author) = key;
            let (timestamp, key) = value;
            (author.into(), timestamp, key.to_vec())
        })
    }
}
/// Reassemble a [`SignedEntry`] from a records-table key/value pair.
fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
    let (namespace, author, record_key) = key;
    let (timestamp, namespace_sig, author_sig, len, hash) = value;
    let identifier = RecordIdentifier::new(namespace, author, record_key);
    let record = Record::new(hash.into(), len, timestamp);
    let signature = EntrySignature::from_parts(namespace_sig, author_sig);
    SignedEntry::new(signature, Entry::new(identifier, record))
}
#[cfg(test)]
mod tests {
    use super::{tables::LATEST_PER_AUTHOR_TABLE, *};
    use crate::ranger::Store as _;

    /// Prefix queries must return all keys sharing the prefix, in key order.
    #[test]
    fn test_ranges() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let author = store.new_author(&mut rand::thread_rng())?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
        let mut replica = store.new_replica(namespace.clone())?;

        // Both keys share the prefix [255], so the prefix query must return both.
        let key1 = vec![255, 255];
        let key2 = vec![255, 255, 255];
        replica.hash_and_insert(&key1, &author, b"v1")?;
        replica.hash_and_insert(&key2, &author, b"v2")?;
        let res = store
            .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(res.len(), 2);
        assert_eq!(
            res.into_iter()
                .map(|entry| entry.key().to_vec())
                .collect::<Vec<_>>(),
            vec![key1, key2]
        );
        Ok(())
    }

    /// End-to-end smoke test: authors, replicas, put/get/remove via the
    /// ranger-store interface, and query iteration.
    #[test]
    fn test_basics() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
        assert!(authors.is_empty());

        let author = store.new_author(&mut rand::thread_rng())?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
        let _replica = store.new_replica(namespace.clone())?;
        store.close_replica(namespace.id());
        let replica = store.load_replica_info(&namespace.id())?;
        assert_eq!(replica.capability.id(), namespace.id());

        let author_back = store.get_author(&author.id())?.unwrap();
        assert_eq!(author.to_bytes(), author_back.to_bytes(),);

        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
        }

        // Get all.
        let all: Vec<_> = wrapper.all()?.collect();
        assert_eq!(all.len(), 5);

        // Overwrite the same ids; the count must stay at 5.
        let mut ids = Vec::new();
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(
                id.clone(),
                Record::current_from_data(format!("world-{i}-2")),
            );
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
            ids.push(id);
        }

        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::key_prefix("hello-"))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // Remove each entry and verify it is gone.
        for id in ids {
            let res = wrapper.get(&id)?;
            assert!(res.is_some());
            let out = wrapper.entry_remove(&id)?.unwrap();
            assert_eq!(out.entry().id(), &id);
            let res = wrapper.get(&id)?;
            assert!(res.is_none());
        }

        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 0);

        Ok(())
    }

    /// Copy the database file at `source` into a temp file and apply `modify`
    /// inside a write transaction, returning the modified copy.
    fn copy_and_modify(
        source: &Path,
        modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
    ) -> Result<tempfile::NamedTempFile> {
        let dbfile = tempfile::NamedTempFile::new()?;
        std::fs::copy(source, dbfile.path())?;
        let db = Database::create(dbfile.path())?;
        let write_tx = db.begin_write()?;
        modify(&write_tx)?;
        write_tx.commit()?;
        drop(db);
        Ok(dbfile)
    }

    /// Deleting the latest-per-author table must be repaired by the migration
    /// on next open, reproducing the same latest entries.
    #[test]
    fn test_migration_001_populate_latest_table() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());

        // Create a store and insert some data.
        let expected = {
            let mut store = Store::persistent(dbfile.path())?;
            let author1 = store.new_author(&mut rand::thread_rng())?;
            let author2 = store.new_author(&mut rand::thread_rng())?;
            let mut replica = store.new_replica(namespace.clone())?;
            replica.hash_and_insert(b"k1", &author1, b"v1")?;
            replica.hash_and_insert(b"k2", &author2, b"v1")?;
            replica.hash_and_insert(b"k3", &author1, b"v1")?;

            let expected = store
                .get_latest_for_each_author(namespace.id())?
                .collect::<Result<Vec<_>>>()?;
            store.close_replica(namespace.id());
            store.flush()?;
            drop(store);
            expected
        };
        // Two authors -> two latest entries.
        assert_eq!(expected.len(), 2);

        // Create a copy of the db file with the latest table deleted.
        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
            Ok(())
        })?;

        // Open the copied db file, which will run the migration.
        let mut store = Store::persistent(dbfile_before_migration.path())?;
        let actual = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(expected, actual);
        Ok(())
    }

    /// A fresh store must start with an empty by-key index.
    #[test]
    fn test_migration_004_populate_by_key_index() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        // Check that the new table is there, even if empty.
        {
            let tables = store.tables()?;
            assert_eq!(tables.records_by_key.len()?, 0);
        }
        Ok(())
    }
}