iroh_quinn/
recv_stream.rs

1use std::{
2    future::{Future, poll_fn},
3    io,
4    pin::Pin,
5    task::{Context, Poll, ready},
6};
7
8use bytes::Bytes;
9use proto::{Chunk, Chunks, ClosedStream, ConnectionError, ReadableError, StreamId};
10use thiserror::Error;
11use tokio::io::ReadBuf;
12
13use crate::{VarInt, connection::ConnectionRef};
14
15/// A stream that can only be used to receive data
16///
17/// `stop(0)` is implicitly called on drop unless:
18/// - A variant of [`ReadError`] has been yielded by a read call
19/// - [`stop()`] was called explicitly
20///
21/// # Cancellation
22///
23/// A `read` method is said to be *cancel-safe* when dropping its future before the future becomes
24/// ready cannot lead to loss of stream data. This is true of methods which succeed immediately when
25/// any progress is made, and is not true of methods which might need to perform multiple reads
26/// internally before succeeding. Each `read` method documents whether it is cancel-safe.
27///
28/// # Common issues
29///
30/// ## Data never received on a locally-opened stream
31///
32/// Peers are not notified of streams until they or a later-numbered stream are used to send
33/// data. If a bidirectional stream is locally opened but never used to send, then the peer may
34/// never see it. Application protocols should always arrange for the endpoint which will first
35/// transmit on a stream to be the endpoint responsible for opening it.
36///
37/// ## Data never received on a remotely-opened stream
38///
39/// Verify that the stream you are receiving is the same one that the server is sending on, e.g. by
40/// logging the [`id`] of each. Streams are always accepted in the same order as they are created,
41/// i.e. ascending order by [`StreamId`]. For example, even if a sender first transmits on
42/// bidirectional stream 1, the first stream yielded by [`Connection::accept_bi`] on the receiver
43/// will be bidirectional stream 0.
44///
45/// [`ReadError`]: crate::ReadError
46/// [`stop()`]: RecvStream::stop
47/// [`SendStream::finish`]: crate::SendStream::finish
48/// [`WriteError::Stopped`]: crate::WriteError::Stopped
49/// [`id`]: RecvStream::id
50/// [`Connection::accept_bi`]: crate::Connection::accept_bi
51#[derive(Debug)]
52pub struct RecvStream {
53    conn: ConnectionRef,
54    stream: StreamId,
55    is_0rtt: bool,
56    all_data_read: bool,
57    reset: Option<VarInt>,
58}
59
60impl RecvStream {
61    pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
62        Self {
63            conn,
64            stream,
65            is_0rtt,
66            all_data_read: false,
67            reset: None,
68        }
69    }
70
71    /// Read data contiguously from the stream.
72    ///
73    /// Yields the number of bytes read into `buf` on success, or `None` if the stream was finished.
74    ///
75    /// This operation is cancel-safe.
76    pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
77        Read {
78            stream: self,
79            buf: ReadBuf::new(buf),
80        }
81        .await
82    }
83
84    /// Read an exact number of bytes contiguously from the stream.
85    ///
86    /// See [`read()`] for details. This operation is *not* cancel-safe.
87    ///
88    /// [`read()`]: RecvStream::read
89    pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> {
90        ReadExact {
91            stream: self,
92            buf: ReadBuf::new(buf),
93        }
94        .await
95    }
96
97    /// Attempts to read from the stream into the provided buffer
98    ///
99    /// On success, returns `Poll::Ready(Ok(num_bytes_read))` and places data into `buf`. If this
100    /// returns zero bytes read (and `buf` has a non-zero length), that indicates that the remote
101    /// side has [`finish`]ed the stream and the local side has already read all bytes.
102    ///
103    /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
104    /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
105    /// closed.
106    ///
107    /// [`finish`]: crate::SendStream::finish
108    pub fn poll_read(
109        &mut self,
110        cx: &mut Context,
111        buf: &mut [u8],
112    ) -> Poll<Result<usize, ReadError>> {
113        let mut buf = ReadBuf::new(buf);
114        ready!(self.poll_read_buf(cx, &mut buf))?;
115        Poll::Ready(Ok(buf.filled().len()))
116    }
117
118    /// Attempts to read from the stream into the provided buffer, which may be uninitialized
119    ///
120    /// On success, returns `Poll::Ready(Ok(()))` and places data into the unfilled portion of
121    /// `buf`. If this does not write any bytes to `buf` (and `buf.remaining()` is non-zero), that
122    /// indicates that the remote side has [`finish`]ed the stream and the local side has already
123    /// read all bytes.
124    ///
125    /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
126    /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
127    /// closed.
128    ///
129    /// [`finish`]: crate::SendStream::finish
130    pub fn poll_read_buf(
131        &mut self,
132        cx: &mut Context,
133        buf: &mut ReadBuf<'_>,
134    ) -> Poll<Result<(), ReadError>> {
135        if buf.remaining() == 0 {
136            return Poll::Ready(Ok(()));
137        }
138
139        self.poll_read_generic(cx, true, |chunks| {
140            let mut read = false;
141            loop {
142                if buf.remaining() == 0 {
143                    // We know `read` is `true` because `buf.remaining()` was not 0 before
144                    return ReadStatus::Readable(());
145                }
146
147                match chunks.next(buf.remaining()) {
148                    Ok(Some(chunk)) => {
149                        buf.put_slice(&chunk.bytes);
150                        read = true;
151                    }
152                    res => return (if read { Some(()) } else { None }, res.err()).into(),
153                }
154            }
155        })
156        .map(|res| res.map(|_| ()))
157    }
158
159    /// Read the next segment of data
160    ///
161    /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
162    /// offset in the stream. The chunk's offset will be immediately after
163    /// the last data yielded by [`RecvStream::read`] or [`RecvStream::read_chunk`].
164    ///
165    /// For unordered reads, convert the stream into an unordered stream using [`Self::into_unordered`].
166    ///
167    /// Slightly more efficient than [`RecvStream::read`] due to not copying. Chunk boundaries do not correspond
168    /// to peer writes, and hence cannot be used as framing.
169    ///
170    /// This operation is cancel-safe.
171    pub async fn read_chunk(&mut self, max_length: usize) -> Result<Option<Chunk>, ReadError> {
172        ReadChunk {
173            stream: self,
174            max_length,
175            ordered: true,
176        }
177        .await
178    }
179
180    /// Attempts to read a chunk from the stream.
181    ///
182    /// On success, returns `Poll::Ready(Ok(Some(chunk)))`. If `Poll::Ready(Ok(None))`
183    /// is returned, it implies that EOF has been reached.
184    ///
185    /// If no data is available for reading, the method returns `Poll::Pending`
186    /// and arranges for the current task (via cx.waker()) to receive a notification
187    /// when the stream becomes readable or is closed.
188    fn poll_read_chunk(
189        &mut self,
190        cx: &mut Context,
191        max_length: usize,
192        ordered: bool,
193    ) -> Poll<Result<Option<Chunk>, ReadError>> {
194        self.poll_read_generic(cx, ordered, |chunks| match chunks.next(max_length) {
195            Ok(Some(chunk)) => ReadStatus::Readable(chunk),
196            res => (None, res.err()).into(),
197        })
198    }
199
200    /// Read the next segments of data
201    ///
202    /// Fills `bufs` with the segments of data beginning immediately after the
203    /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
204    /// finished.
205    ///
206    /// Slightly more efficient than `read` due to not copying. Chunk boundaries
207    /// do not correspond to peer writes, and hence cannot be used as framing.
208    ///
209    /// This operation is cancel-safe.
210    pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
211        ReadChunks { stream: self, bufs }.await
212    }
213
214    /// Foundation of [`Self::read_chunks`]
215    fn poll_read_chunks(
216        &mut self,
217        cx: &mut Context,
218        bufs: &mut [Bytes],
219    ) -> Poll<Result<Option<usize>, ReadError>> {
220        if bufs.is_empty() {
221            return Poll::Ready(Ok(Some(0)));
222        }
223
224        self.poll_read_generic(cx, true, |chunks| {
225            let mut read = 0;
226            loop {
227                if read >= bufs.len() {
228                    // We know `read > 0` because `bufs` cannot be empty here
229                    return ReadStatus::Readable(read);
230                }
231
232                match chunks.next(usize::MAX) {
233                    Ok(Some(chunk)) => {
234                        bufs[read] = chunk.bytes;
235                        read += 1;
236                    }
237                    res => return (if read == 0 { None } else { Some(read) }, res.err()).into(),
238                }
239            }
240        })
241    }
242
243    /// Convenience method to read all remaining data into a buffer
244    ///
245    /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
246    /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
247    /// allow. `size_limit` should be set to limit worst-case memory use.
248    ///
249    /// If unordered reads have already been made, the resulting buffer may have gaps containing
250    /// arbitrary data.
251    ///
252    /// This operation is *not* cancel-safe.
253    ///
254    /// [`ReadToEndError::TooLong`]: crate::ReadToEndError::TooLong
255    pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
256        ReadToEnd {
257            stream: self,
258            size_limit,
259            read: Vec::new(),
260            start: u64::MAX,
261            end: 0,
262        }
263        .await
264    }
265
266    /// Stop accepting data
267    ///
268    /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
269    /// attempts to operate on a stream will yield `ClosedStream` errors.
270    pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
271        let mut conn = self.conn.state.lock("RecvStream::stop");
272        if self.is_0rtt && conn.check_0rtt().is_err() {
273            return Ok(());
274        }
275        conn.inner.recv_stream(self.stream).stop(error_code)?;
276        conn.wake();
277        self.all_data_read = true;
278        Ok(())
279    }
280
281    /// Check if this stream has been opened during 0-RTT.
282    ///
283    /// In which case any non-idempotent request should be considered dangerous at the application
284    /// level. Because read data is subject to replay attacks.
285    pub fn is_0rtt(&self) -> bool {
286        self.is_0rtt
287    }
288
289    /// Get the identity of this stream
290    pub fn id(&self) -> StreamId {
291        self.stream
292    }
293
294    /// Completes when the stream has been reset by the peer or otherwise closed
295    ///
296    /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
297    /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
298    /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
299    /// which it is no longer meaningful for the stream to be reset.
300    ///
301    /// This operation is cancel-safe.
302    pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
303        poll_fn(|cx| {
304            let mut conn = self.conn.state.lock("RecvStream::reset");
305            if self.is_0rtt && conn.check_0rtt().is_err() {
306                return Poll::Ready(Err(ResetError::ZeroRttRejected));
307            }
308
309            if let Some(code) = self.reset {
310                return Poll::Ready(Ok(Some(code)));
311            }
312
313            match conn.inner.recv_stream(self.stream).received_reset() {
314                Err(_) => Poll::Ready(Ok(None)),
315                Ok(Some(error_code)) => {
316                    // Stream state has just now been freed, so the connection may need to issue new
317                    // stream ID flow control credit
318                    conn.wake();
319                    Poll::Ready(Ok(Some(error_code)))
320                }
321                Ok(None) => {
322                    if let Some(e) = &conn.error {
323                        return Poll::Ready(Err(e.clone().into()));
324                    }
325                    // Resets always notify readers, since a reset is an immediate read error. We
326                    // could introduce a dedicated channel to reduce the risk of spurious wakeups,
327                    // but that increased complexity is probably not justified, as an application
328                    // that is expecting a reset is not likely to receive large amounts of data.
329                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
330                    Poll::Pending
331                }
332            }
333        })
334        .await
335    }
336
337    /// Handle common logic related to reading out of a receive stream
338    ///
339    /// This takes an `FnMut` closure that takes care of the actual reading process, matching
340    /// the detailed read semantics for the calling function with a particular return type.
341    /// The closure can read from the passed `&mut Chunks` and has to return the status after
342    /// reading: the amount of data read, and the status after the final read call.
343    fn poll_read_generic<T, U>(
344        &mut self,
345        cx: &mut Context,
346        ordered: bool,
347        mut read_fn: T,
348    ) -> Poll<Result<Option<U>, ReadError>>
349    where
350        T: FnMut(&mut Chunks) -> ReadStatus<U>,
351    {
352        use proto::ReadError::*;
353        if self.all_data_read {
354            return Poll::Ready(Ok(None));
355        }
356
357        let mut conn = self.conn.state.lock("RecvStream::poll_read");
358        if self.is_0rtt {
359            conn.check_0rtt().map_err(|()| ReadError::ZeroRttRejected)?;
360        }
361
362        // If we stored an error during a previous call, return it now. This can happen if a
363        // `read_fn` both wants to return data and also returns an error in its final stream status.
364        let status = match self.reset {
365            Some(code) => ReadStatus::Failed(None, Reset(code)),
366            None => {
367                let mut recv = conn.inner.recv_stream(self.stream);
368                let mut chunks = recv.read(ordered).map_err(|e| match e {
369                    ReadableError::ClosedStream => ReadError::ClosedStream,
370                    ReadableError::IllegalOrderedRead => {
371                        // We should never get here because the only way to do unordered reads is
372                        // via UnorderedRecvStream, which allows only unordered reads. It is not
373                        // possible to get a RecvStream from an UnorderedRecvStream.
374                        unreachable!("ordered read after unordered read")
375                    }
376                })?;
377                let status = read_fn(&mut chunks);
378                if chunks.finalize().should_transmit() {
379                    conn.wake();
380                }
381                status
382            }
383        };
384
385        match status {
386            ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
387            ReadStatus::Finished(read) => {
388                self.all_data_read = true;
389                Poll::Ready(Ok(read))
390            }
391            ReadStatus::Failed(read, Blocked) => match read {
392                Some(val) => Poll::Ready(Ok(Some(val))),
393                None => {
394                    if let Some(ref x) = conn.error {
395                        return Poll::Ready(Err(ReadError::ConnectionLost(x.clone())));
396                    }
397                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
398                    Poll::Pending
399                }
400            },
401            ReadStatus::Failed(read, Reset(error_code)) => match read {
402                None => {
403                    self.all_data_read = true;
404                    self.reset = Some(error_code);
405                    Poll::Ready(Err(ReadError::Reset(error_code)))
406                }
407                done => {
408                    self.reset = Some(error_code);
409                    Poll::Ready(Ok(done))
410                }
411            },
412        }
413    }
414
415    /// Converts this stream into an unordered stream.
416    pub fn into_unordered(self) -> UnorderedRecvStream {
417        UnorderedRecvStream { inner: self }
418    }
419}
420
421/// A stream that can be used to receive data out-of-order.
422///
423/// Obtained by converting a [`RecvStream`] via [`RecvStream::into_unordered`].
424///
425/// This variant of `RecvStream` allows reading chunks of data *exclusively*
426/// out of order. Once you have done an unordered read, ordered reads are no
427/// longer possible since data may have been consumed out of order.
428///
429/// The stream state related fns like [`Self::id`], [`Self::is_0rtt`], [`Self::stop`], and
430/// [`Self::received_reset`] behave exactly as on [`RecvStream`].
431#[derive(Debug)]
432pub struct UnorderedRecvStream {
433    inner: RecvStream,
434}
435
436impl UnorderedRecvStream {
437    /// Reads the next segment of data.
438    ///
439    /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
440    /// offset in the stream. Segments may be received in any order, and the `Chunk`'s `offset`
441    /// field can be used to determine ordering in the caller. Unordered reads are less prone
442    /// to head-of-line blocking within a stream, but require the application to manage
443    /// reassembling the original data.
444    ///
445    /// This operation is cancel-safe.
446    pub async fn read_chunk(&mut self, max_length: usize) -> Result<Option<Chunk>, ReadError> {
447        ReadChunk {
448            stream: &mut self.inner,
449            max_length,
450            ordered: false,
451        }
452        .await
453    }
454
455    /// Get the identity of this stream
456    pub fn id(&self) -> StreamId {
457        self.inner.id()
458    }
459
460    /// Check if this stream has been opened during 0-RTT.
461    ///
462    /// In which case any non-idempotent request should be considered dangerous at the application
463    /// level. Because read data is subject to replay attacks.
464    pub fn is_0rtt(&self) -> bool {
465        self.inner.is_0rtt()
466    }
467
468    /// Stop accepting data
469    ///
470    /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
471    /// attempts to operate on a stream will yield `ClosedStream` errors.
472    pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
473        self.inner.stop(error_code)
474    }
475
476    /// Completes when the stream has been reset by the peer or otherwise closed
477    ///
478    /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
479    /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
480    /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
481    /// which it is no longer meaningful for the stream to be reset.
482    ///
483    /// This operation is cancel-safe.
484    pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
485        self.inner.received_reset().await
486    }
487}
488
489enum ReadStatus<T> {
490    Readable(T),
491    Finished(Option<T>),
492    Failed(Option<T>, proto::ReadError),
493}
494
495impl<T> From<(Option<T>, Option<proto::ReadError>)> for ReadStatus<T> {
496    fn from(status: (Option<T>, Option<proto::ReadError>)) -> Self {
497        match status {
498            (read, None) => Self::Finished(read),
499            (read, Some(e)) => Self::Failed(read, e),
500        }
501    }
502}
503
504/// Future produced by [`RecvStream::read_to_end()`].
505///
506/// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end
507struct ReadToEnd<'a> {
508    stream: &'a mut RecvStream,
509    read: Vec<(Bytes, u64)>,
510    start: u64,
511    end: u64,
512    size_limit: usize,
513}
514
515impl Future for ReadToEnd<'_> {
516    type Output = Result<Vec<u8>, ReadToEndError>;
517    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
518        loop {
519            match ready!(self.stream.poll_read_chunk(cx, usize::MAX, false))? {
520                Some(chunk) => {
521                    self.start = self.start.min(chunk.offset);
522                    let end = chunk.bytes.len() as u64 + chunk.offset;
523                    if (end - self.start) > self.size_limit as u64 {
524                        return Poll::Ready(Err(ReadToEndError::TooLong));
525                    }
526                    self.end = self.end.max(end);
527                    self.read.push((chunk.bytes, chunk.offset));
528                }
529                None => {
530                    if self.end == 0 {
531                        // Never received anything
532                        return Poll::Ready(Ok(Vec::new()));
533                    }
534                    let start = self.start;
535                    let mut buffer = vec![0; (self.end - start) as usize];
536                    for (data, offset) in self.read.drain(..) {
537                        let offset = (offset - start) as usize;
538                        buffer[offset..offset + data.len()].copy_from_slice(&data);
539                    }
540                    return Poll::Ready(Ok(buffer));
541                }
542            }
543        }
544    }
545}
546
547/// Errors from [`RecvStream::read_to_end`]
548#[derive(Debug, Error, Clone, PartialEq, Eq)]
549pub enum ReadToEndError {
550    /// An error occurred during reading
551    #[error("read error: {0}")]
552    Read(#[from] ReadError),
553    /// The stream is larger than the user-supplied limit
554    #[error("stream too long")]
555    TooLong,
556}
557
558#[cfg(feature = "futures-io")]
559impl futures_io::AsyncRead for RecvStream {
560    fn poll_read(
561        self: Pin<&mut Self>,
562        cx: &mut Context,
563        buf: &mut [u8],
564    ) -> Poll<io::Result<usize>> {
565        let mut buf = ReadBuf::new(buf);
566        ready!(Self::poll_read_buf(self.get_mut(), cx, &mut buf))?;
567        Poll::Ready(Ok(buf.filled().len()))
568    }
569}
570
571impl tokio::io::AsyncRead for RecvStream {
572    fn poll_read(
573        self: Pin<&mut Self>,
574        cx: &mut Context<'_>,
575        buf: &mut ReadBuf<'_>,
576    ) -> Poll<io::Result<()>> {
577        ready!(Self::poll_read_buf(self.get_mut(), cx, buf))?;
578        Poll::Ready(Ok(()))
579    }
580}
581
582impl Drop for RecvStream {
583    fn drop(&mut self) {
584        let mut conn = self.conn.state.lock("RecvStream::drop");
585
586        // clean up any previously registered wakers
587        conn.blocked_readers.remove(&self.stream);
588
589        if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
590            return;
591        }
592        if !self.all_data_read {
593            // Ignore ClosedStream errors
594            let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
595            conn.wake();
596        }
597    }
598}
599
600/// Errors that arise from reading from a stream.
601#[derive(Debug, Error, Clone, PartialEq, Eq)]
602pub enum ReadError {
603    /// The peer abandoned transmitting data on this stream
604    ///
605    /// Carries an application-defined error code.
606    #[error("stream reset by peer: error {0}")]
607    Reset(VarInt),
608    /// The connection was lost
609    #[error("connection lost")]
610    ConnectionLost(#[from] ConnectionError),
611    /// The stream has already been stopped, finished, or reset
612    #[error("closed stream")]
613    ClosedStream,
614    /// This was a 0-RTT stream and the server rejected it
615    ///
616    /// Can only occur on clients for 0-RTT streams, which can be opened using
617    /// [`Connecting::into_0rtt()`].
618    ///
619    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
620    #[error("0-RTT rejected")]
621    ZeroRttRejected,
622}
623
624impl From<ResetError> for ReadError {
625    fn from(e: ResetError) -> Self {
626        match e {
627            ResetError::ConnectionLost(e) => Self::ConnectionLost(e),
628            ResetError::ZeroRttRejected => Self::ZeroRttRejected,
629        }
630    }
631}
632
633impl From<ReadError> for io::Error {
634    fn from(x: ReadError) -> Self {
635        use ReadError::*;
636        let kind = match x {
637            Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
638            ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
639        };
640        Self::new(kind, x)
641    }
642}
643
644/// Errors that arise while waiting for a stream to be reset
645#[derive(Debug, Error, Clone, PartialEq, Eq)]
646pub enum ResetError {
647    /// The connection was lost
648    #[error("connection lost")]
649    ConnectionLost(#[from] ConnectionError),
650    /// This was a 0-RTT stream and the server rejected it
651    ///
652    /// Can only occur on clients for 0-RTT streams, which can be opened using
653    /// [`Connecting::into_0rtt()`].
654    ///
655    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
656    #[error("0-RTT rejected")]
657    ZeroRttRejected,
658}
659
660impl From<ResetError> for io::Error {
661    fn from(x: ResetError) -> Self {
662        use ResetError::*;
663        let kind = match x {
664            ZeroRttRejected => io::ErrorKind::ConnectionReset,
665            ConnectionLost(_) => io::ErrorKind::NotConnected,
666        };
667        Self::new(kind, x)
668    }
669}
670
671/// Future produced by [`RecvStream::read()`].
672///
673/// [`RecvStream::read()`]: crate::RecvStream::read
674struct Read<'a> {
675    stream: &'a mut RecvStream,
676    buf: ReadBuf<'a>,
677}
678
679impl Future for Read<'_> {
680    type Output = Result<Option<usize>, ReadError>;
681
682    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
683        let this = self.get_mut();
684        ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
685        match this.buf.filled().len() {
686            0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
687            n => Poll::Ready(Ok(Some(n))),
688        }
689    }
690}
691
692/// Future produced by [`RecvStream::read_exact()`].
693///
694/// [`RecvStream::read_exact()`]: crate::RecvStream::read_exact
695struct ReadExact<'a> {
696    stream: &'a mut RecvStream,
697    buf: ReadBuf<'a>,
698}
699
700impl Future for ReadExact<'_> {
701    type Output = Result<(), ReadExactError>;
702    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
703        let this = self.get_mut();
704        let mut remaining = this.buf.remaining();
705        while remaining > 0 {
706            ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
707            let new = this.buf.remaining();
708            if new == remaining {
709                return Poll::Ready(Err(ReadExactError::FinishedEarly(this.buf.filled().len())));
710            }
711            remaining = new;
712        }
713        Poll::Ready(Ok(()))
714    }
715}
716
717/// Errors that arise from reading from a stream.
718#[derive(Debug, Error, Clone, PartialEq, Eq)]
719pub enum ReadExactError {
720    /// The stream finished before all bytes were read
721    #[error("stream finished early ({0} bytes read)")]
722    FinishedEarly(usize),
723    /// A read error occurred
724    #[error(transparent)]
725    ReadError(#[from] ReadError),
726}
727
728/// Future produced by [`RecvStream::read_chunk()`].
729///
730/// [`RecvStream::read_chunk()`]: crate::RecvStream::read_chunk
731struct ReadChunk<'a> {
732    stream: &'a mut RecvStream,
733    max_length: usize,
734    ordered: bool,
735}
736
737impl Future for ReadChunk<'_> {
738    type Output = Result<Option<Chunk>, ReadError>;
739    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
740        let (max_length, ordered) = (self.max_length, self.ordered);
741        self.stream.poll_read_chunk(cx, max_length, ordered)
742    }
743}
744
745/// Future produced by [`RecvStream::read_chunks()`].
746///
747/// [`RecvStream::read_chunks()`]: crate::RecvStream::read_chunks
748struct ReadChunks<'a> {
749    stream: &'a mut RecvStream,
750    bufs: &'a mut [Bytes],
751}
752
753impl Future for ReadChunks<'_> {
754    type Output = Result<Option<usize>, ReadError>;
755    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
756        let this = self.get_mut();
757        this.stream.poll_read_chunks(cx, this.bufs)
758    }
759}