iroh_blobs/store/
mem.rs

1//! Mutable in-memory blob store.
2//!
3//! Being a memory store, this store has to import all data into memory before it can
4//! serve it. So the amount of data you can serve is limited by your available memory.
5//! Other than that this is a fully featured store that provides all features such as
6//! tags and garbage collection.
7//!
8//! For many use cases this can be quite useful, since it does not require write access
9//! to the file system.
10use std::{
11    collections::{BTreeMap, HashMap, HashSet},
12    future::Future,
13    io::{self, Write},
14    num::NonZeroU64,
15    ops::Deref,
16    sync::Arc,
17    time::SystemTime,
18};
19
20use bao_tree::{
21    blake3,
22    io::{
23        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24        outboard::PreOrderMemOutboard,
25        sync::{Outboard, ReadAt, WriteAt},
26        BaoContentItem, EncodeError, Leaf,
27    },
28    BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_error::{Result, StdResultExt};
33use n0_future::future::yield_now;
34use range_collections::range_set::RangeSetRange;
35use tokio::{
36    io::AsyncReadExt,
37    sync::watch,
38    task::{JoinError, JoinSet},
39};
40use tracing::{error, info, instrument, trace, Instrument};
41
42use super::util::{BaoTreeSender, PartialMemStorage};
43use crate::{
44    api::{
45        self,
46        blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
47        proto::{
48            BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
49            CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
50            DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
51            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
52            ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
53            ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
54            ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
55            SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
56        },
57        tags::TagInfo,
58        ApiClient,
59    },
60    protocol::ChunkRangesExt,
61    store::{
62        gc::{run_gc, GcConfig},
63        util::{SizeInfo, SparseMemFile, Tag},
64        IROH_BLOCK_SIZE,
65    },
66    util::temp_tag::{TagDrop, TempTagScope, TempTags},
67    BlobFormat, Hash, HashAndFormat,
68};
69
/// Options for creating a [`MemStore`].
#[derive(Debug, Default)]
pub struct Options {
    /// Garbage collection configuration.
    ///
    /// When this is `None`, [`MemStore::new_with_opts`] does not spawn a
    /// garbage collection task.
    pub gc_config: Option<GcConfig>,
}
74
/// Cheaply cloneable handle to an in-memory blob store.
///
/// The handle only holds the client used to talk to the store actor; it
/// converts and dereferences to [`crate::api::Store`].
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct MemStore {
    /// Client used to send commands to the store actor.
    client: ApiClient,
}
80
81impl From<MemStore> for crate::api::Store {
82    fn from(value: MemStore) -> Self {
83        crate::api::Store::from_sender(value.client)
84    }
85}
86
87impl AsRef<crate::api::Store> for MemStore {
88    fn as_ref(&self) -> &crate::api::Store {
89        crate::api::Store::ref_from_sender(&self.client)
90    }
91}
92
93impl Deref for MemStore {
94    type Target = crate::api::Store;
95
96    fn deref(&self) -> &Self::Target {
97        crate::api::Store::ref_from_sender(&self.client)
98    }
99}
100
101impl Default for MemStore {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
/// Output of a task spawned by the actor, as processed in `Actor::run`.
#[derive(derive_more::From)]
enum TaskResult {
    /// The task produced nothing the actor has to act on.
    Unit(()),
    /// An import task finished; the actor completes it in `Actor::finish_import`.
    Import(Result<ImportEntry>),
    /// A batch task finished; the actor ends the temp tag scope with this id.
    Scope(Scope),
}
113
114impl MemStore {
115    pub fn from_sender(client: ApiClient) -> Self {
116        Self { client }
117    }
118
119    pub fn new() -> Self {
120        Self::new_with_opts(Options::default())
121    }
122
123    pub fn new_with_opts(opts: Options) -> Self {
124        let (sender, receiver) = tokio::sync::mpsc::channel(32);
125        tokio::spawn(
126            Actor {
127                commands: receiver,
128                tasks: JoinSet::new(),
129                state: State {
130                    data: HashMap::new(),
131                    tags: BTreeMap::new(),
132                    empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
133                },
134                options: Arc::new(Options::default()),
135                temp_tags: Default::default(),
136                protected: Default::default(),
137                idle_waiters: Default::default(),
138            }
139            .run(),
140        );
141
142        let store = Self::from_sender(sender.into());
143        if let Some(gc_config) = opts.gc_config {
144            tokio::spawn(run_gc(store.deref().clone(), gc_config));
145        }
146
147        store
148    }
149}
150
/// Actor that owns the store state and handles incoming commands.
struct Actor {
    /// Receiving end of the command channel.
    commands: tokio::sync::mpsc::Receiver<Command>,
    /// Tasks spawned while handling commands; their results are processed in `run`.
    tasks: JoinSet<TaskResult>,
    /// Blob entries and tags.
    state: State,
    #[allow(dead_code)]
    options: Arc<Options>,
    // temp tags
    temp_tags: TempTags,
    // idle waiters
    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
    /// Hashes that `DeleteBlobs` skips unless `force` is set; emptied by `ClearProtected`.
    protected: HashSet<Hash>,
}
163
164impl Actor {
165    fn spawn<F, T>(&mut self, f: F)
166    where
167        F: Future<Output = T> + Send + 'static,
168        T: Into<TaskResult>,
169    {
170        let span = tracing::Span::current();
171        let fut = async move { f.await.into() }.instrument(span);
172        self.tasks.spawn(fut);
173    }
174
    /// Handles a single command.
    ///
    /// Commands that stream data or touch the file system are spawned as tasks
    /// on `self.tasks`; the others are answered inline. Failures to send a
    /// reply are ignored (`.ok()`).
    ///
    /// Returns `Some(msg)` only for `Command::Shutdown`, which tells the run
    /// loop to stop; every other command returns `None`.
    async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
        match cmd {
            Command::ImportBao(ImportBaoMsg {
                inner: ImportBaoRequest { hash, size },
                rx: data,
                tx,
                ..
            }) => {
                // the entry is created if it does not exist yet
                let entry = self.get_or_create_entry(hash);
                self.spawn(import_bao(entry, size, data, tx));
            }
            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
                trace!("wait idle");
                if self.tasks.is_empty() {
                    // we are currently idle
                    tx.send(()).await.ok();
                } else {
                    // wait for idle state
                    self.idle_waiters.push(tx);
                }
            }
            Command::Observe(ObserveMsg {
                inner: ObserveRequest { hash },
                tx,
                ..
            }) => {
                // observing a hash creates an empty partial entry if none exists
                let entry = self.get_or_create_entry(hash);
                self.spawn(observe(entry, tx));
            }
            Command::ImportBytes(ImportBytesMsg {
                inner:
                    ImportBytesRequest {
                        data,
                        scope,
                        format,
                        ..
                    },
                tx,
                ..
            }) => {
                // the import result is picked up in `run` and completed in `finish_import`
                self.spawn(import_bytes(data, scope, format, tx));
            }
            Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
                self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
            }
            Command::ImportPath(cmd) => {
                self.spawn(import_path(cmd));
            }
            Command::ExportBao(ExportBaoMsg {
                inner: ExportBaoRequest { hash, ranges },
                tx,
                ..
            }) => {
                // a missing entry is passed on as `None`; the task reports the error
                let entry = self.get(&hash);
                self.spawn(export_bao(entry, ranges, tx))
            }
            Command::ExportPath(cmd) => {
                let entry = self.get(&cmd.hash);
                self.spawn(export_path(entry, cmd));
            }
            Command::DeleteTags(cmd) => {
                let DeleteTagsMsg {
                    inner: DeleteTagsRequest { from, to },
                    tx,
                    ..
                } = cmd;
                info!("deleting tags from {:?} to {:?}", from, to);
                // state.tags.remove(&from.unwrap());
                // todo: more efficient impl
                let mut deleted = 0;
                // keep every tag outside the half-open range [from, to);
                // a missing bound leaves that side unbounded
                self.state.tags.retain(|tag, _| {
                    if let Some(from) = &from {
                        if tag < from {
                            return true;
                        }
                    }
                    if let Some(to) = &to {
                        if tag >= to {
                            return true;
                        }
                    }
                    info!("    removing {:?}", tag);
                    deleted += 1;
                    false
                });
                tx.send(Ok(deleted)).await.ok();
            }
            Command::RenameTag(cmd) => {
                let RenameTagMsg {
                    inner: RenameTagRequest { from, to },
                    tx,
                    ..
                } = cmd;
                let tags = &mut self.state.tags;
                let value = match tags.remove(&from) {
                    Some(value) => value,
                    None => {
                        tx.send(Err(api::Error::io(
                            io::ErrorKind::NotFound,
                            format!("tag not found: {from:?}"),
                        )))
                        .await
                        .ok();
                        return None;
                    }
                };
                // an existing value stored under `to` is replaced
                tags.insert(to, value);
                tx.send(Ok(())).await.ok();
                return None;
            }
            Command::ListTags(cmd) => {
                let ListTagsMsg {
                    inner:
                        ListTagsRequest {
                            from,
                            to,
                            raw,
                            hash_seq,
                        },
                    tx,
                    ..
                } = cmd;
                // select tags in the half-open range [from, to) whose format was requested
                let tags = self
                    .state
                    .tags
                    .iter()
                    .filter(move |(tag, value)| {
                        if let Some(from) = &from {
                            if tag < &from {
                                return false;
                            }
                        }
                        if let Some(to) = &to {
                            if tag >= &to {
                                return false;
                            }
                        }
                        raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
                    })
                    .map(|(tag, value)| TagInfo {
                        name: tag.clone(),
                        hash: value.hash,
                        format: value.format,
                    })
                    .map(Ok);
                tx.send(tags.collect()).await.ok();
            }
            Command::SetTag(SetTagMsg {
                inner: SetTagRequest { name: tag, value },
                tx,
                ..
            }) => {
                self.state.tags.insert(tag, value);
                tx.send(Ok(())).await.ok();
            }
            Command::CreateTag(CreateTagMsg {
                inner: CreateTagRequest { value },
                tx,
                ..
            }) => {
                // the closure tells `Tag::auto` which candidate names are already taken
                let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
                self.state.tags.insert(tag.clone(), value);
                tx.send(Ok(tag)).await.ok();
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            Command::ListBlobs(cmd) => {
                let ListBlobsMsg { tx, .. } = cmd;
                // snapshot the keys so the spawned task does not need access to the state
                let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
                self.spawn(async move {
                    for blob in blobs {
                        if tx.send(Ok(blob)).await.is_err() {
                            break;
                        }
                    }
                });
            }
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                let BlobStatusMsg {
                    inner: BlobStatusRequest { hash },
                    tx,
                    ..
                } = cmd;
                let res = match self.get(&hash) {
                    None => api::blobs::BlobStatus::NotFound,
                    Some(x) => {
                        let bitfield = x.0.state.borrow().bitfield();
                        if bitfield.is_complete() {
                            BlobStatus::Complete {
                                size: bitfield.size,
                            }
                        } else {
                            BlobStatus::Partial {
                                size: bitfield.validated_size(),
                            }
                        }
                    }
                };
                tx.send(res).await.ok();
            }
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                let DeleteBlobsMsg {
                    inner: BlobDeleteRequest { hashes, force },
                    tx,
                    ..
                } = cmd;
                for hash in hashes {
                    // protected hashes are only removed when `force` is set
                    if !force && self.protected.contains(&hash) {
                        continue;
                    }
                    self.state.data.remove(&hash);
                }
                tx.send(Ok(())).await.ok();
            }
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                // the scope is ended in `run` once the batch task returns its id
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope));
            }
            Command::ClearProtected(cmd) => {
                self.protected.clear();
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::ExportRanges(cmd) => {
                let entry = self.get(&cmd.hash);
                self.spawn(export_ranges(cmd, entry));
            }
            Command::SyncDb(SyncDbMsg { tx, .. }) => {
                // answered immediately; no work is done for this command here
                tx.send(Ok(())).await.ok();
            }
            Command::Shutdown(cmd) => {
                return Some(cmd);
            }
        }
        None
    }
420
421    fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
422        if *hash == Hash::EMPTY {
423            Some(self.state.empty_hash.clone())
424        } else {
425            self.state.data.get(hash).cloned()
426        }
427    }
428
429    fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
430        if hash == Hash::EMPTY {
431            self.state.empty_hash.clone()
432        } else {
433            self.state
434                .data
435                .entry(hash)
436                .or_insert_with(|| BaoFileHandle::new_partial(hash))
437                .clone()
438        }
439    }
440
441    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
442        let CreateTempTagMsg { tx, inner, .. } = cmd;
443        let mut tt = self.temp_tags.create(inner.scope, inner.value);
444        if tx.is_rpc() {
445            tt.leak();
446        }
447        tx.send(tt).await.ok();
448    }
449
450    async fn finish_import(&mut self, res: Result<ImportEntry>) {
451        let import_data = match res {
452            Ok(entry) => entry,
453            Err(e) => {
454                error!("import failed: {e}");
455                return;
456            }
457        };
458        let hash = import_data.outboard.root().into();
459        let entry = self.get_or_create_entry(hash);
460        entry
461            .0
462            .state
463            .send_if_modified(|state: &mut BaoFileStorage| {
464                let BaoFileStorage::Partial(_) = state.deref() else {
465                    return false;
466                };
467                *state =
468                    CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
469                true
470            });
471        let tt = self.temp_tags.create(
472            import_data.scope,
473            HashAndFormat {
474                hash,
475                format: import_data.format,
476            },
477        );
478        import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
479    }
480
481    fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
482        match res {
483            Ok(x) => Some(x),
484            Err(e) => {
485                if e.is_cancelled() {
486                    trace!("task cancelled: {e}");
487                } else {
488                    error!("task failed: {e}");
489                }
490                None
491            }
492        }
493    }
494
495    pub async fn run(mut self) {
496        let shutdown = loop {
497            tokio::select! {
498                cmd = self.commands.recv() => {
499                    let Some(cmd) = cmd else {
500                        // last sender has been dropped.
501                        // exit immediately.
502                        break None;
503                    };
504                    if let Some(cmd) = self.handle_command(cmd).await {
505                        break Some(cmd);
506                    }
507                }
508                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
509                    let Some(res) = self.log_task_result(res) else {
510                        continue;
511                    };
512                    match res {
513                        TaskResult::Import(res) => {
514                            self.finish_import(res).await;
515                        }
516                        TaskResult::Scope(scope) => {
517                            self.temp_tags.end_scope(scope);
518                        }
519                        TaskResult::Unit(_) => {}
520                    }
521                    if self.tasks.is_empty() {
522                        // we are idle now
523                        for tx in self.idle_waiters.drain(..) {
524                            tx.send(()).await.ok();
525                        }
526                    }
527                }
528            }
529        };
530        if let Some(shutdown) = shutdown {
531            shutdown.tx.send(()).await.ok();
532        }
533    }
534}
535
536async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
537    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
538        error!("batch failed: {cause}");
539    }
540    id
541}
542
543async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
544    let BatchMsg { tx, mut rx, .. } = cmd;
545    trace!("created scope {}", id);
546    tx.send(id).await.map_err(api::Error::other)?;
547    while let Some(msg) = rx.recv().await? {
548        match msg {
549            BatchResponse::Drop(msg) => scope.on_drop(&msg),
550            BatchResponse::Ping => {}
551        }
552    }
553    Ok(())
554}
555
556async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
557    let Some(entry) = entry else {
558        let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
559        cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
560        return;
561    };
562    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
563        cmd.tx
564            .send(ExportRangesItem::Error(cause.into()))
565            .await
566            .ok();
567    }
568}
569
570async fn export_ranges_impl(
571    cmd: ExportRangesRequest,
572    tx: &mut mpsc::Sender<ExportRangesItem>,
573    entry: BaoFileHandle,
574) -> io::Result<()> {
575    let ExportRangesRequest { ranges, hash } = cmd;
576    let bitfield = entry.bitfield();
577    trace!(
578        "exporting ranges: {hash} {ranges:?} size={}",
579        bitfield.size()
580    );
581    debug_assert!(entry.hash() == hash, "hash mismatch");
582    let data = entry.data_reader();
583    let size = bitfield.size();
584    for range in ranges.iter() {
585        let range = match range {
586            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
587            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
588        };
589        let requested = ChunkRanges::bytes(range.start..range.end);
590        if !bitfield.ranges.is_superset(&requested) {
591            return Err(io::Error::other(format!(
592                "missing range: {requested:?}, present: {bitfield:?}",
593            )));
594        }
595        let bs = 1024;
596        let mut offset = range.start;
597        loop {
598            let end: u64 = (offset + bs).min(range.end);
599            let size = (end - offset) as usize;
600            tx.send(
601                Leaf {
602                    offset,
603                    data: data.read_bytes_at(offset, size)?,
604                }
605                .into(),
606            )
607            .await?;
608            offset = end;
609            if offset >= range.end {
610                break;
611            }
612        }
613    }
614    Ok(())
615}
616
617fn chunk_range(leaf: &Leaf) -> ChunkRanges {
618    let start = ChunkNum::chunks(leaf.offset);
619    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
620    (start..end).into()
621}
622
623async fn import_bao(
624    entry: BaoFileHandle,
625    size: NonZeroU64,
626    mut stream: mpsc::Receiver<BaoContentItem>,
627    tx: irpc::channel::oneshot::Sender<api::Result<()>>,
628) {
629    let size = size.get();
630    entry
631        .0
632        .state
633        .send_if_modified(|state: &mut BaoFileStorage| {
634            let BaoFileStorage::Partial(entry) = state else {
635                // entry was already completed, no need to write
636                return false;
637            };
638            entry.size.write(0, size);
639            false
640        });
641    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
642    while let Some(item) = stream.recv().await.unwrap() {
643        entry.0.state.send_if_modified(|state| {
644            let BaoFileStorage::Partial(partial) = state else {
645                // entry was already completed, no need to write
646                return false;
647            };
648            match item {
649                BaoContentItem::Parent(parent) => {
650                    if let Some(offset) = tree.pre_order_offset(parent.node) {
651                        let mut pair = [0u8; 64];
652                        pair[..32].copy_from_slice(parent.pair.0.as_bytes());
653                        pair[32..].copy_from_slice(parent.pair.1.as_bytes());
654                        partial
655                            .outboard
656                            .write_at(offset * 64, &pair)
657                            .expect("writing to mem can never fail");
658                    }
659                    false
660                }
661                BaoContentItem::Leaf(leaf) => {
662                    let start = leaf.offset;
663                    partial
664                        .data
665                        .write_at(start, &leaf.data)
666                        .expect("writing to mem can never fail");
667                    let added = chunk_range(&leaf);
668                    let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
669                    if update.new_state().complete {
670                        let data = std::mem::take(&mut partial.data);
671                        let outboard = std::mem::take(&mut partial.outboard);
672                        let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
673                        let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
674                        *state = CompleteStorage::new(data, outboard).into();
675                    }
676                    update.changed()
677                }
678            }
679        });
680    }
681    tx.send(Ok(())).await.ok();
682}
683
684#[instrument(skip_all, fields(hash = tracing::field::Empty))]
685async fn export_bao(
686    entry: Option<BaoFileHandle>,
687    ranges: ChunkRanges,
688    mut sender: mpsc::Sender<EncodedItem>,
689) {
690    let Some(entry) = entry else {
691        let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
692        sender.send(err.into()).await.ok();
693        return;
694    };
695    tracing::Span::current().record("hash", tracing::field::display(entry.hash));
696    let data = entry.data_reader();
697    let outboard = entry.outboard_reader();
698    let tx = BaoTreeSender::new(&mut sender);
699    traverse_ranges_validated(data, outboard, &ranges, tx)
700        .await
701        .ok();
702}
703
704#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
705async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
706    entry.subscribe().forward(tx).await.ok();
707}
708
709async fn import_bytes(
710    data: Bytes,
711    scope: Scope,
712    format: BlobFormat,
713    tx: mpsc::Sender<AddProgressItem>,
714) -> Result<ImportEntry> {
715    tx.send(AddProgressItem::Size(data.len() as u64)).await?;
716    tx.send(AddProgressItem::CopyDone).await?;
717    let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
718    Ok(ImportEntry {
719        data,
720        outboard,
721        scope,
722        format,
723        tx,
724    })
725}
726
727async fn import_byte_stream(
728    scope: Scope,
729    format: BlobFormat,
730    mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
731    tx: mpsc::Sender<AddProgressItem>,
732) -> Result<ImportEntry> {
733    let mut res = Vec::new();
734    loop {
735        match rx.recv().await {
736            Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
737                res.extend_from_slice(&data);
738                tx.send(AddProgressItem::CopyProgress(res.len() as u64))
739                    .await?;
740            }
741            Ok(Some(ImportByteStreamUpdate::Done)) => {
742                break;
743            }
744            Ok(None) => {
745                return Err(api::Error::io(
746                    io::ErrorKind::UnexpectedEof,
747                    "byte stream ended unexpectedly",
748                )
749                .into());
750            }
751            Err(e) => {
752                return Err(e.into());
753            }
754        }
755    }
756    import_bytes(res.into(), scope, format, tx).await
757}
758
759#[instrument(skip_all, fields(path = %cmd.path.display()))]
760async fn import_path(cmd: ImportPathMsg) -> Result<ImportEntry> {
761    let ImportPathMsg {
762        inner:
763            ImportPathRequest {
764                path,
765                scope,
766                format,
767                ..
768            },
769        tx,
770        ..
771    } = cmd;
772    let mut res = Vec::new();
773    let mut file = tokio::fs::File::open(path).await?;
774    let mut buf = [0u8; 1024 * 64];
775    loop {
776        let size = file.read(&mut buf).await?;
777        if size == 0 {
778            break;
779        }
780        res.extend_from_slice(&buf[..size]);
781        tx.send(AddProgressItem::CopyProgress(res.len() as u64))
782            .await?;
783    }
784    import_bytes(res.into(), scope, format, tx).await
785}
786
787#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
788async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
789    let ExportPathMsg { inner, mut tx, .. } = cmd;
790    let Some(entry) = entry else {
791        tx.send(ExportProgressItem::Error(api::Error::io(
792            io::ErrorKind::NotFound,
793            "hash not found",
794        )))
795        .await
796        .ok();
797        return;
798    };
799    match export_path_impl(entry, inner, &mut tx).await {
800        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
801        Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
802    };
803}
804
805async fn export_path_impl(
806    entry: BaoFileHandle,
807    cmd: ExportPathRequest,
808    tx: &mut mpsc::Sender<ExportProgressItem>,
809) -> io::Result<()> {
810    let ExportPathRequest { target, .. } = cmd;
811    if !target.is_absolute() {
812        return Err(io::Error::new(
813            io::ErrorKind::InvalidInput,
814            "path is not absolute",
815        ));
816    }
817    if let Some(parent) = target.parent() {
818        std::fs::create_dir_all(parent)?;
819    }
820    // todo: for partial entries make sure to only write the part that is actually present
821    let mut file = std::fs::File::create(target)?;
822    let size = entry.0.state.borrow().size();
823    tx.send(ExportProgressItem::Size(size)).await?;
824    let mut buf = [0u8; 1024 * 64];
825    for offset in (0..size).step_by(1024 * 64) {
826        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
827        let buf = &mut buf[..len];
828        entry.0.state.borrow().data().read_exact_at(offset, buf)?;
829        file.write_all(buf)?;
830        tx.try_send(ExportProgressItem::CopyProgress(offset))
831            .await
832            .map_err(|_e| io::Error::other(""))?;
833        yield_now().await;
834    }
835    Ok(())
836}
837
/// Result of a successful import task, consumed by `Actor::finish_import`.
struct ImportEntry {
    /// Scope in which the temp tag for the imported blob is created.
    scope: Scope,
    /// Format stored in the temp tag for the imported blob.
    format: BlobFormat,
    /// The complete blob data.
    data: Bytes,
    /// Outboard computed from `data`; its root is used as the blob hash.
    outboard: PreOrderMemOutboard,
    /// Progress channel on which the final `AddProgressItem::Done` is sent.
    tx: mpsc::Sender<AddProgressItem>,
}
845
/// Reader for the data of a [`BaoFileHandle`].
pub struct DataReader(BaoFileHandle);
847
848impl ReadBytesAt for DataReader {
849    fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
850        let entry = self.0 .0.state.borrow();
851        entry.data().read_bytes_at(offset, size)
852    }
853}
854
/// Reader for the outboard of a [`BaoFileHandle`].
pub struct OutboardReader {
    /// Root hash returned by `Outboard::root`.
    hash: blake3::Hash,
    /// Tree geometry, fixed at the time the reader was created.
    tree: BaoTree,
    /// Handle whose outboard is read.
    data: BaoFileHandle,
}
860
861impl Outboard for OutboardReader {
862    fn root(&self) -> blake3::Hash {
863        self.hash
864    }
865
866    fn tree(&self) -> BaoTree {
867        self.tree
868    }
869
870    fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
871        let Some(offset) = self.tree.pre_order_offset(node) else {
872            return Ok(None);
873        };
874        let mut buf = [0u8; 64];
875        let size = self
876            .data
877            .0
878            .state
879            .borrow()
880            .outboard()
881            .read_at(offset * 64, &mut buf)?;
882        if size != 64 {
883            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
884        }
885        let left: [u8; 32] = buf[..32].try_into().unwrap();
886        let right: [u8; 32] = buf[32..].try_into().unwrap();
887        Ok(Some((left.into(), right.into())))
888    }
889}
890
/// Blob entries and tags owned by the actor.
struct State {
    /// Entries by hash, for all hashes except the empty hash.
    data: HashMap<Hash, BaoFileHandle>,
    /// Persistent tags, ordered by tag name.
    tags: BTreeMap<Tag, HashAndFormat>,
    /// Dedicated entry that is returned for `Hash::EMPTY`.
    empty_hash: BaoFileHandle,
}
896
897#[derive(Debug, derive_more::From)]
898pub enum BaoFileStorage {
899    Partial(PartialMemStorage),
900    Complete(CompleteStorage),
901}
902
903impl BaoFileStorage {
904    /// Get the bitfield of the storage.
905    pub fn bitfield(&self) -> Bitfield {
906        match self {
907            Self::Partial(entry) => entry.bitfield.clone(),
908            Self::Complete(entry) => Bitfield::complete(entry.size()),
909        }
910    }
911}
912
913#[derive(Debug)]
914pub struct BaoFileHandleInner {
915    state: watch::Sender<BaoFileStorage>,
916    hash: Hash,
917}
918
919/// A cheaply cloneable handle to a bao file, including the hash
920#[derive(Debug, Clone, derive_more::Deref)]
921pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
922
923impl BaoFileHandle {
924    pub fn new_partial(hash: Hash) -> Self {
925        let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
926            data: SparseMemFile::new(),
927            outboard: SparseMemFile::new(),
928            size: SizeInfo::default(),
929            bitfield: Bitfield::empty(),
930        }));
931        Self(Arc::new(BaoFileHandleInner { state, hash }))
932    }
933
934    pub fn hash(&self) -> Hash {
935        self.hash
936    }
937
938    pub fn bitfield(&self) -> Bitfield {
939        self.0.state.borrow().bitfield()
940    }
941
942    pub fn subscribe(&self) -> BaoFileStorageSubscriber {
943        BaoFileStorageSubscriber::new(self.0.state.subscribe())
944    }
945
946    pub fn data_reader(&self) -> DataReader {
947        DataReader(self.clone())
948    }
949
950    pub fn outboard_reader(&self) -> OutboardReader {
951        let entry = self.0.state.borrow();
952        let hash = self.hash.into();
953        let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
954        OutboardReader {
955            hash,
956            tree,
957            data: self.clone(),
958        }
959    }
960}
961
962impl Default for BaoFileStorage {
963    fn default() -> Self {
964        Self::Partial(Default::default())
965    }
966}
967
968impl BaoFileStorage {
969    fn data(&self) -> &[u8] {
970        match self {
971            Self::Partial(entry) => entry.data.as_ref(),
972            Self::Complete(entry) => &entry.data,
973        }
974    }
975
976    fn outboard(&self) -> &[u8] {
977        match self {
978            Self::Partial(entry) => entry.outboard.as_ref(),
979            Self::Complete(entry) => &entry.outboard,
980        }
981    }
982
983    fn size(&self) -> u64 {
984        match self {
985            Self::Partial(entry) => entry.current_size(),
986            Self::Complete(entry) => entry.size(),
987        }
988    }
989}
990
991#[derive(Debug, Clone)]
992pub struct CompleteStorage {
993    pub(crate) data: Bytes,
994    pub(crate) outboard: Bytes,
995}
996
997impl CompleteStorage {
998    pub fn create(data: Bytes) -> (Hash, Self) {
999        let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
1000        let hash = outboard.root().into();
1001        let outboard = outboard.data.into();
1002        let entry = Self::new(data, outboard);
1003        (hash, entry)
1004    }
1005
1006    pub fn new(data: Bytes, outboard: Bytes) -> Self {
1007        Self { data, outboard }
1008    }
1009
1010    pub fn size(&self) -> u64 {
1011        self.data.len() as u64
1012    }
1013}
1014
1015#[allow(dead_code)]
1016fn print_outboard(hashes: &[u8]) {
1017    assert!(hashes.len() % 64 == 0);
1018    for chunk in hashes.chunks(64) {
1019        let left: [u8; 32] = chunk[..32].try_into().unwrap();
1020        let right: [u8; 32] = chunk[32..].try_into().unwrap();
1021        let left = blake3::Hash::from(left);
1022        let right = blake3::Hash::from(right);
1023        println!("l: {left:?}, r: {right:?}");
1024    }
1025}
1026
1027pub struct BaoFileStorageSubscriber {
1028    receiver: watch::Receiver<BaoFileStorage>,
1029}
1030
1031impl BaoFileStorageSubscriber {
1032    pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1033        Self { receiver }
1034    }
1035
1036    /// Forward observed *values* to the given sender
1037    ///
1038    /// Returns an error if sending fails, or if the last sender is dropped
1039    pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> Result<()> {
1040        let value = self.receiver.borrow().bitfield();
1041        tx.send(value).await?;
1042        loop {
1043            self.update_or_closed(&mut tx).await?;
1044            let value = self.receiver.borrow().bitfield();
1045            tx.send(value.clone()).await?;
1046        }
1047    }
1048
1049    /// Forward observed *deltas* to the given sender
1050    ///
1051    /// Returns an error if sending fails, or if the last sender is dropped
1052    #[allow(dead_code)]
1053    pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> Result<()> {
1054        let value = self.receiver.borrow().bitfield();
1055        let mut old = value.clone();
1056        tx.send(value).await?;
1057        loop {
1058            self.update_or_closed(&mut tx).await?;
1059            let new = self.receiver.borrow().bitfield();
1060            let diff = old.diff(&new);
1061            if diff.is_empty() {
1062                continue;
1063            }
1064            tx.send(diff).await?;
1065            old = new;
1066        }
1067    }
1068
1069    async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> Result<()> {
1070        tokio::select! {
1071            _ = tx.closed() => {
1072                // the sender is closed, we are done
1073                Err(n0_error::e!(irpc::channel::SendError::ReceiverClosed).into())
1074            }
1075            e = self.receiver.changed() => Ok(e.anyerr()?),
1076        }
1077    }
1078}
1079
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Adds a blob to one store, exports it as bao, imports that encoding
    /// into a second store and checks that re-exporting gives the same bytes.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        // The temp tag binding stays alive until the end of the test.
        let temp_tag = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = temp_tag.hash();
        println!("hash: {hash:?}");

        // First pass: print every exported item.
        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }

        // Second pass: collect the full encoding.
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let target = MemStore::new();
        // Print observe events for the hash in the background while importing.
        let mut observer = target.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = observer.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let round_tripped = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, round_tripped);

        Ok(())
    }
}