iroh_blobs/
api.rs

1//! The user facing API of the store.
2//!
3//! This API is both for interacting with an in-process store and for interacting
4//! with a remote store via rpc calls.
5//!
6//! The entry point for the api is the [`Store`] struct. There are several ways
7//! to obtain a `Store` instance: it is available via [`Deref`]
8//! from the different store implementations
9//! (e.g. [`MemStore`](crate::store::mem::MemStore)
10//! and [`FsStore`](crate::store::fs::FsStore)) as well as on the
11//! [`BlobsProtocol`](crate::BlobsProtocol) iroh protocol handler.
12//!
13//! You can also [`connect`](Store::connect) to a remote store that is listening
14//! to rpc requests.
15use std::{io, ops::Deref};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use n0_snafu::SpanTrace;
20use nested_enum_utils::common_fields;
21use proto::{ShutdownRequest, SyncDbRequest};
22use ref_cast::RefCast;
23use serde::{Deserialize, Serialize};
24use snafu::{Backtrace, IntoError, Snafu};
25use tags::Tags;
26
27pub mod blobs;
28pub mod downloader;
29pub mod proto;
30pub mod remote;
31pub mod tags;
32use crate::{api::proto::WaitIdleRequest, provider::events::ProgressError};
33pub use crate::{store::util::Tag, util::temp_tag::TempTag};
34
35pub(crate) type ApiClient = irpc::Client<proto::Request>;
36
37#[common_fields({
38    backtrace: Option<Backtrace>,
39    #[snafu(implicit)]
40    span_trace: SpanTrace,
41})]
42#[allow(missing_docs)]
43#[non_exhaustive]
44#[derive(Debug, Snafu)]
45pub enum RequestError {
46    /// Request failed due to rpc error.
47    #[snafu(display("rpc error: {source}"))]
48    Rpc { source: irpc::Error },
49    /// Request failed due an actual error.
50    #[snafu(display("inner error: {source}"))]
51    Inner { source: Error },
52}
53
54impl From<irpc::Error> for RequestError {
55    fn from(value: irpc::Error) -> Self {
56        RpcSnafu.into_error(value)
57    }
58}
59
60impl From<Error> for RequestError {
61    fn from(value: Error) -> Self {
62        InnerSnafu.into_error(value)
63    }
64}
65
66impl From<io::Error> for RequestError {
67    fn from(value: io::Error) -> Self {
68        InnerSnafu.into_error(value.into())
69    }
70}
71
72impl From<irpc::channel::mpsc::RecvError> for RequestError {
73    fn from(value: irpc::channel::mpsc::RecvError) -> Self {
74        RpcSnafu.into_error(value.into())
75    }
76}
77
78pub type RequestResult<T> = std::result::Result<T, RequestError>;
79
80#[common_fields({
81    backtrace: Option<Backtrace>,
82    #[snafu(implicit)]
83    span_trace: SpanTrace,
84})]
85#[allow(missing_docs)]
86#[non_exhaustive]
87#[derive(Debug, Snafu)]
88pub enum ExportBaoError {
89    #[snafu(display("send error: {source}"))]
90    Send { source: irpc::channel::SendError },
91    #[snafu(display("mpsc recv error: {source}"))]
92    MpscRecv {
93        source: irpc::channel::mpsc::RecvError,
94    },
95    #[snafu(display("oneshot recv error: {source}"))]
96    OneshotRecv {
97        source: irpc::channel::oneshot::RecvError,
98    },
99    #[snafu(display("request error: {source}"))]
100    Request { source: irpc::RequestError },
101    #[snafu(display("io error: {source}"))]
102    ExportBaoIo { source: io::Error },
103    #[snafu(display("encode error: {source}"))]
104    ExportBaoInner { source: bao_tree::io::EncodeError },
105    #[snafu(display("client error: {source}"))]
106    ClientError { source: ProgressError },
107}
108
109impl From<ExportBaoError> for Error {
110    fn from(e: ExportBaoError) -> Self {
111        match e {
112            ExportBaoError::Send { source, .. } => Self::Io(source.into()),
113            ExportBaoError::MpscRecv { source, .. } => Self::Io(source.into()),
114            ExportBaoError::OneshotRecv { source, .. } => Self::Io(source.into()),
115            ExportBaoError::Request { source, .. } => Self::Io(source.into()),
116            ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
117            ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
118            ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
119        }
120    }
121}
122
123impl From<irpc::Error> for ExportBaoError {
124    fn from(e: irpc::Error) -> Self {
125        match e {
126            irpc::Error::MpscRecv { source, .. } => MpscRecvSnafu.into_error(source),
127            irpc::Error::OneshotRecv { source, .. } => OneshotRecvSnafu.into_error(source),
128            irpc::Error::Send { source, .. } => SendSnafu.into_error(source),
129            irpc::Error::Request { source, .. } => RequestSnafu.into_error(source),
130            irpc::Error::Write { source, .. } => ExportBaoIoSnafu.into_error(source.into()),
131        }
132    }
133}
134
135impl From<io::Error> for ExportBaoError {
136    fn from(value: io::Error) -> Self {
137        ExportBaoIoSnafu.into_error(value)
138    }
139}
140
141impl From<irpc::channel::mpsc::RecvError> for ExportBaoError {
142    fn from(value: irpc::channel::mpsc::RecvError) -> Self {
143        MpscRecvSnafu.into_error(value)
144    }
145}
146
147impl From<irpc::channel::oneshot::RecvError> for ExportBaoError {
148    fn from(value: irpc::channel::oneshot::RecvError) -> Self {
149        OneshotRecvSnafu.into_error(value)
150    }
151}
152
153impl From<irpc::channel::SendError> for ExportBaoError {
154    fn from(value: irpc::channel::SendError) -> Self {
155        SendSnafu.into_error(value)
156    }
157}
158
159impl From<irpc::RequestError> for ExportBaoError {
160    fn from(value: irpc::RequestError) -> Self {
161        RequestSnafu.into_error(value)
162    }
163}
164
165impl From<bao_tree::io::EncodeError> for ExportBaoError {
166    fn from(value: bao_tree::io::EncodeError) -> Self {
167        ExportBaoInnerSnafu.into_error(value)
168    }
169}
170
171impl From<ProgressError> for ExportBaoError {
172    fn from(value: ProgressError) -> Self {
173        ClientSnafu.into_error(value)
174    }
175}
176
177pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
178
179#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
180pub enum Error {
181    #[serde(with = "crate::util::serde::io_error_serde")]
182    Io(io::Error),
183}
184
185impl Error {
186    pub fn io(
187        kind: io::ErrorKind,
188        msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
189    ) -> Self {
190        Self::Io(io::Error::new(kind, msg.into()))
191    }
192
193    pub fn other<E>(msg: E) -> Self
194    where
195        E: Into<Box<dyn std::error::Error + Send + Sync>>,
196    {
197        Self::Io(io::Error::other(msg.into()))
198    }
199}
200
201impl From<irpc::Error> for Error {
202    fn from(e: irpc::Error) -> Self {
203        Self::Io(e.into())
204    }
205}
206
207impl From<RequestError> for Error {
208    fn from(e: RequestError) -> Self {
209        match e {
210            RequestError::Rpc { source, .. } => Self::Io(source.into()),
211            RequestError::Inner { source, .. } => source,
212        }
213    }
214}
215
216impl From<irpc::channel::mpsc::RecvError> for Error {
217    fn from(e: irpc::channel::mpsc::RecvError) -> Self {
218        Self::Io(e.into())
219    }
220}
221
/// Converts an rpc write error into an [`io::Error`] and wraps it in
/// [`Error::Io`]. Only available with the `rpc` feature.
#[cfg(feature = "rpc")]
impl From<irpc::rpc::WriteError> for Error {
    fn from(write_err: irpc::rpc::WriteError) -> Self {
        let io_err: io::Error = write_err.into();
        Self::Io(io_err)
    }
}
228
229impl From<irpc::RequestError> for Error {
230    fn from(e: irpc::RequestError) -> Self {
231        Self::Io(e.into())
232    }
233}
234
235impl From<irpc::channel::SendError> for Error {
236    fn from(e: irpc::channel::SendError) -> Self {
237        Self::Io(e.into())
238    }
239}
240
241impl std::error::Error for Error {
242    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
243        match self {
244            Error::Io(e) => Some(e),
245        }
246    }
247}
248
249impl From<EncodeError> for Error {
250    fn from(value: EncodeError) -> Self {
251        match value {
252            EncodeError::Io(cause) => Self::Io(cause),
253            _ => Self::other(value),
254        }
255    }
256}
257
258pub type Result<T> = std::result::Result<T, Error>;
259
260/// The main entry point for the store API.
261#[derive(Debug, Clone, ref_cast::RefCast)]
262#[repr(transparent)]
263pub struct Store {
264    client: ApiClient,
265}
266
267impl Deref for Store {
268    type Target = blobs::Blobs;
269
270    fn deref(&self) -> &Self::Target {
271        blobs::Blobs::ref_from_sender(&self.client)
272    }
273}
274
275impl Store {
276    /// The tags API.
277    pub fn tags(&self) -> &Tags {
278        Tags::ref_from_sender(&self.client)
279    }
280
281    /// The blobs API.
282    pub fn blobs(&self) -> &blobs::Blobs {
283        blobs::Blobs::ref_from_sender(&self.client)
284    }
285
286    /// API for getting blobs from a *single* remote node.
287    pub fn remote(&self) -> &remote::Remote {
288        remote::Remote::ref_from_sender(&self.client)
289    }
290
291    /// Create a downloader for more complex downloads.
292    ///
293    /// Unlike the other APIs, this creates an object that has internal state,
294    /// so don't create it ad hoc but store it somewhere if you need it multiple
295    /// times.
296    pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
297        downloader::Downloader::new(self, endpoint)
298    }
299
300    /// Connect to a remote store as a rpc client.
301    #[cfg(feature = "rpc")]
302    pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self {
303        let sender = irpc::Client::quinn(endpoint, addr);
304        Store::from_sender(sender)
305    }
306
307    /// Listen on a quinn endpoint for incoming rpc connections.
308    #[cfg(feature = "rpc")]
309    pub async fn listen(self, endpoint: quinn::Endpoint) {
310        use irpc::rpc::RemoteService;
311
312        use self::proto::Request;
313        let local = self.client.as_local().unwrap().clone();
314        let handler = Request::remote_handler(local);
315        irpc::rpc::listen::<Request>(endpoint, handler).await
316    }
317
318    pub async fn sync_db(&self) -> RequestResult<()> {
319        let msg = SyncDbRequest;
320        self.client.rpc(msg).await??;
321        Ok(())
322    }
323
324    pub async fn shutdown(&self) -> irpc::Result<()> {
325        let msg = ShutdownRequest;
326        self.client.rpc(msg).await?;
327        Ok(())
328    }
329
330    /// Waits for the store to become completely idle.
331    ///
332    /// This is mostly useful for tests, where you want to check that e.g. the
333    /// store has written all data to disk.
334    ///
335    /// Note that a store is not guaranteed to become idle, if it is being
336    /// interacted with concurrently. So this might wait forever.
337    ///
338    /// Also note that once you get the callback, the store is not guaranteed to
339    /// still be idle. All this tells you that there was a point in time where
340    /// the store was idle between the call and the response.
341    pub async fn wait_idle(&self) -> irpc::Result<()> {
342        let msg = WaitIdleRequest;
343        self.client.rpc(msg).await?;
344        Ok(())
345    }
346
347    pub(crate) fn from_sender(client: ApiClient) -> Self {
348        Self { client }
349    }
350
351    pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
352        Self::ref_cast(client)
353    }
354}