Skip to main content

iroh_blobs/
api.rs

1//! The user facing API of the store.
2//!
3//! This API is both for interacting with an in-process store and for interacting
4//! with a remote store via rpc calls.
5//!
6//! The entry point for the api is the [`Store`] struct. There are several ways
7//! to obtain a `Store` instance: it is available via [`Deref`]
8//! from the different store implementations
9//! (e.g. [`MemStore`](crate::store::mem::MemStore)
10//! and [`FsStore`](crate::store::fs::FsStore)) as well as on the
11//! [`BlobsProtocol`](crate::BlobsProtocol) iroh protocol handler.
12//!
13//! You can also [`connect`](Store::connect) to a remote store that is listening
14//! to rpc requests.
15use std::{io, ops::Deref};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use n0_error::{e, stack_error};
20use proto::{ShutdownRequest, SyncDbRequest};
21use ref_cast::RefCast;
22use serde::{Deserialize, Serialize};
23use tags::Tags;
24
25pub mod blobs;
26pub mod downloader;
27pub mod proto;
28pub mod remote;
29pub mod tags;
30use crate::{api::proto::WaitIdleRequest, provider::events::ProgressError};
31pub use crate::{store::util::Tag, util::temp_tag::TempTag};
32
33pub(crate) type ApiClient = irpc::Client<proto::Request>;
34
35#[allow(missing_docs)]
36#[non_exhaustive]
37#[stack_error(derive, add_meta)]
38pub enum RequestError {
39    /// Request failed due to rpc error.
40    #[error("rpc error: {source}")]
41    Rpc { source: irpc::Error },
42    /// Request failed due an actual error.
43    #[error("inner error: {source}")]
44    Inner {
45        #[error(std_err)]
46        source: Error,
47    },
48}
49
50impl From<irpc::Error> for RequestError {
51    fn from(value: irpc::Error) -> Self {
52        e!(RequestError::Rpc, value)
53    }
54}
55
56impl From<Error> for RequestError {
57    fn from(value: Error) -> Self {
58        e!(RequestError::Inner, value)
59    }
60}
61
62impl From<io::Error> for RequestError {
63    fn from(value: io::Error) -> Self {
64        e!(RequestError::Inner, value.into())
65    }
66}
67
68impl From<irpc::channel::mpsc::RecvError> for RequestError {
69    fn from(value: irpc::channel::mpsc::RecvError) -> Self {
70        e!(RequestError::Rpc, value.into())
71    }
72}
73
74pub type RequestResult<T> = std::result::Result<T, RequestError>;
75
76#[allow(missing_docs)]
77#[non_exhaustive]
78#[stack_error(derive, add_meta, from_sources)]
79pub enum ExportBaoError {
80    #[error("send error")]
81    Send { source: irpc::channel::SendError },
82    #[error("mpsc recv e api.acp.pro-channelsrror")]
83    MpscRecv {
84        source: irpc::channel::mpsc::RecvError,
85    },
86    #[error("oneshot recv error")]
87    OneshotRecv {
88        source: irpc::channel::oneshot::RecvError,
89    },
90    #[error("request error")]
91    Request { source: irpc::RequestError },
92    #[error("io error")]
93    ExportBaoIo {
94        #[error(std_err)]
95        source: io::Error,
96    },
97    #[error("encode error")]
98    ExportBaoInner {
99        #[error(std_err)]
100        source: bao_tree::io::EncodeError,
101    },
102    #[error("client error")]
103    ClientError { source: ProgressError },
104}
105
106impl From<ExportBaoError> for Error {
107    fn from(e: ExportBaoError) -> Self {
108        match e {
109            ExportBaoError::Send { source, .. } => Self::Io(source.into()),
110            ExportBaoError::MpscRecv { source, .. } => Self::Io(source.into()),
111            ExportBaoError::OneshotRecv { source, .. } => Self::Io(source.into()),
112            ExportBaoError::Request { source, .. } => Self::Io(source.into()),
113            ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
114            ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
115            ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
116        }
117    }
118}
119
120impl From<irpc::Error> for ExportBaoError {
121    fn from(e: irpc::Error) -> Self {
122        match e {
123            irpc::Error::MpscRecv { source: e, .. } => e!(ExportBaoError::MpscRecv, e),
124            irpc::Error::OneshotRecv { source: e, .. } => e!(ExportBaoError::OneshotRecv, e),
125            irpc::Error::Send { source: e, .. } => e!(ExportBaoError::Send, e),
126            irpc::Error::Request { source: e, .. } => e!(ExportBaoError::Request, e),
127            #[cfg(feature = "rpc")]
128            #[cfg_attr(iroh_blobs_docsrs, doc(cfg(feature = "rpc")))]
129            irpc::Error::Write { source: e, .. } => e!(ExportBaoError::ExportBaoIo, e.into()),
130        }
131    }
132}
133
134pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
135
136#[derive(Serialize, Deserialize)]
137#[stack_error(derive, std_sources, from_sources)]
138pub enum Error {
139    #[serde(with = "crate::util::serde::io_error_serde")]
140    Io(#[error(source)] io::Error),
141}
142
143impl Error {
144    pub fn io(
145        kind: io::ErrorKind,
146        msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
147    ) -> Self {
148        Self::Io(io::Error::new(kind, msg.into()))
149    }
150
151    pub fn other<E>(msg: E) -> Self
152    where
153        E: Into<Box<dyn std::error::Error + Send + Sync>>,
154    {
155        Self::Io(io::Error::other(msg.into()))
156    }
157}
158
159impl From<irpc::Error> for Error {
160    fn from(e: irpc::Error) -> Self {
161        Self::Io(e.into())
162    }
163}
164
165impl From<RequestError> for Error {
166    fn from(e: RequestError) -> Self {
167        match e {
168            RequestError::Rpc { source, .. } => Self::Io(source.into()),
169            RequestError::Inner { source, .. } => source,
170        }
171    }
172}
173
174impl From<irpc::channel::mpsc::RecvError> for Error {
175    fn from(e: irpc::channel::mpsc::RecvError) -> Self {
176        Self::Io(e.into())
177    }
178}
179
180#[cfg(feature = "rpc")]
181#[cfg_attr(iroh_blobs_docsrs, doc(cfg(feature = "rpc")))]
182impl From<irpc::rpc::WriteError> for Error {
183    fn from(e: irpc::rpc::WriteError) -> Self {
184        Self::Io(e.into())
185    }
186}
187
188impl From<irpc::RequestError> for Error {
189    fn from(e: irpc::RequestError) -> Self {
190        Self::Io(e.into())
191    }
192}
193
194impl From<irpc::channel::SendError> for Error {
195    fn from(e: irpc::channel::SendError) -> Self {
196        Self::Io(e.into())
197    }
198}
199
200impl From<EncodeError> for Error {
201    fn from(value: EncodeError) -> Self {
202        match value {
203            EncodeError::Io(cause) => Self::Io(cause),
204            _ => Self::Io(io::Error::other(value)),
205        }
206    }
207}
208
209pub type Result<T> = std::result::Result<T, Error>;
210
211/// The main entry point for the store API.
212#[derive(Debug, Clone, ref_cast::RefCast)]
213#[repr(transparent)]
214pub struct Store {
215    client: ApiClient,
216}
217
218impl Deref for Store {
219    type Target = blobs::Blobs;
220
221    fn deref(&self) -> &Self::Target {
222        blobs::Blobs::ref_from_sender(&self.client)
223    }
224}
225
226impl Store {
227    /// The tags API.
228    pub fn tags(&self) -> &Tags {
229        Tags::ref_from_sender(&self.client)
230    }
231
232    /// The blobs API.
233    pub fn blobs(&self) -> &blobs::Blobs {
234        blobs::Blobs::ref_from_sender(&self.client)
235    }
236
237    /// API for getting blobs from a *single* remote node.
238    pub fn remote(&self) -> &remote::Remote {
239        remote::Remote::ref_from_sender(&self.client)
240    }
241
242    /// Create a downloader for more complex downloads.
243    ///
244    /// Unlike the other APIs, this creates an object that has internal state,
245    /// so don't create it ad hoc but store it somewhere if you need it multiple
246    /// times.
247    pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
248        downloader::Downloader::new(self, endpoint)
249    }
250
251    /// Connect to a remote store as a rpc client.
252    #[cfg(feature = "rpc")]
253    #[cfg_attr(iroh_blobs_docsrs, doc(cfg(feature = "rpc")))]
254    pub fn connect(endpoint: noq::Endpoint, addr: std::net::SocketAddr) -> Self {
255        let sender = irpc::Client::noq(endpoint, addr);
256        Store::from_sender(sender)
257    }
258
259    /// Listen on a noq endpoint for incoming rpc connections.
260    #[cfg(feature = "rpc")]
261    #[cfg_attr(iroh_blobs_docsrs, doc(cfg(feature = "rpc")))]
262    pub async fn listen(self, endpoint: noq::Endpoint) {
263        use irpc::rpc::RemoteService;
264
265        use self::proto::Request;
266        let local = self.client.as_local().unwrap().clone();
267        let handler = Request::remote_handler(local);
268        irpc::rpc::listen::<Request>(endpoint, handler).await
269    }
270
271    pub async fn sync_db(&self) -> RequestResult<()> {
272        let msg = SyncDbRequest;
273        self.client.rpc(msg).await??;
274        Ok(())
275    }
276
277    pub async fn shutdown(&self) -> irpc::Result<()> {
278        let msg = ShutdownRequest;
279        self.client.rpc(msg).await?;
280        Ok(())
281    }
282
283    /// Waits for the store to become completely idle.
284    ///
285    /// This is mostly useful for tests, where you want to check that e.g. the
286    /// store has written all data to disk.
287    ///
288    /// Note that a store is not guaranteed to become idle, if it is being
289    /// interacted with concurrently. So this might wait forever.
290    ///
291    /// Also note that once you get the callback, the store is not guaranteed to
292    /// still be idle. All this tells you that there was a point in time where
293    /// the store was idle between the call and the response.
294    pub async fn wait_idle(&self) -> irpc::Result<()> {
295        let msg = WaitIdleRequest;
296        self.client.rpc(msg).await?;
297        Ok(())
298    }
299
300    pub(crate) fn from_sender(client: ApiClient) -> Self {
301        Self { client }
302    }
303
304    pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
305        Self::ref_cast(client)
306    }
307}