1use std::{
15 net::{SocketAddr, SocketAddrV4, SocketAddrV6},
16 sync::Arc,
17};
18
19use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr};
20use iroh_relay::{RelayConfig, RelayMap};
21use n0_error::{e, ensure, stack_error};
22use n0_watcher::Watcher;
23use tracing::{debug, instrument, trace, warn};
24use url::Url;
25
26#[cfg(feature = "qlog")]
27pub use self::quic::{QlogConfig, QlogFactory, QlogFileFactory};
28pub use self::{
29 connection::{
30 Accept, Accepting, AlpnError, AuthenticationError, Connecting, ConnectingError, Connection,
31 ConnectionInfo, ConnectionState, HandshakeCompleted, Incoming, IncomingZeroRtt,
32 IncomingZeroRttConnection, OutgoingZeroRtt, OutgoingZeroRttConnection,
33 RemoteEndpointIdError, ZeroRttStatus,
34 },
35 hooks::{AfterHandshakeOutcome, BeforeConnectOutcome, EndpointHooks},
36 quic::{
37 AcceptBi, AcceptUni, AckFrequencyConfig, AeadKey, ApplicationClose, Chunk, ClosedStream,
38 ConnectionClose, ConnectionError, ConnectionStats, Controller, ControllerFactory,
39 CryptoError, CryptoServerConfig, ExportKeyingMaterialError, FrameStats, HandshakeTokenKey,
40 IdleTimeout, MtuDiscoveryConfig, OpenBi, OpenUni, PathStats, QuicTransportConfig,
41 ReadDatagram, ReadError, ReadExactError, ReadToEndError, RecvStream, ResetError,
42 RetryError, SendDatagramError, SendStream, ServerConfig, Side, StoppedError, StreamId,
43 TransportError, TransportErrorCode, UdpStats, UnsupportedVersion, VarInt,
44 VarIntBoundsExceeded, WeakConnectionHandle, WriteError, Written,
45 },
46};
47use self::{hooks::EndpointHooksList, presets::Preset};
48#[cfg(wasm_browser)]
49use crate::discovery::pkarr::PkarrResolver;
50#[cfg(not(wasm_browser))]
51use crate::dns::DnsResolver;
52pub use crate::magicsock::{
53 DirectAddr, DirectAddrType, PathInfo,
54 remote_map::{PathInfoList, RemoteInfo, Source, TransportAddrInfo, TransportAddrUsage},
55 transports::TransportConfig,
56};
57use crate::{
58 discovery::{ConcurrentDiscovery, DiscoveryError, DynIntoDiscovery, IntoDiscovery, UserData},
59 magicsock::{self, Handle, RemoteStateActorStoppedError, mapped_addrs::MappedAddr},
60 metrics::EndpointMetrics,
61 net_report::Report,
62 tls::{self, DEFAULT_MAX_TLS_TICKETS},
63};
64
65mod connection;
66pub(crate) mod hooks;
67pub mod presets;
68mod quic;
69
70#[derive(Debug)]
77pub struct Builder {
78 secret_key: Option<SecretKey>,
79 alpn_protocols: Vec<Vec<u8>>,
80 transport_config: QuicTransportConfig,
81 keylog: bool,
82 discovery: Vec<Box<dyn DynIntoDiscovery>>,
83 discovery_user_data: Option<UserData>,
84 proxy_url: Option<Url>,
85 #[cfg(not(wasm_browser))]
86 dns_resolver: Option<DnsResolver>,
87 #[cfg(any(test, feature = "test-utils"))]
88 insecure_skip_relay_cert_verify: bool,
89 transports: Vec<TransportConfig>,
90 max_tls_tickets: usize,
91 hooks: EndpointHooksList,
92}
93
94impl From<RelayMode> for Option<TransportConfig> {
95 fn from(mode: RelayMode) -> Self {
96 match mode {
97 RelayMode::Disabled => None,
98 RelayMode::Default => Some(TransportConfig::Relay {
99 relay_map: mode.relay_map(),
100 }),
101 RelayMode::Staging => Some(TransportConfig::Relay {
102 relay_map: mode.relay_map(),
103 }),
104 RelayMode::Custom(relay_map) => Some(TransportConfig::Relay { relay_map }),
105 }
106 }
107}
108
109impl Builder {
110 pub fn new<P: Preset>(preset: P) -> Self {
117 Self::empty(RelayMode::Disabled).preset(preset)
118 }
119
120 pub fn preset<P: Preset>(mut self, preset: P) -> Self {
122 self = preset.apply(self);
123 self
124 }
125
126 pub fn empty(relay_mode: RelayMode) -> Self {
128 let mut transports = vec![
129 #[cfg(not(wasm_browser))]
130 TransportConfig::default_ipv4(),
131 #[cfg(not(wasm_browser))]
132 TransportConfig::default_ipv6(),
133 ];
134 if let Some(relay) = relay_mode.into() {
135 transports.push(relay);
136 }
137 Self {
138 secret_key: Default::default(),
139 alpn_protocols: Default::default(),
140 transport_config: QuicTransportConfig::default(),
141 keylog: Default::default(),
142 discovery: Default::default(),
143 discovery_user_data: Default::default(),
144 proxy_url: None,
145 #[cfg(not(wasm_browser))]
146 dns_resolver: None,
147 #[cfg(any(test, feature = "test-utils"))]
148 insecure_skip_relay_cert_verify: false,
149 max_tls_tickets: DEFAULT_MAX_TLS_TICKETS,
150 transports,
151 hooks: Default::default(),
152 }
153 }
154
155 pub async fn bind(self) -> Result<Endpoint, BindError> {
159 let mut rng = rand::rng();
160 let secret_key = self
161 .secret_key
162 .unwrap_or_else(move || SecretKey::generate(&mut rng));
163
164 let static_config = StaticConfig {
165 transport_config: self.transport_config.clone(),
166 tls_config: tls::TlsConfig::new(secret_key.clone(), self.max_tls_tickets),
167 keylog: self.keylog,
168 };
169 let server_config = static_config.create_server_config(self.alpn_protocols);
170
171 #[cfg(not(wasm_browser))]
172 let dns_resolver = self.dns_resolver.unwrap_or_default();
173
174 let metrics = EndpointMetrics::default();
175
176 let msock_opts = magicsock::Options {
177 transports: self.transports,
178 secret_key,
179 discovery_user_data: self.discovery_user_data,
180 proxy_url: self.proxy_url,
181 #[cfg(not(wasm_browser))]
182 dns_resolver,
183 server_config,
184 #[cfg(any(test, feature = "test-utils"))]
185 insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
186 metrics,
187 hooks: self.hooks,
188 };
189
190 let msock = magicsock::MagicSock::spawn(msock_opts).await?;
191 trace!("created magicsock");
192 debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created");
193
194 let ep = Endpoint {
195 msock,
196 static_config: Arc::new(static_config),
197 };
198
199 for create_service in self.discovery {
201 let service = create_service.into_discovery(&ep)?;
202 ep.discovery().add_boxed(service);
203 }
204
205 Ok(ep)
206 }
207
208 #[cfg(not(wasm_browser))]
217 pub fn bind_addr_v4(mut self, bind_addr: SocketAddrV4) -> Self {
218 self.transports.push(TransportConfig::Ip {
219 bind_addr: bind_addr.into(),
220 });
221 self
222 }
223
224 #[cfg(not(wasm_browser))]
231 pub fn bind_addr_v6(mut self, bind_addr: SocketAddrV6) -> Self {
232 self.transports.push(TransportConfig::Ip {
233 bind_addr: bind_addr.into(),
234 });
235 self
236 }
237
238 #[cfg(not(wasm_browser))]
240 pub fn clear_ip_transports(mut self) -> Self {
241 self.transports
242 .retain(|t| !matches!(t, TransportConfig::Ip { .. }));
243 self
244 }
245
246 pub fn clear_relay_transports(mut self) -> Self {
248 self.transports
249 .retain(|t| !matches!(t, TransportConfig::Relay { .. }));
250 self
251 }
252
253 pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
262 self.secret_key = Some(secret_key);
263 self
264 }
265
266 pub fn alpns(mut self, alpn_protocols: Vec<Vec<u8>>) -> Self {
273 self.alpn_protocols = alpn_protocols;
274 self
275 }
276
277 pub fn relay_mode(mut self, relay_mode: RelayMode) -> Self {
295 let transport: Option<_> = relay_mode.into();
296 match transport {
297 Some(transport) => {
298 if let Some(og) = self
299 .transports
300 .iter_mut()
301 .find(|t| matches!(t, TransportConfig::Relay { .. }))
302 {
303 *og = transport;
304 } else {
305 self.transports.push(transport);
306 }
307 }
308 None => {
309 self.transports
310 .retain(|t| !matches!(t, TransportConfig::Relay { .. }));
311 }
312 }
313 self
314 }
315
316 pub fn clear_discovery(mut self) -> Self {
323 self.discovery.clear();
324 self
325 }
326
327 pub fn discovery(mut self, discovery: impl IntoDiscovery) -> Self {
344 self.discovery.push(Box::new(discovery));
345 self
346 }
347
348 pub fn user_data_for_discovery(mut self, user_data: UserData) -> Self {
357 self.discovery_user_data = Some(user_data);
358 self
359 }
360
361 pub fn transport_config(mut self, transport_config: QuicTransportConfig) -> Self {
375 self.transport_config = transport_config;
376 self
377 }
378
379 #[cfg(not(wasm_browser))]
389 pub fn dns_resolver(mut self, dns_resolver: DnsResolver) -> Self {
390 self.dns_resolver = Some(dns_resolver);
391 self
392 }
393
394 pub fn proxy_url(mut self, url: Url) -> Self {
396 self.proxy_url.replace(url);
397 self
398 }
399
400 pub fn proxy_from_env(mut self) -> Self {
407 self.proxy_url = proxy_url_from_env();
408 self
409 }
410
411 pub fn keylog(mut self, keylog: bool) -> Self {
419 self.keylog = keylog;
420 self
421 }
422
423 #[cfg(any(test, feature = "test-utils"))]
427 pub fn insecure_skip_relay_cert_verify(mut self, skip_verify: bool) -> Self {
428 self.insecure_skip_relay_cert_verify = skip_verify;
429 self
430 }
431
432 pub fn max_tls_tickets(mut self, n: usize) -> Self {
439 self.max_tls_tickets = n;
440 self
441 }
442
443 pub fn hooks(mut self, hooks: impl EndpointHooks + 'static) -> Self {
454 self.hooks.push(hooks);
455 self
456 }
457}
458
459#[derive(Debug)]
461struct StaticConfig {
462 tls_config: tls::TlsConfig,
463 transport_config: QuicTransportConfig,
464 keylog: bool,
465}
466
467impl StaticConfig {
468 fn create_server_config(&self, alpn_protocols: Vec<Vec<u8>>) -> ServerConfig {
470 let quic_server_config = self
471 .tls_config
472 .make_server_config(alpn_protocols, self.keylog);
473 let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
474 server_config.transport_config(self.transport_config.to_arc());
475
476 server_config
477 }
478}
479
480#[derive(Clone, Debug)]
506pub struct Endpoint {
507 pub(crate) msock: Handle,
509 static_config: Arc<StaticConfig>,
511}
512
513#[allow(missing_docs)]
514#[stack_error(derive, add_meta, from_sources)]
515#[non_exhaustive]
516#[allow(private_interfaces)]
517pub enum ConnectWithOptsError {
518 #[error("Connecting to ourself is not supported")]
519 SelfConnect,
520 #[error("No addressing information available")]
521 NoAddress { source: DiscoveryError },
522 #[error("Unable to connect to remote")]
523 Quinn {
524 #[error(std_err)]
525 source: quinn_proto::ConnectError,
526 },
527 #[error("Internal consistency error")]
528 InternalConsistencyError {
529 source: RemoteStateActorStoppedError,
531 },
532 #[error("Connection was rejected locally")]
533 LocallyRejected,
534}
535
536#[allow(missing_docs)]
537#[stack_error(derive, add_meta, from_sources)]
538#[non_exhaustive]
539pub enum ConnectError {
540 #[error(transparent)]
541 Connect { source: ConnectWithOptsError },
542 #[error(transparent)]
543 Connecting { source: ConnectingError },
544 #[error(transparent)]
545 Connection {
546 #[error(std_err)]
547 source: ConnectionError,
548 },
549}
550
551#[allow(missing_docs)]
552#[stack_error(derive, add_meta, from_sources)]
553#[non_exhaustive]
554pub enum BindError {
555 #[error(transparent)]
556 MagicSpawn {
557 source: magicsock::CreateHandleError,
558 },
559 #[error(transparent)]
560 Discovery {
561 source: crate::discovery::IntoDiscoveryError,
562 },
563}
564
565impl Endpoint {
566 pub fn builder() -> Builder {
576 Builder::new(presets::N0)
577 }
578
579 pub fn empty_builder(relay_mode: RelayMode) -> Builder {
583 Builder::empty(relay_mode)
584 }
585
586 pub async fn bind() -> Result<Self, BindError> {
590 Self::builder().bind().await
591 }
592
593 pub fn set_alpns(&self, alpns: Vec<Vec<u8>>) {
598 let server_config = self.static_config.create_server_config(alpns);
599 self.msock.endpoint().set_server_config(Some(server_config));
600 }
601
602 pub async fn insert_relay(
606 &self,
607 relay: RelayUrl,
608 config: Arc<RelayConfig>,
609 ) -> Option<Arc<RelayConfig>> {
610 self.msock.insert_relay(relay, config).await
611 }
612
613 pub async fn remove_relay(&self, relay: &RelayUrl) -> Option<Arc<RelayConfig>> {
617 self.msock.remove_relay(relay).await
618 }
619
620 pub async fn connect(
646 &self,
647 endpoint_addr: impl Into<EndpointAddr>,
648 alpn: &[u8],
649 ) -> Result<Connection, ConnectError> {
650 let endpoint_addr = endpoint_addr.into();
651 let remote = endpoint_addr.id;
652 let connecting = self
653 .connect_with_opts(endpoint_addr, alpn, Default::default())
654 .await?;
655 let conn = connecting.await?;
656
657 debug!(
658 me = %self.id().fmt_short(),
659 remote = %remote.fmt_short(),
660 alpn = %String::from_utf8_lossy(alpn),
661 "Connection established."
662 );
663 Ok(conn)
664 }
665
666 #[instrument(name = "connect", skip_all, fields(
681 me = %self.id().fmt_short(),
682 remote = tracing::field::Empty,
683 alpn = String::from_utf8_lossy(alpn).to_string(),
684 ))]
685 pub async fn connect_with_opts(
686 &self,
687 endpoint_addr: impl Into<EndpointAddr>,
688 alpn: &[u8],
689 options: ConnectOptions,
690 ) -> Result<Connecting, ConnectWithOptsError> {
691 let endpoint_addr: EndpointAddr = endpoint_addr.into();
692 if let BeforeConnectOutcome::Reject =
693 self.msock.hooks.before_connect(&endpoint_addr, alpn).await
694 {
695 return Err(e!(ConnectWithOptsError::LocallyRejected));
696 }
697 let endpoint_id = endpoint_addr.id;
698
699 tracing::Span::current().record("remote", tracing::field::display(endpoint_id.fmt_short()));
700
701 ensure!(endpoint_id != self.id(), ConnectWithOptsError::SelfConnect);
703
704 trace!(
705 dst_endpoint_id = %endpoint_id.fmt_short(),
706 relay_url = ?endpoint_addr.relay_urls().next().cloned(),
707 ip_addresses = ?endpoint_addr.ip_addrs().cloned().collect::<Vec<_>>(),
708 "connecting",
709 );
710
711 let mapped_addr = self.msock.resolve_remote(endpoint_addr).await??;
712
713 let transport_config = options
714 .transport_config
715 .map(|cfg| cfg.to_arc())
716 .unwrap_or(self.static_config.transport_config.to_arc());
717
718 let client_config = {
722 let mut alpn_protocols = vec![alpn.to_vec()];
723 alpn_protocols.extend(options.additional_alpns);
724 let quic_client_config = self
725 .static_config
726 .tls_config
727 .make_client_config(alpn_protocols, self.static_config.keylog);
728 let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
729 client_config.transport_config(transport_config.clone());
730 client_config
731 };
732
733 let dest_addr = mapped_addr.private_socket_addr();
734 let server_name = &tls::name::encode(endpoint_id);
735 let connect = self
736 .msock
737 .endpoint()
738 .connect_with(client_config, dest_addr, server_name)?;
739
740 Ok(Connecting::new(connect, self.clone(), endpoint_id))
741 }
742
743 pub fn accept(&self) -> Accept<'_> {
752 Accept {
753 inner: self.msock.endpoint().accept(),
754 ep: self.clone(),
755 }
756 }
757
758 pub fn secret_key(&self) -> &SecretKey {
762 &self.static_config.tls_config.secret_key
763 }
764
765 pub fn id(&self) -> EndpointId {
770 self.static_config.tls_config.secret_key.public()
771 }
772
773 pub fn addr(&self) -> EndpointAddr {
786 self.watch_addr().get()
787 }
788
789 #[cfg(not(wasm_browser))]
824 pub fn watch_addr(&self) -> impl n0_watcher::Watcher<Value = EndpointAddr> + use<> {
825 let watch_addrs = self.msock.ip_addrs();
826 let watch_relay = self.msock.home_relay();
827 let endpoint_id = self.id();
828
829 watch_addrs.or(watch_relay).map(move |(addrs, relays)| {
830 EndpointAddr::from_parts(
831 endpoint_id,
832 relays
833 .into_iter()
834 .map(TransportAddr::Relay)
835 .chain(addrs.into_iter().map(|x| TransportAddr::Ip(x.addr))),
836 )
837 })
838 }
839
840 #[cfg(wasm_browser)]
846 pub fn watch_addr(&self) -> impl n0_watcher::Watcher<Value = EndpointAddr> + use<> {
847 let watch_relay = self.msock.home_relay();
851 let endpoint_id = self.id();
852 watch_relay.map(move |mut relays| {
853 EndpointAddr::from_parts(endpoint_id, relays.into_iter().map(TransportAddr::Relay))
854 })
855 }
856
857 pub async fn online(&self) {
874 self.msock.home_relay().initialized().await;
875 }
876
877 #[doc(hidden)]
906 pub fn net_report(&self) -> impl Watcher<Value = Option<Report>> + use<> {
907 self.msock.net_report()
908 }
909
910 pub fn bound_sockets(&self) -> Vec<SocketAddr> {
915 self.msock
916 .local_addr()
917 .into_iter()
918 .filter_map(|addr| addr.into_socket_addr())
919 .collect()
920 }
921
922 #[cfg(not(wasm_browser))]
930 pub fn dns_resolver(&self) -> &DnsResolver {
931 self.msock.dns_resolver()
932 }
933
934 pub fn discovery(&self) -> &ConcurrentDiscovery {
938 self.msock.discovery()
939 }
940
941 #[cfg(feature = "metrics")]
1056 pub fn metrics(&self) -> &EndpointMetrics {
1057 &self.msock.metrics
1058 }
1059
1060 pub async fn remote_info(&self, endpoint_id: EndpointId) -> Option<RemoteInfo> {
1069 self.msock.remote_info(endpoint_id).await
1070 }
1071
1072 pub async fn network_change(&self) {
1085 self.msock.network_change().await;
1086 }
1087
1088 pub fn set_user_data_for_discovery(&self, user_data: Option<UserData>) {
1098 self.msock.set_user_data_for_discovery(user_data);
1099 }
1100
1101 pub async fn close(&self) {
1136 if self.is_closed() {
1137 return;
1138 }
1139
1140 tracing::debug!("Connections closed");
1141 self.msock.close().await;
1142 }
1143
1144 pub fn is_closed(&self) -> bool {
1146 self.msock.is_closed()
1147 }
1148
1149 #[cfg(test)]
1152 pub(crate) fn magic_sock(&self) -> Handle {
1153 self.msock.clone()
1154 }
1155 #[cfg(test)]
1156 pub(crate) fn endpoint(&self) -> &quinn::Endpoint {
1157 self.msock.endpoint()
1158 }
1159}
1160
1161#[derive(Default, Debug, Clone)]
1163pub struct ConnectOptions {
1164 transport_config: Option<QuicTransportConfig>,
1165 additional_alpns: Vec<Vec<u8>>,
1166}
1167
1168impl ConnectOptions {
1169 pub fn new() -> Self {
1174 Self::default()
1175 }
1176
1177 pub fn with_transport_config(mut self, transport_config: QuicTransportConfig) -> Self {
1179 self.transport_config = Some(transport_config);
1180 self
1181 }
1182
1183 pub fn with_additional_alpns(mut self, alpns: Vec<Vec<u8>>) -> Self {
1204 self.additional_alpns = alpns;
1205 self
1206 }
1207}
1208
1209fn proxy_url_from_env() -> Option<Url> {
1216 if let Some(url) = std::env::var("HTTP_PROXY")
1217 .ok()
1218 .and_then(|s| s.parse::<Url>().ok())
1219 {
1220 if is_cgi() {
1221 warn!("HTTP_PROXY environment variable ignored in CGI");
1222 } else {
1223 return Some(url);
1224 }
1225 }
1226 if let Some(url) = std::env::var("http_proxy")
1227 .ok()
1228 .and_then(|s| s.parse::<Url>().ok())
1229 {
1230 return Some(url);
1231 }
1232 if let Some(url) = std::env::var("HTTPS_PROXY")
1233 .ok()
1234 .and_then(|s| s.parse::<Url>().ok())
1235 {
1236 return Some(url);
1237 }
1238 if let Some(url) = std::env::var("https_proxy")
1239 .ok()
1240 .and_then(|s| s.parse::<Url>().ok())
1241 {
1242 return Some(url);
1243 }
1244
1245 None
1246}
1247
1248#[derive(Debug, Clone, PartialEq, Eq)]
1250pub enum RelayMode {
1251 Disabled,
1254 Default,
1258 Staging,
1260 Custom(RelayMap),
1262}
1263
1264impl RelayMode {
1265 pub fn relay_map(&self) -> RelayMap {
1267 match self {
1268 RelayMode::Disabled => RelayMap::empty(),
1269 RelayMode::Default => crate::defaults::prod::default_relay_map(),
1270 RelayMode::Staging => crate::defaults::staging::default_relay_map(),
1271 RelayMode::Custom(relay_map) => relay_map.clone(),
1272 }
1273 }
1274}
1275
1276pub const ENV_FORCE_STAGING_RELAYS: &str = "IROH_FORCE_STAGING_RELAYS";
1278
1279pub fn force_staging_infra() -> bool {
1281 matches!(std::env::var(ENV_FORCE_STAGING_RELAYS), Ok(value) if !value.is_empty())
1282}
1283
1284pub fn default_relay_mode() -> RelayMode {
1289 match force_staging_infra() {
1291 true => RelayMode::Staging,
1292 false => RelayMode::Default,
1293 }
1294}
1295
1296fn is_cgi() -> bool {
1301 std::env::var_os("REQUEST_METHOD").is_some()
1302}
1303
1304#[cfg(test)]
1307mod tests {
1308 use std::{
1309 sync::Arc,
1310 time::{Duration, Instant},
1311 };
1312
1313 use iroh_base::{EndpointAddr, EndpointId, SecretKey, TransportAddr};
1314 use n0_error::{AnyError as Error, Result, StdResultExt};
1315 use n0_future::{BufferedStreamExt, StreamExt, stream, time};
1316 use n0_watcher::Watcher;
1317 use quinn::ConnectionError;
1318 use rand::SeedableRng;
1319 use tokio::sync::oneshot;
1320 use tracing::{Instrument, error_span, info, info_span, instrument};
1321 use tracing_test::traced_test;
1322
1323 use super::Endpoint;
1324 use crate::{
1325 RelayMap, RelayMode,
1326 discovery::static_provider::StaticProvider,
1327 endpoint::{ConnectOptions, Connection},
1328 protocol::{AcceptError, ProtocolHandler, Router},
1329 test_utils::{QlogFileGroup, run_relay_server, run_relay_server_with},
1330 };
1331
1332 const TEST_ALPN: &[u8] = b"n0/iroh/test";
1333
1334 #[tokio::test]
1335 #[traced_test]
1336 async fn test_connect_self() -> Result {
1337 let ep = Endpoint::empty_builder(RelayMode::Disabled)
1338 .alpns(vec![TEST_ALPN.to_vec()])
1339 .bind()
1340 .await
1341 .unwrap();
1342 let my_addr = ep.addr();
1343 let res = ep.connect(my_addr.clone(), TEST_ALPN).await;
1344 assert!(res.is_err());
1345 let err = res.err().unwrap();
1346 assert!(err.to_string().starts_with("Connecting to ourself"));
1347
1348 Ok(())
1349 }
1350
1351 #[tokio::test]
1352 #[traced_test]
1353 async fn endpoint_connect_close() -> Result {
1354 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1355 let (relay_map, relay_url, _guard) = run_relay_server().await?;
1356 let server_secret_key = SecretKey::generate(&mut rng);
1357 let server_peer_id = server_secret_key.public();
1358
1359 let qlog = QlogFileGroup::from_env("endpoint_connect_close");
1360
1361 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1363 .secret_key(server_secret_key)
1364 .transport_config(qlog.create("server")?)
1365 .alpns(vec![TEST_ALPN.to_vec()])
1366 .insecure_skip_relay_cert_verify(true)
1367 .bind()
1368 .await?;
1369 ep.online().await;
1371
1372 let server = tokio::spawn(
1373 async move {
1374 info!("accepting connection");
1375 let incoming = ep.accept().await.anyerr()?;
1376 let conn = incoming.await.anyerr()?;
1377 let mut stream = conn.accept_uni().await.anyerr()?;
1378 let mut buf = [0u8; 5];
1379 stream.read_exact(&mut buf).await.anyerr()?;
1380 info!("Accepted 1 stream, received {buf:?}. Closing now.");
1381 conn.close(7u8.into(), b"bye");
1383
1384 let res = conn.accept_uni().await;
1385 assert_eq!(res.unwrap_err(), quinn::ConnectionError::LocallyClosed);
1386
1387 let res = stream.read_to_end(10).await;
1388 assert_eq!(
1389 res.unwrap_err(),
1390 quinn::ReadToEndError::Read(quinn::ReadError::ConnectionLost(
1391 quinn::ConnectionError::LocallyClosed
1392 ))
1393 );
1394 info!("server test completed");
1395 Ok::<_, Error>(())
1396 }
1397 .instrument(info_span!("test-server")),
1398 );
1399
1400 let client = tokio::spawn(
1401 async move {
1402 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1403 .alpns(vec![TEST_ALPN.to_vec()])
1404 .insecure_skip_relay_cert_verify(true)
1405 .transport_config(qlog.create("client")?)
1406 .bind()
1407 .await?;
1408 info!("client connecting");
1409 let endpoint_addr = EndpointAddr::new(server_peer_id).with_relay_url(relay_url);
1410 let conn = ep.connect(endpoint_addr, TEST_ALPN).await?;
1411 let mut stream = conn.open_uni().await.anyerr()?;
1412
1413 stream.write_all(b"hello").await.anyerr()?;
1418
1419 info!("waiting for closed");
1420 let err = conn.closed().await;
1422 let expected_err =
1423 quinn::ConnectionError::ApplicationClosed(quinn::ApplicationClose {
1424 error_code: 7u8.into(),
1425 reason: b"bye".to_vec().into(),
1426 });
1427 assert_eq!(err, expected_err);
1428
1429 info!("opening new - expect it to fail");
1430 let res = conn.open_uni().await;
1431 assert_eq!(res.unwrap_err(), expected_err);
1432 info!("client test completed");
1433 Ok::<_, Error>(())
1434 }
1435 .instrument(info_span!("test-client")),
1436 );
1437
1438 let (server, client) = tokio::time::timeout(
1439 Duration::from_secs(30),
1440 n0_future::future::zip(server, client),
1441 )
1442 .await
1443 .anyerr()?;
1444 server.anyerr()??;
1445 client.anyerr()??;
1446 Ok(())
1447 }
1448
1449 #[tokio::test]
1450 #[traced_test]
1451 async fn endpoint_relay_connect_loop() -> Result {
1452 let test_start = Instant::now();
1453 let n_clients = 5;
1454 let n_chunks_per_client = 2;
1455 let chunk_size = 10;
1456 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
1457 let (relay_map, relay_url, _relay_guard) = run_relay_server().await.unwrap();
1458 let server_secret_key = SecretKey::generate(&mut rng);
1459 let server_endpoint_id = server_secret_key.public();
1460
1461 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1463 .insecure_skip_relay_cert_verify(true)
1464 .secret_key(server_secret_key)
1465 .alpns(vec![TEST_ALPN.to_vec()])
1466 .bind()
1467 .await?;
1468 ep.online().await;
1470
1471 info!(time = ?test_start.elapsed(), "test setup done");
1472
1473 let server = tokio::spawn(
1475 async move {
1476 let eps = ep.bound_sockets();
1477
1478 info!(me = %ep.id().fmt_short(), eps = ?eps, "server listening on");
1479 for i in 0..n_clients {
1480 let round_start = Instant::now();
1481 info!("[server] round {i}");
1482 let incoming = ep.accept().await.anyerr()?;
1483 let conn = incoming.await.anyerr()?;
1484 let endpoint_id = conn.remote_id();
1485 info!(%i, peer = %endpoint_id.fmt_short(), "accepted connection");
1486 let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
1487 let mut buf = vec![0u8; chunk_size];
1488 for _i in 0..n_chunks_per_client {
1489 recv.read_exact(&mut buf).await.anyerr()?;
1490 send.write_all(&buf).await.anyerr()?;
1491 }
1492 send.finish().anyerr()?;
1493 conn.closed().await; info!(%i, peer = %endpoint_id.fmt_short(), "finished");
1495 info!("[server] round {i} done in {:?}", round_start.elapsed());
1496 }
1497 Ok::<_, Error>(())
1498 }
1499 .instrument(error_span!("server")),
1500 );
1501
1502 let start = Instant::now();
1503
1504 for i in 0..n_clients {
1505 let round_start = Instant::now();
1506 info!("[client] round {i}");
1507 let client_secret_key = SecretKey::generate(&mut rng);
1508 async {
1509 info!("client binding");
1510 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1511 .alpns(vec![TEST_ALPN.to_vec()])
1512 .insecure_skip_relay_cert_verify(true)
1513 .secret_key(client_secret_key)
1514 .bind()
1515 .await?;
1516 let eps = ep.bound_sockets();
1517
1518 info!(me = %ep.id().fmt_short(), eps=?eps, "client bound");
1519 let endpoint_addr =
1520 EndpointAddr::new(server_endpoint_id).with_relay_url(relay_url.clone());
1521 info!(to = ?endpoint_addr, "client connecting");
1522 let conn = ep.connect(endpoint_addr, TEST_ALPN).await.anyerr()?;
1523 info!("client connected");
1524 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
1525
1526 for i in 0..n_chunks_per_client {
1527 let mut buf = vec![i; chunk_size];
1528 send.write_all(&buf).await.anyerr()?;
1529 recv.read_exact(&mut buf).await.anyerr()?;
1530 assert_eq!(buf, vec![i; chunk_size]);
1531 }
1532 conn.close(0u32.into(), b"bye!");
1534 info!("client finished");
1535 ep.close().await;
1536 info!("client closed");
1537 Ok::<_, Error>(())
1538 }
1539 .instrument(error_span!("client", %i))
1540 .await?;
1541 info!("[client] round {i} done in {:?}", round_start.elapsed());
1542 }
1543
1544 server.await.anyerr()??;
1545
1546 if start.elapsed() > Duration::from_secs(15) {
1550 panic!("Test too slow, something went wrong");
1551 }
1552
1553 Ok(())
1554 }
1555
1556 #[tokio::test]
1557 #[traced_test]
1558 async fn endpoint_send_relay() -> Result {
1559 let (relay_map, _relay_url, _guard) = run_relay_server().await?;
1560 let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1561 .insecure_skip_relay_cert_verify(true)
1562 .bind()
1563 .await?;
1564 let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1565 .insecure_skip_relay_cert_verify(true)
1566 .alpns(vec![TEST_ALPN.to_vec()])
1567 .bind()
1568 .await?;
1569
1570 let task = tokio::spawn({
1571 let server = server.clone();
1572 async move {
1573 let Some(conn) = server.accept().await else {
1574 n0_error::bail_any!("Expected an incoming connection");
1575 };
1576 let conn = conn.await.anyerr()?;
1577 let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
1578 let data = recv.read_to_end(1000).await.anyerr()?;
1579 send.write_all(&data).await.anyerr()?;
1580 send.finish().anyerr()?;
1581 conn.closed().await;
1582
1583 Ok::<_, Error>(())
1584 }
1585 });
1586
1587 let addr = server.addr();
1588 let conn = client.connect(addr, TEST_ALPN).await?;
1589 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
1590 send.write_all(b"Hello, world!").await.anyerr()?;
1591 send.finish().anyerr()?;
1592 let data = recv.read_to_end(1000).await.anyerr()?;
1593 conn.close(0u32.into(), b"bye!");
1594
1595 task.await.anyerr()??;
1596
1597 client.close().await;
1598 server.close().await;
1599
1600 assert_eq!(&data, b"Hello, world!");
1601
1602 Ok(())
1603 }
1604
1605 #[tokio::test]
1606 #[traced_test]
1607 async fn endpoint_two_direct_only() -> Result {
1608 let ep1 = {
1611 let span = info_span!("server");
1612 let _guard = span.enter();
1613 Endpoint::builder()
1614 .alpns(vec![TEST_ALPN.to_vec()])
1615 .relay_mode(RelayMode::Disabled)
1616 .bind()
1617 .await?
1618 };
1619 let ep2 = {
1620 let span = info_span!("client");
1621 let _guard = span.enter();
1622 Endpoint::builder()
1623 .alpns(vec![TEST_ALPN.to_vec()])
1624 .relay_mode(RelayMode::Disabled)
1625 .bind()
1626 .await?
1627 };
1628 let ep1_nodeaddr = ep1.addr();
1629
1630 #[instrument(name = "client", skip_all)]
1631 async fn connect(ep: Endpoint, dst: EndpointAddr) -> Result<quinn::ConnectionError> {
1632 info!(me = %ep.id().fmt_short(), "client starting");
1633 let conn = ep.connect(dst, TEST_ALPN).await?;
1634 let mut send = conn.open_uni().await.anyerr()?;
1635 send.write_all(b"hello").await.anyerr()?;
1636 send.finish().anyerr()?;
1637 Ok(conn.closed().await)
1638 }
1639
1640 #[instrument(name = "server", skip_all)]
1641 async fn accept(ep: Endpoint, src: EndpointId) -> Result {
1642 info!(me = %ep.id().fmt_short(), "server starting");
1643 let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1644 let node_id = conn.remote_id();
1645 assert_eq!(node_id, src);
1646 let mut recv = conn.accept_uni().await.anyerr()?;
1647 let msg = recv.read_to_end(100).await.anyerr()?;
1648 assert_eq!(msg, b"hello");
1649 Ok(())
1651 }
1652
1653 let ep1_accept = tokio::spawn(accept(ep1.clone(), ep2.id()));
1654 let ep2_connect = tokio::spawn(connect(ep2.clone(), ep1_nodeaddr));
1655
1656 ep1_accept.await.anyerr()??;
1657 let conn_closed = dbg!(ep2_connect.await.anyerr()??);
1658 assert!(matches!(
1659 conn_closed,
1660 ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
1661 ));
1662
1663 Ok(())
1664 }
1665
1666 #[tokio::test]
1667 #[traced_test]
1668 async fn endpoint_two_relay_only_becomes_direct() -> Result {
1669 let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
1672 let (node_addr_tx, node_addr_rx) = oneshot::channel();
1673 let qlog = Arc::new(QlogFileGroup::from_env("two_relay_only_becomes_direct"));
1674
1675 #[instrument(name = "client", skip_all)]
1676 async fn connect(
1677 relay_map: RelayMap,
1678 node_addr_rx: oneshot::Receiver<EndpointAddr>,
1679 qlog: Arc<QlogFileGroup>,
1680 ) -> Result<quinn::ConnectionError> {
1681 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1682 let secret = SecretKey::generate(&mut rng);
1683 let ep = Endpoint::builder()
1684 .secret_key(secret)
1685 .alpns(vec![TEST_ALPN.to_vec()])
1686 .insecure_skip_relay_cert_verify(true)
1687 .relay_mode(RelayMode::Custom(relay_map))
1688 .transport_config(qlog.create("client")?)
1689 .bind()
1690 .await?;
1691 info!(me = %ep.id().fmt_short(), "client starting");
1692 let dst = node_addr_rx.await.anyerr()?;
1693
1694 info!(me = %ep.id().fmt_short(), "client connecting");
1695 let conn = ep.connect(dst, TEST_ALPN).await?;
1696 let mut send = conn.open_uni().await.anyerr()?;
1697 send.write_all(b"hello").await.anyerr()?;
1698 let mut paths = conn.paths().stream();
1699 info!("Waiting for direct connection");
1700 while let Some(infos) = paths.next().await {
1701 info!(?infos, "new PathInfos");
1702 if infos.iter().any(|info| info.is_ip()) {
1703 break;
1704 }
1705 }
1706 info!("Have direct connection");
1707 send.write_all(b"close please").await.anyerr()?;
1708 send.finish().anyerr()?;
1709 Ok(conn.closed().await)
1710 }
1711
1712 #[instrument(name = "server", skip_all)]
1713 async fn accept(
1714 relay_map: RelayMap,
1715 node_addr_tx: oneshot::Sender<EndpointAddr>,
1716 qlog: Arc<QlogFileGroup>,
1717 ) -> Result {
1718 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
1719 let secret = SecretKey::generate(&mut rng);
1720 let ep = Endpoint::builder()
1721 .secret_key(secret)
1722 .alpns(vec![TEST_ALPN.to_vec()])
1723 .insecure_skip_relay_cert_verify(true)
1724 .transport_config(qlog.create("server")?)
1725 .relay_mode(RelayMode::Custom(relay_map))
1726 .bind()
1727 .await?;
1728 ep.online().await;
1729 let mut node_addr = ep.addr();
1730 node_addr.addrs.retain(|addr| addr.is_relay());
1731 node_addr_tx.send(node_addr).unwrap();
1732
1733 info!(me = %ep.id().fmt_short(), "server starting");
1734 let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1735 let mut recv = conn.accept_uni().await.anyerr()?;
1738 let mut msg = [0u8; 5];
1739 recv.read_exact(&mut msg).await.anyerr()?;
1740 assert_eq!(&msg, b"hello");
1741 info!("received hello");
1742 let msg = recv.read_to_end(100).await.anyerr()?;
1743 assert_eq!(msg, b"close please");
1744 info!("received 'close please'");
1745 Ok(())
1747 }
1748
1749 let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx, qlog.clone()));
1750 let client_task = tokio::spawn(connect(relay_map, node_addr_rx, qlog));
1751
1752 server_task.await.anyerr()??;
1753 let conn_closed = dbg!(client_task.await.anyerr()??);
1754 assert!(matches!(
1755 conn_closed,
1756 ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
1757 ));
1758
1759 Ok(())
1760 }
1761
1762 #[tokio::test]
1763 #[traced_test]
1764 async fn endpoint_two_relay_only_no_ip() -> Result {
1765 let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
1768 let (node_addr_tx, node_addr_rx) = oneshot::channel();
1769
1770 #[instrument(name = "client", skip_all)]
1771 async fn connect(
1772 relay_map: RelayMap,
1773 node_addr_rx: oneshot::Receiver<EndpointAddr>,
1774 ) -> Result<quinn::ConnectionError> {
1775 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1776 let secret = SecretKey::generate(&mut rng);
1777 let ep = Endpoint::builder()
1778 .secret_key(secret)
1779 .alpns(vec![TEST_ALPN.to_vec()])
1780 .insecure_skip_relay_cert_verify(true)
1781 .relay_mode(RelayMode::Custom(relay_map))
1782 .clear_ip_transports() .bind()
1784 .await?;
1785 info!(me = %ep.id().fmt_short(), "client starting");
1786 let dst = node_addr_rx.await.anyerr()?;
1787
1788 info!(me = %ep.id().fmt_short(), "client connecting");
1789 let conn = ep.connect(dst, TEST_ALPN).await?;
1790 let mut send = conn.open_uni().await.anyerr()?;
1791 send.write_all(b"hello").await.anyerr()?;
1792 let mut paths = conn.paths().stream();
1793 info!("Waiting for connection");
1794 'outer: while let Some(infos) = paths.next().await {
1795 info!(?infos, "new PathInfos");
1796 for info in infos {
1797 if info.is_ip() {
1798 panic!("should not happen: {:?}", info);
1799 }
1800 if info.is_relay() {
1801 break 'outer;
1802 }
1803 }
1804 }
1805 info!("Have relay connection");
1806 send.write_all(b"close please").await.anyerr()?;
1807 send.finish().anyerr()?;
1808 Ok(conn.closed().await)
1809 }
1810
1811 #[instrument(name = "server", skip_all)]
1812 async fn accept(
1813 relay_map: RelayMap,
1814 node_addr_tx: oneshot::Sender<EndpointAddr>,
1815 ) -> Result {
1816 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
1817 let secret = SecretKey::generate(&mut rng);
1818 let ep = Endpoint::builder()
1819 .secret_key(secret)
1820 .alpns(vec![TEST_ALPN.to_vec()])
1821 .insecure_skip_relay_cert_verify(true)
1822 .relay_mode(RelayMode::Custom(relay_map))
1823 .clear_ip_transports()
1824 .bind()
1825 .await?;
1826 ep.online().await;
1827 let node_addr = ep.addr();
1828 node_addr_tx.send(node_addr).unwrap();
1829
1830 info!(me = %ep.id().fmt_short(), "server starting");
1831 let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1832 let mut recv = conn.accept_uni().await.anyerr()?;
1835 let mut msg = [0u8; 5];
1836 recv.read_exact(&mut msg).await.anyerr()?;
1837 assert_eq!(&msg, b"hello");
1838 info!("received hello");
1839 let msg = recv.read_to_end(100).await.anyerr()?;
1840 assert_eq!(msg, b"close please");
1841 info!("received 'close please'");
1842 Ok(())
1844 }
1845
1846 let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx));
1847 let client_task = tokio::spawn(connect(relay_map, node_addr_rx));
1848
1849 server_task.await.anyerr()??;
1850 let conn_closed = dbg!(client_task.await.anyerr()??);
1851 assert!(matches!(
1852 conn_closed,
1853 ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
1854 ));
1855
1856 Ok(())
1857 }
1858
1859 #[tokio::test]
1860 #[traced_test]
1861 async fn endpoint_two_direct_add_relay() -> Result {
1862 let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
1865 let (node_addr_tx, node_addr_rx) = oneshot::channel();
1866
1867 #[instrument(name = "client", skip_all)]
1868 async fn connect(
1869 relay_map: RelayMap,
1870 node_addr_rx: oneshot::Receiver<EndpointAddr>,
1871 ) -> Result<()> {
1872 let secret = SecretKey::from([0u8; 32]);
1873 let ep = Endpoint::builder()
1874 .secret_key(secret)
1875 .alpns(vec![TEST_ALPN.to_vec()])
1876 .insecure_skip_relay_cert_verify(true)
1877 .relay_mode(RelayMode::Custom(relay_map))
1878 .bind()
1879 .await?;
1880 info!(me = %ep.id().fmt_short(), "client starting");
1881 let dst = node_addr_rx.await.anyerr()?;
1882
1883 info!(me = %ep.id().fmt_short(), "client connecting");
1884 let conn = ep.connect(dst, TEST_ALPN).await?;
1885 info!(me = %ep.id().fmt_short(), "client connected");
1886
1887 let path_info = conn.paths().get();
1890 assert_eq!(path_info.len(), 1);
1891 assert!(path_info.iter().next().unwrap().is_ip());
1892
1893 let mut paths = conn.paths().stream();
1894 time::timeout(Duration::from_secs(5), async move {
1895 while let Some(infos) = paths.next().await {
1896 info!(?infos, "new PathInfos");
1897 if infos.iter().any(|info| info.is_relay()) {
1898 info!("client has a relay path");
1899 break;
1900 }
1901 }
1902 })
1903 .await
1904 .anyerr()?;
1905
1906 let mut stream = conn.accept_uni().await.anyerr()?;
1908 stream.read_to_end(100).await.anyerr()?;
1909
1910 info!("client closing");
1911 conn.close(0u8.into(), b"");
1912 ep.close().await;
1913 Ok(())
1914 }
1915
1916 #[instrument(name = "server", skip_all)]
1917 async fn accept(
1918 relay_map: RelayMap,
1919 node_addr_tx: oneshot::Sender<EndpointAddr>,
1920 ) -> Result<quinn::ConnectionError> {
1921 let secret = SecretKey::from([1u8; 32]);
1922 let ep = Endpoint::builder()
1923 .secret_key(secret)
1924 .alpns(vec![TEST_ALPN.to_vec()])
1925 .insecure_skip_relay_cert_verify(true)
1926 .relay_mode(RelayMode::Custom(relay_map))
1927 .bind()
1928 .await?;
1929 ep.online().await;
1930 let node_addr = ep.addr();
1931 node_addr_tx.send(node_addr).unwrap();
1932
1933 info!(me = %ep.id().fmt_short(), "server starting");
1934 let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1935 info!(me = %ep.id().fmt_short(), "server accepted connection");
1936
1937 let mut paths = conn.paths().stream();
1941 time::timeout(Duration::from_secs(5), async move {
1942 while let Some(infos) = paths.next().await {
1943 info!(?infos, "new PathInfos");
1944 if infos.iter().any(|path| path.is_relay()) {
1945 info!("server has a relay path");
1946 break;
1947 }
1948 }
1949 })
1950 .await
1951 .anyerr()?;
1952
1953 let mut stream = conn.open_uni().await.anyerr()?;
1954 stream.write_all(b"have relay").await.anyerr()?;
1955 stream.finish().anyerr()?;
1956 info!("waiting conn.closed()");
1957
1958 Ok(conn.closed().await)
1959 }
1960
1961 let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx));
1962 let client_task = tokio::spawn(connect(relay_map, node_addr_rx));
1963
1964 client_task.await.anyerr()??;
1965 let conn_closed = dbg!(server_task.await.anyerr()??);
1966 assert!(matches!(
1967 conn_closed,
1968 ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
1969 ));
1970
1971 Ok(())
1972 }
1973
1974 #[tokio::test]
1975 #[traced_test]
1976 async fn endpoint_relay_map_change() -> Result {
1977 let (relay_map, relay_url, _guard1) = run_relay_server().await?;
1978 let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1979 .insecure_skip_relay_cert_verify(true)
1980 .bind()
1981 .await?;
1982 let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1983 .insecure_skip_relay_cert_verify(true)
1984 .alpns(vec![TEST_ALPN.to_vec()])
1985 .bind()
1986 .await?;
1987
1988 let task = tokio::spawn({
1989 let server = server.clone();
1990 async move {
1991 for i in 0..2 {
1992 println!("accept: round {i}");
1993 let Some(conn) = server.accept().await else {
1994 n0_error::bail_any!("Expected an incoming connection");
1995 };
1996 let conn = conn.await.anyerr()?;
1997 let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
1998 let data = recv.read_to_end(1000).await.anyerr()?;
1999 send.write_all(&data).await.anyerr()?;
2000 send.finish().anyerr()?;
2001 conn.closed().await;
2002 }
2003 Ok::<_, Error>(())
2004 }
2005 });
2006
2007 server.online().await;
2008
2009 let mut addr = server.addr();
2010 println!("round1: {:?}", addr);
2011
2012 addr.addrs
2014 .retain(|addr| !matches!(addr, TransportAddr::Ip(_)));
2015
2016 let conn = client.connect(addr, TEST_ALPN).await?;
2017 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2018 send.write_all(b"Hello, world!").await.anyerr()?;
2019 send.finish().anyerr()?;
2020 let data = recv.read_to_end(1000).await.anyerr()?;
2021 conn.close(0u32.into(), b"bye!");
2022
2023 assert_eq!(&data, b"Hello, world!");
2024
2025 let (new_relay_map, new_relay_url, _guard2) = run_relay_server().await?;
2027 let new_endpoint = new_relay_map
2028 .get(&new_relay_url)
2029 .expect("missing endpoint")
2030 .clone();
2031 dbg!(&new_relay_map);
2032
2033 let addr_watcher = server.watch_addr();
2034
2035 assert!(
2037 server
2038 .insert_relay(new_relay_url.clone(), new_endpoint.clone())
2039 .await
2040 .is_none()
2041 );
2042 assert!(server.remove_relay(&relay_url).await.is_some());
2044
2045 println!("------- changed ----- ");
2046
2047 let mut addr = tokio::time::timeout(Duration::from_secs(10), async move {
2048 let mut stream = addr_watcher.stream();
2049 while let Some(addr) = stream.next().await {
2050 if addr.relay_urls().next() != Some(&relay_url) {
2051 return addr;
2052 }
2053 }
2054 panic!("failed to change relay");
2055 })
2056 .await
2057 .anyerr()?;
2058
2059 println!("round2: {:?}", addr);
2060 assert_eq!(addr.relay_urls().next(), Some(&new_relay_url));
2061
2062 addr.addrs
2064 .retain(|addr| !matches!(addr, TransportAddr::Ip(_)));
2065
2066 let conn = client.connect(addr, TEST_ALPN).await?;
2067 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2068 send.write_all(b"Hello, world!").await.anyerr()?;
2069 send.finish().anyerr()?;
2070 let data = recv.read_to_end(1000).await.anyerr()?;
2071 conn.close(0u32.into(), b"bye!");
2072
2073 task.await.anyerr()??;
2074
2075 client.close().await;
2076 server.close().await;
2077
2078 assert_eq!(&data, b"Hello, world!");
2079
2080 Ok(())
2081 }
2082
2083 #[tokio::test]
2084 #[traced_test]
2085 async fn endpoint_bidi_send_recv() -> Result {
2086 let disco = StaticProvider::new();
2087 let ep1 = Endpoint::empty_builder(RelayMode::Disabled)
2088 .discovery(disco.clone())
2089 .alpns(vec![TEST_ALPN.to_vec()])
2090 .bind()
2091 .await?;
2092
2093 let ep2 = Endpoint::empty_builder(RelayMode::Disabled)
2094 .discovery(disco.clone())
2095 .alpns(vec![TEST_ALPN.to_vec()])
2096 .bind()
2097 .await?;
2098
2099 disco.add_endpoint_info(ep1.addr());
2100 disco.add_endpoint_info(ep2.addr());
2101
2102 let ep1_endpointid = ep1.id();
2103 let ep2_endpointid = ep2.id();
2104 eprintln!("endpoint id 1 {ep1_endpointid}");
2105 eprintln!("endpoint id 2 {ep2_endpointid}");
2106
2107 async fn connect_hello(ep: Endpoint, dst: EndpointId) -> Result {
2108 let conn = ep.connect(dst, TEST_ALPN).await?;
2109 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2110 info!("sending hello");
2111 send.write_all(b"hello").await.anyerr()?;
2112 send.finish().anyerr()?;
2113 info!("receiving world");
2114 let m = recv.read_to_end(100).await.anyerr()?;
2115 assert_eq!(m, b"world");
2116 conn.close(1u8.into(), b"done");
2117 Ok(())
2118 }
2119
2120 async fn accept_world(ep: Endpoint, src: EndpointId) -> Result {
2121 let incoming = ep.accept().await.anyerr()?;
2122 let mut iconn = incoming.accept().anyerr()?;
2123 let alpn = iconn.alpn().await?;
2124 let conn = iconn.await.anyerr()?;
2125 let endpoint_id = conn.remote_id();
2126 assert_eq!(endpoint_id, src);
2127 assert_eq!(alpn, TEST_ALPN);
2128 let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
2129 info!("receiving hello");
2130 let m = recv.read_to_end(100).await.anyerr()?;
2131 assert_eq!(m, b"hello");
2132 info!("sending hello");
2133 send.write_all(b"world").await.anyerr()?;
2134 send.finish().anyerr()?;
2135 match conn.closed().await {
2136 ConnectionError::ApplicationClosed(closed) => {
2137 assert_eq!(closed.error_code, 1u8.into());
2138 Ok(())
2139 }
2140 _ => panic!("wrong close error"),
2141 }
2142 }
2143
2144 let p1_accept = tokio::spawn(accept_world(ep1.clone(), ep2_endpointid).instrument(
2145 info_span!(
2146 "p1_accept",
2147 ep1 = %ep1.id().fmt_short(),
2148 dst = %ep2_endpointid.fmt_short(),
2149 ),
2150 ));
2151 let p2_accept = tokio::spawn(accept_world(ep2.clone(), ep1_endpointid).instrument(
2152 info_span!(
2153 "p2_accept",
2154 ep2 = %ep2.id().fmt_short(),
2155 dst = %ep1_endpointid.fmt_short(),
2156 ),
2157 ));
2158 let p1_connect = tokio::spawn(connect_hello(ep1.clone(), ep2_endpointid).instrument(
2159 info_span!(
2160 "p1_connect",
2161 ep1 = %ep1.id().fmt_short(),
2162 dst = %ep2_endpointid.fmt_short(),
2163 ),
2164 ));
2165 let p2_connect = tokio::spawn(connect_hello(ep2.clone(), ep1_endpointid).instrument(
2166 info_span!(
2167 "p2_connect",
2168 ep2 = %ep2.id().fmt_short(),
2169 dst = %ep1_endpointid.fmt_short(),
2170 ),
2171 ));
2172
2173 p1_accept.await.anyerr()??;
2174 p2_accept.await.anyerr()??;
2175 p1_connect.await.anyerr()??;
2176 p2_connect.await.anyerr()??;
2177
2178 Ok(())
2179 }
2180
2181 #[tokio::test]
2182 #[traced_test]
2183 async fn test_direct_addresses_no_qad_relay() -> Result {
2184 let (relay_map, _, _guard) = run_relay_server_with(false).await.unwrap();
2185
2186 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
2187 .alpns(vec![TEST_ALPN.to_vec()])
2188 .insecure_skip_relay_cert_verify(true)
2189 .bind()
2190 .await?;
2191
2192 assert!(ep.addr().ip_addrs().count() > 0);
2193
2194 Ok(())
2195 }
2196
2197 #[cfg_attr(target_os = "windows", ignore = "flaky")]
2198 #[tokio::test]
2199 #[traced_test]
2200 async fn graceful_close() -> Result {
2201 let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
2202 let server = Endpoint::empty_builder(RelayMode::Disabled)
2203 .alpns(vec![TEST_ALPN.to_vec()])
2204 .bind()
2205 .await?;
2206 let server_addr = server.addr();
2207 let server_task = tokio::spawn(async move {
2208 let incoming = server.accept().await.anyerr()?;
2209 let conn = incoming.await.anyerr()?;
2210 let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
2211 let msg = recv.read_to_end(1_000).await.anyerr()?;
2212 send.write_all(&msg).await.anyerr()?;
2213 send.finish().anyerr()?;
2214 let close_reason = conn.closed().await;
2215 Ok::<_, Error>(close_reason)
2216 });
2217
2218 let conn = client.connect(server_addr, TEST_ALPN).await?;
2219 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2220 send.write_all(b"Hello, world!").await.anyerr()?;
2221 send.finish().anyerr()?;
2222 recv.read_to_end(1_000).await.anyerr()?;
2223 conn.close(42u32.into(), b"thanks, bye!");
2224 client.close().await;
2225
2226 let close_err = server_task.await.anyerr()??;
2227 let ConnectionError::ApplicationClosed(app_close) = close_err else {
2228 panic!("Unexpected close reason: {close_err:?}");
2229 };
2230
2231 assert_eq!(app_close.error_code, 42u32.into());
2232 assert_eq!(app_close.reason.as_ref(), b"thanks, bye!");
2233
2234 Ok(())
2235 }
2236
2237 #[cfg(feature = "metrics")]
2238 #[tokio::test]
2239 #[traced_test]
2240 async fn metrics_smoke() -> Result {
2241 use iroh_metrics::Registry;
2242
2243 let secret_key = SecretKey::from_bytes(&[0u8; 32]);
2244 let client = Endpoint::empty_builder(RelayMode::Disabled)
2245 .secret_key(secret_key)
2246 .bind()
2247 .await?;
2248 let secret_key = SecretKey::from_bytes(&[1u8; 32]);
2249 let server = Endpoint::empty_builder(RelayMode::Disabled)
2250 .secret_key(secret_key)
2251 .alpns(vec![TEST_ALPN.to_vec()])
2252 .bind()
2253 .await?;
2254 let server_addr = server.addr();
2255 let server_task = tokio::task::spawn(async move {
2256 let conn = server.accept().await.anyerr()?.await.anyerr()?;
2257 let mut uni = conn.accept_uni().await.anyerr()?;
2258 uni.read_to_end(10).await.anyerr()?;
2259 drop(conn);
2260 Ok::<_, Error>(server)
2261 });
2262 let conn = client.connect(server_addr, TEST_ALPN).await?;
2263 let mut uni = conn.open_uni().await.anyerr()?;
2264 uni.write_all(b"helloworld").await.anyerr()?;
2265 uni.finish().anyerr()?;
2266 conn.closed().await;
2267 drop(conn);
2268 let server = server_task.await.anyerr()??;
2269
2270 let m = client.metrics();
2271 assert!(m.magicsock.recv_datagrams.get() > 0);
2276
2277 let m = server.metrics();
2278 assert!(m.magicsock.recv_datagrams.get() > 0);
2283
2284 fn register_endpoint(registry: &mut Registry, endpoint: &Endpoint) {
2286 let id = endpoint.id().fmt_short();
2287 let sub_registry = registry.sub_registry_with_label("id", id.to_string());
2288 sub_registry.register_all(endpoint.metrics());
2289 }
2290 let mut registry = Registry::default();
2291 register_endpoint(&mut registry, &client);
2292 register_endpoint(&mut registry, &server);
2293 Ok(())
2297 }
2298
2299 async fn alpn_connection_test(
2302 accept_alpns: Vec<Vec<u8>>,
2303 primary_connect_alpn: &[u8],
2304 secondary_connect_alpns: Vec<Vec<u8>>,
2305 ) -> Result<Vec<u8>> {
2306 let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
2307 let server = Endpoint::empty_builder(RelayMode::Disabled)
2308 .alpns(accept_alpns)
2309 .bind()
2310 .await?;
2311 let server_addr = server.addr();
2312 let server_task = tokio::spawn({
2313 let server = server.clone();
2314 async move {
2315 let incoming = server.accept().await.anyerr()?;
2316 let conn = incoming.await.anyerr()?;
2317 conn.close(0u32.into(), b"bye!");
2318 n0_error::Ok(conn.alpn().to_vec())
2319 }
2320 });
2321
2322 let conn = client
2323 .connect_with_opts(
2324 server_addr,
2325 primary_connect_alpn,
2326 ConnectOptions::new().with_additional_alpns(secondary_connect_alpns),
2327 )
2328 .await?;
2329 let conn = conn.await.anyerr()?;
2330 let client_alpn = conn.alpn();
2331 conn.closed().await;
2332 client.close().await;
2333 server.close().await;
2334
2335 let server_alpn = server_task.await.anyerr()??;
2336
2337 assert_eq!(client_alpn, server_alpn);
2338
2339 Ok(server_alpn.to_vec())
2340 }
2341
2342 #[tokio::test]
2343 #[traced_test]
2344 async fn connect_multiple_alpn_negotiated() -> Result {
2345 const ALPN_ONE: &[u8] = b"alpn/1";
2346 const ALPN_TWO: &[u8] = b"alpn/2";
2347
2348 assert_eq!(
2349 alpn_connection_test(
2350 vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()],
2352 ALPN_TWO,
2353 vec![ALPN_ONE.to_vec()],
2354 )
2355 .await?,
2356 ALPN_TWO.to_vec(),
2357 "accept side prefers version 2 over 1"
2358 );
2359
2360 assert_eq!(
2361 alpn_connection_test(
2362 vec![ALPN_ONE.to_vec()],
2364 ALPN_TWO,
2365 vec![ALPN_ONE.to_vec()],
2366 )
2367 .await?,
2368 ALPN_ONE.to_vec(),
2369 "accept side only supports the old version"
2370 );
2371
2372 assert_eq!(
2373 alpn_connection_test(
2374 vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()],
2375 ALPN_ONE,
2376 vec![ALPN_TWO.to_vec()],
2377 )
2378 .await?,
2379 ALPN_TWO.to_vec(),
2380 "connect side ALPN order doesn't matter"
2381 );
2382
2383 assert_eq!(
2384 alpn_connection_test(vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()], ALPN_ONE, vec![],)
2385 .await?,
2386 ALPN_ONE.to_vec(),
2387 "connect side only supports the old version"
2388 );
2389
2390 Ok(())
2391 }
2392
2393 #[tokio::test]
2394 #[traced_test]
2395 async fn watch_net_report() -> Result {
2396 let endpoint = Endpoint::empty_builder(RelayMode::Staging).bind().await?;
2397
2398 endpoint.net_report().updated().await.anyerr()?;
2400
2401 Ok(())
2402 }
2403
2404 #[tokio::test]
2410 #[traced_test]
2411 async fn connect_multi_time() -> Result {
2412 let n = 32;
2413
2414 const NOOP_ALPN: &[u8] = b"noop";
2415
2416 #[derive(Debug, Clone)]
2417 struct Noop;
2418
2419 impl ProtocolHandler for Noop {
2420 async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
2421 connection.closed().await;
2422 Ok(())
2423 }
2424 }
2425
2426 async fn noop_server() -> Result<(Router, EndpointAddr)> {
2427 let endpoint = Endpoint::empty_builder(RelayMode::Disabled)
2428 .bind()
2429 .await
2430 .anyerr()?;
2431 let addr = endpoint.addr();
2432 let router = Router::builder(endpoint).accept(NOOP_ALPN, Noop).spawn();
2433 Ok((router, addr))
2434 }
2435
2436 let routers = stream::iter(0..n)
2437 .map(|_| noop_server())
2438 .buffered_unordered(32)
2439 .collect::<Vec<_>>()
2440 .await
2441 .into_iter()
2442 .collect::<Result<Vec<_>, _>>()
2443 .anyerr()?;
2444
2445 let addrs = routers
2446 .iter()
2447 .map(|(_, addr)| addr.clone())
2448 .collect::<Vec<_>>();
2449 let ids = addrs.iter().map(|addr| addr.id).collect::<Vec<_>>();
2450 let discovery = StaticProvider::from_endpoint_info(addrs);
2451 let endpoint = Endpoint::empty_builder(RelayMode::Disabled)
2452 .discovery(discovery)
2453 .bind()
2454 .await
2455 .anyerr()?;
2456 endpoint.addr();
2460 let t0 = Instant::now();
2461 for id in &ids {
2462 let conn = endpoint.connect(*id, NOOP_ALPN).await?;
2463 conn.close(0u32.into(), b"done");
2464 }
2465 let dt0 = t0.elapsed().as_secs_f64();
2466 let t1 = Instant::now();
2467 for id in &ids {
2468 let conn = endpoint.connect(*id, NOOP_ALPN).await?;
2469 conn.close(0u32.into(), b"done");
2470 }
2471 let dt1 = t1.elapsed().as_secs_f64();
2472
2473 assert!(dt0 / dt1 < 20.0, "First round: {dt0}s, second round {dt1}s");
2474 Ok(())
2475 }
2476}