use std::ops::{Bound, RangeBounds};
use bao_tree::{io::round_up_to_chunks, ChunkNum, ChunkRanges};
use range_collections::{range_set::RangeSetEntry, RangeSet2};
pub mod channel;
pub(crate) mod temp_tag;
pub mod serde {
pub mod io_error_serde {
use std::{fmt, io};
use serde::{
de::{self, SeqAccess, Visitor},
ser::SerializeTuple,
Deserializer, Serializer,
};
/// Translates an [`io::ErrorKind`] into the one-byte code used on the wire.
///
/// Any kind without a dedicated arm is reported as 25, the same code as
/// `Other`. No arm produces code 16.
/// NOTE(review): 16 looks reserved for a kind that is not mapped here — confirm
/// before reusing it.
fn error_kind_to_u8(kind: io::ErrorKind) -> u8 {
    use std::io::ErrorKind as Kind;

    match kind {
        // Codes 0..=15.
        Kind::AddrInUse => 0,
        Kind::AddrNotAvailable => 1,
        Kind::AlreadyExists => 2,
        Kind::ArgumentListTooLong => 3,
        Kind::BrokenPipe => 4,
        Kind::ConnectionAborted => 5,
        Kind::ConnectionRefused => 6,
        Kind::ConnectionReset => 7,
        Kind::CrossesDevices => 8,
        Kind::Deadlock => 9,
        Kind::DirectoryNotEmpty => 10,
        Kind::ExecutableFileBusy => 11,
        Kind::FileTooLarge => 12,
        Kind::HostUnreachable => 13,
        Kind::Interrupted => 14,
        Kind::InvalidData => 15,
        // Codes 17..=38 (16 is skipped).
        Kind::InvalidInput => 17,
        Kind::IsADirectory => 18,
        Kind::NetworkDown => 19,
        Kind::NetworkUnreachable => 20,
        Kind::NotADirectory => 21,
        Kind::NotConnected => 22,
        Kind::NotFound => 23,
        Kind::NotSeekable => 24,
        Kind::Other => 25,
        Kind::OutOfMemory => 26,
        Kind::PermissionDenied => 27,
        Kind::QuotaExceeded => 28,
        Kind::ReadOnlyFilesystem => 29,
        Kind::ResourceBusy => 30,
        Kind::StaleNetworkFileHandle => 31,
        Kind::StorageFull => 32,
        Kind::TimedOut => 33,
        Kind::TooManyLinks => 34,
        Kind::UnexpectedEof => 35,
        Kind::Unsupported => 36,
        Kind::WouldBlock => 37,
        Kind::WriteZero => 38,
        // `io::ErrorKind` is non-exhaustive; everything else shares the `Other` code.
        _ => 25,
    }
}
/// Translates a one-byte wire code back into an [`io::ErrorKind`].
///
/// Codes without a dedicated arm (including 16 and everything above 38)
/// decode as `Other`.
fn u8_to_error_kind(num: u8) -> io::ErrorKind {
    use std::io::ErrorKind as Kind;

    match num {
        // Codes 0..=15.
        0 => Kind::AddrInUse,
        1 => Kind::AddrNotAvailable,
        2 => Kind::AlreadyExists,
        3 => Kind::ArgumentListTooLong,
        4 => Kind::BrokenPipe,
        5 => Kind::ConnectionAborted,
        6 => Kind::ConnectionRefused,
        7 => Kind::ConnectionReset,
        8 => Kind::CrossesDevices,
        9 => Kind::Deadlock,
        10 => Kind::DirectoryNotEmpty,
        11 => Kind::ExecutableFileBusy,
        12 => Kind::FileTooLarge,
        13 => Kind::HostUnreachable,
        14 => Kind::Interrupted,
        15 => Kind::InvalidData,
        // Codes 17..=38 (16 has no arm and falls through to the default).
        17 => Kind::InvalidInput,
        18 => Kind::IsADirectory,
        19 => Kind::NetworkDown,
        20 => Kind::NetworkUnreachable,
        21 => Kind::NotADirectory,
        22 => Kind::NotConnected,
        23 => Kind::NotFound,
        24 => Kind::NotSeekable,
        25 => Kind::Other,
        26 => Kind::OutOfMemory,
        27 => Kind::PermissionDenied,
        28 => Kind::QuotaExceeded,
        29 => Kind::ReadOnlyFilesystem,
        30 => Kind::ResourceBusy,
        31 => Kind::StaleNetworkFileHandle,
        32 => Kind::StorageFull,
        33 => Kind::TimedOut,
        34 => Kind::TooManyLinks,
        35 => Kind::UnexpectedEof,
        36 => Kind::Unsupported,
        37 => Kind::WouldBlock,
        38 => Kind::WriteZero,
        // Unknown codes are decoded leniently rather than rejected.
        _ => Kind::Other,
    }
}
pub fn serialize<S>(error: &io::Error, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut tup = serializer.serialize_tuple(2)?;
tup.serialize_element(&error_kind_to_u8(error.kind()))?;
tup.serialize_element(&error.to_string())?;
tup.end()
}
/// Deserializes an [`io::Error`] from a two-element tuple of
/// `(u8 kind code, String message)`.
///
/// The kind code is decoded with `u8_to_error_kind`, so unknown codes become
/// `io::ErrorKind::Other`.
///
/// # Errors
///
/// Returns the deserializer's error if either tuple element is missing or has
/// the wrong type.
pub fn deserialize<'de, D>(deserializer: D) -> Result<io::Error, D::Error>
where
    D: Deserializer<'de>,
{
    struct IoErrorVisitor;

    impl<'de> Visitor<'de> for IoErrorVisitor {
        type Value = io::Error;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            // The first element is read as `u8` below; the message must say so,
            // since this text ends up in "invalid type/length" diagnostics.
            formatter.write_str("a tuple of (u8, String) representing io::Error")
        }

        fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
        where
            A: SeqAccess<'de>,
        {
            let num: u8 = seq
                .next_element()?
                .ok_or_else(|| de::Error::invalid_length(0, &self))?;
            let message: String = seq
                .next_element()?
                .ok_or_else(|| de::Error::invalid_length(1, &self))?;
            let kind = u8_to_error_kind(num);
            Ok(io::Error::new(kind, message))
        }
    }

    deserializer.deserialize_tuple(2, IoErrorVisitor)
}
}
#[cfg(test)]
mod tests {
    use std::io::{self, ErrorKind};
    use postcard;
    use serde::{Deserialize, Serialize};
    use super::io_error_serde;

    /// Newtype that routes an `io::Error` through `io_error_serde`.
    #[derive(Serialize, Deserialize)]
    struct TestError(#[serde(with = "io_error_serde")] io::Error);

    /// Every explicitly mapped kind must survive a postcard round trip with its
    /// kind and message intact.
    #[test]
    fn test_roundtrip_error_kinds() {
        let message = "test error";
        let kinds = [
            ErrorKind::AddrInUse,
            ErrorKind::AddrNotAvailable,
            ErrorKind::AlreadyExists,
            ErrorKind::ArgumentListTooLong,
            ErrorKind::BrokenPipe,
            ErrorKind::ConnectionAborted,
            ErrorKind::ConnectionRefused,
            ErrorKind::ConnectionReset,
            ErrorKind::CrossesDevices,
            ErrorKind::Deadlock,
            ErrorKind::DirectoryNotEmpty,
            ErrorKind::ExecutableFileBusy,
            ErrorKind::FileTooLarge,
            ErrorKind::HostUnreachable,
            ErrorKind::Interrupted,
            ErrorKind::InvalidData,
            ErrorKind::InvalidInput,
            ErrorKind::IsADirectory,
            ErrorKind::NetworkDown,
            ErrorKind::NetworkUnreachable,
            ErrorKind::NotADirectory,
            ErrorKind::NotConnected,
            ErrorKind::NotFound,
            ErrorKind::NotSeekable,
            ErrorKind::Other,
            ErrorKind::OutOfMemory,
            ErrorKind::PermissionDenied,
            ErrorKind::QuotaExceeded,
            ErrorKind::ReadOnlyFilesystem,
            ErrorKind::ResourceBusy,
            ErrorKind::StaleNetworkFileHandle,
            ErrorKind::StorageFull,
            ErrorKind::TimedOut,
            ErrorKind::TooManyLinks,
            ErrorKind::UnexpectedEof,
            ErrorKind::Unsupported,
            ErrorKind::WouldBlock,
            ErrorKind::WriteZero,
        ];
        for kind in kinds {
            let original = TestError(io::Error::new(kind, message));
            let bytes = postcard::to_allocvec(&original).unwrap();
            let decoded: TestError = postcard::from_bytes(&bytes).unwrap();
            assert_eq!(original.0.kind(), decoded.0.kind());
            assert_eq!(original.0.to_string(), decoded.0.to_string());
        }
    }
}
}
/// Convenience constructors for sets of chunk ranges.
///
/// The per-method notes describe what the `ChunkRanges` implementation in this
/// file does.
pub trait ChunkRangesExt {
/// The open-ended range that starts at chunk number `u64::MAX`.
/// NOTE(review): presumably used as a marker for "whatever the last chunk is" — confirm with callers.
fn last_chunk() -> Self;
/// The single chunk whose chunk number is `offset`.
fn chunk(offset: u64) -> Self;
/// Chunk ranges for a range given in bytes; the byte bounds are converted
/// with `round_up_to_chunks`.
fn bytes(ranges: impl RangeBounds<u64>) -> Self;
/// Chunk ranges for a range whose bounds are already chunk numbers.
fn chunks(ranges: impl RangeBounds<u64>) -> Self;
/// Chunk ranges for the one-byte range that starts at byte `offset`.
fn offset(offset: u64) -> Self;
}
impl ChunkRangesExt for ChunkRanges {
    /// The open-ended range that starts at chunk number `u64::MAX`.
    fn last_chunk() -> Self {
        ChunkRanges::from(ChunkNum(u64::MAX)..)
    }

    /// The single chunk whose chunk number is `offset`.
    ///
    /// For `offset == u64::MAX` there is no representable exclusive end, so the
    /// result is the open-ended range starting at that chunk.
    fn chunk(offset: u64) -> Self {
        // An inclusive range lets `bounds_from_range` derive the exclusive end
        // with `checked_add`, instead of computing `offset + 1` here, which
        // overflows (debug panic / release wrap-around) at `u64::MAX`.
        bounds_from_range(offset..=offset, ChunkNum)
    }

    /// Chunk ranges for a range given in bytes; the byte bounds are converted
    /// with `round_up_to_chunks`.
    fn bytes(ranges: impl RangeBounds<u64>) -> Self {
        round_up_to_chunks(&bounds_from_range(ranges, |v| v))
    }

    /// Chunk ranges for a range whose bounds are already chunk numbers.
    fn chunks(ranges: impl RangeBounds<u64>) -> Self {
        bounds_from_range(ranges, ChunkNum)
    }

    /// Chunk ranges for the one-byte range that starts at byte `offset`.
    ///
    /// For `offset == u64::MAX` the byte range is treated as open-ended.
    fn offset(offset: u64) -> Self {
        // Inclusive form avoids the `offset + 1` overflow at `u64::MAX`.
        Self::bytes(offset..=offset)
    }
}
/// Builds a [`RangeSet2`] from any `u64` range, mapping each finite bound
/// through `f`.
///
/// Bounds are first normalized to a half-open `[start, end)` form:
/// - an excluded start of `u64::MAX` yields the empty set,
/// - an included end of `u64::MAX` is treated as unbounded above.
pub(crate) fn bounds_from_range<R, T, F>(range: R, f: F) -> RangeSet2<T>
where
    R: RangeBounds<u64>,
    T: RangeSetEntry,
    F: Fn(u64) -> T,
{
    // Inclusive lower bound, or `None` when the range is unbounded below.
    let lower = match range.start_bound() {
        Bound::Unbounded => None,
        Bound::Included(&first) => Some(first),
        Bound::Excluded(&before) => match before.checked_add(1) {
            Some(first) => Some(first),
            // Nothing lies strictly above `u64::MAX`.
            None => return RangeSet2::empty(),
        },
    };
    // Exclusive upper bound, or `None` when the range is unbounded above.
    let upper = match range.end_bound() {
        Bound::Unbounded => None,
        Bound::Excluded(&limit) => Some(limit),
        Bound::Included(&last) => last.checked_add(1),
    };
    match (lower, upper) {
        (None, None) => RangeSet2::all(),
        (None, Some(hi)) => RangeSet2::from(..f(hi)),
        (Some(lo), None) => RangeSet2::from(f(lo)..),
        (Some(lo), Some(hi)) => RangeSet2::from(f(lo)..f(hi)),
    }
}
pub mod outboard_with_progress {
use std::io::{self, BufReader, Read};
use bao_tree::{
blake3,
io::{
outboard::PreOrderOutboard,
sync::{OutboardMut, WriteAt},
},
iter::BaoChunk,
BaoTree, ChunkNum,
};
use smallvec::SmallVec;
use super::sink::Sink;
/// Hashes the data of one leaf.
///
/// A root leaf is hashed with plain `blake3::hash`. Any other leaf is hashed
/// as a non-root subtree whose input offset is `start_chunk * 1024`.
fn hash_subtree(start_chunk: u64, data: &[u8], is_root: bool) -> blake3::Hash {
    use blake3::hazmat::{ChainingValue, HasherExt};

    if !is_root {
        let mut state = blake3::Hasher::new();
        // The hasher is told where in the overall input this data starts.
        state.set_input_offset(start_chunk * 1024);
        state.update(data);
        let cv: ChainingValue = state.finalize_non_root();
        return blake3::Hash::from(cv);
    }

    // A root leaf is expected to start at the beginning of the input.
    debug_assert!(start_chunk == 0);
    blake3::hash(data)
}
/// Combines the hashes of two child subtrees into their parent's hash.
///
/// Uses the root merge when `is_root` is set and the non-root merge otherwise,
/// both in `Mode::Hash`.
fn parent_cv(
    left_child: &blake3::Hash,
    right_child: &blake3::Hash,
    is_root: bool,
) -> blake3::Hash {
    use blake3::hazmat::{merge_subtrees_non_root, merge_subtrees_root, ChainingValue, Mode};

    let left: ChainingValue = *left_child.as_bytes();
    let right: ChainingValue = *right_child.as_bytes();

    if !is_root {
        let cv = merge_subtrees_non_root(&left, &right, Mode::Hash);
        return blake3::Hash::from(cv);
    }
    merge_subtrees_root(&left, &right, Mode::Hash)
}
/// Fills `outboard` from `data`, sending the start chunk of every leaf to
/// `progress` before that leaf is read.
///
/// The outer `io::Result` carries failures from reading `data` or writing the
/// outboard. The inner `Result` carries an error returned by the progress
/// sink, in which case processing stops early.
///
/// `data` is wrapped in a `BufReader` of at most 1 MiB, and the scratch buffer
/// is at most one block long; both are capped at the tree size.
pub async fn init_outboard<R, W, P>(
    data: R,
    outboard: &mut PreOrderOutboard<W>,
    progress: &mut P,
) -> std::io::Result<std::result::Result<(), P::Error>>
where
    W: WriteAt,
    R: Read,
    P: Sink<ChunkNum>,
{
    let tree = outboard.tree;
    // Sizes that do not fit in `usize` saturate; they are only used as caps.
    let total = usize::try_from(tree.size()).unwrap_or(usize::MAX);

    let reader = BufReader::with_capacity(total.min(1024 * 1024), data);
    let block_len = total.min(tree.block_size().bytes());
    let mut scratch = SmallVec::<[u8; 128]>::from_elem(0u8, block_len);

    init_impl(tree, reader, outboard, &mut scratch, progress).await
}
/// Walks `tree` in post order, hashing leaf data read from `data` and saving
/// each parent's pair of child hashes to `outboard`.
///
/// A stack of subtree hashes is kept: every leaf pushes one hash, every parent
/// pops two and pushes their combination. The last remaining hash is stored as
/// `outboard.root`.
///
/// Returns `Ok(Err(_))` as soon as `progress` rejects a value, `Err(_)` on a
/// read or outboard write failure, and `Ok(Ok(()))` on completion.
///
/// `buffer` must be at least as long as the largest leaf `size` yielded by the
/// iterator, otherwise slicing it panics.
async fn init_impl<W, P>(
tree: BaoTree,
mut data: impl Read,
outboard: &mut PreOrderOutboard<W>,
buffer: &mut [u8],
progress: &mut P,
) -> io::Result<std::result::Result<(), P::Error>>
where
W: WriteAt,
P: Sink<ChunkNum>,
{
// Hashes of subtrees that are complete but not yet merged into a parent.
let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
for item in tree.post_order_chunks_iter() {
match item {
BaoChunk::Parent { is_root, node, .. } => {
// The right child was pushed last, so it is popped first.
// NOTE(review): the unwraps assume the iterator yields both children before their parent — confirm.
let right_hash = stack.pop().unwrap();
let left_hash = stack.pop().unwrap();
outboard.save(node, &(left_hash, right_hash))?;
let parent = parent_cv(&left_hash, &right_hash, is_root);
stack.push(parent);
}
BaoChunk::Leaf {
size,
is_root,
start_chunk,
..
} => {
// Progress is reported before the leaf is read; a sink error ends the
// run without being turned into an io::Error.
if let Err(err) = progress.send(start_chunk).await {
return Ok(Err(err));
}
let buf = &mut buffer[..size];
data.read_exact(buf)?;
let hash = hash_subtree(start_chunk.0, buf, is_root);
stack.push(hash);
}
}
}
// Exactly one hash should remain after the traversal; it becomes the root.
debug_assert_eq!(stack.len(), 1);
outboard.root = stack.pop().unwrap();
Ok(Ok(()))
}
#[cfg(test)]
mod tests {
    use bao_tree::{
        blake3,
        io::{outboard::PreOrderOutboard, sync::CreateOutboard},
        BaoTree,
    };
    use testresult::TestResult;
    use crate::{
        store::{fs::tests::test_data, IROH_BLOCK_SIZE},
        util::{outboard_with_progress::init_outboard, sink::Drain},
    };

    /// `init_outboard` must produce the same root and outboard data as
    /// `CreateOutboard::init_from`.
    #[tokio::test]
    async fn init_outboard_with_progress() -> TestResult<()> {
        for size in [1024 * 18 + 1] {
            let data = test_data(size);
            let tree = BaoTree::new(data.len() as u64, IROH_BLOCK_SIZE);
            let mut reference = PreOrderOutboard::<Vec<u8>> {
                tree,
                ..Default::default()
            };
            let mut with_progress = reference.clone();

            reference.init_from(data.as_ref())?;
            init_outboard(data.as_ref(), &mut with_progress, &mut Drain).await??;

            assert_eq!(reference.root, blake3::hash(&data));
            assert_eq!(reference.root, with_progress.root);
            assert_eq!(reference.data, with_progress.data);
        }
        Ok(())
    }
}
}
pub mod sink {
use std::{future::Future, io};
use irpc::RpcMessage;
/// A destination that values of type `Item` can be sent to asynchronously.
pub trait Sink<Item> {
/// Error returned when a value cannot be sent.
type Error;
/// Sends one value; the returned future resolves to `Ok(())` or to the
/// sink's error.
fn send(
&mut self,
value: Item,
) -> impl Future<Output = std::result::Result<(), Self::Error>>;
/// Wraps this sink in a [`WithMapErr`], which passes every error from
/// `send` through `f`.
fn with_map_err<F, U>(self, f: F) -> WithMapErr<Self, F>
where
Self: Sized,
F: Fn(Self::Error) -> U + Send + 'static,
{
WithMapErr { inner: self, f }
}
/// Wraps this sink in a [`WithMap`], which accepts values of type `U` and
/// converts each one to `Item` with `f` before sending it on.
fn with_map<F, U>(self, f: F) -> WithMap<Self, F>
where
Self: Sized,
F: Fn(U) -> Item + Send + 'static,
{
WithMap { inner: self, f }
}
}
/// A mutable reference to a sink is itself a sink that forwards to the
/// referenced one.
impl<I, T> Sink<T> for &mut I
where
    I: Sink<T>,
{
    type Error = I::Error;

    async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
        // Reborrow the inner sink explicitly and call its implementation.
        <I as Sink<T>>::send(&mut **self, value).await
    }
}
/// [`Sink`] adapter that owns an `irpc` mpsc sender and forwards every value
/// to it.
pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>);
impl<T> Sink<T> for IrpcSenderSink<T>
where
T: RpcMessage,
{
type Error = irpc::channel::SendError;
/// Sends `value` on the wrapped sender and returns its result unchanged.
async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
self.0.send(value).await
}
}
/// [`Sink`] adapter that borrows an `irpc` mpsc sender mutably and forwards
/// every value to it.
pub struct IrpcSenderRefSink<'a, T>(pub &'a mut irpc::channel::mpsc::Sender<T>);
impl<'a, T> Sink<T> for IrpcSenderRefSink<'a, T>
where
T: RpcMessage,
{
type Error = irpc::channel::SendError;
/// Sends `value` on the borrowed sender and returns its result unchanged.
async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
self.0.send(value).await
}
}
/// [`Sink`] adapter that owns a tokio mpsc sender and forwards every value to
/// it.
pub struct TokioMpscSenderSink<T>(pub tokio::sync::mpsc::Sender<T>);
impl<T> Sink<T> for TokioMpscSenderSink<T> {
type Error = tokio::sync::mpsc::error::SendError<T>;
/// Sends `value` on the wrapped sender and returns its result unchanged.
async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
self.0.send(value).await
}
}
/// Sink wrapper that converts the inner sink's errors with a closure.
///
/// Created by [`Sink::with_map_err`].
pub struct WithMapErr<P, F> {
    inner: P,
    f: F,
}

impl<P, F, E, U> Sink<U> for WithMapErr<P, F>
where
    P: Sink<U>,
    F: Fn(P::Error) -> E + Send + 'static,
{
    type Error = E;

    /// Forwards `value` to the inner sink; a failure is mapped through `f`.
    async fn send(&mut self, value: U) -> std::result::Result<(), Self::Error> {
        let outcome = self.inner.send(value).await;
        outcome.map_err(|err| (self.f)(err))
    }
}
/// Sink wrapper that converts each incoming value with a closure before
/// passing it to the inner sink.
///
/// Created by [`Sink::with_map`].
pub struct WithMap<P, F> {
    inner: P,
    f: F,
}

impl<P, F, T, U> Sink<T> for WithMap<P, F>
where
    P: Sink<U>,
    F: Fn(T) -> U + Send + 'static,
{
    type Error = P::Error;

    /// Maps `value` through `f` and sends the result to the inner sink.
    async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
        let mapped = (self.f)(value);
        self.inner.send(mapped).await
    }
}
/// Sink that accepts values of any type and discards them.
pub struct Drain;

impl<T> Sink<T> for Drain {
    type Error = io::Error;

    /// Drops the value and always succeeds.
    async fn send(&mut self, _value: T) -> std::result::Result<(), Self::Error> {
        Ok(())
    }
}
}