1use std::{
9 collections::HashMap,
10 io::{self, Write},
11 ops::Deref,
12 path::PathBuf,
13};
14
15use bao_tree::{
16 io::{
17 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
18 outboard::PreOrderMemOutboard,
19 sync::ReadAt,
20 Leaf,
21 },
22 BaoTree, ChunkRanges,
23};
24use bytes::Bytes;
25use irpc::channel::mpsc;
26use n0_future::{
27 future::{self, yield_now},
28 task::{JoinError, JoinSet},
29};
30use range_collections::range_set::RangeSetRange;
31use ref_cast::RefCast;
32
33use super::util::BaoTreeSender;
34use crate::{
35 api::{
36 self,
37 blobs::{Bitfield, ExportProgressItem},
38 proto::{
39 self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
40 ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
41 ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
42 ObserveRequest, WaitIdleMsg,
43 },
44 ApiClient, TempTag,
45 },
46 protocol::ChunkRangesExt,
47 store::{mem::CompleteStorage, IROH_BLOCK_SIZE},
48 Hash,
49};
50
51#[derive(Debug, Clone)]
52pub struct ReadonlyMemStore {
53 client: ApiClient,
54}
55
56impl Deref for ReadonlyMemStore {
57 type Target = crate::api::Store;
58
59 fn deref(&self) -> &Self::Target {
60 crate::api::Store::ref_from_sender(&self.client)
61 }
62}
63
64impl From<ReadonlyMemStore> for crate::api::Store {
65 fn from(value: ReadonlyMemStore) -> Self {
66 crate::api::Store::from_sender(value.client)
67 }
68}
69
70impl AsRef<crate::api::Store> for ReadonlyMemStore {
71 fn as_ref(&self) -> &crate::api::Store {
72 crate::api::Store::ref_from_sender(&self.client)
73 }
74}
75
76struct Actor {
77 commands: tokio::sync::mpsc::Receiver<proto::Command>,
78 tasks: JoinSet<()>,
79 idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
80 data: HashMap<Hash, CompleteStorage>,
81}
82
83impl Actor {
84 fn new(
85 commands: tokio::sync::mpsc::Receiver<proto::Command>,
86 data: HashMap<Hash, CompleteStorage>,
87 ) -> Self {
88 Self {
89 data,
90 commands,
91 tasks: JoinSet::new(),
92 idle_waiters: Vec::new(),
93 }
94 }
95
96 async fn handle_command(&mut self, cmd: Command) -> Option<irpc::channel::oneshot::Sender<()>> {
97 match cmd {
98 Command::ImportBao(ImportBaoMsg { tx, .. }) => {
99 tx.send(Err(api::Error::Io(io::Error::other(
100 "import not supported",
101 ))))
102 .await
103 .ok();
104 }
105 Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
106 if self.tasks.is_empty() {
107 tx.send(()).await.ok();
109 } else {
110 self.idle_waiters.push(tx);
112 }
113 }
114 Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
115 tx.send(io::Error::other("import not supported").into())
116 .await
117 .ok();
118 }
119 Command::ImportByteStream(ImportByteStreamMsg { tx, .. }) => {
120 tx.send(io::Error::other("import not supported").into())
121 .await
122 .ok();
123 }
124 Command::ImportPath(ImportPathMsg { tx, .. }) => {
125 tx.send(io::Error::other("import not supported").into())
126 .await
127 .ok();
128 }
129 Command::Observe(ObserveMsg {
130 inner: ObserveRequest { hash },
131 tx,
132 ..
133 }) => {
134 let size = self.data.get_mut(&hash).map(|x| x.data.len() as u64);
135 self.tasks.spawn(async move {
136 if let Some(size) = size {
137 tx.send(Bitfield::complete(size)).await.ok();
138 } else {
139 tx.send(Bitfield::empty()).await.ok();
140 future::pending::<()>().await;
141 };
142 });
143 }
144 Command::ExportBao(ExportBaoMsg {
145 inner: ExportBaoRequest { hash, ranges, .. },
146 tx,
147 ..
148 }) => {
149 let entry = self.data.get(&hash).cloned();
150 self.tasks.spawn(export_bao(hash, entry, ranges, tx));
151 }
152 Command::ExportPath(ExportPathMsg {
153 inner: ExportPathRequest { hash, target, .. },
154 tx,
155 ..
156 }) => {
157 let entry = self.data.get(&hash).cloned();
158 self.tasks.spawn(export_path(entry, target, tx));
159 }
160 Command::Batch(_cmd) => {}
161 Command::ClearProtected(cmd) => {
162 cmd.tx.send(Ok(())).await.ok();
163 }
164 Command::CreateTag(cmd) => {
165 cmd.tx
166 .send(Err(io::Error::other("create tag not supported").into()))
167 .await
168 .ok();
169 }
170 Command::CreateTempTag(cmd) => {
171 cmd.tx.send(TempTag::new(cmd.inner.value, None)).await.ok();
172 }
173 Command::RenameTag(cmd) => {
174 cmd.tx
175 .send(Err(io::Error::other("rename tag not supported").into()))
176 .await
177 .ok();
178 }
179 Command::DeleteTags(cmd) => {
180 cmd.tx
181 .send(Err(io::Error::other("delete tags not supported").into()))
182 .await
183 .ok();
184 }
185 Command::DeleteBlobs(cmd) => {
186 cmd.tx
187 .send(Err(io::Error::other("delete blobs not supported").into()))
188 .await
189 .ok();
190 }
191 Command::ListBlobs(cmd) => {
192 let hashes: Vec<Hash> = self.data.keys().cloned().collect();
193 self.tasks.spawn(async move {
194 for hash in hashes {
195 cmd.tx.send(Ok(hash)).await.ok();
196 }
197 });
198 }
199 Command::BlobStatus(cmd) => {
200 let hash = cmd.inner.hash;
201 let entry = self.data.get(&hash);
202 let status = if let Some(entry) = entry {
203 BlobStatus::Complete {
204 size: entry.data.len() as u64,
205 }
206 } else {
207 BlobStatus::NotFound
208 };
209 cmd.tx.send(status).await.ok();
210 }
211 Command::ListTags(cmd) => {
212 cmd.tx.send(Vec::new()).await.ok();
213 }
214 Command::SetTag(cmd) => {
215 cmd.tx
216 .send(Err(io::Error::other("set tag not supported").into()))
217 .await
218 .ok();
219 }
220 Command::ListTempTags(cmd) => {
221 cmd.tx.send(Vec::new()).await.ok();
222 }
223 Command::SyncDb(cmd) => {
224 cmd.tx.send(Ok(())).await.ok();
225 }
226 Command::Shutdown(cmd) => {
227 return Some(cmd.tx);
228 }
229 Command::ExportRanges(cmd) => {
230 let entry = self.data.get(&cmd.inner.hash).cloned();
231 self.tasks.spawn(export_ranges(cmd, entry));
232 }
233 }
234 None
235 }
236
237 fn log_unit_task(&self, res: Result<(), JoinError>) {
238 if let Err(e) = res {
239 tracing::error!("task failed: {e}");
240 }
241 }
242
243 async fn run(mut self) {
244 loop {
245 tokio::select! {
246 Some(cmd) = self.commands.recv() => {
247 if let Some(shutdown) = self.handle_command(cmd).await {
248 shutdown.send(()).await.ok();
249 break;
250 }
251 },
252 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
253 self.log_unit_task(res);
254 if self.tasks.is_empty() {
255 for tx in self.idle_waiters.drain(..) {
257 tx.send(()).await.ok();
258 }
259 }
260 },
261 else => break,
262 }
263 }
264 }
265}
266
267async fn export_bao(
268 hash: Hash,
269 entry: Option<CompleteStorage>,
270 ranges: ChunkRanges,
271 mut sender: mpsc::Sender<EncodedItem>,
272) {
273 let entry = match entry {
274 Some(entry) => entry,
275 None => {
276 sender
277 .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(
278 io::Error::new(
279 io::ErrorKind::UnexpectedEof,
280 "export task ended unexpectedly",
281 ),
282 )))
283 .await
284 .ok();
285 return;
286 }
287 };
288 let data = entry.data;
289 let outboard = entry.outboard;
290 let size = data.as_ref().len() as u64;
291 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
292 let outboard = PreOrderMemOutboard {
293 root: hash.into(),
294 tree,
295 data: outboard,
296 };
297 let sender = BaoTreeSender::ref_cast_mut(&mut sender);
298 traverse_ranges_validated(data.as_ref(), outboard, &ranges, sender)
299 .await
300 .ok();
301}
302
303async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<CompleteStorage>) {
304 let Some(entry) = entry else {
305 cmd.tx
306 .send(ExportRangesItem::Error(api::Error::io(
307 io::ErrorKind::NotFound,
308 "hash not found",
309 )))
310 .await
311 .ok();
312 return;
313 };
314 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
315 cmd.tx
316 .send(ExportRangesItem::Error(cause.into()))
317 .await
318 .ok();
319 }
320}
321
322async fn export_ranges_impl(
323 cmd: ExportRangesRequest,
324 tx: &mut mpsc::Sender<ExportRangesItem>,
325 entry: CompleteStorage,
326) -> io::Result<()> {
327 let ExportRangesRequest { ranges, .. } = cmd;
328 let data = entry.data;
329 let size = data.len() as u64;
330 let bitfield = Bitfield::complete(size);
331 for range in ranges.iter() {
332 let range = match range {
333 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
334 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
335 };
336 let requested = ChunkRanges::bytes(range.start..range.end);
337 if !bitfield.ranges.is_superset(&requested) {
338 return Err(io::Error::other(format!(
339 "missing range: {requested:?}, present: {bitfield:?}",
340 )));
341 }
342 let bs = 1024;
343 let mut offset = range.start;
344 loop {
345 let end: u64 = (offset + bs).min(range.end);
346 let size = (end - offset) as usize;
347 tx.send(
348 Leaf {
349 offset,
350 data: data.read_bytes_at(offset, size)?,
351 }
352 .into(),
353 )
354 .await?;
355 offset = end;
356 if offset >= range.end {
357 break;
358 }
359 }
360 }
361 Ok(())
362}
363
364impl ReadonlyMemStore {
365 pub fn new(items: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
366 let mut entries = HashMap::new();
367 for item in items {
368 let data = Bytes::copy_from_slice(item.as_ref());
369 let (hash, entry) = CompleteStorage::create(data);
370 entries.insert(hash, entry);
371 }
372 let (sender, receiver) = tokio::sync::mpsc::channel(1);
373 let actor = Actor::new(receiver, entries);
374 n0_future::task::spawn(actor.run());
375 let local = irpc::LocalSender::from(sender);
376 Self {
377 client: local.into(),
378 }
379 }
380}
381
382async fn export_path(
383 entry: Option<CompleteStorage>,
384 target: PathBuf,
385 mut tx: mpsc::Sender<ExportProgressItem>,
386) {
387 let Some(entry) = entry else {
388 tx.send(api::Error::io(io::ErrorKind::NotFound, "hash not found").into())
389 .await
390 .ok();
391 return;
392 };
393 match export_path_impl(entry, target, &mut tx).await {
394 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
395 Err(cause) => tx.send(api::Error::from(cause).into()).await.ok(),
396 };
397}
398
399async fn export_path_impl(
400 entry: CompleteStorage,
401 target: PathBuf,
402 tx: &mut mpsc::Sender<ExportProgressItem>,
403) -> io::Result<()> {
404 let data = entry.data;
405 let mut file = std::fs::File::create(&target)?;
407 let size = data.len() as u64;
408 tx.send(ExportProgressItem::Size(size)).await?;
409 let mut buf = [0u8; 1024 * 64];
410 for offset in (0..size).step_by(1024 * 64) {
411 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
412 let buf = &mut buf[..len];
413 data.as_ref().read_exact_at(offset, buf)?;
414 file.write_all(buf)?;
415 tx.try_send(ExportProgressItem::CopyProgress(offset))
416 .await
417 .map_err(|_e| io::Error::other("error"))?;
418 yield_now().await;
419 }
420 Ok(())
421}