1use std::{io, ops::Deref};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use n0_snafu::SpanTrace;
20use nested_enum_utils::common_fields;
21use proto::{ShutdownRequest, SyncDbRequest};
22use ref_cast::RefCast;
23use serde::{Deserialize, Serialize};
24use snafu::{Backtrace, IntoError, Snafu};
25use tags::Tags;
26
27pub mod blobs;
28pub mod downloader;
29pub mod proto;
30pub mod remote;
31pub mod tags;
32use crate::{api::proto::WaitIdleRequest, provider::events::ProgressError};
33pub use crate::{store::util::Tag, util::temp_tag::TempTag};
34
/// Crate-internal shorthand for the irpc client specialised to [`proto::Request`].
pub(crate) type ApiClient = irpc::Client<proto::Request>;
36
/// Error for a request that can fail either in the rpc layer or with an API-level [`Error`].
///
/// The `common_fields` attribute lists a `backtrace` and an implicitly captured
/// `span_trace`; conversions elsewhere in this file match variants with `{ source, .. }`,
/// so every variant carries more than just `source`.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[non_exhaustive]
#[derive(Debug, Snafu)]
pub enum RequestError {
    /// Failure reported by irpc (source is an [`irpc::Error`]).
    #[snafu(display("rpc error: {source}"))]
    Rpc { source: irpc::Error },
    /// Failure expressed as this module's own [`Error`] type.
    #[snafu(display("inner error: {source}"))]
    Inner { source: Error },
}
53
54impl From<irpc::Error> for RequestError {
55 fn from(value: irpc::Error) -> Self {
56 RpcSnafu.into_error(value)
57 }
58}
59
60impl From<Error> for RequestError {
61 fn from(value: Error) -> Self {
62 InnerSnafu.into_error(value)
63 }
64}
65
66impl From<io::Error> for RequestError {
67 fn from(value: io::Error) -> Self {
68 InnerSnafu.into_error(value.into())
69 }
70}
71
72impl From<irpc::channel::mpsc::RecvError> for RequestError {
73 fn from(value: irpc::channel::mpsc::RecvError) -> Self {
74 RpcSnafu.into_error(value.into())
75 }
76}
77
/// Result alias whose error type is [`RequestError`].
pub type RequestResult<T> = std::result::Result<T, RequestError>;
79
/// Error type used by the bao export code paths (see [`ExportBaoResult`]).
///
/// Each variant wraps one underlying error as `source`. The `common_fields` attribute
/// lists a `backtrace` and an implicitly captured `span_trace` alongside it.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[non_exhaustive]
#[derive(Debug, Snafu)]
pub enum ExportBaoError {
    /// An irpc channel send failed.
    #[snafu(display("send error: {source}"))]
    Send { source: irpc::channel::SendError },
    /// Receiving from an irpc mpsc channel failed.
    #[snafu(display("mpsc recv error: {source}"))]
    MpscRecv {
        source: irpc::channel::mpsc::RecvError,
    },
    /// Receiving from an irpc oneshot channel failed.
    #[snafu(display("oneshot recv error: {source}"))]
    OneshotRecv {
        source: irpc::channel::oneshot::RecvError,
    },
    /// An irpc request error.
    #[snafu(display("request error: {source}"))]
    Request { source: irpc::RequestError },
    /// A plain io error.
    #[snafu(display("io error: {source}"))]
    ExportBaoIo { source: io::Error },
    /// A bao encode error.
    #[snafu(display("encode error: {source}"))]
    ExportBaoInner { source: bao_tree::io::EncodeError },
    /// A progress error (displayed as "client error").
    #[snafu(display("client error: {source}"))]
    ClientError { source: ProgressError },
}
108
109impl From<ExportBaoError> for Error {
110 fn from(e: ExportBaoError) -> Self {
111 match e {
112 ExportBaoError::Send { source, .. } => Self::Io(source.into()),
113 ExportBaoError::MpscRecv { source, .. } => Self::Io(source.into()),
114 ExportBaoError::OneshotRecv { source, .. } => Self::Io(source.into()),
115 ExportBaoError::Request { source, .. } => Self::Io(source.into()),
116 ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
117 ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
118 ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
119 }
120 }
121}
122
123impl From<irpc::Error> for ExportBaoError {
124 fn from(e: irpc::Error) -> Self {
125 match e {
126 irpc::Error::MpscRecv(e) => MpscRecvSnafu.into_error(e),
127 irpc::Error::OneshotRecv(e) => OneshotRecvSnafu.into_error(e),
128 irpc::Error::Send(e) => SendSnafu.into_error(e),
129 irpc::Error::Request(e) => RequestSnafu.into_error(e),
130 #[cfg(feature = "rpc")]
131 irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()),
132 }
133 }
134}
135
136impl From<io::Error> for ExportBaoError {
137 fn from(value: io::Error) -> Self {
138 ExportBaoIoSnafu.into_error(value)
139 }
140}
141
142impl From<irpc::channel::mpsc::RecvError> for ExportBaoError {
143 fn from(value: irpc::channel::mpsc::RecvError) -> Self {
144 MpscRecvSnafu.into_error(value)
145 }
146}
147
148impl From<irpc::channel::oneshot::RecvError> for ExportBaoError {
149 fn from(value: irpc::channel::oneshot::RecvError) -> Self {
150 OneshotRecvSnafu.into_error(value)
151 }
152}
153
154impl From<irpc::channel::SendError> for ExportBaoError {
155 fn from(value: irpc::channel::SendError) -> Self {
156 SendSnafu.into_error(value)
157 }
158}
159
160impl From<irpc::RequestError> for ExportBaoError {
161 fn from(value: irpc::RequestError) -> Self {
162 RequestSnafu.into_error(value)
163 }
164}
165
166impl From<bao_tree::io::EncodeError> for ExportBaoError {
167 fn from(value: bao_tree::io::EncodeError) -> Self {
168 ExportBaoInnerSnafu.into_error(value)
169 }
170}
171
172impl From<ProgressError> for ExportBaoError {
173 fn from(value: ProgressError) -> Self {
174 ClientSnafu.into_error(value)
175 }
176}
177
/// Result alias whose error type is [`ExportBaoError`].
pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
179
/// Serializable API error type.
///
/// It has a single variant wrapping an [`io::Error`]; the other error types in this
/// file are converted into that variant by the `From` impls.
#[derive(Debug, derive_more::Display, derive_more::From, Serialize, Deserialize)]
pub enum Error {
    /// An io error, (de)serialized through `crate::util::serde::io_error_serde`.
    #[serde(with = "crate::util::serde::io_error_serde")]
    Io(io::Error),
}
185
186impl Error {
187 pub fn io(
188 kind: io::ErrorKind,
189 msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
190 ) -> Self {
191 Self::Io(io::Error::new(kind, msg.into()))
192 }
193
194 pub fn other<E>(msg: E) -> Self
195 where
196 E: Into<Box<dyn std::error::Error + Send + Sync>>,
197 {
198 Self::Io(io::Error::other(msg.into()))
199 }
200}
201
202impl From<irpc::Error> for Error {
203 fn from(e: irpc::Error) -> Self {
204 Self::Io(e.into())
205 }
206}
207
208impl From<RequestError> for Error {
209 fn from(e: RequestError) -> Self {
210 match e {
211 RequestError::Rpc { source, .. } => Self::Io(source.into()),
212 RequestError::Inner { source, .. } => source,
213 }
214 }
215}
216
217impl From<irpc::channel::mpsc::RecvError> for Error {
218 fn from(e: irpc::channel::mpsc::RecvError) -> Self {
219 Self::Io(e.into())
220 }
221}
222
/// Converts an rpc write error into an [`io::Error`] and stores it as [`Error::Io`].
///
/// Only compiled with the `rpc` feature.
#[cfg(feature = "rpc")]
impl From<irpc::rpc::WriteError> for Error {
    fn from(write_err: irpc::rpc::WriteError) -> Self {
        Error::Io(io::Error::from(write_err))
    }
}
229
230impl From<irpc::RequestError> for Error {
231 fn from(e: irpc::RequestError) -> Self {
232 Self::Io(e.into())
233 }
234}
235
236impl From<irpc::channel::SendError> for Error {
237 fn from(e: irpc::channel::SendError) -> Self {
238 Self::Io(e.into())
239 }
240}
241
242impl std::error::Error for Error {
243 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
244 match self {
245 Error::Io(e) => Some(e),
246 }
247 }
248}
249
250impl From<EncodeError> for Error {
251 fn from(value: EncodeError) -> Self {
252 match value {
253 EncodeError::Io(cause) => Self::Io(cause),
254 _ => Self::other(value),
255 }
256 }
257}
258
/// Result alias whose error type is this module's [`Error`].
///
/// Note that within this module it shadows the prelude `Result`.
pub type Result<T> = std::result::Result<T, Error>;
260
/// Handle to a store, wrapping an [`ApiClient`].
///
/// The struct is `#[repr(transparent)]` and derives `RefCast`, which is what allows
/// `Store::ref_from_sender` to reinterpret a `&ApiClient` as a `&Store`.
#[derive(Debug, Clone, ref_cast::RefCast)]
#[repr(transparent)]
pub struct Store {
    // The irpc client all requests are sent through.
    client: ApiClient,
}
267
268impl Deref for Store {
269 type Target = blobs::Blobs;
270
271 fn deref(&self) -> &Self::Target {
272 blobs::Blobs::ref_from_sender(&self.client)
273 }
274}
275
276impl Store {
277 pub fn tags(&self) -> &Tags {
279 Tags::ref_from_sender(&self.client)
280 }
281
282 pub fn blobs(&self) -> &blobs::Blobs {
284 blobs::Blobs::ref_from_sender(&self.client)
285 }
286
287 pub fn remote(&self) -> &remote::Remote {
289 remote::Remote::ref_from_sender(&self.client)
290 }
291
292 pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
298 downloader::Downloader::new(self, endpoint)
299 }
300
301 #[cfg(feature = "rpc")]
303 pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self {
304 let sender = irpc::Client::quinn(endpoint, addr);
305 Store::from_sender(sender)
306 }
307
308 #[cfg(feature = "rpc")]
310 pub async fn listen(self, endpoint: quinn::Endpoint) {
311 use irpc::rpc::RemoteService;
312
313 use self::proto::Request;
314 let local = self.client.as_local().unwrap().clone();
315 let handler = Request::remote_handler(local);
316 irpc::rpc::listen::<Request>(endpoint, handler).await
317 }
318
319 pub async fn sync_db(&self) -> RequestResult<()> {
320 let msg = SyncDbRequest;
321 self.client.rpc(msg).await??;
322 Ok(())
323 }
324
325 pub async fn shutdown(&self) -> irpc::Result<()> {
326 let msg = ShutdownRequest;
327 self.client.rpc(msg).await?;
328 Ok(())
329 }
330
331 pub async fn wait_idle(&self) -> irpc::Result<()> {
343 let msg = WaitIdleRequest;
344 self.client.rpc(msg).await?;
345 Ok(())
346 }
347
348 pub(crate) fn from_sender(client: ApiClient) -> Self {
349 Self { client }
350 }
351
352 pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
353 Self::ref_cast(client)
354 }
355}