1use std::{
9 collections::HashMap,
10 io::{self, Write},
11 ops::Deref,
12 path::PathBuf,
13};
14
15use bao_tree::{
16 io::{
17 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
18 outboard::PreOrderMemOutboard,
19 sync::ReadAt,
20 Leaf,
21 },
22 BaoTree, ChunkRanges,
23};
24use bytes::Bytes;
25use irpc::channel::mpsc;
26use n0_future::future::{self, yield_now};
27use range_collections::range_set::RangeSetRange;
28use ref_cast::RefCast;
29use tokio::task::{JoinError, JoinSet};
30
31use super::util::BaoTreeSender;
32use crate::{
33 api::{
34 self,
35 blobs::{Bitfield, ExportProgressItem},
36 proto::{
37 self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
38 ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
39 ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
40 ObserveRequest, WaitIdleMsg,
41 },
42 ApiClient, TempTag,
43 },
44 protocol::ChunkRangesExt,
45 store::{mem::CompleteStorage, IROH_BLOCK_SIZE},
46 Hash,
47};
48
49#[derive(Debug, Clone)]
50pub struct ReadonlyMemStore {
51 client: ApiClient,
52}
53
54impl Deref for ReadonlyMemStore {
55 type Target = crate::api::Store;
56
57 fn deref(&self) -> &Self::Target {
58 crate::api::Store::ref_from_sender(&self.client)
59 }
60}
61
62impl From<ReadonlyMemStore> for crate::api::Store {
63 fn from(value: ReadonlyMemStore) -> Self {
64 crate::api::Store::from_sender(value.client)
65 }
66}
67
68impl AsRef<crate::api::Store> for ReadonlyMemStore {
69 fn as_ref(&self) -> &crate::api::Store {
70 crate::api::Store::ref_from_sender(&self.client)
71 }
72}
73
74struct Actor {
75 commands: tokio::sync::mpsc::Receiver<proto::Command>,
76 tasks: JoinSet<()>,
77 idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
78 data: HashMap<Hash, CompleteStorage>,
79}
80
81impl Actor {
82 fn new(
83 commands: tokio::sync::mpsc::Receiver<proto::Command>,
84 data: HashMap<Hash, CompleteStorage>,
85 ) -> Self {
86 Self {
87 data,
88 commands,
89 tasks: JoinSet::new(),
90 idle_waiters: Vec::new(),
91 }
92 }
93
94 async fn handle_command(&mut self, cmd: Command) -> Option<irpc::channel::oneshot::Sender<()>> {
95 match cmd {
96 Command::ImportBao(ImportBaoMsg { tx, .. }) => {
97 tx.send(Err(api::Error::Io(io::Error::other(
98 "import not supported",
99 ))))
100 .await
101 .ok();
102 }
103 Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
104 if self.tasks.is_empty() {
105 tx.send(()).await.ok();
107 } else {
108 self.idle_waiters.push(tx);
110 }
111 }
112 Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
113 tx.send(io::Error::other("import not supported").into())
114 .await
115 .ok();
116 }
117 Command::ImportByteStream(ImportByteStreamMsg { tx, .. }) => {
118 tx.send(io::Error::other("import not supported").into())
119 .await
120 .ok();
121 }
122 Command::ImportPath(ImportPathMsg { tx, .. }) => {
123 tx.send(io::Error::other("import not supported").into())
124 .await
125 .ok();
126 }
127 Command::Observe(ObserveMsg {
128 inner: ObserveRequest { hash },
129 tx,
130 ..
131 }) => {
132 let size = self.data.get_mut(&hash).map(|x| x.data.len() as u64);
133 self.tasks.spawn(async move {
134 if let Some(size) = size {
135 tx.send(Bitfield::complete(size)).await.ok();
136 } else {
137 tx.send(Bitfield::empty()).await.ok();
138 future::pending::<()>().await;
139 };
140 });
141 }
142 Command::ExportBao(ExportBaoMsg {
143 inner: ExportBaoRequest { hash, ranges, .. },
144 tx,
145 ..
146 }) => {
147 let entry = self.data.get(&hash).cloned();
148 self.tasks.spawn(export_bao(hash, entry, ranges, tx));
149 }
150 Command::ExportPath(ExportPathMsg {
151 inner: ExportPathRequest { hash, target, .. },
152 tx,
153 ..
154 }) => {
155 let entry = self.data.get(&hash).cloned();
156 self.tasks.spawn(export_path(entry, target, tx));
157 }
158 Command::Batch(_cmd) => {}
159 Command::ClearProtected(cmd) => {
160 cmd.tx.send(Ok(())).await.ok();
161 }
162 Command::CreateTag(cmd) => {
163 cmd.tx
164 .send(Err(io::Error::other("create tag not supported").into()))
165 .await
166 .ok();
167 }
168 Command::CreateTempTag(cmd) => {
169 cmd.tx.send(TempTag::new(cmd.inner.value, None)).await.ok();
170 }
171 Command::RenameTag(cmd) => {
172 cmd.tx
173 .send(Err(io::Error::other("rename tag not supported").into()))
174 .await
175 .ok();
176 }
177 Command::DeleteTags(cmd) => {
178 cmd.tx
179 .send(Err(io::Error::other("delete tags not supported").into()))
180 .await
181 .ok();
182 }
183 Command::DeleteBlobs(cmd) => {
184 cmd.tx
185 .send(Err(io::Error::other("delete blobs not supported").into()))
186 .await
187 .ok();
188 }
189 Command::ListBlobs(cmd) => {
190 let hashes: Vec<Hash> = self.data.keys().cloned().collect();
191 self.tasks.spawn(async move {
192 for hash in hashes {
193 cmd.tx.send(Ok(hash)).await.ok();
194 }
195 });
196 }
197 Command::BlobStatus(cmd) => {
198 let hash = cmd.inner.hash;
199 let entry = self.data.get(&hash);
200 let status = if let Some(entry) = entry {
201 BlobStatus::Complete {
202 size: entry.data.len() as u64,
203 }
204 } else {
205 BlobStatus::NotFound
206 };
207 cmd.tx.send(status).await.ok();
208 }
209 Command::ListTags(cmd) => {
210 cmd.tx.send(Vec::new()).await.ok();
211 }
212 Command::SetTag(cmd) => {
213 cmd.tx
214 .send(Err(io::Error::other("set tag not supported").into()))
215 .await
216 .ok();
217 }
218 Command::ListTempTags(cmd) => {
219 cmd.tx.send(Vec::new()).await.ok();
220 }
221 Command::SyncDb(cmd) => {
222 cmd.tx.send(Ok(())).await.ok();
223 }
224 Command::Shutdown(cmd) => {
225 return Some(cmd.tx);
226 }
227 Command::ExportRanges(cmd) => {
228 let entry = self.data.get(&cmd.inner.hash).cloned();
229 self.tasks.spawn(export_ranges(cmd, entry));
230 }
231 }
232 None
233 }
234
235 fn log_unit_task(&self, res: Result<(), JoinError>) {
236 if let Err(e) = res {
237 tracing::error!("task failed: {e}");
238 }
239 }
240
241 async fn run(mut self) {
242 loop {
243 tokio::select! {
244 Some(cmd) = self.commands.recv() => {
245 if let Some(shutdown) = self.handle_command(cmd).await {
246 shutdown.send(()).await.ok();
247 break;
248 }
249 },
250 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
251 self.log_unit_task(res);
252 if self.tasks.is_empty() {
253 for tx in self.idle_waiters.drain(..) {
255 tx.send(()).await.ok();
256 }
257 }
258 },
259 else => break,
260 }
261 }
262 }
263}
264
265async fn export_bao(
266 hash: Hash,
267 entry: Option<CompleteStorage>,
268 ranges: ChunkRanges,
269 mut sender: mpsc::Sender<EncodedItem>,
270) {
271 let entry = match entry {
272 Some(entry) => entry,
273 None => {
274 sender
275 .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(
276 io::Error::new(
277 io::ErrorKind::UnexpectedEof,
278 "export task ended unexpectedly",
279 ),
280 )))
281 .await
282 .ok();
283 return;
284 }
285 };
286 let data = entry.data;
287 let outboard = entry.outboard;
288 let size = data.as_ref().len() as u64;
289 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
290 let outboard = PreOrderMemOutboard {
291 root: hash.into(),
292 tree,
293 data: outboard,
294 };
295 let sender = BaoTreeSender::ref_cast_mut(&mut sender);
296 traverse_ranges_validated(data.as_ref(), outboard, &ranges, sender)
297 .await
298 .ok();
299}
300
301async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<CompleteStorage>) {
302 let Some(entry) = entry else {
303 cmd.tx
304 .send(ExportRangesItem::Error(api::Error::io(
305 io::ErrorKind::NotFound,
306 "hash not found",
307 )))
308 .await
309 .ok();
310 return;
311 };
312 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
313 cmd.tx
314 .send(ExportRangesItem::Error(cause.into()))
315 .await
316 .ok();
317 }
318}
319
320async fn export_ranges_impl(
321 cmd: ExportRangesRequest,
322 tx: &mut mpsc::Sender<ExportRangesItem>,
323 entry: CompleteStorage,
324) -> io::Result<()> {
325 let ExportRangesRequest { ranges, .. } = cmd;
326 let data = entry.data;
327 let size = data.len() as u64;
328 let bitfield = Bitfield::complete(size);
329 for range in ranges.iter() {
330 let range = match range {
331 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
332 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
333 };
334 let requested = ChunkRanges::bytes(range.start..range.end);
335 if !bitfield.ranges.is_superset(&requested) {
336 return Err(io::Error::other(format!(
337 "missing range: {requested:?}, present: {bitfield:?}",
338 )));
339 }
340 let bs = 1024;
341 let mut offset = range.start;
342 loop {
343 let end: u64 = (offset + bs).min(range.end);
344 let size = (end - offset) as usize;
345 tx.send(
346 Leaf {
347 offset,
348 data: data.read_bytes_at(offset, size)?,
349 }
350 .into(),
351 )
352 .await?;
353 offset = end;
354 if offset >= range.end {
355 break;
356 }
357 }
358 }
359 Ok(())
360}
361
362impl ReadonlyMemStore {
363 pub fn new(items: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
364 let mut entries = HashMap::new();
365 for item in items {
366 let data = Bytes::copy_from_slice(item.as_ref());
367 let (hash, entry) = CompleteStorage::create(data);
368 entries.insert(hash, entry);
369 }
370 let (sender, receiver) = tokio::sync::mpsc::channel(1);
371 let actor = Actor::new(receiver, entries);
372 tokio::spawn(actor.run());
373 let local = irpc::LocalSender::from(sender);
374 Self {
375 client: local.into(),
376 }
377 }
378}
379
380async fn export_path(
381 entry: Option<CompleteStorage>,
382 target: PathBuf,
383 mut tx: mpsc::Sender<ExportProgressItem>,
384) {
385 let Some(entry) = entry else {
386 tx.send(api::Error::io(io::ErrorKind::NotFound, "hash not found").into())
387 .await
388 .ok();
389 return;
390 };
391 match export_path_impl(entry, target, &mut tx).await {
392 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
393 Err(cause) => tx.send(api::Error::from(cause).into()).await.ok(),
394 };
395}
396
397async fn export_path_impl(
398 entry: CompleteStorage,
399 target: PathBuf,
400 tx: &mut mpsc::Sender<ExportProgressItem>,
401) -> io::Result<()> {
402 let data = entry.data;
403 let mut file = std::fs::File::create(&target)?;
405 let size = data.len() as u64;
406 tx.send(ExportProgressItem::Size(size)).await?;
407 let mut buf = [0u8; 1024 * 64];
408 for offset in (0..size).step_by(1024 * 64) {
409 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
410 let buf = &mut buf[..len];
411 data.as_ref().read_exact_at(offset, buf)?;
412 file.write_all(buf)?;
413 tx.try_send(ExportProgressItem::CopyProgress(offset))
414 .await
415 .map_err(|_e| io::Error::other("error"))?;
416 yield_now().await;
417 }
418 Ok(())
419}