use std::{
fs::{File, OpenOptions},
io,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::{Arc, RwLock, Weak},
};
use bao_tree::{
io::{
fsm::BaoContentItem,
outboard::PreOrderOutboard,
sync::{ReadAt, WriteAt},
},
BaoTree,
};
use bytes::{Bytes, BytesMut};
use derive_more::Debug;
use iroh_base::hash::Hash;
use iroh_io::AsyncSliceReader;
use super::mutable_mem_storage::{MutableMemStorage, SizeInfo};
use crate::{
store::BaoBatchWriter,
util::{get_limited_slice, MemOrFile, SparseMemFile},
IROH_BLOCK_SIZE,
};
struct DataPaths {
data: PathBuf,
outboard: PathBuf,
sizes: PathBuf,
}
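/// The storage for a complete blob. Data and outboard can each live in
/// memory or on disk; since the size is known, no sizes file is needed.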
#[derive(Default, derive_more::Debug)]
pub struct CompleteStorage {
#[debug("{:?}", data.as_ref().map_mem(|x| x.len()))]
pub data: MemOrFile<Bytes, (File, u64)>,
#[debug("{:?}", outboard.as_ref().map_mem(|x| x.len()))]
pub outboard: MemOrFile<Bytes, (File, u64)>,
}
impl CompleteStorage {
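/// Read from the data, starting at `offset`, up to `len` bytes or the end
/// of the data, whichever comes first.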
pub fn read_data_at(&self, offset: u64, len: usize) -> Bytes {
match &self.data {
MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len),
MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(),
}
}
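/// Read from the outboard, starting at `offset`, up to `len` bytes or the
/// end of the outboard, whichever comes first.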
pub fn read_outboard_at(&self, offset: u64, len: usize) -> Bytes {
match &self.outboard {
MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len),
MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(),
}
}
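/// The size of the data, in bytes.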
pub fn data_size(&self) -> u64 {
match &self.data {
MemOrFile::Mem(mem) => mem.len() as u64,
MemOrFile::File((_file, size)) => *size,
}
}
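/// The size of the outboard, in bytes.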
pub fn outboard_size(&self) -> u64 {
match &self.outboard {
MemOrFile::Mem(mem) => mem.len() as u64,
MemOrFile::File((_file, size)) => *size,
}
}
}
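/// Open a file for reading and writing, creating it if needed but *without*
/// truncating an existing file.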
fn create_read_write(path: impl AsRef<Path>) -> io::Result<File> {
OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)
}
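/// Read from `file` at `offset` until end of file or `max` bytes, whichever
/// comes first. Despite the name, this stops after `max` bytes.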
fn read_to_end(file: impl ReadAt, offset: u64, max: usize) -> io::Result<Bytes> {
let mut res = BytesMut::new();
let mut buf = [0u8; 4096];
let mut remaining = max;
let mut offset = offset;
while remaining > 0 {
let end = buf.len().min(remaining);
let read = file.read_at(offset, &mut buf[..end])?;
if read == 0 {
break;
}
res.extend_from_slice(&buf[..read]);
offset += read as u64;
remaining -= read;
}
Ok(res.freeze())
}
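/// The highest data byte offset (exclusive) touched by the leaves of a
/// batch, or 0 for a batch without leaves.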
fn max_offset(batch: &[BaoContentItem]) -> u64 {
batch
.iter()
.filter_map(|item| match item {
BaoContentItem::Leaf(leaf) => {
let len = leaf.data.len().try_into().unwrap();
let end = leaf
.offset
.checked_add(len)
.expect("u64 overflow for leaf end");
Some(end)
}
_ => None,
})
.max()
.unwrap_or(0)
}
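/// A file storage for an incomplete bao file: the data file, the outboard
/// file, and the sizes file.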
#[derive(Debug)]
pub struct FileStorage {
data: std::fs::File,
outboard: std::fs::File,
sizes: std::fs::File,
}
impl FileStorage {
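/// Split into the data, outboard and sizes files.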
pub fn into_parts(self) -> (File, File, File) {
(self.data, self.outboard, self.sizes)
}
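/// The most recently claimed total size, read from the last 8 byte slot of
/// the sizes file. Returns 0 if nothing has been written yet.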
fn current_size(&self) -> io::Result<u64> {
let len = self.sizes.metadata()?.len();
if len < 8 {
Ok(0)
} else {
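// the last full 8 byte slot holds the most recently written size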
let mut buf = [0u8; 8];
self.sizes.read_exact_at(len - 8, &mut buf)?;
Ok(u64::from_le_bytes(buf))
}
}
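/// Write a batch of content items. Parent hash pairs go to the pre-order
/// outboard file (64 bytes per pair), leaf data goes to the data file, and
/// the claimed total size is recorded in the sizes file.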
fn write_batch(&mut self, size: u64, batch: &[BaoContentItem]) -> io::Result<()> {
let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
for item in batch {
match item {
BaoContentItem::Parent(parent) => {
if let Some(offset) = tree.pre_order_offset(parent.node) {
let o0 = offset * 64;
self.outboard
.write_all_at(o0, parent.pair.0.as_bytes().as_slice())?;
self.outboard
.write_all_at(o0 + 32, parent.pair.1.as_bytes().as_slice())?;
}
}
BaoContentItem::Leaf(leaf) => {
let o0 = leaf.offset;
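// slot of this leaf in the sizes file: (byte offset / chunk group size) * 8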
let index = (leaf.offset >> (tree.block_size().chunk_log() + 10)) << 3;
tracing::trace!(
"write_batch f={:?} o={} l={}",
self.data,
o0,
leaf.data.len()
);
self.data.write_all_at(o0, leaf.data.as_ref())?;
// record the (claimed) total size in this chunk group's slot
self.sizes.write_all_at(index, &size.to_le_bytes())?;
}
}
}
Ok(())
}
fn read_data_at(&self, offset: u64, len: usize) -> io::Result<Bytes> {
read_to_end(&self.data, offset, len)
}
fn read_outboard_at(&self, offset: u64, len: usize) -> io::Result<Bytes> {
read_to_end(&self.outboard, offset, len)
}
}
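/// The storage for a bao file. This can be either in memory or on disk.
///
/// `IncompleteMem` holds small entries that are still being written; it is
/// not persisted, so it is lost if the store crashes. `IncompleteFile` is
/// the on-disk equivalent, and `Complete` is the immutable storage for
/// finished entries.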
#[derive(Debug)]
pub(crate) enum BaoFileStorage {
IncompleteMem(MutableMemStorage),
IncompleteFile(FileStorage),
Complete(CompleteStorage),
}
impl Default for BaoFileStorage {
fn default() -> Self {
BaoFileStorage::Complete(Default::default())
}
}
impl BaoFileStorage {
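/// Take the storage out, leaving the default (empty complete) storage in
/// its place. Be careful to put something back, or data will be lost.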
#[cfg(feature = "fs-store")]
pub fn take(&mut self) -> Self {
std::mem::take(self)
}
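/// Create a new in-memory incomplete storage.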
pub fn incomplete_mem() -> Self {
Self::IncompleteMem(Default::default())
}
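/// Sync all files to disk, if the storage is file based.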
fn sync_all(&self) -> io::Result<()> {
match self {
Self::Complete(_) => Ok(()),
Self::IncompleteMem(_) => Ok(()),
Self::IncompleteFile(file) => {
file.data.sync_all()?;
file.outboard.sync_all()?;
file.sizes.sync_all()?;
Ok(())
}
}
}
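/// True if the storage is fully in memory.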
pub fn is_mem(&self) -> bool {
match self {
Self::IncompleteMem(_) => true,
Self::IncompleteFile(_) => false,
Self::Complete(c) => c.data.is_mem() && c.outboard.is_mem(),
}
}
}
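/// A weak reference to a bao file handle.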
#[derive(Debug, Clone)]
pub struct BaoFileHandleWeak(Weak<BaoFileHandleInner>);
impl BaoFileHandleWeak {
pub fn upgrade(&self) -> Option<BaoFileHandle> {
self.0.upgrade().map(BaoFileHandle)
}
pub fn is_live(&self) -> bool {
self.0.strong_count() > 0
}
}
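/// The inner state shared by all clones of a bao file handle.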
#[derive(Debug)]
pub struct BaoFileHandleInner {
pub(crate) storage: RwLock<BaoFileStorage>,
config: Arc<BaoFileConfig>,
hash: Hash,
}
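/// A cheaply cloneable handle to a bao file, including the hash and the
/// configuration.
///
/// A minimal usage sketch; the directory, memory limit and `hash` are
/// illustrative assumptions, not fixed by this API:
///
/// ```ignore
/// let config = Arc::new(BaoFileConfig::new(
///     Arc::new("/tmp/blobs".into()), // where <hash>.data/.obao4/.sizes4 live
///     1024 * 16,                     // stay in memory up to 16 KiB
///     None,                          // no on_file_create callback
/// ));
/// let handle = BaoFileHandle::incomplete_mem(config, hash);
/// let mut writer = handle.writer(); // a BaoBatchWriter
/// ```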
#[derive(Debug, Clone, derive_more::Deref)]
pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
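/// Callback invoked with the hash when a handle switches from memory to
/// file storage.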
pub(crate) type CreateCb = Arc<dyn Fn(&Hash) -> io::Result<()> + Send + Sync>;
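/// Configuration for the deferred batch writer: files live in `dir`, data
/// stays in memory up to `max_mem` bytes (inclusive), and `on_file_create`
/// is invoked when a handle switches to file storage.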
#[derive(derive_more::Debug, Clone)]
pub struct BaoFileConfig {
dir: Arc<PathBuf>,
max_mem: usize,
#[debug("{:?}", on_file_create.as_ref().map(|_| ()))]
on_file_create: Option<CreateCb>,
}
impl BaoFileConfig {
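/// Create a new bao file config.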
pub fn new(dir: Arc<PathBuf>, max_mem: usize, on_file_create: Option<CreateCb>) -> Self {
Self {
dir,
max_mem,
on_file_create,
}
}
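/// Get the paths for a hash: `<hash>.data`, `<hash>.obao4` and
/// `<hash>.sizes4` inside `dir`.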
fn paths(&self, hash: &Hash) -> DataPaths {
DataPaths {
data: self.dir.join(format!("{}.data", hash.to_hex())),
outboard: self.dir.join(format!("{}.obao4", hash.to_hex())),
sizes: self.dir.join(format!("{}.sizes4", hash.to_hex())),
}
}
}
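/// A reader for a bao file, reading just the data.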
#[derive(Debug)]
pub struct DataReader(Option<BaoFileHandle>);
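/// Run a fallible closure against the storage of a handle.
///
/// If `no_io` says the storage needs no io and the read lock is free, the
/// closure runs inline; otherwise it runs on a blocking task. The handle is
/// taken out of `opt` for the duration, so a concurrent call fails with
/// "deferred batch busy".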
async fn with_storage<T, P, F>(opt: &mut Option<BaoFileHandle>, no_io: P, f: F) -> io::Result<T>
where
P: Fn(&BaoFileStorage) -> bool + Send + 'static,
F: FnOnce(&BaoFileStorage) -> io::Result<T> + Send + 'static,
T: Send + 'static,
{
let handle = opt
.take()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "deferred batch busy"))?;
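// fast path: the lock is free and no io is needed, run the closure inline.
// the handle is cloned back into the slot because the read guard still
// borrows it at this point.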
if let Ok(storage) = handle.storage.try_read() {
if no_io(&storage) {
let res = f(&storage);
*opt = Some(handle.clone());
return res;
}
};
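// slow path: the closure may do io, so run it on a blocking task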
let (handle, res) = tokio::task::spawn_blocking(move || {
let storage = handle.storage.read().unwrap();
let res = f(storage.deref());
drop(storage);
(handle, res)
})
.await
.expect("spawn_blocking failed");
*opt = Some(handle);
res
}
impl AsyncSliceReader for DataReader {
async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
with_storage(
&mut self.0,
BaoFileStorage::is_mem,
move |storage| match storage {
BaoFileStorage::Complete(mem) => Ok(mem.read_data_at(offset, len)),
BaoFileStorage::IncompleteMem(mem) => Ok(mem.read_data_at(offset, len)),
BaoFileStorage::IncompleteFile(file) => file.read_data_at(offset, len),
},
)
.await
}
async fn size(&mut self) -> io::Result<u64> {
with_storage(
&mut self.0,
BaoFileStorage::is_mem,
move |storage| match storage {
BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
BaoFileStorage::IncompleteMem(mem) => Ok(mem.data.len() as u64),
BaoFileStorage::IncompleteFile(file) => file.data.metadata().map(|m| m.len()),
},
)
.await
}
}
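/// A reader for the outboard part of a bao file.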
#[derive(Debug)]
pub struct OutboardReader(Option<BaoFileHandle>);
impl AsyncSliceReader for OutboardReader {
async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
with_storage(
&mut self.0,
BaoFileStorage::is_mem,
move |storage| match storage {
BaoFileStorage::Complete(mem) => Ok(mem.read_outboard_at(offset, len)),
BaoFileStorage::IncompleteMem(mem) => Ok(mem.read_outboard_at(offset, len)),
BaoFileStorage::IncompleteFile(file) => file.read_outboard_at(offset, len),
},
)
.await
}
async fn size(&mut self) -> io::Result<u64> {
with_storage(
&mut self.0,
BaoFileStorage::is_mem,
move |storage| match storage {
BaoFileStorage::Complete(mem) => Ok(mem.outboard_size()),
BaoFileStorage::IncompleteMem(mem) => Ok(mem.outboard.len() as u64),
BaoFileStorage::IncompleteFile(file) => file.outboard.metadata().map(|m| m.len()),
},
)
.await
}
}
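/// The observable effect of a write on the storage: either nothing changed,
/// or the storage switched from memory to file.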
enum HandleChange {
None,
MemToFile,
}
impl BaoFileHandle {
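/// Create a new bao file handle backed by in-memory storage.
///
/// This always creates a fresh handle, so it is the caller's responsibility
/// to store it in a map to reuse it.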
pub fn incomplete_mem(config: Arc<BaoFileConfig>, hash: Hash) -> Self {
let storage = BaoFileStorage::incomplete_mem();
Self(Arc::new(BaoFileHandleInner {
storage: RwLock::new(storage),
config,
hash,
}))
}
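/// Create a new bao file handle backed by partial files on disk.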
pub fn incomplete_file(config: Arc<BaoFileConfig>, hash: Hash) -> io::Result<Self> {
let paths = config.paths(&hash);
let storage = BaoFileStorage::IncompleteFile(FileStorage {
data: create_read_write(&paths.data)?,
outboard: create_read_write(&paths.outboard)?,
sizes: create_read_write(&paths.sizes)?,
});
Ok(Self(Arc::new(BaoFileHandleInner {
storage: RwLock::new(storage),
config,
hash,
})))
}
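/// Create a new complete bao file handle.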
pub fn new_complete(
config: Arc<BaoFileConfig>,
hash: Hash,
data: MemOrFile<Bytes, (File, u64)>,
outboard: MemOrFile<Bytes, (File, u64)>,
) -> Self {
let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard });
Self(Arc::new(BaoFileHandleInner {
storage: RwLock::new(storage),
config,
hash,
}))
}
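/// Transform the storage in place. If the transform fails, the storage
/// will be an empty immutable storage.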
#[cfg(feature = "fs-store")]
pub(crate) fn transform(
&self,
f: impl FnOnce(BaoFileStorage) -> io::Result<BaoFileStorage>,
) -> io::Result<()> {
let mut lock = self.storage.write().unwrap();
let storage = lock.take();
*lock = f(storage)?;
Ok(())
}
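/// True if the file is complete.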
pub fn is_complete(&self) -> bool {
matches!(
self.storage.read().unwrap().deref(),
BaoFileStorage::Complete(_)
)
}
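/// An AsyncSliceReader for the data file.
///
/// Caution: this is a reader for the unvalidated data file. Reading this
/// can produce data that does not match the hash.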
pub fn data_reader(&self) -> DataReader {
DataReader(Some(self.clone()))
}
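/// An AsyncSliceReader for the outboard file.
///
/// The outboard is used to validate the data file. It is not guaranteed
/// that the outboard is complete.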
pub fn outboard_reader(&self) -> OutboardReader {
OutboardReader(Some(self.clone()))
}
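/// The most precise known total size of the data file.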
pub fn current_size(&self) -> io::Result<u64> {
match self.storage.read().unwrap().deref() {
BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()),
BaoFileStorage::IncompleteFile(file) => file.current_size(),
}
}
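/// The outboard for the file, built over the outboard reader using the most
/// precise known size.
///
/// A sketch of encoding from it, mirroring the tests below:
///
/// ```ignore
/// let ob = handle.outboard()?;
/// let mut encoded = Vec::new();
/// encoded.write_all(ob.tree.size().to_le_bytes().as_slice())?;
/// bao_tree::io::fsm::encode_ranges_validated(
///     handle.data_reader(), ob, &ChunkRanges::all(), encoded).await?;
/// ```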
pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> {
let root = self.hash.into();
let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
let outboard = self.outboard_reader();
Ok(PreOrderOutboard {
root,
tree,
data: outboard,
})
}
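/// The hash of the file.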
pub fn hash(&self) -> Hash {
self.hash
}
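/// Create a new writer from the handle.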
pub fn writer(&self) -> BaoFileWriter {
BaoFileWriter(Some(self.clone()))
}
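/// The synchronous impl of writing a batch: writes to memory until the
/// batch would exceed `max_mem`, then persists to files and continues there.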
fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result<HandleChange> {
let mut storage = self.storage.write().unwrap();
match storage.deref_mut() {
BaoFileStorage::IncompleteMem(mem) => {
if max_offset(batch) <= self.config.max_mem as u64 {
mem.write_batch(size, batch)?;
Ok(HandleChange::None)
} else {
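// switch to file storage *first*, then write the batch. doing it the
// other way round could balloon memory for a write near the end of a
// large file.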
let paths = self.config.paths(&self.hash);
let mut file_batch = mem.persist(paths)?;
file_batch.write_batch(size, batch)?;
*storage = BaoFileStorage::IncompleteFile(file_batch);
Ok(HandleChange::MemToFile)
}
}
BaoFileStorage::IncompleteFile(file) => {
file.write_batch(size, batch)?;
Ok(HandleChange::None)
}
BaoFileStorage::Complete(_) => {
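// the entry is already complete; ignore the write, which would only
// reproduce existing data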
Ok(HandleChange::None)
}
}
}
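/// Downgrade to a weak reference.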
pub fn downgrade(&self) -> BaoFileHandleWeak {
BaoFileHandleWeak(Arc::downgrade(&self.0))
}
}
impl SizeInfo {
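/// Persist the most precise size we know into the sizes file slot format.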
pub fn persist(&self, mut target: impl WriteAt) -> io::Result<()> {
let size_offset = (self.offset >> IROH_BLOCK_SIZE.chunk_log()) << 3;
target.write_all_at(size_offset, self.size.to_le_bytes().as_slice())?;
Ok(())
}
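/// Convert to a vec in slot format.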
pub fn to_vec(&self) -> Vec<u8> {
let mut res = Vec::new();
self.persist(&mut res).expect("io error writing to vec");
res
}
}
impl MutableMemStorage {
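/// Persist the in-memory storage to the given paths, creating a
/// FileStorage. All three files are synced to disk before returning.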
fn persist(&self, paths: DataPaths) -> io::Result<FileStorage> {
let mut data = create_read_write(&paths.data)?;
let mut outboard = create_read_write(&paths.outboard)?;
let mut sizes = create_read_write(&paths.sizes)?;
self.data.persist(&mut data)?;
self.outboard.persist(&mut outboard)?;
self.sizes.persist(&mut sizes)?;
data.sync_all()?;
outboard.sync_all()?;
sizes.sync_all()?;
Ok(FileStorage {
data,
outboard,
sizes,
})
}
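/// Split into the data, outboard and sizes parts.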
pub fn into_parts(self) -> (SparseMemFile, SparseMemFile, SizeInfo) {
(self.data, self.outboard, self.sizes)
}
}
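/// A batch writer for a bao file handle. Writes go to memory until the
/// configured limit, then transparently switch to files, notifying the
/// `on_file_create` callback.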
#[derive(Debug)]
pub struct BaoFileWriter(Option<BaoFileHandle>);
impl BaoBatchWriter for BaoFileWriter {
async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> std::io::Result<()> {
let Some(handle) = self.0.take() else {
return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
};
let (handle, change) = tokio::task::spawn_blocking(move || {
let change = handle.write_batch(size, &batch);
(handle, change)
})
.await
.expect("spawn_blocking failed");
match change? {
HandleChange::None => {}
HandleChange::MemToFile => {
if let Some(cb) = handle.config.on_file_create.as_ref() {
cb(&handle.hash)?;
}
}
}
self.0 = Some(handle);
Ok(())
}
async fn sync(&mut self) -> io::Result<()> {
let Some(handle) = self.0.take() else {
return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
};
let (handle, res) = tokio::task::spawn_blocking(move || {
let res = handle.storage.write().unwrap().sync_all();
(handle, res)
})
.await
.expect("spawn_blocking failed");
self.0 = Some(handle);
res
}
}
#[cfg(test)]
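/// Helpers for the tests below and for other store tests.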
pub mod test_support {
use std::{future::Future, io::Cursor, ops::Range};
use bao_tree::{
io::{
fsm::{ResponseDecoder, ResponseDecoderNext},
outboard::PostOrderMemOutboard,
round_up_to_chunks,
sync::encode_ranges_validated,
},
BlockSize, ChunkRanges,
};
use futures_lite::{Stream, StreamExt};
use iroh_io::AsyncStreamReader;
use rand::RngCore;
use range_collections::RangeSet2;
use super::*;
use crate::util::limited_range;
pub const IROH_BLOCK_SIZE: BlockSize = BlockSize::from_chunk_log(4);
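/// Decode a bao response (size header followed by parents and leaves) and
/// write it into a batch writer, one batch per leaf.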
pub async fn decode_response_into_batch<R, W>(
root: Hash,
block_size: BlockSize,
ranges: ChunkRanges,
mut encoded: R,
mut target: W,
) -> io::Result<()>
where
R: AsyncStreamReader,
W: BaoBatchWriter,
{
let size = encoded.read::<8>().await?;
let size = u64::from_le_bytes(size);
let mut reading =
ResponseDecoder::new(root.into(), ranges, BaoTree::new(size, block_size), encoded);
let mut stack = Vec::new();
loop {
let item = match reading.next().await {
ResponseDecoderNext::Done(_reader) => break,
ResponseDecoderNext::More((next, item)) => {
reading = next;
item?
}
};
match item {
BaoContentItem::Parent(_) => {
stack.push(item);
}
BaoContentItem::Leaf(_) => {
stack.push(item);
target.write_batch(size, std::mem::take(&mut stack)).await?;
}
}
}
assert!(stack.is_empty(), "last item should be a leaf");
Ok(())
}
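/// Create `size` bytes of random test data.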
pub fn random_test_data(size: usize) -> Vec<u8> {
let mut rand = rand::thread_rng();
let mut res = vec![0u8; size];
rand.fill_bytes(&mut res);
res
}
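/// Encode `data` as a full bao response, as a remote peer would, returning
/// the hash and the encoded stream.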
pub fn simulate_remote(data: &[u8]) -> (Hash, Cursor<Bytes>) {
let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
let size = data.len() as u64;
let mut encoded = size.to_le_bytes().to_vec();
bao_tree::io::sync::encode_ranges_validated(
data,
&outboard,
&ChunkRanges::all(),
&mut encoded,
)
.unwrap();
let hash = outboard.root;
(hash.into(), Cursor::new(encoded.into()))
}
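/// Convert a slice of byte ranges to a `RangeSet2`.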
pub fn to_ranges(ranges: &[Range<u64>]) -> RangeSet2<u64> {
let mut range_set = RangeSet2::empty();
for range in ranges.iter().cloned() {
range_set |= RangeSet2::from(range);
}
range_set
}
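/// Encode the given byte ranges of `data`, returning the hash, the chunk
/// ranges, and the wire data (size header plus encoded ranges).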
pub fn make_wire_data(
data: &[u8],
ranges: impl AsRef<[Range<u64>]>,
) -> (Hash, ChunkRanges, Vec<u8>) {
let range_set = to_ranges(ranges.as_ref());
let chunk_ranges = round_up_to_chunks(&range_set);
let outboard = PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE).flip();
let size = data.len() as u64;
let mut encoded = size.to_le_bytes().to_vec();
encode_ranges_validated(data, &outboard, &chunk_ranges, &mut encoded).unwrap();
(outboard.root.into(), chunk_ranges, encoded)
}
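/// Check that reading the given ranges from the handle returns exactly the
/// original data.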
pub async fn validate(handle: &BaoFileHandle, original: &[u8], ranges: &[Range<u64>]) {
let mut r = handle.data_reader();
for range in ranges {
let start = range.start;
let len = (range.end - range.start).try_into().unwrap();
let data = &original[limited_range(start, len, original.len())];
let read = r.read_at(start, len).await.unwrap();
assert_eq!(data.len(), read.as_ref().len());
assert_eq!(data, read.as_ref());
}
}
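/// Yield `data` in `mtu`-sized chunks with `delay` between chunks,
/// simulating a slow network.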
pub fn trickle(
data: &[u8],
mtu: usize,
delay: std::time::Duration,
) -> impl Stream<Item = Bytes> {
let parts = data
.chunks(mtu)
.map(Bytes::copy_from_slice)
.collect::<Vec<_>>();
futures_lite::stream::iter(parts).then(move |part| async move {
tokio::time::sleep(delay).await;
part
})
}
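/// Run a future on a tokio `LocalSet` so that `spawn_local` works inside it.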
pub async fn local<F>(f: F) -> F::Output
where
F: Future,
{
tokio::task::LocalSet::new().run_until(f).await
}
}
#[cfg(test)]
mod tests {
use std::io::Write;
use bao_tree::{blake3, ChunkNum, ChunkRanges};
use futures_lite::StreamExt;
use iroh_io::TokioStreamReader;
use super::test_support::{
decode_response_into_batch, local, make_wire_data, random_test_data, trickle, validate,
};
use tokio::task::JoinSet;
use super::*;
use crate::util::local_pool::LocalPool;
#[tokio::test]
async fn partial_downloads() {
local(async move {
let n = 1024 * 64u64;
let test_data = random_test_data(n as usize);
let temp_dir = tempfile::tempdir().unwrap();
let hash = blake3::hash(&test_data);
let handle = BaoFileHandle::incomplete_mem(
Arc::new(BaoFileConfig::new(
Arc::new(temp_dir.as_ref().to_owned()),
1024 * 16,
None,
)),
hash.into(),
);
let mut tasks = JoinSet::new();
for i in 1..3 {
let file = handle.writer();
let range = (i * (n / 4))..((i + 1) * (n / 4));
println!("range: {:?}", range);
let (hash, chunk_ranges, wire_data) = make_wire_data(&test_data, &[range]);
let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
.map(io::Result::Ok)
.boxed();
let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
let _task = tasks.spawn_local(async move {
decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file)
.await
});
}
while let Some(res) = tasks.join_next().await {
res.unwrap().unwrap();
}
println!(
"len {:?} {:?}",
handle,
handle.data_reader().size().await.unwrap()
);
#[allow(clippy::single_range_in_vec_init)]
let ranges = [1024 * 16..1024 * 48];
validate(&handle, &test_data, &ranges).await;
let mut encoded = Vec::new();
let ob = handle.outboard().unwrap();
encoded
.write_all(ob.tree.size().to_le_bytes().as_slice())
.unwrap();
bao_tree::io::fsm::encode_ranges_validated(
handle.data_reader(),
ob,
&ChunkRanges::from(ChunkNum(16)..ChunkNum(48)),
encoded,
)
.await
.unwrap();
})
.await;
}
#[tokio::test]
async fn concurrent_downloads() {
let n = 1024 * 32u64;
let test_data = random_test_data(n as usize);
let temp_dir = tempfile::tempdir().unwrap();
let hash = blake3::hash(&test_data);
let handle = BaoFileHandle::incomplete_mem(
Arc::new(BaoFileConfig::new(
Arc::new(temp_dir.as_ref().to_owned()),
1024 * 16,
None,
)),
hash.into(),
);
let local = LocalPool::default();
let mut tasks = Vec::new();
for i in 0..4 {
let file = handle.writer();
let range = (i * (n / 4))..((i + 1) * (n / 4));
println!("range: {:?}", range);
let (hash, chunk_ranges, wire_data) = make_wire_data(&test_data, &[range]);
let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
.map(io::Result::Ok)
.boxed();
let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
let task = local.spawn(move || async move {
decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file).await
});
tasks.push(task);
}
for task in tasks {
task.await.unwrap().unwrap();
}
println!(
"len {:?} {:?}",
handle,
handle.data_reader().size().await.unwrap()
);
#[allow(clippy::single_range_in_vec_init)]
let ranges = [0..n];
validate(&handle, &test_data, &ranges).await;
let mut encoded = Vec::new();
let ob = handle.outboard().unwrap();
encoded
.write_all(ob.tree.size().to_le_bytes().as_slice())
.unwrap();
bao_tree::io::fsm::encode_ranges_validated(
handle.data_reader(),
ob,
&ChunkRanges::all(),
encoded,
)
.await
.unwrap();
}
#[tokio::test]
async fn stay_in_mem() {
let test_data = random_test_data(1024 * 17);
#[allow(clippy::single_range_in_vec_init)]
let ranges = [0..test_data.len().try_into().unwrap()];
let (hash, chunk_ranges, wire_data) = make_wire_data(&test_data, &ranges);
println!("file len is {:?}", chunk_ranges);
let temp_dir = tempfile::tempdir().unwrap();
let handle = BaoFileHandle::incomplete_mem(
Arc::new(BaoFileConfig::new(
Arc::new(temp_dir.as_ref().to_owned()),
1024 * 16,
None,
)),
hash,
);
decode_response_into_batch(
hash,
IROH_BLOCK_SIZE,
chunk_ranges,
wire_data.as_slice(),
handle.writer(),
)
.await
.unwrap();
validate(&handle, &test_data, &ranges).await;
let mut encoded = Vec::new();
let ob = handle.outboard().unwrap();
encoded
.write_all(ob.tree.size().to_le_bytes().as_slice())
.unwrap();
bao_tree::io::fsm::encode_ranges_validated(
handle.data_reader(),
ob,
&ChunkRanges::all(),
encoded,
)
.await
.unwrap();
println!("{:?}", handle);
}
}