iroh_blobs/
get.rs

1//! The low level client side API
2//!
3//! Note that while using this API directly is fine, a simpler way to get data
4//! to a store is to use the [`crate::api::remote`] API, in particular the
5//! [`crate::api::remote::Remote::fetch`] function to download data to your
6//! local store.
7//!
8//! To get data, create a connection using an [`iroh::Endpoint`].
9//!
10//! Create a [`crate::protocol::GetRequest`] describing the data you want to get.
11//!
12//! Then create a state machine using [fsm::start] and
13//! drive it to completion by calling next on each state.
14//!
15//! For some states you have to provide additional arguments when calling next,
16//! or you can choose to finish early.
17//!
18//! [iroh]: https://docs.rs/iroh
19use std::fmt::{self, Debug};
20
21use anyhow::Result;
22use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
23use fsm::RequestCounters;
24use n0_future::time::{Duration, Instant};
25use n0_snafu::SpanTrace;
26use nested_enum_utils::common_fields;
27use serde::{Deserialize, Serialize};
28use snafu::{Backtrace, IntoError, ResultExt, Snafu};
29use tracing::{debug, error};
30
31use crate::{
32    protocol::ChunkRangesSeq,
33    store::IROH_BLOCK_SIZE,
34    util::{RecvStream, SendStream},
35    Hash,
36};
37
38mod error;
39pub mod request;
40pub(crate) use error::get_error;
41pub use error::{GetError, GetResult};
42
/// Default receive stream type, backed by an iroh QUIC stream.
type DefaultReader = iroh::endpoint::RecvStream;
/// Default send stream type, backed by an iroh QUIC stream.
type DefaultWriter = iroh::endpoint::SendStream;
45
/// A send/receive stream pair for a single exchange, tagged with the id of
/// the connection it belongs to and the time it was created.
pub struct StreamPair<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    /// Id of the connection this stream pair belongs to.
    pub connection_id: u64,
    /// Time at which this pair was created (set by [`StreamPair::new`]).
    pub t0: Instant,
    /// Stream for receiving data from the remote.
    pub recv: R,
    /// Stream for sending data to the remote.
    pub send: W,
}
52
53impl<R: RecvStream, W: SendStream> StreamPair<R, W> {
54    pub fn new(connection_id: u64, recv: R, send: W) -> Self {
55        Self {
56            t0: Instant::now(),
57            recv,
58            send,
59            connection_id,
60        }
61    }
62}
63
/// Stats about the transfer.
///
/// Derefs to the inner [`RequestCounters`], so the individual byte counters
/// can be accessed directly on this type.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Deref,
    derive_more::DerefMut,
)]
pub struct Stats {
    /// Byte counters for the request (payload and other bytes, read and written)
    #[deref]
    #[deref_mut]
    pub counters: RequestCounters,
    /// The time it took to transfer the data
    pub elapsed: Duration,
}
84
85impl Stats {
86    /// Transfer rate in megabits per second
87    pub fn mbits(&self) -> f64 {
88        let data_len_bit = self.total_bytes_read() * 8;
89        data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
90    }
91
92    pub fn total_bytes_read(&self) -> u64 {
93        self.payload_bytes_read + self.other_bytes_read
94    }
95
96    pub fn combine(&mut self, that: &Stats) {
97        self.payload_bytes_written += that.payload_bytes_written;
98        self.other_bytes_written += that.other_bytes_written;
99        self.payload_bytes_read += that.payload_bytes_read;
100        self.other_bytes_read += that.other_bytes_read;
101        self.elapsed += that.elapsed;
102    }
103}
104
105/// Finite state machine for get responses.
106///
107/// This is the low level API for getting data from a peer.
108#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
109pub mod fsm {
110    use std::{io, result};
111
112    use bao_tree::{
113        io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
114        BaoTree, ChunkRanges, TreeNode,
115    };
116    use derive_more::From;
117    use iroh::endpoint::Connection;
118    use iroh_io::AsyncSliceWriter;
119
120    use super::*;
121    use crate::{
122        get::get_error::BadRequestSnafu,
123        protocol::{
124            GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
125        },
126        util::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
127    };
128
    // Self-referential cell owning a ChunkRangesSeq together with an iterator
    // that borrows from it. This is what lets RangesIter below be a movable,
    // fully owned iterator suitable for the fsm-style API.
    self_cell::self_cell! {
        struct RangesIterInner {
            owner: ChunkRangesSeq,
            #[not_covariant]
            dependent: NonEmptyRequestRangeSpecIter,
        }
    }
136
    /// The entry point of the get response machine
    ///
    /// `connection` is the connection to open the request stream on,
    /// `request` describes the blobs and ranges to get, and `counters`
    /// are the byte counters to continue counting from.
    pub fn start(
        connection: Connection,
        request: GetRequest,
        counters: RequestCounters,
    ) -> AtInitial {
        AtInitial::new(connection, request, counters)
    }
145
    /// Start with a get many request. Todo: turn this into distinct states.
    ///
    /// Opens a bidi stream on `connection`, serializes and sends the
    /// [`GetManyRequest`], and returns either the state for reading the first
    /// requested child (`Ok(AtStartChild)`) or, if the request contains no
    /// ranges at all, the closing state (`Err(AtClosing)`).
    ///
    /// NOTE(review): unlike [`AtConnected::next`], this neither calls
    /// `writer.sync()` after sending the request nor adds the request size to
    /// `counters.other_bytes_written` — confirm whether that is intentional.
    pub async fn start_get_many(
        connection: Connection,
        request: GetManyRequest,
        counters: RequestCounters,
    ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
        let start = Instant::now();
        let (mut writer, reader) = connection
            .open_bi()
            .await
            .map_err(|e| OpenSnafu.into_error(e.into()))?;
        // wrap in the common Request enum just for serialization
        let request = Request::GetMany(request);
        let request_bytes = postcard::to_stdvec(&request)
            .map_err(|source| BadRequestSnafu.into_error(source.into()))?;
        writer
            .send_bytes(request_bytes.into())
            .await
            .context(connected_next_error::WriteSnafu)?;
        // unwrap again; we only wrapped for serialization
        let Request::GetMany(request) = request else {
            unreachable!();
        };
        let mut ranges_iter = RangesIter::new(request.ranges.clone());
        // pull out the first child; if there is none the request was empty
        let first_item = ranges_iter.next();
        let misc = Box::new(Misc {
            counters,
            start,
            ranges_iter,
        });
        Ok(match first_item {
            Some((child_offset, child_ranges)) => Ok(AtStartChild {
                ranges: child_ranges,
                reader,
                misc,
                offset: child_offset,
            }),
            None => Err(AtClosing::new(misc, reader, true)),
        })
    }
184
    /// Owned iterator for the ranges in a request
    ///
    /// We need an owned iterator for a fsm style API, otherwise we would have
    /// to drag a lifetime around every single state.
    ///
    /// Wraps the self-referential [`RangesIterInner`] cell.
    struct RangesIter(RangesIterInner);
190
191    impl fmt::Debug for RangesIter {
192        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193            f.debug_struct("RangesIter").finish()
194        }
195    }
196
    impl RangesIter {
        /// Create an iterator owning `owner`, iterating over its non-empty
        /// ranges via `iter_non_empty_infinite`.
        pub fn new(owner: ChunkRangesSeq) -> Self {
            Self(RangesIterInner::new(owner, |owner| {
                owner.iter_non_empty_infinite()
            }))
        }

        /// Current offset of the underlying range iterator.
        pub fn offset(&self) -> u64 {
            self.0.with_dependent(|_owner, iter| iter.offset())
        }
    }
208
209    impl Iterator for RangesIter {
210        type Item = (u64, ChunkRanges);
211
212        fn next(&mut self) -> Option<Self::Item> {
213            self.0.with_dependent_mut(|_owner, iter| {
214                iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
215            })
216        }
217    }
218
    /// Initial state of the get response machine
    #[derive(Debug)]
    pub struct AtInitial {
        /// Connection to open the request stream on
        connection: Connection,
        /// The request to be sent
        request: GetRequest,
        /// Byte counters to continue counting from
        counters: RequestCounters,
    }
226
227    impl AtInitial {
228        /// Create a new get response
229        ///
230        /// `connection` is an existing connection
231        /// `request` is the request to be sent
232        pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
233            Self {
234                connection,
235                request,
236                counters,
237            }
238        }
239
240        /// Initiate a new bidi stream to use for the get response
241        pub async fn next(self) -> Result<AtConnected, InitialNextError> {
242            let start = Instant::now();
243            let (writer, reader) = self
244                .connection
245                .open_bi()
246                .await
247                .map_err(|e| OpenSnafu.into_error(e.into()))?;
248            Ok(AtConnected {
249                start,
250                reader,
251                writer,
252                request: self.request,
253                counters: self.counters,
254            })
255        }
256    }
257
    /// Error that you can get from [`AtInitial::next`]
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[allow(missing_docs)]
    #[derive(Debug, Snafu)]
    #[non_exhaustive]
    pub enum InitialNextError {
        /// Opening the bidi stream failed
        Open { source: io::Error },
    }
270
    /// State of the get response machine after the handshake has been sent
    #[derive(Debug)]
    pub struct AtConnected<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
        /// Time the request started, used for stats
        start: Instant,
        /// Stream to read the response from
        reader: R,
        /// Stream to write the request to
        writer: W,
        /// The request to be sent
        request: GetRequest,
        /// Byte counters carried through the state machine
        counters: RequestCounters,
    }
280
    /// Possible next states after the handshake has been sent
    ///
    /// Which state is returned depends on the first offset in the request:
    /// offset 0 is the root blob, everything else is a child.
    #[derive(Debug, From)]
    pub enum ConnectedNext<R: RecvStream = DefaultReader> {
        /// First response is either a collection or a single blob
        StartRoot(AtStartRoot<R>),
        /// First response is a child
        StartChild(AtStartChild<R>),
        /// Request is empty
        Closing(AtClosing<R>),
    }
291
    /// Error that you can get from [`AtConnected::next`]
    ///
    /// The `Write` variant is also produced by [`start_get_many`] when
    /// sending the request fails.
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[allow(missing_docs)]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    #[non_exhaustive]
    pub enum ConnectedNextError {
        /// Error when serializing the request
        #[snafu(display("postcard ser: {source}"))]
        PostcardSer { source: postcard::Error },
        /// The serialized request is too long to be sent
        #[snafu(display("request too big"))]
        RequestTooBig {},
        /// Error when writing the request to the [`SendStream`].
        #[snafu(display("write: {source}"))]
        Write { source: io::Error },
    }
313
    impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
        /// Create the connected state directly from its parts.
        ///
        /// `start` is the time the request started, used for stats.
        pub fn new(
            start: Instant,
            reader: R,
            writer: W,
            request: GetRequest,
            counters: RequestCounters,
        ) -> Self {
            Self {
                start,
                reader,
                writer,
                request,
                counters,
            }
        }

        /// Send the request and move to the next state
        ///
        /// The next state will be either `StartRoot` or `StartChild` depending on whether
        /// the request requests part of the collection or not.
        ///
        /// If the request is empty, this can also move directly to `Finished`.
        pub async fn next(self) -> Result<ConnectedNext<R>, ConnectedNextError> {
            let Self {
                start,
                reader,
                mut writer,
                mut request,
                mut counters,
            } = self;
            // 1. Send Request
            // the block evaluates to the number of bytes written, accounted
            // as non-payload ("other") bytes
            counters.other_bytes_written += {
                debug!("sending request");
                // wrap in the common Request enum just for serialization
                let wrapped = Request::Get(request);
                let request_bytes = postcard::to_stdvec(&wrapped)
                    .context(connected_next_error::PostcardSerSnafu)?;
                // unwrap again to get the GetRequest back; we only wrapped for serialization
                let Request::Get(x) = wrapped else {
                    unreachable!();
                };
                request = x;

                // refuse to send oversized requests, checked before writing anything
                if request_bytes.len() > MAX_MESSAGE_SIZE {
                    return Err(connected_next_error::RequestTooBigSnafu.build());
                }

                // write the request itself
                let len = request_bytes.len() as u64;
                writer
                    .send_bytes(request_bytes.into())
                    .await
                    .context(connected_next_error::WriteSnafu)?;
                writer
                    .sync()
                    .await
                    .context(connected_next_error::WriteSnafu)?;
                len
            };

            // 2. Finish writing before expecting a response
            drop(writer);

            let hash = request.hash;
            let ranges_iter = RangesIter::new(request.ranges);
            // this is in a box so we don't have to memcpy it on every state transition
            let mut misc = Box::new(Misc {
                counters,
                start,
                ranges_iter,
            });
            // offset 0 is the root blob; any other offset is a child
            Ok(match misc.ranges_iter.next() {
                Some((offset, ranges)) => {
                    if offset == 0 {
                        AtStartRoot {
                            reader,
                            ranges,
                            misc,
                            hash,
                        }
                        .into()
                    } else {
                        AtStartChild {
                            reader,
                            ranges,
                            misc,
                            offset,
                        }
                        .into()
                    }
                }
                None => AtClosing::new(misc, reader, true).into(),
            })
        }
    }
408
    /// State of the get response when we start reading a collection
    #[derive(Debug)]
    pub struct AtStartRoot<R: RecvStream = DefaultReader> {
        /// Ranges requested for the root blob
        ranges: ChunkRanges,
        /// Stream to read the response from
        reader: R,
        /// Shared per-request state (counters, start time, ranges iterator)
        misc: Box<Misc>,
        /// Hash of the root blob, known from the request
        hash: Hash,
    }
417
    /// State of the get response when we start reading a child
    #[derive(Debug)]
    pub struct AtStartChild<R: RecvStream = DefaultReader> {
        /// Ranges requested for this child
        ranges: ChunkRanges,
        /// Stream to read the response from
        reader: R,
        /// Shared per-request state (counters, start time, ranges iterator)
        misc: Box<Misc>,
        /// Offset of the child in the request; the caller maps this to a hash
        offset: u64,
    }
426
427    impl<R: RecvStream> AtStartChild<R> {
428        /// The offset of the child we are currently reading
429        ///
430        /// This must be used to determine the hash needed to call next.
431        /// If this is larger than the number of children in the collection,
432        /// you can call finish to stop reading the response.
433        pub fn offset(&self) -> u64 {
434            self.offset
435        }
436
437        /// The ranges we have requested for the child
438        pub fn ranges(&self) -> &ChunkRanges {
439            &self.ranges
440        }
441
442        /// Go into the next state, reading the header
443        ///
444        /// This requires passing in the hash of the child for validation
445        pub fn next(self, hash: Hash) -> AtBlobHeader<R> {
446            AtBlobHeader {
447                reader: self.reader,
448                ranges: self.ranges,
449                misc: self.misc,
450                hash,
451            }
452        }
453
454        /// Finish the get response without reading further
455        ///
456        /// This is used if you know that there are no more children from having
457        /// read the collection, or when you want to stop reading the response
458        /// early.
459        pub fn finish(self) -> AtClosing<R> {
460            AtClosing::new(self.misc, self.reader, false)
461        }
462    }
463
464    impl<R: RecvStream> AtStartRoot<R> {
465        /// The ranges we have requested for the child
466        pub fn ranges(&self) -> &ChunkRanges {
467            &self.ranges
468        }
469
470        /// Hash of the root blob
471        pub fn hash(&self) -> Hash {
472            self.hash
473        }
474
475        /// Go into the next state, reading the header
476        ///
477        /// For the collection we already know the hash, since it was part of the request
478        pub fn next(self) -> AtBlobHeader<R> {
479            AtBlobHeader {
480                reader: self.reader,
481                ranges: self.ranges,
482                hash: self.hash,
483                misc: self.misc,
484            }
485        }
486
487        /// Finish the get response without reading further
488        pub fn finish(self) -> AtClosing<R> {
489            AtClosing::new(self.misc, self.reader, false)
490        }
491    }
492
    /// State before reading a size header
    #[derive(Debug)]
    pub struct AtBlobHeader<R: RecvStream = DefaultReader> {
        /// Ranges requested for this blob
        ranges: ChunkRanges,
        /// Stream to read the response from
        reader: R,
        /// Shared per-request state (counters, start time, ranges iterator)
        misc: Box<Misc>,
        /// Hash of the blob, used to validate the content
        hash: Hash,
    }
501
    /// Error that you can get from [`AtBlobHeader::next`]
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    pub enum AtBlobHeaderNextError {
        /// Eof when reading the size header
        ///
        /// This indicates that the provider does not have the requested data.
        #[snafu(display("not found"))]
        NotFound {},
        /// Generic io error
        #[snafu(display("io: {source}"))]
        Read { source: io::Error },
    }
521
522    impl From<AtBlobHeaderNextError> for io::Error {
523        fn from(cause: AtBlobHeaderNextError) -> Self {
524            match cause {
525                AtBlobHeaderNextError::NotFound { .. } => {
526                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
527                }
528                AtBlobHeaderNextError::Read { source, .. } => source,
529            }
530        }
531    }
532
    impl<R: RecvStream> AtBlobHeader<R> {
        /// Read the size header, returning it and going into the `Content` state.
        pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
            // the header is a little-endian u64 with the total blob size
            let mut size = [0; 8];
            self.reader.recv_exact(&mut size).await.map_err(|cause| {
                // an eof here means the provider does not have the blob
                if cause.kind() == io::ErrorKind::UnexpectedEof {
                    at_blob_header_next_error::NotFoundSnafu.build()
                } else {
                    at_blob_header_next_error::ReadSnafu.into_error(cause)
                }
            })?;
            // the 8 header bytes count as non-payload bytes
            self.misc.other_bytes_read += 8;
            let size = u64::from_le_bytes(size);
            // set up the bao decoder for the requested ranges of a tree of `size` bytes
            let stream = ResponseDecoder::new(
                self.hash.into(),
                self.ranges,
                BaoTree::new(size, IROH_BLOCK_SIZE),
                RecvStreamAsyncStreamReader::new(self.reader),
            );
            Ok((
                AtBlobContent {
                    stream,
                    misc: self.misc,
                },
                size,
            ))
        }

        /// Drain the response and throw away the result
        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
            let (content, _size) = self.next().await?;
            content.drain().await
        }

        /// Concatenate the entire response into a vec
        ///
        /// For a request that does not request the complete blob, this will just
        /// concatenate the ranges that were requested.
        pub async fn concatenate_into_vec(
            self,
        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
            let (content, _size) = self.next().await?;
            content.concatenate_into_vec().await
        }

        /// Write the entire blob to a slice writer.
        pub async fn write_all<D: AsyncSliceWriter>(
            self,
            data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError> {
            let (content, _size) = self.next().await?;
            let res = content.write_all(data).await?;
            Ok(res)
        }

        /// Write the entire blob to a slice writer and to an optional outboard.
        ///
        /// The outboard is only written to if the blob is larger than a single
        /// chunk group.
        pub async fn write_all_with_outboard<D, O>(
            self,
            outboard: Option<O>,
            data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
            O: OutboardMut,
        {
            let (content, _size) = self.next().await?;
            let res = content.write_all_with_outboard(outboard, data).await?;
            Ok(res)
        }

        /// The hash of the blob we are reading.
        pub fn hash(&self) -> Hash {
            self.hash
        }

        /// The ranges we have requested for the current hash.
        pub fn ranges(&self) -> &ChunkRanges {
            &self.ranges
        }

        /// The current offset of the blob we are reading.
        pub fn offset(&self) -> u64 {
            self.misc.ranges_iter.offset()
        }
    }
621
    /// State while we are reading content
    #[derive(Debug)]
    pub struct AtBlobContent<R: RecvStream = DefaultReader> {
        /// Bao decoder wrapping the receive stream
        stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
        /// Shared per-request state (counters, start time, ranges iterator)
        misc: Box<Misc>,
    }
628
    /// Decode error that you can get once you have sent the request and are
    /// decoding the response, e.g. from [`AtBlobContent::next`].
    ///
    /// This is similar to [`bao_tree::io::DecodeError`], but takes into account
    /// that we are reading from a [`RecvStream`], so read errors will be
    /// propagated as [`DecodeError::Read`], containing a [`io::Error`].
    ///
    /// When the provider finds that it does not have a chunk that we requested,
    /// or that the chunk is invalid, it will stop sending data without producing
    /// an error. This is indicated by either the [`DecodeError::ParentNotFound`] or
    /// [`DecodeError::LeafNotFound`] variant, which can be used to detect that data
    /// is missing but the connection as well that the provider is otherwise healthy.
    ///
    /// The [`DecodeError::ParentHashMismatch`] and [`DecodeError::LeafHashMismatch`]
    /// variants indicate that the provider has sent us invalid data. A well-behaved
    /// provider should never do this, so this is an indication that the provider is
    /// not behaving correctly.
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    pub enum DecodeError {
        /// A chunk was not found or invalid, so the provider stopped sending data
        #[snafu(display("not found"))]
        ChunkNotFound {},
        /// A parent was not found or invalid, so the provider stopped sending data
        #[snafu(display("parent not found {node:?}"))]
        ParentNotFound { node: TreeNode },
        /// A leaf was not found or invalid, so the provider stopped sending data
        #[snafu(display("chunk not found {num}"))]
        LeafNotFound { num: ChunkNum },
        /// The hash of a parent did not match the expected hash
        #[snafu(display("parent hash mismatch: {node:?}"))]
        ParentHashMismatch { node: TreeNode },
        /// The hash of a leaf did not match the expected hash
        #[snafu(display("leaf hash mismatch: {num}"))]
        LeafHashMismatch { num: ChunkNum },
        /// Error when reading from the stream
        #[snafu(display("read: {source}"))]
        Read { source: io::Error },
        /// A generic io error
        #[snafu(display("io: {source}"))]
        Write { source: io::Error },
    }
677
    impl DecodeError {
        /// Construct a [`DecodeError::LeafHashMismatch`] for the given chunk number.
        pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
            decode_error::LeafHashMismatchSnafu { num }.build()
        }
    }
683
684    impl From<AtBlobHeaderNextError> for DecodeError {
685        fn from(cause: AtBlobHeaderNextError) -> Self {
686            match cause {
687                AtBlobHeaderNextError::NotFound { .. } => decode_error::ChunkNotFoundSnafu.build(),
688                AtBlobHeaderNextError::Read { source, .. } => {
689                    decode_error::ReadSnafu.into_error(source)
690                }
691            }
692        }
693    }
694
695    impl From<DecodeError> for io::Error {
696        fn from(cause: DecodeError) -> Self {
697            match cause {
698                DecodeError::ParentNotFound { .. } => {
699                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
700                }
701                DecodeError::LeafNotFound { .. } => {
702                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
703                }
704                DecodeError::Read { source, .. } => source,
705                DecodeError::Write { source, .. } => source,
706                _ => io::Error::other(cause),
707            }
708        }
709    }
710
711    impl From<bao_tree::io::DecodeError> for DecodeError {
712        fn from(value: bao_tree::io::DecodeError) -> Self {
713            match value {
714                bao_tree::io::DecodeError::ParentNotFound(node) => {
715                    decode_error::ParentNotFoundSnafu { node }.build()
716                }
717                bao_tree::io::DecodeError::LeafNotFound(num) => {
718                    decode_error::LeafNotFoundSnafu { num }.build()
719                }
720                bao_tree::io::DecodeError::ParentHashMismatch(node) => {
721                    decode_error::ParentHashMismatchSnafu { node }.build()
722                }
723                bao_tree::io::DecodeError::LeafHashMismatch(num) => {
724                    decode_error::LeafHashMismatchSnafu { num }.build()
725                }
726                bao_tree::io::DecodeError::Io(cause) => decode_error::ReadSnafu.into_error(cause),
727            }
728        }
729    }
730
    /// The next state after reading a content item
    #[derive(Debug, From)]
    pub enum BlobContentNext<R: RecvStream> {
        /// We expect more content
        ///
        /// Carries the content state plus the decoded item (or a decode error).
        More(
            (
                AtBlobContent<R>,
                result::Result<BaoContentItem, DecodeError>,
            ),
        ),
        /// We are done with this blob
        Done(AtEndBlob<R>),
    }
744
    impl<R: RecvStream> AtBlobContent<R> {
        /// Read the next item, either content, an error, or the end of the blob
        pub async fn next(self) -> BlobContentNext<R> {
            match self.stream.next().await {
                ResponseDecoderNext::More((stream, res)) => {
                    let mut next = Self { stream, ..self };
                    let res = res.map_err(DecodeError::from);
                    // update the byte counters based on what we received
                    match &res {
                        Ok(BaoContentItem::Parent(_)) => {
                            // a parent carries a hash pair, accounted as 64 overhead bytes
                            next.misc.other_bytes_read += 64;
                        }
                        Ok(BaoContentItem::Leaf(leaf)) => {
                            next.misc.payload_bytes_read += leaf.data.len() as u64;
                        }
                        _ => {}
                    }
                    BlobContentNext::More((next, res))
                }
                // decoder is done with this blob; recover the raw stream
                ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
                    stream: stream.into_inner(),
                    misc: self.misc,
                }),
            }
        }

        /// The geometry of the tree we are currently reading.
        pub fn tree(&self) -> bao_tree::BaoTree {
            self.stream.tree()
        }

        /// The hash of the blob we are reading.
        pub fn hash(&self) -> Hash {
            (*self.stream.hash()).into()
        }

        /// The current offset of the blob we are reading.
        pub fn offset(&self) -> u64 {
            self.misc.ranges_iter.offset()
        }

        /// Current stats
        pub fn stats(&self) -> Stats {
            Stats {
                counters: self.misc.counters,
                elapsed: self.misc.start.elapsed(),
            }
        }

        /// Drain the response and throw away the result
        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, res)) => {
                        // still propagate decode errors, just discard the data
                        let _ = res?;
                        content = content1;
                    }
                    BlobContentNext::Done(end) => {
                        break Ok(end);
                    }
                }
            }
        }

        /// Concatenate the entire response into a vec
        pub async fn concatenate_into_vec(
            self,
        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
            let mut res = Vec::with_capacity(1024);
            let mut curr = self;
            let done = loop {
                match curr.next().await {
                    BlobContentNext::More((next, data)) => {
                        // only leaves carry payload; parents are validation data
                        if let BaoContentItem::Leaf(leaf) = data? {
                            res.extend_from_slice(&leaf.data);
                        }
                        curr = next;
                    }
                    BlobContentNext::Done(done) => {
                        // we are done with the root blob
                        break done;
                    }
                }
            };
            Ok((done, res))
        }

        /// Write the entire blob to a slice writer and to an optional outboard.
        ///
        /// The outboard is only written to if the blob is larger than a single
        /// chunk group.
        pub async fn write_all_with_outboard<D, O>(
            self,
            mut outboard: Option<O>,
            mut data: D,
        ) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
            O: OutboardMut,
        {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, item)) => {
                        content = content1;
                        match item? {
                            BaoContentItem::Parent(parent) => {
                                // parents go to the outboard, if one was provided
                                if let Some(outboard) = outboard.as_mut() {
                                    outboard
                                        .save(parent.node, &parent.pair)
                                        .await
                                        .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
                                }
                            }
                            BaoContentItem::Leaf(leaf) => {
                                // leaves go to the data writer at their offset
                                data.write_bytes_at(leaf.offset, leaf.data)
                                    .await
                                    .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
                            }
                        }
                    }
                    BlobContentNext::Done(end) => {
                        return Ok(end);
                    }
                }
            }
        }

        /// Write the entire blob to a slice writer.
        pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob<R>, DecodeError>
        where
            D: AsyncSliceWriter,
        {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, item)) => {
                        content = content1;
                        match item? {
                            // parents are only used for validation, not written
                            BaoContentItem::Parent(_) => {}
                            BaoContentItem::Leaf(leaf) => {
                                data.write_bytes_at(leaf.offset, leaf.data)
                                    .await
                                    .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
                            }
                        }
                    }
                    BlobContentNext::Done(end) => {
                        return Ok(end);
                    }
                }
            }
        }

        /// Immediately finish the get response without reading further
        pub fn finish(self) -> AtClosing<R> {
            AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
        }
    }
904
    /// State after we have read all the content for a blob
    ///
    /// Call [`AtEndBlob::next`] to either proceed to the next child blob or
    /// to close the request.
    #[derive(Debug)]
    pub struct AtEndBlob<R: RecvStream = DefaultReader> {
        // stream to read any remaining children (or trailing data) from
        stream: R,
        // shared per-request state: counters, start time, remaining ranges
        misc: Box<Misc>,
    }
911
    /// The next state after the end of a blob
    ///
    /// Produced by [`AtEndBlob::next`], depending on whether the request has
    /// ranges left for further child blobs.
    #[derive(Debug, From)]
    pub enum EndBlobNext<R: RecvStream = DefaultReader> {
        /// Response is expected to have more children
        MoreChildren(AtStartChild<R>),
        /// No more children expected
        Closing(AtClosing<R>),
    }
920
921    impl<R: RecvStream> AtEndBlob<R> {
922        /// Read the next child, or finish
923        pub fn next(mut self) -> EndBlobNext<R> {
924            if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
925                AtStartChild {
926                    reader: self.stream,
927                    offset,
928                    ranges,
929                    misc: self.misc,
930                }
931                .into()
932            } else {
933                AtClosing::new(self.misc, self.stream, true).into()
934            }
935        }
936    }
937
    /// State when finishing the get response
    #[derive(Debug)]
    pub struct AtClosing<R: RecvStream = DefaultReader> {
        // shared per-request state, consumed to produce the final [`Stats`]
        misc: Box<Misc>,
        // the stream, optionally probed for trailing data in [`AtClosing::next`]
        reader: R,
        // whether to check that the peer sent no data past the requested ranges
        check_extra_data: bool,
    }
945
946    impl<R: RecvStream> AtClosing<R> {
947        fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
948            Self {
949                misc,
950                reader,
951                check_extra_data,
952            }
953        }
954
955        /// Finish the get response, returning statistics
956        pub async fn next(self) -> result::Result<Stats, AtClosingNextError> {
957            // Shut down the stream
958            let mut reader = self.reader;
959            if self.check_extra_data {
960                let rest = reader.recv_bytes(1).await?;
961                if !rest.is_empty() {
962                    error!("Unexpected extra data at the end of the stream");
963                }
964            }
965            Ok(Stats {
966                counters: self.misc.counters,
967                elapsed: self.misc.start.elapsed(),
968            })
969        }
970    }
971
    /// Error that you can get from [`AtClosing::next`]
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    #[snafu(module)]
    pub enum AtClosingNextError {
        /// Generic io error
        #[snafu(transparent)]
        Read { source: io::Error },
    }
986
    /// Byte counters collected over the course of a single request.
    ///
    /// Returned to the caller as part of [`Stats`] when the request finishes.
    #[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
    pub struct RequestCounters {
        /// payload bytes written
        pub payload_bytes_written: u64,
        /// request, hash pair and size bytes written
        pub other_bytes_written: u64,
        /// payload bytes read
        pub payload_bytes_read: u64,
        /// hash pair and size bytes read
        pub other_bytes_read: u64,
    }
998
    /// Stuff we need to hold on to while going through the machine states
    ///
    /// Held in a `Box` by the state types so that handing it from state to
    /// state only moves a pointer.
    #[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
    struct Misc {
        /// start time for statistics
        start: Instant,
        /// counters
        ///
        /// `Deref`/`DerefMut` make the counter fields directly accessible on
        /// `Misc` itself.
        #[deref]
        #[deref_mut]
        counters: RequestCounters,
        /// iterator over the ranges of the collection and the children
        ranges_iter: RangesIter,
    }
1011}