1use std::{
8 collections::BTreeMap,
9 future::{Future, IntoFuture},
10 io,
11 num::NonZeroU64,
12 path::{Path, PathBuf},
13 pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18 io::{
19 fsm::{ResponseDecoder, ResponseDecoderNext},
20 BaoContentItem, Leaf,
21 },
22 BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::AsyncStreamWriter;
27use irpc::channel::{mpsc, oneshot};
28use n0_error::AnyError;
29use n0_future::{future, stream, Stream, StreamExt};
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use serde::{Deserialize, Serialize};
33use tracing::trace;
34mod reader;
35pub use reader::BlobReader;
36
37pub use super::proto::{
42 AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
43 ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
44 ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
45 ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
46};
47use super::{
48 proto::{
49 BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
50 ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
51 ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
52 },
53 remote::HashSeqChunk,
54 tags::TagInfo,
55 ApiClient, RequestResult, Tags,
56};
57use crate::{
58 api::proto::{BatchRequest, ImportByteStreamUpdate},
59 provider::events::ClientResult,
60 store::IROH_BLOCK_SIZE,
61 util::{temp_tag::TempTag, RecvStreamAsyncStreamReader},
62 BlobFormat, Hash, HashAndFormat,
63};
64
/// Options for adding in-memory data as a blob.
///
/// Can be built from a `(data, format)` tuple through the `From` impl below.
#[derive(Debug)]
pub struct AddBytesOptions {
    /// The bytes to add.
    pub data: Bytes,
    /// The blob format the data is added as.
    pub format: BlobFormat,
}
71
72impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
73 fn from(item: (T, BlobFormat)) -> Self {
74 let (data, format) = item;
75 Self {
76 data: data.into(),
77 format,
78 }
79 }
80}
81
/// Blobs API handle.
///
/// A `#[repr(transparent)]` wrapper around [`ApiClient`]; together with the
/// `RefCast` derive this allows a `&ApiClient` to be viewed as a `&Blobs`
/// without copying.
#[derive(Debug, Clone, ref_cast::RefCast)]
#[repr(transparent)]
pub struct Blobs {
    /// The underlying RPC client every request is sent through.
    client: ApiClient,
}
88
89impl Blobs {
90 pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
91 Self::ref_cast(sender)
92 }
93
94 pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
95 let msg = BatchRequest;
96 trace!("{msg:?}");
97 let (tx, rx) = self.client.client_streaming(msg, 32).await?;
98 let scope = rx.await?;
99
100 Ok(Batch {
101 scope,
102 blobs: self,
103 _tx: tx,
104 })
105 }
106
107 pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
128 self.reader_with_opts(ReaderOptions { hash: hash.into() })
129 }
130
131 pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
136 BlobReader::new(self.clone(), options)
137 }
138
139 pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
147 trace!("{options:?}");
148 self.client.rpc(options).await??;
149 Ok(())
150 }
151
152 pub(crate) async fn delete(
154 &self,
155 hashes: impl IntoIterator<Item = impl Into<Hash>>,
156 ) -> RequestResult<()> {
157 self.delete_with_opts(DeleteOptions {
158 hashes: hashes.into_iter().map(Into::into).collect(),
159 force: false,
160 })
161 .await
162 }
163
164 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
165 let options = ImportBytesRequest {
166 data: Bytes::copy_from_slice(data.as_ref()),
167 format: crate::BlobFormat::Raw,
168 scope: Scope::GLOBAL,
169 };
170 self.add_bytes_impl(options)
171 }
172
173 pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
174 let options = ImportBytesRequest {
175 data: data.into(),
176 format: crate::BlobFormat::Raw,
177 scope: Scope::GLOBAL,
178 };
179 self.add_bytes_impl(options)
180 }
181
182 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
183 let options = options.into();
184 let request = ImportBytesRequest {
185 data: options.data,
186 format: options.format,
187 scope: Scope::GLOBAL,
188 };
189 self.add_bytes_impl(request)
190 }
191
192 fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
193 trace!("{options:?}");
194 let this = self.clone();
195 let stream = Gen::new(|co| async move {
196 let mut receiver = match this.client.server_streaming(options, 32).await {
197 Ok(receiver) => receiver,
198 Err(cause) => {
199 co.yield_(AddProgressItem::Error(cause.into())).await;
200 return;
201 }
202 };
203 loop {
204 match receiver.recv().await {
205 Ok(Some(item)) => co.yield_(item).await,
206 Err(cause) => {
207 co.yield_(AddProgressItem::Error(cause.into())).await;
208 break;
209 }
210 Ok(None) => break,
211 }
212 }
213 });
214 AddProgress::new(self, stream)
215 }
216
217 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
218 let options = options.into();
219 self.add_path_with_opts_impl(ImportPathRequest {
220 path: options.path,
221 mode: options.mode,
222 format: options.format,
223 scope: Scope::GLOBAL,
224 })
225 }
226
227 fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
228 trace!("{:?}", options);
229 let client = self.client.clone();
230 let stream = Gen::new(|co| async move {
231 let mut receiver = match client.server_streaming(options, 32).await {
232 Ok(receiver) => receiver,
233 Err(cause) => {
234 co.yield_(AddProgressItem::Error(cause.into())).await;
235 return;
236 }
237 };
238 loop {
239 match receiver.recv().await {
240 Ok(Some(item)) => co.yield_(item).await,
241 Err(cause) => {
242 co.yield_(AddProgressItem::Error(cause.into())).await;
243 break;
244 }
245 Ok(None) => break,
246 }
247 }
248 });
249 AddProgress::new(self, stream)
250 }
251
252 pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
253 self.add_path_with_opts(AddPathOptions {
254 path: path.as_ref().to_owned(),
255 mode: ImportMode::Copy,
256 format: BlobFormat::Raw,
257 })
258 }
259
260 pub async fn add_stream(
261 &self,
262 data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
263 ) -> AddProgress<'_> {
264 let inner = ImportByteStreamRequest {
265 format: crate::BlobFormat::Raw,
266 scope: Scope::default(),
267 };
268 let client = self.client.clone();
269 let stream = Gen::new(|co| async move {
270 let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
271 Ok(x) => x,
272 Err(cause) => {
273 co.yield_(AddProgressItem::Error(cause.into())).await;
274 return;
275 }
276 };
277 let recv = async {
278 loop {
279 match receiver.recv().await {
280 Ok(Some(item)) => co.yield_(item).await,
281 Err(cause) => {
282 co.yield_(AddProgressItem::Error(cause.into())).await;
283 break;
284 }
285 Ok(None) => break,
286 }
287 }
288 };
289 let send = async {
290 tokio::pin!(data);
291 while let Some(item) = data.next().await {
292 sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
293 }
294 sender.send(ImportByteStreamUpdate::Done).await?;
295 n0_error::Ok(())
296 };
297 let _ = tokio::join!(send, recv);
298 });
299 AddProgress::new(self, stream)
300 }
301
302 pub fn export_ranges(
303 &self,
304 hash: impl Into<Hash>,
305 ranges: impl Into<RangeSet2<u64>>,
306 ) -> ExportRangesProgress {
307 self.export_ranges_with_opts(ExportRangesOptions {
308 hash: hash.into(),
309 ranges: ranges.into(),
310 })
311 }
312
313 pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
314 trace!("{options:?}");
315 ExportRangesProgress::new(
316 options.ranges.clone(),
317 self.client.server_streaming(options, 32),
318 )
319 }
320
321 pub fn export_bao_with_opts(
322 &self,
323 options: ExportBaoOptions,
324 local_update_cap: usize,
325 ) -> ExportBaoProgress {
326 trace!("{options:?}");
327 ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
328 }
329
330 pub fn export_bao(
331 &self,
332 hash: impl Into<Hash>,
333 ranges: impl Into<ChunkRanges>,
334 ) -> ExportBaoProgress {
335 self.export_bao_with_opts(
336 ExportBaoRequest {
337 hash: hash.into(),
338 ranges: ranges.into(),
339 },
340 32,
341 )
342 }
343
344 pub async fn export_chunk(
346 &self,
347 hash: impl Into<Hash>,
348 offset: u64,
349 ) -> super::ExportBaoResult<Leaf> {
350 let base = ChunkNum::full_chunks(offset);
351 let ranges = ChunkRanges::from(base..base + 1);
352 let mut stream = self.export_bao(hash, ranges).stream();
353 while let Some(item) = stream.next().await {
354 match item {
355 EncodedItem::Leaf(leaf) => return Ok(leaf),
356 EncodedItem::Parent(_) => {}
357 EncodedItem::Size(_) => {}
358 EncodedItem::Done => break,
359 EncodedItem::Error(cause) => return Err(cause.into()),
360 }
361 }
362 Err(io::Error::other("unexpected end of stream").into())
363 }
364
365 pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
369 self.export_bao(hash.into(), ChunkRanges::all())
370 .data_to_bytes()
371 .await
372 }
373
374 pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
376 self.observe_with_opts(ObserveOptions { hash: hash.into() })
377 }
378
379 pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
380 trace!("{:?}", options);
381 if options.hash == Hash::EMPTY {
382 return ObserveProgress::new(async move {
383 let (tx, rx) = mpsc::channel(1);
384 tx.send(Bitfield::complete(0)).await.ok();
385 Ok(rx)
386 });
387 }
388 ObserveProgress::new(self.client.server_streaming(options, 32))
389 }
390
391 pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
392 trace!("{:?}", options);
393 ExportProgress::new(self.client.server_streaming(options, 32))
394 }
395
396 pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
397 let options = ExportOptions {
398 hash: hash.into(),
399 mode: ExportMode::Copy,
400 target: target.as_ref().to_owned(),
401 };
402 self.export_with_opts(options)
403 }
404
405 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
409 pub async fn import_bao(
410 &self,
411 hash: impl Into<Hash>,
412 size: NonZeroU64,
413 local_update_cap: usize,
414 ) -> irpc::Result<ImportBaoHandle> {
415 let options = ImportBaoRequest {
416 hash: hash.into(),
417 size,
418 };
419 self.import_bao_with_opts(options, local_update_cap).await
420 }
421
422 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
423 pub async fn import_bao_with_opts(
424 &self,
425 options: ImportBaoOptions,
426 local_update_cap: usize,
427 ) -> irpc::Result<ImportBaoHandle> {
428 trace!("{:?}", options);
429 ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
430 }
431
/// Imports bao-encoded data for `hash` from `reader` and returns the reader
/// once the encoded response has been consumed.
///
/// The stream is expected to start with the blob size as an 8-byte
/// little-endian integer, followed by the encoded items for `ranges`.
///
/// # Errors
/// - the size prefix cannot be read,
/// - the size is zero and `hash` is not [`Hash::EMPTY`],
/// - the import request cannot be started,
/// - the import result reports a failure (checked first),
/// - decoding or forwarding an item fails.
#[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
pub async fn import_bao_reader<R: crate::util::RecvStream>(
    &self,
    hash: Hash,
    ranges: ChunkRanges,
    mut reader: R,
) -> RequestResult<R> {
    // Read the 8-byte little-endian size prefix.
    let mut size = [0; 8];
    reader
        .recv_exact(&mut size)
        .await
        .map_err(super::Error::other)?;
    let size = u64::from_le_bytes(size);
    // A zero size is only accepted for the empty hash; in that case nothing
    // is imported and the reader is handed back as-is.
    let Some(size) = NonZeroU64::new(size) else {
        return if hash == Hash::EMPTY {
            Ok(reader)
        } else {
            Err(super::Error::other("invalid size for hash").into())
        };
    };
    let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
    let mut decoder = ResponseDecoder::new(
        hash.into(),
        ranges,
        tree,
        RecvStreamAsyncStreamReader::new(reader),
    );
    let options = ImportBaoOptions { hash, size };
    let handle = self.import_bao_with_opts(options, 32).await?;
    // Decode items one by one and forward them through `handle.tx`.
    let driver = async move {
        let reader = loop {
            match decoder.next().await {
                ResponseDecoderNext::More((rest, item)) => {
                    handle.tx.send(item?).await?;
                    decoder = rest;
                }
                ResponseDecoderNext::Done(reader) => break reader,
            };
        };
        // Drop the sender explicitly once all items have been forwarded.
        drop(handle.tx);
        io::Result::Ok(reader)
    };
    // Wait for the import result delivered on `handle.rx`.
    let fut = async move { handle.rx.await.map_err(io::Error::other)? };
    // Run the forwarding side and the result side concurrently.
    let (reader, res) = tokio::join!(driver, fut);
    // The import result is checked before the driver's own result.
    res?;
    Ok(reader?.into_inner())
}
479
480 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
481 pub async fn import_bao_bytes(
482 &self,
483 hash: Hash,
484 ranges: ChunkRanges,
485 data: impl Into<Bytes>,
486 ) -> RequestResult<()> {
487 self.import_bao_reader(hash, ranges, data.into()).await?;
488 Ok(())
489 }
490
491 pub fn list(&self) -> BlobsListProgress {
492 let msg = ListRequest;
493 let client = self.client.clone();
494 BlobsListProgress::new(client.server_streaming(msg, 32))
495 }
496
497 pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
498 let hash = hash.into();
499 let msg = BlobStatusRequest { hash };
500 self.client.rpc(msg).await
501 }
502
503 pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
504 match self.status(hash).await? {
505 BlobStatus::Complete { .. } => Ok(true),
506 _ => Ok(false),
507 }
508 }
509
510 #[allow(dead_code)]
511 pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
512 let msg = ClearProtectedRequest;
513 self.client.rpc(msg).await??;
514 Ok(())
515 }
516}
517
/// Progress of an add operation started from a [`Batch`].
///
/// Wraps an [`AddProgress`]; awaiting it directly resolves to a [`TempTag`]
/// (see the `IntoFuture` impl below).
pub struct BatchAddProgress<'a>(AddProgress<'a>);
520
521impl<'a> IntoFuture for BatchAddProgress<'a> {
522 type Output = RequestResult<TempTag>;
523
524 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
525
526 fn into_future(self) -> Self::IntoFuture {
527 Box::pin(self.temp_tag())
528 }
529}
530
531impl<'a> BatchAddProgress<'a> {
532 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
533 self.0.with_named_tag(name).await
534 }
535
536 pub async fn with_tag(self) -> RequestResult<TagInfo> {
537 self.0.with_tag().await
538 }
539
540 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
541 self.0.stream().await
542 }
543
544 pub async fn temp_tag(self) -> RequestResult<TempTag> {
545 self.0.temp_tag().await
546 }
547}
548
/// A batch of add operations that share one [`Scope`].
///
/// Created by [`Blobs::batch`].
pub struct Batch<'a> {
    /// Scope received in reply to the batch request; attached to every
    /// request issued through this batch.
    scope: Scope,
    /// The blobs handle the batch was created from.
    blobs: &'a Blobs,
    /// Sender half of the batch request; never used, only kept alive for the
    /// lifetime of the batch.
    _tx: mpsc::Sender<BatchResponse>,
}
555
556impl<'a> Batch<'a> {
557 pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
558 let options = ImportBytesRequest {
559 data: data.into(),
560 format: crate::BlobFormat::Raw,
561 scope: self.scope,
562 };
563 BatchAddProgress(self.blobs.add_bytes_impl(options))
564 }
565
566 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
567 let options = options.into();
568 BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
569 data: options.data,
570 format: options.format,
571 scope: self.scope,
572 }))
573 }
574
575 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
576 let options = ImportBytesRequest {
577 data: Bytes::copy_from_slice(data.as_ref()),
578 format: crate::BlobFormat::Raw,
579 scope: self.scope,
580 };
581 BatchAddProgress(self.blobs.add_bytes_impl(options))
582 }
583
584 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
585 let options = options.into();
586 BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
587 path: options.path,
588 mode: options.mode,
589 format: options.format,
590 scope: self.scope,
591 }))
592 }
593
594 pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
595 let value = value.into();
596 let msg = CreateTempTagRequest {
597 scope: self.scope,
598 value,
599 };
600 self.blobs.client.rpc(msg).await
601 }
602}
603
/// Options for adding a file from the file system as a blob.
#[derive(Debug)]
pub struct AddPathOptions {
    /// Path of the file to add.
    pub path: PathBuf,
    /// The blob format the data is added as.
    pub format: BlobFormat,
    /// The import mode passed on in the import request.
    pub mode: ImportMode,
}
611
/// Progress of an add operation.
///
/// Awaiting it directly resolves to a [`TagInfo`] (see the `IntoFuture` impl
/// below); the methods give access to the temp tag, a named tag, or the raw
/// item stream instead.
pub struct AddProgress<'a> {
    /// Handle used to create tags once the add operation is done.
    blobs: &'a Blobs,
    /// Boxed stream of progress items.
    inner: stream::Boxed<AddProgressItem>,
}
626
627impl<'a> IntoFuture for AddProgress<'a> {
628 type Output = RequestResult<TagInfo>;
629
630 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
631
632 fn into_future(self) -> Self::IntoFuture {
633 Box::pin(self.with_tag())
634 }
635}
636
637impl<'a> AddProgress<'a> {
638 fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
639 Self {
640 blobs,
641 inner: Box::pin(stream),
642 }
643 }
644
645 pub async fn temp_tag(self) -> RequestResult<TempTag> {
646 let mut stream = self.inner;
647 while let Some(item) = stream.next().await {
648 match item {
649 AddProgressItem::Done(tt) => return Ok(tt),
650 AddProgressItem::Error(e) => return Err(e.into()),
651 _ => {}
652 }
653 }
654 Err(super::Error::other("unexpected end of stream").into())
655 }
656
657 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
658 let blobs = self.blobs.clone();
659 let tt = self.temp_tag().await?;
660 let haf = tt.hash_and_format();
661 let tags = Tags::ref_from_sender(&blobs.client);
662 tags.set(name, haf).await?;
663 drop(tt);
664 Ok(haf)
665 }
666
667 pub async fn with_tag(self) -> RequestResult<TagInfo> {
668 let blobs = self.blobs.clone();
669 let tt = self.temp_tag().await?;
670 let hash = tt.hash();
671 let format = tt.format();
672 let tags = Tags::ref_from_sender(&blobs.client);
673 let name = tags.create(tt.hash_and_format()).await?;
674 drop(tt);
675 Ok(TagInfo { name, hash, format })
676 }
677
678 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
679 self.inner
680 }
681}
682
/// Options for creating a [`BlobReader`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReaderOptions {
    /// Hash of the blob to read.
    pub hash: Hash,
}
688
/// Progress of an observe operation.
///
/// Awaiting it directly resolves to the first [`Bitfield`] received (see the
/// `IntoFuture` impl below).
pub struct ObserveProgress {
    /// Boxed future that resolves to the receiver of bitfield updates.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
}
696
697impl IntoFuture for ObserveProgress {
698 type Output = RequestResult<Bitfield>;
699
700 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
701
702 fn into_future(self) -> Self::IntoFuture {
703 Box::pin(async move {
704 let mut rx = self.inner.await?;
705 match rx.recv().await? {
706 Some(bitfield) => Ok(bitfield),
707 None => Err(super::Error::other("unexpected end of stream").into()),
708 }
709 })
710 }
711}
712
713impl ObserveProgress {
714 fn new(
715 fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
716 ) -> Self {
717 Self {
718 inner: Box::pin(fut),
719 }
720 }
721
722 pub async fn await_completion(self) -> RequestResult<Bitfield> {
723 let mut stream = self.stream().await?;
724 while let Some(item) = stream.next().await {
725 if item.is_complete() {
726 return Ok(item);
727 }
728 }
729 Err(super::Error::other("unexpected end of stream").into())
730 }
731
732 pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
737 let mut rx = self.inner.await?;
738 Ok(Gen::new(|co| async move {
739 while let Ok(Some(item)) = rx.recv().await {
740 co.yield_(item).await;
741 }
742 }))
743 }
744}
745
/// Progress of an export to the file system.
///
/// Awaiting it directly is the same as calling `finish()` and resolves to the
/// reported size.
pub struct ExportProgress {
    /// Boxed future that resolves to the receiver of export progress items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
}
759
760impl IntoFuture for ExportProgress {
761 type Output = RequestResult<u64>;
762
763 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
764
765 fn into_future(self) -> Self::IntoFuture {
766 Box::pin(self.finish())
767 }
768}
769
770impl ExportProgress {
771 fn new(
772 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
773 ) -> Self {
774 Self {
775 inner: Box::pin(fut),
776 }
777 }
778
779 pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
780 Gen::new(|co| async move {
781 let mut rx = match self.inner.await {
782 Ok(rx) => rx,
783 Err(e) => {
784 co.yield_(ExportProgressItem::Error(e.into())).await;
785 return;
786 }
787 };
788 while let Ok(Some(item)) = rx.recv().await {
789 co.yield_(item).await;
790 }
791 })
792 }
793
794 pub async fn finish(self) -> RequestResult<u64> {
795 let mut rx = self.inner.await?;
796 let mut size = None;
797 loop {
798 match rx.recv().await? {
799 Some(ExportProgressItem::Done) => break,
800 Some(ExportProgressItem::Size(s)) => size = Some(s),
801 Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
802 _ => {}
803 }
804 }
805 if let Some(size) = size {
806 Ok(size)
807 } else {
808 Err(super::Error::other("unexpected end of stream").into())
809 }
810 }
811}
812
/// Handle for a running bao import.
pub struct ImportBaoHandle {
    /// Sender for the bao content items to import.
    pub tx: mpsc::Sender<BaoContentItem>,
    /// Receiver for the final result of the import.
    pub rx: oneshot::Receiver<super::Result<()>>,
}
818
819impl ImportBaoHandle {
820 pub(crate) async fn new(
821 fut: impl Future<
822 Output = irpc::Result<(
823 mpsc::Sender<BaoContentItem>,
824 oneshot::Receiver<super::Result<()>>,
825 )>,
826 > + Send
827 + 'static,
828 ) -> irpc::Result<Self> {
829 let (tx, rx) = fut.await?;
830 Ok(Self { tx, rx })
831 }
832}
833
/// Progress of a list operation.
pub struct BlobsListProgress {
    /// Boxed future that resolves to the receiver of listed hashes, each
    /// delivered as a `Result`.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
}
838
839impl BlobsListProgress {
840 fn new(
841 fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
842 ) -> Self {
843 Self {
844 inner: Box::pin(fut),
845 }
846 }
847
848 pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
849 let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
850 let mut hashes = Vec::new();
851 while let Some(item) = rx.recv().await? {
852 hashes.push(item?);
853 }
854 Ok(hashes)
855 }
856
857 pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
858 let mut rx = self.inner.await?;
859 Ok(Gen::new(|co| async move {
860 while let Ok(Some(item)) = rx.recv().await {
861 co.yield_(item).await;
862 }
863 }))
864 }
865}
866
/// Progress of a byte-range export.
pub struct ExportRangesProgress {
    /// The byte ranges that were requested; used by `concatenate` to cut the
    /// received data down to exactly these ranges.
    ranges: RangeSet2<u64>,
    /// Boxed future that resolves to the receiver of exported items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
}
878
879impl ExportRangesProgress {
880 fn new(
881 ranges: RangeSet2<u64>,
882 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
883 ) -> Self {
884 Self {
885 ranges,
886 inner: Box::pin(fut),
887 }
888 }
889}
890
891impl ExportRangesProgress {
892 pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
899 Gen::new(|co| async move {
900 let mut rx = match self.inner.await {
901 Ok(rx) => rx,
902 Err(e) => {
903 co.yield_(ExportRangesItem::Error(e.into())).await;
904 return;
905 }
906 };
907 while let Ok(Some(item)) = rx.recv().await {
908 co.yield_(item).await;
909 }
910 })
911 }
912
913 pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
915 let mut rx = self.inner.await?;
916 let mut data = BTreeMap::new();
917 while let Some(item) = rx.recv().await? {
918 match item {
919 ExportRangesItem::Size(_) => {}
920 ExportRangesItem::Data(leaf) => {
921 data.insert(leaf.offset, leaf.data);
922 }
923 ExportRangesItem::Error(cause) => return Err(cause.into()),
924 }
925 }
926 let mut res = Vec::new();
927 for range in self.ranges.iter() {
928 let (start, end) = match range {
929 RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
930 RangeSetRange::Range(range) => (*range.start, *range.end),
931 };
932 for (offset, data) in data.iter() {
933 let cstart = *offset;
934 let cend = *offset + (data.len() as u64);
935 if cstart >= end || cend <= start {
936 continue;
937 }
938 let start = start.max(cstart);
939 let end = end.min(cend);
940 let data = &data[(start - cstart) as usize..(end - cstart) as usize];
941 res.extend_from_slice(data);
942 }
943 }
944 Ok(res)
945 }
946}
947
/// Progress of a bao export.
///
/// The items can be consumed as a stream, as raw leaf data, as encoded bytes,
/// or written to a writer.
pub struct ExportBaoProgress {
    /// Boxed future that resolves to the receiver of encoded items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
}
958
959impl ExportBaoProgress {
960 fn new(
961 fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
962 ) -> Self {
963 Self {
964 inner: Box::pin(fut),
965 }
966 }
967
968 pub fn hashes_with_index(
975 self,
976 ) -> impl Stream<Item = std::result::Result<(u64, Hash), AnyError>> {
977 let mut stream = self.stream();
978 Gen::new(|co| async move {
979 while let Some(item) = stream.next().await {
980 let leaf = match item {
981 EncodedItem::Leaf(leaf) => leaf,
982 EncodedItem::Error(e) => {
983 co.yield_(Err(AnyError::from_std(e))).await;
984 continue;
985 }
986 _ => continue,
987 };
988 let slice = match HashSeqChunk::try_from(leaf) {
989 Ok(slice) => slice,
990 Err(e) => {
991 co.yield_(Err(e)).await;
992 continue;
993 }
994 };
995 let offset = slice.base();
996 for (o, hash) in slice.into_iter().enumerate() {
997 co.yield_(Ok((offset + o as u64, hash))).await;
998 }
999 }
1000 })
1001 }
1002
1003 pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, AnyError>> {
1005 self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1006 }
1007
1008 pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1009 let mut data = Vec::new();
1010 let mut stream = self.into_byte_stream();
1011 while let Some(item) = stream.next().await {
1012 data.extend_from_slice(&item?);
1013 }
1014 Ok(data)
1015 }
1016
1017 pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1018 let mut rx = self.inner.await?;
1019 let mut data = Vec::new();
1020 while let Some(item) = rx.recv().await? {
1021 match item {
1022 EncodedItem::Leaf(leaf) => {
1023 data.push(leaf.data);
1024 }
1025 EncodedItem::Parent(_) => {}
1026 EncodedItem::Size(_) => {}
1027 EncodedItem::Done => break,
1028 EncodedItem::Error(cause) => return Err(cause.into()),
1029 }
1030 }
1031 if data.len() == 1 {
1032 Ok(data.pop().unwrap())
1033 } else {
1034 let mut out = Vec::new();
1035 for item in data {
1036 out.extend_from_slice(&item);
1037 }
1038 Ok(out.into())
1039 }
1040 }
1041
1042 pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1043 let mut rx = self.inner.await?;
1044 let mut data = Vec::new();
1045 while let Some(item) = rx.recv().await? {
1046 match item {
1047 EncodedItem::Leaf(leaf) => {
1048 data.extend_from_slice(&leaf.data);
1049 }
1050 EncodedItem::Parent(_) => {}
1051 EncodedItem::Size(_) => {}
1052 EncodedItem::Done => break,
1053 EncodedItem::Error(cause) => return Err(cause.into()),
1054 }
1055 }
1056 Ok(data)
1057 }
1058
1059 pub async fn write<W: AsyncStreamWriter>(self, target: &mut W) -> super::ExportBaoResult<()> {
1060 let mut rx = self.inner.await?;
1061 while let Some(item) = rx.recv().await? {
1062 match item {
1063 EncodedItem::Size(size) => {
1064 target.write(&size.to_le_bytes()).await?;
1065 }
1066 EncodedItem::Parent(parent) => {
1067 let mut data = vec![0u8; 64];
1068 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1069 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1070 target.write(&data).await?;
1071 }
1072 EncodedItem::Leaf(leaf) => {
1073 target.write_bytes(leaf.data).await?;
1074 }
1075 EncodedItem::Done => break,
1076 EncodedItem::Error(cause) => return Err(cause.into()),
1077 }
1078 }
1079 Ok(())
1080 }
1081
1082 pub(crate) async fn write_with_progress<W: crate::util::SendStream>(
1084 self,
1085 writer: &mut W,
1086 progress: &mut impl WriteProgress,
1087 hash: &Hash,
1088 index: u64,
1089 ) -> super::ExportBaoResult<()> {
1090 let mut rx = self.inner.await?;
1091 while let Some(item) = rx.recv().await? {
1092 match item {
1093 EncodedItem::Size(size) => {
1094 progress.send_transfer_started(index, hash, size).await;
1095 writer.send(&size.to_le_bytes()).await?;
1096 progress.log_other_write(8);
1097 }
1098 EncodedItem::Parent(parent) => {
1099 let mut data = [0u8; 64];
1100 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1101 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1102 writer.send(&data).await?;
1103 progress.log_other_write(64);
1104 }
1105 EncodedItem::Leaf(leaf) => {
1106 let len = leaf.data.len();
1107 writer.send_bytes(leaf.data).await?;
1108 progress
1109 .notify_payload_write(index, leaf.offset, len)
1110 .await?;
1111 }
1112 EncodedItem::Done => break,
1113 EncodedItem::Error(cause) => return Err(cause.into()),
1114 }
1115 }
1116 Ok(())
1117 }
1118
1119 pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1120 self.stream().filter_map(|item| match item {
1121 EncodedItem::Size(size) => {
1122 let size = size.to_le_bytes().to_vec().into();
1123 Some(Ok(size))
1124 }
1125 EncodedItem::Parent(parent) => {
1126 let mut data = vec![0u8; 64];
1127 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1128 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1129 Some(Ok(data.into()))
1130 }
1131 EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1132 EncodedItem::Done => None,
1133 EncodedItem::Error(cause) => Some(Err(cause.into())),
1134 })
1135 }
1136
1137 pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1138 Gen::new(|co| async move {
1139 let mut rx = match self.inner.await {
1140 Ok(rx) => rx,
1141 Err(cause) => {
1142 co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1143 .await;
1144 return;
1145 }
1146 };
1147 while let Ok(Some(item)) = rx.recv().await {
1148 co.yield_(item).await;
1149 }
1150 })
1151 }
1152}
1153
/// Callbacks used by `ExportBaoProgress::write_with_progress` to report what
/// is being written.
pub(crate) trait WriteProgress {
    /// Called after a leaf's data has been sent, with the leaf offset and
    /// data length; an error returned here aborts the write.
    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) -> ClientResult;

    /// Called after non-leaf bytes have been sent (8 for the size prefix,
    /// 64 for a parent hash pair).
    fn log_other_write(&mut self, len: usize);

    /// Called when the size item arrives, before the size prefix is sent.
    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
}