iroh_quinn_proto/connection/streams/mod.rs
1use std::{
2 collections::{BinaryHeap, hash_map},
3 io,
4};
5
6use bytes::Bytes;
7use thiserror::Error;
8use tracing::trace;
9
10use super::spaces::Retransmits;
11use crate::{
12 Dir, StreamId, VarInt,
13 connection::streams::state::{get_or_insert_recv, get_or_insert_send},
14 frame,
15};
16
17mod recv;
18use recv::Recv;
19pub use recv::{Chunks, ReadError, ReadableError};
20
21mod send;
22pub(crate) use send::{ByteSlice, BytesArray};
23use send::{BytesSource, Send, SendState};
24pub use send::{FinishError, WriteError, Written};
25
26mod state;
27#[allow(unreachable_pub)] // fuzzing only
28pub use state::StreamsState;
29
30/// Access to streams
31pub struct Streams<'a> {
32 pub(super) state: &'a mut StreamsState,
33 pub(super) conn_state: &'a super::State,
34}
35
36#[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing)
37impl<'a> Streams<'a> {
38 #[cfg(fuzzing)]
39 pub fn new(state: &'a mut StreamsState, conn_state: &'a super::State) -> Self {
40 Self { state, conn_state }
41 }
42
43 /// Open a single stream if possible
44 ///
45 /// Returns `None` if the streams in the given direction are currently exhausted.
46 pub fn open(&mut self, dir: Dir) -> Option<StreamId> {
47 if self.conn_state.is_closed() {
48 return None;
49 }
50
51 // TODO: Queue STREAM_ID_BLOCKED if this fails
52 if self.state.next[dir as usize] >= self.state.max[dir as usize] {
53 return None;
54 }
55
56 self.state.next[dir as usize] += 1;
57 let id = StreamId::new(self.state.side, dir, self.state.next[dir as usize] - 1);
58 self.state.insert(false, id);
59 self.state.send_streams += 1;
60 Some(id)
61 }
62
63 /// Accept a remotely initiated stream of a certain directionality, if possible
64 ///
65 /// Returns `None` if there are no new incoming streams for this connection.
66 /// Has no impact on the data flow-control or stream concurrency limits.
67 pub fn accept(&mut self, dir: Dir) -> Option<StreamId> {
68 if self.state.next_remote[dir as usize] == self.state.next_reported_remote[dir as usize] {
69 return None;
70 }
71
72 let x = self.state.next_reported_remote[dir as usize];
73 self.state.next_reported_remote[dir as usize] = x + 1;
74 if dir == Dir::Bi {
75 self.state.send_streams += 1;
76 }
77
78 Some(StreamId::new(!self.state.side, dir, x))
79 }
80
81 #[cfg(fuzzing)]
82 pub fn state(&mut self) -> &mut StreamsState {
83 self.state
84 }
85
86 /// The number of streams that may have unacknowledged data.
87 pub fn send_streams(&self) -> usize {
88 self.state.send_streams
89 }
90
91 /// The number of remotely initiated open streams of a certain directionality.
92 ///
93 /// Includes remotely initiated streams, which have not been accepted via [`accept`](Self::accept).
94 /// These streams count against the respective concurrency limit reported by
95 /// [`Connection::max_concurrent_streams`](super::Connection::max_concurrent_streams).
96 pub fn remote_open_streams(&self, dir: Dir) -> u64 {
97 // total opened - total closed = total opened - ( total permitted - total permitted unclosed )
98 self.state.next_remote[dir as usize]
99 - (self.state.max_remote[dir as usize]
100 - self.state.allocated_remote_count[dir as usize])
101 }
102}
103
104/// Access to streams
105pub struct RecvStream<'a> {
106 pub(super) id: StreamId,
107 pub(super) state: &'a mut StreamsState,
108 pub(super) pending: &'a mut Retransmits,
109}
110
111impl RecvStream<'_> {
112 /// Read from the given recv stream
113 ///
114 /// `max_length` limits the maximum size of the returned `Bytes` value; passing `usize::MAX`
115 /// will yield the best performance. `ordered` will make sure the returned chunk's offset will
116 /// have an offset exactly equal to the previously returned offset plus the previously returned
117 /// bytes' length.
118 ///
119 /// Yields `Ok(None)` if the stream was finished. Otherwise, yields a segment of data and its
120 /// offset in the stream. If `ordered` is `false`, segments may be received in any order, and
121 /// the `Chunk`'s `offset` field can be used to determine ordering in the caller.
122 ///
123 /// While most applications will prefer to consume stream data in order, unordered reads can
124 /// improve performance when packet loss occurs and data cannot be retransmitted before the flow
125 /// control window is filled. On any given stream, you can switch from ordered to unordered
126 /// reads, but ordered reads on streams that have seen previous unordered reads will return
127 /// `ReadError::IllegalOrderedRead`.
128 pub fn read(&mut self, ordered: bool) -> Result<Chunks<'_>, ReadableError> {
129 Chunks::new(self.id, ordered, self.state, self.pending)
130 }
131
132 /// Stop accepting data on the given receive stream
133 ///
134 /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
135 /// attempts to operate on a stream will yield `ClosedStream` errors.
136 pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
137 let mut entry = match self.state.recv.entry(self.id) {
138 hash_map::Entry::Occupied(s) => s,
139 hash_map::Entry::Vacant(_) => return Err(ClosedStream { _private: () }),
140 };
141 let stream = get_or_insert_recv(self.state.stream_receive_window)(entry.get_mut());
142
143 let (read_credits, stop_sending) = stream.stop()?;
144 if stop_sending.should_transmit() {
145 self.pending.stop_sending.push(frame::StopSending {
146 id: self.id,
147 error_code,
148 });
149 }
150
151 // We need to keep stopped streams around until they're finished or reset so we can update
152 // connection-level flow control to account for discarded data. Otherwise, we can discard
153 // state immediately.
154 if !stream.final_offset_unknown() {
155 let recv = entry.remove().expect("must have recv when stopping");
156 self.state.stream_recv_freed(self.id, recv);
157 }
158
159 if self.state.add_read_credits(read_credits).should_transmit() {
160 self.pending.max_data = true;
161 }
162
163 Ok(())
164 }
165
166 /// Check whether this stream has been reset by the peer, returning the reset error code if so
167 ///
168 /// After returning `Ok(Some(_))` once, stream state will be discarded and all future calls will
169 /// return `Err(ClosedStream)`.
170 pub fn received_reset(&mut self) -> Result<Option<VarInt>, ClosedStream> {
171 let hash_map::Entry::Occupied(entry) = self.state.recv.entry(self.id) else {
172 return Err(ClosedStream { _private: () });
173 };
174 let Some(s) = entry.get().as_ref().and_then(|s| s.as_open_recv()) else {
175 return Ok(None);
176 };
177 if s.stopped {
178 return Err(ClosedStream { _private: () });
179 }
180 let Some(code) = s.reset_code() else {
181 return Ok(None);
182 };
183
184 // Clean up state after application observes the reset, since there's no reason for the
185 // application to attempt to read or stop the stream once it knows it's reset
186 let (_, recv) = entry.remove_entry();
187 self.state
188 .stream_recv_freed(self.id, recv.expect("must have recv on reset"));
189 self.state.queue_max_stream_id(self.pending);
190
191 Ok(Some(code))
192 }
193}
194
195/// Access to streams
196pub struct SendStream<'a> {
197 pub(super) id: StreamId,
198 pub(super) state: &'a mut StreamsState,
199 pub(super) pending: &'a mut Retransmits,
200 pub(super) conn_state: &'a super::State,
201}
202
203#[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing)
204impl<'a> SendStream<'a> {
205 #[cfg(fuzzing)]
206 pub fn new(
207 id: StreamId,
208 state: &'a mut StreamsState,
209 pending: &'a mut Retransmits,
210 conn_state: &'a super::State,
211 ) -> Self {
212 Self {
213 id,
214 state,
215 pending,
216 conn_state,
217 }
218 }
219
220 /// Send data on the given stream
221 ///
222 /// Returns the number of bytes successfully written.
223 pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
224 Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes)
225 }
226
227 /// Send data on the given stream
228 ///
229 /// Returns the number of bytes and chunks successfully written.
230 /// Note that this method might also write a partial chunk. In this case
231 /// [`Written::chunks`] will not count this chunk as fully written. However
232 /// the chunk will be advanced and contain only non-written data after the call.
233 pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> {
234 self.write_source(&mut BytesArray::from_chunks(data))
235 }
236
237 fn write_source<'b, B: BytesSource<'b>>(
238 &mut self,
239 source: &'b mut B,
240 ) -> Result<Written, WriteError> {
241 if self.conn_state.is_closed() {
242 trace!(%self.id, "write blocked; connection draining");
243 return Err(WriteError::Blocked);
244 }
245
246 let limit = self.state.write_limit();
247
248 let max_send_data = self.state.max_send_data(self.id);
249
250 let stream = self
251 .state
252 .send
253 .get_mut(&self.id)
254 .map(get_or_insert_send(max_send_data))
255 .ok_or(WriteError::ClosedStream)?;
256
257 if limit == 0 {
258 trace!(
259 stream = %self.id, max_data = self.state.max_data, data_sent = self.state.data_sent,
260 "write blocked by connection-level flow control or send window"
261 );
262 if !stream.connection_blocked {
263 stream.connection_blocked = true;
264 self.state.connection_blocked.push(self.id);
265 }
266 return Err(WriteError::Blocked);
267 }
268
269 let was_pending = stream.is_pending();
270 let written = stream.write(source, limit)?;
271 self.state.data_sent += written.bytes as u64;
272 self.state.unacked_data += written.bytes as u64;
273 trace!(stream = %self.id, "wrote {} bytes", written.bytes);
274 if !was_pending {
275 self.state.pending.push_pending(self.id, stream.priority);
276 }
277 Ok(written)
278 }
279
280 /// Check if this stream was stopped, get the reason if it was
281 pub fn stopped(&self) -> Result<Option<VarInt>, ClosedStream> {
282 match self.state.send.get(&self.id).as_ref() {
283 Some(Some(s)) => Ok(s.stop_reason),
284 Some(None) => Ok(None),
285 None => Err(ClosedStream { _private: () }),
286 }
287 }
288
289 /// Finish a send stream, signalling that no more data will be sent.
290 ///
291 /// If this fails, no [`StreamEvent::Finished`] will be generated.
292 ///
293 /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
294 pub fn finish(&mut self) -> Result<(), FinishError> {
295 let max_send_data = self.state.max_send_data(self.id);
296 let stream = self
297 .state
298 .send
299 .get_mut(&self.id)
300 .map(get_or_insert_send(max_send_data))
301 .ok_or(FinishError::ClosedStream)?;
302
303 let was_pending = stream.is_pending();
304 stream.finish()?;
305 if !was_pending {
306 self.state.pending.push_pending(self.id, stream.priority);
307 }
308
309 Ok(())
310 }
311
312 /// Abandon transmitting data on a stream
313 ///
314 /// # Panics
315 /// - when applied to a receive stream
316 pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
317 let max_send_data = self.state.max_send_data(self.id);
318 let stream = self
319 .state
320 .send
321 .get_mut(&self.id)
322 .map(get_or_insert_send(max_send_data))
323 .ok_or(ClosedStream { _private: () })?;
324
325 if matches!(stream.state, SendState::ResetSent) {
326 // Redundant reset call
327 return Err(ClosedStream { _private: () });
328 }
329
330 // Restore the portion of the send window consumed by the data that we aren't about to
331 // send. We leave flow control alone because the peer's responsible for issuing additional
332 // credit based on the final offset communicated in the RESET_STREAM frame we send.
333 self.state.unacked_data -= stream.pending.unacked();
334 stream.reset();
335 self.pending.reset_stream.push((self.id, error_code));
336
337 // Don't reopen an already-closed stream we haven't forgotten yet
338 Ok(())
339 }
340
341 /// Set the priority of a stream
342 ///
343 /// # Panics
344 /// - when applied to a receive stream
345 pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> {
346 let max_send_data = self.state.max_send_data(self.id);
347 let stream = self
348 .state
349 .send
350 .get_mut(&self.id)
351 .map(get_or_insert_send(max_send_data))
352 .ok_or(ClosedStream { _private: () })?;
353
354 stream.priority = priority;
355 Ok(())
356 }
357
358 /// Get the priority of a stream
359 ///
360 /// # Panics
361 /// - when applied to a receive stream
362 pub fn priority(&self) -> Result<i32, ClosedStream> {
363 let stream = self
364 .state
365 .send
366 .get(&self.id)
367 .ok_or(ClosedStream { _private: () })?;
368
369 Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
370 }
371}
372
373/// A queue of streams with pending outgoing data, sorted by priority
374struct PendingStreamsQueue {
375 streams: BinaryHeap<PendingStream>,
376 /// The next stream to write out. This is `Some` when `TransportConfig::send_fairness(false)` and writing a stream is
377 /// interrupted while the stream still has some pending data. See `reinsert_pending()`.
378 next: Option<PendingStream>,
379 /// A monotonically decreasing counter, used to implement round-robin scheduling for streams of the same priority.
380 /// Underflowing is not a practical concern, as it is initialized to u64::MAX and only decremented by 1 in `push_pending`
381 recency: u64,
382}
383
384impl PendingStreamsQueue {
385 fn new() -> Self {
386 Self {
387 streams: BinaryHeap::new(),
388 next: None,
389 recency: u64::MAX,
390 }
391 }
392
393 /// Reinsert a stream that was pending and still contains unsent data.
394 fn reinsert_pending(&mut self, id: StreamId, priority: i32) {
395 assert!(self.next.is_none());
396
397 self.next = Some(PendingStream {
398 priority,
399 recency: self.recency, // the value here doesn't really matter
400 id,
401 });
402 }
403
404 /// Push a pending stream ID with the given priority, queued after any already-queued streams for the priority
405 fn push_pending(&mut self, id: StreamId, priority: i32) {
406 // Note that in the case where fairness is disabled, if we have a reinserted stream we don't
407 // bump it even if priority > next.priority. In order to minimize fragmentation we
408 // always try to complete a stream once part of it has been written.
409
410 // As the recency counter is monotonically decreasing, we know that using its value to sort this stream will queue it
411 // after all other queued streams of the same priority.
412 // This is enough to implement round-robin scheduling for streams that are still pending even after being handled,
413 // as in that case they are removed from the `BinaryHeap`, handled, and then immediately reinserted.
414 self.recency -= 1;
415 self.streams.push(PendingStream {
416 priority,
417 recency: self.recency,
418 id,
419 });
420 }
421
422 fn pop(&mut self) -> Option<PendingStream> {
423 self.next.take().or_else(|| self.streams.pop())
424 }
425
426 fn clear(&mut self) {
427 self.next = None;
428 self.streams.clear();
429 }
430
431 fn iter(&self) -> impl Iterator<Item = &PendingStream> {
432 self.next.iter().chain(self.streams.iter())
433 }
434
435 #[cfg(test)]
436 fn len(&self) -> usize {
437 self.streams.len() + self.next.is_some() as usize
438 }
439}
440
441/// The [`StreamId`] of a stream with pending data queued, ordered by its priority and recency
442#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
443struct PendingStream {
444 /// The priority of the stream
445 // Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct
446 // (See https://doc.rust-lang.org/stable/std/cmp/trait.Ord.html#derivable)
447 priority: i32,
448 /// A tie-breaker for streams of the same priority, used to improve fairness by implementing round-robin scheduling:
449 /// Larger values are prioritized, so it is initialised to `u64::MAX`, and when a stream writes data, we know
450 /// that it currently has the highest recency value, so it is deprioritized by setting its recency to 1 less than the
451 /// previous lowest recency value, such that all other streams of this priority will get processed once before we get back
452 /// round to this one
453 recency: u64,
454 /// The ID of the stream
455 // The way this type is used ensures that every instance has a unique `recency` value, so this field should be kept below
456 // the `priority` and `recency` fields, so that it does not interfere with the behaviour of the `Ord` derive
457 id: StreamId,
458}
459
460/// Application events about streams
461#[derive(Debug, PartialEq, Eq)]
462pub enum StreamEvent {
463 /// One or more new streams has been opened and might be readable
464 Opened {
465 /// Directionality for which streams have been opened
466 dir: Dir,
467 },
468 /// A currently open stream likely has data or errors waiting to be read
469 Readable {
470 /// Which stream is now readable
471 id: StreamId,
472 },
473 /// A formerly write-blocked stream might be ready for a write or have been stopped
474 ///
475 /// Only generated for streams that are currently open.
476 Writable {
477 /// Which stream is now writable
478 id: StreamId,
479 },
480 /// A finished stream has been fully acknowledged or stopped
481 Finished {
482 /// Which stream has been finished
483 id: StreamId,
484 },
485 /// The peer asked us to stop sending on an outgoing stream
486 Stopped {
487 /// Which stream has been stopped
488 id: StreamId,
489 /// Error code supplied by the peer
490 error_code: VarInt,
491 },
492 /// At least one new stream of a certain directionality may be opened
493 Available {
494 /// Directionality for which streams are newly available
495 dir: Dir,
496 },
497}
498
/// Flag recording whether a frame has to be transmitted
///
/// A thin wrapper around `bool`. It is marked `#[must_use]` so that a pending
/// transmission requirement cannot be dropped silently.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
#[must_use = "A frame might need to be enqueued"]
pub struct ShouldTransmit(bool);

impl ShouldTransmit {
    /// Returns `true` when a frame should be transmitted
    pub fn should_transmit(self) -> bool {
        let Self(flag) = self;
        flag
    }
}
513
514/// Error indicating that a stream has not been opened or has already been finished or reset
515#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
516#[error("closed stream")]
517pub struct ClosedStream {
518 _private: (),
519}
520
521impl From<ClosedStream> for io::Error {
522 fn from(x: ClosedStream) -> Self {
523 Self::new(io::ErrorKind::NotConnected, x)
524 }
525}
526
/// Selects one of the two halves of a stream
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StreamHalf {
    /// The sending half
    Send,
    /// The receiving half
    Recv,
}
532
533/// A helper trait to unify Bytes, Vec<u8> and &[u8] as sources of bytes
534pub(super) trait BytesOrSlice<'a>: AsRef<[u8]> + 'a {
535 fn len(&self) -> usize {
536 self.as_ref().len()
537 }
538 fn is_empty(&self) -> bool {
539 self.as_ref().is_empty()
540 }
541 fn into_bytes(self) -> Bytes;
542}
543
544impl BytesOrSlice<'_> for Bytes {
545 fn into_bytes(self) -> Bytes {
546 self
547 }
548}
549
550impl<'a> BytesOrSlice<'a> for &'a [u8] {
551 fn into_bytes(self) -> Bytes {
552 Bytes::copy_from_slice(self)
553 }
554}