iroh_blobs/store/
mem.rs

1//! Mutable in-memory blob store.
2//!
3//! Being a memory store, this store has to import all data into memory before it can
4//! serve it. So the amount of data you can serve is limited by your available memory.
5//! Other than that this is a fully featured store that provides all features such as
6//! tags and garbage collection.
7//!
8//! For many use cases this can be quite useful, since it does not require write access
9//! to the file system.
10use std::{
11    collections::{BTreeMap, HashMap, HashSet},
12    future::Future,
13    io::{self, Write},
14    num::NonZeroU64,
15    ops::Deref,
16    sync::Arc,
17    time::SystemTime,
18};
19
20use bao_tree::{
21    blake3,
22    io::{
23        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24        outboard::PreOrderMemOutboard,
25        sync::{Outboard, ReadAt, WriteAt},
26        BaoContentItem, EncodeError, Leaf,
27    },
28    BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35    io::AsyncReadExt,
36    sync::watch,
37    task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43    api::{
44        self,
45        blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46        proto::{
47            BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48            CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49            DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51            ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52            ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53            ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54            SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
55        },
56        tags::TagInfo,
57        ApiClient,
58    },
59    protocol::ChunkRangesExt,
60    store::{
61        util::{SizeInfo, SparseMemFile, Tag},
62        IROH_BLOCK_SIZE,
63    },
64    util::temp_tag::{TagDrop, TempTagScope, TempTags},
65    BlobFormat, Hash, HashAndFormat,
66};
67
/// Options for the in-memory store.
///
/// Currently empty.
#[derive(Default, Debug)]
pub struct Options {}
70
71#[derive(Debug, Clone)]
72#[repr(transparent)]
73pub struct MemStore {
74    client: ApiClient,
75}
76
77impl From<MemStore> for crate::api::Store {
78    fn from(value: MemStore) -> Self {
79        crate::api::Store::from_sender(value.client)
80    }
81}
82
83impl AsRef<crate::api::Store> for MemStore {
84    fn as_ref(&self) -> &crate::api::Store {
85        crate::api::Store::ref_from_sender(&self.client)
86    }
87}
88
89impl Deref for MemStore {
90    type Target = crate::api::Store;
91
92    fn deref(&self) -> &Self::Target {
93        crate::api::Store::ref_from_sender(&self.client)
94    }
95}
96
97impl Default for MemStore {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103#[derive(derive_more::From)]
104enum TaskResult {
105    Unit(()),
106    Import(anyhow::Result<ImportEntry>),
107    Scope(Scope),
108}
109
110impl MemStore {
111    pub fn from_sender(client: ApiClient) -> Self {
112        Self { client }
113    }
114
115    pub fn new() -> Self {
116        let (sender, receiver) = tokio::sync::mpsc::channel(32);
117        tokio::spawn(
118            Actor {
119                commands: receiver,
120                tasks: JoinSet::new(),
121                state: State {
122                    data: HashMap::new(),
123                    tags: BTreeMap::new(),
124                    empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
125                },
126                options: Arc::new(Options::default()),
127                temp_tags: Default::default(),
128                protected: Default::default(),
129                idle_waiters: Default::default(),
130            }
131            .run(),
132        );
133        Self::from_sender(sender.into())
134    }
135}
136
137struct Actor {
138    commands: tokio::sync::mpsc::Receiver<Command>,
139    tasks: JoinSet<TaskResult>,
140    state: State,
141    #[allow(dead_code)]
142    options: Arc<Options>,
143    // temp tags
144    temp_tags: TempTags,
145    // idle waiters
146    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
147    protected: HashSet<Hash>,
148}
149
150impl Actor {
151    fn spawn<F, T>(&mut self, f: F)
152    where
153        F: Future<Output = T> + Send + 'static,
154        T: Into<TaskResult>,
155    {
156        let span = tracing::Span::current();
157        let fut = async move { f.await.into() }.instrument(span);
158        self.tasks.spawn(fut);
159    }
160
161    async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
162        match cmd {
163            Command::ImportBao(ImportBaoMsg {
164                inner: ImportBaoRequest { hash, size },
165                rx: data,
166                tx,
167                ..
168            }) => {
169                let entry = self.get_or_create_entry(hash);
170                self.spawn(import_bao(entry, size, data, tx));
171            }
172            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
173                trace!("wait idle");
174                if self.tasks.is_empty() {
175                    // we are currently idle
176                    tx.send(()).await.ok();
177                } else {
178                    // wait for idle state
179                    self.idle_waiters.push(tx);
180                }
181            }
182            Command::Observe(ObserveMsg {
183                inner: ObserveRequest { hash },
184                tx,
185                ..
186            }) => {
187                let entry = self.get_or_create_entry(hash);
188                self.spawn(observe(entry, tx));
189            }
190            Command::ImportBytes(ImportBytesMsg {
191                inner:
192                    ImportBytesRequest {
193                        data,
194                        scope,
195                        format,
196                        ..
197                    },
198                tx,
199                ..
200            }) => {
201                self.spawn(import_bytes(data, scope, format, tx));
202            }
203            Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
204                self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
205            }
206            Command::ImportPath(cmd) => {
207                self.spawn(import_path(cmd));
208            }
209            Command::ExportBao(ExportBaoMsg {
210                inner: ExportBaoRequest { hash, ranges },
211                tx,
212                ..
213            }) => {
214                let entry = self.get(&hash);
215                self.spawn(export_bao(entry, ranges, tx))
216            }
217            Command::ExportPath(cmd) => {
218                let entry = self.get(&cmd.hash);
219                self.spawn(export_path(entry, cmd));
220            }
221            Command::DeleteTags(cmd) => {
222                let DeleteTagsMsg {
223                    inner: DeleteTagsRequest { from, to },
224                    tx,
225                    ..
226                } = cmd;
227                info!("deleting tags from {:?} to {:?}", from, to);
228                // state.tags.remove(&from.unwrap());
229                // todo: more efficient impl
230                let mut deleted = 0;
231                self.state.tags.retain(|tag, _| {
232                    if let Some(from) = &from {
233                        if tag < from {
234                            return true;
235                        }
236                    }
237                    if let Some(to) = &to {
238                        if tag >= to {
239                            return true;
240                        }
241                    }
242                    info!("    removing {:?}", tag);
243                    deleted += 1;
244                    false
245                });
246                tx.send(Ok(deleted)).await.ok();
247            }
248            Command::RenameTag(cmd) => {
249                let RenameTagMsg {
250                    inner: RenameTagRequest { from, to },
251                    tx,
252                    ..
253                } = cmd;
254                let tags = &mut self.state.tags;
255                let value = match tags.remove(&from) {
256                    Some(value) => value,
257                    None => {
258                        tx.send(Err(api::Error::io(
259                            io::ErrorKind::NotFound,
260                            format!("tag not found: {from:?}"),
261                        )))
262                        .await
263                        .ok();
264                        return None;
265                    }
266                };
267                tags.insert(to, value);
268                tx.send(Ok(())).await.ok();
269                return None;
270            }
271            Command::ListTags(cmd) => {
272                let ListTagsMsg {
273                    inner:
274                        ListTagsRequest {
275                            from,
276                            to,
277                            raw,
278                            hash_seq,
279                        },
280                    tx,
281                    ..
282                } = cmd;
283                let tags = self
284                    .state
285                    .tags
286                    .iter()
287                    .filter(move |(tag, value)| {
288                        if let Some(from) = &from {
289                            if tag < &from {
290                                return false;
291                            }
292                        }
293                        if let Some(to) = &to {
294                            if tag >= &to {
295                                return false;
296                            }
297                        }
298                        raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
299                    })
300                    .map(|(tag, value)| TagInfo {
301                        name: tag.clone(),
302                        hash: value.hash,
303                        format: value.format,
304                    })
305                    .map(Ok);
306                tx.send(tags.collect()).await.ok();
307            }
308            Command::SetTag(SetTagMsg {
309                inner: SetTagRequest { name: tag, value },
310                tx,
311                ..
312            }) => {
313                self.state.tags.insert(tag, value);
314                tx.send(Ok(())).await.ok();
315            }
316            Command::CreateTag(CreateTagMsg {
317                inner: CreateTagRequest { value },
318                tx,
319                ..
320            }) => {
321                let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
322                self.state.tags.insert(tag.clone(), value);
323                tx.send(Ok(tag)).await.ok();
324            }
325            Command::CreateTempTag(cmd) => {
326                trace!("{cmd:?}");
327                self.create_temp_tag(cmd).await;
328            }
329            Command::ListTempTags(cmd) => {
330                trace!("{cmd:?}");
331                let tts = self.temp_tags.list();
332                cmd.tx.send(tts).await.ok();
333            }
334            Command::ListBlobs(cmd) => {
335                let ListBlobsMsg { tx, .. } = cmd;
336                let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
337                self.spawn(async move {
338                    for blob in blobs {
339                        if tx.send(Ok(blob)).await.is_err() {
340                            break;
341                        }
342                    }
343                });
344            }
345            Command::BlobStatus(cmd) => {
346                trace!("{cmd:?}");
347                let BlobStatusMsg {
348                    inner: BlobStatusRequest { hash },
349                    tx,
350                    ..
351                } = cmd;
352                let res = match self.get(&hash) {
353                    None => api::blobs::BlobStatus::NotFound,
354                    Some(x) => {
355                        let bitfield = x.0.state.borrow().bitfield();
356                        if bitfield.is_complete() {
357                            BlobStatus::Complete {
358                                size: bitfield.size,
359                            }
360                        } else {
361                            BlobStatus::Partial {
362                                size: bitfield.validated_size(),
363                            }
364                        }
365                    }
366                };
367                tx.send(res).await.ok();
368            }
369            Command::DeleteBlobs(cmd) => {
370                trace!("{cmd:?}");
371                let DeleteBlobsMsg {
372                    inner: BlobDeleteRequest { hashes, force },
373                    tx,
374                    ..
375                } = cmd;
376                for hash in hashes {
377                    if !force && self.protected.contains(&hash) {
378                        continue;
379                    }
380                    self.state.data.remove(&hash);
381                }
382                tx.send(Ok(())).await.ok();
383            }
384            Command::Batch(cmd) => {
385                trace!("{cmd:?}");
386                let (id, scope) = self.temp_tags.create_scope();
387                self.spawn(handle_batch(cmd, id, scope));
388            }
389            Command::ClearProtected(cmd) => {
390                self.protected.clear();
391                cmd.tx.send(Ok(())).await.ok();
392            }
393            Command::ExportRanges(cmd) => {
394                let entry = self.get(&cmd.hash);
395                self.spawn(export_ranges(cmd, entry));
396            }
397            Command::SyncDb(SyncDbMsg { tx, .. }) => {
398                tx.send(Ok(())).await.ok();
399            }
400            Command::Shutdown(cmd) => {
401                return Some(cmd);
402            }
403        }
404        None
405    }
406
407    fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
408        if *hash == Hash::EMPTY {
409            Some(self.state.empty_hash.clone())
410        } else {
411            self.state.data.get(hash).cloned()
412        }
413    }
414
415    fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
416        if hash == Hash::EMPTY {
417            self.state.empty_hash.clone()
418        } else {
419            self.state
420                .data
421                .entry(hash)
422                .or_insert_with(|| BaoFileHandle::new_partial(hash))
423                .clone()
424        }
425    }
426
427    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
428        let CreateTempTagMsg { tx, inner, .. } = cmd;
429        let mut tt = self.temp_tags.create(inner.scope, inner.value);
430        if tx.is_rpc() {
431            tt.leak();
432        }
433        tx.send(tt).await.ok();
434    }
435
436    async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
437        let import_data = match res {
438            Ok(entry) => entry,
439            Err(e) => {
440                error!("import failed: {e}");
441                return;
442            }
443        };
444        let hash = import_data.outboard.root().into();
445        let entry = self.get_or_create_entry(hash);
446        entry
447            .0
448            .state
449            .send_if_modified(|state: &mut BaoFileStorage| {
450                let BaoFileStorage::Partial(_) = state.deref() else {
451                    return false;
452                };
453                *state =
454                    CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
455                true
456            });
457        let tt = self.temp_tags.create(
458            import_data.scope,
459            HashAndFormat {
460                hash,
461                format: import_data.format,
462            },
463        );
464        import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
465    }
466
467    fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
468        match res {
469            Ok(x) => Some(x),
470            Err(e) => {
471                if e.is_cancelled() {
472                    trace!("task cancelled: {e}");
473                } else {
474                    error!("task failed: {e}");
475                }
476                None
477            }
478        }
479    }
480
481    pub async fn run(mut self) {
482        let shutdown = loop {
483            tokio::select! {
484                cmd = self.commands.recv() => {
485                    let Some(cmd) = cmd else {
486                        // last sender has been dropped.
487                        // exit immediately.
488                        break None;
489                    };
490                    if let Some(cmd) = self.handle_command(cmd).await {
491                        break Some(cmd);
492                    }
493                }
494                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
495                    let Some(res) = self.log_task_result(res) else {
496                        continue;
497                    };
498                    match res {
499                        TaskResult::Import(res) => {
500                            self.finish_import(res).await;
501                        }
502                        TaskResult::Scope(scope) => {
503                            self.temp_tags.end_scope(scope);
504                        }
505                        TaskResult::Unit(_) => {}
506                    }
507                    if self.tasks.is_empty() {
508                        // we are idle now
509                        for tx in self.idle_waiters.drain(..) {
510                            tx.send(()).await.ok();
511                        }
512                    }
513                }
514            }
515        };
516        if let Some(shutdown) = shutdown {
517            shutdown.tx.send(()).await.ok();
518        }
519    }
520}
521
522async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
523    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
524        error!("batch failed: {cause}");
525    }
526    id
527}
528
529async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
530    let BatchMsg { tx, mut rx, .. } = cmd;
531    trace!("created scope {}", id);
532    tx.send(id).await.map_err(api::Error::other)?;
533    while let Some(msg) = rx.recv().await? {
534        match msg {
535            BatchResponse::Drop(msg) => scope.on_drop(&msg),
536            BatchResponse::Ping => {}
537        }
538    }
539    Ok(())
540}
541
542async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
543    let Some(entry) = entry else {
544        let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
545        cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
546        return;
547    };
548    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
549        cmd.tx
550            .send(ExportRangesItem::Error(cause.into()))
551            .await
552            .ok();
553    }
554}
555
556async fn export_ranges_impl(
557    cmd: ExportRangesRequest,
558    tx: &mut mpsc::Sender<ExportRangesItem>,
559    entry: BaoFileHandle,
560) -> io::Result<()> {
561    let ExportRangesRequest { ranges, hash } = cmd;
562    let bitfield = entry.bitfield();
563    trace!(
564        "exporting ranges: {hash} {ranges:?} size={}",
565        bitfield.size()
566    );
567    debug_assert!(entry.hash() == hash, "hash mismatch");
568    let data = entry.data_reader();
569    let size = bitfield.size();
570    for range in ranges.iter() {
571        let range = match range {
572            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
573            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
574        };
575        let requested = ChunkRanges::bytes(range.start..range.end);
576        if !bitfield.ranges.is_superset(&requested) {
577            return Err(io::Error::other(format!(
578                "missing range: {requested:?}, present: {bitfield:?}",
579            )));
580        }
581        let bs = 1024;
582        let mut offset = range.start;
583        loop {
584            let end: u64 = (offset + bs).min(range.end);
585            let size = (end - offset) as usize;
586            tx.send(
587                Leaf {
588                    offset,
589                    data: data.read_bytes_at(offset, size)?,
590                }
591                .into(),
592            )
593            .await?;
594            offset = end;
595            if offset >= range.end {
596                break;
597            }
598        }
599    }
600    Ok(())
601}
602
603fn chunk_range(leaf: &Leaf) -> ChunkRanges {
604    let start = ChunkNum::chunks(leaf.offset);
605    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
606    (start..end).into()
607}
608
609async fn import_bao(
610    entry: BaoFileHandle,
611    size: NonZeroU64,
612    mut stream: mpsc::Receiver<BaoContentItem>,
613    tx: irpc::channel::oneshot::Sender<api::Result<()>>,
614) {
615    let size = size.get();
616    entry
617        .0
618        .state
619        .send_if_modified(|state: &mut BaoFileStorage| {
620            let BaoFileStorage::Partial(entry) = state else {
621                // entry was already completed, no need to write
622                return false;
623            };
624            entry.size.write(0, size);
625            false
626        });
627    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
628    while let Some(item) = stream.recv().await.unwrap() {
629        entry.0.state.send_if_modified(|state| {
630            let BaoFileStorage::Partial(partial) = state else {
631                // entry was already completed, no need to write
632                return false;
633            };
634            match item {
635                BaoContentItem::Parent(parent) => {
636                    if let Some(offset) = tree.pre_order_offset(parent.node) {
637                        let mut pair = [0u8; 64];
638                        pair[..32].copy_from_slice(parent.pair.0.as_bytes());
639                        pair[32..].copy_from_slice(parent.pair.1.as_bytes());
640                        partial
641                            .outboard
642                            .write_at(offset * 64, &pair)
643                            .expect("writing to mem can never fail");
644                    }
645                    false
646                }
647                BaoContentItem::Leaf(leaf) => {
648                    let start = leaf.offset;
649                    partial
650                        .data
651                        .write_at(start, &leaf.data)
652                        .expect("writing to mem can never fail");
653                    let added = chunk_range(&leaf);
654                    let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
655                    if update.new_state().complete {
656                        let data = std::mem::take(&mut partial.data);
657                        let outboard = std::mem::take(&mut partial.outboard);
658                        let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
659                        let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
660                        *state = CompleteStorage::new(data, outboard).into();
661                    }
662                    update.changed()
663                }
664            }
665        });
666    }
667    tx.send(Ok(())).await.ok();
668}
669
670#[instrument(skip_all, fields(hash = tracing::field::Empty))]
671async fn export_bao(
672    entry: Option<BaoFileHandle>,
673    ranges: ChunkRanges,
674    mut sender: mpsc::Sender<EncodedItem>,
675) {
676    let Some(entry) = entry else {
677        let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
678        sender.send(err.into()).await.ok();
679        return;
680    };
681    tracing::Span::current().record("hash", tracing::field::display(entry.hash));
682    let data = entry.data_reader();
683    let outboard = entry.outboard_reader();
684    let tx = BaoTreeSender::new(&mut sender);
685    traverse_ranges_validated(data, outboard, &ranges, tx)
686        .await
687        .ok();
688}
689
690#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
691async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
692    entry.subscribe().forward(tx).await.ok();
693}
694
695async fn import_bytes(
696    data: Bytes,
697    scope: Scope,
698    format: BlobFormat,
699    tx: mpsc::Sender<AddProgressItem>,
700) -> anyhow::Result<ImportEntry> {
701    tx.send(AddProgressItem::Size(data.len() as u64)).await?;
702    tx.send(AddProgressItem::CopyDone).await?;
703    let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
704    Ok(ImportEntry {
705        data,
706        outboard,
707        scope,
708        format,
709        tx,
710    })
711}
712
713async fn import_byte_stream(
714    scope: Scope,
715    format: BlobFormat,
716    mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
717    tx: mpsc::Sender<AddProgressItem>,
718) -> anyhow::Result<ImportEntry> {
719    let mut res = Vec::new();
720    loop {
721        match rx.recv().await {
722            Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
723                res.extend_from_slice(&data);
724                tx.send(AddProgressItem::CopyProgress(res.len() as u64))
725                    .await?;
726            }
727            Ok(Some(ImportByteStreamUpdate::Done)) => {
728                break;
729            }
730            Ok(None) => {
731                return Err(api::Error::io(
732                    io::ErrorKind::UnexpectedEof,
733                    "byte stream ended unexpectedly",
734                )
735                .into());
736            }
737            Err(e) => {
738                return Err(e.into());
739            }
740        }
741    }
742    import_bytes(res.into(), scope, format, tx).await
743}
744
745#[instrument(skip_all, fields(path = %cmd.path.display()))]
746async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
747    let ImportPathMsg {
748        inner:
749            ImportPathRequest {
750                path,
751                scope,
752                format,
753                ..
754            },
755        tx,
756        ..
757    } = cmd;
758    let mut res = Vec::new();
759    let mut file = tokio::fs::File::open(path).await?;
760    let mut buf = [0u8; 1024 * 64];
761    loop {
762        let size = file.read(&mut buf).await?;
763        if size == 0 {
764            break;
765        }
766        res.extend_from_slice(&buf[..size]);
767        tx.send(AddProgressItem::CopyProgress(res.len() as u64))
768            .await?;
769    }
770    import_bytes(res.into(), scope, format, tx).await
771}
772
773#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
774async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
775    let ExportPathMsg { inner, mut tx, .. } = cmd;
776    let Some(entry) = entry else {
777        tx.send(ExportProgressItem::Error(api::Error::io(
778            io::ErrorKind::NotFound,
779            "hash not found",
780        )))
781        .await
782        .ok();
783        return;
784    };
785    match export_path_impl(entry, inner, &mut tx).await {
786        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
787        Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
788    };
789}
790
791async fn export_path_impl(
792    entry: BaoFileHandle,
793    cmd: ExportPathRequest,
794    tx: &mut mpsc::Sender<ExportProgressItem>,
795) -> io::Result<()> {
796    let ExportPathRequest { target, .. } = cmd;
797    if !target.is_absolute() {
798        return Err(io::Error::new(
799            io::ErrorKind::InvalidInput,
800            "path is not absolute",
801        ));
802    }
803    if let Some(parent) = target.parent() {
804        std::fs::create_dir_all(parent)?;
805    }
806    // todo: for partial entries make sure to only write the part that is actually present
807    let mut file = std::fs::File::create(target)?;
808    let size = entry.0.state.borrow().size();
809    tx.send(ExportProgressItem::Size(size)).await?;
810    let mut buf = [0u8; 1024 * 64];
811    for offset in (0..size).step_by(1024 * 64) {
812        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
813        let buf = &mut buf[..len];
814        entry.0.state.borrow().data().read_exact_at(offset, buf)?;
815        file.write_all(buf)?;
816        tx.try_send(ExportProgressItem::CopyProgress(offset))
817            .await
818            .map_err(|_e| io::Error::other(""))?;
819        yield_now().await;
820    }
821    Ok(())
822}
823
824struct ImportEntry {
825    scope: Scope,
826    format: BlobFormat,
827    data: Bytes,
828    outboard: PreOrderMemOutboard,
829    tx: mpsc::Sender<AddProgressItem>,
830}
831
832pub struct DataReader(BaoFileHandle);
833
834impl ReadBytesAt for DataReader {
835    fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
836        let entry = self.0 .0.state.borrow();
837        entry.data().read_bytes_at(offset, size)
838    }
839}
840
841pub struct OutboardReader {
842    hash: blake3::Hash,
843    tree: BaoTree,
844    data: BaoFileHandle,
845}
846
847impl Outboard for OutboardReader {
848    fn root(&self) -> blake3::Hash {
849        self.hash
850    }
851
852    fn tree(&self) -> BaoTree {
853        self.tree
854    }
855
856    fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
857        let Some(offset) = self.tree.pre_order_offset(node) else {
858            return Ok(None);
859        };
860        let mut buf = [0u8; 64];
861        let size = self
862            .data
863            .0
864            .state
865            .borrow()
866            .outboard()
867            .read_at(offset * 64, &mut buf)?;
868        if size != 64 {
869            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
870        }
871        let left: [u8; 32] = buf[..32].try_into().unwrap();
872        let right: [u8; 32] = buf[32..].try_into().unwrap();
873        Ok(Some((left.into(), right.into())))
874    }
875}
876
877struct State {
878    data: HashMap<Hash, BaoFileHandle>,
879    tags: BTreeMap<Tag, HashAndFormat>,
880    empty_hash: BaoFileHandle,
881}
882
883#[derive(Debug, derive_more::From)]
884pub enum BaoFileStorage {
885    Partial(PartialMemStorage),
886    Complete(CompleteStorage),
887}
888
889impl BaoFileStorage {
890    /// Get the bitfield of the storage.
891    pub fn bitfield(&self) -> Bitfield {
892        match self {
893            Self::Partial(entry) => entry.bitfield.clone(),
894            Self::Complete(entry) => Bitfield::complete(entry.size()),
895        }
896    }
897}
898
/// Shared state behind a [`BaoFileHandle`].
#[derive(Debug)]
pub struct BaoFileHandleInner {
    /// Watch channel sender holding the current storage; `BaoFileHandle::subscribe`
    /// creates its receivers from this sender.
    state: watch::Sender<BaoFileStorage>,
    /// Hash stored next to the storage, returned by `BaoFileHandle::hash`.
    hash: Hash,
}
904
905/// A cheaply cloneable handle to a bao file, including the hash
906#[derive(Debug, Clone, derive_more::Deref)]
907pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
908
909impl BaoFileHandle {
910    pub fn new_partial(hash: Hash) -> Self {
911        let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
912            data: SparseMemFile::new(),
913            outboard: SparseMemFile::new(),
914            size: SizeInfo::default(),
915            bitfield: Bitfield::empty(),
916        }));
917        Self(Arc::new(BaoFileHandleInner { state, hash }))
918    }
919
920    pub fn hash(&self) -> Hash {
921        self.hash
922    }
923
924    pub fn bitfield(&self) -> Bitfield {
925        self.0.state.borrow().bitfield()
926    }
927
928    pub fn subscribe(&self) -> BaoFileStorageSubscriber {
929        BaoFileStorageSubscriber::new(self.0.state.subscribe())
930    }
931
932    pub fn data_reader(&self) -> DataReader {
933        DataReader(self.clone())
934    }
935
936    pub fn outboard_reader(&self) -> OutboardReader {
937        let entry = self.0.state.borrow();
938        let hash = self.hash.into();
939        let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
940        OutboardReader {
941            hash,
942            tree,
943            data: self.clone(),
944        }
945    }
946}
947
948impl Default for BaoFileStorage {
949    fn default() -> Self {
950        Self::Partial(Default::default())
951    }
952}
953
954impl BaoFileStorage {
955    fn data(&self) -> &[u8] {
956        match self {
957            Self::Partial(entry) => entry.data.as_ref(),
958            Self::Complete(entry) => &entry.data,
959        }
960    }
961
962    fn outboard(&self) -> &[u8] {
963        match self {
964            Self::Partial(entry) => entry.outboard.as_ref(),
965            Self::Complete(entry) => &entry.outboard,
966        }
967    }
968
969    fn size(&self) -> u64 {
970        match self {
971            Self::Partial(entry) => entry.current_size(),
972            Self::Complete(entry) => entry.size(),
973        }
974    }
975}
976
977#[derive(Debug, Clone)]
978pub struct CompleteStorage {
979    pub(crate) data: Bytes,
980    pub(crate) outboard: Bytes,
981}
982
983impl CompleteStorage {
984    pub fn create(data: Bytes) -> (Hash, Self) {
985        let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
986        let hash = outboard.root().into();
987        let outboard = outboard.data.into();
988        let entry = Self::new(data, outboard);
989        (hash, entry)
990    }
991
992    pub fn new(data: Bytes, outboard: Bytes) -> Self {
993        Self { data, outboard }
994    }
995
996    pub fn size(&self) -> u64 {
997        self.data.len() as u64
998    }
999}
1000
1001#[allow(dead_code)]
1002fn print_outboard(hashes: &[u8]) {
1003    assert!(hashes.len() % 64 == 0);
1004    for chunk in hashes.chunks(64) {
1005        let left: [u8; 32] = chunk[..32].try_into().unwrap();
1006        let right: [u8; 32] = chunk[32..].try_into().unwrap();
1007        let left = blake3::Hash::from(left);
1008        let right = blake3::Hash::from(right);
1009        println!("l: {left:?}, r: {right:?}");
1010    }
1011}
1012
1013pub struct BaoFileStorageSubscriber {
1014    receiver: watch::Receiver<BaoFileStorage>,
1015}
1016
1017impl BaoFileStorageSubscriber {
1018    pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1019        Self { receiver }
1020    }
1021
1022    /// Forward observed *values* to the given sender
1023    ///
1024    /// Returns an error if sending fails, or if the last sender is dropped
1025    pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1026        let value = self.receiver.borrow().bitfield();
1027        tx.send(value).await?;
1028        loop {
1029            self.update_or_closed(&mut tx).await?;
1030            let value = self.receiver.borrow().bitfield();
1031            tx.send(value.clone()).await?;
1032        }
1033    }
1034
1035    /// Forward observed *deltas* to the given sender
1036    ///
1037    /// Returns an error if sending fails, or if the last sender is dropped
1038    #[allow(dead_code)]
1039    pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1040        let value = self.receiver.borrow().bitfield();
1041        let mut old = value.clone();
1042        tx.send(value).await?;
1043        loop {
1044            self.update_or_closed(&mut tx).await?;
1045            let new = self.receiver.borrow().bitfield();
1046            let diff = old.diff(&new);
1047            if diff.is_empty() {
1048                continue;
1049            }
1050            tx.send(diff).await?;
1051            old = new;
1052        }
1053    }
1054
1055    async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1056        tokio::select! {
1057            _ = tx.closed() => {
1058                // the sender is closed, we are done
1059                Err(irpc::channel::SendError::ReceiverClosed.into())
1060            }
1061            e = self.receiver.changed() => Ok(e?),
1062        }
1063    }
1064}
1065
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Exports a blob as bao from one store, imports it into a second store and
    /// checks that exporting it again yields the same bytes.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        // The temp tag stays in scope until the end of the test.
        let temp_tag = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = *temp_tag.hash();
        println!("hash: {hash:?}");

        // Walk the export once item by item, just printing what comes out.
        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let target = MemStore::new();
        // Print observe events of the target store in the background.
        let mut observed = target.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = observed.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let reexported = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, reexported);

        Ok(())
    }
}