1use std::{
9 collections::HashMap,
10 io::{self, Write},
11 ops::Deref,
12 path::PathBuf,
13};
14
15use bao_tree::{
16 io::{
17 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
18 outboard::PreOrderMemOutboard,
19 sync::ReadAt,
20 Leaf,
21 },
22 BaoTree, ChunkRanges,
23};
24use bytes::Bytes;
25use irpc::channel::mpsc;
26use n0_future::{
27 future::{self, yield_now},
28 task::{JoinError, JoinSet},
29};
30use range_collections::range_set::RangeSetRange;
31use ref_cast::RefCast;
32
33use super::util::BaoTreeSender;
34use crate::{
35 api::{
36 self,
37 blobs::{Bitfield, ExportProgressItem},
38 proto::{
39 self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
40 ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
41 ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
42 ObserveRequest, WaitIdleMsg,
43 },
44 ApiClient, TempTag,
45 },
46 protocol::ChunkRangesExt,
47 store::{mem::CompleteStorage, IROH_BLOCK_SIZE},
48 Hash,
49};
50
51#[derive(Debug, Clone)]
52pub struct ReadonlyMemStore {
53 client: ApiClient,
54}
55
56impl Deref for ReadonlyMemStore {
57 type Target = crate::api::Store;
58
59 fn deref(&self) -> &Self::Target {
60 crate::api::Store::ref_from_sender(&self.client)
61 }
62}
63
64impl From<ReadonlyMemStore> for crate::api::Store {
65 fn from(value: ReadonlyMemStore) -> Self {
66 crate::api::Store::from_sender(value.client)
67 }
68}
69
70impl AsRef<crate::api::Store> for ReadonlyMemStore {
71 fn as_ref(&self) -> &crate::api::Store {
72 crate::api::Store::ref_from_sender(&self.client)
73 }
74}
75
76struct Actor {
77 commands: tokio::sync::mpsc::Receiver<proto::Command>,
78 tasks: JoinSet<()>,
79 idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
80 data: HashMap<Hash, CompleteStorage>,
81}
82
83impl Actor {
84 fn new(
85 commands: tokio::sync::mpsc::Receiver<proto::Command>,
86 data: HashMap<Hash, CompleteStorage>,
87 ) -> Self {
88 Self {
89 data,
90 commands,
91 tasks: JoinSet::new(),
92 idle_waiters: Vec::new(),
93 }
94 }
95
96 async fn handle_command(&mut self, cmd: Command) -> Option<irpc::channel::oneshot::Sender<()>> {
97 match cmd {
98 Command::ImportBao(ImportBaoMsg { tx, .. }) => {
99 tx.send(Err(unsupported("import not supported").into()))
100 .await
101 .ok();
102 }
103 Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
104 if self.tasks.is_empty() {
105 tx.send(()).await.ok();
107 } else {
108 self.idle_waiters.push(tx);
110 }
111 }
112 Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
113 tx.send(unsupported("import not supported").into())
114 .await
115 .ok();
116 }
117 Command::ImportByteStream(ImportByteStreamMsg { tx, .. }) => {
118 tx.send(unsupported("import not supported").into())
119 .await
120 .ok();
121 }
122 Command::ImportPath(ImportPathMsg { tx, .. }) => {
123 tx.send(unsupported("import not supported").into())
124 .await
125 .ok();
126 }
127 Command::Observe(ObserveMsg {
128 inner: ObserveRequest { hash },
129 tx,
130 ..
131 }) => {
132 let size = self.data.get_mut(&hash).map(|x| x.data.len() as u64);
133 self.tasks.spawn(async move {
134 if let Some(size) = size {
135 tx.send(Bitfield::complete(size)).await.ok();
136 } else {
137 tx.send(Bitfield::empty()).await.ok();
138 future::pending::<()>().await;
139 };
140 });
141 }
142 Command::ExportBao(ExportBaoMsg {
143 inner: ExportBaoRequest { hash, ranges, .. },
144 tx,
145 ..
146 }) => {
147 let entry = self.data.get(&hash).cloned();
148 self.tasks.spawn(export_bao(hash, entry, ranges, tx));
149 }
150 Command::ExportPath(ExportPathMsg {
151 inner: ExportPathRequest { hash, target, .. },
152 tx,
153 ..
154 }) => {
155 let entry = self.data.get(&hash).cloned();
156 self.tasks.spawn(export_path(entry, target, tx));
157 }
158 Command::Batch(_cmd) => {}
159 Command::ClearProtected(cmd) => {
160 cmd.tx.send(Ok(())).await.ok();
161 }
162 Command::CreateTag(cmd) => {
163 cmd.tx
164 .send(Err(unsupported("create tag not supported").into()))
165 .await
166 .ok();
167 }
168 Command::CreateTempTag(cmd) => {
169 cmd.tx.send(TempTag::new(cmd.inner.value, None)).await.ok();
170 }
171 Command::RenameTag(cmd) => {
172 cmd.tx
173 .send(Err(unsupported("rename tag not supported").into()))
174 .await
175 .ok();
176 }
177 Command::DeleteTags(cmd) => {
178 cmd.tx
179 .send(Err(unsupported("delete tags not supported").into()))
180 .await
181 .ok();
182 }
183 Command::DeleteBlobs(cmd) => {
184 cmd.tx
185 .send(Err(unsupported("delete blobs not supported").into()))
186 .await
187 .ok();
188 }
189 Command::ListBlobs(cmd) => {
190 let hashes: Vec<Hash> = self.data.keys().cloned().collect();
191 self.tasks.spawn(async move {
192 for hash in hashes {
193 cmd.tx.send(Ok(hash)).await.ok();
194 }
195 });
196 }
197 Command::BlobStatus(cmd) => {
198 let hash = cmd.inner.hash;
199 let entry = self.data.get(&hash);
200 let status = if let Some(entry) = entry {
201 BlobStatus::Complete {
202 size: entry.data.len() as u64,
203 }
204 } else {
205 BlobStatus::NotFound
206 };
207 cmd.tx.send(status).await.ok();
208 }
209 Command::ListTags(cmd) => {
210 cmd.tx.send(Vec::new()).await.ok();
211 }
212 Command::SetTag(cmd) => {
213 cmd.tx
214 .send(Err(unsupported("set tag not supported").into()))
215 .await
216 .ok();
217 }
218 Command::ListTempTags(cmd) => {
219 cmd.tx.send(Vec::new()).await.ok();
220 }
221 Command::SyncDb(cmd) => {
222 cmd.tx.send(Ok(())).await.ok();
223 }
224 Command::Shutdown(cmd) => {
225 return Some(cmd.tx);
226 }
227 Command::ExportRanges(cmd) => {
228 let entry = self.data.get(&cmd.inner.hash).cloned();
229 self.tasks.spawn(export_ranges(cmd, entry));
230 }
231 }
232 None
233 }
234
235 fn log_unit_task(&self, res: Result<(), JoinError>) {
236 if let Err(e) = res {
237 tracing::error!("task failed: {e}");
238 }
239 }
240
241 async fn run(mut self) {
242 loop {
243 tokio::select! {
244 Some(cmd) = self.commands.recv() => {
245 if let Some(shutdown) = self.handle_command(cmd).await {
246 shutdown.send(()).await.ok();
247 break;
248 }
249 },
250 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
251 self.log_unit_task(res);
252 if self.tasks.is_empty() {
253 for tx in self.idle_waiters.drain(..) {
255 tx.send(()).await.ok();
256 }
257 }
258 },
259 else => break,
260 }
261 }
262 }
263}
264
265fn unsupported(text: &str) -> io::Error {
266 io::Error::new(io::ErrorKind::Unsupported, text)
267}
268
269async fn export_bao(
270 hash: Hash,
271 entry: Option<CompleteStorage>,
272 ranges: ChunkRanges,
273 mut sender: mpsc::Sender<EncodedItem>,
274) {
275 let entry = match entry {
276 Some(entry) => entry,
277 None => {
278 sender
279 .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(
280 io::Error::new(
281 io::ErrorKind::UnexpectedEof,
282 "export task ended unexpectedly",
283 ),
284 )))
285 .await
286 .ok();
287 return;
288 }
289 };
290 let data = entry.data;
291 let outboard = entry.outboard;
292 let size = data.as_ref().len() as u64;
293 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
294 let outboard = PreOrderMemOutboard {
295 root: hash.into(),
296 tree,
297 data: outboard,
298 };
299 let sender = BaoTreeSender::ref_cast_mut(&mut sender);
300 traverse_ranges_validated(data.as_ref(), outboard, &ranges, sender)
301 .await
302 .ok();
303}
304
305async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<CompleteStorage>) {
306 let Some(entry) = entry else {
307 cmd.tx
308 .send(ExportRangesItem::Error(api::Error::io(
309 io::ErrorKind::NotFound,
310 "hash not found",
311 )))
312 .await
313 .ok();
314 return;
315 };
316 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
317 cmd.tx
318 .send(ExportRangesItem::Error(cause.into()))
319 .await
320 .ok();
321 }
322}
323
324async fn export_ranges_impl(
325 cmd: ExportRangesRequest,
326 tx: &mut mpsc::Sender<ExportRangesItem>,
327 entry: CompleteStorage,
328) -> io::Result<()> {
329 let ExportRangesRequest { ranges, .. } = cmd;
330 let data = entry.data;
331 let size = data.len() as u64;
332 let bitfield = Bitfield::complete(size);
333 for range in ranges.iter() {
334 let range = match range {
335 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
336 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
337 };
338 let requested = ChunkRanges::bytes(range.start..range.end);
339 if !bitfield.ranges.is_superset(&requested) {
340 return Err(io::Error::other(format!(
341 "missing range: {requested:?}, present: {bitfield:?}",
342 )));
343 }
344 let bs = 1024;
345 let mut offset = range.start;
346 loop {
347 let end: u64 = (offset + bs).min(range.end);
348 let size = (end - offset) as usize;
349 tx.send(
350 Leaf {
351 offset,
352 data: data.read_bytes_at(offset, size)?,
353 }
354 .into(),
355 )
356 .await?;
357 offset = end;
358 if offset >= range.end {
359 break;
360 }
361 }
362 }
363 Ok(())
364}
365
366impl ReadonlyMemStore {
367 pub fn new(items: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
368 let mut entries = HashMap::new();
369 for item in items {
370 let data = Bytes::copy_from_slice(item.as_ref());
371 let (hash, entry) = CompleteStorage::create(data);
372 entries.insert(hash, entry);
373 }
374 let (sender, receiver) = tokio::sync::mpsc::channel(1);
375 let actor = Actor::new(receiver, entries);
376 n0_future::task::spawn(actor.run());
377 let local = irpc::LocalSender::from(sender);
378 Self {
379 client: local.into(),
380 }
381 }
382}
383
384async fn export_path(
385 entry: Option<CompleteStorage>,
386 target: PathBuf,
387 mut tx: mpsc::Sender<ExportProgressItem>,
388) {
389 let Some(entry) = entry else {
390 tx.send(api::Error::io(io::ErrorKind::NotFound, "hash not found").into())
391 .await
392 .ok();
393 return;
394 };
395 match export_path_impl(entry, target, &mut tx).await {
396 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
397 Err(cause) => tx.send(api::Error::from(cause).into()).await.ok(),
398 };
399}
400
401async fn export_path_impl(
402 entry: CompleteStorage,
403 target: PathBuf,
404 tx: &mut mpsc::Sender<ExportProgressItem>,
405) -> io::Result<()> {
406 let data = entry.data;
407 let mut file = std::fs::File::create(&target)?;
409 let size = data.len() as u64;
410 tx.send(ExportProgressItem::Size(size)).await?;
411 let mut buf = [0u8; 1024 * 64];
412 for offset in (0..size).step_by(1024 * 64) {
413 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
414 let buf = &mut buf[..len];
415 data.as_ref().read_exact_at(offset, buf)?;
416 file.write_all(buf)?;
417 tx.try_send(ExportProgressItem::CopyProgress(offset))
418 .await?;
419 yield_now().await;
420 }
421 Ok(())
422}