// iroh_blobs/get.rs

1//! The low level client side API
2//!
3//! Note that while using this API directly is fine, a simpler way to get data
4//! to a store is to use the [`crate::api::remote`] API, in particular the
5//! [`crate::api::remote::Remote::fetch`] function to download data to your
6//! local store.
7//!
8//! To get data, create a connection using an [`iroh::Endpoint`].
9//!
10//! Create a [`crate::protocol::GetRequest`] describing the data you want to get.
11//!
12//! Then create a state machine using [fsm::start] and
13//! drive it to completion by calling next on each state.
14//!
15//! For some states you have to provide additional arguments when calling next,
16//! or you can choose to finish early.
17//!
18//! [iroh]: https://docs.rs/iroh
19use std::{
20    fmt::{self, Debug},
21    time::Duration,
22};
23
24use anyhow::Result;
25use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
26use fsm::RequestCounters;
27use n0_future::time::Instant;
28use n0_snafu::SpanTrace;
29use nested_enum_utils::common_fields;
30use serde::{Deserialize, Serialize};
31use snafu::{Backtrace, IntoError, ResultExt, Snafu};
32use tracing::{debug, error};
33
34use crate::{
35    protocol::ChunkRangesSeq,
36    store::IROH_BLOCK_SIZE,
37    util::{RecvStream, SendStream},
38    Hash,
39};
40
41mod error;
42pub mod request;
43pub(crate) use error::get_error;
44pub use error::{GetError, GetResult};
45
// Default stream types used when the state machine runs directly over an
// iroh QUIC connection; generic parameters allow substituting other streams.
type DefaultReader = iroh::endpoint::RecvStream;
type DefaultWriter = iroh::endpoint::SendStream;
48
/// A pair of send/receive streams for a single request, together with the
/// connection id and the creation time (used for stats).
pub struct StreamPair<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    /// Id of the connection this stream pair belongs to.
    pub connection_id: u64,
    /// Time at which this stream pair was created, see [`StreamPair::new`].
    pub t0: Instant,
    /// The receive half.
    pub recv: R,
    /// The send half.
    pub send: W,
}
55
56impl<R: RecvStream, W: SendStream> StreamPair<R, W> {
57    pub fn new(connection_id: u64, recv: R, send: W) -> Self {
58        Self {
59            t0: Instant::now(),
60            recv,
61            send,
62            connection_id,
63        }
64    }
65}
66
/// Stats about the transfer.
///
/// Derefs to the contained [`RequestCounters`], so the individual byte
/// counters can be accessed directly on this type.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Deref,
    derive_more::DerefMut,
)]
pub struct Stats {
    /// Counters
    #[deref]
    #[deref_mut]
    pub counters: RequestCounters,
    /// The time it took to transfer the data
    pub elapsed: Duration,
}
87
88impl Stats {
89    /// Transfer rate in megabits per second
90    pub fn mbits(&self) -> f64 {
91        let data_len_bit = self.total_bytes_read() * 8;
92        data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
93    }
94
95    pub fn total_bytes_read(&self) -> u64 {
96        self.payload_bytes_read + self.other_bytes_read
97    }
98
99    pub fn combine(&mut self, that: &Stats) {
100        self.payload_bytes_written += that.payload_bytes_written;
101        self.other_bytes_written += that.other_bytes_written;
102        self.payload_bytes_read += that.payload_bytes_read;
103        self.other_bytes_read += that.other_bytes_read;
104        self.elapsed += that.elapsed;
105    }
106}
107
108/// Finite state machine for get responses.
109///
110/// This is the low level API for getting data from a peer.
111#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
112pub mod fsm {
113    use std::{io, result};
114
115    use bao_tree::{
116        io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
117        BaoTree, ChunkRanges, TreeNode,
118    };
119    use derive_more::From;
120    use iroh::endpoint::Connection;
121    use iroh_io::AsyncSliceWriter;
122
123    use super::*;
124    use crate::{
125        get::get_error::BadRequestSnafu,
126        protocol::{
127            GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
128        },
129        util::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
130    };
131
    // Self-referential pair: owns the `ChunkRangesSeq` while simultaneously
    // holding an iterator that borrows from it. Generated by `self_cell`.
    self_cell::self_cell! {
        struct RangesIterInner {
            owner: ChunkRangesSeq,
            #[not_covariant]
            dependent: NonEmptyRequestRangeSpecIter,
        }
    }
139
140    /// The entry point of the get response machine
141    pub fn start(
142        connection: Connection,
143        request: GetRequest,
144        counters: RequestCounters,
145    ) -> AtInitial {
146        AtInitial::new(connection, request, counters)
147    }
148
    /// Start with a get many request. Todo: turn this into distinct states.
    ///
    /// Opens a bidi stream on `connection`, serializes and sends the request,
    /// and returns either the state for reading the first child (`Ok(Ok(_))`)
    /// or, if the request contains no ranges, the closing state (`Ok(Err(_))`).
    pub async fn start_get_many(
        connection: Connection,
        request: GetManyRequest,
        counters: RequestCounters,
    ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
        let start = Instant::now();
        let (mut writer, reader) = connection
            .open_bi()
            .await
            .map_err(|e| OpenSnafu.into_error(e.into()))?;
        // wrap in the top level request enum for serialization
        let request = Request::GetMany(request);
        let request_bytes = postcard::to_stdvec(&request)
            .map_err(|source| BadRequestSnafu.into_error(source.into()))?;
        writer
            .send_bytes(request_bytes.into())
            .await
            .context(connected_next_error::WriteSnafu)?;
        // unwrap again; it was only wrapped for serialization
        let Request::GetMany(request) = request else {
            unreachable!();
        };
        let mut ranges_iter = RangesIter::new(request.ranges.clone());
        let first_item = ranges_iter.next();
        // boxed so state transitions do not have to memcpy this
        let misc = Box::new(Misc {
            counters,
            start,
            ranges_iter,
        });
        Ok(match first_item {
            Some((child_offset, child_ranges)) => Ok(AtStartChild {
                ranges: child_ranges,
                reader,
                misc,
                offset: child_offset,
            }),
            // empty request: nothing to read, go straight to closing
            None => Err(AtClosing::new(misc, reader, true)),
        })
    }
187
    /// Owned iterator for the ranges in a request
    ///
    /// We need an owned iterator for a fsm style API, otherwise we would have
    /// to drag a lifetime around every single state.
    struct RangesIter(RangesIterInner);

    // Manual impl: the self-referential inner type does not provide Debug.
    impl fmt::Debug for RangesIter {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_struct("RangesIter").finish()
        }
    }
199
200    impl RangesIter {
201        pub fn new(owner: ChunkRangesSeq) -> Self {
202            Self(RangesIterInner::new(owner, |owner| {
203                owner.iter_non_empty_infinite()
204            }))
205        }
206
207        pub fn offset(&self) -> u64 {
208            self.0.with_dependent(|_owner, iter| iter.offset())
209        }
210    }
211
212    impl Iterator for RangesIter {
213        type Item = (u64, ChunkRanges);
214
215        fn next(&mut self) -> Option<Self::Item> {
216            self.0.with_dependent_mut(|_owner, iter| {
217                iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
218            })
219        }
220    }
221
    /// Initial state of the get response machine
    #[derive(Debug)]
    pub struct AtInitial {
        // connection on which the bidi stream will be opened
        connection: Connection,
        // the request to send
        request: GetRequest,
        // counters carried through all subsequent states
        counters: RequestCounters,
    }
229
230    impl AtInitial {
231        /// Create a new get response
232        ///
233        /// `connection` is an existing connection
234        /// `request` is the request to be sent
235        pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
236            Self {
237                connection,
238                request,
239                counters,
240            }
241        }
242
243        /// Initiate a new bidi stream to use for the get response
244        pub async fn next(self) -> Result<AtConnected, InitialNextError> {
245            let start = Instant::now();
246            let (writer, reader) = self
247                .connection
248                .open_bi()
249                .await
250                .map_err(|e| OpenSnafu.into_error(e.into()))?;
251            Ok(AtConnected {
252                start,
253                reader,
254                writer,
255                request: self.request,
256                counters: self.counters,
257            })
258        }
259    }
260
    /// Error that you can get from [`AtInitial::next`]
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[allow(missing_docs)]
    #[derive(Debug, Snafu)]
    #[non_exhaustive]
    pub enum InitialNextError {
        /// Opening the bidi stream failed
        Open { source: io::Error },
    }
273
    /// State of the get response machine after the handshake has been sent
    #[derive(Debug)]
    pub struct AtConnected<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
        // time the request started, for stats
        start: Instant,
        // stream to read the response from
        reader: R,
        // stream to write the request to
        writer: W,
        // the request to send
        request: GetRequest,
        // counters carried through all subsequent states
        counters: RequestCounters,
    }
283
    /// Possible next states after the handshake has been sent
    ///
    /// Returned from [`AtConnected::next`].
    #[derive(Debug, From)]
    pub enum ConnectedNext<R: RecvStream = DefaultReader> {
        /// First response is either a collection or a single blob
        StartRoot(AtStartRoot<R>),
        /// First response is a child
        StartChild(AtStartChild<R>),
        /// Request is empty
        Closing(AtClosing<R>),
    }
294
    /// Error that you can get from [`AtConnected::next`]
    ///
    /// Due to `#[snafu(module)]`, the generated context selectors live in the
    /// `connected_next_error` module.
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[allow(missing_docs)]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    #[non_exhaustive]
    pub enum ConnectedNextError {
        /// Error when serializing the request
        #[snafu(display("postcard ser: {source}"))]
        PostcardSer { source: postcard::Error },
        /// The serialized request is too long to be sent
        #[snafu(display("request too big"))]
        RequestTooBig {},
        /// Error when writing the request to the [`SendStream`].
        #[snafu(display("write: {source}"))]
        Write { source: io::Error },
    }
316
    impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
        /// Create the connected state directly from its parts.
        pub fn new(
            start: Instant,
            reader: R,
            writer: W,
            request: GetRequest,
            counters: RequestCounters,
        ) -> Self {
            Self {
                start,
                reader,
                writer,
                request,
                counters,
            }
        }

        /// Send the request and move to the next state
        ///
        /// The next state will be either `StartRoot` or `StartChild` depending on whether
        /// the request requests part of the collection or not.
        ///
        /// If the request is empty, this can also move directly to `Finished`.
        pub async fn next(self) -> Result<ConnectedNext<R>, ConnectedNextError> {
            let Self {
                start,
                reader,
                mut writer,
                mut request,
                mut counters,
            } = self;
            // 1. Send Request
            // the block evaluates to the number of bytes written, which is
            // accounted as non-payload overhead
            counters.other_bytes_written += {
                debug!("sending request");
                // wrap in the top level request enum for serialization ...
                let wrapped = Request::Get(request);
                let request_bytes = postcard::to_stdvec(&wrapped)
                    .context(connected_next_error::PostcardSerSnafu)?;
                // ... and take ownership of the inner request back afterwards
                let Request::Get(x) = wrapped else {
                    unreachable!();
                };
                request = x;

                // refuse to send a request that exceeds the protocol limit
                if request_bytes.len() > MAX_MESSAGE_SIZE {
                    return Err(connected_next_error::RequestTooBigSnafu.build());
                }

                // write the request itself
                let len = request_bytes.len() as u64;
                writer
                    .send_bytes(request_bytes.into())
                    .await
                    .context(connected_next_error::WriteSnafu)?;
                writer
                    .sync()
                    .await
                    .context(connected_next_error::WriteSnafu)?;
                len
            };

            // 2. Finish writing before expecting a response
            drop(writer);

            let hash = request.hash;
            let ranges_iter = RangesIter::new(request.ranges);
            // this is in a box so we don't have to memcpy it on every state transition
            let mut misc = Box::new(Misc {
                counters,
                start,
                ranges_iter,
            });
            Ok(match misc.ranges_iter.next() {
                Some((offset, ranges)) => {
                    // offset 0 is the root blob, anything else is a child
                    if offset == 0 {
                        AtStartRoot {
                            reader,
                            ranges,
                            misc,
                            hash,
                        }
                        .into()
                    } else {
                        AtStartChild {
                            reader,
                            ranges,
                            misc,
                            offset,
                        }
                        .into()
                    }
                }
                // empty request: nothing to read
                None => AtClosing::new(misc, reader, true).into(),
            })
        }
    }
411
    /// State of the get response when we start reading a collection
    #[derive(Debug)]
    pub struct AtStartRoot<R: RecvStream = DefaultReader> {
        // ranges requested for the root blob
        ranges: ChunkRanges,
        // stream to read the response from
        reader: R,
        // counters, start time and remaining ranges iterator
        misc: Box<Misc>,
        // hash of the root blob, known from the request
        hash: Hash,
    }
420
    /// State of the get response when we start reading a child
    #[derive(Debug)]
    pub struct AtStartChild<R: RecvStream = DefaultReader> {
        // ranges requested for this child
        ranges: ChunkRanges,
        // stream to read the response from
        reader: R,
        // counters, start time and remaining ranges iterator
        misc: Box<Misc>,
        // offset of this child within the request
        offset: u64,
    }
429
430    impl<R: RecvStream> AtStartChild<R> {
431        /// The offset of the child we are currently reading
432        ///
433        /// This must be used to determine the hash needed to call next.
434        /// If this is larger than the number of children in the collection,
435        /// you can call finish to stop reading the response.
436        pub fn offset(&self) -> u64 {
437            self.offset
438        }
439
440        /// The ranges we have requested for the child
441        pub fn ranges(&self) -> &ChunkRanges {
442            &self.ranges
443        }
444
445        /// Go into the next state, reading the header
446        ///
447        /// This requires passing in the hash of the child for validation
448        pub fn next(self, hash: Hash) -> AtBlobHeader<R> {
449            AtBlobHeader {
450                reader: self.reader,
451                ranges: self.ranges,
452                misc: self.misc,
453                hash,
454            }
455        }
456
457        /// Finish the get response without reading further
458        ///
459        /// This is used if you know that there are no more children from having
460        /// read the collection, or when you want to stop reading the response
461        /// early.
462        pub fn finish(self) -> AtClosing<R> {
463            AtClosing::new(self.misc, self.reader, false)
464        }
465    }
466
467    impl<R: RecvStream> AtStartRoot<R> {
468        /// The ranges we have requested for the child
469        pub fn ranges(&self) -> &ChunkRanges {
470            &self.ranges
471        }
472
473        /// Hash of the root blob
474        pub fn hash(&self) -> Hash {
475            self.hash
476        }
477
478        /// Go into the next state, reading the header
479        ///
480        /// For the collection we already know the hash, since it was part of the request
481        pub fn next(self) -> AtBlobHeader<R> {
482            AtBlobHeader {
483                reader: self.reader,
484                ranges: self.ranges,
485                hash: self.hash,
486                misc: self.misc,
487            }
488        }
489
490        /// Finish the get response without reading further
491        pub fn finish(self) -> AtClosing<R> {
492            AtClosing::new(self.misc, self.reader, false)
493        }
494    }
495
    /// State before reading a size header
    #[derive(Debug)]
    pub struct AtBlobHeader<R: RecvStream = DefaultReader> {
        // ranges requested for the current blob
        ranges: ChunkRanges,
        // stream to read the response from
        reader: R,
        // counters, start time and remaining ranges iterator
        misc: Box<Misc>,
        // hash of the blob to validate against
        hash: Hash,
    }
504
    /// Error that you can get from [`AtBlobHeader::next`]
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    pub enum AtBlobHeaderNextError {
        /// Eof when reading the size header
        ///
        /// This indicates that the provider does not have the requested data.
        #[snafu(display("not found"))]
        NotFound {},
        /// Generic io error
        #[snafu(display("io: {source}"))]
        Read { source: io::Error },
    }
524
525    impl From<AtBlobHeaderNextError> for io::Error {
526        fn from(cause: AtBlobHeaderNextError) -> Self {
527            match cause {
528                AtBlobHeaderNextError::NotFound { .. } => {
529                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
530                }
531                AtBlobHeaderNextError::Read { source, .. } => source,
532            }
533        }
534    }
535
    impl<R: RecvStream> AtBlobHeader<R> {
        /// Read the size header, returning it and going into the `Content` state.
        pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
            let mut size = [0; 8];
            // an eof while reading the size header means the provider does
            // not have the requested blob at all
            self.reader.recv_exact(&mut size).await.map_err(|cause| {
                if cause.kind() == io::ErrorKind::UnexpectedEof {
                    at_blob_header_next_error::NotFoundSnafu.build()
                } else {
                    at_blob_header_next_error::ReadSnafu.into_error(cause)
                }
            })?;
            // the 8 byte size header counts as non-payload overhead
            self.misc.other_bytes_read += 8;
            // the size header is a little-endian u64
            let size = u64::from_le_bytes(size);
            let stream = ResponseDecoder::new(
                self.hash.into(),
                self.ranges,
                BaoTree::new(size, IROH_BLOCK_SIZE),
                RecvStreamAsyncStreamReader::new(self.reader),
            );
            Ok((
                AtBlobContent {
                    stream,
                    misc: self.misc,
                },
                size,
            ))
        }

        /// Drain the response and throw away the result
        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
            let (content, _size) = self.next().await?;
            content.drain().await
        }

        /// Concatenate the entire response into a vec
        ///
        /// For a request that does not request the complete blob, this will just
        /// concatenate the ranges that were requested.
        pub async fn concatenate_into_vec(
            self,
        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
            let (content, _size) = self.next().await?;
            content.concatenate_into_vec().await
        }

        /// Write the entire blob to a slice writer.
        pub async fn write_all<D: AsyncSliceWriter>(
            self,
            data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError> {
            let (content, _size) = self.next().await?;
            let res = content.write_all(data).await?;
            Ok(res)
        }

        /// Write the entire blob to a slice writer and to an optional outboard.
        ///
        /// The outboard is only written to if the blob is larger than a single
        /// chunk group.
        pub async fn write_all_with_outboard<D, O>(
            self,
            outboard: Option<O>,
            data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
            O: OutboardMut,
        {
            let (content, _size) = self.next().await?;
            let res = content.write_all_with_outboard(outboard, data).await?;
            Ok(res)
        }

        /// The hash of the blob we are reading.
        pub fn hash(&self) -> Hash {
            self.hash
        }

        /// The ranges we have requested for the current hash.
        pub fn ranges(&self) -> &ChunkRanges {
            &self.ranges
        }

        /// The current offset of the blob we are reading.
        pub fn offset(&self) -> u64 {
            self.misc.ranges_iter.offset()
        }
    }
624
    /// State while we are reading content
    #[derive(Debug)]
    pub struct AtBlobContent<R: RecvStream = DefaultReader> {
        // decoder validating the incoming bao stream against the hash
        stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
        // counters, start time and remaining ranges iterator
        misc: Box<Misc>,
    }
631
    /// Decode error that you can get once you have sent the request and are
    /// decoding the response, e.g. from [`AtBlobContent::next`].
    ///
    /// This is similar to [`bao_tree::io::DecodeError`], but takes into account
    /// that we are reading from a [`RecvStream`], so read errors will be
    /// propagated as [`DecodeError::Read`], containing a [`io::Error`].
    ///
    /// When the provider finds that it does not have a chunk that we requested,
    /// or that the chunk is invalid, it will stop sending data without producing
    /// an error. This is indicated by either the [`DecodeError::ParentNotFound`] or
    /// [`DecodeError::LeafNotFound`] variant, which can be used to detect that data
    /// is missing while the connection and the provider are otherwise healthy.
    ///
    /// The [`DecodeError::ParentHashMismatch`] and [`DecodeError::LeafHashMismatch`]
    /// variants indicate that the provider has sent us invalid data. A well-behaved
    /// provider should never do this, so this is an indication that the provider is
    /// not behaving correctly.
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    pub enum DecodeError {
        /// A chunk was not found or invalid, so the provider stopped sending data
        #[snafu(display("not found"))]
        ChunkNotFound {},
        /// A parent was not found or invalid, so the provider stopped sending data
        #[snafu(display("parent not found {node:?}"))]
        ParentNotFound { node: TreeNode },
        /// A leaf chunk was not found or invalid, so the provider stopped sending data
        #[snafu(display("chunk not found {num}"))]
        LeafNotFound { num: ChunkNum },
        /// The hash of a parent did not match the expected hash
        #[snafu(display("parent hash mismatch: {node:?}"))]
        ParentHashMismatch { node: TreeNode },
        /// The hash of a leaf did not match the expected hash
        #[snafu(display("leaf hash mismatch: {num}"))]
        LeafHashMismatch { num: ChunkNum },
        /// Error when reading from the stream
        #[snafu(display("read: {source}"))]
        Read { source: io::Error },
        /// A generic io error
        #[snafu(display("io: {source}"))]
        Write { source: io::Error },
    }
680
    impl DecodeError {
        /// Construct a [`DecodeError::LeafHashMismatch`] for the given chunk number.
        pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
            decode_error::LeafHashMismatchSnafu { num }.build()
        }
    }
686
687    impl From<AtBlobHeaderNextError> for DecodeError {
688        fn from(cause: AtBlobHeaderNextError) -> Self {
689            match cause {
690                AtBlobHeaderNextError::NotFound { .. } => decode_error::ChunkNotFoundSnafu.build(),
691                AtBlobHeaderNextError::Read { source, .. } => {
692                    decode_error::ReadSnafu.into_error(source)
693                }
694            }
695        }
696    }
697
698    impl From<DecodeError> for io::Error {
699        fn from(cause: DecodeError) -> Self {
700            match cause {
701                DecodeError::ParentNotFound { .. } => {
702                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
703                }
704                DecodeError::LeafNotFound { .. } => {
705                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
706                }
707                DecodeError::Read { source, .. } => source,
708                DecodeError::Write { source, .. } => source,
709                _ => io::Error::other(cause),
710            }
711        }
712    }
713
    impl From<bao_tree::io::DecodeError> for DecodeError {
        /// Lift a bao-tree decode error into our error type, variant by
        /// variant; io errors are treated as read errors.
        fn from(value: bao_tree::io::DecodeError) -> Self {
            match value {
                bao_tree::io::DecodeError::ParentNotFound(node) => {
                    decode_error::ParentNotFoundSnafu { node }.build()
                }
                bao_tree::io::DecodeError::LeafNotFound(num) => {
                    decode_error::LeafNotFoundSnafu { num }.build()
                }
                bao_tree::io::DecodeError::ParentHashMismatch(node) => {
                    decode_error::ParentHashMismatchSnafu { node }.build()
                }
                bao_tree::io::DecodeError::LeafHashMismatch(num) => {
                    decode_error::LeafHashMismatchSnafu { num }.build()
                }
                bao_tree::io::DecodeError::Io(cause) => decode_error::ReadSnafu.into_error(cause),
            }
        }
    }
733
    /// The next state after reading a content item
    ///
    /// Returned from [`AtBlobContent::next`].
    #[derive(Debug, From)]
    pub enum BlobContentNext<R: RecvStream> {
        /// We expect more content
        More(
            (
                AtBlobContent<R>,
                result::Result<BaoContentItem, DecodeError>,
            ),
        ),
        /// We are done with this blob
        Done(AtEndBlob<R>),
    }
747
    impl<R: RecvStream> AtBlobContent<R> {
        /// Read the next item, either content, an error, or the end of the blob
        pub async fn next(self) -> BlobContentNext<R> {
            match self.stream.next().await {
                ResponseDecoderNext::More((stream, res)) => {
                    let mut next = Self { stream, ..self };
                    let res = res.map_err(DecodeError::from);
                    // account the received bytes: parent nodes are counted as
                    // 64 bytes of overhead (a pair of hashes), leaf data as payload
                    match &res {
                        Ok(BaoContentItem::Parent(_)) => {
                            next.misc.other_bytes_read += 64;
                        }
                        Ok(BaoContentItem::Leaf(leaf)) => {
                            next.misc.payload_bytes_read += leaf.data.len() as u64;
                        }
                        _ => {}
                    }
                    BlobContentNext::More((next, res))
                }
                // decoder is done with this blob; recover the raw stream
                ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
                    stream: stream.into_inner(),
                    misc: self.misc,
                }),
            }
        }

        /// The geometry of the tree we are currently reading.
        pub fn tree(&self) -> bao_tree::BaoTree {
            self.stream.tree()
        }

        /// The hash of the blob we are reading.
        pub fn hash(&self) -> Hash {
            (*self.stream.hash()).into()
        }

        /// The current offset of the blob we are reading.
        pub fn offset(&self) -> u64 {
            self.misc.ranges_iter.offset()
        }

        /// Current stats
        pub fn stats(&self) -> Stats {
            Stats {
                counters: self.misc.counters,
                elapsed: self.misc.start.elapsed(),
            }
        }

        /// Drain the response and throw away the result
        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, res)) => {
                        // still propagate decode errors, just discard the data
                        let _ = res?;
                        content = content1;
                    }
                    BlobContentNext::Done(end) => {
                        break Ok(end);
                    }
                }
            }
        }

        /// Concatenate the entire response into a vec
        pub async fn concatenate_into_vec(
            self,
        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
            let mut res = Vec::with_capacity(1024);
            let mut curr = self;
            let done = loop {
                match curr.next().await {
                    BlobContentNext::More((next, data)) => {
                        // only leaf items carry payload; parents are dropped
                        if let BaoContentItem::Leaf(leaf) = data? {
                            res.extend_from_slice(&leaf.data);
                        }
                        curr = next;
                    }
                    BlobContentNext::Done(done) => {
                        // we are done with the root blob
                        break done;
                    }
                }
            };
            Ok((done, res))
        }

        /// Write the entire blob to a slice writer and to an optional outboard.
        ///
        /// The outboard is only written to if the blob is larger than a single
        /// chunk group.
        pub async fn write_all_with_outboard<D, O>(
            self,
            mut outboard: Option<O>,
            mut data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
            O: OutboardMut,
        {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, item)) => {
                        content = content1;
                        match item? {
                            BaoContentItem::Parent(parent) => {
                                // parents go to the outboard, if one was provided
                                if let Some(outboard) = outboard.as_mut() {
                                    outboard
                                        .save(parent.node, &parent.pair)
                                        .await
                                        .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
                                }
                            }
                            BaoContentItem::Leaf(leaf) => {
                                data.write_bytes_at(leaf.offset, leaf.data)
                                    .await
                                    .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
                            }
                        }
                    }
                    BlobContentNext::Done(end) => {
                        return Ok(end);
                    }
                }
            }
        }

        /// Write the entire blob to a slice writer.
        pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
        {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, item)) => {
                        content = content1;
                        match item? {
                            // no outboard here, parents are simply skipped
                            BaoContentItem::Parent(_) => {}
                            BaoContentItem::Leaf(leaf) => {
                                data.write_bytes_at(leaf.offset, leaf.data)
                                    .await
                                    .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
                            }
                        }
                    }
                    BlobContentNext::Done(end) => {
                        return Ok(end);
                    }
                }
            }
        }

        /// Immediately finish the get response without reading further
        pub fn finish(self) -> AtClosing<R> {
            AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
        }
    }
907
    /// State after we have read all the content for a blob
    #[derive(Debug)]
    pub struct AtEndBlob<R: RecvStream = DefaultReader> {
        /// The receive stream, handed on to the next state.
        stream: R,
        /// Shared per-request state (counters, start time, remaining ranges).
        misc: Box<Misc>,
    }
914
    /// The next state after the end of a blob
    ///
    /// The `From` derive allows constructing this from either inner state.
    #[derive(Debug, From)]
    pub enum EndBlobNext<R: RecvStream = DefaultReader> {
        /// Response is expected to have more children
        MoreChildren(AtStartChild<R>),
        /// No more children expected
        Closing(AtClosing<R>),
    }
923
924    impl<R: RecvStream> AtEndBlob<R> {
925        /// Read the next child, or finish
926        pub fn next(mut self) -> EndBlobNext<R> {
927            if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
928                AtStartChild {
929                    reader: self.stream,
930                    offset,
931                    ranges,
932                    misc: self.misc,
933                }
934                .into()
935            } else {
936                AtClosing::new(self.misc, self.stream, true).into()
937            }
938        }
939    }
940
    /// State when finishing the get response
    #[derive(Debug)]
    pub struct AtClosing<R: RecvStream = DefaultReader> {
        /// Shared per-request state, used to produce the final [`Stats`].
        misc: Box<Misc>,
        /// The receive stream, probed for unexpected trailing data.
        reader: R,
        /// Whether to check that no data follows the expected end of the response.
        check_extra_data: bool,
    }
948
949    impl<R: RecvStream> AtClosing<R> {
950        fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
951            Self {
952                misc,
953                reader,
954                check_extra_data,
955            }
956        }
957
958        /// Finish the get response, returning statistics
959        pub async fn next(self) -> result::Result<Stats, AtClosingNextError> {
960            // Shut down the stream
961            let mut reader = self.reader;
962            if self.check_extra_data {
963                let rest = reader.recv_bytes(1).await?;
964                if !rest.is_empty() {
965                    error!("Unexpected extra data at the end of the stream");
966                }
967            }
968            Ok(Stats {
969                counters: self.misc.counters,
970                elapsed: self.misc.start.elapsed(),
971            })
972        }
973    }
974
    /// Error that you can get from [`AtClosing::next`]
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    pub enum AtClosingNextError {
        /// Generic io error
        #[snafu(transparent)]
        Read { source: io::Error },
    }
989
    /// Byte counters for a single request, split by payload vs protocol overhead.
    #[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
    pub struct RequestCounters {
        /// payload bytes written
        pub payload_bytes_written: u64,
        /// request, hash pair and size bytes written
        pub other_bytes_written: u64,
        /// payload bytes read
        pub payload_bytes_read: u64,
        /// hash pair and size bytes read
        pub other_bytes_read: u64,
    }
1001
    /// Stuff we need to hold on to while going through the machine states
    ///
    /// Derefs to [`RequestCounters`] so states can update counters directly.
    #[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
    struct Misc {
        /// start time for statistics
        start: Instant,
        /// counters
        #[deref]
        #[deref_mut]
        counters: RequestCounters,
        /// iterator over the ranges of the collection and the children
        ranges_iter: RangesIter,
    }
1014}