1use std::collections::hash_map::Entry;
2use std::mem;
3
4use thiserror::Error;
5use tracing::debug;
6
7use super::state::get_or_insert_recv;
8use super::{ClosedStream, Retransmits, ShouldTransmit, StreamId, StreamsState};
9use crate::connection::assembler::{Assembler, Chunk, IllegalOrderedRead};
10use crate::connection::streams::state::StreamRecv;
11use crate::{TransportError, VarInt, frame};
12
13#[derive(Debug, Default)]
14pub(super) struct Recv {
15 state: RecvState,
17 pub(super) assembler: Assembler,
18 sent_max_stream_data: u64,
19 pub(super) end: u64,
20 pub(super) stopped: bool,
21}
22
23impl Recv {
24 pub(super) fn new(initial_max_data: u64) -> Box<Self> {
25 Box::new(Self {
26 state: RecvState::default(),
27 assembler: Assembler::new(),
28 sent_max_stream_data: initial_max_data,
29 end: 0,
30 stopped: false,
31 })
32 }
33
34 pub(super) fn reinit(&mut self, initial_max_data: u64) {
36 self.state = RecvState::default();
37 self.assembler.reinit();
38 self.sent_max_stream_data = initial_max_data;
39 self.end = 0;
40 self.stopped = false;
41 }
42
43 pub(super) fn ingest(
47 &mut self,
48 frame: frame::Stream,
49 payload_len: usize,
50 received: u64,
51 max_data: u64,
52 ) -> Result<(u64, bool), TransportError> {
53 let end = frame.offset + frame.data.len() as u64;
54 if end >= 2u64.pow(62) {
55 return Err(TransportError::FLOW_CONTROL_ERROR(
56 "maximum stream offset too large",
57 ));
58 }
59
60 if let Some(final_offset) = self.final_offset()
61 && (end > final_offset || (frame.fin && end != final_offset))
62 {
63 debug!(end, final_offset, "final size error");
64 return Err(TransportError::FINAL_SIZE_ERROR(""));
65 }
66
67 let new_bytes = self.credit_consumed_by(end, received, max_data)?;
68
69 if frame.fin
72 && !self.stopped
73 && let RecvState::Recv { ref mut size } = self.state
74 {
75 *size = Some(end);
76 }
77
78 self.end = self.end.max(end);
79 if !self.stopped {
82 self.assembler.insert(frame.offset, frame.data, payload_len);
83 }
84
85 Ok((new_bytes, frame.fin && self.stopped))
86 }
87
88 pub(super) fn stop(&mut self) -> Result<(u64, ShouldTransmit), ClosedStream> {
89 if self.stopped {
90 return Err(ClosedStream { _private: () });
91 }
92
93 self.stopped = true;
94 self.assembler.clear();
95 let read_credits = self.end - self.assembler.bytes_read();
97 Ok((read_credits, ShouldTransmit(self.is_receiving())))
102 }
103
104 pub(super) fn max_stream_data(&mut self, stream_receive_window: u64) -> (u64, ShouldTransmit) {
112 let max_stream_data = self.assembler.bytes_read() + stream_receive_window;
113
114 let diff = max_stream_data - self.sent_max_stream_data;
122 let transmit = self.can_send_flow_control() && diff >= (stream_receive_window / 8);
123 (max_stream_data, ShouldTransmit(transmit))
124 }
125
126 pub(super) fn record_sent_max_stream_data(&mut self, sent_value: u64) {
132 if sent_value > self.sent_max_stream_data {
133 self.sent_max_stream_data = sent_value;
134 }
135 }
136
137 pub(super) fn final_offset_unknown(&self) -> bool {
145 matches!(self.state, RecvState::Recv { size: None })
146 }
147
148 pub(super) fn can_send_flow_control(&self) -> bool {
150 self.final_offset_unknown() && !self.stopped
153 }
154
155 pub(super) fn is_receiving(&self) -> bool {
157 matches!(self.state, RecvState::Recv { .. })
158 }
159
160 fn final_offset(&self) -> Option<u64> {
161 match self.state {
162 RecvState::Recv { size } => size,
163 RecvState::ResetRecvd { size, .. } => Some(size),
164 }
165 }
166
167 pub(super) fn reset(
169 &mut self,
170 error_code: VarInt,
171 final_offset: VarInt,
172 received: u64,
173 max_data: u64,
174 ) -> Result<bool, TransportError> {
175 if let Some(offset) = self.final_offset() {
177 if offset != final_offset.into_inner() {
178 return Err(TransportError::FINAL_SIZE_ERROR("inconsistent value"));
179 }
180 } else if self.end > u64::from(final_offset) {
181 return Err(TransportError::FINAL_SIZE_ERROR(
182 "lower than high water mark",
183 ));
184 }
185 self.credit_consumed_by(final_offset.into(), received, max_data)?;
186
187 if matches!(self.state, RecvState::ResetRecvd { .. }) {
188 return Ok(false);
189 }
190 self.state = RecvState::ResetRecvd {
191 size: final_offset.into(),
192 error_code,
193 };
194 self.assembler.clear();
199 Ok(true)
200 }
201
202 pub(super) fn reset_code(&self) -> Option<VarInt> {
203 match self.state {
204 RecvState::ResetRecvd { error_code, .. } => Some(error_code),
205 _ => None,
206 }
207 }
208
209 fn credit_consumed_by(
212 &self,
213 offset: u64,
214 received: u64,
215 max_data: u64,
216 ) -> Result<u64, TransportError> {
217 let prev_end = self.end;
218 let new_bytes = offset.saturating_sub(prev_end);
219 if offset > self.sent_max_stream_data || received + new_bytes > max_data {
220 debug!(
221 received,
222 new_bytes,
223 max_data,
224 offset,
225 stream_max_data = self.sent_max_stream_data,
226 "flow control error"
227 );
228 return Err(TransportError::FLOW_CONTROL_ERROR(""));
229 }
230
231 Ok(new_bytes)
232 }
233}
234
235pub struct Chunks<'a> {
247 id: StreamId,
248 ordered: bool,
249 streams: &'a mut StreamsState,
250 pending: &'a mut Retransmits,
251 state: ChunksState,
252 read: u64,
253}
254
255impl<'a> Chunks<'a> {
256 pub(super) fn new(
257 id: StreamId,
258 ordered: bool,
259 streams: &'a mut StreamsState,
260 pending: &'a mut Retransmits,
261 ) -> Result<Self, ReadableError> {
262 let mut entry = match streams.recv.entry(id) {
263 Entry::Occupied(entry) => entry,
264 Entry::Vacant(_) => return Err(ReadableError::ClosedStream),
265 };
266
267 let mut recv =
268 match get_or_insert_recv(streams.stream_receive_window)(entry.get_mut()).stopped {
269 true => return Err(ReadableError::ClosedStream),
270 false => entry.remove().unwrap().into_inner(), };
272
273 recv.assembler.ensure_ordering(ordered)?;
274 Ok(Self {
275 id,
276 ordered,
277 streams,
278 pending,
279 state: ChunksState::Readable(recv),
280 read: 0,
281 })
282 }
283
284 pub fn next(&mut self, max_length: usize) -> Result<Option<Chunk>, ReadError> {
288 let rs = match self.state {
289 ChunksState::Readable(ref mut rs) => rs,
290 ChunksState::Reset(error_code) => {
291 return Err(ReadError::Reset(error_code));
292 }
293 ChunksState::Finished => {
294 return Ok(None);
295 }
296 ChunksState::Finalized => panic!("must not call next() after finalize()"),
297 };
298
299 if let Some(chunk) = rs.assembler.read(max_length, self.ordered) {
300 self.read += chunk.bytes.len() as u64;
301 return Ok(Some(chunk));
302 }
303
304 match rs.state {
305 RecvState::ResetRecvd { error_code, .. } => {
306 debug_assert_eq!(self.read, 0, "reset streams have empty buffers");
307 let state = mem::replace(&mut self.state, ChunksState::Reset(error_code));
308 let recv = match state {
310 ChunksState::Readable(recv) => StreamRecv::Open(recv),
311 _ => unreachable!("state must be ChunkState::Readable"),
312 };
313 self.streams.stream_recv_freed(self.id, recv);
314 Err(ReadError::Reset(error_code))
315 }
316 RecvState::Recv { size } => {
317 if size == Some(rs.end) && rs.assembler.bytes_read() == rs.end {
318 let state = mem::replace(&mut self.state, ChunksState::Finished);
319 let recv = match state {
321 ChunksState::Readable(recv) => StreamRecv::Open(recv),
322 _ => unreachable!("state must be ChunkState::Readable"),
323 };
324 self.streams.stream_recv_freed(self.id, recv);
325 Ok(None)
326 } else {
327 Err(ReadError::Blocked)
332 }
333 }
334 }
335 }
336
337 pub fn finalize(mut self) -> ShouldTransmit {
347 self.finalize_inner()
348 }
349
350 fn finalize_inner(&mut self) -> ShouldTransmit {
351 let state = mem::replace(&mut self.state, ChunksState::Finalized);
352 if let ChunksState::Finalized = state {
353 return ShouldTransmit(false);
355 }
356
357 let mut should_transmit = self.streams.queue_max_stream_id(self.pending);
361
362 if let ChunksState::Readable(mut rs) = state {
364 let (_, max_stream_data) = rs.max_stream_data(self.streams.stream_receive_window);
365 should_transmit |= max_stream_data.0;
366 if max_stream_data.0 {
367 self.pending.max_stream_data.insert(self.id);
368 }
369 self.streams
371 .recv
372 .insert(self.id, Some(StreamRecv::Open(rs)));
373 }
374
375 let max_data = self.streams.add_read_credits(self.read);
377 self.pending.max_data |= max_data.0;
378 should_transmit |= max_data.0;
379 ShouldTransmit(should_transmit)
380 }
381}
382
383impl Drop for Chunks<'_> {
384 fn drop(&mut self) {
385 let _ = self.finalize_inner();
386 }
387}
388
389enum ChunksState {
390 Readable(Box<Recv>),
391 Reset(VarInt),
392 Finished,
393 Finalized,
394}
395
396#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
398pub enum ReadError {
399 #[error("blocked")]
404 Blocked,
405 #[error("reset by peer: code {0}")]
409 Reset(VarInt),
410}
411
412#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
414pub enum ReadableError {
415 #[error("closed stream")]
417 ClosedStream,
418 #[error("ordered read after unordered read")]
423 IllegalOrderedRead,
424}
425
426impl From<IllegalOrderedRead> for ReadableError {
427 fn from(_: IllegalOrderedRead) -> Self {
428 Self::IllegalOrderedRead
429 }
430}
431
432#[derive(Debug, Copy, Clone, Eq, PartialEq)]
433enum RecvState {
434 Recv { size: Option<u64> },
435 ResetRecvd { size: u64, error_code: VarInt },
436}
437
438impl Default for RecvState {
439 fn default() -> Self {
440 Self::Recv { size: None }
441 }
442}
443
444#[cfg(test)]
445mod tests {
446 use bytes::Bytes;
447
448 use crate::{Dir, Side};
449
450 use super::*;
451
452 #[test]
453 fn reordered_frames_while_stopped() {
454 const INITIAL_BYTES: u64 = 3;
455 const INITIAL_OFFSET: u64 = 3;
456 const RECV_WINDOW: u64 = 8;
457 let mut s = Recv::new(RECV_WINDOW);
458 let mut data_recvd = 0;
459 let (new_bytes, is_closed) = s
461 .ingest(
462 frame::Stream {
463 id: StreamId::new(Side::Client, Dir::Uni, 0),
464 offset: INITIAL_OFFSET,
465 fin: false,
466 data: Bytes::from_static(&[0; INITIAL_BYTES as usize]),
467 },
468 123,
469 data_recvd,
470 data_recvd + 1024,
471 )
472 .unwrap();
473 data_recvd += new_bytes;
474 assert_eq!(new_bytes, INITIAL_OFFSET + INITIAL_BYTES);
475 assert!(!is_closed);
476
477 let (credits, transmit) = s.stop().unwrap();
478 assert!(transmit.should_transmit());
479 assert_eq!(
480 credits,
481 INITIAL_OFFSET + INITIAL_BYTES,
482 "full connection flow control credit is issued by stop"
483 );
484
485 let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
486 assert!(!transmit.should_transmit());
487 assert_eq!(
488 max_stream_data, RECV_WINDOW,
489 "stream flow control credit isn't issued by stop"
490 );
491
492 let (new_bytes, is_closed) = s
494 .ingest(
495 frame::Stream {
496 id: StreamId::new(Side::Client, Dir::Uni, 0),
497 offset: RECV_WINDOW - 1,
498 fin: false,
499 data: Bytes::from_static(&[0; 1]),
500 },
501 123,
502 data_recvd,
503 data_recvd + 1024,
504 )
505 .unwrap();
506 data_recvd += new_bytes;
507 assert_eq!(new_bytes, RECV_WINDOW - (INITIAL_OFFSET + INITIAL_BYTES));
508 assert!(!is_closed);
509
510 let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
511 assert!(!transmit.should_transmit());
512 assert_eq!(
513 max_stream_data, RECV_WINDOW,
514 "stream flow control credit isn't issued after stop"
515 );
516
517 let (new_bytes, is_closed) = s
519 .ingest(
520 frame::Stream {
521 id: StreamId::new(Side::Client, Dir::Uni, 0),
522 offset: 0,
523 fin: false,
524 data: Bytes::from_static(&[0; INITIAL_OFFSET as usize]),
525 },
526 123,
527 data_recvd,
528 data_recvd + 1024,
529 )
530 .unwrap();
531 assert_eq!(
532 new_bytes, 0,
533 "reordered frames don't issue connection-level flow control for stopped streams"
534 );
535 assert!(!is_closed);
536
537 let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
538 assert!(!transmit.should_transmit());
539 assert_eq!(
540 max_stream_data, RECV_WINDOW,
541 "stream flow control credit isn't issued after stop"
542 );
543 }
544}