iroh_blobs/store/
util.rs

1use std::{borrow::Borrow, fmt};
2
3use bao_tree::io::mixed::EncodedItem;
4use bytes::Bytes;
5use derive_more::{From, Into};
6use n0_future::time::SystemTime;
7
8mod sparse_mem_file;
9use irpc::channel::mpsc;
10use range_collections::{range_set::RangeSetEntry, RangeSetRef};
11use ref_cast::RefCast;
12use serde::{Deserialize, Serialize};
13pub use sparse_mem_file::SparseMemFile;
14pub mod observer;
15mod size_info;
16pub use size_info::SizeInfo;
17mod partial_mem_storage;
18pub use partial_mem_storage::PartialMemStorage;
19
20#[cfg(feature = "fs-store")]
21mod mem_or_file;
22#[cfg(feature = "fs-store")]
23pub use mem_or_file::{FixedSize, MemOrFile};
24
25/// A named, persistent tag.
26#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, From, Into)]
27pub struct Tag(pub Bytes);
28
29impl From<&[u8]> for Tag {
30    fn from(value: &[u8]) -> Self {
31        Self(Bytes::copy_from_slice(value))
32    }
33}
34
35impl AsRef<[u8]> for Tag {
36    fn as_ref(&self) -> &[u8] {
37        self.0.as_ref()
38    }
39}
40
41impl Borrow<[u8]> for Tag {
42    fn borrow(&self) -> &[u8] {
43        self.0.as_ref()
44    }
45}
46
47impl From<String> for Tag {
48    fn from(value: String) -> Self {
49        Self(Bytes::from(value))
50    }
51}
52
53impl From<&str> for Tag {
54    fn from(value: &str) -> Self {
55        Self(Bytes::from(value.to_owned()))
56    }
57}
58
59impl fmt::Display for Tag {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        let bytes = self.0.as_ref();
62        match std::str::from_utf8(bytes) {
63            Ok(s) => write!(f, "\"{s}\""),
64            Err(_) => write!(f, "{}", hex::encode(bytes)),
65        }
66    }
67}
68
69impl Tag {
70    /// Create a new tag that does not exist yet.
71    pub fn auto(time: SystemTime, exists: impl Fn(&[u8]) -> bool) -> Self {
72        // On wasm, SystemTime is web_time::SystemTime, but we need a std system time
73        // to convert to chrono.
74        // TODO: Upstream to n0-future or expose SystemTimeExt on wasm
75        #[cfg(wasm_browser)]
76        let time = std::time::SystemTime::UNIX_EPOCH
77            + time.duration_since(SystemTime::UNIX_EPOCH).unwrap();
78
79        let now = chrono::DateTime::<chrono::Utc>::from(time);
80        let mut i = 0;
81        loop {
82            let mut text = format!("auto-{}", now.format("%Y-%m-%dT%H:%M:%S%.3fZ"));
83            if i != 0 {
84                text.push_str(&format!("-{i}"));
85            }
86            if !exists(text.as_bytes()) {
87                return Self::from(text);
88            }
89            i += 1;
90        }
91    }
92
93    /// The successor of this tag in lexicographic order.
94    pub fn successor(&self) -> Self {
95        let mut bytes = self.0.to_vec();
96        // increment_vec(&mut bytes);
97        bytes.push(0);
98        Self(bytes.into())
99    }
100
101    /// If this is a prefix, get the next prefix.
102    ///
103    /// This is like successor, except that it will return None if the prefix is all 0xFF instead of appending a 0 byte.
104    pub fn next_prefix(&self) -> Option<Self> {
105        let mut bytes = self.0.to_vec();
106        if next_prefix(&mut bytes) {
107            Some(Self(bytes.into()))
108        } else {
109            None
110        }
111    }
112}
113
/// Adapter that makes a [`fmt::Display`] value usable where [`fmt::Debug`] is expected.
///
/// The wrapped value is rendered with its `Display` implementation on the caller's
/// formatter, so width/fill/precision flags given to `{:?}` are passed through.
pub struct DD<T: fmt::Display>(pub T);

impl<T> fmt::Debug for DD<T>
where
    T: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let DD(inner) = self;
        <T as fmt::Display>::fmt(inner, f)
    }
}
121
122impl fmt::Debug for Tag {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        f.debug_tuple("Tag").field(&DD(self)).finish()
125    }
126}
127
/// Clamp the requested window `offset .. offset + len` to a buffer of `buf_len` bytes.
///
/// If `offset` lies inside the buffer, the result starts at `offset` and ends at
/// `offset + len` or `buf_len`, whichever is smaller. If `offset` is at or past the end,
/// the empty range `0..0` is returned.
pub(crate) fn limited_range(offset: u64, len: usize, buf_len: usize) -> std::ops::Range<usize> {
    match usize::try_from(offset) {
        // `saturating_add` keeps a huge `len` from overflowing before the clamp.
        Ok(start) if start < buf_len => start..buf_len.min(start.saturating_add(len)),
        _ => 0..0,
    }
}
137
138/// zero copy get a limited slice from a `Bytes` as a `Bytes`.
139#[allow(dead_code)]
140pub(crate) fn get_limited_slice(bytes: &Bytes, offset: u64, len: usize) -> Bytes {
141    bytes.slice(limited_range(offset, len, bytes.len()))
142}
143
144pub trait RangeSetExt<T> {
145    fn upper_bound(&self) -> Option<T>;
146}
147
148impl<T: RangeSetEntry + Clone> RangeSetExt<T> for RangeSetRef<T> {
149    /// The upper (exclusive) bound of the bitfield
150    fn upper_bound(&self) -> Option<T> {
151        let boundaries = self.boundaries();
152        if boundaries.is_empty() {
153            Some(RangeSetEntry::min_value())
154        } else if boundaries.len() % 2 == 0 {
155            Some(boundaries[boundaries.len() - 1].clone())
156        } else {
157            None
158        }
159    }
160}
161
#[cfg(feature = "fs-store")]
mod fs {
    use std::{
        fmt,
        fs::{File, OpenOptions},
        io::{self, Read, Write},
        path::Path,
    };

    use arrayvec::ArrayString;
    use bao_tree::blake3;
    use serde::{de::DeserializeOwned, Serialize};

    /// Length in bytes of the blake3 checksum that prefixes every checksummed file.
    const HASH_LEN: usize = 32;

    mod redb_support {
        use bytes::Bytes;
        use redb::{Key as RedbKey, Value as RedbValue};

        use super::super::Tag;

        /// Tags are stored in redb as their raw, variable-length bytes.
        impl RedbValue for Tag {
            type SelfType<'a> = Self;

            type AsBytes<'a> = bytes::Bytes;

            fn fixed_width() -> Option<usize> {
                None
            }

            fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
            where
                Self: 'a,
            {
                Self(Bytes::copy_from_slice(data))
            }

            fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
            where
                Self: 'a,
                Self: 'b,
            {
                value.0.clone()
            }

            fn type_name() -> redb::TypeName {
                redb::TypeName::new("Tag")
            }
        }

        /// Keys are ordered by plain byte-wise (lexicographic) comparison.
        impl RedbKey for Tag {
            fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
                data1.cmp(data2)
            }
        }
    }

    /// Serialize `data` with postcard and write it to `path`, prefixed by its blake3 hash.
    ///
    /// File layout: `[blake3(payload): 32 bytes][payload]`. The file is created (or
    /// truncated if it exists), written with a single `write_all`, and synced before
    /// returning.
    ///
    /// # Errors
    /// Returns any I/O error; serialization failures are wrapped with [`io::Error::other`].
    pub fn write_checksummed<P: AsRef<Path>, T: Serialize>(path: P, data: &T) -> io::Result<()> {
        // Reserve a zeroed hash slot up front so the payload can be serialized in place.
        let mut buffer = Vec::with_capacity(HASH_LEN + 128);
        buffer.extend_from_slice(&[0u8; HASH_LEN]);
        postcard::to_io(data, &mut buffer).map_err(io::Error::other)?;

        // Fill in the hash slot with the checksum of the payload only.
        let hash = blake3::hash(&buffer[HASH_LEN..]);
        buffer[..HASH_LEN].copy_from_slice(hash.as_bytes());

        let mut file = File::create(&path)?;
        file.write_all(&buffer)?;
        file.sync_all()?;

        Ok(())
    }

    /// Validate a buffer in the layout produced by [`write_checksummed`] and deserialize
    /// its payload.
    ///
    /// An empty buffer is reported as "File marked dirty", the state that
    /// [`read_checksummed_and_truncate`] leaves a file in.
    fn decode_checksummed<T: DeserializeOwned>(buffer: &[u8]) -> io::Result<T> {
        if buffer.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "File marked dirty",
            ));
        }

        if buffer.len() < HASH_LEN {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "File too short"));
        }

        let (stored_hash, data) = buffer.split_at(HASH_LEN);
        if blake3::hash(data).as_bytes() != stored_hash {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "Hash mismatch"));
        }

        postcard::from_bytes(data).map_err(io::Error::other)
    }

    /// Read a file written by [`write_checksummed`], then truncate it to zero length.
    ///
    /// The truncation is synced to disk *before* the contents are validated, so the file
    /// is left empty whether or not decoding succeeds. Reading it again reports
    /// "File marked dirty" until it is rewritten.
    ///
    /// # Errors
    /// Returns any I/O error. Returns [`io::ErrorKind::InvalidData`] if the file is empty,
    /// shorter than the checksum, or fails the checksum. Deserialization failures are
    /// wrapped with [`io::Error::other`].
    pub fn read_checksummed_and_truncate<T: DeserializeOwned>(
        path: impl AsRef<Path>,
    ) -> io::Result<T> {
        let path = path.as_ref();
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .truncate(false)
            .open(path)?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer)?;
        // Leave the file empty on disk; an empty file decodes as "File marked dirty".
        file.set_len(0)?;
        file.sync_all()?;

        decode_checksummed(&buffer)
    }

    /// Test-only variant of [`read_checksummed_and_truncate`]. It leaves the file untouched
    /// and logs its hex-encoded contents at `info` level.
    #[cfg(test)]
    pub fn read_checksummed<T: DeserializeOwned>(path: impl AsRef<Path>) -> io::Result<T> {
        use tracing::info;

        let path = path.as_ref();
        let mut buffer = Vec::new();
        File::open(path)?.read_to_end(&mut buffer)?;
        info!("{} {}", path.display(), hex::encode(&buffer));

        decode_checksummed(&buffer)
    }

    /// Helper trait for bytes for debugging
    pub trait SliceInfoExt: AsRef<[u8]> {
        /// Address of the first byte of the data. Comparing addresses shows whether the
        /// data was copied.
        fn addr(&self) -> usize;

        /// A short symbol string derived from [`SliceInfoExt::addr`].
        fn addr_short(&self) -> ArrayString<12> {
            let addr = self.addr().to_le_bytes();
            symbol_string(&addr)
        }

        /// `crate::Hash` of the data, rendered with `fmt_short`.
        #[allow(dead_code)]
        fn hash_short(&self) -> ArrayString<10> {
            crate::Hash::new(self.as_ref()).fmt_short()
        }
    }

    // `hash_short` and `addr_short` come from the trait defaults above.
    impl<T: AsRef<[u8]>> SliceInfoExt for T {
        fn addr(&self) -> usize {
            self.as_ref().as_ptr() as usize
        }
    }

    /// Map `data` to a short string of three emoji, for debugging output.
    ///
    /// `data` is hashed with blake3, and each of the first three hash bytes selects a
    /// symbol from a fixed table (modulo the table size).
    pub fn symbol_string(data: &[u8]) -> ArrayString<12> {
        const SYMBOLS: &[char] = &[
            '😀', '😂', '😍', '😎', '😢', '😡', '😱', '😴', '🤓', '🤔', '🤗', '🤢', '🤡', '🤖',
            '👽', '👾', '👻', '💀', '💩', '♥', '💥', '💦', '💨', '💫', '💬', '💭', '💰', '💳',
            '💼', '📈', '📉', '📍', '📢', '📦', '📱', '📷', '📺', '🎃', '🎄', '🎉', '🎋', '🎍',
            '🎒', '🎓', '🎖', '🎤', '🎧', '🎮', '🎰', '🎲', '🎳', '🎴', '🎵', '🎷', '🎸', '🎹',
            '🎺', '🎻', '🎼', '🏀', '🏁', '🏆', '🏈',
        ];
        // The table holds 63 symbols (not 64), so `byte % BASE` is very slightly biased.
        // That does not matter for a debugging aid.
        const BASE: usize = SYMBOLS.len();

        let hash = blake3::hash(data);

        // Each symbol is at most 4 bytes of UTF-8, so three of them fit in 12 bytes.
        let mut result = ArrayString::<12>::new();
        for &byte in hash.as_bytes().iter().take(3) {
            result.push(SYMBOLS[byte as usize % BASE]);
        }

        result
    }

    /// Debug-formats the contained value, or prints `Poisoned` when there is none.
    ///
    /// The type name's spelling is kept unchanged for compatibility with existing users.
    pub struct ValueOrPoisioned<T>(pub Option<T>);

    impl<T: fmt::Debug> fmt::Debug for ValueOrPoisioned<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match &self.0 {
                Some(x) => x.fmt(f),
                None => f.debug_tuple("Poisoned").finish(),
            }
        }
    }
}
#[cfg(feature = "fs-store")]
pub use fs::*;
382
/// Increment `bytes` in place as a fixed-width big-endian number.
///
/// The last byte that is not `0xFF` is incremented, and every `0xFF` byte after it is
/// reset to `0x00`. Returns `true` on success.
///
/// If every byte is `0xFF` (or the slice is empty), there is no larger value of the same
/// length. In that case all bytes are set to `0x00` and `false` is returned.
///
/// Note that because trailing `0xFF` bytes become `0x00` rather than being removed, the
/// result is not the tightest exclusive upper bound for a *prefix* scan when the input
/// ends in `0xFF`.
#[allow(dead_code)]
pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool {
    match bytes.iter().rposition(|&b| b != u8::MAX) {
        Some(pos) => {
            bytes[pos] += 1;
            bytes[pos + 1..].fill(0);
            true
        }
        None => {
            bytes.fill(0);
            false
        }
    }
}
398
399#[derive(ref_cast::RefCast)]
400#[repr(transparent)]
401pub struct BaoTreeSender(mpsc::Sender<EncodedItem>);
402
403impl BaoTreeSender {
404    pub fn new(sender: &mut mpsc::Sender<EncodedItem>) -> &mut Self {
405        BaoTreeSender::ref_cast_mut(sender)
406    }
407}
408
409impl bao_tree::io::mixed::Sender for BaoTreeSender {
410    type Error = irpc::channel::SendError;
411    async fn send(&mut self, item: EncodedItem) -> std::result::Result<(), Self::Error> {
412        self.0.send(item).await
413    }
414}
415
#[cfg(test)]
#[cfg(feature = "fs-store")]
pub mod tests {
    use bao_tree::{io::outboard::PreOrderMemOutboard, ChunkRanges};

    use crate::{hash::Hash, store::IROH_BLOCK_SIZE};

    /// Create n0 flavoured bao: the 8-byte little-endian length of `data`, followed by
    /// the validated bao encoding of `ranges`. Returns the root hash and the encoding.
    ///
    /// This can be used to request ranges below a chunk group size. Such ranges cannot be
    /// exported via bao, because we don't store hashes below the chunk group level.
    pub fn create_n0_bao(data: &[u8], ranges: &ChunkRanges) -> anyhow::Result<(Hash, Vec<u8>)> {
        let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
        let size = data.len() as u64;
        let mut encoded = size.to_le_bytes().to_vec();
        bao_tree::io::sync::encode_ranges_validated(data, &outboard, ranges, &mut encoded)?;
        let root: Hash = outboard.root.into();
        Ok((root, encoded))
    }
}