1use std::{
11 collections::{BTreeMap, HashMap, HashSet},
12 future::Future,
13 io::{self, Write},
14 num::NonZeroU64,
15 ops::Deref,
16 sync::Arc,
17 time::SystemTime,
18};
19
20use bao_tree::{
21 blake3,
22 io::{
23 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24 outboard::PreOrderMemOutboard,
25 sync::{Outboard, ReadAt, WriteAt},
26 BaoContentItem, EncodeError, Leaf,
27 },
28 BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35 io::AsyncReadExt,
36 sync::watch,
37 task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43 api::{
44 self,
45 blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46 proto::{
47 BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48 CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49 DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50 ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51 ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52 ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53 ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54 SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
55 },
56 tags::TagInfo,
57 ApiClient,
58 },
59 protocol::ChunkRangesExt,
60 store::{
61 util::{SizeInfo, SparseMemFile, Tag},
62 IROH_BLOCK_SIZE,
63 },
64 util::temp_tag::{TagDrop, TempTagScope, TempTags},
65 BlobFormat, Hash, HashAndFormat,
66};
67
/// Configuration for [`MemStore`].
///
/// Currently has no fields.
#[derive(Default, Debug)]
pub struct Options {}
70
71#[derive(Debug, Clone)]
72#[repr(transparent)]
73pub struct MemStore {
74 client: ApiClient,
75}
76
77impl From<MemStore> for crate::api::Store {
78 fn from(value: MemStore) -> Self {
79 crate::api::Store::from_sender(value.client)
80 }
81}
82
83impl AsRef<crate::api::Store> for MemStore {
84 fn as_ref(&self) -> &crate::api::Store {
85 crate::api::Store::ref_from_sender(&self.client)
86 }
87}
88
89impl Deref for MemStore {
90 type Target = crate::api::Store;
91
92 fn deref(&self) -> &Self::Target {
93 crate::api::Store::ref_from_sender(&self.client)
94 }
95}
96
97impl Default for MemStore {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103#[derive(derive_more::From)]
104enum TaskResult {
105 Unit(()),
106 Import(anyhow::Result<ImportEntry>),
107 Scope(Scope),
108}
109
110impl MemStore {
111 pub fn from_sender(client: ApiClient) -> Self {
112 Self { client }
113 }
114
115 pub fn new() -> Self {
116 let (sender, receiver) = tokio::sync::mpsc::channel(32);
117 tokio::spawn(
118 Actor {
119 commands: receiver,
120 tasks: JoinSet::new(),
121 state: State {
122 data: HashMap::new(),
123 tags: BTreeMap::new(),
124 empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
125 },
126 options: Arc::new(Options::default()),
127 temp_tags: Default::default(),
128 protected: Default::default(),
129 idle_waiters: Default::default(),
130 }
131 .run(),
132 );
133 Self::from_sender(sender.into())
134 }
135}
136
137struct Actor {
138 commands: tokio::sync::mpsc::Receiver<Command>,
139 tasks: JoinSet<TaskResult>,
140 state: State,
141 #[allow(dead_code)]
142 options: Arc<Options>,
143 temp_tags: TempTags,
145 idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
147 protected: HashSet<Hash>,
148}
149
150impl Actor {
151 fn spawn<F, T>(&mut self, f: F)
152 where
153 F: Future<Output = T> + Send + 'static,
154 T: Into<TaskResult>,
155 {
156 let span = tracing::Span::current();
157 let fut = async move { f.await.into() }.instrument(span);
158 self.tasks.spawn(fut);
159 }
160
161 async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
162 match cmd {
163 Command::ImportBao(ImportBaoMsg {
164 inner: ImportBaoRequest { hash, size },
165 rx: data,
166 tx,
167 ..
168 }) => {
169 let entry = self.get_or_create_entry(hash);
170 self.spawn(import_bao(entry, size, data, tx));
171 }
172 Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
173 trace!("wait idle");
174 if self.tasks.is_empty() {
175 tx.send(()).await.ok();
177 } else {
178 self.idle_waiters.push(tx);
180 }
181 }
182 Command::Observe(ObserveMsg {
183 inner: ObserveRequest { hash },
184 tx,
185 ..
186 }) => {
187 let entry = self.get_or_create_entry(hash);
188 self.spawn(observe(entry, tx));
189 }
190 Command::ImportBytes(ImportBytesMsg {
191 inner:
192 ImportBytesRequest {
193 data,
194 scope,
195 format,
196 ..
197 },
198 tx,
199 ..
200 }) => {
201 self.spawn(import_bytes(data, scope, format, tx));
202 }
203 Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
204 self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
205 }
206 Command::ImportPath(cmd) => {
207 self.spawn(import_path(cmd));
208 }
209 Command::ExportBao(ExportBaoMsg {
210 inner: ExportBaoRequest { hash, ranges },
211 tx,
212 ..
213 }) => {
214 let entry = self.get(&hash);
215 self.spawn(export_bao(entry, ranges, tx))
216 }
217 Command::ExportPath(cmd) => {
218 let entry = self.get(&cmd.hash);
219 self.spawn(export_path(entry, cmd));
220 }
221 Command::DeleteTags(cmd) => {
222 let DeleteTagsMsg {
223 inner: DeleteTagsRequest { from, to },
224 tx,
225 ..
226 } = cmd;
227 info!("deleting tags from {:?} to {:?}", from, to);
228 self.state.tags.retain(|tag, _| {
231 if let Some(from) = &from {
232 if tag < from {
233 return true;
234 }
235 }
236 if let Some(to) = &to {
237 if tag >= to {
238 return true;
239 }
240 }
241 info!(" removing {:?}", tag);
242 false
243 });
244 tx.send(Ok(())).await.ok();
245 }
246 Command::RenameTag(cmd) => {
247 let RenameTagMsg {
248 inner: RenameTagRequest { from, to },
249 tx,
250 ..
251 } = cmd;
252 let tags = &mut self.state.tags;
253 let value = match tags.remove(&from) {
254 Some(value) => value,
255 None => {
256 tx.send(Err(api::Error::io(
257 io::ErrorKind::NotFound,
258 format!("tag not found: {from:?}"),
259 )))
260 .await
261 .ok();
262 return None;
263 }
264 };
265 tags.insert(to, value);
266 tx.send(Ok(())).await.ok();
267 return None;
268 }
269 Command::ListTags(cmd) => {
270 let ListTagsMsg {
271 inner:
272 ListTagsRequest {
273 from,
274 to,
275 raw,
276 hash_seq,
277 },
278 tx,
279 ..
280 } = cmd;
281 let tags = self
282 .state
283 .tags
284 .iter()
285 .filter(move |(tag, value)| {
286 if let Some(from) = &from {
287 if tag < &from {
288 return false;
289 }
290 }
291 if let Some(to) = &to {
292 if tag >= &to {
293 return false;
294 }
295 }
296 raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
297 })
298 .map(|(tag, value)| TagInfo {
299 name: tag.clone(),
300 hash: value.hash,
301 format: value.format,
302 })
303 .map(Ok);
304 tx.send(tags.collect()).await.ok();
305 }
306 Command::SetTag(SetTagMsg {
307 inner: SetTagRequest { name: tag, value },
308 tx,
309 ..
310 }) => {
311 self.state.tags.insert(tag, value);
312 tx.send(Ok(())).await.ok();
313 }
314 Command::CreateTag(CreateTagMsg {
315 inner: CreateTagRequest { value },
316 tx,
317 ..
318 }) => {
319 let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
320 self.state.tags.insert(tag.clone(), value);
321 tx.send(Ok(tag)).await.ok();
322 }
323 Command::CreateTempTag(cmd) => {
324 trace!("{cmd:?}");
325 self.create_temp_tag(cmd).await;
326 }
327 Command::ListTempTags(cmd) => {
328 trace!("{cmd:?}");
329 let tts = self.temp_tags.list();
330 cmd.tx.send(tts).await.ok();
331 }
332 Command::ListBlobs(cmd) => {
333 let ListBlobsMsg { tx, .. } = cmd;
334 let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
335 self.spawn(async move {
336 for blob in blobs {
337 if tx.send(Ok(blob)).await.is_err() {
338 break;
339 }
340 }
341 });
342 }
343 Command::BlobStatus(cmd) => {
344 trace!("{cmd:?}");
345 let BlobStatusMsg {
346 inner: BlobStatusRequest { hash },
347 tx,
348 ..
349 } = cmd;
350 let res = match self.get(&hash) {
351 None => api::blobs::BlobStatus::NotFound,
352 Some(x) => {
353 let bitfield = x.0.state.borrow().bitfield();
354 if bitfield.is_complete() {
355 BlobStatus::Complete {
356 size: bitfield.size,
357 }
358 } else {
359 BlobStatus::Partial {
360 size: bitfield.validated_size(),
361 }
362 }
363 }
364 };
365 tx.send(res).await.ok();
366 }
367 Command::DeleteBlobs(cmd) => {
368 trace!("{cmd:?}");
369 let DeleteBlobsMsg {
370 inner: BlobDeleteRequest { hashes, force },
371 tx,
372 ..
373 } = cmd;
374 for hash in hashes {
375 if !force && self.protected.contains(&hash) {
376 continue;
377 }
378 self.state.data.remove(&hash);
379 }
380 tx.send(Ok(())).await.ok();
381 }
382 Command::Batch(cmd) => {
383 trace!("{cmd:?}");
384 let (id, scope) = self.temp_tags.create_scope();
385 self.spawn(handle_batch(cmd, id, scope));
386 }
387 Command::ClearProtected(cmd) => {
388 self.protected.clear();
389 cmd.tx.send(Ok(())).await.ok();
390 }
391 Command::ExportRanges(cmd) => {
392 let entry = self.get(&cmd.hash);
393 self.spawn(export_ranges(cmd, entry));
394 }
395 Command::SyncDb(SyncDbMsg { tx, .. }) => {
396 tx.send(Ok(())).await.ok();
397 }
398 Command::Shutdown(cmd) => {
399 return Some(cmd);
400 }
401 }
402 None
403 }
404
405 fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
406 if *hash == Hash::EMPTY {
407 Some(self.state.empty_hash.clone())
408 } else {
409 self.state.data.get(hash).cloned()
410 }
411 }
412
413 fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
414 if hash == Hash::EMPTY {
415 self.state.empty_hash.clone()
416 } else {
417 self.state
418 .data
419 .entry(hash)
420 .or_insert_with(|| BaoFileHandle::new_partial(hash))
421 .clone()
422 }
423 }
424
425 async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
426 let CreateTempTagMsg { tx, inner, .. } = cmd;
427 let mut tt = self.temp_tags.create(inner.scope, inner.value);
428 if tx.is_rpc() {
429 tt.leak();
430 }
431 tx.send(tt).await.ok();
432 }
433
434 async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
435 let import_data = match res {
436 Ok(entry) => entry,
437 Err(e) => {
438 error!("import failed: {e}");
439 return;
440 }
441 };
442 let hash = import_data.outboard.root().into();
443 let entry = self.get_or_create_entry(hash);
444 entry
445 .0
446 .state
447 .send_if_modified(|state: &mut BaoFileStorage| {
448 let BaoFileStorage::Partial(_) = state.deref() else {
449 return false;
450 };
451 *state =
452 CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
453 true
454 });
455 let tt = self.temp_tags.create(
456 import_data.scope,
457 HashAndFormat {
458 hash,
459 format: import_data.format,
460 },
461 );
462 import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
463 }
464
465 fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
466 match res {
467 Ok(x) => Some(x),
468 Err(e) => {
469 if e.is_cancelled() {
470 trace!("task cancelled: {e}");
471 } else {
472 error!("task failed: {e}");
473 }
474 None
475 }
476 }
477 }
478
479 pub async fn run(mut self) {
480 let shutdown = loop {
481 tokio::select! {
482 cmd = self.commands.recv() => {
483 let Some(cmd) = cmd else {
484 break None;
487 };
488 if let Some(cmd) = self.handle_command(cmd).await {
489 break Some(cmd);
490 }
491 }
492 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
493 let Some(res) = self.log_task_result(res) else {
494 continue;
495 };
496 match res {
497 TaskResult::Import(res) => {
498 self.finish_import(res).await;
499 }
500 TaskResult::Scope(scope) => {
501 self.temp_tags.end_scope(scope);
502 }
503 TaskResult::Unit(_) => {}
504 }
505 if self.tasks.is_empty() {
506 for tx in self.idle_waiters.drain(..) {
508 tx.send(()).await.ok();
509 }
510 }
511 }
512 }
513 };
514 if let Some(shutdown) = shutdown {
515 shutdown.tx.send(()).await.ok();
516 }
517 }
518}
519
520async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
521 if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
522 error!("batch failed: {cause}");
523 }
524 id
525}
526
527async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
528 let BatchMsg { tx, mut rx, .. } = cmd;
529 trace!("created scope {}", id);
530 tx.send(id).await.map_err(api::Error::other)?;
531 while let Some(msg) = rx.recv().await? {
532 match msg {
533 BatchResponse::Drop(msg) => scope.on_drop(&msg),
534 BatchResponse::Ping => {}
535 }
536 }
537 Ok(())
538}
539
540async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
541 let Some(entry) = entry else {
542 let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
543 cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
544 return;
545 };
546 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
547 cmd.tx
548 .send(ExportRangesItem::Error(cause.into()))
549 .await
550 .ok();
551 }
552}
553
554async fn export_ranges_impl(
555 cmd: ExportRangesRequest,
556 tx: &mut mpsc::Sender<ExportRangesItem>,
557 entry: BaoFileHandle,
558) -> io::Result<()> {
559 let ExportRangesRequest { ranges, hash } = cmd;
560 let bitfield = entry.bitfield();
561 trace!(
562 "exporting ranges: {hash} {ranges:?} size={}",
563 bitfield.size()
564 );
565 debug_assert!(entry.hash() == hash, "hash mismatch");
566 let data = entry.data_reader();
567 let size = bitfield.size();
568 for range in ranges.iter() {
569 let range = match range {
570 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
571 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
572 };
573 let requested = ChunkRanges::bytes(range.start..range.end);
574 if !bitfield.ranges.is_superset(&requested) {
575 return Err(io::Error::other(format!(
576 "missing range: {requested:?}, present: {bitfield:?}",
577 )));
578 }
579 let bs = 1024;
580 let mut offset = range.start;
581 loop {
582 let end: u64 = (offset + bs).min(range.end);
583 let size = (end - offset) as usize;
584 tx.send(
585 Leaf {
586 offset,
587 data: data.read_bytes_at(offset, size)?,
588 }
589 .into(),
590 )
591 .await?;
592 offset = end;
593 if offset >= range.end {
594 break;
595 }
596 }
597 }
598 Ok(())
599}
600
601fn chunk_range(leaf: &Leaf) -> ChunkRanges {
602 let start = ChunkNum::chunks(leaf.offset);
603 let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
604 (start..end).into()
605}
606
607async fn import_bao(
608 entry: BaoFileHandle,
609 size: NonZeroU64,
610 mut stream: mpsc::Receiver<BaoContentItem>,
611 tx: irpc::channel::oneshot::Sender<api::Result<()>>,
612) {
613 let size = size.get();
614 entry
615 .0
616 .state
617 .send_if_modified(|state: &mut BaoFileStorage| {
618 let BaoFileStorage::Partial(entry) = state else {
619 return false;
621 };
622 entry.size.write(0, size);
623 false
624 });
625 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
626 while let Some(item) = stream.recv().await.unwrap() {
627 entry.0.state.send_if_modified(|state| {
628 let BaoFileStorage::Partial(partial) = state else {
629 return false;
631 };
632 match item {
633 BaoContentItem::Parent(parent) => {
634 if let Some(offset) = tree.pre_order_offset(parent.node) {
635 let mut pair = [0u8; 64];
636 pair[..32].copy_from_slice(parent.pair.0.as_bytes());
637 pair[32..].copy_from_slice(parent.pair.1.as_bytes());
638 partial
639 .outboard
640 .write_at(offset * 64, &pair)
641 .expect("writing to mem can never fail");
642 }
643 false
644 }
645 BaoContentItem::Leaf(leaf) => {
646 let start = leaf.offset;
647 partial
648 .data
649 .write_at(start, &leaf.data)
650 .expect("writing to mem can never fail");
651 let added = chunk_range(&leaf);
652 let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
653 if update.new_state().complete {
654 let data = std::mem::take(&mut partial.data);
655 let outboard = std::mem::take(&mut partial.outboard);
656 let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
657 let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
658 *state = CompleteStorage::new(data, outboard).into();
659 }
660 update.changed()
661 }
662 }
663 });
664 }
665 tx.send(Ok(())).await.ok();
666}
667
668#[instrument(skip_all, fields(hash = tracing::field::Empty))]
669async fn export_bao(
670 entry: Option<BaoFileHandle>,
671 ranges: ChunkRanges,
672 mut sender: mpsc::Sender<EncodedItem>,
673) {
674 let Some(entry) = entry else {
675 let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
676 sender.send(err.into()).await.ok();
677 return;
678 };
679 tracing::Span::current().record("hash", tracing::field::display(entry.hash));
680 let data = entry.data_reader();
681 let outboard = entry.outboard_reader();
682 let tx = BaoTreeSender::new(&mut sender);
683 traverse_ranges_validated(data, outboard, &ranges, tx)
684 .await
685 .ok();
686}
687
688#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
689async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
690 entry.subscribe().forward(tx).await.ok();
691}
692
693async fn import_bytes(
694 data: Bytes,
695 scope: Scope,
696 format: BlobFormat,
697 tx: mpsc::Sender<AddProgressItem>,
698) -> anyhow::Result<ImportEntry> {
699 tx.send(AddProgressItem::Size(data.len() as u64)).await?;
700 tx.send(AddProgressItem::CopyDone).await?;
701 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
702 Ok(ImportEntry {
703 data,
704 outboard,
705 scope,
706 format,
707 tx,
708 })
709}
710
711async fn import_byte_stream(
712 scope: Scope,
713 format: BlobFormat,
714 mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
715 tx: mpsc::Sender<AddProgressItem>,
716) -> anyhow::Result<ImportEntry> {
717 let mut res = Vec::new();
718 loop {
719 match rx.recv().await {
720 Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
721 res.extend_from_slice(&data);
722 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
723 .await?;
724 }
725 Ok(Some(ImportByteStreamUpdate::Done)) => {
726 break;
727 }
728 Ok(None) => {
729 return Err(api::Error::io(
730 io::ErrorKind::UnexpectedEof,
731 "byte stream ended unexpectedly",
732 )
733 .into());
734 }
735 Err(e) => {
736 return Err(e.into());
737 }
738 }
739 }
740 import_bytes(res.into(), scope, format, tx).await
741}
742
743#[instrument(skip_all, fields(path = %cmd.path.display()))]
744async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
745 let ImportPathMsg {
746 inner:
747 ImportPathRequest {
748 path,
749 scope,
750 format,
751 ..
752 },
753 tx,
754 ..
755 } = cmd;
756 let mut res = Vec::new();
757 let mut file = tokio::fs::File::open(path).await?;
758 let mut buf = [0u8; 1024 * 64];
759 loop {
760 let size = file.read(&mut buf).await?;
761 if size == 0 {
762 break;
763 }
764 res.extend_from_slice(&buf[..size]);
765 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
766 .await?;
767 }
768 import_bytes(res.into(), scope, format, tx).await
769}
770
771#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
772async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
773 let ExportPathMsg { inner, mut tx, .. } = cmd;
774 let Some(entry) = entry else {
775 tx.send(ExportProgressItem::Error(api::Error::io(
776 io::ErrorKind::NotFound,
777 "hash not found",
778 )))
779 .await
780 .ok();
781 return;
782 };
783 match export_path_impl(entry, inner, &mut tx).await {
784 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
785 Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
786 };
787}
788
789async fn export_path_impl(
790 entry: BaoFileHandle,
791 cmd: ExportPathRequest,
792 tx: &mut mpsc::Sender<ExportProgressItem>,
793) -> io::Result<()> {
794 let ExportPathRequest { target, .. } = cmd;
795 if !target.is_absolute() {
796 return Err(io::Error::new(
797 io::ErrorKind::InvalidInput,
798 "path is not absolute",
799 ));
800 }
801 if let Some(parent) = target.parent() {
802 std::fs::create_dir_all(parent)?;
803 }
804 let mut file = std::fs::File::create(target)?;
806 let size = entry.0.state.borrow().size();
807 tx.send(ExportProgressItem::Size(size)).await?;
808 let mut buf = [0u8; 1024 * 64];
809 for offset in (0..size).step_by(1024 * 64) {
810 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
811 let buf = &mut buf[..len];
812 entry.0.state.borrow().data().read_exact_at(offset, buf)?;
813 file.write_all(buf)?;
814 tx.try_send(ExportProgressItem::CopyProgress(offset))
815 .await
816 .map_err(|_e| io::Error::other(""))?;
817 yield_now().await;
818 }
819 Ok(())
820}
821
822struct ImportEntry {
823 scope: Scope,
824 format: BlobFormat,
825 data: Bytes,
826 outboard: PreOrderMemOutboard,
827 tx: mpsc::Sender<AddProgressItem>,
828}
829
830pub struct DataReader(BaoFileHandle);
831
832impl ReadBytesAt for DataReader {
833 fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
834 let entry = self.0 .0.state.borrow();
835 entry.data().read_bytes_at(offset, size)
836 }
837}
838
839pub struct OutboardReader {
840 hash: blake3::Hash,
841 tree: BaoTree,
842 data: BaoFileHandle,
843}
844
845impl Outboard for OutboardReader {
846 fn root(&self) -> blake3::Hash {
847 self.hash
848 }
849
850 fn tree(&self) -> BaoTree {
851 self.tree
852 }
853
854 fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
855 let Some(offset) = self.tree.pre_order_offset(node) else {
856 return Ok(None);
857 };
858 let mut buf = [0u8; 64];
859 let size = self
860 .data
861 .0
862 .state
863 .borrow()
864 .outboard()
865 .read_at(offset * 64, &mut buf)?;
866 if size != 64 {
867 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
868 }
869 let left: [u8; 32] = buf[..32].try_into().unwrap();
870 let right: [u8; 32] = buf[32..].try_into().unwrap();
871 Ok(Some((left.into(), right.into())))
872 }
873}
874
875struct State {
876 data: HashMap<Hash, BaoFileHandle>,
877 tags: BTreeMap<Tag, HashAndFormat>,
878 empty_hash: BaoFileHandle,
879}
880
881#[derive(Debug, derive_more::From)]
882pub enum BaoFileStorage {
883 Partial(PartialMemStorage),
884 Complete(CompleteStorage),
885}
886
887impl BaoFileStorage {
888 pub fn bitfield(&self) -> Bitfield {
890 match self {
891 Self::Partial(entry) => entry.bitfield.clone(),
892 Self::Complete(entry) => Bitfield::complete(entry.size()),
893 }
894 }
895}
896
897#[derive(Debug)]
898pub struct BaoFileHandleInner {
899 state: watch::Sender<BaoFileStorage>,
900 hash: Hash,
901}
902
903#[derive(Debug, Clone, derive_more::Deref)]
905pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
906
907impl BaoFileHandle {
908 pub fn new_partial(hash: Hash) -> Self {
909 let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
910 data: SparseMemFile::new(),
911 outboard: SparseMemFile::new(),
912 size: SizeInfo::default(),
913 bitfield: Bitfield::empty(),
914 }));
915 Self(Arc::new(BaoFileHandleInner { state, hash }))
916 }
917
918 pub fn hash(&self) -> Hash {
919 self.hash
920 }
921
922 pub fn bitfield(&self) -> Bitfield {
923 self.0.state.borrow().bitfield()
924 }
925
926 pub fn subscribe(&self) -> BaoFileStorageSubscriber {
927 BaoFileStorageSubscriber::new(self.0.state.subscribe())
928 }
929
930 pub fn data_reader(&self) -> DataReader {
931 DataReader(self.clone())
932 }
933
934 pub fn outboard_reader(&self) -> OutboardReader {
935 let entry = self.0.state.borrow();
936 let hash = self.hash.into();
937 let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
938 OutboardReader {
939 hash,
940 tree,
941 data: self.clone(),
942 }
943 }
944}
945
946impl Default for BaoFileStorage {
947 fn default() -> Self {
948 Self::Partial(Default::default())
949 }
950}
951
952impl BaoFileStorage {
953 fn data(&self) -> &[u8] {
954 match self {
955 Self::Partial(entry) => entry.data.as_ref(),
956 Self::Complete(entry) => &entry.data,
957 }
958 }
959
960 fn outboard(&self) -> &[u8] {
961 match self {
962 Self::Partial(entry) => entry.outboard.as_ref(),
963 Self::Complete(entry) => &entry.outboard,
964 }
965 }
966
967 fn size(&self) -> u64 {
968 match self {
969 Self::Partial(entry) => entry.current_size(),
970 Self::Complete(entry) => entry.size(),
971 }
972 }
973}
974
975#[derive(Debug, Clone)]
976pub struct CompleteStorage {
977 pub(crate) data: Bytes,
978 pub(crate) outboard: Bytes,
979}
980
981impl CompleteStorage {
982 pub fn create(data: Bytes) -> (Hash, Self) {
983 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
984 let hash = outboard.root().into();
985 let outboard = outboard.data.into();
986 let entry = Self::new(data, outboard);
987 (hash, entry)
988 }
989
990 pub fn new(data: Bytes, outboard: Bytes) -> Self {
991 Self { data, outboard }
992 }
993
994 pub fn size(&self) -> u64 {
995 self.data.len() as u64
996 }
997}
998
999#[allow(dead_code)]
1000fn print_outboard(hashes: &[u8]) {
1001 assert!(hashes.len() % 64 == 0);
1002 for chunk in hashes.chunks(64) {
1003 let left: [u8; 32] = chunk[..32].try_into().unwrap();
1004 let right: [u8; 32] = chunk[32..].try_into().unwrap();
1005 let left = blake3::Hash::from(left);
1006 let right = blake3::Hash::from(right);
1007 println!("l: {left:?}, r: {right:?}");
1008 }
1009}
1010
1011pub struct BaoFileStorageSubscriber {
1012 receiver: watch::Receiver<BaoFileStorage>,
1013}
1014
1015impl BaoFileStorageSubscriber {
1016 pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1017 Self { receiver }
1018 }
1019
1020 pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1024 let value = self.receiver.borrow().bitfield();
1025 tx.send(value).await?;
1026 loop {
1027 self.update_or_closed(&mut tx).await?;
1028 let value = self.receiver.borrow().bitfield();
1029 tx.send(value.clone()).await?;
1030 }
1031 }
1032
1033 #[allow(dead_code)]
1037 pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1038 let value = self.receiver.borrow().bitfield();
1039 let mut old = value.clone();
1040 tx.send(value).await?;
1041 loop {
1042 self.update_or_closed(&mut tx).await?;
1043 let new = self.receiver.borrow().bitfield();
1044 let diff = old.diff(&new);
1045 if diff.is_empty() {
1046 continue;
1047 }
1048 tx.send(diff).await?;
1049 old = new;
1050 }
1051 }
1052
1053 async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1054 tokio::select! {
1055 _ = tx.closed() => {
1056 Err(irpc::channel::SendError::ReceiverClosed.into())
1058 }
1059 e = self.receiver.changed() => Ok(e?),
1060 }
1061 }
1062}
1063
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Round trip: add bytes to one store and export them as bao. The export is
    /// imported into a second store while that store is being observed, and the
    /// two stores' exports must match.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        let tt = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = *tt.hash();
        println!("hash: {hash:?}");

        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let target = MemStore::new();
        let mut events = target.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = events.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let reexported = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, reexported);

        Ok(())
    }
}