1use std::{
8 collections::BTreeMap,
9 future::{Future, IntoFuture},
10 io,
11 num::NonZeroU64,
12 path::{Path, PathBuf},
13 pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18 io::{
19 fsm::{ResponseDecoder, ResponseDecoderNext},
20 BaoContentItem, Leaf,
21 },
22 BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::AsyncStreamWriter;
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use range_collections::{range_set::RangeSetRange, RangeSet2};
30use ref_cast::RefCast;
31use serde::{Deserialize, Serialize};
32use tracing::trace;
33mod reader;
34pub use reader::BlobReader;
35
36pub use super::proto::{
41 AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
42 ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
43 ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
44 ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
45};
46use super::{
47 proto::{
48 BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
49 ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
50 ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
51 },
52 remote::HashSeqChunk,
53 tags::TagInfo,
54 ApiClient, RequestResult, Tags,
55};
56use crate::{
57 api::proto::{BatchRequest, ImportByteStreamUpdate},
58 provider::events::ClientResult,
59 store::IROH_BLOCK_SIZE,
60 util::{temp_tag::TempTag, RecvStreamAsyncStreamReader},
61 BlobFormat, Hash, HashAndFormat,
62};
63
64#[derive(Debug)]
66pub struct AddBytesOptions {
67 pub data: Bytes,
68 pub format: BlobFormat,
69}
70
71impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
72 fn from(item: (T, BlobFormat)) -> Self {
73 let (data, format) = item;
74 Self {
75 data: data.into(),
76 format,
77 }
78 }
79}
80
81#[derive(Debug, Clone, ref_cast::RefCast)]
83#[repr(transparent)]
84pub struct Blobs {
85 client: ApiClient,
86}
87
88impl Blobs {
89 pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
90 Self::ref_cast(sender)
91 }
92
93 pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
94 let msg = BatchRequest;
95 trace!("{msg:?}");
96 let (tx, rx) = self.client.client_streaming(msg, 32).await?;
97 let scope = rx.await?;
98
99 Ok(Batch {
100 scope,
101 blobs: self,
102 _tx: tx,
103 })
104 }
105
106 pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
127 self.reader_with_opts(ReaderOptions { hash: hash.into() })
128 }
129
130 pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
135 BlobReader::new(self.clone(), options)
136 }
137
138 #[cfg(feature = "fs-store")]
146 pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
147 trace!("{options:?}");
148 self.client.rpc(options).await??;
149 Ok(())
150 }
151
152 #[cfg(feature = "fs-store")]
154 pub(crate) async fn delete(
155 &self,
156 hashes: impl IntoIterator<Item = impl Into<Hash>>,
157 ) -> RequestResult<()> {
158 self.delete_with_opts(DeleteOptions {
159 hashes: hashes.into_iter().map(Into::into).collect(),
160 force: false,
161 })
162 .await
163 }
164
165 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
166 let options = ImportBytesRequest {
167 data: Bytes::copy_from_slice(data.as_ref()),
168 format: crate::BlobFormat::Raw,
169 scope: Scope::GLOBAL,
170 };
171 self.add_bytes_impl(options)
172 }
173
174 pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
175 let options = ImportBytesRequest {
176 data: data.into(),
177 format: crate::BlobFormat::Raw,
178 scope: Scope::GLOBAL,
179 };
180 self.add_bytes_impl(options)
181 }
182
183 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
184 let options = options.into();
185 let request = ImportBytesRequest {
186 data: options.data,
187 format: options.format,
188 scope: Scope::GLOBAL,
189 };
190 self.add_bytes_impl(request)
191 }
192
193 fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
194 trace!("{options:?}");
195 let this = self.clone();
196 let stream = Gen::new(|co| async move {
197 let mut receiver = match this.client.server_streaming(options, 32).await {
198 Ok(receiver) => receiver,
199 Err(cause) => {
200 co.yield_(AddProgressItem::Error(cause.into())).await;
201 return;
202 }
203 };
204 loop {
205 match receiver.recv().await {
206 Ok(Some(item)) => co.yield_(item).await,
207 Err(cause) => {
208 co.yield_(AddProgressItem::Error(cause.into())).await;
209 break;
210 }
211 Ok(None) => break,
212 }
213 }
214 });
215 AddProgress::new(self, stream)
216 }
217
218 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
219 let options = options.into();
220 self.add_path_with_opts_impl(ImportPathRequest {
221 path: options.path,
222 mode: options.mode,
223 format: options.format,
224 scope: Scope::GLOBAL,
225 })
226 }
227
228 fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
229 trace!("{:?}", options);
230 let client = self.client.clone();
231 let stream = Gen::new(|co| async move {
232 let mut receiver = match client.server_streaming(options, 32).await {
233 Ok(receiver) => receiver,
234 Err(cause) => {
235 co.yield_(AddProgressItem::Error(cause.into())).await;
236 return;
237 }
238 };
239 loop {
240 match receiver.recv().await {
241 Ok(Some(item)) => co.yield_(item).await,
242 Err(cause) => {
243 co.yield_(AddProgressItem::Error(cause.into())).await;
244 break;
245 }
246 Ok(None) => break,
247 }
248 }
249 });
250 AddProgress::new(self, stream)
251 }
252
253 pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
254 self.add_path_with_opts(AddPathOptions {
255 path: path.as_ref().to_owned(),
256 mode: ImportMode::Copy,
257 format: BlobFormat::Raw,
258 })
259 }
260
261 pub async fn add_stream(
262 &self,
263 data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
264 ) -> AddProgress<'_> {
265 let inner = ImportByteStreamRequest {
266 format: crate::BlobFormat::Raw,
267 scope: Scope::default(),
268 };
269 let client = self.client.clone();
270 let stream = Gen::new(|co| async move {
271 let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
272 Ok(x) => x,
273 Err(cause) => {
274 co.yield_(AddProgressItem::Error(cause.into())).await;
275 return;
276 }
277 };
278 let recv = async {
279 loop {
280 match receiver.recv().await {
281 Ok(Some(item)) => co.yield_(item).await,
282 Err(cause) => {
283 co.yield_(AddProgressItem::Error(cause.into())).await;
284 break;
285 }
286 Ok(None) => break,
287 }
288 }
289 };
290 let send = async {
291 tokio::pin!(data);
292 while let Some(item) = data.next().await {
293 sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
294 }
295 sender.send(ImportByteStreamUpdate::Done).await?;
296 anyhow::Ok(())
297 };
298 let _ = tokio::join!(send, recv);
299 });
300 AddProgress::new(self, stream)
301 }
302
303 pub fn export_ranges(
304 &self,
305 hash: impl Into<Hash>,
306 ranges: impl Into<RangeSet2<u64>>,
307 ) -> ExportRangesProgress {
308 self.export_ranges_with_opts(ExportRangesOptions {
309 hash: hash.into(),
310 ranges: ranges.into(),
311 })
312 }
313
314 pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
315 trace!("{options:?}");
316 ExportRangesProgress::new(
317 options.ranges.clone(),
318 self.client.server_streaming(options, 32),
319 )
320 }
321
322 pub fn export_bao_with_opts(
323 &self,
324 options: ExportBaoOptions,
325 local_update_cap: usize,
326 ) -> ExportBaoProgress {
327 trace!("{options:?}");
328 ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
329 }
330
331 pub fn export_bao(
332 &self,
333 hash: impl Into<Hash>,
334 ranges: impl Into<ChunkRanges>,
335 ) -> ExportBaoProgress {
336 self.export_bao_with_opts(
337 ExportBaoRequest {
338 hash: hash.into(),
339 ranges: ranges.into(),
340 },
341 32,
342 )
343 }
344
345 pub async fn export_chunk(
347 &self,
348 hash: impl Into<Hash>,
349 offset: u64,
350 ) -> super::ExportBaoResult<Leaf> {
351 let base = ChunkNum::full_chunks(offset);
352 let ranges = ChunkRanges::from(base..base + 1);
353 let mut stream = self.export_bao(hash, ranges).stream();
354 while let Some(item) = stream.next().await {
355 match item {
356 EncodedItem::Leaf(leaf) => return Ok(leaf),
357 EncodedItem::Parent(_) => {}
358 EncodedItem::Size(_) => {}
359 EncodedItem::Done => break,
360 EncodedItem::Error(cause) => return Err(cause.into()),
361 }
362 }
363 Err(io::Error::other("unexpected end of stream").into())
364 }
365
366 pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
370 self.export_bao(hash.into(), ChunkRanges::all())
371 .data_to_bytes()
372 .await
373 }
374
375 pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
377 self.observe_with_opts(ObserveOptions { hash: hash.into() })
378 }
379
380 pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
381 trace!("{:?}", options);
382 if options.hash == Hash::EMPTY {
383 return ObserveProgress::new(async move {
384 let (tx, rx) = mpsc::channel(1);
385 tx.send(Bitfield::complete(0)).await.ok();
386 Ok(rx)
387 });
388 }
389 ObserveProgress::new(self.client.server_streaming(options, 32))
390 }
391
392 pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
393 trace!("{:?}", options);
394 ExportProgress::new(self.client.server_streaming(options, 32))
395 }
396
397 pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
398 let options = ExportOptions {
399 hash: hash.into(),
400 mode: ExportMode::Copy,
401 target: target.as_ref().to_owned(),
402 };
403 self.export_with_opts(options)
404 }
405
406 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
410 pub async fn import_bao(
411 &self,
412 hash: impl Into<Hash>,
413 size: NonZeroU64,
414 local_update_cap: usize,
415 ) -> irpc::Result<ImportBaoHandle> {
416 let options = ImportBaoRequest {
417 hash: hash.into(),
418 size,
419 };
420 self.import_bao_with_opts(options, local_update_cap).await
421 }
422
423 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
424 pub async fn import_bao_with_opts(
425 &self,
426 options: ImportBaoOptions,
427 local_update_cap: usize,
428 ) -> irpc::Result<ImportBaoHandle> {
429 trace!("{:?}", options);
430 ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
431 }
432
433 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
434 pub async fn import_bao_reader<R: crate::util::RecvStream>(
435 &self,
436 hash: Hash,
437 ranges: ChunkRanges,
438 mut reader: R,
439 ) -> RequestResult<R> {
440 let mut size = [0; 8];
441 reader
442 .recv_exact(&mut size)
443 .await
444 .map_err(super::Error::other)?;
445 let size = u64::from_le_bytes(size);
446 let Some(size) = NonZeroU64::new(size) else {
447 return if hash == Hash::EMPTY {
448 Ok(reader)
449 } else {
450 Err(super::Error::other("invalid size for hash").into())
451 };
452 };
453 let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
454 let mut decoder = ResponseDecoder::new(
455 hash.into(),
456 ranges,
457 tree,
458 RecvStreamAsyncStreamReader::new(reader),
459 );
460 let options = ImportBaoOptions { hash, size };
461 let handle = self.import_bao_with_opts(options, 32).await?;
462 let driver = async move {
463 let reader = loop {
464 match decoder.next().await {
465 ResponseDecoderNext::More((rest, item)) => {
466 handle.tx.send(item?).await?;
467 decoder = rest;
468 }
469 ResponseDecoderNext::Done(reader) => break reader,
470 };
471 };
472 drop(handle.tx);
473 io::Result::Ok(reader)
474 };
475 let fut = async move { handle.rx.await.map_err(io::Error::other)? };
476 let (reader, res) = tokio::join!(driver, fut);
477 res?;
478 Ok(reader?.into_inner())
479 }
480
481 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
482 pub async fn import_bao_bytes(
483 &self,
484 hash: Hash,
485 ranges: ChunkRanges,
486 data: impl Into<Bytes>,
487 ) -> RequestResult<()> {
488 self.import_bao_reader(hash, ranges, data.into()).await?;
489 Ok(())
490 }
491
492 pub fn list(&self) -> BlobsListProgress {
493 let msg = ListRequest;
494 let client = self.client.clone();
495 BlobsListProgress::new(client.server_streaming(msg, 32))
496 }
497
498 pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
499 let hash = hash.into();
500 let msg = BlobStatusRequest { hash };
501 self.client.rpc(msg).await
502 }
503
504 pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
505 match self.status(hash).await? {
506 BlobStatus::Complete { .. } => Ok(true),
507 _ => Ok(false),
508 }
509 }
510
511 #[allow(dead_code)]
512 pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
513 let msg = ClearProtectedRequest;
514 self.client.rpc(msg).await??;
515 Ok(())
516 }
517}
518
519pub struct BatchAddProgress<'a>(AddProgress<'a>);
521
522impl<'a> IntoFuture for BatchAddProgress<'a> {
523 type Output = RequestResult<TempTag>;
524
525 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
526
527 fn into_future(self) -> Self::IntoFuture {
528 Box::pin(self.temp_tag())
529 }
530}
531
532impl<'a> BatchAddProgress<'a> {
533 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
534 self.0.with_named_tag(name).await
535 }
536
537 pub async fn with_tag(self) -> RequestResult<TagInfo> {
538 self.0.with_tag().await
539 }
540
541 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
542 self.0.stream().await
543 }
544
545 pub async fn temp_tag(self) -> RequestResult<TempTag> {
546 self.0.temp_tag().await
547 }
548}
549
550pub struct Batch<'a> {
552 scope: Scope,
553 blobs: &'a Blobs,
554 _tx: mpsc::Sender<BatchResponse>,
555}
556
557impl<'a> Batch<'a> {
558 pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
559 let options = ImportBytesRequest {
560 data: data.into(),
561 format: crate::BlobFormat::Raw,
562 scope: self.scope,
563 };
564 BatchAddProgress(self.blobs.add_bytes_impl(options))
565 }
566
567 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
568 let options = options.into();
569 BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
570 data: options.data,
571 format: options.format,
572 scope: self.scope,
573 }))
574 }
575
576 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
577 let options = ImportBytesRequest {
578 data: Bytes::copy_from_slice(data.as_ref()),
579 format: crate::BlobFormat::Raw,
580 scope: self.scope,
581 };
582 BatchAddProgress(self.blobs.add_bytes_impl(options))
583 }
584
585 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
586 let options = options.into();
587 BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
588 path: options.path,
589 mode: options.mode,
590 format: options.format,
591 scope: self.scope,
592 }))
593 }
594
595 pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
596 let value = value.into();
597 let msg = CreateTempTagRequest {
598 scope: self.scope,
599 value,
600 };
601 self.blobs.client.rpc(msg).await
602 }
603}
604
605#[derive(Debug)]
607pub struct AddPathOptions {
608 pub path: PathBuf,
609 pub format: BlobFormat,
610 pub mode: ImportMode,
611}
612
613pub struct AddProgress<'a> {
624 blobs: &'a Blobs,
625 inner: stream::Boxed<AddProgressItem>,
626}
627
628impl<'a> IntoFuture for AddProgress<'a> {
629 type Output = RequestResult<TagInfo>;
630
631 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
632
633 fn into_future(self) -> Self::IntoFuture {
634 Box::pin(self.with_tag())
635 }
636}
637
638impl<'a> AddProgress<'a> {
639 fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
640 Self {
641 blobs,
642 inner: Box::pin(stream),
643 }
644 }
645
646 pub async fn temp_tag(self) -> RequestResult<TempTag> {
647 let mut stream = self.inner;
648 while let Some(item) = stream.next().await {
649 match item {
650 AddProgressItem::Done(tt) => return Ok(tt),
651 AddProgressItem::Error(e) => return Err(e.into()),
652 _ => {}
653 }
654 }
655 Err(super::Error::other("unexpected end of stream").into())
656 }
657
658 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
659 let blobs = self.blobs.clone();
660 let tt = self.temp_tag().await?;
661 let haf = *tt.hash_and_format();
662 let tags = Tags::ref_from_sender(&blobs.client);
663 tags.set(name, *tt.hash_and_format()).await?;
664 drop(tt);
665 Ok(haf)
666 }
667
668 pub async fn with_tag(self) -> RequestResult<TagInfo> {
669 let blobs = self.blobs.clone();
670 let tt = self.temp_tag().await?;
671 let hash = *tt.hash();
672 let format = tt.format();
673 let tags = Tags::ref_from_sender(&blobs.client);
674 let name = tags.create(*tt.hash_and_format()).await?;
675 drop(tt);
676 Ok(TagInfo { name, hash, format })
677 }
678
679 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
680 self.inner
681 }
682}
683
684#[derive(Debug, Clone, Serialize, Deserialize)]
686pub struct ReaderOptions {
687 pub hash: Hash,
688}
689
690pub struct ObserveProgress {
695 inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
696}
697
698impl IntoFuture for ObserveProgress {
699 type Output = RequestResult<Bitfield>;
700
701 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
702
703 fn into_future(self) -> Self::IntoFuture {
704 Box::pin(async move {
705 let mut rx = self.inner.await?;
706 match rx.recv().await? {
707 Some(bitfield) => Ok(bitfield),
708 None => Err(super::Error::other("unexpected end of stream").into()),
709 }
710 })
711 }
712}
713
714impl ObserveProgress {
715 fn new(
716 fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
717 ) -> Self {
718 Self {
719 inner: Box::pin(fut),
720 }
721 }
722
723 pub async fn await_completion(self) -> RequestResult<Bitfield> {
724 let mut stream = self.stream().await?;
725 while let Some(item) = stream.next().await {
726 if item.is_complete() {
727 return Ok(item);
728 }
729 }
730 Err(super::Error::other("unexpected end of stream").into())
731 }
732
733 pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
738 let mut rx = self.inner.await?;
739 Ok(Gen::new(|co| async move {
740 while let Ok(Some(item)) = rx.recv().await {
741 co.yield_(item).await;
742 }
743 }))
744 }
745}
746
747pub struct ExportProgress {
758 inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
759}
760
761impl IntoFuture for ExportProgress {
762 type Output = RequestResult<u64>;
763
764 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
765
766 fn into_future(self) -> Self::IntoFuture {
767 Box::pin(self.finish())
768 }
769}
770
771impl ExportProgress {
772 fn new(
773 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
774 ) -> Self {
775 Self {
776 inner: Box::pin(fut),
777 }
778 }
779
780 pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
781 Gen::new(|co| async move {
782 let mut rx = match self.inner.await {
783 Ok(rx) => rx,
784 Err(e) => {
785 co.yield_(ExportProgressItem::Error(e.into())).await;
786 return;
787 }
788 };
789 while let Ok(Some(item)) = rx.recv().await {
790 co.yield_(item).await;
791 }
792 })
793 }
794
795 pub async fn finish(self) -> RequestResult<u64> {
796 let mut rx = self.inner.await?;
797 let mut size = None;
798 loop {
799 match rx.recv().await? {
800 Some(ExportProgressItem::Done) => break,
801 Some(ExportProgressItem::Size(s)) => size = Some(s),
802 Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
803 _ => {}
804 }
805 }
806 if let Some(size) = size {
807 Ok(size)
808 } else {
809 Err(super::Error::other("unexpected end of stream").into())
810 }
811 }
812}
813
814pub struct ImportBaoHandle {
816 pub tx: mpsc::Sender<BaoContentItem>,
817 pub rx: oneshot::Receiver<super::Result<()>>,
818}
819
820impl ImportBaoHandle {
821 pub(crate) async fn new(
822 fut: impl Future<
823 Output = irpc::Result<(
824 mpsc::Sender<BaoContentItem>,
825 oneshot::Receiver<super::Result<()>>,
826 )>,
827 > + Send
828 + 'static,
829 ) -> irpc::Result<Self> {
830 let (tx, rx) = fut.await?;
831 Ok(Self { tx, rx })
832 }
833}
834
835pub struct BlobsListProgress {
837 inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
838}
839
840impl BlobsListProgress {
841 fn new(
842 fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
843 ) -> Self {
844 Self {
845 inner: Box::pin(fut),
846 }
847 }
848
849 pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
850 let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
851 let mut hashes = Vec::new();
852 while let Some(item) = rx.recv().await? {
853 hashes.push(item?);
854 }
855 Ok(hashes)
856 }
857
858 pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
859 let mut rx = self.inner.await?;
860 Ok(Gen::new(|co| async move {
861 while let Ok(Some(item)) = rx.recv().await {
862 co.yield_(item).await;
863 }
864 }))
865 }
866}
867
868pub struct ExportRangesProgress {
876 ranges: RangeSet2<u64>,
877 inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
878}
879
880impl ExportRangesProgress {
881 fn new(
882 ranges: RangeSet2<u64>,
883 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
884 ) -> Self {
885 Self {
886 ranges,
887 inner: Box::pin(fut),
888 }
889 }
890}
891
892impl ExportRangesProgress {
893 pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
900 Gen::new(|co| async move {
901 let mut rx = match self.inner.await {
902 Ok(rx) => rx,
903 Err(e) => {
904 co.yield_(ExportRangesItem::Error(e.into())).await;
905 return;
906 }
907 };
908 while let Ok(Some(item)) = rx.recv().await {
909 co.yield_(item).await;
910 }
911 })
912 }
913
914 pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
916 let mut rx = self.inner.await?;
917 let mut data = BTreeMap::new();
918 while let Some(item) = rx.recv().await? {
919 match item {
920 ExportRangesItem::Size(_) => {}
921 ExportRangesItem::Data(leaf) => {
922 data.insert(leaf.offset, leaf.data);
923 }
924 ExportRangesItem::Error(cause) => return Err(cause.into()),
925 }
926 }
927 let mut res = Vec::new();
928 for range in self.ranges.iter() {
929 let (start, end) = match range {
930 RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
931 RangeSetRange::Range(range) => (*range.start, *range.end),
932 };
933 for (offset, data) in data.iter() {
934 let cstart = *offset;
935 let cend = *offset + (data.len() as u64);
936 if cstart >= end || cend <= start {
937 continue;
938 }
939 let start = start.max(cstart);
940 let end = end.min(cend);
941 let data = &data[(start - cstart) as usize..(end - cstart) as usize];
942 res.extend_from_slice(data);
943 }
944 }
945 Ok(res)
946 }
947}
948
949pub struct ExportBaoProgress {
957 inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
958}
959
960impl ExportBaoProgress {
961 fn new(
962 fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
963 ) -> Self {
964 Self {
965 inner: Box::pin(fut),
966 }
967 }
968
969 pub fn hashes_with_index(
976 self,
977 ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
978 let mut stream = self.stream();
979 Gen::new(|co| async move {
980 while let Some(item) = stream.next().await {
981 let leaf = match item {
982 EncodedItem::Leaf(leaf) => leaf,
983 EncodedItem::Error(e) => {
984 co.yield_(Err(e.into())).await;
985 continue;
986 }
987 _ => continue,
988 };
989 let slice = match HashSeqChunk::try_from(leaf) {
990 Ok(slice) => slice,
991 Err(e) => {
992 co.yield_(Err(e)).await;
993 continue;
994 }
995 };
996 let offset = slice.base();
997 for (o, hash) in slice.into_iter().enumerate() {
998 co.yield_(Ok((offset + o as u64, hash))).await;
999 }
1000 }
1001 })
1002 }
1003
1004 pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
1006 self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1007 }
1008
1009 pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1010 let mut data = Vec::new();
1011 let mut stream = self.into_byte_stream();
1012 while let Some(item) = stream.next().await {
1013 data.extend_from_slice(&item?);
1014 }
1015 Ok(data)
1016 }
1017
1018 pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1019 let mut rx = self.inner.await?;
1020 let mut data = Vec::new();
1021 while let Some(item) = rx.recv().await? {
1022 match item {
1023 EncodedItem::Leaf(leaf) => {
1024 data.push(leaf.data);
1025 }
1026 EncodedItem::Parent(_) => {}
1027 EncodedItem::Size(_) => {}
1028 EncodedItem::Done => break,
1029 EncodedItem::Error(cause) => return Err(cause.into()),
1030 }
1031 }
1032 if data.len() == 1 {
1033 Ok(data.pop().unwrap())
1034 } else {
1035 let mut out = Vec::new();
1036 for item in data {
1037 out.extend_from_slice(&item);
1038 }
1039 Ok(out.into())
1040 }
1041 }
1042
1043 pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1044 let mut rx = self.inner.await?;
1045 let mut data = Vec::new();
1046 while let Some(item) = rx.recv().await? {
1047 match item {
1048 EncodedItem::Leaf(leaf) => {
1049 data.extend_from_slice(&leaf.data);
1050 }
1051 EncodedItem::Parent(_) => {}
1052 EncodedItem::Size(_) => {}
1053 EncodedItem::Done => break,
1054 EncodedItem::Error(cause) => return Err(cause.into()),
1055 }
1056 }
1057 Ok(data)
1058 }
1059
1060 pub async fn write<W: AsyncStreamWriter>(self, target: &mut W) -> super::ExportBaoResult<()> {
1061 let mut rx = self.inner.await?;
1062 while let Some(item) = rx.recv().await? {
1063 match item {
1064 EncodedItem::Size(size) => {
1065 target.write(&size.to_le_bytes()).await?;
1066 }
1067 EncodedItem::Parent(parent) => {
1068 let mut data = vec![0u8; 64];
1069 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1070 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1071 target.write(&data).await?;
1072 }
1073 EncodedItem::Leaf(leaf) => {
1074 target.write_bytes(leaf.data).await?;
1075 }
1076 EncodedItem::Done => break,
1077 EncodedItem::Error(cause) => return Err(cause.into()),
1078 }
1079 }
1080 Ok(())
1081 }
1082
1083 pub(crate) async fn write_with_progress<W: crate::util::SendStream>(
1085 self,
1086 writer: &mut W,
1087 progress: &mut impl WriteProgress,
1088 hash: &Hash,
1089 index: u64,
1090 ) -> super::ExportBaoResult<()> {
1091 let mut rx = self.inner.await?;
1092 while let Some(item) = rx.recv().await? {
1093 match item {
1094 EncodedItem::Size(size) => {
1095 progress.send_transfer_started(index, hash, size).await;
1096 writer.send(&size.to_le_bytes()).await?;
1097 progress.log_other_write(8);
1098 }
1099 EncodedItem::Parent(parent) => {
1100 let mut data = [0u8; 64];
1101 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1102 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1103 writer.send(&data).await?;
1104 progress.log_other_write(64);
1105 }
1106 EncodedItem::Leaf(leaf) => {
1107 let len = leaf.data.len();
1108 writer.send_bytes(leaf.data).await?;
1109 progress
1110 .notify_payload_write(index, leaf.offset, len)
1111 .await?;
1112 }
1113 EncodedItem::Done => break,
1114 EncodedItem::Error(cause) => return Err(cause.into()),
1115 }
1116 }
1117 Ok(())
1118 }
1119
1120 pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1121 self.stream().filter_map(|item| match item {
1122 EncodedItem::Size(size) => {
1123 let size = size.to_le_bytes().to_vec().into();
1124 Some(Ok(size))
1125 }
1126 EncodedItem::Parent(parent) => {
1127 let mut data = vec![0u8; 64];
1128 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1129 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1130 Some(Ok(data.into()))
1131 }
1132 EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1133 EncodedItem::Done => None,
1134 EncodedItem::Error(cause) => Some(Err(cause.into())),
1135 })
1136 }
1137
1138 pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1139 Gen::new(|co| async move {
1140 let mut rx = match self.inner.await {
1141 Ok(rx) => rx,
1142 Err(cause) => {
1143 co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1144 .await;
1145 return;
1146 }
1147 };
1148 while let Ok(Some(item)) = rx.recv().await {
1149 co.yield_(item).await;
1150 }
1151 })
1152 }
1153}
1154
1155pub(crate) trait WriteProgress {
1156 async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) -> ClientResult;
1158
1159 fn log_other_write(&mut self, len: usize);
1161
1162 async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
1164}