use std::{
future::Future,
io,
path::PathBuf,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
use genawaiter::sync::{Co, Gen};
use iroh_net::NodeAddr;
use portable_atomic::{AtomicU64, Ordering};
use quic_rpc::{
client::{BoxStreamSync, BoxedConnector},
Connector, RpcClient,
};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;
pub use crate::net_protocol::DownloadMode;
use crate::{
export::ExportProgress as BytesExportProgress,
format::collection::{Collection, SimpleStore},
get::db::DownloadProgress as BytesDownloadProgress,
net_protocol::BlobDownloadRequest,
rpc::proto::RpcService,
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
util::SetTagOption,
BlobFormat, Hash, Tag,
};
mod batch;
pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch};
use super::{flatten, tags};
use crate::rpc::proto::blobs::{
AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse,
BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse,
DeleteRequest, ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest,
ReadAtResponse, ValidateRequest,
};
/// Client for the blobs RPC API.
///
/// A thin wrapper around an [`RpcClient`] speaking [`RpcService`]; the methods of this type
/// issue their requests through `rpc`. The connector type defaults to [`BoxedConnector`].
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Client<C = BoxedConnector<RpcService>> {
/// The underlying RPC client used for every request.
pub(super) rpc: RpcClient<RpcService, C>,
}
impl<C> Client<C>
where
C: Connector<RpcService>,
{
pub fn new(rpc: RpcClient<RpcService, C>) -> Self {
Self { rpc }
}
/// Asks the node for the [`BlobStatus`] of the blob identified by `hash`.
///
/// # Errors
///
/// Fails if the RPC call fails or if the node answers with an error.
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
    let request = BlobStatusRequest { hash };
    let response = self.rpc.rpc(request).await??;
    Ok(response.0)
}
/// Returns `true` only when the status of `hash` is [`BlobStatus::Complete`].
///
/// Any other status yields `false`; errors from [`Self::status`] are passed through.
pub async fn has(&self, hash: Hash) -> Result<bool> {
    let status = self.status(hash).await?;
    Ok(matches!(status, BlobStatus::Complete { .. }))
}
/// Opens a new [`Batch`] on the node.
///
/// Starts a bidirectional `BatchCreateRequest` call and expects the first response to carry
/// the batch id.
///
/// # Errors
///
/// Fails if the call cannot be opened, if the response stream ends before an id arrives
/// ("expected scope id"), or if the first response is an error.
pub async fn batch(&self) -> Result<Batch<C>> {
    let (updates, mut responses) = self.rpc.bidi(BatchCreateRequest).await?;
    let first = responses.next().await.context("expected scope id")?;
    let BatchCreateResponse::Id(batch_id) = first?;
    let rpc = self.rpc.clone();
    Ok(Batch::new(batch_id, rpc, updates, 1024))
}
/// Opens a [`Reader`] over the blob identified by `hash`, starting at offset 0.
pub async fn read(&self, hash: Hash) -> Result<Reader> {
    let reader = Reader::from_rpc_read(&self.rpc, hash).await?;
    Ok(reader)
}
/// Opens a [`Reader`] over a range of the blob `hash`.
///
/// The range starts at byte `offset`; `len` selects how many bytes are requested.
pub async fn read_at(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Reader> {
    let reader = Reader::from_rpc_read_at(&self.rpc, hash, offset, len).await?;
    Ok(reader)
}
/// Reads the blob `hash` from offset 0 and returns its content as one [`Bytes`] buffer.
///
/// The whole response is buffered in memory.
pub async fn read_to_bytes(&self, hash: Hash) -> Result<Bytes> {
    let mut reader = Reader::from_rpc_read(&self.rpc, hash).await?;
    reader.read_to_bytes().await
}
/// Reads a range of the blob `hash` and returns it as one [`Bytes`] buffer.
///
/// The range starts at byte `offset`; `len` selects how many bytes are requested. The whole
/// response is buffered in memory.
pub async fn read_at_to_bytes(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Bytes> {
    let mut reader = Reader::from_rpc_read_at(&self.rpc, hash, offset, len).await?;
    reader.read_to_bytes().await
}
/// Asks the node to add the content at `path`.
///
/// `in_place`, `tag` and `wrap` are forwarded unchanged in the `AddPathRequest`. The returned
/// [`AddProgress`] can be consumed as a stream of progress events or awaited for the final
/// [`AddOutcome`].
pub async fn add_from_path(
    &self,
    path: PathBuf,
    in_place: bool,
    tag: SetTagOption,
    wrap: WrapOption,
) -> Result<AddProgress> {
    let request = AddPathRequest {
        path,
        in_place,
        tag,
        wrap,
    };
    let progress = self.rpc.server_streaming(request).await?;
    Ok(AddProgress::new(progress))
}
/// Sends a `CreateCollectionRequest` for `collection`.
///
/// `tag` and `tags_to_delete` are forwarded unchanged in the request.
///
/// Returns the hash and tag reported by the node in its response.
pub async fn create_collection(
    &self,
    collection: Collection,
    tag: SetTagOption,
    tags_to_delete: Vec<Tag>,
) -> anyhow::Result<(Hash, Tag)> {
    let request = CreateCollectionRequest {
        collection,
        tag,
        tags_to_delete,
    };
    let response = self.rpc.rpc(request).await??;
    Ok((response.hash, response.tag))
}
/// Adds the content produced by `reader`.
///
/// The reader is wrapped in a [`ReaderStream`] with a 64 KiB buffer capacity and handed to
/// [`Self::add_stream`].
pub async fn add_reader(
    &self,
    reader: impl AsyncRead + Unpin + Send + 'static,
    tag: SetTagOption,
) -> anyhow::Result<AddProgress> {
    const CHUNK_CAPACITY: usize = 1024 * 64;
    let chunks = ReaderStream::with_capacity(reader, CHUNK_CAPACITY);
    self.add_stream(chunks, tag).await
}
/// Adds the content produced by a stream of byte chunks.
///
/// Opens a bidirectional `AddStreamRequest` call and spawns a task that forwards `input` to
/// the node. A read error from `input` is logged and forwarded as `AddStreamUpdate::Abort`;
/// a failure to send is logged only. The returned [`AddProgress`] reports the node's progress.
pub async fn add_stream(
    &self,
    input: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
    tag: SetTagOption,
) -> anyhow::Result<AddProgress> {
    let request = AddStreamRequest { tag };
    let (mut sink, progress) = self.rpc.bidi(request).await?;
    // Turn source items into protocol updates; a source error becomes an abort message.
    let mut updates = input.map(|chunk| match chunk {
        Ok(bytes) => Ok(AddStreamUpdate::Chunk(bytes)),
        Err(err) => {
            warn!("Abort send, reason: failed to read from source stream: {err:?}");
            Ok(AddStreamUpdate::Abort)
        }
    });
    let forward = async move {
        let sent = sink.send_all(&mut updates).await;
        if let Err(err) = sent {
            warn!("Failed to send input stream to remote: {err:?}");
        }
    };
    // The upload runs in the background so the caller can observe progress right away.
    tokio::spawn(forward);
    Ok(AddProgress::new(progress))
}
/// Adds `bytes` as a blob with [`SetTagOption::Auto`] and waits for the [`AddOutcome`].
pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<AddOutcome> {
    let chunk: Bytes = bytes.into();
    let input = futures_lite::stream::once(Ok(chunk));
    let progress = self.add_stream(input, SetTagOption::Auto).await?;
    progress.await
}
/// Adds `bytes` as a blob tagged with `name` and waits for the [`AddOutcome`].
pub async fn add_bytes_named(
    &self,
    bytes: impl Into<Bytes>,
    name: impl Into<Tag>,
) -> anyhow::Result<AddOutcome> {
    let chunk: Bytes = bytes.into();
    let input = futures_lite::stream::once(Ok(chunk));
    let tag = SetTagOption::Named(name.into());
    let progress = self.add_stream(input, tag).await?;
    progress.await
}
pub async fn validate(
&self,
repair: bool,
) -> Result<impl Stream<Item = Result<ValidateProgress>>> {
let stream = self
.rpc
.server_streaming(ValidateRequest { repair })
.await?;
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
}
pub async fn consistency_check(
&self,
repair: bool,
) -> Result<impl Stream<Item = Result<ConsistencyCheckProgress>>> {
let stream = self
.rpc
.server_streaming(ConsistencyCheckRequest { repair })
.await?;
Ok(stream.map(|r| r.map_err(anyhow::Error::from)))
}
/// Downloads `hash` from `node`.
///
/// Shorthand for [`Self::download_with_opts`] with [`BlobFormat::Raw`], an automatic tag and
/// [`DownloadMode::Queued`].
pub async fn download(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
    let opts = DownloadOptions {
        format: BlobFormat::Raw,
        nodes: vec![node],
        tag: SetTagOption::Auto,
        mode: DownloadMode::Queued,
    };
    self.download_with_opts(hash, opts).await
}
/// Downloads `hash` from `node`.
///
/// Shorthand for [`Self::download_with_opts`] with [`BlobFormat::HashSeq`], an automatic tag
/// and [`DownloadMode::Queued`].
pub async fn download_hash_seq(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
    let opts = DownloadOptions {
        format: BlobFormat::HashSeq,
        nodes: vec![node],
        tag: SetTagOption::Auto,
        mode: DownloadMode::Queued,
    };
    self.download_with_opts(hash, opts).await
}
pub async fn download_with_opts(
&self,
hash: Hash,
opts: DownloadOptions,
) -> Result<DownloadProgress> {
let DownloadOptions {
format,
nodes,
tag,
mode,
} = opts;
let stream = self
.rpc
.server_streaming(BlobDownloadRequest {
hash,
format,
nodes,
tag,
mode,
})
.await?;
Ok(DownloadProgress::new(
stream.map(|res| res.map_err(anyhow::Error::from)),
))
}
pub async fn export(
&self,
hash: Hash,
destination: PathBuf,
format: ExportFormat,
mode: ExportMode,
) -> Result<ExportProgress> {
let req = ExportRequest {
hash,
path: destination,
format,
mode,
};
let stream = self.rpc.server_streaming(req).await?;
Ok(ExportProgress::new(
stream.map(|r| r.map_err(anyhow::Error::from)),
))
}
/// Sends a `ListRequest` and returns the resulting stream of [`BlobInfo`] entries.
pub async fn list(&self) -> Result<impl Stream<Item = Result<BlobInfo>>> {
    let entries = self.rpc.server_streaming(ListRequest).await?;
    let entries = flatten(entries);
    Ok(entries)
}
/// Sends a `ListIncompleteRequest` and returns the resulting stream of
/// [`IncompleteBlobInfo`] entries.
pub async fn list_incomplete(&self) -> Result<impl Stream<Item = Result<IncompleteBlobInfo>>> {
    let entries = self.rpc.server_streaming(ListIncompleteRequest).await?;
    let entries = flatten(entries);
    Ok(entries)
}
/// Loads the [`Collection`] rooted at `hash`, using this client as the backing store.
pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
    let collection = Collection::load(hash, self).await?;
    Ok(collection)
}
/// Returns a stream of [`CollectionInfo`] entries.
///
/// The work is done by a generator driving `list_collections_impl` on a clone of this
/// client. If that fails, the error is yielded as the final stream item.
pub fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
    let client = self.clone();
    let collections = Gen::new(|co| async move {
        let outcome = client.list_collections_impl(&co).await;
        if let Err(cause) = outcome {
            co.yield_(Err(cause)).await;
        }
    });
    Ok(collections)
}
/// Generator body for [`Self::list_collections`].
///
/// Walks the hash-sequence tags and yields one [`CollectionInfo`] per tag whose collection
/// can be loaded. Tags whose collection fails to load are skipped silently; a failing tag
/// listing aborts with an error.
async fn list_collections_impl(&self, co: &Co<Result<CollectionInfo>>) -> Result<()> {
    // Keep the tags client in a named binding for as long as its stream is in use.
    let tags_client = self.tags_client();
    let mut hash_seq_tags = tags_client.list_hash_seq().await?;
    while let Some(entry) = hash_seq_tags.next().await {
        let entry = entry?;
        let Ok(collection) = self.get_collection(entry.hash).await else {
            continue;
        };
        let blob_count = collection.len() as u64 + 1;
        let info = CollectionInfo {
            tag: entry.name,
            hash: entry.hash,
            total_blobs_count: Some(blob_count),
            total_blobs_size: Some(0),
        };
        co.yield_(Ok(info)).await;
    }
    Ok(())
}
/// Sends a `DeleteRequest` for the blob identified by `hash`.
///
/// # Errors
///
/// Fails if the RPC call fails or if the node answers with an error.
pub async fn delete_blob(&self, hash: Hash) -> Result<()> {
    let request = DeleteRequest { hash };
    self.rpc.rpc(request).await??;
    Ok(())
}
/// Builds a tags client that shares this client's RPC connection.
fn tags_client(&self) -> tags::Client<C> {
    let rpc = self.rpc.clone();
    tags::Client::new(rpc)
}
}
impl<C: Connector<RpcService>> SimpleStore for Client<C> {
    /// Loads the full content of the blob `hash` via [`Client::read_to_bytes`].
    async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
        let content = self.read_to_bytes(hash).await?;
        Ok(content)
    }
}
/// Selects how many bytes a ranged read should return.
///
/// See [`ReadAtLen::as_result_len`] for how each variant maps to a byte count.
#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)]
pub enum ReadAtLen {
/// All remaining bytes (the default).
#[default]
All,
/// Exactly the given number of bytes.
Exact(u64),
/// The given number of bytes, or the remaining bytes if there are fewer.
AtMost(u64),
}
impl ReadAtLen {
pub fn as_result_len(&self, size_remaining: u64) -> u64 {
match self {
ReadAtLen::All => size_remaining,
ReadAtLen::Exact(len) => *len,
ReadAtLen::AtMost(len) => std::cmp::min(*len, size_remaining),
}
}
}
/// Wrapping option sent as the `wrap` field of an `AddPathRequest`
/// (see [`Client::add_from_path`]).
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub enum WrapOption {
/// Do not wrap (the default).
#[default]
NoWrap,
/// Wrap the added content.
Wrap {
/// Optional name to use when wrapping.
/// NOTE(review): how a missing name is handled is decided by the node — confirm there.
name: Option<String>,
},
}
/// Status of a blob as returned by [`Client::status`].
///
/// [`Client::has`] treats only [`BlobStatus::Complete`] as "present".
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
/// The node reports that it does not have the blob.
NotFound,
/// The node has the blob only partially.
Partial {
/// Size of the blob as a [`BaoBlobSize`].
size: BaoBlobSize,
},
/// The node has the complete blob.
Complete {
/// Size of the blob.
size: u64,
},
}
/// Final result of an add operation, produced by awaiting an [`AddProgress`].
#[derive(Debug, Clone)]
pub struct AddOutcome {
/// Hash reported in the final `AllDone` progress event.
pub hash: Hash,
/// Format reported in the final `AllDone` progress event.
pub format: BlobFormat,
/// Sum of the sizes of all `Found` progress events seen before completion.
pub size: u64,
/// Tag reported in the final `AllDone` progress event.
pub tag: Tag,
}
/// Summary of one collection, as yielded by [`Client::list_collections`].
#[derive(Debug, Serialize, Deserialize)]
pub struct CollectionInfo {
/// Name of the tag that points at the collection.
pub tag: Tag,
/// Hash the tag points at.
pub hash: Hash,
/// Number of blobs; `list_collections` reports the collection length plus one.
pub total_blobs_count: Option<u64>,
/// Total size of the blobs; `list_collections` currently always reports `Some(0)`.
pub total_blobs_size: Option<u64>,
}
/// One entry of the stream returned by [`Client::list`].
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobInfo {
/// Path reported by the node for this blob.
/// NOTE(review): the exact meaning of this path is defined by the node — confirm there.
pub path: String,
/// Hash of the blob.
pub hash: Hash,
/// Size reported by the node for this blob.
pub size: u64,
}
/// One entry of the stream returned by [`Client::list_incomplete`].
#[derive(Debug, Serialize, Deserialize)]
pub struct IncompleteBlobInfo {
/// Size reported by the node for the incomplete blob.
/// NOTE(review): presumably the number of bytes present so far — confirm on the node side.
pub size: u64,
/// Size the blob is expected to have, as reported by the node.
pub expected_size: u64,
/// Hash of the blob.
pub hash: Hash,
}
/// Progress of an add operation.
///
/// Implements [`Stream`] to expose the individual progress events and [`Future`] to drive the
/// operation to its [`AddOutcome`].
#[derive(derive_more::Debug)]
pub struct AddProgress {
/// Progress events, already converted to `anyhow` results (skipped in `Debug` output).
#[debug(skip)]
stream:
Pin<Box<dyn Stream<Item = Result<crate::provider::AddProgress>> + Send + Unpin + 'static>>,
/// Running sum of the sizes of all `Found` events that have passed through `stream`.
current_total_size: Arc<AtomicU64>,
}
impl AddProgress {
    /// Wraps a raw progress stream.
    ///
    /// Items are converted into `Result<provider::AddProgress>`. While they pass through, the
    /// size of every `Found` event is added to a shared counter that later becomes
    /// [`AddOutcome::size`].
    fn new(
        stream: (impl Stream<
            Item = Result<impl Into<crate::provider::AddProgress>, impl Into<anyhow::Error>>,
        > + Send
             + Unpin
             + 'static),
    ) -> Self {
        let current_total_size = Arc::new(AtomicU64::new(0));
        let counter = Arc::clone(&current_total_size);
        let stream = stream.map(move |item| {
            let progress: crate::provider::AddProgress = match item {
                Ok(progress) => progress.into(),
                Err(err) => return Err(err.into()),
            };
            if let crate::provider::AddProgress::Found { size, .. } = &progress {
                counter.fetch_add(*size, Ordering::Relaxed);
            }
            Ok(progress)
        });
        Self {
            stream: Box::pin(stream),
            current_total_size,
        }
    }

    /// Drives the operation to completion; equivalent to awaiting `self` directly.
    pub async fn finish(self) -> Result<AddOutcome> {
        let outcome = self.await?;
        Ok(outcome)
    }
}
impl Stream for AddProgress {
    type Item = Result<crate::provider::AddProgress>;

    /// Forwards to the wrapped progress stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.stream.as_mut().poll_next(cx)
    }
}
impl Future for AddProgress {
    type Output = Result<AddOutcome>;

    /// Drains progress events until the operation finishes.
    ///
    /// Resolves to the [`AddOutcome`] on `AllDone`, and to an error on `Abort`, on a stream
    /// error, or if the stream ends without a terminal event. Other events are discarded.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        loop {
            let msg = match std::task::ready!(this.stream.as_mut().poll_next(cx)) {
                Some(Ok(msg)) => msg,
                Some(Err(err)) => return Poll::Ready(Err(err)),
                None => return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))),
            };
            match msg {
                crate::provider::AddProgress::AllDone { hash, format, tag } => {
                    let size = this.current_total_size.load(Ordering::Relaxed);
                    return Poll::Ready(Ok(AddOutcome {
                        hash,
                        format,
                        tag,
                        size,
                    }));
                }
                crate::provider::AddProgress::Abort(err) => {
                    return Poll::Ready(Err(err.into()));
                }
                _ => {}
            }
        }
    }
}
/// Final result of a download, produced by awaiting a [`DownloadProgress`].
#[derive(Debug, Clone)]
pub struct DownloadOutcome {
/// Sum of the sizes of all `FoundLocal` progress events.
pub local_size: u64,
/// Sum of the sizes of all `Found` progress events.
pub downloaded_size: u64,
/// Statistics carried by the final `AllDone` progress event.
pub stats: crate::get::Stats,
}
/// Progress of a download.
///
/// Implements [`Stream`] to expose the individual progress events and [`Future`] to drive the
/// download to its [`DownloadOutcome`].
#[derive(derive_more::Debug)]
pub struct DownloadProgress {
/// Progress events, already converted to `anyhow` results (skipped in `Debug` output).
#[debug(skip)]
stream: Pin<Box<dyn Stream<Item = Result<BytesDownloadProgress>> + Send + Unpin + 'static>>,
/// Running sum of the sizes of all `FoundLocal` events that have passed through `stream`.
current_local_size: Arc<AtomicU64>,
/// Running sum of the sizes of all `Found` events that have passed through `stream`.
current_network_size: Arc<AtomicU64>,
}
impl DownloadProgress {
    /// Wraps a raw progress stream.
    ///
    /// Items are converted into `Result<BytesDownloadProgress>`. While they pass through,
    /// `FoundLocal` sizes are added to the local counter and `Found` sizes to the network
    /// counter; both counters feed the final [`DownloadOutcome`].
    pub fn new(
        stream: (impl Stream<Item = Result<impl Into<BytesDownloadProgress>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
    ) -> Self {
        let current_local_size = Arc::new(AtomicU64::new(0));
        let current_network_size = Arc::new(AtomicU64::new(0));

        let local_counter = Arc::clone(&current_local_size);
        let network_counter = Arc::clone(&current_network_size);

        let stream = stream.map(move |item| {
            let progress: BytesDownloadProgress = match item {
                Ok(progress) => progress.into(),
                Err(err) => return Err(err.into()),
            };
            match &progress {
                BytesDownloadProgress::FoundLocal { size, .. } => {
                    local_counter.fetch_add(size.value(), Ordering::Relaxed);
                }
                BytesDownloadProgress::Found { size, .. } => {
                    network_counter.fetch_add(*size, Ordering::Relaxed);
                }
                _ => {}
            }
            Ok(progress)
        });
        Self {
            stream: Box::pin(stream),
            current_local_size,
            current_network_size,
        }
    }

    /// Drives the download to completion; equivalent to awaiting `self` directly.
    pub async fn finish(self) -> Result<DownloadOutcome> {
        let outcome = self.await?;
        Ok(outcome)
    }
}
impl Stream for DownloadProgress {
    type Item = Result<BytesDownloadProgress>;

    /// Forwards to the wrapped progress stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.stream.as_mut().poll_next(cx)
    }
}
impl Future for DownloadProgress {
    type Output = Result<DownloadOutcome>;

    /// Drains progress events until the download finishes.
    ///
    /// Resolves to the [`DownloadOutcome`] on `AllDone`, and to an error on `Abort`, on a
    /// stream error, or if the stream ends without a terminal event. Other events are
    /// discarded.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        loop {
            let msg = match std::task::ready!(this.stream.as_mut().poll_next(cx)) {
                Some(Ok(msg)) => msg,
                Some(Err(err)) => return Poll::Ready(Err(err)),
                None => return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))),
            };
            match msg {
                BytesDownloadProgress::AllDone(stats) => {
                    let local_size = this.current_local_size.load(Ordering::Relaxed);
                    let downloaded_size = this.current_network_size.load(Ordering::Relaxed);
                    return Poll::Ready(Ok(DownloadOutcome {
                        local_size,
                        downloaded_size,
                        stats,
                    }));
                }
                BytesDownloadProgress::Abort(err) => {
                    return Poll::Ready(Err(err.into()));
                }
                _ => {}
            }
        }
    }
}
/// Final result of an export, produced by awaiting an [`ExportProgress`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExportOutcome {
/// Sum of the sizes of all `Found` progress events seen before completion.
total_size: u64,
}
/// Progress of an export.
///
/// Implements [`Stream`] to expose the individual progress events and [`Future`] to drive the
/// export to its [`ExportOutcome`].
#[derive(derive_more::Debug)]
pub struct ExportProgress {
/// Progress events, already converted to `anyhow` results (skipped in `Debug` output).
#[debug(skip)]
stream: Pin<Box<dyn Stream<Item = Result<BytesExportProgress>> + Send + Unpin + 'static>>,
/// Running sum of the sizes of all `Found` events that have passed through `stream`.
current_total_size: Arc<AtomicU64>,
}
impl ExportProgress {
    /// Wraps a raw progress stream.
    ///
    /// Items are converted into `Result<BytesExportProgress>`. While they pass through, the
    /// size of every `Found` event is added to a shared counter that feeds the final
    /// [`ExportOutcome`].
    pub fn new(
        stream: (impl Stream<Item = Result<impl Into<BytesExportProgress>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
    ) -> Self {
        let current_total_size = Arc::new(AtomicU64::new(0));
        let counter = Arc::clone(&current_total_size);
        let stream = stream.map(move |item| {
            let progress: BytesExportProgress = match item {
                Ok(progress) => progress.into(),
                Err(err) => return Err(err.into()),
            };
            if let BytesExportProgress::Found { size, .. } = &progress {
                counter.fetch_add(size.value(), Ordering::Relaxed);
            }
            Ok(progress)
        });
        Self {
            stream: Box::pin(stream),
            current_total_size,
        }
    }

    /// Drives the export to completion; equivalent to awaiting `self` directly.
    pub async fn finish(self) -> Result<ExportOutcome> {
        let outcome = self.await?;
        Ok(outcome)
    }
}
impl Stream for ExportProgress {
    type Item = Result<BytesExportProgress>;

    /// Forwards to the wrapped progress stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.stream.as_mut().poll_next(cx)
    }
}
impl Future for ExportProgress {
    type Output = Result<ExportOutcome>;

    /// Drains progress events until the export finishes.
    ///
    /// Resolves to the [`ExportOutcome`] on `AllDone`, and to an error on `Abort`, on a stream
    /// error, or if the stream ends without a terminal event. Other events are discarded.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        loop {
            let msg = match std::task::ready!(this.stream.as_mut().poll_next(cx)) {
                Some(Ok(msg)) => msg,
                Some(Err(err)) => return Poll::Ready(Err(err)),
                None => return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))),
            };
            match msg {
                BytesExportProgress::AllDone => {
                    let total_size = this.current_total_size.load(Ordering::Relaxed);
                    return Poll::Ready(Ok(ExportOutcome { total_size }));
                }
                BytesExportProgress::Abort(err) => {
                    return Poll::Ready(Err(err.into()));
                }
                _ => {}
            }
        }
    }
}
/// Reader over the data of a blob, fed by a `ReadAtRequest` response stream.
///
/// Implements [`AsyncRead`] for byte-oriented access and [`Stream`] for chunk-oriented access.
#[derive(derive_more::Debug)]
pub struct Reader {
/// Size of the blob as reported in the response header.
size: u64,
/// Number of bytes the response is expected to contain; used as the buffer capacity in
/// `read_to_bytes`.
response_size: u64,
/// Completeness flag as reported in the response header.
is_complete: bool,
/// The remaining data frames, adapted to an `AsyncRead` (printed as `StreamReader`).
#[debug("StreamReader")]
stream: tokio_util::io::StreamReader<BoxStreamSync<'static, io::Result<Bytes>>, Bytes>,
}
impl Reader {
fn new(
size: u64,
response_size: u64,
is_complete: bool,
stream: BoxStreamSync<'static, io::Result<Bytes>>,
) -> Self {
Self {
size,
response_size,
is_complete,
stream: StreamReader::new(stream),
}
}
pub async fn from_rpc_read<C>(
rpc: &RpcClient<RpcService, C>,
hash: Hash,
) -> anyhow::Result<Self>
where
C: Connector<RpcService>,
{
Self::from_rpc_read_at(rpc, hash, 0, ReadAtLen::All).await
}
async fn from_rpc_read_at<C>(
rpc: &RpcClient<RpcService, C>,
hash: Hash,
offset: u64,
len: ReadAtLen,
) -> anyhow::Result<Self>
where
C: Connector<RpcService>,
{
let stream = rpc
.server_streaming(ReadAtRequest { hash, offset, len })
.await?;
let mut stream = flatten(stream);
let (size, is_complete) = match stream.next().await {
Some(Ok(ReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
Some(Err(err)) => return Err(err),
Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
};
let stream = stream.map(|item| match item {
Ok(ReadAtResponse::Data { chunk }) => Ok(chunk),
Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
});
let len = len.as_result_len(size.value() - offset);
Ok(Self::new(size.value(), len, is_complete, Box::pin(stream)))
}
pub fn size(&self) -> u64 {
self.size
}
pub fn is_complete(&self) -> bool {
self.is_complete
}
pub async fn read_to_bytes(&mut self) -> anyhow::Result<Bytes> {
let mut buf = Vec::with_capacity(self.response_size as usize);
self.read_to_end(&mut buf).await?;
Ok(buf.into())
}
}
impl AsyncRead for Reader {
    /// Forwards the read to the wrapped `StreamReader`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        Pin::new(&mut this.stream).poll_read(cx, buf)
    }
}
impl Stream for Reader {
    type Item = io::Result<Bytes>;

    /// Polls the chunk stream that sits underneath the `StreamReader`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let reader = Pin::new(&mut this.stream);
        reader.get_pin_mut().poll_next(cx)
    }

    /// Reports the size hint of the underlying chunk stream.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let chunks = self.stream.get_ref();
        chunks.size_hint()
    }
}
/// Options for [`Client::download_with_opts`].
///
/// Each field is copied into the `BlobDownloadRequest` sent to the node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadOptions {
/// Format of the blob to download.
pub format: BlobFormat,
/// Nodes offered as download sources.
pub nodes: Vec<NodeAddr>,
/// Tag option forwarded with the request.
pub tag: SetTagOption,
/// Download mode forwarded with the request.
pub mode: DownloadMode,
}
#[cfg(test)]
mod tests {
use iroh_net::NodeId;
use rand::RngCore;
use testresult::TestResult;
use tokio::{io::AsyncWriteExt, sync::mpsc};
use super::*;
use crate::hashseq::HashSeq;
mod node {
use std::{path::Path, sync::Arc};
use iroh_net::{NodeAddr, NodeId};
use quic_rpc::transport::{Connector, Listener};
use tokio_util::task::AbortOnDropHandle;
use super::RpcService;
use crate::{
provider::{CustomEventSender, EventSender},
rpc::client::{blobs, tags},
util::local_pool::LocalPool,
};
/// RPC client for [`RpcService`] using the default connector type.
type RpcClient = quic_rpc::RpcClient<RpcService>;
/// Minimal test node: a router serving the blobs protocol plus an in-process RPC client.
#[derive(Debug)]
pub struct Node {
/// Router returned by `setup_router`; provides the endpoint and shutdown.
router: iroh_router::Router,
/// RPC client from which the blobs and tags clients are created.
client: RpcClient,
/// Local pool returned by `setup_router`; stored so it lives as long as the node.
_local_pool: LocalPool,
/// Handle of the RPC accept loop; aborts the task when the node is dropped.
_rpc_task: AbortOnDropHandle<()>,
}
/// Builder for a test [`Node`] on top of the store `S`.
#[derive(Debug)]
pub struct Builder<S> {
/// Blob store the node will serve.
store: S,
/// Sink for provider events; replaced via `blobs_events`.
events: EventSender,
}
impl<S: crate::store::Store> Builder<S> {
    /// Replaces the provider event sink, keeping the store.
    pub fn blobs_events(self, events: impl CustomEventSender) -> Self {
        Self {
            events: events.into(),
            ..self
        }
    }

    /// Spawns the node: sets up the router and the RPC accept loop via `setup_router`.
    pub async fn spawn(self) -> anyhow::Result<Node> {
        let Self { store, events } = self;
        let (client, router, rpc_task, _local_pool) = setup_router(store, events).await?;
        let _rpc_task = AbortOnDropHandle::new(rpc_task);
        Ok(Node {
            router,
            client,
            _rpc_task,
            _local_pool,
        })
    }
}
impl Node {
    /// Starts building a node backed by a fresh in-memory store.
    pub fn memory() -> Builder<crate::store::mem::Store> {
        let store = crate::store::mem::Store::new();
        Builder {
            store,
            events: Default::default(),
        }
    }

    /// Starts building a node backed by the file-system store loaded from `path`.
    pub async fn persistent(
        path: impl AsRef<Path>,
    ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
        let store = crate::store::fs::Store::load(path).await?;
        Ok(Builder {
            store,
            events: Default::default(),
        })
    }

    /// Node id of the router's endpoint.
    pub fn node_id(&self) -> NodeId {
        let endpoint = self.router.endpoint();
        endpoint.node_id()
    }

    /// Address of the router's endpoint.
    pub async fn node_addr(&self) -> anyhow::Result<NodeAddr> {
        let endpoint = self.router.endpoint();
        endpoint.node_addr().await
    }

    /// Shuts the router down.
    pub async fn shutdown(self) -> anyhow::Result<()> {
        self.router.shutdown().await
    }

    /// Blobs client talking to this node.
    pub fn blobs(&self) -> blobs::Client {
        let rpc = self.client.clone();
        blobs::Client::new(rpc)
    }

    /// Tags client talking to this node.
    pub fn tags(&self) -> tags::Client {
        let rpc = self.client.clone();
        tags::Client::new(rpc)
    }
}
/// Wires up everything a test node needs.
///
/// Binds an endpoint, registers the blobs protocol on a router, and starts a task that serves
/// RPC requests over an in-process flume channel.
///
/// Returns the RPC client, the spawned router, the handle of the RPC accept loop and the local
/// pool.
async fn setup_router<S: crate::store::Store>(
store: S,
events: EventSender,
) -> anyhow::Result<(
RpcClient,
iroh_router::Router,
tokio::task::JoinHandle<()>,
LocalPool,
)> {
let endpoint = iroh_net::Endpoint::builder().discovery_n0().bind().await?;
let local_pool = LocalPool::single();
let mut router = iroh_router::Router::builder(endpoint.clone());
// Set up the blobs protocol handler; the downloader gets its own handles to the store,
// endpoint and local pool.
let downloader = crate::downloader::Downloader::new(
store.clone(),
endpoint.clone(),
local_pool.handle().clone(),
);
let blobs = Arc::new(crate::net_protocol::Blobs::new_with_events(
store.clone(),
local_pool.handle().clone(),
events,
downloader,
endpoint.clone(),
));
router = router.accept(crate::protocol::ALPN.to_vec(), blobs.clone());
// Build the router.
let router = router.spawn().await?;
// In-process RPC: `internal_rpc` is the server side, `controller` the client side.
let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32);
let controller = controller.boxed();
let internal_rpc = internal_rpc.boxed();
let internal_rpc = quic_rpc::RpcServer::new(internal_rpc);
// Accept loop: every accepted request is handled on its own task. The loop itself never
// exits; accept errors are only logged.
let rpc_server_task: tokio::task::JoinHandle<()> = tokio::task::spawn(async move {
loop {
let request = internal_rpc.accept().await;
match request {
Ok(accepting) => {
let blobs = blobs.clone();
tokio::task::spawn(async move {
// Test helper: a failing request panics its handler task.
let (msg, chan) = accepting.read_first().await.unwrap();
blobs.handle_rpc_request(msg, chan).await.unwrap();
});
}
Err(err) => {
tracing::warn!("rpc error: {:?}", err);
}
}
}
});
let client = quic_rpc::RpcClient::new(controller);
Ok((client, router, rpc_server_task, local_pool))
}
}
/// Adds five random files, groups them into a collection and checks that the collection
/// listing and the tag listing both describe exactly that collection.
#[tokio::test]
async fn test_blob_create_collection() -> Result<()> {
    let _guard = iroh_test::logging::setup();

    let node = node::Node::memory().spawn().await?;

    // Create the input files.
    let temp_dir = tempfile::tempdir().context("tempdir")?;
    let in_root = temp_dir.path().join("in");
    tokio::fs::create_dir_all(in_root.clone())
        .await
        .context("create dir all")?;

    let mut paths = Vec::new();
    for i in 0..5 {
        let path = in_root.join(format!("test-{i}"));
        let size = 100;
        let mut buf = vec![0u8; size];
        rand::thread_rng().fill_bytes(&mut buf);
        let mut file = tokio::fs::File::create(path.clone())
            .await
            .context("create file")?;
        file.write_all(&buf).await.context("write_all")?;
        file.flush().await.context("flush")?;
        paths.push(path);
    }

    let blobs = node.blobs();

    // Import every file and remember its hash and temporary tag.
    let mut collection = Collection::default();
    let mut tags = Vec::new();
    for path in &paths {
        let import_outcome = blobs
            .add_from_path(
                path.to_path_buf(),
                false,
                SetTagOption::Auto,
                WrapOption::NoWrap,
            )
            .await
            .context("import file")?
            .finish()
            .await
            .context("import finish")?;

        collection.push(
            path.file_name().unwrap().to_str().unwrap().to_string(),
            import_outcome.hash,
        );
        tags.push(import_outcome.tag);
    }

    let (hash, tag) = blobs
        .create_collection(collection, SetTagOption::Auto, tags)
        .await?;

    let collections: Vec<_> = blobs.list_collections()?.try_collect().await?;

    assert_eq!(collections.len(), 1);
    {
        // Bind the listed values under their own names: reusing `tag`/`hash` here would
        // shadow the expected values and turn the assertions into self-comparisons.
        let CollectionInfo {
            tag: listed_tag,
            hash: listed_hash,
            total_blobs_count,
            ..
        } = &collections[0];
        assert_eq!(listed_tag, &tag);
        assert_eq!(listed_hash, &hash);
        // 5 blobs + 1 meta
        assert_eq!(total_blobs_count, &Some(5 + 1));
    }

    // The per-file tags were handed over for deletion, so only the collection tag remains.
    let tags: Vec<_> = node.tags().list().await?.try_collect().await?;
    assert_eq!(tags.len(), 1);
    assert_eq!(tags[0].hash, hash);
    assert_eq!(tags[0].name, tag);
    assert_eq!(tags[0].format, BlobFormat::HashSeq);

    Ok(())
}
/// Imports a 128 KiB blob and exercises ranged reads: exact lengths, `All`, `AtMost`, and
/// requests that must be rejected.
#[tokio::test]
async fn test_blob_read_at() -> Result<()> {
    let node = node::Node::memory().spawn().await?;

    // Create the input file.
    let temp_dir = tempfile::tempdir().context("tempdir")?;
    let in_root = temp_dir.path().join("in");
    tokio::fs::create_dir_all(&in_root)
        .await
        .context("create dir all")?;

    let path = in_root.join("test-blob");
    let size = 1024 * 128;
    let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
    let mut file = tokio::fs::File::create(&path)
        .await
        .context("create file")?;
    file.write_all(&buf).await.context("write_all")?;
    file.flush().await.context("flush")?;

    let blobs = node.blobs();

    let import_outcome = blobs
        .add_from_path(path, false, SetTagOption::Auto, WrapOption::NoWrap)
        .await
        .context("import file")?
        .finish()
        .await
        .context("import finish")?;

    let hash = import_outcome.hash;

    // Read everything.
    let res = blobs.read_to_bytes(hash).await?;
    assert_eq!(&res, &buf[..]);

    // Exact-length reads: (offset, len) pairs covering less than, exactly, and more than one
    // 64 KiB chunk, each with and without an offset.
    const CHUNK: u64 = 1024 * 64;
    let exact_cases: [(u64, u64); 6] = [
        (0, 100),
        (20, 120),
        (0, CHUNK),
        (20, CHUNK),
        (0, 10 + CHUNK),
        (20, 10 + CHUNK),
    ];
    for (offset, len) in exact_cases {
        let res = blobs
            .read_at_to_bytes(hash, offset, ReadAtLen::Exact(len))
            .await?;
        let start = offset as usize;
        let end = (offset + len) as usize;
        assert_eq!(res.len(), len as usize);
        assert_eq!(&res[..], &buf[start..end]);
    }

    // Full length from an offset.
    let res = blobs.read_at_to_bytes(hash, 20, ReadAtLen::All).await?;
    assert_eq!(res.len(), 1024 * 128 - 20);
    assert_eq!(&res[..], &buf[20..]);

    // Header information of a ranged reader.
    let reader = blobs.read_at(hash, 0, ReadAtLen::Exact(20)).await?;
    assert_eq!(reader.size(), 1024 * 128);
    assert_eq!(reader.response_size, 20);

    // The last 1 KiB, requested exactly.
    let res = blobs
        .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::Exact(1024))
        .await?;
    assert_eq!(res.len(), 1024);
    assert_eq!(res, &buf[1024 * 127..]);

    // The last 1 KiB, requested as "all".
    let res = blobs
        .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::All)
        .await?;
    assert_eq!(res.len(), 1024);
    assert_eq!(res, &buf[1024 * 127..]);

    // The last 1 KiB, requested as "at most 2 KiB".
    let mut reader = blobs
        .read_at(hash, 1024 * 127, ReadAtLen::AtMost(2048))
        .await?;
    assert_eq!(reader.size, 1024 * 128);
    assert_eq!(reader.response_size, 1024);
    let res = reader.read_to_bytes().await?;
    assert_eq!(res.len(), 1024);
    assert_eq!(res, &buf[1024 * 127..]);

    // Requests that do not fit into the blob must be rejected.
    let err = blobs
        .read_at(hash, 0, ReadAtLen::Exact(1024 * 128 + 1))
        .await
        .unwrap_err();
    assert!(err.to_string().contains("out of bound"));

    let err = blobs
        .read_at(hash, 1024 * 128 + 1, ReadAtLen::All)
        .await
        .unwrap_err();
    assert!(err.to_string().contains("out of range"));

    let err = blobs
        .read_at(hash, 1024 * 127, ReadAtLen::Exact(1025))
        .await
        .unwrap_err();
    assert!(err.to_string().contains("out of bound"));

    Ok(())
}
/// Adds five random files, groups them into a collection and loads that collection back.
#[tokio::test]
async fn test_blob_get_collection() -> Result<()> {
    let _guard = iroh_test::logging::setup();
    let node = node::Node::memory().spawn().await?;

    // Create the input files.
    let temp_dir = tempfile::tempdir().context("tempdir")?;
    let in_root = temp_dir.path().join("in");
    tokio::fs::create_dir_all(&in_root)
        .await
        .context("create dir all")?;

    let mut paths = Vec::new();
    for i in 0..5 {
        let path = in_root.join(format!("test-{i}"));
        let mut content = vec![0u8; 100];
        rand::thread_rng().fill_bytes(&mut content);
        let mut file = tokio::fs::File::create(&path)
            .await
            .context("create file")?;
        file.write_all(&content).await.context("write_all")?;
        file.flush().await.context("flush")?;
        paths.push(path);
    }

    let blobs = node.blobs();

    // Import every file and remember its hash and temporary tag.
    let mut collection = Collection::default();
    let mut tags = Vec::new();
    for path in &paths {
        let progress = blobs
            .add_from_path(
                path.clone(),
                false,
                SetTagOption::Auto,
                WrapOption::NoWrap,
            )
            .await
            .context("import file")?;
        let import_outcome = progress.finish().await.context("import finish")?;

        let name = path.file_name().unwrap().to_str().unwrap().to_string();
        collection.push(name, import_outcome.hash);
        tags.push(import_outcome.tag);
    }

    let (hash, _tag) = blobs
        .create_collection(collection, SetTagOption::Auto, tags)
        .await?;

    let loaded = blobs.get_collection(hash).await?;
    assert_eq!(loaded.len(), 5);

    Ok(())
}
/// Imports a 128 KiB file and checks that the node reports the blob as complete with that
/// size.
#[tokio::test]
async fn test_blob_share() -> Result<()> {
    let _guard = iroh_test::logging::setup();
    let node = node::Node::memory().spawn().await?;

    // Create the input file.
    let temp_dir = tempfile::tempdir().context("tempdir")?;
    let in_root = temp_dir.path().join("in");
    tokio::fs::create_dir_all(&in_root)
        .await
        .context("create dir all")?;

    let path = in_root.join("test-blob");
    let size = 1024 * 128;
    let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
    let mut file = tokio::fs::File::create(&path)
        .await
        .context("create file")?;
    file.write_all(&buf).await.context("write_all")?;
    file.flush().await.context("flush")?;

    let blobs = node.blobs();

    let progress = blobs
        .add_from_path(path, false, SetTagOption::Auto, WrapOption::NoWrap)
        .await
        .context("import file")?;
    let import_outcome = progress.finish().await.context("import finish")?;

    let status = blobs.status(import_outcome.hash).await?;
    assert_eq!(status, BlobStatus::Complete { size });

    Ok(())
}
/// Test event sink that forwards provider events into a tokio mpsc channel.
#[derive(Debug, Clone)]
struct BlobEvents {
/// Sending half of the channel; the receiving half is returned by `BlobEvents::new`.
sender: mpsc::Sender<crate::provider::Event>,
}
impl BlobEvents {
    /// Creates an event sink backed by a channel of capacity `cap`, returning the sink
    /// together with the receiving end of the channel.
    fn new(cap: usize) -> (Self, mpsc::Receiver<crate::provider::Event>) {
        let (sender, receiver) = mpsc::channel(cap);
        (Self { sender }, receiver)
    }
}
impl crate::provider::CustomEventSender for BlobEvents {
    /// Sends `event` into the channel, waiting for capacity; a send failure is ignored.
    fn send(&self, event: crate::provider::Event) -> futures_lite::future::Boxed<()> {
        let sender = self.sender.clone();
        Box::pin(async move {
            let _ = sender.send(event).await;
        })
    }

    /// Tries to send `event` without waiting; a send failure is ignored.
    fn try_send(&self, event: crate::provider::Event) {
        let _ = self.sender.try_send(event);
    }
}
/// Downloads a blob from `node1` to `node2` and checks the provider events each node emitted:
/// the providing node must report the full transfer sequence, the downloading node nothing.
#[tokio::test]
async fn test_blob_provide_events() -> Result<()> {
    let _guard = iroh_test::logging::setup();

    let (node1_events, mut node1_events_r) = BlobEvents::new(16);
    let node1 = node::Node::memory()
        .blobs_events(node1_events)
        .spawn()
        .await?;

    let (node2_events, mut node2_events_r) = BlobEvents::new(16);
    let node2 = node::Node::memory()
        .blobs_events(node2_events)
        .spawn()
        .await?;

    let import_outcome = node1.blobs().add_bytes(&b"hello world"[..]).await?;

    // Download in node2.
    let node1_addr = node1.node_addr().await?;
    let res = node2
        .blobs()
        .download(import_outcome.hash, node1_addr)
        .await?
        .await?;
    assert_eq!(res.local_size, 0);
    assert_eq!(res.downloaded_size, 11);

    node1.shutdown().await?;
    node2.shutdown().await?;

    // Drain the event channels; `recv` returns `None` once all senders are gone.
    let mut ev1 = Vec::new();
    while let Some(ev) = node1_events_r.recv().await {
        ev1.push(ev);
    }
    // Check the leading events in one go. A slice pattern avoids an opaque out-of-bounds
    // panic when fewer events arrived, and the message shows what was actually received.
    assert!(
        matches!(
            ev1.as_slice(),
            [
                crate::provider::Event::ClientConnected { .. },
                crate::provider::Event::GetRequestReceived { .. },
                crate::provider::Event::TransferProgress { .. },
                crate::provider::Event::TransferCompleted { .. },
                ..
            ]
        ),
        "unexpected provider events on node1: {ev1:?}"
    );

    let mut ev2 = Vec::new();
    while let Some(ev) = node2_events_r.recv().await {
        ev2.push(ev);
    }
    // The node that only downloads must not emit any provider events.
    assert!(
        ev2.is_empty(),
        "unexpected provider events on node2: {ev2:?}"
    );

    Ok(())
}
/// Downloading a blob the node already has, with the node itself as the only source, must
/// succeed in both download modes and report the data as local.
#[tokio::test]
async fn test_blob_get_self_existing() -> TestResult<()> {
    let _guard = iroh_test::logging::setup();

    let node = node::Node::memory().spawn().await?;
    let node_id = node.node_id();
    let blobs = node.blobs();

    let AddOutcome { hash, size, .. } = blobs.add_bytes("foo").await?;

    // Direct first, then queued; the expectations are the same for both.
    for mode in [DownloadMode::Direct, DownloadMode::Queued] {
        let opts = DownloadOptions {
            format: BlobFormat::Raw,
            nodes: vec![node_id.into()],
            tag: SetTagOption::Auto,
            mode,
        };
        let res = blobs.download_with_opts(hash, opts).await?.await?;

        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);
    }

    Ok(())
}
/// Downloading a blob the node does not have, with the node itself as the only source, must
/// fail in both download modes with the mode-specific error message.
#[tokio::test]
async fn test_blob_get_self_missing() -> TestResult<()> {
    let _guard = iroh_test::logging::setup();

    let node = node::Node::memory().spawn().await?;
    let node_id = node.node_id();
    let blobs = node.blobs();

    let hash = Hash::from_bytes([0u8; 32]);

    // Direct first, then queued; each mode has its own error message.
    let cases = [
        (DownloadMode::Direct, "No nodes to download from provided"),
        (DownloadMode::Queued, "No provider nodes found"),
    ];
    for (mode, expected_message) in cases {
        let opts = DownloadOptions {
            format: BlobFormat::Raw,
            nodes: vec![node_id.into()],
            tag: SetTagOption::Auto,
            mode,
        };
        let res = blobs.download_with_opts(hash, opts).await?.await;

        assert!(res.is_err());
        assert_eq!(
            res.err().unwrap().to_string().as_str(),
            expected_message
        );
    }

    Ok(())
}
/// Downloading a collection that is already fully present must succeed in both download
/// modes without touching the network, reporting the complete size as local.
#[tokio::test]
async fn test_blob_get_existing_collection() -> TestResult<()> {
    let _guard = iroh_test::logging::setup();

    let node = node::Node::memory().spawn().await?;
    // A node id that is not the node under test.
    let node_id = NodeId::from_bytes(&[0u8; 32])?;
    let blobs = node.blobs();

    let mut collection = Collection::default();
    let mut tags = Vec::new();
    let mut size = 0;
    for value in ["iroh", "is", "cool"] {
        let import_outcome = blobs.add_bytes(value).await.context("add bytes")?;
        collection.push(value.to_string(), import_outcome.hash);
        tags.push(import_outcome.tag);
        size += import_outcome.size;
    }

    let (hash, _tag) = blobs
        .create_collection(collection, SetTagOption::Auto, tags)
        .await?;

    // Account for the hash sequence itself ...
    let hashseq_bytes = blobs.read_to_bytes(hash).await?;
    size += hashseq_bytes.len() as u64;
    let hashseq = HashSeq::try_from(hashseq_bytes)?;

    // ... and for the collection header, which is the first entry of the hash sequence.
    let header_hash = hashseq.into_iter().next().expect("header to exist");
    let collection_header_bytes = blobs.read_to_bytes(header_hash).await?;
    size += collection_header_bytes.len() as u64;

    // Direct first, then queued; the expectations are the same for both.
    let cases = [
        (DownloadMode::Direct, "direct (download)"),
        (DownloadMode::Queued, "queued"),
    ];
    for (mode, label) in cases {
        let opts = DownloadOptions {
            format: BlobFormat::HashSeq,
            nodes: vec![node_id.into()],
            tag: SetTagOption::Auto,
            mode,
        };
        let res = blobs
            .download_with_opts(hash, opts)
            .await?
            .await
            .context(label)?;

        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);
    }

    Ok(())
}
/// Adds a blob to an in-memory node, deletes it and checks that the listing is empty again.
#[tokio::test]
#[cfg_attr(target_os = "windows", ignore = "flaky")]
async fn test_blob_delete_mem() -> Result<()> {
    let _guard = iroh_test::logging::setup();

    let node = node::Node::memory().spawn().await?;
    let blobs = node.blobs();

    let added = blobs.add_bytes(&b"hello world"[..]).await?;

    let listed: Vec<_> = blobs.list().await?.try_collect().await?;
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].hash, added.hash);

    // Delete the blob.
    blobs.delete_blob(added.hash).await?;

    let listed: Vec<_> = blobs.list().await?.try_collect().await?;
    assert!(listed.is_empty());

    Ok(())
}
/// Adds a blob to a file-system backed node, deletes it and checks that the listing is empty
/// again.
#[tokio::test]
async fn test_blob_delete_fs() -> Result<()> {
    let _guard = iroh_test::logging::setup();

    let dir = tempfile::tempdir()?;
    let node = node::Node::persistent(dir.path()).await?.spawn().await?;
    let blobs = node.blobs();

    let added = blobs.add_bytes(&b"hello world"[..]).await?;

    let listed: Vec<_> = blobs.list().await?.try_collect().await?;
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].hash, added.hash);

    // Delete the blob.
    blobs.delete_blob(added.hash).await?;

    let listed: Vec<_> = blobs.list().await?.try_collect().await?;
    assert!(listed.is_empty());

    Ok(())
}
}