iroh_docs/
net.rs

1//! Network implementation of the iroh-docs protocol
2
3use std::{future::Future, time::Duration};
4
5use iroh::{Endpoint, EndpointAddr, PublicKey};
6use n0_future::time::Instant;
7use serde::{Deserialize, Serialize};
8use tracing::{debug, error_span, trace, Instrument};
9
10use crate::{
11    actor::SyncHandle,
12    metrics::Metrics,
13    net::codec::{run_alice, BobState},
14    NamespaceId, SyncOutcome,
15};
16
/// The ALPN identifier for the iroh-docs protocol.
///
/// The wire value is the byte string `/iroh-sync/1`.
pub const ALPN: &[u8] = b"/iroh-sync/1";
19
20mod codec;
21
22/// Connect to a peer and sync a replica
23pub async fn connect_and_sync(
24    endpoint: &Endpoint,
25    sync: &SyncHandle,
26    namespace: NamespaceId,
27    peer: EndpointAddr,
28    metrics: Option<&Metrics>,
29) -> Result<SyncFinished, ConnectError> {
30    let t_start = Instant::now();
31    let peer_id = peer.id;
32    trace!("connect");
33    let connection = endpoint
34        .connect(peer, crate::ALPN)
35        .await
36        .map_err(ConnectError::connect)?;
37
38    let (mut send_stream, mut recv_stream) =
39        connection.open_bi().await.map_err(ConnectError::connect)?;
40
41    let t_connect = t_start.elapsed();
42    debug!(?t_connect, "connected");
43
44    let res = run_alice(&mut send_stream, &mut recv_stream, sync, namespace, peer_id).await;
45
46    send_stream.finish().map_err(ConnectError::close)?;
47    send_stream.stopped().await.map_err(ConnectError::close)?;
48    recv_stream
49        .read_to_end(0)
50        .await
51        .map_err(ConnectError::close)?;
52
53    if let Some(metrics) = metrics {
54        if res.is_ok() {
55            metrics.sync_via_connect_success.inc();
56        } else {
57            metrics.sync_via_connect_failure.inc();
58        }
59    }
60
61    let t_process = t_start.elapsed() - t_connect;
62    match &res {
63        Ok(res) => {
64            debug!(
65                ?t_connect,
66                ?t_process,
67                sent = %res.num_sent,
68                recv = %res.num_recv,
69                "done, ok"
70            );
71        }
72        Err(err) => {
73            debug!(?t_connect, ?t_process, ?err, "done, failed");
74        }
75    }
76
77    let outcome = res?;
78
79    let timings = Timings {
80        connect: t_connect,
81        process: t_process,
82    };
83
84    let res = SyncFinished {
85        namespace,
86        peer: peer_id,
87        outcome,
88        timings,
89    };
90
91    Ok(res)
92}
93
/// Whether we want to accept or reject an incoming sync request.
///
/// This is the output of the `accept_cb` future passed to [`handle_connection`].
#[derive(Debug, Clone)]
pub enum AcceptOutcome {
    /// Accept the sync request.
    Allow,
    /// Decline the sync request, giving the reason for declining.
    Reject(AbortReason),
}
102
103/// Handle an iroh-docs connection and sync all shared documents in the replica store.
104pub async fn handle_connection<F, Fut>(
105    sync: SyncHandle,
106    connection: iroh::endpoint::Connection,
107    accept_cb: F,
108    metrics: Option<&Metrics>,
109) -> Result<SyncFinished, AcceptError>
110where
111    F: Fn(NamespaceId, PublicKey) -> Fut,
112    Fut: Future<Output = AcceptOutcome>,
113{
114    let t_start = Instant::now();
115    let peer = connection.remote_id().map_err(AcceptError::connect)?;
116    let (mut send_stream, mut recv_stream) = connection
117        .accept_bi()
118        .await
119        .map_err(|e| AcceptError::open(peer, e))?;
120
121    let t_connect = t_start.elapsed();
122    let span = error_span!("accept", peer = %peer.fmt_short(), namespace = tracing::field::Empty);
123    span.in_scope(|| {
124        debug!(?t_connect, "connection established");
125    });
126
127    let mut state = BobState::new(peer);
128    let res = state
129        .run(&mut send_stream, &mut recv_stream, sync, accept_cb)
130        .instrument(span.clone())
131        .await;
132
133    if let Some(metrics) = metrics {
134        if res.is_ok() {
135            metrics.sync_via_accept_success.inc();
136        } else {
137            metrics.sync_via_accept_failure.inc();
138        }
139    }
140
141    let namespace = state.namespace();
142    let outcome = state.into_outcome();
143
144    send_stream
145        .finish()
146        .map_err(|error| AcceptError::close(peer, namespace, error))?;
147    send_stream
148        .stopped()
149        .await
150        .map_err(|error| AcceptError::close(peer, namespace, error))?;
151    recv_stream
152        .read_to_end(0)
153        .await
154        .map_err(|error| AcceptError::close(peer, namespace, error))?;
155
156    let t_process = t_start.elapsed() - t_connect;
157    span.in_scope(|| match &res {
158        Ok(_res) => {
159            debug!(
160                ?t_connect,
161                ?t_process,
162                sent = %outcome.num_sent,
163                recv = %outcome.num_recv,
164                "done, ok"
165            );
166        }
167        Err(err) => {
168            debug!(?t_connect, ?t_process, ?err, "done, failed");
169        }
170    });
171
172    let namespace = res?;
173
174    let timings = Timings {
175        connect: t_connect,
176        process: t_process,
177    };
178    let res = SyncFinished {
179        namespace,
180        outcome,
181        peer,
182        timings,
183    };
184
185    Ok(res)
186}
187
/// Details of a finished sync operation.
///
/// Returned by both [`connect_and_sync`] and [`handle_connection`] on success.
#[derive(Debug, Clone)]
pub struct SyncFinished {
    /// The namespace that was synced.
    pub namespace: NamespaceId,
    /// The peer we synced with.
    pub peer: PublicKey,
    /// The outcome of the sync operation.
    pub outcome: SyncOutcome,
    /// The time this operation took.
    pub timings: Timings,
}
200
/// Time a sync operation took.
#[derive(Debug, Default, Clone)]
pub struct Timings {
    /// Time to establish the connection, measured from the start of the operation
    /// until the bidirectional stream is available.
    pub connect: Duration,
    /// Time to run the sync exchange: the total elapsed time minus `connect`, measured
    /// after the streams have been shut down.
    pub process: Duration,
}
209
/// Errors that may occur on handling incoming sync connections.
///
/// Use [`AcceptError::peer`] and [`AcceptError::namespace`] to read the peer and
/// namespace without matching on the variant.
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum AcceptError {
    /// Failed to establish connection
    ///
    /// In `handle_connection` this is returned when the remote id of the connection
    /// cannot be determined, so no peer is attached.
    #[error("Failed to establish connection")]
    Connect {
        /// The underlying cause.
        #[source]
        error: anyhow::Error,
    },
    /// Failed to open replica
    ///
    /// In `handle_connection` this is returned when accepting the bidirectional stream
    /// fails.
    #[error("Failed to open replica with {peer:?}")]
    Open {
        /// The remote peer.
        peer: PublicKey,
        /// The underlying cause.
        #[source]
        error: anyhow::Error,
    },
    /// We aborted the sync request.
    #[error("Aborted sync of {namespace:?} with {peer:?}: {reason:?}")]
    Abort {
        /// The remote peer.
        peer: PublicKey,
        /// The namespace the request was for.
        namespace: NamespaceId,
        /// Why the request was aborted.
        reason: AbortReason,
    },
    /// Failed to run sync
    #[error("Failed to sync {namespace:?} with {peer:?}")]
    Sync {
        /// The remote peer.
        peer: PublicKey,
        /// The namespace being synced, if it was known when the error occurred.
        namespace: Option<NamespaceId>,
        /// The underlying cause.
        #[source]
        error: anyhow::Error,
    },
    /// Failed to close
    ///
    /// In `handle_connection` this is returned when finishing the send stream, waiting
    /// for it to be stopped, or reading the receive stream to its end fails.
    #[error("Failed to close {namespace:?} with {peer:?}")]
    Close {
        /// The remote peer.
        peer: PublicKey,
        /// The namespace being synced, if it was known when the error occurred.
        namespace: Option<NamespaceId>,
        /// The underlying cause.
        #[source]
        error: anyhow::Error,
    },
}
251
252/// Errors that may occur on outgoing sync requests.
253#[derive(thiserror::Error, Debug)]
254#[allow(missing_docs)]
255pub enum ConnectError {
256    /// Failed to establish connection
257    #[error("Failed to establish connection")]
258    Connect {
259        #[source]
260        error: anyhow::Error,
261    },
262    /// The remote peer aborted the sync request.
263    #[error("Remote peer aborted sync: {0:?}")]
264    RemoteAbort(AbortReason),
265    /// Failed to run sync
266    #[error("Failed to sync")]
267    Sync {
268        #[source]
269        error: anyhow::Error,
270    },
271    /// Failed to close
272    #[error("Failed to close connection1")]
273    Close {
274        #[source]
275        error: anyhow::Error,
276    },
277}
278
/// Reason why we aborted an incoming sync request.
///
/// The same type is carried by `ConnectError::RemoteAbort` on the connecting side.
// NOTE(review): derives Serialize/Deserialize, presumably because the reason is sent to
// the remote peer by the codec; confirm there.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum AbortReason {
    /// Namespace is not available.
    NotFound,
    /// We are already syncing this namespace.
    AlreadySyncing,
    /// We experienced an error while trying to provide the requested resource.
    InternalServerError,
}
289
290impl AcceptError {
291    fn connect(error: impl Into<anyhow::Error>) -> Self {
292        Self::Connect {
293            error: error.into(),
294        }
295    }
296    fn open(peer: PublicKey, error: impl Into<anyhow::Error>) -> Self {
297        Self::Open {
298            peer,
299            error: error.into(),
300        }
301    }
302    pub(crate) fn sync(
303        peer: PublicKey,
304        namespace: Option<NamespaceId>,
305        error: impl Into<anyhow::Error>,
306    ) -> Self {
307        Self::Sync {
308            peer,
309            namespace,
310            error: error.into(),
311        }
312    }
313    fn close(
314        peer: PublicKey,
315        namespace: Option<NamespaceId>,
316        error: impl Into<anyhow::Error>,
317    ) -> Self {
318        Self::Close {
319            peer,
320            namespace,
321            error: error.into(),
322        }
323    }
324    /// Get the peer's node ID (if available)
325    pub fn peer(&self) -> Option<PublicKey> {
326        match self {
327            AcceptError::Connect { .. } => None,
328            AcceptError::Open { peer, .. } => Some(*peer),
329            AcceptError::Sync { peer, .. } => Some(*peer),
330            AcceptError::Close { peer, .. } => Some(*peer),
331            AcceptError::Abort { peer, .. } => Some(*peer),
332        }
333    }
334
335    /// Get the namespace (if available)
336    pub fn namespace(&self) -> Option<NamespaceId> {
337        match self {
338            AcceptError::Connect { .. } => None,
339            AcceptError::Open { .. } => None,
340            AcceptError::Sync { namespace, .. } => namespace.to_owned(),
341            AcceptError::Close { namespace, .. } => namespace.to_owned(),
342            AcceptError::Abort { namespace, .. } => Some(*namespace),
343        }
344    }
345}
346
347impl ConnectError {
348    fn connect(error: impl Into<anyhow::Error>) -> Self {
349        Self::Connect {
350            error: error.into(),
351        }
352    }
353    fn close(error: impl Into<anyhow::Error>) -> Self {
354        Self::Close {
355            error: error.into(),
356        }
357    }
358    pub(crate) fn sync(error: impl Into<anyhow::Error>) -> Self {
359        Self::Sync {
360            error: error.into(),
361        }
362    }
363    pub(crate) fn remote_abort(reason: AbortReason) -> Self {
364        Self::RemoteAbort(reason)
365    }
366}