iroh_blobs/store/
readonly_mem.rs

1//! Readonly in-memory store.
2//!
3//! This can only serve data that is provided at creation time. It is much simpler
4//! than the mutable in-memory store and the file system store, and can serve as a
5//! good starting point for custom implementations.
6//!
7//! It can also be useful as a lightweight store for tests.
8use std::{
9    collections::HashMap,
10    io::{self, Write},
11    ops::Deref,
12    path::PathBuf,
13};
14
15use bao_tree::{
16    io::{
17        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
18        outboard::PreOrderMemOutboard,
19        sync::ReadAt,
20        Leaf,
21    },
22    BaoTree, ChunkRanges,
23};
24use bytes::Bytes;
25use irpc::channel::mpsc;
26use n0_future::{
27    future::{self, yield_now},
28    task::{JoinError, JoinSet},
29};
30use range_collections::range_set::RangeSetRange;
31use ref_cast::RefCast;
32
33use super::util::BaoTreeSender;
34use crate::{
35    api::{
36        self,
37        blobs::{Bitfield, ExportProgressItem},
38        proto::{
39            self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
40            ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
41            ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
42            ObserveRequest, WaitIdleMsg,
43        },
44        ApiClient, TempTag,
45    },
46    protocol::ChunkRangesExt,
47    store::{mem::CompleteStorage, IROH_BLOCK_SIZE},
48    Hash,
49};
50
51#[derive(Debug, Clone)]
52pub struct ReadonlyMemStore {
53    client: ApiClient,
54}
55
56impl Deref for ReadonlyMemStore {
57    type Target = crate::api::Store;
58
59    fn deref(&self) -> &Self::Target {
60        crate::api::Store::ref_from_sender(&self.client)
61    }
62}
63
64impl From<ReadonlyMemStore> for crate::api::Store {
65    fn from(value: ReadonlyMemStore) -> Self {
66        crate::api::Store::from_sender(value.client)
67    }
68}
69
70impl AsRef<crate::api::Store> for ReadonlyMemStore {
71    fn as_ref(&self) -> &crate::api::Store {
72        crate::api::Store::ref_from_sender(&self.client)
73    }
74}
75
76struct Actor {
77    commands: tokio::sync::mpsc::Receiver<proto::Command>,
78    tasks: JoinSet<()>,
79    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
80    data: HashMap<Hash, CompleteStorage>,
81}
82
83impl Actor {
84    fn new(
85        commands: tokio::sync::mpsc::Receiver<proto::Command>,
86        data: HashMap<Hash, CompleteStorage>,
87    ) -> Self {
88        Self {
89            data,
90            commands,
91            tasks: JoinSet::new(),
92            idle_waiters: Vec::new(),
93        }
94    }
95
96    async fn handle_command(&mut self, cmd: Command) -> Option<irpc::channel::oneshot::Sender<()>> {
97        match cmd {
98            Command::ImportBao(ImportBaoMsg { tx, .. }) => {
99                tx.send(Err(unsupported("import not supported").into()))
100                    .await
101                    .ok();
102            }
103            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
104                if self.tasks.is_empty() {
105                    // we are currently idle
106                    tx.send(()).await.ok();
107                } else {
108                    // wait for idle state
109                    self.idle_waiters.push(tx);
110                }
111            }
112            Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
113                tx.send(unsupported("import not supported").into())
114                    .await
115                    .ok();
116            }
117            Command::ImportByteStream(ImportByteStreamMsg { tx, .. }) => {
118                tx.send(unsupported("import not supported").into())
119                    .await
120                    .ok();
121            }
122            Command::ImportPath(ImportPathMsg { tx, .. }) => {
123                tx.send(unsupported("import not supported").into())
124                    .await
125                    .ok();
126            }
127            Command::Observe(ObserveMsg {
128                inner: ObserveRequest { hash },
129                tx,
130                ..
131            }) => {
132                let size = self.data.get_mut(&hash).map(|x| x.data.len() as u64);
133                self.tasks.spawn(async move {
134                    if let Some(size) = size {
135                        tx.send(Bitfield::complete(size)).await.ok();
136                    } else {
137                        tx.send(Bitfield::empty()).await.ok();
138                        future::pending::<()>().await;
139                    };
140                });
141            }
142            Command::ExportBao(ExportBaoMsg {
143                inner: ExportBaoRequest { hash, ranges, .. },
144                tx,
145                ..
146            }) => {
147                let entry = self.data.get(&hash).cloned();
148                self.tasks.spawn(export_bao(hash, entry, ranges, tx));
149            }
150            Command::ExportPath(ExportPathMsg {
151                inner: ExportPathRequest { hash, target, .. },
152                tx,
153                ..
154            }) => {
155                let entry = self.data.get(&hash).cloned();
156                self.tasks.spawn(export_path(entry, target, tx));
157            }
158            Command::Batch(_cmd) => {}
159            Command::ClearProtected(cmd) => {
160                cmd.tx.send(Ok(())).await.ok();
161            }
162            Command::CreateTag(cmd) => {
163                cmd.tx
164                    .send(Err(unsupported("create tag not supported").into()))
165                    .await
166                    .ok();
167            }
168            Command::CreateTempTag(cmd) => {
169                cmd.tx.send(TempTag::new(cmd.inner.value, None)).await.ok();
170            }
171            Command::RenameTag(cmd) => {
172                cmd.tx
173                    .send(Err(unsupported("rename tag not supported").into()))
174                    .await
175                    .ok();
176            }
177            Command::DeleteTags(cmd) => {
178                cmd.tx
179                    .send(Err(unsupported("delete tags not supported").into()))
180                    .await
181                    .ok();
182            }
183            Command::DeleteBlobs(cmd) => {
184                cmd.tx
185                    .send(Err(unsupported("delete blobs not supported").into()))
186                    .await
187                    .ok();
188            }
189            Command::ListBlobs(cmd) => {
190                let hashes: Vec<Hash> = self.data.keys().cloned().collect();
191                self.tasks.spawn(async move {
192                    for hash in hashes {
193                        cmd.tx.send(Ok(hash)).await.ok();
194                    }
195                });
196            }
197            Command::BlobStatus(cmd) => {
198                let hash = cmd.inner.hash;
199                let entry = self.data.get(&hash);
200                let status = if let Some(entry) = entry {
201                    BlobStatus::Complete {
202                        size: entry.data.len() as u64,
203                    }
204                } else {
205                    BlobStatus::NotFound
206                };
207                cmd.tx.send(status).await.ok();
208            }
209            Command::ListTags(cmd) => {
210                cmd.tx.send(Vec::new()).await.ok();
211            }
212            Command::SetTag(cmd) => {
213                cmd.tx
214                    .send(Err(unsupported("set tag not supported").into()))
215                    .await
216                    .ok();
217            }
218            Command::ListTempTags(cmd) => {
219                cmd.tx.send(Vec::new()).await.ok();
220            }
221            Command::SyncDb(cmd) => {
222                cmd.tx.send(Ok(())).await.ok();
223            }
224            Command::Shutdown(cmd) => {
225                return Some(cmd.tx);
226            }
227            Command::ExportRanges(cmd) => {
228                let entry = self.data.get(&cmd.inner.hash).cloned();
229                self.tasks.spawn(export_ranges(cmd, entry));
230            }
231        }
232        None
233    }
234
235    fn log_unit_task(&self, res: Result<(), JoinError>) {
236        if let Err(e) = res {
237            tracing::error!("task failed: {e}");
238        }
239    }
240
241    async fn run(mut self) {
242        loop {
243            tokio::select! {
244                Some(cmd) = self.commands.recv() => {
245                    if let Some(shutdown) = self.handle_command(cmd).await {
246                        shutdown.send(()).await.ok();
247                        break;
248                    }
249                },
250                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
251                    self.log_unit_task(res);
252                    if self.tasks.is_empty() {
253                        // we are idle now
254                        for tx in self.idle_waiters.drain(..) {
255                            tx.send(()).await.ok();
256                        }
257                    }
258                },
259                else => break,
260            }
261        }
262    }
263}
264
/// Builds the [`io::ErrorKind::Unsupported`] error returned for operations a
/// readonly store cannot perform, using `text` as the message.
fn unsupported(text: &str) -> io::Error {
    let kind = io::ErrorKind::Unsupported;
    io::Error::new(kind, text)
}
268
269async fn export_bao(
270    hash: Hash,
271    entry: Option<CompleteStorage>,
272    ranges: ChunkRanges,
273    mut sender: mpsc::Sender<EncodedItem>,
274) {
275    let entry = match entry {
276        Some(entry) => entry,
277        None => {
278            sender
279                .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(
280                    io::Error::new(
281                        io::ErrorKind::UnexpectedEof,
282                        "export task ended unexpectedly",
283                    ),
284                )))
285                .await
286                .ok();
287            return;
288        }
289    };
290    let data = entry.data;
291    let outboard = entry.outboard;
292    let size = data.as_ref().len() as u64;
293    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
294    let outboard = PreOrderMemOutboard {
295        root: hash.into(),
296        tree,
297        data: outboard,
298    };
299    let sender = BaoTreeSender::ref_cast_mut(&mut sender);
300    traverse_ranges_validated(data.as_ref(), outboard, &ranges, sender)
301        .await
302        .ok();
303}
304
305async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<CompleteStorage>) {
306    let Some(entry) = entry else {
307        cmd.tx
308            .send(ExportRangesItem::Error(api::Error::io(
309                io::ErrorKind::NotFound,
310                "hash not found",
311            )))
312            .await
313            .ok();
314        return;
315    };
316    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
317        cmd.tx
318            .send(ExportRangesItem::Error(cause.into()))
319            .await
320            .ok();
321    }
322}
323
324async fn export_ranges_impl(
325    cmd: ExportRangesRequest,
326    tx: &mut mpsc::Sender<ExportRangesItem>,
327    entry: CompleteStorage,
328) -> io::Result<()> {
329    let ExportRangesRequest { ranges, .. } = cmd;
330    let data = entry.data;
331    let size = data.len() as u64;
332    let bitfield = Bitfield::complete(size);
333    for range in ranges.iter() {
334        let range = match range {
335            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
336            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
337        };
338        let requested = ChunkRanges::bytes(range.start..range.end);
339        if !bitfield.ranges.is_superset(&requested) {
340            return Err(io::Error::other(format!(
341                "missing range: {requested:?}, present: {bitfield:?}",
342            )));
343        }
344        let bs = 1024;
345        let mut offset = range.start;
346        loop {
347            let end: u64 = (offset + bs).min(range.end);
348            let size = (end - offset) as usize;
349            tx.send(
350                Leaf {
351                    offset,
352                    data: data.read_bytes_at(offset, size)?,
353                }
354                .into(),
355            )
356            .await?;
357            offset = end;
358            if offset >= range.end {
359                break;
360            }
361        }
362    }
363    Ok(())
364}
365
366impl ReadonlyMemStore {
367    pub fn new(items: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
368        let mut entries = HashMap::new();
369        for item in items {
370            let data = Bytes::copy_from_slice(item.as_ref());
371            let (hash, entry) = CompleteStorage::create(data);
372            entries.insert(hash, entry);
373        }
374        let (sender, receiver) = tokio::sync::mpsc::channel(1);
375        let actor = Actor::new(receiver, entries);
376        n0_future::task::spawn(actor.run());
377        let local = irpc::LocalSender::from(sender);
378        Self {
379            client: local.into(),
380        }
381    }
382}
383
384async fn export_path(
385    entry: Option<CompleteStorage>,
386    target: PathBuf,
387    mut tx: mpsc::Sender<ExportProgressItem>,
388) {
389    let Some(entry) = entry else {
390        tx.send(api::Error::io(io::ErrorKind::NotFound, "hash not found").into())
391            .await
392            .ok();
393        return;
394    };
395    match export_path_impl(entry, target, &mut tx).await {
396        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
397        Err(cause) => tx.send(api::Error::from(cause).into()).await.ok(),
398    };
399}
400
401async fn export_path_impl(
402    entry: CompleteStorage,
403    target: PathBuf,
404    tx: &mut mpsc::Sender<ExportProgressItem>,
405) -> io::Result<()> {
406    let data = entry.data;
407    // todo: for partial entries make sure to only write the part that is actually present
408    let mut file = std::fs::File::create(&target)?;
409    let size = data.len() as u64;
410    tx.send(ExportProgressItem::Size(size)).await?;
411    let mut buf = [0u8; 1024 * 64];
412    for offset in (0..size).step_by(1024 * 64) {
413        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
414        let buf = &mut buf[..len];
415        data.as_ref().read_exact_at(offset, buf)?;
416        file.write_all(buf)?;
417        tx.try_send(ExportProgressItem::CopyProgress(offset))
418            .await?;
419        yield_now().await;
420    }
421    Ok(())
422}