1use std::fmt::{self, Debug};
20
21use anyhow::Result;
22use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
23use fsm::RequestCounters;
24use n0_future::time::{Duration, Instant};
25use n0_snafu::SpanTrace;
26use nested_enum_utils::common_fields;
27use serde::{Deserialize, Serialize};
28use snafu::{Backtrace, IntoError, ResultExt, Snafu};
29use tracing::{debug, error};
30
31use crate::{
32 protocol::ChunkRangesSeq,
33 store::IROH_BLOCK_SIZE,
34 util::{RecvStream, SendStream},
35 Hash,
36};
37
38mod error;
39pub mod request;
40pub(crate) use error::get_error;
41pub use error::{GetError, GetResult};
42
43type DefaultReader = iroh::endpoint::RecvStream;
44type DefaultWriter = iroh::endpoint::SendStream;
45
46pub struct StreamPair<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
47 pub connection_id: u64,
48 pub t0: Instant,
49 pub recv: R,
50 pub send: W,
51}
52
53impl<R: RecvStream, W: SendStream> StreamPair<R, W> {
54 pub fn new(connection_id: u64, recv: R, send: W) -> Self {
55 Self {
56 t0: Instant::now(),
57 recv,
58 send,
59 connection_id,
60 }
61 }
62}
63
64#[derive(
66 Debug,
67 Default,
68 Clone,
69 PartialEq,
70 Eq,
71 Serialize,
72 Deserialize,
73 derive_more::Deref,
74 derive_more::DerefMut,
75)]
76pub struct Stats {
77 #[deref]
79 #[deref_mut]
80 pub counters: RequestCounters,
81 pub elapsed: Duration,
83}
84
85impl Stats {
86 pub fn mbits(&self) -> f64 {
88 let data_len_bit = self.total_bytes_read() * 8;
89 data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
90 }
91
92 pub fn total_bytes_read(&self) -> u64 {
93 self.payload_bytes_read + self.other_bytes_read
94 }
95
96 pub fn combine(&mut self, that: &Stats) {
97 self.payload_bytes_written += that.payload_bytes_written;
98 self.other_bytes_written += that.other_bytes_written;
99 self.payload_bytes_read += that.payload_bytes_read;
100 self.other_bytes_read += that.other_bytes_read;
101 self.elapsed += that.elapsed;
102 }
103}
104
105#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
109pub mod fsm {
110 use std::{io, result};
111
112 use bao_tree::{
113 io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
114 BaoTree, ChunkRanges, TreeNode,
115 };
116 use derive_more::From;
117 use iroh::endpoint::Connection;
118 use iroh_io::AsyncSliceWriter;
119
120 use super::*;
121 use crate::{
122 get::get_error::BadRequestSnafu,
123 protocol::{
124 GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
125 },
126 util::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
127 };
128
129 self_cell::self_cell! {
130 struct RangesIterInner {
131 owner: ChunkRangesSeq,
132 #[not_covariant]
133 dependent: NonEmptyRequestRangeSpecIter,
134 }
135 }
136
137 pub fn start(
139 connection: Connection,
140 request: GetRequest,
141 counters: RequestCounters,
142 ) -> AtInitial {
143 AtInitial::new(connection, request, counters)
144 }
145
146 pub async fn start_get_many(
148 connection: Connection,
149 request: GetManyRequest,
150 counters: RequestCounters,
151 ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
152 let start = Instant::now();
153 let (mut writer, reader) = connection
154 .open_bi()
155 .await
156 .map_err(|e| OpenSnafu.into_error(e.into()))?;
157 let request = Request::GetMany(request);
158 let request_bytes = postcard::to_stdvec(&request)
159 .map_err(|source| BadRequestSnafu.into_error(source.into()))?;
160 writer
161 .send_bytes(request_bytes.into())
162 .await
163 .context(connected_next_error::WriteSnafu)?;
164 let Request::GetMany(request) = request else {
165 unreachable!();
166 };
167 let mut ranges_iter = RangesIter::new(request.ranges.clone());
168 let first_item = ranges_iter.next();
169 let misc = Box::new(Misc {
170 counters,
171 start,
172 ranges_iter,
173 });
174 Ok(match first_item {
175 Some((child_offset, child_ranges)) => Ok(AtStartChild {
176 ranges: child_ranges,
177 reader,
178 misc,
179 offset: child_offset,
180 }),
181 None => Err(AtClosing::new(misc, reader, true)),
182 })
183 }
184
185 struct RangesIter(RangesIterInner);
190
191 impl fmt::Debug for RangesIter {
192 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193 f.debug_struct("RangesIter").finish()
194 }
195 }
196
197 impl RangesIter {
198 pub fn new(owner: ChunkRangesSeq) -> Self {
199 Self(RangesIterInner::new(owner, |owner| {
200 owner.iter_non_empty_infinite()
201 }))
202 }
203
204 pub fn offset(&self) -> u64 {
205 self.0.with_dependent(|_owner, iter| iter.offset())
206 }
207 }
208
209 impl Iterator for RangesIter {
210 type Item = (u64, ChunkRanges);
211
212 fn next(&mut self) -> Option<Self::Item> {
213 self.0.with_dependent_mut(|_owner, iter| {
214 iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
215 })
216 }
217 }
218
219 #[derive(Debug)]
221 pub struct AtInitial {
222 connection: Connection,
223 request: GetRequest,
224 counters: RequestCounters,
225 }
226
227 impl AtInitial {
228 pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
233 Self {
234 connection,
235 request,
236 counters,
237 }
238 }
239
240 pub async fn next(self) -> Result<AtConnected, InitialNextError> {
242 let start = Instant::now();
243 let (writer, reader) = self
244 .connection
245 .open_bi()
246 .await
247 .map_err(|e| OpenSnafu.into_error(e.into()))?;
248 Ok(AtConnected {
249 start,
250 reader,
251 writer,
252 request: self.request,
253 counters: self.counters,
254 })
255 }
256 }
257
258 #[common_fields({
260 backtrace: Option<Backtrace>,
261 #[snafu(implicit)]
262 span_trace: SpanTrace,
263 })]
264 #[allow(missing_docs)]
265 #[derive(Debug, Snafu)]
266 #[non_exhaustive]
267 pub enum InitialNextError {
268 Open { source: io::Error },
269 }
270
271 #[derive(Debug)]
273 pub struct AtConnected<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
274 start: Instant,
275 reader: R,
276 writer: W,
277 request: GetRequest,
278 counters: RequestCounters,
279 }
280
281 #[derive(Debug, From)]
283 pub enum ConnectedNext<R: RecvStream = DefaultReader> {
284 StartRoot(AtStartRoot<R>),
286 StartChild(AtStartChild<R>),
288 Closing(AtClosing<R>),
290 }
291
292 #[common_fields({
294 backtrace: Option<Backtrace>,
295 #[snafu(implicit)]
296 span_trace: SpanTrace,
297 })]
298 #[allow(missing_docs)]
299 #[derive(Debug, Snafu)]
300 #[snafu(module)]
301 #[non_exhaustive]
302 pub enum ConnectedNextError {
303 #[snafu(display("postcard ser: {source}"))]
305 PostcardSer { source: postcard::Error },
306 #[snafu(display("request too big"))]
308 RequestTooBig {},
309 #[snafu(display("write: {source}"))]
311 Write { source: io::Error },
312 }
313
314 impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
315 pub fn new(
316 start: Instant,
317 reader: R,
318 writer: W,
319 request: GetRequest,
320 counters: RequestCounters,
321 ) -> Self {
322 Self {
323 start,
324 reader,
325 writer,
326 request,
327 counters,
328 }
329 }
330
331 pub async fn next(self) -> Result<ConnectedNext<R>, ConnectedNextError> {
338 let Self {
339 start,
340 reader,
341 mut writer,
342 mut request,
343 mut counters,
344 } = self;
345 counters.other_bytes_written += {
347 debug!("sending request");
348 let wrapped = Request::Get(request);
349 let request_bytes = postcard::to_stdvec(&wrapped)
350 .context(connected_next_error::PostcardSerSnafu)?;
351 let Request::Get(x) = wrapped else {
352 unreachable!();
353 };
354 request = x;
355
356 if request_bytes.len() > MAX_MESSAGE_SIZE {
357 return Err(connected_next_error::RequestTooBigSnafu.build());
358 }
359
360 let len = request_bytes.len() as u64;
362 writer
363 .send_bytes(request_bytes.into())
364 .await
365 .context(connected_next_error::WriteSnafu)?;
366 writer
367 .sync()
368 .await
369 .context(connected_next_error::WriteSnafu)?;
370 len
371 };
372
373 drop(writer);
375
376 let hash = request.hash;
377 let ranges_iter = RangesIter::new(request.ranges);
378 let mut misc = Box::new(Misc {
380 counters,
381 start,
382 ranges_iter,
383 });
384 Ok(match misc.ranges_iter.next() {
385 Some((offset, ranges)) => {
386 if offset == 0 {
387 AtStartRoot {
388 reader,
389 ranges,
390 misc,
391 hash,
392 }
393 .into()
394 } else {
395 AtStartChild {
396 reader,
397 ranges,
398 misc,
399 offset,
400 }
401 .into()
402 }
403 }
404 None => AtClosing::new(misc, reader, true).into(),
405 })
406 }
407 }
408
409 #[derive(Debug)]
411 pub struct AtStartRoot<R: RecvStream = DefaultReader> {
412 ranges: ChunkRanges,
413 reader: R,
414 misc: Box<Misc>,
415 hash: Hash,
416 }
417
418 #[derive(Debug)]
420 pub struct AtStartChild<R: RecvStream = DefaultReader> {
421 ranges: ChunkRanges,
422 reader: R,
423 misc: Box<Misc>,
424 offset: u64,
425 }
426
427 impl<R: RecvStream> AtStartChild<R> {
428 pub fn offset(&self) -> u64 {
434 self.offset
435 }
436
437 pub fn ranges(&self) -> &ChunkRanges {
439 &self.ranges
440 }
441
442 pub fn next(self, hash: Hash) -> AtBlobHeader<R> {
446 AtBlobHeader {
447 reader: self.reader,
448 ranges: self.ranges,
449 misc: self.misc,
450 hash,
451 }
452 }
453
454 pub fn finish(self) -> AtClosing<R> {
460 AtClosing::new(self.misc, self.reader, false)
461 }
462 }
463
464 impl<R: RecvStream> AtStartRoot<R> {
465 pub fn ranges(&self) -> &ChunkRanges {
467 &self.ranges
468 }
469
470 pub fn hash(&self) -> Hash {
472 self.hash
473 }
474
475 pub fn next(self) -> AtBlobHeader<R> {
479 AtBlobHeader {
480 reader: self.reader,
481 ranges: self.ranges,
482 hash: self.hash,
483 misc: self.misc,
484 }
485 }
486
487 pub fn finish(self) -> AtClosing<R> {
489 AtClosing::new(self.misc, self.reader, false)
490 }
491 }
492
493 #[derive(Debug)]
495 pub struct AtBlobHeader<R: RecvStream = DefaultReader> {
496 ranges: ChunkRanges,
497 reader: R,
498 misc: Box<Misc>,
499 hash: Hash,
500 }
501
502 #[common_fields({
504 backtrace: Option<Backtrace>,
505 #[snafu(implicit)]
506 span_trace: SpanTrace,
507 })]
508 #[non_exhaustive]
509 #[derive(Debug, Snafu)]
510 #[snafu(module)]
511 pub enum AtBlobHeaderNextError {
512 #[snafu(display("not found"))]
516 NotFound {},
517 #[snafu(display("io: {source}"))]
519 Read { source: io::Error },
520 }
521
522 impl From<AtBlobHeaderNextError> for io::Error {
523 fn from(cause: AtBlobHeaderNextError) -> Self {
524 match cause {
525 AtBlobHeaderNextError::NotFound { .. } => {
526 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
527 }
528 AtBlobHeaderNextError::Read { source, .. } => source,
529 }
530 }
531 }
532
533 impl<R: RecvStream> AtBlobHeader<R> {
534 pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
536 let mut size = [0; 8];
537 self.reader.recv_exact(&mut size).await.map_err(|cause| {
538 if cause.kind() == io::ErrorKind::UnexpectedEof {
539 at_blob_header_next_error::NotFoundSnafu.build()
540 } else {
541 at_blob_header_next_error::ReadSnafu.into_error(cause)
542 }
543 })?;
544 self.misc.other_bytes_read += 8;
545 let size = u64::from_le_bytes(size);
546 let stream = ResponseDecoder::new(
547 self.hash.into(),
548 self.ranges,
549 BaoTree::new(size, IROH_BLOCK_SIZE),
550 RecvStreamAsyncStreamReader::new(self.reader),
551 );
552 Ok((
553 AtBlobContent {
554 stream,
555 misc: self.misc,
556 },
557 size,
558 ))
559 }
560
561 pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
563 let (content, _size) = self.next().await?;
564 content.drain().await
565 }
566
567 pub async fn concatenate_into_vec(
572 self,
573 ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
574 let (content, _size) = self.next().await?;
575 content.concatenate_into_vec().await
576 }
577
578 pub async fn write_all<D: AsyncSliceWriter>(
580 self,
581 data: D,
582 ) -> result::Result<AtEndBlob<R>, DecodeError> {
583 let (content, _size) = self.next().await?;
584 let res = content.write_all(data).await?;
585 Ok(res)
586 }
587
588 pub async fn write_all_with_outboard<D, O>(
593 self,
594 outboard: Option<O>,
595 data: D,
596 ) -> result::Result<AtEndBlob<R>, DecodeError>
597 where
598 D: AsyncSliceWriter,
599 O: OutboardMut,
600 {
601 let (content, _size) = self.next().await?;
602 let res = content.write_all_with_outboard(outboard, data).await?;
603 Ok(res)
604 }
605
606 pub fn hash(&self) -> Hash {
608 self.hash
609 }
610
611 pub fn ranges(&self) -> &ChunkRanges {
613 &self.ranges
614 }
615
616 pub fn offset(&self) -> u64 {
618 self.misc.ranges_iter.offset()
619 }
620 }
621
622 #[derive(Debug)]
624 pub struct AtBlobContent<R: RecvStream = DefaultReader> {
625 stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
626 misc: Box<Misc>,
627 }
628
629 #[common_fields({
647 backtrace: Option<Backtrace>,
648 #[snafu(implicit)]
649 span_trace: SpanTrace,
650 })]
651 #[non_exhaustive]
652 #[derive(Debug, Snafu)]
653 #[snafu(module)]
654 pub enum DecodeError {
655 #[snafu(display("not found"))]
657 ChunkNotFound {},
658 #[snafu(display("parent not found {node:?}"))]
660 ParentNotFound { node: TreeNode },
661 #[snafu(display("chunk not found {num}"))]
663 LeafNotFound { num: ChunkNum },
664 #[snafu(display("parent hash mismatch: {node:?}"))]
666 ParentHashMismatch { node: TreeNode },
667 #[snafu(display("leaf hash mismatch: {num}"))]
669 LeafHashMismatch { num: ChunkNum },
670 #[snafu(display("read: {source}"))]
672 Read { source: io::Error },
673 #[snafu(display("io: {source}"))]
675 Write { source: io::Error },
676 }
677
678 impl DecodeError {
679 pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
680 decode_error::LeafHashMismatchSnafu { num }.build()
681 }
682 }
683
684 impl From<AtBlobHeaderNextError> for DecodeError {
685 fn from(cause: AtBlobHeaderNextError) -> Self {
686 match cause {
687 AtBlobHeaderNextError::NotFound { .. } => decode_error::ChunkNotFoundSnafu.build(),
688 AtBlobHeaderNextError::Read { source, .. } => {
689 decode_error::ReadSnafu.into_error(source)
690 }
691 }
692 }
693 }
694
695 impl From<DecodeError> for io::Error {
696 fn from(cause: DecodeError) -> Self {
697 match cause {
698 DecodeError::ParentNotFound { .. } => {
699 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
700 }
701 DecodeError::LeafNotFound { .. } => {
702 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
703 }
704 DecodeError::Read { source, .. } => source,
705 DecodeError::Write { source, .. } => source,
706 _ => io::Error::other(cause),
707 }
708 }
709 }
710
711 impl From<bao_tree::io::DecodeError> for DecodeError {
712 fn from(value: bao_tree::io::DecodeError) -> Self {
713 match value {
714 bao_tree::io::DecodeError::ParentNotFound(node) => {
715 decode_error::ParentNotFoundSnafu { node }.build()
716 }
717 bao_tree::io::DecodeError::LeafNotFound(num) => {
718 decode_error::LeafNotFoundSnafu { num }.build()
719 }
720 bao_tree::io::DecodeError::ParentHashMismatch(node) => {
721 decode_error::ParentHashMismatchSnafu { node }.build()
722 }
723 bao_tree::io::DecodeError::LeafHashMismatch(num) => {
724 decode_error::LeafHashMismatchSnafu { num }.build()
725 }
726 bao_tree::io::DecodeError::Io(cause) => decode_error::ReadSnafu.into_error(cause),
727 }
728 }
729 }
730
731 #[derive(Debug, From)]
733 pub enum BlobContentNext<R: RecvStream> {
734 More(
736 (
737 AtBlobContent<R>,
738 result::Result<BaoContentItem, DecodeError>,
739 ),
740 ),
741 Done(AtEndBlob<R>),
743 }
744
745 impl<R: RecvStream> AtBlobContent<R> {
746 pub async fn next(self) -> BlobContentNext<R> {
748 match self.stream.next().await {
749 ResponseDecoderNext::More((stream, res)) => {
750 let mut next = Self { stream, ..self };
751 let res = res.map_err(DecodeError::from);
752 match &res {
753 Ok(BaoContentItem::Parent(_)) => {
754 next.misc.other_bytes_read += 64;
755 }
756 Ok(BaoContentItem::Leaf(leaf)) => {
757 next.misc.payload_bytes_read += leaf.data.len() as u64;
758 }
759 _ => {}
760 }
761 BlobContentNext::More((next, res))
762 }
763 ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
764 stream: stream.into_inner(),
765 misc: self.misc,
766 }),
767 }
768 }
769
770 pub fn tree(&self) -> bao_tree::BaoTree {
772 self.stream.tree()
773 }
774
775 pub fn hash(&self) -> Hash {
777 (*self.stream.hash()).into()
778 }
779
780 pub fn offset(&self) -> u64 {
782 self.misc.ranges_iter.offset()
783 }
784
785 pub fn stats(&self) -> Stats {
787 Stats {
788 counters: self.misc.counters,
789 elapsed: self.misc.start.elapsed(),
790 }
791 }
792
793 pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
795 let mut content = self;
796 loop {
797 match content.next().await {
798 BlobContentNext::More((content1, res)) => {
799 let _ = res?;
800 content = content1;
801 }
802 BlobContentNext::Done(end) => {
803 break Ok(end);
804 }
805 }
806 }
807 }
808
809 pub async fn concatenate_into_vec(
811 self,
812 ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
813 let mut res = Vec::with_capacity(1024);
814 let mut curr = self;
815 let done = loop {
816 match curr.next().await {
817 BlobContentNext::More((next, data)) => {
818 if let BaoContentItem::Leaf(leaf) = data? {
819 res.extend_from_slice(&leaf.data);
820 }
821 curr = next;
822 }
823 BlobContentNext::Done(done) => {
824 break done;
826 }
827 }
828 };
829 Ok((done, res))
830 }
831
832 pub async fn write_all_with_outboard<D, O>(
837 self,
838 mut outboard: Option<O>,
839 mut data: D,
840 ) -> result::Result<AtEndBlob<R>, DecodeError>
841 where
842 D: AsyncSliceWriter,
843 O: OutboardMut,
844 {
845 let mut content = self;
846 loop {
847 match content.next().await {
848 BlobContentNext::More((content1, item)) => {
849 content = content1;
850 match item? {
851 BaoContentItem::Parent(parent) => {
852 if let Some(outboard) = outboard.as_mut() {
853 outboard
854 .save(parent.node, &parent.pair)
855 .await
856 .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
857 }
858 }
859 BaoContentItem::Leaf(leaf) => {
860 data.write_bytes_at(leaf.offset, leaf.data)
861 .await
862 .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
863 }
864 }
865 }
866 BlobContentNext::Done(end) => {
867 return Ok(end);
868 }
869 }
870 }
871 }
872
873 pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob<R>, DecodeError>
875 where
876 D: AsyncSliceWriter,
877 {
878 let mut content = self;
879 loop {
880 match content.next().await {
881 BlobContentNext::More((content1, item)) => {
882 content = content1;
883 match item? {
884 BaoContentItem::Parent(_) => {}
885 BaoContentItem::Leaf(leaf) => {
886 data.write_bytes_at(leaf.offset, leaf.data)
887 .await
888 .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
889 }
890 }
891 }
892 BlobContentNext::Done(end) => {
893 return Ok(end);
894 }
895 }
896 }
897 }
898
899 pub fn finish(self) -> AtClosing<R> {
901 AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
902 }
903 }
904
905 #[derive(Debug)]
907 pub struct AtEndBlob<R: RecvStream = DefaultReader> {
908 stream: R,
909 misc: Box<Misc>,
910 }
911
912 #[derive(Debug, From)]
914 pub enum EndBlobNext<R: RecvStream = DefaultReader> {
915 MoreChildren(AtStartChild<R>),
917 Closing(AtClosing<R>),
919 }
920
921 impl<R: RecvStream> AtEndBlob<R> {
922 pub fn next(mut self) -> EndBlobNext<R> {
924 if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
925 AtStartChild {
926 reader: self.stream,
927 offset,
928 ranges,
929 misc: self.misc,
930 }
931 .into()
932 } else {
933 AtClosing::new(self.misc, self.stream, true).into()
934 }
935 }
936 }
937
938 #[derive(Debug)]
940 pub struct AtClosing<R: RecvStream = DefaultReader> {
941 misc: Box<Misc>,
942 reader: R,
943 check_extra_data: bool,
944 }
945
946 impl<R: RecvStream> AtClosing<R> {
947 fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
948 Self {
949 misc,
950 reader,
951 check_extra_data,
952 }
953 }
954
955 pub async fn next(self) -> result::Result<Stats, AtClosingNextError> {
957 let mut reader = self.reader;
959 if self.check_extra_data {
960 let rest = reader.recv_bytes(1).await?;
961 if !rest.is_empty() {
962 error!("Unexpected extra data at the end of the stream");
963 }
964 }
965 Ok(Stats {
966 counters: self.misc.counters,
967 elapsed: self.misc.start.elapsed(),
968 })
969 }
970 }
971
972 #[common_fields({
974 backtrace: Option<Backtrace>,
975 #[snafu(implicit)]
976 span_trace: SpanTrace,
977 })]
978 #[non_exhaustive]
979 #[derive(Debug, Snafu)]
980 #[snafu(module)]
981 pub enum AtClosingNextError {
982 #[snafu(transparent)]
984 Read { source: io::Error },
985 }
986
987 #[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
988 pub struct RequestCounters {
989 pub payload_bytes_written: u64,
991 pub other_bytes_written: u64,
993 pub payload_bytes_read: u64,
995 pub other_bytes_read: u64,
997 }
998
999 #[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
1001 struct Misc {
1002 start: Instant,
1004 #[deref]
1006 #[deref_mut]
1007 counters: RequestCounters,
1008 ranges_iter: RangesIter,
1010 }
1011}