iroh_blobs/
get.rs

1//! The low level client side API
2//!
3//! Note that while using this API directly is fine, a simpler way to get data
4//! to a store is to use the [`crate::api::remote`] API, in particular the
5//! [`crate::api::remote::Remote::fetch`] function to download data to your
6//! local store.
7//!
8//! To get data, create a connection using an [`iroh::Endpoint`].
9//!
10//! Create a [`crate::protocol::GetRequest`] describing the data you want to get.
11//!
12//! Then create a state machine using [fsm::start] and
13//! drive it to completion by calling next on each state.
14//!
15//! For some states you have to provide additional arguments when calling next,
16//! or you can choose to finish early.
17//!
18//! [iroh]: https://docs.rs/iroh
19use std::{
20    fmt::{self, Debug},
21    time::{Duration, Instant},
22};
23
24use anyhow::Result;
25use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
26use fsm::RequestCounters;
27use n0_snafu::SpanTrace;
28use nested_enum_utils::common_fields;
29use serde::{Deserialize, Serialize};
30use snafu::{Backtrace, IntoError, ResultExt, Snafu};
31use tracing::{debug, error};
32
33use crate::{
34    protocol::ChunkRangesSeq,
35    store::IROH_BLOCK_SIZE,
36    util::{RecvStream, SendStream},
37    Hash,
38};
39
40mod error;
41pub mod request;
42pub(crate) use error::get_error;
43pub use error::{GetError, GetResult};
44
45type DefaultReader = iroh::endpoint::RecvStream;
46type DefaultWriter = iroh::endpoint::SendStream;
47
/// A pair of send/recv streams for a single request on a connection,
/// together with the instant the pair was created.
pub struct StreamPair<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    /// Id of the connection this stream pair belongs to.
    pub connection_id: u64,
    /// Instant at which this pair was created, see [`StreamPair::new`].
    pub t0: Instant,
    /// Stream to receive data from the remote.
    pub recv: R,
    /// Stream to send data to the remote.
    pub send: W,
}
54
55impl<R: RecvStream, W: SendStream> StreamPair<R, W> {
56    pub fn new(connection_id: u64, recv: R, send: W) -> Self {
57        Self {
58            t0: Instant::now(),
59            recv,
60            send,
61            connection_id,
62        }
63    }
64}
65
/// Stats about the transfer.
///
/// Dereferences to the inner [`RequestCounters`], so all counter fields
/// (e.g. `payload_bytes_read`) are directly accessible on `Stats`.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Deref,
    derive_more::DerefMut,
)]
pub struct Stats {
    /// Counters
    #[deref]
    #[deref_mut]
    pub counters: RequestCounters,
    /// The time it took to transfer the data
    pub elapsed: Duration,
}
86
87impl Stats {
88    /// Transfer rate in megabits per second
89    pub fn mbits(&self) -> f64 {
90        let data_len_bit = self.total_bytes_read() * 8;
91        data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
92    }
93
94    pub fn total_bytes_read(&self) -> u64 {
95        self.payload_bytes_read + self.other_bytes_read
96    }
97
98    pub fn combine(&mut self, that: &Stats) {
99        self.payload_bytes_written += that.payload_bytes_written;
100        self.other_bytes_written += that.other_bytes_written;
101        self.payload_bytes_read += that.payload_bytes_read;
102        self.other_bytes_read += that.other_bytes_read;
103        self.elapsed += that.elapsed;
104    }
105}
106
107/// Finite state machine for get responses.
108///
109/// This is the low level API for getting data from a peer.
110#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
111pub mod fsm {
112    use std::{io, result};
113
114    use bao_tree::{
115        io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
116        BaoTree, ChunkRanges, TreeNode,
117    };
118    use derive_more::From;
119    use iroh::endpoint::Connection;
120    use iroh_io::AsyncSliceWriter;
121
122    use super::*;
123    use crate::{
124        get::get_error::BadRequestSnafu,
125        protocol::{
126            GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
127        },
128        util::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
129    };
130
    // Self-referential cell combining the owned `ChunkRangesSeq` with the
    // iterator that borrows from it, so fsm states can own both without
    // dragging a lifetime parameter through every state.
    self_cell::self_cell! {
        struct RangesIterInner {
            owner: ChunkRangesSeq,
            #[not_covariant]
            dependent: NonEmptyRequestRangeSpecIter,
        }
    }
138
139    /// The entry point of the get response machine
140    pub fn start(
141        connection: Connection,
142        request: GetRequest,
143        counters: RequestCounters,
144    ) -> AtInitial {
145        AtInitial::new(connection, request, counters)
146    }
147
    /// Start with a get many request. Todo: turn this into distinct states.
    ///
    /// Opens a bidi stream on `connection`, sends the serialized
    /// [`GetManyRequest`], and jumps straight to either [`AtStartChild`]
    /// (first requested child) or [`AtClosing`] (empty request).
    ///
    /// The outer `Result` carries protocol/serialization errors; the inner
    /// `Result` distinguishes "there is a child to read" from "nothing to do".
    pub async fn start_get_many(
        connection: Connection,
        request: GetManyRequest,
        counters: RequestCounters,
    ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
        let start = Instant::now();
        let (mut writer, reader) = connection
            .open_bi()
            .await
            .map_err(|e| OpenSnafu.into_error(e.into()))?;
        // wrap for serialization, then unwrap again below to get ownership back
        let request = Request::GetMany(request);
        let request_bytes = postcard::to_stdvec(&request)
            .map_err(|source| BadRequestSnafu.into_error(source.into()))?;
        // NOTE(review): unlike `AtConnected::next`, this does not call
        // `writer.sync()` after sending, nor does it add the request length to
        // `counters.other_bytes_written` — confirm whether that is intended.
        writer
            .send_bytes(request_bytes.into())
            .await
            .context(connected_next_error::WriteSnafu)?;
        let Request::GetMany(request) = request else {
            unreachable!();
        };
        let mut ranges_iter = RangesIter::new(request.ranges.clone());
        let first_item = ranges_iter.next();
        // boxed so state transitions move a pointer instead of the whole Misc
        let misc = Box::new(Misc {
            counters,
            start,
            ranges_iter,
        });
        Ok(match first_item {
            Some((child_offset, child_ranges)) => Ok(AtStartChild {
                ranges: child_ranges,
                reader,
                misc,
                offset: child_offset,
            }),
            None => Err(AtClosing::new(misc, reader, true)),
        })
    }
186
    /// Owned iterator for the ranges in a request
    ///
    /// We need an owned iterator for a fsm style API, otherwise we would have
    /// to drag a lifetime around every single state.
    // Newtype over the self-referential cell defined by `self_cell!` above.
    struct RangesIter(RangesIterInner);
192
193    impl fmt::Debug for RangesIter {
194        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195            f.debug_struct("RangesIter").finish()
196        }
197    }
198
    impl RangesIter {
        /// Create an owned iterator over the non-empty ranges of `owner`.
        pub fn new(owner: ChunkRangesSeq) -> Self {
            Self(RangesIterInner::new(owner, |owner| {
                owner.iter_non_empty_infinite()
            }))
        }

        /// Current child offset of the underlying range iterator.
        pub fn offset(&self) -> u64 {
            self.0.with_dependent(|_owner, iter| iter.offset())
        }
    }
210
211    impl Iterator for RangesIter {
212        type Item = (u64, ChunkRanges);
213
214        fn next(&mut self) -> Option<Self::Item> {
215            self.0.with_dependent_mut(|_owner, iter| {
216                iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
217            })
218        }
219    }
220
    /// Initial state of the get response machine
    #[derive(Debug)]
    pub struct AtInitial {
        // connection on which a new bidi stream will be opened in `next`
        connection: Connection,
        // the request that will be sent once connected
        request: GetRequest,
        // counters carried through all subsequent states
        counters: RequestCounters,
    }
228
229    impl AtInitial {
230        /// Create a new get response
231        ///
232        /// `connection` is an existing connection
233        /// `request` is the request to be sent
234        pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
235            Self {
236                connection,
237                request,
238                counters,
239            }
240        }
241
242        /// Initiate a new bidi stream to use for the get response
243        pub async fn next(self) -> Result<AtConnected, InitialNextError> {
244            let start = Instant::now();
245            let (writer, reader) = self
246                .connection
247                .open_bi()
248                .await
249                .map_err(|e| OpenSnafu.into_error(e.into()))?;
250            Ok(AtConnected {
251                start,
252                reader,
253                writer,
254                request: self.request,
255                counters: self.counters,
256            })
257        }
258    }
259
    /// Error that you can get from [`AtInitial::next`]
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[allow(missing_docs)]
    #[derive(Debug, Snafu)]
    #[non_exhaustive]
    pub enum InitialNextError {
        /// Opening the bidi stream failed
        Open { source: io::Error },
    }
272
    /// State of the get response machine after the handshake has been sent
    #[derive(Debug)]
    pub struct AtConnected<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
        // when the request started; used for elapsed-time stats
        start: Instant,
        // stream the response will be read from
        reader: R,
        // stream the request is written to; dropped after sending
        writer: W,
        // the request to send in `next`
        request: GetRequest,
        counters: RequestCounters,
    }
282
    /// Possible next states after the handshake has been sent
    #[derive(Debug, From)]
    pub enum ConnectedNext<R: RecvStream = DefaultReader> {
        /// First response is either a collection or a single blob
        StartRoot(AtStartRoot<R>),
        /// First response is a child
        StartChild(AtStartChild<R>),
        /// Request is empty
        Closing(AtClosing<R>),
    }
293
    /// Error that you can get from [`AtConnected::next`]
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[allow(missing_docs)]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    #[non_exhaustive]
    pub enum ConnectedNextError {
        /// Error when serializing the request
        #[snafu(display("postcard ser: {source}"))]
        PostcardSer { source: postcard::Error },
        /// The serialized request is too long to be sent
        ///
        /// Returned when the encoded request exceeds `MAX_MESSAGE_SIZE`.
        #[snafu(display("request too big"))]
        RequestTooBig {},
        /// Error when writing the request to the [`SendStream`].
        #[snafu(display("write: {source}"))]
        Write { source: io::Error },
    }
315
    impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
        /// Create the connected state directly from its parts.
        ///
        /// Useful when the streams were obtained outside of [`AtInitial::next`].
        pub fn new(
            start: Instant,
            reader: R,
            writer: W,
            request: GetRequest,
            counters: RequestCounters,
        ) -> Self {
            Self {
                start,
                reader,
                writer,
                request,
                counters,
            }
        }

        /// Send the request and move to the next state
        ///
        /// The next state will be either `StartRoot` or `StartChild` depending on whether
        /// the request requests part of the collection or not.
        ///
        /// If the request is empty, this can also move directly to `Finished`.
        pub async fn next(self) -> Result<ConnectedNext<R>, ConnectedNextError> {
            let Self {
                start,
                reader,
                mut writer,
                mut request,
                mut counters,
            } = self;
            // 1. Send Request
            // The block evaluates to the number of request bytes written, which
            // is added to the `other_bytes_written` counter.
            counters.other_bytes_written += {
                debug!("sending request");
                // temporarily wrap the request for serialization, then take it
                // back out below so we keep ownership of it
                let wrapped = Request::Get(request);
                let request_bytes = postcard::to_stdvec(&wrapped)
                    .context(connected_next_error::PostcardSerSnafu)?;
                let Request::Get(x) = wrapped else {
                    unreachable!();
                };
                request = x;

                // refuse to send oversized requests
                if request_bytes.len() > MAX_MESSAGE_SIZE {
                    return Err(connected_next_error::RequestTooBigSnafu.build());
                }

                // write the request itself
                let len = request_bytes.len() as u64;
                writer
                    .send_bytes(request_bytes.into())
                    .await
                    .context(connected_next_error::WriteSnafu)?;
                writer
                    .sync()
                    .await
                    .context(connected_next_error::WriteSnafu)?;
                len
            };

            // 2. Finish writing before expecting a response
            drop(writer);

            let hash = request.hash;
            let ranges_iter = RangesIter::new(request.ranges);
            // this is in a box so we don't have to memcpy it on every state transition
            let mut misc = Box::new(Misc {
                counters,
                start,
                ranges_iter,
            });
            // offset 0 is the root blob; any other offset is a child
            Ok(match misc.ranges_iter.next() {
                Some((offset, ranges)) => {
                    if offset == 0 {
                        AtStartRoot {
                            reader,
                            ranges,
                            misc,
                            hash,
                        }
                        .into()
                    } else {
                        AtStartChild {
                            reader,
                            ranges,
                            misc,
                            offset,
                        }
                        .into()
                    }
                }
                None => AtClosing::new(misc, reader, true).into(),
            })
        }
    }
410
    /// State of the get response when we start reading a collection
    #[derive(Debug)]
    pub struct AtStartRoot<R: RecvStream = DefaultReader> {
        // ranges requested for the root blob
        ranges: ChunkRanges,
        reader: R,
        misc: Box<Misc>,
        // hash of the root blob, known from the request
        hash: Hash,
    }
419
    /// State of the get response when we start reading a child
    #[derive(Debug)]
    pub struct AtStartChild<R: RecvStream = DefaultReader> {
        // ranges requested for this child
        ranges: ChunkRanges,
        reader: R,
        misc: Box<Misc>,
        // offset of this child; the caller must map it to a hash for `next`
        offset: u64,
    }
428
429    impl<R: RecvStream> AtStartChild<R> {
430        /// The offset of the child we are currently reading
431        ///
432        /// This must be used to determine the hash needed to call next.
433        /// If this is larger than the number of children in the collection,
434        /// you can call finish to stop reading the response.
435        pub fn offset(&self) -> u64 {
436            self.offset
437        }
438
439        /// The ranges we have requested for the child
440        pub fn ranges(&self) -> &ChunkRanges {
441            &self.ranges
442        }
443
444        /// Go into the next state, reading the header
445        ///
446        /// This requires passing in the hash of the child for validation
447        pub fn next(self, hash: Hash) -> AtBlobHeader<R> {
448            AtBlobHeader {
449                reader: self.reader,
450                ranges: self.ranges,
451                misc: self.misc,
452                hash,
453            }
454        }
455
456        /// Finish the get response without reading further
457        ///
458        /// This is used if you know that there are no more children from having
459        /// read the collection, or when you want to stop reading the response
460        /// early.
461        pub fn finish(self) -> AtClosing<R> {
462            AtClosing::new(self.misc, self.reader, false)
463        }
464    }
465
466    impl<R: RecvStream> AtStartRoot<R> {
467        /// The ranges we have requested for the child
468        pub fn ranges(&self) -> &ChunkRanges {
469            &self.ranges
470        }
471
472        /// Hash of the root blob
473        pub fn hash(&self) -> Hash {
474            self.hash
475        }
476
477        /// Go into the next state, reading the header
478        ///
479        /// For the collection we already know the hash, since it was part of the request
480        pub fn next(self) -> AtBlobHeader<R> {
481            AtBlobHeader {
482                reader: self.reader,
483                ranges: self.ranges,
484                hash: self.hash,
485                misc: self.misc,
486            }
487        }
488
489        /// Finish the get response without reading further
490        pub fn finish(self) -> AtClosing<R> {
491            AtClosing::new(self.misc, self.reader, false)
492        }
493    }
494
    /// State before reading a size header
    #[derive(Debug)]
    pub struct AtBlobHeader<R: RecvStream = DefaultReader> {
        // ranges requested for this blob
        ranges: ChunkRanges,
        reader: R,
        misc: Box<Misc>,
        // expected hash of the blob, used to validate the content
        hash: Hash,
    }
503
    /// Error that you can get from [`AtBlobHeader::next`]
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    pub enum AtBlobHeaderNextError {
        /// Eof when reading the size header
        ///
        /// This indicates that the provider does not have the requested data.
        #[snafu(display("not found"))]
        NotFound {},
        /// Generic io error
        #[snafu(display("io: {source}"))]
        Read { source: io::Error },
    }
523
524    impl From<AtBlobHeaderNextError> for io::Error {
525        fn from(cause: AtBlobHeaderNextError) -> Self {
526            match cause {
527                AtBlobHeaderNextError::NotFound { .. } => {
528                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
529                }
530                AtBlobHeaderNextError::Read { source, .. } => source,
531            }
532        }
533    }
534
    impl<R: RecvStream> AtBlobHeader<R> {
        /// Read the size header, returning it and going into the `Content` state.
        pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
            // the size header is a little-endian u64, read as 8 raw bytes
            let mut size = [0; 8];
            self.reader.recv_exact(&mut size).await.map_err(|cause| {
                // eof here means the provider does not have the data
                if cause.kind() == io::ErrorKind::UnexpectedEof {
                    at_blob_header_next_error::NotFoundSnafu.build()
                } else {
                    at_blob_header_next_error::ReadSnafu.into_error(cause)
                }
            })?;
            // the 8 header bytes count as non-payload traffic
            self.misc.other_bytes_read += 8;
            let size = u64::from_le_bytes(size);
            // set up the bao decoder that validates the content against the hash
            let stream = ResponseDecoder::new(
                self.hash.into(),
                self.ranges,
                BaoTree::new(size, IROH_BLOCK_SIZE),
                RecvStreamAsyncStreamReader::new(self.reader),
            );
            Ok((
                AtBlobContent {
                    stream,
                    misc: self.misc,
                },
                size,
            ))
        }

        /// Drain the response and throw away the result
        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
            let (content, _size) = self.next().await?;
            content.drain().await
        }

        /// Concatenate the entire response into a vec
        ///
        /// For a request that does not request the complete blob, this will just
        /// concatenate the ranges that were requested.
        pub async fn concatenate_into_vec(
            self,
        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
            let (content, _size) = self.next().await?;
            content.concatenate_into_vec().await
        }

        /// Write the entire blob to a slice writer.
        pub async fn write_all<D: AsyncSliceWriter>(
            self,
            data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError> {
            let (content, _size) = self.next().await?;
            let res = content.write_all(data).await?;
            Ok(res)
        }

        /// Write the entire blob to a slice writer and to an optional outboard.
        ///
        /// The outboard is only written to if the blob is larger than a single
        /// chunk group.
        pub async fn write_all_with_outboard<D, O>(
            self,
            outboard: Option<O>,
            data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
            O: OutboardMut,
        {
            let (content, _size) = self.next().await?;
            let res = content.write_all_with_outboard(outboard, data).await?;
            Ok(res)
        }

        /// The hash of the blob we are reading.
        pub fn hash(&self) -> Hash {
            self.hash
        }

        /// The ranges we have requested for the current hash.
        pub fn ranges(&self) -> &ChunkRanges {
            &self.ranges
        }

        /// The current offset of the blob we are reading.
        pub fn offset(&self) -> u64 {
            self.misc.ranges_iter.offset()
        }
    }
623
    /// State while we are reading content
    #[derive(Debug)]
    pub struct AtBlobContent<R: RecvStream = DefaultReader> {
        // bao decoder wrapping the recv stream; validates data as it arrives
        stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
        misc: Box<Misc>,
    }
630
    /// Decode error that you can get once you have sent the request and are
    /// decoding the response, e.g. from [`AtBlobContent::next`].
    ///
    /// This is similar to [`bao_tree::io::DecodeError`], but takes into account
    /// that we are reading from a [`RecvStream`], so read errors will be
    /// propagated as [`DecodeError::Read`], containing a [`ReadError`].
    /// This carries more concrete information about the error than an [`io::Error`].
    ///
    /// When the provider finds that it does not have a chunk that we requested,
    /// or that the chunk is invalid, it will stop sending data without producing
    /// an error. This is indicated by either the [`DecodeError::ParentNotFound`] or
    /// [`DecodeError::LeafNotFound`] variant, which can be used to detect that data
    /// is missing while the connection and the provider are otherwise healthy.
    ///
    /// The [`DecodeError::ParentHashMismatch`] and [`DecodeError::LeafHashMismatch`]
    /// variants indicate that the provider has sent us invalid data. A well-behaved
    /// provider should never do this, so this is an indication that the provider is
    /// not behaving correctly.
    ///
    /// The [`DecodeError::DecodeIo`] variant is just a fallback for any other io error that
    /// is not actually a [`DecodeError::Read`].
    ///
    /// [`ReadError`]: endpoint::ReadError
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    pub enum DecodeError {
        /// A chunk was not found or invalid, so the provider stopped sending data
        #[snafu(display("not found"))]
        ChunkNotFound {},
        /// A parent was not found or invalid, so the provider stopped sending data
        #[snafu(display("parent not found {node:?}"))]
        ParentNotFound { node: TreeNode },
        /// A leaf was not found or invalid, so the provider stopped sending data
        #[snafu(display("chunk not found {num}"))]
        LeafNotFound { num: ChunkNum },
        /// The hash of a parent did not match the expected hash
        #[snafu(display("parent hash mismatch: {node:?}"))]
        ParentHashMismatch { node: TreeNode },
        /// The hash of a leaf did not match the expected hash
        #[snafu(display("leaf hash mismatch: {num}"))]
        LeafHashMismatch { num: ChunkNum },
        /// Error when reading from the stream
        #[snafu(display("read: {source}"))]
        Read { source: io::Error },
        /// Error when writing the received data, e.g. to a writer or outboard
        #[snafu(display("io: {source}"))]
        Write { source: io::Error },
    }
685
    impl DecodeError {
        /// Shorthand to build a [`DecodeError::LeafHashMismatch`] for chunk `num`.
        pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
            decode_error::LeafHashMismatchSnafu { num }.build()
        }
    }
691
    impl From<AtBlobHeaderNextError> for DecodeError {
        /// Lift a header-read error into a decode error: a missing size header
        /// means the chunk was not found; read errors keep their io source.
        fn from(cause: AtBlobHeaderNextError) -> Self {
            match cause {
                AtBlobHeaderNextError::NotFound { .. } => decode_error::ChunkNotFoundSnafu.build(),
                AtBlobHeaderNextError::Read { source, .. } => {
                    decode_error::ReadSnafu.into_error(source)
                }
            }
        }
    }
702
703    impl From<DecodeError> for io::Error {
704        fn from(cause: DecodeError) -> Self {
705            match cause {
706                DecodeError::ParentNotFound { .. } => {
707                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
708                }
709                DecodeError::LeafNotFound { .. } => {
710                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
711                }
712                DecodeError::Read { source, .. } => source,
713                DecodeError::Write { source, .. } => source,
714                _ => io::Error::other(cause),
715            }
716        }
717    }
718
719    impl From<bao_tree::io::DecodeError> for DecodeError {
720        fn from(value: bao_tree::io::DecodeError) -> Self {
721            match value {
722                bao_tree::io::DecodeError::ParentNotFound(node) => {
723                    decode_error::ParentNotFoundSnafu { node }.build()
724                }
725                bao_tree::io::DecodeError::LeafNotFound(num) => {
726                    decode_error::LeafNotFoundSnafu { num }.build()
727                }
728                bao_tree::io::DecodeError::ParentHashMismatch(node) => {
729                    decode_error::ParentHashMismatchSnafu { node }.build()
730                }
731                bao_tree::io::DecodeError::LeafHashMismatch(num) => {
732                    decode_error::LeafHashMismatchSnafu { num }.build()
733                }
734                bao_tree::io::DecodeError::Io(cause) => decode_error::ReadSnafu.into_error(cause),
735            }
736        }
737    }
738
    /// The next state after reading a content item
    #[derive(Debug, From)]
    pub enum BlobContentNext<R: RecvStream> {
        /// We expect more content
        ///
        /// Carries the continuation state plus the decode result of the item
        /// that was just read.
        More(
            (
                AtBlobContent<R>,
                result::Result<BaoContentItem, DecodeError>,
            ),
        ),
        /// We are done with this blob
        Done(AtEndBlob<R>),
    }
752
    impl<R: RecvStream> AtBlobContent<R> {
        /// Read the next item, either content, an error, or the end of the blob
        pub async fn next(self) -> BlobContentNext<R> {
            match self.stream.next().await {
                ResponseDecoderNext::More((stream, res)) => {
                    let mut next = Self { stream, ..self };
                    let res = res.map_err(DecodeError::from);
                    // update counters for successfully decoded items
                    match &res {
                        Ok(BaoContentItem::Parent(_)) => {
                            // a parent is a pair of 32-byte hashes
                            next.misc.other_bytes_read += 64;
                        }
                        Ok(BaoContentItem::Leaf(leaf)) => {
                            next.misc.payload_bytes_read += leaf.data.len() as u64;
                        }
                        _ => {}
                    }
                    BlobContentNext::More((next, res))
                }
                ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
                    stream: stream.into_inner(),
                    misc: self.misc,
                }),
            }
        }

        /// The geometry of the tree we are currently reading.
        pub fn tree(&self) -> bao_tree::BaoTree {
            self.stream.tree()
        }

        /// The hash of the blob we are reading.
        pub fn hash(&self) -> Hash {
            (*self.stream.hash()).into()
        }

        /// The current offset of the blob we are reading.
        pub fn offset(&self) -> u64 {
            self.misc.ranges_iter.offset()
        }

        /// Current stats
        pub fn stats(&self) -> Stats {
            Stats {
                counters: self.misc.counters,
                elapsed: self.misc.start.elapsed(),
            }
        }

        /// Drain the response and throw away the result
        ///
        /// Still validates the data; the first decode error aborts the drain.
        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, res)) => {
                        // propagate decode errors, discard the item itself
                        let _ = res?;
                        content = content1;
                    }
                    BlobContentNext::Done(end) => {
                        break Ok(end);
                    }
                }
            }
        }

        /// Concatenate the entire response into a vec
        pub async fn concatenate_into_vec(
            self,
        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
            let mut res = Vec::with_capacity(1024);
            let mut curr = self;
            let done = loop {
                match curr.next().await {
                    BlobContentNext::More((next, data)) => {
                        // only leaves carry payload; parents are validation data
                        if let BaoContentItem::Leaf(leaf) = data? {
                            res.extend_from_slice(&leaf.data);
                        }
                        curr = next;
                    }
                    BlobContentNext::Done(done) => {
                        // we are done with the root blob
                        break done;
                    }
                }
            };
            Ok((done, res))
        }

        /// Write the entire blob to a slice writer and to an optional outboard.
        ///
        /// The outboard is only written to if the blob is larger than a single
        /// chunk group.
        pub async fn write_all_with_outboard<D, O>(
            self,
            mut outboard: Option<O>,
            mut data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
            O: OutboardMut,
        {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, item)) => {
                        content = content1;
                        match item? {
                            BaoContentItem::Parent(parent) => {
                                // parents go to the outboard, if one was given
                                if let Some(outboard) = outboard.as_mut() {
                                    outboard
                                        .save(parent.node, &parent.pair)
                                        .await
                                        .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
                                }
                            }
                            BaoContentItem::Leaf(leaf) => {
                                // leaves go to the data writer at their offset
                                data.write_bytes_at(leaf.offset, leaf.data)
                                    .await
                                    .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
                            }
                        }
                    }
                    BlobContentNext::Done(end) => {
                        return Ok(end);
                    }
                }
            }
        }

        /// Write the entire blob to a slice writer.
        pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
        {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, item)) => {
                        content = content1;
                        match item? {
                            // parents are dropped: no outboard in this variant
                            BaoContentItem::Parent(_) => {}
                            BaoContentItem::Leaf(leaf) => {
                                data.write_bytes_at(leaf.offset, leaf.data)
                                    .await
                                    .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
                            }
                        }
                    }
                    BlobContentNext::Done(end) => {
                        return Ok(end);
                    }
                }
            }
        }

        /// Immediately finish the get response without reading further
        pub fn finish(self) -> AtClosing<R> {
            AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
        }
    }
912
    /// State after we have read all the content for a blob
    #[derive(Debug)]
    pub struct AtEndBlob<R: RecvStream = DefaultReader> {
        /// Stream to read the next child, or the closing handshake, from.
        stream: R,
        /// Shared per-request state (start time, counters, ranges iterator).
        misc: Box<Misc>,
    }
919
    /// The next state after the end of a blob
    ///
    /// Produced by [`AtEndBlob::next`]; the `From` derive provides conversions
    /// from both inner states.
    #[derive(Debug, From)]
    pub enum EndBlobNext<R: RecvStream = DefaultReader> {
        /// Response is expected to have more children
        MoreChildren(AtStartChild<R>),
        /// No more children expected
        Closing(AtClosing<R>),
    }
928
929    impl<R: RecvStream> AtEndBlob<R> {
930        /// Read the next child, or finish
931        pub fn next(mut self) -> EndBlobNext<R> {
932            if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
933                AtStartChild {
934                    reader: self.stream,
935                    offset,
936                    ranges,
937                    misc: self.misc,
938                }
939                .into()
940            } else {
941                AtClosing::new(self.misc, self.stream, true).into()
942            }
943        }
944    }
945
    /// State when finishing the get response
    #[derive(Debug)]
    pub struct AtClosing<R: RecvStream = DefaultReader> {
        /// Shared per-request state; source of the final statistics.
        misc: Box<Misc>,
        /// The underlying stream, fully consumed except possibly trailing bytes.
        reader: R,
        /// Whether [`Self::next`] should probe the stream for unexpected extra data.
        check_extra_data: bool,
    }
953
954    impl<R: RecvStream> AtClosing<R> {
955        fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
956            Self {
957                misc,
958                reader,
959                check_extra_data,
960            }
961        }
962
963        /// Finish the get response, returning statistics
964        pub async fn next(self) -> result::Result<Stats, AtClosingNextError> {
965            // Shut down the stream
966            let mut reader = self.reader;
967            if self.check_extra_data {
968                let rest = reader.recv_bytes(1).await?;
969                if !rest.is_empty() {
970                    error!("Unexpected extra data at the end of the stream");
971                }
972            }
973            Ok(Stats {
974                counters: self.misc.counters,
975                elapsed: self.misc.start.elapsed(),
976            })
977        }
978    }
979
    /// Error that you can get from [`AtClosing::next`]
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    pub enum AtClosingNextError {
        /// Generic io error
        #[snafu(transparent)]
        Read { source: io::Error },
    }
994
    /// Byte counters accumulated over the course of a single request.
    #[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
    pub struct RequestCounters {
        /// payload bytes written
        pub payload_bytes_written: u64,
        /// request, hash pair and size bytes written
        pub other_bytes_written: u64,
        /// payload bytes read
        pub payload_bytes_read: u64,
        /// hash pair and size bytes read
        pub other_bytes_read: u64,
    }
1006
    /// Stuff we need to hold on to while going through the machine states
    #[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
    struct Misc {
        /// start time for statistics
        start: Instant,
        /// counters
        ///
        /// `Deref`/`DerefMut` forward to this field, so the state machine can
        /// update counters directly through `Misc`.
        #[deref]
        #[deref_mut]
        counters: RequestCounters,
        /// iterator over the ranges of the collection and the children
        ranges_iter: RangesIter,
    }
1019}