// noq_proto/connection/streams/mod.rs

1use std::{
2    collections::{BinaryHeap, hash_map},
3    io,
4};
5
6use bytes::Bytes;
7use thiserror::Error;
8use tracing::trace;
9
10use super::spaces::Retransmits;
11use crate::{
12    Dir, StreamId, VarInt,
13    connection::streams::state::{get_or_insert_recv, get_or_insert_send},
14    frame,
15};
16
17mod recv;
18use recv::Recv;
19pub use recv::{Chunks, ReadError, ReadableError};
20
21mod send;
22pub(crate) use send::{ByteSlice, BytesArray, Written};
23use send::{BytesSource, Send, SendState};
24pub use send::{FinishError, WriteError};
25
26mod state;
27#[allow(unreachable_pub)] // fuzzing only
28pub use state::StreamsState;
29
30/// Access to streams
31pub struct Streams<'a> {
32    pub(super) state: &'a mut StreamsState,
33    pub(super) conn_state: &'a super::State,
34}
35
36#[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing)
37impl<'a> Streams<'a> {
38    #[cfg(fuzzing)]
39    pub fn new(state: &'a mut StreamsState, conn_state: &'a super::State) -> Self {
40        Self { state, conn_state }
41    }
42
43    /// Open a single stream if possible
44    ///
45    /// Returns `None` if the streams in the given direction are currently exhausted.
46    pub fn open(&mut self, dir: Dir) -> Option<StreamId> {
47        if self.conn_state.is_closed() {
48            return None;
49        }
50
51        if self.state.next[dir as usize] >= self.state.max[dir as usize] {
52            self.state.streams_blocked[dir as usize] = true;
53            return None;
54        }
55
56        self.state.next[dir as usize] += 1;
57        let id = StreamId::new(self.state.side, dir, self.state.next[dir as usize] - 1);
58        self.state.insert_local(id);
59        self.state.send_streams += 1;
60        Some(id)
61    }
62
63    /// Accept a remotely initiated stream of a certain directionality, if possible
64    ///
65    /// Returns `None` if there are no new incoming streams for this connection.
66    /// Has no impact on the data flow-control or stream concurrency limits.
67    pub fn accept(&mut self, dir: Dir) -> Option<StreamId> {
68        if self.state.next_remote[dir as usize] == self.state.next_reported_remote[dir as usize] {
69            return None;
70        }
71
72        let x = self.state.next_reported_remote[dir as usize];
73        self.state.next_reported_remote[dir as usize] = x + 1;
74        if dir == Dir::Bi {
75            self.state.send_streams += 1;
76        }
77
78        Some(StreamId::new(!self.state.side, dir, x))
79    }
80
81    #[cfg(fuzzing)]
82    pub fn state(&mut self) -> &mut StreamsState {
83        self.state
84    }
85
86    /// The number of streams that may have unacknowledged data.
87    pub fn send_streams(&self) -> usize {
88        self.state.send_streams
89    }
90
91    /// The number of remotely initiated open streams of a certain directionality.
92    ///
93    /// Includes remotely initiated streams, which have not been accepted via [`accept`](Self::accept).
94    /// These streams count against the respective concurrency limit reported by
95    /// [`Connection::max_concurrent_streams`](super::Connection::max_concurrent_streams).
96    pub fn remote_open_streams(&self, dir: Dir) -> u64 {
97        // total opened - total closed = total opened - ( total permitted - total permitted unclosed )
98        self.state.next_remote[dir as usize]
99            - (self.state.max_remote[dir as usize]
100                - self.state.allocated_remote_count[dir as usize])
101    }
102}
103
104/// Access to streams
105pub struct RecvStream<'a> {
106    pub(super) id: StreamId,
107    pub(super) state: &'a mut StreamsState,
108    pub(super) pending: &'a mut Retransmits,
109}
110
111impl RecvStream<'_> {
112    /// Read from the given recv stream
113    ///
114    /// `max_length` limits the maximum size of the returned `Bytes` value; passing `usize::MAX`
115    /// will yield the best performance. `ordered` will make sure the returned chunk's offset will
116    /// have an offset exactly equal to the previously returned offset plus the previously returned
117    /// bytes' length.
118    ///
119    /// Yields `Ok(None)` if the stream was finished. Otherwise, yields a segment of data and its
120    /// offset in the stream. If `ordered` is `false`, segments may be received in any order, and
121    /// the `Chunk`'s `offset` field can be used to determine ordering in the caller.
122    ///
123    /// While most applications will prefer to consume stream data in order, unordered reads can
124    /// improve performance when packet loss occurs and data cannot be retransmitted before the flow
125    /// control window is filled. On any given stream, you can switch from ordered to unordered
126    /// reads, but ordered reads on streams that have seen previous unordered reads will return
127    /// `ReadError::IllegalOrderedRead`.
128    pub fn read(&mut self, ordered: bool) -> Result<Chunks<'_>, ReadableError> {
129        Chunks::new(self.id, ordered, self.state, self.pending)
130    }
131
132    /// Stop accepting data on the given receive stream
133    ///
134    /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
135    /// attempts to operate on a stream will yield `ClosedStream` errors.
136    pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
137        let mut entry = match self.state.recv.entry(self.id) {
138            hash_map::Entry::Occupied(s) => s,
139            hash_map::Entry::Vacant(_) => return Err(ClosedStream { _private: () }),
140        };
141        let stream = get_or_insert_recv(self.state.stream_receive_window)(entry.get_mut());
142
143        let (read_credits, stop_sending) = stream.stop()?;
144        if stop_sending.should_transmit() {
145            self.pending.stop_sending.push(frame::StopSending {
146                id: self.id,
147                error_code,
148            });
149        }
150
151        // We need to keep stopped streams around until they're finished or reset so we can update
152        // connection-level flow control to account for discarded data. Otherwise, we can discard
153        // state immediately.
154        if !stream.final_offset_unknown() {
155            let recv = entry.remove().expect("must have recv when stopping");
156            self.state.stream_recv_freed(self.id, recv);
157        }
158
159        if self.state.add_read_credits(read_credits).should_transmit() {
160            self.pending.max_data = true;
161        }
162
163        Ok(())
164    }
165
166    /// Returns the number of bytes read from this stream.
167    ///
168    /// This is the offset of the next byte to be read, i.e. the length of the contiguous
169    /// prefix of the stream consumed by the application.
170    pub fn bytes_read(&self) -> Result<u64, ClosedStream> {
171        let recv = self
172            .state
173            .recv
174            .get(&self.id)
175            .and_then(|s| s.as_ref())
176            .and_then(|s| s.as_open_recv())
177            .ok_or(ClosedStream { _private: () })?;
178        Ok(recv.assembler.bytes_read())
179    }
180
181    /// Check whether this stream has been reset by the peer, returning the reset error code if so
182    ///
183    /// After returning `Ok(Some(_))` once, stream state will be discarded and all future calls will
184    /// return `Err(ClosedStream)`.
185    pub fn received_reset(&mut self) -> Result<Option<VarInt>, ClosedStream> {
186        let hash_map::Entry::Occupied(entry) = self.state.recv.entry(self.id) else {
187            return Err(ClosedStream { _private: () });
188        };
189        let Some(s) = entry.get().as_ref().and_then(|s| s.as_open_recv()) else {
190            return Ok(None);
191        };
192        if s.stopped {
193            return Err(ClosedStream { _private: () });
194        }
195        let Some(code) = s.reset_code() else {
196            return Ok(None);
197        };
198
199        // Clean up state after application observes the reset, since there's no reason for the
200        // application to attempt to read or stop the stream once it knows it's reset
201        let (_, recv) = entry.remove_entry();
202        self.state
203            .stream_recv_freed(self.id, recv.expect("must have recv on reset"));
204        self.state.queue_max_stream_id(self.pending);
205
206        Ok(Some(code))
207    }
208}
209
210/// Access to streams
211pub struct SendStream<'a> {
212    pub(super) id: StreamId,
213    pub(super) state: &'a mut StreamsState,
214    pub(super) pending: &'a mut Retransmits,
215    pub(super) conn_state: &'a super::State,
216}
217
218#[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing)
219impl<'a> SendStream<'a> {
220    #[cfg(fuzzing)]
221    pub fn new(
222        id: StreamId,
223        state: &'a mut StreamsState,
224        pending: &'a mut Retransmits,
225        conn_state: &'a super::State,
226    ) -> Self {
227        Self {
228            id,
229            state,
230            pending,
231            conn_state,
232        }
233    }
234
235    /// Send data on the given stream
236    ///
237    /// Returns the number of bytes successfully written.
238    pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
239        Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes)
240    }
241
242    /// Send data on the given stream
243    ///
244    /// Returns the number of bytes written and advances the provided `Bytes`
245    /// slice, removing all completely written chunks.
246    ///
247    /// Note that this method might also write a partial chunk. In this case
248    /// the chunk will be advanced and contain only non-written data after the call.
249    pub fn write_chunks(&mut self, data: &mut &mut [Bytes]) -> Result<usize, WriteError> {
250        let written = self.write_source(&mut BytesArray::from_chunks(data))?;
251        *data = &mut std::mem::take(data)[written.chunks..];
252        Ok(written.bytes)
253    }
254
255    fn write_source<'b, B: BytesSource<'b>>(
256        &mut self,
257        source: &'b mut B,
258    ) -> Result<Written, WriteError> {
259        if self.conn_state.is_closed() {
260            trace!(%self.id, "write blocked; connection draining");
261            return Err(WriteError::Blocked);
262        }
263
264        let limit = self.state.write_limit();
265
266        let max_send_data = self.state.max_send_data(self.id);
267
268        let stream = self
269            .state
270            .send
271            .get_mut(&self.id)
272            .map(get_or_insert_send(max_send_data))
273            .ok_or(WriteError::ClosedStream)?;
274
275        if limit == 0 {
276            trace!(
277                stream = %self.id, max_data = self.state.max_data, data_sent = self.state.data_sent,
278                "write blocked by connection-level flow control or send window"
279            );
280            if !stream.connection_blocked {
281                stream.connection_blocked = true;
282                self.state.connection_blocked.push(self.id);
283            }
284            return Err(WriteError::Blocked);
285        }
286
287        let was_pending = stream.is_pending();
288        let written = stream.write(source, limit)?;
289        self.state.data_sent += written.bytes as u64;
290        self.state.unacked_data += written.bytes as u64;
291        trace!(stream = %self.id, "wrote {} bytes", written.bytes);
292        if !was_pending {
293            self.state.pending.push_pending(self.id, stream.priority);
294        }
295        Ok(written)
296    }
297
298    /// Check if this stream was stopped, get the reason if it was
299    pub fn stopped(&self) -> Result<Option<VarInt>, ClosedStream> {
300        match self.state.send.get(&self.id).as_ref() {
301            Some(Some(s)) => Ok(s.stop_reason),
302            Some(None) => Ok(None),
303            None => Err(ClosedStream { _private: () }),
304        }
305    }
306
307    /// Finish a send stream, signalling that no more data will be sent.
308    ///
309    /// If this fails, no [`StreamEvent::Finished`] will be generated.
310    ///
311    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
312    pub fn finish(&mut self) -> Result<(), FinishError> {
313        let max_send_data = self.state.max_send_data(self.id);
314        let stream = self
315            .state
316            .send
317            .get_mut(&self.id)
318            .map(get_or_insert_send(max_send_data))
319            .ok_or(FinishError::ClosedStream)?;
320
321        let was_pending = stream.is_pending();
322        stream.finish()?;
323        if !was_pending {
324            self.state.pending.push_pending(self.id, stream.priority);
325        }
326
327        Ok(())
328    }
329
330    /// Abandon transmitting data on a stream
331    ///
332    /// # Panics
333    /// - when applied to a receive stream
334    pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
335        let max_send_data = self.state.max_send_data(self.id);
336        let stream = self
337            .state
338            .send
339            .get_mut(&self.id)
340            .map(get_or_insert_send(max_send_data))
341            .ok_or(ClosedStream { _private: () })?;
342
343        if matches!(stream.state, SendState::ResetSent) {
344            // Redundant reset call
345            return Err(ClosedStream { _private: () });
346        }
347
348        // Restore the portion of the send window consumed by the data that we aren't about to
349        // send. We leave flow control alone because the peer's responsible for issuing additional
350        // credit based on the final offset communicated in the RESET_STREAM frame we send.
351        self.state.unacked_data -= stream.pending.unacked();
352        stream.reset();
353        self.pending.reset_stream.push((self.id, error_code));
354
355        // Don't reopen an already-closed stream we haven't forgotten yet
356        Ok(())
357    }
358
359    /// Set the priority of a stream
360    ///
361    /// # Panics
362    /// - when applied to a receive stream
363    pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> {
364        let max_send_data = self.state.max_send_data(self.id);
365        let stream = self
366            .state
367            .send
368            .get_mut(&self.id)
369            .map(get_or_insert_send(max_send_data))
370            .ok_or(ClosedStream { _private: () })?;
371
372        stream.priority = priority;
373        Ok(())
374    }
375
376    /// Get the priority of a stream
377    ///
378    /// # Panics
379    /// - when applied to a receive stream
380    pub fn priority(&self) -> Result<i32, ClosedStream> {
381        let stream = self
382            .state
383            .send
384            .get(&self.id)
385            .ok_or(ClosedStream { _private: () })?;
386
387        Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
388    }
389}
390
391/// A queue of streams with pending outgoing data, sorted by priority
392struct PendingStreamsQueue {
393    streams: BinaryHeap<PendingStream>,
394    /// The next stream to write out. This is `Some` when `TransportConfig::send_fairness(false)` and writing a stream is
395    /// interrupted while the stream still has some pending data. See `reinsert_pending()`.
396    next: Option<PendingStream>,
397    /// A monotonically decreasing counter, used to implement round-robin scheduling for streams of the same priority.
398    /// Underflowing is not a practical concern, as it is initialized to u64::MAX and only decremented by 1 in `push_pending`
399    recency: u64,
400}
401
402impl PendingStreamsQueue {
403    fn new() -> Self {
404        Self {
405            streams: BinaryHeap::new(),
406            next: None,
407            recency: u64::MAX,
408        }
409    }
410
411    /// Reinsert a stream that was pending and still contains unsent data.
412    fn reinsert_pending(&mut self, id: StreamId, priority: i32) {
413        assert!(self.next.is_none());
414
415        self.next = Some(PendingStream {
416            priority,
417            recency: self.recency, // the value here doesn't really matter
418            id,
419        });
420    }
421
422    /// Push a pending stream ID with the given priority, queued after any already-queued streams for the priority
423    fn push_pending(&mut self, id: StreamId, priority: i32) {
424        // Note that in the case where fairness is disabled, if we have a reinserted stream we don't
425        // bump it even if priority > next.priority. In order to minimize fragmentation we
426        // always try to complete a stream once part of it has been written.
427
428        // As the recency counter is monotonically decreasing, we know that using its value to sort this stream will queue it
429        // after all other queued streams of the same priority.
430        // This is enough to implement round-robin scheduling for streams that are still pending even after being handled,
431        // as in that case they are removed from the `BinaryHeap`, handled, and then immediately reinserted.
432        self.recency -= 1;
433        self.streams.push(PendingStream {
434            priority,
435            recency: self.recency,
436            id,
437        });
438    }
439
440    fn pop(&mut self) -> Option<PendingStream> {
441        self.next.take().or_else(|| self.streams.pop())
442    }
443
444    fn clear(&mut self) {
445        self.next = None;
446        self.streams.clear();
447    }
448
449    fn iter(&self) -> impl Iterator<Item = &PendingStream> {
450        self.next.iter().chain(self.streams.iter())
451    }
452
453    #[cfg(test)]
454    fn len(&self) -> usize {
455        self.streams.len() + self.next.is_some() as usize
456    }
457}
458
459/// The [`StreamId`] of a stream with pending data queued, ordered by its priority and recency
460#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
461struct PendingStream {
462    /// The priority of the stream
463    // Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct
464    // (See https://doc.rust-lang.org/stable/std/cmp/trait.Ord.html#derivable)
465    priority: i32,
466    /// A tie-breaker for streams of the same priority, used to improve fairness by implementing round-robin scheduling:
467    /// Larger values are prioritized, so it is initialised to `u64::MAX`, and when a stream writes data, we know
468    /// that it currently has the highest recency value, so it is deprioritized by setting its recency to 1 less than the
469    /// previous lowest recency value, such that all other streams of this priority will get processed once before we get back
470    /// round to this one
471    recency: u64,
472    /// The ID of the stream
473    // The way this type is used ensures that every instance has a unique `recency` value, so this field should be kept below
474    // the `priority` and `recency` fields, so that it does not interfere with the behaviour of the `Ord` derive
475    id: StreamId,
476}
477
478/// Application events about streams
479#[derive(Debug, PartialEq, Eq)]
480pub enum StreamEvent {
481    /// One or more new streams has been opened and might be readable
482    Opened {
483        /// Directionality for which streams have been opened
484        dir: Dir,
485    },
486    /// A currently open stream likely has data or errors waiting to be read
487    Readable {
488        /// Which stream is now readable
489        id: StreamId,
490    },
491    /// A formerly write-blocked stream might be ready for a write or have been stopped
492    ///
493    /// Only generated for streams that are currently open.
494    Writable {
495        /// Which stream is now writable
496        id: StreamId,
497    },
498    /// A finished stream has been fully acknowledged or stopped
499    Finished {
500        /// Which stream has been finished
501        id: StreamId,
502    },
503    /// The peer asked us to stop sending on an outgoing stream
504    Stopped {
505        /// Which stream has been stopped
506        id: StreamId,
507        /// Error code supplied by the peer
508        error_code: VarInt,
509    },
510    /// At least one new stream of a certain directionality may be opened
511    Available {
512        /// Directionality for which streams are newly available
513        dir: Dir,
514    },
515}
516
/// Signals whether a frame has to be transmitted
///
/// A thin wrapper around `bool`. The `#[must_use]` attribute makes it harder to lose the
/// transmission requirement by accident.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
#[must_use = "A frame might need to be enqueued"]
pub struct ShouldTransmit(bool);

impl ShouldTransmit {
    /// Returns `true` when a frame should be transmitted
    pub fn should_transmit(self) -> bool {
        let Self(flag) = self;
        flag
    }
}
531
532/// Error indicating that a stream has not been opened or has already been finished or reset
533#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
534#[error("closed stream")]
535pub struct ClosedStream {
536    _private: (),
537}
538
539impl From<ClosedStream> for io::Error {
540    fn from(x: ClosedStream) -> Self {
541        Self::new(io::ErrorKind::NotConnected, x)
542    }
543}
544
/// Selects one half of a stream: the sending side or the receiving side
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum StreamHalf {
    /// The sending half
    Send,
    /// The receiving half
    Recv,
}
550
551/// A helper trait to unify Bytes, `Vec<u8>` and `&[u8]` as sources of bytes
552pub(super) trait BytesOrSlice<'a>: AsRef<[u8]> + 'a {
553    fn len(&self) -> usize {
554        self.as_ref().len()
555    }
556    fn is_empty(&self) -> bool {
557        self.as_ref().is_empty()
558    }
559    fn into_bytes(self) -> Bytes;
560}
561
562impl BytesOrSlice<'_> for Bytes {
563    fn into_bytes(self) -> Bytes {
564        self
565    }
566}
567
568impl<'a> BytesOrSlice<'a> for &'a [u8] {
569    fn into_bytes(self) -> Bytes {
570        Bytes::copy_from_slice(self)
571    }
572}