1use std::{future::Future, time::Duration};
4
5use iroh::{Endpoint, EndpointAddr, PublicKey};
6use n0_future::time::Instant;
7use serde::{Deserialize, Serialize};
8use tracing::{debug, error_span, trace, Instrument};
9
10use crate::{
11 actor::SyncHandle,
12 metrics::Metrics,
13 net::codec::{run_alice, BobState},
14 NamespaceId, SyncOutcome,
15};
16
/// ALPN protocol identifier (`/iroh-sync/1`) for the sync protocol served by this module.
///
/// NOTE(review): `connect_and_sync` dials with `crate::ALPN`; presumably that is a
/// re-export of this constant — confirm.
pub const ALPN: &[u8] = b"/iroh-sync/1";

// Provides `run_alice` and `BobState`, which drive the two sides of the
// exchange in `connect_and_sync` and `handle_connection` respectively.
mod codec;
21
22pub async fn connect_and_sync(
24 endpoint: &Endpoint,
25 sync: &SyncHandle,
26 namespace: NamespaceId,
27 peer: EndpointAddr,
28 metrics: Option<&Metrics>,
29) -> Result<SyncFinished, ConnectError> {
30 let t_start = Instant::now();
31 let peer_id = peer.id;
32 trace!("connect");
33 let connection = endpoint
34 .connect(peer, crate::ALPN)
35 .await
36 .map_err(ConnectError::connect)?;
37
38 let (mut send_stream, mut recv_stream) =
39 connection.open_bi().await.map_err(ConnectError::connect)?;
40
41 let t_connect = t_start.elapsed();
42 debug!(?t_connect, "connected");
43
44 let res = run_alice(&mut send_stream, &mut recv_stream, sync, namespace, peer_id).await;
45
46 send_stream.finish().map_err(ConnectError::close)?;
47 send_stream.stopped().await.map_err(ConnectError::close)?;
48 recv_stream
49 .read_to_end(0)
50 .await
51 .map_err(ConnectError::close)?;
52
53 if let Some(metrics) = metrics {
54 if res.is_ok() {
55 metrics.sync_via_connect_success.inc();
56 } else {
57 metrics.sync_via_connect_failure.inc();
58 }
59 }
60
61 let t_process = t_start.elapsed() - t_connect;
62 match &res {
63 Ok(res) => {
64 debug!(
65 ?t_connect,
66 ?t_process,
67 sent = %res.num_sent,
68 recv = %res.num_recv,
69 "done, ok"
70 );
71 }
72 Err(err) => {
73 debug!(?t_connect, ?t_process, ?err, "done, failed");
74 }
75 }
76
77 let outcome = res?;
78
79 let timings = Timings {
80 connect: t_connect,
81 process: t_process,
82 };
83
84 let res = SyncFinished {
85 namespace,
86 peer: peer_id,
87 outcome,
88 timings,
89 };
90
91 Ok(res)
92}
93
94#[derive(Debug, Clone)]
96pub enum AcceptOutcome {
97 Allow,
99 Reject(AbortReason),
101}
102
103pub async fn handle_connection<F, Fut>(
105 sync: SyncHandle,
106 connection: iroh::endpoint::Connection,
107 accept_cb: F,
108 metrics: Option<&Metrics>,
109) -> Result<SyncFinished, AcceptError>
110where
111 F: Fn(NamespaceId, PublicKey) -> Fut,
112 Fut: Future<Output = AcceptOutcome>,
113{
114 let t_start = Instant::now();
115 let peer = connection.remote_id().map_err(AcceptError::connect)?;
116 let (mut send_stream, mut recv_stream) = connection
117 .accept_bi()
118 .await
119 .map_err(|e| AcceptError::open(peer, e))?;
120
121 let t_connect = t_start.elapsed();
122 let span = error_span!("accept", peer = %peer.fmt_short(), namespace = tracing::field::Empty);
123 span.in_scope(|| {
124 debug!(?t_connect, "connection established");
125 });
126
127 let mut state = BobState::new(peer);
128 let res = state
129 .run(&mut send_stream, &mut recv_stream, sync, accept_cb)
130 .instrument(span.clone())
131 .await;
132
133 if let Some(metrics) = metrics {
134 if res.is_ok() {
135 metrics.sync_via_accept_success.inc();
136 } else {
137 metrics.sync_via_accept_failure.inc();
138 }
139 }
140
141 let namespace = state.namespace();
142 let outcome = state.into_outcome();
143
144 send_stream
145 .finish()
146 .map_err(|error| AcceptError::close(peer, namespace, error))?;
147 send_stream
148 .stopped()
149 .await
150 .map_err(|error| AcceptError::close(peer, namespace, error))?;
151 recv_stream
152 .read_to_end(0)
153 .await
154 .map_err(|error| AcceptError::close(peer, namespace, error))?;
155
156 let t_process = t_start.elapsed() - t_connect;
157 span.in_scope(|| match &res {
158 Ok(_res) => {
159 debug!(
160 ?t_connect,
161 ?t_process,
162 sent = %outcome.num_sent,
163 recv = %outcome.num_recv,
164 "done, ok"
165 );
166 }
167 Err(err) => {
168 debug!(?t_connect, ?t_process, ?err, "done, failed");
169 }
170 });
171
172 let namespace = res?;
173
174 let timings = Timings {
175 connect: t_connect,
176 process: t_process,
177 };
178 let res = SyncFinished {
179 namespace,
180 outcome,
181 peer,
182 timings,
183 };
184
185 Ok(res)
186}
187
188#[derive(Debug, Clone)]
190pub struct SyncFinished {
191 pub namespace: NamespaceId,
193 pub peer: PublicKey,
195 pub outcome: SyncOutcome,
197 pub timings: Timings,
199}
200
/// Wall-clock durations of the two phases of a sync.
///
/// Implements `PartialEq`/`Eq` so timings can be compared directly.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Timings {
    /// Time from the start of the operation until the bidirectional stream
    /// was established.
    pub connect: Duration,
    /// Time spent after the stream was established: running the exchange and
    /// shutting the stream down.
    pub process: Duration,
}
209
210#[derive(thiserror::Error, Debug)]
212#[allow(missing_docs)]
213pub enum AcceptError {
214 #[error("Failed to establish connection")]
216 Connect {
217 #[source]
218 error: anyhow::Error,
219 },
220 #[error("Failed to open replica with {peer:?}")]
222 Open {
223 peer: PublicKey,
224 #[source]
225 error: anyhow::Error,
226 },
227 #[error("Aborted sync of {namespace:?} with {peer:?}: {reason:?}")]
229 Abort {
230 peer: PublicKey,
231 namespace: NamespaceId,
232 reason: AbortReason,
233 },
234 #[error("Failed to sync {namespace:?} with {peer:?}")]
236 Sync {
237 peer: PublicKey,
238 namespace: Option<NamespaceId>,
239 #[source]
240 error: anyhow::Error,
241 },
242 #[error("Failed to close {namespace:?} with {peer:?}")]
244 Close {
245 peer: PublicKey,
246 namespace: Option<NamespaceId>,
247 #[source]
248 error: anyhow::Error,
249 },
250}
251
252#[derive(thiserror::Error, Debug)]
254#[allow(missing_docs)]
255pub enum ConnectError {
256 #[error("Failed to establish connection")]
258 Connect {
259 #[source]
260 error: anyhow::Error,
261 },
262 #[error("Remote peer aborted sync: {0:?}")]
264 RemoteAbort(AbortReason),
265 #[error("Failed to sync")]
267 Sync {
268 #[source]
269 error: anyhow::Error,
270 },
271 #[error("Failed to close connection1")]
273 Close {
274 #[source]
275 error: anyhow::Error,
276 },
277}
278
279#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
281pub enum AbortReason {
282 NotFound,
284 AlreadySyncing,
286 InternalServerError,
288}
289
290impl AcceptError {
291 fn connect(error: impl Into<anyhow::Error>) -> Self {
292 Self::Connect {
293 error: error.into(),
294 }
295 }
296 fn open(peer: PublicKey, error: impl Into<anyhow::Error>) -> Self {
297 Self::Open {
298 peer,
299 error: error.into(),
300 }
301 }
302 pub(crate) fn sync(
303 peer: PublicKey,
304 namespace: Option<NamespaceId>,
305 error: impl Into<anyhow::Error>,
306 ) -> Self {
307 Self::Sync {
308 peer,
309 namespace,
310 error: error.into(),
311 }
312 }
313 fn close(
314 peer: PublicKey,
315 namespace: Option<NamespaceId>,
316 error: impl Into<anyhow::Error>,
317 ) -> Self {
318 Self::Close {
319 peer,
320 namespace,
321 error: error.into(),
322 }
323 }
324 pub fn peer(&self) -> Option<PublicKey> {
326 match self {
327 AcceptError::Connect { .. } => None,
328 AcceptError::Open { peer, .. } => Some(*peer),
329 AcceptError::Sync { peer, .. } => Some(*peer),
330 AcceptError::Close { peer, .. } => Some(*peer),
331 AcceptError::Abort { peer, .. } => Some(*peer),
332 }
333 }
334
335 pub fn namespace(&self) -> Option<NamespaceId> {
337 match self {
338 AcceptError::Connect { .. } => None,
339 AcceptError::Open { .. } => None,
340 AcceptError::Sync { namespace, .. } => namespace.to_owned(),
341 AcceptError::Close { namespace, .. } => namespace.to_owned(),
342 AcceptError::Abort { namespace, .. } => Some(*namespace),
343 }
344 }
345}
346
347impl ConnectError {
348 fn connect(error: impl Into<anyhow::Error>) -> Self {
349 Self::Connect {
350 error: error.into(),
351 }
352 }
353 fn close(error: impl Into<anyhow::Error>) -> Self {
354 Self::Close {
355 error: error.into(),
356 }
357 }
358 pub(crate) fn sync(error: impl Into<anyhow::Error>) -> Self {
359 Self::Sync {
360 error: error.into(),
361 }
362 }
363 pub(crate) fn remote_abort(reason: AbortReason) -> Self {
364 Self::RemoteAbort(reason)
365 }
366}