Skip to main content

iroh_blobs/store/
util.rs

1use std::{borrow::Borrow, fmt};
2
3use bao_tree::io::mixed::EncodedItem;
4use bytes::Bytes;
5use derive_more::{From, Into};
6use n0_future::time::SystemTime;
7
8mod sparse_mem_file;
9use irpc::channel::mpsc;
10use range_collections::{range_set::RangeSetEntry, RangeSetRef};
11use ref_cast::RefCast;
12use serde::{Deserialize, Serialize};
13pub use sparse_mem_file::SparseMemFile;
14pub mod observer;
15mod size_info;
16pub use size_info::SizeInfo;
17mod partial_mem_storage;
18pub use partial_mem_storage::PartialMemStorage;
19
20#[cfg(feature = "fs-store")]
21mod mem_or_file;
22#[cfg(feature = "fs-store")]
23#[cfg_attr(iroh_blobs_docsrs, doc(cfg(feature = "fs-store")))]
24pub use mem_or_file::{FixedSize, MemOrFile};
25
/// A named, persistent tag.
///
/// A tag is a newtype around [`Bytes`], so its name is an arbitrary byte
/// string and is not required to be valid UTF-8. Equality and ordering are
/// derived from the wrapped `Bytes`, and the derived `From`/`Into` convert
/// directly between `Tag` and `Bytes`.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, From, Into)]
pub struct Tag(pub Bytes);
29
30impl From<&[u8]> for Tag {
31    fn from(value: &[u8]) -> Self {
32        Self(Bytes::copy_from_slice(value))
33    }
34}
35
36impl AsRef<[u8]> for Tag {
37    fn as_ref(&self) -> &[u8] {
38        self.0.as_ref()
39    }
40}
41
42impl Borrow<[u8]> for Tag {
43    fn borrow(&self) -> &[u8] {
44        self.0.as_ref()
45    }
46}
47
48impl From<String> for Tag {
49    fn from(value: String) -> Self {
50        Self(Bytes::from(value))
51    }
52}
53
54impl From<&str> for Tag {
55    fn from(value: &str) -> Self {
56        Self(Bytes::from(value.to_owned()))
57    }
58}
59
60impl fmt::Display for Tag {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        let bytes = self.0.as_ref();
63        match std::str::from_utf8(bytes) {
64            Ok(s) => write!(f, "\"{s}\""),
65            Err(_) => write!(f, "{}", hex::encode(bytes)),
66        }
67    }
68}
69
70impl Tag {
71    /// Create a new tag that does not exist yet.
72    pub fn auto(time: SystemTime, exists: impl Fn(&[u8]) -> bool) -> Self {
73        // On wasm, SystemTime is web_time::SystemTime, but we need a std system time
74        // to convert to chrono.
75        // TODO: Upstream to n0-future or expose SystemTimeExt on wasm
76        #[cfg(wasm_browser)]
77        let time = std::time::SystemTime::UNIX_EPOCH
78            + time.duration_since(SystemTime::UNIX_EPOCH).unwrap();
79
80        let now = chrono::DateTime::<chrono::Utc>::from(time);
81        let mut i = 0;
82        loop {
83            let mut text = format!("auto-{}", now.format("%Y-%m-%dT%H:%M:%S%.3fZ"));
84            if i != 0 {
85                text.push_str(&format!("-{i}"));
86            }
87            if !exists(text.as_bytes()) {
88                return Self::from(text);
89            }
90            i += 1;
91        }
92    }
93
94    /// The successor of this tag in lexicographic order.
95    pub fn successor(&self) -> Self {
96        let mut bytes = self.0.to_vec();
97        // increment_vec(&mut bytes);
98        bytes.push(0);
99        Self(bytes.into())
100    }
101
102    /// If this is a prefix, get the next prefix.
103    ///
104    /// This is like successor, except that it will return None if the prefix is all 0xFF instead of appending a 0 byte.
105    pub fn next_prefix(&self) -> Option<Self> {
106        let mut bytes = self.0.to_vec();
107        if next_prefix(&mut bytes) {
108            Some(Self(bytes.into()))
109        } else {
110            None
111        }
112    }
113}
114
/// Adapter whose `Debug` output is the `Display` output of the wrapped value.
pub struct DD<T: fmt::Display>(pub T);

impl<T: fmt::Display> fmt::Debug for DD<T> {
    /// Forwards to the inner value's `Display` impl, handing the formatter
    /// through unchanged so any formatting flags still apply.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        <T as fmt::Display>::fmt(inner, f)
    }
}
122
123impl fmt::Debug for Tag {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.debug_tuple("Tag").field(&DD(self)).finish()
126    }
127}
128
/// Clamp the window `offset..offset + len` to a buffer of `buf_len` bytes.
///
/// Returns the empty range `0..0` when `offset` is at or past the end of the
/// buffer. Otherwise the range starts at `offset` and ends at
/// `offset + len` or `buf_len`, whichever is smaller; the addition saturates
/// instead of overflowing.
pub(crate) fn limited_range(offset: u64, len: usize, buf_len: usize) -> std::ops::Range<usize> {
    if offset >= buf_len as u64 {
        return 0..0;
    }
    // `offset < buf_len`, so it is guaranteed to fit in a usize.
    let start = offset as usize;
    let end = buf_len.min(start.saturating_add(len));
    start..end
}
138
139/// zero copy get a limited slice from a `Bytes` as a `Bytes`.
140#[allow(dead_code)]
141pub(crate) fn get_limited_slice(bytes: &Bytes, offset: u64, len: usize) -> Bytes {
142    bytes.slice(limited_range(offset, len, bytes.len()))
143}
144
145pub trait RangeSetExt<T> {
146    fn upper_bound(&self) -> Option<T>;
147}
148
149impl<T: RangeSetEntry + Clone> RangeSetExt<T> for RangeSetRef<T> {
150    /// The upper (exclusive) bound of the bitfield
151    fn upper_bound(&self) -> Option<T> {
152        let boundaries = self.boundaries();
153        if boundaries.is_empty() {
154            Some(RangeSetEntry::min_value())
155        } else if boundaries.len().is_multiple_of(2) {
156            Some(boundaries[boundaries.len() - 1].clone())
157        } else {
158            None
159        }
160    }
161}
162
163#[cfg(feature = "fs-store")]
164mod fs {
165    use std::{
166        fmt,
167        fs::{File, OpenOptions},
168        io::{self, Read, Write},
169        path::Path,
170    };
171
172    use arrayvec::ArrayString;
173    use bao_tree::blake3;
174    use serde::{de::DeserializeOwned, Serialize};
175
176    mod redb_support {
177        use bytes::Bytes;
178        use redb::{Key as RedbKey, Value as RedbValue};
179
180        use super::super::Tag;
181
182        impl RedbValue for Tag {
183            type SelfType<'a> = Self;
184
185            type AsBytes<'a> = bytes::Bytes;
186
187            fn fixed_width() -> Option<usize> {
188                None
189            }
190
191            fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
192            where
193                Self: 'a,
194            {
195                Self(Bytes::copy_from_slice(data))
196            }
197
198            fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
199            where
200                Self: 'a,
201                Self: 'b,
202            {
203                value.0.clone()
204            }
205
206            fn type_name() -> redb::TypeName {
207                redb::TypeName::new("Tag")
208            }
209        }
210
211        impl RedbKey for Tag {
212            fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
213                data1.cmp(data2)
214            }
215        }
216    }
217
218    pub fn write_checksummed<P: AsRef<Path>, T: Serialize>(path: P, data: &T) -> io::Result<()> {
219        // Build Vec with space for hash
220        let mut buffer = Vec::with_capacity(32 + 128);
221        buffer.extend_from_slice(&[0u8; 32]);
222
223        // Serialize directly into buffer
224        postcard::to_io(data, &mut buffer).map_err(io::Error::other)?;
225
226        // Compute hash over data (skip first 32 bytes)
227        let data_slice = &buffer[32..];
228        let hash = blake3::hash(data_slice);
229        buffer[..32].copy_from_slice(hash.as_bytes());
230
231        // Write all at once
232        let mut file = File::create(&path)?;
233        file.write_all(&buffer)?;
234        file.sync_all()?;
235
236        Ok(())
237    }
238
239    pub fn read_checksummed_and_truncate<T: DeserializeOwned>(
240        path: impl AsRef<Path>,
241    ) -> io::Result<T> {
242        let path = path.as_ref();
243        let mut file = OpenOptions::new()
244            .read(true)
245            .write(true)
246            .truncate(false)
247            .open(path)?;
248        let mut buffer = Vec::new();
249        file.read_to_end(&mut buffer)?;
250        file.set_len(0)?;
251        file.sync_all()?;
252
253        if buffer.is_empty() {
254            return Err(io::Error::new(
255                io::ErrorKind::InvalidData,
256                "File marked dirty",
257            ));
258        }
259
260        if buffer.len() < 32 {
261            return Err(io::Error::new(io::ErrorKind::InvalidData, "File too short"));
262        }
263
264        let stored_hash = &buffer[..32];
265        let data = &buffer[32..];
266
267        let computed_hash = blake3::hash(data);
268        if computed_hash.as_bytes() != stored_hash {
269            return Err(io::Error::new(io::ErrorKind::InvalidData, "Hash mismatch"));
270        }
271
272        let deserialized = postcard::from_bytes(data).map_err(io::Error::other)?;
273
274        Ok(deserialized)
275    }
276
277    #[cfg(test)]
278    pub fn read_checksummed<T: DeserializeOwned>(path: impl AsRef<Path>) -> io::Result<T> {
279        use std::{fs::File, io::Read};
280
281        use bao_tree::blake3;
282        use tracing::info;
283
284        let path = path.as_ref();
285        let mut file = File::open(path)?;
286        let mut buffer = Vec::new();
287        file.read_to_end(&mut buffer)?;
288        info!("{} {}", path.display(), hex::encode(&buffer));
289
290        if buffer.is_empty() {
291            use std::io;
292
293            return Err(io::Error::new(
294                io::ErrorKind::InvalidData,
295                "File marked dirty",
296            ));
297        }
298
299        if buffer.len() < 32 {
300            return Err(io::Error::new(io::ErrorKind::InvalidData, "File too short"));
301        }
302
303        let stored_hash = &buffer[..32];
304        let data = &buffer[32..];
305
306        let computed_hash = blake3::hash(data);
307        if computed_hash.as_bytes() != stored_hash {
308            return Err(io::Error::new(io::ErrorKind::InvalidData, "Hash mismatch"));
309        }
310
311        let deserialized = postcard::from_bytes(data).map_err(io::Error::other)?;
312
313        Ok(deserialized)
314    }
315
316    /// Helper trait for bytes for debugging
317    pub trait SliceInfoExt: AsRef<[u8]> {
318        // get the addr of the actual data, to check if data was copied
319        fn addr(&self) -> usize;
320
321        // a short symbol string for the address
322        fn addr_short(&self) -> ArrayString<12> {
323            let addr = self.addr().to_le_bytes();
324            symbol_string(&addr)
325        }
326
327        #[allow(dead_code)]
328        fn hash_short(&self) -> ArrayString<10> {
329            crate::Hash::new(self.as_ref()).fmt_short()
330        }
331    }
332
333    impl<T: AsRef<[u8]>> SliceInfoExt for T {
334        fn addr(&self) -> usize {
335            self.as_ref() as *const [u8] as *const u8 as usize
336        }
337
338        fn hash_short(&self) -> ArrayString<10> {
339            crate::Hash::new(self.as_ref()).fmt_short()
340        }
341    }
342
343    pub fn symbol_string(data: &[u8]) -> ArrayString<12> {
344        const SYMBOLS: &[char] = &[
345            '😀', '😂', '😍', '😎', '😢', '😡', '😱', '😴', '🤓', '🤔', '🤗', '🤢', '🤡', '🤖',
346            '👽', '👾', '👻', '💀', '💩', '♥', '💥', '💦', '💨', '💫', '💬', '💭', '💰', '💳',
347            '💼', '📈', '📉', '📍', '📢', '📦', '📱', '📷', '📺', '🎃', '🎄', '🎉', '🎋', '🎍',
348            '🎒', '🎓', '🎖', '🎤', '🎧', '🎮', '🎰', '🎲', '🎳', '🎴', '🎵', '🎷', '🎸', '🎹',
349            '🎺', '🎻', '🎼', '🏀', '🏁', '🏆', '🏈',
350        ];
351        const BASE: usize = SYMBOLS.len(); // 64
352
353        // Hash the input with BLAKE3
354        let hash = blake3::hash(data);
355        let bytes = hash.as_bytes(); // 32-byte hash
356
357        // Create an ArrayString with capacity 12 (bytes)
358        let mut result = ArrayString::<12>::new();
359
360        // Fill with 3 symbols
361        for byte in bytes.iter().take(3) {
362            let byte = *byte as usize;
363            let index = byte % BASE;
364            result.push(SYMBOLS[index]); // Each char can be up to 4 bytes
365        }
366
367        result
368    }
369
370    pub struct ValueOrPoisioned<T>(pub Option<T>);
371
372    impl<T: fmt::Debug> fmt::Debug for ValueOrPoisioned<T> {
373        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
374            match &self.0 {
375                Some(x) => x.fmt(f),
376                None => f.debug_tuple("Poisoned").finish(),
377            }
378        }
379    }
380}
381#[cfg(feature = "fs-store")]
382pub use fs::*;
383
/// Given a prefix, increment it lexicographically, in place.
///
/// The last byte that is not `0xFF` is incremented and every byte after it is
/// reset to `0`; the length never changes. Returns `true` on success.
///
/// If the prefix is empty or all `0xFF`, this returns `false` because there is
/// no higher prefix than that. In that case every byte has been reset to `0`.
#[allow(dead_code)]
pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool {
    match bytes.iter().rposition(|&b| b != u8::MAX) {
        Some(idx) => {
            bytes[idx] += 1;
            // Everything after `idx` was 0xFF and wraps around to 0.
            bytes[idx + 1..].fill(0);
            true
        }
        None => {
            bytes.fill(0);
            false
        }
    }
}
399
400#[derive(ref_cast::RefCast)]
401#[repr(transparent)]
402pub struct BaoTreeSender(mpsc::Sender<EncodedItem>);
403
404impl BaoTreeSender {
405    pub fn new(sender: &mut mpsc::Sender<EncodedItem>) -> &mut Self {
406        BaoTreeSender::ref_cast_mut(sender)
407    }
408}
409
410impl bao_tree::io::mixed::Sender for BaoTreeSender {
411    type Error = irpc::channel::SendError;
412    async fn send(&mut self, item: EncodedItem) -> std::result::Result<(), Self::Error> {
413        self.0.send(item).await
414    }
415}
416
417#[cfg(test)]
418#[cfg(feature = "fs-store")]
419pub mod tests {
420    use bao_tree::{io::outboard::PreOrderMemOutboard, ChunkRanges};
421    use n0_error::{Result, StdResultExt};
422
423    use crate::{hash::Hash, store::IROH_BLOCK_SIZE};
424
425    /// Create n0 flavoured bao. Note that this can be used to request ranges below a chunk group size,
426    /// which can not be exported via bao because we don't store hashes below the chunk group level.
427    pub fn create_n0_bao(data: &[u8], ranges: &ChunkRanges) -> Result<(Hash, Vec<u8>)> {
428        let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
429        let mut encoded = Vec::new();
430        let size = data.len() as u64;
431        encoded.extend_from_slice(&size.to_le_bytes());
432        bao_tree::io::sync::encode_ranges_validated(data, &outboard, ranges, &mut encoded)
433            .anyerr()?;
434        Ok((outboard.root.into(), encoded))
435    }
436}