1use std::{
11 collections::{BTreeMap, HashMap, HashSet},
12 future::Future,
13 io::{self, Write},
14 num::NonZeroU64,
15 ops::Deref,
16 sync::Arc,
17 time::SystemTime,
18};
19
20use bao_tree::{
21 blake3,
22 io::{
23 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24 outboard::PreOrderMemOutboard,
25 sync::{Outboard, ReadAt, WriteAt},
26 BaoContentItem, EncodeError, Leaf,
27 },
28 BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_error::{Result, StdResultExt};
33use n0_future::future::yield_now;
34use range_collections::range_set::RangeSetRange;
35use tokio::{
36 io::AsyncReadExt,
37 sync::watch,
38 task::{JoinError, JoinSet},
39};
40use tracing::{error, info, instrument, trace, Instrument};
41
42use super::util::{BaoTreeSender, PartialMemStorage};
43use crate::{
44 api::{
45 self,
46 blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
47 proto::{
48 BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
49 CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
50 DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
51 ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
52 ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
53 ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
54 ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
55 SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
56 },
57 tags::TagInfo,
58 ApiClient,
59 },
60 protocol::ChunkRangesExt,
61 store::{
62 gc::{run_gc, GcConfig},
63 util::{SizeInfo, SparseMemFile, Tag},
64 IROH_BLOCK_SIZE,
65 },
66 util::temp_tag::{TagDrop, TempTagScope, TempTags},
67 BlobFormat, Hash, HashAndFormat,
68};
69
70#[derive(Debug, Default)]
71pub struct Options {
72 pub gc_config: Option<GcConfig>,
73}
74
75#[derive(Debug, Clone)]
76#[repr(transparent)]
77pub struct MemStore {
78 client: ApiClient,
79}
80
81impl From<MemStore> for crate::api::Store {
82 fn from(value: MemStore) -> Self {
83 crate::api::Store::from_sender(value.client)
84 }
85}
86
87impl AsRef<crate::api::Store> for MemStore {
88 fn as_ref(&self) -> &crate::api::Store {
89 crate::api::Store::ref_from_sender(&self.client)
90 }
91}
92
93impl Deref for MemStore {
94 type Target = crate::api::Store;
95
96 fn deref(&self) -> &Self::Target {
97 crate::api::Store::ref_from_sender(&self.client)
98 }
99}
100
101impl Default for MemStore {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107#[derive(derive_more::From)]
108enum TaskResult {
109 Unit(()),
110 Import(Result<ImportEntry>),
111 Scope(Scope),
112}
113
114impl MemStore {
115 pub fn from_sender(client: ApiClient) -> Self {
116 Self { client }
117 }
118
119 pub fn new() -> Self {
120 Self::new_with_opts(Options::default())
121 }
122
123 pub fn new_with_opts(opts: Options) -> Self {
124 let (sender, receiver) = tokio::sync::mpsc::channel(32);
125 tokio::spawn(
126 Actor {
127 commands: receiver,
128 tasks: JoinSet::new(),
129 state: State {
130 data: HashMap::new(),
131 tags: BTreeMap::new(),
132 empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
133 },
134 options: Arc::new(Options::default()),
135 temp_tags: Default::default(),
136 protected: Default::default(),
137 idle_waiters: Default::default(),
138 }
139 .run(),
140 );
141
142 let store = Self::from_sender(sender.into());
143 if let Some(gc_config) = opts.gc_config {
144 tokio::spawn(run_gc(store.deref().clone(), gc_config));
145 }
146
147 store
148 }
149}
150
151struct Actor {
152 commands: tokio::sync::mpsc::Receiver<Command>,
153 tasks: JoinSet<TaskResult>,
154 state: State,
155 #[allow(dead_code)]
156 options: Arc<Options>,
157 temp_tags: TempTags,
159 idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
161 protected: HashSet<Hash>,
162}
163
164impl Actor {
165 fn spawn<F, T>(&mut self, f: F)
166 where
167 F: Future<Output = T> + Send + 'static,
168 T: Into<TaskResult>,
169 {
170 let span = tracing::Span::current();
171 let fut = async move { f.await.into() }.instrument(span);
172 self.tasks.spawn(fut);
173 }
174
175 async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
176 match cmd {
177 Command::ImportBao(ImportBaoMsg {
178 inner: ImportBaoRequest { hash, size },
179 rx: data,
180 tx,
181 ..
182 }) => {
183 let entry = self.get_or_create_entry(hash);
184 self.spawn(import_bao(entry, size, data, tx));
185 }
186 Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
187 trace!("wait idle");
188 if self.tasks.is_empty() {
189 tx.send(()).await.ok();
191 } else {
192 self.idle_waiters.push(tx);
194 }
195 }
196 Command::Observe(ObserveMsg {
197 inner: ObserveRequest { hash },
198 tx,
199 ..
200 }) => {
201 let entry = self.get_or_create_entry(hash);
202 self.spawn(observe(entry, tx));
203 }
204 Command::ImportBytes(ImportBytesMsg {
205 inner:
206 ImportBytesRequest {
207 data,
208 scope,
209 format,
210 ..
211 },
212 tx,
213 ..
214 }) => {
215 self.spawn(import_bytes(data, scope, format, tx));
216 }
217 Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
218 self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
219 }
220 Command::ImportPath(cmd) => {
221 self.spawn(import_path(cmd));
222 }
223 Command::ExportBao(ExportBaoMsg {
224 inner: ExportBaoRequest { hash, ranges },
225 tx,
226 ..
227 }) => {
228 let entry = self.get(&hash);
229 self.spawn(export_bao(entry, ranges, tx))
230 }
231 Command::ExportPath(cmd) => {
232 let entry = self.get(&cmd.hash);
233 self.spawn(export_path(entry, cmd));
234 }
235 Command::DeleteTags(cmd) => {
236 let DeleteTagsMsg {
237 inner: DeleteTagsRequest { from, to },
238 tx,
239 ..
240 } = cmd;
241 info!("deleting tags from {:?} to {:?}", from, to);
242 let mut deleted = 0;
245 self.state.tags.retain(|tag, _| {
246 if let Some(from) = &from {
247 if tag < from {
248 return true;
249 }
250 }
251 if let Some(to) = &to {
252 if tag >= to {
253 return true;
254 }
255 }
256 info!(" removing {:?}", tag);
257 deleted += 1;
258 false
259 });
260 tx.send(Ok(deleted)).await.ok();
261 }
262 Command::RenameTag(cmd) => {
263 let RenameTagMsg {
264 inner: RenameTagRequest { from, to },
265 tx,
266 ..
267 } = cmd;
268 let tags = &mut self.state.tags;
269 let value = match tags.remove(&from) {
270 Some(value) => value,
271 None => {
272 tx.send(Err(api::Error::io(
273 io::ErrorKind::NotFound,
274 format!("tag not found: {from:?}"),
275 )))
276 .await
277 .ok();
278 return None;
279 }
280 };
281 tags.insert(to, value);
282 tx.send(Ok(())).await.ok();
283 return None;
284 }
285 Command::ListTags(cmd) => {
286 let ListTagsMsg {
287 inner:
288 ListTagsRequest {
289 from,
290 to,
291 raw,
292 hash_seq,
293 },
294 tx,
295 ..
296 } = cmd;
297 let tags = self
298 .state
299 .tags
300 .iter()
301 .filter(move |(tag, value)| {
302 if let Some(from) = &from {
303 if tag < &from {
304 return false;
305 }
306 }
307 if let Some(to) = &to {
308 if tag >= &to {
309 return false;
310 }
311 }
312 raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
313 })
314 .map(|(tag, value)| TagInfo {
315 name: tag.clone(),
316 hash: value.hash,
317 format: value.format,
318 })
319 .map(Ok);
320 tx.send(tags.collect()).await.ok();
321 }
322 Command::SetTag(SetTagMsg {
323 inner: SetTagRequest { name: tag, value },
324 tx,
325 ..
326 }) => {
327 self.state.tags.insert(tag, value);
328 tx.send(Ok(())).await.ok();
329 }
330 Command::CreateTag(CreateTagMsg {
331 inner: CreateTagRequest { value },
332 tx,
333 ..
334 }) => {
335 let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
336 self.state.tags.insert(tag.clone(), value);
337 tx.send(Ok(tag)).await.ok();
338 }
339 Command::CreateTempTag(cmd) => {
340 trace!("{cmd:?}");
341 self.create_temp_tag(cmd).await;
342 }
343 Command::ListTempTags(cmd) => {
344 trace!("{cmd:?}");
345 let tts = self.temp_tags.list();
346 cmd.tx.send(tts).await.ok();
347 }
348 Command::ListBlobs(cmd) => {
349 let ListBlobsMsg { tx, .. } = cmd;
350 let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
351 self.spawn(async move {
352 for blob in blobs {
353 if tx.send(Ok(blob)).await.is_err() {
354 break;
355 }
356 }
357 });
358 }
359 Command::BlobStatus(cmd) => {
360 trace!("{cmd:?}");
361 let BlobStatusMsg {
362 inner: BlobStatusRequest { hash },
363 tx,
364 ..
365 } = cmd;
366 let res = match self.get(&hash) {
367 None => api::blobs::BlobStatus::NotFound,
368 Some(x) => {
369 let bitfield = x.0.state.borrow().bitfield();
370 if bitfield.is_complete() {
371 BlobStatus::Complete {
372 size: bitfield.size,
373 }
374 } else {
375 BlobStatus::Partial {
376 size: bitfield.validated_size(),
377 }
378 }
379 }
380 };
381 tx.send(res).await.ok();
382 }
383 Command::DeleteBlobs(cmd) => {
384 trace!("{cmd:?}");
385 let DeleteBlobsMsg {
386 inner: BlobDeleteRequest { hashes, force },
387 tx,
388 ..
389 } = cmd;
390 for hash in hashes {
391 if !force && self.protected.contains(&hash) {
392 continue;
393 }
394 self.state.data.remove(&hash);
395 }
396 tx.send(Ok(())).await.ok();
397 }
398 Command::Batch(cmd) => {
399 trace!("{cmd:?}");
400 let (id, scope) = self.temp_tags.create_scope();
401 self.spawn(handle_batch(cmd, id, scope));
402 }
403 Command::ClearProtected(cmd) => {
404 self.protected.clear();
405 cmd.tx.send(Ok(())).await.ok();
406 }
407 Command::ExportRanges(cmd) => {
408 let entry = self.get(&cmd.hash);
409 self.spawn(export_ranges(cmd, entry));
410 }
411 Command::SyncDb(SyncDbMsg { tx, .. }) => {
412 tx.send(Ok(())).await.ok();
413 }
414 Command::Shutdown(cmd) => {
415 return Some(cmd);
416 }
417 }
418 None
419 }
420
421 fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
422 if *hash == Hash::EMPTY {
423 Some(self.state.empty_hash.clone())
424 } else {
425 self.state.data.get(hash).cloned()
426 }
427 }
428
429 fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
430 if hash == Hash::EMPTY {
431 self.state.empty_hash.clone()
432 } else {
433 self.state
434 .data
435 .entry(hash)
436 .or_insert_with(|| BaoFileHandle::new_partial(hash))
437 .clone()
438 }
439 }
440
441 async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
442 let CreateTempTagMsg { tx, inner, .. } = cmd;
443 let mut tt = self.temp_tags.create(inner.scope, inner.value);
444 if tx.is_rpc() {
445 tt.leak();
446 }
447 tx.send(tt).await.ok();
448 }
449
450 async fn finish_import(&mut self, res: Result<ImportEntry>) {
451 let import_data = match res {
452 Ok(entry) => entry,
453 Err(e) => {
454 error!("import failed: {e}");
455 return;
456 }
457 };
458 let hash = import_data.outboard.root().into();
459 let entry = self.get_or_create_entry(hash);
460 entry
461 .0
462 .state
463 .send_if_modified(|state: &mut BaoFileStorage| {
464 let BaoFileStorage::Partial(_) = state.deref() else {
465 return false;
466 };
467 *state =
468 CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
469 true
470 });
471 let tt = self.temp_tags.create(
472 import_data.scope,
473 HashAndFormat {
474 hash,
475 format: import_data.format,
476 },
477 );
478 import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
479 }
480
481 fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
482 match res {
483 Ok(x) => Some(x),
484 Err(e) => {
485 if e.is_cancelled() {
486 trace!("task cancelled: {e}");
487 } else {
488 error!("task failed: {e}");
489 }
490 None
491 }
492 }
493 }
494
495 pub async fn run(mut self) {
496 let shutdown = loop {
497 tokio::select! {
498 cmd = self.commands.recv() => {
499 let Some(cmd) = cmd else {
500 break None;
503 };
504 if let Some(cmd) = self.handle_command(cmd).await {
505 break Some(cmd);
506 }
507 }
508 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
509 let Some(res) = self.log_task_result(res) else {
510 continue;
511 };
512 match res {
513 TaskResult::Import(res) => {
514 self.finish_import(res).await;
515 }
516 TaskResult::Scope(scope) => {
517 self.temp_tags.end_scope(scope);
518 }
519 TaskResult::Unit(_) => {}
520 }
521 if self.tasks.is_empty() {
522 for tx in self.idle_waiters.drain(..) {
524 tx.send(()).await.ok();
525 }
526 }
527 }
528 }
529 };
530 if let Some(shutdown) = shutdown {
531 shutdown.tx.send(()).await.ok();
532 }
533 }
534}
535
536async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
537 if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
538 error!("batch failed: {cause}");
539 }
540 id
541}
542
543async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
544 let BatchMsg { tx, mut rx, .. } = cmd;
545 trace!("created scope {}", id);
546 tx.send(id).await.map_err(api::Error::other)?;
547 while let Some(msg) = rx.recv().await? {
548 match msg {
549 BatchResponse::Drop(msg) => scope.on_drop(&msg),
550 BatchResponse::Ping => {}
551 }
552 }
553 Ok(())
554}
555
556async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
557 let Some(entry) = entry else {
558 let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
559 cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
560 return;
561 };
562 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
563 cmd.tx
564 .send(ExportRangesItem::Error(cause.into()))
565 .await
566 .ok();
567 }
568}
569
570async fn export_ranges_impl(
571 cmd: ExportRangesRequest,
572 tx: &mut mpsc::Sender<ExportRangesItem>,
573 entry: BaoFileHandle,
574) -> io::Result<()> {
575 let ExportRangesRequest { ranges, hash } = cmd;
576 let bitfield = entry.bitfield();
577 trace!(
578 "exporting ranges: {hash} {ranges:?} size={}",
579 bitfield.size()
580 );
581 debug_assert!(entry.hash() == hash, "hash mismatch");
582 let data = entry.data_reader();
583 let size = bitfield.size();
584 for range in ranges.iter() {
585 let range = match range {
586 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
587 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
588 };
589 let requested = ChunkRanges::bytes(range.start..range.end);
590 if !bitfield.ranges.is_superset(&requested) {
591 return Err(io::Error::other(format!(
592 "missing range: {requested:?}, present: {bitfield:?}",
593 )));
594 }
595 let bs = 1024;
596 let mut offset = range.start;
597 loop {
598 let end: u64 = (offset + bs).min(range.end);
599 let size = (end - offset) as usize;
600 tx.send(
601 Leaf {
602 offset,
603 data: data.read_bytes_at(offset, size)?,
604 }
605 .into(),
606 )
607 .await?;
608 offset = end;
609 if offset >= range.end {
610 break;
611 }
612 }
613 }
614 Ok(())
615}
616
617fn chunk_range(leaf: &Leaf) -> ChunkRanges {
618 let start = ChunkNum::chunks(leaf.offset);
619 let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
620 (start..end).into()
621}
622
623async fn import_bao(
624 entry: BaoFileHandle,
625 size: NonZeroU64,
626 mut stream: mpsc::Receiver<BaoContentItem>,
627 tx: irpc::channel::oneshot::Sender<api::Result<()>>,
628) {
629 let size = size.get();
630 entry
631 .0
632 .state
633 .send_if_modified(|state: &mut BaoFileStorage| {
634 let BaoFileStorage::Partial(entry) = state else {
635 return false;
637 };
638 entry.size.write(0, size);
639 false
640 });
641 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
642 while let Some(item) = stream.recv().await.unwrap() {
643 entry.0.state.send_if_modified(|state| {
644 let BaoFileStorage::Partial(partial) = state else {
645 return false;
647 };
648 match item {
649 BaoContentItem::Parent(parent) => {
650 if let Some(offset) = tree.pre_order_offset(parent.node) {
651 let mut pair = [0u8; 64];
652 pair[..32].copy_from_slice(parent.pair.0.as_bytes());
653 pair[32..].copy_from_slice(parent.pair.1.as_bytes());
654 partial
655 .outboard
656 .write_at(offset * 64, &pair)
657 .expect("writing to mem can never fail");
658 }
659 false
660 }
661 BaoContentItem::Leaf(leaf) => {
662 let start = leaf.offset;
663 partial
664 .data
665 .write_at(start, &leaf.data)
666 .expect("writing to mem can never fail");
667 let added = chunk_range(&leaf);
668 let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
669 if update.new_state().complete {
670 let data = std::mem::take(&mut partial.data);
671 let outboard = std::mem::take(&mut partial.outboard);
672 let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
673 let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
674 *state = CompleteStorage::new(data, outboard).into();
675 }
676 update.changed()
677 }
678 }
679 });
680 }
681 tx.send(Ok(())).await.ok();
682}
683
684#[instrument(skip_all, fields(hash = tracing::field::Empty))]
685async fn export_bao(
686 entry: Option<BaoFileHandle>,
687 ranges: ChunkRanges,
688 mut sender: mpsc::Sender<EncodedItem>,
689) {
690 let Some(entry) = entry else {
691 let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
692 sender.send(err.into()).await.ok();
693 return;
694 };
695 tracing::Span::current().record("hash", tracing::field::display(entry.hash));
696 let data = entry.data_reader();
697 let outboard = entry.outboard_reader();
698 let tx = BaoTreeSender::new(&mut sender);
699 traverse_ranges_validated(data, outboard, &ranges, tx)
700 .await
701 .ok();
702}
703
704#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
705async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
706 entry.subscribe().forward(tx).await.ok();
707}
708
709async fn import_bytes(
710 data: Bytes,
711 scope: Scope,
712 format: BlobFormat,
713 tx: mpsc::Sender<AddProgressItem>,
714) -> Result<ImportEntry> {
715 tx.send(AddProgressItem::Size(data.len() as u64)).await?;
716 tx.send(AddProgressItem::CopyDone).await?;
717 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
718 Ok(ImportEntry {
719 data,
720 outboard,
721 scope,
722 format,
723 tx,
724 })
725}
726
727async fn import_byte_stream(
728 scope: Scope,
729 format: BlobFormat,
730 mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
731 tx: mpsc::Sender<AddProgressItem>,
732) -> Result<ImportEntry> {
733 let mut res = Vec::new();
734 loop {
735 match rx.recv().await {
736 Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
737 res.extend_from_slice(&data);
738 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
739 .await?;
740 }
741 Ok(Some(ImportByteStreamUpdate::Done)) => {
742 break;
743 }
744 Ok(None) => {
745 return Err(api::Error::io(
746 io::ErrorKind::UnexpectedEof,
747 "byte stream ended unexpectedly",
748 )
749 .into());
750 }
751 Err(e) => {
752 return Err(e.into());
753 }
754 }
755 }
756 import_bytes(res.into(), scope, format, tx).await
757}
758
759#[instrument(skip_all, fields(path = %cmd.path.display()))]
760async fn import_path(cmd: ImportPathMsg) -> Result<ImportEntry> {
761 let ImportPathMsg {
762 inner:
763 ImportPathRequest {
764 path,
765 scope,
766 format,
767 ..
768 },
769 tx,
770 ..
771 } = cmd;
772 let mut res = Vec::new();
773 let mut file = tokio::fs::File::open(path).await?;
774 let mut buf = [0u8; 1024 * 64];
775 loop {
776 let size = file.read(&mut buf).await?;
777 if size == 0 {
778 break;
779 }
780 res.extend_from_slice(&buf[..size]);
781 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
782 .await?;
783 }
784 import_bytes(res.into(), scope, format, tx).await
785}
786
787#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
788async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
789 let ExportPathMsg { inner, mut tx, .. } = cmd;
790 let Some(entry) = entry else {
791 tx.send(ExportProgressItem::Error(api::Error::io(
792 io::ErrorKind::NotFound,
793 "hash not found",
794 )))
795 .await
796 .ok();
797 return;
798 };
799 match export_path_impl(entry, inner, &mut tx).await {
800 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
801 Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
802 };
803}
804
805async fn export_path_impl(
806 entry: BaoFileHandle,
807 cmd: ExportPathRequest,
808 tx: &mut mpsc::Sender<ExportProgressItem>,
809) -> io::Result<()> {
810 let ExportPathRequest { target, .. } = cmd;
811 if !target.is_absolute() {
812 return Err(io::Error::new(
813 io::ErrorKind::InvalidInput,
814 "path is not absolute",
815 ));
816 }
817 if let Some(parent) = target.parent() {
818 std::fs::create_dir_all(parent)?;
819 }
820 let mut file = std::fs::File::create(target)?;
822 let size = entry.0.state.borrow().size();
823 tx.send(ExportProgressItem::Size(size)).await?;
824 let mut buf = [0u8; 1024 * 64];
825 for offset in (0..size).step_by(1024 * 64) {
826 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
827 let buf = &mut buf[..len];
828 entry.0.state.borrow().data().read_exact_at(offset, buf)?;
829 file.write_all(buf)?;
830 tx.try_send(ExportProgressItem::CopyProgress(offset))
831 .await
832 .map_err(|_e| io::Error::other(""))?;
833 yield_now().await;
834 }
835 Ok(())
836}
837
838struct ImportEntry {
839 scope: Scope,
840 format: BlobFormat,
841 data: Bytes,
842 outboard: PreOrderMemOutboard,
843 tx: mpsc::Sender<AddProgressItem>,
844}
845
846pub struct DataReader(BaoFileHandle);
847
848impl ReadBytesAt for DataReader {
849 fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
850 let entry = self.0 .0.state.borrow();
851 entry.data().read_bytes_at(offset, size)
852 }
853}
854
855pub struct OutboardReader {
856 hash: blake3::Hash,
857 tree: BaoTree,
858 data: BaoFileHandle,
859}
860
861impl Outboard for OutboardReader {
862 fn root(&self) -> blake3::Hash {
863 self.hash
864 }
865
866 fn tree(&self) -> BaoTree {
867 self.tree
868 }
869
870 fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
871 let Some(offset) = self.tree.pre_order_offset(node) else {
872 return Ok(None);
873 };
874 let mut buf = [0u8; 64];
875 let size = self
876 .data
877 .0
878 .state
879 .borrow()
880 .outboard()
881 .read_at(offset * 64, &mut buf)?;
882 if size != 64 {
883 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
884 }
885 let left: [u8; 32] = buf[..32].try_into().unwrap();
886 let right: [u8; 32] = buf[32..].try_into().unwrap();
887 Ok(Some((left.into(), right.into())))
888 }
889}
890
891struct State {
892 data: HashMap<Hash, BaoFileHandle>,
893 tags: BTreeMap<Tag, HashAndFormat>,
894 empty_hash: BaoFileHandle,
895}
896
897#[derive(Debug, derive_more::From)]
898pub enum BaoFileStorage {
899 Partial(PartialMemStorage),
900 Complete(CompleteStorage),
901}
902
903impl BaoFileStorage {
904 pub fn bitfield(&self) -> Bitfield {
906 match self {
907 Self::Partial(entry) => entry.bitfield.clone(),
908 Self::Complete(entry) => Bitfield::complete(entry.size()),
909 }
910 }
911}
912
913#[derive(Debug)]
914pub struct BaoFileHandleInner {
915 state: watch::Sender<BaoFileStorage>,
916 hash: Hash,
917}
918
919#[derive(Debug, Clone, derive_more::Deref)]
921pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
922
923impl BaoFileHandle {
924 pub fn new_partial(hash: Hash) -> Self {
925 let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
926 data: SparseMemFile::new(),
927 outboard: SparseMemFile::new(),
928 size: SizeInfo::default(),
929 bitfield: Bitfield::empty(),
930 }));
931 Self(Arc::new(BaoFileHandleInner { state, hash }))
932 }
933
934 pub fn hash(&self) -> Hash {
935 self.hash
936 }
937
938 pub fn bitfield(&self) -> Bitfield {
939 self.0.state.borrow().bitfield()
940 }
941
942 pub fn subscribe(&self) -> BaoFileStorageSubscriber {
943 BaoFileStorageSubscriber::new(self.0.state.subscribe())
944 }
945
946 pub fn data_reader(&self) -> DataReader {
947 DataReader(self.clone())
948 }
949
950 pub fn outboard_reader(&self) -> OutboardReader {
951 let entry = self.0.state.borrow();
952 let hash = self.hash.into();
953 let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
954 OutboardReader {
955 hash,
956 tree,
957 data: self.clone(),
958 }
959 }
960}
961
962impl Default for BaoFileStorage {
963 fn default() -> Self {
964 Self::Partial(Default::default())
965 }
966}
967
968impl BaoFileStorage {
969 fn data(&self) -> &[u8] {
970 match self {
971 Self::Partial(entry) => entry.data.as_ref(),
972 Self::Complete(entry) => &entry.data,
973 }
974 }
975
976 fn outboard(&self) -> &[u8] {
977 match self {
978 Self::Partial(entry) => entry.outboard.as_ref(),
979 Self::Complete(entry) => &entry.outboard,
980 }
981 }
982
983 fn size(&self) -> u64 {
984 match self {
985 Self::Partial(entry) => entry.current_size(),
986 Self::Complete(entry) => entry.size(),
987 }
988 }
989}
990
991#[derive(Debug, Clone)]
992pub struct CompleteStorage {
993 pub(crate) data: Bytes,
994 pub(crate) outboard: Bytes,
995}
996
997impl CompleteStorage {
998 pub fn create(data: Bytes) -> (Hash, Self) {
999 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
1000 let hash = outboard.root().into();
1001 let outboard = outboard.data.into();
1002 let entry = Self::new(data, outboard);
1003 (hash, entry)
1004 }
1005
1006 pub fn new(data: Bytes, outboard: Bytes) -> Self {
1007 Self { data, outboard }
1008 }
1009
1010 pub fn size(&self) -> u64 {
1011 self.data.len() as u64
1012 }
1013}
1014
1015#[allow(dead_code)]
1016fn print_outboard(hashes: &[u8]) {
1017 assert!(hashes.len() % 64 == 0);
1018 for chunk in hashes.chunks(64) {
1019 let left: [u8; 32] = chunk[..32].try_into().unwrap();
1020 let right: [u8; 32] = chunk[32..].try_into().unwrap();
1021 let left = blake3::Hash::from(left);
1022 let right = blake3::Hash::from(right);
1023 println!("l: {left:?}, r: {right:?}");
1024 }
1025}
1026
1027pub struct BaoFileStorageSubscriber {
1028 receiver: watch::Receiver<BaoFileStorage>,
1029}
1030
1031impl BaoFileStorageSubscriber {
1032 pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1033 Self { receiver }
1034 }
1035
1036 pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> Result<()> {
1040 let value = self.receiver.borrow().bitfield();
1041 tx.send(value).await?;
1042 loop {
1043 self.update_or_closed(&mut tx).await?;
1044 let value = self.receiver.borrow().bitfield();
1045 tx.send(value.clone()).await?;
1046 }
1047 }
1048
1049 #[allow(dead_code)]
1053 pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> Result<()> {
1054 let value = self.receiver.borrow().bitfield();
1055 let mut old = value.clone();
1056 tx.send(value).await?;
1057 loop {
1058 self.update_or_closed(&mut tx).await?;
1059 let new = self.receiver.borrow().bitfield();
1060 let diff = old.diff(&new);
1061 if diff.is_empty() {
1062 continue;
1063 }
1064 tx.send(diff).await?;
1065 old = new;
1066 }
1067 }
1068
1069 async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> Result<()> {
1070 tokio::select! {
1071 _ = tx.closed() => {
1072 Err(n0_error::e!(irpc::channel::SendError::ReceiverClosed).into())
1074 }
1075 e = self.receiver.changed() => Ok(e.anyerr()?),
1076 }
1077 }
1078}
1079
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Round trip: add bytes to one store, export them as bao, import that
    /// into a second store and check that its export is identical.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        let temp_tag = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = temp_tag.hash();
        println!("hash: {hash:?}");

        // Print every item of a first export.
        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        // Observe the hash in a second store while importing into it.
        let target = MemStore::new();
        let mut events = target.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = events.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let exported2 = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, exported2);

        Ok(())
    }
}