1use std::{
11 collections::{BTreeMap, HashMap, HashSet},
12 future::Future,
13 io::{self, Write},
14 num::NonZeroU64,
15 ops::Deref,
16 sync::Arc,
17};
18
19use bao_tree::{
20 blake3,
21 io::{
22 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
23 outboard::PreOrderMemOutboard,
24 sync::{Outboard, ReadAt, WriteAt},
25 BaoContentItem, EncodeError, Leaf,
26 },
27 BaoTree, ChunkNum, ChunkRanges, TreeNode,
28};
29use bytes::Bytes;
30use irpc::channel::mpsc;
31use n0_future::{
32 future::yield_now,
33 task::{JoinError, JoinSet},
34 time::SystemTime,
35};
36use range_collections::range_set::RangeSetRange;
37use tokio::sync::watch;
38use tracing::{error, info, instrument, trace, Instrument};
39
40use super::util::{BaoTreeSender, PartialMemStorage};
41use crate::{
42 api::{
43 self,
44 blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
45 proto::{
46 BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
47 CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
48 DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
49 ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
50 ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
51 ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
52 ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
53 SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
54 },
55 tags::TagInfo,
56 ApiClient,
57 },
58 protocol::ChunkRangesExt,
59 store::{
60 gc::{run_gc, GcConfig},
61 util::{SizeInfo, SparseMemFile, Tag},
62 IROH_BLOCK_SIZE,
63 },
64 util::temp_tag::{TagDrop, TempTagScope, TempTags},
65 BlobFormat, Hash, HashAndFormat,
66};
67
68#[derive(Debug, Default)]
69pub struct Options {
70 pub gc_config: Option<GcConfig>,
71}
72
73#[derive(Debug, Clone)]
74#[repr(transparent)]
75pub struct MemStore {
76 client: ApiClient,
77}
78
79impl From<MemStore> for crate::api::Store {
80 fn from(value: MemStore) -> Self {
81 crate::api::Store::from_sender(value.client)
82 }
83}
84
85impl AsRef<crate::api::Store> for MemStore {
86 fn as_ref(&self) -> &crate::api::Store {
87 crate::api::Store::ref_from_sender(&self.client)
88 }
89}
90
91impl Deref for MemStore {
92 type Target = crate::api::Store;
93
94 fn deref(&self) -> &Self::Target {
95 crate::api::Store::ref_from_sender(&self.client)
96 }
97}
98
99impl Default for MemStore {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105#[derive(derive_more::From)]
106enum TaskResult {
107 Unit(()),
108 Import(anyhow::Result<ImportEntry>),
109 Scope(Scope),
110}
111
112impl MemStore {
113 pub fn from_sender(client: ApiClient) -> Self {
114 Self { client }
115 }
116
117 pub fn new() -> Self {
118 Self::new_with_opts(Options::default())
119 }
120
121 pub fn new_with_opts(opts: Options) -> Self {
122 let (sender, receiver) = tokio::sync::mpsc::channel(32);
123 n0_future::task::spawn(
124 Actor {
125 commands: receiver,
126 tasks: JoinSet::new(),
127 state: State {
128 data: HashMap::new(),
129 tags: BTreeMap::new(),
130 empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
131 },
132 options: Arc::new(Options::default()),
133 temp_tags: Default::default(),
134 protected: Default::default(),
135 idle_waiters: Default::default(),
136 }
137 .run(),
138 );
139
140 let store = Self::from_sender(sender.into());
141 if let Some(gc_config) = opts.gc_config {
142 n0_future::task::spawn(run_gc(store.deref().clone(), gc_config));
143 }
144
145 store
146 }
147}
148
149struct Actor {
150 commands: tokio::sync::mpsc::Receiver<Command>,
151 tasks: JoinSet<TaskResult>,
152 state: State,
153 #[allow(dead_code)]
154 options: Arc<Options>,
155 temp_tags: TempTags,
157 idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
159 protected: HashSet<Hash>,
160}
161
162impl Actor {
163 fn spawn<F, T>(&mut self, f: F)
164 where
165 F: Future<Output = T> + Send + 'static,
166 T: Into<TaskResult>,
167 {
168 let span = tracing::Span::current();
169 let fut = async move { f.await.into() }.instrument(span);
170 self.tasks.spawn(fut);
171 }
172
173 async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
174 match cmd {
175 Command::ImportBao(ImportBaoMsg {
176 inner: ImportBaoRequest { hash, size },
177 rx: data,
178 tx,
179 ..
180 }) => {
181 let entry = self.get_or_create_entry(hash);
182 self.spawn(import_bao(entry, size, data, tx));
183 }
184 Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
185 trace!("wait idle");
186 if self.tasks.is_empty() {
187 tx.send(()).await.ok();
189 } else {
190 self.idle_waiters.push(tx);
192 }
193 }
194 Command::Observe(ObserveMsg {
195 inner: ObserveRequest { hash },
196 tx,
197 ..
198 }) => {
199 let entry = self.get_or_create_entry(hash);
200 self.spawn(observe(entry, tx));
201 }
202 Command::ImportBytes(ImportBytesMsg {
203 inner:
204 ImportBytesRequest {
205 data,
206 scope,
207 format,
208 ..
209 },
210 tx,
211 ..
212 }) => {
213 self.spawn(import_bytes(data, scope, format, tx));
214 }
215 Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
216 self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
217 }
218 Command::ImportPath(cmd) => {
219 self.spawn(import_path(cmd));
220 }
221 Command::ExportBao(ExportBaoMsg {
222 inner: ExportBaoRequest { hash, ranges },
223 tx,
224 ..
225 }) => {
226 let entry = self.get(&hash);
227 self.spawn(export_bao(entry, ranges, tx))
228 }
229 Command::ExportPath(cmd) => {
230 let entry = self.get(&cmd.hash);
231 self.spawn(export_path(entry, cmd));
232 }
233 Command::DeleteTags(cmd) => {
234 let DeleteTagsMsg {
235 inner: DeleteTagsRequest { from, to },
236 tx,
237 ..
238 } = cmd;
239 info!("deleting tags from {:?} to {:?}", from, to);
240 let mut deleted = 0;
243 self.state.tags.retain(|tag, _| {
244 if let Some(from) = &from {
245 if tag < from {
246 return true;
247 }
248 }
249 if let Some(to) = &to {
250 if tag >= to {
251 return true;
252 }
253 }
254 info!(" removing {:?}", tag);
255 deleted += 1;
256 false
257 });
258 tx.send(Ok(deleted)).await.ok();
259 }
260 Command::RenameTag(cmd) => {
261 let RenameTagMsg {
262 inner: RenameTagRequest { from, to },
263 tx,
264 ..
265 } = cmd;
266 let tags = &mut self.state.tags;
267 let value = match tags.remove(&from) {
268 Some(value) => value,
269 None => {
270 tx.send(Err(api::Error::io(
271 io::ErrorKind::NotFound,
272 format!("tag not found: {from:?}"),
273 )))
274 .await
275 .ok();
276 return None;
277 }
278 };
279 tags.insert(to, value);
280 tx.send(Ok(())).await.ok();
281 return None;
282 }
283 Command::ListTags(cmd) => {
284 let ListTagsMsg {
285 inner:
286 ListTagsRequest {
287 from,
288 to,
289 raw,
290 hash_seq,
291 },
292 tx,
293 ..
294 } = cmd;
295 let tags = self
296 .state
297 .tags
298 .iter()
299 .filter(move |(tag, value)| {
300 if let Some(from) = &from {
301 if tag < &from {
302 return false;
303 }
304 }
305 if let Some(to) = &to {
306 if tag >= &to {
307 return false;
308 }
309 }
310 raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
311 })
312 .map(|(tag, value)| TagInfo {
313 name: tag.clone(),
314 hash: value.hash,
315 format: value.format,
316 })
317 .map(Ok);
318 tx.send(tags.collect()).await.ok();
319 }
320 Command::SetTag(SetTagMsg {
321 inner: SetTagRequest { name: tag, value },
322 tx,
323 ..
324 }) => {
325 self.state.tags.insert(tag, value);
326 tx.send(Ok(())).await.ok();
327 }
328 Command::CreateTag(CreateTagMsg {
329 inner: CreateTagRequest { value },
330 tx,
331 ..
332 }) => {
333 let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
334 self.state.tags.insert(tag.clone(), value);
335 tx.send(Ok(tag)).await.ok();
336 }
337 Command::CreateTempTag(cmd) => {
338 trace!("{cmd:?}");
339 self.create_temp_tag(cmd).await;
340 }
341 Command::ListTempTags(cmd) => {
342 trace!("{cmd:?}");
343 let tts = self.temp_tags.list();
344 cmd.tx.send(tts).await.ok();
345 }
346 Command::ListBlobs(cmd) => {
347 let ListBlobsMsg { tx, .. } = cmd;
348 let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
349 self.spawn(async move {
350 for blob in blobs {
351 if tx.send(Ok(blob)).await.is_err() {
352 break;
353 }
354 }
355 });
356 }
357 Command::BlobStatus(cmd) => {
358 trace!("{cmd:?}");
359 let BlobStatusMsg {
360 inner: BlobStatusRequest { hash },
361 tx,
362 ..
363 } = cmd;
364 let res = match self.get(&hash) {
365 None => api::blobs::BlobStatus::NotFound,
366 Some(x) => {
367 let bitfield = x.0.state.borrow().bitfield();
368 if bitfield.is_complete() {
369 BlobStatus::Complete {
370 size: bitfield.size,
371 }
372 } else {
373 BlobStatus::Partial {
374 size: bitfield.validated_size(),
375 }
376 }
377 }
378 };
379 tx.send(res).await.ok();
380 }
381 Command::DeleteBlobs(cmd) => {
382 trace!("{cmd:?}");
383 let DeleteBlobsMsg {
384 inner: BlobDeleteRequest { hashes, force },
385 tx,
386 ..
387 } = cmd;
388 for hash in hashes {
389 if !force && self.protected.contains(&hash) {
390 continue;
391 }
392 self.state.data.remove(&hash);
393 }
394 tx.send(Ok(())).await.ok();
395 }
396 Command::Batch(cmd) => {
397 trace!("{cmd:?}");
398 let (id, scope) = self.temp_tags.create_scope();
399 self.spawn(handle_batch(cmd, id, scope));
400 }
401 Command::ClearProtected(cmd) => {
402 self.protected.clear();
403 cmd.tx.send(Ok(())).await.ok();
404 }
405 Command::ExportRanges(cmd) => {
406 let entry = self.get(&cmd.hash);
407 self.spawn(export_ranges(cmd, entry));
408 }
409 Command::SyncDb(SyncDbMsg { tx, .. }) => {
410 tx.send(Ok(())).await.ok();
411 }
412 Command::Shutdown(cmd) => {
413 return Some(cmd);
414 }
415 }
416 None
417 }
418
419 fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
420 if *hash == Hash::EMPTY {
421 Some(self.state.empty_hash.clone())
422 } else {
423 self.state.data.get(hash).cloned()
424 }
425 }
426
427 fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
428 if hash == Hash::EMPTY {
429 self.state.empty_hash.clone()
430 } else {
431 self.state
432 .data
433 .entry(hash)
434 .or_insert_with(|| BaoFileHandle::new_partial(hash))
435 .clone()
436 }
437 }
438
439 async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
440 let CreateTempTagMsg { tx, inner, .. } = cmd;
441 let mut tt = self.temp_tags.create(inner.scope, inner.value);
442 if tx.is_rpc() {
443 tt.leak();
444 }
445 tx.send(tt).await.ok();
446 }
447
448 async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
449 let import_data = match res {
450 Ok(entry) => entry,
451 Err(e) => {
452 error!("import failed: {e}");
453 return;
454 }
455 };
456 let hash = import_data.outboard.root().into();
457 let entry = self.get_or_create_entry(hash);
458 entry
459 .0
460 .state
461 .send_if_modified(|state: &mut BaoFileStorage| {
462 let BaoFileStorage::Partial(_) = state.deref() else {
463 return false;
464 };
465 *state =
466 CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
467 true
468 });
469 let tt = self.temp_tags.create(
470 import_data.scope,
471 HashAndFormat {
472 hash,
473 format: import_data.format,
474 },
475 );
476 import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
477 }
478
479 fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
480 match res {
481 Ok(x) => Some(x),
482 Err(e) => {
483 if e.is_cancelled() {
484 trace!("task cancelled: {e}");
485 } else {
486 error!("task failed: {e}");
487 }
488 None
489 }
490 }
491 }
492
493 pub async fn run(mut self) {
494 let shutdown = loop {
495 tokio::select! {
496 cmd = self.commands.recv() => {
497 let Some(cmd) = cmd else {
498 break None;
501 };
502 if let Some(cmd) = self.handle_command(cmd).await {
503 break Some(cmd);
504 }
505 }
506 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
507 let Some(res) = self.log_task_result(res) else {
508 continue;
509 };
510 match res {
511 TaskResult::Import(res) => {
512 self.finish_import(res).await;
513 }
514 TaskResult::Scope(scope) => {
515 self.temp_tags.end_scope(scope);
516 }
517 TaskResult::Unit(_) => {}
518 }
519 if self.tasks.is_empty() {
520 for tx in self.idle_waiters.drain(..) {
522 tx.send(()).await.ok();
523 }
524 }
525 }
526 }
527 };
528 if let Some(shutdown) = shutdown {
529 shutdown.tx.send(()).await.ok();
530 }
531 }
532}
533
534async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
535 if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
536 error!("batch failed: {cause}");
537 }
538 id
539}
540
541async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
542 let BatchMsg { tx, mut rx, .. } = cmd;
543 trace!("created scope {}", id);
544 tx.send(id).await.map_err(api::Error::other)?;
545 while let Some(msg) = rx.recv().await? {
546 match msg {
547 BatchResponse::Drop(msg) => scope.on_drop(&msg),
548 BatchResponse::Ping => {}
549 }
550 }
551 Ok(())
552}
553
554async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
555 let Some(entry) = entry else {
556 let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
557 cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
558 return;
559 };
560 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
561 cmd.tx
562 .send(ExportRangesItem::Error(cause.into()))
563 .await
564 .ok();
565 }
566}
567
568async fn export_ranges_impl(
569 cmd: ExportRangesRequest,
570 tx: &mut mpsc::Sender<ExportRangesItem>,
571 entry: BaoFileHandle,
572) -> io::Result<()> {
573 let ExportRangesRequest { ranges, hash } = cmd;
574 let bitfield = entry.bitfield();
575 trace!(
576 "exporting ranges: {hash} {ranges:?} size={}",
577 bitfield.size()
578 );
579 debug_assert!(entry.hash() == hash, "hash mismatch");
580 let data = entry.data_reader();
581 let size = bitfield.size();
582 for range in ranges.iter() {
583 let range = match range {
584 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
585 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
586 };
587 let requested = ChunkRanges::bytes(range.start..range.end);
588 if !bitfield.ranges.is_superset(&requested) {
589 return Err(io::Error::other(format!(
590 "missing range: {requested:?}, present: {bitfield:?}",
591 )));
592 }
593 let bs = 1024;
594 let mut offset = range.start;
595 loop {
596 let end: u64 = (offset + bs).min(range.end);
597 let size = (end - offset) as usize;
598 tx.send(
599 Leaf {
600 offset,
601 data: data.read_bytes_at(offset, size)?,
602 }
603 .into(),
604 )
605 .await?;
606 offset = end;
607 if offset >= range.end {
608 break;
609 }
610 }
611 }
612 Ok(())
613}
614
615fn chunk_range(leaf: &Leaf) -> ChunkRanges {
616 let start = ChunkNum::chunks(leaf.offset);
617 let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
618 (start..end).into()
619}
620
621async fn import_bao(
622 entry: BaoFileHandle,
623 size: NonZeroU64,
624 mut stream: mpsc::Receiver<BaoContentItem>,
625 tx: irpc::channel::oneshot::Sender<api::Result<()>>,
626) {
627 let size = size.get();
628 entry
629 .0
630 .state
631 .send_if_modified(|state: &mut BaoFileStorage| {
632 let BaoFileStorage::Partial(entry) = state else {
633 return false;
635 };
636 entry.size.write(0, size);
637 false
638 });
639 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
640 while let Some(item) = stream.recv().await.unwrap() {
641 entry.0.state.send_if_modified(|state| {
642 let BaoFileStorage::Partial(partial) = state else {
643 return false;
645 };
646 match item {
647 BaoContentItem::Parent(parent) => {
648 if let Some(offset) = tree.pre_order_offset(parent.node) {
649 let mut pair = [0u8; 64];
650 pair[..32].copy_from_slice(parent.pair.0.as_bytes());
651 pair[32..].copy_from_slice(parent.pair.1.as_bytes());
652 partial
653 .outboard
654 .write_at(offset * 64, &pair)
655 .expect("writing to mem can never fail");
656 }
657 false
658 }
659 BaoContentItem::Leaf(leaf) => {
660 let start = leaf.offset;
661 partial
662 .data
663 .write_at(start, &leaf.data)
664 .expect("writing to mem can never fail");
665 let added = chunk_range(&leaf);
666 let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
667 if update.new_state().complete {
668 let data = std::mem::take(&mut partial.data);
669 let outboard = std::mem::take(&mut partial.outboard);
670 let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
671 let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
672 *state = CompleteStorage::new(data, outboard).into();
673 }
674 update.changed()
675 }
676 }
677 });
678 }
679 tx.send(Ok(())).await.ok();
680}
681
682#[instrument(skip_all, fields(hash = tracing::field::Empty))]
683async fn export_bao(
684 entry: Option<BaoFileHandle>,
685 ranges: ChunkRanges,
686 mut sender: mpsc::Sender<EncodedItem>,
687) {
688 let Some(entry) = entry else {
689 let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
690 sender.send(err.into()).await.ok();
691 return;
692 };
693 tracing::Span::current().record("hash", tracing::field::display(entry.hash));
694 let data = entry.data_reader();
695 let outboard = entry.outboard_reader();
696 let tx = BaoTreeSender::new(&mut sender);
697 traverse_ranges_validated(data, outboard, &ranges, tx)
698 .await
699 .ok();
700}
701
702#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
703async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
704 entry.subscribe().forward(tx).await.ok();
705}
706
707async fn import_bytes(
708 data: Bytes,
709 scope: Scope,
710 format: BlobFormat,
711 tx: mpsc::Sender<AddProgressItem>,
712) -> anyhow::Result<ImportEntry> {
713 tx.send(AddProgressItem::Size(data.len() as u64)).await?;
714 tx.send(AddProgressItem::CopyDone).await?;
715 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
716 Ok(ImportEntry {
717 data,
718 outboard,
719 scope,
720 format,
721 tx,
722 })
723}
724
725async fn import_byte_stream(
726 scope: Scope,
727 format: BlobFormat,
728 mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
729 tx: mpsc::Sender<AddProgressItem>,
730) -> anyhow::Result<ImportEntry> {
731 let mut res = Vec::new();
732 loop {
733 match rx.recv().await {
734 Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
735 res.extend_from_slice(&data);
736 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
737 .await?;
738 }
739 Ok(Some(ImportByteStreamUpdate::Done)) => {
740 break;
741 }
742 Ok(None) => {
743 return Err(api::Error::io(
744 io::ErrorKind::UnexpectedEof,
745 "byte stream ended unexpectedly",
746 )
747 .into());
748 }
749 Err(e) => {
750 return Err(e.into());
751 }
752 }
753 }
754 import_bytes(res.into(), scope, format, tx).await
755}
756
/// Browser variant of `import_path`: always fails.
#[cfg(wasm_browser)]
async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
    // Bind the request only to keep the message's field and type in use.
    let _request: ImportPathRequest = cmd.inner;
    anyhow::bail!("import_path is not supported in the browser")
}
764
765#[instrument(skip_all, fields(path = %cmd.path.display()))]
766#[cfg(not(wasm_browser))]
767async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
768 use tokio::io::AsyncReadExt;
769 let ImportPathMsg {
770 inner:
771 ImportPathRequest {
772 path,
773 scope,
774 format,
775 ..
776 },
777 tx,
778 ..
779 } = cmd;
780 let mut res = Vec::new();
781 let mut file = tokio::fs::File::open(path).await?;
782 let mut buf = [0u8; 1024 * 64];
783 loop {
784 let size = file.read(&mut buf).await?;
785 if size == 0 {
786 break;
787 }
788 res.extend_from_slice(&buf[..size]);
789 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
790 .await?;
791 }
792 import_bytes(res.into(), scope, format, tx).await
793}
794
795#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
796async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
797 let ExportPathMsg { inner, mut tx, .. } = cmd;
798 let Some(entry) = entry else {
799 tx.send(ExportProgressItem::Error(api::Error::io(
800 io::ErrorKind::NotFound,
801 "hash not found",
802 )))
803 .await
804 .ok();
805 return;
806 };
807 match export_path_impl(entry, inner, &mut tx).await {
808 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
809 Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
810 };
811}
812
813async fn export_path_impl(
814 entry: BaoFileHandle,
815 cmd: ExportPathRequest,
816 tx: &mut mpsc::Sender<ExportProgressItem>,
817) -> io::Result<()> {
818 let ExportPathRequest { target, .. } = cmd;
819 if !target.is_absolute() {
820 return Err(io::Error::new(
821 io::ErrorKind::InvalidInput,
822 "path is not absolute",
823 ));
824 }
825 if let Some(parent) = target.parent() {
826 std::fs::create_dir_all(parent)?;
827 }
828 let mut file = std::fs::File::create(target)?;
830 let size = entry.0.state.borrow().size();
831 tx.send(ExportProgressItem::Size(size)).await?;
832 let mut buf = [0u8; 1024 * 64];
833 for offset in (0..size).step_by(1024 * 64) {
834 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
835 let buf = &mut buf[..len];
836 entry.0.state.borrow().data().read_exact_at(offset, buf)?;
837 file.write_all(buf)?;
838 tx.try_send(ExportProgressItem::CopyProgress(offset))
839 .await
840 .map_err(|_e| io::Error::other(""))?;
841 yield_now().await;
842 }
843 Ok(())
844}
845
846struct ImportEntry {
847 scope: Scope,
848 format: BlobFormat,
849 data: Bytes,
850 outboard: PreOrderMemOutboard,
851 tx: mpsc::Sender<AddProgressItem>,
852}
853
854pub struct DataReader(BaoFileHandle);
855
856impl ReadBytesAt for DataReader {
857 fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
858 let entry = self.0 .0.state.borrow();
859 entry.data().read_bytes_at(offset, size)
860 }
861}
862
863pub struct OutboardReader {
864 hash: blake3::Hash,
865 tree: BaoTree,
866 data: BaoFileHandle,
867}
868
869impl Outboard for OutboardReader {
870 fn root(&self) -> blake3::Hash {
871 self.hash
872 }
873
874 fn tree(&self) -> BaoTree {
875 self.tree
876 }
877
878 fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
879 let Some(offset) = self.tree.pre_order_offset(node) else {
880 return Ok(None);
881 };
882 let mut buf = [0u8; 64];
883 let size = self
884 .data
885 .0
886 .state
887 .borrow()
888 .outboard()
889 .read_at(offset * 64, &mut buf)?;
890 if size != 64 {
891 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
892 }
893 let left: [u8; 32] = buf[..32].try_into().unwrap();
894 let right: [u8; 32] = buf[32..].try_into().unwrap();
895 Ok(Some((left.into(), right.into())))
896 }
897}
898
899struct State {
900 data: HashMap<Hash, BaoFileHandle>,
901 tags: BTreeMap<Tag, HashAndFormat>,
902 empty_hash: BaoFileHandle,
903}
904
905#[derive(Debug, derive_more::From)]
906pub enum BaoFileStorage {
907 Partial(PartialMemStorage),
908 Complete(CompleteStorage),
909}
910
911impl BaoFileStorage {
912 pub fn bitfield(&self) -> Bitfield {
914 match self {
915 Self::Partial(entry) => entry.bitfield.clone(),
916 Self::Complete(entry) => Bitfield::complete(entry.size()),
917 }
918 }
919}
920
921#[derive(Debug)]
922pub struct BaoFileHandleInner {
923 state: watch::Sender<BaoFileStorage>,
924 hash: Hash,
925}
926
927#[derive(Debug, Clone, derive_more::Deref)]
929pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
930
931impl BaoFileHandle {
932 pub fn new_partial(hash: Hash) -> Self {
933 let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
934 data: SparseMemFile::new(),
935 outboard: SparseMemFile::new(),
936 size: SizeInfo::default(),
937 bitfield: Bitfield::empty(),
938 }));
939 Self(Arc::new(BaoFileHandleInner { state, hash }))
940 }
941
942 pub fn hash(&self) -> Hash {
943 self.hash
944 }
945
946 pub fn bitfield(&self) -> Bitfield {
947 self.0.state.borrow().bitfield()
948 }
949
950 pub fn subscribe(&self) -> BaoFileStorageSubscriber {
951 BaoFileStorageSubscriber::new(self.0.state.subscribe())
952 }
953
954 pub fn data_reader(&self) -> DataReader {
955 DataReader(self.clone())
956 }
957
958 pub fn outboard_reader(&self) -> OutboardReader {
959 let entry = self.0.state.borrow();
960 let hash = self.hash.into();
961 let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
962 OutboardReader {
963 hash,
964 tree,
965 data: self.clone(),
966 }
967 }
968}
969
970impl Default for BaoFileStorage {
971 fn default() -> Self {
972 Self::Partial(Default::default())
973 }
974}
975
976impl BaoFileStorage {
977 fn data(&self) -> &[u8] {
978 match self {
979 Self::Partial(entry) => entry.data.as_ref(),
980 Self::Complete(entry) => &entry.data,
981 }
982 }
983
984 fn outboard(&self) -> &[u8] {
985 match self {
986 Self::Partial(entry) => entry.outboard.as_ref(),
987 Self::Complete(entry) => &entry.outboard,
988 }
989 }
990
991 fn size(&self) -> u64 {
992 match self {
993 Self::Partial(entry) => entry.current_size(),
994 Self::Complete(entry) => entry.size(),
995 }
996 }
997}
998
999#[derive(Debug, Clone)]
1000pub struct CompleteStorage {
1001 pub(crate) data: Bytes,
1002 pub(crate) outboard: Bytes,
1003}
1004
1005impl CompleteStorage {
1006 pub fn create(data: Bytes) -> (Hash, Self) {
1007 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
1008 let hash = outboard.root().into();
1009 let outboard = outboard.data.into();
1010 let entry = Self::new(data, outboard);
1011 (hash, entry)
1012 }
1013
1014 pub fn new(data: Bytes, outboard: Bytes) -> Self {
1015 Self { data, outboard }
1016 }
1017
1018 pub fn size(&self) -> u64 {
1019 self.data.len() as u64
1020 }
1021}
1022
1023#[allow(dead_code)]
1024fn print_outboard(hashes: &[u8]) {
1025 assert!(hashes.len() % 64 == 0);
1026 for chunk in hashes.chunks(64) {
1027 let left: [u8; 32] = chunk[..32].try_into().unwrap();
1028 let right: [u8; 32] = chunk[32..].try_into().unwrap();
1029 let left = blake3::Hash::from(left);
1030 let right = blake3::Hash::from(right);
1031 println!("l: {left:?}, r: {right:?}");
1032 }
1033}
1034
1035pub struct BaoFileStorageSubscriber {
1036 receiver: watch::Receiver<BaoFileStorage>,
1037}
1038
1039impl BaoFileStorageSubscriber {
1040 pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1041 Self { receiver }
1042 }
1043
1044 pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1048 let value = self.receiver.borrow().bitfield();
1049 tx.send(value).await?;
1050 loop {
1051 self.update_or_closed(&mut tx).await?;
1052 let value = self.receiver.borrow().bitfield();
1053 tx.send(value.clone()).await?;
1054 }
1055 }
1056
1057 #[allow(dead_code)]
1061 pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1062 let value = self.receiver.borrow().bitfield();
1063 let mut old = value.clone();
1064 tx.send(value).await?;
1065 loop {
1066 self.update_or_closed(&mut tx).await?;
1067 let new = self.receiver.borrow().bitfield();
1068 let diff = old.diff(&new);
1069 if diff.is_empty() {
1070 continue;
1071 }
1072 tx.send(diff).await?;
1073 old = new;
1074 }
1075 }
1076
1077 async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1078 tokio::select! {
1079 _ = tx.closed() => {
1080 Err(irpc::channel::SendError::ReceiverClosed.into())
1082 }
1083 e = self.receiver.changed() => Ok(e?),
1084 }
1085 }
1086}
1087
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Adds a blob to one store, exports it as bao, imports that into a second store while
    /// observing it, and checks that the second store exports the identical encoding.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        let temp_tag = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = temp_tag.hash();
        println!("hash: {hash:?}");
        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let target = MemStore::new();
        let mut events = target.observe(hash).stream().await?;
        n0_future::task::spawn(async move {
            while let Some(event) = events.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let reexported = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, reexported);

        Ok(())
    }
}