1use std::{io, ops::Deref};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use n0_snafu::SpanTrace;
20use nested_enum_utils::common_fields;
21use proto::{ShutdownRequest, SyncDbRequest};
22use ref_cast::RefCast;
23use serde::{Deserialize, Serialize};
24use snafu::{Backtrace, IntoError, Snafu};
25use tags::Tags;
26
27pub mod blobs;
28pub mod downloader;
29pub mod proto;
30pub mod remote;
31pub mod tags;
32use crate::{api::proto::WaitIdleRequest, provider::events::ProgressError};
33pub use crate::{store::util::Tag, util::temp_tag::TempTag};
34
/// Client type used throughout this module to send [`proto::Request`]
/// messages; every API facade in this file is built from a reference to it.
pub(crate) type ApiClient = irpc::Client<proto::Request>;
36
/// Error for requests that can fail either in the rpc layer or with an
/// API-level [`Error`].
///
/// The `common_fields` attribute declares the `backtrace` and `span_trace`
/// fields; patterns elsewhere in this file skip them with `..`.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[non_exhaustive]
#[derive(Debug, Snafu)]
pub enum RequestError {
    /// The failure came from the rpc layer.
    #[snafu(display("rpc error: {source}"))]
    Rpc { source: irpc::Error },
    /// The failure is an API-level [`Error`].
    #[snafu(display("inner error: {source}"))]
    Inner { source: Error },
}
53
54impl From<irpc::Error> for RequestError {
55 fn from(value: irpc::Error) -> Self {
56 RpcSnafu.into_error(value)
57 }
58}
59
60impl From<Error> for RequestError {
61 fn from(value: Error) -> Self {
62 InnerSnafu.into_error(value)
63 }
64}
65
66impl From<io::Error> for RequestError {
67 fn from(value: io::Error) -> Self {
68 InnerSnafu.into_error(value.into())
69 }
70}
71
72impl From<irpc::channel::mpsc::RecvError> for RequestError {
73 fn from(value: irpc::channel::mpsc::RecvError) -> Self {
74 RpcSnafu.into_error(value.into())
75 }
76}
77
/// Shorthand for a [`std::result::Result`] whose error is [`RequestError`].
pub type RequestResult<T> = std::result::Result<T, RequestError>;
79
/// Error type of the bao export operations (see [`ExportBaoResult`]).
///
/// Each variant wraps one underlying error as its `source`. The
/// `common_fields` attribute declares the `backtrace` and `span_trace`
/// fields; patterns elsewhere in this file skip them with `..`.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[non_exhaustive]
#[derive(Debug, Snafu)]
pub enum ExportBaoError {
    /// A channel send failed.
    #[snafu(display("send error: {source}"))]
    Send { source: irpc::channel::SendError },
    /// Receiving from an mpsc channel failed.
    #[snafu(display("mpsc recv error: {source}"))]
    MpscRecv {
        source: irpc::channel::mpsc::RecvError,
    },
    /// Receiving from a oneshot channel failed.
    #[snafu(display("oneshot recv error: {source}"))]
    OneshotRecv {
        source: irpc::channel::oneshot::RecvError,
    },
    /// The rpc request itself failed.
    #[snafu(display("request error: {source}"))]
    Request { source: irpc::RequestError },
    /// An io error occurred.
    #[snafu(display("io error: {source}"))]
    ExportBaoIo { source: io::Error },
    /// Bao encoding failed.
    #[snafu(display("encode error: {source}"))]
    ExportBaoInner { source: bao_tree::io::EncodeError },
    /// A [`ProgressError`] was reported.
    #[snafu(display("client error: {source}"))]
    ClientError { source: ProgressError },
}
108
109impl From<ExportBaoError> for Error {
110 fn from(e: ExportBaoError) -> Self {
111 match e {
112 ExportBaoError::Send { source, .. } => Self::Io(source.into()),
113 ExportBaoError::MpscRecv { source, .. } => Self::Io(source.into()),
114 ExportBaoError::OneshotRecv { source, .. } => Self::Io(source.into()),
115 ExportBaoError::Request { source, .. } => Self::Io(source.into()),
116 ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
117 ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
118 ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
119 }
120 }
121}
122
123impl From<irpc::Error> for ExportBaoError {
124 fn from(e: irpc::Error) -> Self {
125 match e {
126 irpc::Error::MpscRecv { source, .. } => MpscRecvSnafu.into_error(source),
127 irpc::Error::OneshotRecv { source, .. } => OneshotRecvSnafu.into_error(source),
128 irpc::Error::Send { source, .. } => SendSnafu.into_error(source),
129 irpc::Error::Request { source, .. } => RequestSnafu.into_error(source),
130 irpc::Error::Write { source, .. } => ExportBaoIoSnafu.into_error(source.into()),
131 }
132 }
133}
134
135impl From<io::Error> for ExportBaoError {
136 fn from(value: io::Error) -> Self {
137 ExportBaoIoSnafu.into_error(value)
138 }
139}
140
141impl From<irpc::channel::mpsc::RecvError> for ExportBaoError {
142 fn from(value: irpc::channel::mpsc::RecvError) -> Self {
143 MpscRecvSnafu.into_error(value)
144 }
145}
146
147impl From<irpc::channel::oneshot::RecvError> for ExportBaoError {
148 fn from(value: irpc::channel::oneshot::RecvError) -> Self {
149 OneshotRecvSnafu.into_error(value)
150 }
151}
152
153impl From<irpc::channel::SendError> for ExportBaoError {
154 fn from(value: irpc::channel::SendError) -> Self {
155 SendSnafu.into_error(value)
156 }
157}
158
159impl From<irpc::RequestError> for ExportBaoError {
160 fn from(value: irpc::RequestError) -> Self {
161 RequestSnafu.into_error(value)
162 }
163}
164
165impl From<bao_tree::io::EncodeError> for ExportBaoError {
166 fn from(value: bao_tree::io::EncodeError) -> Self {
167 ExportBaoInnerSnafu.into_error(value)
168 }
169}
170
171impl From<ProgressError> for ExportBaoError {
172 fn from(value: ProgressError) -> Self {
173 ClientSnafu.into_error(value)
174 }
175}
176
/// Shorthand for a [`std::result::Result`] whose error is [`ExportBaoError`].
pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
178
/// Serializable API error.
///
/// Its single variant carries an [`io::Error`]; the other error types in
/// this file are converted into it through the `From` impls below.
#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
pub enum Error {
    /// An io error, (de)serialized through `crate::util::serde::io_error_serde`.
    #[serde(with = "crate::util::serde::io_error_serde")]
    Io(io::Error),
}
184
185impl Error {
186 pub fn io(
187 kind: io::ErrorKind,
188 msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
189 ) -> Self {
190 Self::Io(io::Error::new(kind, msg.into()))
191 }
192
193 pub fn other<E>(msg: E) -> Self
194 where
195 E: Into<Box<dyn std::error::Error + Send + Sync>>,
196 {
197 Self::Io(io::Error::other(msg.into()))
198 }
199}
200
201impl From<irpc::Error> for Error {
202 fn from(e: irpc::Error) -> Self {
203 Self::Io(e.into())
204 }
205}
206
207impl From<RequestError> for Error {
208 fn from(e: RequestError) -> Self {
209 match e {
210 RequestError::Rpc { source, .. } => Self::Io(source.into()),
211 RequestError::Inner { source, .. } => source,
212 }
213 }
214}
215
216impl From<irpc::channel::mpsc::RecvError> for Error {
217 fn from(e: irpc::channel::mpsc::RecvError) -> Self {
218 Self::Io(e.into())
219 }
220}
221
/// Converts an rpc write failure into an [`io::Error`] and stores it in
/// [`Error::Io`]. Only compiled with the `rpc` feature.
#[cfg(feature = "rpc")]
impl From<irpc::rpc::WriteError> for Error {
    fn from(write_err: irpc::rpc::WriteError) -> Self {
        let io_err: io::Error = write_err.into();
        Self::Io(io_err)
    }
}
228
229impl From<irpc::RequestError> for Error {
230 fn from(e: irpc::RequestError) -> Self {
231 Self::Io(e.into())
232 }
233}
234
235impl From<irpc::channel::SendError> for Error {
236 fn from(e: irpc::channel::SendError) -> Self {
237 Self::Io(e.into())
238 }
239}
240
241impl std::error::Error for Error {
242 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
243 match self {
244 Error::Io(e) => Some(e),
245 }
246 }
247}
248
249impl From<EncodeError> for Error {
250 fn from(value: EncodeError) -> Self {
251 match value {
252 EncodeError::Io(cause) => Self::Io(cause),
253 _ => Self::other(value),
254 }
255 }
256}
257
/// Shorthand for a [`std::result::Result`] whose error is [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
259
/// Handle to a store, implemented as a thin wrapper around an [`ApiClient`].
///
/// The type is `#[repr(transparent)]` and derives `RefCast`, which is what
/// lets `ref_from_sender` turn a `&ApiClient` into a `&Store`.
#[derive(Debug, Clone, ref_cast::RefCast)]
#[repr(transparent)]
pub struct Store {
    // The only field; all requests go through this client.
    client: ApiClient,
}
266
267impl Deref for Store {
268 type Target = blobs::Blobs;
269
270 fn deref(&self) -> &Self::Target {
271 blobs::Blobs::ref_from_sender(&self.client)
272 }
273}
274
275impl Store {
276 pub fn tags(&self) -> &Tags {
278 Tags::ref_from_sender(&self.client)
279 }
280
281 pub fn blobs(&self) -> &blobs::Blobs {
283 blobs::Blobs::ref_from_sender(&self.client)
284 }
285
286 pub fn remote(&self) -> &remote::Remote {
288 remote::Remote::ref_from_sender(&self.client)
289 }
290
291 pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
297 downloader::Downloader::new(self, endpoint)
298 }
299
300 #[cfg(feature = "rpc")]
302 pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self {
303 let sender = irpc::Client::quinn(endpoint, addr);
304 Store::from_sender(sender)
305 }
306
307 #[cfg(feature = "rpc")]
309 pub async fn listen(self, endpoint: quinn::Endpoint) {
310 use irpc::rpc::RemoteService;
311
312 use self::proto::Request;
313 let local = self.client.as_local().unwrap().clone();
314 let handler = Request::remote_handler(local);
315 irpc::rpc::listen::<Request>(endpoint, handler).await
316 }
317
318 pub async fn sync_db(&self) -> RequestResult<()> {
319 let msg = SyncDbRequest;
320 self.client.rpc(msg).await??;
321 Ok(())
322 }
323
324 pub async fn shutdown(&self) -> irpc::Result<()> {
325 let msg = ShutdownRequest;
326 self.client.rpc(msg).await?;
327 Ok(())
328 }
329
330 pub async fn wait_idle(&self) -> irpc::Result<()> {
342 let msg = WaitIdleRequest;
343 self.client.rpc(msg).await?;
344 Ok(())
345 }
346
347 pub(crate) fn from_sender(client: ApiClient) -> Self {
348 Self { client }
349 }
350
351 pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
352 Self::ref_cast(client)
353 }
354}