iroh_blobs/
api.rs

1//! The user facing API of the store.
2//!
3//! This API is both for interacting with an in-process store and for interacting
4//! with a remote store via rpc calls.
5//!
6//! The entry point for the api is the [`Store`] struct. There are several ways
7//! to obtain a `Store` instance: it is available via [`Deref`]
8//! from the different store implementations
9//! (e.g. [`MemStore`](crate::store::mem::MemStore)
10//! and [`FsStore`](crate::store::fs::FsStore)) as well as on the
11//! [`BlobsProtocol`](crate::BlobsProtocol) iroh protocol handler.
12//!
13//! You can also [`connect`](Store::connect) to a remote store that is listening
14//! to rpc requests.
15use std::{io, ops::Deref};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use n0_snafu::SpanTrace;
20use nested_enum_utils::common_fields;
21use proto::{ShutdownRequest, SyncDbRequest};
22use ref_cast::RefCast;
23use serde::{Deserialize, Serialize};
24use snafu::{Backtrace, IntoError, Snafu};
25use tags::Tags;
26
27pub mod blobs;
28pub mod downloader;
29pub mod proto;
30pub mod remote;
31pub mod tags;
32use crate::{api::proto::WaitIdleRequest, provider::events::ProgressError};
33pub use crate::{store::util::Tag, util::temp_tag::TempTag};
34
35pub(crate) type ApiClient = irpc::Client<proto::Request>;
36
37#[common_fields({
38    backtrace: Option<Backtrace>,
39    #[snafu(implicit)]
40    span_trace: SpanTrace,
41})]
42#[allow(missing_docs)]
43#[non_exhaustive]
44#[derive(Debug, Snafu)]
45pub enum RequestError {
46    /// Request failed due to rpc error.
47    #[snafu(display("rpc error: {source}"))]
48    Rpc { source: irpc::Error },
49    /// Request failed due an actual error.
50    #[snafu(display("inner error: {source}"))]
51    Inner { source: Error },
52}
53
54impl From<irpc::Error> for RequestError {
55    fn from(value: irpc::Error) -> Self {
56        RpcSnafu.into_error(value)
57    }
58}
59
60impl From<Error> for RequestError {
61    fn from(value: Error) -> Self {
62        InnerSnafu.into_error(value)
63    }
64}
65
66impl From<io::Error> for RequestError {
67    fn from(value: io::Error) -> Self {
68        InnerSnafu.into_error(value.into())
69    }
70}
71
72impl From<irpc::channel::mpsc::RecvError> for RequestError {
73    fn from(value: irpc::channel::mpsc::RecvError) -> Self {
74        RpcSnafu.into_error(value.into())
75    }
76}
77
78pub type RequestResult<T> = std::result::Result<T, RequestError>;
79
80#[common_fields({
81    backtrace: Option<Backtrace>,
82    #[snafu(implicit)]
83    span_trace: SpanTrace,
84})]
85#[allow(missing_docs)]
86#[non_exhaustive]
87#[derive(Debug, Snafu)]
88pub enum ExportBaoError {
89    #[snafu(display("send error: {source}"))]
90    Send { source: irpc::channel::SendError },
91    #[snafu(display("mpsc recv error: {source}"))]
92    MpscRecv {
93        source: irpc::channel::mpsc::RecvError,
94    },
95    #[snafu(display("oneshot recv error: {source}"))]
96    OneshotRecv {
97        source: irpc::channel::oneshot::RecvError,
98    },
99    #[snafu(display("request error: {source}"))]
100    Request { source: irpc::RequestError },
101    #[snafu(display("io error: {source}"))]
102    ExportBaoIo { source: io::Error },
103    #[snafu(display("encode error: {source}"))]
104    ExportBaoInner { source: bao_tree::io::EncodeError },
105    #[snafu(display("client error: {source}"))]
106    ClientError { source: ProgressError },
107}
108
109impl From<ExportBaoError> for Error {
110    fn from(e: ExportBaoError) -> Self {
111        match e {
112            ExportBaoError::Send { source, .. } => Self::Io(source.into()),
113            ExportBaoError::MpscRecv { source, .. } => Self::Io(source.into()),
114            ExportBaoError::OneshotRecv { source, .. } => Self::Io(source.into()),
115            ExportBaoError::Request { source, .. } => Self::Io(source.into()),
116            ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
117            ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
118            ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
119        }
120    }
121}
122
123impl From<irpc::Error> for ExportBaoError {
124    fn from(e: irpc::Error) -> Self {
125        match e {
126            irpc::Error::MpscRecv(e) => MpscRecvSnafu.into_error(e),
127            irpc::Error::OneshotRecv(e) => OneshotRecvSnafu.into_error(e),
128            irpc::Error::Send(e) => SendSnafu.into_error(e),
129            irpc::Error::Request(e) => RequestSnafu.into_error(e),
130            #[cfg(feature = "rpc")]
131            irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()),
132        }
133    }
134}
135
136impl From<io::Error> for ExportBaoError {
137    fn from(value: io::Error) -> Self {
138        ExportBaoIoSnafu.into_error(value)
139    }
140}
141
142impl From<irpc::channel::mpsc::RecvError> for ExportBaoError {
143    fn from(value: irpc::channel::mpsc::RecvError) -> Self {
144        MpscRecvSnafu.into_error(value)
145    }
146}
147
148impl From<irpc::channel::oneshot::RecvError> for ExportBaoError {
149    fn from(value: irpc::channel::oneshot::RecvError) -> Self {
150        OneshotRecvSnafu.into_error(value)
151    }
152}
153
154impl From<irpc::channel::SendError> for ExportBaoError {
155    fn from(value: irpc::channel::SendError) -> Self {
156        SendSnafu.into_error(value)
157    }
158}
159
160impl From<irpc::RequestError> for ExportBaoError {
161    fn from(value: irpc::RequestError) -> Self {
162        RequestSnafu.into_error(value)
163    }
164}
165
166impl From<bao_tree::io::EncodeError> for ExportBaoError {
167    fn from(value: bao_tree::io::EncodeError) -> Self {
168        ExportBaoInnerSnafu.into_error(value)
169    }
170}
171
172impl From<ProgressError> for ExportBaoError {
173    fn from(value: ProgressError) -> Self {
174        ClientSnafu.into_error(value)
175    }
176}
177
178pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
179
180#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
181pub enum Error {
182    #[serde(with = "crate::util::serde::io_error_serde")]
183    Io(io::Error),
184}
185
186impl Error {
187    pub fn io(
188        kind: io::ErrorKind,
189        msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
190    ) -> Self {
191        Self::Io(io::Error::new(kind, msg.into()))
192    }
193
194    pub fn other<E>(msg: E) -> Self
195    where
196        E: Into<Box<dyn std::error::Error + Send + Sync>>,
197    {
198        Self::Io(io::Error::other(msg.into()))
199    }
200}
201
202impl From<irpc::Error> for Error {
203    fn from(e: irpc::Error) -> Self {
204        Self::Io(e.into())
205    }
206}
207
208impl From<RequestError> for Error {
209    fn from(e: RequestError) -> Self {
210        match e {
211            RequestError::Rpc { source, .. } => Self::Io(source.into()),
212            RequestError::Inner { source, .. } => source,
213        }
214    }
215}
216
217impl From<irpc::channel::mpsc::RecvError> for Error {
218    fn from(e: irpc::channel::mpsc::RecvError) -> Self {
219        Self::Io(e.into())
220    }
221}
222
223#[cfg(feature = "rpc")]
224impl From<irpc::rpc::WriteError> for Error {
225    fn from(e: irpc::rpc::WriteError) -> Self {
226        Self::Io(e.into())
227    }
228}
229
230impl From<irpc::RequestError> for Error {
231    fn from(e: irpc::RequestError) -> Self {
232        Self::Io(e.into())
233    }
234}
235
236impl From<irpc::channel::SendError> for Error {
237    fn from(e: irpc::channel::SendError) -> Self {
238        Self::Io(e.into())
239    }
240}
241
242impl std::error::Error for Error {
243    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
244        match self {
245            Error::Io(e) => Some(e),
246        }
247    }
248}
249
250impl From<EncodeError> for Error {
251    fn from(value: EncodeError) -> Self {
252        match value {
253            EncodeError::Io(cause) => Self::Io(cause),
254            _ => Self::other(value),
255        }
256    }
257}
258
259pub type Result<T> = std::result::Result<T, Error>;
260
261/// The main entry point for the store API.
262#[derive(Debug, Clone, ref_cast::RefCast)]
263#[repr(transparent)]
264pub struct Store {
265    client: ApiClient,
266}
267
268impl Deref for Store {
269    type Target = blobs::Blobs;
270
271    fn deref(&self) -> &Self::Target {
272        blobs::Blobs::ref_from_sender(&self.client)
273    }
274}
275
276impl Store {
277    /// The tags API.
278    pub fn tags(&self) -> &Tags {
279        Tags::ref_from_sender(&self.client)
280    }
281
282    /// The blobs API.
283    pub fn blobs(&self) -> &blobs::Blobs {
284        blobs::Blobs::ref_from_sender(&self.client)
285    }
286
287    /// API for getting blobs from a *single* remote node.
288    pub fn remote(&self) -> &remote::Remote {
289        remote::Remote::ref_from_sender(&self.client)
290    }
291
292    /// Create a downloader for more complex downloads.
293    ///
294    /// Unlike the other APIs, this creates an object that has internal state,
295    /// so don't create it ad hoc but store it somewhere if you need it multiple
296    /// times.
297    pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
298        downloader::Downloader::new(self, endpoint)
299    }
300
301    /// Connect to a remote store as a rpc client.
302    #[cfg(feature = "rpc")]
303    pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self {
304        let sender = irpc::Client::quinn(endpoint, addr);
305        Store::from_sender(sender)
306    }
307
308    /// Listen on a quinn endpoint for incoming rpc connections.
309    #[cfg(feature = "rpc")]
310    pub async fn listen(self, endpoint: quinn::Endpoint) {
311        use irpc::rpc::RemoteService;
312
313        use self::proto::Request;
314        let local = self.client.as_local().unwrap().clone();
315        let handler = Request::remote_handler(local);
316        irpc::rpc::listen::<Request>(endpoint, handler).await
317    }
318
319    pub async fn sync_db(&self) -> RequestResult<()> {
320        let msg = SyncDbRequest;
321        self.client.rpc(msg).await??;
322        Ok(())
323    }
324
325    pub async fn shutdown(&self) -> irpc::Result<()> {
326        let msg = ShutdownRequest;
327        self.client.rpc(msg).await?;
328        Ok(())
329    }
330
331    /// Waits for the store to become completely idle.
332    ///
333    /// This is mostly useful for tests, where you want to check that e.g. the
334    /// store has written all data to disk.
335    ///
336    /// Note that a store is not guaranteed to become idle, if it is being
337    /// interacted with concurrently. So this might wait forever.
338    ///
339    /// Also note that once you get the callback, the store is not guaranteed to
340    /// still be idle. All this tells you that there was a point in time where
341    /// the store was idle between the call and the response.
342    pub async fn wait_idle(&self) -> irpc::Result<()> {
343        let msg = WaitIdleRequest;
344        self.client.rpc(msg).await?;
345        Ok(())
346    }
347
348    pub(crate) fn from_sender(client: ApiClient) -> Self {
349        Self { client }
350    }
351
352    pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
353        Self::ref_cast(client)
354    }
355}