1use std::{
2 cmp::Ordering,
3 collections::{BinaryHeap, binary_heap::PeekMut},
4 mem,
5};
6
7use bytes::{Buf, Bytes, BytesMut};
8
9use crate::range_set::ArrayRangeSet;
10
11#[derive(Debug, Default)]
13pub(super) struct Assembler {
14 state: State,
15 data: BinaryHeap<Buffer>,
16 buffered: usize,
18 allocated: usize,
20 bytes_read: u64,
24 end: u64,
25}
26
27impl Assembler {
28 pub(super) fn new() -> Self {
29 Self::default()
30 }
31
32 pub(super) fn reinit(&mut self) {
34 let old_data = mem::take(&mut self.data);
35 *self = Self::default();
36 self.data = old_data;
37 self.data.clear();
38 }
39
40 pub(super) fn ensure_ordering(&mut self, ordered: bool) -> Result<(), IllegalOrderedRead> {
41 if ordered && !self.state.is_ordered() {
42 return Err(IllegalOrderedRead);
43 } else if !ordered && self.state.is_ordered() {
44 if !self.data.is_empty() {
46 self.defragment();
48 }
49 let mut recvd = ArrayRangeSet::new();
50 recvd.insert(0..self.bytes_read);
51 for chunk in &self.data {
52 recvd.insert(chunk.offset..chunk.offset + chunk.bytes.len() as u64);
53 }
54 self.state = State::Unordered { recvd };
55 }
56 Ok(())
57 }
58
59 pub(super) fn read(&mut self, max_length: usize, ordered: bool) -> Option<Chunk> {
61 loop {
62 let mut chunk = self.data.peek_mut()?;
63
64 if ordered {
65 if chunk.offset > self.bytes_read {
66 return None;
68 } else if (chunk.offset + chunk.bytes.len() as u64) <= self.bytes_read {
69 self.buffered -= chunk.bytes.len();
71 self.allocated -= chunk.allocation_size;
72 PeekMut::pop(chunk);
73 continue;
74 }
75
76 let start = (self.bytes_read - chunk.offset) as usize;
78 if start > 0 {
79 chunk.bytes.advance(start);
80 chunk.offset += start as u64;
81 self.buffered -= start;
82 }
83 }
84
85 return Some(if max_length < chunk.bytes.len() {
86 self.bytes_read += max_length as u64;
87 let offset = chunk.offset;
88 chunk.offset += max_length as u64;
89 self.buffered -= max_length;
90 Chunk::new(offset, chunk.bytes.split_to(max_length))
91 } else {
92 self.bytes_read += chunk.bytes.len() as u64;
93 self.buffered -= chunk.bytes.len();
94 self.allocated -= chunk.allocation_size;
95 let chunk = PeekMut::pop(chunk);
96 Chunk::new(chunk.offset, chunk.bytes)
97 });
98 }
99 }
100
101 fn defragment(&mut self) {
106 let new = BinaryHeap::with_capacity(self.data.len());
107 let old = mem::replace(&mut self.data, new);
108 let mut buffers = old.into_sorted_vec();
109 self.buffered = 0;
110 let mut fragmented_buffered = 0;
111 let mut offset = self.bytes_read;
112 for chunk in buffers.iter_mut().rev() {
113 chunk.try_mark_defragment(offset);
114 let size = chunk.bytes.len();
115 offset = chunk.offset + size as u64;
116 self.buffered += size;
117 if !chunk.defragmented {
118 fragmented_buffered += size;
119 }
120 }
121 self.allocated = self.buffered;
122 let mut buffer = BytesMut::with_capacity(fragmented_buffered);
123 let mut offset = self.bytes_read;
124 for chunk in buffers.into_iter().rev() {
125 if chunk.defragmented {
126 if !chunk.bytes.is_empty() {
128 self.data.push(chunk);
129 }
130 continue;
131 }
132 if chunk.offset != offset + (buffer.len() as u64) {
134 if !buffer.is_empty() {
135 self.data
136 .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
137 }
138 offset = chunk.offset;
139 }
140 buffer.extend_from_slice(&chunk.bytes);
141 }
142 if !buffer.is_empty() {
143 self.data
144 .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
145 }
146 }
147
148 pub(super) fn insert(&mut self, mut offset: u64, mut bytes: Bytes, allocation_size: usize) {
151 debug_assert!(
152 bytes.len() <= allocation_size,
153 "allocation_size less than bytes.len(): {:?} < {:?}",
154 allocation_size,
155 bytes.len()
156 );
157 self.end = self.end.max(offset + bytes.len() as u64);
158 if let State::Unordered { ref mut recvd } = self.state {
159 let range = offset..offset + bytes.len() as u64;
161 for duplicate in recvd.iter_range(range.clone()) {
162 if duplicate.start > offset {
163 let buffer = Buffer::new(
164 offset,
165 bytes.split_to((duplicate.start - offset) as usize),
166 allocation_size,
167 );
168 self.buffered += buffer.bytes.len();
169 self.allocated += buffer.allocation_size;
170 self.data.push(buffer);
171 offset = duplicate.start;
172 }
173 bytes.advance((duplicate.end - offset) as usize);
174 offset = duplicate.end;
175 }
176 recvd.insert(range);
177 } else if offset < self.bytes_read {
178 if (offset + bytes.len() as u64) <= self.bytes_read {
179 return;
180 } else {
181 let diff = self.bytes_read - offset;
182 offset += diff;
183 bytes.advance(diff as usize);
184 }
185 }
186
187 if bytes.is_empty() {
188 return;
189 }
190 let buffer = Buffer::new(offset, bytes, allocation_size);
191 self.buffered += buffer.bytes.len();
192 self.allocated += buffer.allocation_size;
193 self.data.push(buffer);
194 let buffered = self.buffered.min((self.end - self.bytes_read) as usize);
199 let over_allocation = self.allocated - buffered;
200 let threshold = 32768.max(buffered * 3 / 2);
208 if over_allocation > threshold {
209 self.defragment()
210 }
211 }
212
213 pub(super) fn bytes_read(&self) -> u64 {
215 self.bytes_read
216 }
217
218 pub(super) fn clear(&mut self) {
220 self.data.clear();
221 self.buffered = 0;
222 self.allocated = 0;
223 }
224}
225
226#[derive(Debug, PartialEq, Eq)]
228pub struct Chunk {
229 pub offset: u64,
231 pub bytes: Bytes,
233}
234
235impl Chunk {
236 fn new(offset: u64, bytes: Bytes) -> Self {
237 Self { offset, bytes }
238 }
239}
240
241#[derive(Debug, Eq)]
242struct Buffer {
243 offset: u64,
244 bytes: Bytes,
245 allocation_size: usize,
249 defragmented: bool,
250}
251
252impl Buffer {
253 fn new(offset: u64, bytes: Bytes, allocation_size: usize) -> Self {
255 Self {
256 offset,
257 bytes,
258 allocation_size,
259 defragmented: false,
260 }
261 }
262
263 fn new_defragmented(offset: u64, bytes: Bytes) -> Self {
265 let allocation_size = bytes.len();
266 Self {
267 offset,
268 bytes,
269 allocation_size,
270 defragmented: true,
271 }
272 }
273
274 fn try_mark_defragment(&mut self, offset: u64) {
276 let duplicate = offset.saturating_sub(self.offset) as usize;
277 self.offset = self.offset.max(offset);
278 if duplicate >= self.bytes.len() {
279 self.bytes = Bytes::new();
281 self.defragmented = true;
282 self.allocation_size = 0;
283 return;
284 }
285 self.bytes.advance(duplicate);
286 self.defragmented = self.defragmented || self.bytes.len() * 6 / 5 >= self.allocation_size;
289 if self.defragmented {
290 self.allocation_size = self.bytes.len();
292 }
293 }
294}
295
296impl Ord for Buffer {
297 fn cmp(&self, other: &Self) -> Ordering {
300 self.offset
301 .cmp(&other.offset)
302 .reverse()
303 .then(self.bytes.len().cmp(&other.bytes.len()))
304 }
305}
306
307impl PartialOrd for Buffer {
308 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
309 Some(self.cmp(other))
310 }
311}
312
313impl PartialEq for Buffer {
314 fn eq(&self, other: &Self) -> bool {
315 (self.offset, self.bytes.len()) == (other.offset, other.bytes.len())
316 }
317}
318
319#[derive(Debug, Default)]
320enum State {
321 #[default]
322 Ordered,
323 Unordered {
324 recvd: ArrayRangeSet,
327 },
328}
329
330impl State {
331 fn is_ordered(&self) -> bool {
332 matches!(self, Self::Ordered)
333 }
334}
335
336#[derive(Debug)]
338pub struct IllegalOrderedRead;
339
340#[cfg(test)]
341mod test {
342 use super::*;
343 use assert_matches::assert_matches;
344
345 #[test]
346 fn assemble_ordered() {
347 let mut x = Assembler::new();
348 assert_matches!(next(&mut x, 32), None);
349 x.insert(0, Bytes::from_static(b"123"), 3);
350 assert_matches!(next(&mut x, 1), Some(ref y) if &y[..] == b"1");
351 assert_matches!(next(&mut x, 3), Some(ref y) if &y[..] == b"23");
352 x.insert(3, Bytes::from_static(b"456"), 3);
353 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
354 x.insert(6, Bytes::from_static(b"789"), 3);
355 x.insert(9, Bytes::from_static(b"10"), 2);
356 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"789");
357 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"10");
358 assert_matches!(next(&mut x, 32), None);
359 }
360
361 #[test]
362 fn assemble_unordered() {
363 let mut x = Assembler::new();
364 x.ensure_ordering(false).unwrap();
365 x.insert(3, Bytes::from_static(b"456"), 3);
366 assert_matches!(next(&mut x, 32), None);
367 x.insert(0, Bytes::from_static(b"123"), 3);
368 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
369 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
370 assert_matches!(next(&mut x, 32), None);
371 }
372
373 #[test]
374 fn assemble_duplicate() {
375 let mut x = Assembler::new();
376 x.insert(0, Bytes::from_static(b"123"), 3);
377 x.insert(0, Bytes::from_static(b"123"), 3);
378 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
379 assert_matches!(next(&mut x, 32), None);
380 }
381
382 #[test]
383 fn assemble_duplicate_compact() {
384 let mut x = Assembler::new();
385 x.insert(0, Bytes::from_static(b"123"), 3);
386 x.insert(0, Bytes::from_static(b"123"), 3);
387 x.defragment();
388 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
389 assert_matches!(next(&mut x, 32), None);
390 }
391
392 #[test]
393 fn assemble_contained() {
394 let mut x = Assembler::new();
395 x.insert(0, Bytes::from_static(b"12345"), 5);
396 x.insert(1, Bytes::from_static(b"234"), 3);
397 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
398 assert_matches!(next(&mut x, 32), None);
399 }
400
401 #[test]
402 fn assemble_contained_compact() {
403 let mut x = Assembler::new();
404 x.insert(0, Bytes::from_static(b"12345"), 5);
405 x.insert(1, Bytes::from_static(b"234"), 3);
406 x.defragment();
407 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
408 assert_matches!(next(&mut x, 32), None);
409 }
410
411 #[test]
412 fn assemble_contains() {
413 let mut x = Assembler::new();
414 x.insert(1, Bytes::from_static(b"234"), 3);
415 x.insert(0, Bytes::from_static(b"12345"), 5);
416 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
417 assert_matches!(next(&mut x, 32), None);
418 }
419
420 #[test]
421 fn assemble_contains_compact() {
422 let mut x = Assembler::new();
423 x.insert(1, Bytes::from_static(b"234"), 3);
424 x.insert(0, Bytes::from_static(b"12345"), 5);
425 x.defragment();
426 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
427 assert_matches!(next(&mut x, 32), None);
428 }
429
430 #[test]
431 fn assemble_overlapping() {
432 let mut x = Assembler::new();
433 x.insert(0, Bytes::from_static(b"123"), 3);
434 x.insert(1, Bytes::from_static(b"234"), 3);
435 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
436 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"4");
437 assert_matches!(next(&mut x, 32), None);
438 }
439
440 #[test]
441 fn assemble_overlapping_compact() {
442 let mut x = Assembler::new();
443 x.insert(0, Bytes::from_static(b"123"), 4);
444 x.insert(1, Bytes::from_static(b"234"), 4);
445 x.defragment();
446 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
447 assert_matches!(next(&mut x, 32), None);
448 }
449
450 #[test]
451 fn assemble_complex() {
452 let mut x = Assembler::new();
453 x.insert(0, Bytes::from_static(b"1"), 1);
454 x.insert(2, Bytes::from_static(b"3"), 1);
455 x.insert(4, Bytes::from_static(b"5"), 1);
456 x.insert(0, Bytes::from_static(b"123456"), 6);
457 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
458 assert_matches!(next(&mut x, 32), None);
459 }
460
461 #[test]
462 fn assemble_complex_compact() {
463 let mut x = Assembler::new();
464 x.insert(0, Bytes::from_static(b"1"), 1);
465 x.insert(2, Bytes::from_static(b"3"), 1);
466 x.insert(4, Bytes::from_static(b"5"), 1);
467 x.insert(0, Bytes::from_static(b"123456"), 6);
468 x.defragment();
469 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
470 assert_matches!(next(&mut x, 32), None);
471 }
472
473 #[test]
474 fn assemble_old() {
475 let mut x = Assembler::new();
476 x.insert(0, Bytes::from_static(b"1234"), 4);
477 assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
478 x.insert(0, Bytes::from_static(b"1234"), 4);
479 assert_matches!(next(&mut x, 32), None);
480 }
481
482 #[test]
483 fn compact() {
484 let mut x = Assembler::new();
485 x.insert(0, Bytes::from_static(b"abc"), 4);
486 x.insert(3, Bytes::from_static(b"def"), 4);
487 x.insert(9, Bytes::from_static(b"jkl"), 4);
488 x.insert(12, Bytes::from_static(b"mno"), 4);
489 x.defragment();
490 assert_eq!(
491 next_unordered(&mut x),
492 Chunk::new(0, Bytes::from_static(b"abcdef"))
493 );
494 assert_eq!(
495 next_unordered(&mut x),
496 Chunk::new(9, Bytes::from_static(b"jklmno"))
497 );
498 }
499
500 #[test]
501 fn defrag_with_missing_prefix() {
502 let mut x = Assembler::new();
503 x.insert(3, Bytes::from_static(b"def"), 3);
504 x.defragment();
505 assert_eq!(
506 next_unordered(&mut x),
507 Chunk::new(3, Bytes::from_static(b"def"))
508 );
509 }
510
511 #[test]
512 fn defrag_read_chunk() {
513 let mut x = Assembler::new();
514 x.insert(3, Bytes::from_static(b"def"), 4);
515 x.insert(0, Bytes::from_static(b"abc"), 4);
516 x.insert(7, Bytes::from_static(b"hij"), 4);
517 x.insert(11, Bytes::from_static(b"lmn"), 4);
518 x.defragment();
519 assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"abcdef");
520 x.insert(5, Bytes::from_static(b"fghijklmn"), 9);
521 assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"ghijklmn");
522 x.insert(13, Bytes::from_static(b"nopq"), 4);
523 assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"opq");
524 x.insert(15, Bytes::from_static(b"pqrs"), 4);
525 assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"rs");
526 assert_matches!(x.read(usize::MAX, true), None);
527 }
528
529 #[test]
530 fn unordered_happy_path() {
531 let mut x = Assembler::new();
532 x.ensure_ordering(false).unwrap();
533 x.insert(0, Bytes::from_static(b"abc"), 3);
534 assert_eq!(
535 next_unordered(&mut x),
536 Chunk::new(0, Bytes::from_static(b"abc"))
537 );
538 assert_eq!(x.read(usize::MAX, false), None);
539 x.insert(3, Bytes::from_static(b"def"), 3);
540 assert_eq!(
541 next_unordered(&mut x),
542 Chunk::new(3, Bytes::from_static(b"def"))
543 );
544 assert_eq!(x.read(usize::MAX, false), None);
545 }
546
547 #[test]
548 fn unordered_dedup() {
549 let mut x = Assembler::new();
550 x.ensure_ordering(false).unwrap();
551 x.insert(3, Bytes::from_static(b"def"), 3);
552 assert_eq!(
553 next_unordered(&mut x),
554 Chunk::new(3, Bytes::from_static(b"def"))
555 );
556 assert_eq!(x.read(usize::MAX, false), None);
557 x.insert(0, Bytes::from_static(b"a"), 1);
558 x.insert(0, Bytes::from_static(b"abcdefghi"), 9);
559 x.insert(0, Bytes::from_static(b"abcd"), 4);
560 assert_eq!(
561 next_unordered(&mut x),
562 Chunk::new(0, Bytes::from_static(b"a"))
563 );
564 assert_eq!(
565 next_unordered(&mut x),
566 Chunk::new(1, Bytes::from_static(b"bc"))
567 );
568 assert_eq!(
569 next_unordered(&mut x),
570 Chunk::new(6, Bytes::from_static(b"ghi"))
571 );
572 assert_eq!(x.read(usize::MAX, false), None);
573 x.insert(8, Bytes::from_static(b"ijkl"), 4);
574 assert_eq!(
575 next_unordered(&mut x),
576 Chunk::new(9, Bytes::from_static(b"jkl"))
577 );
578 assert_eq!(x.read(usize::MAX, false), None);
579 x.insert(12, Bytes::from_static(b"mno"), 3);
580 assert_eq!(
581 next_unordered(&mut x),
582 Chunk::new(12, Bytes::from_static(b"mno"))
583 );
584 assert_eq!(x.read(usize::MAX, false), None);
585 x.insert(2, Bytes::from_static(b"cde"), 3);
586 assert_eq!(x.read(usize::MAX, false), None);
587 }
588
589 #[test]
590 fn chunks_dedup() {
591 let mut x = Assembler::new();
592 x.insert(3, Bytes::from_static(b"def"), 3);
593 assert_eq!(x.read(usize::MAX, true), None);
594 x.insert(0, Bytes::from_static(b"a"), 1);
595 x.insert(1, Bytes::from_static(b"bcdefghi"), 9);
596 x.insert(0, Bytes::from_static(b"abcd"), 4);
597 assert_eq!(
598 x.read(usize::MAX, true),
599 Some(Chunk::new(0, Bytes::from_static(b"abcd")))
600 );
601 assert_eq!(
602 x.read(usize::MAX, true),
603 Some(Chunk::new(4, Bytes::from_static(b"efghi")))
604 );
605 assert_eq!(x.read(usize::MAX, true), None);
606 x.insert(8, Bytes::from_static(b"ijkl"), 4);
607 assert_eq!(
608 x.read(usize::MAX, true),
609 Some(Chunk::new(9, Bytes::from_static(b"jkl")))
610 );
611 assert_eq!(x.read(usize::MAX, true), None);
612 x.insert(12, Bytes::from_static(b"mno"), 3);
613 assert_eq!(
614 x.read(usize::MAX, true),
615 Some(Chunk::new(12, Bytes::from_static(b"mno")))
616 );
617 assert_eq!(x.read(usize::MAX, true), None);
618 x.insert(2, Bytes::from_static(b"cde"), 3);
619 assert_eq!(x.read(usize::MAX, true), None);
620 }
621
622 #[test]
623 fn ordered_eager_discard() {
624 let mut x = Assembler::new();
625 x.insert(0, Bytes::from_static(b"abc"), 3);
626 assert_eq!(x.data.len(), 1);
627 assert_eq!(
628 x.read(usize::MAX, true),
629 Some(Chunk::new(0, Bytes::from_static(b"abc")))
630 );
631 x.insert(0, Bytes::from_static(b"ab"), 2);
632 assert_eq!(x.data.len(), 0);
633 x.insert(2, Bytes::from_static(b"cd"), 2);
634 assert_eq!(
635 x.data.peek(),
636 Some(&Buffer::new(3, Bytes::from_static(b"d"), 2))
637 );
638 }
639
640 #[test]
641 fn ordered_insert_unordered_read() {
642 let mut x = Assembler::new();
643 x.insert(0, Bytes::from_static(b"abc"), 3);
644 x.insert(0, Bytes::from_static(b"abc"), 3);
645 x.ensure_ordering(false).unwrap();
646 assert_eq!(
647 x.read(3, false),
648 Some(Chunk::new(0, Bytes::from_static(b"abc")))
649 );
650 assert_eq!(x.read(3, false), None);
651 }
652
653 #[test]
654 fn no_duplicate_after_mode_switch() {
655 let mut x = Assembler::new();
657 x.insert(0, Bytes::from_static(b"a"), 1);
658 x.insert(0, Bytes::from_static(b"a"), 1); assert_eq!(
660 x.read(1, true),
661 Some(Chunk::new(0, Bytes::from_static(b"a")))
662 );
663 x.ensure_ordering(false).unwrap();
664 assert_eq!(x.read(1, false), None); }
666
667 fn next_unordered(x: &mut Assembler) -> Chunk {
668 x.read(usize::MAX, false).unwrap()
669 }
670
671 fn next(x: &mut Assembler, size: usize) -> Option<Bytes> {
672 x.read(size, true).map(|chunk| chunk.bytes)
673 }
674}
675
676#[cfg(all(test, not(target_family = "wasm")))]
677mod proptests {
678 use proptest::prelude::*;
679 use test_strategy::{Arbitrary, proptest};
680
681 use super::*;
682
683 const MAX_OFFSET: u64 = 512;
684 const MAX_LEN: usize = 64;
685
686 #[derive(Debug, Clone, Arbitrary)]
687 enum Op {
688 #[weight(10)]
689 Insert {
690 #[strategy(0..MAX_OFFSET)]
691 offset: u64,
692 #[strategy(1..MAX_LEN)]
693 len: usize,
694 },
695 #[weight(10)]
696 Read {
697 #[strategy(1..MAX_LEN)]
698 max_len: usize,
699 },
700 #[weight(1)]
701 EnsureOrdering { ordered: bool },
702 #[weight(1)]
703 Defragment,
704 }
705
706 struct RefState {
708 received: Vec<bool>,
709 returned: Vec<bool>,
710 ordered: bool,
711 }
712
713 fn set_range(bits: &mut [bool], start: u64, len: usize) {
714 for i in start..(start + len as u64).min(bits.len() as u64) {
715 bits[i as usize] = true;
716 }
717 }
718
719 impl RefState {
720 fn new() -> Self {
721 Self {
722 received: vec![false; MAX_OFFSET as usize],
723 returned: vec![false; MAX_OFFSET as usize],
724 ordered: true,
725 }
726 }
727
728 fn insert(&mut self, offset: u64, len: usize) {
729 set_range(&mut self.received, offset, len);
730 }
731
732 fn ensure_ordering(&mut self, ordered: bool) -> bool {
733 if ordered && !self.ordered {
734 return false;
735 }
736 self.ordered = ordered;
737 true
738 }
739
740 fn bytes_read(&self) -> u64 {
741 self.returned.iter().filter(|&&x| x).count() as u64
742 }
743 }
744
745 fn make_data() -> Vec<u8> {
746 use rand::{Rng, SeedableRng};
747 let mut rng = rand::rngs::StdRng::seed_from_u64(0xDEADBEEF);
748 let mut data = vec![0u8; MAX_OFFSET as usize];
749 rng.fill(data.as_mut_slice());
750 data
751 }
752
753 fn get_slice(data: &[u8], offset: u64, len: usize) -> Bytes {
754 let start = offset as usize;
755 let end = (start + len).min(data.len());
756 Bytes::copy_from_slice(&data[start..end])
757 }
758
759 fn verify_chunk(data: &[u8], chunk: &Chunk) -> bool {
760 let start = chunk.offset as usize;
761 chunk.bytes[..] == data[start..start + chunk.bytes.len()]
762 }
763
764 #[proptest]
765 fn assembler_matches_reference(
766 #[strategy(proptest::collection::vec(any::<Op>(), 1..100))] ops: Vec<Op>,
767 ) {
768 let data = make_data();
769 let mut asm = Assembler::new();
770 let mut reference = RefState::new();
771
772 for op in ops {
773 match op {
774 Op::Insert { offset, len } => {
775 let bytes = get_slice(&data, offset, len);
776 asm.insert(offset, bytes, len);
777 reference.insert(offset, len);
778 }
779 Op::Read { max_len } => {
780 let ordered = reference.ordered;
781 let actual = asm.read(max_len, ordered);
782
783 match actual {
784 None => {
785 let has_available = if ordered {
787 reference
789 .returned
790 .iter()
791 .position(|&x| !x)
792 .is_some_and(|pos| reference.received[pos])
793 } else {
794 reference
796 .received
797 .iter()
798 .zip(&reference.returned)
799 .any(|(&r, &ret)| r && !ret)
800 };
801 prop_assert!(
802 !has_available,
803 "read returned None but data was available"
804 );
805 }
806 Some(chunk) => {
807 prop_assert!(chunk.bytes.len() <= max_len, "chunk exceeds max_len");
808 prop_assert!(verify_chunk(&data, &chunk), "data corruption");
809 for i in 0..chunk.bytes.len() {
811 let offset = chunk.offset as usize + i;
812 prop_assert!(
813 reference.received[offset],
814 "returned unreceived byte at {offset}"
815 );
816 prop_assert!(
817 !reference.returned[offset],
818 "duplicate byte at {offset}"
819 );
820 reference.returned[offset] = true;
821 }
822 }
823 }
824 }
825 Op::EnsureOrdering { ordered } => {
826 let actual = asm.ensure_ordering(ordered).is_ok();
827 let expected = reference.ensure_ordering(ordered);
828 prop_assert_eq!(actual, expected, "ensure_ordering result mismatch");
829 }
830 Op::Defragment => {
831 if asm.state.is_ordered() {
832 asm.defragment();
833 }
834 }
835 }
836 }
837
838 prop_assert_eq!(
840 asm.bytes_read(),
841 reference.bytes_read(),
842 "bytes_read mismatch"
843 );
844 }
845}