// iroh_quinn_proto/connection/streams/send.rs

1use bytes::Bytes;
2use thiserror::Error;
3
4use crate::{
5    VarInt,
6    connection::{send_buffer::SendBuffer, streams::BytesOrSlice},
7    frame,
8};
9
10#[derive(Debug)]
11pub(super) struct Send {
12    pub(super) max_data: u64,
13    pub(super) state: SendState,
14    pub(super) pending: SendBuffer,
15    pub(super) priority: i32,
16    /// Whether a frame containing a FIN bit must be transmitted, even if we don't have any new data
17    pub(super) fin_pending: bool,
18    /// Whether this stream is in the `connection_blocked` list of `Streams`
19    pub(super) connection_blocked: bool,
20    /// The reason the peer wants us to stop, if `STOP_SENDING` was received
21    pub(super) stop_reason: Option<VarInt>,
22}
23
24impl Send {
25    pub(super) fn new(max_data: VarInt) -> Box<Self> {
26        Box::new(Self {
27            max_data: max_data.into(),
28            state: SendState::Ready,
29            pending: SendBuffer::new(),
30            priority: 0,
31            fin_pending: false,
32            connection_blocked: false,
33            stop_reason: None,
34        })
35    }
36
37    /// Whether the stream has been reset
38    pub(super) fn is_reset(&self) -> bool {
39        matches!(self.state, SendState::ResetSent)
40    }
41
42    pub(super) fn finish(&mut self) -> Result<(), FinishError> {
43        if let Some(error_code) = self.stop_reason {
44            Err(FinishError::Stopped(error_code))
45        } else if self.state == SendState::Ready {
46            self.state = SendState::DataSent {
47                finish_acked: false,
48            };
49            self.fin_pending = true;
50            Ok(())
51        } else {
52            Err(FinishError::ClosedStream)
53        }
54    }
55
56    pub(super) fn write<'a, S: BytesSource<'a>>(
57        &mut self,
58        source: &'a mut S,
59        limit: u64,
60    ) -> Result<Written, WriteError> {
61        if !self.is_writable() {
62            return Err(WriteError::ClosedStream);
63        }
64        if let Some(error_code) = self.stop_reason {
65            return Err(WriteError::Stopped(error_code));
66        }
67        let budget = self.max_data - self.pending.offset();
68        if budget == 0 {
69            return Err(WriteError::Blocked);
70        }
71        let mut limit = limit.min(budget) as usize;
72
73        let mut result = Written::default();
74        loop {
75            let (chunk, chunks_consumed) = source.pop_chunk(limit);
76            result.chunks += chunks_consumed;
77            result.bytes += chunk.len();
78
79            if chunk.is_empty() {
80                break;
81            }
82
83            limit -= chunk.len();
84            self.pending.write(chunk);
85        }
86
87        Ok(result)
88    }
89
90    /// Update stream state due to a reset sent by the local application
91    pub(super) fn reset(&mut self) {
92        use SendState::*;
93        if let DataSent { .. } | Ready = self.state {
94            self.state = ResetSent;
95        }
96    }
97
98    /// Handle STOP_SENDING
99    ///
100    /// Returns true if the stream was stopped due to this frame, and false
101    /// if it had been stopped before
102    pub(super) fn try_stop(&mut self, error_code: VarInt) -> bool {
103        if self.stop_reason.is_none() {
104            self.stop_reason = Some(error_code);
105            true
106        } else {
107            false
108        }
109    }
110
111    /// Returns whether the stream has been finished and all data has been acknowledged by the peer
112    pub(super) fn ack(&mut self, frame: frame::StreamMeta) -> bool {
113        self.pending.ack(frame.offsets);
114        match self.state {
115            SendState::DataSent {
116                ref mut finish_acked,
117            } => {
118                *finish_acked |= frame.fin;
119                *finish_acked && self.pending.is_fully_acked()
120            }
121            _ => false,
122        }
123    }
124
125    /// Handle increase to stream-level flow control limit
126    ///
127    /// Returns whether the stream was unblocked
128    pub(super) fn increase_max_data(&mut self, offset: u64) -> bool {
129        if offset <= self.max_data || self.state != SendState::Ready {
130            return false;
131        }
132        let was_blocked = self.pending.offset() == self.max_data;
133        self.max_data = offset;
134        was_blocked
135    }
136
137    pub(super) fn offset(&self) -> u64 {
138        self.pending.offset()
139    }
140
141    pub(super) fn is_pending(&self) -> bool {
142        self.pending.has_unsent_data() || self.fin_pending
143    }
144
145    pub(super) fn is_writable(&self) -> bool {
146        matches!(self.state, SendState::Ready)
147    }
148}
149
150/// A [`BytesSource`] implementation for `&'a mut [Bytes]`
151///
152/// The type allows to dequeue [`Bytes`] chunks from an array of chunks, up to
153/// a configured limit.
154pub(crate) struct BytesArray<'a> {
155    /// The wrapped slice of `Bytes`
156    chunks: &'a mut [Bytes],
157    /// The amount of chunks consumed from this source
158    consumed: usize,
159}
160
161impl<'a> BytesArray<'a> {
162    pub(crate) fn from_chunks(chunks: &'a mut [Bytes]) -> Self {
163        Self {
164            chunks,
165            consumed: 0,
166        }
167    }
168}
169
170impl<'a> BytesSource<'a> for BytesArray<'a> {
171    fn pop_chunk<'b>(&'b mut self, limit: usize) -> (impl BytesOrSlice<'b>, usize)
172    where
173        'a: 'b,
174    {
175        // The loop exists to skip empty chunks while still marking them as
176        // consumed
177        let mut chunks_consumed = 0;
178
179        while self.consumed < self.chunks.len() {
180            let chunk = &mut self.chunks[self.consumed];
181
182            if chunk.len() <= limit {
183                let chunk = std::mem::take(chunk);
184                self.consumed += 1;
185                chunks_consumed += 1;
186                if chunk.is_empty() {
187                    continue;
188                }
189                return (chunk, chunks_consumed);
190            } else if limit > 0 {
191                let chunk = chunk.split_to(limit);
192                return (chunk, chunks_consumed);
193            } else {
194                break;
195            }
196        }
197
198        (Bytes::new(), chunks_consumed)
199    }
200}
201
/// [`BytesSource`] backed by a plain byte slice (`&[u8]`)
///
/// The source hands out borrowed sub-slices of the wrapped data, so nothing has to be
/// copied or allocated before it is known how much data is actually going to be taken.
pub(crate) struct ByteSlice<'a> {
    /// The part of the byte slice that has not been handed out yet
    data: &'a [u8],
}
211
212impl<'a> ByteSlice<'a> {
213    pub(crate) fn from_slice(data: &'a [u8]) -> Self {
214        Self { data }
215    }
216}
217
218impl<'a> BytesSource<'a> for ByteSlice<'a> {
219    fn pop_chunk<'b>(&'b mut self, limit: usize) -> (impl BytesOrSlice<'b>, usize)
220    where
221        'a: 'b,
222    {
223        let limit = limit.min(self.data.len());
224        if limit == 0 {
225            return (&[][..], 0);
226        }
227
228        let chunk = &self.data[..limit];
229        self.data = &self.data[chunk.len()..];
230
231        let chunks_consumed = usize::from(self.data.is_empty());
232        (chunk, chunks_consumed)
233    }
234}
235
236/// A source of one or more buffers which can be converted into `Bytes` buffers on demand
237///
238/// The purpose of this data type is to defer conversion as long as possible,
239/// so that no heap allocation is required in case no data is writable.
240pub(super) trait BytesSource<'a> {
241    /// Returns the next chunk from the source of owned chunks.
242    ///
243    /// This method will consume parts of the source.
244    /// Calling it will yield `Bytes` elements up to the configured `limit`.
245    ///
246    /// The method returns a tuple:
247    /// - The first item is the yielded `Bytes` element. The element will be
248    ///   empty if the limit is zero or no more data is available.
249    /// - The second item returns how many complete chunks inside the source had
250    ///   had been consumed. This can be less than 1, if a chunk inside the
251    ///   source had been truncated in order to adhere to the limit. It can also
252    ///   be more than 1, if zero-length chunks had been skipped.
253    fn pop_chunk<'b>(&'b mut self, limit: usize) -> (impl BytesOrSlice<'b>, usize)
254    where
255        'a: 'b;
256}
257
/// Summary of a write operation: how many bytes and how many chunks were transferred
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct Written {
    /// Number of bytes that were written
    pub bytes: usize,
    /// Number of chunks that were written in full
    ///
    /// A chunk that was only partially written is not included in this count.
    pub chunks: usize,
}
268
269/// Errors triggered while writing to a send stream
270#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
271pub enum WriteError {
272    /// The peer is not able to accept additional data, or the connection is congested.
273    ///
274    /// If the peer issues additional flow control credit, a [`StreamEvent::Writable`] event will
275    /// be generated, indicating that retrying the write might succeed.
276    ///
277    /// [`StreamEvent::Writable`]: crate::StreamEvent::Writable
278    #[error("unable to accept further writes")]
279    Blocked,
280    /// The peer is no longer accepting data on this stream, and it has been implicitly reset. The
281    /// stream cannot be finished or further written to.
282    ///
283    /// Carries an application-defined error code.
284    ///
285    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
286    #[error("stopped by peer: code {0}")]
287    Stopped(VarInt),
288    /// The stream has not been opened or has already been finished or reset
289    #[error("closed stream")]
290    ClosedStream,
291}
292
293#[derive(Debug, Copy, Clone, Eq, PartialEq)]
294pub(super) enum SendState {
295    /// Sending new data
296    Ready,
297    /// Stream was finished; now sending retransmits only
298    DataSent { finish_acked: bool },
299    /// Sent RESET
300    ResetSent,
301}
302
303/// Reasons why attempting to finish a stream might fail
304#[derive(Debug, Error, Clone, PartialEq, Eq)]
305pub enum FinishError {
306    /// The peer is no longer accepting data on this stream. No
307    /// [`StreamEvent::Finished`] event will be emitted for this stream.
308    ///
309    /// Carries an application-defined error code.
310    ///
311    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
312    #[error("stopped by peer: code {0}")]
313    Stopped(VarInt),
314    /// The stream has not been opened or was already finished or reset
315    #[error("closed stream")]
316    ClosedStream,
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    /// Drains a `BytesArray` under every limit from zero up to and including the total payload
    /// length, checking both the bytes handed out and the number of chunks reported as consumed.
    #[test]
    fn bytes_array() {
        let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
        // The range is inclusive so that `limit == full.len()` is covered; with an exclusive
        // range the full-consumption assertions below would never run.
        for limit in 0..=full.len() {
            let mut chunks = [
                Bytes::from_static(b""),
                Bytes::from_static(b"Hello "),
                Bytes::from_static(b"Wo"),
                Bytes::from_static(b""),
                Bytes::from_static(b"r"),
                Bytes::from_static(b"ld"),
                Bytes::from_static(b""),
                Bytes::from_static(b" 12345678"),
                Bytes::from_static(b"9 ABCDE"),
                Bytes::from_static(b"F"),
                Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"),
            ];
            let num_chunks = chunks.len();
            let last_chunk_len = chunks[chunks.len() - 1].len();

            let mut array = BytesArray::from_chunks(&mut chunks);

            let mut buf = Vec::new();
            let mut chunks_popped = 0;
            let mut chunks_consumed = 0;
            let mut remaining = limit;
            loop {
                let (chunk, consumed) = array.pop_chunk(remaining);
                chunks_consumed += consumed;

                if !chunk.is_empty() {
                    buf.extend_from_slice(chunk.as_ref());
                    remaining -= chunk.len();
                    chunks_popped += 1;
                } else {
                    break;
                }
            }

            assert_eq!(&buf[..], &full[..limit]);

            if limit == full.len() {
                // Full consumption of the last chunk
                assert_eq!(chunks_consumed, num_chunks);
                // Since there are empty chunks, we consume more than there are popped
                assert_eq!(chunks_consumed, chunks_popped + 3);
            } else if limit > full.len() - last_chunk_len {
                // Partial consumption of the last chunk
                assert_eq!(chunks_consumed, num_chunks - 1);
                assert_eq!(chunks_consumed, chunks_popped + 2);
            }
        }
    }

    /// Drains a `ByteSlice` under every limit from zero up to and including the slice length,
    /// checking that a single chunk is popped and that it only counts as consumed once the
    /// whole slice has been taken.
    #[test]
    fn byte_slice() {
        let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
        // The range is inclusive so that `limit == full.len()` is covered; with an exclusive
        // range the `chunks_consumed == 1` assertion below would never run.
        for limit in 0..=full.len() {
            let mut array = ByteSlice::from_slice(&full[..]);

            let mut buf = Vec::new();
            let mut chunks_popped = 0;
            let mut chunks_consumed = 0;
            let mut remaining = limit;
            loop {
                let (chunk, consumed) = array.pop_chunk(remaining);
                chunks_consumed += consumed;

                if !chunk.is_empty() {
                    buf.extend_from_slice(chunk.as_ref());
                    remaining -= chunk.len();
                    chunks_popped += 1;
                } else {
                    break;
                }
            }

            assert_eq!(&buf[..], &full[..limit]);
            if limit != 0 {
                assert_eq!(chunks_popped, 1);
            } else {
                assert_eq!(chunks_popped, 0);
            }

            if limit == full.len() {
                assert_eq!(chunks_consumed, 1);
            } else {
                assert_eq!(chunks_consumed, 0);
            }
        }
    }
}