1use std::{
8 collections::BTreeMap,
9 future::{Future, IntoFuture},
10 io,
11 num::NonZeroU64,
12 path::{Path, PathBuf},
13 pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18 io::{
19 fsm::{ResponseDecoder, ResponseDecoderNext},
20 BaoContentItem, Leaf,
21 },
22 BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::AsyncStreamWriter;
27use irpc::channel::{mpsc, oneshot};
28use n0_error::AnyError;
29use n0_future::{future, stream, Stream, StreamExt};
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use serde::{Deserialize, Serialize};
33use tracing::trace;
34mod reader;
35pub use reader::BlobReader;
36
37pub use super::proto::{
42 AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
43 ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
44 ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
45 ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
46};
47use super::{
48 proto::{
49 BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
50 ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
51 ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
52 },
53 remote::HashSeqChunk,
54 tags::TagInfo,
55 ApiClient, RequestResult, Tags,
56};
57use crate::{
58 api::proto::{BatchRequest, ImportByteStreamUpdate},
59 provider::events::ClientResult,
60 store::IROH_BLOCK_SIZE,
61 util::{temp_tag::TempTag, RecvStreamAsyncStreamReader},
62 BlobFormat, Hash, HashAndFormat,
63};
64
65#[derive(Debug)]
67pub struct AddBytesOptions {
68 pub data: Bytes,
69 pub format: BlobFormat,
70}
71
72impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
73 fn from(item: (T, BlobFormat)) -> Self {
74 let (data, format) = item;
75 Self {
76 data: data.into(),
77 format,
78 }
79 }
80}
81
82#[derive(Debug, Clone, ref_cast::RefCast)]
84#[repr(transparent)]
85pub struct Blobs {
86 client: ApiClient,
87}
88
89impl Blobs {
90 pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
91 Self::ref_cast(sender)
92 }
93
94 pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
95 let msg = BatchRequest;
96 trace!("{msg:?}");
97 let (tx, rx) = self.client.client_streaming(msg, 32).await?;
98 let scope = rx.await?;
99
100 Ok(Batch {
101 scope,
102 blobs: self,
103 _tx: tx,
104 })
105 }
106
107 pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
128 self.reader_with_opts(ReaderOptions { hash: hash.into() })
129 }
130
131 pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
136 BlobReader::new(self.clone(), options)
137 }
138
139 pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
147 trace!("{options:?}");
148 self.client.rpc(options).await??;
149 Ok(())
150 }
151
152 pub(crate) async fn delete(
154 &self,
155 hashes: impl IntoIterator<Item = impl Into<Hash>>,
156 ) -> RequestResult<()> {
157 self.delete_with_opts(DeleteOptions {
158 hashes: hashes.into_iter().map(Into::into).collect(),
159 force: false,
160 })
161 .await
162 }
163
164 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
165 let options = ImportBytesRequest {
166 data: Bytes::copy_from_slice(data.as_ref()),
167 format: crate::BlobFormat::Raw,
168 scope: Scope::GLOBAL,
169 };
170 self.add_bytes_impl(options)
171 }
172
173 pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
174 let options = ImportBytesRequest {
175 data: data.into(),
176 format: crate::BlobFormat::Raw,
177 scope: Scope::GLOBAL,
178 };
179 self.add_bytes_impl(options)
180 }
181
182 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
183 let options = options.into();
184 let request = ImportBytesRequest {
185 data: options.data,
186 format: options.format,
187 scope: Scope::GLOBAL,
188 };
189 self.add_bytes_impl(request)
190 }
191
192 fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
193 trace!("{options:?}");
194 let this = self.clone();
195 let stream = Gen::new(|co| async move {
196 let mut receiver = match this.client.server_streaming(options, 32).await {
197 Ok(receiver) => receiver,
198 Err(cause) => {
199 co.yield_(AddProgressItem::Error(cause.into())).await;
200 return;
201 }
202 };
203 loop {
204 match receiver.recv().await {
205 Ok(Some(item)) => co.yield_(item).await,
206 Err(cause) => {
207 co.yield_(AddProgressItem::Error(cause.into())).await;
208 break;
209 }
210 Ok(None) => break,
211 }
212 }
213 });
214 AddProgress::new(self, stream)
215 }
216
217 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
218 let options = options.into();
219 self.add_path_with_opts_impl(ImportPathRequest {
220 path: options.path,
221 mode: options.mode,
222 format: options.format,
223 scope: Scope::GLOBAL,
224 })
225 }
226
227 fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
228 trace!("{:?}", options);
229 let client = self.client.clone();
230 let stream = Gen::new(|co| async move {
231 let mut receiver = match client.server_streaming(options, 32).await {
232 Ok(receiver) => receiver,
233 Err(cause) => {
234 co.yield_(AddProgressItem::Error(cause.into())).await;
235 return;
236 }
237 };
238 loop {
239 match receiver.recv().await {
240 Ok(Some(item)) => co.yield_(item).await,
241 Err(cause) => {
242 co.yield_(AddProgressItem::Error(cause.into())).await;
243 break;
244 }
245 Ok(None) => break,
246 }
247 }
248 });
249 AddProgress::new(self, stream)
250 }
251
252 pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
253 self.add_path_with_opts(AddPathOptions {
254 path: path.as_ref().to_owned(),
255 mode: ImportMode::Copy,
256 format: BlobFormat::Raw,
257 })
258 }
259
260 pub async fn add_stream(
261 &self,
262 data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
263 ) -> AddProgress<'_> {
264 let inner = ImportByteStreamRequest {
265 format: crate::BlobFormat::Raw,
266 scope: Scope::default(),
267 };
268 let client = self.client.clone();
269 let stream = Gen::new(|co| async move {
270 let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
271 Ok(x) => x,
272 Err(cause) => {
273 co.yield_(AddProgressItem::Error(cause.into())).await;
274 return;
275 }
276 };
277 let recv = async {
278 loop {
279 match receiver.recv().await {
280 Ok(Some(item)) => co.yield_(item).await,
281 Err(cause) => {
282 co.yield_(AddProgressItem::Error(cause.into())).await;
283 break;
284 }
285 Ok(None) => break,
286 }
287 }
288 };
289 let send = async {
290 tokio::pin!(data);
291 while let Some(item) = data.next().await {
292 sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
293 }
294 sender.send(ImportByteStreamUpdate::Done).await?;
295 n0_error::Ok(())
296 };
297 let _ = tokio::join!(send, recv);
298 });
299 AddProgress::new(self, stream)
300 }
301
302 pub fn export_ranges(
303 &self,
304 hash: impl Into<Hash>,
305 ranges: impl Into<RangeSet2<u64>>,
306 ) -> ExportRangesProgress {
307 self.export_ranges_with_opts(ExportRangesOptions {
308 hash: hash.into(),
309 ranges: ranges.into(),
310 })
311 }
312
313 pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
314 trace!("{options:?}");
315 ExportRangesProgress::new(
316 options.ranges.clone(),
317 self.client.server_streaming(options, 32),
318 )
319 }
320
321 pub fn export_bao_with_opts(
322 &self,
323 options: ExportBaoOptions,
324 local_update_cap: usize,
325 ) -> ExportBaoProgress {
326 trace!("{options:?}");
327 ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
328 }
329
330 pub fn export_bao(
331 &self,
332 hash: impl Into<Hash>,
333 ranges: impl Into<ChunkRanges>,
334 ) -> ExportBaoProgress {
335 self.export_bao_with_opts(
336 ExportBaoRequest {
337 hash: hash.into(),
338 ranges: ranges.into(),
339 },
340 32,
341 )
342 }
343
344 pub async fn export_chunk(
346 &self,
347 hash: impl Into<Hash>,
348 offset: u64,
349 ) -> super::ExportBaoResult<Leaf> {
350 let base = ChunkNum::full_chunks(offset);
351 let ranges = ChunkRanges::from(base..base + 1);
352 let mut stream = self.export_bao(hash, ranges).stream();
353 while let Some(item) = stream.next().await {
354 match item {
355 EncodedItem::Leaf(leaf) => return Ok(leaf),
356 EncodedItem::Parent(_) => {}
357 EncodedItem::Size(_) => {}
358 EncodedItem::Done => break,
359 EncodedItem::Error(cause) => return Err(cause.into()),
360 }
361 }
362 Err(io::Error::other("unexpected end of stream").into())
363 }
364
365 pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
369 self.export_bao(hash.into(), ChunkRanges::all())
370 .data_to_bytes()
371 .await
372 }
373
374 pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
376 self.observe_with_opts(ObserveOptions { hash: hash.into() })
377 }
378
379 pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
380 trace!("{:?}", options);
381 if options.hash == Hash::EMPTY {
382 return ObserveProgress::new(async move {
383 let (tx, rx) = mpsc::channel(1);
384 tx.send(Bitfield::complete(0)).await.ok();
385 Ok(rx)
386 });
387 }
388 ObserveProgress::new(self.client.server_streaming(options, 32))
389 }
390
391 pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
392 trace!("{:?}", options);
393 ExportProgress::new(self.client.server_streaming(options, 32))
394 }
395
396 pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
397 let options = ExportOptions {
398 hash: hash.into(),
399 mode: ExportMode::Copy,
400 target: target.as_ref().to_owned(),
401 };
402 self.export_with_opts(options)
403 }
404
405 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
409 pub async fn import_bao(
410 &self,
411 hash: impl Into<Hash>,
412 size: NonZeroU64,
413 local_update_cap: usize,
414 ) -> irpc::Result<ImportBaoHandle> {
415 let options = ImportBaoRequest {
416 hash: hash.into(),
417 size,
418 };
419 self.import_bao_with_opts(options, local_update_cap).await
420 }
421
422 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
423 pub async fn import_bao_with_opts(
424 &self,
425 options: ImportBaoOptions,
426 local_update_cap: usize,
427 ) -> irpc::Result<ImportBaoHandle> {
428 trace!("{:?}", options);
429 ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
430 }
431
432 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
433 pub async fn import_bao_reader<R: crate::util::RecvStream>(
434 &self,
435 hash: Hash,
436 ranges: ChunkRanges,
437 mut reader: R,
438 ) -> RequestResult<R> {
439 let mut size = [0; 8];
440 reader.recv_exact(&mut size).await?;
441 let size = u64::from_le_bytes(size);
442 let Some(size) = NonZeroU64::new(size) else {
443 return if hash == Hash::EMPTY {
444 Ok(reader)
445 } else {
446 Err(io::Error::other("invalid size for hash").into())
447 };
448 };
449 let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
450 let mut decoder = ResponseDecoder::new(
451 hash.into(),
452 ranges,
453 tree,
454 RecvStreamAsyncStreamReader::new(reader),
455 );
456 let options = ImportBaoOptions { hash, size };
457 let handle = self.import_bao_with_opts(options, 32).await?;
458 let driver = async move {
459 let reader = loop {
460 match decoder.next().await {
461 ResponseDecoderNext::More((rest, item)) => {
462 handle.tx.send(item?).await?;
463 decoder = rest;
464 }
465 ResponseDecoderNext::Done(reader) => break reader,
466 };
467 };
468 drop(handle.tx);
469 io::Result::Ok(reader)
470 };
471 let fut = async move { handle.rx.await.map_err(io::Error::other)? };
472 let (reader, res) = tokio::join!(driver, fut);
473 res?;
474 Ok(reader?.into_inner())
475 }
476
477 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
478 pub async fn import_bao_bytes(
479 &self,
480 hash: Hash,
481 ranges: ChunkRanges,
482 data: impl Into<Bytes>,
483 ) -> RequestResult<()> {
484 self.import_bao_reader(hash, ranges, data.into()).await?;
485 Ok(())
486 }
487
488 pub fn list(&self) -> BlobsListProgress {
489 let msg = ListRequest;
490 let client = self.client.clone();
491 BlobsListProgress::new(client.server_streaming(msg, 32))
492 }
493
494 pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
495 let hash = hash.into();
496 let msg = BlobStatusRequest { hash };
497 self.client.rpc(msg).await
498 }
499
500 pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
501 match self.status(hash).await? {
502 BlobStatus::Complete { .. } => Ok(true),
503 _ => Ok(false),
504 }
505 }
506
507 #[allow(dead_code)]
508 pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
509 let msg = ClearProtectedRequest;
510 self.client.rpc(msg).await??;
511 Ok(())
512 }
513}
514
515pub struct BatchAddProgress<'a>(AddProgress<'a>);
517
518impl<'a> IntoFuture for BatchAddProgress<'a> {
519 type Output = RequestResult<TempTag>;
520
521 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
522
523 fn into_future(self) -> Self::IntoFuture {
524 Box::pin(self.temp_tag())
525 }
526}
527
528impl<'a> BatchAddProgress<'a> {
529 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
530 self.0.with_named_tag(name).await
531 }
532
533 pub async fn with_tag(self) -> RequestResult<TagInfo> {
534 self.0.with_tag().await
535 }
536
537 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
538 self.0.stream().await
539 }
540
541 pub async fn temp_tag(self) -> RequestResult<TempTag> {
542 self.0.temp_tag().await
543 }
544}
545
546pub struct Batch<'a> {
548 scope: Scope,
549 blobs: &'a Blobs,
550 _tx: mpsc::Sender<BatchResponse>,
551}
552
553impl<'a> Batch<'a> {
554 pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
555 let options = ImportBytesRequest {
556 data: data.into(),
557 format: crate::BlobFormat::Raw,
558 scope: self.scope,
559 };
560 BatchAddProgress(self.blobs.add_bytes_impl(options))
561 }
562
563 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
564 let options = options.into();
565 BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
566 data: options.data,
567 format: options.format,
568 scope: self.scope,
569 }))
570 }
571
572 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
573 let options = ImportBytesRequest {
574 data: Bytes::copy_from_slice(data.as_ref()),
575 format: crate::BlobFormat::Raw,
576 scope: self.scope,
577 };
578 BatchAddProgress(self.blobs.add_bytes_impl(options))
579 }
580
581 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
582 let options = options.into();
583 BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
584 path: options.path,
585 mode: options.mode,
586 format: options.format,
587 scope: self.scope,
588 }))
589 }
590
591 pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
592 let value = value.into();
593 let msg = CreateTempTagRequest {
594 scope: self.scope,
595 value,
596 };
597 self.blobs.client.rpc(msg).await
598 }
599}
600
601#[derive(Debug)]
603pub struct AddPathOptions {
604 pub path: PathBuf,
605 pub format: BlobFormat,
606 pub mode: ImportMode,
607}
608
609pub struct AddProgress<'a> {
620 blobs: &'a Blobs,
621 inner: stream::Boxed<AddProgressItem>,
622}
623
624impl<'a> IntoFuture for AddProgress<'a> {
625 type Output = RequestResult<TagInfo>;
626
627 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
628
629 fn into_future(self) -> Self::IntoFuture {
630 Box::pin(self.with_tag())
631 }
632}
633
634impl<'a> AddProgress<'a> {
635 fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
636 Self {
637 blobs,
638 inner: Box::pin(stream),
639 }
640 }
641
642 pub async fn temp_tag(self) -> RequestResult<TempTag> {
643 let mut stream = self.inner;
644 while let Some(item) = stream.next().await {
645 match item {
646 AddProgressItem::Done(tt) => return Ok(tt),
647 AddProgressItem::Error(e) => return Err(e.into()),
648 _ => {}
649 }
650 }
651 Err(io::Error::other("unexpected end of stream").into())
652 }
653
654 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
655 let blobs = self.blobs.clone();
656 let tt = self.temp_tag().await?;
657 let haf = tt.hash_and_format();
658 let tags = Tags::ref_from_sender(&blobs.client);
659 tags.set(name, haf).await?;
660 drop(tt);
661 Ok(haf)
662 }
663
664 pub async fn with_tag(self) -> RequestResult<TagInfo> {
665 let blobs = self.blobs.clone();
666 let tt = self.temp_tag().await?;
667 let hash = tt.hash();
668 let format = tt.format();
669 let tags = Tags::ref_from_sender(&blobs.client);
670 let name = tags.create(tt.hash_and_format()).await?;
671 drop(tt);
672 Ok(TagInfo { name, hash, format })
673 }
674
675 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
676 self.inner
677 }
678}
679
680#[derive(Debug, Clone, Serialize, Deserialize)]
682pub struct ReaderOptions {
683 pub hash: Hash,
684}
685
686pub struct ObserveProgress {
691 inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
692}
693
694impl IntoFuture for ObserveProgress {
695 type Output = RequestResult<Bitfield>;
696
697 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
698
699 fn into_future(self) -> Self::IntoFuture {
700 Box::pin(async move {
701 let mut rx = self.inner.await?;
702 match rx.recv().await? {
703 Some(bitfield) => Ok(bitfield),
704 None => Err(io::Error::other("unexpected end of stream").into()),
705 }
706 })
707 }
708}
709
710impl ObserveProgress {
711 fn new(
712 fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
713 ) -> Self {
714 Self {
715 inner: Box::pin(fut),
716 }
717 }
718
719 pub async fn await_completion(self) -> RequestResult<Bitfield> {
720 let mut stream = self.stream().await?;
721 while let Some(item) = stream.next().await {
722 if item.is_complete() {
723 return Ok(item);
724 }
725 }
726 Err(io::Error::other("unexpected end of stream").into())
727 }
728
729 pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
734 let mut rx = self.inner.await?;
735 Ok(Gen::new(|co| async move {
736 while let Ok(Some(item)) = rx.recv().await {
737 co.yield_(item).await;
738 }
739 }))
740 }
741}
742
743pub struct ExportProgress {
754 inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
755}
756
757impl IntoFuture for ExportProgress {
758 type Output = RequestResult<u64>;
759
760 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
761
762 fn into_future(self) -> Self::IntoFuture {
763 Box::pin(self.finish())
764 }
765}
766
767impl ExportProgress {
768 fn new(
769 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
770 ) -> Self {
771 Self {
772 inner: Box::pin(fut),
773 }
774 }
775
776 pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
777 Gen::new(|co| async move {
778 let mut rx = match self.inner.await {
779 Ok(rx) => rx,
780 Err(e) => {
781 co.yield_(ExportProgressItem::Error(e.into())).await;
782 return;
783 }
784 };
785 while let Ok(Some(item)) = rx.recv().await {
786 co.yield_(item).await;
787 }
788 })
789 }
790
791 pub async fn finish(self) -> RequestResult<u64> {
792 let mut rx = self.inner.await?;
793 let mut size = None;
794 loop {
795 match rx.recv().await? {
796 Some(ExportProgressItem::Done) => break,
797 Some(ExportProgressItem::Size(s)) => size = Some(s),
798 Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
799 _ => {}
800 }
801 }
802 if let Some(size) = size {
803 Ok(size)
804 } else {
805 Err(io::Error::other("unexpected end of stream").into())
806 }
807 }
808}
809
810pub struct ImportBaoHandle {
812 pub tx: mpsc::Sender<BaoContentItem>,
813 pub rx: oneshot::Receiver<super::Result<()>>,
814}
815
816impl ImportBaoHandle {
817 pub(crate) async fn new(
818 fut: impl Future<
819 Output = irpc::Result<(
820 mpsc::Sender<BaoContentItem>,
821 oneshot::Receiver<super::Result<()>>,
822 )>,
823 > + Send
824 + 'static,
825 ) -> irpc::Result<Self> {
826 let (tx, rx) = fut.await?;
827 Ok(Self { tx, rx })
828 }
829}
830
831pub struct BlobsListProgress {
833 inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
834}
835
836impl BlobsListProgress {
837 fn new(
838 fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
839 ) -> Self {
840 Self {
841 inner: Box::pin(fut),
842 }
843 }
844
845 pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
846 let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
847 let mut hashes = Vec::new();
848 while let Some(item) = rx.recv().await? {
849 hashes.push(item?);
850 }
851 Ok(hashes)
852 }
853
854 pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
855 let mut rx = self.inner.await?;
856 Ok(Gen::new(|co| async move {
857 while let Ok(Some(item)) = rx.recv().await {
858 co.yield_(item).await;
859 }
860 }))
861 }
862}
863
864pub struct ExportRangesProgress {
872 ranges: RangeSet2<u64>,
873 inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
874}
875
876impl ExportRangesProgress {
877 fn new(
878 ranges: RangeSet2<u64>,
879 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
880 ) -> Self {
881 Self {
882 ranges,
883 inner: Box::pin(fut),
884 }
885 }
886}
887
888impl ExportRangesProgress {
889 pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
896 Gen::new(|co| async move {
897 let mut rx = match self.inner.await {
898 Ok(rx) => rx,
899 Err(e) => {
900 co.yield_(ExportRangesItem::Error(e.into())).await;
901 return;
902 }
903 };
904 while let Ok(Some(item)) = rx.recv().await {
905 co.yield_(item).await;
906 }
907 })
908 }
909
910 pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
912 let mut rx = self.inner.await?;
913 let mut data = BTreeMap::new();
914 while let Some(item) = rx.recv().await? {
915 match item {
916 ExportRangesItem::Size(_) => {}
917 ExportRangesItem::Data(leaf) => {
918 data.insert(leaf.offset, leaf.data);
919 }
920 ExportRangesItem::Error(cause) => return Err(cause.into()),
921 }
922 }
923 let mut res = Vec::new();
924 for range in self.ranges.iter() {
925 let (start, end) = match range {
926 RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
927 RangeSetRange::Range(range) => (*range.start, *range.end),
928 };
929 for (offset, data) in data.iter() {
930 let cstart = *offset;
931 let cend = *offset + (data.len() as u64);
932 if cstart >= end || cend <= start {
933 continue;
934 }
935 let start = start.max(cstart);
936 let end = end.min(cend);
937 let data = &data[(start - cstart) as usize..(end - cstart) as usize];
938 res.extend_from_slice(data);
939 }
940 }
941 Ok(res)
942 }
943}
944
945pub struct ExportBaoProgress {
953 inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
954}
955
956impl ExportBaoProgress {
957 fn new(
958 fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
959 ) -> Self {
960 Self {
961 inner: Box::pin(fut),
962 }
963 }
964
965 pub fn hashes_with_index(
972 self,
973 ) -> impl Stream<Item = std::result::Result<(u64, Hash), AnyError>> {
974 let mut stream = self.stream();
975 Gen::new(|co| async move {
976 while let Some(item) = stream.next().await {
977 let leaf = match item {
978 EncodedItem::Leaf(leaf) => leaf,
979 EncodedItem::Error(e) => {
980 co.yield_(Err(AnyError::from_std(e))).await;
981 continue;
982 }
983 _ => continue,
984 };
985 let slice = match HashSeqChunk::try_from(leaf) {
986 Ok(slice) => slice,
987 Err(e) => {
988 co.yield_(Err(e)).await;
989 continue;
990 }
991 };
992 let offset = slice.base();
993 for (o, hash) in slice.into_iter().enumerate() {
994 co.yield_(Ok((offset + o as u64, hash))).await;
995 }
996 }
997 })
998 }
999
1000 pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, AnyError>> {
1002 self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1003 }
1004
1005 pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1006 let mut data = Vec::new();
1007 let mut stream = self.into_byte_stream();
1008 while let Some(item) = stream.next().await {
1009 data.extend_from_slice(&item?);
1010 }
1011 Ok(data)
1012 }
1013
1014 pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1015 let mut rx = self.inner.await?;
1016 let mut data = Vec::new();
1017 while let Some(item) = rx.recv().await? {
1018 match item {
1019 EncodedItem::Leaf(leaf) => {
1020 data.push(leaf.data);
1021 }
1022 EncodedItem::Parent(_) => {}
1023 EncodedItem::Size(_) => {}
1024 EncodedItem::Done => break,
1025 EncodedItem::Error(cause) => return Err(cause.into()),
1026 }
1027 }
1028 if data.len() == 1 {
1029 Ok(data.pop().unwrap())
1030 } else {
1031 let mut out = Vec::new();
1032 for item in data {
1033 out.extend_from_slice(&item);
1034 }
1035 Ok(out.into())
1036 }
1037 }
1038
1039 pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1040 let mut rx = self.inner.await?;
1041 let mut data = Vec::new();
1042 while let Some(item) = rx.recv().await? {
1043 match item {
1044 EncodedItem::Leaf(leaf) => {
1045 data.extend_from_slice(&leaf.data);
1046 }
1047 EncodedItem::Parent(_) => {}
1048 EncodedItem::Size(_) => {}
1049 EncodedItem::Done => break,
1050 EncodedItem::Error(cause) => return Err(cause.into()),
1051 }
1052 }
1053 Ok(data)
1054 }
1055
1056 pub async fn write<W: AsyncStreamWriter>(self, target: &mut W) -> super::ExportBaoResult<()> {
1057 let mut rx = self.inner.await?;
1058 while let Some(item) = rx.recv().await? {
1059 match item {
1060 EncodedItem::Size(size) => {
1061 target.write(&size.to_le_bytes()).await?;
1062 }
1063 EncodedItem::Parent(parent) => {
1064 let mut data = vec![0u8; 64];
1065 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1066 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1067 target.write(&data).await?;
1068 }
1069 EncodedItem::Leaf(leaf) => {
1070 target.write_bytes(leaf.data).await?;
1071 }
1072 EncodedItem::Done => break,
1073 EncodedItem::Error(cause) => return Err(cause.into()),
1074 }
1075 }
1076 Ok(())
1077 }
1078
1079 pub(crate) async fn write_with_progress<W: crate::util::SendStream>(
1081 self,
1082 writer: &mut W,
1083 progress: &mut impl WriteProgress,
1084 hash: &Hash,
1085 index: u64,
1086 ) -> super::ExportBaoResult<()> {
1087 let mut rx = self.inner.await?;
1088 while let Some(item) = rx.recv().await? {
1089 match item {
1090 EncodedItem::Size(size) => {
1091 progress.send_transfer_started(index, hash, size).await;
1092 writer.send(&size.to_le_bytes()).await?;
1093 progress.log_other_write(8);
1094 }
1095 EncodedItem::Parent(parent) => {
1096 let mut data = [0u8; 64];
1097 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1098 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1099 writer.send(&data).await?;
1100 progress.log_other_write(64);
1101 }
1102 EncodedItem::Leaf(leaf) => {
1103 let len = leaf.data.len();
1104 writer.send_bytes(leaf.data).await?;
1105 progress
1106 .notify_payload_write(index, leaf.offset, len)
1107 .await?;
1108 }
1109 EncodedItem::Done => break,
1110 EncodedItem::Error(cause) => return Err(cause.into()),
1111 }
1112 }
1113 Ok(())
1114 }
1115
1116 pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1117 self.stream().filter_map(|item| match item {
1118 EncodedItem::Size(size) => {
1119 let size = size.to_le_bytes().to_vec().into();
1120 Some(Ok(size))
1121 }
1122 EncodedItem::Parent(parent) => {
1123 let mut data = vec![0u8; 64];
1124 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1125 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1126 Some(Ok(data.into()))
1127 }
1128 EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1129 EncodedItem::Done => None,
1130 EncodedItem::Error(cause) => Some(Err(cause.into())),
1131 })
1132 }
1133
1134 pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1135 Gen::new(|co| async move {
1136 let mut rx = match self.inner.await {
1137 Ok(rx) => rx,
1138 Err(cause) => {
1139 co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1140 .await;
1141 return;
1142 }
1143 };
1144 while let Ok(Some(item)) = rx.recv().await {
1145 co.yield_(item).await;
1146 }
1147 })
1148 }
1149}
1150
1151pub(crate) trait WriteProgress {
1152 async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) -> ClientResult;
1154
1155 fn log_other_write(&mut self, len: usize);
1157
1158 async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
1160}