iroh_blobs/api/blobs/reader.rs

use std::{
    io::{self, ErrorKind, SeekFrom},
    pin::Pin,
    task::{Context, Poll},
};

use n0_future::StreamExt;

use crate::{
    api::{
        blobs::{Blobs, ReaderOptions},
        proto::ExportRangesItem,
    },
    Hash,
};

/// A reader for blobs that implements `AsyncRead` and `AsyncSeek`.
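///
/// A minimal usage sketch (following the patterns in the tests below; the
/// `blobs` handle and `hash` are assumed to be in scope):
///
/// ```ignore
/// use tokio::io::{AsyncReadExt, AsyncSeekExt};
///
/// // Obtain a reader for a blob identified by its hash.
/// let mut reader = blobs.reader(hash);
/// // Seek to an absolute offset, then read the remainder of the blob.
/// reader.seek(std::io::SeekFrom::Start(1024)).await?;
/// let mut buf = Vec::new();
/// reader.read_to_end(&mut buf).await?;
/// ```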
#[derive(Debug)]
pub struct BlobReader {
    blobs: Blobs,
    options: ReaderOptions,
    state: ReaderState,
}

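/// Internal state machine for [`BlobReader`].
///
/// `Idle` stores the current stream position. `Seeking` stores the target
/// position recorded by `start_seek` until `poll_complete` commits it.
/// `Reading` stores the position at which the current `export_ranges` stream
/// was started, together with the boxed stream itself. `Poisoned` is the
/// `#[default]` value left behind by `std::mem::take`; the reader stays in it
/// if a call returns early without restoring the state.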
#[derive(Default, derive_more::Debug)]
enum ReaderState {
    Idle {
        position: u64,
    },
    Seeking {
        position: u64,
    },
    Reading {
        position: u64,
        #[debug(skip)]
        op: n0_future::boxed::BoxStream<ExportRangesItem>,
    },
    #[default]
    Poisoned,
}

impl BlobReader {
    pub(super) fn new(blobs: Blobs, options: ReaderOptions) -> Self {
        Self {
            blobs,
            options,
            state: ReaderState::Idle { position: 0 },
        }
    }

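    /// Returns the hash of the blob this reader reads from.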
    pub fn hash(&self) -> &Hash {
        &self.options.hash
    }
}

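// Reading is implemented by starting an `export_ranges` request sized to the
// caller's buffer and draining the resulting stream into `buf`. If the stream
// yields some data and then returns `Pending`, the partial read is returned
// immediately and a fresh request is started on the next call; the in-flight
// stream is only kept across calls when nothing has been read yet.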
impl tokio::io::AsyncRead for BlobReader {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        let mut position1 = None;
        loop {
            let guard = &mut this.state;
            match std::mem::take(guard) {
                ReaderState::Idle { position } => {
                    // todo: read until next page boundary instead of fixed size
                    let len = buf.remaining() as u64;
                    let end = position.checked_add(len).ok_or_else(|| {
                        io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading")
                    })?;
                    // start the export op for the entire size of the buffer, and convert to a stream
                    let stream = this
                        .blobs
                        .export_ranges(this.options.hash, position..end)
                        .stream();
                    position1 = Some(position);
                    *guard = ReaderState::Reading {
                        position,
                        op: Box::pin(stream),
                    };
                }
                ReaderState::Reading { position, mut op } => {
                    let position1 = position1.get_or_insert(position);
                    match op.poll_next(cx) {
                        Poll::Ready(Some(ExportRangesItem::Size(_))) => {
                            *guard = ReaderState::Reading { position, op };
                        }
                        Poll::Ready(Some(ExportRangesItem::Data(data))) => {
                            if data.offset != *position1 {
                                break Poll::Ready(Err(io::Error::other(
                                    "Data offset does not match expected position",
                                )));
                            }
                            buf.put_slice(&data.data);
                            // update only the local position1, not the position stored in the state.
                            *position1 =
                                position1
                                    .checked_add(data.data.len() as u64)
                                    .ok_or_else(|| {
                                        io::Error::new(ErrorKind::InvalidInput, "Position overflow")
                                    })?;
                            *guard = ReaderState::Reading { position, op };
                        }
                        Poll::Ready(Some(ExportRangesItem::Error(err))) => {
                            *guard = ReaderState::Idle { position };
                            break Poll::Ready(Err(io::Error::other(format!(
                                "Error reading data: {err}"
                            ))));
                        }
                        Poll::Ready(None) => {
                            // done with the stream, go back to idle.
                            *guard = ReaderState::Idle {
                                position: *position1,
                            };
                            break Poll::Ready(Ok(()));
                        }
                        Poll::Pending => {
                            break if position != *position1 {
                                // we read some data so we need to abort the op.
                                //
                                // we can't be sure we won't be called with the same buf size next time.
                                *guard = ReaderState::Idle {
                                    position: *position1,
                                };
                                Poll::Ready(Ok(()))
                            } else {
                                // nothing was read yet, we remain in the reading state
                                //
                                // we make an assumption here that the next call will be with the same buf size.
                                *guard = ReaderState::Reading {
                                    position: *position1,
                                    op,
                                };
                                Poll::Pending
                            };
                        }
                    }
                }
                state @ ReaderState::Seeking { .. } => {
                    // should I try to recover from this or just keep it poisoned?
                    this.state = state;
                    break Poll::Ready(Err(io::Error::other("Can't read while seeking")));
                }
                ReaderState::Poisoned => {
                    break Poll::Ready(Err(io::Error::other("Reader is poisoned")));
                }
            };
        }
    }
}

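// Seeking follows the two-step `AsyncSeek` protocol: `start_seek` records the
// target position in `ReaderState::Seeking`, and `poll_complete` commits it
// back to `ReaderState::Idle` and returns the new position.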
impl tokio::io::AsyncSeek for BlobReader {
    fn start_seek(
        self: std::pin::Pin<&mut Self>,
        seek_from: tokio::io::SeekFrom,
    ) -> io::Result<()> {
        let this = self.get_mut();
        let guard = &mut this.state;
        match std::mem::take(guard) {
            ReaderState::Idle { position } => {
                let position1 = match seek_from {
                    SeekFrom::Start(pos) => pos,
                    SeekFrom::Current(offset) => {
                        position.checked_add_signed(offset).ok_or_else(|| {
                            io::Error::new(
                                ErrorKind::InvalidInput,
                                "Position overflow when seeking",
                            )
                        })?
                    }
                    SeekFrom::End(_offset) => {
                        // todo: support seeking from end if we know the size
                        return Err(io::Error::new(
                            ErrorKind::InvalidInput,
                            "Seeking from end is not supported yet",
                        ));
                    }
                };
                *guard = ReaderState::Seeking {
                    position: position1,
                };
                Ok(())
            }
            ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
            ReaderState::Seeking { .. } => Err(io::Error::other("Already seeking")),
            ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
        }
    }

    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
        let this = self.get_mut();
        let guard = &mut this.state;
        Poll::Ready(match std::mem::take(guard) {
            ReaderState::Seeking { position } => {
                *guard = ReaderState::Idle { position };
                Ok(position)
            }
            ReaderState::Idle { position } => {
                // seek calls poll_complete first, just in case, to finish a pending seek
                // operation before starting the next one. So the call sequence is
                // poll_complete/start_seek/poll_complete.
                *guard = ReaderState::Idle { position };
                Ok(position)
            }
            state @ ReaderState::Reading { .. } => {
                // should I try to recover from this or just keep it poisoned?
                *guard = state;
                Err(io::Error::other("Can't seek while reading"))
            }
            ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
        })
    }
}

#[cfg(test)]
#[cfg(feature = "fs-store")]
mod tests {
    use bao_tree::ChunkRanges;
    use testresult::TestResult;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use super::*;
    use crate::{
        protocol::ChunkRangesExt,
        store::{
            fs::{
                tests::{test_data, INTERESTING_SIZES},
                FsStore,
            },
            mem::MemStore,
            util::tests::create_n0_bao,
        },
    };

    async fn reader_smoke(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let data = test_data(size);
            let tag = blobs.add_bytes(data.clone()).await?;
            // read all
            {
                let mut reader = blobs.reader(tag.hash);
                let mut buf = Vec::new();
                reader.read_to_end(&mut buf).await?;
                assert_eq!(buf, data);
                let pos = reader.stream_position().await?;
                assert_eq!(pos, data.len() as u64);
            }
            // seek to mid and read all
            {
                let mut reader = blobs.reader(tag.hash);
                let mid = size / 2;
                reader.seek(SeekFrom::Start(mid as u64)).await?;
                let mut buf = Vec::new();
                reader.read_to_end(&mut buf).await?;
                assert_eq!(buf, data[mid..].to_vec());
                let pos = reader.stream_position().await?;
                assert_eq!(pos, data.len() as u64);
            }
        }
        Ok(())
    }

    async fn reader_partial(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let data = test_data(size);
            let ranges = ChunkRanges::chunk(0);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            println!("importing {} bytes", bao.len());
            blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
            // read the first chunk or the entire blob, whichever is smaller.
            // this should work!
            {
                let mut reader = blobs.reader(hash);
                let valid = size.min(1024);
                let mut buf = vec![0u8; valid];
                reader.read_exact(&mut buf).await?;
                assert_eq!(buf, data[..valid]);
                let pos = reader.stream_position().await?;
                assert_eq!(pos, valid as u64);
            }
            if size > 1024 {
                // read the part we don't have - should immediately return an error
                {
                    let mut reader = blobs.reader(hash);
                    let mut rest = vec![0u8; size - 1024];
                    reader.seek(SeekFrom::Start(1024)).await?;
                    let res = reader.read_exact(&mut rest).await;
                    assert!(res.is_err());
                }
                // read crossing the end of the locally available data - should return an
                // error despite the first bytes being valid.
                // A read that fails should not update the stream position.
                {
                    let mut reader = blobs.reader(hash);
                    let mut buf = vec![0u8; size];
                    let res = reader.read(&mut buf).await;
                    assert!(res.is_err());
                    let pos = reader.stream_position().await?;
                    assert_eq!(pos, 0);
                }
            }
        }
        Ok(())
    }

    #[tokio::test]
    async fn reader_partial_fs() -> TestResult<()> {
        let testdir = tempfile::tempdir()?;
        let store = FsStore::load(testdir.path().to_owned()).await?;
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_partial_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_smoke_fs() -> TestResult<()> {
        let testdir = tempfile::tempdir()?;
        let store = FsStore::load(testdir.path().to_owned()).await?;
        reader_smoke(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_smoke_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_smoke(store.blobs()).await?;
        Ok(())
    }
}