iroh_quinn_proto/connection/
send_buffer.rs

1use std::{collections::VecDeque, ops::Range};
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4
5use crate::{VarInt, range_set::ArrayRangeSet};
6
7/// Buffer of outgoing retransmittable stream data
8#[derive(Default, Debug)]
9pub(super) struct SendBuffer {
10    /// Data queued by the application that has to be retained for resends.
11    ///
12    /// Only data up to the highest contiguous acknowledged offset can be discarded.
13    /// We could discard acknowledged in this buffer, but it would require a more
14    /// complex data structure. Instead, we track acknowledged ranges in `acks`.
15    ///
16    /// Data keeps track of the base offset of the buffered data.
17    data: SendBufferData,
18    /// The first offset that hasn't been sent even once
19    ///
20    /// Always lies in `data.range()`
21    unsent: u64,
22    /// Acknowledged ranges which couldn't be discarded yet as they don't include the earliest
23    /// offset in `unacked`
24    ///
25    /// All ranges must be within `data.range().start..(data.range().end - unsent)`, since data
26    /// that has never been sent can't be acknowledged.
27    // TODO: Recover storage from these by compacting (#700)
28    acks: ArrayRangeSet,
29    /// Previously transmitted ranges deemed lost and marked for retransmission
30    ///
31    /// All ranges must be within `data.range().start..(data.range().end - unsent)`, since data
32    /// that has never been sent can't be retransmitted.
33    ///
34    /// This should usually not overlap with `acks`, but this is not strictly enforced.
35    retransmits: ArrayRangeSet,
36}
37
38/// Maximum number of bytes to combine into a single segment
39///
40/// Any segment larger than this will be stored as-is, possibly triggering a flush of the buffer.
41const MAX_COMBINE: usize = 1024;
42
43/// This is where the data of the send buffer lives. It supports appending at the end,
44/// removing from the front, and retrieving data by range.
45#[derive(Default, Debug)]
46struct SendBufferData {
47    /// Start offset of the buffered data
48    offset: u64,
49    /// Total size of `buffered_segments`
50    len: usize,
51    /// Buffered data segments
52    segments: VecDeque<Bytes>,
53    /// Last segment, possibly empty
54    last_segment: BytesMut,
55}
56
57impl SendBufferData {
58    /// Total size of buffered data
59    fn len(&self) -> usize {
60        self.len
61    }
62
63    /// Range of buffered data
64    #[inline(always)]
65    fn range(&self) -> Range<u64> {
66        self.offset..self.offset + self.len as u64
67    }
68
69    /// Append data to the end of the buffer
70    fn append(&mut self, data: Bytes) {
71        self.len += data.len();
72        if data.len() > MAX_COMBINE {
73            // use in place
74            if !self.last_segment.is_empty() {
75                self.segments.push_back(self.last_segment.split().freeze());
76            }
77            self.segments.push_back(data);
78        } else {
79            // copy
80            if self.last_segment.len() + data.len() > MAX_COMBINE && !self.last_segment.is_empty() {
81                self.segments.push_back(self.last_segment.split().freeze());
82            }
83            self.last_segment.extend_from_slice(&data);
84        }
85    }
86
87    /// Discard data from the front of the buffer
88    ///
89    /// Calling this with n > len() is allowed and will simply clear the buffer.
90    fn pop_front(&mut self, n: usize) {
91        let mut n = n.min(self.len);
92        self.len -= n;
93        self.offset += n as u64;
94        while n > 0 {
95            // segments is empty, which leaves only last_segment
96            let Some(front) = self.segments.front_mut() else {
97                break;
98            };
99            if front.len() <= n {
100                // Remove the whole front segment
101                n -= front.len();
102                self.segments.pop_front();
103            } else {
104                // Advance within the front segment
105                front.advance(n);
106                n = 0;
107            }
108        }
109        // the rest has to be in the last segment
110        self.last_segment.advance(n);
111        // shrink segments if we have a lot of unused capacity
112        if self.segments.len() * 4 < self.segments.capacity() {
113            self.segments.shrink_to_fit();
114        }
115    }
116
117    /// Iterator over all segments in order
118    ///
119    /// Concatenates `segments` and `last_segment` so they can be handled uniformly
120    fn segments_iter(&self) -> impl Iterator<Item = &[u8]> {
121        self.segments
122            .iter()
123            .map(|x| x.as_ref())
124            .chain(std::iter::once(self.last_segment.as_ref()))
125    }
126
127    /// Returns data which is associated with a range
128    ///
129    /// Requesting a range outside of the buffered data will panic.
130    #[cfg(any(test, feature = "bench"))]
131    fn get(&self, offsets: Range<u64>) -> &[u8] {
132        assert!(
133            offsets.start >= self.range().start && offsets.end <= self.range().end,
134            "Requested range is outside of buffered data"
135        );
136        // translate to segment-relative offsets and usize
137        let offsets = Range {
138            start: (offsets.start - self.offset) as usize,
139            end: (offsets.end - self.offset) as usize,
140        };
141        let mut segment_offset = 0;
142        for segment in self.segments_iter() {
143            if offsets.start >= segment_offset && offsets.start < segment_offset + segment.len() {
144                let start = offsets.start - segment_offset;
145                let end = offsets.end - segment_offset;
146
147                return &segment[start..end.min(segment.len())];
148            }
149            segment_offset += segment.len();
150        }
151
152        unreachable!("impossible if segments and range are consistent");
153    }
154
155    fn get_into(&self, offsets: Range<u64>, buf: &mut impl BufMut) {
156        assert!(
157            offsets.start >= self.range().start && offsets.end <= self.range().end,
158            "Requested range is outside of buffered data"
159        );
160        // translate to segment-relative offsets and usize
161        let offsets = Range {
162            start: (offsets.start - self.offset) as usize,
163            end: (offsets.end - self.offset) as usize,
164        };
165        let mut segment_offset = 0;
166        for segment in self.segments_iter() {
167            // intersect segment range with requested range
168            let start = segment_offset.max(offsets.start);
169            let end = (segment_offset + segment.len()).min(offsets.end);
170            if start < end {
171                // slice range intersects with requested range
172                buf.put_slice(&segment[start - segment_offset..end - segment_offset]);
173            }
174            segment_offset += segment.len();
175            if segment_offset >= offsets.end {
176                // we are beyond the requested range
177                break;
178            }
179        }
180    }
181
182    #[cfg(test)]
183    fn to_vec(&self) -> Vec<u8> {
184        let mut result = Vec::with_capacity(self.len);
185        for segment in self.segments_iter() {
186            result.extend_from_slice(segment);
187        }
188        result
189    }
190}
191
192impl SendBuffer {
193    /// Construct an empty buffer at the initial offset
194    pub(super) fn new() -> Self {
195        Self::default()
196    }
197
198    /// Append application data to the end of the stream
199    pub(super) fn write(&mut self, data: Bytes) {
200        self.data.append(data);
201    }
202
203    /// Discard a range of acknowledged stream data
204    pub(super) fn ack(&mut self, mut range: Range<u64>) {
205        // Clamp the range to data which is still tracked
206        let base_offset = self.data.range().start;
207        range.start = base_offset.max(range.start);
208        range.end = base_offset.max(range.end);
209
210        self.acks.insert(range);
211
212        while self.acks.min() == Some(self.data.range().start) {
213            let prefix = self.acks.pop_min().unwrap();
214            let to_advance = (prefix.end - prefix.start) as usize;
215            self.data.pop_front(to_advance);
216        }
217    }
218
219    /// Compute the next range to transmit on this stream and update state to account for that
220    /// transmission.
221    ///
222    /// `max_len` here includes the space which is available to transmit the
223    /// offset and length of the data to send. The caller has to guarantee that
224    /// there is at least enough space available to write maximum-sized metadata
225    /// (8 byte offset + 8 byte length).
226    ///
227    /// The method returns a tuple:
228    /// - The first return value indicates the range of data to send
229    /// - The second return value indicates whether the length needs to be encoded
230    ///   in the STREAM frames metadata (`true`), or whether it can be omitted
231    ///   since the selected range will fill the whole packet.
232    pub(super) fn poll_transmit(&mut self, mut max_len: usize) -> (Range<u64>, bool) {
233        debug_assert!(max_len >= 8 + 8);
234        let mut encode_length = false;
235
236        if let Some(range) = self.retransmits.pop_min() {
237            // Retransmit sent data
238
239            // When the offset is known, we know how many bytes are required to encode it.
240            // Offset 0 requires no space
241            if range.start != 0 {
242                max_len -= VarInt::size(unsafe { VarInt::from_u64_unchecked(range.start) });
243            }
244            if range.end - range.start < max_len as u64 {
245                encode_length = true;
246                max_len -= 8;
247            }
248
249            let end = range.end.min((max_len as u64).saturating_add(range.start));
250            if end != range.end {
251                self.retransmits.insert(end..range.end);
252            }
253            return (range.start..end, encode_length);
254        }
255
256        // Transmit new data
257
258        // When the offset is known, we know how many bytes are required to encode it.
259        // Offset 0 requires no space
260        if self.unsent != 0 {
261            max_len -= VarInt::size(unsafe { VarInt::from_u64_unchecked(self.unsent) });
262        }
263        if self.offset() - self.unsent < max_len as u64 {
264            encode_length = true;
265            max_len -= 8;
266        }
267
268        let end = self
269            .offset()
270            .min((max_len as u64).saturating_add(self.unsent));
271        let result = self.unsent..end;
272        self.unsent = end;
273        (result, encode_length)
274    }
275
276    /// Returns data which is associated with a range
277    ///
278    /// This function can return a subset of the range, if the data is stored
279    /// in noncontiguous fashion in the send buffer. In this case callers
280    /// should call the function again with an incremented start offset to
281    /// retrieve more data.
282    #[cfg(any(test, feature = "bench"))]
283    pub(super) fn get(&self, offsets: Range<u64>) -> &[u8] {
284        self.data.get(offsets)
285    }
286
287    pub(super) fn get_into(&self, offsets: Range<u64>, buf: &mut impl BufMut) {
288        self.data.get_into(offsets, buf)
289    }
290
291    /// Queue a range of sent but unacknowledged data to be retransmitted
292    pub(super) fn retransmit(&mut self, range: Range<u64>) {
293        debug_assert!(range.end <= self.unsent, "unsent data can't be lost");
294        self.retransmits.insert(range);
295    }
296
297    pub(super) fn retransmit_all_for_0rtt(&mut self) {
298        debug_assert_eq!(self.offset(), self.data.len() as u64);
299        self.unsent = 0;
300    }
301
302    /// First stream offset unwritten by the application, i.e. the offset that the next write will
303    /// begin at
304    pub(super) fn offset(&self) -> u64 {
305        self.data.range().end
306    }
307
308    /// Whether all sent data has been acknowledged
309    pub(super) fn is_fully_acked(&self) -> bool {
310        self.data.len() == 0
311    }
312
313    /// Whether there's data to send
314    ///
315    /// There may be sent unacknowledged data even when this is false.
316    pub(super) fn has_unsent_data(&self) -> bool {
317        self.unsent != self.offset() || !self.retransmits.is_empty()
318    }
319
320    /// Compute the amount of data that hasn't been acknowledged
321    pub(super) fn unacked(&self) -> u64 {
322        self.data.len() as u64 - self.acks.iter().map(|x| x.end - x.start).sum::<u64>()
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use super::*;
329
330    #[test]
331    fn fragment_with_length() {
332        let mut buf = SendBuffer::new();
333        const MSG: &[u8] = b"Hello, world!";
334        buf.write(MSG.into());
335        // 0 byte offset => 19 bytes left => 13 byte data isn't enough
336        // with 8 bytes reserved for length 11 payload bytes will fit
337        assert_eq!(buf.poll_transmit(19), (0..11, true));
338        assert_eq!(
339            buf.poll_transmit(MSG.len() + 16 - 11),
340            (11..MSG.len() as u64, true)
341        );
342        assert_eq!(
343            buf.poll_transmit(58),
344            (MSG.len() as u64..MSG.len() as u64, true)
345        );
346    }
347
348    #[test]
349    fn fragment_without_length() {
350        let mut buf = SendBuffer::new();
351        const MSG: &[u8] = b"Hello, world with some extra data!";
352        buf.write(MSG.into());
353        // 0 byte offset => 19 bytes left => can be filled by 34 bytes payload
354        assert_eq!(buf.poll_transmit(19), (0..19, false));
355        assert_eq!(
356            buf.poll_transmit(MSG.len() - 19 + 1),
357            (19..MSG.len() as u64, false)
358        );
359        assert_eq!(
360            buf.poll_transmit(58),
361            (MSG.len() as u64..MSG.len() as u64, true)
362        );
363    }
364
365    #[test]
366    fn reserves_encoded_offset() {
367        let mut buf = SendBuffer::new();
368
369        // Pretend we have more than 1 GB of data in the buffer
370        let chunk: Bytes = Bytes::from_static(&[0; 1024 * 1024]);
371        for _ in 0..1025 {
372            buf.write(chunk.clone());
373        }
374
375        const SIZE1: u64 = 64;
376        const SIZE2: u64 = 16 * 1024;
377        const SIZE3: u64 = 1024 * 1024 * 1024;
378
379        // Offset 0 requires no space
380        assert_eq!(buf.poll_transmit(16), (0..16, false));
381        buf.retransmit(0..16);
382        assert_eq!(buf.poll_transmit(16), (0..16, false));
383        let mut transmitted = 16u64;
384
385        // Offset 16 requires 1 byte
386        assert_eq!(
387            buf.poll_transmit((SIZE1 - transmitted + 1) as usize),
388            (transmitted..SIZE1, false)
389        );
390        buf.retransmit(transmitted..SIZE1);
391        assert_eq!(
392            buf.poll_transmit((SIZE1 - transmitted + 1) as usize),
393            (transmitted..SIZE1, false)
394        );
395        transmitted = SIZE1;
396
397        // Offset 64 requires 2 bytes
398        assert_eq!(
399            buf.poll_transmit((SIZE2 - transmitted + 2) as usize),
400            (transmitted..SIZE2, false)
401        );
402        buf.retransmit(transmitted..SIZE2);
403        assert_eq!(
404            buf.poll_transmit((SIZE2 - transmitted + 2) as usize),
405            (transmitted..SIZE2, false)
406        );
407        transmitted = SIZE2;
408
409        // Offset 16384 requires requires 4 bytes
410        assert_eq!(
411            buf.poll_transmit((SIZE3 - transmitted + 4) as usize),
412            (transmitted..SIZE3, false)
413        );
414        buf.retransmit(transmitted..SIZE3);
415        assert_eq!(
416            buf.poll_transmit((SIZE3 - transmitted + 4) as usize),
417            (transmitted..SIZE3, false)
418        );
419        transmitted = SIZE3;
420
421        // Offset 1GB requires 8 bytes
422        assert_eq!(
423            buf.poll_transmit(chunk.len() + 8),
424            (transmitted..transmitted + chunk.len() as u64, false)
425        );
426        buf.retransmit(transmitted..transmitted + chunk.len() as u64);
427        assert_eq!(
428            buf.poll_transmit(chunk.len() + 8),
429            (transmitted..transmitted + chunk.len() as u64, false)
430        );
431    }
432
433    #[test]
434    #[ignore]
435    fn multiple_segments() {
436        let mut buf = SendBuffer::new();
437        const MSG: &[u8] = b"Hello, world!";
438        const MSG_LEN: u64 = MSG.len() as u64;
439
440        const SEG1: &[u8] = b"He";
441        buf.write(SEG1.into());
442        const SEG2: &[u8] = b"llo,";
443        buf.write(SEG2.into());
444        const SEG3: &[u8] = b" w";
445        buf.write(SEG3.into());
446        const SEG4: &[u8] = b"o";
447        buf.write(SEG4.into());
448        const SEG5: &[u8] = b"rld!";
449        buf.write(SEG5.into());
450
451        assert_eq!(aggregate_unacked(&buf), MSG);
452
453        assert_eq!(buf.poll_transmit(16), (0..8, true));
454        assert_eq!(buf.get(0..5), SEG1);
455        assert_eq!(buf.get(2..8), SEG2);
456        assert_eq!(buf.get(6..8), SEG3);
457
458        assert_eq!(buf.poll_transmit(16), (8..MSG_LEN, true));
459        assert_eq!(buf.get(8..MSG_LEN), SEG4);
460        assert_eq!(buf.get(9..MSG_LEN), SEG5);
461
462        assert_eq!(buf.poll_transmit(42), (MSG_LEN..MSG_LEN, true));
463
464        // Now drain the segments
465        buf.ack(0..1);
466        assert_eq!(aggregate_unacked(&buf), &MSG[1..]);
467        buf.ack(0..3);
468        assert_eq!(aggregate_unacked(&buf), &MSG[3..]);
469        buf.ack(3..5);
470        assert_eq!(aggregate_unacked(&buf), &MSG[5..]);
471        buf.ack(7..9);
472        assert_eq!(aggregate_unacked(&buf), &MSG[5..]);
473        buf.ack(4..7);
474        assert_eq!(aggregate_unacked(&buf), &MSG[9..]);
475        buf.ack(0..MSG_LEN);
476        assert_eq!(aggregate_unacked(&buf), &[] as &[u8]);
477    }
478
479    #[test]
480    fn retransmit() {
481        let mut buf = SendBuffer::new();
482        const MSG: &[u8] = b"Hello, world with extra data!";
483        buf.write(MSG.into());
484        // Transmit two frames
485        assert_eq!(buf.poll_transmit(16), (0..16, false));
486        assert_eq!(buf.poll_transmit(16), (16..23, true));
487        // Lose the first, but not the second
488        buf.retransmit(0..16);
489        // Ensure we only retransmit the lost frame, then continue sending fresh data
490        assert_eq!(buf.poll_transmit(16), (0..16, false));
491        assert_eq!(buf.poll_transmit(16), (23..MSG.len() as u64, true));
492        // Lose the second frame
493        buf.retransmit(16..23);
494        assert_eq!(buf.poll_transmit(16), (16..23, true));
495    }
496
497    #[test]
498    fn ack() {
499        let mut buf = SendBuffer::new();
500        const MSG: &[u8] = b"Hello, world!";
501        buf.write(MSG.into());
502        assert_eq!(buf.poll_transmit(16), (0..8, true));
503        buf.ack(0..8);
504        assert_eq!(aggregate_unacked(&buf), &MSG[8..]);
505    }
506
507    #[test]
508    fn reordered_ack() {
509        let mut buf = SendBuffer::new();
510        const MSG: &[u8] = b"Hello, world with extra data!";
511        buf.write(MSG.into());
512        assert_eq!(buf.poll_transmit(16), (0..16, false));
513        assert_eq!(buf.poll_transmit(16), (16..23, true));
514        buf.ack(16..23);
515        assert_eq!(aggregate_unacked(&buf), MSG);
516        buf.ack(0..16);
517        assert_eq!(aggregate_unacked(&buf), &MSG[23..]);
518        assert!(buf.acks.is_empty());
519    }
520
521    fn aggregate_unacked(buf: &SendBuffer) -> Vec<u8> {
522        buf.data.to_vec()
523    }
524
525    #[test]
526    #[should_panic(expected = "Requested range is outside of buffered data")]
527    fn send_buffer_get_out_of_range() {
528        let data = SendBufferData::default();
529        data.get(0..1);
530    }
531
532    #[test]
533    #[should_panic(expected = "Requested range is outside of buffered data")]
534    fn send_buffer_get_into_out_of_range() {
535        let data = SendBufferData::default();
536        let mut buf = Vec::new();
537        data.get_into(0..1, &mut buf);
538    }
539}
540
541#[cfg(feature = "bench")]
542pub mod send_buffer_benches {
543    //! Bench fns for SendBuffer
544    //!
545    //! These are defined here and re-exported via `bench_exports` in lib.rs,
546    //! so we can access the private `SendBuffer` struct.
547    use criterion::Criterion;
548    use bytes::Bytes;
549    use super::SendBuffer;
550
551    /// Pathological case: many segments, get from end
552    pub fn get_into_many_segments(criterion: &mut Criterion) {
553        let mut group = criterion.benchmark_group("get_into_many_segments");
554        let mut buf = SendBuffer::new();
555
556        const SEGMENTS: u64 = 10000;
557        const SEGMENT_SIZE: u64 = 10;
558        const PACKET_SIZE: u64 = 1200;
559        const BYTES: u64 = SEGMENTS * SEGMENT_SIZE;
560
561        // 10000 segments of 10 bytes each = 100KB total (same data size)
562        for i in 0..SEGMENTS {
563            buf.write(Bytes::from(vec![i as u8; SEGMENT_SIZE as usize]));
564        }
565
566        let mut tgt = Vec::with_capacity(PACKET_SIZE as usize);
567        group.bench_function("get_into", |b| {
568            b.iter(|| {
569                // Get from end (very slow - scans through all 1000 segments)
570                tgt.clear();
571                buf.get_into(BYTES - PACKET_SIZE..BYTES, std::hint::black_box(&mut tgt));
572            });
573        });
574    }
575
576    /// Get segments in the old way, using a loop of get calls
577    pub fn get_loop_many_segments(criterion: &mut Criterion) {
578        let mut group = criterion.benchmark_group("get_loop_many_segments");
579        let mut buf = SendBuffer::new();
580
581        const SEGMENTS: u64 = 10000;
582        const SEGMENT_SIZE: u64 = 10;
583        const PACKET_SIZE: u64 = 1200;
584        const BYTES: u64 = SEGMENTS * SEGMENT_SIZE;
585
586        // 10000 segments of 10 bytes each = 100KB total (same data size)
587        for i in 0..SEGMENTS {
588            buf.write(Bytes::from(vec![i as u8; SEGMENT_SIZE as usize]));
589        }
590
591        let mut tgt = Vec::with_capacity(PACKET_SIZE as usize);
592        group.bench_function("get_loop", |b| {
593            b.iter(|| {
594                // Get from end (very slow - scans through all 1000 segments)
595                tgt.clear();
596                let mut range = BYTES - PACKET_SIZE..BYTES;
597                while range.start < range.end {
598                    let slice = std::hint::black_box(buf.get(range.clone()));
599                    range.start += slice.len() as u64;
600                    tgt.extend_from_slice(slice);
601                }
602            });
603        });
604    }
605}