1#![cfg_attr(feature = "hide-proto-docs", doc(hidden))]
2use std::{
17    fmt::{self, Debug},
18    io,
19    num::NonZeroU64,
20    ops::{Bound, RangeBounds},
21    path::PathBuf,
22    pin::Pin,
23};
24
25use arrayvec::ArrayString;
26use bao_tree::{
27    io::{mixed::EncodedItem, BaoContentItem, Leaf},
28    ChunkRanges,
29};
30use bytes::Bytes;
31use irpc::{
32    channel::{mpsc, oneshot},
33    rpc_requests,
34};
35use n0_future::Stream;
36use range_collections::RangeSet2;
37use serde::{Deserialize, Serialize};
38pub(crate) mod bitfield;
39pub use bitfield::Bitfield;
40
41use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
42
43#[allow(dead_code)]
44pub(crate) trait HashSpecific {
45    fn hash(&self) -> Hash;
46
47    fn hash_short(&self) -> ArrayString<10> {
48        self.hash().fmt_short()
49    }
50}
51
52impl HashSpecific for ImportBaoMsg {
53    fn hash(&self) -> crate::Hash {
54        self.inner.hash
55    }
56}
57
58impl HashSpecific for ObserveMsg {
59    fn hash(&self) -> crate::Hash {
60        self.inner.hash
61    }
62}
63
64impl HashSpecific for ExportBaoMsg {
65    fn hash(&self) -> crate::Hash {
66        self.inner.hash
67    }
68}
69
70impl HashSpecific for ExportRangesMsg {
71    fn hash(&self) -> crate::Hash {
72        self.inner.hash
73    }
74}
75
76impl HashSpecific for ExportPathMsg {
77    fn hash(&self) -> crate::Hash {
78        self.inner.hash
79    }
80}
81
82pub type BoxedByteStream = Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Send + Sync + 'static>>;
83
84impl HashSpecific for CreateTagMsg {
85    fn hash(&self) -> crate::Hash {
86        self.inner.value.hash
87    }
88}
89
90#[rpc_requests(message = Command, alias = "Msg")]
91#[derive(Debug, Serialize, Deserialize)]
92pub enum Request {
93    #[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
94    ListBlobs(ListRequest),
95    #[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
96    Batch(BatchRequest),
97    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
98    DeleteBlobs(BlobDeleteRequest),
99    #[rpc(rx = mpsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
100    ImportBao(ImportBaoRequest),
101    #[rpc(tx = mpsc::Sender<EncodedItem>)]
102    ExportBao(ExportBaoRequest),
103    #[rpc(tx = mpsc::Sender<ExportRangesItem>)]
104    ExportRanges(ExportRangesRequest),
105    #[rpc(tx = mpsc::Sender<Bitfield>)]
106    Observe(ObserveRequest),
107    #[rpc(tx = oneshot::Sender<BlobStatus>)]
108    BlobStatus(BlobStatusRequest),
109    #[rpc(tx = mpsc::Sender<AddProgressItem>)]
110    ImportBytes(ImportBytesRequest),
111    #[rpc(rx = mpsc::Receiver<ImportByteStreamUpdate>, tx = mpsc::Sender<AddProgressItem>)]
112    ImportByteStream(ImportByteStreamRequest),
113    #[rpc(tx = mpsc::Sender<AddProgressItem>)]
114    ImportPath(ImportPathRequest),
115    #[rpc(tx = mpsc::Sender<ExportProgressItem>)]
116    ExportPath(ExportPathRequest),
117    #[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
118    ListTags(ListTagsRequest),
119    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
120    SetTag(SetTagRequest),
121    #[rpc(tx = oneshot::Sender<super::Result<u64>>)]
122    DeleteTags(DeleteTagsRequest),
123    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
124    RenameTag(RenameTagRequest),
125    #[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
126    CreateTag(CreateTagRequest),
127    #[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
128    ListTempTags(ListTempTagsRequest),
129    #[rpc(tx = oneshot::Sender<TempTag>)]
130    CreateTempTag(CreateTempTagRequest),
131    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
132    SyncDb(SyncDbRequest),
133    #[rpc(tx = oneshot::Sender<()>)]
134    WaitIdle(WaitIdleRequest),
135    #[rpc(tx = oneshot::Sender<()>)]
136    Shutdown(ShutdownRequest),
137    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
138    ClearProtected(ClearProtectedRequest),
139}
140
141#[derive(Debug, Serialize, Deserialize)]
142pub struct WaitIdleRequest;
143
144#[derive(Debug, Serialize, Deserialize)]
145pub struct SyncDbRequest;
146
147#[derive(Debug, Serialize, Deserialize)]
148pub struct ShutdownRequest;
149
150#[derive(Debug, Serialize, Deserialize)]
151pub struct ClearProtectedRequest;
152
153#[derive(Debug, Serialize, Deserialize)]
154pub struct BlobStatusRequest {
155    pub hash: Hash,
156}
157
158#[derive(Debug, Serialize, Deserialize)]
159pub struct ListRequest;
160
161#[derive(Debug, Serialize, Deserialize)]
162pub struct BatchRequest;
163
164#[derive(Debug, Serialize, Deserialize)]
165pub enum BatchResponse {
166    Drop(HashAndFormat),
167    Ping,
168}
169
170#[derive(Debug, Serialize, Deserialize)]
172pub struct BlobDeleteRequest {
173    pub hashes: Vec<Hash>,
174    pub force: bool,
175}
176
177#[derive(Serialize, Deserialize)]
179pub struct ImportBytesRequest {
180    pub data: Bytes,
181    pub format: BlobFormat,
182    pub scope: Scope,
183}
184
185impl fmt::Debug for ImportBytesRequest {
186    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187        f.debug_struct("ImportBytes")
188            .field("data", &self.data.len())
189            .field("format", &self.format)
190            .field("scope", &self.scope)
191            .finish()
192    }
193}
194
195#[derive(Debug, Serialize, Deserialize)]
196pub struct ImportPathRequest {
197    pub path: PathBuf,
198    pub mode: ImportMode,
199    pub format: BlobFormat,
200    pub scope: Scope,
201}
202
203#[derive(Debug, Serialize, Deserialize)]
209pub struct ImportBaoRequest {
210    pub hash: Hash,
211    pub size: NonZeroU64,
212}
213
214#[derive(Debug, Serialize, Deserialize)]
216pub struct ObserveRequest {
217    pub hash: Hash,
218}
219
220#[derive(Debug, Serialize, Deserialize)]
224pub struct ExportBaoRequest {
225    pub hash: Hash,
226    pub ranges: ChunkRanges,
227}
228
229#[derive(Debug, Serialize, Deserialize)]
231pub struct ExportRangesRequest {
232    pub hash: Hash,
233    pub ranges: RangeSet2<u64>,
234}
235
236#[derive(Debug, Serialize, Deserialize)]
243pub struct ExportPathRequest {
244    pub hash: Hash,
245    pub mode: ExportMode,
246    pub target: PathBuf,
247}
248
249#[derive(Debug, Serialize, Deserialize)]
250pub struct ImportByteStreamRequest {
251    pub format: BlobFormat,
252    pub scope: Scope,
253}
254
255#[derive(Debug, Serialize, Deserialize)]
256pub enum ImportByteStreamUpdate {
257    Bytes(Bytes),
258    Done,
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize)]
263pub struct ListTagsRequest {
264    pub hash_seq: bool,
266    pub raw: bool,
268    pub from: Option<Tag>,
270    pub to: Option<Tag>,
272}
273
274impl ListTagsRequest {
275    pub fn range<R, E>(range: R) -> Self
277    where
278        R: RangeBounds<E>,
279        E: AsRef<[u8]>,
280    {
281        let (from, to) = tags_from_range(range);
282        Self {
283            from,
284            to,
285            raw: true,
286            hash_seq: true,
287        }
288    }
289
290    pub fn prefix(prefix: &[u8]) -> Self {
292        let from = Tag::from(prefix);
293        let to = from.next_prefix();
294        Self {
295            raw: true,
296            hash_seq: true,
297            from: Some(from),
298            to,
299        }
300    }
301
302    pub fn single(name: &[u8]) -> Self {
304        let from = Tag::from(name);
305        Self {
306            to: Some(from.successor()),
307            from: Some(from),
308            raw: true,
309            hash_seq: true,
310        }
311    }
312
313    pub fn all() -> Self {
315        Self {
316            raw: true,
317            hash_seq: true,
318            from: None,
319            to: None,
320        }
321    }
322
323    pub fn raw() -> Self {
325        Self {
326            raw: true,
327            hash_seq: false,
328            from: None,
329            to: None,
330        }
331    }
332
333    pub fn hash_seq() -> Self {
335        Self {
336            raw: false,
337            hash_seq: true,
338            from: None,
339            to: None,
340        }
341    }
342}
343
344#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
346pub struct TagInfo {
347    pub name: Tag,
349    pub format: BlobFormat,
351    pub hash: Hash,
353}
354
355impl From<TagInfo> for HashAndFormat {
356    fn from(tag_info: TagInfo) -> Self {
357        HashAndFormat {
358            hash: tag_info.hash,
359            format: tag_info.format,
360        }
361    }
362}
363
364impl TagInfo {
365    pub fn new(name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Self {
367        let name = name.as_ref();
368        let value = value.into();
369        Self {
370            name: Tag::from(name),
371            hash: value.hash,
372            format: value.format,
373        }
374    }
375
376    pub fn hash_and_format(&self) -> HashAndFormat {
378        HashAndFormat {
379            hash: self.hash,
380            format: self.format,
381        }
382    }
383}
384
385pub(crate) fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>)
386where
387    R: RangeBounds<E>,
388    E: AsRef<[u8]>,
389{
390    let from = match range.start_bound() {
391        Bound::Included(start) => Some(Tag::from(start.as_ref())),
392        Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()),
393        Bound::Unbounded => None,
394    };
395    let to = match range.end_bound() {
396        Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()),
397        Bound::Excluded(end) => Some(Tag::from(end.as_ref())),
398        Bound::Unbounded => None,
399    };
400    (from, to)
401}
402
403#[derive(Debug, Serialize, Deserialize)]
405pub struct CreateTempTagRequest {
406    pub scope: Scope,
407    pub value: HashAndFormat,
408}
409
410#[derive(Debug, Serialize, Deserialize)]
412pub struct ListTempTagsRequest;
413
414#[derive(Debug, Serialize, Deserialize)]
416pub struct RenameTagRequest {
417    pub from: Tag,
419    pub to: Tag,
421}
422
423#[derive(Debug, Clone, Serialize, Deserialize)]
425pub struct DeleteTagsRequest {
426    pub from: Option<Tag>,
428    pub to: Option<Tag>,
430}
431
432impl DeleteTagsRequest {
433    pub fn single(name: &[u8]) -> Self {
435        let name = Tag::from(name);
436        Self {
437            to: Some(name.successor()),
438            from: Some(name),
439        }
440    }
441
442    pub fn range<R, E>(range: R) -> Self
444    where
445        R: RangeBounds<E>,
446        E: AsRef<[u8]>,
447    {
448        let (from, to) = tags_from_range(range);
449        Self { from, to }
450    }
451
452    pub fn prefix(prefix: &[u8]) -> Self {
454        let from = Tag::from(prefix);
455        let to = from.next_prefix();
456        Self {
457            from: Some(from),
458            to,
459        }
460    }
461}
462
463#[derive(Debug, Serialize, Deserialize)]
465pub struct SetTagRequest {
466    pub name: Tag,
467    pub value: HashAndFormat,
468}
469
470#[derive(Debug, Serialize, Deserialize)]
472pub struct CreateTagRequest {
473    pub value: HashAndFormat,
474}
475
476#[derive(Debug, Serialize, Deserialize)]
478pub struct ProcessExitRequest {
479    pub code: i32,
480}
481
482#[derive(Debug, Serialize, Deserialize)]
496pub enum AddProgressItem {
497    CopyProgress(u64),
507    Size(u64),
516    CopyDone,
521    OutboardProgress(u64),
526    Done(TempTag),
535    Error(#[serde(with = "crate::util::serde::io_error_serde")] io::Error),
543}
544
545impl From<io::Error> for AddProgressItem {
546    fn from(e: io::Error) -> Self {
547        Self::Error(e)
548    }
549}
550
551#[derive(Debug, Serialize, Deserialize)]
552pub enum ExportRangesItem {
553    Size(u64),
558    Data(Leaf),
560    Error(super::Error),
562}
563
564impl From<super::Error> for ExportRangesItem {
565    fn from(e: super::Error) -> Self {
566        Self::Error(e)
567    }
568}
569
570impl From<Leaf> for ExportRangesItem {
571    fn from(leaf: Leaf) -> Self {
572        Self::Data(leaf)
573    }
574}
575
576#[derive(Debug, Serialize, Deserialize)]
585pub enum ExportProgressItem {
586    Size(u64),
591    CopyProgress(u64),
599    Done,
607    Error(super::Error),
615}
616
617impl From<super::Error> for ExportProgressItem {
618    fn from(e: super::Error) -> Self {
619        Self::Error(e)
620    }
621}
622
623#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
630pub enum ImportMode {
631    #[default]
636    Copy,
637    TryReference,
645}
646
647#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
654pub enum ExportMode {
655    #[default]
660    Copy,
661    TryReference,
670}
671
672#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
674pub enum BlobStatus {
675    NotFound,
677    Partial {
679        size: Option<u64>,
681    },
682    Complete {
684        size: u64,
686    },
687}
688
689#[derive(
691    Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display,
692)]
693pub struct Scope(pub(crate) u64);
694
695impl Scope {
696    pub const GLOBAL: Self = Self(0);
697}
698
699impl std::fmt::Debug for Scope {
700    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
701        if self.0 == 0 {
702            write!(f, "Global")
703        } else {
704            f.debug_tuple("Scope").field(&self.0).finish()
705        }
706    }
707}