1use std::{
20    fmt::{self, Debug},
21    time::{Duration, Instant},
22};
23
24use anyhow::Result;
25use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
26use fsm::RequestCounters;
27use n0_snafu::SpanTrace;
28use nested_enum_utils::common_fields;
29use serde::{Deserialize, Serialize};
30use snafu::{Backtrace, IntoError, ResultExt, Snafu};
31use tracing::{debug, error};
32
33use crate::{
34    protocol::ChunkRangesSeq,
35    store::IROH_BLOCK_SIZE,
36    util::{RecvStream, SendStream},
37    Hash,
38};
39
40mod error;
41pub mod request;
42pub(crate) use error::get_error;
43pub use error::{GetError, GetResult};
44
45type DefaultReader = iroh::endpoint::RecvStream;
46type DefaultWriter = iroh::endpoint::SendStream;
47
48pub struct StreamPair<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
49    pub connection_id: u64,
50    pub t0: Instant,
51    pub recv: R,
52    pub send: W,
53}
54
55impl<R: RecvStream, W: SendStream> StreamPair<R, W> {
56    pub fn new(connection_id: u64, recv: R, send: W) -> Self {
57        Self {
58            t0: Instant::now(),
59            recv,
60            send,
61            connection_id,
62        }
63    }
64}
65
66#[derive(
68    Debug,
69    Default,
70    Clone,
71    PartialEq,
72    Eq,
73    Serialize,
74    Deserialize,
75    derive_more::Deref,
76    derive_more::DerefMut,
77)]
78pub struct Stats {
79    #[deref]
81    #[deref_mut]
82    pub counters: RequestCounters,
83    pub elapsed: Duration,
85}
86
87impl Stats {
88    pub fn mbits(&self) -> f64 {
90        let data_len_bit = self.total_bytes_read() * 8;
91        data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
92    }
93
94    pub fn total_bytes_read(&self) -> u64 {
95        self.payload_bytes_read + self.other_bytes_read
96    }
97
98    pub fn combine(&mut self, that: &Stats) {
99        self.payload_bytes_written += that.payload_bytes_written;
100        self.other_bytes_written += that.other_bytes_written;
101        self.payload_bytes_read += that.payload_bytes_read;
102        self.other_bytes_read += that.other_bytes_read;
103        self.elapsed += that.elapsed;
104    }
105}
106
107#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
111pub mod fsm {
112    use std::{io, result};
113
114    use bao_tree::{
115        io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
116        BaoTree, ChunkRanges, TreeNode,
117    };
118    use derive_more::From;
119    use iroh::endpoint::Connection;
120    use iroh_io::AsyncSliceWriter;
121
122    use super::*;
123    use crate::{
124        get::get_error::BadRequestSnafu,
125        protocol::{
126            GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
127        },
128        util::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
129    };
130
131    self_cell::self_cell! {
132        struct RangesIterInner {
133            owner: ChunkRangesSeq,
134            #[not_covariant]
135            dependent: NonEmptyRequestRangeSpecIter,
136        }
137    }
138
139    pub fn start(
141        connection: Connection,
142        request: GetRequest,
143        counters: RequestCounters,
144    ) -> AtInitial {
145        AtInitial::new(connection, request, counters)
146    }
147
148    pub async fn start_get_many(
150        connection: Connection,
151        request: GetManyRequest,
152        counters: RequestCounters,
153    ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
154        let start = Instant::now();
155        let (mut writer, reader) = connection
156            .open_bi()
157            .await
158            .map_err(|e| OpenSnafu.into_error(e.into()))?;
159        let request = Request::GetMany(request);
160        let request_bytes = postcard::to_stdvec(&request)
161            .map_err(|source| BadRequestSnafu.into_error(source.into()))?;
162        writer
163            .send_bytes(request_bytes.into())
164            .await
165            .context(connected_next_error::WriteSnafu)?;
166        let Request::GetMany(request) = request else {
167            unreachable!();
168        };
169        let mut ranges_iter = RangesIter::new(request.ranges.clone());
170        let first_item = ranges_iter.next();
171        let misc = Box::new(Misc {
172            counters,
173            start,
174            ranges_iter,
175        });
176        Ok(match first_item {
177            Some((child_offset, child_ranges)) => Ok(AtStartChild {
178                ranges: child_ranges,
179                reader,
180                misc,
181                offset: child_offset,
182            }),
183            None => Err(AtClosing::new(misc, reader, true)),
184        })
185    }
186
187    struct RangesIter(RangesIterInner);
192
193    impl fmt::Debug for RangesIter {
194        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195            f.debug_struct("RangesIter").finish()
196        }
197    }
198
199    impl RangesIter {
200        pub fn new(owner: ChunkRangesSeq) -> Self {
201            Self(RangesIterInner::new(owner, |owner| {
202                owner.iter_non_empty_infinite()
203            }))
204        }
205
206        pub fn offset(&self) -> u64 {
207            self.0.with_dependent(|_owner, iter| iter.offset())
208        }
209    }
210
211    impl Iterator for RangesIter {
212        type Item = (u64, ChunkRanges);
213
214        fn next(&mut self) -> Option<Self::Item> {
215            self.0.with_dependent_mut(|_owner, iter| {
216                iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
217            })
218        }
219    }
220
221    #[derive(Debug)]
223    pub struct AtInitial {
224        connection: Connection,
225        request: GetRequest,
226        counters: RequestCounters,
227    }
228
229    impl AtInitial {
230        pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
235            Self {
236                connection,
237                request,
238                counters,
239            }
240        }
241
242        pub async fn next(self) -> Result<AtConnected, InitialNextError> {
244            let start = Instant::now();
245            let (writer, reader) = self
246                .connection
247                .open_bi()
248                .await
249                .map_err(|e| OpenSnafu.into_error(e.into()))?;
250            Ok(AtConnected {
251                start,
252                reader,
253                writer,
254                request: self.request,
255                counters: self.counters,
256            })
257        }
258    }
259
260    #[common_fields({
262        backtrace: Option<Backtrace>,
263        #[snafu(implicit)]
264        span_trace: SpanTrace,
265    })]
266    #[allow(missing_docs)]
267    #[derive(Debug, Snafu)]
268    #[non_exhaustive]
269    pub enum InitialNextError {
270        Open { source: io::Error },
271    }
272
273    #[derive(Debug)]
275    pub struct AtConnected<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
276        start: Instant,
277        reader: R,
278        writer: W,
279        request: GetRequest,
280        counters: RequestCounters,
281    }
282
283    #[derive(Debug, From)]
285    pub enum ConnectedNext<R: RecvStream = DefaultReader> {
286        StartRoot(AtStartRoot<R>),
288        StartChild(AtStartChild<R>),
290        Closing(AtClosing<R>),
292    }
293
294    #[common_fields({
296        backtrace: Option<Backtrace>,
297        #[snafu(implicit)]
298        span_trace: SpanTrace,
299    })]
300    #[allow(missing_docs)]
301    #[derive(Debug, Snafu)]
302    #[snafu(module)]
303    #[non_exhaustive]
304    pub enum ConnectedNextError {
305        #[snafu(display("postcard ser: {source}"))]
307        PostcardSer { source: postcard::Error },
308        #[snafu(display("request too big"))]
310        RequestTooBig {},
311        #[snafu(display("write: {source}"))]
313        Write { source: io::Error },
314    }
315
316    impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
317        pub fn new(
318            start: Instant,
319            reader: R,
320            writer: W,
321            request: GetRequest,
322            counters: RequestCounters,
323        ) -> Self {
324            Self {
325                start,
326                reader,
327                writer,
328                request,
329                counters,
330            }
331        }
332
333        pub async fn next(self) -> Result<ConnectedNext<R>, ConnectedNextError> {
340            let Self {
341                start,
342                reader,
343                mut writer,
344                mut request,
345                mut counters,
346            } = self;
347            counters.other_bytes_written += {
349                debug!("sending request");
350                let wrapped = Request::Get(request);
351                let request_bytes = postcard::to_stdvec(&wrapped)
352                    .context(connected_next_error::PostcardSerSnafu)?;
353                let Request::Get(x) = wrapped else {
354                    unreachable!();
355                };
356                request = x;
357
358                if request_bytes.len() > MAX_MESSAGE_SIZE {
359                    return Err(connected_next_error::RequestTooBigSnafu.build());
360                }
361
362                let len = request_bytes.len() as u64;
364                writer
365                    .send_bytes(request_bytes.into())
366                    .await
367                    .context(connected_next_error::WriteSnafu)?;
368                writer
369                    .sync()
370                    .await
371                    .context(connected_next_error::WriteSnafu)?;
372                len
373            };
374
375            drop(writer);
377
378            let hash = request.hash;
379            let ranges_iter = RangesIter::new(request.ranges);
380            let mut misc = Box::new(Misc {
382                counters,
383                start,
384                ranges_iter,
385            });
386            Ok(match misc.ranges_iter.next() {
387                Some((offset, ranges)) => {
388                    if offset == 0 {
389                        AtStartRoot {
390                            reader,
391                            ranges,
392                            misc,
393                            hash,
394                        }
395                        .into()
396                    } else {
397                        AtStartChild {
398                            reader,
399                            ranges,
400                            misc,
401                            offset,
402                        }
403                        .into()
404                    }
405                }
406                None => AtClosing::new(misc, reader, true).into(),
407            })
408        }
409    }
410
411    #[derive(Debug)]
413    pub struct AtStartRoot<R: RecvStream = DefaultReader> {
414        ranges: ChunkRanges,
415        reader: R,
416        misc: Box<Misc>,
417        hash: Hash,
418    }
419
420    #[derive(Debug)]
422    pub struct AtStartChild<R: RecvStream = DefaultReader> {
423        ranges: ChunkRanges,
424        reader: R,
425        misc: Box<Misc>,
426        offset: u64,
427    }
428
429    impl<R: RecvStream> AtStartChild<R> {
430        pub fn offset(&self) -> u64 {
436            self.offset
437        }
438
439        pub fn ranges(&self) -> &ChunkRanges {
441            &self.ranges
442        }
443
444        pub fn next(self, hash: Hash) -> AtBlobHeader<R> {
448            AtBlobHeader {
449                reader: self.reader,
450                ranges: self.ranges,
451                misc: self.misc,
452                hash,
453            }
454        }
455
456        pub fn finish(self) -> AtClosing<R> {
462            AtClosing::new(self.misc, self.reader, false)
463        }
464    }
465
466    impl<R: RecvStream> AtStartRoot<R> {
467        pub fn ranges(&self) -> &ChunkRanges {
469            &self.ranges
470        }
471
472        pub fn hash(&self) -> Hash {
474            self.hash
475        }
476
477        pub fn next(self) -> AtBlobHeader<R> {
481            AtBlobHeader {
482                reader: self.reader,
483                ranges: self.ranges,
484                hash: self.hash,
485                misc: self.misc,
486            }
487        }
488
489        pub fn finish(self) -> AtClosing<R> {
491            AtClosing::new(self.misc, self.reader, false)
492        }
493    }
494
495    #[derive(Debug)]
497    pub struct AtBlobHeader<R: RecvStream = DefaultReader> {
498        ranges: ChunkRanges,
499        reader: R,
500        misc: Box<Misc>,
501        hash: Hash,
502    }
503
504    #[common_fields({
506        backtrace: Option<Backtrace>,
507        #[snafu(implicit)]
508        span_trace: SpanTrace,
509    })]
510    #[non_exhaustive]
511    #[derive(Debug, Snafu)]
512    #[snafu(module)]
513    pub enum AtBlobHeaderNextError {
514        #[snafu(display("not found"))]
518        NotFound {},
519        #[snafu(display("io: {source}"))]
521        Read { source: io::Error },
522    }
523
524    impl From<AtBlobHeaderNextError> for io::Error {
525        fn from(cause: AtBlobHeaderNextError) -> Self {
526            match cause {
527                AtBlobHeaderNextError::NotFound { .. } => {
528                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
529                }
530                AtBlobHeaderNextError::Read { source, .. } => source,
531            }
532        }
533    }
534
535    impl<R: RecvStream> AtBlobHeader<R> {
536        pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
538            let mut size = [0; 8];
539            self.reader.recv_exact(&mut size).await.map_err(|cause| {
540                if cause.kind() == io::ErrorKind::UnexpectedEof {
541                    at_blob_header_next_error::NotFoundSnafu.build()
542                } else {
543                    at_blob_header_next_error::ReadSnafu.into_error(cause)
544                }
545            })?;
546            self.misc.other_bytes_read += 8;
547            let size = u64::from_le_bytes(size);
548            let stream = ResponseDecoder::new(
549                self.hash.into(),
550                self.ranges,
551                BaoTree::new(size, IROH_BLOCK_SIZE),
552                RecvStreamAsyncStreamReader::new(self.reader),
553            );
554            Ok((
555                AtBlobContent {
556                    stream,
557                    misc: self.misc,
558                },
559                size,
560            ))
561        }
562
563        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
565            let (content, _size) = self.next().await?;
566            content.drain().await
567        }
568
569        pub async fn concatenate_into_vec(
574            self,
575        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
576            let (content, _size) = self.next().await?;
577            content.concatenate_into_vec().await
578        }
579
580        pub async fn write_all<D: AsyncSliceWriter>(
582            self,
583            data: D,
584        ) -> result::Result<AtEndBlob<R>, DecodeError> {
585            let (content, _size) = self.next().await?;
586            let res = content.write_all(data).await?;
587            Ok(res)
588        }
589
590        pub async fn write_all_with_outboard<D, O>(
595            self,
596            outboard: Option<O>,
597            data: D,
598        ) -> result::Result<AtEndBlob<R>, DecodeError>
599        where
600            D: AsyncSliceWriter,
601            O: OutboardMut,
602        {
603            let (content, _size) = self.next().await?;
604            let res = content.write_all_with_outboard(outboard, data).await?;
605            Ok(res)
606        }
607
608        pub fn hash(&self) -> Hash {
610            self.hash
611        }
612
613        pub fn ranges(&self) -> &ChunkRanges {
615            &self.ranges
616        }
617
618        pub fn offset(&self) -> u64 {
620            self.misc.ranges_iter.offset()
621        }
622    }
623
624    #[derive(Debug)]
626    pub struct AtBlobContent<R: RecvStream = DefaultReader> {
627        stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
628        misc: Box<Misc>,
629    }
630
631    #[common_fields({
649        backtrace: Option<Backtrace>,
650        #[snafu(implicit)]
651        span_trace: SpanTrace,
652    })]
653    #[non_exhaustive]
654    #[derive(Debug, Snafu)]
655    #[snafu(module)]
656    pub enum DecodeError {
657        #[snafu(display("not found"))]
659        ChunkNotFound {},
660        #[snafu(display("parent not found {node:?}"))]
662        ParentNotFound { node: TreeNode },
663        #[snafu(display("chunk not found {num}"))]
665        LeafNotFound { num: ChunkNum },
666        #[snafu(display("parent hash mismatch: {node:?}"))]
668        ParentHashMismatch { node: TreeNode },
669        #[snafu(display("leaf hash mismatch: {num}"))]
671        LeafHashMismatch { num: ChunkNum },
672        #[snafu(display("read: {source}"))]
674        Read { source: io::Error },
675        #[snafu(display("io: {source}"))]
677        Write { source: io::Error },
678    }
679
680    impl DecodeError {
681        pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
682            decode_error::LeafHashMismatchSnafu { num }.build()
683        }
684    }
685
686    impl From<AtBlobHeaderNextError> for DecodeError {
687        fn from(cause: AtBlobHeaderNextError) -> Self {
688            match cause {
689                AtBlobHeaderNextError::NotFound { .. } => decode_error::ChunkNotFoundSnafu.build(),
690                AtBlobHeaderNextError::Read { source, .. } => {
691                    decode_error::ReadSnafu.into_error(source)
692                }
693            }
694        }
695    }
696
697    impl From<DecodeError> for io::Error {
698        fn from(cause: DecodeError) -> Self {
699            match cause {
700                DecodeError::ParentNotFound { .. } => {
701                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
702                }
703                DecodeError::LeafNotFound { .. } => {
704                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
705                }
706                DecodeError::Read { source, .. } => source,
707                DecodeError::Write { source, .. } => source,
708                _ => io::Error::other(cause),
709            }
710        }
711    }
712
713    impl From<bao_tree::io::DecodeError> for DecodeError {
714        fn from(value: bao_tree::io::DecodeError) -> Self {
715            match value {
716                bao_tree::io::DecodeError::ParentNotFound(node) => {
717                    decode_error::ParentNotFoundSnafu { node }.build()
718                }
719                bao_tree::io::DecodeError::LeafNotFound(num) => {
720                    decode_error::LeafNotFoundSnafu { num }.build()
721                }
722                bao_tree::io::DecodeError::ParentHashMismatch(node) => {
723                    decode_error::ParentHashMismatchSnafu { node }.build()
724                }
725                bao_tree::io::DecodeError::LeafHashMismatch(num) => {
726                    decode_error::LeafHashMismatchSnafu { num }.build()
727                }
728                bao_tree::io::DecodeError::Io(cause) => decode_error::ReadSnafu.into_error(cause),
729            }
730        }
731    }
732
733    #[derive(Debug, From)]
735    pub enum BlobContentNext<R: RecvStream> {
736        More(
738            (
739                AtBlobContent<R>,
740                result::Result<BaoContentItem, DecodeError>,
741            ),
742        ),
743        Done(AtEndBlob<R>),
745    }
746
747    impl<R: RecvStream> AtBlobContent<R> {
748        pub async fn next(self) -> BlobContentNext<R> {
750            match self.stream.next().await {
751                ResponseDecoderNext::More((stream, res)) => {
752                    let mut next = Self { stream, ..self };
753                    let res = res.map_err(DecodeError::from);
754                    match &res {
755                        Ok(BaoContentItem::Parent(_)) => {
756                            next.misc.other_bytes_read += 64;
757                        }
758                        Ok(BaoContentItem::Leaf(leaf)) => {
759                            next.misc.payload_bytes_read += leaf.data.len() as u64;
760                        }
761                        _ => {}
762                    }
763                    BlobContentNext::More((next, res))
764                }
765                ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
766                    stream: stream.into_inner(),
767                    misc: self.misc,
768                }),
769            }
770        }
771
772        pub fn tree(&self) -> bao_tree::BaoTree {
774            self.stream.tree()
775        }
776
777        pub fn hash(&self) -> Hash {
779            (*self.stream.hash()).into()
780        }
781
782        pub fn offset(&self) -> u64 {
784            self.misc.ranges_iter.offset()
785        }
786
787        pub fn stats(&self) -> Stats {
789            Stats {
790                counters: self.misc.counters,
791                elapsed: self.misc.start.elapsed(),
792            }
793        }
794
795        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
797            let mut content = self;
798            loop {
799                match content.next().await {
800                    BlobContentNext::More((content1, res)) => {
801                        let _ = res?;
802                        content = content1;
803                    }
804                    BlobContentNext::Done(end) => {
805                        break Ok(end);
806                    }
807                }
808            }
809        }
810
811        pub async fn concatenate_into_vec(
813            self,
814        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
815            let mut res = Vec::with_capacity(1024);
816            let mut curr = self;
817            let done = loop {
818                match curr.next().await {
819                    BlobContentNext::More((next, data)) => {
820                        if let BaoContentItem::Leaf(leaf) = data? {
821                            res.extend_from_slice(&leaf.data);
822                        }
823                        curr = next;
824                    }
825                    BlobContentNext::Done(done) => {
826                        break done;
828                    }
829                }
830            };
831            Ok((done, res))
832        }
833
834        pub async fn write_all_with_outboard<D, O>(
839            self,
840            mut outboard: Option<O>,
841            mut data: D,
842        ) -> result::Result<AtEndBlob<R>, DecodeError>
843        where
844            D: AsyncSliceWriter,
845            O: OutboardMut,
846        {
847            let mut content = self;
848            loop {
849                match content.next().await {
850                    BlobContentNext::More((content1, item)) => {
851                        content = content1;
852                        match item? {
853                            BaoContentItem::Parent(parent) => {
854                                if let Some(outboard) = outboard.as_mut() {
855                                    outboard
856                                        .save(parent.node, &parent.pair)
857                                        .await
858                                        .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
859                                }
860                            }
861                            BaoContentItem::Leaf(leaf) => {
862                                data.write_bytes_at(leaf.offset, leaf.data)
863                                    .await
864                                    .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
865                            }
866                        }
867                    }
868                    BlobContentNext::Done(end) => {
869                        return Ok(end);
870                    }
871                }
872            }
873        }
874
875        pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob<R>, DecodeError>
877        where
878            D: AsyncSliceWriter,
879        {
880            let mut content = self;
881            loop {
882                match content.next().await {
883                    BlobContentNext::More((content1, item)) => {
884                        content = content1;
885                        match item? {
886                            BaoContentItem::Parent(_) => {}
887                            BaoContentItem::Leaf(leaf) => {
888                                data.write_bytes_at(leaf.offset, leaf.data)
889                                    .await
890                                    .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
891                            }
892                        }
893                    }
894                    BlobContentNext::Done(end) => {
895                        return Ok(end);
896                    }
897                }
898            }
899        }
900
901        pub fn finish(self) -> AtClosing<R> {
903            AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
904        }
905    }
906
907    #[derive(Debug)]
909    pub struct AtEndBlob<R: RecvStream = DefaultReader> {
910        stream: R,
911        misc: Box<Misc>,
912    }
913
914    #[derive(Debug, From)]
916    pub enum EndBlobNext<R: RecvStream = DefaultReader> {
917        MoreChildren(AtStartChild<R>),
919        Closing(AtClosing<R>),
921    }
922
923    impl<R: RecvStream> AtEndBlob<R> {
924        pub fn next(mut self) -> EndBlobNext<R> {
926            if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
927                AtStartChild {
928                    reader: self.stream,
929                    offset,
930                    ranges,
931                    misc: self.misc,
932                }
933                .into()
934            } else {
935                AtClosing::new(self.misc, self.stream, true).into()
936            }
937        }
938    }
939
940    #[derive(Debug)]
942    pub struct AtClosing<R: RecvStream = DefaultReader> {
943        misc: Box<Misc>,
944        reader: R,
945        check_extra_data: bool,
946    }
947
948    impl<R: RecvStream> AtClosing<R> {
949        fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
950            Self {
951                misc,
952                reader,
953                check_extra_data,
954            }
955        }
956
957        pub async fn next(self) -> result::Result<Stats, AtClosingNextError> {
959            let mut reader = self.reader;
961            if self.check_extra_data {
962                let rest = reader.recv_bytes(1).await?;
963                if !rest.is_empty() {
964                    error!("Unexpected extra data at the end of the stream");
965                }
966            }
967            Ok(Stats {
968                counters: self.misc.counters,
969                elapsed: self.misc.start.elapsed(),
970            })
971        }
972    }
973
974    #[common_fields({
976        backtrace: Option<Backtrace>,
977        #[snafu(implicit)]
978        span_trace: SpanTrace,
979    })]
980    #[non_exhaustive]
981    #[derive(Debug, Snafu)]
982    #[snafu(module)]
983    pub enum AtClosingNextError {
984        #[snafu(transparent)]
986        Read { source: io::Error },
987    }
988
989    #[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
990    pub struct RequestCounters {
991        pub payload_bytes_written: u64,
993        pub other_bytes_written: u64,
995        pub payload_bytes_read: u64,
997        pub other_bytes_read: u64,
999    }
1000
1001    #[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
1003    struct Misc {
1004        start: Instant,
1006        #[deref]
1008        #[deref_mut]
1009        counters: RequestCounters,
1010        ranges_iter: RangesIter,
1012    }
1013}