iroh_quinn_proto/connection/assembler.rs

use std::{
    cmp::Ordering,
    collections::{BinaryHeap, binary_heap::PeekMut},
    mem,
};

use bytes::{Buf, Bytes, BytesMut};

use crate::range_set::ArrayRangeSet;

/// Helper to assemble unordered stream frames into an ordered stream
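///
/// Illustrative sketch of the intended flow (this type is crate-internal, so the
/// snippet is not compiled as a doc-test):
///
/// ```ignore
/// let mut asm = Assembler::new();
/// // Frames may arrive out of order...
/// asm.insert(3, Bytes::from_static(b"def"), 3);
/// // ...but an ordered read yields nothing until the prefix is contiguous.
/// assert!(asm.read(usize::MAX, true).is_none());
/// asm.insert(0, Bytes::from_static(b"abc"), 3);
/// assert_eq!(&asm.read(usize::MAX, true).unwrap().bytes[..], b"abc");
/// assert_eq!(&asm.read(usize::MAX, true).unwrap().bytes[..], b"def");
/// ```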
#[derive(Debug, Default)]
pub(super) struct Assembler {
    state: State,
    data: BinaryHeap<Buffer>,
    /// Total number of buffered bytes, including duplicates in ordered mode.
    buffered: usize,
    /// Estimated number of allocated bytes, will never be less than `buffered`.
    allocated: usize,
    /// Number of bytes read by the application. When only ordered reads have been used, this is the
    /// length of the contiguous prefix of the stream which has been consumed by the application,
    /// aka the stream offset.
    bytes_read: u64,
    end: u64,
}

impl Assembler {
    pub(super) fn new() -> Self {
        Self::default()
    }

    /// Reset to the initial state
    pub(super) fn reinit(&mut self) {
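        // Reuse the existing heap allocation: `clear` keeps the `BinaryHeap`'s
        // capacity, so a reinitialized stream does not reallocate its storage.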
        let old_data = mem::take(&mut self.data);
        *self = Self::default();
        self.data = old_data;
        self.data.clear();
    }

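    /// Declare whether subsequent reads will be ordered or unordered
    ///
    /// Sketch of the one-way switch (illustrative; not compiled as a doc-test):
    ///
    /// ```ignore
    /// let mut asm = Assembler::new();
    /// asm.ensure_ordering(false).unwrap();         // enter unordered mode
    /// assert!(asm.ensure_ordering(true).is_err()); // cannot return to ordered mode
    /// ```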
    pub(super) fn ensure_ordering(&mut self, ordered: bool) -> Result<(), IllegalOrderedRead> {
        if ordered && !self.state.is_ordered() {
            return Err(IllegalOrderedRead);
        } else if !ordered && self.state.is_ordered() {
            // Enter unordered mode
            if !self.data.is_empty() {
                // Get rid of possible duplicates
                self.defragment();
            }
            let mut recvd = ArrayRangeSet::new();
            recvd.insert(0..self.bytes_read);
            for chunk in &self.data {
                recvd.insert(chunk.offset..chunk.offset + chunk.bytes.len() as u64);
            }
            self.state = State::Unordered { recvd };
        }
        Ok(())
    }

    /// Get the next chunk
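    ///
    /// Returns at most `max_length` bytes. Sketch of the splitting behaviour
    /// (illustrative; not compiled as a doc-test):
    ///
    /// ```ignore
    /// let mut asm = Assembler::new();
    /// asm.insert(0, Bytes::from_static(b"12345"), 5);
    /// // A bounded read splits the front chunk and leaves the rest buffered.
    /// assert_eq!(&asm.read(2, true).unwrap().bytes[..], b"12");
    /// assert_eq!(&asm.read(usize::MAX, true).unwrap().bytes[..], b"345");
    /// ```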
    pub(super) fn read(&mut self, max_length: usize, ordered: bool) -> Option<Chunk> {
        loop {
            let mut chunk = self.data.peek_mut()?;

            if ordered {
                if chunk.offset > self.bytes_read {
                    // Next chunk is after current read index
                    return None;
                } else if (chunk.offset + chunk.bytes.len() as u64) <= self.bytes_read {
                    // Next chunk is useless as the read index is beyond its end
                    self.buffered -= chunk.bytes.len();
                    self.allocated -= chunk.allocation_size;
                    PeekMut::pop(chunk);
                    continue;
                }

                // Trim any already-read prefix so the chunk starts at the current read offset
                let start = (self.bytes_read - chunk.offset) as usize;
                if start > 0 {
                    chunk.bytes.advance(start);
                    chunk.offset += start as u64;
                    self.buffered -= start;
                }
            }

            return Some(if max_length < chunk.bytes.len() {
                self.bytes_read += max_length as u64;
                let offset = chunk.offset;
                chunk.offset += max_length as u64;
                self.buffered -= max_length;
                Chunk::new(offset, chunk.bytes.split_to(max_length))
            } else {
                self.bytes_read += chunk.bytes.len() as u64;
                self.buffered -= chunk.bytes.len();
                self.allocated -= chunk.allocation_size;
                let chunk = PeekMut::pop(chunk);
                Chunk::new(chunk.offset, chunk.bytes)
            });
        }
    }

    /// Copy fragmented chunk data to new chunks backed by a single buffer
    ///
    /// This makes sure we're not unnecessarily holding on to many larger allocations.
    /// We merge contiguous chunks in the process of doing so.
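    ///
    /// Sketch of the merging effect (illustrative; mirrors the `compact` test below):
    ///
    /// ```ignore
    /// let mut asm = Assembler::new();
    /// asm.insert(0, Bytes::from_static(b"abc"), 4);
    /// asm.insert(3, Bytes::from_static(b"def"), 4);
    /// asm.defragment();
    /// // The two contiguous fragments are now backed by a single allocation.
    /// assert_eq!(&asm.read(usize::MAX, true).unwrap().bytes[..], b"abcdef");
    /// ```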
    fn defragment(&mut self) {
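        // First pass: walk the buffers in offset order, trim overlap with data
        // that was already read or covered by a previous buffer, and count how
        // many bytes still live in fragmented (non-defragmented) allocations.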
        let new = BinaryHeap::with_capacity(self.data.len());
        let old = mem::replace(&mut self.data, new);
        let mut buffers = old.into_sorted_vec();
        self.buffered = 0;
        let mut fragmented_buffered = 0;
        let mut offset = self.bytes_read;
        for chunk in buffers.iter_mut().rev() {
            chunk.try_mark_defragment(offset);
            let size = chunk.bytes.len();
            offset = chunk.offset + size as u64;
            self.buffered += size;
            if !chunk.defragmented {
                fragmented_buffered += size;
            }
        }
        self.allocated = self.buffered;
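        // Second pass: keep already-defragmented buffers as-is and copy each
        // contiguous run of fragmented bytes into `buffer`, a single allocation,
        // emitting one defragmented `Buffer` per run.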
        let mut buffer = BytesMut::with_capacity(fragmented_buffered);
        let mut offset = self.bytes_read;
        for chunk in buffers.into_iter().rev() {
            if chunk.defragmented {
                // bytes might be empty after try_mark_defragment
                if !chunk.bytes.is_empty() {
                    self.data.push(chunk);
                }
                continue;
            }
            // Overlap is resolved by try_mark_defragment
            if chunk.offset != offset + (buffer.len() as u64) {
                if !buffer.is_empty() {
                    self.data
                        .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
                }
                offset = chunk.offset;
            }
            buffer.extend_from_slice(&chunk.bytes);
        }
        if !buffer.is_empty() {
            self.data
                .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
        }
    }

    // Note: If a packet contains many frames from the same stream, the estimated over-allocation
    // will be much higher because we are counting the same allocation multiple times.
    pub(super) fn insert(&mut self, mut offset: u64, mut bytes: Bytes, allocation_size: usize) {
        debug_assert!(
            bytes.len() <= allocation_size,
            "allocation_size less than bytes.len(): {:?} < {:?}",
            allocation_size,
            bytes.len()
        );
        self.end = self.end.max(offset + bytes.len() as u64);
        if let State::Unordered { ref mut recvd } = self.state {
            // Discard duplicate data
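            // Example: if 5..8 was already received, an insert covering 3..10 is
            // split into 3..5 (buffered immediately, below) and 8..10 (which falls
            // through to the common path at the bottom of this function).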
            let range = offset..offset + bytes.len() as u64;
            for duplicate in recvd.iter_range(range.clone()) {
                if duplicate.start > offset {
                    let buffer = Buffer::new(
                        offset,
                        bytes.split_to((duplicate.start - offset) as usize),
                        allocation_size,
                    );
                    self.buffered += buffer.bytes.len();
                    self.allocated += buffer.allocation_size;
                    self.data.push(buffer);
                    offset = duplicate.start;
                }
                bytes.advance((duplicate.end - offset) as usize);
                offset = duplicate.end;
            }
            recvd.insert(range);
        } else if offset < self.bytes_read {
            if (offset + bytes.len() as u64) <= self.bytes_read {
                return;
            } else {
                let diff = self.bytes_read - offset;
                offset += diff;
                bytes.advance(diff as usize);
            }
        }

        if bytes.is_empty() {
            return;
        }
        let buffer = Buffer::new(offset, bytes, allocation_size);
        self.buffered += buffer.bytes.len();
        self.allocated += buffer.allocation_size;
        self.data.push(buffer);
        // `self.buffered` also counts duplicate bytes, so we use
        // `self.end - self.bytes_read` as an upper bound on the buffered unique
        // bytes. This will cause a defragmentation if the amount of duplicate
        // bytes exceeds a proportion of the receive window size.
        let buffered = self.buffered.min((self.end - self.bytes_read) as usize);
        let over_allocation = self.allocated - buffered;
        // Rationale: on the one hand, we want to defragment rarely, ideally never
        // in non-pathological scenarios. However, a pathological or malicious
        // peer could send us one-byte frames, and since we use reference-counted
        // buffers in order to prevent copying, this could result in keeping a lot
        // of memory allocated. This limits over-allocation in proportion to the
        // buffered data. The constants are chosen somewhat arbitrarily and try to
        // balance between defragmentation overhead and over-allocation.
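        // Worked example (illustrative): with 48 KiB of unique buffered data the
        // threshold is max(32 KiB, 72 KiB) = 72 KiB, so defragmentation only runs
        // once the estimated wasted allocation exceeds 72 KiB.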
        let threshold = 32768.max(buffered * 3 / 2);
        if over_allocation > threshold {
            self.defragment()
        }
    }

    /// Number of bytes consumed by the application
    pub(super) fn bytes_read(&self) -> u64 {
        self.bytes_read
    }

    /// Discard all buffered data
    pub(super) fn clear(&mut self) {
        self.data.clear();
        self.buffered = 0;
        self.allocated = 0;
    }
}

/// A chunk of data from the receive stream
#[derive(Debug, PartialEq, Eq)]
pub struct Chunk {
    /// The offset in the stream
    pub offset: u64,
    /// The contents of the chunk
    pub bytes: Bytes,
}

impl Chunk {
    fn new(offset: u64, bytes: Bytes) -> Self {
        Self { offset, bytes }
    }
}

#[derive(Debug, Eq)]
struct Buffer {
    offset: u64,
    bytes: Bytes,
    /// Size of the allocation behind `bytes`, if `defragmented == false`.
    /// Otherwise this will be set to `bytes.len()` by `try_mark_defragment`.
    /// Will never be less than `bytes.len()`.
    allocation_size: usize,
    defragmented: bool,
}

impl Buffer {
    /// Constructs a new fragmented Buffer
    fn new(offset: u64, bytes: Bytes, allocation_size: usize) -> Self {
        Self {
            offset,
            bytes,
            allocation_size,
            defragmented: false,
        }
    }

    /// Constructs a new defragmented Buffer
    fn new_defragmented(offset: u64, bytes: Bytes) -> Self {
        let allocation_size = bytes.len();
        Self {
            offset,
            bytes,
            allocation_size,
            defragmented: true,
        }
    }

    /// Discards data before `offset` and flags `self` as defragmented if it has good utilization
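    ///
    /// "Good utilization" means the remaining bytes cover at least roughly 5/6
    /// (about 83%) of the original allocation, per the check below.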
    fn try_mark_defragment(&mut self, offset: u64) {
        let duplicate = offset.saturating_sub(self.offset) as usize;
        self.offset = self.offset.max(offset);
        if duplicate >= self.bytes.len() {
            // All bytes are duplicate
            self.bytes = Bytes::new();
            self.defragmented = true;
            self.allocation_size = 0;
            return;
        }
        self.bytes.advance(duplicate);
        // Make sure that fragmented buffers with high utilization become defragmented and
        // defragmented buffers remain defragmented
        self.defragmented = self.defragmented || self.bytes.len() * 6 / 5 >= self.allocation_size;
        if self.defragmented {
            // Make sure that defragmented buffers do not contribute to over-allocation
            self.allocation_size = self.bytes.len();
        }
    }
}

impl Ord for Buffer {
    // Invert ordering based on offset (max-heap, min offset first),
    // prioritize longer chunks at the same offset.
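    // Example: with buffers at offsets 4 and 0, `peek`/`pop` yields offset 0 first;
    // with two buffers both at offset 0 of lengths 3 and 5, the 5-byte one wins.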
    fn cmp(&self, other: &Self) -> Ordering {
        self.offset
            .cmp(&other.offset)
            .reverse()
            .then(self.bytes.len().cmp(&other.bytes.len()))
    }
}

impl PartialOrd for Buffer {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for Buffer {
    fn eq(&self, other: &Self) -> bool {
        (self.offset, self.bytes.len()) == (other.offset, other.bytes.len())
    }
}

#[derive(Debug, Default)]
enum State {
    #[default]
    Ordered,
    Unordered {
        /// The set of offsets that have been received from the peer, including portions not yet
        /// read by the application.
        recvd: ArrayRangeSet,
    },
}

impl State {
    fn is_ordered(&self) -> bool {
        matches!(self, Self::Ordered)
    }
}

/// Error indicating that an ordered read was performed on a stream after an unordered read
#[derive(Debug)]
pub struct IllegalOrderedRead;

#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;

    #[test]
    fn assemble_ordered() {
        let mut x = Assembler::new();
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 1), Some(ref y) if &y[..] == b"1");
        assert_matches!(next(&mut x, 3), Some(ref y) if &y[..] == b"23");
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        x.insert(6, Bytes::from_static(b"789"), 3);
        x.insert(9, Bytes::from_static(b"10"), 2);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"789");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"10");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_unordered() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_duplicate() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_duplicate_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contained() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contained_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contains() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contains_compact() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_overlapping() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"4");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_overlapping_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 4);
        x.insert(1, Bytes::from_static(b"234"), 4);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_complex() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_complex_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_old() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(9, Bytes::from_static(b"jkl"), 4);
        x.insert(12, Bytes::from_static(b"mno"), 4);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abcdef"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jklmno"))
        );
    }

    #[test]
    fn defrag_with_missing_prefix() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
    }

    #[test]
    fn defrag_read_chunk() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(7, Bytes::from_static(b"hij"), 4);
        x.insert(11, Bytes::from_static(b"lmn"), 4);
        x.defragment();
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"abcdef");
        x.insert(5, Bytes::from_static(b"fghijklmn"), 9);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"ghijklmn");
        x.insert(13, Bytes::from_static(b"nopq"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"opq");
        x.insert(15, Bytes::from_static(b"pqrs"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"rs");
        assert_matches!(x.read(usize::MAX, true), None);
    }

    #[test]
    fn unordered_happy_path() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abc"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
    }

    #[test]
    fn unordered_dedup() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(0, Bytes::from_static(b"abcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"a"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(1, Bytes::from_static(b"bc"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(6, Bytes::from_static(b"ghi"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jkl"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(12, Bytes::from_static(b"mno"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, false), None);
    }

    #[test]
    fn chunks_dedup() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(1, Bytes::from_static(b"bcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abcd")))
        );
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(4, Bytes::from_static(b"efghi")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(9, Bytes::from_static(b"jkl")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(12, Bytes::from_static(b"mno")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
    }

    #[test]
    fn ordered_eager_discard() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(x.data.len(), 1);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        x.insert(0, Bytes::from_static(b"ab"), 2);
        assert_eq!(x.data.len(), 0);
        x.insert(2, Bytes::from_static(b"cd"), 2);
        assert_eq!(
            x.data.peek(),
            Some(&Buffer::new(3, Bytes::from_static(b"d"), 2))
        );
    }

    #[test]
    fn ordered_insert_unordered_read() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.ensure_ordering(false).unwrap();
        assert_eq!(
            x.read(3, false),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        assert_eq!(x.read(3, false), None);
    }

    #[test]
    fn no_duplicate_after_mode_switch() {
        // Regression test: bytes read in ordered mode should not be returned again in unordered mode
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(0, Bytes::from_static(b"a"), 1); // duplicate
        assert_eq!(
            x.read(1, true),
            Some(Chunk::new(0, Bytes::from_static(b"a")))
        );
        x.ensure_ordering(false).unwrap();
        assert_eq!(x.read(1, false), None); // should be None, byte 0 already returned
    }

    fn next_unordered(x: &mut Assembler) -> Chunk {
        x.read(usize::MAX, false).unwrap()
    }

    fn next(x: &mut Assembler, size: usize) -> Option<Bytes> {
        x.read(size, true).map(|chunk| chunk.bytes)
    }
}

#[cfg(all(test, not(target_family = "wasm")))]
mod proptests {
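    //! Reference-model property tests: random `Op` sequences are applied to both
    //! the real `Assembler` and a simple boolean-array model (`RefState`), and the
    //! observable behaviour of the two is cross-checked.
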
    use proptest::prelude::*;
    use test_strategy::{Arbitrary, proptest};

    use super::*;

    const MAX_OFFSET: u64 = 512;
    const MAX_LEN: usize = 64;

    #[derive(Debug, Clone, Arbitrary)]
    enum Op {
        #[weight(10)]
        Insert {
            #[strategy(0..MAX_OFFSET)]
            offset: u64,
            #[strategy(1..MAX_LEN)]
            len: usize,
        },
        #[weight(10)]
        Read {
            #[strategy(1..MAX_LEN)]
            max_len: usize,
        },
        #[weight(1)]
        EnsureOrdering { ordered: bool },
        #[weight(1)]
        Defragment,
    }

    /// Tracks the state of the assembler for verification
    struct RefState {
        received: Vec<bool>,
        returned: Vec<bool>,
        ordered: bool,
    }

    fn set_range(bits: &mut [bool], start: u64, len: usize) {
        for i in start..(start + len as u64).min(bits.len() as u64) {
            bits[i as usize] = true;
        }
    }

    impl RefState {
        fn new() -> Self {
            Self {
                received: vec![false; MAX_OFFSET as usize],
                returned: vec![false; MAX_OFFSET as usize],
                ordered: true,
            }
        }

        fn insert(&mut self, offset: u64, len: usize) {
            set_range(&mut self.received, offset, len);
        }

        fn ensure_ordering(&mut self, ordered: bool) -> bool {
            if ordered && !self.ordered {
                return false;
            }
            self.ordered = ordered;
            true
        }

        fn bytes_read(&self) -> u64 {
            self.returned.iter().filter(|&&x| x).count() as u64
        }
    }

    fn make_data() -> Vec<u8> {
        use rand::{Rng, SeedableRng};
        let mut rng = rand::rngs::StdRng::seed_from_u64(0xDEADBEEF);
        let mut data = vec![0u8; MAX_OFFSET as usize];
        rng.fill(data.as_mut_slice());
        data
    }

    fn get_slice(data: &[u8], offset: u64, len: usize) -> Bytes {
        let start = offset as usize;
        let end = (start + len).min(data.len());
        Bytes::copy_from_slice(&data[start..end])
    }

    fn verify_chunk(data: &[u8], chunk: &Chunk) -> bool {
        let start = chunk.offset as usize;
        chunk.bytes[..] == data[start..start + chunk.bytes.len()]
    }

    #[proptest]
    fn assembler_matches_reference(
        #[strategy(proptest::collection::vec(any::<Op>(), 1..100))] ops: Vec<Op>,
    ) {
        let data = make_data();
        let mut asm = Assembler::new();
        let mut reference = RefState::new();

        for op in ops {
            match op {
                Op::Insert { offset, len } => {
                    let bytes = get_slice(&data, offset, len);
                    asm.insert(offset, bytes, len);
                    reference.insert(offset, len);
                }
                Op::Read { max_len } => {
                    let ordered = reference.ordered;
                    let actual = asm.read(max_len, ordered);

                    match actual {
                        None => {
                            // Should only be None if no unreturned received bytes available
                            let has_available = if ordered {
                                // In ordered mode, check if the first unreturned byte is received
                                reference
                                    .returned
                                    .iter()
                                    .position(|&x| !x)
                                    .is_some_and(|pos| reference.received[pos])
                            } else {
                                // In unordered mode, check if any unreturned received byte exists
                                reference
                                    .received
                                    .iter()
                                    .zip(&reference.returned)
                                    .any(|(&r, &ret)| r && !ret)
                            };
                            prop_assert!(
                                !has_available,
                                "read returned None but data was available"
                            );
                        }
                        Some(chunk) => {
                            prop_assert!(chunk.bytes.len() <= max_len, "chunk exceeds max_len");
                            prop_assert!(verify_chunk(&data, &chunk), "data corruption");
                            // Mark as returned, check for duplicates
                            for i in 0..chunk.bytes.len() {
                                let offset = chunk.offset as usize + i;
                                prop_assert!(
                                    reference.received[offset],
                                    "returned unreceived byte at {offset}"
                                );
                                prop_assert!(
                                    !reference.returned[offset],
                                    "duplicate byte at {offset}"
                                );
                                reference.returned[offset] = true;
                            }
                        }
                    }
                }
                Op::EnsureOrdering { ordered } => {
                    let actual = asm.ensure_ordering(ordered).is_ok();
                    let expected = reference.ensure_ordering(ordered);
                    prop_assert_eq!(actual, expected, "ensure_ordering result mismatch");
                }
                Op::Defragment => {
                    if asm.state.is_ordered() {
                        asm.defragment();
                    }
                }
            }
        }

        // Invariant: bytes_read matches
        prop_assert_eq!(
            asm.bytes_read(),
            reference.bytes_read(),
            "bytes_read mismatch"
        );
    }
}