iroh_blobs/api/
blobs.rs

1//! API to interact with a local blob store
2//!
3//! This API is for local interactions with the blob store, such as importing
4//! and exporting blobs, observing the bitfield of a blob, and deleting blobs.
5//!
6//! The main entry point is the [`Blobs`] struct.
7use std::{
8    collections::BTreeMap,
9    future::{Future, IntoFuture},
10    io,
11    num::NonZeroU64,
12    path::{Path, PathBuf},
13    pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18    io::{
19        fsm::{ResponseDecoder, ResponseDecoderNext},
20        BaoContentItem, Leaf,
21    },
22    BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::AsyncStreamWriter;
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use range_collections::{range_set::RangeSetRange, RangeSet2};
30use ref_cast::RefCast;
31use serde::{Deserialize, Serialize};
32use tracing::trace;
33mod reader;
34pub use reader::BlobReader;
35
36// Public reexports from the proto module.
37//
38// Due to the fact that the proto module is hidden from docs by default,
39// these will appear in the docs as if they were declared here.
40pub use super::proto::{
41    AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
42    ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
43    ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
44    ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
45};
46use super::{
47    proto::{
48        BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
49        ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
50        ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
51    },
52    remote::HashSeqChunk,
53    tags::TagInfo,
54    ApiClient, RequestResult, Tags,
55};
56use crate::{
57    api::proto::{BatchRequest, ImportByteStreamUpdate},
58    provider::events::ClientResult,
59    store::IROH_BLOCK_SIZE,
60    util::{temp_tag::TempTag, RecvStreamAsyncStreamReader},
61    BlobFormat, Hash, HashAndFormat,
62};
63
64/// Options for adding bytes.
65#[derive(Debug)]
66pub struct AddBytesOptions {
67    pub data: Bytes,
68    pub format: BlobFormat,
69}
70
71impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
72    fn from(item: (T, BlobFormat)) -> Self {
73        let (data, format) = item;
74        Self {
75            data: data.into(),
76            format,
77        }
78    }
79}
80
/// Blobs API
///
/// A thin wrapper around an [`ApiClient`]. The type is `#[repr(transparent)]`
/// and derives `RefCast`, which is what allows a `&ApiClient` to be viewed as
/// a `&Blobs` without cloning (see `ref_from_sender`).
#[derive(Debug, Clone, ref_cast::RefCast)]
#[repr(transparent)]
pub struct Blobs {
    // The client all requests of this API are sent through.
    client: ApiClient,
}
87
88impl Blobs {
89    pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
90        Self::ref_cast(sender)
91    }
92
93    pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
94        let msg = BatchRequest;
95        trace!("{msg:?}");
96        let (tx, rx) = self.client.client_streaming(msg, 32).await?;
97        let scope = rx.await?;
98
99        Ok(Batch {
100            scope,
101            blobs: self,
102            _tx: tx,
103        })
104    }
105
106    /// Create a reader for the given hash. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
107    /// and therefore can be used to read the blob's content.
108    ///
109    /// Any access to parts of the blob that are not present will result in an error.
110    ///
111    /// Example:
112    /// ```rust
113    /// use iroh_blobs::{store::mem::MemStore, api::blobs::Blobs};
114    /// use tokio::io::AsyncReadExt;
115    ///
116    /// # async fn example() -> anyhow::Result<()> {
117    /// let store = MemStore::new();
118    /// let tag = store.add_slice(b"Hello, world!").await?;
119    /// let mut reader = store.reader(tag.hash);
120    /// let mut buf = String::new();
121    /// reader.read_to_string(&mut buf).await?;
122    /// assert_eq!(buf, "Hello, world!");
123    /// # Ok(())
124    /// }
125    /// ```
126    pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
127        self.reader_with_opts(ReaderOptions { hash: hash.into() })
128    }
129
130    /// Create a reader for the given options. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
131    /// and therefore can be used to read the blob's content.
132    ///
133    /// Any access to parts of the blob that are not present will result in an error.
134    pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
135        BlobReader::new(self.clone(), options)
136    }
137
138    /// Delete a blob.
139    ///
140    /// This function is not public, because it does not work as expected when called manually,
141    /// because blobs are protected from deletion. This is only called from the gc task, which
142    /// clears the protections before.
143    ///
144    /// Users should rely only on garbage collection for blob deletion.
145    #[cfg(feature = "fs-store")]
146    pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
147        trace!("{options:?}");
148        self.client.rpc(options).await??;
149        Ok(())
150    }
151
152    /// See [`Self::delete_with_opts`].
153    #[cfg(feature = "fs-store")]
154    pub(crate) async fn delete(
155        &self,
156        hashes: impl IntoIterator<Item = impl Into<Hash>>,
157    ) -> RequestResult<()> {
158        self.delete_with_opts(DeleteOptions {
159            hashes: hashes.into_iter().map(Into::into).collect(),
160            force: false,
161        })
162        .await
163    }
164
165    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
166        let options = ImportBytesRequest {
167            data: Bytes::copy_from_slice(data.as_ref()),
168            format: crate::BlobFormat::Raw,
169            scope: Scope::GLOBAL,
170        };
171        self.add_bytes_impl(options)
172    }
173
174    pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
175        let options = ImportBytesRequest {
176            data: data.into(),
177            format: crate::BlobFormat::Raw,
178            scope: Scope::GLOBAL,
179        };
180        self.add_bytes_impl(options)
181    }
182
183    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
184        let options = options.into();
185        let request = ImportBytesRequest {
186            data: options.data,
187            format: options.format,
188            scope: Scope::GLOBAL,
189        };
190        self.add_bytes_impl(request)
191    }
192
193    fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
194        trace!("{options:?}");
195        let this = self.clone();
196        let stream = Gen::new(|co| async move {
197            let mut receiver = match this.client.server_streaming(options, 32).await {
198                Ok(receiver) => receiver,
199                Err(cause) => {
200                    co.yield_(AddProgressItem::Error(cause.into())).await;
201                    return;
202                }
203            };
204            loop {
205                match receiver.recv().await {
206                    Ok(Some(item)) => co.yield_(item).await,
207                    Err(cause) => {
208                        co.yield_(AddProgressItem::Error(cause.into())).await;
209                        break;
210                    }
211                    Ok(None) => break,
212                }
213            }
214        });
215        AddProgress::new(self, stream)
216    }
217
218    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
219        let options = options.into();
220        self.add_path_with_opts_impl(ImportPathRequest {
221            path: options.path,
222            mode: options.mode,
223            format: options.format,
224            scope: Scope::GLOBAL,
225        })
226    }
227
228    fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
229        trace!("{:?}", options);
230        let client = self.client.clone();
231        let stream = Gen::new(|co| async move {
232            let mut receiver = match client.server_streaming(options, 32).await {
233                Ok(receiver) => receiver,
234                Err(cause) => {
235                    co.yield_(AddProgressItem::Error(cause.into())).await;
236                    return;
237                }
238            };
239            loop {
240                match receiver.recv().await {
241                    Ok(Some(item)) => co.yield_(item).await,
242                    Err(cause) => {
243                        co.yield_(AddProgressItem::Error(cause.into())).await;
244                        break;
245                    }
246                    Ok(None) => break,
247                }
248            }
249        });
250        AddProgress::new(self, stream)
251    }
252
253    pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
254        self.add_path_with_opts(AddPathOptions {
255            path: path.as_ref().to_owned(),
256            mode: ImportMode::Copy,
257            format: BlobFormat::Raw,
258        })
259    }
260
261    pub async fn add_stream(
262        &self,
263        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
264    ) -> AddProgress<'_> {
265        let inner = ImportByteStreamRequest {
266            format: crate::BlobFormat::Raw,
267            scope: Scope::default(),
268        };
269        let client = self.client.clone();
270        let stream = Gen::new(|co| async move {
271            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
272                Ok(x) => x,
273                Err(cause) => {
274                    co.yield_(AddProgressItem::Error(cause.into())).await;
275                    return;
276                }
277            };
278            let recv = async {
279                loop {
280                    match receiver.recv().await {
281                        Ok(Some(item)) => co.yield_(item).await,
282                        Err(cause) => {
283                            co.yield_(AddProgressItem::Error(cause.into())).await;
284                            break;
285                        }
286                        Ok(None) => break,
287                    }
288                }
289            };
290            let send = async {
291                tokio::pin!(data);
292                while let Some(item) = data.next().await {
293                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
294                }
295                sender.send(ImportByteStreamUpdate::Done).await?;
296                anyhow::Ok(())
297            };
298            let _ = tokio::join!(send, recv);
299        });
300        AddProgress::new(self, stream)
301    }
302
303    pub fn export_ranges(
304        &self,
305        hash: impl Into<Hash>,
306        ranges: impl Into<RangeSet2<u64>>,
307    ) -> ExportRangesProgress {
308        self.export_ranges_with_opts(ExportRangesOptions {
309            hash: hash.into(),
310            ranges: ranges.into(),
311        })
312    }
313
314    pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
315        trace!("{options:?}");
316        ExportRangesProgress::new(
317            options.ranges.clone(),
318            self.client.server_streaming(options, 32),
319        )
320    }
321
322    pub fn export_bao_with_opts(
323        &self,
324        options: ExportBaoOptions,
325        local_update_cap: usize,
326    ) -> ExportBaoProgress {
327        trace!("{options:?}");
328        ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
329    }
330
331    pub fn export_bao(
332        &self,
333        hash: impl Into<Hash>,
334        ranges: impl Into<ChunkRanges>,
335    ) -> ExportBaoProgress {
336        self.export_bao_with_opts(
337            ExportBaoRequest {
338                hash: hash.into(),
339                ranges: ranges.into(),
340            },
341            32,
342        )
343    }
344
345    /// Export a single chunk from the given hash, at the given offset.
346    pub async fn export_chunk(
347        &self,
348        hash: impl Into<Hash>,
349        offset: u64,
350    ) -> super::ExportBaoResult<Leaf> {
351        let base = ChunkNum::full_chunks(offset);
352        let ranges = ChunkRanges::from(base..base + 1);
353        let mut stream = self.export_bao(hash, ranges).stream();
354        while let Some(item) = stream.next().await {
355            match item {
356                EncodedItem::Leaf(leaf) => return Ok(leaf),
357                EncodedItem::Parent(_) => {}
358                EncodedItem::Size(_) => {}
359                EncodedItem::Done => break,
360                EncodedItem::Error(cause) => return Err(cause.into()),
361            }
362        }
363        Err(io::Error::other("unexpected end of stream").into())
364    }
365
366    /// Get the entire blob into a Bytes
367    ///
368    /// This will run out of memory when called for very large blobs, so be careful!
369    pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
370        self.export_bao(hash.into(), ChunkRanges::all())
371            .data_to_bytes()
372            .await
373    }
374
375    /// Observe the bitfield of the given hash.
376    pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
377        self.observe_with_opts(ObserveOptions { hash: hash.into() })
378    }
379
380    pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
381        trace!("{:?}", options);
382        if options.hash == Hash::EMPTY {
383            return ObserveProgress::new(async move {
384                let (tx, rx) = mpsc::channel(1);
385                tx.send(Bitfield::complete(0)).await.ok();
386                Ok(rx)
387            });
388        }
389        ObserveProgress::new(self.client.server_streaming(options, 32))
390    }
391
392    pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
393        trace!("{:?}", options);
394        ExportProgress::new(self.client.server_streaming(options, 32))
395    }
396
397    pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
398        let options = ExportOptions {
399            hash: hash.into(),
400            mode: ExportMode::Copy,
401            target: target.as_ref().to_owned(),
402        };
403        self.export_with_opts(options)
404    }
405
406    /// Import BaoContentItems from a stream.
407    ///
408    /// The store assumes that these are already verified and in the correct order.
409    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
410    pub async fn import_bao(
411        &self,
412        hash: impl Into<Hash>,
413        size: NonZeroU64,
414        local_update_cap: usize,
415    ) -> irpc::Result<ImportBaoHandle> {
416        let options = ImportBaoRequest {
417            hash: hash.into(),
418            size,
419        };
420        self.import_bao_with_opts(options, local_update_cap).await
421    }
422
423    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
424    pub async fn import_bao_with_opts(
425        &self,
426        options: ImportBaoOptions,
427        local_update_cap: usize,
428    ) -> irpc::Result<ImportBaoHandle> {
429        trace!("{:?}", options);
430        ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
431    }
432
433    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
434    pub async fn import_bao_reader<R: crate::util::RecvStream>(
435        &self,
436        hash: Hash,
437        ranges: ChunkRanges,
438        mut reader: R,
439    ) -> RequestResult<R> {
440        let mut size = [0; 8];
441        reader
442            .recv_exact(&mut size)
443            .await
444            .map_err(super::Error::other)?;
445        let size = u64::from_le_bytes(size);
446        let Some(size) = NonZeroU64::new(size) else {
447            return if hash == Hash::EMPTY {
448                Ok(reader)
449            } else {
450                Err(super::Error::other("invalid size for hash").into())
451            };
452        };
453        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
454        let mut decoder = ResponseDecoder::new(
455            hash.into(),
456            ranges,
457            tree,
458            RecvStreamAsyncStreamReader::new(reader),
459        );
460        let options = ImportBaoOptions { hash, size };
461        let handle = self.import_bao_with_opts(options, 32).await?;
462        let driver = async move {
463            let reader = loop {
464                match decoder.next().await {
465                    ResponseDecoderNext::More((rest, item)) => {
466                        handle.tx.send(item?).await?;
467                        decoder = rest;
468                    }
469                    ResponseDecoderNext::Done(reader) => break reader,
470                };
471            };
472            drop(handle.tx);
473            io::Result::Ok(reader)
474        };
475        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
476        let (reader, res) = tokio::join!(driver, fut);
477        res?;
478        Ok(reader?.into_inner())
479    }
480
481    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
482    pub async fn import_bao_bytes(
483        &self,
484        hash: Hash,
485        ranges: ChunkRanges,
486        data: impl Into<Bytes>,
487    ) -> RequestResult<()> {
488        self.import_bao_reader(hash, ranges, data.into()).await?;
489        Ok(())
490    }
491
492    pub fn list(&self) -> BlobsListProgress {
493        let msg = ListRequest;
494        let client = self.client.clone();
495        BlobsListProgress::new(client.server_streaming(msg, 32))
496    }
497
498    pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
499        let hash = hash.into();
500        let msg = BlobStatusRequest { hash };
501        self.client.rpc(msg).await
502    }
503
504    pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
505        match self.status(hash).await? {
506            BlobStatus::Complete { .. } => Ok(true),
507            _ => Ok(false),
508        }
509    }
510
511    #[allow(dead_code)]
512    pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
513        let msg = ClearProtectedRequest;
514        self.client.rpc(msg).await??;
515        Ok(())
516    }
517}
518
519/// A progress handle for a batch scoped add operation.
520pub struct BatchAddProgress<'a>(AddProgress<'a>);
521
522impl<'a> IntoFuture for BatchAddProgress<'a> {
523    type Output = RequestResult<TempTag>;
524
525    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
526
527    fn into_future(self) -> Self::IntoFuture {
528        Box::pin(self.temp_tag())
529    }
530}
531
532impl<'a> BatchAddProgress<'a> {
533    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
534        self.0.with_named_tag(name).await
535    }
536
537    pub async fn with_tag(self) -> RequestResult<TagInfo> {
538        self.0.with_tag().await
539    }
540
541    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
542        self.0.stream().await
543    }
544
545    pub async fn temp_tag(self) -> RequestResult<TempTag> {
546        self.0.temp_tag().await
547    }
548}
549
550/// A batch of operations that modify the blob store.
551pub struct Batch<'a> {
552    scope: Scope,
553    blobs: &'a Blobs,
554    _tx: mpsc::Sender<BatchResponse>,
555}
556
557impl<'a> Batch<'a> {
558    pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
559        let options = ImportBytesRequest {
560            data: data.into(),
561            format: crate::BlobFormat::Raw,
562            scope: self.scope,
563        };
564        BatchAddProgress(self.blobs.add_bytes_impl(options))
565    }
566
567    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
568        let options = options.into();
569        BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
570            data: options.data,
571            format: options.format,
572            scope: self.scope,
573        }))
574    }
575
576    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
577        let options = ImportBytesRequest {
578            data: Bytes::copy_from_slice(data.as_ref()),
579            format: crate::BlobFormat::Raw,
580            scope: self.scope,
581        };
582        BatchAddProgress(self.blobs.add_bytes_impl(options))
583    }
584
585    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
586        let options = options.into();
587        BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
588            path: options.path,
589            mode: options.mode,
590            format: options.format,
591            scope: self.scope,
592        }))
593    }
594
595    pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
596        let value = value.into();
597        let msg = CreateTempTagRequest {
598            scope: self.scope,
599            value,
600        };
601        self.blobs.client.rpc(msg).await
602    }
603}
604
605/// Options for adding data from a file system path.
606#[derive(Debug)]
607pub struct AddPathOptions {
608    pub path: PathBuf,
609    pub format: BlobFormat,
610    pub mode: ImportMode,
611}
612
613/// A progress handle for an import operation.
614///
615/// Internally this is a stream of [`AddProgressItem`] items. Working with this
616/// stream directly can be inconvenient, so this struct provides some convenience
617/// methods to work with the result.
618///
619/// It also implements [`IntoFuture`], so you can await it to get the [`TempTag`] that
620/// contains the hash of the added content and also protects the content.
621///
622/// If you want access to the stream, you can use the [`AddProgress::stream`] method.
623pub struct AddProgress<'a> {
624    blobs: &'a Blobs,
625    inner: stream::Boxed<AddProgressItem>,
626}
627
628impl<'a> IntoFuture for AddProgress<'a> {
629    type Output = RequestResult<TagInfo>;
630
631    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
632
633    fn into_future(self) -> Self::IntoFuture {
634        Box::pin(self.with_tag())
635    }
636}
637
638impl<'a> AddProgress<'a> {
639    fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
640        Self {
641            blobs,
642            inner: Box::pin(stream),
643        }
644    }
645
646    pub async fn temp_tag(self) -> RequestResult<TempTag> {
647        let mut stream = self.inner;
648        while let Some(item) = stream.next().await {
649            match item {
650                AddProgressItem::Done(tt) => return Ok(tt),
651                AddProgressItem::Error(e) => return Err(e.into()),
652                _ => {}
653            }
654        }
655        Err(super::Error::other("unexpected end of stream").into())
656    }
657
658    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
659        let blobs = self.blobs.clone();
660        let tt = self.temp_tag().await?;
661        let haf = *tt.hash_and_format();
662        let tags = Tags::ref_from_sender(&blobs.client);
663        tags.set(name, *tt.hash_and_format()).await?;
664        drop(tt);
665        Ok(haf)
666    }
667
668    pub async fn with_tag(self) -> RequestResult<TagInfo> {
669        let blobs = self.blobs.clone();
670        let tt = self.temp_tag().await?;
671        let hash = *tt.hash();
672        let format = tt.format();
673        let tags = Tags::ref_from_sender(&blobs.client);
674        let name = tags.create(*tt.hash_and_format()).await?;
675        drop(tt);
676        Ok(TagInfo { name, hash, format })
677    }
678
679    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
680        self.inner
681    }
682}
683
/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReaderOptions {
    /// The hash of the blob to read.
    pub hash: Hash,
}
689
690/// An observe result. Awaiting this will return the current state.
691///
692/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
693/// the first item is the current state and subsequent items are updates.
694pub struct ObserveProgress {
695    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
696}
697
698impl IntoFuture for ObserveProgress {
699    type Output = RequestResult<Bitfield>;
700
701    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
702
703    fn into_future(self) -> Self::IntoFuture {
704        Box::pin(async move {
705            let mut rx = self.inner.await?;
706            match rx.recv().await? {
707                Some(bitfield) => Ok(bitfield),
708                None => Err(super::Error::other("unexpected end of stream").into()),
709            }
710        })
711    }
712}
713
714impl ObserveProgress {
715    fn new(
716        fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
717    ) -> Self {
718        Self {
719            inner: Box::pin(fut),
720        }
721    }
722
723    pub async fn await_completion(self) -> RequestResult<Bitfield> {
724        let mut stream = self.stream().await?;
725        while let Some(item) = stream.next().await {
726            if item.is_complete() {
727                return Ok(item);
728            }
729        }
730        Err(super::Error::other("unexpected end of stream").into())
731    }
732
733    /// Returns an infinite stream of bitfields. The first bitfield is the
734    /// current state, and the following bitfields are updates.
735    ///
736    /// Once a blob is complete, there will be no more updates.
737    pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
738        let mut rx = self.inner.await?;
739        Ok(Gen::new(|co| async move {
740            while let Ok(Some(item)) = rx.recv().await {
741                co.yield_(item).await;
742            }
743        }))
744    }
745}
746
747/// A progress handle for an export operation.
748///
749/// Internally this is a stream of [`ExportProgress`] items. Working with this
750/// stream directly can be inconvenient, so this struct provides some convenience
751/// methods to work with the result.
752///
753/// To get the underlying stream, use the [`ExportProgress::stream`] method.
754///
755/// It also implements [`IntoFuture`], so you can await it to get the size of the
756/// exported blob.
757pub struct ExportProgress {
758    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
759}
760
761impl IntoFuture for ExportProgress {
762    type Output = RequestResult<u64>;
763
764    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
765
766    fn into_future(self) -> Self::IntoFuture {
767        Box::pin(self.finish())
768    }
769}
770
771impl ExportProgress {
772    fn new(
773        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
774    ) -> Self {
775        Self {
776            inner: Box::pin(fut),
777        }
778    }
779
780    pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
781        Gen::new(|co| async move {
782            let mut rx = match self.inner.await {
783                Ok(rx) => rx,
784                Err(e) => {
785                    co.yield_(ExportProgressItem::Error(e.into())).await;
786                    return;
787                }
788            };
789            while let Ok(Some(item)) = rx.recv().await {
790                co.yield_(item).await;
791            }
792        })
793    }
794
795    pub async fn finish(self) -> RequestResult<u64> {
796        let mut rx = self.inner.await?;
797        let mut size = None;
798        loop {
799            match rx.recv().await? {
800                Some(ExportProgressItem::Done) => break,
801                Some(ExportProgressItem::Size(s)) => size = Some(s),
802                Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
803                _ => {}
804            }
805        }
806        if let Some(size) = size {
807            Ok(size)
808        } else {
809            Err(super::Error::other("unexpected end of stream").into())
810        }
811    }
812}
813
814/// A handle for an ongoing bao import operation.
815pub struct ImportBaoHandle {
816    pub tx: mpsc::Sender<BaoContentItem>,
817    pub rx: oneshot::Receiver<super::Result<()>>,
818}
819
820impl ImportBaoHandle {
821    pub(crate) async fn new(
822        fut: impl Future<
823                Output = irpc::Result<(
824                    mpsc::Sender<BaoContentItem>,
825                    oneshot::Receiver<super::Result<()>>,
826                )>,
827            > + Send
828            + 'static,
829    ) -> irpc::Result<Self> {
830        let (tx, rx) = fut.await?;
831        Ok(Self { tx, rx })
832    }
833}
834
835/// A progress handle for a blobs list operation.
836pub struct BlobsListProgress {
837    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
838}
839
840impl BlobsListProgress {
841    fn new(
842        fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
843    ) -> Self {
844        Self {
845            inner: Box::pin(fut),
846        }
847    }
848
849    pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
850        let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
851        let mut hashes = Vec::new();
852        while let Some(item) = rx.recv().await? {
853            hashes.push(item?);
854        }
855        Ok(hashes)
856    }
857
858    pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
859        let mut rx = self.inner.await?;
860        Ok(Gen::new(|co| async move {
861            while let Ok(Some(item)) = rx.recv().await {
862                co.yield_(item).await;
863            }
864        }))
865    }
866}
867
868/// A progress handle for a bao export operation.
869///
870/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
871/// is often inconvenient, so there are a number of higher level methods to
872/// process the stream.
873///
874/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
875pub struct ExportRangesProgress {
876    ranges: RangeSet2<u64>,
877    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
878}
879
880impl ExportRangesProgress {
881    fn new(
882        ranges: RangeSet2<u64>,
883        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
884    ) -> Self {
885        Self {
886            ranges,
887            inner: Box::pin(fut),
888        }
889    }
890}
891
892impl ExportRangesProgress {
893    /// A raw stream of [`ExportRangesItem`]s.
894    ///
895    /// Ranges will be rounded up to chunk boundaries. So if you request a
896    /// range of 0..100, you will get the entire first chunk, 0..1024.
897    ///
898    /// It is up to the caller to clip the ranges to the requested ranges.
899    pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
900        Gen::new(|co| async move {
901            let mut rx = match self.inner.await {
902                Ok(rx) => rx,
903                Err(e) => {
904                    co.yield_(ExportRangesItem::Error(e.into())).await;
905                    return;
906                }
907            };
908            while let Ok(Some(item)) = rx.recv().await {
909                co.yield_(item).await;
910            }
911        })
912    }
913
914    /// Concatenate all the data into a single `Bytes`.
915    pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
916        let mut rx = self.inner.await?;
917        let mut data = BTreeMap::new();
918        while let Some(item) = rx.recv().await? {
919            match item {
920                ExportRangesItem::Size(_) => {}
921                ExportRangesItem::Data(leaf) => {
922                    data.insert(leaf.offset, leaf.data);
923                }
924                ExportRangesItem::Error(cause) => return Err(cause.into()),
925            }
926        }
927        let mut res = Vec::new();
928        for range in self.ranges.iter() {
929            let (start, end) = match range {
930                RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
931                RangeSetRange::Range(range) => (*range.start, *range.end),
932            };
933            for (offset, data) in data.iter() {
934                let cstart = *offset;
935                let cend = *offset + (data.len() as u64);
936                if cstart >= end || cend <= start {
937                    continue;
938                }
939                let start = start.max(cstart);
940                let end = end.min(cend);
941                let data = &data[(start - cstart) as usize..(end - cstart) as usize];
942                res.extend_from_slice(data);
943            }
944        }
945        Ok(res)
946    }
947}
948
/// A progress handle for a bao export operation.
///
/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
/// is often inconvenient, so there are a number of higher level methods to
/// process the stream.
///
/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
pub struct ExportBaoProgress {
    /// Future that resolves to the receiver of the encoded items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
}
959
960impl ExportBaoProgress {
961    fn new(
962        fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
963    ) -> Self {
964        Self {
965            inner: Box::pin(fut),
966        }
967    }
968
969    /// Interprets this blob as a hash sequence and returns a stream of hashes.
970    ///
971    /// Errors will be reported, but the iterator will nevertheless continue.
972    /// If you get an error despite having asked for ranges that should be present,
973    /// this means that the data is corrupted. It can still make sense to continue
974    /// to get all non-corrupted sections.
975    pub fn hashes_with_index(
976        self,
977    ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
978        let mut stream = self.stream();
979        Gen::new(|co| async move {
980            while let Some(item) = stream.next().await {
981                let leaf = match item {
982                    EncodedItem::Leaf(leaf) => leaf,
983                    EncodedItem::Error(e) => {
984                        co.yield_(Err(e.into())).await;
985                        continue;
986                    }
987                    _ => continue,
988                };
989                let slice = match HashSeqChunk::try_from(leaf) {
990                    Ok(slice) => slice,
991                    Err(e) => {
992                        co.yield_(Err(e)).await;
993                        continue;
994                    }
995                };
996                let offset = slice.base();
997                for (o, hash) in slice.into_iter().enumerate() {
998                    co.yield_(Ok((offset + o as u64, hash))).await;
999                }
1000            }
1001        })
1002    }
1003
1004    /// Same as [`Self::hashes_with_index`], but without the indexes.
1005    pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
1006        self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1007    }
1008
1009    pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1010        let mut data = Vec::new();
1011        let mut stream = self.into_byte_stream();
1012        while let Some(item) = stream.next().await {
1013            data.extend_from_slice(&item?);
1014        }
1015        Ok(data)
1016    }
1017
1018    pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1019        let mut rx = self.inner.await?;
1020        let mut data = Vec::new();
1021        while let Some(item) = rx.recv().await? {
1022            match item {
1023                EncodedItem::Leaf(leaf) => {
1024                    data.push(leaf.data);
1025                }
1026                EncodedItem::Parent(_) => {}
1027                EncodedItem::Size(_) => {}
1028                EncodedItem::Done => break,
1029                EncodedItem::Error(cause) => return Err(cause.into()),
1030            }
1031        }
1032        if data.len() == 1 {
1033            Ok(data.pop().unwrap())
1034        } else {
1035            let mut out = Vec::new();
1036            for item in data {
1037                out.extend_from_slice(&item);
1038            }
1039            Ok(out.into())
1040        }
1041    }
1042
1043    pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1044        let mut rx = self.inner.await?;
1045        let mut data = Vec::new();
1046        while let Some(item) = rx.recv().await? {
1047            match item {
1048                EncodedItem::Leaf(leaf) => {
1049                    data.extend_from_slice(&leaf.data);
1050                }
1051                EncodedItem::Parent(_) => {}
1052                EncodedItem::Size(_) => {}
1053                EncodedItem::Done => break,
1054                EncodedItem::Error(cause) => return Err(cause.into()),
1055            }
1056        }
1057        Ok(data)
1058    }
1059
1060    pub async fn write<W: AsyncStreamWriter>(self, target: &mut W) -> super::ExportBaoResult<()> {
1061        let mut rx = self.inner.await?;
1062        while let Some(item) = rx.recv().await? {
1063            match item {
1064                EncodedItem::Size(size) => {
1065                    target.write(&size.to_le_bytes()).await?;
1066                }
1067                EncodedItem::Parent(parent) => {
1068                    let mut data = vec![0u8; 64];
1069                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1070                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1071                    target.write(&data).await?;
1072                }
1073                EncodedItem::Leaf(leaf) => {
1074                    target.write_bytes(leaf.data).await?;
1075                }
1076                EncodedItem::Done => break,
1077                EncodedItem::Error(cause) => return Err(cause.into()),
1078            }
1079        }
1080        Ok(())
1081    }
1082
1083    /// Write quinn variant that also feeds a progress writer.
1084    pub(crate) async fn write_with_progress<W: crate::util::SendStream>(
1085        self,
1086        writer: &mut W,
1087        progress: &mut impl WriteProgress,
1088        hash: &Hash,
1089        index: u64,
1090    ) -> super::ExportBaoResult<()> {
1091        let mut rx = self.inner.await?;
1092        while let Some(item) = rx.recv().await? {
1093            match item {
1094                EncodedItem::Size(size) => {
1095                    progress.send_transfer_started(index, hash, size).await;
1096                    writer.send(&size.to_le_bytes()).await?;
1097                    progress.log_other_write(8);
1098                }
1099                EncodedItem::Parent(parent) => {
1100                    let mut data = [0u8; 64];
1101                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1102                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1103                    writer.send(&data).await?;
1104                    progress.log_other_write(64);
1105                }
1106                EncodedItem::Leaf(leaf) => {
1107                    let len = leaf.data.len();
1108                    writer.send_bytes(leaf.data).await?;
1109                    progress
1110                        .notify_payload_write(index, leaf.offset, len)
1111                        .await?;
1112                }
1113                EncodedItem::Done => break,
1114                EncodedItem::Error(cause) => return Err(cause.into()),
1115            }
1116        }
1117        Ok(())
1118    }
1119
1120    pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1121        self.stream().filter_map(|item| match item {
1122            EncodedItem::Size(size) => {
1123                let size = size.to_le_bytes().to_vec().into();
1124                Some(Ok(size))
1125            }
1126            EncodedItem::Parent(parent) => {
1127                let mut data = vec![0u8; 64];
1128                data[..32].copy_from_slice(parent.pair.0.as_bytes());
1129                data[32..].copy_from_slice(parent.pair.1.as_bytes());
1130                Some(Ok(data.into()))
1131            }
1132            EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1133            EncodedItem::Done => None,
1134            EncodedItem::Error(cause) => Some(Err(cause.into())),
1135        })
1136    }
1137
1138    pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1139        Gen::new(|co| async move {
1140            let mut rx = match self.inner.await {
1141                Ok(rx) => rx,
1142                Err(cause) => {
1143                    co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1144                        .await;
1145                    return;
1146                }
1147            };
1148            while let Ok(Some(item)) = rx.recv().await {
1149                co.yield_(item).await;
1150            }
1151        })
1152    }
1153}
1154
/// Receiver of progress notifications while an export is written with
/// [`ExportBaoProgress::write_with_progress`].
pub(crate) trait WriteProgress {
    /// Notify the progress writer that a payload write has happened.
    ///
    /// Called after the data of a leaf has been sent, with the `index` given
    /// to the write call, the `offset` of the leaf and the `len` of its data
    /// in bytes. An error returned from here aborts the write.
    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) -> ClientResult;

    /// Log a write of some other data.
    ///
    /// Called with the number of bytes sent for non-payload data: 8 for the
    /// size and 64 for the hash pair of a parent.
    fn log_other_write(&mut self, len: usize);

    /// Notify the progress writer that a transfer has started.
    ///
    /// Called when the size item of the export arrives, before the size is
    /// sent, with the `index` and `hash` given to the write call.
    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
}