use std::{
    fmt::{self, Debug},
    fs,
    future::Future,
    io::Write,
    num::NonZeroU64,
    ops::Deref,
    path::{Path, PathBuf},
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
};

use bao_tree::{
    blake3,
    io::{
        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
        outboard::PreOrderOutboard,
        sync::ReadAt,
        BaoContentItem, Leaf,
    },
    BaoTree, ChunkNum, ChunkRanges,
};
use bytes::Bytes;
use delete_set::{BaoFilePart, ProtectHandle};
use entity_manager::{EntityManagerState, SpawnArg};
use entry_state::{DataLocation, OutboardLocation};
use gc::run_gc;
use import::{ImportEntry, ImportSource};
use irpc::{channel::mpsc, RpcMessage};
use meta::list_blobs;
use n0_future::{future::yield_now, io};
use nested_enum_utils::enum_conversions;
use range_collections::range_set::RangeSetRange;
use tokio::task::{JoinError, JoinSet};
use tracing::{error, instrument, trace};

use crate::{
    api::{
        proto::{
            self, bitfield::is_validated, BatchMsg, BatchResponse, Bitfield, Command,
            CreateTempTagMsg, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, HashSpecific, ImportBaoMsg,
            ImportBaoRequest, ObserveMsg, Scope,
        },
        ApiClient,
    },
    protocol::ChunkRangesExt,
    store::{
        fs::{
            bao_file::{
                BaoFileStorage, BaoFileStorageSubscriber, CompleteStorage, DataReader,
                OutboardReader,
            },
            util::entity_manager::{self, ActiveEntityState},
        },
        util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
        IROH_BLOCK_SIZE,
    },
    util::{
        channel::oneshot,
        temp_tag::{TagDrop, TempTag, TempTagScope, TempTags},
    },
    Hash,
};
mod bao_file;
use bao_file::BaoFileHandle;
mod delete_set;
mod entry_state;
mod import;
mod meta;
pub mod options;
pub(crate) mod util;
use entry_state::EntryState;
use import::{import_byte_stream, import_bytes, import_path, ImportEntryMsg};
use options::Options;
use tracing::Instrument;
mod gc;

use crate::{
    api::{
        self,
        blobs::{AddProgressItem, ExportMode, ExportProgressItem},
        Store,
    },
    HashAndFormat,
};

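/// Maximum number of external paths to remember for a blob that is exported
/// with `ExportMode::TryReference`.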
const MAX_EXTERNAL_PATHS: usize = 8;

fn new_uuid() -> [u8; 16] {
    use rand::RngCore;
    let mut rng = rand::thread_rng();
    let mut bytes = [0u8; 16];
    rng.fill_bytes(&mut bytes);
    bytes
}

fn temp_name() -> String {
    format!("{}.temp", hex::encode(new_uuid()))
}

#[derive(Debug)]
#[enum_conversions()]
pub(crate) enum InternalCommand {
    Dump(meta::Dump),
    FinishImport(ImportEntryMsg),
    ClearScope(ClearScope),
}

#[derive(Debug)]
pub(crate) struct ClearScope {
    pub scope: Scope,
}

impl InternalCommand {
    pub fn parent_span(&self) -> tracing::Span {
        match self {
            Self::Dump(_) => tracing::Span::current(),
            Self::ClearScope(_) => tracing::Span::current(),
            Self::FinishImport(cmd) => cmd
                .parent_span_opt()
                .cloned()
                .unwrap_or_else(tracing::Span::current),
        }
    }
}

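/// Context that is shared between the main actor and all tasks it spawns.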
#[derive(Debug)]
struct TaskContext {
    pub options: Arc<Options>,
    pub db: meta::Db,
    pub internal_cmd_tx: tokio::sync::mpsc::Sender<InternalCommand>,
    pub protect: ProtectHandle,
}

impl TaskContext {
    pub async fn clear_scope(&self, scope: Scope) {
        self.internal_cmd_tx
            .send(ClearScope { scope }.into())
            .await
            .ok();
    }
}

#[derive(Debug)]
struct EmParams;

impl entity_manager::Params for EmParams {
    type EntityId = Hash;

    type GlobalState = Arc<TaskContext>;

    type EntityState = BaoFileHandle;

    async fn on_shutdown(
        state: entity_manager::ActiveEntityState<Self>,
        cause: entity_manager::ShutdownCause,
    ) {
        trace!("persist {:?} due to {cause:?}", state.id);
        state.persist().await;
    }
}

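/// The main store actor. It owns the command channels, the temp tags, the
/// per-hash entity manager state, and the set of currently running tasks.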
#[derive(Debug)]
struct Actor {
    context: Arc<TaskContext>,
    cmd_rx: tokio::sync::mpsc::Receiver<Command>,
    fs_cmd_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
    tasks: JoinSet<()>,
    handles: EntityManagerState<EmParams>,
    temp_tags: TempTags,
    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
    _rt: RtWrapper,
}

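/// Per-hash context: the hash itself, the shared task context, and the
/// watchable `BaoFileHandle` for that hash.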
type HashContext = ActiveEntityState<EmParams>;

impl SyncEntityApi for HashContext {
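    /// Load the entry state from the database into memory.
    ///
    /// Exactly one caller wins the `Initial -> Loading` transition and does
    /// the actual load; concurrent callers that observe `Loading` wait for
    /// the state to change.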
    async fn load(&self) {
        enum Action {
            Load,
            Wait,
            None,
        }
        let mut action = Action::None;
        self.state.send_if_modified(|guard| match guard.deref() {
            BaoFileStorage::Initial => {
                *guard = BaoFileStorage::Loading;
                action = Action::Load;
                true
            }
            BaoFileStorage::Loading => {
                action = Action::Wait;
                false
            }
            _ => false,
        });
        match action {
            Action::Load => {
                let state = if self.id == Hash::EMPTY {
                    BaoFileStorage::Complete(CompleteStorage {
                        data: MemOrFile::Mem(Bytes::new()),
                        outboard: MemOrFile::empty(),
                    })
                } else {
                    match self.global.db.get(self.id).await {
                        Ok(state) => match BaoFileStorage::open(state, self).await {
                            Ok(handle) => handle,
                            Err(_) => BaoFileStorage::Poisoned,
                        },
                        Err(_) => BaoFileStorage::Poisoned,
                    }
                };
                self.state.send_replace(state);
            }
            Action::Wait => {
                while matches!(self.state.borrow().deref(), BaoFileStorage::Loading) {
                    self.state.0.subscribe().changed().await.ok();
                }
            }
            Action::None => {}
        }
    }

    async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()> {
        trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len());
        let mut res = Ok(None);
        self.state.send_if_modified(|state| {
            let Ok((state1, update)) = state.take().write_batch(batch, bitfield, self) else {
                res = Err(io::Error::other("write batch failed"));
                return false;
            };
            res = Ok(update);
            *state = state1;
            true
        });
        if let Some(update) = res? {
            self.global.db.update(self.id, update).await?;
        }
        Ok(())
    }

    #[allow(refining_impl_trait_internal)]
    fn data_reader(&self) -> DataReader {
        DataReader(self.state.clone())
    }

    #[allow(refining_impl_trait_internal)]
    fn outboard_reader(&self) -> OutboardReader {
        OutboardReader(self.state.clone())
    }

    fn current_size(&self) -> io::Result<u64> {
        match self.state.borrow().deref() {
            BaoFileStorage::Complete(mem) => Ok(mem.size()),
            BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()),
            BaoFileStorage::Partial(file) => file.current_size(),
            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
            BaoFileStorage::Initial => Err(io::Error::other("initial")),
            BaoFileStorage::Loading => Err(io::Error::other("loading")),
            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
        }
    }

    fn bitfield(&self) -> io::Result<Bitfield> {
        match self.state.borrow().deref() {
            BaoFileStorage::Complete(mem) => Ok(mem.bitfield()),
            BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()),
            BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()),
            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
            BaoFileStorage::Initial => Err(io::Error::other("initial")),
            BaoFileStorage::Loading => Err(io::Error::other("loading")),
            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
        }
    }
}

impl HashContext {
    pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> {
        let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
        let outboard = self.outboard_reader();
        Ok(PreOrderOutboard {
            root: blake3::Hash::from(self.id),
            tree,
            data: outboard,
        })
    }

    fn db(&self) -> &meta::Db {
        &self.global.db
    }

    pub fn options(&self) -> &Arc<Options> {
        &self.global.options
    }

    pub fn protect(&self, parts: impl IntoIterator<Item = BaoFilePart>) {
        self.global.protect.protect(self.id, parts);
    }

    pub async fn update_await(&self, state: EntryState<Bytes>) -> io::Result<()> {
        self.db().update_await(self.id, state).await?;
        Ok(())
    }

    pub async fn get_entry_state(&self) -> io::Result<Option<EntryState<Bytes>>> {
        let hash = self.id;
        if hash == Hash::EMPTY {
            return Ok(Some(EntryState::Complete {
                data_location: DataLocation::Inline(Bytes::new()),
                outboard_location: OutboardLocation::NotNeeded,
            }));
        };
        self.db().get(hash).await
    }

    pub async fn set(&self, state: EntryState<Bytes>) -> io::Result<()> {
        self.db().set(self.id, state).await
    }
}

impl Actor {
    fn db(&self) -> &meta::Db {
        &self.context.db
    }

    fn context(&self) -> Arc<TaskContext> {
        self.context.clone()
    }

    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let span = tracing::Span::current();
        self.tasks.spawn(fut.instrument(span));
    }

    fn log_task_result(res: Result<(), JoinError>) {
        match res {
            Ok(_) => {}
            Err(e) => {
                error!("task failed: {e}");
            }
        }
    }

    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { tx, inner, .. } = cmd;
        let mut tt = self.temp_tags.create(inner.scope, inner.value);
        if tx.is_rpc() {
            tt.leak();
        }
        tx.send(tt).await.ok();
    }

    async fn handle_command(&mut self, cmd: Command) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            Command::SyncDb(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::WaitIdle(cmd) => {
                trace!("{cmd:?}");
                if self.tasks.is_empty() {
                    cmd.tx.send(()).await.ok();
                } else {
                    self.idle_waiters.push(cmd.tx);
                }
            }
            Command::Shutdown(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::CreateTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::SetTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::RenameTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ClearProtected(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListBlobs(cmd) => {
                trace!("{cmd:?}");
                if let Ok(snapshot) = self.db().snapshot(cmd.span.clone()).await {
                    self.spawn(list_blobs(snapshot, cmd));
                }
            }
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope, self.context()));
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            Command::ImportBytes(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_bytes(cmd, self.context()));
            }
            Command::ImportByteStream(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_byte_stream(cmd, self.context()));
            }
            Command::ImportPath(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_path(cmd, self.context()));
            }
            Command::ExportPath(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ExportBao(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ExportRanges(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ImportBao(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::Observe(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
        }
    }

    async fn handle_fs_command(&mut self, cmd: InternalCommand) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            InternalCommand::Dump(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            InternalCommand::ClearScope(cmd) => {
                trace!("{cmd:?}");
                self.temp_tags.end_scope(cmd.scope);
            }
            InternalCommand::FinishImport(cmd) => {
                trace!("{cmd:?}");
                if cmd.hash == Hash::EMPTY {
                    cmd.tx
                        .send(AddProgressItem::Done(TempTag::leaking_empty(cmd.format)))
                        .await
                        .ok();
                } else {
                    let tt = self.temp_tags.create(
                        cmd.scope,
                        HashAndFormat {
                            hash: cmd.hash,
                            format: cmd.format,
                        },
                    );
                    (tt, cmd).spawn(&mut self.handles, &mut self.tasks).await;
                }
            }
        }
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                task = self.handles.tick() => {
                    if let Some(task) = task {
                        self.spawn(task);
                    }
                }
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };
                    self.handle_command(cmd).await;
                }
                Some(cmd) = self.fs_cmd_rx.recv() => {
                    self.handle_fs_command(cmd).await;
                }
                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
                    Self::log_task_result(res);
                    if self.tasks.is_empty() {
                        for tx in self.idle_waiters.drain(..) {
                            tx.send(()).await.ok();
                        }
                    }
                }
            }
        }
        self.handles.shutdown().await;
        while let Some(res) = self.tasks.join_next().await {
            Self::log_task_result(res);
        }
    }

    async fn new(
        db_path: PathBuf,
        rt: RtWrapper,
        cmd_rx: tokio::sync::mpsc::Receiver<Command>,
        fs_commands_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
        fs_commands_tx: tokio::sync::mpsc::Sender<InternalCommand>,
        options: Arc<Options>,
    ) -> anyhow::Result<Self> {
        trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        fs::create_dir_all(&options.path.data_path)?;
        trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        fs::create_dir_all(&options.path.temp_path)?;
        trace!(
            "creating parent directory for db file: {}",
            db_path.parent().unwrap().display()
        );
        fs::create_dir_all(db_path.parent().unwrap())?;
        let (db_send, db_recv) = tokio::sync::mpsc::channel(100);
        let (protect, ds) = delete_set::pair(Arc::new(options.path.clone()));
        let db_actor = meta::Actor::new(db_path, db_recv, ds, options.batch.clone())?;
        let slot_context = Arc::new(TaskContext {
            options: options.clone(),
            db: meta::Db::new(db_send),
            internal_cmd_tx: fs_commands_tx,
            protect,
        });
        rt.spawn(db_actor.run());
        Ok(Self {
            context: slot_context.clone(),
            cmd_rx,
            fs_cmd_rx: fs_commands_rx,
            tasks: JoinSet::new(),
            handles: EntityManagerState::new(slot_context, 1024, 32, 32, 2),
            temp_tags: Default::default(),
            idle_waiters: Vec::new(),
            _rt: rt,
        })
    }
}

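/// A command scoped to a single hash, executed on that hash's entity actor
/// via the entity manager.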
trait HashSpecificCommand: HashSpecific + Send + 'static {
    fn handle(self, ctx: HashContext) -> impl Future<Output = ()> + Send + 'static;

    fn on_error(self, arg: SpawnArg<EmParams>) -> impl Future<Output = ()> + Send + 'static;

    async fn spawn(
        self,
        manager: &mut entity_manager::EntityManagerState<EmParams>,
        tasks: &mut JoinSet<()>,
    ) where
        Self: Sized,
    {
        let span = tracing::Span::current();
        let task = manager
            .spawn(self.hash(), |arg| {
                async move {
                    match arg {
                        SpawnArg::Active(state) => {
                            self.handle(state).await;
                        }
                        SpawnArg::Busy => {
                            self.on_error(arg).await;
                        }
                        SpawnArg::Dead => {
                            self.on_error(arg).await;
                        }
                    }
                }
                .instrument(span)
            })
            .await;
        if let Some(task) = task {
            tasks.spawn(task);
        }
    }
}

impl HashSpecificCommand for ObserveMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.observe(self).await
    }
    async fn on_error(self, _arg: SpawnArg<EmParams>) {}
}
impl HashSpecificCommand for ExportPathMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_path(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(ExportProgressItem::Error(api::Error::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ExportBaoMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_bao(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ExportRangesMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_ranges(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(ExportRangesItem::Error(api::Error::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ImportBaoMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.import_bao(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx.send(Err(api::Error::Io(err))).await.ok();
    }
}
impl HashSpecific for (TempTag, ImportEntryMsg) {
    fn hash(&self) -> Hash {
        self.1.hash()
    }
}
impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
    async fn handle(self, ctx: HashContext) {
        let (tt, cmd) = self;
        ctx.finish_import(cmd, tt).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.1.tx.send(AddProgressItem::Error(err)).await.ok();
    }
}

struct RtWrapper(Option<tokio::runtime::Runtime>);

impl From<tokio::runtime::Runtime> for RtWrapper {
    fn from(rt: tokio::runtime::Runtime) -> Self {
        Self(Some(rt))
    }
}

impl fmt::Debug for RtWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        ValueOrPoisioned(self.0.as_ref()).fmt(f)
    }
}

impl Deref for RtWrapper {
    type Target = tokio::runtime::Runtime;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

impl Drop for RtWrapper {
    fn drop(&mut self) {
        if let Some(rt) = self.0.take() {
            trace!("dropping tokio runtime");
            tokio::task::block_in_place(|| {
                drop(rt);
            });
            trace!("dropped tokio runtime");
        }
    }
}

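/// Run a batch: hand the scope id to the client, process drop and ping
/// messages while the batch is open, and clear the scope when it ends.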
async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: Arc<TaskContext>) {
    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
        error!("batch failed: {cause}");
    }
    ctx.clear_scope(id).await;
}

async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
    let BatchMsg { tx, mut rx, .. } = cmd;
    trace!("created scope {}", id);
    tx.send(id).await.map_err(api::Error::other)?;
    while let Some(msg) = rx.recv().await? {
        match msg {
            BatchResponse::Drop(msg) => scope.on_drop(&msg),
            BatchResponse::Ping => {}
        }
    }
    Ok(())
}

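/// The operations on a single blob entry. Each of these is run on the
/// entity actor for the entry's hash.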
trait EntityApi {
    async fn import_bao(&self, cmd: ImportBaoMsg);
    async fn finish_import(&self, cmd: ImportEntryMsg, tt: TempTag);
    async fn observe(&self, cmd: ObserveMsg);
    async fn export_ranges(&self, cmd: ExportRangesMsg);
    async fn export_bao(&self, cmd: ExportBaoMsg);
    async fn export_path(&self, cmd: ExportPathMsg);
    async fn persist(&self);
}

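/// Lower-level accessors on the entry state that the `EntityApi`
/// implementation and the import/export helpers are built on.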
trait SyncEntityApi: EntityApi {
    async fn load(&self);

    fn data_reader(&self) -> impl ReadBytesAt;

    fn outboard_reader(&self) -> impl ReadAt;

    fn current_size(&self) -> io::Result<u64>;

    fn bitfield(&self) -> io::Result<Bitfield>;

    async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()>;
}

impl EntityApi for HashContext {
    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn import_bao(&self, cmd: ImportBaoMsg) {
        trace!("{cmd:?}");
        self.load().await;
        let ImportBaoMsg {
            inner: ImportBaoRequest { size, .. },
            rx,
            tx,
            ..
        } = cmd;
        let res = import_bao_impl(self, size, rx).await;
        trace!("{res:?}");
        tx.send(res).await.ok();
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn observe(&self, cmd: ObserveMsg) {
        trace!("{cmd:?}");
        self.load().await;
        BaoFileStorageSubscriber::new(self.state.subscribe())
            .forward(cmd.tx)
            .await
            .ok();
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_ranges(&self, mut cmd: ExportRangesMsg) {
        trace!("{cmd:?}");
        self.load().await;
        if let Err(cause) = export_ranges_impl(self, cmd.inner, &mut cmd.tx).await {
            cmd.tx
                .send(ExportRangesItem::Error(cause.into()))
                .await
                .ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_bao(&self, mut cmd: ExportBaoMsg) {
        trace!("{cmd:?}");
        self.load().await;
        if let Err(cause) = export_bao_impl(self, cmd.inner, &mut cmd.tx).await {
            cmd.tx
                .send(bao_tree::io::EncodeError::Io(cause).into())
                .await
                .ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_path(&self, cmd: ExportPathMsg) {
        trace!("{cmd:?}");
        self.load().await;
        let ExportPathMsg { inner, mut tx, .. } = cmd;
        if let Err(cause) = export_path_impl(self, inner, &mut tx).await {
            tx.send(cause.into()).await.ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn finish_import(&self, cmd: ImportEntryMsg, mut tt: TempTag) {
        trace!("{cmd:?}");
        self.load().await;
        let res = match finish_import_impl(self, cmd.inner).await {
            Ok(()) => {
                if cmd.tx.is_rpc() {
                    trace!("leaking temp tag {}", tt.hash_and_format());
                    tt.leak();
                }
                AddProgressItem::Done(tt)
            }
            Err(cause) => AddProgressItem::Error(cause),
        };
        cmd.tx.send(res).await.ok();
    }

    #[instrument(skip_all, fields(hash = %self.id.fmt_short()))]
    async fn persist(&self) {
        self.state.send_if_modified(|guard| {
            let hash = &self.id;
            let BaoFileStorage::Partial(fs) = guard.take() else {
                return false;
            };
            let path = self.global.options.path.bitfield_path(hash);
            trace!("writing bitfield for hash {} to {}", hash, path.display());
            if let Err(cause) = fs.sync_all(&path) {
                error!(
                    "failed to write bitfield for {} at {}: {:?}",
                    hash,
                    path.display(),
                    cause
                );
            }
            false
        });
    }
}

async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> {
    if ctx.id == Hash::EMPTY {
        return Ok(());
    }
    let ImportEntry {
        source,
        hash,
        outboard,
        ..
    } = import_data;
    let options = ctx.options();
    match &source {
        ImportSource::Memory(data) => {
            debug_assert!(options.is_inlined_data(data.len() as u64));
        }
        ImportSource::External(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
        ImportSource::TempFile(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
    }
    ctx.load().await;
    let handle = &ctx.state;
    ctx.protect([BaoFilePart::Data, BaoFilePart::Outboard]);
    let data_location = match source {
        ImportSource::Memory(data) => DataLocation::Inline(data),
        ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size),
        ImportSource::TempFile(path, _file, size) => {
            let target = ctx.options().path.data_path(&hash);
            trace!(
                "moving temp file to owned data location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned data location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            DataLocation::Owned(size)
        }
    };
    let outboard_location = match outboard {
        MemOrFile::Mem(bytes) if bytes.is_empty() => OutboardLocation::NotNeeded,
        MemOrFile::Mem(bytes) => OutboardLocation::Inline(bytes),
        MemOrFile::File(path) => {
            let target = ctx.options().path.outboard_path(&hash);
            trace!(
                "moving temp file to owned outboard location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned outboard location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            OutboardLocation::Owned
        }
    };
    let data = match &data_location {
        DataLocation::Inline(data) => MemOrFile::Mem(data.clone()),
        DataLocation::Owned(size) => {
            let path = ctx.options().path.data_path(&hash);
            let file = fs::File::open(&path)?;
            MemOrFile::File(FixedSize::new(file, *size))
        }
        DataLocation::External(paths, size) => {
            let Some(path) = paths.iter().next() else {
                return Err(io::Error::other("no external data path"));
            };
            let file = fs::File::open(path)?;
            MemOrFile::File(FixedSize::new(file, *size))
        }
    };
    let outboard = match &outboard_location {
        OutboardLocation::NotNeeded => MemOrFile::empty(),
        OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()),
        OutboardLocation::Owned => {
            let path = ctx.options().path.outboard_path(&hash);
            let file = fs::File::open(&path)?;
            MemOrFile::File(file)
        }
    };
    handle.complete(data, outboard);
    let state = EntryState::Complete {
        data_location,
        outboard_location,
    };
    ctx.update_await(state).await?;
    Ok(())
}

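/// The chunk range covered by a leaf, computed from its byte offset and the
/// length of its data.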
fn chunk_range(leaf: &Leaf) -> ChunkRanges {
    let start = ChunkNum::chunks(leaf.offset);
    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
    (start..end).into()
}

async fn import_bao_impl(
    ctx: &HashContext,
    size: NonZeroU64,
    mut rx: mpsc::Receiver<BaoContentItem>,
) -> api::Result<()> {
    trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size);
    let mut batch = Vec::<BaoContentItem>::new();
    let mut ranges = ChunkRanges::empty();
    while let Some(item) = rx.recv().await? {
        // flush the batch whenever a parent follows a leaf, so each batch
        // covers a contiguous run of leaves
        if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
            let bitfield = Bitfield::new_unchecked(ranges, size.into());
            ctx.write_batch(&batch, &bitfield).await?;
            batch.clear();
            ranges = ChunkRanges::empty();
        }
        if let BaoContentItem::Leaf(leaf) = &item {
            let leaf_range = chunk_range(leaf);
            if is_validated(size, &leaf_range) && size.get() != leaf.offset + leaf.data.len() as u64
            {
                return Err(api::Error::io(io::ErrorKind::InvalidData, "invalid size"));
            }
            ranges |= leaf_range;
        }
        batch.push(item);
    }
    if !batch.is_empty() {
        let bitfield = Bitfield::new_unchecked(ranges, size.into());
        ctx.write_batch(&batch, &bitfield).await?;
    }
    Ok(())
}

async fn export_ranges_impl(
    ctx: &HashContext,
    cmd: ExportRangesRequest,
    tx: &mut mpsc::Sender<ExportRangesItem>,
) -> io::Result<()> {
    let ExportRangesRequest { ranges, hash } = cmd;
    trace!(
        "exporting ranges: {hash} {ranges:?} size={}",
        ctx.current_size()?
    );
    let bitfield = ctx.bitfield()?;
    let data = ctx.data_reader();
    let size = bitfield.size();
    for range in ranges.iter() {
        let range = match range {
            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
        };
        let requested = ChunkRanges::bytes(range.start..range.end);
        if !bitfield.ranges.is_superset(&requested) {
            return Err(io::Error::other(format!(
                "missing range: {requested:?}, present: {bitfield:?}",
            )));
        }
        let bs = 1024;
        let mut offset = range.start;
        loop {
            let end: u64 = (offset + bs).min(range.end);
            let size = (end - offset) as usize;
            let res = data.read_bytes_at(offset, size);
            tx.send(ExportRangesItem::Data(Leaf { offset, data: res? }))
                .await?;
            offset = end;
            if offset >= range.end {
                break;
            }
        }
    }
    Ok(())
}

async fn export_bao_impl(
    ctx: &HashContext,
    cmd: ExportBaoRequest,
    tx: &mut mpsc::Sender<EncodedItem>,
) -> io::Result<()> {
    let ExportBaoRequest { ranges, hash, .. } = cmd;
    let outboard = ctx.outboard()?;
    let size = outboard.tree.size();
    if size == 0 && hash != Hash::EMPTY {
        // a size of 0 for a non-empty hash means there is nothing to export
        return Ok(());
    }
    trace!("exporting bao: {hash} {ranges:?} size={size}");
    let data = ctx.data_reader();
    let tx = BaoTreeSender::new(tx);
    traverse_ranges_validated(data, outboard, &ranges, tx).await?;
    Ok(())
}

async fn export_path_impl(
    ctx: &HashContext,
    cmd: ExportPathRequest,
    tx: &mut mpsc::Sender<ExportProgressItem>,
) -> api::Result<()> {
    let ExportPathRequest { mode, target, hash, .. } = cmd;
    if !target.is_absolute() {
        return Err(api::Error::io(
            io::ErrorKind::InvalidInput,
            "path is not absolute",
        ));
    }
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let state = ctx.get_entry_state().await?;
    let (data_location, outboard_location) = match state {
        Some(EntryState::Complete {
            data_location,
            outboard_location,
        }) => (data_location, outboard_location),
        Some(EntryState::Partial { .. }) => {
            return Err(api::Error::io(
                io::ErrorKind::InvalidInput,
                "cannot export partial entry",
            ));
        }
        None => {
            return Err(api::Error::io(io::ErrorKind::NotFound, "no entry found"));
        }
    };
    trace!("exporting {} to {}", hash.to_hex(), target.display());
    let (data, mut external) = match data_location {
        DataLocation::Inline(data) => (MemOrFile::Mem(data), vec![]),
        DataLocation::Owned(size) => (
            MemOrFile::File((ctx.options().path.data_path(&hash), size)),
            vec![],
        ),
        DataLocation::External(paths, size) => (
            MemOrFile::File((
                paths.first().cloned().ok_or_else(|| {
                    io::Error::new(io::ErrorKind::NotFound, "no external data path")
                })?,
                size,
            )),
            paths,
        ),
    };
    let size = match &data {
        MemOrFile::Mem(data) => data.len() as u64,
        MemOrFile::File((_, size)) => *size,
    };
    tx.send(ExportProgressItem::Size(size))
        .await
        .map_err(api::Error::other)?;
    match data {
        MemOrFile::Mem(data) => {
            let mut target = fs::File::create(&target)?;
            target.write_all(&data)?;
        }
        MemOrFile::File((source_path, size)) => match mode {
            ExportMode::Copy => {
                let res = reflink_or_copy_with_progress(&source_path, &target, size, tx).await?;
                trace!(
                    "exported {} to {}, {res:?}",
                    source_path.display(),
                    target.display()
                );
            }
            ExportMode::TryReference => {
                if !external.is_empty() {
                    // the data is already external, so make a copy and add the
                    // target to the set of external paths
                    let res =
                        reflink_or_copy_with_progress(&source_path, &target, size, tx).await?;
                    trace!(
                        "exported {} also to {}, {res:?}",
                        source_path.display(),
                        target.display()
                    );
                    external.push(target);
                    external.sort();
                    external.dedup();
                    external.truncate(MAX_EXTERNAL_PATHS);
                } else {
                    // the data is owned, so try to move it to the target and
                    // make it external
                    match std::fs::rename(&source_path, &target) {
                        Ok(()) => {}
                        Err(cause) => {
                            // EXDEV: rename does not work across file systems,
                            // fall back to a copy
                            const ERR_CROSS: i32 = 18;
                            if cause.raw_os_error() == Some(ERR_CROSS) {
                                reflink_or_copy_with_progress(&source_path, &target, size, tx)
                                    .await?;
                            } else {
                                return Err(cause.into());
                            }
                        }
                    }
                    external.push(target);
                };
                ctx.set(EntryState::Complete {
                    data_location: DataLocation::External(external, size),
                    outboard_location,
                })
                .await?;
            }
        },
    }
    tx.send(ExportProgressItem::Done)
        .await
        .map_err(api::Error::other)?;
    Ok(())
}

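/// Progress messages for copy operations, so the same copy helpers can
/// report progress both for exports and for imports.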
trait CopyProgress: RpcMessage {
    fn from_offset(offset: u64) -> Self;
}

impl CopyProgress for ExportProgressItem {
    fn from_offset(offset: u64) -> Self {
        ExportProgressItem::CopyProgress(offset)
    }
}

impl CopyProgress for AddProgressItem {
    fn from_offset(offset: u64) -> Self {
        AddProgressItem::CopyProgress(offset)
    }
}

#[derive(Debug)]
enum CopyResult {
    Reflinked,
    Copied,
}

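/// Copy a file, trying to reflink first and falling back to a buffered copy
/// with progress reporting.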
async fn reflink_or_copy_with_progress(
    from: impl AsRef<Path>,
    to: impl AsRef<Path>,
    size: u64,
    tx: &mut mpsc::Sender<impl CopyProgress>,
) -> io::Result<CopyResult> {
    let from = from.as_ref();
    let to = to.as_ref();
    if reflink_copy::reflink(from, to).is_ok() {
        return Ok(CopyResult::Reflinked);
    }
    let source = fs::File::open(from)?;
    let mut target = fs::File::create(to)?;
    copy_with_progress(source, size, &mut target, tx).await?;
    Ok(CopyResult::Copied)
}

async fn copy_with_progress<T: CopyProgress>(
    file: impl ReadAt,
    size: u64,
    target: &mut impl Write,
    tx: &mut mpsc::Sender<T>,
) -> io::Result<()> {
    let mut offset = 0;
    let mut buf = vec![0u8; 1024 * 1024];
    while offset < size {
        let remaining = buf.len().min((size - offset) as usize);
        let buf: &mut [u8] = &mut buf[..remaining];
        file.read_exact_at(offset, buf)?;
        target.write_all(buf)?;
        tx.try_send(T::from_offset(offset))
            .await
            .map_err(|_e| io::Error::other("progress receiver dropped"))?;
        yield_now().await;
        offset += buf.len() as u64;
    }
    Ok(())
}

impl FsStore {
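    /// Load or create a store at the given root directory.
    ///
    /// A minimal usage sketch; the path and the data being added are
    /// placeholders for illustration:
    ///
    /// ```rust,ignore
    /// let store = FsStore::load("/tmp/blobs").await?;
    /// let tt = store.add_bytes(Bytes::from_static(b"hello")).await?;
    /// println!("added blob {}", tt.hash);
    /// ```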
    pub async fn load(root: impl AsRef<Path>) -> anyhow::Result<Self> {
        let path = root.as_ref();
        let db_path = path.join("blobs.db");
        let options = Options::new(path);
        Self::load_with_opts(db_path, options).await
    }

    pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result<FsStore> {
        static THREAD_NR: AtomicU64 = AtomicU64::new(0);
        let rt = tokio::runtime::Builder::new_multi_thread()
            .thread_name_fn(|| {
                format!(
                    "iroh-blob-store-{}",
                    THREAD_NR.fetch_add(1, Ordering::Relaxed)
                )
            })
            .enable_time()
            .build()?;
        let handle = rt.handle().clone();
        let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(100);
        let (fs_commands_tx, fs_commands_rx) = tokio::sync::mpsc::channel(100);
        let gc_config = options.gc.clone();
        let actor = handle
            .spawn(Actor::new(
                db_path,
                rt.into(),
                commands_rx,
                fs_commands_rx,
                fs_commands_tx.clone(),
                Arc::new(options),
            ))
            .await??;
        handle.spawn(actor.run());
        let store = FsStore::new(commands_tx.into(), fs_commands_tx);
        if let Some(config) = gc_config {
            handle.spawn(run_gc(store.deref().clone(), config));
        }
        Ok(store)
    }
}

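/// A file system based store.
///
/// This is a cheaply cloneable handle; the actual work happens on a
/// dedicated actor that runs on its own tokio runtime.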
#[derive(Debug, Clone)]
pub struct FsStore {
    sender: ApiClient,
    db: tokio::sync::mpsc::Sender<InternalCommand>,
}

impl From<FsStore> for Store {
    fn from(value: FsStore) -> Self {
        Store::from_sender(value.sender)
    }
}

impl Deref for FsStore {
    type Target = Store;

    fn deref(&self) -> &Self::Target {
        Store::ref_from_sender(&self.sender)
    }
}

impl AsRef<Store> for FsStore {
    fn as_ref(&self) -> &Store {
        self.deref()
    }
}

impl FsStore {
    fn new(
        sender: irpc::LocalSender<proto::Request>,
        db: tokio::sync::mpsc::Sender<InternalCommand>,
    ) -> Self {
        Self {
            sender: sender.into(),
            db,
        }
    }

    pub async fn dump(&self) -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db
            .send(
                meta::Dump {
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await??;
        Ok(())
    }
}

#[cfg(test)]
pub mod tests {
    use core::panic;
    use std::collections::{HashMap, HashSet};

    use bao_tree::{
        io::{outboard::PreOrderMemOutboard, round_up_to_chunks_groups},
        ChunkRanges,
    };
    use n0_future::{stream, Stream, StreamExt};
    use testresult::TestResult;
    use walkdir::WalkDir;

    use super::*;
    use crate::{
        api::blobs::Bitfield,
        store::{
            util::{read_checksummed, SliceInfoExt, Tag},
            IROH_BLOCK_SIZE,
        },
    };

    /// Sizes that exercise the interesting cases: empty, a single byte, one
    /// chunk, sizes around a chunk group (16 KiB), and larger blobs that get
    /// a separate outboard.
    pub const INTERESTING_SIZES: [usize; 8] = [
        0,
        1,
        1024,
        1024 * 16 - 1,
        1024 * 16,
        1024 * 16 + 1,
        1024 * 1024,
        1024 * 1024 * 8,
    ];

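    /// Create an n0 flavoured bao encoding for the given data and ranges:
    /// the 8 byte little-endian size prefix followed by the encoded ranges.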
    pub fn create_n0_bao(data: &[u8], ranges: &ChunkRanges) -> anyhow::Result<(Hash, Vec<u8>)> {
        let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
        let mut encoded = Vec::new();
        let size = data.len() as u64;
        encoded.extend_from_slice(&size.to_le_bytes());
        bao_tree::io::sync::encode_ranges_validated(data, &outboard, ranges, &mut encoded)?;
        Ok((outboard.root.into(), encoded))
    }

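    /// Round a request up the way the store does: a request entirely outside
    /// the data is clamped to the last chunk, and everything is widened to
    /// chunk group boundaries.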
    pub fn round_up_request(size: u64, ranges: &ChunkRanges) -> ChunkRanges {
        let last_chunk = ChunkNum::chunks(size);
        let data_range = ChunkRanges::from(..last_chunk);
        let ranges = if !data_range.intersects(ranges) && !ranges.is_empty() {
            if last_chunk == 0 {
                ChunkRanges::all()
            } else {
                ChunkRanges::from(last_chunk - 1..)
            }
        } else {
            ranges.clone()
        };
        round_up_to_chunks_groups(ranges, IROH_BLOCK_SIZE)
    }

    fn create_n0_bao_full(
        data: &[u8],
        ranges: &ChunkRanges,
    ) -> anyhow::Result<(Hash, ChunkRanges, Vec<u8>)> {
        let ranges = round_up_request(data.len() as u64, ranges);
        let (hash, encoded) = create_n0_bao(data, &ranges)?;
        Ok((hash, ranges, encoded))
    }

    #[tokio::test]
    async fn test_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options).await?;
        let sizes = INTERESTING_SIZES;
        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            let obs = store.observe(hash);
            let task = tokio::spawn(async move {
                obs.await_completion().await?;
                api::Result::Ok(())
            });
            store.import_bao_bytes(hash, ranges, bao).await?;
            task.await??;
        }
        Ok(())
    }

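    /// Deterministic test data: 1 KiB blocks filled with 'A', 'B', 'C', ...,
    /// wrapping around after 'Z', so offsets are easy to spot when debugging.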
    pub fn test_data(n: usize) -> Bytes {
        let mut res = Vec::with_capacity(n);
        for i in 0..n {
            let block_num = i / 1024;
            let ascii_val = 65 + (block_num % 26) as u8;
            res.push(ascii_val);
        }
        Bytes::from(res)
    }

    #[tokio::test]
    async fn test_import_byte_stream() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let stream = bytes_to_stream(expected.clone(), 1023);
            let obs = store.observe(expected_hash);
            let tt = store.add_stream(stream).await.temp_tag().await?;
            assert_eq!(expected_hash, *tt.hash());
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bytes_simple() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(&db_dir).await?;
        let sizes = INTERESTING_SIZES;
        trace!("{}", Options::new(&db_dir).is_inlined_data(16385));
        for size in sizes {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
        }
        store.shutdown().await?;
        dump_dir_full(db_dir)?;
        Ok(())
    }

    #[tokio::test]
    #[ignore = "flaky. I need a reliable way to keep the handle alive"]
    async fn test_roundtrip_bytes_small() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES
            .into_iter()
            .filter(|x| *x != 0 && *x <= IROH_BLOCK_SIZE.bytes())
        {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
            // small blobs stay in memory, so this should be the same allocation
            assert_eq!(
                &expected.addr(),
                &actual.addr(),
                "address mismatch for size {size}"
            );
            obs.await_completion().await?;
        }
        store.shutdown().await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_import_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let path = testdir.path().join(format!("in-{size}"));
            fs::write(&path, &expected)?;
            let obs = store.observe(expected_hash);
            let tt = store.add_path(&path).await?;
            assert_eq!(expected_hash, tt.hash);
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual, "size={size}");
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    #[tokio::test]
    async fn test_export_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let out_path = testdir.path().join(format!("out-{size}"));
            store.export(expected_hash, &out_path).await?;
            let actual = fs::read(&out_path)?;
            assert_eq!(expected, actual);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_ranges() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let data = test_data(100000);
            let ranges = ChunkRanges::chunks(16..32);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store
                .import_bao_bytes(hash, ranges.clone(), bao.clone())
                .await?;
            let bitfield = store.observe(hash).await?;
            assert_eq!(bitfield.ranges, ranges);
            assert_eq!(bitfield.size(), data.len() as u64);
            let export = store.export_bao(hash, ranges).bao_to_vec().await?;
            assert_eq!(export, bao);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_minimal() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_simple() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1048576];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                trace!("importing size={}", size);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected = vec![0u8; size];
                let hash = Hash::new(&expected);
                let actual = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await?;
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_just_size() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &just_size)?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_two_stages() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        // stage 1: import just the last chunk of each blob, to get a validated size
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        // stage 2: import the remaining ranges of each blob
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let remaining = ChunkRanges::all() - round_up_request(size as u64, &just_size);
                if remaining.is_empty() {
                    continue;
                }
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &remaining)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        // check that the full blobs can be exported after a restart
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &ChunkRanges::all())?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    fn just_size() -> ChunkRanges {
        ChunkRanges::last_chunk()
    }

    #[tokio::test]
    async fn test_import_bao_persistence_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = just_size();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_recover() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let just_size = just_size();
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        delete_rec(testdir.path(), "bitfield")?;
        dump_dir_full(testdir.path())?;
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges, "size={size}");
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bytes_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let mut tts = Vec::new();
            for size in sizes {
                let data = test_data(size);
                tts.push(store.add_bytes(data.clone()).await?);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let expected = test_data(size);
                let hash = Hash::new(&expected);
                let Ok(actual) = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await
                else {
                    panic!("failed to export size={size}");
                };
                assert_eq!(&expected, &actual, "size={size}");
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    async fn test_batch(store: &Store) -> TestResult<()> {
        let batch = store.blobs().batch().await?;
        let tt1 = batch.temp_tag(Hash::new("foo")).await?;
        let tt2 = batch.add_slice("boo").await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        assert!(tts.contains(tt1.hash_and_format()));
        assert!(tts.contains(tt2.hash_and_format()));
        drop(batch);
        store.sync_db().await?;
        store.wait_idle().await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        // temp tags from the dropped batch scope must be gone
        assert!(!tts.contains(tt1.hash_and_format()));
        assert!(!tts.contains(tt2.hash_and_format()));
        drop(tt1);
        drop(tt2);
        Ok(())
    }

    #[tokio::test]
    async fn test_batch_fs() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        test_batch(&store).await
    }

    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        let haf = HashAndFormat::raw(Hash::from([0u8; 32]));
        store.tags().set(Tag::from("test"), haf).await?;
        store.tags().set(Tag::from("boo"), haf).await?;
        store.tags().set(Tag::from("bar"), haf).await?;
        let sizes = INTERESTING_SIZES;
        let mut hashes = Vec::new();
        let mut data_by_hash = HashMap::new();
        let mut bao_by_hash = HashMap::new();
        for size in sizes {
            let data = vec![0u8; size];
            let data = Bytes::from(data);
            let tt = store.add_bytes(data.clone()).temp_tag().await?;
            data_by_hash.insert(*tt.hash(), data);
            hashes.push(tt);
        }
        store.sync_db().await?;
        for tt in &hashes {
            let hash = *tt.hash();
            let path = testdir.path().join(format!("{hash}.txt"));
            store.export(hash, path).await?;
        }
        for tt in &hashes {
            let hash = tt.hash();
            let data = store
                .export_bao(*hash, ChunkRanges::all())
                .data_to_vec()
                .await
                .unwrap();
            assert_eq!(data, data_by_hash[hash].to_vec());
            let bao = store
                .export_bao(*hash, ChunkRanges::all())
                .bao_to_vec()
                .await
                .unwrap();
            bao_by_hash.insert(*hash, bao);
        }
        store.dump().await?;

        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store.import_bao_bytes(hash, ranges, bao).await?;
        }

        for (_hash, _bao_tree) in bao_by_hash {}
        Ok(())
    }

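    /// Delete all files with the given extension (case insensitive) below
    /// `root_dir`.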
    pub fn delete_rec(root_dir: impl AsRef<Path>, extension: &str) -> Result<(), std::io::Error> {
        let ext = extension.trim_start_matches('.').to_lowercase();

        for entry in WalkDir::new(root_dir).into_iter().filter_map(|e| e.ok()) {
            let path = entry.path();

            if path.is_file() {
                if let Some(file_ext) = path.extension() {
                    if file_ext.to_string_lossy().to_lowercase() == ext {
                        fs::remove_file(path)?;
                    }
                }
            }
        }

        Ok(())
    }

    pub fn dump_dir(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok)
            .collect();

        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = " ".repeat(depth);
            let name = entry.file_name().to_string_lossy();
            let size = entry.metadata()?.len();
            if entry.file_type().is_file() {
                println!("{indent}{name} ({size} bytes)");
            } else if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            }
        }
        Ok(())
    }

    pub fn dump_dir_full(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok)
            .collect();

        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = " ".repeat(depth);
            let name = entry.file_name().to_string_lossy();

            if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            } else if entry.file_type().is_file() {
                let size = entry.metadata()?.len();
                println!("{indent}{name} ({size} bytes)");

                let path = entry.path();
                if name.ends_with(".data") {
                    print!("{indent} ");
                    dump_file(path, 1024 * 16)?;
                } else if name.ends_with(".obao4") {
                    print!("{indent} ");
                    dump_file(path, 64)?;
                } else if name.ends_with(".sizes4") {
                    print!("{indent} ");
                    dump_file(path, 8)?;
                } else if name.ends_with(".bitfield") {
                    match read_checksummed::<Bitfield>(path) {
                        Ok(bitfield) => {
                            println!("{indent} bitfield: {bitfield:?}");
                        }
                        Err(cause) => {
                            println!("{indent} bitfield: error: {cause}");
                        }
                    }
                } else {
                    continue;
                };
            }
        }
        Ok(())
    }

    pub fn dump_file<P: AsRef<Path>>(path: P, chunk_size: u64) -> io::Result<()> {
        let bits = file_bits(path, chunk_size)?;
        println!("{}", print_bitfield_ansi(bits));
        Ok(())
    }

    pub fn file_bits(path: impl AsRef<Path>, chunk_size: u64) -> io::Result<Vec<bool>> {
        let file = fs::File::open(&path)?;
        let file_size = file.metadata()?.len();
        let mut buffer = vec![0u8; chunk_size as usize];
        let mut bits = Vec::new();

        let mut offset = 0u64;
        while offset < file_size {
            let remaining = file_size - offset;
            let current_chunk_size = chunk_size.min(remaining);

            let chunk = &mut buffer[..current_chunk_size as usize];
            file.read_exact_at(offset, chunk)?;

            let has_non_zero = chunk.iter().any(|&byte| byte != 0);
            bits.push(has_non_zero);

            offset += current_chunk_size;
        }

        Ok(bits)
    }

    #[allow(dead_code)]
    fn print_bitfield(bits: impl IntoIterator<Item = bool>) -> String {
        bits.into_iter()
            .map(|bit| if bit { '#' } else { '_' })
            .collect()
    }

    fn print_bitfield_ansi(bits: impl IntoIterator<Item = bool>) -> String {
        let mut result = String::new();
        let mut iter = bits.into_iter();

        while let Some(b1) = iter.next() {
            let b2 = iter.next();

            // ANSI escape codes for the half-block rendering
            let white_fg = "\x1b[97m";
            let reset = "\x1b[0m";
            let gray_bg = "\x1b[100m";
            let black_bg = "\x1b[40m";
            let colored_char = match (b1, b2) {
                (true, Some(true)) => format!("{}{}{}", white_fg, '█', reset),
                (true, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, '▌', reset),
                (false, Some(true)) => format!("{}{}{}{}", gray_bg, white_fg, '▐', reset),
                (false, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, ' ', reset),
                (true, None) => format!("{}{}{}{}", black_bg, white_fg, '▌', reset),
                (false, None) => format!("{}{}{}{}", black_bg, white_fg, ' ', reset),
            };

            result.push_str(&colored_char);
        }

        // reset terminal colors at the end
        result.push_str("\x1b[0m");
        result
    }

    fn bytes_to_stream(
        bytes: Bytes,
        chunk_size: usize,
    ) -> impl Stream<Item = io::Result<Bytes>> + 'static {
        assert!(chunk_size > 0, "Chunk size must be greater than 0");
        stream::unfold((bytes, 0), move |(bytes, offset)| async move {
            if offset >= bytes.len() {
                None
            } else {
                let chunk_len = chunk_size.min(bytes.len() - offset);
                let chunk = bytes.slice(offset..offset + chunk_len);
                Some((Ok(chunk), (bytes, offset + chunk_len)))
            }
        })
    }
}