iroh_blobs/api/
blobs.rs

1//! API to interact with a local blob store
2//!
3//! This API is for local interactions with the blob store, such as importing
4//! and exporting blobs, observing the bitfield of a blob, and deleting blobs.
5//!
6//! The main entry point is the [`Blobs`] struct.
7use std::{
8    collections::BTreeMap,
9    future::{Future, IntoFuture},
10    io,
11    num::NonZeroU64,
12    path::{Path, PathBuf},
13    pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18    io::{
19        fsm::{ResponseDecoder, ResponseDecoderNext},
20        BaoContentItem, Leaf,
21    },
22    BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::AsyncStreamWriter;
27use irpc::channel::{mpsc, oneshot};
28use n0_error::AnyError;
29use n0_future::{future, stream, Stream, StreamExt};
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use serde::{Deserialize, Serialize};
33use tracing::trace;
34mod reader;
35pub use reader::BlobReader;
36
37// Public reexports from the proto module.
38//
39// Due to the fact that the proto module is hidden from docs by default,
40// these will appear in the docs as if they were declared here.
41pub use super::proto::{
42    AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
43    ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
44    ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
45    ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
46};
47use super::{
48    proto::{
49        BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
50        ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
51        ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
52    },
53    remote::HashSeqChunk,
54    tags::TagInfo,
55    ApiClient, RequestResult, Tags,
56};
57use crate::{
58    api::proto::{BatchRequest, ImportByteStreamUpdate},
59    provider::events::ClientResult,
60    store::IROH_BLOCK_SIZE,
61    util::{temp_tag::TempTag, RecvStreamAsyncStreamReader},
62    BlobFormat, Hash, HashAndFormat,
63};
64
65/// Options for adding bytes.
66#[derive(Debug)]
67pub struct AddBytesOptions {
68    pub data: Bytes,
69    pub format: BlobFormat,
70}
71
72impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
73    fn from(item: (T, BlobFormat)) -> Self {
74        let (data, format) = item;
75        Self {
76            data: data.into(),
77            format,
78        }
79    }
80}
81
82/// Blobs API
83#[derive(Debug, Clone, ref_cast::RefCast)]
84#[repr(transparent)]
85pub struct Blobs {
86    client: ApiClient,
87}
88
89impl Blobs {
90    pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
91        Self::ref_cast(sender)
92    }
93
94    pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
95        let msg = BatchRequest;
96        trace!("{msg:?}");
97        let (tx, rx) = self.client.client_streaming(msg, 32).await?;
98        let scope = rx.await?;
99
100        Ok(Batch {
101            scope,
102            blobs: self,
103            _tx: tx,
104        })
105    }
106
107    /// Create a reader for the given hash. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
108    /// and therefore can be used to read the blob's content.
109    ///
110    /// Any access to parts of the blob that are not present will result in an error.
111    ///
112    /// Example:
113    /// ```rust
114    /// use iroh_blobs::{store::mem::MemStore, api::blobs::Blobs};
115    /// use tokio::io::AsyncReadExt;
116    ///
117    /// # async fn example() -> n0_error::Result<()> {
118    /// let store = MemStore::new();
119    /// let tag = store.add_slice(b"Hello, world!").await?;
120    /// let mut reader = store.reader(tag.hash);
121    /// let mut buf = String::new();
122    /// reader.read_to_string(&mut buf).await?;
123    /// assert_eq!(buf, "Hello, world!");
124    /// # Ok(())
125    /// }
126    /// ```
127    pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
128        self.reader_with_opts(ReaderOptions { hash: hash.into() })
129    }
130
131    /// Create a reader for the given options. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
132    /// and therefore can be used to read the blob's content.
133    ///
134    /// Any access to parts of the blob that are not present will result in an error.
135    pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
136        BlobReader::new(self.clone(), options)
137    }
138
139    /// Delete a blob.
140    ///
141    /// This function is not public, because it does not work as expected when called manually,
142    /// because blobs are protected from deletion. This is only called from the gc task, which
143    /// clears the protections before.
144    ///
145    /// Users should rely only on garbage collection for blob deletion.
146    pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
147        trace!("{options:?}");
148        self.client.rpc(options).await??;
149        Ok(())
150    }
151
152    /// See [`Self::delete_with_opts`].
153    pub(crate) async fn delete(
154        &self,
155        hashes: impl IntoIterator<Item = impl Into<Hash>>,
156    ) -> RequestResult<()> {
157        self.delete_with_opts(DeleteOptions {
158            hashes: hashes.into_iter().map(Into::into).collect(),
159            force: false,
160        })
161        .await
162    }
163
164    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
165        let options = ImportBytesRequest {
166            data: Bytes::copy_from_slice(data.as_ref()),
167            format: crate::BlobFormat::Raw,
168            scope: Scope::GLOBAL,
169        };
170        self.add_bytes_impl(options)
171    }
172
173    pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
174        let options = ImportBytesRequest {
175            data: data.into(),
176            format: crate::BlobFormat::Raw,
177            scope: Scope::GLOBAL,
178        };
179        self.add_bytes_impl(options)
180    }
181
182    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
183        let options = options.into();
184        let request = ImportBytesRequest {
185            data: options.data,
186            format: options.format,
187            scope: Scope::GLOBAL,
188        };
189        self.add_bytes_impl(request)
190    }
191
192    fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
193        trace!("{options:?}");
194        let this = self.clone();
195        let stream = Gen::new(|co| async move {
196            let mut receiver = match this.client.server_streaming(options, 32).await {
197                Ok(receiver) => receiver,
198                Err(cause) => {
199                    co.yield_(AddProgressItem::Error(cause.into())).await;
200                    return;
201                }
202            };
203            loop {
204                match receiver.recv().await {
205                    Ok(Some(item)) => co.yield_(item).await,
206                    Err(cause) => {
207                        co.yield_(AddProgressItem::Error(cause.into())).await;
208                        break;
209                    }
210                    Ok(None) => break,
211                }
212            }
213        });
214        AddProgress::new(self, stream)
215    }
216
217    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
218        let options = options.into();
219        self.add_path_with_opts_impl(ImportPathRequest {
220            path: options.path,
221            mode: options.mode,
222            format: options.format,
223            scope: Scope::GLOBAL,
224        })
225    }
226
227    fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
228        trace!("{:?}", options);
229        let client = self.client.clone();
230        let stream = Gen::new(|co| async move {
231            let mut receiver = match client.server_streaming(options, 32).await {
232                Ok(receiver) => receiver,
233                Err(cause) => {
234                    co.yield_(AddProgressItem::Error(cause.into())).await;
235                    return;
236                }
237            };
238            loop {
239                match receiver.recv().await {
240                    Ok(Some(item)) => co.yield_(item).await,
241                    Err(cause) => {
242                        co.yield_(AddProgressItem::Error(cause.into())).await;
243                        break;
244                    }
245                    Ok(None) => break,
246                }
247            }
248        });
249        AddProgress::new(self, stream)
250    }
251
252    pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
253        self.add_path_with_opts(AddPathOptions {
254            path: path.as_ref().to_owned(),
255            mode: ImportMode::Copy,
256            format: BlobFormat::Raw,
257        })
258    }
259
260    pub async fn add_stream(
261        &self,
262        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
263    ) -> AddProgress<'_> {
264        let inner = ImportByteStreamRequest {
265            format: crate::BlobFormat::Raw,
266            scope: Scope::default(),
267        };
268        let client = self.client.clone();
269        let stream = Gen::new(|co| async move {
270            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
271                Ok(x) => x,
272                Err(cause) => {
273                    co.yield_(AddProgressItem::Error(cause.into())).await;
274                    return;
275                }
276            };
277            let recv = async {
278                loop {
279                    match receiver.recv().await {
280                        Ok(Some(item)) => co.yield_(item).await,
281                        Err(cause) => {
282                            co.yield_(AddProgressItem::Error(cause.into())).await;
283                            break;
284                        }
285                        Ok(None) => break,
286                    }
287                }
288            };
289            let send = async {
290                tokio::pin!(data);
291                while let Some(item) = data.next().await {
292                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
293                }
294                sender.send(ImportByteStreamUpdate::Done).await?;
295                n0_error::Ok(())
296            };
297            let _ = tokio::join!(send, recv);
298        });
299        AddProgress::new(self, stream)
300    }
301
302    pub fn export_ranges(
303        &self,
304        hash: impl Into<Hash>,
305        ranges: impl Into<RangeSet2<u64>>,
306    ) -> ExportRangesProgress {
307        self.export_ranges_with_opts(ExportRangesOptions {
308            hash: hash.into(),
309            ranges: ranges.into(),
310        })
311    }
312
313    pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
314        trace!("{options:?}");
315        ExportRangesProgress::new(
316            options.ranges.clone(),
317            self.client.server_streaming(options, 32),
318        )
319    }
320
321    pub fn export_bao_with_opts(
322        &self,
323        options: ExportBaoOptions,
324        local_update_cap: usize,
325    ) -> ExportBaoProgress {
326        trace!("{options:?}");
327        ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
328    }
329
330    pub fn export_bao(
331        &self,
332        hash: impl Into<Hash>,
333        ranges: impl Into<ChunkRanges>,
334    ) -> ExportBaoProgress {
335        self.export_bao_with_opts(
336            ExportBaoRequest {
337                hash: hash.into(),
338                ranges: ranges.into(),
339            },
340            32,
341        )
342    }
343
344    /// Export a single chunk from the given hash, at the given offset.
345    pub async fn export_chunk(
346        &self,
347        hash: impl Into<Hash>,
348        offset: u64,
349    ) -> super::ExportBaoResult<Leaf> {
350        let base = ChunkNum::full_chunks(offset);
351        let ranges = ChunkRanges::from(base..base + 1);
352        let mut stream = self.export_bao(hash, ranges).stream();
353        while let Some(item) = stream.next().await {
354            match item {
355                EncodedItem::Leaf(leaf) => return Ok(leaf),
356                EncodedItem::Parent(_) => {}
357                EncodedItem::Size(_) => {}
358                EncodedItem::Done => break,
359                EncodedItem::Error(cause) => return Err(cause.into()),
360            }
361        }
362        Err(io::Error::other("unexpected end of stream").into())
363    }
364
365    /// Get the entire blob into a Bytes
366    ///
367    /// This will run out of memory when called for very large blobs, so be careful!
368    pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
369        self.export_bao(hash.into(), ChunkRanges::all())
370            .data_to_bytes()
371            .await
372    }
373
374    /// Observe the bitfield of the given hash.
375    pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
376        self.observe_with_opts(ObserveOptions { hash: hash.into() })
377    }
378
379    pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
380        trace!("{:?}", options);
381        if options.hash == Hash::EMPTY {
382            return ObserveProgress::new(async move {
383                let (tx, rx) = mpsc::channel(1);
384                tx.send(Bitfield::complete(0)).await.ok();
385                Ok(rx)
386            });
387        }
388        ObserveProgress::new(self.client.server_streaming(options, 32))
389    }
390
391    pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
392        trace!("{:?}", options);
393        ExportProgress::new(self.client.server_streaming(options, 32))
394    }
395
396    pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
397        let options = ExportOptions {
398            hash: hash.into(),
399            mode: ExportMode::Copy,
400            target: target.as_ref().to_owned(),
401        };
402        self.export_with_opts(options)
403    }
404
405    /// Import BaoContentItems from a stream.
406    ///
407    /// The store assumes that these are already verified and in the correct order.
408    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
409    pub async fn import_bao(
410        &self,
411        hash: impl Into<Hash>,
412        size: NonZeroU64,
413        local_update_cap: usize,
414    ) -> irpc::Result<ImportBaoHandle> {
415        let options = ImportBaoRequest {
416            hash: hash.into(),
417            size,
418        };
419        self.import_bao_with_opts(options, local_update_cap).await
420    }
421
422    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
423    pub async fn import_bao_with_opts(
424        &self,
425        options: ImportBaoOptions,
426        local_update_cap: usize,
427    ) -> irpc::Result<ImportBaoHandle> {
428        trace!("{:?}", options);
429        ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
430    }
431
432    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
433    pub async fn import_bao_reader<R: crate::util::RecvStream>(
434        &self,
435        hash: Hash,
436        ranges: ChunkRanges,
437        mut reader: R,
438    ) -> RequestResult<R> {
439        let mut size = [0; 8];
440        reader.recv_exact(&mut size).await?;
441        let size = u64::from_le_bytes(size);
442        let Some(size) = NonZeroU64::new(size) else {
443            return if hash == Hash::EMPTY {
444                Ok(reader)
445            } else {
446                Err(io::Error::other("invalid size for hash").into())
447            };
448        };
449        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
450        let mut decoder = ResponseDecoder::new(
451            hash.into(),
452            ranges,
453            tree,
454            RecvStreamAsyncStreamReader::new(reader),
455        );
456        let options = ImportBaoOptions { hash, size };
457        let handle = self.import_bao_with_opts(options, 32).await?;
458        let driver = async move {
459            let reader = loop {
460                match decoder.next().await {
461                    ResponseDecoderNext::More((rest, item)) => {
462                        handle.tx.send(item?).await?;
463                        decoder = rest;
464                    }
465                    ResponseDecoderNext::Done(reader) => break reader,
466                };
467            };
468            drop(handle.tx);
469            io::Result::Ok(reader)
470        };
471        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
472        let (reader, res) = tokio::join!(driver, fut);
473        res?;
474        Ok(reader?.into_inner())
475    }
476
477    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
478    pub async fn import_bao_bytes(
479        &self,
480        hash: Hash,
481        ranges: ChunkRanges,
482        data: impl Into<Bytes>,
483    ) -> RequestResult<()> {
484        self.import_bao_reader(hash, ranges, data.into()).await?;
485        Ok(())
486    }
487
488    pub fn list(&self) -> BlobsListProgress {
489        let msg = ListRequest;
490        let client = self.client.clone();
491        BlobsListProgress::new(client.server_streaming(msg, 32))
492    }
493
494    pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
495        let hash = hash.into();
496        let msg = BlobStatusRequest { hash };
497        self.client.rpc(msg).await
498    }
499
500    pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
501        match self.status(hash).await? {
502            BlobStatus::Complete { .. } => Ok(true),
503            _ => Ok(false),
504        }
505    }
506
507    #[allow(dead_code)]
508    pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
509        let msg = ClearProtectedRequest;
510        self.client.rpc(msg).await??;
511        Ok(())
512    }
513}
514
515/// A progress handle for a batch scoped add operation.
516pub struct BatchAddProgress<'a>(AddProgress<'a>);
517
518impl<'a> IntoFuture for BatchAddProgress<'a> {
519    type Output = RequestResult<TempTag>;
520
521    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
522
523    fn into_future(self) -> Self::IntoFuture {
524        Box::pin(self.temp_tag())
525    }
526}
527
528impl<'a> BatchAddProgress<'a> {
529    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
530        self.0.with_named_tag(name).await
531    }
532
533    pub async fn with_tag(self) -> RequestResult<TagInfo> {
534        self.0.with_tag().await
535    }
536
537    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
538        self.0.stream().await
539    }
540
541    pub async fn temp_tag(self) -> RequestResult<TempTag> {
542        self.0.temp_tag().await
543    }
544}
545
546/// A batch of operations that modify the blob store.
547pub struct Batch<'a> {
548    scope: Scope,
549    blobs: &'a Blobs,
550    _tx: mpsc::Sender<BatchResponse>,
551}
552
553impl<'a> Batch<'a> {
554    pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
555        let options = ImportBytesRequest {
556            data: data.into(),
557            format: crate::BlobFormat::Raw,
558            scope: self.scope,
559        };
560        BatchAddProgress(self.blobs.add_bytes_impl(options))
561    }
562
563    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
564        let options = options.into();
565        BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
566            data: options.data,
567            format: options.format,
568            scope: self.scope,
569        }))
570    }
571
572    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
573        let options = ImportBytesRequest {
574            data: Bytes::copy_from_slice(data.as_ref()),
575            format: crate::BlobFormat::Raw,
576            scope: self.scope,
577        };
578        BatchAddProgress(self.blobs.add_bytes_impl(options))
579    }
580
581    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
582        let options = options.into();
583        BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
584            path: options.path,
585            mode: options.mode,
586            format: options.format,
587            scope: self.scope,
588        }))
589    }
590
591    pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
592        let value = value.into();
593        let msg = CreateTempTagRequest {
594            scope: self.scope,
595            value,
596        };
597        self.blobs.client.rpc(msg).await
598    }
599}
600
601/// Options for adding data from a file system path.
602#[derive(Debug)]
603pub struct AddPathOptions {
604    pub path: PathBuf,
605    pub format: BlobFormat,
606    pub mode: ImportMode,
607}
608
609/// A progress handle for an import operation.
610///
611/// Internally this is a stream of [`AddProgressItem`] items. Working with this
612/// stream directly can be inconvenient, so this struct provides some convenience
613/// methods to work with the result.
614///
615/// It also implements [`IntoFuture`], so you can await it to get the [`TagInfo`] that
616/// contains the hash of the added content and also protects the content.
617///
618/// If you want access to the stream, you can use the [`AddProgress::stream`] method.
619pub struct AddProgress<'a> {
620    blobs: &'a Blobs,
621    inner: stream::Boxed<AddProgressItem>,
622}
623
624impl<'a> IntoFuture for AddProgress<'a> {
625    type Output = RequestResult<TagInfo>;
626
627    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
628
629    fn into_future(self) -> Self::IntoFuture {
630        Box::pin(self.with_tag())
631    }
632}
633
634impl<'a> AddProgress<'a> {
635    fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
636        Self {
637            blobs,
638            inner: Box::pin(stream),
639        }
640    }
641
642    pub async fn temp_tag(self) -> RequestResult<TempTag> {
643        let mut stream = self.inner;
644        while let Some(item) = stream.next().await {
645            match item {
646                AddProgressItem::Done(tt) => return Ok(tt),
647                AddProgressItem::Error(e) => return Err(e.into()),
648                _ => {}
649            }
650        }
651        Err(io::Error::other("unexpected end of stream").into())
652    }
653
654    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
655        let blobs = self.blobs.clone();
656        let tt = self.temp_tag().await?;
657        let haf = tt.hash_and_format();
658        let tags = Tags::ref_from_sender(&blobs.client);
659        tags.set(name, haf).await?;
660        drop(tt);
661        Ok(haf)
662    }
663
664    pub async fn with_tag(self) -> RequestResult<TagInfo> {
665        let blobs = self.blobs.clone();
666        let tt = self.temp_tag().await?;
667        let hash = tt.hash();
668        let format = tt.format();
669        let tags = Tags::ref_from_sender(&blobs.client);
670        let name = tags.create(tt.hash_and_format()).await?;
671        drop(tt);
672        Ok(TagInfo { name, hash, format })
673    }
674
675    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
676        self.inner
677    }
678}
679
680/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
681#[derive(Debug, Clone, Serialize, Deserialize)]
682pub struct ReaderOptions {
683    pub hash: Hash,
684}
685
686/// An observe result. Awaiting this will return the current state.
687///
688/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
689/// the first item is the current state and subsequent items are updates.
690pub struct ObserveProgress {
691    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
692}
693
694impl IntoFuture for ObserveProgress {
695    type Output = RequestResult<Bitfield>;
696
697    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
698
699    fn into_future(self) -> Self::IntoFuture {
700        Box::pin(async move {
701            let mut rx = self.inner.await?;
702            match rx.recv().await? {
703                Some(bitfield) => Ok(bitfield),
704                None => Err(io::Error::other("unexpected end of stream").into()),
705            }
706        })
707    }
708}
709
710impl ObserveProgress {
711    fn new(
712        fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
713    ) -> Self {
714        Self {
715            inner: Box::pin(fut),
716        }
717    }
718
719    pub async fn await_completion(self) -> RequestResult<Bitfield> {
720        let mut stream = self.stream().await?;
721        while let Some(item) = stream.next().await {
722            if item.is_complete() {
723                return Ok(item);
724            }
725        }
726        Err(io::Error::other("unexpected end of stream").into())
727    }
728
729    /// Returns an infinite stream of bitfields. The first bitfield is the
730    /// current state, and the following bitfields are updates.
731    ///
732    /// Once a blob is complete, there will be no more updates.
733    pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
734        let mut rx = self.inner.await?;
735        Ok(Gen::new(|co| async move {
736            while let Ok(Some(item)) = rx.recv().await {
737                co.yield_(item).await;
738            }
739        }))
740    }
741}
742
743/// A progress handle for an export operation.
744///
745/// Internally this is a stream of [`ExportProgress`] items. Working with this
746/// stream directly can be inconvenient, so this struct provides some convenience
747/// methods to work with the result.
748///
749/// To get the underlying stream, use the [`ExportProgress::stream`] method.
750///
751/// It also implements [`IntoFuture`], so you can await it to get the size of the
752/// exported blob.
753pub struct ExportProgress {
754    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
755}
756
757impl IntoFuture for ExportProgress {
758    type Output = RequestResult<u64>;
759
760    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
761
762    fn into_future(self) -> Self::IntoFuture {
763        Box::pin(self.finish())
764    }
765}
766
767impl ExportProgress {
768    fn new(
769        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
770    ) -> Self {
771        Self {
772            inner: Box::pin(fut),
773        }
774    }
775
776    pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
777        Gen::new(|co| async move {
778            let mut rx = match self.inner.await {
779                Ok(rx) => rx,
780                Err(e) => {
781                    co.yield_(ExportProgressItem::Error(e.into())).await;
782                    return;
783                }
784            };
785            while let Ok(Some(item)) = rx.recv().await {
786                co.yield_(item).await;
787            }
788        })
789    }
790
791    pub async fn finish(self) -> RequestResult<u64> {
792        let mut rx = self.inner.await?;
793        let mut size = None;
794        loop {
795            match rx.recv().await? {
796                Some(ExportProgressItem::Done) => break,
797                Some(ExportProgressItem::Size(s)) => size = Some(s),
798                Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
799                _ => {}
800            }
801        }
802        if let Some(size) = size {
803            Ok(size)
804        } else {
805            Err(io::Error::other("unexpected end of stream").into())
806        }
807    }
808}
809
810/// A handle for an ongoing bao import operation.
811pub struct ImportBaoHandle {
812    pub tx: mpsc::Sender<BaoContentItem>,
813    pub rx: oneshot::Receiver<super::Result<()>>,
814}
815
816impl ImportBaoHandle {
817    pub(crate) async fn new(
818        fut: impl Future<
819                Output = irpc::Result<(
820                    mpsc::Sender<BaoContentItem>,
821                    oneshot::Receiver<super::Result<()>>,
822                )>,
823            > + Send
824            + 'static,
825    ) -> irpc::Result<Self> {
826        let (tx, rx) = fut.await?;
827        Ok(Self { tx, rx })
828    }
829}
830
831/// A progress handle for a blobs list operation.
832pub struct BlobsListProgress {
833    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
834}
835
836impl BlobsListProgress {
837    fn new(
838        fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
839    ) -> Self {
840        Self {
841            inner: Box::pin(fut),
842        }
843    }
844
845    pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
846        let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
847        let mut hashes = Vec::new();
848        while let Some(item) = rx.recv().await? {
849            hashes.push(item?);
850        }
851        Ok(hashes)
852    }
853
854    pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
855        let mut rx = self.inner.await?;
856        Ok(Gen::new(|co| async move {
857            while let Ok(Some(item)) = rx.recv().await {
858                co.yield_(item).await;
859            }
860        }))
861    }
862}
863
864/// A progress handle for a bao export operation.
865///
866/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
867/// is often inconvenient, so there are a number of higher level methods to
868/// process the stream.
869///
870/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
871pub struct ExportRangesProgress {
872    ranges: RangeSet2<u64>,
873    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
874}
875
876impl ExportRangesProgress {
877    fn new(
878        ranges: RangeSet2<u64>,
879        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
880    ) -> Self {
881        Self {
882            ranges,
883            inner: Box::pin(fut),
884        }
885    }
886}
887
888impl ExportRangesProgress {
889    /// A raw stream of [`ExportRangesItem`]s.
890    ///
891    /// Ranges will be rounded up to chunk boundaries. So if you request a
892    /// range of 0..100, you will get the entire first chunk, 0..1024.
893    ///
894    /// It is up to the caller to clip the ranges to the requested ranges.
895    pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
896        Gen::new(|co| async move {
897            let mut rx = match self.inner.await {
898                Ok(rx) => rx,
899                Err(e) => {
900                    co.yield_(ExportRangesItem::Error(e.into())).await;
901                    return;
902                }
903            };
904            while let Ok(Some(item)) = rx.recv().await {
905                co.yield_(item).await;
906            }
907        })
908    }
909
910    /// Concatenate all the data into a single `Bytes`.
911    pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
912        let mut rx = self.inner.await?;
913        let mut data = BTreeMap::new();
914        while let Some(item) = rx.recv().await? {
915            match item {
916                ExportRangesItem::Size(_) => {}
917                ExportRangesItem::Data(leaf) => {
918                    data.insert(leaf.offset, leaf.data);
919                }
920                ExportRangesItem::Error(cause) => return Err(cause.into()),
921            }
922        }
923        let mut res = Vec::new();
924        for range in self.ranges.iter() {
925            let (start, end) = match range {
926                RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
927                RangeSetRange::Range(range) => (*range.start, *range.end),
928            };
929            for (offset, data) in data.iter() {
930                let cstart = *offset;
931                let cend = *offset + (data.len() as u64);
932                if cstart >= end || cend <= start {
933                    continue;
934                }
935                let start = start.max(cstart);
936                let end = end.min(cend);
937                let data = &data[(start - cstart) as usize..(end - cstart) as usize];
938                res.extend_from_slice(data);
939            }
940        }
941        Ok(res)
942    }
943}
944
945/// A progress handle for a bao export operation.
946///
947/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
948/// is often inconvenient, so there are a number of higher level methods to
949/// process the stream.
950///
951/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
952pub struct ExportBaoProgress {
953    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
954}
955
956impl ExportBaoProgress {
957    fn new(
958        fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
959    ) -> Self {
960        Self {
961            inner: Box::pin(fut),
962        }
963    }
964
965    /// Interprets this blob as a hash sequence and returns a stream of hashes.
966    ///
967    /// Errors will be reported, but the iterator will nevertheless continue.
968    /// If you get an error despite having asked for ranges that should be present,
969    /// this means that the data is corrupted. It can still make sense to continue
970    /// to get all non-corrupted sections.
971    pub fn hashes_with_index(
972        self,
973    ) -> impl Stream<Item = std::result::Result<(u64, Hash), AnyError>> {
974        let mut stream = self.stream();
975        Gen::new(|co| async move {
976            while let Some(item) = stream.next().await {
977                let leaf = match item {
978                    EncodedItem::Leaf(leaf) => leaf,
979                    EncodedItem::Error(e) => {
980                        co.yield_(Err(AnyError::from_std(e))).await;
981                        continue;
982                    }
983                    _ => continue,
984                };
985                let slice = match HashSeqChunk::try_from(leaf) {
986                    Ok(slice) => slice,
987                    Err(e) => {
988                        co.yield_(Err(e)).await;
989                        continue;
990                    }
991                };
992                let offset = slice.base();
993                for (o, hash) in slice.into_iter().enumerate() {
994                    co.yield_(Ok((offset + o as u64, hash))).await;
995                }
996            }
997        })
998    }
999
1000    /// Same as [`Self::hashes_with_index`], but without the indexes.
1001    pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, AnyError>> {
1002        self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1003    }
1004
1005    pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1006        let mut data = Vec::new();
1007        let mut stream = self.into_byte_stream();
1008        while let Some(item) = stream.next().await {
1009            data.extend_from_slice(&item?);
1010        }
1011        Ok(data)
1012    }
1013
1014    pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1015        let mut rx = self.inner.await?;
1016        let mut data = Vec::new();
1017        while let Some(item) = rx.recv().await? {
1018            match item {
1019                EncodedItem::Leaf(leaf) => {
1020                    data.push(leaf.data);
1021                }
1022                EncodedItem::Parent(_) => {}
1023                EncodedItem::Size(_) => {}
1024                EncodedItem::Done => break,
1025                EncodedItem::Error(cause) => return Err(cause.into()),
1026            }
1027        }
1028        if data.len() == 1 {
1029            Ok(data.pop().unwrap())
1030        } else {
1031            let mut out = Vec::new();
1032            for item in data {
1033                out.extend_from_slice(&item);
1034            }
1035            Ok(out.into())
1036        }
1037    }
1038
1039    pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1040        let mut rx = self.inner.await?;
1041        let mut data = Vec::new();
1042        while let Some(item) = rx.recv().await? {
1043            match item {
1044                EncodedItem::Leaf(leaf) => {
1045                    data.extend_from_slice(&leaf.data);
1046                }
1047                EncodedItem::Parent(_) => {}
1048                EncodedItem::Size(_) => {}
1049                EncodedItem::Done => break,
1050                EncodedItem::Error(cause) => return Err(cause.into()),
1051            }
1052        }
1053        Ok(data)
1054    }
1055
1056    pub async fn write<W: AsyncStreamWriter>(self, target: &mut W) -> super::ExportBaoResult<()> {
1057        let mut rx = self.inner.await?;
1058        while let Some(item) = rx.recv().await? {
1059            match item {
1060                EncodedItem::Size(size) => {
1061                    target.write(&size.to_le_bytes()).await?;
1062                }
1063                EncodedItem::Parent(parent) => {
1064                    let mut data = vec![0u8; 64];
1065                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1066                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1067                    target.write(&data).await?;
1068                }
1069                EncodedItem::Leaf(leaf) => {
1070                    target.write_bytes(leaf.data).await?;
1071                }
1072                EncodedItem::Done => break,
1073                EncodedItem::Error(cause) => return Err(cause.into()),
1074            }
1075        }
1076        Ok(())
1077    }
1078
1079    /// Write quinn variant that also feeds a progress writer.
1080    pub(crate) async fn write_with_progress<W: crate::util::SendStream>(
1081        self,
1082        writer: &mut W,
1083        progress: &mut impl WriteProgress,
1084        hash: &Hash,
1085        index: u64,
1086    ) -> super::ExportBaoResult<()> {
1087        let mut rx = self.inner.await?;
1088        while let Some(item) = rx.recv().await? {
1089            match item {
1090                EncodedItem::Size(size) => {
1091                    progress.send_transfer_started(index, hash, size).await;
1092                    writer.send(&size.to_le_bytes()).await?;
1093                    progress.log_other_write(8);
1094                }
1095                EncodedItem::Parent(parent) => {
1096                    let mut data = [0u8; 64];
1097                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1098                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1099                    writer.send(&data).await?;
1100                    progress.log_other_write(64);
1101                }
1102                EncodedItem::Leaf(leaf) => {
1103                    let len = leaf.data.len();
1104                    writer.send_bytes(leaf.data).await?;
1105                    progress
1106                        .notify_payload_write(index, leaf.offset, len)
1107                        .await?;
1108                }
1109                EncodedItem::Done => break,
1110                EncodedItem::Error(cause) => return Err(cause.into()),
1111            }
1112        }
1113        Ok(())
1114    }
1115
1116    pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1117        self.stream().filter_map(|item| match item {
1118            EncodedItem::Size(size) => {
1119                let size = size.to_le_bytes().to_vec().into();
1120                Some(Ok(size))
1121            }
1122            EncodedItem::Parent(parent) => {
1123                let mut data = vec![0u8; 64];
1124                data[..32].copy_from_slice(parent.pair.0.as_bytes());
1125                data[32..].copy_from_slice(parent.pair.1.as_bytes());
1126                Some(Ok(data.into()))
1127            }
1128            EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1129            EncodedItem::Done => None,
1130            EncodedItem::Error(cause) => Some(Err(cause.into())),
1131        })
1132    }
1133
1134    pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1135        Gen::new(|co| async move {
1136            let mut rx = match self.inner.await {
1137                Ok(rx) => rx,
1138                Err(cause) => {
1139                    co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1140                        .await;
1141                    return;
1142                }
1143            };
1144            while let Ok(Some(item)) = rx.recv().await {
1145                co.yield_(item).await;
1146            }
1147        })
1148    }
1149}
1150
1151pub(crate) trait WriteProgress {
1152    /// Notify the progress writer that a payload write has happened.
1153    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) -> ClientResult;
1154
1155    /// Log a write of some other data.
1156    fn log_other_write(&mut self, len: usize);
1157
1158    /// Notify the progress writer that a transfer has started.
1159    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
1160}