iroh_blobs/store/fs.rs

//! # File-based blob store.
//!
//! A file-based blob store needs a writable directory to work with.
//!
//! General design:
//!
//! The file store consists of two actors.
//!
//! # The main actor
//!
//! The purpose of the main actor is to handle user commands and own a map of
//! handles for hashes that are currently being worked on.
//!
//! It also owns tasks for ongoing import and export operations, as well as the
//! database actor.
//!
//! Handling a command almost always involves either forwarding it to the
//! database actor or creating a hash context and spawning a task.
//!
//! # The database actor
//!
//! The database actor is responsible for storing metadata about each hash,
//! as well as inlined data and outboard data for small files.
//!
//! In addition to the metadata, the database actor also stores tags.
//!
//! # Tasks
//!
//! Tasks do not return a result. They are responsible for sending an error
//! to the requester if possible. Otherwise, just dropping the sender will
//! also fail the receiver, but without a descriptive error message.
//!
//! Tasks are usually implemented as an impl fn that returns a result,
//! and a wrapper (named `..._task`) that just forwards the error, if any.
//!
//! That way you can use `?` syntax in the task implementation. The impl fns
//! are also easier to test.
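//!
//! A minimal sketch of the pattern (the names and the channel type are
//! illustrative, not the actual task signatures):
//!
//! ```ignore
//! async fn export_impl(ctx: &HashContext) -> io::Result<()> {
//!     let size = ctx.current_size()?; // `?` works here, and this fn is easy to test
//!     // ... do the actual work ...
//!     Ok(())
//! }
//!
//! async fn export_task(ctx: HashContext, mut tx: mpsc::Sender<io::Result<()>>) {
//!     // the task itself never fails; it forwards the error to the requester
//!     if let Err(e) = export_impl(&ctx).await {
//!         tx.send(Err(e)).await.ok();
//!     }
//! }
//! ```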
//!
//! # Context
//!
//! The main actor holds a TaskContext that is needed for almost all tasks,
//! such as the config and a way to interact with the database.
//!
//! For tasks that are specific to a hash, a HashContext combines the task
//! context with a slot from the table of the main actor that can be used
//! to obtain a unique handle for the hash.
//!
//! # Runtime
//!
//! The fs store owns and manages its own tokio runtime. Dropping the store
//! will clean up the database and shut down the runtime. However, some parts
//! of the persistent state won't make it to disk, so operations involving large
//! partial blobs will have a large initial delay on the next startup.
//!
//! It is also not guaranteed that all write operations will make it to disk.
//! The on-disk store will be in a consistent state, but might miss some writes
//! in the last seconds before shutdown.
//!
//! To avoid this, you can use the [`crate::api::Store::shutdown`] method to
//! cleanly shut down the store and save ephemeral state to disk.
//!
//! Note that if you use the store inside an [`iroh::protocol::Router`] and shut
//! down the router using [`iroh::protocol::Router::shutdown`], the store will be
//! safely shut down as well. Any store refs you are holding will be inoperable
//! after this.
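//!
//! A minimal sketch of a clean shutdown (the path is illustrative):
//!
//! ```no_run
//! # async fn example() -> anyhow::Result<()> {
//! let store = iroh_blobs::store::fs::FsStore::load("/tmp/my-blobs").await?;
//! // ... import and export blobs ...
//! store.shutdown().await?;
//! # Ok(())
//! # }
//! ```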
use std::{
    fmt::{self, Debug},
    fs,
    future::Future,
    io::Write,
    num::NonZeroU64,
    ops::Deref,
    path::{Path, PathBuf},
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
};

use bao_tree::{
    blake3,
    io::{
        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
        outboard::PreOrderOutboard,
        sync::ReadAt,
        BaoContentItem, Leaf,
    },
    BaoTree, ChunkNum, ChunkRanges,
};
use bytes::Bytes;
use delete_set::{BaoFilePart, ProtectHandle};
use entity_manager::{EntityManagerState, SpawnArg};
use entry_state::{DataLocation, OutboardLocation};
use gc::run_gc;
use import::{ImportEntry, ImportSource};
use irpc::{channel::mpsc, RpcMessage};
use meta::list_blobs;
use n0_future::{future::yield_now, io};
use nested_enum_utils::enum_conversions;
use range_collections::range_set::RangeSetRange;
use tokio::task::{JoinError, JoinSet};
use tracing::{error, instrument, trace};

use crate::{
    api::{
        proto::{
            self, bitfield::is_validated, BatchMsg, BatchResponse, Bitfield, Command,
            CreateTempTagMsg, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, HashSpecific, ImportBaoMsg,
            ImportBaoRequest, ObserveMsg, Scope,
        },
        ApiClient,
    },
    protocol::ChunkRangesExt,
    store::{
        fs::{
            bao_file::{
                BaoFileStorage, BaoFileStorageSubscriber, CompleteStorage, DataReader,
                OutboardReader,
            },
            util::entity_manager::{self, ActiveEntityState},
        },
        util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
        IROH_BLOCK_SIZE,
    },
    util::{
        channel::oneshot,
        temp_tag::{TagDrop, TempTag, TempTagScope, TempTags},
    },
    Hash,
};
mod bao_file;
use bao_file::BaoFileHandle;
mod delete_set;
mod entry_state;
mod import;
mod meta;
pub mod options;
pub(crate) mod util;
use entry_state::EntryState;
use import::{import_byte_stream, import_bytes, import_path, ImportEntryMsg};
use options::Options;
use tracing::Instrument;
mod gc;

use crate::{
    api::{
        self,
        blobs::{AddProgressItem, ExportMode, ExportProgressItem},
        Store,
    },
    HashAndFormat,
};

/// Maximum number of external paths we track per blob.
const MAX_EXTERNAL_PATHS: usize = 8;

/// Create a 16 byte unique ID.
fn new_uuid() -> [u8; 16] {
    use rand::RngCore;
    let mut rng = rand::thread_rng();
    let mut bytes = [0u8; 16];
    rng.fill_bytes(&mut bytes);
    bytes
}

/// Create temp file name based on a 16 byte UUID.
fn temp_name() -> String {
    format!("{}.temp", hex::encode(new_uuid()))
}

#[derive(Debug)]
#[enum_conversions()]
pub(crate) enum InternalCommand {
    Dump(meta::Dump),
    FinishImport(ImportEntryMsg),
    ClearScope(ClearScope),
}

#[derive(Debug)]
pub(crate) struct ClearScope {
    pub scope: Scope,
}

impl InternalCommand {
    pub fn parent_span(&self) -> tracing::Span {
        match self {
            Self::Dump(_) => tracing::Span::current(),
            Self::ClearScope(_) => tracing::Span::current(),
            Self::FinishImport(cmd) => cmd
                .parent_span_opt()
                .cloned()
                .unwrap_or_else(tracing::Span::current),
        }
    }
}

/// Context needed by most tasks
#[derive(Debug)]
struct TaskContext {
    // Store options such as paths and inline thresholds, in an Arc to cheaply share with tasks.
    pub options: Arc<Options>,
    // Metadata database, basically a mpsc sender with some extra functionality.
    pub db: meta::Db,
    // Handle to send internal commands
    pub internal_cmd_tx: tokio::sync::mpsc::Sender<InternalCommand>,
    /// Handle to protect files from deletion.
    pub protect: ProtectHandle,
}

impl TaskContext {
    pub async fn clear_scope(&self, scope: Scope) {
        self.internal_cmd_tx
            .send(ClearScope { scope }.into())
            .await
            .ok();
    }
}

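/// Entity manager configuration for the store: entities are identified by
/// [`Hash`], share the [`TaskContext`] as global state, and keep a
/// [`BaoFileHandle`] as per-entity state.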
#[derive(Debug)]
struct EmParams;

impl entity_manager::Params for EmParams {
    type EntityId = Hash;

    type GlobalState = Arc<TaskContext>;

    type EntityState = BaoFileHandle;

    async fn on_shutdown(
        state: entity_manager::ActiveEntityState<Self>,
        cause: entity_manager::ShutdownCause,
    ) {
        trace!("persist {:?} due to {cause:?}", state.id);
        state.persist().await;
    }
}

#[derive(Debug)]
struct Actor {
    // Context that can be cheaply shared with tasks.
    context: Arc<TaskContext>,
    // Receiver for incoming user commands.
    cmd_rx: tokio::sync::mpsc::Receiver<Command>,
    // Receiver for incoming file store specific commands.
    fs_cmd_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
    // Tasks for import and export operations.
    tasks: JoinSet<()>,
    // Entity manager that handles concurrency for entities.
    handles: EntityManagerState<EmParams>,
    // temp tags
    temp_tags: TempTags,
    // waiters for idle state.
    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
    // our private tokio runtime. It has to live somewhere.
    _rt: RtWrapper,
}

type HashContext = ActiveEntityState<EmParams>;

impl SyncEntityApi for HashContext {
    /// Load the state from the database.
    ///
    /// If the state is Initial, this will start the load.
    /// If it is Loading, it will wait until loading is done.
    /// If it is any other state, it will be a noop.
    async fn load(&self) {
        enum Action {
            Load,
            Wait,
            None,
        }
        let mut action = Action::None;
        self.state.send_if_modified(|guard| match guard.deref() {
            BaoFileStorage::Initial => {
                *guard = BaoFileStorage::Loading;
                action = Action::Load;
                true
            }
            BaoFileStorage::Loading => {
                action = Action::Wait;
                false
            }
            _ => false,
        });
        match action {
            Action::Load => {
                let state = if self.id == Hash::EMPTY {
                    BaoFileStorage::Complete(CompleteStorage {
                        data: MemOrFile::Mem(Bytes::new()),
                        outboard: MemOrFile::empty(),
                    })
                } else {
                    // we must assign a new state even in the error case, otherwise
                    // tasks waiting for loading would stall!
                    match self.global.db.get(self.id).await {
                        Ok(state) => match BaoFileStorage::open(state, self).await {
                            Ok(handle) => handle,
                            Err(_) => BaoFileStorage::Poisoned,
                        },
                        Err(_) => BaoFileStorage::Poisoned,
                    }
                };
                self.state.send_replace(state);
            }
            Action::Wait => {
                // we are in state loading already, so we just need to wait for the
                // other task to complete loading.
                while matches!(self.state.borrow().deref(), BaoFileStorage::Loading) {
                    self.state.0.subscribe().changed().await.ok();
                }
            }
            Action::None => {}
        }
    }

    /// Write a batch and notify the db
    async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()> {
        trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len());
        let mut res = Ok(None);
        self.state.send_if_modified(|state| {
            let Ok((state1, update)) = state.take().write_batch(batch, bitfield, self) else {
                res = Err(io::Error::other("write batch failed"));
                return false;
            };
            res = Ok(update);
            *state = state1;
            true
        });
        if let Some(update) = res? {
            self.global.db.update(self.id, update).await?;
        }
        Ok(())
    }

    /// An AsyncSliceReader for the data file.
    ///
    /// Caution: this is a reader for the unvalidated data file. Reading this
    /// can produce data that does not match the hash.
    #[allow(refining_impl_trait_internal)]
    fn data_reader(&self) -> DataReader {
        DataReader(self.state.clone())
    }

    /// An AsyncSliceReader for the outboard file.
    ///
    /// The outboard file is used to validate the data file. It is not guaranteed
    /// to be complete.
    #[allow(refining_impl_trait_internal)]
    fn outboard_reader(&self) -> OutboardReader {
        OutboardReader(self.state.clone())
    }

    /// The most precise known total size of the data file.
    fn current_size(&self) -> io::Result<u64> {
        match self.state.borrow().deref() {
            BaoFileStorage::Complete(mem) => Ok(mem.size()),
            BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()),
            BaoFileStorage::Partial(file) => file.current_size(),
            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
            BaoFileStorage::Initial => Err(io::Error::other("initial")),
            BaoFileStorage::Loading => Err(io::Error::other("loading")),
            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
        }
    }

    /// The most precise known bitfield of the entry.
    fn bitfield(&self) -> io::Result<Bitfield> {
        match self.state.borrow().deref() {
            BaoFileStorage::Complete(mem) => Ok(mem.bitfield()),
            BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()),
            BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()),
            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
            BaoFileStorage::Initial => Err(io::Error::other("initial")),
            BaoFileStorage::Loading => Err(io::Error::other("loading")),
            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
        }
    }
}

impl HashContext {
    /// The outboard for the file.
    pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> {
        let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
        let outboard = self.outboard_reader();
        Ok(PreOrderOutboard {
            root: blake3::Hash::from(self.id),
            tree,
            data: outboard,
        })
    }

    fn db(&self) -> &meta::Db {
        &self.global.db
    }

    pub fn options(&self) -> &Arc<Options> {
        &self.global.options
    }

    pub fn protect(&self, parts: impl IntoIterator<Item = BaoFilePart>) {
        self.global.protect.protect(self.id, parts);
    }

    /// Update the entry state in the database, and wait for completion.
    pub async fn update_await(&self, state: EntryState<Bytes>) -> io::Result<()> {
        self.db().update_await(self.id, state).await?;
        Ok(())
    }

    pub async fn get_entry_state(&self) -> io::Result<Option<EntryState<Bytes>>> {
        let hash = self.id;
        if hash == Hash::EMPTY {
            return Ok(Some(EntryState::Complete {
                data_location: DataLocation::Inline(Bytes::new()),
                outboard_location: OutboardLocation::NotNeeded,
            }));
        };
        self.db().get(hash).await
    }

    /// Update the entry state in the database, and wait for completion.
    pub async fn set(&self, state: EntryState<Bytes>) -> io::Result<()> {
        self.db().set(self.id, state).await
    }
}

impl Actor {
    fn db(&self) -> &meta::Db {
        &self.context.db
    }

    fn context(&self) -> Arc<TaskContext> {
        self.context.clone()
    }

    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let span = tracing::Span::current();
        self.tasks.spawn(fut.instrument(span));
    }

    fn log_task_result(res: Result<(), JoinError>) {
        match res {
            Ok(_) => {}
            Err(e) => {
                error!("task failed: {e}");
            }
        }
    }

    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { tx, inner, .. } = cmd;
        let mut tt = self.temp_tags.create(inner.scope, inner.value);
        if tx.is_rpc() {
            tt.leak();
        }
        tx.send(tt).await.ok();
    }

    async fn handle_command(&mut self, cmd: Command) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            Command::SyncDb(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::WaitIdle(cmd) => {
                trace!("{cmd:?}");
                if self.tasks.is_empty() {
                    // we are currently idle
                    cmd.tx.send(()).await.ok();
                } else {
                    // wait for idle state
                    self.idle_waiters.push(cmd.tx);
                }
            }
            Command::Shutdown(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::CreateTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::SetTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::RenameTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ClearProtected(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListBlobs(cmd) => {
                trace!("{cmd:?}");
                if let Ok(snapshot) = self.db().snapshot(cmd.span.clone()).await {
                    self.spawn(list_blobs(snapshot, cmd));
                }
            }
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope, self.context()));
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            Command::ImportBytes(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_bytes(cmd, self.context()));
            }
            Command::ImportByteStream(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_byte_stream(cmd, self.context()));
            }
            Command::ImportPath(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_path(cmd, self.context()));
            }
            Command::ExportPath(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ExportBao(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ExportRanges(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ImportBao(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::Observe(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
        }
    }

    async fn handle_fs_command(&mut self, cmd: InternalCommand) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            InternalCommand::Dump(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            InternalCommand::ClearScope(cmd) => {
                trace!("{cmd:?}");
                self.temp_tags.end_scope(cmd.scope);
            }
            InternalCommand::FinishImport(cmd) => {
                trace!("{cmd:?}");
                if cmd.hash == Hash::EMPTY {
                    cmd.tx
                        .send(AddProgressItem::Done(TempTag::leaking_empty(cmd.format)))
                        .await
                        .ok();
                } else {
                    let tt = self.temp_tags.create(
                        cmd.scope,
                        HashAndFormat {
                            hash: cmd.hash,
                            format: cmd.format,
                        },
                    );
                    (tt, cmd).spawn(&mut self.handles, &mut self.tasks).await;
                }
            }
        }
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                task = self.handles.tick() => {
                    if let Some(task) = task {
                        self.spawn(task);
                    }
                }
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };
                    self.handle_command(cmd).await;
                }
                Some(cmd) = self.fs_cmd_rx.recv() => {
                    self.handle_fs_command(cmd).await;
                }
                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
                    Self::log_task_result(res);
                    if self.tasks.is_empty() {
                        for tx in self.idle_waiters.drain(..) {
                            tx.send(()).await.ok();
                        }
                    }
                }
            }
        }
        self.handles.shutdown().await;
        while let Some(res) = self.tasks.join_next().await {
            Self::log_task_result(res);
        }
    }

    async fn new(
        db_path: PathBuf,
        rt: RtWrapper,
        cmd_rx: tokio::sync::mpsc::Receiver<Command>,
        fs_commands_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
        fs_commands_tx: tokio::sync::mpsc::Sender<InternalCommand>,
        options: Arc<Options>,
    ) -> anyhow::Result<Self> {
        trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        fs::create_dir_all(&options.path.data_path)?;
        trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        fs::create_dir_all(&options.path.temp_path)?;
        trace!(
            "creating parent directory for db file {}",
            db_path.parent().unwrap().display()
        );
        fs::create_dir_all(db_path.parent().unwrap())?;
        let (db_send, db_recv) = tokio::sync::mpsc::channel(100);
        let (protect, ds) = delete_set::pair(Arc::new(options.path.clone()));
        let db_actor = meta::Actor::new(db_path, db_recv, ds, options.batch.clone())?;
        let slot_context = Arc::new(TaskContext {
            options: options.clone(),
            db: meta::Db::new(db_send),
            internal_cmd_tx: fs_commands_tx,
            protect,
        });
        rt.spawn(db_actor.run());
        Ok(Self {
            context: slot_context.clone(),
            cmd_rx,
            fs_cmd_rx: fs_commands_rx,
            tasks: JoinSet::new(),
            handles: EntityManagerState::new(slot_context, 1024, 32, 32, 2),
            temp_tags: Default::default(),
            idle_waiters: Vec::new(),
            _rt: rt,
        })
    }
}

trait HashSpecificCommand: HashSpecific + Send + 'static {
    /// Handle the command on success by spawning a task into the per-hash context.
    fn handle(self, ctx: HashContext) -> impl Future<Output = ()> + Send + 'static;

    /// Opportunity to send an error if spawning fails due to the task being busy (inbox full)
    /// or dead (e.g. panic in one of the running tasks).
    fn on_error(self, arg: SpawnArg<EmParams>) -> impl Future<Output = ()> + Send + 'static;

    async fn spawn(
        self,
        manager: &mut entity_manager::EntityManagerState<EmParams>,
        tasks: &mut JoinSet<()>,
    ) where
        Self: Sized,
    {
        let span = tracing::Span::current();
        let task = manager
            .spawn(self.hash(), |arg| {
                async move {
                    match arg {
                        SpawnArg::Active(state) => {
                            self.handle(state).await;
                        }
                        SpawnArg::Busy => {
                            self.on_error(arg).await;
                        }
                        SpawnArg::Dead => {
                            self.on_error(arg).await;
                        }
                    }
                }
                .instrument(span)
            })
            .await;
        if let Some(task) = task {
            tasks.spawn(task);
        }
    }
}

impl HashSpecificCommand for ObserveMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.observe(self).await
    }
    async fn on_error(self, _arg: SpawnArg<EmParams>) {}
}
impl HashSpecificCommand for ExportPathMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_path(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(ExportProgressItem::Error(api::Error::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ExportBaoMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_bao(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ExportRangesMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_ranges(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(ExportRangesItem::Error(api::Error::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ImportBaoMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.import_bao(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx.send(Err(api::Error::Io(err))).await.ok();
    }
}
impl HashSpecific for (TempTag, ImportEntryMsg) {
    fn hash(&self) -> Hash {
        self.1.hash()
    }
}
impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
    async fn handle(self, ctx: HashContext) {
        let (tt, cmd) = self;
        ctx.finish_import(cmd, tt).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.1.tx.send(AddProgressItem::Error(err)).await.ok();
    }
}

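/// Owner of the store's private tokio runtime.
///
/// Dropping it shuts down the runtime inside `block_in_place`, which allows
/// the store to be dropped from within another tokio runtime.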
struct RtWrapper(Option<tokio::runtime::Runtime>);

impl From<tokio::runtime::Runtime> for RtWrapper {
    fn from(rt: tokio::runtime::Runtime) -> Self {
        Self(Some(rt))
    }
}

impl fmt::Debug for RtWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        ValueOrPoisioned(self.0.as_ref()).fmt(f)
    }
}

impl Deref for RtWrapper {
    type Target = tokio::runtime::Runtime;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

impl Drop for RtWrapper {
    fn drop(&mut self) {
        if let Some(rt) = self.0.take() {
            trace!("dropping tokio runtime");
            tokio::task::block_in_place(|| {
                drop(rt);
            });
            trace!("dropped tokio runtime");
        }
    }
}

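/// Run a batch: send the scope id to the client, process incoming batch
/// messages until the client side is dropped, then clear the temp tag scope.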
async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: Arc<TaskContext>) {
    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
        error!("batch failed: {cause}");
    }
    ctx.clear_scope(id).await;
}

async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
    let BatchMsg { tx, mut rx, .. } = cmd;
    trace!("created scope {}", id);
    tx.send(id).await.map_err(api::Error::other)?;
    while let Some(msg) = rx.recv().await? {
        match msg {
            BatchResponse::Drop(msg) => scope.on_drop(&msg),
            BatchResponse::Ping => {}
        }
    }
    Ok(())
}

/// The minimal API you need to implement for an entity for a store to work.
trait EntityApi {
    /// Import from a stream of n0 bao encoded data.
    async fn import_bao(&self, cmd: ImportBaoMsg);
    /// Finish an import from a local file or memory.
    async fn finish_import(&self, cmd: ImportEntryMsg, tt: TempTag);
    /// Observe the bitfield of the entry.
    async fn observe(&self, cmd: ObserveMsg);
    /// Export byte ranges of the entry as data
    async fn export_ranges(&self, cmd: ExportRangesMsg);
    /// Export chunk ranges of the entry as a n0 bao encoded stream.
    async fn export_bao(&self, cmd: ExportBaoMsg);
    /// Export the entry to a local file.
    async fn export_path(&self, cmd: ExportPathMsg);
    /// Persist the entry at the end of its lifecycle.
    async fn persist(&self);
}

/// A more opinionated API that can be used as a helper to save implementation
/// effort when implementing the EntityApi trait.
trait SyncEntityApi: EntityApi {
    /// Load the entry state from the database. This must make sure that it is
    /// not run concurrently, so if load is called multiple times, all but one
    /// must wait. You can use a tokio::sync::OnceCell or similar to achieve this.
    async fn load(&self);

    /// Get a synchronous reader for the data file.
    fn data_reader(&self) -> impl ReadBytesAt;

    /// Get a synchronous reader for the outboard file.
    fn outboard_reader(&self) -> impl ReadAt;

    /// Get the best known size of the data file.
    fn current_size(&self) -> io::Result<u64>;

    /// Get the bitfield of the entry.
    fn bitfield(&self) -> io::Result<Bitfield>;

    /// Write a batch of content items to the entry.
    async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()>;
}

/// The high level entry point per entry.
impl EntityApi for HashContext {
    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn import_bao(&self, cmd: ImportBaoMsg) {
        trace!("{cmd:?}");
        self.load().await;
        let ImportBaoMsg {
            inner: ImportBaoRequest { size, .. },
            rx,
            tx,
            ..
        } = cmd;
        let res = import_bao_impl(self, size, rx).await;
        trace!("{res:?}");
        tx.send(res).await.ok();
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn observe(&self, cmd: ObserveMsg) {
        trace!("{cmd:?}");
        self.load().await;
        BaoFileStorageSubscriber::new(self.state.subscribe())
            .forward(cmd.tx)
            .await
            .ok();
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_ranges(&self, mut cmd: ExportRangesMsg) {
        trace!("{cmd:?}");
        self.load().await;
        if let Err(cause) = export_ranges_impl(self, cmd.inner, &mut cmd.tx).await {
            cmd.tx
                .send(ExportRangesItem::Error(cause.into()))
                .await
                .ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_bao(&self, mut cmd: ExportBaoMsg) {
        trace!("{cmd:?}");
        self.load().await;
        if let Err(cause) = export_bao_impl(self, cmd.inner, &mut cmd.tx).await {
            // if the entry is in state NonExisting, this will be an io error with
            // kind NotFound, so we must not wrap it but pass it on directly.
            cmd.tx
                .send(bao_tree::io::EncodeError::Io(cause).into())
                .await
                .ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_path(&self, cmd: ExportPathMsg) {
        trace!("{cmd:?}");
        self.load().await;
        let ExportPathMsg { inner, mut tx, .. } = cmd;
        if let Err(cause) = export_path_impl(self, inner, &mut tx).await {
            tx.send(cause.into()).await.ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn finish_import(&self, cmd: ImportEntryMsg, mut tt: TempTag) {
        trace!("{cmd:?}");
        self.load().await;
        let res = match finish_import_impl(self, cmd.inner).await {
            Ok(()) => {
                // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag;
                // it will be cleaned up when either the process exits or the scope ends
                if cmd.tx.is_rpc() {
                    trace!("leaking temp tag {}", tt.hash_and_format());
                    tt.leak();
                }
                AddProgressItem::Done(tt)
            }
            Err(cause) => AddProgressItem::Error(cause),
        };
        cmd.tx.send(res).await.ok();
    }

    #[instrument(skip_all, fields(hash = %self.id.fmt_short()))]
    async fn persist(&self) {
        self.state.send_if_modified(|guard| {
            let hash = &self.id;
            let BaoFileStorage::Partial(fs) = guard.take() else {
                return false;
            };
            let path = self.global.options.path.bitfield_path(hash);
            trace!("writing bitfield for hash {} to {}", hash, path.display());
            if let Err(cause) = fs.sync_all(&path) {
                error!(
                    "failed to write bitfield for {} at {}: {:?}",
                    hash,
                    path.display(),
                    cause
                );
            }
            false
        });
    }
}

async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> {
    if ctx.id == Hash::EMPTY {
        return Ok(()); // nothing to do for the empty hash
    }
    let ImportEntry {
        source,
        hash,
        outboard,
        ..
    } = import_data;
    let options = ctx.options();
    match &source {
        ImportSource::Memory(data) => {
            debug_assert!(options.is_inlined_data(data.len() as u64));
        }
        ImportSource::External(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
        ImportSource::TempFile(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
    }
    ctx.load().await;
    let handle = &ctx.state;
    // if I do have an existing handle, I have to possibly deal with observers.
    // if I don't have an existing handle, there are 2 cases:
    //   the entry exists in the db, but we don't have a handle
    //   the entry does not exist at all.
    // convert the import source to a data location and drop the open files
    ctx.protect([BaoFilePart::Data, BaoFilePart::Outboard]);
    let data_location = match source {
        ImportSource::Memory(data) => DataLocation::Inline(data),
        ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size),
        ImportSource::TempFile(path, _file, size) => {
            // this will always work on any unix, but on windows there might be an issue if the target file is open!
            // possibly open with FILE_SHARE_DELETE on windows?
            let target = ctx.options().path.data_path(&hash);
            trace!(
                "moving temp file to owned data location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned data location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            DataLocation::Owned(size)
        }
    };
    let outboard_location = match outboard {
        MemOrFile::Mem(bytes) if bytes.is_empty() => OutboardLocation::NotNeeded,
        MemOrFile::Mem(bytes) => OutboardLocation::Inline(bytes),
        MemOrFile::File(path) => {
            // the same caveat as above applies here
            let target = ctx.options().path.outboard_path(&hash);
            trace!(
                "moving temp file to owned outboard location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned outboard location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            OutboardLocation::Owned
        }
    };
    let data = match &data_location {
        DataLocation::Inline(data) => MemOrFile::Mem(data.clone()),
        DataLocation::Owned(size) => {
            let path = ctx.options().path.data_path(&hash);
            let file = fs::File::open(&path)?;
            MemOrFile::File(FixedSize::new(file, *size))
        }
        DataLocation::External(paths, size) => {
            let Some(path) = paths.iter().next() else {
                return Err(io::Error::other("no external data path"));
            };
            let file = fs::File::open(path)?;
            MemOrFile::File(FixedSize::new(file, *size))
        }
    };
    let outboard = match &outboard_location {
        OutboardLocation::NotNeeded => MemOrFile::empty(),
        OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()),
        OutboardLocation::Owned => {
            let path = ctx.options().path.outboard_path(&hash);
            let file = fs::File::open(&path)?;
            MemOrFile::File(file)
        }
    };
    handle.complete(data, outboard);
    let state = EntryState::Complete {
        data_location,
        outboard_location,
    };
    ctx.update_await(state).await?;
    Ok(())
}

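/// The chunk range covered by a leaf, derived from its byte offset and data length.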
fn chunk_range(leaf: &Leaf) -> ChunkRanges {
    let start = ChunkNum::chunks(leaf.offset);
    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
    (start..end).into()
}

async fn import_bao_impl(
    ctx: &HashContext,
    size: NonZeroU64,
    mut rx: mpsc::Receiver<BaoContentItem>,
) -> api::Result<()> {
    trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size);
    let mut batch = Vec::<BaoContentItem>::new();
    let mut ranges = ChunkRanges::empty();
    while let Some(item) = rx.recv().await? {
1130        if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
1131            let bitfield = Bitfield::new_unchecked(ranges, size.into());
1132            ctx.write_batch(&batch, &bitfield).await?;
1133            batch.clear();
1134            ranges = ChunkRanges::empty();
1135        }
1136        if let BaoContentItem::Leaf(leaf) = &item {
1137            let leaf_range = chunk_range(leaf);
1138            if is_validated(size, &leaf_range) && size.get() != leaf.offset + leaf.data.len() as u64
1139            {
1140                return Err(api::Error::io(io::ErrorKind::InvalidData, "invalid size"));
1141            }
1142            ranges |= leaf_range;
1143        }
1144        batch.push(item);
1145    }
1146    if !batch.is_empty() {
1147        let bitfield = Bitfield::new_unchecked(ranges, size.into());
1148        ctx.write_batch(&batch, &bitfield).await?;
1149    }
1150    Ok(())
1151}
1152
1153async fn export_ranges_impl(
1154    ctx: &HashContext,
1155    cmd: ExportRangesRequest,
1156    tx: &mut mpsc::Sender<ExportRangesItem>,
1157) -> io::Result<()> {
1158    let ExportRangesRequest { ranges, hash } = cmd;
1159    trace!(
1160        "exporting ranges: {hash} {ranges:?} size={}",
1161        ctx.current_size()?
1162    );
1163    let bitfield = ctx.bitfield()?;
1164    let data = ctx.data_reader();
1165    let size = bitfield.size();
1166    for range in ranges.iter() {
1167        let range = match range {
1168            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
1169            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
1170        };
1171        let requested = ChunkRanges::bytes(range.start..range.end);
1172        if !bitfield.ranges.is_superset(&requested) {
1173            return Err(io::Error::other(format!(
1174                "missing range: {requested:?}, present: {bitfield:?}",
1175            )));
1176        }
1177        let bs = 1024;
1178        let mut offset = range.start;
1179        loop {
1180            let end: u64 = (offset + bs).min(range.end);
1181            let size = (end - offset) as usize;
1182            let res = data.read_bytes_at(offset, size);
1183            tx.send(ExportRangesItem::Data(Leaf { offset, data: res? }))
1184                .await?;
1185            offset = end;
1186            if offset >= range.end {
1187                break;
1188            }
1189        }
1190    }
1191    Ok(())
1192}
1193
1194async fn export_bao_impl(
1195    ctx: &HashContext,
1196    cmd: ExportBaoRequest,
1197    tx: &mut mpsc::Sender<EncodedItem>,
1198) -> io::Result<()> {
1199    let ExportBaoRequest { ranges, hash, .. } = cmd;
1200    let outboard = ctx.outboard()?;
1201    let size = outboard.tree.size();
    if size == 0 && hash != Hash::EMPTY {
        // we have no data whatsoever, so we stop here
        return Ok(());
    }
    trace!("exporting bao: {hash} {ranges:?} size={size}",);
    let data = ctx.data_reader();
    let tx = BaoTreeSender::new(tx);
    traverse_ranges_validated(data, outboard, &ranges, tx).await?;
    Ok(())
}

async fn export_path_impl(
    ctx: &HashContext,
    cmd: ExportPathRequest,
    tx: &mut mpsc::Sender<ExportProgressItem>,
) -> api::Result<()> {
    let ExportPathRequest { mode, target, .. } = cmd;
    if !target.is_absolute() {
        return Err(api::Error::io(
            io::ErrorKind::InvalidInput,
            "path is not absolute",
        ));
    }
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let state = ctx.get_entry_state().await?;
    let (data_location, outboard_location) = match state {
        Some(EntryState::Complete {
            data_location,
            outboard_location,
        }) => (data_location, outboard_location),
        Some(EntryState::Partial { .. }) => {
            return Err(api::Error::io(
                io::ErrorKind::InvalidInput,
                "cannot export partial entry",
            ));
        }
        None => {
            return Err(api::Error::io(io::ErrorKind::NotFound, "no entry found"));
        }
    };
    trace!("exporting {} to {}", cmd.hash.to_hex(), target.display());
    let (data, mut external) = match data_location {
        DataLocation::Inline(data) => (MemOrFile::Mem(data), vec![]),
        DataLocation::Owned(size) => (
            MemOrFile::File((ctx.options().path.data_path(&cmd.hash), size)),
            vec![],
        ),
        DataLocation::External(paths, size) => (
            MemOrFile::File((
                paths.first().cloned().ok_or_else(|| {
                    io::Error::new(io::ErrorKind::NotFound, "no external data path")
                })?,
                size,
            )),
            paths,
        ),
    };
    let size = match &data {
        MemOrFile::Mem(data) => data.len() as u64,
        MemOrFile::File((_, size)) => *size,
    };
    tx.send(ExportProgressItem::Size(size))
        .await
        .map_err(api::Error::other)?;
    match data {
        MemOrFile::Mem(data) => {
            let mut target = fs::File::create(&target)?;
            target.write_all(&data)?;
        }
        MemOrFile::File((source_path, size)) => match mode {
            ExportMode::Copy => {
                let res = reflink_or_copy_with_progress(&source_path, &target, size, tx).await?;
                trace!(
                    "exported {} to {}, {res:?}",
                    source_path.display(),
                    target.display()
                );
            }
            ExportMode::TryReference => {
                if !external.is_empty() {
                    // the file already exists externally, so we need to copy it.
                    // if the OS supports reflink, we might as well use that.
                    let res =
                        reflink_or_copy_with_progress(&source_path, &target, size, tx).await?;
                    trace!(
                        "exported {} also to {}, {res:?}",
                        source_path.display(),
                        target.display()
                    );
                    external.push(target);
                    external.sort();
                    external.dedup();
                    external.truncate(MAX_EXTERNAL_PATHS);
                } else {
                    // the file was previously owned, so we can just move it.
                    // if that fails with ERR_CROSS, we fall back to copy.
                    match std::fs::rename(&source_path, &target) {
                        Ok(()) => {}
                        Err(cause) => {
                            const ERR_CROSS: i32 = 18;
                            if cause.raw_os_error() == Some(ERR_CROSS) {
                                reflink_or_copy_with_progress(&source_path, &target, size, tx)
                                    .await?;
                            } else {
                                return Err(cause.into());
                            }
                        }
                    }
                    external.push(target);
                };
                // setting the new entry state will also take care of deleting the owned data file!
                ctx.set(EntryState::Complete {
                    data_location: DataLocation::External(external, size),
                    outboard_location,
                })
                .await?;
            }
        },
    }
    tx.send(ExportProgressItem::Done)
        .await
        .map_err(api::Error::other)?;
    Ok(())
}

trait CopyProgress: RpcMessage {
    fn from_offset(offset: u64) -> Self;
}

impl CopyProgress for ExportProgressItem {
    fn from_offset(offset: u64) -> Self {
        ExportProgressItem::CopyProgress(offset)
    }
}

impl CopyProgress for AddProgressItem {
    fn from_offset(offset: u64) -> Self {
        AddProgressItem::CopyProgress(offset)
    }
}

#[derive(Debug)]
enum CopyResult {
    Reflinked,
    Copied,
}

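/// Copy `from` to `to`, preferring a filesystem reflink where supported and
/// falling back to a buffered copy with progress reporting.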
async fn reflink_or_copy_with_progress(
    from: impl AsRef<Path>,
    to: impl AsRef<Path>,
    size: u64,
    tx: &mut mpsc::Sender<impl CopyProgress>,
) -> io::Result<CopyResult> {
    let from = from.as_ref();
    let to = to.as_ref();
    if reflink_copy::reflink(from, to).is_ok() {
        return Ok(CopyResult::Reflinked);
    }
    let source = fs::File::open(from)?;
    let mut target = fs::File::create(to)?;
    copy_with_progress(source, size, &mut target, tx).await?;
    Ok(CopyResult::Copied)
}

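/// Copy `size` bytes from `file` to `target` in 1 MiB chunks, sending the
/// current offset as progress through `tx` and yielding between chunks.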
async fn copy_with_progress<T: CopyProgress>(
    file: impl ReadAt,
    size: u64,
    target: &mut impl Write,
    tx: &mut mpsc::Sender<T>,
) -> io::Result<()> {
    let mut offset = 0;
    let mut buf = vec![0u8; 1024 * 1024];
    while offset < size {
        let remaining = buf.len().min((size - offset) as usize);
        let buf: &mut [u8] = &mut buf[..remaining];
        file.read_exact_at(offset, buf)?;
        target.write_all(buf)?;
        tx.try_send(T::from_offset(offset))
            .await
            .map_err(|_| io::Error::other("failed to send progress"))?;
        yield_now().await;
        offset += buf.len() as u64;
    }
    Ok(())
}

impl FsStore {
    /// Load or create a new store.
    pub async fn load(root: impl AsRef<Path>) -> anyhow::Result<Self> {
        let path = root.as_ref();
        let db_path = path.join("blobs.db");
        let options = Options::new(path);
        Self::load_with_opts(db_path, options).await
    }

    /// Load or create a new store with custom options and a custom database location.
    pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result<FsStore> {
        static THREAD_NR: AtomicU64 = AtomicU64::new(0);
        let rt = tokio::runtime::Builder::new_multi_thread()
            .thread_name_fn(|| {
                format!(
                    "iroh-blob-store-{}",
                    THREAD_NR.fetch_add(1, Ordering::Relaxed)
                )
            })
            .enable_time()
            .build()?;
        let handle = rt.handle().clone();
        let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(100);
        let (fs_commands_tx, fs_commands_rx) = tokio::sync::mpsc::channel(100);
        let gc_config = options.gc.clone();
        let actor = handle
            .spawn(Actor::new(
                db_path,
                rt.into(),
                commands_rx,
                fs_commands_rx,
                fs_commands_tx.clone(),
                Arc::new(options),
            ))
            .await??;
        handle.spawn(actor.run());
        let store = FsStore::new(commands_tx.into(), fs_commands_tx);
        if let Some(config) = gc_config {
            handle.spawn(run_gc(store.deref().clone(), config));
        }
        Ok(store)
    }
}

/// A file-based store.
///
/// A store can be created using [`load`](FsStore::load) or [`load_with_opts`](FsStore::load_with_opts).
/// Load will use the default options and create the required directories, while load_with_opts allows
/// you to customize the options and the location of the database. Both variants will create the database
/// if it does not exist, and load an existing database if one is found at the configured location.
///
/// In addition to implementing the [`Store`](`crate::api::Store`) API via [`Deref`](`std::ops::Deref`),
/// there are a few additional methods that are specific to file-based stores, such as [`dump`](FsStore::dump).
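///
/// A minimal usage sketch (the path is illustrative):
///
/// ```no_run
/// # async fn example() -> anyhow::Result<()> {
/// use iroh_blobs::store::fs::FsStore;
///
/// let store = FsStore::load("/data/blobs").await?;
/// store.dump().await?; // fs store specific debugging helper
/// store.shutdown().await?;
/// # Ok(())
/// # }
/// ```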
#[derive(Debug, Clone)]
pub struct FsStore {
    sender: ApiClient,
    db: tokio::sync::mpsc::Sender<InternalCommand>,
}

impl From<FsStore> for Store {
    fn from(value: FsStore) -> Self {
        Store::from_sender(value.sender)
    }
}

impl Deref for FsStore {
    type Target = Store;

    fn deref(&self) -> &Self::Target {
        Store::ref_from_sender(&self.sender)
    }
}

impl AsRef<Store> for FsStore {
    fn as_ref(&self) -> &Store {
        self.deref()
    }
}

impl FsStore {
    fn new(
        sender: irpc::LocalSender<proto::Request>,
        db: tokio::sync::mpsc::Sender<InternalCommand>,
    ) -> Self {
        Self {
            sender: sender.into(),
            db,
        }
    }

    pub async fn dump(&self) -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db
            .send(
                meta::Dump {
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await??;
        Ok(())
    }
}

#[cfg(test)]
pub mod tests {
    use core::panic;
    use std::collections::{HashMap, HashSet};

    use bao_tree::{
        io::{outboard::PreOrderMemOutboard, round_up_to_chunks_groups},
        ChunkRanges,
    };
    use n0_future::{stream, Stream, StreamExt};
    use testresult::TestResult;
    use walkdir::WalkDir;

    use super::*;
    use crate::{
        api::blobs::Bitfield,
        store::{
            util::{read_checksummed, SliceInfoExt, Tag},
            IROH_BLOCK_SIZE,
        },
    };

    /// Interesting sizes for testing.
    pub const INTERESTING_SIZES: [usize; 8] = [
        0,               // annoying corner case - always present, handled by the api
        1,               // less than 1 chunk, data inline, outboard not needed
        1024,            // exactly 1 chunk, data inline, outboard not needed
        1024 * 16 - 1,   // less than 1 chunk group, data inline, outboard not needed
        1024 * 16,       // exactly 1 chunk group, data inline, outboard not needed
        1024 * 16 + 1,   // data file, outboard inline (just 1 hash pair)
        1024 * 1024,     // data file, outboard inline (many hash pairs)
        1024 * 1024 * 8, // data file, outboard file
    ];

    /// Create n0 flavoured bao. Note that this can be used to request ranges below a chunk group size,
    /// which cannot be exported via bao because we don't store hashes below the chunk group level.
    pub fn create_n0_bao(data: &[u8], ranges: &ChunkRanges) -> anyhow::Result<(Hash, Vec<u8>)> {
        let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
        let mut encoded = Vec::new();
        let size = data.len() as u64;
        encoded.extend_from_slice(&size.to_le_bytes());
        bao_tree::io::sync::encode_ranges_validated(data, &outboard, ranges, &mut encoded)?;
        Ok((outboard.root.into(), encoded))
    }

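    /// Round up a request to chunk group granularity, mirroring what the store can actually serve.
    ///
    /// If the requested ranges do not intersect the data at all (e.g.
    /// `ChunkRanges::last_chunk()`, which lies beyond the end of the data) and
    /// are non-empty, the request is clamped to the actual last chunk (or to
    /// everything, for an empty blob), so that the size can still be validated.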
    pub fn round_up_request(size: u64, ranges: &ChunkRanges) -> ChunkRanges {
        let last_chunk = ChunkNum::chunks(size);
        let data_range = ChunkRanges::from(..last_chunk);
        let ranges = if !data_range.intersects(ranges) && !ranges.is_empty() {
            if last_chunk == 0 {
                ChunkRanges::all()
            } else {
                ChunkRanges::from(last_chunk - 1..)
            }
        } else {
            ranges.clone()
        };
        round_up_to_chunks_groups(ranges, IROH_BLOCK_SIZE)
    }

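    /// Like [`create_n0_bao`], but rounds the requested ranges up to what the
    /// store can actually serve, and returns the rounded ranges together with
    /// the encoding.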
    fn create_n0_bao_full(
        data: &[u8],
        ranges: &ChunkRanges,
    ) -> anyhow::Result<(Hash, ChunkRanges, Vec<u8>)> {
        let ranges = round_up_request(data.len() as u64, ranges);
        let (hash, encoded) = create_n0_bao(data, &ranges)?;
        Ok((hash, ranges, encoded))
    }

    #[tokio::test]
    // #[traced_test]
    async fn test_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options).await?;
        let sizes = INTERESTING_SIZES;
        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            let obs = store.observe(hash);
            let task = tokio::spawn(async move {
                obs.await_completion().await?;
                api::Result::Ok(())
            });
            store.import_bao_bytes(hash, ranges, bao).await?;
            task.await??;
        }
        Ok(())
    }

    /// Generate test data for size n.
    ///
    /// We don't really care about the content, since we assume blake3 works.
    /// The only thing it should not be is all zeros, since that is what you
    /// will get for a gap.
    pub fn test_data(n: usize) -> Bytes {
        let mut res = Vec::with_capacity(n);
        // Using uppercase A-Z (65-90), 26 possible characters
        for i in 0..n {
            // Change character every 1024 bytes
            let block_num = i / 1024;
            // Map to uppercase A-Z range (65-90)
            let ascii_val = 65 + (block_num % 26) as u8;
            res.push(ascii_val);
        }
        Bytes::from(res)
    }

    // import data via add_stream, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_byte_stream() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let stream = bytes_to_stream(expected.clone(), 1023);
            let obs = store.observe(expected_hash);
            let tt = store.add_stream(stream).await.temp_tag().await?;
            assert_eq!(expected_hash, *tt.hash());
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
        }
        Ok(())
    }

    // import data via add_bytes, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_bytes_simple() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(&db_dir).await?;
        let sizes = INTERESTING_SIZES;
        trace!("{}", Options::new(&db_dir).is_inlined_data(16385));
        for size in sizes {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
        }
        store.shutdown().await?;
        dump_dir_full(db_dir)?;
        Ok(())
    }

    // import data via add_bytes, check that small blobs round-trip as the exact
    // same bytes instance (i.e. without a copy) while the handle is kept alive
    #[tokio::test]
    #[ignore = "flaky. I need a reliable way to keep the handle alive"]
    async fn test_roundtrip_bytes_small() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES
            .into_iter()
            .filter(|x| *x != 0 && *x <= IROH_BLOCK_SIZE.bytes())
        {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual);
            assert_eq!(
                &expected.addr(),
                &actual.addr(),
                "address mismatch for size {size}"
            );
            // we must at some point see completion, otherwise the test will hang
            // keep the handle alive by observing until the end, otherwise the handle
            // will change and the bytes won't be the same instance anymore
            obs.await_completion().await?;
        }
        store.shutdown().await?;
        Ok(())
    }

    // import data via add_path, check that we can observe it and that it is complete
    #[tokio::test]
    async fn test_import_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let path = testdir.path().join(format!("in-{size}"));
            fs::write(&path, &expected)?;
            let obs = store.observe(expected_hash);
            let tt = store.add_path(&path).await?;
            assert_eq!(expected_hash, tt.hash);
            // we must at some point see completion, otherwise the test will hang
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            // check that the data is there
            assert_eq!(&expected, &actual, "size={size}");
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    // import data via add_bytes, export it to a file path and check that the
    // file contents match
    #[tokio::test]
    async fn test_export_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let out_path = testdir.path().join(format!("out-{size}"));
            store.export(expected_hash, &out_path).await?;
            let actual = fs::read(&out_path)?;
            assert_eq!(expected, actual);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_ranges() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let data = test_data(100000);
            let ranges = ChunkRanges::chunks(16..32);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store
                .import_bao_bytes(hash, ranges.clone(), bao.clone())
                .await?;
            let bitfield = store.observe(hash).await?;
            assert_eq!(bitfield.ranges, ranges);
            assert_eq!(bitfield.size(), data.len() as u64);
            let export = store.export_bao(hash, ranges).bao_to_vec().await?;
            assert_eq!(export, bao);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_minimal() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_simple() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1048576];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                trace!("importing size={}", size);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected = vec![0u8; size];
                let hash = Hash::new(&expected);
                let actual = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await?;
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_just_size() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &just_size)?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_two_stages() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        // stage 1, import just the last full chunk group to get a validated size
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        // stage 2, import the rest
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let remaining = ChunkRanges::all() - round_up_request(size as u64, &just_size);
                if remaining.is_empty() {
                    continue;
                }
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &remaining)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        // check if the data is complete
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &ChunkRanges::all())?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

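    /// Ranges for a request that covers just the last chunk, which is enough
    /// to get a validated size for a blob.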
    fn just_size() -> ChunkRanges {
        ChunkRanges::last_chunk()
    }

    #[tokio::test]
    async fn test_import_bao_persistence_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = just_size();
        // stage 1, import just the last full chunk group to get a validated size
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        // stage 2, check that the imported ranges are still known after a restart
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_recover() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let just_size = just_size();
        // stage 1, import just the last full chunk group to get a validated size
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        delete_rec(testdir.path(), "bitfield")?;
        dump_dir_full(testdir.path())?;
        // stage 2, check that the bitfields are recovered from the remaining
        // files after they were deleted above
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges, "size={size}");
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bytes_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let mut tts = Vec::new();
            for size in sizes {
                let data = test_data(size);
                tts.push(store.add_bytes(data.clone()).await?);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let expected = test_data(size);
                let hash = Hash::new(&expected);
                let Ok(actual) = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await
                else {
                    panic!("failed to export size={size}");
                };
                assert_eq!(&expected, &actual, "size={size}");
            }
            store.shutdown().await?;
        }
        Ok(())
    }

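    /// Check that temp tags protect blobs while a batch is alive, and are
    /// released once the batch is dropped and the store has caught up.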
    async fn test_batch(store: &Store) -> TestResult<()> {
        let batch = store.blobs().batch().await?;
        let tt1 = batch.temp_tag(Hash::new("foo")).await?;
        let tt2 = batch.add_slice("boo").await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        assert!(tts.contains(tt1.hash_and_format()));
        assert!(tts.contains(tt2.hash_and_format()));
        drop(batch);
        store.sync_db().await?;
        store.wait_idle().await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        // the batch was dropped, so the temp tags it created are no longer registered
        assert!(!tts.contains(tt1.hash_and_format()));
        assert!(!tts.contains(tt2.hash_and_format()));
        drop(tt1);
        drop(tt2);
        Ok(())
    }

    #[tokio::test]
    async fn test_batch_fs() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        test_batch(&store).await
    }

    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        let haf = HashAndFormat::raw(Hash::from([0u8; 32]));
        store.tags().set(Tag::from("test"), haf).await?;
        store.tags().set(Tag::from("boo"), haf).await?;
        store.tags().set(Tag::from("bar"), haf).await?;
        let sizes = INTERESTING_SIZES;
        let mut hashes = Vec::new();
        let mut data_by_hash = HashMap::new();
        let mut bao_by_hash = HashMap::new();
        for size in sizes {
            let data = vec![0u8; size];
            let data = Bytes::from(data);
            let tt = store.add_bytes(data.clone()).temp_tag().await?;
            data_by_hash.insert(*tt.hash(), data);
            hashes.push(tt);
        }
        store.sync_db().await?;
        for tt in &hashes {
            let hash = *tt.hash();
            let path = testdir.path().join(format!("{hash}.txt"));
            store.export(hash, path).await?;
        }
        for tt in &hashes {
            let hash = tt.hash();
            let data = store
                .export_bao(*hash, ChunkRanges::all())
                .data_to_vec()
                .await
                .unwrap();
            assert_eq!(data, data_by_hash[hash].to_vec());
            let bao = store
                .export_bao(*hash, ChunkRanges::all())
                .bao_to_vec()
                .await
                .unwrap();
            bao_by_hash.insert(*hash, bao);
        }
        store.dump().await?;

        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store.import_bao_bytes(hash, ranges, bao).await?;
        }

        for (_hash, _bao_tree) in bao_by_hash {
            // let mut reader = Cursor::new(bao_tree);
            // let size = reader.read_u64_le().await?;
            // let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
            // let ranges = ChunkRanges::all();
            // let mut decoder = DecodeResponseIter::new(hash, tree, reader, &ranges);
            // while let Some(item) = decoder.next() {
            //     let item = item?;
            // }
            // store.import_bao_bytes(hash, ChunkRanges::all(), bao_tree.into()).await?;
        }
        Ok(())
    }

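    /// Recursively delete all files with the given extension below `root_dir`.
    ///
    /// Used by the persistence tests to simulate the loss of ephemeral state
    /// such as bitfield files.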
    pub fn delete_rec(root_dir: impl AsRef<Path>, extension: &str) -> Result<(), std::io::Error> {
        // Remove leading dot if present, so we have just the extension
        let ext = extension.trim_start_matches('.').to_lowercase();

        for entry in WalkDir::new(root_dir).into_iter().filter_map(|e| e.ok()) {
            let path = entry.path();

            if path.is_file() {
                if let Some(file_ext) = path.extension() {
                    if file_ext.to_string_lossy().to_lowercase() == ext {
                        fs::remove_file(path)?;
                    }
                }
            }
        }

        Ok(())
    }

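    /// Print a directory tree with file sizes, sorted by path.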
    pub fn dump_dir(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok) // Skip errors
            .collect();

        // Sort by path (name at each depth)
        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = "  ".repeat(depth); // Two spaces per level
            let name = entry.file_name().to_string_lossy();
            let size = entry.metadata()?.len(); // Size in bytes

            if entry.file_type().is_file() {
                println!("{indent}{name} ({size} bytes)");
            } else if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            }
        }
        Ok(())
    }

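    /// Like [`dump_dir`], but additionally dumps the contents of known file
    /// types (.data, .obao4, .sizes4 and .bitfield files).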
    pub fn dump_dir_full(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok) // Skip errors
            .collect();

        // Sort by path (name at each depth)
        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = "  ".repeat(depth);
            let name = entry.file_name().to_string_lossy();

            if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            } else if entry.file_type().is_file() {
                let size = entry.metadata()?.len();
                println!("{indent}{name} ({size} bytes)");

                // Dump depending on file type
                let path = entry.path();
                if name.ends_with(".data") {
                    print!("{indent}  ");
                    dump_file(path, 1024 * 16)?;
                } else if name.ends_with(".obao4") {
                    print!("{indent}  ");
                    dump_file(path, 64)?;
                } else if name.ends_with(".sizes4") {
                    print!("{indent}  ");
                    dump_file(path, 8)?;
                } else if name.ends_with(".bitfield") {
                    match read_checksummed::<Bitfield>(path) {
                        Ok(bitfield) => {
                            println!("{indent}  bitfield: {bitfield:?}");
                        }
                        Err(cause) => {
                            println!("{indent}  bitfield: error: {cause}");
                        }
                    }
                } else {
                    // Skip content dump for other files
                    continue;
                }
            }
        }
        Ok(())
    }

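    /// Print a coarse occupancy map of a file: one bit per `chunk_size` bytes,
    /// showing which chunks contain any non-zero data.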
    pub fn dump_file<P: AsRef<Path>>(path: P, chunk_size: u64) -> io::Result<()> {
        let bits = file_bits(path, chunk_size)?;
        println!("{}", print_bitfield_ansi(bits));
        Ok(())
    }

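    /// For each `chunk_size` sized chunk of the file, return whether it
    /// contains at least one non-zero byte.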
    pub fn file_bits(path: impl AsRef<Path>, chunk_size: u64) -> io::Result<Vec<bool>> {
        let file = fs::File::open(&path)?;
        let file_size = file.metadata()?.len();
        let mut buffer = vec![0u8; chunk_size as usize];
        let mut bits = Vec::new();

        let mut offset = 0u64;
        while offset < file_size {
            let remaining = file_size - offset;
            let current_chunk_size = chunk_size.min(remaining);

            let chunk = &mut buffer[..current_chunk_size as usize];
            file.read_exact_at(offset, chunk)?;

            let has_non_zero = chunk.iter().any(|&byte| byte != 0);
            bits.push(has_non_zero);

            offset += current_chunk_size;
        }

        Ok(bits)
    }

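    /// Render a bitfield as a plain ASCII string, one character per bit.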
    #[allow(dead_code)]
    fn print_bitfield(bits: impl IntoIterator<Item = bool>) -> String {
        bits.into_iter()
            .map(|bit| if bit { '#' } else { '_' })
            .collect()
    }

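    /// Render a bitfield using ANSI half-block characters, packing two bits
    /// into each output character.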
    fn print_bitfield_ansi(bits: impl IntoIterator<Item = bool>) -> String {
        // ANSI color codes
        let white_fg = "\x1b[97m"; // bright white foreground
        let reset = "\x1b[0m"; // reset all attributes
        let gray_bg = "\x1b[100m"; // bright black (gray) background
        let black_bg = "\x1b[40m"; // black background

        let mut result = String::new();
        let mut iter = bits.into_iter();

        while let Some(b1) = iter.next() {
            let b2 = iter.next();

            let colored_char = match (b1, b2) {
                (true, Some(true)) => format!("{white_fg}█{reset}"), // 11 - solid white on default background
                (true, Some(false)) => format!("{gray_bg}{white_fg}▌{reset}"), // 10 - left half white on gray background
                (false, Some(true)) => format!("{gray_bg}{white_fg}▐{reset}"), // 01 - right half white on gray background
                (false, Some(false)) => format!("{gray_bg}{white_fg} {reset}"), // 00 - space with gray background
                (true, None) => format!("{black_bg}{white_fg}▌{reset}"), // 1 (pad 0) - left half white on black background
                (false, None) => format!("{black_bg}{white_fg} {reset}"), // 0 (pad 0) - space with black background
            };

            result.push_str(&colored_char);
        }

        // Ensure we end with a reset code to prevent color bleeding
        result.push_str("\x1b[0m");
        result
    }

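    /// Turn `bytes` into a stream that yields chunks of at most `chunk_size` bytes.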
    fn bytes_to_stream(
        bytes: Bytes,
        chunk_size: usize,
    ) -> impl Stream<Item = io::Result<Bytes>> + 'static {
        assert!(chunk_size > 0, "Chunk size must be greater than 0");
        stream::unfold((bytes, 0), move |(bytes, offset)| async move {
            if offset >= bytes.len() {
                None
            } else {
                let chunk_len = chunk_size.min(bytes.len() - offset);
                let chunk = bytes.slice(offset..offset + chunk_len);
                Some((Ok(chunk), (bytes, offset + chunk_len)))
            }
        })
    }
}