use std::{
    future::Future,
    io,
    path::PathBuf,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};
use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
use genawaiter::sync::{Co, Gen};
use iroh_net::NodeAddr;
use portable_atomic::{AtomicU64, Ordering};
use quic_rpc::{
    client::{BoxStreamSync, BoxedConnector},
    Connector, RpcClient,
};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;
pub use crate::net_protocol::DownloadMode;
use crate::{
    export::ExportProgress as BytesExportProgress,
    format::collection::{Collection, SimpleStore},
    get::db::DownloadProgress as BytesDownloadProgress,
    net_protocol::BlobDownloadRequest,
    rpc::proto::RpcService,
    store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
    util::SetTagOption,
    BlobFormat, Hash, Tag,
};
mod batch;
pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch};
use super::{flatten, tags};
use crate::rpc::proto::blobs::{
    AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse,
    BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse,
    DeleteRequest, ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest,
    ReadAtResponse, ValidateRequest,
};
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Client<C = BoxedConnector<RpcService>> {
    pub(super) rpc: RpcClient<RpcService, C>,
}
pub type MemClient = Client<crate::rpc::MemConnector>;
impl<C> Client<C>
where
    C: Connector<RpcService>,
{
    pub fn new(rpc: RpcClient<RpcService, C>) -> Self {
        Self { rpc }
    }
    pub fn tags(&self) -> tags::Client<C> {
        tags::Client::new(self.rpc.clone())
    }
    pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
        let status = self.rpc.rpc(BlobStatusRequest { hash }).await??;
        Ok(status.0)
    }
    pub async fn has(&self, hash: Hash) -> Result<bool> {
        match self.status(hash).await {
            Ok(BlobStatus::Complete { .. }) => Ok(true),
            Ok(_) => Ok(false),
            Err(err) => Err(err),
        }
    }
    pub async fn batch(&self) -> Result<Batch<C>> {
        let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
        let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
        let rpc = self.rpc.clone();
        Ok(Batch::new(batch, rpc, updates, 1024))
    }
    pub async fn read(&self, hash: Hash) -> Result<Reader> {
        Reader::from_rpc_read(&self.rpc, hash).await
    }
    pub async fn read_at(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Reader> {
        Reader::from_rpc_read_at(&self.rpc, hash, offset, len).await
    }
    pub async fn read_to_bytes(&self, hash: Hash) -> Result<Bytes> {
        Reader::from_rpc_read(&self.rpc, hash)
            .await?
            .read_to_bytes()
            .await
    }
    pub async fn read_at_to_bytes(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Bytes> {
        Reader::from_rpc_read_at(&self.rpc, hash, offset, len)
            .await?
            .read_to_bytes()
            .await
    }
    pub async fn add_from_path(
        &self,
        path: PathBuf,
        in_place: bool,
        tag: SetTagOption,
        wrap: WrapOption,
    ) -> Result<AddProgress> {
        let stream = self
            .rpc
            .server_streaming(AddPathRequest {
                path,
                in_place,
                tag,
                wrap,
            })
            .await?;
        Ok(AddProgress::new(stream))
    }
    pub async fn create_collection(
        &self,
        collection: Collection,
        tag: SetTagOption,
        tags_to_delete: Vec<Tag>,
    ) -> anyhow::Result<(Hash, Tag)> {
        let CreateCollectionResponse { hash, tag } = self
            .rpc
            .rpc(CreateCollectionRequest {
                collection,
                tag,
                tags_to_delete,
            })
            .await??;
        Ok((hash, tag))
    }
    pub async fn add_reader(
        &self,
        reader: impl AsyncRead + Unpin + Send + 'static,
        tag: SetTagOption,
    ) -> anyhow::Result<AddProgress> {
        const CAP: usize = 1024 * 64; let input = ReaderStream::with_capacity(reader, CAP);
        self.add_stream(input, tag).await
    }
    pub async fn add_stream(
        &self,
        input: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
        tag: SetTagOption,
    ) -> anyhow::Result<AddProgress> {
        let (mut sink, progress) = self.rpc.bidi(AddStreamRequest { tag }).await?;
        let mut input = input.map(|chunk| match chunk {
            Ok(chunk) => Ok(AddStreamUpdate::Chunk(chunk)),
            Err(err) => {
                warn!("Abort send, reason: failed to read from source stream: {err:?}");
                Ok(AddStreamUpdate::Abort)
            }
        });
        tokio::spawn(async move {
            if let Err(err) = sink.send_all(&mut input).await {
                warn!("Failed to send input stream to remote: {err:?}");
            }
        });
        Ok(AddProgress::new(progress))
    }
    pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<AddOutcome> {
        let input = futures_lite::stream::once(Ok(bytes.into()));
        self.add_stream(input, SetTagOption::Auto).await?.await
    }
    pub async fn add_bytes_named(
        &self,
        bytes: impl Into<Bytes>,
        name: impl Into<Tag>,
    ) -> anyhow::Result<AddOutcome> {
        let input = futures_lite::stream::once(Ok(bytes.into()));
        self.add_stream(input, SetTagOption::Named(name.into()))
            .await?
            .await
    }
    pub async fn validate(
        &self,
        repair: bool,
    ) -> Result<impl Stream<Item = Result<ValidateProgress>>> {
        let stream = self
            .rpc
            .server_streaming(ValidateRequest { repair })
            .await?;
        Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
    }
    pub async fn consistency_check(
        &self,
        repair: bool,
    ) -> Result<impl Stream<Item = Result<ConsistencyCheckProgress>>> {
        let stream = self
            .rpc
            .server_streaming(ConsistencyCheckRequest { repair })
            .await?;
        Ok(stream.map(|r| r.map_err(anyhow::Error::from)))
    }
    pub async fn download(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
        self.download_with_opts(
            hash,
            DownloadOptions {
                format: BlobFormat::Raw,
                nodes: vec![node],
                tag: SetTagOption::Auto,
                mode: DownloadMode::Queued,
            },
        )
        .await
    }
    pub async fn download_hash_seq(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
        self.download_with_opts(
            hash,
            DownloadOptions {
                format: BlobFormat::HashSeq,
                nodes: vec![node],
                tag: SetTagOption::Auto,
                mode: DownloadMode::Queued,
            },
        )
        .await
    }
    pub async fn download_with_opts(
        &self,
        hash: Hash,
        opts: DownloadOptions,
    ) -> Result<DownloadProgress> {
        let DownloadOptions {
            format,
            nodes,
            tag,
            mode,
        } = opts;
        let stream = self
            .rpc
            .server_streaming(BlobDownloadRequest {
                hash,
                format,
                nodes,
                tag,
                mode,
            })
            .await?;
        Ok(DownloadProgress::new(
            stream.map(|res| res.map_err(anyhow::Error::from)),
        ))
    }
    pub async fn export(
        &self,
        hash: Hash,
        destination: PathBuf,
        format: ExportFormat,
        mode: ExportMode,
    ) -> Result<ExportProgress> {
        let req = ExportRequest {
            hash,
            path: destination,
            format,
            mode,
        };
        let stream = self.rpc.server_streaming(req).await?;
        Ok(ExportProgress::new(
            stream.map(|r| r.map_err(anyhow::Error::from)),
        ))
    }
    pub async fn list(&self) -> Result<impl Stream<Item = Result<BlobInfo>>> {
        let stream = self.rpc.server_streaming(ListRequest).await?;
        Ok(flatten(stream))
    }
    pub async fn list_incomplete(&self) -> Result<impl Stream<Item = Result<IncompleteBlobInfo>>> {
        let stream = self.rpc.server_streaming(ListIncompleteRequest).await?;
        Ok(flatten(stream))
    }
    pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
        Collection::load(hash, self).await
    }
    pub fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
        let this = self.clone();
        Ok(Gen::new(|co| async move {
            if let Err(cause) = this.list_collections_impl(&co).await {
                co.yield_(Err(cause)).await;
            }
        }))
    }
    async fn list_collections_impl(&self, co: &Co<Result<CollectionInfo>>) -> Result<()> {
        let tags = self.tags_client();
        let mut tags = tags.list_hash_seq().await?;
        while let Some(tag) = tags.next().await {
            let tag = tag?;
            if let Ok(collection) = self.get_collection(tag.hash).await {
                let info = CollectionInfo {
                    tag: tag.name,
                    hash: tag.hash,
                    total_blobs_count: Some(collection.len() as u64 + 1),
                    total_blobs_size: Some(0),
                };
                co.yield_(Ok(info)).await;
            }
        }
        Ok(())
    }
    pub async fn delete_blob(&self, hash: Hash) -> Result<()> {
        self.rpc.rpc(DeleteRequest { hash }).await??;
        Ok(())
    }
    fn tags_client(&self) -> tags::Client<C> {
        tags::Client::new(self.rpc.clone())
    }
}
impl<C> SimpleStore for Client<C>
where
    C: Connector<RpcService>,
{
    async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
        self.read_to_bytes(hash).await
    }
}
#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)]
pub enum ReadAtLen {
    #[default]
    All,
    Exact(u64),
    AtMost(u64),
}
impl ReadAtLen {
    pub fn as_result_len(&self, size_remaining: u64) -> u64 {
        match self {
            ReadAtLen::All => size_remaining,
            ReadAtLen::Exact(len) => *len,
            ReadAtLen::AtMost(len) => std::cmp::min(*len, size_remaining),
        }
    }
}
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub enum WrapOption {
    #[default]
    NoWrap,
    Wrap {
        name: Option<String>,
    },
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
    NotFound,
    Partial {
        size: BaoBlobSize,
    },
    Complete {
        size: u64,
    },
}
#[derive(Debug, Clone)]
pub struct AddOutcome {
    pub hash: Hash,
    pub format: BlobFormat,
    pub size: u64,
    pub tag: Tag,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CollectionInfo {
    pub tag: Tag,
    pub hash: Hash,
    pub total_blobs_count: Option<u64>,
    pub total_blobs_size: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobInfo {
    pub path: String,
    pub hash: Hash,
    pub size: u64,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct IncompleteBlobInfo {
    pub size: u64,
    pub expected_size: u64,
    pub hash: Hash,
}
#[derive(derive_more::Debug)]
pub struct AddProgress {
    #[debug(skip)]
    stream:
        Pin<Box<dyn Stream<Item = Result<crate::provider::AddProgress>> + Send + Unpin + 'static>>,
    current_total_size: Arc<AtomicU64>,
}
impl AddProgress {
    fn new(
        stream: (impl Stream<
            Item = Result<impl Into<crate::provider::AddProgress>, impl Into<anyhow::Error>>,
        > + Send
             + Unpin
             + 'static),
    ) -> Self {
        let current_total_size = Arc::new(AtomicU64::new(0));
        let total_size = current_total_size.clone();
        let stream = stream.map(move |item| match item {
            Ok(item) => {
                let item = item.into();
                if let crate::provider::AddProgress::Found { size, .. } = &item {
                    total_size.fetch_add(*size, Ordering::Relaxed);
                }
                Ok(item)
            }
            Err(err) => Err(err.into()),
        });
        Self {
            stream: Box::pin(stream),
            current_total_size,
        }
    }
    pub async fn finish(self) -> Result<AddOutcome> {
        self.await
    }
}
impl Stream for AddProgress {
    type Item = Result<crate::provider::AddProgress>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(cx)
    }
}
impl Future for AddProgress {
    type Output = Result<AddOutcome>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match Pin::new(&mut self.stream).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => {
                    return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
                Poll::Ready(Some(Ok(msg))) => match msg {
                    crate::provider::AddProgress::AllDone { hash, format, tag } => {
                        let outcome = AddOutcome {
                            hash,
                            format,
                            tag,
                            size: self.current_total_size.load(Ordering::Relaxed),
                        };
                        return Poll::Ready(Ok(outcome));
                    }
                    crate::provider::AddProgress::Abort(err) => {
                        return Poll::Ready(Err(err.into()));
                    }
                    _ => {}
                },
            }
        }
    }
}
#[derive(Debug, Clone)]
pub struct DownloadOutcome {
    pub local_size: u64,
    pub downloaded_size: u64,
    pub stats: crate::get::Stats,
}
#[derive(derive_more::Debug)]
pub struct DownloadProgress {
    #[debug(skip)]
    stream: Pin<Box<dyn Stream<Item = Result<BytesDownloadProgress>> + Send + Unpin + 'static>>,
    current_local_size: Arc<AtomicU64>,
    current_network_size: Arc<AtomicU64>,
}
impl DownloadProgress {
    pub fn new(
        stream: (impl Stream<Item = Result<impl Into<BytesDownloadProgress>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
    ) -> Self {
        let current_local_size = Arc::new(AtomicU64::new(0));
        let current_network_size = Arc::new(AtomicU64::new(0));
        let local_size = current_local_size.clone();
        let network_size = current_network_size.clone();
        let stream = stream.map(move |item| match item {
            Ok(item) => {
                let item = item.into();
                match &item {
                    BytesDownloadProgress::FoundLocal { size, .. } => {
                        local_size.fetch_add(size.value(), Ordering::Relaxed);
                    }
                    BytesDownloadProgress::Found { size, .. } => {
                        network_size.fetch_add(*size, Ordering::Relaxed);
                    }
                    _ => {}
                }
                Ok(item)
            }
            Err(err) => Err(err.into()),
        });
        Self {
            stream: Box::pin(stream),
            current_local_size,
            current_network_size,
        }
    }
    pub async fn finish(self) -> Result<DownloadOutcome> {
        self.await
    }
}
impl Stream for DownloadProgress {
    type Item = Result<BytesDownloadProgress>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(cx)
    }
}
impl Future for DownloadProgress {
    type Output = Result<DownloadOutcome>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match Pin::new(&mut self.stream).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => {
                    return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
                Poll::Ready(Some(Ok(msg))) => match msg {
                    BytesDownloadProgress::AllDone(stats) => {
                        let outcome = DownloadOutcome {
                            local_size: self.current_local_size.load(Ordering::Relaxed),
                            downloaded_size: self.current_network_size.load(Ordering::Relaxed),
                            stats,
                        };
                        return Poll::Ready(Ok(outcome));
                    }
                    BytesDownloadProgress::Abort(err) => {
                        return Poll::Ready(Err(err.into()));
                    }
                    _ => {}
                },
            }
        }
    }
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExportOutcome {
    total_size: u64,
}
#[derive(derive_more::Debug)]
pub struct ExportProgress {
    #[debug(skip)]
    stream: Pin<Box<dyn Stream<Item = Result<BytesExportProgress>> + Send + Unpin + 'static>>,
    current_total_size: Arc<AtomicU64>,
}
impl ExportProgress {
    pub fn new(
        stream: (impl Stream<Item = Result<impl Into<BytesExportProgress>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
    ) -> Self {
        let current_total_size = Arc::new(AtomicU64::new(0));
        let total_size = current_total_size.clone();
        let stream = stream.map(move |item| match item {
            Ok(item) => {
                let item = item.into();
                if let BytesExportProgress::Found { size, .. } = &item {
                    let size = size.value();
                    total_size.fetch_add(size, Ordering::Relaxed);
                }
                Ok(item)
            }
            Err(err) => Err(err.into()),
        });
        Self {
            stream: Box::pin(stream),
            current_total_size,
        }
    }
    pub async fn finish(self) -> Result<ExportOutcome> {
        self.await
    }
}
impl Stream for ExportProgress {
    type Item = Result<BytesExportProgress>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(cx)
    }
}
impl Future for ExportProgress {
    type Output = Result<ExportOutcome>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match Pin::new(&mut self.stream).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => {
                    return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
                Poll::Ready(Some(Ok(msg))) => match msg {
                    BytesExportProgress::AllDone => {
                        let outcome = ExportOutcome {
                            total_size: self.current_total_size.load(Ordering::Relaxed),
                        };
                        return Poll::Ready(Ok(outcome));
                    }
                    BytesExportProgress::Abort(err) => {
                        return Poll::Ready(Err(err.into()));
                    }
                    _ => {}
                },
            }
        }
    }
}
#[derive(derive_more::Debug)]
pub struct Reader {
    size: u64,
    response_size: u64,
    is_complete: bool,
    #[debug("StreamReader")]
    stream: tokio_util::io::StreamReader<BoxStreamSync<'static, io::Result<Bytes>>, Bytes>,
}
impl Reader {
    fn new(
        size: u64,
        response_size: u64,
        is_complete: bool,
        stream: BoxStreamSync<'static, io::Result<Bytes>>,
    ) -> Self {
        Self {
            size,
            response_size,
            is_complete,
            stream: StreamReader::new(stream),
        }
    }
    pub async fn from_rpc_read<C>(
        rpc: &RpcClient<RpcService, C>,
        hash: Hash,
    ) -> anyhow::Result<Self>
    where
        C: Connector<RpcService>,
    {
        Self::from_rpc_read_at(rpc, hash, 0, ReadAtLen::All).await
    }
    async fn from_rpc_read_at<C>(
        rpc: &RpcClient<RpcService, C>,
        hash: Hash,
        offset: u64,
        len: ReadAtLen,
    ) -> anyhow::Result<Self>
    where
        C: Connector<RpcService>,
    {
        let stream = rpc
            .server_streaming(ReadAtRequest { hash, offset, len })
            .await?;
        let mut stream = flatten(stream);
        let (size, is_complete) = match stream.next().await {
            Some(Ok(ReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
            Some(Err(err)) => return Err(err),
            Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
            None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
        };
        let stream = stream.map(|item| match item {
            Ok(ReadAtResponse::Data { chunk }) => Ok(chunk),
            Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
            Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
        });
        let len = len.as_result_len(size.value() - offset);
        Ok(Self::new(size.value(), len, is_complete, Box::pin(stream)))
    }
    pub fn size(&self) -> u64 {
        self.size
    }
    pub fn is_complete(&self) -> bool {
        self.is_complete
    }
    pub async fn read_to_bytes(&mut self) -> anyhow::Result<Bytes> {
        let mut buf = Vec::with_capacity(self.response_size as usize);
        self.read_to_end(&mut buf).await?;
        Ok(buf.into())
    }
}
impl AsyncRead for Reader {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_read(cx, buf)
    }
}
impl Stream for Reader {
    type Item = io::Result<Bytes>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).get_pin_mut().poll_next(cx)
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.get_ref().size_hint()
    }
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadOptions {
    pub format: BlobFormat,
    pub nodes: Vec<NodeAddr>,
    pub tag: SetTagOption,
    pub mode: DownloadMode,
}
#[cfg(test)]
mod tests {
    use std::{path::Path, time::Duration};
    use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket};
    use iroh_net::{key::SecretKey, test_utils::DnsPkarrServer, NodeId, RelayMode};
    use node::Node;
    use rand::RngCore;
    use testresult::TestResult;
    use tokio::{io::AsyncWriteExt, sync::mpsc};
    use super::*;
    use crate::hashseq::HashSeq;
    mod node {
        use std::{path::Path, sync::Arc};
        use iroh_net::{Endpoint, NodeAddr, NodeId};
        use iroh_router::Router;
        use tokio_util::task::AbortOnDropHandle;
        use super::RpcService;
        use crate::{
            downloader::Downloader,
            net_protocol::Blobs,
            provider::{CustomEventSender, EventSender},
            rpc::client::{blobs, tags},
            util::local_pool::LocalPool,
        };
        type RpcClient = quic_rpc::RpcClient<RpcService>;
        #[derive(Debug)]
        pub struct Node {
            router: iroh_router::Router,
            client: RpcClient,
            _local_pool: LocalPool,
            _rpc_task: AbortOnDropHandle<()>,
        }
        #[derive(Debug)]
        pub struct Builder<S> {
            store: S,
            events: EventSender,
            endpoint: Option<iroh_net::endpoint::Builder>,
        }
        impl<S: crate::store::Store> Builder<S> {
            pub fn blobs_events(self, events: impl CustomEventSender) -> Self {
                Self {
                    events: events.into(),
                    ..self
                }
            }
            pub fn endpoint(self, endpoint: iroh_net::endpoint::Builder) -> Self {
                Self {
                    endpoint: Some(endpoint),
                    ..self
                }
            }
            pub async fn spawn(self) -> anyhow::Result<Node> {
                let store = self.store;
                let events = self.events;
                let endpoint = self
                    .endpoint
                    .unwrap_or_else(|| Endpoint::builder().discovery_n0())
                    .bind()
                    .await?;
                let local_pool = LocalPool::single();
                let mut router = Router::builder(endpoint.clone());
                let downloader =
                    Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone());
                let blobs = Arc::new(Blobs::new(
                    store.clone(),
                    local_pool.handle().clone(),
                    events,
                    downloader,
                    endpoint.clone(),
                ));
                router = router.accept(crate::ALPN, blobs.clone());
                let router = router.spawn().await?;
                let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32);
                let internal_rpc = quic_rpc::RpcServer::new(internal_rpc).boxed();
                let _rpc_task = internal_rpc.spawn_accept_loop(move |msg, chan| {
                    blobs.clone().handle_rpc_request(msg, chan)
                });
                let client = quic_rpc::RpcClient::new(controller).boxed();
                Ok(Node {
                    router,
                    client,
                    _rpc_task,
                    _local_pool: local_pool,
                })
            }
        }
        impl Node {
            pub fn memory() -> Builder<crate::store::mem::Store> {
                Builder {
                    store: crate::store::mem::Store::new(),
                    events: Default::default(),
                    endpoint: None,
                }
            }
            pub async fn persistent(
                path: impl AsRef<Path>,
            ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
                Ok(Builder {
                    store: crate::store::fs::Store::load(path).await?,
                    events: Default::default(),
                    endpoint: None,
                })
            }
            pub fn node_id(&self) -> NodeId {
                self.router.endpoint().node_id()
            }
            pub async fn node_addr(&self) -> anyhow::Result<NodeAddr> {
                self.router.endpoint().node_addr().await
            }
            pub async fn shutdown(self) -> anyhow::Result<()> {
                self.router.shutdown().await
            }
            pub fn blobs(&self) -> blobs::Client {
                blobs::Client::new(self.client.clone())
            }
            pub fn tags(&self) -> tags::Client {
                tags::Client::new(self.client.clone())
            }
        }
    }
    #[tokio::test]
    async fn test_blob_create_collection() -> Result<()> {
        let _guard = iroh_test::logging::setup();
        let node = node::Node::memory().spawn().await?;
        let temp_dir = tempfile::tempdir().context("tempdir")?;
        let in_root = temp_dir.path().join("in");
        tokio::fs::create_dir_all(in_root.clone())
            .await
            .context("create dir all")?;
        let mut paths = Vec::new();
        for i in 0..5 {
            let path = in_root.join(format!("test-{i}"));
            let size = 100;
            let mut buf = vec![0u8; size];
            rand::thread_rng().fill_bytes(&mut buf);
            let mut file = tokio::fs::File::create(path.clone())
                .await
                .context("create file")?;
            file.write_all(&buf.clone()).await.context("write_all")?;
            file.flush().await.context("flush")?;
            paths.push(path);
        }
        let blobs = node.blobs();
        let mut collection = Collection::default();
        let mut tags = Vec::new();
        for path in &paths {
            let import_outcome = blobs
                .add_from_path(
                    path.to_path_buf(),
                    false,
                    SetTagOption::Auto,
                    WrapOption::NoWrap,
                )
                .await
                .context("import file")?
                .finish()
                .await
                .context("import finish")?;
            collection.push(
                path.file_name().unwrap().to_str().unwrap().to_string(),
                import_outcome.hash,
            );
            tags.push(import_outcome.tag);
        }
        let (hash, tag) = blobs
            .create_collection(collection, SetTagOption::Auto, tags)
            .await?;
        let collections: Vec<_> = blobs.list_collections()?.try_collect().await?;
        assert_eq!(collections.len(), 1);
        {
            let CollectionInfo {
                tag,
                hash,
                total_blobs_count,
                ..
            } = &collections[0];
            assert_eq!(tag, tag);
            assert_eq!(hash, hash);
            assert_eq!(total_blobs_count, &Some(5 + 1));
        }
        let tags: Vec<_> = node.tags().list().await?.try_collect().await?;
        assert_eq!(tags.len(), 1);
        assert_eq!(tags[0].hash, hash);
        assert_eq!(tags[0].name, tag);
        assert_eq!(tags[0].format, BlobFormat::HashSeq);
        Ok(())
    }
    #[tokio::test]
    async fn test_blob_read_at() -> Result<()> {
        let node = node::Node::memory().spawn().await?;
        let temp_dir = tempfile::tempdir().context("tempdir")?;
        let in_root = temp_dir.path().join("in");
        tokio::fs::create_dir_all(in_root.clone())
            .await
            .context("create dir all")?;
        let path = in_root.join("test-blob");
        let size = 1024 * 128;
        let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
        let mut file = tokio::fs::File::create(path.clone())
            .await
            .context("create file")?;
        file.write_all(&buf.clone()).await.context("write_all")?;
        file.flush().await.context("flush")?;
        let blobs = node.blobs();
        let import_outcome = blobs
            .add_from_path(
                path.to_path_buf(),
                false,
                SetTagOption::Auto,
                WrapOption::NoWrap,
            )
            .await
            .context("import file")?
            .finish()
            .await
            .context("import finish")?;
        let hash = import_outcome.hash;
        let res = blobs.read_to_bytes(hash).await?;
        assert_eq!(&res, &buf[..]);
        let res = blobs
            .read_at_to_bytes(hash, 0, ReadAtLen::Exact(100))
            .await?;
        assert_eq!(res.len(), 100);
        assert_eq!(&res[..], &buf[0..100]);
        let res = blobs
            .read_at_to_bytes(hash, 20, ReadAtLen::Exact(120))
            .await?;
        assert_eq!(res.len(), 120);
        assert_eq!(&res[..], &buf[20..140]);
        let res = blobs
            .read_at_to_bytes(hash, 0, ReadAtLen::Exact(1024 * 64))
            .await?;
        assert_eq!(res.len(), 1024 * 64);
        assert_eq!(&res[..], &buf[0..1024 * 64]);
        let res = blobs
            .read_at_to_bytes(hash, 20, ReadAtLen::Exact(1024 * 64))
            .await?;
        assert_eq!(res.len(), 1024 * 64);
        assert_eq!(&res[..], &buf[20..(20 + 1024 * 64)]);
        let res = blobs
            .read_at_to_bytes(hash, 0, ReadAtLen::Exact(10 + 1024 * 64))
            .await?;
        assert_eq!(res.len(), 10 + 1024 * 64);
        assert_eq!(&res[..], &buf[0..(10 + 1024 * 64)]);
        let res = blobs
            .read_at_to_bytes(hash, 20, ReadAtLen::Exact(10 + 1024 * 64))
            .await?;
        assert_eq!(res.len(), 10 + 1024 * 64);
        assert_eq!(&res[..], &buf[20..(20 + 10 + 1024 * 64)]);
        let res = blobs.read_at_to_bytes(hash, 20, ReadAtLen::All).await?;
        assert_eq!(res.len(), 1024 * 128 - 20);
        assert_eq!(&res[..], &buf[20..]);
        let reader = blobs.read_at(hash, 0, ReadAtLen::Exact(20)).await?;
        assert_eq!(reader.size(), 1024 * 128);
        assert_eq!(reader.response_size, 20);
        let res = blobs
            .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::Exact(1024))
            .await?;
        assert_eq!(res.len(), 1024);
        assert_eq!(res, &buf[1024 * 127..]);
        let res = blobs
            .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::All)
            .await?;
        assert_eq!(res.len(), 1024);
        assert_eq!(res, &buf[1024 * 127..]);
        let mut res = blobs
            .read_at(hash, 1024 * 127, ReadAtLen::AtMost(2048))
            .await?;
        assert_eq!(res.size, 1024 * 128);
        assert_eq!(res.response_size, 1024);
        let res = res.read_to_bytes().await?;
        assert_eq!(res.len(), 1024);
        assert_eq!(res, &buf[1024 * 127..]);
        let res = blobs
            .read_at(hash, 0, ReadAtLen::Exact(1024 * 128 + 1))
            .await;
        let err = res.unwrap_err();
        assert!(err.to_string().contains("out of bound"));
        let res = blobs.read_at(hash, 1024 * 128 + 1, ReadAtLen::All).await;
        let err = res.unwrap_err();
        assert!(err.to_string().contains("out of range"));
        let res = blobs
            .read_at(hash, 1024 * 127, ReadAtLen::Exact(1025))
            .await;
        let err = res.unwrap_err();
        assert!(err.to_string().contains("out of bound"));
        Ok(())
    }
    #[tokio::test]
    async fn test_blob_get_collection() -> Result<()> {
        let _guard = iroh_test::logging::setup();
        let node = node::Node::memory().spawn().await?;
        let temp_dir = tempfile::tempdir().context("tempdir")?;
        let in_root = temp_dir.path().join("in");
        tokio::fs::create_dir_all(in_root.clone())
            .await
            .context("create dir all")?;
        let mut paths = Vec::new();
        for i in 0..5 {
            let path = in_root.join(format!("test-{i}"));
            let size = 100;
            let mut buf = vec![0u8; size];
            rand::thread_rng().fill_bytes(&mut buf);
            let mut file = tokio::fs::File::create(path.clone())
                .await
                .context("create file")?;
            file.write_all(&buf.clone()).await.context("write_all")?;
            file.flush().await.context("flush")?;
            paths.push(path);
        }
        let blobs = node.blobs();
        let mut collection = Collection::default();
        let mut tags = Vec::new();
        for path in &paths {
            let import_outcome = blobs
                .add_from_path(
                    path.to_path_buf(),
                    false,
                    SetTagOption::Auto,
                    WrapOption::NoWrap,
                )
                .await
                .context("import file")?
                .finish()
                .await
                .context("import finish")?;
            collection.push(
                path.file_name().unwrap().to_str().unwrap().to_string(),
                import_outcome.hash,
            );
            tags.push(import_outcome.tag);
        }
        let (hash, _tag) = blobs
            .create_collection(collection, SetTagOption::Auto, tags)
            .await?;
        let collection = blobs.get_collection(hash).await?;
        assert_eq!(collection.len(), 5);
        Ok(())
    }
    #[tokio::test]
    async fn test_blob_share() -> Result<()> {
        let _guard = iroh_test::logging::setup();
        let node = node::Node::memory().spawn().await?;
        let temp_dir = tempfile::tempdir().context("tempdir")?;
        let in_root = temp_dir.path().join("in");
        tokio::fs::create_dir_all(in_root.clone())
            .await
            .context("create dir all")?;
        let path = in_root.join("test-blob");
        let size = 1024 * 128;
        let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
        let mut file = tokio::fs::File::create(path.clone())
            .await
            .context("create file")?;
        file.write_all(&buf.clone()).await.context("write_all")?;
        file.flush().await.context("flush")?;
        let blobs = node.blobs();
        let import_outcome = blobs
            .add_from_path(
                path.to_path_buf(),
                false,
                SetTagOption::Auto,
                WrapOption::NoWrap,
            )
            .await
            .context("import file")?
            .finish()
            .await
            .context("import finish")?;
        let status = blobs.status(import_outcome.hash).await?;
        assert_eq!(status, BlobStatus::Complete { size });
        Ok(())
    }
    #[derive(Debug, Clone)]
    struct BlobEvents {
        sender: mpsc::Sender<crate::provider::Event>,
    }
    impl BlobEvents {
        fn new(cap: usize) -> (Self, mpsc::Receiver<crate::provider::Event>) {
            let (s, r) = mpsc::channel(cap);
            (Self { sender: s }, r)
        }
    }
    impl crate::provider::CustomEventSender for BlobEvents {
        fn send(&self, event: crate::provider::Event) -> futures_lite::future::Boxed<()> {
            let sender = self.sender.clone();
            Box::pin(async move {
                sender.send(event).await.ok();
            })
        }
        fn try_send(&self, event: crate::provider::Event) {
            self.sender.try_send(event).ok();
        }
    }
    #[tokio::test]
    async fn test_blob_provide_events() -> Result<()> {
        let _guard = iroh_test::logging::setup();
        let (node1_events, mut node1_events_r) = BlobEvents::new(16);
        let node1 = node::Node::memory()
            .blobs_events(node1_events)
            .spawn()
            .await?;
        let (node2_events, mut node2_events_r) = BlobEvents::new(16);
        let node2 = node::Node::memory()
            .blobs_events(node2_events)
            .spawn()
            .await?;
        let import_outcome = node1.blobs().add_bytes(&b"hello world"[..]).await?;
        let node1_addr = node1.node_addr().await?;
        let res = node2
            .blobs()
            .download(import_outcome.hash, node1_addr)
            .await?
            .await?;
        dbg!(&res);
        assert_eq!(res.local_size, 0);
        assert_eq!(res.downloaded_size, 11);
        node1.shutdown().await?;
        node2.shutdown().await?;
        let mut ev1 = Vec::new();
        while let Some(ev) = node1_events_r.recv().await {
            ev1.push(ev);
        }
        assert!(matches!(
            ev1[0],
            crate::provider::Event::ClientConnected { .. }
        ));
        assert!(matches!(
            ev1[1],
            crate::provider::Event::GetRequestReceived { .. }
        ));
        assert!(matches!(
            ev1[2],
            crate::provider::Event::TransferProgress { .. }
        ));
        assert!(matches!(
            ev1[3],
            crate::provider::Event::TransferCompleted { .. }
        ));
        dbg!(&ev1);
        let mut ev2 = Vec::new();
        while let Some(ev) = node2_events_r.recv().await {
            ev2.push(ev);
        }
        assert!(ev2.is_empty());
        Ok(())
    }
    #[tokio::test]
    async fn test_blob_get_self_existing() -> TestResult<()> {
        let _guard = iroh_test::logging::setup();
        let node = node::Node::memory().spawn().await?;
        let node_id = node.node_id();
        let blobs = node.blobs();
        let AddOutcome { hash, size, .. } = blobs.add_bytes("foo").await?;
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Direct,
                },
            )
            .await?
            .await?;
        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Queued,
                },
            )
            .await?
            .await?;
        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);
        Ok(())
    }
    #[tokio::test]
    async fn test_blob_get_self_missing() -> TestResult<()> {
        let _guard = iroh_test::logging::setup();
        let node = node::Node::memory().spawn().await?;
        let node_id = node.node_id();
        let blobs = node.blobs();
        let hash = Hash::from_bytes([0u8; 32]);
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Direct,
                },
            )
            .await?
            .await;
        assert!(res.is_err());
        assert_eq!(
            res.err().unwrap().to_string().as_str(),
            "No nodes to download from provided"
        );
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Queued,
                },
            )
            .await?
            .await;
        assert!(res.is_err());
        assert_eq!(
            res.err().unwrap().to_string().as_str(),
            "No provider nodes found"
        );
        Ok(())
    }
    #[tokio::test]
    async fn test_blob_get_existing_collection() -> TestResult<()> {
        let _guard = iroh_test::logging::setup();
        let node = node::Node::memory().spawn().await?;
        let node_id = NodeId::from_bytes(&[0u8; 32])?;
        let blobs = node.blobs();
        let mut collection = Collection::default();
        let mut tags = Vec::new();
        let mut size = 0;
        for value in ["iroh", "is", "cool"] {
            let import_outcome = blobs.add_bytes(value).await.context("add bytes")?;
            collection.push(value.to_string(), import_outcome.hash);
            tags.push(import_outcome.tag);
            size += import_outcome.size;
        }
        let (hash, _tag) = blobs
            .create_collection(collection, SetTagOption::Auto, tags)
            .await?;
        let hashseq_bytes = blobs.read_to_bytes(hash).await?;
        size += hashseq_bytes.len() as u64;
        let hashseq = HashSeq::try_from(hashseq_bytes)?;
        let collection_header_bytes = blobs
            .read_to_bytes(hashseq.into_iter().next().expect("header to exist"))
            .await?;
        size += collection_header_bytes.len() as u64;
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::HashSeq,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Direct,
                },
            )
            .await?
            .await
            .context("direct (download)")?;
        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::HashSeq,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Queued,
                },
            )
            .await?
            .await
            .context("queued")?;
        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);
        Ok(())
    }
    #[tokio::test]
    #[cfg_attr(target_os = "windows", ignore = "flaky")]
    async fn test_blob_delete_mem() -> Result<()> {
        let _guard = iroh_test::logging::setup();
        let node = node::Node::memory().spawn().await?;
        let res = node.blobs().add_bytes(&b"hello world"[..]).await?;
        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert_eq!(hashes.len(), 1);
        assert_eq!(hashes[0].hash, res.hash);
        node.blobs().delete_blob(res.hash).await?;
        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert!(hashes.is_empty());
        Ok(())
    }
    #[tokio::test]
    async fn test_blob_delete_fs() -> Result<()> {
        let _guard = iroh_test::logging::setup();
        let dir = tempfile::tempdir()?;
        let node = node::Node::persistent(dir.path()).await?.spawn().await?;
        let res = node.blobs().add_bytes(&b"hello world"[..]).await?;
        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert_eq!(hashes.len(), 1);
        assert_eq!(hashes[0].hash, res.hash);
        node.blobs().delete_blob(res.hash).await?;
        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert!(hashes.is_empty());
        Ok(())
    }
    #[tokio::test]
    async fn test_ticket_multiple_addrs() -> TestResult<()> {
        let _guard = iroh_test::logging::setup();
        let node = Node::memory().spawn().await?;
        let hash = node
            .blobs()
            .add_bytes(Bytes::from_static(b"hello"))
            .await?
            .hash;
        let mut addr = node.node_addr().await?;
        addr.apply_options(AddrInfoOptions::RelayAndAddresses);
        let ticket = BlobTicket::new(addr, hash, BlobFormat::Raw)?;
        println!("addrs: {:?}", ticket.node_addr().info);
        assert!(!ticket.node_addr().info.direct_addresses.is_empty());
        Ok(())
    }
    #[tokio::test]
    async fn test_node_add_blob_stream() -> Result<()> {
        let _guard = iroh_test::logging::setup();
        use std::io::Cursor;
        let node = Node::memory().spawn().await?;
        let blobs = node.blobs();
        let input = vec![2u8; 1024 * 256]; let reader = Cursor::new(input.clone());
        let progress = blobs.add_reader(reader, SetTagOption::Auto).await?;
        let outcome = progress.finish().await?;
        let hash = outcome.hash;
        let output = blobs.read_to_bytes(hash).await?;
        assert_eq!(input, output.to_vec());
        Ok(())
    }
    #[tokio::test]
    async fn test_node_add_tagged_blob_event() -> Result<()> {
        let _guard = iroh_test::logging::setup();
        let node = Node::memory().spawn().await?;
        let _got_hash = tokio::time::timeout(Duration::from_secs(10), async move {
            let mut stream = node
                .blobs()
                .add_from_path(
                    Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md"),
                    false,
                    SetTagOption::Auto,
                    WrapOption::NoWrap,
                )
                .await?;
            while let Some(progress) = stream.next().await {
                match progress? {
                    crate::provider::AddProgress::AllDone { hash, .. } => {
                        return Ok(hash);
                    }
                    crate::provider::AddProgress::Abort(e) => {
                        anyhow::bail!("Error while adding data: {e}");
                    }
                    _ => {}
                }
            }
            anyhow::bail!("stream ended without providing data");
        })
        .await
        .context("timeout")?
        .context("get failed")?;
        Ok(())
    }
    #[tokio::test]
    async fn test_download_via_relay() -> Result<()> {
        let _guard = iroh_test::logging::setup();
        let (relay_map, relay_url, _guard) = iroh_net::test_utils::run_relay_server().await?;
        let endpoint1 = iroh_net::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true);
        let node1 = Node::memory().endpoint(endpoint1).spawn().await?;
        let endpoint2 = iroh_net::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true);
        let node2 = Node::memory().endpoint(endpoint2).spawn().await?;
        let AddOutcome { hash, .. } = node1.blobs().add_bytes(b"foo".to_vec()).await?;
        let addr = NodeAddr::new(node1.node_id()).with_relay_url(relay_url);
        node2.blobs().download(hash, addr).await?.await?;
        assert_eq!(
            node2
                .blobs()
                .read_to_bytes(hash)
                .await
                .context("get")?
                .as_ref(),
            b"foo"
        );
        Ok(())
    }
    #[tokio::test]
    #[ignore = "flaky"]
    async fn test_download_via_relay_with_discovery() -> Result<()> {
        let _guard = iroh_test::logging::setup();
        let (relay_map, _relay_url, _guard) = iroh_net::test_utils::run_relay_server().await?;
        let dns_pkarr_server = DnsPkarrServer::run().await?;
        let secret1 = SecretKey::generate();
        let endpoint1 = iroh_net::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .dns_resolver(dns_pkarr_server.dns_resolver())
            .secret_key(secret1.clone())
            .discovery(dns_pkarr_server.discovery(secret1));
        let node1 = Node::memory().endpoint(endpoint1).spawn().await?;
        let secret2 = SecretKey::generate();
        let endpoint2 = iroh_net::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .dns_resolver(dns_pkarr_server.dns_resolver())
            .secret_key(secret2.clone())
            .discovery(dns_pkarr_server.discovery(secret2));
        let node2 = Node::memory().endpoint(endpoint2).spawn().await?;
        let hash = node1.blobs().add_bytes(b"foo".to_vec()).await?.hash;
        let addr = NodeAddr::new(node1.node_id());
        node2.blobs().download(hash, addr).await?.await?;
        assert_eq!(
            node2
                .blobs()
                .read_to_bytes(hash)
                .await
                .context("get")?
                .as_ref(),
            b"foo"
        );
        Ok(())
    }
}