iroh_blobs/store/
mem.rs

1//! Mutable in-memory blob store.
2//!
3//! Being a memory store, this store has to import all data into memory before it can
4//! serve it. So the amount of data you can serve is limited by your available memory.
5//! Other than that this is a fully featured store that provides all features such as
6//! tags and garbage collection.
7//!
8//! For many use cases this can be quite useful, since it does not require write access
9//! to the file system.
10use std::{
11    collections::{BTreeMap, HashMap, HashSet},
12    future::Future,
13    io::{self, Write},
14    num::NonZeroU64,
15    ops::Deref,
16    sync::Arc,
17};
18
19use bao_tree::{
20    blake3,
21    io::{
22        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
23        outboard::PreOrderMemOutboard,
24        sync::{Outboard, ReadAt, WriteAt},
25        BaoContentItem, EncodeError, Leaf,
26    },
27    BaoTree, ChunkNum, ChunkRanges, TreeNode,
28};
29use bytes::Bytes;
30use irpc::channel::mpsc;
31use n0_future::{
32    future::yield_now,
33    task::{JoinError, JoinSet},
34    time::SystemTime,
35};
36use range_collections::range_set::RangeSetRange;
37use tokio::sync::watch;
38use tracing::{error, info, instrument, trace, Instrument};
39
40use super::util::{BaoTreeSender, PartialMemStorage};
41use crate::{
42    api::{
43        self,
44        blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
45        proto::{
46            BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
47            CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
48            DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
49            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
50            ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
51            ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
52            ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
53            SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
54        },
55        tags::TagInfo,
56        ApiClient,
57    },
58    protocol::ChunkRangesExt,
59    store::{
60        gc::{run_gc, GcConfig},
61        util::{SizeInfo, SparseMemFile, Tag},
62        IROH_BLOCK_SIZE,
63    },
64    util::temp_tag::{TagDrop, TempTagScope, TempTags},
65    BlobFormat, Hash, HashAndFormat,
66};
67
68#[derive(Debug, Default)]
69pub struct Options {
70    pub gc_config: Option<GcConfig>,
71}
72
73#[derive(Debug, Clone)]
74#[repr(transparent)]
75pub struct MemStore {
76    client: ApiClient,
77}
78
79impl From<MemStore> for crate::api::Store {
80    fn from(value: MemStore) -> Self {
81        crate::api::Store::from_sender(value.client)
82    }
83}
84
85impl AsRef<crate::api::Store> for MemStore {
86    fn as_ref(&self) -> &crate::api::Store {
87        crate::api::Store::ref_from_sender(&self.client)
88    }
89}
90
91impl Deref for MemStore {
92    type Target = crate::api::Store;
93
94    fn deref(&self) -> &Self::Target {
95        crate::api::Store::ref_from_sender(&self.client)
96    }
97}
98
99impl Default for MemStore {
100    fn default() -> Self {
101        Self::new()
102    }
103}
104
105#[derive(derive_more::From)]
106enum TaskResult {
107    Unit(()),
108    Import(anyhow::Result<ImportEntry>),
109    Scope(Scope),
110}
111
112impl MemStore {
113    pub fn from_sender(client: ApiClient) -> Self {
114        Self { client }
115    }
116
117    pub fn new() -> Self {
118        Self::new_with_opts(Options::default())
119    }
120
121    pub fn new_with_opts(opts: Options) -> Self {
122        let (sender, receiver) = tokio::sync::mpsc::channel(32);
123        n0_future::task::spawn(
124            Actor {
125                commands: receiver,
126                tasks: JoinSet::new(),
127                state: State {
128                    data: HashMap::new(),
129                    tags: BTreeMap::new(),
130                    empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
131                },
132                options: Arc::new(Options::default()),
133                temp_tags: Default::default(),
134                protected: Default::default(),
135                idle_waiters: Default::default(),
136            }
137            .run(),
138        );
139
140        let store = Self::from_sender(sender.into());
141        if let Some(gc_config) = opts.gc_config {
142            n0_future::task::spawn(run_gc(store.deref().clone(), gc_config));
143        }
144
145        store
146    }
147}
148
149struct Actor {
150    commands: tokio::sync::mpsc::Receiver<Command>,
151    tasks: JoinSet<TaskResult>,
152    state: State,
153    #[allow(dead_code)]
154    options: Arc<Options>,
155    // temp tags
156    temp_tags: TempTags,
157    // idle waiters
158    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
159    protected: HashSet<Hash>,
160}
161
162impl Actor {
163    fn spawn<F, T>(&mut self, f: F)
164    where
165        F: Future<Output = T> + Send + 'static,
166        T: Into<TaskResult>,
167    {
168        let span = tracing::Span::current();
169        let fut = async move { f.await.into() }.instrument(span);
170        self.tasks.spawn(fut);
171    }
172
173    async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
174        match cmd {
175            Command::ImportBao(ImportBaoMsg {
176                inner: ImportBaoRequest { hash, size },
177                rx: data,
178                tx,
179                ..
180            }) => {
181                let entry = self.get_or_create_entry(hash);
182                self.spawn(import_bao(entry, size, data, tx));
183            }
184            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
185                trace!("wait idle");
186                if self.tasks.is_empty() {
187                    // we are currently idle
188                    tx.send(()).await.ok();
189                } else {
190                    // wait for idle state
191                    self.idle_waiters.push(tx);
192                }
193            }
194            Command::Observe(ObserveMsg {
195                inner: ObserveRequest { hash },
196                tx,
197                ..
198            }) => {
199                let entry = self.get_or_create_entry(hash);
200                self.spawn(observe(entry, tx));
201            }
202            Command::ImportBytes(ImportBytesMsg {
203                inner:
204                    ImportBytesRequest {
205                        data,
206                        scope,
207                        format,
208                        ..
209                    },
210                tx,
211                ..
212            }) => {
213                self.spawn(import_bytes(data, scope, format, tx));
214            }
215            Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
216                self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
217            }
218            Command::ImportPath(cmd) => {
219                self.spawn(import_path(cmd));
220            }
221            Command::ExportBao(ExportBaoMsg {
222                inner: ExportBaoRequest { hash, ranges },
223                tx,
224                ..
225            }) => {
226                let entry = self.get(&hash);
227                self.spawn(export_bao(entry, ranges, tx))
228            }
229            Command::ExportPath(cmd) => {
230                let entry = self.get(&cmd.hash);
231                self.spawn(export_path(entry, cmd));
232            }
233            Command::DeleteTags(cmd) => {
234                let DeleteTagsMsg {
235                    inner: DeleteTagsRequest { from, to },
236                    tx,
237                    ..
238                } = cmd;
239                info!("deleting tags from {:?} to {:?}", from, to);
240                // state.tags.remove(&from.unwrap());
241                // todo: more efficient impl
242                let mut deleted = 0;
243                self.state.tags.retain(|tag, _| {
244                    if let Some(from) = &from {
245                        if tag < from {
246                            return true;
247                        }
248                    }
249                    if let Some(to) = &to {
250                        if tag >= to {
251                            return true;
252                        }
253                    }
254                    info!("    removing {:?}", tag);
255                    deleted += 1;
256                    false
257                });
258                tx.send(Ok(deleted)).await.ok();
259            }
260            Command::RenameTag(cmd) => {
261                let RenameTagMsg {
262                    inner: RenameTagRequest { from, to },
263                    tx,
264                    ..
265                } = cmd;
266                let tags = &mut self.state.tags;
267                let value = match tags.remove(&from) {
268                    Some(value) => value,
269                    None => {
270                        tx.send(Err(api::Error::io(
271                            io::ErrorKind::NotFound,
272                            format!("tag not found: {from:?}"),
273                        )))
274                        .await
275                        .ok();
276                        return None;
277                    }
278                };
279                tags.insert(to, value);
280                tx.send(Ok(())).await.ok();
281                return None;
282            }
283            Command::ListTags(cmd) => {
284                let ListTagsMsg {
285                    inner:
286                        ListTagsRequest {
287                            from,
288                            to,
289                            raw,
290                            hash_seq,
291                        },
292                    tx,
293                    ..
294                } = cmd;
295                let tags = self
296                    .state
297                    .tags
298                    .iter()
299                    .filter(move |(tag, value)| {
300                        if let Some(from) = &from {
301                            if tag < &from {
302                                return false;
303                            }
304                        }
305                        if let Some(to) = &to {
306                            if tag >= &to {
307                                return false;
308                            }
309                        }
310                        raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
311                    })
312                    .map(|(tag, value)| TagInfo {
313                        name: tag.clone(),
314                        hash: value.hash,
315                        format: value.format,
316                    })
317                    .map(Ok);
318                tx.send(tags.collect()).await.ok();
319            }
320            Command::SetTag(SetTagMsg {
321                inner: SetTagRequest { name: tag, value },
322                tx,
323                ..
324            }) => {
325                self.state.tags.insert(tag, value);
326                tx.send(Ok(())).await.ok();
327            }
328            Command::CreateTag(CreateTagMsg {
329                inner: CreateTagRequest { value },
330                tx,
331                ..
332            }) => {
333                let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
334                self.state.tags.insert(tag.clone(), value);
335                tx.send(Ok(tag)).await.ok();
336            }
337            Command::CreateTempTag(cmd) => {
338                trace!("{cmd:?}");
339                self.create_temp_tag(cmd).await;
340            }
341            Command::ListTempTags(cmd) => {
342                trace!("{cmd:?}");
343                let tts = self.temp_tags.list();
344                cmd.tx.send(tts).await.ok();
345            }
346            Command::ListBlobs(cmd) => {
347                let ListBlobsMsg { tx, .. } = cmd;
348                let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
349                self.spawn(async move {
350                    for blob in blobs {
351                        if tx.send(Ok(blob)).await.is_err() {
352                            break;
353                        }
354                    }
355                });
356            }
357            Command::BlobStatus(cmd) => {
358                trace!("{cmd:?}");
359                let BlobStatusMsg {
360                    inner: BlobStatusRequest { hash },
361                    tx,
362                    ..
363                } = cmd;
364                let res = match self.get(&hash) {
365                    None => api::blobs::BlobStatus::NotFound,
366                    Some(x) => {
367                        let bitfield = x.0.state.borrow().bitfield();
368                        if bitfield.is_complete() {
369                            BlobStatus::Complete {
370                                size: bitfield.size,
371                            }
372                        } else {
373                            BlobStatus::Partial {
374                                size: bitfield.validated_size(),
375                            }
376                        }
377                    }
378                };
379                tx.send(res).await.ok();
380            }
381            Command::DeleteBlobs(cmd) => {
382                trace!("{cmd:?}");
383                let DeleteBlobsMsg {
384                    inner: BlobDeleteRequest { hashes, force },
385                    tx,
386                    ..
387                } = cmd;
388                for hash in hashes {
389                    if !force && self.protected.contains(&hash) {
390                        continue;
391                    }
392                    self.state.data.remove(&hash);
393                }
394                tx.send(Ok(())).await.ok();
395            }
396            Command::Batch(cmd) => {
397                trace!("{cmd:?}");
398                let (id, scope) = self.temp_tags.create_scope();
399                self.spawn(handle_batch(cmd, id, scope));
400            }
401            Command::ClearProtected(cmd) => {
402                self.protected.clear();
403                cmd.tx.send(Ok(())).await.ok();
404            }
405            Command::ExportRanges(cmd) => {
406                let entry = self.get(&cmd.hash);
407                self.spawn(export_ranges(cmd, entry));
408            }
409            Command::SyncDb(SyncDbMsg { tx, .. }) => {
410                tx.send(Ok(())).await.ok();
411            }
412            Command::Shutdown(cmd) => {
413                return Some(cmd);
414            }
415        }
416        None
417    }
418
419    fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
420        if *hash == Hash::EMPTY {
421            Some(self.state.empty_hash.clone())
422        } else {
423            self.state.data.get(hash).cloned()
424        }
425    }
426
427    fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
428        if hash == Hash::EMPTY {
429            self.state.empty_hash.clone()
430        } else {
431            self.state
432                .data
433                .entry(hash)
434                .or_insert_with(|| BaoFileHandle::new_partial(hash))
435                .clone()
436        }
437    }
438
439    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
440        let CreateTempTagMsg { tx, inner, .. } = cmd;
441        let mut tt = self.temp_tags.create(inner.scope, inner.value);
442        if tx.is_rpc() {
443            tt.leak();
444        }
445        tx.send(tt).await.ok();
446    }
447
448    async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
449        let import_data = match res {
450            Ok(entry) => entry,
451            Err(e) => {
452                error!("import failed: {e}");
453                return;
454            }
455        };
456        let hash = import_data.outboard.root().into();
457        let entry = self.get_or_create_entry(hash);
458        entry
459            .0
460            .state
461            .send_if_modified(|state: &mut BaoFileStorage| {
462                let BaoFileStorage::Partial(_) = state.deref() else {
463                    return false;
464                };
465                *state =
466                    CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
467                true
468            });
469        let tt = self.temp_tags.create(
470            import_data.scope,
471            HashAndFormat {
472                hash,
473                format: import_data.format,
474            },
475        );
476        import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
477    }
478
479    fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
480        match res {
481            Ok(x) => Some(x),
482            Err(e) => {
483                if e.is_cancelled() {
484                    trace!("task cancelled: {e}");
485                } else {
486                    error!("task failed: {e}");
487                }
488                None
489            }
490        }
491    }
492
493    pub async fn run(mut self) {
494        let shutdown = loop {
495            tokio::select! {
496                cmd = self.commands.recv() => {
497                    let Some(cmd) = cmd else {
498                        // last sender has been dropped.
499                        // exit immediately.
500                        break None;
501                    };
502                    if let Some(cmd) = self.handle_command(cmd).await {
503                        break Some(cmd);
504                    }
505                }
506                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
507                    let Some(res) = self.log_task_result(res) else {
508                        continue;
509                    };
510                    match res {
511                        TaskResult::Import(res) => {
512                            self.finish_import(res).await;
513                        }
514                        TaskResult::Scope(scope) => {
515                            self.temp_tags.end_scope(scope);
516                        }
517                        TaskResult::Unit(_) => {}
518                    }
519                    if self.tasks.is_empty() {
520                        // we are idle now
521                        for tx in self.idle_waiters.drain(..) {
522                            tx.send(()).await.ok();
523                        }
524                    }
525                }
526            }
527        };
528        if let Some(shutdown) = shutdown {
529            shutdown.tx.send(()).await.ok();
530        }
531    }
532}
533
534async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
535    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
536        error!("batch failed: {cause}");
537    }
538    id
539}
540
541async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
542    let BatchMsg { tx, mut rx, .. } = cmd;
543    trace!("created scope {}", id);
544    tx.send(id).await.map_err(api::Error::other)?;
545    while let Some(msg) = rx.recv().await? {
546        match msg {
547            BatchResponse::Drop(msg) => scope.on_drop(&msg),
548            BatchResponse::Ping => {}
549        }
550    }
551    Ok(())
552}
553
554async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
555    let Some(entry) = entry else {
556        let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
557        cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
558        return;
559    };
560    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
561        cmd.tx
562            .send(ExportRangesItem::Error(cause.into()))
563            .await
564            .ok();
565    }
566}
567
568async fn export_ranges_impl(
569    cmd: ExportRangesRequest,
570    tx: &mut mpsc::Sender<ExportRangesItem>,
571    entry: BaoFileHandle,
572) -> io::Result<()> {
573    let ExportRangesRequest { ranges, hash } = cmd;
574    let bitfield = entry.bitfield();
575    trace!(
576        "exporting ranges: {hash} {ranges:?} size={}",
577        bitfield.size()
578    );
579    debug_assert!(entry.hash() == hash, "hash mismatch");
580    let data = entry.data_reader();
581    let size = bitfield.size();
582    for range in ranges.iter() {
583        let range = match range {
584            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
585            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
586        };
587        let requested = ChunkRanges::bytes(range.start..range.end);
588        if !bitfield.ranges.is_superset(&requested) {
589            return Err(io::Error::other(format!(
590                "missing range: {requested:?}, present: {bitfield:?}",
591            )));
592        }
593        let bs = 1024;
594        let mut offset = range.start;
595        loop {
596            let end: u64 = (offset + bs).min(range.end);
597            let size = (end - offset) as usize;
598            tx.send(
599                Leaf {
600                    offset,
601                    data: data.read_bytes_at(offset, size)?,
602                }
603                .into(),
604            )
605            .await?;
606            offset = end;
607            if offset >= range.end {
608                break;
609            }
610        }
611    }
612    Ok(())
613}
614
615fn chunk_range(leaf: &Leaf) -> ChunkRanges {
616    let start = ChunkNum::chunks(leaf.offset);
617    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
618    (start..end).into()
619}
620
621async fn import_bao(
622    entry: BaoFileHandle,
623    size: NonZeroU64,
624    mut stream: mpsc::Receiver<BaoContentItem>,
625    tx: irpc::channel::oneshot::Sender<api::Result<()>>,
626) {
627    let size = size.get();
628    entry
629        .0
630        .state
631        .send_if_modified(|state: &mut BaoFileStorage| {
632            let BaoFileStorage::Partial(entry) = state else {
633                // entry was already completed, no need to write
634                return false;
635            };
636            entry.size.write(0, size);
637            false
638        });
639    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
640    while let Some(item) = stream.recv().await.unwrap() {
641        entry.0.state.send_if_modified(|state| {
642            let BaoFileStorage::Partial(partial) = state else {
643                // entry was already completed, no need to write
644                return false;
645            };
646            match item {
647                BaoContentItem::Parent(parent) => {
648                    if let Some(offset) = tree.pre_order_offset(parent.node) {
649                        let mut pair = [0u8; 64];
650                        pair[..32].copy_from_slice(parent.pair.0.as_bytes());
651                        pair[32..].copy_from_slice(parent.pair.1.as_bytes());
652                        partial
653                            .outboard
654                            .write_at(offset * 64, &pair)
655                            .expect("writing to mem can never fail");
656                    }
657                    false
658                }
659                BaoContentItem::Leaf(leaf) => {
660                    let start = leaf.offset;
661                    partial
662                        .data
663                        .write_at(start, &leaf.data)
664                        .expect("writing to mem can never fail");
665                    let added = chunk_range(&leaf);
666                    let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
667                    if update.new_state().complete {
668                        let data = std::mem::take(&mut partial.data);
669                        let outboard = std::mem::take(&mut partial.outboard);
670                        let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
671                        let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
672                        *state = CompleteStorage::new(data, outboard).into();
673                    }
674                    update.changed()
675                }
676            }
677        });
678    }
679    tx.send(Ok(())).await.ok();
680}
681
682#[instrument(skip_all, fields(hash = tracing::field::Empty))]
683async fn export_bao(
684    entry: Option<BaoFileHandle>,
685    ranges: ChunkRanges,
686    mut sender: mpsc::Sender<EncodedItem>,
687) {
688    let Some(entry) = entry else {
689        let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
690        sender.send(err.into()).await.ok();
691        return;
692    };
693    tracing::Span::current().record("hash", tracing::field::display(entry.hash));
694    let data = entry.data_reader();
695    let outboard = entry.outboard_reader();
696    let tx = BaoTreeSender::new(&mut sender);
697    traverse_ranges_validated(data, outboard, &ranges, tx)
698        .await
699        .ok();
700}
701
702#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
703async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
704    entry.subscribe().forward(tx).await.ok();
705}
706
707async fn import_bytes(
708    data: Bytes,
709    scope: Scope,
710    format: BlobFormat,
711    tx: mpsc::Sender<AddProgressItem>,
712) -> anyhow::Result<ImportEntry> {
713    tx.send(AddProgressItem::Size(data.len() as u64)).await?;
714    tx.send(AddProgressItem::CopyDone).await?;
715    let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
716    Ok(ImportEntry {
717        data,
718        outboard,
719        scope,
720        format,
721        tx,
722    })
723}
724
725async fn import_byte_stream(
726    scope: Scope,
727    format: BlobFormat,
728    mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
729    tx: mpsc::Sender<AddProgressItem>,
730) -> anyhow::Result<ImportEntry> {
731    let mut res = Vec::new();
732    loop {
733        match rx.recv().await {
734            Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
735                res.extend_from_slice(&data);
736                tx.send(AddProgressItem::CopyProgress(res.len() as u64))
737                    .await?;
738            }
739            Ok(Some(ImportByteStreamUpdate::Done)) => {
740                break;
741            }
742            Ok(None) => {
743                return Err(api::Error::io(
744                    io::ErrorKind::UnexpectedEof,
745                    "byte stream ended unexpectedly",
746                )
747                .into());
748            }
749            Err(e) => {
750                return Err(e.into());
751            }
752        }
753    }
754    import_bytes(res.into(), scope, format, tx).await
755}
756
/// Browser stand-in for path imports: always fails.
#[cfg(wasm_browser)]
async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
    // The request is accepted but ignored; files cannot be read here.
    let _: ImportPathRequest = cmd.inner;
    anyhow::bail!("import_path is not supported in the browser")
}
764
765#[instrument(skip_all, fields(path = %cmd.path.display()))]
766#[cfg(not(wasm_browser))]
767async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
768    use tokio::io::AsyncReadExt;
769    let ImportPathMsg {
770        inner:
771            ImportPathRequest {
772                path,
773                scope,
774                format,
775                ..
776            },
777        tx,
778        ..
779    } = cmd;
780    let mut res = Vec::new();
781    let mut file = tokio::fs::File::open(path).await?;
782    let mut buf = [0u8; 1024 * 64];
783    loop {
784        let size = file.read(&mut buf).await?;
785        if size == 0 {
786            break;
787        }
788        res.extend_from_slice(&buf[..size]);
789        tx.send(AddProgressItem::CopyProgress(res.len() as u64))
790            .await?;
791    }
792    import_bytes(res.into(), scope, format, tx).await
793}
794
795#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
796async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
797    let ExportPathMsg { inner, mut tx, .. } = cmd;
798    let Some(entry) = entry else {
799        tx.send(ExportProgressItem::Error(api::Error::io(
800            io::ErrorKind::NotFound,
801            "hash not found",
802        )))
803        .await
804        .ok();
805        return;
806    };
807    match export_path_impl(entry, inner, &mut tx).await {
808        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
809        Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
810    };
811}
812
813async fn export_path_impl(
814    entry: BaoFileHandle,
815    cmd: ExportPathRequest,
816    tx: &mut mpsc::Sender<ExportProgressItem>,
817) -> io::Result<()> {
818    let ExportPathRequest { target, .. } = cmd;
819    if !target.is_absolute() {
820        return Err(io::Error::new(
821            io::ErrorKind::InvalidInput,
822            "path is not absolute",
823        ));
824    }
825    if let Some(parent) = target.parent() {
826        std::fs::create_dir_all(parent)?;
827    }
828    // todo: for partial entries make sure to only write the part that is actually present
829    let mut file = std::fs::File::create(target)?;
830    let size = entry.0.state.borrow().size();
831    tx.send(ExportProgressItem::Size(size)).await?;
832    let mut buf = [0u8; 1024 * 64];
833    for offset in (0..size).step_by(1024 * 64) {
834        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
835        let buf = &mut buf[..len];
836        entry.0.state.borrow().data().read_exact_at(offset, buf)?;
837        file.write_all(buf)?;
838        tx.try_send(ExportProgressItem::CopyProgress(offset))
839            .await
840            .map_err(|_e| io::Error::other(""))?;
841        yield_now().await;
842    }
843    Ok(())
844}
845
846struct ImportEntry {
847    scope: Scope,
848    format: BlobFormat,
849    data: Bytes,
850    outboard: PreOrderMemOutboard,
851    tx: mpsc::Sender<AddProgressItem>,
852}
853
854pub struct DataReader(BaoFileHandle);
855
856impl ReadBytesAt for DataReader {
857    fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
858        let entry = self.0 .0.state.borrow();
859        entry.data().read_bytes_at(offset, size)
860    }
861}
862
863pub struct OutboardReader {
864    hash: blake3::Hash,
865    tree: BaoTree,
866    data: BaoFileHandle,
867}
868
869impl Outboard for OutboardReader {
870    fn root(&self) -> blake3::Hash {
871        self.hash
872    }
873
874    fn tree(&self) -> BaoTree {
875        self.tree
876    }
877
878    fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
879        let Some(offset) = self.tree.pre_order_offset(node) else {
880            return Ok(None);
881        };
882        let mut buf = [0u8; 64];
883        let size = self
884            .data
885            .0
886            .state
887            .borrow()
888            .outboard()
889            .read_at(offset * 64, &mut buf)?;
890        if size != 64 {
891            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
892        }
893        let left: [u8; 32] = buf[..32].try_into().unwrap();
894        let right: [u8; 32] = buf[32..].try_into().unwrap();
895        Ok(Some((left.into(), right.into())))
896    }
897}
898
/// In-memory store state: the blob entries and the tag table.
struct State {
    /// Handles for the blobs known to the store, keyed by hash.
    data: HashMap<Hash, BaoFileHandle>,
    /// Named tags, each pointing at a hash and format; kept ordered by tag.
    tags: BTreeMap<Tag, HashAndFormat>,
    /// Handle for the empty blob.
    /// NOTE(review): presumably kept separately so it is always available — confirm.
    empty_hash: BaoFileHandle,
}
904
/// Backing storage for a single blob, either partially or completely present.
#[derive(Debug, derive_more::From)]
pub enum BaoFileStorage {
    /// Some chunks may be missing; data and outboard are sparse buffers and a
    /// bitfield records which chunks are available.
    Partial(PartialMemStorage),
    /// Data and outboard are fully present.
    Complete(CompleteStorage),
}
910
911impl BaoFileStorage {
912    /// Get the bitfield of the storage.
913    pub fn bitfield(&self) -> Bitfield {
914        match self {
915            Self::Partial(entry) => entry.bitfield.clone(),
916            Self::Complete(entry) => Bitfield::complete(entry.size()),
917        }
918    }
919}
920
/// Shared state behind a [`BaoFileHandle`].
#[derive(Debug)]
pub struct BaoFileHandleInner {
    /// Current storage of the blob. It is held in a watch channel so observers
    /// can subscribe to changes (see `BaoFileHandle::subscribe`).
    state: watch::Sender<BaoFileStorage>,
    /// Hash of the blob.
    hash: Hash,
}
926
/// A cheaply cloneable handle to a bao file, including the hash.
///
/// Cloning only bumps the `Arc` reference count. The handle derefs to the
/// shared inner state.
#[derive(Debug, Clone, derive_more::Deref)]
pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
930
931impl BaoFileHandle {
932    pub fn new_partial(hash: Hash) -> Self {
933        let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
934            data: SparseMemFile::new(),
935            outboard: SparseMemFile::new(),
936            size: SizeInfo::default(),
937            bitfield: Bitfield::empty(),
938        }));
939        Self(Arc::new(BaoFileHandleInner { state, hash }))
940    }
941
942    pub fn hash(&self) -> Hash {
943        self.hash
944    }
945
946    pub fn bitfield(&self) -> Bitfield {
947        self.0.state.borrow().bitfield()
948    }
949
950    pub fn subscribe(&self) -> BaoFileStorageSubscriber {
951        BaoFileStorageSubscriber::new(self.0.state.subscribe())
952    }
953
954    pub fn data_reader(&self) -> DataReader {
955        DataReader(self.clone())
956    }
957
958    pub fn outboard_reader(&self) -> OutboardReader {
959        let entry = self.0.state.borrow();
960        let hash = self.hash.into();
961        let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
962        OutboardReader {
963            hash,
964            tree,
965            data: self.clone(),
966        }
967    }
968}
969
970impl Default for BaoFileStorage {
971    fn default() -> Self {
972        Self::Partial(Default::default())
973    }
974}
975
976impl BaoFileStorage {
977    fn data(&self) -> &[u8] {
978        match self {
979            Self::Partial(entry) => entry.data.as_ref(),
980            Self::Complete(entry) => &entry.data,
981        }
982    }
983
984    fn outboard(&self) -> &[u8] {
985        match self {
986            Self::Partial(entry) => entry.outboard.as_ref(),
987            Self::Complete(entry) => &entry.outboard,
988        }
989    }
990
991    fn size(&self) -> u64 {
992        match self {
993            Self::Partial(entry) => entry.current_size(),
994            Self::Complete(entry) => entry.size(),
995        }
996    }
997}
998
/// Storage for a blob whose data and outboard are fully present in memory.
#[derive(Debug, Clone)]
pub struct CompleteStorage {
    /// The complete blob data.
    pub(crate) data: Bytes,
    /// Pre-order bao outboard for `data`; `create` builds it with `PreOrderMemOutboard`.
    pub(crate) outboard: Bytes,
}
1004
1005impl CompleteStorage {
1006    pub fn create(data: Bytes) -> (Hash, Self) {
1007        let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
1008        let hash = outboard.root().into();
1009        let outboard = outboard.data.into();
1010        let entry = Self::new(data, outboard);
1011        (hash, entry)
1012    }
1013
1014    pub fn new(data: Bytes, outboard: Bytes) -> Self {
1015        Self { data, outboard }
1016    }
1017
1018    pub fn size(&self) -> u64 {
1019        self.data.len() as u64
1020    }
1021}
1022
1023#[allow(dead_code)]
1024fn print_outboard(hashes: &[u8]) {
1025    assert!(hashes.len() % 64 == 0);
1026    for chunk in hashes.chunks(64) {
1027        let left: [u8; 32] = chunk[..32].try_into().unwrap();
1028        let right: [u8; 32] = chunk[32..].try_into().unwrap();
1029        let left = blake3::Hash::from(left);
1030        let right = blake3::Hash::from(right);
1031        println!("l: {left:?}, r: {right:?}");
1032    }
1033}
1034
/// Observes a [`BaoFileStorage`] through a watch receiver and forwards its
/// bitfield (full values or deltas) to an mpsc sender.
pub struct BaoFileStorageSubscriber {
    receiver: watch::Receiver<BaoFileStorage>,
}
1038
1039impl BaoFileStorageSubscriber {
1040    pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1041        Self { receiver }
1042    }
1043
1044    /// Forward observed *values* to the given sender
1045    ///
1046    /// Returns an error if sending fails, or if the last sender is dropped
1047    pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1048        let value = self.receiver.borrow().bitfield();
1049        tx.send(value).await?;
1050        loop {
1051            self.update_or_closed(&mut tx).await?;
1052            let value = self.receiver.borrow().bitfield();
1053            tx.send(value.clone()).await?;
1054        }
1055    }
1056
1057    /// Forward observed *deltas* to the given sender
1058    ///
1059    /// Returns an error if sending fails, or if the last sender is dropped
1060    #[allow(dead_code)]
1061    pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1062        let value = self.receiver.borrow().bitfield();
1063        let mut old = value.clone();
1064        tx.send(value).await?;
1065        loop {
1066            self.update_or_closed(&mut tx).await?;
1067            let new = self.receiver.borrow().bitfield();
1068            let diff = old.diff(&new);
1069            if diff.is_empty() {
1070                continue;
1071            }
1072            tx.send(diff).await?;
1073            old = new;
1074        }
1075    }
1076
1077    async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1078        tokio::select! {
1079            _ = tx.closed() => {
1080                // the sender is closed, we are done
1081                Err(irpc::channel::SendError::ReceiverClosed.into())
1082            }
1083            e = self.receiver.changed() => Ok(e?),
1084        }
1085    }
1086}
1087
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Round-trips a 64 KiB blob between two stores via bao export/import and
    /// checks that the second store exports the same encoding.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        let temp_tag = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = temp_tag.hash();
        println!("hash: {hash:?}");

        // Dump every item of a full bao export.
        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        // Import into a fresh store while logging the events it reports.
        let target = MemStore::new();
        let mut events = target.observe(hash).stream().await?;
        n0_future::task::spawn(async move {
            while let Some(event) = events.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let reexported = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, reexported);

        Ok(())
    }
}