noq_proto/connection/streams/
recv.rs

1use std::collections::hash_map::Entry;
2use std::mem;
3
4use thiserror::Error;
5use tracing::debug;
6
7use super::state::get_or_insert_recv;
8use super::{ClosedStream, Retransmits, ShouldTransmit, StreamId, StreamsState};
9use crate::connection::assembler::{Assembler, Chunk, IllegalOrderedRead};
10use crate::connection::streams::state::StreamRecv;
11use crate::{TransportError, VarInt, frame};
12
13#[derive(Debug, Default)]
14pub(super) struct Recv {
15    // NB: when adding or removing fields, remember to update `reinit`.
16    state: RecvState,
17    pub(super) assembler: Assembler,
18    sent_max_stream_data: u64,
19    pub(super) end: u64,
20    pub(super) stopped: bool,
21}
22
23impl Recv {
24    pub(super) fn new(initial_max_data: u64) -> Box<Self> {
25        Box::new(Self {
26            state: RecvState::default(),
27            assembler: Assembler::new(),
28            sent_max_stream_data: initial_max_data,
29            end: 0,
30            stopped: false,
31        })
32    }
33
34    /// Reset to the initial state
35    pub(super) fn reinit(&mut self, initial_max_data: u64) {
36        self.state = RecvState::default();
37        self.assembler.reinit();
38        self.sent_max_stream_data = initial_max_data;
39        self.end = 0;
40        self.stopped = false;
41    }
42
43    /// Process a STREAM frame
44    ///
45    /// Return value is `(number_of_new_bytes_ingested, stream_is_closed)`
46    pub(super) fn ingest(
47        &mut self,
48        frame: frame::Stream,
49        payload_len: usize,
50        received: u64,
51        max_data: u64,
52    ) -> Result<(u64, bool), TransportError> {
53        let end = frame.offset + frame.data.len() as u64;
54        if end >= 2u64.pow(62) {
55            return Err(TransportError::FLOW_CONTROL_ERROR(
56                "maximum stream offset too large",
57            ));
58        }
59
60        if let Some(final_offset) = self.final_offset()
61            && (end > final_offset || (frame.fin && end != final_offset))
62        {
63            debug!(end, final_offset, "final size error");
64            return Err(TransportError::FINAL_SIZE_ERROR(""));
65        }
66
67        let new_bytes = self.credit_consumed_by(end, received, max_data)?;
68
69        // Stopped streams don't need to wait for the actual data, they just need to know
70        // how much there was.
71        if frame.fin
72            && !self.stopped
73            && let RecvState::Recv { ref mut size } = self.state
74        {
75            *size = Some(end);
76        }
77
78        self.end = self.end.max(end);
79        // Don't bother storing data or releasing stream-level flow control credit if the stream's
80        // already stopped
81        if !self.stopped {
82            self.assembler.insert(frame.offset, frame.data, payload_len);
83        }
84
85        Ok((new_bytes, frame.fin && self.stopped))
86    }
87
88    pub(super) fn stop(&mut self) -> Result<(u64, ShouldTransmit), ClosedStream> {
89        if self.stopped {
90            return Err(ClosedStream { _private: () });
91        }
92
93        self.stopped = true;
94        self.assembler.clear();
95        // Issue flow control credit for unread data
96        let read_credits = self.end - self.assembler.bytes_read();
97        // This may send a spurious STOP_SENDING if we've already received all data, but it's a bit
98        // fiddly to distinguish that from the case where we've received a FIN but are missing some
99        // data that the peer might still be trying to retransmit, in which case a STOP_SENDING is
100        // still useful.
101        Ok((read_credits, ShouldTransmit(self.is_receiving())))
102    }
103
104    /// Returns the window that should be advertised in a `MAX_STREAM_DATA` frame
105    ///
106    /// The method returns a tuple which consists of the window that should be
107    /// announced, as well as a boolean parameter which indicates if a new
108    /// transmission of the value is recommended. If the boolean value is
109    /// `false` the new window should only be transmitted if a previous transmission
110    /// had failed.
111    pub(super) fn max_stream_data(&mut self, stream_receive_window: u64) -> (u64, ShouldTransmit) {
112        let max_stream_data = self.assembler.bytes_read() + stream_receive_window;
113
114        // Only announce a window update if it's significant enough
115        // to make it worthwhile sending a MAX_STREAM_DATA frame.
116        // We use here a fraction of the configured stream receive window to make
117        // the decision, and accommodate for streams using bigger windows requiring
118        // less updates. A fixed size would also work - but it would need to be
119        // smaller than `stream_receive_window` in order to make sure the stream
120        // does not get stuck.
121        let diff = max_stream_data - self.sent_max_stream_data;
122        let transmit = self.can_send_flow_control() && diff >= (stream_receive_window / 8);
123        (max_stream_data, ShouldTransmit(transmit))
124    }
125
126    /// Records that a `MAX_STREAM_DATA` announcing a certain window was sent
127    ///
128    /// This will suppress enqueuing further `MAX_STREAM_DATA` frames unless
129    /// either the previous transmission was not acknowledged or the window
130    /// further increased.
131    pub(super) fn record_sent_max_stream_data(&mut self, sent_value: u64) {
132        if sent_value > self.sent_max_stream_data {
133            self.sent_max_stream_data = sent_value;
134        }
135    }
136
137    /// Whether the total amount of data that the peer will send on this stream is unknown
138    ///
139    /// True until we've received either a reset or the final frame.
140    ///
141    /// Implies that the sender might benefit from stream-level flow control updates, and we might
142    /// need to issue connection-level flow control updates due to flow control budget use by this
143    /// stream in the future, even if it's been stopped.
144    pub(super) fn final_offset_unknown(&self) -> bool {
145        matches!(self.state, RecvState::Recv { size: None })
146    }
147
148    /// Whether stream-level flow control updates should be sent for this stream
149    pub(super) fn can_send_flow_control(&self) -> bool {
150        // Stream-level flow control is redundant if the sender has already sent the whole stream,
151        // and moot if we no longer want data on this stream.
152        self.final_offset_unknown() && !self.stopped
153    }
154
155    /// Whether data is still being accepted from the peer
156    pub(super) fn is_receiving(&self) -> bool {
157        matches!(self.state, RecvState::Recv { .. })
158    }
159
160    fn final_offset(&self) -> Option<u64> {
161        match self.state {
162            RecvState::Recv { size } => size,
163            RecvState::ResetRecvd { size, .. } => Some(size),
164        }
165    }
166
167    /// Returns `false` iff the reset was redundant
168    pub(super) fn reset(
169        &mut self,
170        error_code: VarInt,
171        final_offset: VarInt,
172        received: u64,
173        max_data: u64,
174    ) -> Result<bool, TransportError> {
175        // Validate final_offset
176        if let Some(offset) = self.final_offset() {
177            if offset != final_offset.into_inner() {
178                return Err(TransportError::FINAL_SIZE_ERROR("inconsistent value"));
179            }
180        } else if self.end > u64::from(final_offset) {
181            return Err(TransportError::FINAL_SIZE_ERROR(
182                "lower than high water mark",
183            ));
184        }
185        self.credit_consumed_by(final_offset.into(), received, max_data)?;
186
187        if matches!(self.state, RecvState::ResetRecvd { .. }) {
188            return Ok(false);
189        }
190        self.state = RecvState::ResetRecvd {
191            size: final_offset.into(),
192            error_code,
193        };
194        // Nuke buffers so that future reads fail immediately, which ensures future reads don't
195        // issue flow control credit redundant to that already issued. We could instead special-case
196        // reset streams during read, but it's unclear if there's any benefit to retaining data for
197        // reset streams.
198        self.assembler.clear();
199        Ok(true)
200    }
201
202    pub(super) fn reset_code(&self) -> Option<VarInt> {
203        match self.state {
204            RecvState::ResetRecvd { error_code, .. } => Some(error_code),
205            _ => None,
206        }
207    }
208
209    /// Compute the amount of flow control credit consumed, or return an error if more was consumed
210    /// than issued
211    fn credit_consumed_by(
212        &self,
213        offset: u64,
214        received: u64,
215        max_data: u64,
216    ) -> Result<u64, TransportError> {
217        let prev_end = self.end;
218        let new_bytes = offset.saturating_sub(prev_end);
219        if offset > self.sent_max_stream_data || received + new_bytes > max_data {
220            debug!(
221                received,
222                new_bytes,
223                max_data,
224                offset,
225                stream_max_data = self.sent_max_stream_data,
226                "flow control error"
227            );
228            return Err(TransportError::FLOW_CONTROL_ERROR(""));
229        }
230
231        Ok(new_bytes)
232    }
233}
234
235/// Chunks returned from [`RecvStream::read()`][crate::RecvStream::read].
236///
237/// ### Note: Finalization Needed
238/// Bytes read from the stream are not released from the congestion window until
239/// either [`Self::finalize()`] is called, or this type is dropped.
240///
241/// It is recommended that you call [`Self::finalize()`] because it returns a flag
242/// telling you whether reading from the stream has resulted in the need to transmit a packet.
243///
244/// If this type is leaked, the stream will remain blocked on the remote peer until
245/// another read from the stream is done.
246pub struct Chunks<'a> {
247    id: StreamId,
248    ordered: bool,
249    streams: &'a mut StreamsState,
250    pending: &'a mut Retransmits,
251    state: ChunksState,
252    read: u64,
253}
254
255impl<'a> Chunks<'a> {
256    pub(super) fn new(
257        id: StreamId,
258        ordered: bool,
259        streams: &'a mut StreamsState,
260        pending: &'a mut Retransmits,
261    ) -> Result<Self, ReadableError> {
262        let mut entry = match streams.recv.entry(id) {
263            Entry::Occupied(entry) => entry,
264            Entry::Vacant(_) => return Err(ReadableError::ClosedStream),
265        };
266
267        let mut recv =
268            match get_or_insert_recv(streams.stream_receive_window)(entry.get_mut()).stopped {
269                true => return Err(ReadableError::ClosedStream),
270                false => entry.remove().unwrap().into_inner(), // this can't fail due to the previous get_or_insert_with
271            };
272
273        recv.assembler.ensure_ordering(ordered)?;
274        Ok(Self {
275            id,
276            ordered,
277            streams,
278            pending,
279            state: ChunksState::Readable(recv),
280            read: 0,
281        })
282    }
283
284    /// Next
285    ///
286    /// Should call finalize() when done calling this.
287    pub fn next(&mut self, max_length: usize) -> Result<Option<Chunk>, ReadError> {
288        let rs = match self.state {
289            ChunksState::Readable(ref mut rs) => rs,
290            ChunksState::Reset(error_code) => {
291                return Err(ReadError::Reset(error_code));
292            }
293            ChunksState::Finished => {
294                return Ok(None);
295            }
296            ChunksState::Finalized => panic!("must not call next() after finalize()"),
297        };
298
299        if let Some(chunk) = rs.assembler.read(max_length, self.ordered) {
300            self.read += chunk.bytes.len() as u64;
301            return Ok(Some(chunk));
302        }
303
304        match rs.state {
305            RecvState::ResetRecvd { error_code, .. } => {
306                debug_assert_eq!(self.read, 0, "reset streams have empty buffers");
307                let state = mem::replace(&mut self.state, ChunksState::Reset(error_code));
308                // At this point if we have `rs` self.state must be `ChunksState::Readable`
309                let recv = match state {
310                    ChunksState::Readable(recv) => StreamRecv::Open(recv),
311                    _ => unreachable!("state must be ChunkState::Readable"),
312                };
313                self.streams.stream_recv_freed(self.id, recv);
314                Err(ReadError::Reset(error_code))
315            }
316            RecvState::Recv { size } => {
317                if size == Some(rs.end) && rs.assembler.bytes_read() == rs.end {
318                    let state = mem::replace(&mut self.state, ChunksState::Finished);
319                    // At this point if we have `rs` self.state must be `ChunksState::Readable`
320                    let recv = match state {
321                        ChunksState::Readable(recv) => StreamRecv::Open(recv),
322                        _ => unreachable!("state must be ChunkState::Readable"),
323                    };
324                    self.streams.stream_recv_freed(self.id, recv);
325                    Ok(None)
326                } else {
327                    // We don't need a distinct `ChunksState` variant for a blocked stream because
328                    // retrying a read harmlessly re-traces our steps back to returning
329                    // `Err(Blocked)` again. The buffers can't refill and the stream's own state
330                    // can't change so long as this `Chunks` exists.
331                    Err(ReadError::Blocked)
332                }
333            }
334        }
335    }
336
337    /// Mark the read data as consumed from the stream.
338    ///
339    /// The number of read bytes will be released from the congestion window,
340    /// allowing the remote peer to send more data if it was previously blocked.
341    ///
342    /// If [`ShouldTransmit::should_transmit()`] returns `true`,
343    /// a packet needs to be sent to the peer informing them that the stream is unblocked.
344    /// This means that you should call [`Connection::poll_transmit()`][crate::Connection::poll_transmit]
345    /// and send the returned packet as soon as is reasonable, to unblock the remote peer.
346    pub fn finalize(mut self) -> ShouldTransmit {
347        self.finalize_inner()
348    }
349
350    fn finalize_inner(&mut self) -> ShouldTransmit {
351        let state = mem::replace(&mut self.state, ChunksState::Finalized);
352        if let ChunksState::Finalized = state {
353            // Noop on repeated calls
354            return ShouldTransmit(false);
355        }
356
357        // We issue additional stream ID credit after the application is notified that a previously
358        // open stream has finished or been reset and we've therefore disposed of its state, as
359        // recorded by `stream_freed` calls in `next`.
360        let mut should_transmit = self.streams.queue_max_stream_id(self.pending);
361
362        // If the stream hasn't finished, we may need to issue stream-level flow control credit
363        if let ChunksState::Readable(mut rs) = state {
364            let (_, max_stream_data) = rs.max_stream_data(self.streams.stream_receive_window);
365            should_transmit |= max_stream_data.0;
366            if max_stream_data.0 {
367                self.pending.max_stream_data.insert(self.id);
368            }
369            // Return the stream to storage for future use
370            self.streams
371                .recv
372                .insert(self.id, Some(StreamRecv::Open(rs)));
373        }
374
375        // Issue connection-level flow control credit for any data we read regardless of state
376        let max_data = self.streams.add_read_credits(self.read);
377        self.pending.max_data |= max_data.0;
378        should_transmit |= max_data.0;
379        ShouldTransmit(should_transmit)
380    }
381}
382
383impl Drop for Chunks<'_> {
384    fn drop(&mut self) {
385        let _ = self.finalize_inner();
386    }
387}
388
389enum ChunksState {
390    Readable(Box<Recv>),
391    Reset(VarInt),
392    Finished,
393    Finalized,
394}
395
396/// Errors triggered when reading from a recv stream
397#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
398pub enum ReadError {
399    /// No more data is currently available on this stream.
400    ///
401    /// If more data on this stream is received from the peer, an `Event::StreamReadable` will be
402    /// generated for this stream, indicating that retrying the read might succeed.
403    #[error("blocked")]
404    Blocked,
405    /// The peer abandoned transmitting data on this stream.
406    ///
407    /// Carries an application-defined error code.
408    #[error("reset by peer: code {0}")]
409    Reset(VarInt),
410}
411
412/// Errors triggered when opening a recv stream for reading
413#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
414pub enum ReadableError {
415    /// The stream has not been opened or was already stopped, finished, or reset
416    #[error("closed stream")]
417    ClosedStream,
418    /// Attempted an ordered read following an unordered read
419    ///
420    /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
421    /// stream which cannot be recovered, making further ordered reads impossible.
422    #[error("ordered read after unordered read")]
423    IllegalOrderedRead,
424}
425
426impl From<IllegalOrderedRead> for ReadableError {
427    fn from(_: IllegalOrderedRead) -> Self {
428        Self::IllegalOrderedRead
429    }
430}
431
432#[derive(Debug, Copy, Clone, Eq, PartialEq)]
433enum RecvState {
434    Recv { size: Option<u64> },
435    ResetRecvd { size: u64, error_code: VarInt },
436}
437
438impl Default for RecvState {
439    fn default() -> Self {
440        Self::Recv { size: None }
441    }
442}
443
444#[cfg(test)]
445mod tests {
446    use bytes::Bytes;
447
448    use crate::{Dir, Side};
449
450    use super::*;
451
452    #[test]
453    fn reordered_frames_while_stopped() {
454        const INITIAL_BYTES: u64 = 3;
455        const INITIAL_OFFSET: u64 = 3;
456        const RECV_WINDOW: u64 = 8;
457        let mut s = Recv::new(RECV_WINDOW);
458        let mut data_recvd = 0;
459        // Receive bytes 3..6
460        let (new_bytes, is_closed) = s
461            .ingest(
462                frame::Stream {
463                    id: StreamId::new(Side::Client, Dir::Uni, 0),
464                    offset: INITIAL_OFFSET,
465                    fin: false,
466                    data: Bytes::from_static(&[0; INITIAL_BYTES as usize]),
467                },
468                123,
469                data_recvd,
470                data_recvd + 1024,
471            )
472            .unwrap();
473        data_recvd += new_bytes;
474        assert_eq!(new_bytes, INITIAL_OFFSET + INITIAL_BYTES);
475        assert!(!is_closed);
476
477        let (credits, transmit) = s.stop().unwrap();
478        assert!(transmit.should_transmit());
479        assert_eq!(
480            credits,
481            INITIAL_OFFSET + INITIAL_BYTES,
482            "full connection flow control credit is issued by stop"
483        );
484
485        let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
486        assert!(!transmit.should_transmit());
487        assert_eq!(
488            max_stream_data, RECV_WINDOW,
489            "stream flow control credit isn't issued by stop"
490        );
491
492        // Receive byte 7
493        let (new_bytes, is_closed) = s
494            .ingest(
495                frame::Stream {
496                    id: StreamId::new(Side::Client, Dir::Uni, 0),
497                    offset: RECV_WINDOW - 1,
498                    fin: false,
499                    data: Bytes::from_static(&[0; 1]),
500                },
501                123,
502                data_recvd,
503                data_recvd + 1024,
504            )
505            .unwrap();
506        data_recvd += new_bytes;
507        assert_eq!(new_bytes, RECV_WINDOW - (INITIAL_OFFSET + INITIAL_BYTES));
508        assert!(!is_closed);
509
510        let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
511        assert!(!transmit.should_transmit());
512        assert_eq!(
513            max_stream_data, RECV_WINDOW,
514            "stream flow control credit isn't issued after stop"
515        );
516
517        // Receive bytes 0..3
518        let (new_bytes, is_closed) = s
519            .ingest(
520                frame::Stream {
521                    id: StreamId::new(Side::Client, Dir::Uni, 0),
522                    offset: 0,
523                    fin: false,
524                    data: Bytes::from_static(&[0; INITIAL_OFFSET as usize]),
525                },
526                123,
527                data_recvd,
528                data_recvd + 1024,
529            )
530            .unwrap();
531        assert_eq!(
532            new_bytes, 0,
533            "reordered frames don't issue connection-level flow control for stopped streams"
534        );
535        assert!(!is_closed);
536
537        let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
538        assert!(!transmit.should_transmit());
539        assert_eq!(
540            max_stream_data, RECV_WINDOW,
541            "stream flow control credit isn't issued after stop"
542        );
543    }
544}