iroh_blobs/
get.rs

1//! The low level client side API
2//!
3//! Note that while using this API directly is fine, a simpler way to get data
4//! to a store is to use the [`crate::api::remote`] API, in particular the
5//! [`crate::api::remote::Remote::fetch`] function to download data to your
6//! local store.
7//!
8//! To get data, create a connection using an [`iroh::Endpoint`].
9//!
10//! Create a [`crate::protocol::GetRequest`] describing the data you want to get.
11//!
//! Then create a state machine using [`fsm::start`] and
//! drive it to completion by calling next on each state.
14//!
15//! For some states you have to provide additional arguments when calling next,
16//! or you can choose to finish early.
17//!
18//! [iroh]: https://docs.rs/iroh
19use std::fmt::{self, Debug};
20
21use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
22use fsm::RequestCounters;
23use n0_error::Result;
24use n0_future::time::{Duration, Instant};
25use serde::{Deserialize, Serialize};
26use tracing::{debug, error};
27
28use crate::{
29    protocol::ChunkRangesSeq,
30    store::IROH_BLOCK_SIZE,
31    util::{RecvStream, SendStream},
32    Hash,
33};
34
35mod error;
36pub mod request;
37pub use error::{GetError, GetResult};
38
// Default stream types used when no explicit `RecvStream`/`SendStream`
// implementation is supplied: the raw iroh endpoint streams.
type DefaultReader = iroh::endpoint::RecvStream;
type DefaultWriter = iroh::endpoint::SendStream;
41
/// A receive/send stream pair together with per-request bookkeeping.
pub struct StreamPair<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    /// Id of the connection this stream pair belongs to.
    pub connection_id: u64,
    /// Creation time of the pair; set to `Instant::now()` in [`StreamPair::new`].
    pub t0: Instant,
    /// The receive side of the stream.
    pub recv: R,
    /// The send side of the stream.
    pub send: W,
}
48
49impl<R: RecvStream, W: SendStream> StreamPair<R, W> {
50    pub fn new(connection_id: u64, recv: R, send: W) -> Self {
51        Self {
52            t0: Instant::now(),
53            recv,
54            send,
55            connection_id,
56        }
57    }
58}
59
/// Stats about the transfer.
///
/// Derefs to [`RequestCounters`], so the individual byte counters are
/// accessible directly on this type.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Deref,
    derive_more::DerefMut,
)]
pub struct Stats {
    /// Counters
    #[deref]
    #[deref_mut]
    pub counters: RequestCounters,
    /// The time it took to transfer the data
    pub elapsed: Duration,
}
80
81impl Stats {
82    /// Transfer rate in megabits per second
83    pub fn mbits(&self) -> f64 {
84        let data_len_bit = self.total_bytes_read() * 8;
85        data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
86    }
87
88    pub fn total_bytes_read(&self) -> u64 {
89        self.payload_bytes_read + self.other_bytes_read
90    }
91
92    pub fn combine(&mut self, that: &Stats) {
93        self.payload_bytes_written += that.payload_bytes_written;
94        self.other_bytes_written += that.other_bytes_written;
95        self.payload_bytes_read += that.payload_bytes_read;
96        self.other_bytes_read += that.other_bytes_read;
97        self.elapsed += that.elapsed;
98    }
99}
100
101/// Finite state machine for get responses.
102///
103/// This is the low level API for getting data from a peer.
104#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
105pub mod fsm {
106    use std::{io, result};
107
108    use bao_tree::{
109        io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
110        BaoTree, ChunkRanges, TreeNode,
111    };
112    use derive_more::From;
113    use iroh::endpoint::Connection;
114    use iroh_io::AsyncSliceWriter;
115    use n0_error::{e, stack_error, AnyError};
116
117    use super::*;
118    use crate::{
119        protocol::{
120            GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
121        },
122        util::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
123    };
124
    // Self-referential cell: the dependent iterator borrows from the owned
    // `ChunkRangesSeq`, which lets `RangesIter` below be a fully owned type.
    self_cell::self_cell! {
        struct RangesIterInner {
            owner: ChunkRangesSeq,
            #[not_covariant]
            dependent: NonEmptyRequestRangeSpecIter,
        }
    }
132
133    /// The entry point of the get response machine
134    pub fn start(
135        connection: Connection,
136        request: GetRequest,
137        counters: RequestCounters,
138    ) -> AtInitial {
139        AtInitial::new(connection, request, counters)
140    }
141
142    /// Start with a get many request. Todo: turn this into distinct states.
143    pub async fn start_get_many(
144        connection: Connection,
145        request: GetManyRequest,
146        counters: RequestCounters,
147    ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
148        let start = Instant::now();
149        let (mut writer, reader) = connection
150            .open_bi()
151            .await
152            .map_err(|e| e!(InitialNextError::Open, e.into()))?;
153        let request = Request::GetMany(request);
154        let request_bytes = postcard::to_stdvec(&request)
155            .map_err(|source| e!(GetError::BadRequest, AnyError::from_std(source)))?;
156        writer
157            .send_bytes(request_bytes.into())
158            .await
159            .map_err(|source| e!(ConnectedNextError::Write, source))?;
160        let Request::GetMany(request) = request else {
161            unreachable!();
162        };
163        let mut ranges_iter = RangesIter::new(request.ranges.clone());
164        let first_item = ranges_iter.next();
165        let misc = Box::new(Misc {
166            counters,
167            start,
168            ranges_iter,
169        });
170        Ok(match first_item {
171            Some((child_offset, child_ranges)) => Ok(AtStartChild {
172                ranges: child_ranges,
173                reader,
174                misc,
175                offset: child_offset,
176            }),
177            None => Err(AtClosing::new(misc, reader, true)),
178        })
179    }
180
    /// Owned iterator for the ranges in a request
    ///
    /// We need an owned iterator for a fsm style API, otherwise we would have
    /// to drag a lifetime around every single state.
    ///
    /// Newtype over the self-referential [`RangesIterInner`] cell.
    struct RangesIter(RangesIterInner);
186
187    impl fmt::Debug for RangesIter {
188        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189            f.debug_struct("RangesIter").finish()
190        }
191    }
192
193    impl RangesIter {
194        pub fn new(owner: ChunkRangesSeq) -> Self {
195            Self(RangesIterInner::new(owner, |owner| {
196                owner.iter_non_empty_infinite()
197            }))
198        }
199
200        pub fn offset(&self) -> u64 {
201            self.0.with_dependent(|_owner, iter| iter.offset())
202        }
203    }
204
205    impl Iterator for RangesIter {
206        type Item = (u64, ChunkRanges);
207
208        fn next(&mut self) -> Option<Self::Item> {
209            self.0.with_dependent_mut(|_owner, iter| {
210                iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
211            })
212        }
213    }
214
    /// Initial state of the get response machine
    #[derive(Debug)]
    pub struct AtInitial {
        /// The connection to open the bidi stream on.
        connection: Connection,
        /// The request to send once connected.
        request: GetRequest,
        /// Byte counters carried through all states.
        counters: RequestCounters,
    }
222
223    impl AtInitial {
224        /// Create a new get response
225        ///
226        /// `connection` is an existing connection
227        /// `request` is the request to be sent
228        pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
229            Self {
230                connection,
231                request,
232                counters,
233            }
234        }
235
236        /// Initiate a new bidi stream to use for the get response
237        pub async fn next(self) -> Result<AtConnected, InitialNextError> {
238            let start = Instant::now();
239            let (writer, reader) = self
240                .connection
241                .open_bi()
242                .await
243                .map_err(|e| e!(InitialNextError::Open, e.into()))?;
244            Ok(AtConnected {
245                start,
246                reader,
247                writer,
248                request: self.request,
249                counters: self.counters,
250            })
251        }
252    }
253
    /// Error that you can get from [`AtInitial::next`]
    #[stack_error(derive, add_meta, from_sources)]
    pub enum InitialNextError {
        /// Failed to open the bidi stream on the connection.
        #[error("open: {source}")]
        Open {
            #[error(std_err)]
            source: io::Error,
        },
    }
263
    /// State of the get response machine after the handshake has been sent
    #[derive(Debug)]
    pub struct AtConnected<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
        /// Time the request started; used for elapsed time in stats.
        start: Instant,
        /// Receive side of the bidi stream.
        reader: R,
        /// Send side of the bidi stream; dropped once the request is written.
        writer: W,
        /// The request to send.
        request: GetRequest,
        /// Byte counters carried through all states.
        counters: RequestCounters,
    }
273
    /// Possible next states after the handshake has been sent
    ///
    /// Returned from [`AtConnected::next`].
    #[derive(Debug, From)]
    pub enum ConnectedNext<R: RecvStream = DefaultReader> {
        /// First response is either a collection or a single blob
        StartRoot(AtStartRoot<R>),
        /// First response is a child
        StartChild(AtStartChild<R>),
        /// Request is empty
        Closing(AtClosing<R>),
    }
284
    /// Error that you can get from [`AtConnected::next`]
    #[stack_error(derive, add_meta)]
    pub enum ConnectedNextError {
        /// Error when serializing the request
        #[error("postcard ser: {source}")]
        PostcardSer {
            #[error(std_err)]
            source: postcard::Error,
        },
        /// The serialized request is too long to be sent
        ///
        /// The limit is [`MAX_MESSAGE_SIZE`].
        #[error("request too big")]
        RequestTooBig {},
        /// Error when writing the request to the [`SendStream`].
        #[error("write: {source}")]
        Write {
            #[error(std_err)]
            source: io::Error,
        },
    }
304
305    impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
306        pub fn new(
307            start: Instant,
308            reader: R,
309            writer: W,
310            request: GetRequest,
311            counters: RequestCounters,
312        ) -> Self {
313            Self {
314                start,
315                reader,
316                writer,
317                request,
318                counters,
319            }
320        }
321
322        /// Send the request and move to the next state
323        ///
324        /// The next state will be either `StartRoot` or `StartChild` depending on whether
325        /// the request requests part of the collection or not.
326        ///
327        /// If the request is empty, this can also move directly to `Finished`.
328        pub async fn next(self) -> Result<ConnectedNext<R>, ConnectedNextError> {
329            let Self {
330                start,
331                reader,
332                mut writer,
333                mut request,
334                mut counters,
335            } = self;
336            // 1. Send Request
337            counters.other_bytes_written += {
338                debug!("sending request");
339                let wrapped = Request::Get(request);
340                let request_bytes = postcard::to_stdvec(&wrapped)
341                    .map_err(|source| e!(ConnectedNextError::PostcardSer, source))?;
342                let Request::Get(x) = wrapped else {
343                    unreachable!();
344                };
345                request = x;
346
347                if request_bytes.len() > MAX_MESSAGE_SIZE {
348                    return Err(e!(ConnectedNextError::RequestTooBig));
349                }
350
351                // write the request itself
352                let len = request_bytes.len() as u64;
353                writer
354                    .send_bytes(request_bytes.into())
355                    .await
356                    .map_err(|source| e!(ConnectedNextError::Write, source))?;
357                writer
358                    .sync()
359                    .await
360                    .map_err(|source| e!(ConnectedNextError::Write, source))?;
361                len
362            };
363
364            // 2. Finish writing before expecting a response
365            drop(writer);
366
367            let hash = request.hash;
368            let ranges_iter = RangesIter::new(request.ranges);
369            // this is in a box so we don't have to memcpy it on every state transition
370            let mut misc = Box::new(Misc {
371                counters,
372                start,
373                ranges_iter,
374            });
375            Ok(match misc.ranges_iter.next() {
376                Some((offset, ranges)) => {
377                    if offset == 0 {
378                        AtStartRoot {
379                            reader,
380                            ranges,
381                            misc,
382                            hash,
383                        }
384                        .into()
385                    } else {
386                        AtStartChild {
387                            reader,
388                            ranges,
389                            misc,
390                            offset,
391                        }
392                        .into()
393                    }
394                }
395                None => AtClosing::new(misc, reader, true).into(),
396            })
397        }
398    }
399
    /// State of the get response when we start reading a collection
    #[derive(Debug)]
    pub struct AtStartRoot<R: RecvStream = DefaultReader> {
        /// Ranges requested for the root blob.
        ranges: ChunkRanges,
        /// Receive side of the stream.
        reader: R,
        /// Shared per-request bookkeeping (counters, start time, range iterator).
        misc: Box<Misc>,
        /// Hash of the root blob, known from the request.
        hash: Hash,
    }
408
    /// State of the get response when we start reading a child
    #[derive(Debug)]
    pub struct AtStartChild<R: RecvStream = DefaultReader> {
        /// Ranges requested for this child.
        ranges: ChunkRanges,
        /// Receive side of the stream.
        reader: R,
        /// Shared per-request bookkeeping (counters, start time, range iterator).
        misc: Box<Misc>,
        /// Offset of the child in the request; see [`AtStartChild::offset`].
        offset: u64,
    }
417
418    impl<R: RecvStream> AtStartChild<R> {
419        /// The offset of the child we are currently reading
420        ///
421        /// This must be used to determine the hash needed to call next.
422        /// If this is larger than the number of children in the collection,
423        /// you can call finish to stop reading the response.
424        pub fn offset(&self) -> u64 {
425            self.offset
426        }
427
428        /// The ranges we have requested for the child
429        pub fn ranges(&self) -> &ChunkRanges {
430            &self.ranges
431        }
432
433        /// Go into the next state, reading the header
434        ///
435        /// This requires passing in the hash of the child for validation
436        pub fn next(self, hash: Hash) -> AtBlobHeader<R> {
437            AtBlobHeader {
438                reader: self.reader,
439                ranges: self.ranges,
440                misc: self.misc,
441                hash,
442            }
443        }
444
445        /// Finish the get response without reading further
446        ///
447        /// This is used if you know that there are no more children from having
448        /// read the collection, or when you want to stop reading the response
449        /// early.
450        pub fn finish(self) -> AtClosing<R> {
451            AtClosing::new(self.misc, self.reader, false)
452        }
453    }
454
455    impl<R: RecvStream> AtStartRoot<R> {
456        /// The ranges we have requested for the child
457        pub fn ranges(&self) -> &ChunkRanges {
458            &self.ranges
459        }
460
461        /// Hash of the root blob
462        pub fn hash(&self) -> Hash {
463            self.hash
464        }
465
466        /// Go into the next state, reading the header
467        ///
468        /// For the collection we already know the hash, since it was part of the request
469        pub fn next(self) -> AtBlobHeader<R> {
470            AtBlobHeader {
471                reader: self.reader,
472                ranges: self.ranges,
473                hash: self.hash,
474                misc: self.misc,
475            }
476        }
477
478        /// Finish the get response without reading further
479        pub fn finish(self) -> AtClosing<R> {
480            AtClosing::new(self.misc, self.reader, false)
481        }
482    }
483
    /// State before reading a size header
    #[derive(Debug)]
    pub struct AtBlobHeader<R: RecvStream = DefaultReader> {
        /// Ranges requested for this blob.
        ranges: ChunkRanges,
        /// Receive side of the stream.
        reader: R,
        /// Shared per-request bookkeeping (counters, start time, range iterator).
        misc: Box<Misc>,
        /// Hash the incoming data will be validated against.
        hash: Hash,
    }
492
    /// Error that you can get from [`AtBlobHeader::next`]
    #[stack_error(derive, add_meta)]
    pub enum AtBlobHeaderNextError {
        /// Eof when reading the size header
        ///
        /// This indicates that the provider does not have the requested data.
        #[error("not found")]
        NotFound {},
        /// Generic io error while reading the 8 byte size header
        #[error("io: {source}")]
        Read {
            #[error(std_err)]
            source: io::Error,
        },
    }
508
509    impl From<AtBlobHeaderNextError> for io::Error {
510        fn from(cause: AtBlobHeaderNextError) -> Self {
511            match cause {
512                AtBlobHeaderNextError::NotFound { .. } => {
513                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
514                }
515                AtBlobHeaderNextError::Read { source, .. } => source,
516            }
517        }
518    }
519
520    impl<R: RecvStream> AtBlobHeader<R> {
521        /// Read the size header, returning it and going into the `Content` state.
522        pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
523            let mut size = [0; 8];
524            self.reader.recv_exact(&mut size).await.map_err(|cause| {
525                if cause.kind() == io::ErrorKind::UnexpectedEof {
526                    e!(AtBlobHeaderNextError::NotFound)
527                } else {
528                    e!(AtBlobHeaderNextError::Read, cause)
529                }
530            })?;
531            self.misc.other_bytes_read += 8;
532            let size = u64::from_le_bytes(size);
533            let stream = ResponseDecoder::new(
534                self.hash.into(),
535                self.ranges,
536                BaoTree::new(size, IROH_BLOCK_SIZE),
537                RecvStreamAsyncStreamReader::new(self.reader),
538            );
539            Ok((
540                AtBlobContent {
541                    stream,
542                    misc: self.misc,
543                },
544                size,
545            ))
546        }
547
548        /// Drain the response and throw away the result
549        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
550            let (content, _size) = self.next().await?;
551            content.drain().await
552        }
553
554        /// Concatenate the entire response into a vec
555        ///
556        /// For a request that does not request the complete blob, this will just
557        /// concatenate the ranges that were requested.
558        pub async fn concatenate_into_vec(
559            self,
560        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
561            let (content, _size) = self.next().await?;
562            content.concatenate_into_vec().await
563        }
564
565        /// Write the entire blob to a slice writer.
566        pub async fn write_all<D: AsyncSliceWriter>(
567            self,
568            data: D,
569        ) -> result::Result<AtEndBlob<R>, DecodeError> {
570            let (content, _size) = self.next().await?;
571            let res = content.write_all(data).await?;
572            Ok(res)
573        }
574
575        /// Write the entire blob to a slice writer and to an optional outboard.
576        ///
577        /// The outboard is only written to if the blob is larger than a single
578        /// chunk group.
579        pub async fn write_all_with_outboard<D, O>(
580            self,
581            outboard: Option<O>,
582            data: D,
583        ) -> result::Result<AtEndBlob<R>, DecodeError>
584        where
585            D: AsyncSliceWriter,
586            O: OutboardMut,
587        {
588            let (content, _size) = self.next().await?;
589            let res = content.write_all_with_outboard(outboard, data).await?;
590            Ok(res)
591        }
592
593        /// The hash of the blob we are reading.
594        pub fn hash(&self) -> Hash {
595            self.hash
596        }
597
598        /// The ranges we have requested for the current hash.
599        pub fn ranges(&self) -> &ChunkRanges {
600            &self.ranges
601        }
602
603        /// The current offset of the blob we are reading.
604        pub fn offset(&self) -> u64 {
605            self.misc.ranges_iter.offset()
606        }
607    }
608
    /// State while we are reading content
    #[derive(Debug)]
    pub struct AtBlobContent<R: RecvStream = DefaultReader> {
        /// The decoder validating incoming data against the requested hash.
        stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
        /// Shared per-request bookkeeping (counters, start time, range iterator).
        misc: Box<Misc>,
    }
615
    /// Decode error that you can get once you have sent the request and are
    /// decoding the response, e.g. from [`AtBlobContent::next`].
    ///
    /// This is similar to [`bao_tree::io::DecodeError`], but takes into account
    /// that we are reading from a [`RecvStream`], so read errors will be
    /// propagated as [`DecodeError::Read`], containing a [`io::Error`].
    ///
    /// When the provider finds that it does not have a chunk that we requested,
    /// or that the chunk is invalid, it will stop sending data without producing
    /// an error. This is indicated by either the [`DecodeError::ParentNotFound`] or
    /// [`DecodeError::LeafNotFound`] variant, which can be used to detect that data
    /// is missing but that the connection, as well as the provider, is otherwise
    /// healthy.
    ///
    /// The [`DecodeError::ParentHashMismatch`] and [`DecodeError::LeafHashMismatch`]
    /// variants indicate that the provider has sent us invalid data. A well-behaved
    /// provider should never do this, so this is an indication that the provider is
    /// not behaving correctly.
    #[non_exhaustive]
    #[stack_error(derive, add_meta)]
    pub enum DecodeError {
        /// A chunk was not found or invalid, so the provider stopped sending data
        #[error("not found")]
        ChunkNotFound {},
        /// A parent was not found or invalid, so the provider stopped sending data
        #[error("parent not found {node:?}")]
        ParentNotFound { node: TreeNode },
        /// A leaf chunk was not found or invalid, so the provider stopped sending data
        #[error("chunk not found {num}")]
        LeafNotFound { num: ChunkNum },
        /// The hash of a parent did not match the expected hash
        #[error("parent hash mismatch: {node:?}")]
        ParentHashMismatch { node: TreeNode },
        /// The hash of a leaf did not match the expected hash
        #[error("leaf hash mismatch: {num}")]
        LeafHashMismatch { num: ChunkNum },
        /// Error when reading from the stream
        #[error("read: {source}")]
        Read {
            #[error(std_err)]
            source: io::Error,
        },
        /// A generic io error
        #[error("io: {source}")]
        Write {
            #[error(std_err)]
            source: io::Error,
        },
    }
664
    impl DecodeError {
        /// Construct a [`DecodeError::LeafHashMismatch`] for the given chunk number.
        pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
            e!(DecodeError::LeafHashMismatch { num })
        }
    }
670
671    impl From<AtBlobHeaderNextError> for DecodeError {
672        fn from(cause: AtBlobHeaderNextError) -> Self {
673            match cause {
674                AtBlobHeaderNextError::NotFound { .. } => e!(DecodeError::ChunkNotFound),
675                AtBlobHeaderNextError::Read { source, .. } => e!(DecodeError::Read, source),
676            }
677        }
678    }
679
680    impl From<DecodeError> for io::Error {
681        fn from(cause: DecodeError) -> Self {
682            match cause {
683                DecodeError::ParentNotFound { .. } => {
684                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
685                }
686                DecodeError::LeafNotFound { .. } => {
687                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
688                }
689                DecodeError::Read { source, .. } => source,
690                DecodeError::Write { source, .. } => source,
691                _ => io::Error::other(cause),
692            }
693        }
694    }
695
696    impl From<bao_tree::io::DecodeError> for DecodeError {
697        fn from(value: bao_tree::io::DecodeError) -> Self {
698            match value {
699                bao_tree::io::DecodeError::ParentNotFound(node) => {
700                    e!(DecodeError::ParentNotFound { node })
701                }
702                bao_tree::io::DecodeError::LeafNotFound(num) => {
703                    e!(DecodeError::LeafNotFound { num })
704                }
705                bao_tree::io::DecodeError::ParentHashMismatch(node) => {
706                    e!(DecodeError::ParentHashMismatch { node })
707                }
708                bao_tree::io::DecodeError::LeafHashMismatch(num) => {
709                    e!(DecodeError::LeafHashMismatch { num })
710                }
711                bao_tree::io::DecodeError::Io(cause) => e!(DecodeError::Read, cause),
712            }
713        }
714    }
715
    /// The next state after reading a content item
    ///
    /// Returned from [`AtBlobContent::next`].
    #[derive(Debug, From)]
    pub enum BlobContentNext<R: RecvStream> {
        /// We expect more content
        More(
            (
                AtBlobContent<R>,
                result::Result<BaoContentItem, DecodeError>,
            ),
        ),
        /// We are done with this blob
        Done(AtEndBlob<R>),
    }
729
730    impl<R: RecvStream> AtBlobContent<R> {
731        /// Read the next item, either content, an error, or the end of the blob
732        pub async fn next(self) -> BlobContentNext<R> {
733            match self.stream.next().await {
734                ResponseDecoderNext::More((stream, res)) => {
735                    let mut next = Self { stream, ..self };
736                    let res = res.map_err(DecodeError::from);
737                    match &res {
738                        Ok(BaoContentItem::Parent(_)) => {
739                            next.misc.other_bytes_read += 64;
740                        }
741                        Ok(BaoContentItem::Leaf(leaf)) => {
742                            next.misc.payload_bytes_read += leaf.data.len() as u64;
743                        }
744                        _ => {}
745                    }
746                    BlobContentNext::More((next, res))
747                }
748                ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
749                    stream: stream.into_inner(),
750                    misc: self.misc,
751                }),
752            }
753        }
754
755        /// The geometry of the tree we are currently reading.
756        pub fn tree(&self) -> bao_tree::BaoTree {
757            self.stream.tree()
758        }
759
760        /// The hash of the blob we are reading.
761        pub fn hash(&self) -> Hash {
762            (*self.stream.hash()).into()
763        }
764
765        /// The current offset of the blob we are reading.
766        pub fn offset(&self) -> u64 {
767            self.misc.ranges_iter.offset()
768        }
769
770        /// Current stats
771        pub fn stats(&self) -> Stats {
772            Stats {
773                counters: self.misc.counters,
774                elapsed: self.misc.start.elapsed(),
775            }
776        }
777
778        /// Drain the response and throw away the result
779        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
780            let mut content = self;
781            loop {
782                match content.next().await {
783                    BlobContentNext::More((content1, res)) => {
784                        let _ = res?;
785                        content = content1;
786                    }
787                    BlobContentNext::Done(end) => {
788                        break Ok(end);
789                    }
790                }
791            }
792        }
793
794        /// Concatenate the entire response into a vec
795        pub async fn concatenate_into_vec(
796            self,
797        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
798            let mut res = Vec::with_capacity(1024);
799            let mut curr = self;
800            let done = loop {
801                match curr.next().await {
802                    BlobContentNext::More((next, data)) => {
803                        if let BaoContentItem::Leaf(leaf) = data? {
804                            res.extend_from_slice(&leaf.data);
805                        }
806                        curr = next;
807                    }
808                    BlobContentNext::Done(done) => {
809                        // we are done with the root blob
810                        break done;
811                    }
812                }
813            };
814            Ok((done, res))
815        }
816
817        /// Write the entire blob to a slice writer and to an optional outboard.
818        ///
819        /// The outboard is only written to if the blob is larger than a single
820        /// chunk group.
821        pub async fn write_all_with_outboard<D, O>(
822            self,
823            mut outboard: Option<O>,
824            mut data: D,
825        ) -> result::Result<AtEndBlob<R>, DecodeError>
826        where
827            D: AsyncSliceWriter,
828            O: OutboardMut,
829        {
830            let mut content = self;
831            loop {
832                match content.next().await {
833                    BlobContentNext::More((content1, item)) => {
834                        content = content1;
835                        match item? {
836                            BaoContentItem::Parent(parent) => {
837                                if let Some(outboard) = outboard.as_mut() {
838                                    outboard
839                                        .save(parent.node, &parent.pair)
840                                        .await
841                                        .map_err(|e| e!(DecodeError::Write, e))?;
842                                }
843                            }
844                            BaoContentItem::Leaf(leaf) => {
845                                data.write_bytes_at(leaf.offset, leaf.data)
846                                    .await
847                                    .map_err(|e| e!(DecodeError::Write, e))?;
848                            }
849                        }
850                    }
851                    BlobContentNext::Done(end) => {
852                        return Ok(end);
853                    }
854                }
855            }
856        }
857
858        /// Write the entire blob to a slice writer.
859        pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob<R>, DecodeError>
860        where
861            D: AsyncSliceWriter,
862        {
863            let mut content = self;
864            loop {
865                match content.next().await {
866                    BlobContentNext::More((content1, item)) => {
867                        content = content1;
868                        match item? {
869                            BaoContentItem::Parent(_) => {}
870                            BaoContentItem::Leaf(leaf) => {
871                                data.write_bytes_at(leaf.offset, leaf.data)
872                                    .await
873                                    .map_err(|e| e!(DecodeError::Write, e))?;
874                            }
875                        }
876                    }
877                    BlobContentNext::Done(end) => {
878                        return Ok(end);
879                    }
880                }
881            }
882        }
883
884        /// Immediately finish the get response without reading further
885        pub fn finish(self) -> AtClosing<R> {
886            AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
887        }
888    }
889
    /// State after we have read all the content for a blob
    #[derive(Debug)]
    pub struct AtEndBlob<R: RecvStream = DefaultReader> {
        /// The underlying receive stream, handed on to the next state.
        stream: R,
        /// Shared per-request bookkeeping (start time, counters, ranges iterator).
        misc: Box<Misc>,
    }
896
    /// The next state after the end of a blob
    ///
    /// `From` is derived for both variants, so either state can be converted
    /// into this enum with `.into()`.
    #[derive(Debug, From)]
    pub enum EndBlobNext<R: RecvStream = DefaultReader> {
        /// Response is expected to have more children
        MoreChildren(AtStartChild<R>),
        /// No more children expected
        Closing(AtClosing<R>),
    }
905
906    impl<R: RecvStream> AtEndBlob<R> {
907        /// Read the next child, or finish
908        pub fn next(mut self) -> EndBlobNext<R> {
909            if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
910                AtStartChild {
911                    reader: self.stream,
912                    offset,
913                    ranges,
914                    misc: self.misc,
915                }
916                .into()
917            } else {
918                AtClosing::new(self.misc, self.stream, true).into()
919            }
920        }
921    }
922
    /// State when finishing the get response
    #[derive(Debug)]
    pub struct AtClosing<R: RecvStream = DefaultReader> {
        /// Shared per-request bookkeeping (start time, counters).
        misc: Box<Misc>,
        /// The underlying receive stream.
        reader: R,
        /// Whether `next` should probe the stream for unexpected trailing data.
        check_extra_data: bool,
    }
930
931    impl<R: RecvStream> AtClosing<R> {
932        fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
933            Self {
934                misc,
935                reader,
936                check_extra_data,
937            }
938        }
939
940        /// Finish the get response, returning statistics
941        pub async fn next(self) -> result::Result<Stats, AtClosingNextError> {
942            // Shut down the stream
943            let mut reader = self.reader;
944            if self.check_extra_data {
945                let rest = reader.recv_bytes(1).await?;
946                if !rest.is_empty() {
947                    error!("Unexpected extra data at the end of the stream");
948                }
949            }
950            Ok(Stats {
951                counters: self.misc.counters,
952                elapsed: self.misc.start.elapsed(),
953            })
954        }
955    }
956
    /// Error that you can get from [`AtClosing::next`]
    #[stack_error(derive, add_meta, from_sources)]
    pub enum AtClosingNextError {
        /// Generic io error
        #[error(transparent)]
        Read {
            #[error(std_err)]
            source: io::Error,
        },
    }
967
    /// Byte counters for a single request, split into payload bytes and
    /// framing overhead (request, hash pairs, sizes), per direction.
    #[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
    pub struct RequestCounters {
        /// payload bytes written
        pub payload_bytes_written: u64,
        /// request, hash pair and size bytes written
        pub other_bytes_written: u64,
        /// payload bytes read
        pub payload_bytes_read: u64,
        /// hash pair and size bytes read
        pub other_bytes_read: u64,
    }
979
    /// Stuff we need to hold on to while going through the machine states
    ///
    /// `Deref`/`DerefMut` target the `counters` field, so the counter fields
    /// can be read and updated directly through `Misc`.
    #[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
    struct Misc {
        /// start time for statistics
        start: Instant,
        /// counters
        #[deref]
        #[deref_mut]
        counters: RequestCounters,
        /// iterator over the ranges of the collection and the children
        ranges_iter: RangesIter,
    }
992}