iroh_blobs/api/
blobs.rs

1//! API to interact with a local blob store
2//!
3//! This API is for local interactions with the blob store, such as importing
4//! and exporting blobs, observing the bitfield of a blob, and deleting blobs.
5//!
6//! The main entry point is the [`Blobs`] struct.
7use std::{
8    collections::BTreeMap,
9    future::{Future, IntoFuture},
10    io,
11    num::NonZeroU64,
12    path::{Path, PathBuf},
13    pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18    io::{
19        fsm::{ResponseDecoder, ResponseDecoderNext},
20        BaoContentItem, Leaf,
21    },
22    BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::AsyncStreamWriter;
27use irpc::channel::{mpsc, oneshot};
28use n0_error::AnyError;
29use n0_future::{future, stream, Stream, StreamExt};
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use serde::{Deserialize, Serialize};
33use tracing::trace;
34mod reader;
35pub use reader::BlobReader;
36
37// Public reexports from the proto module.
38//
39// Due to the fact that the proto module is hidden from docs by default,
40// these will appear in the docs as if they were declared here.
41pub use super::proto::{
42    AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
43    ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
44    ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
45    ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
46};
47use super::{
48    proto::{
49        BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
50        ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
51        ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
52    },
53    remote::HashSeqChunk,
54    tags::TagInfo,
55    ApiClient, RequestResult, Tags,
56};
57use crate::{
58    api::proto::{BatchRequest, ImportByteStreamUpdate},
59    provider::events::ClientResult,
60    store::IROH_BLOCK_SIZE,
61    util::{temp_tag::TempTag, RecvStreamAsyncStreamReader},
62    BlobFormat, Hash, HashAndFormat,
63};
64
65/// Options for adding bytes.
66#[derive(Debug)]
67pub struct AddBytesOptions {
68    pub data: Bytes,
69    pub format: BlobFormat,
70}
71
72impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
73    fn from(item: (T, BlobFormat)) -> Self {
74        let (data, format) = item;
75        Self {
76            data: data.into(),
77            format,
78        }
79    }
80}
81
82/// Blobs API
83#[derive(Debug, Clone, ref_cast::RefCast)]
84#[repr(transparent)]
85pub struct Blobs {
86    client: ApiClient,
87}
88
89impl Blobs {
90    pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
91        Self::ref_cast(sender)
92    }
93
94    pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
95        let msg = BatchRequest;
96        trace!("{msg:?}");
97        let (tx, rx) = self.client.client_streaming(msg, 32).await?;
98        let scope = rx.await?;
99
100        Ok(Batch {
101            scope,
102            blobs: self,
103            _tx: tx,
104        })
105    }
106
107    /// Create a reader for the given hash. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
108    /// and therefore can be used to read the blob's content.
109    ///
110    /// Any access to parts of the blob that are not present will result in an error.
111    ///
112    /// Example:
113    /// ```rust
114    /// use iroh_blobs::{store::mem::MemStore, api::blobs::Blobs};
115    /// use tokio::io::AsyncReadExt;
116    ///
117    /// # async fn example() -> n0_error::Result<()> {
118    /// let store = MemStore::new();
119    /// let tag = store.add_slice(b"Hello, world!").await?;
120    /// let mut reader = store.reader(tag.hash);
121    /// let mut buf = String::new();
122    /// reader.read_to_string(&mut buf).await?;
123    /// assert_eq!(buf, "Hello, world!");
124    /// # Ok(())
125    /// }
126    /// ```
127    pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
128        self.reader_with_opts(ReaderOptions { hash: hash.into() })
129    }
130
131    /// Create a reader for the given options. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
132    /// and therefore can be used to read the blob's content.
133    ///
134    /// Any access to parts of the blob that are not present will result in an error.
135    pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
136        BlobReader::new(self.clone(), options)
137    }
138
139    /// Delete a blob.
140    ///
141    /// This function is not public, because it does not work as expected when called manually,
142    /// because blobs are protected from deletion. This is only called from the gc task, which
143    /// clears the protections before.
144    ///
145    /// Users should rely only on garbage collection for blob deletion.
146    pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
147        trace!("{options:?}");
148        self.client.rpc(options).await??;
149        Ok(())
150    }
151
152    /// See [`Self::delete_with_opts`].
153    pub(crate) async fn delete(
154        &self,
155        hashes: impl IntoIterator<Item = impl Into<Hash>>,
156    ) -> RequestResult<()> {
157        self.delete_with_opts(DeleteOptions {
158            hashes: hashes.into_iter().map(Into::into).collect(),
159            force: false,
160        })
161        .await
162    }
163
164    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
165        let options = ImportBytesRequest {
166            data: Bytes::copy_from_slice(data.as_ref()),
167            format: crate::BlobFormat::Raw,
168            scope: Scope::GLOBAL,
169        };
170        self.add_bytes_impl(options)
171    }
172
173    pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
174        let options = ImportBytesRequest {
175            data: data.into(),
176            format: crate::BlobFormat::Raw,
177            scope: Scope::GLOBAL,
178        };
179        self.add_bytes_impl(options)
180    }
181
182    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
183        let options = options.into();
184        let request = ImportBytesRequest {
185            data: options.data,
186            format: options.format,
187            scope: Scope::GLOBAL,
188        };
189        self.add_bytes_impl(request)
190    }
191
192    fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
193        trace!("{options:?}");
194        let this = self.clone();
195        let stream = Gen::new(|co| async move {
196            let mut receiver = match this.client.server_streaming(options, 32).await {
197                Ok(receiver) => receiver,
198                Err(cause) => {
199                    co.yield_(AddProgressItem::Error(cause.into())).await;
200                    return;
201                }
202            };
203            loop {
204                match receiver.recv().await {
205                    Ok(Some(item)) => co.yield_(item).await,
206                    Err(cause) => {
207                        co.yield_(AddProgressItem::Error(cause.into())).await;
208                        break;
209                    }
210                    Ok(None) => break,
211                }
212            }
213        });
214        AddProgress::new(self, stream)
215    }
216
217    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
218        let options = options.into();
219        self.add_path_with_opts_impl(ImportPathRequest {
220            path: options.path,
221            mode: options.mode,
222            format: options.format,
223            scope: Scope::GLOBAL,
224        })
225    }
226
227    fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
228        trace!("{:?}", options);
229        let client = self.client.clone();
230        let stream = Gen::new(|co| async move {
231            let mut receiver = match client.server_streaming(options, 32).await {
232                Ok(receiver) => receiver,
233                Err(cause) => {
234                    co.yield_(AddProgressItem::Error(cause.into())).await;
235                    return;
236                }
237            };
238            loop {
239                match receiver.recv().await {
240                    Ok(Some(item)) => co.yield_(item).await,
241                    Err(cause) => {
242                        co.yield_(AddProgressItem::Error(cause.into())).await;
243                        break;
244                    }
245                    Ok(None) => break,
246                }
247            }
248        });
249        AddProgress::new(self, stream)
250    }
251
252    pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
253        self.add_path_with_opts(AddPathOptions {
254            path: path.as_ref().to_owned(),
255            mode: ImportMode::Copy,
256            format: BlobFormat::Raw,
257        })
258    }
259
260    pub async fn add_stream(
261        &self,
262        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
263    ) -> AddProgress<'_> {
264        let inner = ImportByteStreamRequest {
265            format: crate::BlobFormat::Raw,
266            scope: Scope::default(),
267        };
268        let client = self.client.clone();
269        let stream = Gen::new(|co| async move {
270            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
271                Ok(x) => x,
272                Err(cause) => {
273                    co.yield_(AddProgressItem::Error(cause.into())).await;
274                    return;
275                }
276            };
277            let recv = async {
278                loop {
279                    match receiver.recv().await {
280                        Ok(Some(item)) => co.yield_(item).await,
281                        Err(cause) => {
282                            co.yield_(AddProgressItem::Error(cause.into())).await;
283                            break;
284                        }
285                        Ok(None) => break,
286                    }
287                }
288            };
289            let send = async {
290                tokio::pin!(data);
291                while let Some(item) = data.next().await {
292                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
293                }
294                sender.send(ImportByteStreamUpdate::Done).await?;
295                n0_error::Ok(())
296            };
297            let _ = tokio::join!(send, recv);
298        });
299        AddProgress::new(self, stream)
300    }
301
302    pub fn export_ranges(
303        &self,
304        hash: impl Into<Hash>,
305        ranges: impl Into<RangeSet2<u64>>,
306    ) -> ExportRangesProgress {
307        self.export_ranges_with_opts(ExportRangesOptions {
308            hash: hash.into(),
309            ranges: ranges.into(),
310        })
311    }
312
313    pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
314        trace!("{options:?}");
315        ExportRangesProgress::new(
316            options.ranges.clone(),
317            self.client.server_streaming(options, 32),
318        )
319    }
320
321    pub fn export_bao_with_opts(
322        &self,
323        options: ExportBaoOptions,
324        local_update_cap: usize,
325    ) -> ExportBaoProgress {
326        trace!("{options:?}");
327        ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
328    }
329
330    pub fn export_bao(
331        &self,
332        hash: impl Into<Hash>,
333        ranges: impl Into<ChunkRanges>,
334    ) -> ExportBaoProgress {
335        self.export_bao_with_opts(
336            ExportBaoRequest {
337                hash: hash.into(),
338                ranges: ranges.into(),
339            },
340            32,
341        )
342    }
343
344    /// Export a single chunk from the given hash, at the given offset.
345    pub async fn export_chunk(
346        &self,
347        hash: impl Into<Hash>,
348        offset: u64,
349    ) -> super::ExportBaoResult<Leaf> {
350        let base = ChunkNum::full_chunks(offset);
351        let ranges = ChunkRanges::from(base..base + 1);
352        let mut stream = self.export_bao(hash, ranges).stream();
353        while let Some(item) = stream.next().await {
354            match item {
355                EncodedItem::Leaf(leaf) => return Ok(leaf),
356                EncodedItem::Parent(_) => {}
357                EncodedItem::Size(_) => {}
358                EncodedItem::Done => break,
359                EncodedItem::Error(cause) => return Err(cause.into()),
360            }
361        }
362        Err(io::Error::other("unexpected end of stream").into())
363    }
364
365    /// Get the entire blob into a Bytes
366    ///
367    /// This will run out of memory when called for very large blobs, so be careful!
368    pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
369        self.export_bao(hash.into(), ChunkRanges::all())
370            .data_to_bytes()
371            .await
372    }
373
374    /// Observe the bitfield of the given hash.
375    pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
376        self.observe_with_opts(ObserveOptions { hash: hash.into() })
377    }
378
379    pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
380        trace!("{:?}", options);
381        if options.hash == Hash::EMPTY {
382            return ObserveProgress::new(async move {
383                let (tx, rx) = mpsc::channel(1);
384                tx.send(Bitfield::complete(0)).await.ok();
385                Ok(rx)
386            });
387        }
388        ObserveProgress::new(self.client.server_streaming(options, 32))
389    }
390
391    pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
392        trace!("{:?}", options);
393        ExportProgress::new(self.client.server_streaming(options, 32))
394    }
395
396    pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
397        let options = ExportOptions {
398            hash: hash.into(),
399            mode: ExportMode::Copy,
400            target: target.as_ref().to_owned(),
401        };
402        self.export_with_opts(options)
403    }
404
405    /// Import BaoContentItems from a stream.
406    ///
407    /// The store assumes that these are already verified and in the correct order.
408    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
409    pub async fn import_bao(
410        &self,
411        hash: impl Into<Hash>,
412        size: NonZeroU64,
413        local_update_cap: usize,
414    ) -> irpc::Result<ImportBaoHandle> {
415        let options = ImportBaoRequest {
416            hash: hash.into(),
417            size,
418        };
419        self.import_bao_with_opts(options, local_update_cap).await
420    }
421
422    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
423    pub async fn import_bao_with_opts(
424        &self,
425        options: ImportBaoOptions,
426        local_update_cap: usize,
427    ) -> irpc::Result<ImportBaoHandle> {
428        trace!("{:?}", options);
429        ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
430    }
431
432    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
433    pub async fn import_bao_reader<R: crate::util::RecvStream>(
434        &self,
435        hash: Hash,
436        ranges: ChunkRanges,
437        mut reader: R,
438    ) -> RequestResult<R> {
439        let mut size = [0; 8];
440        reader
441            .recv_exact(&mut size)
442            .await
443            .map_err(super::Error::other)?;
444        let size = u64::from_le_bytes(size);
445        let Some(size) = NonZeroU64::new(size) else {
446            return if hash == Hash::EMPTY {
447                Ok(reader)
448            } else {
449                Err(super::Error::other("invalid size for hash").into())
450            };
451        };
452        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
453        let mut decoder = ResponseDecoder::new(
454            hash.into(),
455            ranges,
456            tree,
457            RecvStreamAsyncStreamReader::new(reader),
458        );
459        let options = ImportBaoOptions { hash, size };
460        let handle = self.import_bao_with_opts(options, 32).await?;
461        let driver = async move {
462            let reader = loop {
463                match decoder.next().await {
464                    ResponseDecoderNext::More((rest, item)) => {
465                        handle.tx.send(item?).await?;
466                        decoder = rest;
467                    }
468                    ResponseDecoderNext::Done(reader) => break reader,
469                };
470            };
471            drop(handle.tx);
472            io::Result::Ok(reader)
473        };
474        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
475        let (reader, res) = tokio::join!(driver, fut);
476        res?;
477        Ok(reader?.into_inner())
478    }
479
480    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
481    pub async fn import_bao_bytes(
482        &self,
483        hash: Hash,
484        ranges: ChunkRanges,
485        data: impl Into<Bytes>,
486    ) -> RequestResult<()> {
487        self.import_bao_reader(hash, ranges, data.into()).await?;
488        Ok(())
489    }
490
491    pub fn list(&self) -> BlobsListProgress {
492        let msg = ListRequest;
493        let client = self.client.clone();
494        BlobsListProgress::new(client.server_streaming(msg, 32))
495    }
496
497    pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
498        let hash = hash.into();
499        let msg = BlobStatusRequest { hash };
500        self.client.rpc(msg).await
501    }
502
503    pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
504        match self.status(hash).await? {
505            BlobStatus::Complete { .. } => Ok(true),
506            _ => Ok(false),
507        }
508    }
509
510    #[allow(dead_code)]
511    pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
512        let msg = ClearProtectedRequest;
513        self.client.rpc(msg).await??;
514        Ok(())
515    }
516}
517
518/// A progress handle for a batch scoped add operation.
519pub struct BatchAddProgress<'a>(AddProgress<'a>);
520
521impl<'a> IntoFuture for BatchAddProgress<'a> {
522    type Output = RequestResult<TempTag>;
523
524    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
525
526    fn into_future(self) -> Self::IntoFuture {
527        Box::pin(self.temp_tag())
528    }
529}
530
531impl<'a> BatchAddProgress<'a> {
532    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
533        self.0.with_named_tag(name).await
534    }
535
536    pub async fn with_tag(self) -> RequestResult<TagInfo> {
537        self.0.with_tag().await
538    }
539
540    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
541        self.0.stream().await
542    }
543
544    pub async fn temp_tag(self) -> RequestResult<TempTag> {
545        self.0.temp_tag().await
546    }
547}
548
549/// A batch of operations that modify the blob store.
550pub struct Batch<'a> {
551    scope: Scope,
552    blobs: &'a Blobs,
553    _tx: mpsc::Sender<BatchResponse>,
554}
555
556impl<'a> Batch<'a> {
557    pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
558        let options = ImportBytesRequest {
559            data: data.into(),
560            format: crate::BlobFormat::Raw,
561            scope: self.scope,
562        };
563        BatchAddProgress(self.blobs.add_bytes_impl(options))
564    }
565
566    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
567        let options = options.into();
568        BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
569            data: options.data,
570            format: options.format,
571            scope: self.scope,
572        }))
573    }
574
575    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
576        let options = ImportBytesRequest {
577            data: Bytes::copy_from_slice(data.as_ref()),
578            format: crate::BlobFormat::Raw,
579            scope: self.scope,
580        };
581        BatchAddProgress(self.blobs.add_bytes_impl(options))
582    }
583
584    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
585        let options = options.into();
586        BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
587            path: options.path,
588            mode: options.mode,
589            format: options.format,
590            scope: self.scope,
591        }))
592    }
593
594    pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
595        let value = value.into();
596        let msg = CreateTempTagRequest {
597            scope: self.scope,
598            value,
599        };
600        self.blobs.client.rpc(msg).await
601    }
602}
603
604/// Options for adding data from a file system path.
605#[derive(Debug)]
606pub struct AddPathOptions {
607    pub path: PathBuf,
608    pub format: BlobFormat,
609    pub mode: ImportMode,
610}
611
612/// A progress handle for an import operation.
613///
614/// Internally this is a stream of [`AddProgressItem`] items. Working with this
615/// stream directly can be inconvenient, so this struct provides some convenience
616/// methods to work with the result.
617///
618/// It also implements [`IntoFuture`], so you can await it to get the [`TagInfo`] that
619/// contains the hash of the added content and also protects the content.
620///
621/// If you want access to the stream, you can use the [`AddProgress::stream`] method.
622pub struct AddProgress<'a> {
623    blobs: &'a Blobs,
624    inner: stream::Boxed<AddProgressItem>,
625}
626
627impl<'a> IntoFuture for AddProgress<'a> {
628    type Output = RequestResult<TagInfo>;
629
630    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
631
632    fn into_future(self) -> Self::IntoFuture {
633        Box::pin(self.with_tag())
634    }
635}
636
637impl<'a> AddProgress<'a> {
638    fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
639        Self {
640            blobs,
641            inner: Box::pin(stream),
642        }
643    }
644
645    pub async fn temp_tag(self) -> RequestResult<TempTag> {
646        let mut stream = self.inner;
647        while let Some(item) = stream.next().await {
648            match item {
649                AddProgressItem::Done(tt) => return Ok(tt),
650                AddProgressItem::Error(e) => return Err(e.into()),
651                _ => {}
652            }
653        }
654        Err(super::Error::other("unexpected end of stream").into())
655    }
656
657    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
658        let blobs = self.blobs.clone();
659        let tt = self.temp_tag().await?;
660        let haf = tt.hash_and_format();
661        let tags = Tags::ref_from_sender(&blobs.client);
662        tags.set(name, haf).await?;
663        drop(tt);
664        Ok(haf)
665    }
666
667    pub async fn with_tag(self) -> RequestResult<TagInfo> {
668        let blobs = self.blobs.clone();
669        let tt = self.temp_tag().await?;
670        let hash = tt.hash();
671        let format = tt.format();
672        let tags = Tags::ref_from_sender(&blobs.client);
673        let name = tags.create(tt.hash_and_format()).await?;
674        drop(tt);
675        Ok(TagInfo { name, hash, format })
676    }
677
678    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
679        self.inner
680    }
681}
682
683/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
684#[derive(Debug, Clone, Serialize, Deserialize)]
685pub struct ReaderOptions {
686    pub hash: Hash,
687}
688
689/// An observe result. Awaiting this will return the current state.
690///
691/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
692/// the first item is the current state and subsequent items are updates.
693pub struct ObserveProgress {
694    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
695}
696
697impl IntoFuture for ObserveProgress {
698    type Output = RequestResult<Bitfield>;
699
700    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
701
702    fn into_future(self) -> Self::IntoFuture {
703        Box::pin(async move {
704            let mut rx = self.inner.await?;
705            match rx.recv().await? {
706                Some(bitfield) => Ok(bitfield),
707                None => Err(super::Error::other("unexpected end of stream").into()),
708            }
709        })
710    }
711}
712
713impl ObserveProgress {
714    fn new(
715        fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
716    ) -> Self {
717        Self {
718            inner: Box::pin(fut),
719        }
720    }
721
722    pub async fn await_completion(self) -> RequestResult<Bitfield> {
723        let mut stream = self.stream().await?;
724        while let Some(item) = stream.next().await {
725            if item.is_complete() {
726                return Ok(item);
727            }
728        }
729        Err(super::Error::other("unexpected end of stream").into())
730    }
731
732    /// Returns an infinite stream of bitfields. The first bitfield is the
733    /// current state, and the following bitfields are updates.
734    ///
735    /// Once a blob is complete, there will be no more updates.
736    pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
737        let mut rx = self.inner.await?;
738        Ok(Gen::new(|co| async move {
739            while let Ok(Some(item)) = rx.recv().await {
740                co.yield_(item).await;
741            }
742        }))
743    }
744}
745
746/// A progress handle for an export operation.
747///
748/// Internally this is a stream of [`ExportProgress`] items. Working with this
749/// stream directly can be inconvenient, so this struct provides some convenience
750/// methods to work with the result.
751///
752/// To get the underlying stream, use the [`ExportProgress::stream`] method.
753///
754/// It also implements [`IntoFuture`], so you can await it to get the size of the
755/// exported blob.
756pub struct ExportProgress {
757    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
758}
759
760impl IntoFuture for ExportProgress {
761    type Output = RequestResult<u64>;
762
763    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
764
765    fn into_future(self) -> Self::IntoFuture {
766        Box::pin(self.finish())
767    }
768}
769
770impl ExportProgress {
771    fn new(
772        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
773    ) -> Self {
774        Self {
775            inner: Box::pin(fut),
776        }
777    }
778
779    pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
780        Gen::new(|co| async move {
781            let mut rx = match self.inner.await {
782                Ok(rx) => rx,
783                Err(e) => {
784                    co.yield_(ExportProgressItem::Error(e.into())).await;
785                    return;
786                }
787            };
788            while let Ok(Some(item)) = rx.recv().await {
789                co.yield_(item).await;
790            }
791        })
792    }
793
794    pub async fn finish(self) -> RequestResult<u64> {
795        let mut rx = self.inner.await?;
796        let mut size = None;
797        loop {
798            match rx.recv().await? {
799                Some(ExportProgressItem::Done) => break,
800                Some(ExportProgressItem::Size(s)) => size = Some(s),
801                Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
802                _ => {}
803            }
804        }
805        if let Some(size) = size {
806            Ok(size)
807        } else {
808            Err(super::Error::other("unexpected end of stream").into())
809        }
810    }
811}
812
813/// A handle for an ongoing bao import operation.
814pub struct ImportBaoHandle {
815    pub tx: mpsc::Sender<BaoContentItem>,
816    pub rx: oneshot::Receiver<super::Result<()>>,
817}
818
819impl ImportBaoHandle {
820    pub(crate) async fn new(
821        fut: impl Future<
822                Output = irpc::Result<(
823                    mpsc::Sender<BaoContentItem>,
824                    oneshot::Receiver<super::Result<()>>,
825                )>,
826            > + Send
827            + 'static,
828    ) -> irpc::Result<Self> {
829        let (tx, rx) = fut.await?;
830        Ok(Self { tx, rx })
831    }
832}
833
834/// A progress handle for a blobs list operation.
835pub struct BlobsListProgress {
836    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
837}
838
839impl BlobsListProgress {
840    fn new(
841        fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
842    ) -> Self {
843        Self {
844            inner: Box::pin(fut),
845        }
846    }
847
848    pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
849        let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
850        let mut hashes = Vec::new();
851        while let Some(item) = rx.recv().await? {
852            hashes.push(item?);
853        }
854        Ok(hashes)
855    }
856
857    pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
858        let mut rx = self.inner.await?;
859        Ok(Gen::new(|co| async move {
860            while let Ok(Some(item)) = rx.recv().await {
861                co.yield_(item).await;
862            }
863        }))
864    }
865}
866
867/// A progress handle for a bao export operation.
868///
869/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
870/// is often inconvenient, so there are a number of higher level methods to
871/// process the stream.
872///
873/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
874pub struct ExportRangesProgress {
875    ranges: RangeSet2<u64>,
876    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
877}
878
879impl ExportRangesProgress {
880    fn new(
881        ranges: RangeSet2<u64>,
882        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
883    ) -> Self {
884        Self {
885            ranges,
886            inner: Box::pin(fut),
887        }
888    }
889}
890
891impl ExportRangesProgress {
892    /// A raw stream of [`ExportRangesItem`]s.
893    ///
894    /// Ranges will be rounded up to chunk boundaries. So if you request a
895    /// range of 0..100, you will get the entire first chunk, 0..1024.
896    ///
897    /// It is up to the caller to clip the ranges to the requested ranges.
898    pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
899        Gen::new(|co| async move {
900            let mut rx = match self.inner.await {
901                Ok(rx) => rx,
902                Err(e) => {
903                    co.yield_(ExportRangesItem::Error(e.into())).await;
904                    return;
905                }
906            };
907            while let Ok(Some(item)) = rx.recv().await {
908                co.yield_(item).await;
909            }
910        })
911    }
912
913    /// Concatenate all the data into a single `Bytes`.
914    pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
915        let mut rx = self.inner.await?;
916        let mut data = BTreeMap::new();
917        while let Some(item) = rx.recv().await? {
918            match item {
919                ExportRangesItem::Size(_) => {}
920                ExportRangesItem::Data(leaf) => {
921                    data.insert(leaf.offset, leaf.data);
922                }
923                ExportRangesItem::Error(cause) => return Err(cause.into()),
924            }
925        }
926        let mut res = Vec::new();
927        for range in self.ranges.iter() {
928            let (start, end) = match range {
929                RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
930                RangeSetRange::Range(range) => (*range.start, *range.end),
931            };
932            for (offset, data) in data.iter() {
933                let cstart = *offset;
934                let cend = *offset + (data.len() as u64);
935                if cstart >= end || cend <= start {
936                    continue;
937                }
938                let start = start.max(cstart);
939                let end = end.min(cend);
940                let data = &data[(start - cstart) as usize..(end - cstart) as usize];
941                res.extend_from_slice(data);
942            }
943        }
944        Ok(res)
945    }
946}
947
948/// A progress handle for a bao export operation.
949///
950/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
951/// is often inconvenient, so there are a number of higher level methods to
952/// process the stream.
953///
954/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
955pub struct ExportBaoProgress {
956    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
957}
958
959impl ExportBaoProgress {
960    fn new(
961        fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
962    ) -> Self {
963        Self {
964            inner: Box::pin(fut),
965        }
966    }
967
968    /// Interprets this blob as a hash sequence and returns a stream of hashes.
969    ///
970    /// Errors will be reported, but the iterator will nevertheless continue.
971    /// If you get an error despite having asked for ranges that should be present,
972    /// this means that the data is corrupted. It can still make sense to continue
973    /// to get all non-corrupted sections.
974    pub fn hashes_with_index(
975        self,
976    ) -> impl Stream<Item = std::result::Result<(u64, Hash), AnyError>> {
977        let mut stream = self.stream();
978        Gen::new(|co| async move {
979            while let Some(item) = stream.next().await {
980                let leaf = match item {
981                    EncodedItem::Leaf(leaf) => leaf,
982                    EncodedItem::Error(e) => {
983                        co.yield_(Err(AnyError::from_std(e))).await;
984                        continue;
985                    }
986                    _ => continue,
987                };
988                let slice = match HashSeqChunk::try_from(leaf) {
989                    Ok(slice) => slice,
990                    Err(e) => {
991                        co.yield_(Err(e)).await;
992                        continue;
993                    }
994                };
995                let offset = slice.base();
996                for (o, hash) in slice.into_iter().enumerate() {
997                    co.yield_(Ok((offset + o as u64, hash))).await;
998                }
999            }
1000        })
1001    }
1002
1003    /// Same as [`Self::hashes_with_index`], but without the indexes.
1004    pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, AnyError>> {
1005        self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1006    }
1007
1008    pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1009        let mut data = Vec::new();
1010        let mut stream = self.into_byte_stream();
1011        while let Some(item) = stream.next().await {
1012            data.extend_from_slice(&item?);
1013        }
1014        Ok(data)
1015    }
1016
1017    pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1018        let mut rx = self.inner.await?;
1019        let mut data = Vec::new();
1020        while let Some(item) = rx.recv().await? {
1021            match item {
1022                EncodedItem::Leaf(leaf) => {
1023                    data.push(leaf.data);
1024                }
1025                EncodedItem::Parent(_) => {}
1026                EncodedItem::Size(_) => {}
1027                EncodedItem::Done => break,
1028                EncodedItem::Error(cause) => return Err(cause.into()),
1029            }
1030        }
1031        if data.len() == 1 {
1032            Ok(data.pop().unwrap())
1033        } else {
1034            let mut out = Vec::new();
1035            for item in data {
1036                out.extend_from_slice(&item);
1037            }
1038            Ok(out.into())
1039        }
1040    }
1041
1042    pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1043        let mut rx = self.inner.await?;
1044        let mut data = Vec::new();
1045        while let Some(item) = rx.recv().await? {
1046            match item {
1047                EncodedItem::Leaf(leaf) => {
1048                    data.extend_from_slice(&leaf.data);
1049                }
1050                EncodedItem::Parent(_) => {}
1051                EncodedItem::Size(_) => {}
1052                EncodedItem::Done => break,
1053                EncodedItem::Error(cause) => return Err(cause.into()),
1054            }
1055        }
1056        Ok(data)
1057    }
1058
1059    pub async fn write<W: AsyncStreamWriter>(self, target: &mut W) -> super::ExportBaoResult<()> {
1060        let mut rx = self.inner.await?;
1061        while let Some(item) = rx.recv().await? {
1062            match item {
1063                EncodedItem::Size(size) => {
1064                    target.write(&size.to_le_bytes()).await?;
1065                }
1066                EncodedItem::Parent(parent) => {
1067                    let mut data = vec![0u8; 64];
1068                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1069                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1070                    target.write(&data).await?;
1071                }
1072                EncodedItem::Leaf(leaf) => {
1073                    target.write_bytes(leaf.data).await?;
1074                }
1075                EncodedItem::Done => break,
1076                EncodedItem::Error(cause) => return Err(cause.into()),
1077            }
1078        }
1079        Ok(())
1080    }
1081
1082    /// Write quinn variant that also feeds a progress writer.
1083    pub(crate) async fn write_with_progress<W: crate::util::SendStream>(
1084        self,
1085        writer: &mut W,
1086        progress: &mut impl WriteProgress,
1087        hash: &Hash,
1088        index: u64,
1089    ) -> super::ExportBaoResult<()> {
1090        let mut rx = self.inner.await?;
1091        while let Some(item) = rx.recv().await? {
1092            match item {
1093                EncodedItem::Size(size) => {
1094                    progress.send_transfer_started(index, hash, size).await;
1095                    writer.send(&size.to_le_bytes()).await?;
1096                    progress.log_other_write(8);
1097                }
1098                EncodedItem::Parent(parent) => {
1099                    let mut data = [0u8; 64];
1100                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1101                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1102                    writer.send(&data).await?;
1103                    progress.log_other_write(64);
1104                }
1105                EncodedItem::Leaf(leaf) => {
1106                    let len = leaf.data.len();
1107                    writer.send_bytes(leaf.data).await?;
1108                    progress
1109                        .notify_payload_write(index, leaf.offset, len)
1110                        .await?;
1111                }
1112                EncodedItem::Done => break,
1113                EncodedItem::Error(cause) => return Err(cause.into()),
1114            }
1115        }
1116        Ok(())
1117    }
1118
1119    pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1120        self.stream().filter_map(|item| match item {
1121            EncodedItem::Size(size) => {
1122                let size = size.to_le_bytes().to_vec().into();
1123                Some(Ok(size))
1124            }
1125            EncodedItem::Parent(parent) => {
1126                let mut data = vec![0u8; 64];
1127                data[..32].copy_from_slice(parent.pair.0.as_bytes());
1128                data[32..].copy_from_slice(parent.pair.1.as_bytes());
1129                Some(Ok(data.into()))
1130            }
1131            EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1132            EncodedItem::Done => None,
1133            EncodedItem::Error(cause) => Some(Err(cause.into())),
1134        })
1135    }
1136
1137    pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1138        Gen::new(|co| async move {
1139            let mut rx = match self.inner.await {
1140                Ok(rx) => rx,
1141                Err(cause) => {
1142                    co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1143                        .await;
1144                    return;
1145                }
1146            };
1147            while let Ok(Some(item)) = rx.recv().await {
1148                co.yield_(item).await;
1149            }
1150        })
1151    }
1152}
1153
1154pub(crate) trait WriteProgress {
1155    /// Notify the progress writer that a payload write has happened.
1156    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) -> ClientResult;
1157
1158    /// Log a write of some other data.
1159    fn log_other_write(&mut self, len: usize);
1160
1161    /// Notify the progress writer that a transfer has started.
1162    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
1163}