iroh_quinn/recv_stream.rs
1use std::{
2 future::{Future, poll_fn},
3 io,
4 pin::Pin,
5 task::{Context, Poll, ready},
6};
7
8use bytes::Bytes;
9use proto::{Chunk, Chunks, ClosedStream, ConnectionError, ReadableError, StreamId};
10use thiserror::Error;
11use tokio::io::ReadBuf;
12
13use crate::{VarInt, connection::ConnectionRef};
14
15/// A stream that can only be used to receive data
16///
17/// `stop(0)` is implicitly called on drop unless:
18/// - A variant of [`ReadError`] has been yielded by a read call
19/// - [`stop()`] was called explicitly
20///
21/// # Cancellation
22///
23/// A `read` method is said to be *cancel-safe* when dropping its future before the future becomes
24/// ready cannot lead to loss of stream data. This is true of methods which succeed immediately when
25/// any progress is made, and is not true of methods which might need to perform multiple reads
26/// internally before succeeding. Each `read` method documents whether it is cancel-safe.
27///
28/// # Common issues
29///
30/// ## Data never received on a locally-opened stream
31///
32/// Peers are not notified of streams until they or a later-numbered stream are used to send
33/// data. If a bidirectional stream is locally opened but never used to send, then the peer may
34/// never see it. Application protocols should always arrange for the endpoint which will first
35/// transmit on a stream to be the endpoint responsible for opening it.
36///
37/// ## Data never received on a remotely-opened stream
38///
39/// Verify that the stream you are receiving is the same one that the server is sending on, e.g. by
40/// logging the [`id`] of each. Streams are always accepted in the same order as they are created,
41/// i.e. ascending order by [`StreamId`]. For example, even if a sender first transmits on
42/// bidirectional stream 1, the first stream yielded by [`Connection::accept_bi`] on the receiver
43/// will be bidirectional stream 0.
44///
45/// [`ReadError`]: crate::ReadError
46/// [`stop()`]: RecvStream::stop
47/// [`SendStream::finish`]: crate::SendStream::finish
48/// [`WriteError::Stopped`]: crate::WriteError::Stopped
49/// [`id`]: RecvStream::id
50/// [`Connection::accept_bi`]: crate::Connection::accept_bi
51#[derive(Debug)]
52pub struct RecvStream {
53 conn: ConnectionRef,
54 stream: StreamId,
55 is_0rtt: bool,
56 all_data_read: bool,
57 reset: Option<VarInt>,
58}
59
60impl RecvStream {
61 pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
62 Self {
63 conn,
64 stream,
65 is_0rtt,
66 all_data_read: false,
67 reset: None,
68 }
69 }
70
71 /// Read data contiguously from the stream.
72 ///
73 /// Yields the number of bytes read into `buf` on success, or `None` if the stream was finished.
74 ///
75 /// This operation is cancel-safe.
76 pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
77 Read {
78 stream: self,
79 buf: ReadBuf::new(buf),
80 }
81 .await
82 }
83
84 /// Read an exact number of bytes contiguously from the stream.
85 ///
86 /// See [`read()`] for details. This operation is *not* cancel-safe.
87 ///
88 /// [`read()`]: RecvStream::read
89 pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> {
90 ReadExact {
91 stream: self,
92 buf: ReadBuf::new(buf),
93 }
94 .await
95 }
96
97 /// Attempts to read from the stream into the provided buffer
98 ///
99 /// On success, returns `Poll::Ready(Ok(num_bytes_read))` and places data into `buf`. If this
100 /// returns zero bytes read (and `buf` has a non-zero length), that indicates that the remote
101 /// side has [`finish`]ed the stream and the local side has already read all bytes.
102 ///
103 /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
104 /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
105 /// closed.
106 ///
107 /// [`finish`]: crate::SendStream::finish
108 pub fn poll_read(
109 &mut self,
110 cx: &mut Context,
111 buf: &mut [u8],
112 ) -> Poll<Result<usize, ReadError>> {
113 let mut buf = ReadBuf::new(buf);
114 ready!(self.poll_read_buf(cx, &mut buf))?;
115 Poll::Ready(Ok(buf.filled().len()))
116 }
117
118 /// Attempts to read from the stream into the provided buffer, which may be uninitialized
119 ///
120 /// On success, returns `Poll::Ready(Ok(()))` and places data into the unfilled portion of
121 /// `buf`. If this does not write any bytes to `buf` (and `buf.remaining()` is non-zero), that
122 /// indicates that the remote side has [`finish`]ed the stream and the local side has already
123 /// read all bytes.
124 ///
125 /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
126 /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
127 /// closed.
128 ///
129 /// [`finish`]: crate::SendStream::finish
130 pub fn poll_read_buf(
131 &mut self,
132 cx: &mut Context,
133 buf: &mut ReadBuf<'_>,
134 ) -> Poll<Result<(), ReadError>> {
135 if buf.remaining() == 0 {
136 return Poll::Ready(Ok(()));
137 }
138
139 self.poll_read_generic(cx, true, |chunks| {
140 let mut read = false;
141 loop {
142 if buf.remaining() == 0 {
143 // We know `read` is `true` because `buf.remaining()` was not 0 before
144 return ReadStatus::Readable(());
145 }
146
147 match chunks.next(buf.remaining()) {
148 Ok(Some(chunk)) => {
149 buf.put_slice(&chunk.bytes);
150 read = true;
151 }
152 res => return (if read { Some(()) } else { None }, res.err()).into(),
153 }
154 }
155 })
156 .map(|res| res.map(|_| ()))
157 }
158
159 /// Read the next segment of data
160 ///
161 /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
162 /// offset in the stream. The chunk's offset will be immediately after
163 /// the last data yielded by [`RecvStream::read`] or [`RecvStream::read_chunk`].
164 ///
165 /// For unordered reads, convert the stream into an unordered stream using [`Self::into_unordered`].
166 ///
167 /// Slightly more efficient than [`RecvStream::read`] due to not copying. Chunk boundaries do not correspond
168 /// to peer writes, and hence cannot be used as framing.
169 ///
170 /// This operation is cancel-safe.
171 pub async fn read_chunk(&mut self, max_length: usize) -> Result<Option<Chunk>, ReadError> {
172 ReadChunk {
173 stream: self,
174 max_length,
175 ordered: true,
176 }
177 .await
178 }
179
180 /// Attempts to read a chunk from the stream.
181 ///
182 /// On success, returns `Poll::Ready(Ok(Some(chunk)))`. If `Poll::Ready(Ok(None))`
183 /// is returned, it implies that EOF has been reached.
184 ///
185 /// If no data is available for reading, the method returns `Poll::Pending`
186 /// and arranges for the current task (via cx.waker()) to receive a notification
187 /// when the stream becomes readable or is closed.
188 fn poll_read_chunk(
189 &mut self,
190 cx: &mut Context,
191 max_length: usize,
192 ordered: bool,
193 ) -> Poll<Result<Option<Chunk>, ReadError>> {
194 self.poll_read_generic(cx, ordered, |chunks| match chunks.next(max_length) {
195 Ok(Some(chunk)) => ReadStatus::Readable(chunk),
196 res => (None, res.err()).into(),
197 })
198 }
199
200 /// Read the next segments of data
201 ///
202 /// Fills `bufs` with the segments of data beginning immediately after the
203 /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
204 /// finished.
205 ///
206 /// Slightly more efficient than `read` due to not copying. Chunk boundaries
207 /// do not correspond to peer writes, and hence cannot be used as framing.
208 ///
209 /// This operation is cancel-safe.
210 pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
211 ReadChunks { stream: self, bufs }.await
212 }
213
214 /// Foundation of [`Self::read_chunks`]
215 fn poll_read_chunks(
216 &mut self,
217 cx: &mut Context,
218 bufs: &mut [Bytes],
219 ) -> Poll<Result<Option<usize>, ReadError>> {
220 if bufs.is_empty() {
221 return Poll::Ready(Ok(Some(0)));
222 }
223
224 self.poll_read_generic(cx, true, |chunks| {
225 let mut read = 0;
226 loop {
227 if read >= bufs.len() {
228 // We know `read > 0` because `bufs` cannot be empty here
229 return ReadStatus::Readable(read);
230 }
231
232 match chunks.next(usize::MAX) {
233 Ok(Some(chunk)) => {
234 bufs[read] = chunk.bytes;
235 read += 1;
236 }
237 res => return (if read == 0 { None } else { Some(read) }, res.err()).into(),
238 }
239 }
240 })
241 }
242
243 /// Convenience method to read all remaining data into a buffer
244 ///
245 /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
246 /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
247 /// allow. `size_limit` should be set to limit worst-case memory use.
248 ///
249 /// If unordered reads have already been made, the resulting buffer may have gaps containing
250 /// arbitrary data.
251 ///
252 /// This operation is *not* cancel-safe.
253 ///
254 /// [`ReadToEndError::TooLong`]: crate::ReadToEndError::TooLong
255 pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
256 ReadToEnd {
257 stream: self,
258 size_limit,
259 read: Vec::new(),
260 start: u64::MAX,
261 end: 0,
262 }
263 .await
264 }
265
266 /// Stop accepting data
267 ///
268 /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
269 /// attempts to operate on a stream will yield `ClosedStream` errors.
270 pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
271 let mut conn = self.conn.state.lock("RecvStream::stop");
272 if self.is_0rtt && conn.check_0rtt().is_err() {
273 return Ok(());
274 }
275 conn.inner.recv_stream(self.stream).stop(error_code)?;
276 conn.wake();
277 self.all_data_read = true;
278 Ok(())
279 }
280
281 /// Check if this stream has been opened during 0-RTT.
282 ///
283 /// In which case any non-idempotent request should be considered dangerous at the application
284 /// level. Because read data is subject to replay attacks.
285 pub fn is_0rtt(&self) -> bool {
286 self.is_0rtt
287 }
288
289 /// Get the identity of this stream
290 pub fn id(&self) -> StreamId {
291 self.stream
292 }
293
294 /// Completes when the stream has been reset by the peer or otherwise closed
295 ///
296 /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
297 /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
298 /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
299 /// which it is no longer meaningful for the stream to be reset.
300 ///
301 /// This operation is cancel-safe.
302 pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
303 poll_fn(|cx| {
304 let mut conn = self.conn.state.lock("RecvStream::reset");
305 if self.is_0rtt && conn.check_0rtt().is_err() {
306 return Poll::Ready(Err(ResetError::ZeroRttRejected));
307 }
308
309 if let Some(code) = self.reset {
310 return Poll::Ready(Ok(Some(code)));
311 }
312
313 match conn.inner.recv_stream(self.stream).received_reset() {
314 Err(_) => Poll::Ready(Ok(None)),
315 Ok(Some(error_code)) => {
316 // Stream state has just now been freed, so the connection may need to issue new
317 // stream ID flow control credit
318 conn.wake();
319 Poll::Ready(Ok(Some(error_code)))
320 }
321 Ok(None) => {
322 if let Some(e) = &conn.error {
323 return Poll::Ready(Err(e.clone().into()));
324 }
325 // Resets always notify readers, since a reset is an immediate read error. We
326 // could introduce a dedicated channel to reduce the risk of spurious wakeups,
327 // but that increased complexity is probably not justified, as an application
328 // that is expecting a reset is not likely to receive large amounts of data.
329 conn.blocked_readers.insert(self.stream, cx.waker().clone());
330 Poll::Pending
331 }
332 }
333 })
334 .await
335 }
336
337 /// Handle common logic related to reading out of a receive stream
338 ///
339 /// This takes an `FnMut` closure that takes care of the actual reading process, matching
340 /// the detailed read semantics for the calling function with a particular return type.
341 /// The closure can read from the passed `&mut Chunks` and has to return the status after
342 /// reading: the amount of data read, and the status after the final read call.
343 fn poll_read_generic<T, U>(
344 &mut self,
345 cx: &mut Context,
346 ordered: bool,
347 mut read_fn: T,
348 ) -> Poll<Result<Option<U>, ReadError>>
349 where
350 T: FnMut(&mut Chunks) -> ReadStatus<U>,
351 {
352 use proto::ReadError::*;
353 if self.all_data_read {
354 return Poll::Ready(Ok(None));
355 }
356
357 let mut conn = self.conn.state.lock("RecvStream::poll_read");
358 if self.is_0rtt {
359 conn.check_0rtt().map_err(|()| ReadError::ZeroRttRejected)?;
360 }
361
362 // If we stored an error during a previous call, return it now. This can happen if a
363 // `read_fn` both wants to return data and also returns an error in its final stream status.
364 let status = match self.reset {
365 Some(code) => ReadStatus::Failed(None, Reset(code)),
366 None => {
367 let mut recv = conn.inner.recv_stream(self.stream);
368 let mut chunks = recv.read(ordered).map_err(|e| match e {
369 ReadableError::ClosedStream => ReadError::ClosedStream,
370 ReadableError::IllegalOrderedRead => {
371 // We should never get here because the only way to do unordered reads is
372 // via UnorderedRecvStream, which allows only unordered reads. It is not
373 // possible to get a RecvStream from an UnorderedRecvStream.
374 unreachable!("ordered read after unordered read")
375 }
376 })?;
377 let status = read_fn(&mut chunks);
378 if chunks.finalize().should_transmit() {
379 conn.wake();
380 }
381 status
382 }
383 };
384
385 match status {
386 ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
387 ReadStatus::Finished(read) => {
388 self.all_data_read = true;
389 Poll::Ready(Ok(read))
390 }
391 ReadStatus::Failed(read, Blocked) => match read {
392 Some(val) => Poll::Ready(Ok(Some(val))),
393 None => {
394 if let Some(ref x) = conn.error {
395 return Poll::Ready(Err(ReadError::ConnectionLost(x.clone())));
396 }
397 conn.blocked_readers.insert(self.stream, cx.waker().clone());
398 Poll::Pending
399 }
400 },
401 ReadStatus::Failed(read, Reset(error_code)) => match read {
402 None => {
403 self.all_data_read = true;
404 self.reset = Some(error_code);
405 Poll::Ready(Err(ReadError::Reset(error_code)))
406 }
407 done => {
408 self.reset = Some(error_code);
409 Poll::Ready(Ok(done))
410 }
411 },
412 }
413 }
414
415 /// Converts this stream into an unordered stream.
416 pub fn into_unordered(self) -> UnorderedRecvStream {
417 UnorderedRecvStream { inner: self }
418 }
419}
420
421/// A stream that can be used to receive data out-of-order.
422///
423/// Obtained by converting a [`RecvStream`] via [`RecvStream::into_unordered`].
424///
425/// This variant of `RecvStream` allows reading chunks of data *exclusively*
426/// out of order. Once you have done an unordered read, ordered reads are no
427/// longer possible since data may have been consumed out of order.
428///
429/// The stream state related fns like [`Self::id`], [`Self::is_0rtt`], [`Self::stop`], and
430/// [`Self::received_reset`] behave exactly as on [`RecvStream`].
431#[derive(Debug)]
432pub struct UnorderedRecvStream {
433 inner: RecvStream,
434}
435
436impl UnorderedRecvStream {
437 /// Reads the next segment of data.
438 ///
439 /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
440 /// offset in the stream. Segments may be received in any order, and the `Chunk`'s `offset`
441 /// field can be used to determine ordering in the caller. Unordered reads are less prone
442 /// to head-of-line blocking within a stream, but require the application to manage
443 /// reassembling the original data.
444 ///
445 /// This operation is cancel-safe.
446 pub async fn read_chunk(&mut self, max_length: usize) -> Result<Option<Chunk>, ReadError> {
447 ReadChunk {
448 stream: &mut self.inner,
449 max_length,
450 ordered: false,
451 }
452 .await
453 }
454
455 /// Get the identity of this stream
456 pub fn id(&self) -> StreamId {
457 self.inner.id()
458 }
459
460 /// Check if this stream has been opened during 0-RTT.
461 ///
462 /// In which case any non-idempotent request should be considered dangerous at the application
463 /// level. Because read data is subject to replay attacks.
464 pub fn is_0rtt(&self) -> bool {
465 self.inner.is_0rtt()
466 }
467
468 /// Stop accepting data
469 ///
470 /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
471 /// attempts to operate on a stream will yield `ClosedStream` errors.
472 pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
473 self.inner.stop(error_code)
474 }
475
476 /// Completes when the stream has been reset by the peer or otherwise closed
477 ///
478 /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
479 /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
480 /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
481 /// which it is no longer meaningful for the stream to be reset.
482 ///
483 /// This operation is cancel-safe.
484 pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
485 self.inner.received_reset().await
486 }
487}
488
489enum ReadStatus<T> {
490 Readable(T),
491 Finished(Option<T>),
492 Failed(Option<T>, proto::ReadError),
493}
494
495impl<T> From<(Option<T>, Option<proto::ReadError>)> for ReadStatus<T> {
496 fn from(status: (Option<T>, Option<proto::ReadError>)) -> Self {
497 match status {
498 (read, None) => Self::Finished(read),
499 (read, Some(e)) => Self::Failed(read, e),
500 }
501 }
502}
503
504/// Future produced by [`RecvStream::read_to_end()`].
505///
506/// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end
507struct ReadToEnd<'a> {
508 stream: &'a mut RecvStream,
509 read: Vec<(Bytes, u64)>,
510 start: u64,
511 end: u64,
512 size_limit: usize,
513}
514
515impl Future for ReadToEnd<'_> {
516 type Output = Result<Vec<u8>, ReadToEndError>;
517 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
518 loop {
519 match ready!(self.stream.poll_read_chunk(cx, usize::MAX, false))? {
520 Some(chunk) => {
521 self.start = self.start.min(chunk.offset);
522 let end = chunk.bytes.len() as u64 + chunk.offset;
523 if (end - self.start) > self.size_limit as u64 {
524 return Poll::Ready(Err(ReadToEndError::TooLong));
525 }
526 self.end = self.end.max(end);
527 self.read.push((chunk.bytes, chunk.offset));
528 }
529 None => {
530 if self.end == 0 {
531 // Never received anything
532 return Poll::Ready(Ok(Vec::new()));
533 }
534 let start = self.start;
535 let mut buffer = vec![0; (self.end - start) as usize];
536 for (data, offset) in self.read.drain(..) {
537 let offset = (offset - start) as usize;
538 buffer[offset..offset + data.len()].copy_from_slice(&data);
539 }
540 return Poll::Ready(Ok(buffer));
541 }
542 }
543 }
544 }
545}
546
547/// Errors from [`RecvStream::read_to_end`]
548#[derive(Debug, Error, Clone, PartialEq, Eq)]
549pub enum ReadToEndError {
550 /// An error occurred during reading
551 #[error("read error: {0}")]
552 Read(#[from] ReadError),
553 /// The stream is larger than the user-supplied limit
554 #[error("stream too long")]
555 TooLong,
556}
557
558#[cfg(feature = "futures-io")]
559impl futures_io::AsyncRead for RecvStream {
560 fn poll_read(
561 self: Pin<&mut Self>,
562 cx: &mut Context,
563 buf: &mut [u8],
564 ) -> Poll<io::Result<usize>> {
565 let mut buf = ReadBuf::new(buf);
566 ready!(Self::poll_read_buf(self.get_mut(), cx, &mut buf))?;
567 Poll::Ready(Ok(buf.filled().len()))
568 }
569}
570
571impl tokio::io::AsyncRead for RecvStream {
572 fn poll_read(
573 self: Pin<&mut Self>,
574 cx: &mut Context<'_>,
575 buf: &mut ReadBuf<'_>,
576 ) -> Poll<io::Result<()>> {
577 ready!(Self::poll_read_buf(self.get_mut(), cx, buf))?;
578 Poll::Ready(Ok(()))
579 }
580}
581
582impl Drop for RecvStream {
583 fn drop(&mut self) {
584 let mut conn = self.conn.state.lock("RecvStream::drop");
585
586 // clean up any previously registered wakers
587 conn.blocked_readers.remove(&self.stream);
588
589 if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
590 return;
591 }
592 if !self.all_data_read {
593 // Ignore ClosedStream errors
594 let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
595 conn.wake();
596 }
597 }
598}
599
600/// Errors that arise from reading from a stream.
601#[derive(Debug, Error, Clone, PartialEq, Eq)]
602pub enum ReadError {
603 /// The peer abandoned transmitting data on this stream
604 ///
605 /// Carries an application-defined error code.
606 #[error("stream reset by peer: error {0}")]
607 Reset(VarInt),
608 /// The connection was lost
609 #[error("connection lost")]
610 ConnectionLost(#[from] ConnectionError),
611 /// The stream has already been stopped, finished, or reset
612 #[error("closed stream")]
613 ClosedStream,
614 /// This was a 0-RTT stream and the server rejected it
615 ///
616 /// Can only occur on clients for 0-RTT streams, which can be opened using
617 /// [`Connecting::into_0rtt()`].
618 ///
619 /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
620 #[error("0-RTT rejected")]
621 ZeroRttRejected,
622}
623
624impl From<ResetError> for ReadError {
625 fn from(e: ResetError) -> Self {
626 match e {
627 ResetError::ConnectionLost(e) => Self::ConnectionLost(e),
628 ResetError::ZeroRttRejected => Self::ZeroRttRejected,
629 }
630 }
631}
632
633impl From<ReadError> for io::Error {
634 fn from(x: ReadError) -> Self {
635 use ReadError::*;
636 let kind = match x {
637 Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
638 ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
639 };
640 Self::new(kind, x)
641 }
642}
643
644/// Errors that arise while waiting for a stream to be reset
645#[derive(Debug, Error, Clone, PartialEq, Eq)]
646pub enum ResetError {
647 /// The connection was lost
648 #[error("connection lost")]
649 ConnectionLost(#[from] ConnectionError),
650 /// This was a 0-RTT stream and the server rejected it
651 ///
652 /// Can only occur on clients for 0-RTT streams, which can be opened using
653 /// [`Connecting::into_0rtt()`].
654 ///
655 /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
656 #[error("0-RTT rejected")]
657 ZeroRttRejected,
658}
659
660impl From<ResetError> for io::Error {
661 fn from(x: ResetError) -> Self {
662 use ResetError::*;
663 let kind = match x {
664 ZeroRttRejected => io::ErrorKind::ConnectionReset,
665 ConnectionLost(_) => io::ErrorKind::NotConnected,
666 };
667 Self::new(kind, x)
668 }
669}
670
671/// Future produced by [`RecvStream::read()`].
672///
673/// [`RecvStream::read()`]: crate::RecvStream::read
674struct Read<'a> {
675 stream: &'a mut RecvStream,
676 buf: ReadBuf<'a>,
677}
678
679impl Future for Read<'_> {
680 type Output = Result<Option<usize>, ReadError>;
681
682 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
683 let this = self.get_mut();
684 ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
685 match this.buf.filled().len() {
686 0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
687 n => Poll::Ready(Ok(Some(n))),
688 }
689 }
690}
691
692/// Future produced by [`RecvStream::read_exact()`].
693///
694/// [`RecvStream::read_exact()`]: crate::RecvStream::read_exact
695struct ReadExact<'a> {
696 stream: &'a mut RecvStream,
697 buf: ReadBuf<'a>,
698}
699
700impl Future for ReadExact<'_> {
701 type Output = Result<(), ReadExactError>;
702 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
703 let this = self.get_mut();
704 let mut remaining = this.buf.remaining();
705 while remaining > 0 {
706 ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
707 let new = this.buf.remaining();
708 if new == remaining {
709 return Poll::Ready(Err(ReadExactError::FinishedEarly(this.buf.filled().len())));
710 }
711 remaining = new;
712 }
713 Poll::Ready(Ok(()))
714 }
715}
716
717/// Errors that arise from reading from a stream.
718#[derive(Debug, Error, Clone, PartialEq, Eq)]
719pub enum ReadExactError {
720 /// The stream finished before all bytes were read
721 #[error("stream finished early ({0} bytes read)")]
722 FinishedEarly(usize),
723 /// A read error occurred
724 #[error(transparent)]
725 ReadError(#[from] ReadError),
726}
727
728/// Future produced by [`RecvStream::read_chunk()`].
729///
730/// [`RecvStream::read_chunk()`]: crate::RecvStream::read_chunk
731struct ReadChunk<'a> {
732 stream: &'a mut RecvStream,
733 max_length: usize,
734 ordered: bool,
735}
736
737impl Future for ReadChunk<'_> {
738 type Output = Result<Option<Chunk>, ReadError>;
739 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
740 let (max_length, ordered) = (self.max_length, self.ordered);
741 self.stream.poll_read_chunk(cx, max_length, ordered)
742 }
743}
744
745/// Future produced by [`RecvStream::read_chunks()`].
746///
747/// [`RecvStream::read_chunks()`]: crate::RecvStream::read_chunks
748struct ReadChunks<'a> {
749 stream: &'a mut RecvStream,
750 bufs: &'a mut [Bytes],
751}
752
753impl Future for ReadChunks<'_> {
754 type Output = Result<Option<usize>, ReadError>;
755 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
756 let this = self.get_mut();
757 this.stream.poll_read_chunks(cx, this.bufs)
758 }
759}