use std::{
future::Future,
time::{Duration, Instant},
};
use iroh::{Endpoint, NodeAddr, PublicKey};
#[cfg(feature = "metrics")]
use iroh_metrics::inc;
use serde::{Deserialize, Serialize};
use tracing::{debug, error_span, trace, Instrument};
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
use crate::{
actor::SyncHandle,
net::codec::{run_alice, BobState},
NamespaceId, SyncOutcome,
};
pub const ALPN: &[u8] = b"/iroh-sync/1";
mod codec;
pub async fn connect_and_sync(
endpoint: &Endpoint,
sync: &SyncHandle,
namespace: NamespaceId,
peer: NodeAddr,
) -> Result<SyncFinished, ConnectError> {
let t_start = Instant::now();
let peer_id = peer.node_id;
trace!("connect");
let connection = endpoint
.connect(peer, crate::ALPN)
.await
.map_err(ConnectError::connect)?;
let (mut send_stream, mut recv_stream) =
connection.open_bi().await.map_err(ConnectError::connect)?;
let t_connect = t_start.elapsed();
debug!(?t_connect, "connected");
let res = run_alice(&mut send_stream, &mut recv_stream, sync, namespace, peer_id).await;
send_stream.finish().map_err(ConnectError::close)?;
send_stream.stopped().await.map_err(ConnectError::close)?;
recv_stream
.read_to_end(0)
.await
.map_err(ConnectError::close)?;
#[cfg(feature = "metrics")]
if res.is_ok() {
inc!(Metrics, sync_via_connect_success);
} else {
inc!(Metrics, sync_via_connect_failure);
}
let t_process = t_start.elapsed() - t_connect;
match &res {
Ok(res) => {
debug!(
?t_connect,
?t_process,
sent = %res.num_sent,
recv = %res.num_recv,
"done, ok"
);
}
Err(err) => {
debug!(?t_connect, ?t_process, ?err, "done, failed");
}
}
let outcome = res?;
let timings = Timings {
connect: t_connect,
process: t_process,
};
let res = SyncFinished {
namespace,
peer: peer_id,
outcome,
timings,
};
Ok(res)
}
#[derive(Debug, Clone)]
pub enum AcceptOutcome {
Allow,
Reject(AbortReason),
}
pub async fn handle_connection<F, Fut>(
sync: SyncHandle,
connection: iroh::endpoint::Connection,
accept_cb: F,
) -> Result<SyncFinished, AcceptError>
where
F: Fn(NamespaceId, PublicKey) -> Fut,
Fut: Future<Output = AcceptOutcome>,
{
let t_start = Instant::now();
let peer = connection.remote_node_id().map_err(AcceptError::connect)?;
let (mut send_stream, mut recv_stream) = connection
.accept_bi()
.await
.map_err(|e| AcceptError::open(peer, e))?;
let t_connect = t_start.elapsed();
let span = error_span!("accept", peer = %peer.fmt_short(), namespace = tracing::field::Empty);
span.in_scope(|| {
debug!(?t_connect, "connection established");
});
let mut state = BobState::new(peer);
let res = state
.run(&mut send_stream, &mut recv_stream, sync, accept_cb)
.instrument(span.clone())
.await;
#[cfg(feature = "metrics")]
if res.is_ok() {
inc!(Metrics, sync_via_accept_success);
} else {
inc!(Metrics, sync_via_accept_failure);
}
let namespace = state.namespace();
let outcome = state.into_outcome();
send_stream
.finish()
.map_err(|error| AcceptError::close(peer, namespace, error))?;
send_stream
.stopped()
.await
.map_err(|error| AcceptError::close(peer, namespace, error))?;
recv_stream
.read_to_end(0)
.await
.map_err(|error| AcceptError::close(peer, namespace, error))?;
let t_process = t_start.elapsed() - t_connect;
span.in_scope(|| match &res {
Ok(_res) => {
debug!(
?t_connect,
?t_process,
sent = %outcome.num_sent,
recv = %outcome.num_recv,
"done, ok"
);
}
Err(err) => {
debug!(?t_connect, ?t_process, ?err, "done, failed");
}
});
let namespace = res?;
let timings = Timings {
connect: t_connect,
process: t_process,
};
let res = SyncFinished {
namespace,
outcome,
peer,
timings,
};
Ok(res)
}
#[derive(Debug, Clone)]
pub struct SyncFinished {
pub namespace: NamespaceId,
pub peer: PublicKey,
pub outcome: SyncOutcome,
pub timings: Timings,
}
#[derive(Debug, Default, Clone)]
pub struct Timings {
pub connect: Duration,
pub process: Duration,
}
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum AcceptError {
#[error("Failed to establish connection")]
Connect {
#[source]
error: anyhow::Error,
},
#[error("Failed to open replica with {peer:?}")]
Open {
peer: PublicKey,
#[source]
error: anyhow::Error,
},
#[error("Aborted sync of {namespace:?} with {peer:?}: {reason:?}")]
Abort {
peer: PublicKey,
namespace: NamespaceId,
reason: AbortReason,
},
#[error("Failed to sync {namespace:?} with {peer:?}")]
Sync {
peer: PublicKey,
namespace: Option<NamespaceId>,
#[source]
error: anyhow::Error,
},
#[error("Failed to close {namespace:?} with {peer:?}")]
Close {
peer: PublicKey,
namespace: Option<NamespaceId>,
#[source]
error: anyhow::Error,
},
}
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum ConnectError {
#[error("Failed to establish connection")]
Connect {
#[source]
error: anyhow::Error,
},
#[error("Remote peer aborted sync: {0:?}")]
RemoteAbort(AbortReason),
#[error("Failed to sync")]
Sync {
#[source]
error: anyhow::Error,
},
#[error("Failed to close connection1")]
Close {
#[source]
error: anyhow::Error,
},
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum AbortReason {
NotFound,
AlreadySyncing,
InternalServerError,
}
impl AcceptError {
fn connect(error: impl Into<anyhow::Error>) -> Self {
Self::Connect {
error: error.into(),
}
}
fn open(peer: PublicKey, error: impl Into<anyhow::Error>) -> Self {
Self::Open {
peer,
error: error.into(),
}
}
pub(crate) fn sync(
peer: PublicKey,
namespace: Option<NamespaceId>,
error: impl Into<anyhow::Error>,
) -> Self {
Self::Sync {
peer,
namespace,
error: error.into(),
}
}
fn close(
peer: PublicKey,
namespace: Option<NamespaceId>,
error: impl Into<anyhow::Error>,
) -> Self {
Self::Close {
peer,
namespace,
error: error.into(),
}
}
pub fn peer(&self) -> Option<PublicKey> {
match self {
AcceptError::Connect { .. } => None,
AcceptError::Open { peer, .. } => Some(*peer),
AcceptError::Sync { peer, .. } => Some(*peer),
AcceptError::Close { peer, .. } => Some(*peer),
AcceptError::Abort { peer, .. } => Some(*peer),
}
}
pub fn namespace(&self) -> Option<NamespaceId> {
match self {
AcceptError::Connect { .. } => None,
AcceptError::Open { .. } => None,
AcceptError::Sync { namespace, .. } => namespace.to_owned(),
AcceptError::Close { namespace, .. } => namespace.to_owned(),
AcceptError::Abort { namespace, .. } => Some(*namespace),
}
}
}
impl ConnectError {
fn connect(error: impl Into<anyhow::Error>) -> Self {
Self::Connect {
error: error.into(),
}
}
fn close(error: impl Into<anyhow::Error>) -> Self {
Self::Close {
error: error.into(),
}
}
pub(crate) fn sync(error: impl Into<anyhow::Error>) -> Self {
Self::Sync {
error: error.into(),
}
}
pub(crate) fn remote_abort(reason: AbortReason) -> Self {
Self::RemoteAbort(reason)
}
}