1use std::{io, net::SocketAddr, ops::Deref};
16
17use bao_tree::io::EncodeError;
18use iroh::Endpoint;
19use irpc::rpc::{listen, RemoteService};
20use n0_error::{e, stack_error};
21use proto::{Request, ShutdownRequest, SyncDbRequest};
22use ref_cast::RefCast;
23use serde::{Deserialize, Serialize};
24use tags::Tags;
25
26pub mod blobs;
27pub mod downloader;
28pub mod proto;
29pub mod remote;
30pub mod tags;
31use crate::{api::proto::WaitIdleRequest, provider::events::ProgressError};
32pub use crate::{store::util::Tag, util::temp_tag::TempTag};
33
34pub(crate) type ApiClient = irpc::Client<proto::Request>;
35
36#[allow(missing_docs)]
37#[non_exhaustive]
38#[stack_error(derive, add_meta)]
39pub enum RequestError {
40 #[error("rpc error: {source}")]
42 Rpc { source: irpc::Error },
43 #[error("inner error: {source}")]
45 Inner {
46 #[error(std_err)]
47 source: Error,
48 },
49}
50
51impl From<irpc::Error> for RequestError {
52 fn from(value: irpc::Error) -> Self {
53 e!(RequestError::Rpc, value)
54 }
55}
56
57impl From<Error> for RequestError {
58 fn from(value: Error) -> Self {
59 e!(RequestError::Inner, value)
60 }
61}
62
63impl From<io::Error> for RequestError {
64 fn from(value: io::Error) -> Self {
65 e!(RequestError::Inner, value.into())
66 }
67}
68
69impl From<irpc::channel::mpsc::RecvError> for RequestError {
70 fn from(value: irpc::channel::mpsc::RecvError) -> Self {
71 e!(RequestError::Rpc, value.into())
72 }
73}
74
75pub type RequestResult<T> = std::result::Result<T, RequestError>;
76
77#[allow(missing_docs)]
78#[non_exhaustive]
79#[stack_error(derive, add_meta, from_sources)]
80pub enum ExportBaoError {
81 #[error("send error")]
82 Send { source: irpc::channel::SendError },
83 #[error("mpsc recv e api.acp.pro-channelsrror")]
84 MpscRecv {
85 source: irpc::channel::mpsc::RecvError,
86 },
87 #[error("oneshot recv error")]
88 OneshotRecv {
89 source: irpc::channel::oneshot::RecvError,
90 },
91 #[error("request error")]
92 Request { source: irpc::RequestError },
93 #[error("io error")]
94 ExportBaoIo {
95 #[error(std_err)]
96 source: io::Error,
97 },
98 #[error("encode error")]
99 ExportBaoInner {
100 #[error(std_err)]
101 source: bao_tree::io::EncodeError,
102 },
103 #[error("client error")]
104 ClientError { source: ProgressError },
105}
106
107impl From<ExportBaoError> for Error {
108 fn from(e: ExportBaoError) -> Self {
109 match e {
110 ExportBaoError::Send { source, .. } => Self::Io(source.into()),
111 ExportBaoError::MpscRecv { source, .. } => Self::Io(source.into()),
112 ExportBaoError::OneshotRecv { source, .. } => Self::Io(source.into()),
113 ExportBaoError::Request { source, .. } => Self::Io(source.into()),
114 ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
115 ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
116 ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
117 }
118 }
119}
120
121impl From<irpc::Error> for ExportBaoError {
122 fn from(e: irpc::Error) -> Self {
123 match e {
124 irpc::Error::MpscRecv { source: e, .. } => e!(ExportBaoError::MpscRecv, e),
125 irpc::Error::OneshotRecv { source: e, .. } => e!(ExportBaoError::OneshotRecv, e),
126 irpc::Error::Send { source: e, .. } => e!(ExportBaoError::Send, e),
127 irpc::Error::Request { source: e, .. } => e!(ExportBaoError::Request, e),
128 irpc::Error::Write { source: e, .. } => e!(ExportBaoError::ExportBaoIo, e.into()),
129 }
130 }
131}
132
133pub type ExportBaoResult<T> = std::result::Result<T, ExportBaoError>;
134
135#[derive(Serialize, Deserialize)]
136#[stack_error(derive, std_sources, from_sources)]
137pub enum Error {
138 #[serde(with = "crate::util::serde::io_error_serde")]
139 Io(#[error(source)] io::Error),
140}
141
142impl Error {
143 pub fn io(
144 kind: io::ErrorKind,
145 msg: impl Into<Box<dyn std::error::Error + Send + Sync>>,
146 ) -> Self {
147 Self::Io(io::Error::new(kind, msg.into()))
148 }
149
150 pub fn other<E>(msg: E) -> Self
151 where
152 E: Into<Box<dyn std::error::Error + Send + Sync>>,
153 {
154 Self::Io(io::Error::other(msg.into()))
155 }
156}
157
158impl From<irpc::Error> for Error {
159 fn from(e: irpc::Error) -> Self {
160 Self::Io(e.into())
161 }
162}
163
164impl From<RequestError> for Error {
165 fn from(e: RequestError) -> Self {
166 match e {
167 RequestError::Rpc { source, .. } => Self::Io(source.into()),
168 RequestError::Inner { source, .. } => source,
169 }
170 }
171}
172
173impl From<irpc::channel::mpsc::RecvError> for Error {
174 fn from(e: irpc::channel::mpsc::RecvError) -> Self {
175 Self::Io(e.into())
176 }
177}
178
179impl From<irpc::rpc::WriteError> for Error {
180 fn from(e: irpc::rpc::WriteError) -> Self {
181 Self::Io(e.into())
182 }
183}
184
185impl From<irpc::RequestError> for Error {
186 fn from(e: irpc::RequestError) -> Self {
187 Self::Io(e.into())
188 }
189}
190
191impl From<irpc::channel::SendError> for Error {
192 fn from(e: irpc::channel::SendError) -> Self {
193 Self::Io(e.into())
194 }
195}
196
197impl From<EncodeError> for Error {
198 fn from(value: EncodeError) -> Self {
199 match value {
200 EncodeError::Io(cause) => Self::Io(cause),
201 _ => Self::other(value),
202 }
203 }
204}
205
206pub type Result<T> = std::result::Result<T, Error>;
207
208#[derive(Debug, Clone, ref_cast::RefCast)]
210#[repr(transparent)]
211pub struct Store {
212 client: ApiClient,
213}
214
215impl Deref for Store {
216 type Target = blobs::Blobs;
217
218 fn deref(&self) -> &Self::Target {
219 blobs::Blobs::ref_from_sender(&self.client)
220 }
221}
222
223impl Store {
224 pub fn tags(&self) -> &Tags {
226 Tags::ref_from_sender(&self.client)
227 }
228
229 pub fn blobs(&self) -> &blobs::Blobs {
231 blobs::Blobs::ref_from_sender(&self.client)
232 }
233
234 pub fn remote(&self) -> &remote::Remote {
236 remote::Remote::ref_from_sender(&self.client)
237 }
238
239 pub fn downloader(&self, endpoint: &Endpoint) -> downloader::Downloader {
245 downloader::Downloader::new(self, endpoint)
246 }
247
248 pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self {
250 let sender = irpc::Client::quinn(endpoint, addr);
251 Store::from_sender(sender)
252 }
253
254 pub async fn listen(self, endpoint: quinn::Endpoint) {
256 let local = self.client.as_local().unwrap().clone();
257 let handler = Request::remote_handler(local);
258 listen::<Request>(endpoint, handler).await
259 }
260
261 pub async fn sync_db(&self) -> RequestResult<()> {
262 let msg = SyncDbRequest;
263 self.client.rpc(msg).await??;
264 Ok(())
265 }
266
267 pub async fn shutdown(&self) -> irpc::Result<()> {
268 let msg = ShutdownRequest;
269 self.client.rpc(msg).await?;
270 Ok(())
271 }
272
273 pub async fn wait_idle(&self) -> irpc::Result<()> {
285 let msg = WaitIdleRequest;
286 self.client.rpc(msg).await?;
287 Ok(())
288 }
289
290 pub(crate) fn from_sender(client: ApiClient) -> Self {
291 Self { client }
292 }
293
294 pub(crate) fn ref_from_sender(client: &ApiClient) -> &Self {
295 Self::ref_cast(client)
296 }
297}