iroh_blobs/
api.rs

1//! The user facing API of the store.
2//!
3//! This API is both for interacting with an in-process store and for interacting
4//! with a remote store via rpc calls.
5//!
6//! The entry point for the api is the [`Store`] struct. There are several ways
7//! to obtain a `Store` instance: it is available via [`Deref`]
8//! from the different store implementations
9//! (e.g. [`MemStore`](crate::store::mem::MemStore)
10//! and [`FsStore`](crate::store::fs::FsStore)) as well as on the
11//! [`BlobsProtocol`](crate::BlobsProtocol) iroh protocol handler.
12//!
13//! You can also [`connect`](Store::connect) to a remote store that is listening
14//! to rpc requests.
15use std::{io, net::SocketAddr, ops::Deref};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use irpc::rpc::{listen, RemoteService};
20use n0_error::{e, stack_error};
21use proto::{Request, ShutdownRequest, SyncDbRequest};
22use ref_cast::RefCast;
23use serde::{Deserialize, Serialize};
24use tags::Tags;
25
26pub mod blobs;
27pub mod downloader;
28pub mod proto;
29pub mod remote;
30pub mod tags;
31use crate::{api::proto::WaitIdleRequest, provider::events::ProgressError};
32pub use crate::{store::util::Tag, util::temp_tag::TempTag};
33
34pub(crate) type ApiClient = irpc::Client<proto::Request>;
35
36#[allow(missing_docs)]
37#[non_exhaustive]
38#[stack_error(derive, add_meta)]
39pub enum RequestError {
40    /// Request failed due to rpc error.
41    #[error("rpc error: {source}")]
42    Rpc { source: irpc::Error },
43    /// Request failed due an actual error.
44    #[error("inner error: {source}")]
45    Inner {
46        #[error(std_err)]
47        source: Error,
48    },
49}
50
51impl From<irpc::Error> for RequestError {
52    fn from(value: irpc::Error) -> Self {
53        e!(RequestError::Rpc, value)
54    }
55}
56
57impl From<Error> for RequestError {
58    fn from(value: Error) -> Self {
59        e!(RequestError::Inner, value)
60    }
61}
62
63impl From<io::Error> for RequestError {
64    fn from(value: io::Error) -> Self {
65        e!(RequestError::Inner, value.into())
66    }
67}
68
69impl From<irpc::channel::mpsc::RecvError> for RequestError {
70    fn from(value: irpc::channel::mpsc::RecvError) -> Self {
71        e!(RequestError::Rpc, value.into())
72    }
73}
74
75pub type RequestResult<T> = std::result::Result<T, RequestError>;
76
77#[allow(missing_docs)]
78#[non_exhaustive]
79#[stack_error(derive, add_meta, from_sources)]
80pub enum ExportBaoError {
81    #[error("send error")]
82    Send { source: irpc::channel::SendError },
83    #[error("mpsc recv e api.acp.pro-channelsrror")]
84    MpscRecv {
85        source: irpc::channel::mpsc::RecvError,
86    },
87    #[error("oneshot recv error")]
88    OneshotRecv {
89        source: irpc::channel::oneshot::RecvError,
90    },
91    #[error("request error")]
92    Request { source: irpc::RequestError },
93    #[error("io error")]
94    ExportBaoIo {
95        #[error(std_err)]
96        source: io::Error,
97    },
98    #[error("encode error")]
99    ExportBaoInner {
100        #[error(std_err)]
101        source: bao_tree::io::EncodeError,
102    },
103    #[error("client error")]
104    ClientError { source: ProgressError },
105}
106
107impl From<ExportBaoError> for Error {
108    fn from(e: ExportBaoError) -> Self {
109        match e {
110            ExportBaoError::Send { source, .. } => Self::Io(source.into()),
111            ExportBaoError::MpscRecv { source, .. } => Self::Io(source.into()),
112            ExportBaoError::OneshotRecv { source, .. } => Self::Io(source.into()),
113            ExportBaoError::Request { source, .. } => Self::Io(source.into()),
114            ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
115            ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
116            ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
117        }
118    }
119}
120
121impl From<irpc::Error> for ExportBaoError {
122    fn from(e: irpc::Error) -> Self {
123        match e {
124            irpc::Error::MpscRecv { source: e, .. } => e!(ExportBaoError::MpscRecv, e),
125            irpc::Error::OneshotRecv { source: e, .. } => e!(ExportBaoError::OneshotRecv, e),
126            irpc::Error::Send { source: e, .. } => e!(ExportBaoError::Send, e),
127            irpc::Error::Request { source: e, .. } => e!(ExportBaoError::Request, e),
128            irpc::Error::Write { source: e, .. } => e!(ExportBaoError::ExportBaoIo, e.into()),
129        }
130    }
131}
132
133pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
134
135#[derive(Serialize, Deserialize)]
136#[stack_error(derive, std_sources, from_sources)]
137pub enum Error {
138    #[serde(with = "crate::util::serde::io_error_serde")]
139    Io(#[error(source)] io::Error),
140}
141
142impl Error {
143    pub fn io(
144        kind: io::ErrorKind,
145        msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
146    ) -> Self {
147        Self::Io(io::Error::new(kind, msg.into()))
148    }
149
150    pub fn other<E>(msg: E) -> Self
151    where
152        E: Into<Box<dyn std::error::Error + Send + Sync>>,
153    {
154        Self::Io(io::Error::other(msg.into()))
155    }
156}
157
158impl From<irpc::Error> for Error {
159    fn from(e: irpc::Error) -> Self {
160        Self::Io(e.into())
161    }
162}
163
164impl From<RequestError> for Error {
165    fn from(e: RequestError) -> Self {
166        match e {
167            RequestError::Rpc { source, .. } => Self::Io(source.into()),
168            RequestError::Inner { source, .. } => source,
169        }
170    }
171}
172
173impl From<irpc::channel::mpsc::RecvError> for Error {
174    fn from(e: irpc::channel::mpsc::RecvError) -> Self {
175        Self::Io(e.into())
176    }
177}
178
179impl From<irpc::rpc::WriteError> for Error {
180    fn from(e: irpc::rpc::WriteError) -> Self {
181        Self::Io(e.into())
182    }
183}
184
185impl From<irpc::RequestError> for Error {
186    fn from(e: irpc::RequestError) -> Self {
187        Self::Io(e.into())
188    }
189}
190
191impl From<irpc::channel::SendError> for Error {
192    fn from(e: irpc::channel::SendError) -> Self {
193        Self::Io(e.into())
194    }
195}
196
197impl From<EncodeError> for Error {
198    fn from(value: EncodeError) -> Self {
199        match value {
200            EncodeError::Io(cause) => Self::Io(cause),
201            _ => Self::other(value),
202        }
203    }
204}
205
206pub type Result<T> = std::result::Result<T, Error>;
207
208/// The main entry point for the store API.
209#[derive(Debug, Clone, ref_cast::RefCast)]
210#[repr(transparent)]
211pub struct Store {
212    client: ApiClient,
213}
214
215impl Deref for Store {
216    type Target = blobs::Blobs;
217
218    fn deref(&self) -> &Self::Target {
219        blobs::Blobs::ref_from_sender(&self.client)
220    }
221}
222
223impl Store {
224    /// The tags API.
225    pub fn tags(&self) -> &Tags {
226        Tags::ref_from_sender(&self.client)
227    }
228
229    /// The blobs API.
230    pub fn blobs(&self) -> &blobs::Blobs {
231        blobs::Blobs::ref_from_sender(&self.client)
232    }
233
234    /// API for getting blobs from a *single* remote node.
235    pub fn remote(&self) -> &remote::Remote {
236        remote::Remote::ref_from_sender(&self.client)
237    }
238
239    /// Create a downloader for more complex downloads.
240    ///
241    /// Unlike the other APIs, this creates an object that has internal state,
242    /// so don't create it ad hoc but store it somewhere if you need it multiple
243    /// times.
244    pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
245        downloader::Downloader::new(self, endpoint)
246    }
247
248    /// Connect to a remote store as a rpc client.
249    pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self {
250        let sender = irpc::Client::quinn(endpoint, addr);
251        Store::from_sender(sender)
252    }
253
254    /// Listen on a quinn endpoint for incoming rpc connections.
255    pub async fn listen(self, endpoint: quinn::Endpoint) {
256        let local = self.client.as_local().unwrap().clone();
257        let handler = Request::remote_handler(local);
258        listen::<Request>(endpoint, handler).await
259    }
260
261    pub async fn sync_db(&self) -> RequestResult<()> {
262        let msg = SyncDbRequest;
263        self.client.rpc(msg).await??;
264        Ok(())
265    }
266
267    pub async fn shutdown(&self) -> irpc::Result<()> {
268        let msg = ShutdownRequest;
269        self.client.rpc(msg).await?;
270        Ok(())
271    }
272
273    /// Waits for the store to become completely idle.
274    ///
275    /// This is mostly useful for tests, where you want to check that e.g. the
276    /// store has written all data to disk.
277    ///
278    /// Note that a store is not guaranteed to become idle, if it is being
279    /// interacted with concurrently. So this might wait forever.
280    ///
281    /// Also note that once you get the callback, the store is not guaranteed to
282    /// still be idle. All this tells you that there was a point in time where
283    /// the store was idle between the call and the response.
284    pub async fn wait_idle(&self) -> irpc::Result<()> {
285        let msg = WaitIdleRequest;
286        self.client.rpc(msg).await?;
287        Ok(())
288    }
289
290    pub(crate) fn from_sender(client: ApiClient) -> Self {
291        Self { client }
292    }
293
294    pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
295        Self::ref_cast(client)
296    }
297}