1use std::{
20 fmt::{self, Debug},
21 time::Duration,
22};
23
24use anyhow::Result;
25use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
26use fsm::RequestCounters;
27use n0_future::time::Instant;
28use n0_snafu::SpanTrace;
29use nested_enum_utils::common_fields;
30use serde::{Deserialize, Serialize};
31use snafu::{Backtrace, IntoError, ResultExt, Snafu};
32use tracing::{debug, error};
33
34use crate::{
35 protocol::ChunkRangesSeq,
36 store::IROH_BLOCK_SIZE,
37 util::{RecvStream, SendStream},
38 Hash,
39};
40
41mod error;
42pub mod request;
43pub(crate) use error::get_error;
44pub use error::{GetError, GetResult};
45
46type DefaultReader = iroh::endpoint::RecvStream;
47type DefaultWriter = iroh::endpoint::SendStream;
48
/// A receive stream and a send stream grouped with a connection id and a timestamp.
///
/// Both stream type parameters default to the iroh endpoint stream types.
pub struct StreamPair<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    /// Connection identifier supplied by whoever builds the pair.
    pub connection_id: u64,
    /// Timestamp; [`StreamPair::new`] sets it to the moment of construction.
    pub t0: Instant,
    /// The receiving stream.
    pub recv: R,
    /// The sending stream.
    pub send: W,
}
55
56impl<R: RecvStream, W: SendStream> StreamPair<R, W> {
57 pub fn new(connection_id: u64, recv: R, send: W) -> Self {
58 Self {
59 t0: Instant::now(),
60 recv,
61 send,
62 connection_id,
63 }
64 }
65}
66
/// Byte counters of a request together with an elapsed time.
///
/// Dereferences (immutably and mutably) to the wrapped [`RequestCounters`], so the
/// individual counters can be read and updated directly on a `Stats` value.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Deref,
    derive_more::DerefMut,
)]
pub struct Stats {
    /// The byte counters; also the deref target of this struct.
    #[deref]
    #[deref_mut]
    pub counters: RequestCounters,
    /// Time span the counters refer to.
    pub elapsed: Duration,
}
87
88impl Stats {
89 pub fn mbits(&self) -> f64 {
91 let data_len_bit = self.total_bytes_read() * 8;
92 data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
93 }
94
95 pub fn total_bytes_read(&self) -> u64 {
96 self.payload_bytes_read + self.other_bytes_read
97 }
98
99 pub fn combine(&mut self, that: &Stats) {
100 self.payload_bytes_written += that.payload_bytes_written;
101 self.other_bytes_written += that.other_bytes_written;
102 self.payload_bytes_read += that.payload_bytes_read;
103 self.other_bytes_read += that.other_bytes_read;
104 self.elapsed += that.elapsed;
105 }
106}
107
108#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
112pub mod fsm {
113 use std::{io, result};
114
115 use bao_tree::{
116 io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
117 BaoTree, ChunkRanges, TreeNode,
118 };
119 use derive_more::From;
120 use iroh::endpoint::Connection;
121 use iroh_io::AsyncSliceWriter;
122
123 use super::*;
124 use crate::{
125 get::get_error::BadRequestSnafu,
126 protocol::{
127 GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
128 },
129 util::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
130 };
131
// Self-referential cell generated by the `self_cell` macro: it owns a
// `ChunkRangesSeq` and, next to it, a `NonEmptyRequestRangeSpecIter` that is
// created from (and depends on) that owner.
self_cell::self_cell! {
    struct RangesIterInner {
        owner: ChunkRangesSeq,
        #[not_covariant]
        dependent: NonEmptyRequestRangeSpecIter,
    }
}
139
140 pub fn start(
142 connection: Connection,
143 request: GetRequest,
144 counters: RequestCounters,
145 ) -> AtInitial {
146 AtInitial::new(connection, request, counters)
147 }
148
149 pub async fn start_get_many(
151 connection: Connection,
152 request: GetManyRequest,
153 counters: RequestCounters,
154 ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
155 let start = Instant::now();
156 let (mut writer, reader) = connection
157 .open_bi()
158 .await
159 .map_err(|e| OpenSnafu.into_error(e.into()))?;
160 let request = Request::GetMany(request);
161 let request_bytes = postcard::to_stdvec(&request)
162 .map_err(|source| BadRequestSnafu.into_error(source.into()))?;
163 writer
164 .send_bytes(request_bytes.into())
165 .await
166 .context(connected_next_error::WriteSnafu)?;
167 let Request::GetMany(request) = request else {
168 unreachable!();
169 };
170 let mut ranges_iter = RangesIter::new(request.ranges.clone());
171 let first_item = ranges_iter.next();
172 let misc = Box::new(Misc {
173 counters,
174 start,
175 ranges_iter,
176 });
177 Ok(match first_item {
178 Some((child_offset, child_ranges)) => Ok(AtStartChild {
179 ranges: child_ranges,
180 reader,
181 misc,
182 offset: child_offset,
183 }),
184 None => Err(AtClosing::new(misc, reader, true)),
185 })
186 }
187
/// Owning iterator over the non-empty ranges of a [`ChunkRangesSeq`].
///
/// Thin wrapper around the self-referential [`RangesIterInner`] cell.
struct RangesIter(RangesIterInner);
193
194 impl fmt::Debug for RangesIter {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 f.debug_struct("RangesIter").finish()
197 }
198 }
199
200 impl RangesIter {
201 pub fn new(owner: ChunkRangesSeq) -> Self {
202 Self(RangesIterInner::new(owner, |owner| {
203 owner.iter_non_empty_infinite()
204 }))
205 }
206
207 pub fn offset(&self) -> u64 {
208 self.0.with_dependent(|_owner, iter| iter.offset())
209 }
210 }
211
212 impl Iterator for RangesIter {
213 type Item = (u64, ChunkRanges);
214
215 fn next(&mut self) -> Option<Self::Item> {
216 self.0.with_dependent_mut(|_owner, iter| {
217 iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
218 })
219 }
220 }
221
/// Initial state of a get request: nothing has been sent and no stream is open yet.
#[derive(Debug)]
pub struct AtInitial {
    /// Connection on which the bidirectional stream will be opened.
    connection: Connection,
    /// The request to send.
    request: GetRequest,
    /// Counters that are carried through all subsequent states.
    counters: RequestCounters,
}
229
230 impl AtInitial {
231 pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
236 Self {
237 connection,
238 request,
239 counters,
240 }
241 }
242
243 pub async fn next(self) -> Result<AtConnected, InitialNextError> {
245 let start = Instant::now();
246 let (writer, reader) = self
247 .connection
248 .open_bi()
249 .await
250 .map_err(|e| OpenSnafu.into_error(e.into()))?;
251 Ok(AtConnected {
252 start,
253 reader,
254 writer,
255 request: self.request,
256 counters: self.counters,
257 })
258 }
259 }
260
/// Error returned by [`AtInitial::next`].
///
/// Every variant additionally carries the `backtrace` and `span_trace`
/// fields added through `common_fields`.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[derive(Debug, Snafu)]
#[non_exhaustive]
pub enum InitialNextError {
    /// Opening the bidirectional stream failed.
    Open { source: io::Error },
}
273
/// State after the streams are available but before the request has been sent.
#[derive(Debug)]
pub struct AtConnected<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    /// Instant at which the request was started; used later to compute the elapsed time.
    start: Instant,
    /// Stream the response is read from.
    reader: R,
    /// Stream the request is written to.
    writer: W,
    /// The request that is about to be sent.
    request: GetRequest,
    /// Counters carried through the states.
    counters: RequestCounters,
}
283
/// Possible states after the request has been sent, see [`AtConnected::next`].
///
/// The derived `From` impls allow converting each state into this enum.
#[derive(Debug, From)]
pub enum ConnectedNext<R: RecvStream = DefaultReader> {
    /// The first requested item has offset 0.
    StartRoot(AtStartRoot<R>),
    /// The first requested item has a non-zero offset.
    StartChild(AtStartChild<R>),
    /// The request does not contain any non-empty ranges.
    Closing(AtClosing<R>),
}
294
/// Error returned by [`AtConnected::next`].
///
/// Every variant additionally carries the `backtrace` and `span_trace`
/// fields added through `common_fields`.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[derive(Debug, Snafu)]
#[snafu(module)]
#[non_exhaustive]
pub enum ConnectedNextError {
    /// Serializing the request with postcard failed.
    #[snafu(display("postcard ser: {source}"))]
    PostcardSer { source: postcard::Error },
    /// The serialized request is larger than `MAX_MESSAGE_SIZE`.
    #[snafu(display("request too big"))]
    RequestTooBig {},
    /// Writing or syncing the request on the send stream failed.
    #[snafu(display("write: {source}"))]
    Write { source: io::Error },
}
316
317 impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
318 pub fn new(
319 start: Instant,
320 reader: R,
321 writer: W,
322 request: GetRequest,
323 counters: RequestCounters,
324 ) -> Self {
325 Self {
326 start,
327 reader,
328 writer,
329 request,
330 counters,
331 }
332 }
333
334 pub async fn next(self) -> Result<ConnectedNext<R>, ConnectedNextError> {
341 let Self {
342 start,
343 reader,
344 mut writer,
345 mut request,
346 mut counters,
347 } = self;
348 counters.other_bytes_written += {
350 debug!("sending request");
351 let wrapped = Request::Get(request);
352 let request_bytes = postcard::to_stdvec(&wrapped)
353 .context(connected_next_error::PostcardSerSnafu)?;
354 let Request::Get(x) = wrapped else {
355 unreachable!();
356 };
357 request = x;
358
359 if request_bytes.len() > MAX_MESSAGE_SIZE {
360 return Err(connected_next_error::RequestTooBigSnafu.build());
361 }
362
363 let len = request_bytes.len() as u64;
365 writer
366 .send_bytes(request_bytes.into())
367 .await
368 .context(connected_next_error::WriteSnafu)?;
369 writer
370 .sync()
371 .await
372 .context(connected_next_error::WriteSnafu)?;
373 len
374 };
375
376 drop(writer);
378
379 let hash = request.hash;
380 let ranges_iter = RangesIter::new(request.ranges);
381 let mut misc = Box::new(Misc {
383 counters,
384 start,
385 ranges_iter,
386 });
387 Ok(match misc.ranges_iter.next() {
388 Some((offset, ranges)) => {
389 if offset == 0 {
390 AtStartRoot {
391 reader,
392 ranges,
393 misc,
394 hash,
395 }
396 .into()
397 } else {
398 AtStartChild {
399 reader,
400 ranges,
401 misc,
402 offset,
403 }
404 .into()
405 }
406 }
407 None => AtClosing::new(misc, reader, true).into(),
408 })
409 }
410 }
411
/// State right before reading the item at offset 0, whose hash is taken from the request.
#[derive(Debug)]
pub struct AtStartRoot<R: RecvStream = DefaultReader> {
    /// Ranges requested for this item.
    ranges: ChunkRanges,
    /// Stream the response is read from.
    reader: R,
    /// Counters, start time and the remaining requested ranges.
    misc: Box<Misc>,
    /// Hash of the item, copied from the request.
    hash: Hash,
}
420
/// State right before reading an item whose hash has to be supplied by the caller.
#[derive(Debug)]
pub struct AtStartChild<R: RecvStream = DefaultReader> {
    /// Ranges requested for this item.
    ranges: ChunkRanges,
    /// Stream the response is read from.
    reader: R,
    /// Counters, start time and the remaining requested ranges.
    misc: Box<Misc>,
    /// Offset of this item as yielded by the requested ranges iterator.
    offset: u64,
}
429
430 impl<R: RecvStream> AtStartChild<R> {
431 pub fn offset(&self) -> u64 {
437 self.offset
438 }
439
440 pub fn ranges(&self) -> &ChunkRanges {
442 &self.ranges
443 }
444
445 pub fn next(self, hash: Hash) -> AtBlobHeader<R> {
449 AtBlobHeader {
450 reader: self.reader,
451 ranges: self.ranges,
452 misc: self.misc,
453 hash,
454 }
455 }
456
457 pub fn finish(self) -> AtClosing<R> {
463 AtClosing::new(self.misc, self.reader, false)
464 }
465 }
466
467 impl<R: RecvStream> AtStartRoot<R> {
468 pub fn ranges(&self) -> &ChunkRanges {
470 &self.ranges
471 }
472
473 pub fn hash(&self) -> Hash {
475 self.hash
476 }
477
478 pub fn next(self) -> AtBlobHeader<R> {
482 AtBlobHeader {
483 reader: self.reader,
484 ranges: self.ranges,
485 hash: self.hash,
486 misc: self.misc,
487 }
488 }
489
490 pub fn finish(self) -> AtClosing<R> {
492 AtClosing::new(self.misc, self.reader, false)
493 }
494 }
495
/// State before reading the 8 byte size header of an item.
#[derive(Debug)]
pub struct AtBlobHeader<R: RecvStream = DefaultReader> {
    /// Ranges requested for this item.
    ranges: ChunkRanges,
    /// Stream the response is read from.
    reader: R,
    /// Counters, start time and the remaining requested ranges.
    misc: Box<Misc>,
    /// Hash the item is verified against.
    hash: Hash,
}
504
/// Error returned by [`AtBlobHeader::next`].
///
/// Every variant additionally carries the `backtrace` and `span_trace`
/// fields added through `common_fields`.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[non_exhaustive]
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum AtBlobHeaderNextError {
    /// The stream ended (`UnexpectedEof`) before the size header was complete.
    #[snafu(display("not found"))]
    NotFound {},
    /// Any other I/O error while reading the size header.
    #[snafu(display("io: {source}"))]
    Read { source: io::Error },
}
524
525 impl From<AtBlobHeaderNextError> for io::Error {
526 fn from(cause: AtBlobHeaderNextError) -> Self {
527 match cause {
528 AtBlobHeaderNextError::NotFound { .. } => {
529 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
530 }
531 AtBlobHeaderNextError::Read { source, .. } => source,
532 }
533 }
534 }
535
536 impl<R: RecvStream> AtBlobHeader<R> {
537 pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
539 let mut size = [0; 8];
540 self.reader.recv_exact(&mut size).await.map_err(|cause| {
541 if cause.kind() == io::ErrorKind::UnexpectedEof {
542 at_blob_header_next_error::NotFoundSnafu.build()
543 } else {
544 at_blob_header_next_error::ReadSnafu.into_error(cause)
545 }
546 })?;
547 self.misc.other_bytes_read += 8;
548 let size = u64::from_le_bytes(size);
549 let stream = ResponseDecoder::new(
550 self.hash.into(),
551 self.ranges,
552 BaoTree::new(size, IROH_BLOCK_SIZE),
553 RecvStreamAsyncStreamReader::new(self.reader),
554 );
555 Ok((
556 AtBlobContent {
557 stream,
558 misc: self.misc,
559 },
560 size,
561 ))
562 }
563
564 pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
566 let (content, _size) = self.next().await?;
567 content.drain().await
568 }
569
570 pub async fn concatenate_into_vec(
575 self,
576 ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
577 let (content, _size) = self.next().await?;
578 content.concatenate_into_vec().await
579 }
580
581 pub async fn write_all<D: AsyncSliceWriter>(
583 self,
584 data: D,
585 ) -> result::Result<AtEndBlob<R>, DecodeError> {
586 let (content, _size) = self.next().await?;
587 let res = content.write_all(data).await?;
588 Ok(res)
589 }
590
591 pub async fn write_all_with_outboard<D, O>(
596 self,
597 outboard: Option<O>,
598 data: D,
599 ) -> result::Result<AtEndBlob<R>, DecodeError>
600 where
601 D: AsyncSliceWriter,
602 O: OutboardMut,
603 {
604 let (content, _size) = self.next().await?;
605 let res = content.write_all_with_outboard(outboard, data).await?;
606 Ok(res)
607 }
608
609 pub fn hash(&self) -> Hash {
611 self.hash
612 }
613
614 pub fn ranges(&self) -> &ChunkRanges {
616 &self.ranges
617 }
618
619 pub fn offset(&self) -> u64 {
621 self.misc.ranges_iter.offset()
622 }
623 }
624
/// State while reading the content (parent nodes and leaves) of an item.
#[derive(Debug)]
pub struct AtBlobContent<R: RecvStream = DefaultReader> {
    /// Decoder that reads from the wrapped receive stream.
    stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
    /// Counters, start time and the remaining requested ranges.
    misc: Box<Misc>,
}
631
/// Error that can occur while decoding the content of an item.
///
/// Most variants mirror the variants of `bao_tree::io::DecodeError`; see the
/// `From` impls in this module for how other errors are mapped into this one.
/// Every variant additionally carries the `backtrace` and `span_trace`
/// fields added through `common_fields`.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[non_exhaustive]
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum DecodeError {
    /// The size header could not be read because the stream ended early
    /// (converted from [`AtBlobHeaderNextError::NotFound`]).
    #[snafu(display("not found"))]
    ChunkNotFound {},
    /// The decoder reported that a parent node was not found.
    #[snafu(display("parent not found {node:?}"))]
    ParentNotFound { node: TreeNode },
    /// The decoder reported that a leaf was not found.
    #[snafu(display("chunk not found {num}"))]
    LeafNotFound { num: ChunkNum },
    /// The decoder reported a hash mismatch for a parent node.
    #[snafu(display("parent hash mismatch: {node:?}"))]
    ParentHashMismatch { node: TreeNode },
    /// The decoder reported a hash mismatch for a leaf.
    #[snafu(display("leaf hash mismatch: {num}"))]
    LeafHashMismatch { num: ChunkNum },
    /// I/O error while reading from the stream.
    #[snafu(display("read: {source}"))]
    Read { source: io::Error },
    /// I/O error while writing received data or outboard nodes.
    #[snafu(display("io: {source}"))]
    Write { source: io::Error },
}
680
681 impl DecodeError {
682 pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
683 decode_error::LeafHashMismatchSnafu { num }.build()
684 }
685 }
686
687 impl From<AtBlobHeaderNextError> for DecodeError {
688 fn from(cause: AtBlobHeaderNextError) -> Self {
689 match cause {
690 AtBlobHeaderNextError::NotFound { .. } => decode_error::ChunkNotFoundSnafu.build(),
691 AtBlobHeaderNextError::Read { source, .. } => {
692 decode_error::ReadSnafu.into_error(source)
693 }
694 }
695 }
696 }
697
698 impl From<DecodeError> for io::Error {
699 fn from(cause: DecodeError) -> Self {
700 match cause {
701 DecodeError::ParentNotFound { .. } => {
702 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
703 }
704 DecodeError::LeafNotFound { .. } => {
705 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
706 }
707 DecodeError::Read { source, .. } => source,
708 DecodeError::Write { source, .. } => source,
709 _ => io::Error::other(cause),
710 }
711 }
712 }
713
714 impl From<bao_tree::io::DecodeError> for DecodeError {
715 fn from(value: bao_tree::io::DecodeError) -> Self {
716 match value {
717 bao_tree::io::DecodeError::ParentNotFound(node) => {
718 decode_error::ParentNotFoundSnafu { node }.build()
719 }
720 bao_tree::io::DecodeError::LeafNotFound(num) => {
721 decode_error::LeafNotFoundSnafu { num }.build()
722 }
723 bao_tree::io::DecodeError::ParentHashMismatch(node) => {
724 decode_error::ParentHashMismatchSnafu { node }.build()
725 }
726 bao_tree::io::DecodeError::LeafHashMismatch(num) => {
727 decode_error::LeafHashMismatchSnafu { num }.build()
728 }
729 bao_tree::io::DecodeError::Io(cause) => decode_error::ReadSnafu.into_error(cause),
730 }
731 }
732 }
733
/// Result of [`AtBlobContent::next`].
#[derive(Debug, From)]
pub enum BlobContentNext<R: RecvStream> {
    /// The decoder produced an item or an error; the content state is handed
    /// back alongside so the caller can continue or finish.
    More(
        (
            AtBlobContent<R>,
            result::Result<BaoContentItem, DecodeError>,
        ),
    ),
    /// The decoder is done with this item.
    Done(AtEndBlob<R>),
}
747
748 impl<R: RecvStream> AtBlobContent<R> {
749 pub async fn next(self) -> BlobContentNext<R> {
751 match self.stream.next().await {
752 ResponseDecoderNext::More((stream, res)) => {
753 let mut next = Self { stream, ..self };
754 let res = res.map_err(DecodeError::from);
755 match &res {
756 Ok(BaoContentItem::Parent(_)) => {
757 next.misc.other_bytes_read += 64;
758 }
759 Ok(BaoContentItem::Leaf(leaf)) => {
760 next.misc.payload_bytes_read += leaf.data.len() as u64;
761 }
762 _ => {}
763 }
764 BlobContentNext::More((next, res))
765 }
766 ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
767 stream: stream.into_inner(),
768 misc: self.misc,
769 }),
770 }
771 }
772
773 pub fn tree(&self) -> bao_tree::BaoTree {
775 self.stream.tree()
776 }
777
778 pub fn hash(&self) -> Hash {
780 (*self.stream.hash()).into()
781 }
782
783 pub fn offset(&self) -> u64 {
785 self.misc.ranges_iter.offset()
786 }
787
788 pub fn stats(&self) -> Stats {
790 Stats {
791 counters: self.misc.counters,
792 elapsed: self.misc.start.elapsed(),
793 }
794 }
795
796 pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
798 let mut content = self;
799 loop {
800 match content.next().await {
801 BlobContentNext::More((content1, res)) => {
802 let _ = res?;
803 content = content1;
804 }
805 BlobContentNext::Done(end) => {
806 break Ok(end);
807 }
808 }
809 }
810 }
811
812 pub async fn concatenate_into_vec(
814 self,
815 ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
816 let mut res = Vec::with_capacity(1024);
817 let mut curr = self;
818 let done = loop {
819 match curr.next().await {
820 BlobContentNext::More((next, data)) => {
821 if let BaoContentItem::Leaf(leaf) = data? {
822 res.extend_from_slice(&leaf.data);
823 }
824 curr = next;
825 }
826 BlobContentNext::Done(done) => {
827 break done;
829 }
830 }
831 };
832 Ok((done, res))
833 }
834
835 pub async fn write_all_with_outboard<D, O>(
840 self,
841 mut outboard: Option<O>,
842 mut data: D,
843 ) -> result::Result<AtEndBlob<R>, DecodeError>
844 where
845 D: AsyncSliceWriter,
846 O: OutboardMut,
847 {
848 let mut content = self;
849 loop {
850 match content.next().await {
851 BlobContentNext::More((content1, item)) => {
852 content = content1;
853 match item? {
854 BaoContentItem::Parent(parent) => {
855 if let Some(outboard) = outboard.as_mut() {
856 outboard
857 .save(parent.node, &parent.pair)
858 .await
859 .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
860 }
861 }
862 BaoContentItem::Leaf(leaf) => {
863 data.write_bytes_at(leaf.offset, leaf.data)
864 .await
865 .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
866 }
867 }
868 }
869 BlobContentNext::Done(end) => {
870 return Ok(end);
871 }
872 }
873 }
874 }
875
876 pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob<R>, DecodeError>
878 where
879 D: AsyncSliceWriter,
880 {
881 let mut content = self;
882 loop {
883 match content.next().await {
884 BlobContentNext::More((content1, item)) => {
885 content = content1;
886 match item? {
887 BaoContentItem::Parent(_) => {}
888 BaoContentItem::Leaf(leaf) => {
889 data.write_bytes_at(leaf.offset, leaf.data)
890 .await
891 .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
892 }
893 }
894 }
895 BlobContentNext::Done(end) => {
896 return Ok(end);
897 }
898 }
899 }
900 }
901
902 pub fn finish(self) -> AtClosing<R> {
904 AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
905 }
906 }
907
/// State after the content of one item has been read completely.
#[derive(Debug)]
pub struct AtEndBlob<R: RecvStream = DefaultReader> {
    /// The receive stream, recovered from the decoder.
    stream: R,
    /// Counters, start time and the remaining requested ranges.
    misc: Box<Misc>,
}
914
/// Possible states after an item has been read, see [`AtEndBlob::next`].
#[derive(Debug, From)]
pub enum EndBlobNext<R: RecvStream = DefaultReader> {
    /// The requested ranges contain another item.
    MoreChildren(AtStartChild<R>),
    /// The requested ranges are exhausted.
    Closing(AtClosing<R>),
}
923
924 impl<R: RecvStream> AtEndBlob<R> {
925 pub fn next(mut self) -> EndBlobNext<R> {
927 if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
928 AtStartChild {
929 reader: self.stream,
930 offset,
931 ranges,
932 misc: self.misc,
933 }
934 .into()
935 } else {
936 AtClosing::new(self.misc, self.stream, true).into()
937 }
938 }
939 }
940
/// Final state of a request, from which the [`Stats`] are obtained.
#[derive(Debug)]
pub struct AtClosing<R: RecvStream = DefaultReader> {
    /// Counters and start time used to build the stats.
    misc: Box<Misc>,
    /// Stream the response was read from.
    reader: R,
    /// When set, `next` tries to read one more byte from the stream and logs
    /// an error if anything arrives.
    check_extra_data: bool,
}
948
949 impl<R: RecvStream> AtClosing<R> {
950 fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
951 Self {
952 misc,
953 reader,
954 check_extra_data,
955 }
956 }
957
958 pub async fn next(self) -> result::Result<Stats, AtClosingNextError> {
960 let mut reader = self.reader;
962 if self.check_extra_data {
963 let rest = reader.recv_bytes(1).await?;
964 if !rest.is_empty() {
965 error!("Unexpected extra data at the end of the stream");
966 }
967 }
968 Ok(Stats {
969 counters: self.misc.counters,
970 elapsed: self.misc.start.elapsed(),
971 })
972 }
973 }
974
/// Error returned by [`AtClosing::next`].
///
/// Every variant additionally carries the `backtrace` and `span_trace`
/// fields added through `common_fields`.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[non_exhaustive]
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum AtClosingNextError {
    /// I/O error while checking the stream for extra data; transparent, so
    /// display and source are forwarded to the wrapped error.
    #[snafu(transparent)]
    Read { source: io::Error },
}
989
/// Byte counters for a single request.
#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
pub struct RequestCounters {
    /// Payload bytes written. Not modified by the state machine in this module.
    pub payload_bytes_written: u64,
    /// Non-payload bytes written; the state machine adds the size of the
    /// serialized request here.
    pub other_bytes_written: u64,
    /// Payload bytes read; the state machine adds the data length of every
    /// decoded leaf here.
    pub payload_bytes_read: u64,
    /// Non-payload bytes read; the state machine adds 8 per size header and
    /// 64 per decoded parent here.
    pub other_bytes_read: u64,
}
1001
/// Data that is handed from state to state; kept boxed by the states.
///
/// Dereferences (immutably and mutably) to the contained [`RequestCounters`].
#[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
struct Misc {
    /// Instant at which the request was started.
    start: Instant,
    /// Counters updated while the request progresses; also the deref target.
    #[deref]
    #[deref_mut]
    counters: RequestCounters,
    /// Iterator over the requested ranges that have not been handled yet.
    ranges_iter: RangesIter,
}
1014}