iroh_blobs/store/
mem.rs

1//! Mutable in-memory blob store.
2//!
3//! Being a memory store, this store has to import all data into memory before it can
4//! serve it. So the amount of data you can serve is limited by your available memory.
5//! Other than that this is a fully featured store that provides all features such as
6//! tags and garbage collection.
7//!
8//! For many use cases this can be quite useful, since it does not require write access
9//! to the file system.
10use std::{
11    collections::{BTreeMap, HashMap, HashSet},
12    future::Future,
13    io::{self, Write},
14    num::NonZeroU64,
15    ops::Deref,
16    sync::Arc,
17    time::SystemTime,
18};
19
20use bao_tree::{
21    blake3,
22    io::{
23        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24        outboard::PreOrderMemOutboard,
25        sync::{Outboard, ReadAt, WriteAt},
26        BaoContentItem, EncodeError, Leaf,
27    },
28    BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35    io::AsyncReadExt,
36    sync::watch,
37    task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43    api::{
44        self,
45        blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46        proto::{
47            BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48            CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49            DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51            ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52            ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53            ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54            SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
55        },
56        tags::TagInfo,
57        ApiClient,
58    },
59    protocol::ChunkRangesExt,
60    store::{
61        util::{SizeInfo, SparseMemFile, Tag},
62        IROH_BLOCK_SIZE,
63    },
64    util::temp_tag::{TagDrop, TempTagScope, TempTags},
65    BlobFormat, Hash, HashAndFormat,
66};
67
/// Configuration for [`MemStore`].
///
/// Currently has no fields; [`MemStore::new`] always uses `Options::default()`.
#[derive(Default, Debug)]
pub struct Options {}
70
71#[derive(Debug, Clone)]
72#[repr(transparent)]
73pub struct MemStore {
74    client: ApiClient,
75}
76
77impl From<MemStore> for crate::api::Store {
78    fn from(value: MemStore) -> Self {
79        crate::api::Store::from_sender(value.client)
80    }
81}
82
83impl AsRef<crate::api::Store> for MemStore {
84    fn as_ref(&self) -> &crate::api::Store {
85        crate::api::Store::ref_from_sender(&self.client)
86    }
87}
88
89impl Deref for MemStore {
90    type Target = crate::api::Store;
91
92    fn deref(&self) -> &Self::Target {
93        crate::api::Store::ref_from_sender(&self.client)
94    }
95}
96
97impl Default for MemStore {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103#[derive(derive_more::From)]
104enum TaskResult {
105    Unit(()),
106    Import(anyhow::Result<ImportEntry>),
107    Scope(Scope),
108}
109
110impl MemStore {
111    pub fn from_sender(client: ApiClient) -> Self {
112        Self { client }
113    }
114
115    pub fn new() -> Self {
116        let (sender, receiver) = tokio::sync::mpsc::channel(32);
117        tokio::spawn(
118            Actor {
119                commands: receiver,
120                tasks: JoinSet::new(),
121                state: State {
122                    data: HashMap::new(),
123                    tags: BTreeMap::new(),
124                    empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
125                },
126                options: Arc::new(Options::default()),
127                temp_tags: Default::default(),
128                protected: Default::default(),
129                idle_waiters: Default::default(),
130            }
131            .run(),
132        );
133        Self::from_sender(sender.into())
134    }
135}
136
137struct Actor {
138    commands: tokio::sync::mpsc::Receiver<Command>,
139    tasks: JoinSet<TaskResult>,
140    state: State,
141    #[allow(dead_code)]
142    options: Arc<Options>,
143    // temp tags
144    temp_tags: TempTags,
145    // idle waiters
146    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
147    protected: HashSet<Hash>,
148}
149
150impl Actor {
151    fn spawn<F, T>(&mut self, f: F)
152    where
153        F: Future<Output = T> + Send + 'static,
154        T: Into<TaskResult>,
155    {
156        let span = tracing::Span::current();
157        let fut = async move { f.await.into() }.instrument(span);
158        self.tasks.spawn(fut);
159    }
160
161    async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
162        match cmd {
163            Command::ImportBao(ImportBaoMsg {
164                inner: ImportBaoRequest { hash, size },
165                rx: data,
166                tx,
167                ..
168            }) => {
169                let entry = self.get_or_create_entry(hash);
170                self.spawn(import_bao(entry, size, data, tx));
171            }
172            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
173                trace!("wait idle");
174                if self.tasks.is_empty() {
175                    // we are currently idle
176                    tx.send(()).await.ok();
177                } else {
178                    // wait for idle state
179                    self.idle_waiters.push(tx);
180                }
181            }
182            Command::Observe(ObserveMsg {
183                inner: ObserveRequest { hash },
184                tx,
185                ..
186            }) => {
187                let entry = self.get_or_create_entry(hash);
188                self.spawn(observe(entry, tx));
189            }
190            Command::ImportBytes(ImportBytesMsg {
191                inner:
192                    ImportBytesRequest {
193                        data,
194                        scope,
195                        format,
196                        ..
197                    },
198                tx,
199                ..
200            }) => {
201                self.spawn(import_bytes(data, scope, format, tx));
202            }
203            Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
204                self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
205            }
206            Command::ImportPath(cmd) => {
207                self.spawn(import_path(cmd));
208            }
209            Command::ExportBao(ExportBaoMsg {
210                inner: ExportBaoRequest { hash, ranges },
211                tx,
212                ..
213            }) => {
214                let entry = self.get(&hash);
215                self.spawn(export_bao(entry, ranges, tx))
216            }
217            Command::ExportPath(cmd) => {
218                let entry = self.get(&cmd.hash);
219                self.spawn(export_path(entry, cmd));
220            }
221            Command::DeleteTags(cmd) => {
222                let DeleteTagsMsg {
223                    inner: DeleteTagsRequest { from, to },
224                    tx,
225                    ..
226                } = cmd;
227                info!("deleting tags from {:?} to {:?}", from, to);
228                // state.tags.remove(&from.unwrap());
229                // todo: more efficient impl
230                self.state.tags.retain(|tag, _| {
231                    if let Some(from) = &from {
232                        if tag < from {
233                            return true;
234                        }
235                    }
236                    if let Some(to) = &to {
237                        if tag >= to {
238                            return true;
239                        }
240                    }
241                    info!("    removing {:?}", tag);
242                    false
243                });
244                tx.send(Ok(())).await.ok();
245            }
246            Command::RenameTag(cmd) => {
247                let RenameTagMsg {
248                    inner: RenameTagRequest { from, to },
249                    tx,
250                    ..
251                } = cmd;
252                let tags = &mut self.state.tags;
253                let value = match tags.remove(&from) {
254                    Some(value) => value,
255                    None => {
256                        tx.send(Err(api::Error::io(
257                            io::ErrorKind::NotFound,
258                            format!("tag not found: {from:?}"),
259                        )))
260                        .await
261                        .ok();
262                        return None;
263                    }
264                };
265                tags.insert(to, value);
266                tx.send(Ok(())).await.ok();
267                return None;
268            }
269            Command::ListTags(cmd) => {
270                let ListTagsMsg {
271                    inner:
272                        ListTagsRequest {
273                            from,
274                            to,
275                            raw,
276                            hash_seq,
277                        },
278                    tx,
279                    ..
280                } = cmd;
281                let tags = self
282                    .state
283                    .tags
284                    .iter()
285                    .filter(move |(tag, value)| {
286                        if let Some(from) = &from {
287                            if tag < &from {
288                                return false;
289                            }
290                        }
291                        if let Some(to) = &to {
292                            if tag >= &to {
293                                return false;
294                            }
295                        }
296                        raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
297                    })
298                    .map(|(tag, value)| TagInfo {
299                        name: tag.clone(),
300                        hash: value.hash,
301                        format: value.format,
302                    })
303                    .map(Ok);
304                tx.send(tags.collect()).await.ok();
305            }
306            Command::SetTag(SetTagMsg {
307                inner: SetTagRequest { name: tag, value },
308                tx,
309                ..
310            }) => {
311                self.state.tags.insert(tag, value);
312                tx.send(Ok(())).await.ok();
313            }
314            Command::CreateTag(CreateTagMsg {
315                inner: CreateTagRequest { value },
316                tx,
317                ..
318            }) => {
319                let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
320                self.state.tags.insert(tag.clone(), value);
321                tx.send(Ok(tag)).await.ok();
322            }
323            Command::CreateTempTag(cmd) => {
324                trace!("{cmd:?}");
325                self.create_temp_tag(cmd).await;
326            }
327            Command::ListTempTags(cmd) => {
328                trace!("{cmd:?}");
329                let tts = self.temp_tags.list();
330                cmd.tx.send(tts).await.ok();
331            }
332            Command::ListBlobs(cmd) => {
333                let ListBlobsMsg { tx, .. } = cmd;
334                let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
335                self.spawn(async move {
336                    for blob in blobs {
337                        if tx.send(Ok(blob)).await.is_err() {
338                            break;
339                        }
340                    }
341                });
342            }
343            Command::BlobStatus(cmd) => {
344                trace!("{cmd:?}");
345                let BlobStatusMsg {
346                    inner: BlobStatusRequest { hash },
347                    tx,
348                    ..
349                } = cmd;
350                let res = match self.get(&hash) {
351                    None => api::blobs::BlobStatus::NotFound,
352                    Some(x) => {
353                        let bitfield = x.0.state.borrow().bitfield();
354                        if bitfield.is_complete() {
355                            BlobStatus::Complete {
356                                size: bitfield.size,
357                            }
358                        } else {
359                            BlobStatus::Partial {
360                                size: bitfield.validated_size(),
361                            }
362                        }
363                    }
364                };
365                tx.send(res).await.ok();
366            }
367            Command::DeleteBlobs(cmd) => {
368                trace!("{cmd:?}");
369                let DeleteBlobsMsg {
370                    inner: BlobDeleteRequest { hashes, force },
371                    tx,
372                    ..
373                } = cmd;
374                for hash in hashes {
375                    if !force && self.protected.contains(&hash) {
376                        continue;
377                    }
378                    self.state.data.remove(&hash);
379                }
380                tx.send(Ok(())).await.ok();
381            }
382            Command::Batch(cmd) => {
383                trace!("{cmd:?}");
384                let (id, scope) = self.temp_tags.create_scope();
385                self.spawn(handle_batch(cmd, id, scope));
386            }
387            Command::ClearProtected(cmd) => {
388                self.protected.clear();
389                cmd.tx.send(Ok(())).await.ok();
390            }
391            Command::ExportRanges(cmd) => {
392                let entry = self.get(&cmd.hash);
393                self.spawn(export_ranges(cmd, entry));
394            }
395            Command::SyncDb(SyncDbMsg { tx, .. }) => {
396                tx.send(Ok(())).await.ok();
397            }
398            Command::Shutdown(cmd) => {
399                return Some(cmd);
400            }
401        }
402        None
403    }
404
405    fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
406        if *hash == Hash::EMPTY {
407            Some(self.state.empty_hash.clone())
408        } else {
409            self.state.data.get(hash).cloned()
410        }
411    }
412
413    fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
414        if hash == Hash::EMPTY {
415            self.state.empty_hash.clone()
416        } else {
417            self.state
418                .data
419                .entry(hash)
420                .or_insert_with(|| BaoFileHandle::new_partial(hash))
421                .clone()
422        }
423    }
424
425    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
426        let CreateTempTagMsg { tx, inner, .. } = cmd;
427        let mut tt = self.temp_tags.create(inner.scope, inner.value);
428        if tx.is_rpc() {
429            tt.leak();
430        }
431        tx.send(tt).await.ok();
432    }
433
434    async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
435        let import_data = match res {
436            Ok(entry) => entry,
437            Err(e) => {
438                error!("import failed: {e}");
439                return;
440            }
441        };
442        let hash = import_data.outboard.root().into();
443        let entry = self.get_or_create_entry(hash);
444        entry
445            .0
446            .state
447            .send_if_modified(|state: &mut BaoFileStorage| {
448                let BaoFileStorage::Partial(_) = state.deref() else {
449                    return false;
450                };
451                *state =
452                    CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
453                true
454            });
455        let tt = self.temp_tags.create(
456            import_data.scope,
457            HashAndFormat {
458                hash,
459                format: import_data.format,
460            },
461        );
462        import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
463    }
464
465    fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
466        match res {
467            Ok(x) => Some(x),
468            Err(e) => {
469                if e.is_cancelled() {
470                    trace!("task cancelled: {e}");
471                } else {
472                    error!("task failed: {e}");
473                }
474                None
475            }
476        }
477    }
478
479    pub async fn run(mut self) {
480        let shutdown = loop {
481            tokio::select! {
482                cmd = self.commands.recv() => {
483                    let Some(cmd) = cmd else {
484                        // last sender has been dropped.
485                        // exit immediately.
486                        break None;
487                    };
488                    if let Some(cmd) = self.handle_command(cmd).await {
489                        break Some(cmd);
490                    }
491                }
492                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
493                    let Some(res) = self.log_task_result(res) else {
494                        continue;
495                    };
496                    match res {
497                        TaskResult::Import(res) => {
498                            self.finish_import(res).await;
499                        }
500                        TaskResult::Scope(scope) => {
501                            self.temp_tags.end_scope(scope);
502                        }
503                        TaskResult::Unit(_) => {}
504                    }
505                    if self.tasks.is_empty() {
506                        // we are idle now
507                        for tx in self.idle_waiters.drain(..) {
508                            tx.send(()).await.ok();
509                        }
510                    }
511                }
512            }
513        };
514        if let Some(shutdown) = shutdown {
515            shutdown.tx.send(()).await.ok();
516        }
517    }
518}
519
520async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
521    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
522        error!("batch failed: {cause}");
523    }
524    id
525}
526
527async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
528    let BatchMsg { tx, mut rx, .. } = cmd;
529    trace!("created scope {}", id);
530    tx.send(id).await.map_err(api::Error::other)?;
531    while let Some(msg) = rx.recv().await? {
532        match msg {
533            BatchResponse::Drop(msg) => scope.on_drop(&msg),
534            BatchResponse::Ping => {}
535        }
536    }
537    Ok(())
538}
539
540async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
541    let Some(entry) = entry else {
542        let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
543        cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
544        return;
545    };
546    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
547        cmd.tx
548            .send(ExportRangesItem::Error(cause.into()))
549            .await
550            .ok();
551    }
552}
553
554async fn export_ranges_impl(
555    cmd: ExportRangesRequest,
556    tx: &mut mpsc::Sender<ExportRangesItem>,
557    entry: BaoFileHandle,
558) -> io::Result<()> {
559    let ExportRangesRequest { ranges, hash } = cmd;
560    let bitfield = entry.bitfield();
561    trace!(
562        "exporting ranges: {hash} {ranges:?} size={}",
563        bitfield.size()
564    );
565    debug_assert!(entry.hash() == hash, "hash mismatch");
566    let data = entry.data_reader();
567    let size = bitfield.size();
568    for range in ranges.iter() {
569        let range = match range {
570            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
571            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
572        };
573        let requested = ChunkRanges::bytes(range.start..range.end);
574        if !bitfield.ranges.is_superset(&requested) {
575            return Err(io::Error::other(format!(
576                "missing range: {requested:?}, present: {bitfield:?}",
577            )));
578        }
579        let bs = 1024;
580        let mut offset = range.start;
581        loop {
582            let end: u64 = (offset + bs).min(range.end);
583            let size = (end - offset) as usize;
584            tx.send(
585                Leaf {
586                    offset,
587                    data: data.read_bytes_at(offset, size)?,
588                }
589                .into(),
590            )
591            .await?;
592            offset = end;
593            if offset >= range.end {
594                break;
595            }
596        }
597    }
598    Ok(())
599}
600
601fn chunk_range(leaf: &Leaf) -> ChunkRanges {
602    let start = ChunkNum::chunks(leaf.offset);
603    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
604    (start..end).into()
605}
606
607async fn import_bao(
608    entry: BaoFileHandle,
609    size: NonZeroU64,
610    mut stream: mpsc::Receiver<BaoContentItem>,
611    tx: irpc::channel::oneshot::Sender<api::Result<()>>,
612) {
613    let size = size.get();
614    entry
615        .0
616        .state
617        .send_if_modified(|state: &mut BaoFileStorage| {
618            let BaoFileStorage::Partial(entry) = state else {
619                // entry was already completed, no need to write
620                return false;
621            };
622            entry.size.write(0, size);
623            false
624        });
625    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
626    while let Some(item) = stream.recv().await.unwrap() {
627        entry.0.state.send_if_modified(|state| {
628            let BaoFileStorage::Partial(partial) = state else {
629                // entry was already completed, no need to write
630                return false;
631            };
632            match item {
633                BaoContentItem::Parent(parent) => {
634                    if let Some(offset) = tree.pre_order_offset(parent.node) {
635                        let mut pair = [0u8; 64];
636                        pair[..32].copy_from_slice(parent.pair.0.as_bytes());
637                        pair[32..].copy_from_slice(parent.pair.1.as_bytes());
638                        partial
639                            .outboard
640                            .write_at(offset * 64, &pair)
641                            .expect("writing to mem can never fail");
642                    }
643                    false
644                }
645                BaoContentItem::Leaf(leaf) => {
646                    let start = leaf.offset;
647                    partial
648                        .data
649                        .write_at(start, &leaf.data)
650                        .expect("writing to mem can never fail");
651                    let added = chunk_range(&leaf);
652                    let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
653                    if update.new_state().complete {
654                        let data = std::mem::take(&mut partial.data);
655                        let outboard = std::mem::take(&mut partial.outboard);
656                        let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
657                        let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
658                        *state = CompleteStorage::new(data, outboard).into();
659                    }
660                    update.changed()
661                }
662            }
663        });
664    }
665    tx.send(Ok(())).await.ok();
666}
667
668#[instrument(skip_all, fields(hash = tracing::field::Empty))]
669async fn export_bao(
670    entry: Option<BaoFileHandle>,
671    ranges: ChunkRanges,
672    mut sender: mpsc::Sender<EncodedItem>,
673) {
674    let Some(entry) = entry else {
675        let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
676        sender.send(err.into()).await.ok();
677        return;
678    };
679    tracing::Span::current().record("hash", tracing::field::display(entry.hash));
680    let data = entry.data_reader();
681    let outboard = entry.outboard_reader();
682    let tx = BaoTreeSender::new(&mut sender);
683    traverse_ranges_validated(data, outboard, &ranges, tx)
684        .await
685        .ok();
686}
687
688#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
689async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
690    entry.subscribe().forward(tx).await.ok();
691}
692
693async fn import_bytes(
694    data: Bytes,
695    scope: Scope,
696    format: BlobFormat,
697    tx: mpsc::Sender<AddProgressItem>,
698) -> anyhow::Result<ImportEntry> {
699    tx.send(AddProgressItem::Size(data.len() as u64)).await?;
700    tx.send(AddProgressItem::CopyDone).await?;
701    let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
702    Ok(ImportEntry {
703        data,
704        outboard,
705        scope,
706        format,
707        tx,
708    })
709}
710
711async fn import_byte_stream(
712    scope: Scope,
713    format: BlobFormat,
714    mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
715    tx: mpsc::Sender<AddProgressItem>,
716) -> anyhow::Result<ImportEntry> {
717    let mut res = Vec::new();
718    loop {
719        match rx.recv().await {
720            Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
721                res.extend_from_slice(&data);
722                tx.send(AddProgressItem::CopyProgress(res.len() as u64))
723                    .await?;
724            }
725            Ok(Some(ImportByteStreamUpdate::Done)) => {
726                break;
727            }
728            Ok(None) => {
729                return Err(api::Error::io(
730                    io::ErrorKind::UnexpectedEof,
731                    "byte stream ended unexpectedly",
732                )
733                .into());
734            }
735            Err(e) => {
736                return Err(e.into());
737            }
738        }
739    }
740    import_bytes(res.into(), scope, format, tx).await
741}
742
743#[instrument(skip_all, fields(path = %cmd.path.display()))]
744async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
745    let ImportPathMsg {
746        inner:
747            ImportPathRequest {
748                path,
749                scope,
750                format,
751                ..
752            },
753        tx,
754        ..
755    } = cmd;
756    let mut res = Vec::new();
757    let mut file = tokio::fs::File::open(path).await?;
758    let mut buf = [0u8; 1024 * 64];
759    loop {
760        let size = file.read(&mut buf).await?;
761        if size == 0 {
762            break;
763        }
764        res.extend_from_slice(&buf[..size]);
765        tx.send(AddProgressItem::CopyProgress(res.len() as u64))
766            .await?;
767    }
768    import_bytes(res.into(), scope, format, tx).await
769}
770
771#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
772async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
773    let ExportPathMsg { inner, mut tx, .. } = cmd;
774    let Some(entry) = entry else {
775        tx.send(ExportProgressItem::Error(api::Error::io(
776            io::ErrorKind::NotFound,
777            "hash not found",
778        )))
779        .await
780        .ok();
781        return;
782    };
783    match export_path_impl(entry, inner, &mut tx).await {
784        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
785        Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
786    };
787}
788
789async fn export_path_impl(
790    entry: BaoFileHandle,
791    cmd: ExportPathRequest,
792    tx: &mut mpsc::Sender<ExportProgressItem>,
793) -> io::Result<()> {
794    let ExportPathRequest { target, .. } = cmd;
795    if !target.is_absolute() {
796        return Err(io::Error::new(
797            io::ErrorKind::InvalidInput,
798            "path is not absolute",
799        ));
800    }
801    if let Some(parent) = target.parent() {
802        std::fs::create_dir_all(parent)?;
803    }
804    // todo: for partial entries make sure to only write the part that is actually present
805    let mut file = std::fs::File::create(target)?;
806    let size = entry.0.state.borrow().size();
807    tx.send(ExportProgressItem::Size(size)).await?;
808    let mut buf = [0u8; 1024 * 64];
809    for offset in (0..size).step_by(1024 * 64) {
810        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
811        let buf = &mut buf[..len];
812        entry.0.state.borrow().data().read_exact_at(offset, buf)?;
813        file.write_all(buf)?;
814        tx.try_send(ExportProgressItem::CopyProgress(offset))
815            .await
816            .map_err(|_e| io::Error::other(""))?;
817        yield_now().await;
818    }
819    Ok(())
820}
821
822struct ImportEntry {
823    scope: Scope,
824    format: BlobFormat,
825    data: Bytes,
826    outboard: PreOrderMemOutboard,
827    tx: mpsc::Sender<AddProgressItem>,
828}
829
830pub struct DataReader(BaoFileHandle);
831
832impl ReadBytesAt for DataReader {
833    fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
834        let entry = self.0 .0.state.borrow();
835        entry.data().read_bytes_at(offset, size)
836    }
837}
838
839pub struct OutboardReader {
840    hash: blake3::Hash,
841    tree: BaoTree,
842    data: BaoFileHandle,
843}
844
845impl Outboard for OutboardReader {
846    fn root(&self) -> blake3::Hash {
847        self.hash
848    }
849
850    fn tree(&self) -> BaoTree {
851        self.tree
852    }
853
854    fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
855        let Some(offset) = self.tree.pre_order_offset(node) else {
856            return Ok(None);
857        };
858        let mut buf = [0u8; 64];
859        let size = self
860            .data
861            .0
862            .state
863            .borrow()
864            .outboard()
865            .read_at(offset * 64, &mut buf)?;
866        if size != 64 {
867            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
868        }
869        let left: [u8; 32] = buf[..32].try_into().unwrap();
870        let right: [u8; 32] = buf[32..].try_into().unwrap();
871        Ok(Some((left.into(), right.into())))
872    }
873}
874
875struct State {
876    data: HashMap<Hash, BaoFileHandle>,
877    tags: BTreeMap<Tag, HashAndFormat>,
878    empty_hash: BaoFileHandle,
879}
880
881#[derive(Debug, derive_more::From)]
882pub enum BaoFileStorage {
883    Partial(PartialMemStorage),
884    Complete(CompleteStorage),
885}
886
887impl BaoFileStorage {
888    /// Get the bitfield of the storage.
889    pub fn bitfield(&self) -> Bitfield {
890        match self {
891            Self::Partial(entry) => entry.bitfield.clone(),
892            Self::Complete(entry) => Bitfield::complete(entry.size()),
893        }
894    }
895}
896
/// Shared state behind a [`BaoFileHandle`].
#[derive(Debug)]
pub struct BaoFileHandleInner {
    /// Current storage; held in a `watch` sender so that subscribers are
    /// notified when it changes.
    state: watch::Sender<BaoFileStorage>,
    /// Hash of the blob this handle refers to.
    hash: Hash,
}

/// A cheaply cloneable handle to a bao file, including the hash.
///
/// Clones share the same [`BaoFileHandleInner`] through an `Arc`.
#[derive(Debug, Clone, derive_more::Deref)]
pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
906
907impl BaoFileHandle {
908    pub fn new_partial(hash: Hash) -> Self {
909        let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
910            data: SparseMemFile::new(),
911            outboard: SparseMemFile::new(),
912            size: SizeInfo::default(),
913            bitfield: Bitfield::empty(),
914        }));
915        Self(Arc::new(BaoFileHandleInner { state, hash }))
916    }
917
918    pub fn hash(&self) -> Hash {
919        self.hash
920    }
921
922    pub fn bitfield(&self) -> Bitfield {
923        self.0.state.borrow().bitfield()
924    }
925
926    pub fn subscribe(&self) -> BaoFileStorageSubscriber {
927        BaoFileStorageSubscriber::new(self.0.state.subscribe())
928    }
929
930    pub fn data_reader(&self) -> DataReader {
931        DataReader(self.clone())
932    }
933
934    pub fn outboard_reader(&self) -> OutboardReader {
935        let entry = self.0.state.borrow();
936        let hash = self.hash.into();
937        let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
938        OutboardReader {
939            hash,
940            tree,
941            data: self.clone(),
942        }
943    }
944}
945
946impl Default for BaoFileStorage {
947    fn default() -> Self {
948        Self::Partial(Default::default())
949    }
950}
951
952impl BaoFileStorage {
953    fn data(&self) -> &[u8] {
954        match self {
955            Self::Partial(entry) => entry.data.as_ref(),
956            Self::Complete(entry) => &entry.data,
957        }
958    }
959
960    fn outboard(&self) -> &[u8] {
961        match self {
962            Self::Partial(entry) => entry.outboard.as_ref(),
963            Self::Complete(entry) => &entry.outboard,
964        }
965    }
966
967    fn size(&self) -> u64 {
968        match self {
969            Self::Partial(entry) => entry.current_size(),
970            Self::Complete(entry) => entry.size(),
971        }
972    }
973}
974
975#[derive(Debug, Clone)]
976pub struct CompleteStorage {
977    pub(crate) data: Bytes,
978    pub(crate) outboard: Bytes,
979}
980
981impl CompleteStorage {
982    pub fn create(data: Bytes) -> (Hash, Self) {
983        let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
984        let hash = outboard.root().into();
985        let outboard = outboard.data.into();
986        let entry = Self::new(data, outboard);
987        (hash, entry)
988    }
989
990    pub fn new(data: Bytes, outboard: Bytes) -> Self {
991        Self { data, outboard }
992    }
993
994    pub fn size(&self) -> u64 {
995        self.data.len() as u64
996    }
997}
998
999#[allow(dead_code)]
1000fn print_outboard(hashes: &[u8]) {
1001    assert!(hashes.len() % 64 == 0);
1002    for chunk in hashes.chunks(64) {
1003        let left: [u8; 32] = chunk[..32].try_into().unwrap();
1004        let right: [u8; 32] = chunk[32..].try_into().unwrap();
1005        let left = blake3::Hash::from(left);
1006        let right = blake3::Hash::from(right);
1007        println!("l: {left:?}, r: {right:?}");
1008    }
1009}
1010
1011pub struct BaoFileStorageSubscriber {
1012    receiver: watch::Receiver<BaoFileStorage>,
1013}
1014
1015impl BaoFileStorageSubscriber {
1016    pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1017        Self { receiver }
1018    }
1019
1020    /// Forward observed *values* to the given sender
1021    ///
1022    /// Returns an error if sending fails, or if the last sender is dropped
1023    pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1024        let value = self.receiver.borrow().bitfield();
1025        tx.send(value).await?;
1026        loop {
1027            self.update_or_closed(&mut tx).await?;
1028            let value = self.receiver.borrow().bitfield();
1029            tx.send(value.clone()).await?;
1030        }
1031    }
1032
1033    /// Forward observed *deltas* to the given sender
1034    ///
1035    /// Returns an error if sending fails, or if the last sender is dropped
1036    #[allow(dead_code)]
1037    pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1038        let value = self.receiver.borrow().bitfield();
1039        let mut old = value.clone();
1040        tx.send(value).await?;
1041        loop {
1042            self.update_or_closed(&mut tx).await?;
1043            let new = self.receiver.borrow().bitfield();
1044            let diff = old.diff(&new);
1045            if diff.is_empty() {
1046                continue;
1047            }
1048            tx.send(diff).await?;
1049            old = new;
1050        }
1051    }
1052
1053    async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1054        tokio::select! {
1055            _ = tx.closed() => {
1056                // the sender is closed, we are done
1057                Err(irpc::channel::SendError::ReceiverClosed.into())
1058            }
1059            e = self.receiver.changed() => Ok(e?),
1060        }
1061    }
1062}
1063
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Round-trip smoke test.
    ///
    /// Adds a blob to one store, exports it as bao, and imports that export into
    /// a second store. Then checks that the second store's export is byte-identical.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let store = MemStore::new();
        // 64 KiB of zeros; `tt` is kept alive for the whole test (presumably the
        // temp tag keeps the blob from being collected — confirm).
        let tt = store.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = *tt.hash();
        println!("hash: {hash:?}");
        // Print each item of the full bao export stream for manual inspection.
        let mut stream = store.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = stream.next().await {
            println!("item: {item:?}");
        }
        // Export again, this time collected into a byte vector.
        let stream = store.export_bao(hash, ChunkRanges::all());
        let exported = stream.bao_to_vec().await?;

        let store2 = MemStore::new();
        // Observe the hash in the second store *before* importing. The detached
        // task only prints events; it is never joined.
        let mut or = store2.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = or.next().await {
                println!("event: {event:?}");
            }
        });
        store2
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        // Re-exporting from the second store must reproduce the original export.
        let exported2 = store2
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, exported2);

        Ok(())
    }
}