iroh_blobs/store/
readonly_mem.rs

//! Readonly in-memory store.
//!
//! This can only serve data that is provided at creation time. It is much simpler
//! than the mutable in-memory store and the file system store, and can serve as a
//! good starting point for custom implementations.
//!
//! It can also be useful as a lightweight store for tests.
8use std::{
9    collections::HashMap,
10    io::{self, Write},
11    ops::Deref,
12    path::PathBuf,
13};
14
15use bao_tree::{
16    io::{
17        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
18        outboard::PreOrderMemOutboard,
19        sync::ReadAt,
20        Leaf,
21    },
22    BaoTree, ChunkRanges,
23};
24use bytes::Bytes;
25use irpc::channel::mpsc;
26use n0_future::{
27    future::{self, yield_now},
28    task::{JoinError, JoinSet},
29};
30use range_collections::range_set::RangeSetRange;
31use ref_cast::RefCast;
32
33use super::util::BaoTreeSender;
34use crate::{
35    api::{
36        self,
37        blobs::{Bitfield, ExportProgressItem},
38        proto::{
39            self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
40            ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
41            ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
42            ObserveRequest, WaitIdleMsg,
43        },
44        ApiClient, TempTag,
45    },
46    protocol::ChunkRangesExt,
47    store::{mem::CompleteStorage, IROH_BLOCK_SIZE},
48    Hash,
49};
50
51#[derive(Debug, Clone)]
52pub struct ReadonlyMemStore {
53    client: ApiClient,
54}
55
56impl Deref for ReadonlyMemStore {
57    type Target = crate::api::Store;
58
59    fn deref(&self) -> &Self::Target {
60        crate::api::Store::ref_from_sender(&self.client)
61    }
62}
63
64impl From<ReadonlyMemStore> for crate::api::Store {
65    fn from(value: ReadonlyMemStore) -> Self {
66        crate::api::Store::from_sender(value.client)
67    }
68}
69
70impl AsRef<crate::api::Store> for ReadonlyMemStore {
71    fn as_ref(&self) -> &crate::api::Store {
72        crate::api::Store::ref_from_sender(&self.client)
73    }
74}
75
76struct Actor {
77    commands: tokio::sync::mpsc::Receiver<proto::Command>,
78    tasks: JoinSet<()>,
79    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
80    data: HashMap<Hash, CompleteStorage>,
81}
82
83impl Actor {
84    fn new(
85        commands: tokio::sync::mpsc::Receiver<proto::Command>,
86        data: HashMap<Hash, CompleteStorage>,
87    ) -> Self {
88        Self {
89            data,
90            commands,
91            tasks: JoinSet::new(),
92            idle_waiters: Vec::new(),
93        }
94    }
95
96    async fn handle_command(&mut self, cmd: Command) -> Option<irpc::channel::oneshot::Sender<()>> {
97        match cmd {
98            Command::ImportBao(ImportBaoMsg { tx, .. }) => {
99                tx.send(Err(api::Error::Io(io::Error::other(
100                    "import not supported",
101                ))))
102                .await
103                .ok();
104            }
105            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
106                if self.tasks.is_empty() {
107                    // we are currently idle
108                    tx.send(()).await.ok();
109                } else {
110                    // wait for idle state
111                    self.idle_waiters.push(tx);
112                }
113            }
114            Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
115                tx.send(io::Error::other("import not supported").into())
116                    .await
117                    .ok();
118            }
119            Command::ImportByteStream(ImportByteStreamMsg { tx, .. }) => {
120                tx.send(io::Error::other("import not supported").into())
121                    .await
122                    .ok();
123            }
124            Command::ImportPath(ImportPathMsg { tx, .. }) => {
125                tx.send(io::Error::other("import not supported").into())
126                    .await
127                    .ok();
128            }
129            Command::Observe(ObserveMsg {
130                inner: ObserveRequest { hash },
131                tx,
132                ..
133            }) => {
134                let size = self.data.get_mut(&hash).map(|x| x.data.len() as u64);
135                self.tasks.spawn(async move {
136                    if let Some(size) = size {
137                        tx.send(Bitfield::complete(size)).await.ok();
138                    } else {
139                        tx.send(Bitfield::empty()).await.ok();
140                        future::pending::<()>().await;
141                    };
142                });
143            }
144            Command::ExportBao(ExportBaoMsg {
145                inner: ExportBaoRequest { hash, ranges, .. },
146                tx,
147                ..
148            }) => {
149                let entry = self.data.get(&hash).cloned();
150                self.tasks.spawn(export_bao(hash, entry, ranges, tx));
151            }
152            Command::ExportPath(ExportPathMsg {
153                inner: ExportPathRequest { hash, target, .. },
154                tx,
155                ..
156            }) => {
157                let entry = self.data.get(&hash).cloned();
158                self.tasks.spawn(export_path(entry, target, tx));
159            }
160            Command::Batch(_cmd) => {}
161            Command::ClearProtected(cmd) => {
162                cmd.tx.send(Ok(())).await.ok();
163            }
164            Command::CreateTag(cmd) => {
165                cmd.tx
166                    .send(Err(io::Error::other("create tag not supported").into()))
167                    .await
168                    .ok();
169            }
170            Command::CreateTempTag(cmd) => {
171                cmd.tx.send(TempTag::new(cmd.inner.value, None)).await.ok();
172            }
173            Command::RenameTag(cmd) => {
174                cmd.tx
175                    .send(Err(io::Error::other("rename tag not supported").into()))
176                    .await
177                    .ok();
178            }
179            Command::DeleteTags(cmd) => {
180                cmd.tx
181                    .send(Err(io::Error::other("delete tags not supported").into()))
182                    .await
183                    .ok();
184            }
185            Command::DeleteBlobs(cmd) => {
186                cmd.tx
187                    .send(Err(io::Error::other("delete blobs not supported").into()))
188                    .await
189                    .ok();
190            }
191            Command::ListBlobs(cmd) => {
192                let hashes: Vec<Hash> = self.data.keys().cloned().collect();
193                self.tasks.spawn(async move {
194                    for hash in hashes {
195                        cmd.tx.send(Ok(hash)).await.ok();
196                    }
197                });
198            }
199            Command::BlobStatus(cmd) => {
200                let hash = cmd.inner.hash;
201                let entry = self.data.get(&hash);
202                let status = if let Some(entry) = entry {
203                    BlobStatus::Complete {
204                        size: entry.data.len() as u64,
205                    }
206                } else {
207                    BlobStatus::NotFound
208                };
209                cmd.tx.send(status).await.ok();
210            }
211            Command::ListTags(cmd) => {
212                cmd.tx.send(Vec::new()).await.ok();
213            }
214            Command::SetTag(cmd) => {
215                cmd.tx
216                    .send(Err(io::Error::other("set tag not supported").into()))
217                    .await
218                    .ok();
219            }
220            Command::ListTempTags(cmd) => {
221                cmd.tx.send(Vec::new()).await.ok();
222            }
223            Command::SyncDb(cmd) => {
224                cmd.tx.send(Ok(())).await.ok();
225            }
226            Command::Shutdown(cmd) => {
227                return Some(cmd.tx);
228            }
229            Command::ExportRanges(cmd) => {
230                let entry = self.data.get(&cmd.inner.hash).cloned();
231                self.tasks.spawn(export_ranges(cmd, entry));
232            }
233        }
234        None
235    }
236
237    fn log_unit_task(&self, res: Result<(), JoinError>) {
238        if let Err(e) = res {
239            tracing::error!("task failed: {e}");
240        }
241    }
242
243    async fn run(mut self) {
244        loop {
245            tokio::select! {
246                Some(cmd) = self.commands.recv() => {
247                    if let Some(shutdown) = self.handle_command(cmd).await {
248                        shutdown.send(()).await.ok();
249                        break;
250                    }
251                },
252                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
253                    self.log_unit_task(res);
254                    if self.tasks.is_empty() {
255                        // we are idle now
256                        for tx in self.idle_waiters.drain(..) {
257                            tx.send(()).await.ok();
258                        }
259                    }
260                },
261                else => break,
262            }
263        }
264    }
265}
266
267async fn export_bao(
268    hash: Hash,
269    entry: Option<CompleteStorage>,
270    ranges: ChunkRanges,
271    mut sender: mpsc::Sender<EncodedItem>,
272) {
273    let entry = match entry {
274        Some(entry) => entry,
275        None => {
276            sender
277                .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(
278                    io::Error::new(
279                        io::ErrorKind::UnexpectedEof,
280                        "export task ended unexpectedly",
281                    ),
282                )))
283                .await
284                .ok();
285            return;
286        }
287    };
288    let data = entry.data;
289    let outboard = entry.outboard;
290    let size = data.as_ref().len() as u64;
291    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
292    let outboard = PreOrderMemOutboard {
293        root: hash.into(),
294        tree,
295        data: outboard,
296    };
297    let sender = BaoTreeSender::ref_cast_mut(&mut sender);
298    traverse_ranges_validated(data.as_ref(), outboard, &ranges, sender)
299        .await
300        .ok();
301}
302
303async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<CompleteStorage>) {
304    let Some(entry) = entry else {
305        cmd.tx
306            .send(ExportRangesItem::Error(api::Error::io(
307                io::ErrorKind::NotFound,
308                "hash not found",
309            )))
310            .await
311            .ok();
312        return;
313    };
314    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
315        cmd.tx
316            .send(ExportRangesItem::Error(cause.into()))
317            .await
318            .ok();
319    }
320}
321
322async fn export_ranges_impl(
323    cmd: ExportRangesRequest,
324    tx: &mut mpsc::Sender<ExportRangesItem>,
325    entry: CompleteStorage,
326) -> io::Result<()> {
327    let ExportRangesRequest { ranges, .. } = cmd;
328    let data = entry.data;
329    let size = data.len() as u64;
330    let bitfield = Bitfield::complete(size);
331    for range in ranges.iter() {
332        let range = match range {
333            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
334            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
335        };
336        let requested = ChunkRanges::bytes(range.start..range.end);
337        if !bitfield.ranges.is_superset(&requested) {
338            return Err(io::Error::other(format!(
339                "missing range: {requested:?}, present: {bitfield:?}",
340            )));
341        }
342        let bs = 1024;
343        let mut offset = range.start;
344        loop {
345            let end: u64 = (offset + bs).min(range.end);
346            let size = (end - offset) as usize;
347            tx.send(
348                Leaf {
349                    offset,
350                    data: data.read_bytes_at(offset, size)?,
351                }
352                .into(),
353            )
354            .await?;
355            offset = end;
356            if offset >= range.end {
357                break;
358            }
359        }
360    }
361    Ok(())
362}
363
364impl ReadonlyMemStore {
365    pub fn new(items: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
366        let mut entries = HashMap::new();
367        for item in items {
368            let data = Bytes::copy_from_slice(item.as_ref());
369            let (hash, entry) = CompleteStorage::create(data);
370            entries.insert(hash, entry);
371        }
372        let (sender, receiver) = tokio::sync::mpsc::channel(1);
373        let actor = Actor::new(receiver, entries);
374        n0_future::task::spawn(actor.run());
375        let local = irpc::LocalSender::from(sender);
376        Self {
377            client: local.into(),
378        }
379    }
380}
381
382async fn export_path(
383    entry: Option<CompleteStorage>,
384    target: PathBuf,
385    mut tx: mpsc::Sender<ExportProgressItem>,
386) {
387    let Some(entry) = entry else {
388        tx.send(api::Error::io(io::ErrorKind::NotFound, "hash not found").into())
389            .await
390            .ok();
391        return;
392    };
393    match export_path_impl(entry, target, &mut tx).await {
394        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
395        Err(cause) => tx.send(api::Error::from(cause).into()).await.ok(),
396    };
397}
398
399async fn export_path_impl(
400    entry: CompleteStorage,
401    target: PathBuf,
402    tx: &mut mpsc::Sender<ExportProgressItem>,
403) -> io::Result<()> {
404    let data = entry.data;
405    // todo: for partial entries make sure to only write the part that is actually present
406    let mut file = std::fs::File::create(&target)?;
407    let size = data.len() as u64;
408    tx.send(ExportProgressItem::Size(size)).await?;
409    let mut buf = [0u8; 1024 * 64];
410    for offset in (0..size).step_by(1024 * 64) {
411        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
412        let buf = &mut buf[..len];
413        data.as_ref().read_exact_at(offset, buf)?;
414        file.write_all(buf)?;
415        tx.try_send(ExportProgressItem::CopyProgress(offset))
416            .await
417            .map_err(|_e| io::Error::other("error"))?;
418        yield_now().await;
419    }
420    Ok(())
421}