1use std::{
11 collections::{BTreeMap, HashMap, HashSet},
12 future::Future,
13 io::{self, Write},
14 num::NonZeroU64,
15 ops::Deref,
16 sync::Arc,
17 time::SystemTime,
18};
19
20use bao_tree::{
21 blake3,
22 io::{
23 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24 outboard::PreOrderMemOutboard,
25 sync::{Outboard, ReadAt, WriteAt},
26 BaoContentItem, EncodeError, Leaf,
27 },
28 BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35 io::AsyncReadExt,
36 sync::watch,
37 task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43 api::{
44 self,
45 blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46 proto::{
47 BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48 CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49 DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50 ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51 ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52 ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53 ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54 SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
55 },
56 tags::TagInfo,
57 ApiClient,
58 },
59 protocol::ChunkRangesExt,
60 store::{
61 util::{SizeInfo, SparseMemFile, Tag},
62 IROH_BLOCK_SIZE,
63 },
64 util::temp_tag::{TagDrop, TempTagScope, TempTags},
65 BlobFormat, Hash, HashAndFormat,
66};
67
/// Configuration for a [`MemStore`].
///
/// Has no fields at the moment; a default instance is handed to the actor on creation.
#[derive(Debug, Default)]
pub struct Options {}
70
71#[derive(Debug, Clone)]
72#[repr(transparent)]
73pub struct MemStore {
74 client: ApiClient,
75}
76
77impl From<MemStore> for crate::api::Store {
78 fn from(value: MemStore) -> Self {
79 crate::api::Store::from_sender(value.client)
80 }
81}
82
83impl AsRef<crate::api::Store> for MemStore {
84 fn as_ref(&self) -> &crate::api::Store {
85 crate::api::Store::ref_from_sender(&self.client)
86 }
87}
88
89impl Deref for MemStore {
90 type Target = crate::api::Store;
91
92 fn deref(&self) -> &Self::Target {
93 crate::api::Store::ref_from_sender(&self.client)
94 }
95}
96
97impl Default for MemStore {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103#[derive(derive_more::From)]
104enum TaskResult {
105 Unit(()),
106 Import(anyhow::Result<ImportEntry>),
107 Scope(Scope),
108}
109
110impl MemStore {
111 pub fn from_sender(client: ApiClient) -> Self {
112 Self { client }
113 }
114
115 pub fn new() -> Self {
116 let (sender, receiver) = tokio::sync::mpsc::channel(32);
117 tokio::spawn(
118 Actor {
119 commands: receiver,
120 tasks: JoinSet::new(),
121 state: State {
122 data: HashMap::new(),
123 tags: BTreeMap::new(),
124 empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
125 },
126 options: Arc::new(Options::default()),
127 temp_tags: Default::default(),
128 protected: Default::default(),
129 idle_waiters: Default::default(),
130 }
131 .run(),
132 );
133 Self::from_sender(sender.into())
134 }
135}
136
137struct Actor {
138 commands: tokio::sync::mpsc::Receiver<Command>,
139 tasks: JoinSet<TaskResult>,
140 state: State,
141 #[allow(dead_code)]
142 options: Arc<Options>,
143 temp_tags: TempTags,
145 idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
147 protected: HashSet<Hash>,
148}
149
150impl Actor {
151 fn spawn<F, T>(&mut self, f: F)
152 where
153 F: Future<Output = T> + Send + 'static,
154 T: Into<TaskResult>,
155 {
156 let span = tracing::Span::current();
157 let fut = async move { f.await.into() }.instrument(span);
158 self.tasks.spawn(fut);
159 }
160
161 async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
162 match cmd {
163 Command::ImportBao(ImportBaoMsg {
164 inner: ImportBaoRequest { hash, size },
165 rx: data,
166 tx,
167 ..
168 }) => {
169 let entry = self.get_or_create_entry(hash);
170 self.spawn(import_bao(entry, size, data, tx));
171 }
172 Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
173 trace!("wait idle");
174 if self.tasks.is_empty() {
175 tx.send(()).await.ok();
177 } else {
178 self.idle_waiters.push(tx);
180 }
181 }
182 Command::Observe(ObserveMsg {
183 inner: ObserveRequest { hash },
184 tx,
185 ..
186 }) => {
187 let entry = self.get_or_create_entry(hash);
188 self.spawn(observe(entry, tx));
189 }
190 Command::ImportBytes(ImportBytesMsg {
191 inner:
192 ImportBytesRequest {
193 data,
194 scope,
195 format,
196 ..
197 },
198 tx,
199 ..
200 }) => {
201 self.spawn(import_bytes(data, scope, format, tx));
202 }
203 Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
204 self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
205 }
206 Command::ImportPath(cmd) => {
207 self.spawn(import_path(cmd));
208 }
209 Command::ExportBao(ExportBaoMsg {
210 inner: ExportBaoRequest { hash, ranges },
211 tx,
212 ..
213 }) => {
214 let entry = self.get(&hash);
215 self.spawn(export_bao(entry, ranges, tx))
216 }
217 Command::ExportPath(cmd) => {
218 let entry = self.get(&cmd.hash);
219 self.spawn(export_path(entry, cmd));
220 }
221 Command::DeleteTags(cmd) => {
222 let DeleteTagsMsg {
223 inner: DeleteTagsRequest { from, to },
224 tx,
225 ..
226 } = cmd;
227 info!("deleting tags from {:?} to {:?}", from, to);
228 let mut deleted = 0;
231 self.state.tags.retain(|tag, _| {
232 if let Some(from) = &from {
233 if tag < from {
234 return true;
235 }
236 }
237 if let Some(to) = &to {
238 if tag >= to {
239 return true;
240 }
241 }
242 info!(" removing {:?}", tag);
243 deleted += 1;
244 false
245 });
246 tx.send(Ok(deleted)).await.ok();
247 }
248 Command::RenameTag(cmd) => {
249 let RenameTagMsg {
250 inner: RenameTagRequest { from, to },
251 tx,
252 ..
253 } = cmd;
254 let tags = &mut self.state.tags;
255 let value = match tags.remove(&from) {
256 Some(value) => value,
257 None => {
258 tx.send(Err(api::Error::io(
259 io::ErrorKind::NotFound,
260 format!("tag not found: {from:?}"),
261 )))
262 .await
263 .ok();
264 return None;
265 }
266 };
267 tags.insert(to, value);
268 tx.send(Ok(())).await.ok();
269 return None;
270 }
271 Command::ListTags(cmd) => {
272 let ListTagsMsg {
273 inner:
274 ListTagsRequest {
275 from,
276 to,
277 raw,
278 hash_seq,
279 },
280 tx,
281 ..
282 } = cmd;
283 let tags = self
284 .state
285 .tags
286 .iter()
287 .filter(move |(tag, value)| {
288 if let Some(from) = &from {
289 if tag < &from {
290 return false;
291 }
292 }
293 if let Some(to) = &to {
294 if tag >= &to {
295 return false;
296 }
297 }
298 raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
299 })
300 .map(|(tag, value)| TagInfo {
301 name: tag.clone(),
302 hash: value.hash,
303 format: value.format,
304 })
305 .map(Ok);
306 tx.send(tags.collect()).await.ok();
307 }
308 Command::SetTag(SetTagMsg {
309 inner: SetTagRequest { name: tag, value },
310 tx,
311 ..
312 }) => {
313 self.state.tags.insert(tag, value);
314 tx.send(Ok(())).await.ok();
315 }
316 Command::CreateTag(CreateTagMsg {
317 inner: CreateTagRequest { value },
318 tx,
319 ..
320 }) => {
321 let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
322 self.state.tags.insert(tag.clone(), value);
323 tx.send(Ok(tag)).await.ok();
324 }
325 Command::CreateTempTag(cmd) => {
326 trace!("{cmd:?}");
327 self.create_temp_tag(cmd).await;
328 }
329 Command::ListTempTags(cmd) => {
330 trace!("{cmd:?}");
331 let tts = self.temp_tags.list();
332 cmd.tx.send(tts).await.ok();
333 }
334 Command::ListBlobs(cmd) => {
335 let ListBlobsMsg { tx, .. } = cmd;
336 let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
337 self.spawn(async move {
338 for blob in blobs {
339 if tx.send(Ok(blob)).await.is_err() {
340 break;
341 }
342 }
343 });
344 }
345 Command::BlobStatus(cmd) => {
346 trace!("{cmd:?}");
347 let BlobStatusMsg {
348 inner: BlobStatusRequest { hash },
349 tx,
350 ..
351 } = cmd;
352 let res = match self.get(&hash) {
353 None => api::blobs::BlobStatus::NotFound,
354 Some(x) => {
355 let bitfield = x.0.state.borrow().bitfield();
356 if bitfield.is_complete() {
357 BlobStatus::Complete {
358 size: bitfield.size,
359 }
360 } else {
361 BlobStatus::Partial {
362 size: bitfield.validated_size(),
363 }
364 }
365 }
366 };
367 tx.send(res).await.ok();
368 }
369 Command::DeleteBlobs(cmd) => {
370 trace!("{cmd:?}");
371 let DeleteBlobsMsg {
372 inner: BlobDeleteRequest { hashes, force },
373 tx,
374 ..
375 } = cmd;
376 for hash in hashes {
377 if !force && self.protected.contains(&hash) {
378 continue;
379 }
380 self.state.data.remove(&hash);
381 }
382 tx.send(Ok(())).await.ok();
383 }
384 Command::Batch(cmd) => {
385 trace!("{cmd:?}");
386 let (id, scope) = self.temp_tags.create_scope();
387 self.spawn(handle_batch(cmd, id, scope));
388 }
389 Command::ClearProtected(cmd) => {
390 self.protected.clear();
391 cmd.tx.send(Ok(())).await.ok();
392 }
393 Command::ExportRanges(cmd) => {
394 let entry = self.get(&cmd.hash);
395 self.spawn(export_ranges(cmd, entry));
396 }
397 Command::SyncDb(SyncDbMsg { tx, .. }) => {
398 tx.send(Ok(())).await.ok();
399 }
400 Command::Shutdown(cmd) => {
401 return Some(cmd);
402 }
403 }
404 None
405 }
406
407 fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
408 if *hash == Hash::EMPTY {
409 Some(self.state.empty_hash.clone())
410 } else {
411 self.state.data.get(hash).cloned()
412 }
413 }
414
415 fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
416 if hash == Hash::EMPTY {
417 self.state.empty_hash.clone()
418 } else {
419 self.state
420 .data
421 .entry(hash)
422 .or_insert_with(|| BaoFileHandle::new_partial(hash))
423 .clone()
424 }
425 }
426
427 async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
428 let CreateTempTagMsg { tx, inner, .. } = cmd;
429 let mut tt = self.temp_tags.create(inner.scope, inner.value);
430 if tx.is_rpc() {
431 tt.leak();
432 }
433 tx.send(tt).await.ok();
434 }
435
436 async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
437 let import_data = match res {
438 Ok(entry) => entry,
439 Err(e) => {
440 error!("import failed: {e}");
441 return;
442 }
443 };
444 let hash = import_data.outboard.root().into();
445 let entry = self.get_or_create_entry(hash);
446 entry
447 .0
448 .state
449 .send_if_modified(|state: &mut BaoFileStorage| {
450 let BaoFileStorage::Partial(_) = state.deref() else {
451 return false;
452 };
453 *state =
454 CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
455 true
456 });
457 let tt = self.temp_tags.create(
458 import_data.scope,
459 HashAndFormat {
460 hash,
461 format: import_data.format,
462 },
463 );
464 import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
465 }
466
467 fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
468 match res {
469 Ok(x) => Some(x),
470 Err(e) => {
471 if e.is_cancelled() {
472 trace!("task cancelled: {e}");
473 } else {
474 error!("task failed: {e}");
475 }
476 None
477 }
478 }
479 }
480
481 pub async fn run(mut self) {
482 let shutdown = loop {
483 tokio::select! {
484 cmd = self.commands.recv() => {
485 let Some(cmd) = cmd else {
486 break None;
489 };
490 if let Some(cmd) = self.handle_command(cmd).await {
491 break Some(cmd);
492 }
493 }
494 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
495 let Some(res) = self.log_task_result(res) else {
496 continue;
497 };
498 match res {
499 TaskResult::Import(res) => {
500 self.finish_import(res).await;
501 }
502 TaskResult::Scope(scope) => {
503 self.temp_tags.end_scope(scope);
504 }
505 TaskResult::Unit(_) => {}
506 }
507 if self.tasks.is_empty() {
508 for tx in self.idle_waiters.drain(..) {
510 tx.send(()).await.ok();
511 }
512 }
513 }
514 }
515 };
516 if let Some(shutdown) = shutdown {
517 shutdown.tx.send(()).await.ok();
518 }
519 }
520}
521
522async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
523 if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
524 error!("batch failed: {cause}");
525 }
526 id
527}
528
529async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
530 let BatchMsg { tx, mut rx, .. } = cmd;
531 trace!("created scope {}", id);
532 tx.send(id).await.map_err(api::Error::other)?;
533 while let Some(msg) = rx.recv().await? {
534 match msg {
535 BatchResponse::Drop(msg) => scope.on_drop(&msg),
536 BatchResponse::Ping => {}
537 }
538 }
539 Ok(())
540}
541
542async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
543 let Some(entry) = entry else {
544 let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
545 cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
546 return;
547 };
548 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
549 cmd.tx
550 .send(ExportRangesItem::Error(cause.into()))
551 .await
552 .ok();
553 }
554}
555
556async fn export_ranges_impl(
557 cmd: ExportRangesRequest,
558 tx: &mut mpsc::Sender<ExportRangesItem>,
559 entry: BaoFileHandle,
560) -> io::Result<()> {
561 let ExportRangesRequest { ranges, hash } = cmd;
562 let bitfield = entry.bitfield();
563 trace!(
564 "exporting ranges: {hash} {ranges:?} size={}",
565 bitfield.size()
566 );
567 debug_assert!(entry.hash() == hash, "hash mismatch");
568 let data = entry.data_reader();
569 let size = bitfield.size();
570 for range in ranges.iter() {
571 let range = match range {
572 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
573 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
574 };
575 let requested = ChunkRanges::bytes(range.start..range.end);
576 if !bitfield.ranges.is_superset(&requested) {
577 return Err(io::Error::other(format!(
578 "missing range: {requested:?}, present: {bitfield:?}",
579 )));
580 }
581 let bs = 1024;
582 let mut offset = range.start;
583 loop {
584 let end: u64 = (offset + bs).min(range.end);
585 let size = (end - offset) as usize;
586 tx.send(
587 Leaf {
588 offset,
589 data: data.read_bytes_at(offset, size)?,
590 }
591 .into(),
592 )
593 .await?;
594 offset = end;
595 if offset >= range.end {
596 break;
597 }
598 }
599 }
600 Ok(())
601}
602
603fn chunk_range(leaf: &Leaf) -> ChunkRanges {
604 let start = ChunkNum::chunks(leaf.offset);
605 let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
606 (start..end).into()
607}
608
609async fn import_bao(
610 entry: BaoFileHandle,
611 size: NonZeroU64,
612 mut stream: mpsc::Receiver<BaoContentItem>,
613 tx: irpc::channel::oneshot::Sender<api::Result<()>>,
614) {
615 let size = size.get();
616 entry
617 .0
618 .state
619 .send_if_modified(|state: &mut BaoFileStorage| {
620 let BaoFileStorage::Partial(entry) = state else {
621 return false;
623 };
624 entry.size.write(0, size);
625 false
626 });
627 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
628 while let Some(item) = stream.recv().await.unwrap() {
629 entry.0.state.send_if_modified(|state| {
630 let BaoFileStorage::Partial(partial) = state else {
631 return false;
633 };
634 match item {
635 BaoContentItem::Parent(parent) => {
636 if let Some(offset) = tree.pre_order_offset(parent.node) {
637 let mut pair = [0u8; 64];
638 pair[..32].copy_from_slice(parent.pair.0.as_bytes());
639 pair[32..].copy_from_slice(parent.pair.1.as_bytes());
640 partial
641 .outboard
642 .write_at(offset * 64, &pair)
643 .expect("writing to mem can never fail");
644 }
645 false
646 }
647 BaoContentItem::Leaf(leaf) => {
648 let start = leaf.offset;
649 partial
650 .data
651 .write_at(start, &leaf.data)
652 .expect("writing to mem can never fail");
653 let added = chunk_range(&leaf);
654 let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
655 if update.new_state().complete {
656 let data = std::mem::take(&mut partial.data);
657 let outboard = std::mem::take(&mut partial.outboard);
658 let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
659 let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
660 *state = CompleteStorage::new(data, outboard).into();
661 }
662 update.changed()
663 }
664 }
665 });
666 }
667 tx.send(Ok(())).await.ok();
668}
669
670#[instrument(skip_all, fields(hash = tracing::field::Empty))]
671async fn export_bao(
672 entry: Option<BaoFileHandle>,
673 ranges: ChunkRanges,
674 mut sender: mpsc::Sender<EncodedItem>,
675) {
676 let Some(entry) = entry else {
677 let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
678 sender.send(err.into()).await.ok();
679 return;
680 };
681 tracing::Span::current().record("hash", tracing::field::display(entry.hash));
682 let data = entry.data_reader();
683 let outboard = entry.outboard_reader();
684 let tx = BaoTreeSender::new(&mut sender);
685 traverse_ranges_validated(data, outboard, &ranges, tx)
686 .await
687 .ok();
688}
689
690#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
691async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
692 entry.subscribe().forward(tx).await.ok();
693}
694
695async fn import_bytes(
696 data: Bytes,
697 scope: Scope,
698 format: BlobFormat,
699 tx: mpsc::Sender<AddProgressItem>,
700) -> anyhow::Result<ImportEntry> {
701 tx.send(AddProgressItem::Size(data.len() as u64)).await?;
702 tx.send(AddProgressItem::CopyDone).await?;
703 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
704 Ok(ImportEntry {
705 data,
706 outboard,
707 scope,
708 format,
709 tx,
710 })
711}
712
713async fn import_byte_stream(
714 scope: Scope,
715 format: BlobFormat,
716 mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
717 tx: mpsc::Sender<AddProgressItem>,
718) -> anyhow::Result<ImportEntry> {
719 let mut res = Vec::new();
720 loop {
721 match rx.recv().await {
722 Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
723 res.extend_from_slice(&data);
724 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
725 .await?;
726 }
727 Ok(Some(ImportByteStreamUpdate::Done)) => {
728 break;
729 }
730 Ok(None) => {
731 return Err(api::Error::io(
732 io::ErrorKind::UnexpectedEof,
733 "byte stream ended unexpectedly",
734 )
735 .into());
736 }
737 Err(e) => {
738 return Err(e.into());
739 }
740 }
741 }
742 import_bytes(res.into(), scope, format, tx).await
743}
744
745#[instrument(skip_all, fields(path = %cmd.path.display()))]
746async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
747 let ImportPathMsg {
748 inner:
749 ImportPathRequest {
750 path,
751 scope,
752 format,
753 ..
754 },
755 tx,
756 ..
757 } = cmd;
758 let mut res = Vec::new();
759 let mut file = tokio::fs::File::open(path).await?;
760 let mut buf = [0u8; 1024 * 64];
761 loop {
762 let size = file.read(&mut buf).await?;
763 if size == 0 {
764 break;
765 }
766 res.extend_from_slice(&buf[..size]);
767 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
768 .await?;
769 }
770 import_bytes(res.into(), scope, format, tx).await
771}
772
773#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
774async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
775 let ExportPathMsg { inner, mut tx, .. } = cmd;
776 let Some(entry) = entry else {
777 tx.send(ExportProgressItem::Error(api::Error::io(
778 io::ErrorKind::NotFound,
779 "hash not found",
780 )))
781 .await
782 .ok();
783 return;
784 };
785 match export_path_impl(entry, inner, &mut tx).await {
786 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
787 Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
788 };
789}
790
791async fn export_path_impl(
792 entry: BaoFileHandle,
793 cmd: ExportPathRequest,
794 tx: &mut mpsc::Sender<ExportProgressItem>,
795) -> io::Result<()> {
796 let ExportPathRequest { target, .. } = cmd;
797 if !target.is_absolute() {
798 return Err(io::Error::new(
799 io::ErrorKind::InvalidInput,
800 "path is not absolute",
801 ));
802 }
803 if let Some(parent) = target.parent() {
804 std::fs::create_dir_all(parent)?;
805 }
806 let mut file = std::fs::File::create(target)?;
808 let size = entry.0.state.borrow().size();
809 tx.send(ExportProgressItem::Size(size)).await?;
810 let mut buf = [0u8; 1024 * 64];
811 for offset in (0..size).step_by(1024 * 64) {
812 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
813 let buf = &mut buf[..len];
814 entry.0.state.borrow().data().read_exact_at(offset, buf)?;
815 file.write_all(buf)?;
816 tx.try_send(ExportProgressItem::CopyProgress(offset))
817 .await
818 .map_err(|_e| io::Error::other(""))?;
819 yield_now().await;
820 }
821 Ok(())
822}
823
824struct ImportEntry {
825 scope: Scope,
826 format: BlobFormat,
827 data: Bytes,
828 outboard: PreOrderMemOutboard,
829 tx: mpsc::Sender<AddProgressItem>,
830}
831
832pub struct DataReader(BaoFileHandle);
833
834impl ReadBytesAt for DataReader {
835 fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
836 let entry = self.0 .0.state.borrow();
837 entry.data().read_bytes_at(offset, size)
838 }
839}
840
841pub struct OutboardReader {
842 hash: blake3::Hash,
843 tree: BaoTree,
844 data: BaoFileHandle,
845}
846
847impl Outboard for OutboardReader {
848 fn root(&self) -> blake3::Hash {
849 self.hash
850 }
851
852 fn tree(&self) -> BaoTree {
853 self.tree
854 }
855
856 fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
857 let Some(offset) = self.tree.pre_order_offset(node) else {
858 return Ok(None);
859 };
860 let mut buf = [0u8; 64];
861 let size = self
862 .data
863 .0
864 .state
865 .borrow()
866 .outboard()
867 .read_at(offset * 64, &mut buf)?;
868 if size != 64 {
869 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
870 }
871 let left: [u8; 32] = buf[..32].try_into().unwrap();
872 let right: [u8; 32] = buf[32..].try_into().unwrap();
873 Ok(Some((left.into(), right.into())))
874 }
875}
876
877struct State {
878 data: HashMap<Hash, BaoFileHandle>,
879 tags: BTreeMap<Tag, HashAndFormat>,
880 empty_hash: BaoFileHandle,
881}
882
883#[derive(Debug, derive_more::From)]
884pub enum BaoFileStorage {
885 Partial(PartialMemStorage),
886 Complete(CompleteStorage),
887}
888
889impl BaoFileStorage {
890 pub fn bitfield(&self) -> Bitfield {
892 match self {
893 Self::Partial(entry) => entry.bitfield.clone(),
894 Self::Complete(entry) => Bitfield::complete(entry.size()),
895 }
896 }
897}
898
899#[derive(Debug)]
900pub struct BaoFileHandleInner {
901 state: watch::Sender<BaoFileStorage>,
902 hash: Hash,
903}
904
905#[derive(Debug, Clone, derive_more::Deref)]
907pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
908
909impl BaoFileHandle {
910 pub fn new_partial(hash: Hash) -> Self {
911 let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
912 data: SparseMemFile::new(),
913 outboard: SparseMemFile::new(),
914 size: SizeInfo::default(),
915 bitfield: Bitfield::empty(),
916 }));
917 Self(Arc::new(BaoFileHandleInner { state, hash }))
918 }
919
920 pub fn hash(&self) -> Hash {
921 self.hash
922 }
923
924 pub fn bitfield(&self) -> Bitfield {
925 self.0.state.borrow().bitfield()
926 }
927
928 pub fn subscribe(&self) -> BaoFileStorageSubscriber {
929 BaoFileStorageSubscriber::new(self.0.state.subscribe())
930 }
931
932 pub fn data_reader(&self) -> DataReader {
933 DataReader(self.clone())
934 }
935
936 pub fn outboard_reader(&self) -> OutboardReader {
937 let entry = self.0.state.borrow();
938 let hash = self.hash.into();
939 let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
940 OutboardReader {
941 hash,
942 tree,
943 data: self.clone(),
944 }
945 }
946}
947
948impl Default for BaoFileStorage {
949 fn default() -> Self {
950 Self::Partial(Default::default())
951 }
952}
953
954impl BaoFileStorage {
955 fn data(&self) -> &[u8] {
956 match self {
957 Self::Partial(entry) => entry.data.as_ref(),
958 Self::Complete(entry) => &entry.data,
959 }
960 }
961
962 fn outboard(&self) -> &[u8] {
963 match self {
964 Self::Partial(entry) => entry.outboard.as_ref(),
965 Self::Complete(entry) => &entry.outboard,
966 }
967 }
968
969 fn size(&self) -> u64 {
970 match self {
971 Self::Partial(entry) => entry.current_size(),
972 Self::Complete(entry) => entry.size(),
973 }
974 }
975}
976
977#[derive(Debug, Clone)]
978pub struct CompleteStorage {
979 pub(crate) data: Bytes,
980 pub(crate) outboard: Bytes,
981}
982
983impl CompleteStorage {
984 pub fn create(data: Bytes) -> (Hash, Self) {
985 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
986 let hash = outboard.root().into();
987 let outboard = outboard.data.into();
988 let entry = Self::new(data, outboard);
989 (hash, entry)
990 }
991
992 pub fn new(data: Bytes, outboard: Bytes) -> Self {
993 Self { data, outboard }
994 }
995
996 pub fn size(&self) -> u64 {
997 self.data.len() as u64
998 }
999}
1000
1001#[allow(dead_code)]
1002fn print_outboard(hashes: &[u8]) {
1003 assert!(hashes.len() % 64 == 0);
1004 for chunk in hashes.chunks(64) {
1005 let left: [u8; 32] = chunk[..32].try_into().unwrap();
1006 let right: [u8; 32] = chunk[32..].try_into().unwrap();
1007 let left = blake3::Hash::from(left);
1008 let right = blake3::Hash::from(right);
1009 println!("l: {left:?}, r: {right:?}");
1010 }
1011}
1012
1013pub struct BaoFileStorageSubscriber {
1014 receiver: watch::Receiver<BaoFileStorage>,
1015}
1016
1017impl BaoFileStorageSubscriber {
1018 pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1019 Self { receiver }
1020 }
1021
1022 pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1026 let value = self.receiver.borrow().bitfield();
1027 tx.send(value).await?;
1028 loop {
1029 self.update_or_closed(&mut tx).await?;
1030 let value = self.receiver.borrow().bitfield();
1031 tx.send(value.clone()).await?;
1032 }
1033 }
1034
1035 #[allow(dead_code)]
1039 pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1040 let value = self.receiver.borrow().bitfield();
1041 let mut old = value.clone();
1042 tx.send(value).await?;
1043 loop {
1044 self.update_or_closed(&mut tx).await?;
1045 let new = self.receiver.borrow().bitfield();
1046 let diff = old.diff(&new);
1047 if diff.is_empty() {
1048 continue;
1049 }
1050 tx.send(diff).await?;
1051 old = new;
1052 }
1053 }
1054
1055 async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1056 tokio::select! {
1057 _ = tx.closed() => {
1058 Err(irpc::channel::SendError::ReceiverClosed.into())
1060 }
1061 e = self.receiver.changed() => Ok(e?),
1062 }
1063 }
1064}
1065
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Round-trips a blob between two stores via bao export/import and checks that the
    /// re-exported encoding is byte-identical.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        // Keep the temp tag alive for the whole test so the blob stays protected.
        let tt = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = *tt.hash();
        println!("hash: {hash:?}");

        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let target = MemStore::new();
        let mut observer = target.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = observer.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let reexported = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, reexported);

        Ok(())
    }
}