1use std::fmt::{self, Debug};
20
21use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
22use fsm::RequestCounters;
23use n0_error::Result;
24use n0_future::time::{Duration, Instant};
25use serde::{Deserialize, Serialize};
26use tracing::{debug, error};
27
28use crate::{
29 protocol::ChunkRangesSeq,
30 store::IROH_BLOCK_SIZE,
31 util::{RecvStream, SendStream},
32 Hash,
33};
34
35mod error;
36pub mod request;
37pub use error::{GetError, GetResult};
38
/// Receive stream type used wherever a reader type parameter is left at its default.
type DefaultReader = iroh::endpoint::RecvStream;
/// Send stream type used wherever a writer type parameter is left at its default.
type DefaultWriter = iroh::endpoint::SendStream;
41
/// A receive stream and a send stream bundled with a connection id and a timestamp.
pub struct StreamPair<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    /// Identifier of the connection, exactly as passed to [`StreamPair::new`].
    pub connection_id: u64,
    /// Instant captured by [`StreamPair::new`] when the pair was built.
    pub t0: Instant,
    /// Receiving half of the pair.
    pub recv: R,
    /// Sending half of the pair.
    pub send: W,
}
48
49impl<R: RecvStream, W: SendStream> StreamPair<R, W> {
50 pub fn new(connection_id: u64, recv: R, send: W) -> Self {
51 Self {
52 t0: Instant::now(),
53 recv,
54 send,
55 connection_id,
56 }
57 }
58}
59
/// Byte counters plus elapsed time for a request.
///
/// The `Deref`/`DerefMut` derives target the `counters` field, so the individual
/// counter fields can be read and updated directly on a `Stats` value.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Deref,
    derive_more::DerefMut,
)]
pub struct Stats {
    /// Byte counters of the request.
    #[deref]
    #[deref_mut]
    pub counters: RequestCounters,
    /// Elapsed time; the state machine fills this from the instant at which the
    /// request was started.
    pub elapsed: Duration,
}
80
81impl Stats {
82 pub fn mbits(&self) -> f64 {
84 let data_len_bit = self.total_bytes_read() * 8;
85 data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
86 }
87
88 pub fn total_bytes_read(&self) -> u64 {
89 self.payload_bytes_read + self.other_bytes_read
90 }
91
92 pub fn combine(&mut self, that: &Stats) {
93 self.payload_bytes_written += that.payload_bytes_written;
94 self.other_bytes_written += that.other_bytes_written;
95 self.payload_bytes_read += that.payload_bytes_read;
96 self.other_bytes_read += that.other_bytes_read;
97 self.elapsed += that.elapsed;
98 }
99}
100
101#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
105pub mod fsm {
106 use std::{io, result};
107
108 use bao_tree::{
109 io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
110 BaoTree, ChunkRanges, TreeNode,
111 };
112 use derive_more::From;
113 use iroh::endpoint::Connection;
114 use iroh_io::AsyncSliceWriter;
115 use n0_error::{e, stack_error, AnyError};
116
117 use super::*;
118 use crate::{
119 protocol::{
120 GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
121 },
122 util::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
123 };
124
// Self-referential cell pairing an owned `ChunkRangesSeq` with a
// `NonEmptyRequestRangeSpecIter` that is created from a reference to that owner
// (see `RangesIter::new`). This lets the iterator be stored in the state machine
// without a lifetime parameter.
self_cell::self_cell! {
    struct RangesIterInner {
        owner: ChunkRangesSeq,
        #[not_covariant]
        dependent: NonEmptyRequestRangeSpecIter,
    }
}
132
133 pub fn start(
135 connection: Connection,
136 request: GetRequest,
137 counters: RequestCounters,
138 ) -> AtInitial {
139 AtInitial::new(connection, request, counters)
140 }
141
142 pub async fn start_get_many(
144 connection: Connection,
145 request: GetManyRequest,
146 counters: RequestCounters,
147 ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
148 let start = Instant::now();
149 let (mut writer, reader) = connection
150 .open_bi()
151 .await
152 .map_err(|e| e!(InitialNextError::Open, e.into()))?;
153 let request = Request::GetMany(request);
154 let request_bytes = postcard::to_stdvec(&request)
155 .map_err(|source| e!(GetError::BadRequest, AnyError::from_std(source)))?;
156 writer
157 .send_bytes(request_bytes.into())
158 .await
159 .map_err(|source| e!(ConnectedNextError::Write, source))?;
160 let Request::GetMany(request) = request else {
161 unreachable!();
162 };
163 let mut ranges_iter = RangesIter::new(request.ranges.clone());
164 let first_item = ranges_iter.next();
165 let misc = Box::new(Misc {
166 counters,
167 start,
168 ranges_iter,
169 });
170 Ok(match first_item {
171 Some((child_offset, child_ranges)) => Ok(AtStartChild {
172 ranges: child_ranges,
173 reader,
174 misc,
175 offset: child_offset,
176 }),
177 None => Err(AtClosing::new(misc, reader, true)),
178 })
179 }
180
/// Owning iterator over the `(offset, ranges)` items of a [`ChunkRangesSeq`].
struct RangesIter(RangesIterInner);
186
187 impl fmt::Debug for RangesIter {
188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189 f.debug_struct("RangesIter").finish()
190 }
191 }
192
193 impl RangesIter {
194 pub fn new(owner: ChunkRangesSeq) -> Self {
195 Self(RangesIterInner::new(owner, |owner| {
196 owner.iter_non_empty_infinite()
197 }))
198 }
199
200 pub fn offset(&self) -> u64 {
201 self.0.with_dependent(|_owner, iter| iter.offset())
202 }
203 }
204
205 impl Iterator for RangesIter {
206 type Item = (u64, ChunkRanges);
207
208 fn next(&mut self) -> Option<Self::Item> {
209 self.0.with_dependent_mut(|_owner, iter| {
210 iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
211 })
212 }
213 }
214
/// Initial state of the get state machine: holds the connection, the request and
/// the counters, but no stream has been opened yet.
#[derive(Debug)]
pub struct AtInitial {
    connection: Connection,
    request: GetRequest,
    counters: RequestCounters,
}
222
223 impl AtInitial {
224 pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
229 Self {
230 connection,
231 request,
232 counters,
233 }
234 }
235
236 pub async fn next(self) -> Result<AtConnected, InitialNextError> {
238 let start = Instant::now();
239 let (writer, reader) = self
240 .connection
241 .open_bi()
242 .await
243 .map_err(|e| e!(InitialNextError::Open, e.into()))?;
244 Ok(AtConnected {
245 start,
246 reader,
247 writer,
248 request: self.request,
249 counters: self.counters,
250 })
251 }
252 }
253
/// Error returned by [`AtInitial::next`].
#[stack_error(derive, add_meta, from_sources)]
pub enum InitialNextError {
    /// Opening the bidirectional stream failed.
    #[error("open: {source}")]
    Open {
        #[error(std_err)]
        source: io::Error,
    },
}
263
/// State in which the streams are open but the request has not been sent yet.
#[derive(Debug)]
pub struct AtConnected<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    start: Instant,
    reader: R,
    writer: W,
    request: GetRequest,
    counters: RequestCounters,
}
273
/// Possible states after the request has been sent, see [`AtConnected::next`].
#[derive(Debug, From)]
pub enum ConnectedNext<R: RecvStream = DefaultReader> {
    /// The first requested item has offset 0.
    StartRoot(AtStartRoot<R>),
    /// The first requested item has a non-zero offset.
    StartChild(AtStartChild<R>),
    /// The request yields no items at all.
    Closing(AtClosing<R>),
}
284
/// Error returned by [`AtConnected::next`].
#[stack_error(derive, add_meta)]
pub enum ConnectedNextError {
    /// Serializing the request with postcard failed.
    #[error("postcard ser: {source}")]
    PostcardSer {
        #[error(std_err)]
        source: postcard::Error,
    },
    /// The serialized request is larger than `MAX_MESSAGE_SIZE`.
    #[error("request too big")]
    RequestTooBig {},
    /// Writing the request to the send stream, or syncing it, failed.
    #[error("write: {source}")]
    Write {
        #[error(std_err)]
        source: io::Error,
    },
}
304
305 impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
306 pub fn new(
307 start: Instant,
308 reader: R,
309 writer: W,
310 request: GetRequest,
311 counters: RequestCounters,
312 ) -> Self {
313 Self {
314 start,
315 reader,
316 writer,
317 request,
318 counters,
319 }
320 }
321
322 pub async fn next(self) -> Result<ConnectedNext<R>, ConnectedNextError> {
329 let Self {
330 start,
331 reader,
332 mut writer,
333 mut request,
334 mut counters,
335 } = self;
336 counters.other_bytes_written += {
338 debug!("sending request");
339 let wrapped = Request::Get(request);
340 let request_bytes = postcard::to_stdvec(&wrapped)
341 .map_err(|source| e!(ConnectedNextError::PostcardSer, source))?;
342 let Request::Get(x) = wrapped else {
343 unreachable!();
344 };
345 request = x;
346
347 if request_bytes.len() > MAX_MESSAGE_SIZE {
348 return Err(e!(ConnectedNextError::RequestTooBig));
349 }
350
351 let len = request_bytes.len() as u64;
353 writer
354 .send_bytes(request_bytes.into())
355 .await
356 .map_err(|source| e!(ConnectedNextError::Write, source))?;
357 writer
358 .sync()
359 .await
360 .map_err(|source| e!(ConnectedNextError::Write, source))?;
361 len
362 };
363
364 drop(writer);
366
367 let hash = request.hash;
368 let ranges_iter = RangesIter::new(request.ranges);
369 let mut misc = Box::new(Misc {
371 counters,
372 start,
373 ranges_iter,
374 });
375 Ok(match misc.ranges_iter.next() {
376 Some((offset, ranges)) => {
377 if offset == 0 {
378 AtStartRoot {
379 reader,
380 ranges,
381 misc,
382 hash,
383 }
384 .into()
385 } else {
386 AtStartChild {
387 reader,
388 ranges,
389 misc,
390 offset,
391 }
392 .into()
393 }
394 }
395 None => AtClosing::new(misc, reader, true).into(),
396 })
397 }
398 }
399
/// State before reading the item at offset 0, whose hash comes from the request.
#[derive(Debug)]
pub struct AtStartRoot<R: RecvStream = DefaultReader> {
    ranges: ChunkRanges,
    reader: R,
    misc: Box<Misc>,
    hash: Hash,
}
408
/// State before reading an item at a non-root offset; the hash of that item has to
/// be supplied by the caller via [`AtStartChild::next`].
#[derive(Debug)]
pub struct AtStartChild<R: RecvStream = DefaultReader> {
    ranges: ChunkRanges,
    reader: R,
    misc: Box<Misc>,
    offset: u64,
}
417
418 impl<R: RecvStream> AtStartChild<R> {
419 pub fn offset(&self) -> u64 {
425 self.offset
426 }
427
428 pub fn ranges(&self) -> &ChunkRanges {
430 &self.ranges
431 }
432
433 pub fn next(self, hash: Hash) -> AtBlobHeader<R> {
437 AtBlobHeader {
438 reader: self.reader,
439 ranges: self.ranges,
440 misc: self.misc,
441 hash,
442 }
443 }
444
445 pub fn finish(self) -> AtClosing<R> {
451 AtClosing::new(self.misc, self.reader, false)
452 }
453 }
454
455 impl<R: RecvStream> AtStartRoot<R> {
456 pub fn ranges(&self) -> &ChunkRanges {
458 &self.ranges
459 }
460
461 pub fn hash(&self) -> Hash {
463 self.hash
464 }
465
466 pub fn next(self) -> AtBlobHeader<R> {
470 AtBlobHeader {
471 reader: self.reader,
472 ranges: self.ranges,
473 hash: self.hash,
474 misc: self.misc,
475 }
476 }
477
478 pub fn finish(self) -> AtClosing<R> {
480 AtClosing::new(self.misc, self.reader, false)
481 }
482 }
483
/// State before reading the 8-byte size header of a blob.
#[derive(Debug)]
pub struct AtBlobHeader<R: RecvStream = DefaultReader> {
    ranges: ChunkRanges,
    reader: R,
    misc: Box<Misc>,
    hash: Hash,
}
492
/// Error returned by [`AtBlobHeader::next`].
#[stack_error(derive, add_meta)]
pub enum AtBlobHeaderNextError {
    /// The stream ended (`UnexpectedEof`) while the size header was being read.
    #[error("not found")]
    NotFound {},
    /// Any other I/O error while reading the size header.
    #[error("io: {source}")]
    Read {
        #[error(std_err)]
        source: io::Error,
    },
}
508
509 impl From<AtBlobHeaderNextError> for io::Error {
510 fn from(cause: AtBlobHeaderNextError) -> Self {
511 match cause {
512 AtBlobHeaderNextError::NotFound { .. } => {
513 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
514 }
515 AtBlobHeaderNextError::Read { source, .. } => source,
516 }
517 }
518 }
519
520 impl<R: RecvStream> AtBlobHeader<R> {
521 pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
523 let mut size = [0; 8];
524 self.reader.recv_exact(&mut size).await.map_err(|cause| {
525 if cause.kind() == io::ErrorKind::UnexpectedEof {
526 e!(AtBlobHeaderNextError::NotFound)
527 } else {
528 e!(AtBlobHeaderNextError::Read, cause)
529 }
530 })?;
531 self.misc.other_bytes_read += 8;
532 let size = u64::from_le_bytes(size);
533 let stream = ResponseDecoder::new(
534 self.hash.into(),
535 self.ranges,
536 BaoTree::new(size, IROH_BLOCK_SIZE),
537 RecvStreamAsyncStreamReader::new(self.reader),
538 );
539 Ok((
540 AtBlobContent {
541 stream,
542 misc: self.misc,
543 },
544 size,
545 ))
546 }
547
548 pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
550 let (content, _size) = self.next().await?;
551 content.drain().await
552 }
553
554 pub async fn concatenate_into_vec(
559 self,
560 ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
561 let (content, _size) = self.next().await?;
562 content.concatenate_into_vec().await
563 }
564
565 pub async fn write_all<D: AsyncSliceWriter>(
567 self,
568 data: D,
569 ) -> result::Result<AtEndBlob<R>, DecodeError> {
570 let (content, _size) = self.next().await?;
571 let res = content.write_all(data).await?;
572 Ok(res)
573 }
574
575 pub async fn write_all_with_outboard<D, O>(
580 self,
581 outboard: Option<O>,
582 data: D,
583 ) -> result::Result<AtEndBlob<R>, DecodeError>
584 where
585 D: AsyncSliceWriter,
586 O: OutboardMut,
587 {
588 let (content, _size) = self.next().await?;
589 let res = content.write_all_with_outboard(outboard, data).await?;
590 Ok(res)
591 }
592
593 pub fn hash(&self) -> Hash {
595 self.hash
596 }
597
598 pub fn ranges(&self) -> &ChunkRanges {
600 &self.ranges
601 }
602
603 pub fn offset(&self) -> u64 {
605 self.misc.ranges_iter.offset()
606 }
607 }
608
/// State while the content items of a blob are being decoded from the stream.
#[derive(Debug)]
pub struct AtBlobContent<R: RecvStream = DefaultReader> {
    stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
    misc: Box<Misc>,
}
615
/// Error that can occur while reading the size header or decoding blob content.
///
/// Apart from `ChunkNotFound` and `Write`, the variants mirror
/// `bao_tree::io::DecodeError` (see the `From` impl for it).
#[non_exhaustive]
#[stack_error(derive, add_meta)]
pub enum DecodeError {
    /// Produced from [`AtBlobHeaderNextError::NotFound`], i.e. the stream ended while
    /// the size header was being read.
    #[error("not found")]
    ChunkNotFound {},
    /// The decoder reported that the parent `node` was not found.
    #[error("parent not found {node:?}")]
    ParentNotFound { node: TreeNode },
    /// The decoder reported that the leaf chunk `num` was not found.
    #[error("chunk not found {num}")]
    LeafNotFound { num: ChunkNum },
    /// The decoder reported a hash mismatch for the parent `node`.
    #[error("parent hash mismatch: {node:?}")]
    ParentHashMismatch { node: TreeNode },
    /// The decoder reported a hash mismatch for the leaf chunk `num`.
    #[error("leaf hash mismatch: {num}")]
    LeafHashMismatch { num: ChunkNum },
    /// I/O error while reading from the stream.
    #[error("read: {source}")]
    Read {
        #[error(std_err)]
        source: io::Error,
    },
    /// I/O error while writing decoded data or outboard entries to their target.
    #[error("io: {source}")]
    Write {
        #[error(std_err)]
        source: io::Error,
    },
}
664
impl DecodeError {
    /// Crate-internal constructor for [`DecodeError::LeafHashMismatch`] for chunk `num`.
    pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
        e!(DecodeError::LeafHashMismatch { num })
    }
}
670
671 impl From<AtBlobHeaderNextError> for DecodeError {
672 fn from(cause: AtBlobHeaderNextError) -> Self {
673 match cause {
674 AtBlobHeaderNextError::NotFound { .. } => e!(DecodeError::ChunkNotFound),
675 AtBlobHeaderNextError::Read { source, .. } => e!(DecodeError::Read, source),
676 }
677 }
678 }
679
680 impl From<DecodeError> for io::Error {
681 fn from(cause: DecodeError) -> Self {
682 match cause {
683 DecodeError::ParentNotFound { .. } => {
684 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
685 }
686 DecodeError::LeafNotFound { .. } => {
687 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
688 }
689 DecodeError::Read { source, .. } => source,
690 DecodeError::Write { source, .. } => source,
691 _ => io::Error::other(cause),
692 }
693 }
694 }
695
696 impl From<bao_tree::io::DecodeError> for DecodeError {
697 fn from(value: bao_tree::io::DecodeError) -> Self {
698 match value {
699 bao_tree::io::DecodeError::ParentNotFound(node) => {
700 e!(DecodeError::ParentNotFound { node })
701 }
702 bao_tree::io::DecodeError::LeafNotFound(num) => {
703 e!(DecodeError::LeafNotFound { num })
704 }
705 bao_tree::io::DecodeError::ParentHashMismatch(node) => {
706 e!(DecodeError::ParentHashMismatch { node })
707 }
708 bao_tree::io::DecodeError::LeafHashMismatch(num) => {
709 e!(DecodeError::LeafHashMismatch { num })
710 }
711 bao_tree::io::DecodeError::Io(cause) => e!(DecodeError::Read, cause),
712 }
713 }
714 }
715
/// Result of [`AtBlobContent::next`].
#[derive(Debug, From)]
pub enum BlobContentNext<R: RecvStream> {
    /// An item was read: carries the follow-up state and either the decoded item or
    /// the decode error.
    More(
        (
            AtBlobContent<R>,
            result::Result<BaoContentItem, DecodeError>,
        ),
    ),
    /// The decoder has finished with this blob.
    Done(AtEndBlob<R>),
}
729
730 impl<R: RecvStream> AtBlobContent<R> {
731 pub async fn next(self) -> BlobContentNext<R> {
733 match self.stream.next().await {
734 ResponseDecoderNext::More((stream, res)) => {
735 let mut next = Self { stream, ..self };
736 let res = res.map_err(DecodeError::from);
737 match &res {
738 Ok(BaoContentItem::Parent(_)) => {
739 next.misc.other_bytes_read += 64;
740 }
741 Ok(BaoContentItem::Leaf(leaf)) => {
742 next.misc.payload_bytes_read += leaf.data.len() as u64;
743 }
744 _ => {}
745 }
746 BlobContentNext::More((next, res))
747 }
748 ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
749 stream: stream.into_inner(),
750 misc: self.misc,
751 }),
752 }
753 }
754
755 pub fn tree(&self) -> bao_tree::BaoTree {
757 self.stream.tree()
758 }
759
760 pub fn hash(&self) -> Hash {
762 (*self.stream.hash()).into()
763 }
764
765 pub fn offset(&self) -> u64 {
767 self.misc.ranges_iter.offset()
768 }
769
770 pub fn stats(&self) -> Stats {
772 Stats {
773 counters: self.misc.counters,
774 elapsed: self.misc.start.elapsed(),
775 }
776 }
777
778 pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
780 let mut content = self;
781 loop {
782 match content.next().await {
783 BlobContentNext::More((content1, res)) => {
784 let _ = res?;
785 content = content1;
786 }
787 BlobContentNext::Done(end) => {
788 break Ok(end);
789 }
790 }
791 }
792 }
793
794 pub async fn concatenate_into_vec(
796 self,
797 ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
798 let mut res = Vec::with_capacity(1024);
799 let mut curr = self;
800 let done = loop {
801 match curr.next().await {
802 BlobContentNext::More((next, data)) => {
803 if let BaoContentItem::Leaf(leaf) = data? {
804 res.extend_from_slice(&leaf.data);
805 }
806 curr = next;
807 }
808 BlobContentNext::Done(done) => {
809 break done;
811 }
812 }
813 };
814 Ok((done, res))
815 }
816
817 pub async fn write_all_with_outboard<D, O>(
822 self,
823 mut outboard: Option<O>,
824 mut data: D,
825 ) -> result::Result<AtEndBlob<R>, DecodeError>
826 where
827 D: AsyncSliceWriter,
828 O: OutboardMut,
829 {
830 let mut content = self;
831 loop {
832 match content.next().await {
833 BlobContentNext::More((content1, item)) => {
834 content = content1;
835 match item? {
836 BaoContentItem::Parent(parent) => {
837 if let Some(outboard) = outboard.as_mut() {
838 outboard
839 .save(parent.node, &parent.pair)
840 .await
841 .map_err(|e| e!(DecodeError::Write, e))?;
842 }
843 }
844 BaoContentItem::Leaf(leaf) => {
845 data.write_bytes_at(leaf.offset, leaf.data)
846 .await
847 .map_err(|e| e!(DecodeError::Write, e))?;
848 }
849 }
850 }
851 BlobContentNext::Done(end) => {
852 return Ok(end);
853 }
854 }
855 }
856 }
857
858 pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob<R>, DecodeError>
860 where
861 D: AsyncSliceWriter,
862 {
863 let mut content = self;
864 loop {
865 match content.next().await {
866 BlobContentNext::More((content1, item)) => {
867 content = content1;
868 match item? {
869 BaoContentItem::Parent(_) => {}
870 BaoContentItem::Leaf(leaf) => {
871 data.write_bytes_at(leaf.offset, leaf.data)
872 .await
873 .map_err(|e| e!(DecodeError::Write, e))?;
874 }
875 }
876 }
877 BlobContentNext::Done(end) => {
878 return Ok(end);
879 }
880 }
881 }
882 }
883
884 pub fn finish(self) -> AtClosing<R> {
886 AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
887 }
888 }
889
/// State after the decoder has finished one blob; holds the raw receive stream again.
#[derive(Debug)]
pub struct AtEndBlob<R: RecvStream = DefaultReader> {
    stream: R,
    misc: Box<Misc>,
}
896
/// Result of [`AtEndBlob::next`].
#[derive(Debug, From)]
pub enum EndBlobNext<R: RecvStream = DefaultReader> {
    /// The range iterator yielded another item to read.
    MoreChildren(AtStartChild<R>),
    /// The range iterator is exhausted; the request is about to be closed.
    Closing(AtClosing<R>),
}
905
906 impl<R: RecvStream> AtEndBlob<R> {
907 pub fn next(mut self) -> EndBlobNext<R> {
909 if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
910 AtStartChild {
911 reader: self.stream,
912 offset,
913 ranges,
914 misc: self.misc,
915 }
916 .into()
917 } else {
918 AtClosing::new(self.misc, self.stream, true).into()
919 }
920 }
921 }
922
/// Final state of the state machine, before the stats are produced.
#[derive(Debug)]
pub struct AtClosing<R: RecvStream = DefaultReader> {
    misc: Box<Misc>,
    reader: R,
    /// Whether [`AtClosing::next`] should try to read from the stream and log an
    /// error if any data is still there.
    check_extra_data: bool,
}
930
931 impl<R: RecvStream> AtClosing<R> {
932 fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
933 Self {
934 misc,
935 reader,
936 check_extra_data,
937 }
938 }
939
940 pub async fn next(self) -> result::Result<Stats, AtClosingNextError> {
942 let mut reader = self.reader;
944 if self.check_extra_data {
945 let rest = reader.recv_bytes(1).await?;
946 if !rest.is_empty() {
947 error!("Unexpected extra data at the end of the stream");
948 }
949 }
950 Ok(Stats {
951 counters: self.misc.counters,
952 elapsed: self.misc.start.elapsed(),
953 })
954 }
955 }
956
/// Error returned by [`AtClosing::next`].
#[stack_error(derive, add_meta, from_sources)]
pub enum AtClosingNextError {
    /// I/O error from the read that checks for extra data on the stream.
    #[error(transparent)]
    Read {
        #[error(std_err)]
        source: io::Error,
    },
}
967
/// Byte counters for a request.
#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
pub struct RequestCounters {
    /// Payload bytes written. The state machine in this module never updates it.
    pub payload_bytes_written: u64,
    /// Non-payload bytes written; `AtConnected::next` adds the size of the
    /// serialized request here.
    pub other_bytes_written: u64,
    /// Payload bytes read: the data length of every decoded leaf.
    pub payload_bytes_read: u64,
    /// Non-payload bytes read: 8 bytes per blob size header and 64 bytes per decoded
    /// parent.
    pub other_bytes_read: u64,
}
979
/// State carried through every step of the state machine. It is kept in a `Box` by
/// the states, and dereferences to its `counters`.
#[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
struct Misc {
    /// Instant at which the request was started; used to compute the elapsed time.
    start: Instant,
    /// Byte counters, updated as the request is written and the response is read.
    #[deref]
    #[deref_mut]
    counters: RequestCounters,
    /// Iterator over the `(offset, ranges)` items still to be read.
    ranges_iter: RangesIter,
}
992}