iroh_blobs/api/
proto.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
#![cfg_attr(feature = "hide-proto-docs", doc(hidden))]
//! The protocol that a store implementation needs to implement.
//!
//! A store needs to handle [`Request`]s. It is fine to just return an error for some
//! commands. E.g. an immutable store can just return an error for import commands.
//!
//! Each command consists of a serializable request message and channels for updates
//! and responses. The enum containing the full requests is [`Command`]. These are the
//! commands you will have to handle in a store actor handler.
//!
//! This crate provides a file system based store implementation, [`crate::store::fs::FsStore`],
//! as well as a mutable in-memory store and an immutable in-memory store.
//!
//! The file system store is quite complex and optimized, so to get started take a look at
//! the much simpler memory store.
use std::{
    fmt::{self, Debug},
    io,
    num::NonZeroU64,
    ops::{Bound, RangeBounds},
    path::PathBuf,
    pin::Pin,
};

use arrayvec::ArrayString;
use bao_tree::{
    io::{mixed::EncodedItem, BaoContentItem, Leaf},
    ChunkRanges,
};
use bytes::Bytes;
use irpc::{
    channel::{mpsc, oneshot},
    rpc_requests,
};
use n0_future::Stream;
use range_collections::RangeSet2;
use serde::{Deserialize, Serialize};
pub(crate) mod bitfield;
pub use bitfield::Bitfield;

use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};

pub(crate) trait HashSpecific {
    fn hash(&self) -> Hash;

    fn hash_short(&self) -> ArrayString<10> {
        self.hash().fmt_short()
    }
}

impl HashSpecific for ImportBaoMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ObserveMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ExportBaoMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ExportRangesMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ExportPathMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

pub type BoxedByteStream = Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Send + Sync + 'static>>;

impl HashSpecific for CreateTagMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.value.hash
    }
}

#[derive(Debug, Clone)]
pub struct StoreService;
impl irpc::Service for StoreService {}

#[rpc_requests(StoreService, message = Command, alias = "Msg")]
#[derive(Debug, Serialize, Deserialize)]
pub enum Request {
    #[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
    ListBlobs(ListRequest),
    #[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
    Batch(BatchRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    DeleteBlobs(BlobDeleteRequest),
    #[rpc(rx = mpsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
    ImportBao(ImportBaoRequest),
    #[rpc(tx = mpsc::Sender<EncodedItem>)]
    ExportBao(ExportBaoRequest),
    #[rpc(tx = mpsc::Sender<ExportRangesItem>)]
    ExportRanges(ExportRangesRequest),
    #[rpc(tx = mpsc::Sender<Bitfield>)]
    Observe(ObserveRequest),
    #[rpc(tx = oneshot::Sender<BlobStatus>)]
    BlobStatus(BlobStatusRequest),
    #[rpc(tx = mpsc::Sender<AddProgressItem>)]
    ImportBytes(ImportBytesRequest),
    #[rpc(rx = mpsc::Receiver<ImportByteStreamUpdate>, tx = mpsc::Sender<AddProgressItem>)]
    ImportByteStream(ImportByteStreamRequest),
    #[rpc(tx = mpsc::Sender<AddProgressItem>)]
    ImportPath(ImportPathRequest),
    #[rpc(tx = mpsc::Sender<ExportProgressItem>)]
    ExportPath(ExportPathRequest),
    #[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
    ListTags(ListTagsRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    SetTag(SetTagRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    DeleteTags(DeleteTagsRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    RenameTag(RenameTagRequest),
    #[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
    CreateTag(CreateTagRequest),
    #[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
    ListTempTags(ListTempTagsRequest),
    #[rpc(tx = oneshot::Sender<TempTag>)]
    CreateTempTag(CreateTempTagRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    SyncDb(SyncDbRequest),
    #[rpc(tx = oneshot::Sender<()>)]
    Shutdown(ShutdownRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    ClearProtected(ClearProtectedRequest),
}

#[derive(Debug, Serialize, Deserialize)]
pub struct SyncDbRequest;

#[derive(Debug, Serialize, Deserialize)]
pub struct ShutdownRequest;

#[derive(Debug, Serialize, Deserialize)]
pub struct ClearProtectedRequest;

#[derive(Debug, Serialize, Deserialize)]
pub struct BlobStatusRequest {
    pub hash: Hash,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ListRequest;

#[derive(Debug, Serialize, Deserialize)]
pub struct BatchRequest;

#[derive(Debug, Serialize, Deserialize)]
pub enum BatchResponse {
    Drop(HashAndFormat),
    Ping,
}

/// Options for force deletion of blobs.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobDeleteRequest {
    pub hashes: Vec<Hash>,
    pub force: bool,
}

/// Import the given bytes.
#[derive(Serialize, Deserialize)]
pub struct ImportBytesRequest {
    pub data: Bytes,
    pub format: BlobFormat,
    pub scope: Scope,
}

impl fmt::Debug for ImportBytesRequest {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ImportBytes")
            .field("data", &self.data.len())
            .field("format", &self.format)
            .field("scope", &self.scope)
            .finish()
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ImportPathRequest {
    pub path: PathBuf,
    pub mode: ImportMode,
    pub format: BlobFormat,
    pub scope: Scope,
}

/// Import bao encoded data for the given hash with the iroh block size.
///
/// The result is just a single item, indicating if a write error occurred.
/// To observe the incoming data more granularly, use the `Observe` command
/// concurrently.
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportBaoRequest {
    pub hash: Hash,
    pub size: NonZeroU64,
}

/// Observe the local bitfield of the given hash.
#[derive(Debug, Serialize, Deserialize)]
pub struct ObserveRequest {
    pub hash: Hash,
}

/// Export the given ranges in bao format, with the iroh block size.
///
/// The returned stream should be verified by the store.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportBaoRequest {
    pub hash: Hash,
    pub ranges: ChunkRanges,
}

/// Export the given ranges as chunkks, without validation.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportRangesRequest {
    pub hash: Hash,
    pub ranges: RangeSet2<u64>,
}

/// Export a file to a target path.
///
/// For an incomplete file, the size might be truncated and gaps will be filled
/// with zeros. If possible, a store implementation should try to write as a
/// sparse file.

#[derive(Debug, Serialize, Deserialize)]
pub struct ExportPathRequest {
    pub hash: Hash,
    pub mode: ExportMode,
    pub target: PathBuf,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ImportByteStreamRequest {
    pub format: BlobFormat,
    pub scope: Scope,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum ImportByteStreamUpdate {
    Bytes(Bytes),
    Done,
}

/// Options for a list operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListTagsRequest {
    /// List tags to hash seqs
    pub hash_seq: bool,
    /// List tags to raw blobs
    pub raw: bool,
    /// Optional from tag (inclusive)
    pub from: Option<Tag>,
    /// Optional to tag (exclusive)
    pub to: Option<Tag>,
}

impl ListTagsRequest {
    /// List a range of tags
    pub fn range<R, E>(range: R) -> Self
    where
        R: RangeBounds<E>,
        E: AsRef<[u8]>,
    {
        let (from, to) = tags_from_range(range);
        Self {
            from,
            to,
            raw: true,
            hash_seq: true,
        }
    }

    /// List tags with a prefix
    pub fn prefix(prefix: &[u8]) -> Self {
        let from = Tag::from(prefix);
        let to = from.next_prefix();
        Self {
            raw: true,
            hash_seq: true,
            from: Some(from),
            to,
        }
    }

    /// List a single tag
    pub fn single(name: &[u8]) -> Self {
        let from = Tag::from(name);
        Self {
            to: Some(from.successor()),
            from: Some(from),
            raw: true,
            hash_seq: true,
        }
    }

    /// List all tags
    pub fn all() -> Self {
        Self {
            raw: true,
            hash_seq: true,
            from: None,
            to: None,
        }
    }

    /// List raw tags
    pub fn raw() -> Self {
        Self {
            raw: true,
            hash_seq: false,
            from: None,
            to: None,
        }
    }

    /// List hash seq tags
    pub fn hash_seq() -> Self {
        Self {
            raw: false,
            hash_seq: true,
            from: None,
            to: None,
        }
    }
}

/// Information about a tag.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TagInfo {
    /// Name of the tag
    pub name: Tag,
    /// Format of the data
    pub format: BlobFormat,
    /// Hash of the data
    pub hash: Hash,
}

impl From<TagInfo> for HashAndFormat {
    fn from(tag_info: TagInfo) -> Self {
        HashAndFormat {
            hash: tag_info.hash,
            format: tag_info.format,
        }
    }
}

impl TagInfo {
    /// Create a new tag info.
    pub fn new(name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Self {
        let name = name.as_ref();
        let value = value.into();
        Self {
            name: Tag::from(name),
            hash: value.hash,
            format: value.format,
        }
    }

    /// Get the hash and format of the tag.
    pub fn hash_and_format(&self) -> HashAndFormat {
        HashAndFormat {
            hash: self.hash,
            format: self.format,
        }
    }
}

pub(crate) fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>)
where
    R: RangeBounds<E>,
    E: AsRef<[u8]>,
{
    let from = match range.start_bound() {
        Bound::Included(start) => Some(Tag::from(start.as_ref())),
        Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()),
        Bound::Unbounded => None,
    };
    let to = match range.end_bound() {
        Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()),
        Bound::Excluded(end) => Some(Tag::from(end.as_ref())),
        Bound::Unbounded => None,
    };
    (from, to)
}

/// List all temp tags
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTempTagRequest {
    pub scope: Scope,
    pub value: HashAndFormat,
}

/// List all temp tags
#[derive(Debug, Serialize, Deserialize)]
pub struct ListTempTagsRequest;

/// Rename a tag atomically
#[derive(Debug, Serialize, Deserialize)]
pub struct RenameTagRequest {
    /// Old tag name
    pub from: Tag,
    /// New tag name
    pub to: Tag,
}

/// Options for a delete operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteTagsRequest {
    /// Optional from tag (inclusive)
    pub from: Option<Tag>,
    /// Optional to tag (exclusive)
    pub to: Option<Tag>,
}

impl DeleteTagsRequest {
    /// Delete a single tag
    pub fn single(name: &[u8]) -> Self {
        let name = Tag::from(name);
        Self {
            to: Some(name.successor()),
            from: Some(name),
        }
    }

    /// Delete a range of tags
    pub fn range<R, E>(range: R) -> Self
    where
        R: RangeBounds<E>,
        E: AsRef<[u8]>,
    {
        let (from, to) = tags_from_range(range);
        Self { from, to }
    }

    /// Delete tags with a prefix
    pub fn prefix(prefix: &[u8]) -> Self {
        let from = Tag::from(prefix);
        let to = from.next_prefix();
        Self {
            from: Some(from),
            to,
        }
    }
}

/// Options for creating a tag or setting it to a new value.
#[derive(Debug, Serialize, Deserialize)]
pub struct SetTagRequest {
    pub name: Tag,
    pub value: HashAndFormat,
}

/// Options for creating a tag
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTagRequest {
    pub value: HashAndFormat,
}

/// Debug tool to exit the process in the middle of a write transaction, for testing.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProcessExitRequest {
    pub code: i32,
}

/// Progress events for importing from any local source.
///
/// For sources with known size such as blobs or files, you will get the events
/// in the following order:
///
/// Size -> CopyProgress(*n) -> CopyDone -> OutboardProgress(*n) -> Done
///
/// For sources with unknown size such as streams, you will get the events
/// in the following order:
///
/// CopyProgress(*n) -> Size -> CopyDone -> OutboardProgress(*n) -> Done
///
/// Errors can happen at any time, and will be reported as an `Error` event.
#[derive(Debug, Serialize, Deserialize)]
pub enum AddProgressItem {
    /// Progress copying the file into the data directory.
    ///
    /// On most modern systems, copying will be done with copy on write,
    /// so copying will be instantaneous and you won't get any of these.
    ///
    /// The number is the *byte offset* of the copy process.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    CopyProgress(u64),
    /// Size of the file or stream has been determined.
    ///
    /// For some input such as blobs or files, the size is immediately known.
    /// For other inputs such as streams, the size is determined by reading
    /// the stream to the end.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// The copy part of the import operation is done.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    CopyDone,
    /// Progress computing the outboard and root hash of the imported data.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    OutboardProgress(u64),
    /// The import is done. Once you get this event the data is available
    /// and protected in the store via the temp tag.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was successful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Done(TempTag),
    /// The import failed with an error. Partial data will be deleted.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was unsuccessful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Error(#[serde(with = "crate::util::serde::io_error_serde")] io::Error),
}

impl From<io::Error> for AddProgressItem {
    fn from(e: io::Error) -> Self {
        Self::Error(e)
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub enum ExportRangesItem {
    /// The size of the file being exported.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// A range of the file being exported.
    Data(Leaf),
    /// Error while exporting the ranges.
    Error(super::Error),
}

impl From<super::Error> for ExportRangesItem {
    fn from(e: super::Error) -> Self {
        Self::Error(e)
    }
}

impl From<Leaf> for ExportRangesItem {
    fn from(leaf: Leaf) -> Self {
        Self::Data(leaf)
    }
}

/// Progress events for exporting to a local file.
///
/// Exporting does not involve outboard computation, so the events are simpler
/// than [`AddProgressItem`].
///
/// Size -> CopyProgress(*n) -> Done
///
/// Errors can happen at any time, and will be reported as an `Error` event.
#[derive(Debug, Serialize, Deserialize)]
pub enum ExportProgressItem {
    /// The size of the file being exported.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// Progress copying the file to the target directory.
    ///
    /// On many modern systems, copying will be done with copy on write,
    /// so copying will be instantaneous and you won't get any of these.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    CopyProgress(u64),
    /// The export is done. Once you get this event the data is available.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was successful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Done,
    /// The export failed with an error.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was unsuccessful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Error(super::Error),
}

impl From<super::Error> for ExportProgressItem {
    fn from(e: super::Error) -> Self {
        Self::Error(e)
    }
}

/// The import mode describes how files will be imported.
///
/// This is a hint to the import trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `Reference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
    /// This mode will copy the file into the database before hashing.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been imported.
    #[default]
    Copy,
    /// This mode will try to reference the file in place and assume it is unchanged after import.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified after it has been imported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The import mode describes how files will be imported.
///
/// This is a hint to the import trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `Reference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
pub enum ExportMode {
    /// This mode will copy the file to the target directory.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been exported.
    #[default]
    Copy,
    /// This mode will try to move the file to the target directory and then reference it from
    /// the database.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified in the target directory after it has been exported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
    /// The blob is not stored at all.
    NotFound,
    /// The blob is only stored partially.
    Partial {
        /// The size of the currently stored partial blob.
        size: Option<u64>,
    },
    /// The blob is stored completely.
    Complete {
        /// The size of the blob.
        size: u64,
    },
}

/// A scope for a write operation.
#[derive(
    Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display,
)]
pub struct Scope(pub(crate) u64);

impl Scope {
    pub const GLOBAL: Self = Self(0);
}

impl std::fmt::Debug for Scope {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.0 == 0 {
            write!(f, "Global")
        } else {
            f.debug_tuple("Scope").field(&self.0).finish()
        }
    }
}