use std::{
    collections::BTreeMap,
    future::{Future, IntoFuture},
    io,
    num::NonZeroU64,
    path::{Path, PathBuf},
    pin::Pin,
};
pub use bao_tree::io::mixed::EncodedItem;
use bao_tree::{
    io::{
        fsm::{ResponseDecoder, ResponseDecoderNext},
        BaoContentItem, Leaf,
    },
    BaoTree, ChunkNum, ChunkRanges,
};
use bytes::Bytes;
use genawaiter::sync::Gen;
use iroh_io::{AsyncStreamReader, TokioStreamReader};
use irpc::channel::{mpsc, oneshot};
use n0_future::{future, stream, Stream, StreamExt};
use quinn::SendStream;
use range_collections::{range_set::RangeSetRange, RangeSet2};
use ref_cast::RefCast;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tracing::trace;
mod reader;
pub use reader::BlobReader;
pub use super::proto::{
    AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
    ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
    ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
    ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
};
use super::{
    proto::{
        BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
        ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
        ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
    },
    remote::HashSeqChunk,
    tags::TagInfo,
    ApiClient, RequestResult, Tags,
};
use crate::{
    api::proto::{BatchRequest, ImportByteStreamUpdate},
    provider::StreamContext,
    store::IROH_BLOCK_SIZE,
    util::temp_tag::TempTag,
    BlobFormat, Hash, HashAndFormat,
};
#[derive(Debug)]
pub struct AddBytesOptions {
    pub data: Bytes,
    pub format: BlobFormat,
}
impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
    fn from(item: (T, BlobFormat)) -> Self {
        let (data, format) = item;
        Self {
            data: data.into(),
            format,
        }
    }
}
#[derive(Debug, Clone, ref_cast::RefCast)]
#[repr(transparent)]
pub struct Blobs {
    client: ApiClient,
}
impl Blobs {
    pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
        Self::ref_cast(sender)
    }
    pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
        let msg = BatchRequest;
        trace!("{msg:?}");
        let (tx, rx) = self.client.client_streaming(msg, 32).await?;
        let scope = rx.await?;
        Ok(Batch {
            scope,
            blobs: self,
            _tx: tx,
        })
    }
    pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
        self.reader_with_opts(ReaderOptions { hash: hash.into() })
    }
    pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
        BlobReader::new(self.clone(), options)
    }
    pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
        trace!("{options:?}");
        self.client.rpc(options).await??;
        Ok(())
    }
    pub(crate) async fn delete(
        &self,
        hashes: impl IntoIterator<Item = impl Into<Hash>>,
    ) -> RequestResult<()> {
        self.delete_with_opts(DeleteOptions {
            hashes: hashes.into_iter().map(Into::into).collect(),
            force: false,
        })
        .await
    }
    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
        let options = ImportBytesRequest {
            data: Bytes::copy_from_slice(data.as_ref()),
            format: crate::BlobFormat::Raw,
            scope: Scope::GLOBAL,
        };
        self.add_bytes_impl(options)
    }
    pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
        let options = ImportBytesRequest {
            data: data.into(),
            format: crate::BlobFormat::Raw,
            scope: Scope::GLOBAL,
        };
        self.add_bytes_impl(options)
    }
    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
        let options = options.into();
        let request = ImportBytesRequest {
            data: options.data,
            format: options.format,
            scope: Scope::GLOBAL,
        };
        self.add_bytes_impl(request)
    }
    fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
        trace!("{options:?}");
        let this = self.clone();
        let stream = Gen::new(|co| async move {
            let mut receiver = match this.client.server_streaming(options, 32).await {
                Ok(receiver) => receiver,
                Err(cause) => {
                    co.yield_(AddProgressItem::Error(cause.into())).await;
                    return;
                }
            };
            loop {
                match receiver.recv().await {
                    Ok(Some(item)) => co.yield_(item).await,
                    Err(cause) => {
                        co.yield_(AddProgressItem::Error(cause.into())).await;
                        break;
                    }
                    Ok(None) => break,
                }
            }
        });
        AddProgress::new(self, stream)
    }
    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
        let options = options.into();
        self.add_path_with_opts_impl(ImportPathRequest {
            path: options.path,
            mode: options.mode,
            format: options.format,
            scope: Scope::GLOBAL,
        })
    }
    fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
        trace!("{:?}", options);
        let client = self.client.clone();
        let stream = Gen::new(|co| async move {
            let mut receiver = match client.server_streaming(options, 32).await {
                Ok(receiver) => receiver,
                Err(cause) => {
                    co.yield_(AddProgressItem::Error(cause.into())).await;
                    return;
                }
            };
            loop {
                match receiver.recv().await {
                    Ok(Some(item)) => co.yield_(item).await,
                    Err(cause) => {
                        co.yield_(AddProgressItem::Error(cause.into())).await;
                        break;
                    }
                    Ok(None) => break,
                }
            }
        });
        AddProgress::new(self, stream)
    }
    pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
        self.add_path_with_opts(AddPathOptions {
            path: path.as_ref().to_owned(),
            mode: ImportMode::Copy,
            format: BlobFormat::Raw,
        })
    }
    pub async fn add_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
    ) -> AddProgress<'_> {
        let inner = ImportByteStreamRequest {
            format: crate::BlobFormat::Raw,
            scope: Scope::default(),
        };
        let client = self.client.clone();
        let stream = Gen::new(|co| async move {
            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
                Ok(x) => x,
                Err(cause) => {
                    co.yield_(AddProgressItem::Error(cause.into())).await;
                    return;
                }
            };
            let recv = async {
                loop {
                    match receiver.recv().await {
                        Ok(Some(item)) => co.yield_(item).await,
                        Err(cause) => {
                            co.yield_(AddProgressItem::Error(cause.into())).await;
                            break;
                        }
                        Ok(None) => break,
                    }
                }
            };
            let send = async {
                tokio::pin!(data);
                while let Some(item) = data.next().await {
                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
                }
                sender.send(ImportByteStreamUpdate::Done).await?;
                anyhow::Ok(())
            };
            let _ = tokio::join!(send, recv);
        });
        AddProgress::new(self, stream)
    }
    pub fn export_ranges(
        &self,
        hash: impl Into<Hash>,
        ranges: impl Into<RangeSet2<u64>>,
    ) -> ExportRangesProgress {
        self.export_ranges_with_opts(ExportRangesOptions {
            hash: hash.into(),
            ranges: ranges.into(),
        })
    }
    pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
        trace!("{options:?}");
        ExportRangesProgress::new(
            options.ranges.clone(),
            self.client.server_streaming(options, 32),
        )
    }
    pub fn export_bao_with_opts(
        &self,
        options: ExportBaoOptions,
        local_update_cap: usize,
    ) -> ExportBaoProgress {
        trace!("{options:?}");
        ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
    }
    pub fn export_bao(
        &self,
        hash: impl Into<Hash>,
        ranges: impl Into<ChunkRanges>,
    ) -> ExportBaoProgress {
        self.export_bao_with_opts(
            ExportBaoRequest {
                hash: hash.into(),
                ranges: ranges.into(),
            },
            32,
        )
    }
    pub async fn export_chunk(
        &self,
        hash: impl Into<Hash>,
        offset: u64,
    ) -> super::ExportBaoResult<Leaf> {
        let base = ChunkNum::full_chunks(offset);
        let ranges = ChunkRanges::from(base..base + 1);
        let mut stream = self.export_bao(hash, ranges).stream();
        while let Some(item) = stream.next().await {
            match item {
                EncodedItem::Leaf(leaf) => return Ok(leaf),
                EncodedItem::Parent(_) => {}
                EncodedItem::Size(_) => {}
                EncodedItem::Done => break,
                EncodedItem::Error(cause) => return Err(cause.into()),
            }
        }
        Err(io::Error::other("unexpected end of stream").into())
    }
    pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
        self.export_bao(hash.into(), ChunkRanges::all())
            .data_to_bytes()
            .await
    }
    pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
        self.observe_with_opts(ObserveOptions { hash: hash.into() })
    }
    pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
        trace!("{:?}", options);
        if options.hash == Hash::EMPTY {
            return ObserveProgress::new(async move {
                let (tx, rx) = mpsc::channel(1);
                tx.send(Bitfield::complete(0)).await.ok();
                Ok(rx)
            });
        }
        ObserveProgress::new(self.client.server_streaming(options, 32))
    }
    pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
        trace!("{:?}", options);
        ExportProgress::new(self.client.server_streaming(options, 32))
    }
    pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
        let options = ExportOptions {
            hash: hash.into(),
            mode: ExportMode::Copy,
            target: target.as_ref().to_owned(),
        };
        self.export_with_opts(options)
    }
    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
    pub async fn import_bao(
        &self,
        hash: impl Into<Hash>,
        size: NonZeroU64,
        local_update_cap: usize,
    ) -> irpc::Result<ImportBaoHandle> {
        let options = ImportBaoRequest {
            hash: hash.into(),
            size,
        };
        self.import_bao_with_opts(options, local_update_cap).await
    }
    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
    pub async fn import_bao_with_opts(
        &self,
        options: ImportBaoOptions,
        local_update_cap: usize,
    ) -> irpc::Result<ImportBaoHandle> {
        trace!("{:?}", options);
        ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
    }
    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
    async fn import_bao_reader<R: AsyncStreamReader>(
        &self,
        hash: Hash,
        ranges: ChunkRanges,
        mut reader: R,
    ) -> RequestResult<R> {
        let size = u64::from_le_bytes(reader.read::<8>().await.map_err(super::Error::other)?);
        let Some(size) = NonZeroU64::new(size) else {
            return if hash == Hash::EMPTY {
                Ok(reader)
            } else {
                Err(super::Error::other("invalid size for hash").into())
            };
        };
        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
        let mut decoder = ResponseDecoder::new(hash.into(), ranges, tree, reader);
        let options = ImportBaoOptions { hash, size };
        let handle = self.import_bao_with_opts(options, 32).await?;
        let driver = async move {
            let reader = loop {
                match decoder.next().await {
                    ResponseDecoderNext::More((rest, item)) => {
                        handle.tx.send(item?).await?;
                        decoder = rest;
                    }
                    ResponseDecoderNext::Done(reader) => break reader,
                };
            };
            drop(handle.tx);
            io::Result::Ok(reader)
        };
        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
        let (reader, res) = tokio::join!(driver, fut);
        res?;
        Ok(reader?)
    }
    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
    pub async fn import_bao_quinn(
        &self,
        hash: Hash,
        ranges: ChunkRanges,
        stream: &mut iroh::endpoint::RecvStream,
    ) -> RequestResult<()> {
        let reader = TokioStreamReader::new(stream);
        self.import_bao_reader(hash, ranges, reader).await?;
        Ok(())
    }
    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
    pub async fn import_bao_bytes(
        &self,
        hash: Hash,
        ranges: ChunkRanges,
        data: impl Into<Bytes>,
    ) -> RequestResult<()> {
        self.import_bao_reader(hash, ranges, data.into()).await?;
        Ok(())
    }
    pub fn list(&self) -> BlobsListProgress {
        let msg = ListRequest;
        let client = self.client.clone();
        BlobsListProgress::new(client.server_streaming(msg, 32))
    }
    pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
        let hash = hash.into();
        let msg = BlobStatusRequest { hash };
        self.client.rpc(msg).await
    }
    pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
        match self.status(hash).await? {
            BlobStatus::Complete { .. } => Ok(true),
            _ => Ok(false),
        }
    }
    pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
        let msg = ClearProtectedRequest;
        self.client.rpc(msg).await??;
        Ok(())
    }
}
pub struct BatchAddProgress<'a>(AddProgress<'a>);
impl<'a> IntoFuture for BatchAddProgress<'a> {
    type Output = RequestResult<TempTag>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(self.temp_tag())
    }
}
impl<'a> BatchAddProgress<'a> {
    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
        self.0.with_named_tag(name).await
    }
    pub async fn with_tag(self) -> RequestResult<TagInfo> {
        self.0.with_tag().await
    }
    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
        self.0.stream().await
    }
    pub async fn temp_tag(self) -> RequestResult<TempTag> {
        self.0.temp_tag().await
    }
}
pub struct Batch<'a> {
    scope: Scope,
    blobs: &'a Blobs,
    _tx: mpsc::Sender<BatchResponse>,
}
impl<'a> Batch<'a> {
    pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
        let options = ImportBytesRequest {
            data: data.into(),
            format: crate::BlobFormat::Raw,
            scope: self.scope,
        };
        BatchAddProgress(self.blobs.add_bytes_impl(options))
    }
    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
        let options = options.into();
        BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
            data: options.data,
            format: options.format,
            scope: self.scope,
        }))
    }
    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
        let options = ImportBytesRequest {
            data: Bytes::copy_from_slice(data.as_ref()),
            format: crate::BlobFormat::Raw,
            scope: self.scope,
        };
        BatchAddProgress(self.blobs.add_bytes_impl(options))
    }
    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
        let options = options.into();
        BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
            path: options.path,
            mode: options.mode,
            format: options.format,
            scope: self.scope,
        }))
    }
    pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
        let value = value.into();
        let msg = CreateTempTagRequest {
            scope: self.scope,
            value,
        };
        self.blobs.client.rpc(msg).await
    }
}
#[derive(Debug)]
pub struct AddPathOptions {
    pub path: PathBuf,
    pub format: BlobFormat,
    pub mode: ImportMode,
}
pub struct AddProgress<'a> {
    blobs: &'a Blobs,
    inner: stream::Boxed<AddProgressItem>,
}
impl<'a> IntoFuture for AddProgress<'a> {
    type Output = RequestResult<TagInfo>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(self.with_tag())
    }
}
impl<'a> AddProgress<'a> {
    fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
        Self {
            blobs,
            inner: Box::pin(stream),
        }
    }
    pub async fn temp_tag(self) -> RequestResult<TempTag> {
        let mut stream = self.inner;
        while let Some(item) = stream.next().await {
            match item {
                AddProgressItem::Done(tt) => return Ok(tt),
                AddProgressItem::Error(e) => return Err(e.into()),
                _ => {}
            }
        }
        Err(super::Error::other("unexpected end of stream").into())
    }
    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
        let blobs = self.blobs.clone();
        let tt = self.temp_tag().await?;
        let haf = *tt.hash_and_format();
        let tags = Tags::ref_from_sender(&blobs.client);
        tags.set(name, *tt.hash_and_format()).await?;
        drop(tt);
        Ok(haf)
    }
    pub async fn with_tag(self) -> RequestResult<TagInfo> {
        let blobs = self.blobs.clone();
        let tt = self.temp_tag().await?;
        let hash = *tt.hash();
        let format = tt.format();
        let tags = Tags::ref_from_sender(&blobs.client);
        let name = tags.create(*tt.hash_and_format()).await?;
        drop(tt);
        Ok(TagInfo { name, hash, format })
    }
    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
        self.inner
    }
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReaderOptions {
    pub hash: Hash,
}
pub struct ObserveProgress {
    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
}
impl IntoFuture for ObserveProgress {
    type Output = RequestResult<Bitfield>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            let mut rx = self.inner.await?;
            match rx.recv().await? {
                Some(bitfield) => Ok(bitfield),
                None => Err(super::Error::other("unexpected end of stream").into()),
            }
        })
    }
}
impl ObserveProgress {
    fn new(
        fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
    ) -> Self {
        Self {
            inner: Box::pin(fut),
        }
    }
    pub async fn await_completion(self) -> RequestResult<Bitfield> {
        let mut stream = self.stream().await?;
        while let Some(item) = stream.next().await {
            if item.is_complete() {
                return Ok(item);
            }
        }
        Err(super::Error::other("unexpected end of stream").into())
    }
    pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
        let mut rx = self.inner.await?;
        Ok(Gen::new(|co| async move {
            while let Ok(Some(item)) = rx.recv().await {
                co.yield_(item).await;
            }
        }))
    }
}
pub struct ExportProgress {
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
}
impl IntoFuture for ExportProgress {
    type Output = RequestResult<u64>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(self.finish())
    }
}
impl ExportProgress {
    fn new(
        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
    ) -> Self {
        Self {
            inner: Box::pin(fut),
        }
    }
    pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
        Gen::new(|co| async move {
            let mut rx = match self.inner.await {
                Ok(rx) => rx,
                Err(e) => {
                    co.yield_(ExportProgressItem::Error(e.into())).await;
                    return;
                }
            };
            while let Ok(Some(item)) = rx.recv().await {
                co.yield_(item).await;
            }
        })
    }
    pub async fn finish(self) -> RequestResult<u64> {
        let mut rx = self.inner.await?;
        let mut size = None;
        loop {
            match rx.recv().await? {
                Some(ExportProgressItem::Done) => break,
                Some(ExportProgressItem::Size(s)) => size = Some(s),
                Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
                _ => {}
            }
        }
        if let Some(size) = size {
            Ok(size)
        } else {
            Err(super::Error::other("unexpected end of stream").into())
        }
    }
}
pub struct ImportBaoHandle {
    pub tx: mpsc::Sender<BaoContentItem>,
    pub rx: oneshot::Receiver<super::Result<()>>,
}
impl ImportBaoHandle {
    pub(crate) async fn new(
        fut: impl Future<
                Output = irpc::Result<(
                    mpsc::Sender<BaoContentItem>,
                    oneshot::Receiver<super::Result<()>>,
                )>,
            > + Send
            + 'static,
    ) -> irpc::Result<Self> {
        let (tx, rx) = fut.await?;
        Ok(Self { tx, rx })
    }
}
pub struct BlobsListProgress {
    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
}
impl BlobsListProgress {
    fn new(
        fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
    ) -> Self {
        Self {
            inner: Box::pin(fut),
        }
    }
    pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
        let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
        let mut hashes = Vec::new();
        while let Some(item) = rx.recv().await? {
            hashes.push(item?);
        }
        Ok(hashes)
    }
    pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
        let mut rx = self.inner.await?;
        Ok(Gen::new(|co| async move {
            while let Ok(Some(item)) = rx.recv().await {
                co.yield_(item).await;
            }
        }))
    }
}
pub struct ExportRangesProgress {
    ranges: RangeSet2<u64>,
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
}
impl ExportRangesProgress {
    fn new(
        ranges: RangeSet2<u64>,
        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
    ) -> Self {
        Self {
            ranges,
            inner: Box::pin(fut),
        }
    }
}
impl ExportRangesProgress {
    pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
        Gen::new(|co| async move {
            let mut rx = match self.inner.await {
                Ok(rx) => rx,
                Err(e) => {
                    co.yield_(ExportRangesItem::Error(e.into())).await;
                    return;
                }
            };
            while let Ok(Some(item)) = rx.recv().await {
                co.yield_(item).await;
            }
        })
    }
    pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
        let mut rx = self.inner.await?;
        let mut data = BTreeMap::new();
        while let Some(item) = rx.recv().await? {
            match item {
                ExportRangesItem::Size(_) => {}
                ExportRangesItem::Data(leaf) => {
                    data.insert(leaf.offset, leaf.data);
                }
                ExportRangesItem::Error(cause) => return Err(cause.into()),
            }
        }
        let mut res = Vec::new();
        for range in self.ranges.iter() {
            let (start, end) = match range {
                RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
                RangeSetRange::Range(range) => (*range.start, *range.end),
            };
            for (offset, data) in data.iter() {
                let cstart = *offset;
                let cend = *offset + (data.len() as u64);
                if cstart >= end || cend <= start {
                    continue;
                }
                let start = start.max(cstart);
                let end = end.min(cend);
                let data = &data[(start - cstart) as usize..(end - cstart) as usize];
                res.extend_from_slice(data);
            }
        }
        Ok(res)
    }
}
pub struct ExportBaoProgress {
    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
}
impl ExportBaoProgress {
    fn new(
        fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
    ) -> Self {
        Self {
            inner: Box::pin(fut),
        }
    }
    pub fn hashes_with_index(
        self,
    ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
        let mut stream = self.stream();
        Gen::new(|co| async move {
            while let Some(item) = stream.next().await {
                let leaf = match item {
                    EncodedItem::Leaf(leaf) => leaf,
                    EncodedItem::Error(e) => {
                        co.yield_(Err(e.into())).await;
                        continue;
                    }
                    _ => continue,
                };
                let slice = match HashSeqChunk::try_from(leaf) {
                    Ok(slice) => slice,
                    Err(e) => {
                        co.yield_(Err(e)).await;
                        continue;
                    }
                };
                let offset = slice.base();
                for (o, hash) in slice.into_iter().enumerate() {
                    co.yield_(Ok((offset + o as u64, hash))).await;
                }
            }
        })
    }
    pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
        self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
    }
    pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
        let mut data = Vec::new();
        let mut stream = self.into_byte_stream();
        while let Some(item) = stream.next().await {
            data.extend_from_slice(&item?);
        }
        Ok(data)
    }
    pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
        let mut rx = self.inner.await?;
        let mut data = Vec::new();
        while let Some(item) = rx.recv().await? {
            match item {
                EncodedItem::Leaf(leaf) => {
                    data.push(leaf.data);
                }
                EncodedItem::Parent(_) => {}
                EncodedItem::Size(_) => {}
                EncodedItem::Done => break,
                EncodedItem::Error(cause) => return Err(cause.into()),
            }
        }
        if data.len() == 1 {
            Ok(data.pop().unwrap())
        } else {
            let mut out = Vec::new();
            for item in data {
                out.extend_from_slice(&item);
            }
            Ok(out.into())
        }
    }
    pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
        let mut rx = self.inner.await?;
        let mut data = Vec::new();
        while let Some(item) = rx.recv().await? {
            match item {
                EncodedItem::Leaf(leaf) => {
                    data.extend_from_slice(&leaf.data);
                }
                EncodedItem::Parent(_) => {}
                EncodedItem::Size(_) => {}
                EncodedItem::Done => break,
                EncodedItem::Error(cause) => return Err(cause.into()),
            }
        }
        Ok(data)
    }
    pub async fn write_quinn(self, target: &mut quinn::SendStream) -> super::ExportBaoResult<()> {
        let mut rx = self.inner.await?;
        while let Some(item) = rx.recv().await? {
            match item {
                EncodedItem::Size(size) => {
                    target.write_u64_le(size).await?;
                }
                EncodedItem::Parent(parent) => {
                    let mut data = vec![0u8; 64];
                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
                    target.write_all(&data).await.map_err(io::Error::from)?;
                }
                EncodedItem::Leaf(leaf) => {
                    target
                        .write_chunk(leaf.data)
                        .await
                        .map_err(io::Error::from)?;
                }
                EncodedItem::Done => break,
                EncodedItem::Error(cause) => return Err(cause.into()),
            }
        }
        Ok(())
    }
    pub(crate) async fn write_quinn_with_progress(
        self,
        writer: &mut SendStream,
        progress: &mut impl WriteProgress,
        hash: &Hash,
        index: u64,
    ) -> super::ExportBaoResult<()> {
        let mut rx = self.inner.await?;
        while let Some(item) = rx.recv().await? {
            match item {
                EncodedItem::Size(size) => {
                    progress.send_transfer_started(index, hash, size).await;
                    writer.write_u64_le(size).await?;
                    progress.log_other_write(8);
                }
                EncodedItem::Parent(parent) => {
                    let mut data = vec![0u8; 64];
                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
                    writer.write_all(&data).await.map_err(io::Error::from)?;
                    progress.log_other_write(64);
                }
                EncodedItem::Leaf(leaf) => {
                    let len = leaf.data.len();
                    writer
                        .write_chunk(leaf.data)
                        .await
                        .map_err(io::Error::from)?;
                    progress.notify_payload_write(index, leaf.offset, len).await;
                }
                EncodedItem::Done => break,
                EncodedItem::Error(cause) => return Err(cause.into()),
            }
        }
        Ok(())
    }
    pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
        self.stream().filter_map(|item| match item {
            EncodedItem::Size(size) => {
                let size = size.to_le_bytes().to_vec().into();
                Some(Ok(size))
            }
            EncodedItem::Parent(parent) => {
                let mut data = vec![0u8; 64];
                data[..32].copy_from_slice(parent.pair.0.as_bytes());
                data[32..].copy_from_slice(parent.pair.1.as_bytes());
                Some(Ok(data.into()))
            }
            EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
            EncodedItem::Done => None,
            EncodedItem::Error(cause) => Some(Err(cause.into())),
        })
    }
    pub fn stream(self) -> impl Stream<Item = EncodedItem> {
        Gen::new(|co| async move {
            let mut rx = match self.inner.await {
                Ok(rx) => rx,
                Err(cause) => {
                    co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
                        .await;
                    return;
                }
            };
            while let Ok(Some(item)) = rx.recv().await {
                co.yield_(item).await;
            }
        })
    }
}
pub(crate) trait WriteProgress {
    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize);
    fn log_other_write(&mut self, len: usize);
    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
}
impl WriteProgress for StreamContext {
    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) {
        StreamContext::notify_payload_write(self, index, offset, len);
    }
    fn log_other_write(&mut self, len: usize) {
        StreamContext::log_other_write(self, len);
    }
    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64) {
        StreamContext::send_transfer_started(self, index, hash, size).await
    }
}