use std::{
cmp::Ordering,
fmt::Debug,
ops::{Deref, DerefMut},
sync::Arc,
time::{Duration, SystemTime},
};
use bytes::{Bytes, BytesMut};
use ed25519_dalek::{Signature, SignatureError};
use iroh_blobs::Hash;
#[cfg(feature = "metrics")]
use iroh_metrics::{inc, inc_by};
use serde::{Deserialize, Serialize};
pub use crate::heads::AuthorHeads;
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
use crate::{
keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
};
/// Message type exchanged by the set-reconciliation protocol for [`SignedEntry`] values.
pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;

/// Byte representation of a peer identifier (a 32-byte public key).
pub type PeerIdBytes = [u8; 32];

/// Maximum amount an entry's timestamp may lie in the future: 10 minutes.
///
/// Entry timestamps are produced by `system_time_now()`, which returns
/// *microseconds* since the Unix epoch, so this shift must be expressed in
/// microseconds as well. (Deriving it with `as_millis` would shrink the
/// tolerance to 0.6 seconds.)
pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = Duration::from_secs(10 * 60).as_micros() as u64;

/// Callback that reports the local download status for a content hash.
pub type ContentStatusCallback = Arc<dyn Fn(Hash) -> ContentStatus + Send + Sync + 'static>;
/// Events emitted by a replica when entries are inserted.
#[derive(Debug, Clone)]
pub enum Event {
    /// A new entry was inserted locally.
    LocalInsert {
        /// Namespace of the replica the entry was inserted into.
        namespace: NamespaceId,
        /// The signed entry that was inserted.
        entry: SignedEntry,
    },
    /// A new entry was received from a remote peer and inserted during sync.
    RemoteInsert {
        /// Namespace of the replica the entry was inserted into.
        namespace: NamespaceId,
        /// The signed entry that was inserted.
        entry: SignedEntry,
        /// Public key bytes of the peer the entry was received from.
        from: PeerIdBytes,
        /// Whether the entry matches this replica's download policy.
        should_download: bool,
        /// The remote peer's reported availability of the entry's content.
        remote_content_status: ContentStatus,
    },
}
/// Where an inserted entry originated from.
#[derive(Debug, Clone)]
pub enum InsertOrigin {
    /// The entry was created and signed locally.
    Local,
    /// The entry was received from a remote peer during sync.
    Sync {
        /// Public key bytes of the peer the entry was received from.
        from: PeerIdBytes,
        /// The remote peer's reported availability of the entry's content.
        remote_content_status: ContentStatus,
    },
}
/// Availability of an entry's content blob.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum ContentStatus {
    /// The content is fully available.
    Complete,
    /// The content is only partially available.
    Incomplete,
    /// The content is not available.
    Missing,
}
/// Statistics accumulated over a sync session.
#[derive(Debug, Clone, Default)]
pub struct SyncOutcome {
    /// Latest timestamp per author among the entries received.
    pub heads_received: AuthorHeads,
    /// Number of entries received from the remote peer.
    pub num_recv: usize,
    /// Number of entries sent to the remote peer.
    pub num_sent: usize,
}
/// Reinterprets `value` as a `usize`, provided `T` has exactly the size and
/// alignment of `usize` (e.g. a pointer-sized handle such as an `Arc`).
///
/// Returns `None` when the layout does not match, so the transmute is never
/// attempted on an incompatible type.
fn get_as_ptr<T>(value: &T) -> Option<usize> {
    use std::mem;
    let layout_matches = mem::size_of::<T>() == mem::size_of::<usize>()
        && mem::align_of::<T>() == mem::align_of::<usize>();
    if !layout_matches {
        return None;
    }
    // SAFETY: `T` was just verified to agree with `usize` in both size and
    // alignment, so bit-copying `value` into a `usize` is well-defined.
    Some(unsafe { mem::transmute_copy(value) })
}
/// Returns `true` if two senders belong to the same underlying channel,
/// compared by the pointer value of their inner channel allocation.
///
/// Panics if `async_channel::Sender` is ever not pointer-sized (then
/// [`get_as_ptr`] returns `None` and the `unwrap` fails).
fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
    get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
}
/// The set of event listeners subscribed to a replica.
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);
impl Subscribers {
    /// Registers a new event listener.
    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
        self.0.push(sender)
    }

    /// Removes a previously registered listener, matched by channel identity.
    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
        self.0.retain(|existing| !same_channel(existing, sender));
    }

    /// Broadcasts `event` to every listener, dropping any whose channel has
    /// been closed on the receiving side.
    pub fn send(&mut self, event: Event) {
        self.0
            .retain(|subscriber| subscriber.send_blocking(event.clone()).is_ok())
    }

    /// Number of registered listeners.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Builds the event lazily and broadcasts it, skipping construction when
    /// nobody is listening.
    pub fn send_with(&mut self, f: impl FnOnce() -> Event) {
        if self.0.is_empty() {
            return;
        }
        self.send(f())
    }
}
/// Kind of access granted by a [`Capability`].
///
/// The explicit discriminants are part of the raw storage format (see
/// [`Capability::raw`]); do not change them.
#[derive(
    Debug,
    Clone,
    Copy,
    Serialize,
    Deserialize,
    num_enum::IntoPrimitive,
    num_enum::TryFromPrimitive,
    strum::Display,
)]
#[repr(u8)]
#[strum(serialize_all = "snake_case")]
pub enum CapabilityKind {
    /// Read and write access.
    Write = 1,
    /// Read-only access.
    Read = 2,
}
/// Access capability for a namespace: either the full secret key (write
/// access) or only the public namespace id (read access).
#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
pub enum Capability {
    /// Write access, holding the namespace secret.
    Write(NamespaceSecret),
    /// Read-only access, holding only the namespace id.
    Read(NamespaceId),
}
impl Capability {
    /// Returns the namespace id this capability refers to.
    pub fn id(&self) -> NamespaceId {
        match self {
            Capability::Write(secret) => secret.id(),
            Capability::Read(id) => *id,
        }
    }

    /// Returns the namespace secret, or [`ReadOnly`] for a read capability.
    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
        match self {
            Capability::Write(secret) => Ok(secret),
            Capability::Read(_) => Err(ReadOnly),
        }
    }

    /// Returns the kind of this capability.
    pub fn kind(&self) -> CapabilityKind {
        match self {
            Capability::Write(_) => CapabilityKind::Write,
            Capability::Read(_) => CapabilityKind::Read,
        }
    }

    /// Returns the raw storage representation: the kind discriminant plus the
    /// 32-byte key material (secret bytes for write, namespace id for read).
    pub fn raw(&self) -> (u8, [u8; 32]) {
        let capability_repr: u8 = self.kind().into();
        let bytes = match self {
            Capability::Write(secret) => secret.to_bytes(),
            Capability::Read(id) => id.to_bytes(),
        };
        (capability_repr, bytes)
    }

    /// Reconstructs a capability from its raw representation.
    ///
    /// Fails if `kind` is not a valid [`CapabilityKind`] discriminant.
    pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
        let kind: CapabilityKind = kind.try_into()?;
        let capability = match kind {
            CapabilityKind::Write => {
                let secret = NamespaceSecret::from_bytes(bytes);
                Capability::Write(secret)
            }
            CapabilityKind::Read => {
                let id = NamespaceId::from(bytes);
                Capability::Read(id)
            }
        };
        Ok(capability)
    }

    /// Upgrades `self` with `other` if `other` grants more access.
    ///
    /// Returns `Ok(true)` if `self` was upgraded from read to write access,
    /// `Ok(false)` if nothing changed, and an error if the capabilities refer
    /// to different namespaces.
    pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
        if other.id() != self.id() {
            return Err(CapabilityError::NamespaceMismatch);
        }
        // Only a read -> write upgrade changes anything; direct assignment is
        // the idiomatic form of the previous `let _ = mem::replace(..)`.
        if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
            *self = other;
            Ok(true)
        } else {
            Ok(false)
        }
    }
}
/// Errors that can occur when merging two capabilities.
#[derive(Debug, thiserror::Error)]
pub enum CapabilityError {
    /// The capabilities refer to different namespaces.
    #[error("Namespaces are not the same")]
    NamespaceMismatch,
}
/// Mutable per-replica state: the access capability, event subscribers, the
/// optional content-status callback, and the open/closed flag.
#[derive(derive_more::Debug)]
pub struct ReplicaInfo {
    /// Capability (read or write) granting access to the namespace.
    pub(crate) capability: Capability,
    /// Listeners notified on local and remote inserts.
    subscribers: Subscribers,
    /// Optional callback to query the local download status of a content hash.
    #[debug("ContentStatusCallback")]
    content_status_cb: Option<ContentStatusCallback>,
    /// Once set, every mutating operation fails with `InsertError::Closed`.
    closed: bool,
}
impl ReplicaInfo {
pub fn new(capability: Capability) -> Self {
Self {
capability,
subscribers: Default::default(),
content_status_cb: None,
closed: false,
}
}
pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
self.subscribers.subscribe(sender)
}
pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
self.subscribers.unsubscribe(sender)
}
pub fn subscribers_count(&self) -> usize {
self.subscribers.len()
}
pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
if self.content_status_cb.is_some() {
false
} else {
self.content_status_cb = Some(cb);
true
}
}
fn ensure_open(&self) -> Result<(), InsertError> {
if self.closed() {
Err(InsertError::Closed)
} else {
Ok(())
}
}
pub fn closed(&self) -> bool {
self.closed
}
pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
self.capability.merge(capability)
}
}
/// An open replica: a store instance paired with the replica's info.
///
/// `I` is generic so the info can be held owned (`Box<ReplicaInfo>`, the
/// default) or borrowed, as long as it derefs to [`ReplicaInfo`].
#[derive(derive_more::Debug)]
pub struct Replica<'a, I = Box<ReplicaInfo>> {
    pub(crate) store: StoreInstance<'a>,
    pub(crate) info: I,
}
impl<'a, I> Replica<'a, I>
where
    I: Deref<Target = ReplicaInfo> + DerefMut,
{
    /// Creates a new replica from a store instance and its info.
    pub fn new(store: StoreInstance<'a>, info: I) -> Self {
        Replica { info, store }
    }

    /// Inserts a new record for `key`, signed by `author`, referencing
    /// content with hash `hash` and byte length `len`.
    ///
    /// Returns the number of existing entries removed because this insertion
    /// superseded them.
    ///
    /// Errors if the entry would be empty (empty entries are reserved for
    /// [`Self::delete_prefix`]), if the replica is closed or read-only, if
    /// validation fails, or if a newer entry already exists.
    pub fn insert(
        &mut self,
        key: impl AsRef<[u8]>,
        author: &Author,
        hash: Hash,
        len: u64,
    ) -> Result<usize, InsertError> {
        if len == 0 || hash == Hash::EMPTY {
            return Err(InsertError::EntryIsEmpty);
        }
        self.info.ensure_open()?;
        let id = RecordIdentifier::new(self.id(), author.id(), key);
        // Record gets the current system time (microseconds) as timestamp.
        let record = Record::new_current(hash, len);
        let entry = Entry::new(id, record);
        let secret = self.secret_key()?;
        let signed_entry = entry.sign(secret, author);
        self.insert_entry(signed_entry, InsertOrigin::Local)
    }

    /// Inserts an empty entry for `prefix`, acting as a deletion marker.
    ///
    /// Returns the number of entries removed as a consequence of the insert.
    pub fn delete_prefix(
        &mut self,
        prefix: impl AsRef<[u8]>,
        author: &Author,
    ) -> Result<usize, InsertError> {
        self.info.ensure_open()?;
        let id = RecordIdentifier::new(self.id(), author.id(), prefix);
        let entry = Entry::new_empty(id);
        let signed_entry = entry.sign(self.secret_key()?, author);
        self.insert_entry(signed_entry, InsertOrigin::Local)
    }

    /// Inserts an entry received from the peer `received_from` during sync.
    ///
    /// Rejects malformed-empty entries (hash/length emptiness mismatch)
    /// before attempting the insert.
    pub fn insert_remote_entry(
        &mut self,
        entry: SignedEntry,
        received_from: PeerIdBytes,
        content_status: ContentStatus,
    ) -> Result<usize, InsertError> {
        self.info.ensure_open()?;
        entry.validate_empty()?;
        let origin = InsertOrigin::Sync {
            from: received_from,
            remote_content_status: content_status,
        };
        self.insert_entry(entry, origin)
    }

    /// Validates and stores `entry`, then emits an [`Event`] to subscribers
    /// and updates metrics according to `origin`.
    ///
    /// Returns the number of entries removed by the insertion.
    fn insert_entry(
        &mut self,
        entry: SignedEntry,
        origin: InsertOrigin,
    ) -> Result<usize, InsertError> {
        let namespace = self.id();
        // Captured before `entry` is moved into the event below.
        #[cfg(feature = "metrics")]
        let len = entry.content_len();
        let store = &self.store;
        validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
        let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
        tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
        let removed_count = match outcome {
            InsertOutcome::Inserted { removed } => removed,
            InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
        };
        let insert_event = match origin {
            InsertOrigin::Local => {
                #[cfg(feature = "metrics")]
                {
                    inc!(Metrics, new_entries_local);
                    inc_by!(Metrics, new_entries_local_size, len);
                }
                Event::LocalInsert { namespace, entry }
            }
            InsertOrigin::Sync {
                from,
                remote_content_status,
            } => {
                #[cfg(feature = "metrics")]
                {
                    inc!(Metrics, new_entries_remote);
                    inc_by!(Metrics, new_entries_remote_size, len);
                }
                // Consult the download policy so subscribers know whether to
                // fetch the content for this entry.
                let download_policy = self
                    .store
                    .get_download_policy(&self.id())
                    .unwrap_or_default();
                let should_download = download_policy.matches(entry.entry());
                Event::RemoteInsert {
                    namespace,
                    entry,
                    from,
                    should_download,
                    remote_content_status,
                }
            }
        };
        self.info.subscribers.send(insert_event);
        Ok(removed_count)
    }

    /// Hashes `data`, inserts a record for it under `key`, and returns the
    /// content hash.
    ///
    /// NOTE(review): this stores only the record metadata; the content bytes
    /// themselves are presumably stored elsewhere under the returned hash —
    /// confirm against the blob store integration.
    pub fn hash_and_insert(
        &mut self,
        key: impl AsRef<[u8]>,
        author: &Author,
        data: impl AsRef<[u8]>,
    ) -> Result<Hash, InsertError> {
        self.info.ensure_open()?;
        let len = data.as_ref().len() as u64;
        let hash = Hash::new(data);
        self.insert(key, author, hash, len)?;
        Ok(hash)
    }

    /// Builds the [`RecordIdentifier`] for `key` and `author` in this replica.
    pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
        RecordIdentifier::new(self.info.capability.id(), author.id(), key)
    }

    /// Creates the initial message that starts a sync session with a peer.
    pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
        self.info.ensure_open().map_err(anyhow::Error::from)?;
        self.store.initial_message()
    }

    /// Processes an incoming sync message from `from_peer`, updating `state`
    /// with received/sent counts and per-author heads.
    ///
    /// Returns the reply message to send back, or `None` when there is
    /// nothing further to send.
    pub fn sync_process_message(
        &mut self,
        message: crate::ranger::Message<SignedEntry>,
        from_peer: PeerIdBytes,
        state: &mut SyncOutcome,
    ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
        self.info.ensure_open()?;
        let my_namespace = self.id();
        let now = system_time_now();
        // Account for every entry carried by the incoming message.
        state.num_recv += message.value_count();
        for (entry, _content_status) in message.values() {
            state
                .heads_received
                .insert(entry.author(), entry.timestamp());
        }
        let cb = self.info.content_status_cb.clone();
        let download_policy = self
            .store
            .get_download_policy(&my_namespace)
            .unwrap_or_default();
        let reply = self.store.process_message(
            &Default::default(),
            message,
            // Validation hook: only entries that pass `validate_entry` are
            // accepted into the store.
            |store, entry, content_status| {
                let origin = InsertOrigin::Sync {
                    from: from_peer,
                    remote_content_status: content_status,
                };
                validate_entry(now, store, my_namespace, entry, &origin).is_ok()
            },
            // Insert hook: notify subscribers about each inserted entry; the
            // event is built lazily and skipped when nobody listens.
            |_store, entry, content_status| {
                self.info.subscribers.send_with(|| {
                    let should_download = download_policy.matches(entry.entry());
                    Event::RemoteInsert {
                        from: from_peer,
                        namespace: my_namespace,
                        entry: entry.clone(),
                        should_download,
                        remote_content_status: content_status,
                    }
                })
            },
            // Content-status hook: report our local availability via the
            // registered callback, defaulting to `Missing` without one.
            |_store, entry| {
                if let Some(cb) = cb.as_ref() {
                    cb(entry.content_hash())
                } else {
                    ContentStatus::Missing
                }
            },
        )?;
        if let Some(ref reply) = reply {
            state.num_sent += reply.value_count();
        }
        Ok(reply)
    }

    /// Returns the namespace id of this replica.
    pub fn id(&self) -> NamespaceId {
        self.info.capability.id()
    }

    /// Returns the replica's access capability.
    pub fn capability(&self) -> &Capability {
        &self.info.capability
    }

    /// Returns the namespace secret, or [`ReadOnly`] if this replica only
    /// has read access.
    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
        self.info.capability.secret_key()
    }
}
/// Error returned when a write operation is attempted on a read-only replica.
#[derive(Debug, thiserror::Error)]
#[error("Replica allows read access only.")]
pub struct ReadOnly;
/// Validates an entry before it is inserted into a replica.
///
/// Checks, in order:
/// - the entry belongs to `expected_namespace`;
/// - for entries arriving via sync, both signatures verify (locally created
///   entries were just signed by us, so the check is skipped);
/// - the timestamp is at most [`MAX_TIMESTAMP_FUTURE_SHIFT`] ahead of `now`.
fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
    now: u64,
    store: &S,
    expected_namespace: NamespaceId,
    entry: &SignedEntry,
    origin: &InsertOrigin,
) -> Result<(), ValidationFailure> {
    if entry.namespace() != expected_namespace {
        return Err(ValidationFailure::InvalidNamespace);
    }
    if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
        return Err(ValidationFailure::BadSignature);
    }
    if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
        return Err(ValidationFailure::TooFarInTheFuture);
    }
    Ok(())
}
/// Errors that can occur when inserting an entry into a replica.
#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
pub enum InsertError {
    /// Failure in the storage backend.
    #[error("storage error")]
    Store(anyhow::Error),
    /// The entry failed validation (see [`ValidationFailure`]).
    #[error("validation failure")]
    Validation(#[from] ValidationFailure),
    /// The insert lost to an existing newer entry for the same key or a
    /// prefix of it.
    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
    NewerEntryExists,
    /// Empty entries cannot be inserted directly.
    #[error("Attempted to insert an empty entry")]
    EntryIsEmpty,
    /// The replica only grants read access.
    #[error("Attempted to insert to read only replica")]
    #[from(ReadOnly)]
    ReadOnly,
    /// The replica was already closed.
    #[error("replica is closed")]
    Closed,
}
/// Reasons why an entry is rejected during validation.
#[derive(thiserror::Error, Debug)]
pub enum ValidationFailure {
    /// The entry's namespace differs from the replica's.
    #[error("Entry namespace does not match the current replica")]
    InvalidNamespace,
    /// The author or namespace signature did not verify.
    #[error("Entry signature is invalid")]
    BadSignature,
    /// The entry's timestamp exceeds the allowed future shift.
    #[error("Entry timestamp is too far in the future.")]
    TooFarInTheFuture,
    /// Hash and length disagree about whether the entry is empty.
    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
    InvalidEmptyEntry,
}
/// An [`Entry`] together with the author and namespace signatures over its
/// encoded bytes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignedEntry {
    signature: EntrySignature,
    entry: Entry,
}
impl From<SignedEntry> for Entry {
    /// Discards the signatures and returns the inner entry.
    fn from(value: SignedEntry) -> Self {
        value.entry
    }
}

// Signed entries order exactly like their inner entries; signatures do not
// participate in the ordering.
impl PartialOrd for SignedEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for SignedEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.entry.cmp(&other.entry)
    }
}

impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Entries order first by identifier (namespace, author, key bytes), then by
// record (timestamp, then hash).
impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.id
            .cmp(&other.id)
            .then_with(|| self.record.cmp(&other.record))
    }
}
impl SignedEntry {
    /// Assembles a signed entry from pre-existing parts without verifying.
    pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
        SignedEntry { signature, entry }
    }

    /// Signs `entry` with both the namespace secret and the author key.
    pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
        let signature = EntrySignature::from_entry(&entry, namespace, author);
        SignedEntry { signature, entry }
    }

    /// Builds an entry from its parts and signs it.
    pub fn from_parts(
        namespace: &NamespaceSecret,
        author: &Author,
        key: impl AsRef<[u8]>,
        record: Record,
    ) -> Self {
        let id = RecordIdentifier::new(namespace.id(), author.id(), key);
        let entry = Entry::new(id, record);
        Self::from_entry(entry, namespace, author)
    }

    /// Verifies both signatures against the namespace and author public keys,
    /// resolved through `store`.
    pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
        self.signature.verify(
            &self.entry,
            &self.entry.namespace().public_key(store)?,
            &self.entry.author().public_key(store)?,
        )
    }

    /// Returns the signatures over this entry.
    pub fn signature(&self) -> &EntrySignature {
        &self.signature
    }

    /// Checks hash/length emptiness consistency of the inner entry.
    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
        self.entry().validate_empty()
    }

    /// Returns the unsigned inner entry.
    pub fn entry(&self) -> &Entry {
        &self.entry
    }

    /// Hash of the entry's content.
    pub fn content_hash(&self) -> Hash {
        self.entry().content_hash()
    }

    /// Length of the entry's content in bytes.
    pub fn content_len(&self) -> u64 {
        self.entry().content_len()
    }

    /// Author id of the entry.
    pub fn author_bytes(&self) -> AuthorId {
        self.entry().id().author()
    }

    /// Key of the entry.
    pub fn key(&self) -> &[u8] {
        self.entry().id().key()
    }

    /// Timestamp of the entry.
    pub fn timestamp(&self) -> u64 {
        self.entry().timestamp()
    }
}
impl RangeEntry for SignedEntry {
    type Key = RecordIdentifier;
    type Value = Record;

    fn key(&self) -> &Self::Key {
        &self.entry.id
    }

    fn value(&self) -> &Self::Value {
        &self.entry.record
    }

    /// Fingerprint for set reconciliation: a BLAKE3 hash over namespace id,
    /// author id, key, big-endian timestamp, and content hash. The signatures
    /// and the content length are not part of the fingerprint.
    fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
        let mut hasher = blake3::Hasher::new();
        hasher.update(self.namespace().as_ref());
        hasher.update(self.author_bytes().as_ref());
        hasher.update(self.key());
        hasher.update(&self.timestamp().to_be_bytes());
        hasher.update(self.content_hash().as_bytes());
        Fingerprint(hasher.finalize().into())
    }
}
/// The two Ed25519 signatures over an entry's encoded bytes: one by the
/// author key and one by the namespace key.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntrySignature {
    author_signature: Signature,
    namespace_signature: Signature,
}
// Manual Debug impl: render the raw signature bytes as hex instead of the
// default Signature formatting.
impl Debug for EntrySignature {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EntrySignature")
            .field(
                "namespace_signature",
                &hex::encode(self.namespace_signature.to_bytes()),
            )
            .field(
                "author_signature",
                &hex::encode(self.author_signature.to_bytes()),
            )
            .finish()
    }
}
impl EntrySignature {
    /// Signs the canonical byte encoding of `entry` with both keys.
    pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
        let bytes = entry.to_vec();
        let namespace_signature = namespace.sign(&bytes);
        let author_signature = author.sign(&bytes);
        EntrySignature {
            author_signature,
            namespace_signature,
        }
    }

    /// Verifies both signatures over the canonical byte encoding of `entry`.
    pub fn verify(
        &self,
        entry: &Entry,
        namespace: &NamespacePublicKey,
        author: &AuthorPublicKey,
    ) -> Result<(), SignatureError> {
        let bytes = entry.to_vec();
        namespace.verify(&bytes, &self.namespace_signature)?;
        author.verify(&bytes, &self.author_signature)?;
        Ok(())
    }

    /// Reconstructs the signatures from their raw 64-byte representations.
    pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
        let namespace_signature = Signature::from_bytes(namespace_sig);
        let author_signature = Signature::from_bytes(author_sig);
        EntrySignature {
            author_signature,
            namespace_signature,
        }
    }

    /// Returns the author's signature.
    pub(crate) fn author(&self) -> &Signature {
        &self.author_signature
    }

    /// Returns the namespace's signature.
    pub(crate) fn namespace(&self) -> &Signature {
        &self.namespace_signature
    }
}
/// A single document entry: an identifier (namespace, author, key) paired
/// with a record (content hash, length, timestamp). Unsigned.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Entry {
    id: RecordIdentifier,
    record: Record,
}
impl Entry {
    /// Creates a new entry from an identifier and a record.
    pub fn new(id: RecordIdentifier, record: Record) -> Self {
        Entry { id, record }
    }

    /// Creates an empty entry (empty hash, zero length, current timestamp),
    /// used as a deletion marker.
    pub fn new_empty(id: RecordIdentifier) -> Self {
        Entry {
            id,
            record: Record::empty_current(),
        }
    }

    /// Checks that the content hash and content length agree on emptiness:
    /// either both indicate an empty entry (empty hash and length 0), or
    /// neither does.
    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
        let hash_is_empty = self.content_hash() == Hash::EMPTY;
        let len_is_zero = self.content_len() == 0;
        // Collapses the previous four-arm match: the two agreeing cases are
        // Ok, the two disagreeing cases are the same error.
        if hash_is_empty == len_is_zero {
            Ok(())
        } else {
            Err(ValidationFailure::InvalidEmptyEntry)
        }
    }

    /// Returns the identifier (namespace, author, key) of this entry.
    pub fn id(&self) -> &RecordIdentifier {
        &self.id
    }

    /// Namespace this entry belongs to.
    pub fn namespace(&self) -> NamespaceId {
        self.id.namespace()
    }

    /// Author of this entry.
    pub fn author(&self) -> AuthorId {
        self.id.author()
    }

    /// Key of this entry.
    pub fn key(&self) -> &[u8] {
        self.id.key()
    }

    /// The record (content hash, length, timestamp) of this entry.
    pub fn record(&self) -> &Record {
        &self.record
    }

    /// Hash of this entry's content.
    pub fn content_hash(&self) -> Hash {
        self.record.hash
    }

    /// Length of this entry's content in bytes.
    pub fn content_len(&self) -> u64 {
        self.record.len
    }

    /// Timestamp of this entry.
    pub fn timestamp(&self) -> u64 {
        self.record.timestamp
    }

    /// Appends the canonical byte encoding (identifier, then record) to
    /// `out`. This is the byte string that gets signed.
    pub fn encode(&self, out: &mut Vec<u8>) {
        self.id.encode(out);
        self.record.encode(out);
    }

    /// Returns the canonical byte encoding as a fresh vector.
    pub fn to_vec(&self) -> Vec<u8> {
        let mut out = Vec::new();
        self.encode(&mut out);
        out
    }

    /// Signs this entry with the namespace secret and the author key.
    pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
        SignedEntry::from_entry(self, namespace, author)
    }
}
// Byte layout of the `RecordIdentifier` buffer: 32 bytes of namespace id,
// 32 bytes of author id, then the variable-length key.
const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
/// Identifier of a record: the concatenation of namespace id (32 bytes),
/// author id (32 bytes) and key, kept in one shared byte buffer.
///
/// Ordering derives from the raw bytes, i.e. identifiers sort by namespace,
/// then author, then key.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct RecordIdentifier(Bytes);

impl Default for RecordIdentifier {
    /// All-zero namespace and author ids with an empty key.
    fn default() -> Self {
        Self::new(NamespaceId::default(), AuthorId::default(), b"")
    }
}

impl Debug for RecordIdentifier {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RecordIdentifier")
            .field("namespace", &self.namespace())
            .field("author", &self.author())
            // Keys are arbitrary bytes; render them lossily as UTF-8.
            .field("key", &std::string::String::from_utf8_lossy(self.key()))
            .finish()
    }
}

impl RangeKey for RecordIdentifier {
    // Prefix on the raw bytes, so it covers namespace + author + key prefix.
    #[cfg(test)]
    fn is_prefix_of(&self, other: &Self) -> bool {
        other.as_ref().starts_with(self.as_ref())
    }
}
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// Panics if the system clock reports a time before the epoch.
fn system_time_now() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("time drift");
    since_epoch.as_micros() as u64
}
impl RecordIdentifier {
    /// Builds an identifier by concatenating namespace id, author id, and key
    /// into a single contiguous buffer.
    pub fn new(
        namespace: impl Into<NamespaceId>,
        author: impl Into<AuthorId>,
        key: impl AsRef<[u8]>,
    ) -> Self {
        let key = key.as_ref();
        let mut buf = BytesMut::with_capacity(32 + 32 + key.len());
        buf.extend_from_slice(namespace.into().as_bytes());
        buf.extend_from_slice(author.into().as_bytes());
        buf.extend_from_slice(key);
        Self(buf.freeze())
    }

    /// Appends the raw identifier bytes to `out`.
    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
        out.extend_from_slice(&self.0);
    }

    /// Returns the raw identifier bytes (a cheap clone of the shared buffer).
    pub fn as_bytes(&self) -> Bytes {
        self.0.clone()
    }

    /// Returns (namespace, author, key) as borrowed views into the buffer.
    pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
        let namespace: &[u8; 32] = self.0[NAMESPACE_BYTES].try_into().unwrap();
        let author: &[u8; 32] = self.0[AUTHOR_BYTES].try_into().unwrap();
        (namespace, author, &self.0[KEY_BYTES])
    }

    /// Returns (namespace, author, key) with owned id arrays and a shared
    /// key slice.
    pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
        let namespace: [u8; 32] = self.0[NAMESPACE_BYTES].try_into().unwrap();
        let author: [u8; 32] = self.0[AUTHOR_BYTES].try_into().unwrap();
        (namespace, author, self.0.slice(KEY_BYTES))
    }

    /// Key portion of the identifier, borrowed.
    pub fn key(&self) -> &[u8] {
        &self.0[KEY_BYTES]
    }

    /// Key portion of the identifier as a shared byte buffer.
    pub fn key_bytes(&self) -> Bytes {
        self.0.slice(KEY_BYTES)
    }

    /// Namespace the identified record belongs to.
    pub fn namespace(&self) -> NamespaceId {
        let bytes: &[u8; 32] = self.0[NAMESPACE_BYTES].try_into().unwrap();
        bytes.into()
    }

    /// Author of the identified record.
    pub fn author(&self) -> AuthorId {
        let bytes: &[u8; 32] = self.0[AUTHOR_BYTES].try_into().unwrap();
        bytes.into()
    }
}
impl AsRef<[u8]> for RecordIdentifier {
    /// The raw concatenated bytes (namespace ++ author ++ key).
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

// Deref to the inner entry so record accessors are reachable directly on a
// signed entry.
impl Deref for SignedEntry {
    type Target = Entry;
    fn deref(&self) -> &Self::Target {
        &self.entry
    }
}

// Deref to the record so its accessors are reachable directly on an entry.
impl Deref for Entry {
    type Target = Record;
    fn deref(&self) -> &Self::Target {
        &self.record
    }
}
/// The value part of an entry: content length, content hash, and the
/// creation timestamp.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Record {
    /// Length of the content in bytes.
    len: u64,
    /// Hash of the content.
    hash: Hash,
    /// Creation time, in microseconds since the Unix epoch when produced by
    /// `new_current` (callers may supply other values via `new`).
    timestamp: u64,
}
impl RangeValue for Record {}

/// Records order by timestamp first, with the content hash as tie-breaker,
/// so the most recent record sorts last.
impl Ord for Record {
    fn cmp(&self, other: &Self) -> Ordering {
        self.timestamp
            .cmp(&other.timestamp)
            .then_with(|| self.hash.cmp(&other.hash))
    }
}

impl PartialOrd for Record {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Record {
    /// Creates a new record.
    ///
    /// Debug-asserts that a zero `len` is only combined with the empty hash.
    pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
        debug_assert!(
            len != 0 || hash == Hash::EMPTY,
            "if `len` is 0 then `hash` must be the hash of the empty byte range"
        );
        Record {
            hash,
            len,
            timestamp,
        }
    }

    /// Creates an empty record (deletion marker) with the given timestamp.
    pub fn empty(timestamp: u64) -> Self {
        Self::new(Hash::EMPTY, 0, timestamp)
    }

    /// Creates an empty record with the current system time as timestamp.
    pub fn empty_current() -> Self {
        Self::new_current(Hash::EMPTY, 0)
    }

    /// Whether this record's content is the empty hash (deletion marker).
    pub fn is_empty(&self) -> bool {
        self.hash == Hash::EMPTY
    }

    /// Creates a record timestamped with the current system time.
    pub fn new_current(hash: Hash, len: u64) -> Self {
        let timestamp = system_time_now();
        Self::new(hash, len, timestamp)
    }

    /// Length of the content in bytes.
    pub fn content_len(&self) -> u64 {
        self.len
    }

    /// Hash of the content.
    pub fn content_hash(&self) -> Hash {
        self.hash
    }

    /// Timestamp of the record.
    pub fn timestamp(&self) -> u64 {
        self.timestamp
    }

    /// Test helper: record over `data` with the current timestamp.
    #[cfg(test)]
    pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
        let len = data.as_ref().len() as u64;
        let hash = Hash::new(data);
        Self::new_current(hash, len)
    }

    /// Test helper: record over `data` with an explicit timestamp.
    #[cfg(test)]
    pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
        let len = data.as_ref().len() as u64;
        let hash = Hash::new(data);
        Self::new(hash, len, timestamp)
    }

    /// Appends the canonical encoding: big-endian length, raw hash bytes,
    /// big-endian timestamp.
    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
        out.extend_from_slice(&self.len.to_be_bytes());
        out.extend_from_slice(self.hash.as_ref());
        out.extend_from_slice(&self.timestamp.to_be_bytes())
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use anyhow::Result;
use rand_core::SeedableRng;
use super::*;
use crate::{
actor::SyncHandle,
ranger::{Range, Store as _},
store::{OpenError, Query, SortBy, SortDirection, Store},
};
#[test]
fn test_basics_memory() -> Result<()> {
let store = store::Store::memory();
test_basics(store)?;
Ok(())
}
#[test]
fn test_basics_fs() -> Result<()> {
let dbfile = tempfile::NamedTempFile::new()?;
let store = store::fs::Store::persistent(dbfile.path())?;
test_basics(store)?;
Ok(())
}
fn test_basics(mut store: Store) -> Result<()> {
let mut rng = rand::thread_rng();
let alice = Author::new(&mut rng);
let bob = Author::new(&mut rng);
let myspace = NamespaceSecret::new(&mut rng);
let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
let record = Record::current_from_data(b"this is my cool data");
let entry = Entry::new(record_id, record);
let signed_entry = entry.sign(&myspace, &alice);
signed_entry.verify(&()).expect("failed to verify");
let mut my_replica = store.new_replica(myspace.clone())?;
for i in 0..10 {
my_replica.hash_and_insert(
format!("/{i}"),
&alice,
format!("{i}: hello from alice"),
)?;
}
for i in 0..10 {
let res = store
.get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
.unwrap();
let len = format!("{i}: hello from alice").as_bytes().len() as u64;
assert_eq!(res.entry().record().content_len(), len);
res.verify(&())?;
}
let mut my_replica = store.new_replica(myspace.clone())?;
my_replica.hash_and_insert("/cool/path", &alice, "round 1")?;
let _entry = store
.get_exact(myspace.id(), alice.id(), "/cool/path", false)?
.unwrap();
let mut my_replica = store.new_replica(myspace.clone())?;
my_replica.hash_and_insert("/cool/path", &alice, "round 2")?;
let _entry = store
.get_exact(myspace.id(), alice.id(), "/cool/path", false)?
.unwrap();
let entries: Vec<_> = store
.get_many(myspace.id(), Query::author(alice.id()))?
.collect::<Result<_>>()?;
assert_eq!(entries.len(), 11);
let entries: Vec<_> = store
.get_many(myspace.id(), Query::author(bob.id()))?
.collect::<Result<_>>()?;
assert_eq!(entries.len(), 0);
let entries: Vec<_> = store
.get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
.collect::<Result<_>>()?;
assert_eq!(entries.len(), 1);
let entries: Vec<_> = store
.get_many(myspace.id(), Query::all())?
.collect::<Result<_>>()?;
assert_eq!(entries.len(), 11);
let mut my_replica = store.new_replica(myspace.clone())?;
let _entry = my_replica.hash_and_insert("/cool/path", &bob, "bob round 1")?;
let entries: Vec<_> = store
.get_many(myspace.id(), Query::author(alice.id()))?
.collect::<Result<_>>()?;
assert_eq!(entries.len(), 11);
let entries: Vec<_> = store
.get_many(myspace.id(), Query::author(bob.id()))?
.collect::<Result<_>>()?;
assert_eq!(entries.len(), 1);
let entries: Vec<_> = store
.get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
.collect::<Result<_>>()?;
assert_eq!(entries.len(), 2);
let entries: Vec<_> = store
.get_many(myspace.id(), Query::key_prefix(b"/cool"))?
.collect::<Result<_>>()?;
assert_eq!(entries.len(), 2);
let entries: Vec<_> = store
.get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
.collect::<Result<_>>()?;
assert_eq!(entries.len(), 1);
let entries: Vec<_> = store
.get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
.collect::<Result<_>>()?;
assert_eq!(entries.len(), 1);
let entries: Vec<_> = store
.get_many(myspace.id(), Query::all())?
.collect::<Result<_>>()?;
assert_eq!(entries.len(), 12);
let mut my_replica = store.new_replica(myspace.clone())?;
let entries_second: Vec<_> = my_replica
.store
.get_range(Range::new(
RecordIdentifier::default(),
RecordIdentifier::default(),
))?
.collect::<Result<_, _>>()?;
assert_eq!(entries_second.len(), 12);
assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
test_lru_cache_like_behaviour(&mut store, myspace.id())?;
store.flush()?;
Ok(())
}
fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
#[track_caller]
fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
assert_eq!(
expected_peers,
&store
.get_sync_peers(&namespace)
.unwrap()
.unwrap()
.collect::<Vec<_>>(),
"sync peers differ"
);
}
let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
let mut expected_peers = Vec::with_capacity(count);
for i in 0..count as u8 {
let peer = [i; 32];
expected_peers.insert(0, peer);
store.register_useful_peer(namespace, peer)?;
}
verify_peers(store, namespace, &expected_peers);
expected_peers.pop();
let newer_peer = [count as u8; 32];
expected_peers.insert(0, newer_peer);
store.register_useful_peer(namespace, newer_peer)?;
verify_peers(store, namespace, &expected_peers);
let refreshed_peer = expected_peers.remove(2);
expected_peers.insert(0, refreshed_peer);
store.register_useful_peer(namespace, refreshed_peer)?;
verify_peers(store, namespace, &expected_peers);
Ok(())
}
#[test]
fn test_content_hashes_iterator_memory() -> Result<()> {
let store = store::Store::memory();
test_content_hashes_iterator(store)
}
#[test]
fn test_content_hashes_iterator_fs() -> Result<()> {
let dbfile = tempfile::NamedTempFile::new()?;
let store = store::fs::Store::persistent(dbfile.path())?;
test_content_hashes_iterator(store)
}
fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
let mut rng = rand::thread_rng();
let mut expected = HashSet::new();
let n_replicas = 3;
let n_entries = 4;
for i in 0..n_replicas {
let namespace = NamespaceSecret::new(&mut rng);
let author = store.new_author(&mut rng)?;
let mut replica = store.new_replica(namespace)?;
for j in 0..n_entries {
let key = format!("{j}");
let data = format!("{i}:{j}");
let hash = replica.hash_and_insert(key, &author, data)?;
expected.insert(hash);
}
}
assert_eq!(expected.len(), n_replicas * n_entries);
let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
assert_eq!(actual, expected);
Ok(())
}
#[test]
fn test_multikey() {
let mut rng = rand::thread_rng();
let k = ["a", "c", "z"];
let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
n.sort_by_key(|n| n.id());
let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
a.sort_by_key(|a| a.id());
{
let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
let range = Range::new(ri0.clone(), ri2.clone());
assert!(range.contains(&ri0), "start");
assert!(range.contains(&ri1), "inside");
assert!(!range.contains(&ri2), "end");
assert!(ri0 < ri1);
assert!(ri1 < ri2);
}
{
let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
let range = Range::new(ri0.clone(), ri2.clone());
assert!(range.contains(&ri0), "start");
assert!(range.contains(&ri1), "inside");
assert!(!range.contains(&ri2), "end");
assert!(ri0 < ri1);
assert!(ri1 < ri2);
}
{
let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
let range = Range::new(ri0.clone(), ri2.clone());
assert!(range.contains(&ri0), "start");
assert!(range.contains(&ri1), "inside");
assert!(!range.contains(&ri2), "end");
assert!(ri0 < ri1);
assert!(ri1 < ri2);
}
{
let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
let range = Range::new(ri0.clone(), ri2.clone());
assert!(range.contains(&ri0), "start");
assert!(range.contains(&ri1), "inside");
assert!(!range.contains(&ri2), "end");
assert!(ri0 < ri1);
assert!(ri1 < ri2);
}
{
let a0 = a[0].id();
let a1 = a[1].id();
let n0 = n[0].id();
let n1 = n[1].id();
let k0 = k[0];
let k1 = k[1];
assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
}
}
#[test]
fn test_timestamps_memory() -> Result<()> {
let store = store::Store::memory();
test_timestamps(store)?;
Ok(())
}
#[test]
fn test_timestamps_fs() -> Result<()> {
let dbfile = tempfile::NamedTempFile::new()?;
let store = store::fs::Store::persistent(dbfile.path())?;
test_timestamps(store)?;
Ok(())
}
fn test_timestamps(mut store: Store) -> Result<()> {
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
let namespace = NamespaceSecret::new(&mut rng);
let _replica = store.new_replica(namespace.clone())?;
let author = store.new_author(&mut rng)?;
store.close_replica(namespace.id());
let mut replica = store.open_replica(&namespace.id())?;
let key = b"hello";
let value = b"world";
let entry = {
let timestamp = 2;
let id = RecordIdentifier::new(namespace.id(), author.id(), key);
let record = Record::from_data(value, timestamp);
Entry::new(id, record).sign(&namespace, &author)
};
replica
.insert_entry(entry.clone(), InsertOrigin::Local)
.unwrap();
store.close_replica(namespace.id());
let res = store
.get_exact(namespace.id(), author.id(), key, false)?
.unwrap();
assert_eq!(res, entry);
let entry2 = {
let timestamp = 1;
let id = RecordIdentifier::new(namespace.id(), author.id(), key);
let record = Record::from_data(value, timestamp);
Entry::new(id, record).sign(&namespace, &author)
};
let mut replica = store.open_replica(&namespace.id())?;
let res = replica.insert_entry(entry2, InsertOrigin::Local);
store.close_replica(namespace.id());
assert!(matches!(res, Err(InsertError::NewerEntryExists)));
let res = store
.get_exact(namespace.id(), author.id(), key, false)?
.unwrap();
assert_eq!(res, entry);
store.flush()?;
Ok(())
}
#[test]
fn test_replica_sync_memory() -> Result<()> {
let alice_store = store::Store::memory();
let bob_store = store::Store::memory();
test_replica_sync(alice_store, bob_store)?;
Ok(())
}
#[test]
fn test_replica_sync_fs() -> Result<()> {
let alice_dbfile = tempfile::NamedTempFile::new()?;
let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
let bob_dbfile = tempfile::NamedTempFile::new()?;
let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
test_replica_sync(alice_store, bob_store)?;
Ok(())
}
/// End-to-end set reconciliation between two replicas of the same namespace.
///
/// Alice and Bob start with partially overlapping key sets; after `sync`
/// both stores must contain the union of both sets.
fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
// "eel" and "fox" appear on both sides; the remaining keys are unique to one peer.
let alice_set = ["ape", "eel", "fox", "gnu"];
let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
let mut rng = rand::thread_rng();
let author = Author::new(&mut rng);
let myspace = NamespaceSecret::new(&mut rng);
let mut alice = alice_store.new_replica(myspace.clone())?;
for el in &alice_set {
alice.hash_and_insert(el, &author, el.as_bytes())?;
}
let mut bob = bob_store.new_replica(myspace.clone())?;
for el in &bob_set {
bob.hash_and_insert(el, &author, el.as_bytes())?;
}
let (alice_out, bob_out) = sync(&mut alice, &mut bob)?;
// NOTE(review): the expected counts (alice transfers 2 entries, bob 6) encode
// the range-based reconciliation behavior for these specific fixtures --
// confirm against the ranger protocol before changing either key set.
assert_eq!(alice_out.num_sent, 2);
assert_eq!(bob_out.num_recv, 2);
assert_eq!(alice_out.num_recv, 6);
assert_eq!(bob_out.num_sent, 6);
// Both stores must now hold every entry from both initial sets.
check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
alice_store.flush()?;
bob_store.flush()?;
Ok(())
}
/// Runs [`test_replica_timestamp_sync`] against two in-memory stores.
#[test]
fn test_replica_timestamp_sync_memory() -> Result<()> {
    test_replica_timestamp_sync(store::Store::memory(), store::Store::memory())?;
    Ok(())
}
/// Runs [`test_replica_timestamp_sync`] against two persistent stores.
#[test]
fn test_replica_timestamp_sync_fs() -> Result<()> {
    // Keep the temp files alive until the test finishes.
    let alice_dbfile = tempfile::NamedTempFile::new()?;
    let bob_dbfile = tempfile::NamedTempFile::new()?;
    test_replica_timestamp_sync(
        store::fs::Store::persistent(alice_dbfile.path())?,
        store::fs::Store::persistent(bob_dbfile.path())?,
    )?;
    Ok(())
}
/// Last-write-wins conflict resolution across a sync.
///
/// Two peers write the same key; after syncing, the entry with the newer
/// timestamp must win in BOTH stores.
///
/// BUGFIX: the second assertion of each pair previously re-checked
/// `alice_store` (an exact duplicate of the preceding assert), so Bob's
/// store was never verified. It now checks `bob_store`.
fn test_replica_timestamp_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
    let mut rng = rand::thread_rng();
    let author = Author::new(&mut rng);
    let namespace = NamespaceSecret::new(&mut rng);
    let mut alice = alice_store.new_replica(namespace.clone())?;
    let mut bob = bob_store.new_replica(namespace.clone())?;

    let key = b"key";
    let alice_value = b"alice";
    let bob_value = b"bob";
    // Bob writes after Alice, so Bob's entry carries the later timestamp and wins.
    let _alice_hash = alice.hash_and_insert(key, &author, alice_value)?;
    let bob_hash = bob.hash_and_insert(key, &author, bob_value)?;
    sync(&mut alice, &mut bob)?;
    assert_eq!(
        get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
        Some(bob_hash)
    );
    assert_eq!(
        get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
        Some(bob_hash)
    );

    // Second round with the order reversed: Alice writes last and must win.
    let mut alice = alice_store.new_replica(namespace.clone())?;
    let mut bob = bob_store.new_replica(namespace.clone())?;
    let alice_value_2 = b"alice2";
    let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value)?;
    let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2)?;
    sync(&mut alice, &mut bob)?;
    assert_eq!(
        get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
        Some(alice_hash_2)
    );
    assert_eq!(
        get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
        Some(alice_hash_2)
    );
    alice_store.flush()?;
    bob_store.flush()?;
    Ok(())
}
/// Entries stamped too far in the future must be rejected.
///
/// The cutoff is `MAX_TIMESTAMP_FUTURE_SHIFT` milliseconds ahead of the local
/// clock: at or below the shift insertion succeeds; above it, insertion fails
/// with `ValidationFailure::TooFarInTheFuture` and the stored entry is unchanged.
#[test]
fn test_future_timestamp() -> Result<()> {
let mut rng = rand::thread_rng();
let mut store = store::Store::memory();
let author = Author::new(&mut rng);
let namespace = NamespaceSecret::new(&mut rng);
let mut replica = store.new_replica(namespace.clone())?;
let key = b"hi";
// Baseline: an entry stamped "now" is accepted.
let t = system_time_now();
let record = Record::from_data(b"1", t);
let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
replica.insert_entry(entry0.clone(), InsertOrigin::Local)?;
assert_eq!(
get_entry(&mut store, namespace.id(), author.id(), key)?,
entry0
);
// Comfortably below the cutoff (10s of slack): accepted.
let mut replica = store.new_replica(namespace.clone())?;
let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
let record = Record::from_data(b"2", t);
let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
replica.insert_entry(entry1.clone(), InsertOrigin::Local)?;
assert_eq!(
get_entry(&mut store, namespace.id(), author.id(), key)?,
entry1
);
// Exactly at the cutoff: still accepted.
// NOTE(review): the wall clock advances between this `system_time_now()` call
// and the validation's own clock read, which can only widen the allowance --
// so this boundary case should not flake. TODO confirm validation re-reads
// the clock rather than caching it.
let mut replica = store.new_replica(namespace.clone())?;
let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
let record = Record::from_data(b"2", t);
let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
replica.insert_entry(entry2.clone(), InsertOrigin::Local)?;
assert_eq!(
get_entry(&mut store, namespace.id(), author.id(), key)?,
entry2
);
// 10s beyond the cutoff: rejected, and the previous entry remains stored.
let mut replica = store.new_replica(namespace.clone())?;
let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
let record = Record::from_data(b"2", t);
let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
let res = replica.insert_entry(entry3, InsertOrigin::Local);
assert!(matches!(
res,
Err(InsertError::Validation(
ValidationFailure::TooFarInTheFuture
))
));
assert_eq!(
get_entry(&mut store, namespace.id(), author.id(), key)?,
entry2
);
store.flush()?;
Ok(())
}
/// Inserting an entry with empty content (len 0, hash of "") must be rejected
/// with `InsertError::EntryIsEmpty`.
#[test]
fn test_insert_empty() -> Result<()> {
    let mut store = store::Store::memory();
    let mut rng = rand::thread_rng();
    let alice = Author::new(&mut rng);
    let myspace = NamespaceSecret::new(&mut rng);
    let mut replica = store.new_replica(myspace.clone())?;
    // Hash of the empty byte string, with an explicit length of zero.
    let empty_hash = Hash::new(b"");
    let outcome = replica.insert(b"foo", &alice, empty_hash, 0);
    assert!(matches!(outcome, Err(InsertError::EntryIsEmpty)));
    store.flush()?;
    Ok(())
}
/// Runs [`test_prefix_delete`] against an in-memory store.
#[test]
fn test_prefix_delete_memory() -> Result<()> {
    test_prefix_delete(store::Store::memory())?;
    Ok(())
}
/// Runs [`test_prefix_delete`] against a persistent store.
#[test]
fn test_prefix_delete_fs() -> Result<()> {
    // The temp file must outlive the store handle.
    let dbfile = tempfile::NamedTempFile::new()?;
    test_prefix_delete(store::fs::Store::persistent(dbfile.path())?)?;
    Ok(())
}
/// Deleting by prefix removes every entry whose key starts with the prefix,
/// and subsequent exact lookups (excluding empty/tombstone entries) return `None`.
fn test_prefix_delete(mut store: Store) -> Result<()> {
let mut rng = rand::thread_rng();
let alice = Author::new(&mut rng);
let myspace = NamespaceSecret::new(&mut rng);
let mut replica = store.new_replica(myspace.clone())?;
// Two keys sharing the prefix "foo".
let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello")?;
let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world")?;
assert_eq!(
get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
Some(hash1)
);
assert_eq!(
get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
Some(hash2)
);
// Deleting the prefix must report both entries as removed.
let mut replica = store.new_replica(myspace.clone())?;
let deleted = replica.delete_prefix(b"foo", &alice)?;
assert_eq!(deleted, 2);
// Exact lookups with include_empty=false now see nothing, including the
// prefix key itself.
assert_eq!(
store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
None
);
assert_eq!(
store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
None
);
assert_eq!(
store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
None
);
store.flush()?;
Ok(())
}
/// Runs [`test_replica_sync_delete`] against two in-memory stores.
#[test]
fn test_replica_sync_delete_memory() -> Result<()> {
    test_replica_sync_delete(store::Store::memory(), store::Store::memory())
}
/// Runs [`test_replica_sync_delete`] against two persistent stores.
#[test]
fn test_replica_sync_delete_fs() -> Result<()> {
    // Temp files must stay alive while the stores are in use.
    let alice_dbfile = tempfile::NamedTempFile::new()?;
    let bob_dbfile = tempfile::NamedTempFile::new()?;
    test_replica_sync_delete(
        store::fs::Store::persistent(alice_dbfile.path())?,
        store::fs::Store::persistent(bob_dbfile.path())?,
    )
}
/// Prefix deletions must propagate through sync like regular entries.
///
/// After an initial sync, Alice deletes the "foo" prefix while Bob adds a new
/// "fooz" entry; a second sync must leave both stores with only "fog" (never
/// matched the prefix) and "fooz" (written after the deletion).
fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
let alice_set = ["foot"];
let bob_set = ["fool", "foo", "fog"];
let mut rng = rand::thread_rng();
let author = Author::new(&mut rng);
let myspace = NamespaceSecret::new(&mut rng);
let mut alice = alice_store.new_replica(myspace.clone())?;
for el in &alice_set {
alice.hash_and_insert(el, &author, el.as_bytes())?;
}
let mut bob = bob_store.new_replica(myspace.clone())?;
for el in &bob_set {
bob.hash_and_insert(el, &author, el.as_bytes())?;
}
// First sync: both stores converge on the union of both sets.
sync(&mut alice, &mut bob)?;
check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
// Alice tombstones all "foo"-prefixed keys; Bob then writes "fooz", which
// carries a later timestamp and must survive the deletion after syncing.
let mut alice = alice_store.new_replica(myspace.clone())?;
let mut bob = bob_store.new_replica(myspace.clone())?;
alice.delete_prefix("foo", &author)?;
bob.hash_and_insert("fooz", &author, "fooz".as_bytes())?;
sync(&mut alice, &mut bob)?;
check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
alice_store.flush()?;
bob_store.flush()?;
Ok(())
}
/// Runs [`test_replica_remove`] against an in-memory store.
#[test]
fn test_replica_remove_memory() -> Result<()> {
    test_replica_remove(store::Store::memory())
}
/// Runs [`test_replica_remove`] against a persistent store.
#[test]
fn test_replica_remove_fs() -> Result<()> {
    // Temp file must outlive the store handle.
    let alice_dbfile = tempfile::NamedTempFile::new()?;
    test_replica_remove(store::fs::Store::persistent(alice_dbfile.path())?)
}
/// Removing a replica: fails while the replica is open, succeeds after close,
/// wipes its entries and metadata, and allows the namespace to be re-created.
fn test_replica_remove(mut store: Store) -> Result<()> {
let mut rng = rand::thread_rng();
let namespace = NamespaceSecret::new(&mut rng);
let author = Author::new(&mut rng);
let mut replica = store.new_replica(namespace.clone())?;
let hash = replica.hash_and_insert(b"foo", &author, b"bar")?;
let res = store
.get_many(namespace.id(), Query::all())?
.collect::<Vec<_>>();
assert_eq!(res.len(), 1);
// Removal must be rejected while the replica is still open.
let res = store.remove_replica(&namespace.id());
assert!(res.is_err());
store.close_replica(namespace.id());
store.remove_replica(&namespace.id())?;
// All entries are gone...
let res = store
.get_many(namespace.id(), Query::all())?
.collect::<Vec<_>>();
assert_eq!(res.len(), 0);
// ...and so is the replica's metadata.
let res = store.load_replica_info(&namespace.id());
assert!(matches!(res, Err(OpenError::NotFound)));
// The namespace can be created again from scratch and accept new entries.
let mut replica = store.new_replica(namespace.clone())?;
replica.insert(b"foo", &author, hash, 3)?;
let res = store
.get_many(namespace.id(), Query::all())?
.collect::<Vec<_>>();
assert_eq!(res.len(), 1);
store.flush()?;
Ok(())
}
/// Runs [`test_replica_delete_edge_cases`] against an in-memory store.
#[test]
fn test_replica_delete_edge_cases_memory() -> Result<()> {
    test_replica_delete_edge_cases(store::Store::memory())
}
/// Runs [`test_replica_delete_edge_cases`] against a persistent store.
#[test]
fn test_replica_delete_edge_cases_fs() -> Result<()> {
    // Temp file must outlive the store handle.
    let dbfile = tempfile::NamedTempFile::new()?;
    test_replica_delete_edge_cases(store::fs::Store::persistent(dbfile.path())?)
}
/// Prefix deletion at byte-value boundaries (0x00, 0xFF) must neither miss
/// matching keys nor delete neighbors outside the prefix range.
fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
let mut rng = rand::thread_rng();
let author = Author::new(&mut rng);
let namespace = NamespaceSecret::new(&mut rng);
// Suffix bytes at the extremes of the byte range, under extreme prefixes.
let edgecases = [0u8, 1u8, 255u8];
let prefixes = [0u8, 255u8];
let hash = Hash::new(b"foo");
let len = 3;
for prefix in prefixes {
let mut expected = vec![];
let mut replica = store.new_replica(namespace.clone())?;
for suffix in edgecases {
let key = [prefix, suffix].to_vec();
expected.push(key.clone());
replica.insert(&key, &author, hash, len)?;
}
assert_keys(&mut store, namespace.id(), expected);
// Deleting the one-byte prefix must clear every key inserted above.
let mut replica = store.new_replica(namespace.clone())?;
replica.delete_prefix([prefix], &author)?;
assert_keys(&mut store, namespace.id(), vec![]);
}
// Deleting the exact key [1, 1] as a prefix must leave its neighbors
// [1, 0] and [1, 2] untouched.
let mut replica = store.new_replica(namespace.clone())?;
let key = vec![1u8, 0u8];
replica.insert(key, &author, hash, len)?;
let key = vec![1u8, 1u8];
replica.insert(key, &author, hash, len)?;
let key = vec![1u8, 2u8];
replica.insert(key, &author, hash, len)?;
let prefix = vec![1u8, 1u8];
replica.delete_prefix(prefix, &author)?;
assert_keys(
&mut store,
namespace.id(),
vec![vec![1u8, 0u8], vec![1u8, 2u8]],
);
// Deleting prefix [0] must remove both [0, 0] and [0, 255] (the maximal
// suffix byte), without touching keys under prefix [1].
let mut replica = store.new_replica(namespace.clone())?;
let key = vec![0u8, 255u8];
replica.insert(key, &author, hash, len)?;
let key = vec![0u8, 0u8];
replica.insert(key, &author, hash, len)?;
let prefix = vec![0u8];
replica.delete_prefix(prefix, &author)?;
assert_keys(
&mut store,
namespace.id(),
vec![vec![1u8, 0u8], vec![1u8, 2u8]],
);
store.flush()?;
Ok(())
}
/// Runs [`test_latest_iter`] against an in-memory store.
#[test]
fn test_latest_iter_memory() -> Result<()> {
    test_latest_iter(store::Store::memory())
}
/// Runs [`test_latest_iter`] against a persistent store.
#[test]
fn test_latest_iter_fs() -> Result<()> {
    // Temp file must outlive the store handle.
    let dbfile = tempfile::NamedTempFile::new()?;
    test_latest_iter(store::fs::Store::persistent(dbfile.path())?)
}
/// `get_latest_for_each_author` yields exactly one entry per author:
/// the most recently written one.
fn test_latest_iter(mut store: Store) -> Result<()> {
let mut rng = rand::thread_rng();
let author0 = Author::new(&mut rng);
let author1 = Author::new(&mut rng);
let namespace = NamespaceSecret::new(&mut rng);
let mut replica = store.new_replica(namespace.clone())?;
// Single write by author0 -> a single "latest" entry with its key.
replica.hash_and_insert(b"a0.1", &author0, b"hi")?;
let latest = store
.get_latest_for_each_author(namespace.id())?
.collect::<Result<Vec<_>>>()?;
assert_eq!(latest.len(), 1);
// Tuple element 2 is the entry's key.
assert_eq!(latest[0].2, b"a0.1".to_vec());
// author0 writes again ("a0.2" supersedes "a0.1"); author1 writes once.
let mut replica = store.new_replica(namespace.clone())?;
replica.hash_and_insert(b"a1.1", &author1, b"hi")?;
replica.hash_and_insert(b"a0.2", &author0, b"hi")?;
let latest = store
.get_latest_for_each_author(namespace.id())?
.collect::<Result<Vec<_>>>()?;
// Sort keys because the iteration order over authors is not asserted here.
let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
latest_keys.sort();
assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
store.flush()?;
Ok(())
}
/// Runs [`test_replica_byte_keys`] against an in-memory store.
#[test]
fn test_replica_byte_keys_memory() -> Result<()> {
    test_replica_byte_keys(store::Store::memory())?;
    Ok(())
}
/// Runs [`test_replica_byte_keys`] against a persistent store.
#[test]
fn test_replica_byte_keys_fs() -> Result<()> {
    // Temp file must outlive the store handle.
    let dbfile = tempfile::NamedTempFile::new()?;
    test_replica_byte_keys(store::fs::Store::persistent(dbfile.path())?)?;
    Ok(())
}
/// Keys are arbitrary byte strings (not just UTF-8): insert keys containing
/// 0x00 and 0xFF bytes and verify the stored key set after each insertion.
fn test_replica_byte_keys(mut store: Store) -> Result<()> {
    let mut rng = rand::thread_rng();
    let author = Author::new(&mut rng);
    let namespace = NamespaceSecret::new(&mut rng);
    let hash = Hash::new(b"foo");
    let len = 3;

    // Each key goes through a freshly opened replica; after every insert the
    // cumulative key set is checked (assert_keys compares order-insensitively).
    let mut inserted: Vec<Vec<u8>> = Vec::new();
    for key in [vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]] {
        let mut replica = store.new_replica(namespace.clone())?;
        replica.insert(key.clone(), &author, hash, len)?;
        inserted.push(key);
        assert_keys(&mut store, namespace.id(), inserted.clone());
    }
    store.flush()?;
    Ok(())
}
/// Runs [`test_replica_capability`] against an in-memory store.
#[test]
fn test_replica_capability_memory() -> Result<()> {
    test_replica_capability(store::Store::memory())
}
/// Runs [`test_replica_capability`] against a persistent store.
#[test]
fn test_replica_capability_fs() -> Result<()> {
    // Temp file must outlive the store handle.
    let dbfile = tempfile::NamedTempFile::new()?;
    test_replica_capability(store::fs::Store::persistent(dbfile.path())?)
}
/// Capability semantics: a read-only capability forbids writes, importing a
/// write capability upgrades the replica, and re-importing a read capability
/// afterwards must NOT downgrade it.
#[allow(clippy::redundant_pattern_matching)]
fn test_replica_capability(mut store: Store) -> Result<()> {
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
let author = store.new_author(&mut rng)?;
let namespace = NamespaceSecret::new(&mut rng);
// Read capability only: writes must fail with ReadOnly.
let capability = Capability::Read(namespace.id());
store.import_namespace(capability)?;
let mut replica = store.open_replica(&namespace.id())?;
let res = replica.hash_and_insert(b"foo", &author, b"bar");
assert!(matches!(res, Err(InsertError::ReadOnly)));
// Importing the write capability upgrades the already-open replica.
let capability = Capability::Write(namespace.clone());
store.import_namespace(capability)?;
let mut replica = store.open_replica(&namespace.id())?;
let res = replica.hash_and_insert(b"foo", &author, b"bar");
assert!(matches!(res, Ok(_)));
// The upgrade persists across a close/reopen cycle.
store.close_replica(namespace.id());
let mut replica = store.open_replica(&namespace.id())?;
let res = replica.hash_and_insert(b"foo", &author, b"bar");
assert!(res.is_ok());
// Importing a weaker (read) capability must not downgrade write access.
let capability = Capability::Read(namespace.id());
store.import_namespace(capability)?;
store.close_replica(namespace.id());
let mut replica = store.open_replica(&namespace.id())?;
let res = replica.hash_and_insert(b"foo", &author, b"bar");
assert!(res.is_ok());
store.flush()?;
Ok(())
}
/// Runs [`test_actor_capability`] against an in-memory store.
#[tokio::test]
async fn test_actor_capability_memory() -> Result<()> {
    test_actor_capability(store::Store::memory()).await
}
/// Runs [`test_actor_capability`] against a persistent store.
#[tokio::test]
async fn test_actor_capability_fs() -> Result<()> {
    // Temp file must outlive the store handle.
    let dbfile = tempfile::NamedTempFile::new()?;
    test_actor_capability(store::fs::Store::persistent(dbfile.path())?).await
}
/// Capability semantics exercised through the async `SyncHandle` actor:
/// read-only forbids local inserts, importing write enables them, and a
/// closed document rejects inserts until reopened.
async fn test_actor_capability(store: Store) -> Result<()> {
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
let author = Author::new(&mut rng);
let handle = SyncHandle::spawn(store, None, "test".into());
let author = handle.import_author(author).await?;
let namespace = NamespaceSecret::new(&mut rng);
let id = namespace.id();
// Import as read-only and open: local inserts must be rejected.
let capability = Capability::Read(namespace.id());
handle.import_namespace(capability).await?;
handle.open(namespace.id(), Default::default()).await?;
let res = handle
.insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
.await;
assert!(res.is_err());
// Upgrading to a write capability makes inserts succeed.
let capability = Capability::Write(namespace.clone());
handle.import_namespace(capability).await?;
let res = handle
.insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
.await;
assert!(res.is_ok());
// A closed document rejects inserts even with write capability...
handle.close(namespace.id()).await?;
let res = handle
.insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
.await;
assert!(res.is_err());
// ...and reopening restores write access.
handle.open(namespace.id(), Default::default()).await?;
let res = handle
.insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
.await;
assert!(res.is_ok());
Ok(())
}
/// Collects every event currently queued on `events` without blocking,
/// stopping at the first `try_recv` failure (empty or closed channel).
fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
    std::iter::from_fn(|| events.try_recv().ok()).collect()
}
/// A sync exchange must not emit spurious `RemoteInsert` events: entries a
/// peer already superseded locally should not surface as remote inserts.
/// Each side performs exactly one local write and must observe exactly one
/// `LocalInsert` event and nothing else.
#[test]
fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
let mut store1 = store::Store::memory();
let mut store2 = store::Store::memory();
let peer1 = [1u8; 32];
let peer2 = [2u8; 32];
let mut state1 = SyncOutcome::default();
let mut state2 = SyncOutcome::default();
let author = Author::new(&mut rng);
let namespace = NamespaceSecret::new(&mut rng);
let mut replica1 = store1.new_replica(namespace.clone())?;
let mut replica2 = store2.new_replica(namespace.clone())?;
// Subscribe to insert events on both replicas before any writes happen.
let (events1_sender, events1) = async_channel::bounded(32);
let (events2_sender, events2) = async_channel::bounded(32);
replica1.info.subscribe(events1_sender);
replica2.info.subscribe(events2_sender);
replica1.hash_and_insert(b"foo", &author, b"init")?;
// Drive the sync exchange by hand so a write can be injected mid-protocol.
let from1 = replica1.sync_initial_message()?;
let from2 = replica2
.sync_process_message(from1, peer1, &mut state2)
.unwrap()
.unwrap();
let from1 = replica1
.sync_process_message(from2, peer2, &mut state1)
.unwrap()
.unwrap();
// replica2 overwrites "foo" locally *before* processing the final message,
// so the incoming (older) entry must not produce a RemoteInsert event.
replica2.hash_and_insert(b"foo", &author, b"update")?;
let from2 = replica2
.sync_process_message(from1, peer1, &mut state2)
.unwrap();
assert!(from2.is_none());
let events1 = drain(events1);
let events2 = drain(events2);
// Exactly one event per side, and both are the LocalInsert from hash_and_insert.
assert_eq!(events1.len(), 1);
assert_eq!(events2.len(), 1);
assert!(matches!(events1[0], Event::LocalInsert { .. }));
assert!(matches!(events2[0], Event::LocalInsert { .. }));
// replica1 transferred its single entry; replica2 received it but sent nothing.
assert_eq!(state1.num_sent, 1);
assert_eq!(state1.num_recv, 0);
assert_eq!(state2.num_sent, 0);
assert_eq!(state2.num_recv, 1);
store1.flush()?;
store2.flush()?;
Ok(())
}
/// Runs [`test_replica_queries`] against an in-memory store.
#[test]
fn test_replica_queries_mem() -> Result<()> {
    test_replica_queries(store::Store::memory())?;
    Ok(())
}
/// Runs [`test_replica_queries`] against a persistent store.
#[test]
fn test_replica_queries_fs() -> Result<()> {
    // Temp file must outlive the store handle.
    let dbfile = tempfile::NamedTempFile::new()?;
    test_replica_queries(store::fs::Store::persistent(dbfile.path())?)?;
    Ok(())
}
/// Exhaustive check of the `Query` API: sorting (by key/author, asc/desc),
/// key-prefix filtering, offset/limit paging, single-latest-per-key
/// deduplication, and empty-entry (tombstone) visibility.
fn test_replica_queries(mut store: Store) -> Result<()> {
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
let namespace = NamespaceSecret::new(&mut rng);
let namespace_id = namespace.id();
let a1 = store.new_author(&mut rng)?;
let a2 = store.new_author(&mut rng)?;
let a3 = store.new_author(&mut rng)?;
println!(
"a1 {} a2 {} a3 {}",
a1.id().fmt_short(),
a2.id().fmt_short(),
a3.id().fmt_short()
);
// Fixture: "hi/world" written by a2 then a1 (a1's write is newer),
// "hi/moon" by a2, and "hi" by a3.
let mut replica = store.new_replica(namespace.clone())?;
replica.hash_and_insert("hi/world", &a2, "a2")?;
replica.hash_and_insert("hi/world", &a1, "a1")?;
replica.hash_and_insert("hi/moon", &a2, "a1")?;
replica.hash_and_insert("hi", &a3, "a3")?;
// Small harness: run a query and compare (key, author) pairs in order.
struct QueryTester<'a> {
store: &'a mut Store,
namespace: NamespaceId,
}
impl QueryTester<'_> {
// Asserts that `query` yields exactly `expected` (key, author) pairs,
// in order.
fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
let query = query.into();
let actual = self
.store
.get_many(self.namespace, query.clone())
.unwrap()
.map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
.collect::<Result<Vec<_>>>()
.unwrap();
let expected = expected
.into_iter()
.map(|(key, author)| (key.to_string(), author.id()))
.collect::<Vec<_>>();
assert_eq!(actual, expected, "query: {query:#?}")
}
}
let mut qt = QueryTester {
store: &mut store,
namespace: namespace_id,
};
// NOTE(review): the expected orderings below encode the store's default
// iteration order for each query variant -- they are the contract under test.
qt.assert(
Query::all(),
vec![
("hi/world", &a1),
("hi/moon", &a2),
("hi/world", &a2),
("hi", &a3),
],
);
// One entry per key: the newest write wins ("hi/world" resolves to a1's).
qt.assert(
Query::single_latest_per_key(),
vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
);
qt.assert(
Query::single_latest_per_key().sort_direction(SortDirection::Desc),
vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
);
qt.assert(
Query::single_latest_per_key().key_prefix("hi/"),
vec![("hi/moon", &a2), ("hi/world", &a1)],
);
qt.assert(
Query::single_latest_per_key()
.key_prefix("hi/")
.sort_direction(SortDirection::Desc),
vec![("hi/world", &a1), ("hi/moon", &a2)],
);
// Sorting by (key, author) vs (author, key), ascending and descending.
qt.assert(
Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
vec![
("hi", &a3),
("hi/moon", &a2),
("hi/world", &a1),
("hi/world", &a2),
],
);
qt.assert(
Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
vec![
("hi/world", &a2),
("hi/world", &a1),
("hi/moon", &a2),
("hi", &a3),
],
);
qt.assert(
Query::all().key_prefix("hi/"),
vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
);
// Paging: offset/limit apply after filtering and sorting.
qt.assert(
Query::all().key_prefix("hi/").offset(1).limit(1),
vec![("hi/moon", &a2)],
);
qt.assert(
Query::all()
.key_prefix("hi/")
.sort_by(SortBy::KeyAuthor, SortDirection::Desc),
vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
);
qt.assert(
Query::all()
.key_prefix("hi/")
.sort_by(SortBy::KeyAuthor, SortDirection::Desc)
.offset(1)
.limit(1),
vec![("hi/world", &a1)],
);
qt.assert(
Query::all()
.key_prefix("hi/")
.sort_by(SortBy::AuthorKey, SortDirection::Asc),
vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
);
qt.assert(
Query::all()
.key_prefix("hi/")
.sort_by(SortBy::AuthorKey, SortDirection::Desc),
vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
);
qt.assert(
Query::all()
.sort_by(SortBy::KeyAuthor, SortDirection::Asc)
.limit(2)
.offset(1),
vec![("hi/moon", &a2), ("hi/world", &a1)],
);
// Tombstone a2's "hi/world": by default it disappears from results, but
// include_empty() makes the empty entry visible again.
let mut replica = store.new_replica(namespace)?;
replica.delete_prefix("hi/world", &a2)?;
let mut qt = QueryTester {
store: &mut store,
namespace: namespace_id,
};
qt.assert(
Query::all(),
vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
);
qt.assert(
Query::all().include_empty(),
vec![
("hi/world", &a1),
("hi/moon", &a2),
("hi/world", &a2),
("hi", &a3),
],
);
store.flush()?;
Ok(())
}
/// Runs [`test_dl_policies`] against an in-memory store.
#[test]
fn test_dl_policies_mem() -> Result<()> {
    let mut store = store::Store::memory();
    test_dl_policies(&mut store)
}
/// Runs [`test_dl_policies`] against a persistent store.
#[test]
fn test_dl_policies_fs() -> Result<()> {
    // Temp file must outlive the store handle.
    let dbfile = tempfile::NamedTempFile::new()?;
    let mut store = store::fs::Store::persistent(dbfile.path())?;
    test_dl_policies(&mut store)
}
/// Download-policy persistence: setting a policy for an unknown document
/// fails, and once the replica exists the policy round-trips unchanged.
///
/// Fix: corrected the typo in the `expect_err` diagnostic
/// ("document dos not exist" -> "document does not exist").
fn test_dl_policies(store: &mut Store) -> Result<()> {
    let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
    let namespace = NamespaceSecret::new(&mut rng);
    let id = namespace.id();

    let filter = store::FilterKind::Exact("foo".into());
    let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
    // Setting a policy before the replica exists must be rejected.
    store
        .set_download_policy(&id, policy.clone())
        .expect_err("document does not exist");

    // After creating the replica, the policy can be stored and read back.
    store.new_replica(namespace)?;
    store.set_download_policy(&id, policy.clone())?;
    let retrieved_policy = store.get_download_policy(&id)?;
    assert_eq!(retrieved_policy, policy);
    store.flush()?;
    Ok(())
}
/// Asserts that the set of keys stored under `namespace` equals `expected`.
/// Both sides are compared in sorted order, so the check is order-insensitive.
fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
    expected.sort();
    let actual = get_keys_sorted(store, namespace);
    assert_eq!(expected, actual);
}
/// Returns every key stored under `namespace`, sorted ascending.
/// Panics on store errors (test helper).
fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
    let mut keys: Vec<Vec<u8>> = store
        .get_many(namespace, Query::all())
        .unwrap()
        .map(|entry| entry.map(|e| e.key().to_vec()))
        .collect::<Result<_>>()
        .unwrap();
    keys.sort();
    keys
}
/// Fetches the entry for (namespace, author, key), including empty entries,
/// and errors if it is absent.
fn get_entry(
    store: &mut Store,
    namespace: NamespaceId,
    author: AuthorId,
    key: &[u8],
) -> anyhow::Result<SignedEntry> {
    match store.get_exact(namespace, author, key, true)? {
        Some(entry) => Ok(entry),
        None => Err(anyhow::anyhow!("not found")),
    }
}
/// Returns the content hash stored for (namespace, author, key), or `None`
/// when no (non-empty) entry exists.
fn get_content_hash(
    store: &mut Store,
    namespace: NamespaceId,
    author: AuthorId,
    key: &[u8],
) -> anyhow::Result<Option<Hash>> {
    let entry = store.get_exact(namespace, author, key, false)?;
    Ok(entry.map(|e| e.content_hash()))
}
/// Drives the reconciliation protocol between `alice` and `bob` to completion,
/// ping-ponging messages until one side has nothing further to send.
/// Returns the per-peer outcomes; sent/received counts must mirror each other.
fn sync(alice: &mut Replica, bob: &mut Replica) -> Result<(SyncOutcome, SyncOutcome)> {
let alice_peer_id = [1u8; 32];
let bob_peer_id = [2u8; 32];
let mut alice_state = SyncOutcome::default();
let mut bob_state = SyncOutcome::default();
// Alice initiates; each loop iteration is one full round-trip.
let mut next_to_bob = Some(alice.sync_initial_message()?);
let mut rounds = 0;
while let Some(msg) = next_to_bob.take() {
// Guard against a non-terminating protocol exchange.
assert!(rounds < 100, "too many rounds");
rounds += 1;
println!("round {}", rounds);
if let Some(msg) = bob.sync_process_message(msg, alice_peer_id, &mut bob_state)? {
next_to_bob = alice.sync_process_message(msg, bob_peer_id, &mut alice_state)?
}
}
// What one peer sent, the other must have received, and vice versa.
assert_eq!(alice_state.num_sent, bob_state.num_recv);
assert_eq!(alice_state.num_recv, bob_state.num_sent);
Ok((alice_state, bob_state))
}
fn check_entries(
store: &mut Store,
namespace: &NamespaceId,
author: &Author,
set: &[&str],
) -> Result<()> {
for el in set {
store.get_exact(*namespace, author.id(), el, false)?;
}
Ok(())
}
}