iroh/
endpoint.rs

1//! The [`Endpoint`] allows establishing connections to other iroh endpoints.
2//!
3//! The [`Endpoint`] is the main API interface to manage a local iroh endpoint.  It allows
4//! connecting to and accepting connections from other endpoints.  See the [module docs] for
5//! more details on how iroh connections work.
6//!
7//! The main items in this module are:
8//!
9//! - [`Endpoint`] to establish iroh connections with other endpoints.
10//! - [`Builder`] to create an [`Endpoint`].
11//!
12//! [module docs]: crate
13
14use std::{
15    net::{SocketAddr, SocketAddrV4, SocketAddrV6},
16    sync::Arc,
17};
18
19use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr};
20use iroh_relay::{RelayConfig, RelayMap};
21use n0_error::{e, ensure, stack_error};
22use n0_watcher::Watcher;
23use tracing::{debug, instrument, trace, warn};
24use url::Url;
25
26#[cfg(feature = "qlog")]
27pub use self::quic::{QlogConfig, QlogFactory, QlogFileFactory};
28pub use self::{
29    connection::{
30        Accept, Accepting, AlpnError, AuthenticationError, Connecting, ConnectingError, Connection,
31        ConnectionInfo, ConnectionState, HandshakeCompleted, Incoming, IncomingZeroRtt,
32        IncomingZeroRttConnection, OutgoingZeroRtt, OutgoingZeroRttConnection,
33        RemoteEndpointIdError, ZeroRttStatus,
34    },
35    hooks::{AfterHandshakeOutcome, BeforeConnectOutcome, EndpointHooks},
36    quic::{
37        AcceptBi, AcceptUni, AckFrequencyConfig, AeadKey, ApplicationClose, Chunk, ClosedStream,
38        ConnectionClose, ConnectionError, ConnectionStats, Controller, ControllerFactory,
39        CryptoError, CryptoServerConfig, ExportKeyingMaterialError, FrameStats, HandshakeTokenKey,
40        IdleTimeout, MtuDiscoveryConfig, OpenBi, OpenUni, PathStats, QuicTransportConfig,
41        ReadDatagram, ReadError, ReadExactError, ReadToEndError, RecvStream, ResetError,
42        RetryError, SendDatagramError, SendStream, ServerConfig, Side, StoppedError, StreamId,
43        TransportError, TransportErrorCode, UdpStats, UnsupportedVersion, VarInt,
44        VarIntBoundsExceeded, WeakConnectionHandle, WriteError, Written,
45    },
46};
47use self::{hooks::EndpointHooksList, presets::Preset};
48#[cfg(wasm_browser)]
49use crate::discovery::pkarr::PkarrResolver;
50#[cfg(not(wasm_browser))]
51use crate::dns::DnsResolver;
52pub use crate::magicsock::{
53    DirectAddr, DirectAddrType, PathInfo,
54    remote_map::{PathInfoList, RemoteInfo, Source, TransportAddrInfo, TransportAddrUsage},
55    transports::TransportConfig,
56};
57use crate::{
58    discovery::{ConcurrentDiscovery, DiscoveryError, DynIntoDiscovery, IntoDiscovery, UserData},
59    magicsock::{self, Handle, RemoteStateActorStoppedError, mapped_addrs::MappedAddr},
60    metrics::EndpointMetrics,
61    net_report::Report,
62    tls::{self, DEFAULT_MAX_TLS_TICKETS},
63};
64
65mod connection;
66pub(crate) mod hooks;
67pub mod presets;
68mod quic;
69
/// Builder for [`Endpoint`].
///
/// By default the endpoint will generate a new random [`SecretKey`], which will result in a
/// new [`EndpointId`].
///
/// To create the [`Endpoint`] call [`Builder::bind`].
#[derive(Debug)]
pub struct Builder {
    /// Secret key for the endpoint; [`Builder::bind`] generates a random one if `None`.
    secret_key: Option<SecretKey>,
    /// ALPN protocols used to build the server config for incoming connections.
    alpn_protocols: Vec<Vec<u8>>,
    /// QUIC transport configuration stored in the endpoint's static config.
    transport_config: QuicTransportConfig,
    /// Whether TLS key logging is requested when building TLS configs.
    keylog: bool,
    /// Discovery service factories; each is turned into a service in [`Builder::bind`].
    discovery: Vec<Box<dyn DynIntoDiscovery>>,
    /// Optional user data handed to the magicsock for discovery.
    discovery_user_data: Option<UserData>,
    /// Optional proxy URL handed to the magicsock.
    proxy_url: Option<Url>,
    /// DNS resolver; [`Builder::bind`] falls back to the default resolver if `None`.
    #[cfg(not(wasm_browser))]
    dns_resolver: Option<DnsResolver>,
    /// Test-only switch to skip relay certificate verification.
    #[cfg(any(test, feature = "test-utils"))]
    insecure_skip_relay_cert_verify: bool,
    /// Transports (IP and relay) handed to the magicsock.
    transports: Vec<TransportConfig>,
    /// Maximum number of TLS tickets, passed to the TLS config.
    max_tls_tickets: usize,
    /// Hooks installed via [`Builder::hooks`], handed to the magicsock.
    hooks: EndpointHooksList,
}
93
94impl From<RelayMode> for Option<TransportConfig> {
95    fn from(mode: RelayMode) -> Self {
96        match mode {
97            RelayMode::Disabled => None,
98            RelayMode::Default => Some(TransportConfig::Relay {
99                relay_map: mode.relay_map(),
100            }),
101            RelayMode::Staging => Some(TransportConfig::Relay {
102                relay_map: mode.relay_map(),
103            }),
104            RelayMode::Custom(relay_map) => Some(TransportConfig::Relay { relay_map }),
105        }
106    }
107}
108
109impl Builder {
110    // The ordering of public methods is reflected directly in the documentation.  This is
111    // roughly ordered by what is most commonly needed by users.
112
113    /// Creates a new [`Builder`] using the given [`Preset`].
114    ///
115    /// See [`presets`] for more.
116    pub fn new<P: Preset>(preset: P) -> Self {
117        Self::empty(RelayMode::Disabled).preset(preset)
118    }
119
120    /// Applies the given [`Preset`].
121    pub fn preset<P: Preset>(mut self, preset: P) -> Self {
122        self = preset.apply(self);
123        self
124    }
125
126    /// Creates an empty builder with no discovery services.
127    pub fn empty(relay_mode: RelayMode) -> Self {
128        let mut transports = vec![
129            #[cfg(not(wasm_browser))]
130            TransportConfig::default_ipv4(),
131            #[cfg(not(wasm_browser))]
132            TransportConfig::default_ipv6(),
133        ];
134        if let Some(relay) = relay_mode.into() {
135            transports.push(relay);
136        }
137        Self {
138            secret_key: Default::default(),
139            alpn_protocols: Default::default(),
140            transport_config: QuicTransportConfig::default(),
141            keylog: Default::default(),
142            discovery: Default::default(),
143            discovery_user_data: Default::default(),
144            proxy_url: None,
145            #[cfg(not(wasm_browser))]
146            dns_resolver: None,
147            #[cfg(any(test, feature = "test-utils"))]
148            insecure_skip_relay_cert_verify: false,
149            max_tls_tickets: DEFAULT_MAX_TLS_TICKETS,
150            transports,
151            hooks: Default::default(),
152        }
153    }
154
155    // # The final constructor that everyone needs.
156
157    /// Binds the magic endpoint.
158    pub async fn bind(self) -> Result<Endpoint, BindError> {
159        let mut rng = rand::rng();
160        let secret_key = self
161            .secret_key
162            .unwrap_or_else(move || SecretKey::generate(&mut rng));
163
164        let static_config = StaticConfig {
165            transport_config: self.transport_config.clone(),
166            tls_config: tls::TlsConfig::new(secret_key.clone(), self.max_tls_tickets),
167            keylog: self.keylog,
168        };
169        let server_config = static_config.create_server_config(self.alpn_protocols);
170
171        #[cfg(not(wasm_browser))]
172        let dns_resolver = self.dns_resolver.unwrap_or_default();
173
174        let metrics = EndpointMetrics::default();
175
176        let msock_opts = magicsock::Options {
177            transports: self.transports,
178            secret_key,
179            discovery_user_data: self.discovery_user_data,
180            proxy_url: self.proxy_url,
181            #[cfg(not(wasm_browser))]
182            dns_resolver,
183            server_config,
184            #[cfg(any(test, feature = "test-utils"))]
185            insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
186            metrics,
187            hooks: self.hooks,
188        };
189
190        let msock = magicsock::MagicSock::spawn(msock_opts).await?;
191        trace!("created magicsock");
192        debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created");
193
194        let ep = Endpoint {
195            msock,
196            static_config: Arc::new(static_config),
197        };
198
199        // Add discovery mechanisms
200        for create_service in self.discovery {
201            let service = create_service.into_discovery(&ep)?;
202            ep.discovery().add_boxed(service);
203        }
204
205        Ok(ep)
206    }
207
208    // # The very common methods everyone basically needs.
209
210    /// Adds an IP transport, binding to the provided IPv4 address.
211    ///
212    /// If you want to remove the default transports, make sure to call `clear_ip` first.
213    ///
214    /// Setting the port to `0` will use a random port.
215    /// If the port specified is already in use, it will fallback to choosing a random port.
216    #[cfg(not(wasm_browser))]
217    pub fn bind_addr_v4(mut self, bind_addr: SocketAddrV4) -> Self {
218        self.transports.push(TransportConfig::Ip {
219            bind_addr: bind_addr.into(),
220        });
221        self
222    }
223
224    /// Adds an IP transport, binding to the provided IPv6 address.
225    ///
226    /// If you want to remove the default transports, make sure to call `clear_ip` first.
227    ///
228    /// Setting the port to `0` will use a random port.
229    /// If the port specified is already in use, it will fallback to choosing a random port.
230    #[cfg(not(wasm_browser))]
231    pub fn bind_addr_v6(mut self, bind_addr: SocketAddrV6) -> Self {
232        self.transports.push(TransportConfig::Ip {
233            bind_addr: bind_addr.into(),
234        });
235        self
236    }
237
238    /// Removes all IP based transports
239    #[cfg(not(wasm_browser))]
240    pub fn clear_ip_transports(mut self) -> Self {
241        self.transports
242            .retain(|t| !matches!(t, TransportConfig::Ip { .. }));
243        self
244    }
245
246    /// Removes all relay based transports
247    pub fn clear_relay_transports(mut self) -> Self {
248        self.transports
249            .retain(|t| !matches!(t, TransportConfig::Relay { .. }));
250        self
251    }
252
253    /// Sets a secret key to authenticate with other peers.
254    ///
255    /// This secret key's public key will be the [`PublicKey`] of this endpoint and thus
256    /// also its [`EndpointId`]
257    ///
258    /// If not set, a new secret key will be generated.
259    ///
260    /// [`PublicKey`]: iroh_base::PublicKey
261    pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
262        self.secret_key = Some(secret_key);
263        self
264    }
265
266    /// Sets the [ALPN] protocols that this endpoint will accept on incoming connections.
267    ///
268    /// Not setting this will still allow creating connections, but to accept incoming
269    /// connections at least one [ALPN] must be set.
270    ///
271    /// [ALPN]: https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation
272    pub fn alpns(mut self, alpn_protocols: Vec<Vec<u8>>) -> Self {
273        self.alpn_protocols = alpn_protocols;
274        self
275    }
276
277    // # Methods for common customisation items.
278
279    /// Sets the relay servers to assist in establishing connectivity.
280    ///
281    /// Relay servers are used to establish initial connection with another iroh endpoint.
282    /// They also perform various functions related to hole punching, see the [crate docs]
283    /// for more details.
284    ///
285    /// By default the [number 0] relay servers are used, see [`RelayMode::Default`].
286    ///
287    /// When using [RelayMode::Custom], the provided `relay_map` must contain at least one
288    /// configured relay endpoint.  If an invalid RelayMap is provided [`bind`]
289    /// will result in an error.
290    ///
291    /// [`bind`]: Builder::bind
292    /// [crate docs]: crate
293    /// [number 0]: https://n0.computer
294    pub fn relay_mode(mut self, relay_mode: RelayMode) -> Self {
295        let transport: Option<_> = relay_mode.into();
296        match transport {
297            Some(transport) => {
298                if let Some(og) = self
299                    .transports
300                    .iter_mut()
301                    .find(|t| matches!(t, TransportConfig::Relay { .. }))
302                {
303                    *og = transport;
304                } else {
305                    self.transports.push(transport);
306                }
307            }
308            None => {
309                self.transports
310                    .retain(|t| !matches!(t, TransportConfig::Relay { .. }));
311            }
312        }
313        self
314    }
315
316    /// Removes all discovery services from the builder.
317    ///
318    /// If no discovery service is set, connecting to an endpoint without providing its
319    /// direct addresses or relay URLs will fail.
320    ///
321    /// See the documentation of the [`crate::discovery::Discovery`] trait for details.
322    pub fn clear_discovery(mut self) -> Self {
323        self.discovery.clear();
324        self
325    }
326
327    /// Adds a discovery mechanism for this endpoint.
328    ///
329    /// The function `discovery`
330    /// will be called on endpoint creation with the configured secret key of
331    /// the endpoint. Discovery services that need to publish information need
332    /// to use this secret key to sign the information.
333    ///
334    /// If you add multiple discovery services, they will be combined using a
335    /// [`crate::discovery::ConcurrentDiscovery`].
336    ///
337    /// If no discovery service is set, connecting to an endpoint without providing its
338    /// direct addresses or relay URLs will fail.
339    ///
340    /// To clear all discovery services, use [`Builder::clear_discovery`].
341    ///
342    /// See the documentation of the [`crate::discovery::Discovery`] trait for details.
343    pub fn discovery(mut self, discovery: impl IntoDiscovery) -> Self {
344        self.discovery.push(Box::new(discovery));
345        self
346    }
347
348    /// Sets the initial user-defined data to be published in discovery services for this node.
349    ///
350    /// When using discovery services, this string of [`UserData`] will be published together
351    /// with the endpoint's addresses and relay URL. When other endpoints discover this endpoint,
352    /// they retrieve the [`UserData`] in addition to the addressing info.
353    ///
354    /// Iroh itself does not interpret the user-defined data in any way, it is purely left
355    /// for applications to parse and use.
356    pub fn user_data_for_discovery(mut self, user_data: UserData) -> Self {
357        self.discovery_user_data = Some(user_data);
358        self
359    }
360
361    // # Methods for more specialist customisation.
362
363    /// Sets a custom [`QuicTransportConfig`] for this endpoint.
364    ///
365    /// The transport config contains parameters governing the QUIC state machine.
366    ///
367    /// If unset, the default config is used. Default values should be suitable for most
368    /// internet applications. Applications protocols which forbid remotely-initiated
369    /// streams should set `max_concurrent_bidi_streams` and `max_concurrent_uni_streams` to
370    /// zero.
371    ///
372    /// Please be aware that changing some settings may have adverse effects on establishing
373    /// and maintaining direct connections.
374    pub fn transport_config(mut self, transport_config: QuicTransportConfig) -> Self {
375        self.transport_config = transport_config;
376        self
377    }
378
379    /// Optionally sets a custom DNS resolver to use for this endpoint.
380    ///
381    /// The DNS resolver is used to resolve relay hostnames, and endpoint addresses if
382    /// [`crate::discovery::dns::DnsDiscovery`] is configured.
383    ///
384    /// By default, a new DNS resolver is created which is configured to use the
385    /// host system's DNS configuration. You can pass a custom instance of [`DnsResolver`]
386    /// here to use a differently configured DNS resolver for this endpoint, or to share
387    /// a [`DnsResolver`] between multiple endpoints.
388    #[cfg(not(wasm_browser))]
389    pub fn dns_resolver(mut self, dns_resolver: DnsResolver) -> Self {
390        self.dns_resolver = Some(dns_resolver);
391        self
392    }
393
394    /// Sets an explicit proxy url to proxy all HTTP(S) traffic through.
395    pub fn proxy_url(mut self, url: Url) -> Self {
396        self.proxy_url.replace(url);
397        self
398    }
399
400    /// Sets the proxy url from the environment, in this order:
401    ///
402    /// - `HTTP_PROXY`
403    /// - `http_proxy`
404    /// - `HTTPS_PROXY`
405    /// - `https_proxy`
406    pub fn proxy_from_env(mut self) -> Self {
407        self.proxy_url = proxy_url_from_env();
408        self
409    }
410
411    /// Enables saving the TLS pre-master key for connections.
412    ///
413    /// This key should normally remain secret but can be useful to debug networking issues
414    /// by decrypting captured traffic.
415    ///
416    /// If *keylog* is `true` then setting the `SSLKEYLOGFILE` environment variable to a
417    /// filename will result in this file being used to log the TLS pre-master keys.
418    pub fn keylog(mut self, keylog: bool) -> Self {
419        self.keylog = keylog;
420        self
421    }
422
423    /// Skip verification of SSL certificates from relay servers
424    ///
425    /// May only be used in tests.
426    #[cfg(any(test, feature = "test-utils"))]
427    pub fn insecure_skip_relay_cert_verify(mut self, skip_verify: bool) -> Self {
428        self.insecure_skip_relay_cert_verify = skip_verify;
429        self
430    }
431
432    /// Set the maximum number of TLS tickets to cache.
433    ///
434    /// Set this to a larger value if you want to do 0rtt connections to a large
435    /// number of clients.
436    ///
437    /// The default is 256, taking about 150 KiB in memory.
438    pub fn max_tls_tickets(mut self, n: usize) -> Self {
439        self.max_tls_tickets = n;
440        self
441    }
442
443    /// Install hooks onto the endpoint.
444    ///
445    /// Endpoint hooks intercept the connection establishment process of an [`Endpoint`].
446    ///
447    /// You can install multiple [`EndpointHooks`] by calling this function multiple times.
448    /// Order matters: hooks are invoked in the order they were installed onto the endpoint
449    /// builder. Once a hook returns reject, further processing
450    /// is aborted and other hooks won't be invoked.
451    ///
452    /// See [`EndpointHooks`] for details on the possible interception points in the connection lifecycle.
453    pub fn hooks(mut self, hooks: impl EndpointHooks + 'static) -> Self {
454        self.hooks.push(hooks);
455        self
456    }
457}
458
/// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime.
///
/// Created once in [`Builder::bind`] and shared by all clones of an [`Endpoint`].
#[derive(Debug)]
struct StaticConfig {
    /// TLS configuration; also holds the endpoint's secret key.
    tls_config: tls::TlsConfig,
    /// QUIC transport configuration applied to server configs and, unless overridden
    /// per connection, to client configs.
    transport_config: QuicTransportConfig,
    /// Whether TLS key logging is requested when building server and client configs.
    keylog: bool,
}
466
467impl StaticConfig {
468    /// Create a [`quinn::ServerConfig`] with the specified ALPN protocols.
469    fn create_server_config(&self, alpn_protocols: Vec<Vec<u8>>) -> ServerConfig {
470        let quic_server_config = self
471            .tls_config
472            .make_server_config(alpn_protocols, self.keylog);
473        let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
474        server_config.transport_config(self.transport_config.to_arc());
475
476        server_config
477    }
478}
479
/// Controls an iroh endpoint, establishing connections with other endpoints.
///
/// This is the main API interface to create connections to, and accept connections from
/// other iroh endpoints.  The connections are peer-to-peer and encrypted, a Relay server is
/// used to make the connections reliable.  See the [crate docs] for a more detailed
/// overview of iroh.
///
/// It is recommended to only create a single instance per application.  This ensures all
/// the connections made share the same peer-to-peer connections to other iroh endpoints,
/// while still remaining independent connections.  This will result in more optimal network
/// behaviour.
///
/// The endpoint is created using the [`Builder`], which can be created using
/// [`Endpoint::builder`].
///
/// Once an endpoint exists, new connections are typically created using the
/// [`Endpoint::connect`] and [`Endpoint::accept`] methods.  Once established, the
/// [`Connection`] gives access to most [QUIC] features.  Individual streams to send data to
/// the peer are created using the [`Connection::open_bi`], [`Connection::accept_bi`],
/// [`Connection::open_uni`] and [`Connection::accept_uni`] functions.
///
/// Note that due to the light-weight properties of streams a stream will only be accepted
/// once the initiating peer has sent some data on it.
///
/// [QUIC]: https://quicwg.org
/// [crate docs]: crate
#[derive(Clone, Debug)]
pub struct Endpoint {
    /// Handle to the magicsocket/actor
    pub(crate) msock: Handle,
    /// Configuration structs for quinn, holds the transport config, certificate setup, secret key etc.
    static_config: Arc<StaticConfig>,
}
512
/// Errors returned by [`Endpoint::connect_with_opts`].
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources)]
#[non_exhaustive]
#[allow(private_interfaces)]
pub enum ConnectWithOptsError {
    /// The remote endpoint id equals the id of the local endpoint.
    #[error("Connecting to ourself is not supported")]
    SelfConnect,
    /// No addressing information is available for the remote; wraps a discovery error.
    #[error("No addressing information available")]
    NoAddress { source: DiscoveryError },
    /// The underlying QUIC endpoint failed to start the connection attempt.
    #[error("Unable to connect to remote")]
    Quinn {
        #[error(std_err)]
        source: quinn_proto::ConnectError,
    },
    /// An internal actor stopped unexpectedly.
    #[error("Internal consistency error")]
    InternalConsistencyError {
        /// Private source type, cannot be created publicly.
        source: RemoteStateActorStoppedError,
    },
    /// A `before_connect` hook rejected the connection attempt.
    #[error("Connection was rejected locally")]
    LocallyRejected,
}
535
/// Errors returned by [`Endpoint::connect`].
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources)]
#[non_exhaustive]
pub enum ConnectError {
    /// Starting the connection attempt failed.
    #[error(transparent)]
    Connect { source: ConnectWithOptsError },
    /// Wraps a [`ConnectingError`].
    #[error(transparent)]
    Connecting { source: ConnectingError },
    /// Wraps a QUIC [`ConnectionError`].
    #[error(transparent)]
    Connection {
        #[error(std_err)]
        source: ConnectionError,
    },
}
550
/// Errors returned by [`Builder::bind`] and [`Endpoint::bind`].
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources)]
#[non_exhaustive]
pub enum BindError {
    /// Creating the magicsock handle failed.
    #[error(transparent)]
    MagicSpawn {
        source: magicsock::CreateHandleError,
    },
    /// A configured discovery service could not be created.
    #[error(transparent)]
    Discovery {
        source: crate::discovery::IntoDiscoveryError,
    },
}
564
565impl Endpoint {
566    // The ordering of public methods is reflected directly in the documentation.  This is
567    // roughly ordered by what is most commonly needed by users, but grouped in similar
568    // items.
569
570    // # Methods relating to construction.
571
572    /// Returns the builder for an [`Endpoint`], with a production configuration.
573    ///
574    /// This uses the [`presets::N0`] as the configuration.
575    pub fn builder() -> Builder {
576        Builder::new(presets::N0)
577    }
578
579    /// Returns the builder for an [`Endpoint`], with an empty configuration.
580    ///
581    /// See [`Builder::empty`] for details.
582    pub fn empty_builder(relay_mode: RelayMode) -> Builder {
583        Builder::empty(relay_mode)
584    }
585
586    /// Constructs a default [`Endpoint`] and binds it immediately.
587    ///
588    /// Uses the [`presets::N0`] as configuration.
589    pub async fn bind() -> Result<Self, BindError> {
590        Self::builder().bind().await
591    }
592
593    /// Sets the list of accepted ALPN protocols.
594    ///
595    /// This will only affect new incoming connections.
596    /// Note that this *overrides* the current list of ALPNs.
597    pub fn set_alpns(&self, alpns: Vec<Vec<u8>>) {
598        let server_config = self.static_config.create_server_config(alpns);
599        self.msock.endpoint().set_server_config(Some(server_config));
600    }
601
602    /// Adds the provided configuration to the [`RelayMap`].
603    ///
604    /// Replacing and returning any existing configuration for [`RelayUrl`].
605    pub async fn insert_relay(
606        &self,
607        relay: RelayUrl,
608        config: Arc<RelayConfig>,
609    ) -> Option<Arc<RelayConfig>> {
610        self.msock.insert_relay(relay, config).await
611    }
612
613    /// Removes the configuration from the [`RelayMap`] for the provided [`RelayUrl`].
614    ///
615    /// Returns any existing configuration.
616    pub async fn remove_relay(&self, relay: &RelayUrl) -> Option<Arc<RelayConfig>> {
617        self.msock.remove_relay(relay).await
618    }
619
620    // # Methods for establishing connectivity.
621
622    /// Connects to a remote [`Endpoint`].
623    ///
624    /// A value that can be converted into an [`EndpointAddr`] is required. This can be either an
625    /// [`EndpointAddr`] or an [`EndpointId`].
626    ///
627    /// The [`EndpointAddr`] must contain the [`EndpointId`] to dial and may also contain a [`RelayUrl`]
628    /// and direct addresses. If direct addresses are provided, they will be used to try and
629    /// establish a direct connection without involving a relay server.
630    ///
631    /// If neither a [`RelayUrl`] or direct addresses are configured in the [`EndpointAddr`] it
632    /// may still be possible a connection can be established.  This depends on which, if any,
633    /// [`crate::discovery::Discovery`] services were configured using [`Builder::discovery`].  The discovery
634    /// service will also be used if the remote endpoint is not reachable on the provided direct
635    /// addresses and there is no [`RelayUrl`].
636    ///
637    /// If addresses or relay servers are neither provided nor can be discovered, the
638    /// connection attempt will fail with an error.
639    ///
640    /// The `alpn`, or application-level protocol identifier, is also required. The remote
641    /// endpoint must support this `alpn`, otherwise the connection attempt will fail with
642    /// an error.
643    ///
644    /// [`RelayUrl`]: crate::RelayUrl
645    pub async fn connect(
646        &self,
647        endpoint_addr: impl Into<EndpointAddr>,
648        alpn: &[u8],
649    ) -> Result<Connection, ConnectError> {
650        let endpoint_addr = endpoint_addr.into();
651        let remote = endpoint_addr.id;
652        let connecting = self
653            .connect_with_opts(endpoint_addr, alpn, Default::default())
654            .await?;
655        let conn = connecting.await?;
656
657        debug!(
658            me = %self.id().fmt_short(),
659            remote = %remote.fmt_short(),
660            alpn = %String::from_utf8_lossy(alpn),
661            "Connection established."
662        );
663        Ok(conn)
664    }
665
666    /// Starts a connection attempt with a remote [`Endpoint`].
667    ///
668    /// Like [`Endpoint::connect`] (see also its docs for general details), but allows for a more
669    /// advanced connection setup with more customization in two aspects:
670    /// 1. The returned future resolves to a [`Connecting`], which can be further processed into
671    ///    a [`Connection`] by awaiting, or alternatively allows connecting with 0-RTT via
672    ///    [`Connecting::into_0rtt`].
673    ///    **Note:** Please read the documentation for `into_0rtt` carefully to assess
674    ///    security concerns.
675    /// 2. The [`TransportConfig`] for the connection can be modified via the provided
676    ///    [`ConnectOptions`].
677    ///    **Note:** Please be aware that changing transport config settings may have adverse effects on
678    ///    establishing and maintaining direct connections.  Carefully test settings you use and
679    ///    consider this currently as still rather experimental.
680    #[instrument(name = "connect", skip_all, fields(
681        me = %self.id().fmt_short(),
682        remote = tracing::field::Empty,
683        alpn = String::from_utf8_lossy(alpn).to_string(),
684    ))]
685    pub async fn connect_with_opts(
686        &self,
687        endpoint_addr: impl Into<EndpointAddr>,
688        alpn: &[u8],
689        options: ConnectOptions,
690    ) -> Result<Connecting, ConnectWithOptsError> {
691        let endpoint_addr: EndpointAddr = endpoint_addr.into();
692        if let BeforeConnectOutcome::Reject =
693            self.msock.hooks.before_connect(&endpoint_addr, alpn).await
694        {
695            return Err(e!(ConnectWithOptsError::LocallyRejected));
696        }
697        let endpoint_id = endpoint_addr.id;
698
699        tracing::Span::current().record("remote", tracing::field::display(endpoint_id.fmt_short()));
700
701        // Connecting to ourselves is not supported.
702        ensure!(endpoint_id != self.id(), ConnectWithOptsError::SelfConnect);
703
704        trace!(
705            dst_endpoint_id = %endpoint_id.fmt_short(),
706            relay_url = ?endpoint_addr.relay_urls().next().cloned(),
707            ip_addresses = ?endpoint_addr.ip_addrs().cloned().collect::<Vec<_>>(),
708            "connecting",
709        );
710
711        let mapped_addr = self.msock.resolve_remote(endpoint_addr).await??;
712
713        let transport_config = options
714            .transport_config
715            .map(|cfg| cfg.to_arc())
716            .unwrap_or(self.static_config.transport_config.to_arc());
717
718        // Start connecting via quinn. This will time out after 10 seconds if no reachable
719        // address is available.
720
721        let client_config = {
722            let mut alpn_protocols = vec![alpn.to_vec()];
723            alpn_protocols.extend(options.additional_alpns);
724            let quic_client_config = self
725                .static_config
726                .tls_config
727                .make_client_config(alpn_protocols, self.static_config.keylog);
728            let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
729            client_config.transport_config(transport_config.clone());
730            client_config
731        };
732
733        let dest_addr = mapped_addr.private_socket_addr();
734        let server_name = &tls::name::encode(endpoint_id);
735        let connect = self
736            .msock
737            .endpoint()
738            .connect_with(client_config, dest_addr, server_name)?;
739
740        Ok(Connecting::new(connect, self.clone(), endpoint_id))
741    }
742
743    /// Accepts an incoming connection on the endpoint.
744    ///
745    /// Only connections with the ALPNs configured in [`Builder::alpns`] will be accepted.
746    /// If multiple ALPNs have been configured the ALPN can be inspected before accepting
747    /// the connection using [`Connecting::alpn`].
748    ///
749    /// The returned future will yield `None` if the endpoint is closed by calling
750    /// [`Endpoint::close`].
751    pub fn accept(&self) -> Accept<'_> {
752        Accept {
753            inner: self.msock.endpoint().accept(),
754            ep: self.clone(),
755        }
756    }
757
758    // # Getter methods for properties of this Endpoint itself.
759
760    /// Returns the secret_key of this endpoint.
761    pub fn secret_key(&self) -> &SecretKey {
762        &self.static_config.tls_config.secret_key
763    }
764
765    /// Returns the endpoint id of this endpoint.
766    ///
767    /// This ID is the unique addressing information of this endpoint and other peers must know
768    /// it to be able to connect to this endpoint.
769    pub fn id(&self) -> EndpointId {
770        self.static_config.tls_config.secret_key.public()
771    }
772
    /// Returns the current [`EndpointAddr`].
    ///
    /// This is the current value of the watcher returned by [`Endpoint::watch_addr`].
    ///
    /// As long as the endpoint was able to bind to a network interface, some
    /// local addresses will be available.
    ///
    /// The state of other fields depends on the state of networking and connectivity.
    /// Use the [`Endpoint::online`] method to ensure that the endpoint is considered
    /// "online" (has contacted a relay server) before calling this method, if you want
    /// to ensure that the `EndpointAddr` will contain enough information to allow this endpoint
    /// to be dialable by a remote endpoint over the internet.
    ///
    /// You can use the [`Endpoint::watch_addr`] method to get updates when the `EndpointAddr`
    /// changes.
    pub fn addr(&self) -> EndpointAddr {
        self.watch_addr().get()
    }
788
789    /// Returns a [`Watcher`] for the current [`EndpointAddr`] for this endpoint.
790    ///
791    /// The observed [`EndpointAddr`] will have the current [`RelayUrl`] and direct addresses.
792    ///
793    /// ```no_run
794    /// # async fn wrapper() -> n0_error::Result<()> {
795    /// use iroh::{Endpoint, Watcher};
796    ///
797    /// let endpoint = Endpoint::builder()
798    ///     .alpns(vec![b"my-alpn".to_vec()])
799    ///     .bind()
800    ///     .await?;
801    /// let endpoint_addr = endpoint.watch_addr().get();
802    /// # let _ = endpoint_addr;
803    /// # Ok(())
804    /// # }
805    /// ```
806    ///
807    /// The [`Endpoint::online`] method can be used as a convenience method to
808    /// understand if the endpoint has ever been considered "online". But after
809    /// that initial call to [`Endpoint::online`], to understand if your
810    /// endpoint is no longer able to be connected to by endpoints outside
811    /// of the private or local network, watch for changes in it's [`EndpointAddr`].
812    /// If there are no `addrs`in the [`EndpointAddr`], you may not be dialable by other endpoints
813    /// on the internet.
814    ///
815    ///
816    /// The `EndpointAddr` will change as:
817    /// - network conditions change
818    /// - the endpoint connects to a relay server
819    /// - the endpoint changes its preferred relay server
820    /// - more addresses are discovered for this endpoint
821    ///
822    /// [`RelayUrl`]: crate::RelayUrl
823    #[cfg(not(wasm_browser))]
824    pub fn watch_addr(&self) -> impl n0_watcher::Watcher<Value = EndpointAddr> + use<> {
825        let watch_addrs = self.msock.ip_addrs();
826        let watch_relay = self.msock.home_relay();
827        let endpoint_id = self.id();
828
829        watch_addrs.or(watch_relay).map(move |(addrs, relays)| {
830            EndpointAddr::from_parts(
831                endpoint_id,
832                relays
833                    .into_iter()
834                    .map(TransportAddr::Relay)
835                    .chain(addrs.into_iter().map(|x| TransportAddr::Ip(x.addr))),
836            )
837        })
838    }
839
840    /// Returns a [`Watcher`] for the current [`EndpointAddr`] for this endpoint.
841    ///
842    /// When compiled to Wasm, this function returns a watcher that initializes
843    /// with an [`EndpointAddr`] that only contains a relay URL, but no direct addresses,
844    /// as there are no APIs for directly using sockets in browsers.
845    #[cfg(wasm_browser)]
846    pub fn watch_addr(&self) -> impl n0_watcher::Watcher<Value = EndpointAddr> + use<> {
847        // In browsers, there will never be any direct addresses, so we wait
848        // for the home relay instead. This makes the `EndpointAddr` have *some* way
849        // of connecting to us.
850        let watch_relay = self.msock.home_relay();
851        let endpoint_id = self.id();
852        watch_relay.map(move |mut relays| {
853            EndpointAddr::from_parts(endpoint_id, relays.into_iter().map(TransportAddr::Relay))
854        })
855    }
856
857    /// A convenience method that waits for the endpoint to be considered "online".
858    ///
859    /// This currently means at least one relay server was connected,
860    /// and at least one local IP address is available.
861    /// Event if no relays are configured, this will still wait for a relay connection.
862    ///
863    /// Once this has been resolved once, this will always immediately resolve.
864    ///
865    /// This has no timeout, so if that is needed, you need to wrap it in a
866    /// timeout. We recommend using a timeout close to
867    /// [`crate::net_report::TIMEOUT`], so you can be sure that at least one
868    /// [`crate::net_report::Report`] has been attempted.
869    ///
870    /// To understand if the endpoint has gone back "offline",
871    /// you must use the [`Endpoint::watch_addr`] method, to
872    /// get information on the current relay and direct address information.
873    pub async fn online(&self) {
874        self.msock.home_relay().initialized().await;
875    }
876
877    /// Returns a [`Watcher`] for any net-reports run from this [`Endpoint`].
878    ///
879    /// A `net-report` checks the network conditions of the [`Endpoint`], such as
880    /// whether it is connected to the internet via Ipv4 and/or Ipv6, its NAT
881    /// status, its latency to the relay servers, and its public addresses.
882    ///
883    /// The [`Endpoint`] continuously runs `net-reports` to monitor if network
884    /// conditions have changed. This [`Watcher`] will return the latest result
885    /// of the `net-report`.
886    ///
887    /// When issuing the first call to this method the first report might
888    /// still be underway, in this case the [`Watcher`] might not be initialized
889    /// with [`Some`] value yet.  Once the net-report has been successfully
890    /// run, the [`Watcher`] will always return [`Some`] report immediately, which
891    /// is the most recently run `net-report`.
892    ///
893    /// # Examples
894    ///
895    /// To get the first report use [`Watcher::initialized`]:
896    /// ```no_run
897    /// use iroh::{Endpoint, Watcher as _};
898    ///
899    /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
900    /// # rt.block_on(async move {
901    /// let ep = Endpoint::bind().await.unwrap();
902    /// let _report = ep.net_report().initialized().await;
903    /// # });
904    /// ```
905    #[doc(hidden)]
906    pub fn net_report(&self) -> impl Watcher<Value = Option<Report>> + use<> {
907        self.msock.net_report()
908    }
909
910    /// Returns the local socket addresses on which the underlying sockets are bound.
911    ///
912    /// The [`Endpoint`] always binds on an IPv4 address and also tries to bind on an IPv6
913    /// address if available.
914    pub fn bound_sockets(&self) -> Vec<SocketAddr> {
915        self.msock
916            .local_addr()
917            .into_iter()
918            .filter_map(|addr| addr.into_socket_addr())
919            .collect()
920    }
921
922    // # Methods for less common getters.
923    //
924    // Partially they return things passed into the builder.
925
926    /// Returns the DNS resolver used in this [`Endpoint`].
927    ///
928    /// See [`Builder::dns_resolver`].
929    #[cfg(not(wasm_browser))]
930    pub fn dns_resolver(&self) -> &DnsResolver {
931        self.msock.dns_resolver()
932    }
933
934    /// Returns the discovery mechanism, if configured.
935    ///
936    /// See [`Builder::discovery`].
937    pub fn discovery(&self) -> &ConcurrentDiscovery {
938        self.msock.discovery()
939    }
940
941    /// Returns metrics collected for this endpoint.
942    ///
943    /// The endpoint internally collects various metrics about its operation.
944    /// The returned [`EndpointMetrics`] struct contains all of these metrics.
945    ///
946    /// You can access individual metrics directly by using the public fields:
947    /// ```rust
948    /// # use std::collections::BTreeMap;
949    /// # use iroh::endpoint::Endpoint;
950    /// # async fn wrapper() -> n0_error::Result<()> {
951    /// let endpoint = Endpoint::bind().await?;
952    /// assert_eq!(endpoint.metrics().magicsock.recv_datagrams.get(), 0);
953    /// # Ok(())
954    /// # }
955    /// ```
956    ///
957    /// [`EndpointMetrics`] implements [`MetricsGroupSet`], and each field
958    /// implements [`MetricsGroup`]. These traits provide methods to iterate over
959    /// the groups in the set, and over the individual metrics in each group, without having
960    /// to access each field manually. With these methods, it is straightforward to collect
961    /// all metrics into a map or push their values to a metrics collector.
962    ///
963    /// For example, the following snippet collects all metrics into a map:
964    /// ```rust
965    /// # use std::collections::BTreeMap;
966    /// # use iroh_metrics::{Metric, MetricsGroup, MetricValue, MetricsGroupSet};
967    /// # use iroh::endpoint::Endpoint;
968    /// # async fn wrapper() -> n0_error::Result<()> {
969    /// let endpoint = Endpoint::bind().await?;
970    /// let metrics: BTreeMap<String, MetricValue> = endpoint
971    ///     .metrics()
972    ///     .iter()
973    ///     .map(|(group, metric)| {
974    ///         let name = [group, metric.name()].join(":");
975    ///         (name, metric.value())
976    ///     })
977    ///     .collect();
978    ///
979    /// assert_eq!(metrics["magicsock:recv_datagrams"], MetricValue::Counter(0));
980    /// # Ok(())
981    /// # }
982    /// ```
983    ///
984    /// The metrics can also be encoded into the OpenMetrics text format, as used by Prometheus.
985    /// To do so, use the [`iroh_metrics::Registry`], add the endpoint metrics to the
986    /// registry with [`Registry::register_all`], and encode the metrics to a string with
987    /// [`encode_openmetrics_to_string`]:
988    /// ```rust
989    /// # use iroh_metrics::{Registry, MetricsSource};
990    /// # use iroh::endpoint::Endpoint;
991    /// # async fn wrapper() -> n0_error::Result<()> {
992    /// let endpoint = Endpoint::bind().await?;
993    /// let mut registry = Registry::default();
994    /// registry.register_all(endpoint.metrics());
995    /// let s = registry.encode_openmetrics_to_string()?;
996    /// assert!(s.contains(r#"TYPE magicsock_recv_datagrams counter"#));
997    /// assert!(s.contains(r#"magicsock_recv_datagrams_total 0"#));
998    /// # Ok(())
999    /// # }
1000    /// ```
1001    ///
1002    /// Through a registry, you can also add labels or prefixes to metrics with
1003    /// [`Registry::sub_registry_with_label`] or [`Registry::sub_registry_with_prefix`].
1004    /// Furthermore, [`iroh_metrics::service`] provides functions to easily start services
1005    /// to serve the metrics with a HTTP server, dump them to a file, or push them
1006    /// to a Prometheus gateway.
1007    ///
1008    /// For example, the following snippet launches an HTTP server that serves the metrics in the
1009    /// OpenMetrics text format:
1010    /// ```no_run
1011    /// # use std::{sync::{Arc, RwLock}, time::Duration};
1012    /// # use iroh_metrics::{Registry, MetricsSource};
1013    /// # use iroh::endpoint::Endpoint;
1014    /// # use n0_error::{StackResultExt, StdResultExt};
1015    /// # async fn wrapper() -> n0_error::Result<()> {
1016    /// // Create a registry, wrapped in a read-write lock so that we can register and serve
1017    /// // the metrics independently.
1018    /// let registry = Arc::new(RwLock::new(Registry::default()));
1019    /// // Spawn a task to serve the metrics on an OpenMetrics HTTP endpoint.
1020    /// let metrics_task = tokio::task::spawn({
1021    ///     let registry = registry.clone();
1022    ///     async move {
1023    ///         let addr = "0.0.0.0:9100".parse().unwrap();
1024    ///         iroh_metrics::service::start_metrics_server(addr, registry).await
1025    ///     }
1026    /// });
1027    ///
1028    /// // Spawn an endpoint and add the metrics to the registry.
1029    /// let endpoint = Endpoint::bind().await?;
1030    /// registry.write().unwrap().register_all(endpoint.metrics());
1031    ///
1032    /// // Wait for the metrics server to bind, then fetch the metrics via HTTP.
1033    /// tokio::time::sleep(Duration::from_millis(500));
1034    /// let res = reqwest::get("http://localhost:9100/metrics")
1035    ///     .await
1036    ///     .std_context("get")?
1037    ///     .text()
1038    ///     .await
1039    ///     .std_context("text")?;
1040    ///
1041    /// assert!(res.contains(r#"TYPE magicsock_recv_datagrams counter"#));
1042    /// assert!(res.contains(r#"magicsock_recv_datagrams_total 0"#));
1043    /// # metrics_task.abort();
1044    /// # Ok(())
1045    /// # }
1046    /// ```
1047    ///
1048    /// [`Registry`]: iroh_metrics::Registry
1049    /// [`Registry::register_all`]: iroh_metrics::Registry::register_all
1050    /// [`Registry::sub_registry_with_label`]: iroh_metrics::Registry::sub_registry_with_label
1051    /// [`Registry::sub_registry_with_prefix`]: iroh_metrics::Registry::sub_registry_with_prefix
1052    /// [`encode_openmetrics_to_string`]: iroh_metrics::MetricsSource::encode_openmetrics_to_string
1053    /// [`MetricsGroup`]: iroh_metrics::MetricsGroup
1054    /// [`MetricsGroupSet`]: iroh_metrics::MetricsGroupSet
1055    #[cfg(feature = "metrics")]
1056    pub fn metrics(&self) -> &EndpointMetrics {
1057        &self.msock.metrics
1058    }
1059
1060    /// Returns addressing information about a recently used remote endpoint.
1061    ///
1062    /// The returned [`RemoteInfo`] contains a list of all transport addresses for the remote
1063    /// that we know about. This is a snapshot in time and not a watcher.
1064    ///
1065    /// Returns `None` if the endpoint doesn't have information about the remote.
1066    /// When remote endpoints are no longer used, our endpoint will keep information around
1067    /// for a little while, and then drop it. Afterwards, this will return `None`.
1068    pub async fn remote_info(&self, endpoint_id: EndpointId) -> Option<RemoteInfo> {
1069        self.msock.remote_info(endpoint_id).await
1070    }
1071
1072    // # Methods for less common state updates.
1073
1074    /// Notifies the system of potential network changes.
1075    ///
1076    /// On many systems iroh is able to detect network changes by itself, however
1077    /// some systems like android do not expose this functionality to native code.
1078    /// Android does however provide this functionality to Java code.  This
1079    /// function allows for notifying iroh of any potential network changes like
1080    /// this.
1081    ///
1082    /// Even when the network did not change, or iroh was already able to detect
1083    /// the network change itself, there is no harm in calling this function.
1084    pub async fn network_change(&self) {
1085        self.msock.network_change().await;
1086    }
1087
1088    // # Methods to update internal state.
1089
1090    /// Sets the initial user-defined data to be published in discovery services for this endpoint.
1091    ///
1092    /// If the user-defined data passed to this function is different to the previous one,
1093    /// the endpoint will republish its endpoint info to the configured discovery services.
1094    ///
1095    /// See also [`Builder::user_data_for_discovery`] for setting an initial value when
1096    /// building the endpoint.
1097    pub fn set_user_data_for_discovery(&self, user_data: Option<UserData>) {
1098        self.msock.set_user_data_for_discovery(user_data);
1099    }
1100
1101    // # Methods for terminating the endpoint.
1102
1103    /// Closes the QUIC endpoint and the magic socket.
1104    ///
1105    /// This will close any remaining open [`Connection`]s with an error code
1106    /// of `0` and an empty reason.  Though it is best practice to close those
1107    /// explicitly before with a custom error code and reason.
1108    ///
1109    /// It will then make a best effort to wait for all close notifications to be
1110    /// acknowledged by the peers, re-transmitting them if needed. This ensures the
1111    /// peers are aware of the closed connections instead of having to wait for a timeout
1112    /// on the connection. Once all connections are closed or timed out, the future
1113    /// finishes.
1114    ///
1115    /// The maximum time-out that this future will wait for depends on QUIC transport
1116    /// configurations of non-drained connections at the time of calling, and their current
1117    /// estimates of round trip time. With default parameters and a conservative estimate
1118    /// of round trip time, this call's future should take 3 seconds to resolve in cases of
1119    /// bad connectivity or failed connections. In the usual case, this call's future should
1120    /// return much more quickly.
1121    ///
1122    /// It is highly recommended you *do* wait for this close call to finish, if possible.
1123    /// Not doing so will make connections that were still open while closing the endpoint
1124    /// time out on the remote end. Thus remote ends will assume connections to have failed
1125    /// even if all application data was transmitted successfully.
1126    ///
1127    /// Note: Someone used to closing TCP sockets might wonder why it is necessary to wait
1128    /// for timeouts when closing QUIC endpoints, while they don't have to do this for TCP
1129    /// sockets. This is due to QUIC and its acknowledgments being implemented in user-land,
1130    /// while TCP sockets usually get closed and drained by the operating system in the
1131    /// kernel during the "Time-Wait" period of the TCP socket.
1132    ///
1133    /// Be aware however that the underlying UDP sockets are only closed once all clones of
1134    /// the the respective [`Endpoint`] are dropped.
1135    pub async fn close(&self) {
1136        if self.is_closed() {
1137            return;
1138        }
1139
1140        tracing::debug!("Connections closed");
1141        self.msock.close().await;
1142    }
1143
1144    /// Check if this endpoint is still alive, or already closed.
1145    pub fn is_closed(&self) -> bool {
1146        self.msock.is_closed()
1147    }
1148
1149    // # Remaining private methods
1150
1151    #[cfg(test)]
1152    pub(crate) fn magic_sock(&self) -> Handle {
1153        self.msock.clone()
1154    }
1155    #[cfg(test)]
1156    pub(crate) fn endpoint(&self) -> &quinn::Endpoint {
1157        self.msock.endpoint()
1158    }
1159}
1160
1161/// Options for the [`Endpoint::connect_with_opts`] function.
1162#[derive(Default, Debug, Clone)]
1163pub struct ConnectOptions {
1164    transport_config: Option<QuicTransportConfig>,
1165    additional_alpns: Vec<Vec<u8>>,
1166}
1167
1168impl ConnectOptions {
1169    /// Initializes new connection options.
1170    ///
1171    /// By default, the connection will use the same options
1172    /// as [`Endpoint::connect`], e.g. a default [`TransportConfig`].
1173    pub fn new() -> Self {
1174        Self::default()
1175    }
1176
1177    /// Sets the QUIC transport config options for this connection.
1178    pub fn with_transport_config(mut self, transport_config: QuicTransportConfig) -> Self {
1179        self.transport_config = Some(transport_config);
1180        self
1181    }
1182
1183    /// Sets [ALPN] identifiers that should be signaled as supported on connection, *in
1184    /// addition* to the main [ALPN] identifier used in [`Endpoint::connect_with_opts`].
1185    ///
1186    /// This allows connecting to servers that may only support older versions of your
1187    /// protocol. In this case, you would add the older [ALPN] identifiers with this
1188    /// function.
1189    ///
1190    /// You'll know the final negotiated [ALPN] identifier once your connection was
1191    /// established using [`Connection::alpn`], or even slightly earlier in the
1192    /// handshake by using [`Connecting::alpn`].
1193    /// The negotiated [ALPN] identifier may be any of the [ALPN] identifiers in this
1194    /// list or the main [ALPN] used in [`Endpoint::connect_with_opts`].
1195    ///
1196    /// The [ALPN] identifier order on the connect side doesn't matter, since it's the
1197    /// accept side that determines the protocol.
1198    ///
1199    /// For setting the supported [ALPN] identifiers on the accept side, see the endpoint
1200    /// builder's [`Builder::alpns`] function.
1201    ///
1202    /// [ALPN]: https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation
1203    pub fn with_additional_alpns(mut self, alpns: Vec<Vec<u8>>) -> Self {
1204        self.additional_alpns = alpns;
1205        self
1206    }
1207}
1208
1209/// Read a proxy url from the environment, in this order
1210///
1211/// - `HTTP_PROXY`
1212/// - `http_proxy`
1213/// - `HTTPS_PROXY`
1214/// - `https_proxy`
1215fn proxy_url_from_env() -> Option<Url> {
1216    if let Some(url) = std::env::var("HTTP_PROXY")
1217        .ok()
1218        .and_then(|s| s.parse::<Url>().ok())
1219    {
1220        if is_cgi() {
1221            warn!("HTTP_PROXY environment variable ignored in CGI");
1222        } else {
1223            return Some(url);
1224        }
1225    }
1226    if let Some(url) = std::env::var("http_proxy")
1227        .ok()
1228        .and_then(|s| s.parse::<Url>().ok())
1229    {
1230        return Some(url);
1231    }
1232    if let Some(url) = std::env::var("HTTPS_PROXY")
1233        .ok()
1234        .and_then(|s| s.parse::<Url>().ok())
1235    {
1236        return Some(url);
1237    }
1238    if let Some(url) = std::env::var("https_proxy")
1239        .ok()
1240        .and_then(|s| s.parse::<Url>().ok())
1241    {
1242        return Some(url);
1243    }
1244
1245    None
1246}
1247
1248/// Configuration of the relay servers for an [`Endpoint`].
1249#[derive(Debug, Clone, PartialEq, Eq)]
1250pub enum RelayMode {
1251    /// Disable relay servers completely.
1252    /// This means that neither listening nor dialing relays will be available.
1253    Disabled,
1254    /// Use the default relay map, with production relay servers from n0.
1255    ///
1256    /// See [`crate::defaults::prod`] for the severs used.
1257    Default,
1258    /// Use the staging relay servers from n0.
1259    Staging,
1260    /// Use a custom relay map.
1261    Custom(RelayMap),
1262}
1263
1264impl RelayMode {
1265    /// Returns the relay map for this mode.
1266    pub fn relay_map(&self) -> RelayMap {
1267        match self {
1268            RelayMode::Disabled => RelayMap::empty(),
1269            RelayMode::Default => crate::defaults::prod::default_relay_map(),
1270            RelayMode::Staging => crate::defaults::staging::default_relay_map(),
1271            RelayMode::Custom(relay_map) => relay_map.clone(),
1272        }
1273    }
1274}
1275
/// Name of the environment variable that forces the use of staging relays.
pub const ENV_FORCE_STAGING_RELAYS: &str = "IROH_FORCE_STAGING_RELAYS";

/// Returns `true` if the use of staging relays is forced.
///
/// That is the case when [`ENV_FORCE_STAGING_RELAYS`] is set to a non-empty unicode
/// value.
pub fn force_staging_infra() -> bool {
    std::env::var(ENV_FORCE_STAGING_RELAYS).is_ok_and(|value| !value.is_empty())
}
1283
1284/// Returns the default relay mode.
1285///
1286/// If the `IROH_FORCE_STAGING_RELAYS` environment variable is non empty, it will return `RelayMode::Staging`.
1287/// Otherwise, it will return `RelayMode::Default`.
1288pub fn default_relay_mode() -> RelayMode {
1289    // Use staging in testing
1290    match force_staging_infra() {
1291        true => RelayMode::Staging,
1292        false => RelayMode::Default,
1293    }
1294}
1295
/// Returns `true` if we appear to be executed in a CGI context.
///
/// This is detected by the `REQUEST_METHOD` environment variable being set, whatever
/// its value.
///
/// In a CGI context a malicious client can send the `Proxy:` header, which then ends up
/// in the `HTTP_PROXY` env var, so that variable must not be trusted there.
fn is_cgi() -> bool {
    let request_method = std::env::var_os("REQUEST_METHOD");
    request_method.is_some()
}
1303
// TODO: These tests could still be flaky, let's fix that:
1305// https://github.com/n0-computer/iroh/issues/1183
1306#[cfg(test)]
1307mod tests {
1308    use std::{
1309        sync::Arc,
1310        time::{Duration, Instant},
1311    };
1312
1313    use iroh_base::{EndpointAddr, EndpointId, SecretKey, TransportAddr};
1314    use n0_error::{AnyError as Error, Result, StdResultExt};
1315    use n0_future::{BufferedStreamExt, StreamExt, stream, time};
1316    use n0_watcher::Watcher;
1317    use quinn::ConnectionError;
1318    use rand::SeedableRng;
1319    use tokio::sync::oneshot;
1320    use tracing::{Instrument, error_span, info, info_span, instrument};
1321    use tracing_test::traced_test;
1322
1323    use super::Endpoint;
1324    use crate::{
1325        RelayMap, RelayMode,
1326        discovery::static_provider::StaticProvider,
1327        endpoint::{ConnectOptions, Connection},
1328        protocol::{AcceptError, ProtocolHandler, Router},
1329        test_utils::{QlogFileGroup, run_relay_server, run_relay_server_with},
1330    };
1331
1332    const TEST_ALPN: &[u8] = b"n0/iroh/test";
1333
1334    #[tokio::test]
1335    #[traced_test]
1336    async fn test_connect_self() -> Result {
1337        let ep = Endpoint::empty_builder(RelayMode::Disabled)
1338            .alpns(vec![TEST_ALPN.to_vec()])
1339            .bind()
1340            .await
1341            .unwrap();
1342        let my_addr = ep.addr();
1343        let res = ep.connect(my_addr.clone(), TEST_ALPN).await;
1344        assert!(res.is_err());
1345        let err = res.err().unwrap();
1346        assert!(err.to_string().starts_with("Connecting to ourself"));
1347
1348        Ok(())
1349    }
1350
1351    #[tokio::test]
1352    #[traced_test]
1353    async fn endpoint_connect_close() -> Result {
1354        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1355        let (relay_map, relay_url, _guard) = run_relay_server().await?;
1356        let server_secret_key = SecretKey::generate(&mut rng);
1357        let server_peer_id = server_secret_key.public();
1358
1359        let qlog = QlogFileGroup::from_env("endpoint_connect_close");
1360
1361        // Wait for the endpoint to be started to make sure it's up before clients try to connect
1362        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1363            .secret_key(server_secret_key)
1364            .transport_config(qlog.create("server")?)
1365            .alpns(vec![TEST_ALPN.to_vec()])
1366            .insecure_skip_relay_cert_verify(true)
1367            .bind()
1368            .await?;
1369        // Wait for the endpoint to be reachable via relay
1370        ep.online().await;
1371
1372        let server = tokio::spawn(
1373            async move {
1374                info!("accepting connection");
1375                let incoming = ep.accept().await.anyerr()?;
1376                let conn = incoming.await.anyerr()?;
1377                let mut stream = conn.accept_uni().await.anyerr()?;
1378                let mut buf = [0u8; 5];
1379                stream.read_exact(&mut buf).await.anyerr()?;
1380                info!("Accepted 1 stream, received {buf:?}.  Closing now.");
1381                // close the connection
1382                conn.close(7u8.into(), b"bye");
1383
1384                let res = conn.accept_uni().await;
1385                assert_eq!(res.unwrap_err(), quinn::ConnectionError::LocallyClosed);
1386
1387                let res = stream.read_to_end(10).await;
1388                assert_eq!(
1389                    res.unwrap_err(),
1390                    quinn::ReadToEndError::Read(quinn::ReadError::ConnectionLost(
1391                        quinn::ConnectionError::LocallyClosed
1392                    ))
1393                );
1394                info!("server test completed");
1395                Ok::<_, Error>(())
1396            }
1397            .instrument(info_span!("test-server")),
1398        );
1399
1400        let client = tokio::spawn(
1401            async move {
1402                let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1403                    .alpns(vec![TEST_ALPN.to_vec()])
1404                    .insecure_skip_relay_cert_verify(true)
1405                    .transport_config(qlog.create("client")?)
1406                    .bind()
1407                    .await?;
1408                info!("client connecting");
1409                let endpoint_addr = EndpointAddr::new(server_peer_id).with_relay_url(relay_url);
1410                let conn = ep.connect(endpoint_addr, TEST_ALPN).await?;
1411                let mut stream = conn.open_uni().await.anyerr()?;
1412
1413                // First write is accepted by server.  We need this bit of synchronisation
1414                // because if the server closes after simply accepting the connection we can
1415                // not be sure our .open_uni() call would succeed as it may already receive
1416                // the error.
1417                stream.write_all(b"hello").await.anyerr()?;
1418
1419                info!("waiting for closed");
1420                // Remote now closes the connection, we should see an error sometime soon.
1421                let err = conn.closed().await;
1422                let expected_err =
1423                    quinn::ConnectionError::ApplicationClosed(quinn::ApplicationClose {
1424                        error_code: 7u8.into(),
1425                        reason: b"bye".to_vec().into(),
1426                    });
1427                assert_eq!(err, expected_err);
1428
1429                info!("opening new - expect it to fail");
1430                let res = conn.open_uni().await;
1431                assert_eq!(res.unwrap_err(), expected_err);
1432                info!("client test completed");
1433                Ok::<_, Error>(())
1434            }
1435            .instrument(info_span!("test-client")),
1436        );
1437
1438        let (server, client) = tokio::time::timeout(
1439            Duration::from_secs(30),
1440            n0_future::future::zip(server, client),
1441        )
1442        .await
1443        .anyerr()?;
1444        server.anyerr()??;
1445        client.anyerr()??;
1446        Ok(())
1447    }
1448
1449    #[tokio::test]
1450    #[traced_test]
1451    async fn endpoint_relay_connect_loop() -> Result {
1452        let test_start = Instant::now();
1453        let n_clients = 5;
1454        let n_chunks_per_client = 2;
1455        let chunk_size = 10;
1456        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
1457        let (relay_map, relay_url, _relay_guard) = run_relay_server().await.unwrap();
1458        let server_secret_key = SecretKey::generate(&mut rng);
1459        let server_endpoint_id = server_secret_key.public();
1460
1461        // Make sure the server is bound before having clients connect to it:
1462        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1463            .insecure_skip_relay_cert_verify(true)
1464            .secret_key(server_secret_key)
1465            .alpns(vec![TEST_ALPN.to_vec()])
1466            .bind()
1467            .await?;
1468        // Also make sure the server has a working relay connection
1469        ep.online().await;
1470
1471        info!(time = ?test_start.elapsed(), "test setup done");
1472
1473        // The server accepts the connections of the clients sequentially.
1474        let server = tokio::spawn(
1475            async move {
1476                let eps = ep.bound_sockets();
1477
1478                info!(me = %ep.id().fmt_short(), eps = ?eps, "server listening on");
1479                for i in 0..n_clients {
1480                    let round_start = Instant::now();
1481                    info!("[server] round {i}");
1482                    let incoming = ep.accept().await.anyerr()?;
1483                    let conn = incoming.await.anyerr()?;
1484                    let endpoint_id = conn.remote_id();
1485                    info!(%i, peer = %endpoint_id.fmt_short(), "accepted connection");
1486                    let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
1487                    let mut buf = vec![0u8; chunk_size];
1488                    for _i in 0..n_chunks_per_client {
1489                        recv.read_exact(&mut buf).await.anyerr()?;
1490                        send.write_all(&buf).await.anyerr()?;
1491                    }
1492                    send.finish().anyerr()?;
1493                    conn.closed().await; // we're the last to send data, so we wait for the other side to close
1494                    info!(%i, peer = %endpoint_id.fmt_short(), "finished");
1495                    info!("[server] round {i} done in {:?}", round_start.elapsed());
1496                }
1497                Ok::<_, Error>(())
1498            }
1499            .instrument(error_span!("server")),
1500        );
1501
1502        let start = Instant::now();
1503
1504        for i in 0..n_clients {
1505            let round_start = Instant::now();
1506            info!("[client] round {i}");
1507            let client_secret_key = SecretKey::generate(&mut rng);
1508            async {
1509                info!("client binding");
1510                let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1511                    .alpns(vec![TEST_ALPN.to_vec()])
1512                    .insecure_skip_relay_cert_verify(true)
1513                    .secret_key(client_secret_key)
1514                    .bind()
1515                    .await?;
1516                let eps = ep.bound_sockets();
1517
1518                info!(me = %ep.id().fmt_short(), eps=?eps, "client bound");
1519                let endpoint_addr =
1520                    EndpointAddr::new(server_endpoint_id).with_relay_url(relay_url.clone());
1521                info!(to = ?endpoint_addr, "client connecting");
1522                let conn = ep.connect(endpoint_addr, TEST_ALPN).await.anyerr()?;
1523                info!("client connected");
1524                let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
1525
1526                for i in 0..n_chunks_per_client {
1527                    let mut buf = vec![i; chunk_size];
1528                    send.write_all(&buf).await.anyerr()?;
1529                    recv.read_exact(&mut buf).await.anyerr()?;
1530                    assert_eq!(buf, vec![i; chunk_size]);
1531                }
1532                // we're the last to receive data, so we close
1533                conn.close(0u32.into(), b"bye!");
1534                info!("client finished");
1535                ep.close().await;
1536                info!("client closed");
1537                Ok::<_, Error>(())
1538            }
1539            .instrument(error_span!("client", %i))
1540            .await?;
1541            info!("[client] round {i} done in {:?}", round_start.elapsed());
1542        }
1543
1544        server.await.anyerr()??;
1545
1546        // We appear to have seen this being very slow at times.  So ensure we fail if this
1547        // test is too slow.  We're only making two connections transferring very little
1548        // data, this really shouldn't take long.
1549        if start.elapsed() > Duration::from_secs(15) {
1550            panic!("Test too slow, something went wrong");
1551        }
1552
1553        Ok(())
1554    }
1555
1556    #[tokio::test]
1557    #[traced_test]
1558    async fn endpoint_send_relay() -> Result {
1559        let (relay_map, _relay_url, _guard) = run_relay_server().await?;
1560        let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1561            .insecure_skip_relay_cert_verify(true)
1562            .bind()
1563            .await?;
1564        let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1565            .insecure_skip_relay_cert_verify(true)
1566            .alpns(vec![TEST_ALPN.to_vec()])
1567            .bind()
1568            .await?;
1569
1570        let task = tokio::spawn({
1571            let server = server.clone();
1572            async move {
1573                let Some(conn) = server.accept().await else {
1574                    n0_error::bail_any!("Expected an incoming connection");
1575                };
1576                let conn = conn.await.anyerr()?;
1577                let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
1578                let data = recv.read_to_end(1000).await.anyerr()?;
1579                send.write_all(&data).await.anyerr()?;
1580                send.finish().anyerr()?;
1581                conn.closed().await;
1582
1583                Ok::<_, Error>(())
1584            }
1585        });
1586
1587        let addr = server.addr();
1588        let conn = client.connect(addr, TEST_ALPN).await?;
1589        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
1590        send.write_all(b"Hello, world!").await.anyerr()?;
1591        send.finish().anyerr()?;
1592        let data = recv.read_to_end(1000).await.anyerr()?;
1593        conn.close(0u32.into(), b"bye!");
1594
1595        task.await.anyerr()??;
1596
1597        client.close().await;
1598        server.close().await;
1599
1600        assert_eq!(&data, b"Hello, world!");
1601
1602        Ok(())
1603    }
1604
1605    #[tokio::test]
1606    #[traced_test]
1607    async fn endpoint_two_direct_only() -> Result {
1608        // Connect two endpoints on the same network, without a relay server, without
1609        // discovery.
1610        let ep1 = {
1611            let span = info_span!("server");
1612            let _guard = span.enter();
1613            Endpoint::builder()
1614                .alpns(vec![TEST_ALPN.to_vec()])
1615                .relay_mode(RelayMode::Disabled)
1616                .bind()
1617                .await?
1618        };
1619        let ep2 = {
1620            let span = info_span!("client");
1621            let _guard = span.enter();
1622            Endpoint::builder()
1623                .alpns(vec![TEST_ALPN.to_vec()])
1624                .relay_mode(RelayMode::Disabled)
1625                .bind()
1626                .await?
1627        };
1628        let ep1_nodeaddr = ep1.addr();
1629
1630        #[instrument(name = "client", skip_all)]
1631        async fn connect(ep: Endpoint, dst: EndpointAddr) -> Result<quinn::ConnectionError> {
1632            info!(me = %ep.id().fmt_short(), "client starting");
1633            let conn = ep.connect(dst, TEST_ALPN).await?;
1634            let mut send = conn.open_uni().await.anyerr()?;
1635            send.write_all(b"hello").await.anyerr()?;
1636            send.finish().anyerr()?;
1637            Ok(conn.closed().await)
1638        }
1639
1640        #[instrument(name = "server", skip_all)]
1641        async fn accept(ep: Endpoint, src: EndpointId) -> Result {
1642            info!(me = %ep.id().fmt_short(), "server starting");
1643            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1644            let node_id = conn.remote_id();
1645            assert_eq!(node_id, src);
1646            let mut recv = conn.accept_uni().await.anyerr()?;
1647            let msg = recv.read_to_end(100).await.anyerr()?;
1648            assert_eq!(msg, b"hello");
1649            // Dropping the connection closes it just fine.
1650            Ok(())
1651        }
1652
1653        let ep1_accept = tokio::spawn(accept(ep1.clone(), ep2.id()));
1654        let ep2_connect = tokio::spawn(connect(ep2.clone(), ep1_nodeaddr));
1655
1656        ep1_accept.await.anyerr()??;
1657        let conn_closed = dbg!(ep2_connect.await.anyerr()??);
1658        assert!(matches!(
1659            conn_closed,
1660            ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
1661        ));
1662
1663        Ok(())
1664    }
1665
1666    #[tokio::test]
1667    #[traced_test]
1668    async fn endpoint_two_relay_only_becomes_direct() -> Result {
1669        // Connect two endpoints on the same network, via a relay server, without
1670        // discovery.  Wait until there is a direct connection.
1671        let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
1672        let (node_addr_tx, node_addr_rx) = oneshot::channel();
1673        let qlog = Arc::new(QlogFileGroup::from_env("two_relay_only_becomes_direct"));
1674
1675        #[instrument(name = "client", skip_all)]
1676        async fn connect(
1677            relay_map: RelayMap,
1678            node_addr_rx: oneshot::Receiver<EndpointAddr>,
1679            qlog: Arc<QlogFileGroup>,
1680        ) -> Result<quinn::ConnectionError> {
1681            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1682            let secret = SecretKey::generate(&mut rng);
1683            let ep = Endpoint::builder()
1684                .secret_key(secret)
1685                .alpns(vec![TEST_ALPN.to_vec()])
1686                .insecure_skip_relay_cert_verify(true)
1687                .relay_mode(RelayMode::Custom(relay_map))
1688                .transport_config(qlog.create("client")?)
1689                .bind()
1690                .await?;
1691            info!(me = %ep.id().fmt_short(), "client starting");
1692            let dst = node_addr_rx.await.anyerr()?;
1693
1694            info!(me = %ep.id().fmt_short(), "client connecting");
1695            let conn = ep.connect(dst, TEST_ALPN).await?;
1696            let mut send = conn.open_uni().await.anyerr()?;
1697            send.write_all(b"hello").await.anyerr()?;
1698            let mut paths = conn.paths().stream();
1699            info!("Waiting for direct connection");
1700            while let Some(infos) = paths.next().await {
1701                info!(?infos, "new PathInfos");
1702                if infos.iter().any(|info| info.is_ip()) {
1703                    break;
1704                }
1705            }
1706            info!("Have direct connection");
1707            send.write_all(b"close please").await.anyerr()?;
1708            send.finish().anyerr()?;
1709            Ok(conn.closed().await)
1710        }
1711
1712        #[instrument(name = "server", skip_all)]
1713        async fn accept(
1714            relay_map: RelayMap,
1715            node_addr_tx: oneshot::Sender<EndpointAddr>,
1716            qlog: Arc<QlogFileGroup>,
1717        ) -> Result {
1718            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
1719            let secret = SecretKey::generate(&mut rng);
1720            let ep = Endpoint::builder()
1721                .secret_key(secret)
1722                .alpns(vec![TEST_ALPN.to_vec()])
1723                .insecure_skip_relay_cert_verify(true)
1724                .transport_config(qlog.create("server")?)
1725                .relay_mode(RelayMode::Custom(relay_map))
1726                .bind()
1727                .await?;
1728            ep.online().await;
1729            let mut node_addr = ep.addr();
1730            node_addr.addrs.retain(|addr| addr.is_relay());
1731            node_addr_tx.send(node_addr).unwrap();
1732
1733            info!(me = %ep.id().fmt_short(), "server starting");
1734            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1735            // let node_id = conn.remote_node_id()?;
1736            // assert_eq!(node_id, src);
1737            let mut recv = conn.accept_uni().await.anyerr()?;
1738            let mut msg = [0u8; 5];
1739            recv.read_exact(&mut msg).await.anyerr()?;
1740            assert_eq!(&msg, b"hello");
1741            info!("received hello");
1742            let msg = recv.read_to_end(100).await.anyerr()?;
1743            assert_eq!(msg, b"close please");
1744            info!("received 'close please'");
1745            // Dropping the connection closes it just fine.
1746            Ok(())
1747        }
1748
1749        let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx, qlog.clone()));
1750        let client_task = tokio::spawn(connect(relay_map, node_addr_rx, qlog));
1751
1752        server_task.await.anyerr()??;
1753        let conn_closed = dbg!(client_task.await.anyerr()??);
1754        assert!(matches!(
1755            conn_closed,
1756            ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
1757        ));
1758
1759        Ok(())
1760    }
1761
1762    #[tokio::test]
1763    #[traced_test]
1764    async fn endpoint_two_relay_only_no_ip() -> Result {
1765        // Connect two endpoints on the same network, via a relay server, without
1766        // discovery.
1767        let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
1768        let (node_addr_tx, node_addr_rx) = oneshot::channel();
1769
1770        #[instrument(name = "client", skip_all)]
1771        async fn connect(
1772            relay_map: RelayMap,
1773            node_addr_rx: oneshot::Receiver<EndpointAddr>,
1774        ) -> Result<quinn::ConnectionError> {
1775            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1776            let secret = SecretKey::generate(&mut rng);
1777            let ep = Endpoint::builder()
1778                .secret_key(secret)
1779                .alpns(vec![TEST_ALPN.to_vec()])
1780                .insecure_skip_relay_cert_verify(true)
1781                .relay_mode(RelayMode::Custom(relay_map))
1782                .clear_ip_transports() // disable direct
1783                .bind()
1784                .await?;
1785            info!(me = %ep.id().fmt_short(), "client starting");
1786            let dst = node_addr_rx.await.anyerr()?;
1787
1788            info!(me = %ep.id().fmt_short(), "client connecting");
1789            let conn = ep.connect(dst, TEST_ALPN).await?;
1790            let mut send = conn.open_uni().await.anyerr()?;
1791            send.write_all(b"hello").await.anyerr()?;
1792            let mut paths = conn.paths().stream();
1793            info!("Waiting for connection");
1794            'outer: while let Some(infos) = paths.next().await {
1795                info!(?infos, "new PathInfos");
1796                for info in infos {
1797                    if info.is_ip() {
1798                        panic!("should not happen: {:?}", info);
1799                    }
1800                    if info.is_relay() {
1801                        break 'outer;
1802                    }
1803                }
1804            }
1805            info!("Have relay connection");
1806            send.write_all(b"close please").await.anyerr()?;
1807            send.finish().anyerr()?;
1808            Ok(conn.closed().await)
1809        }
1810
1811        #[instrument(name = "server", skip_all)]
1812        async fn accept(
1813            relay_map: RelayMap,
1814            node_addr_tx: oneshot::Sender<EndpointAddr>,
1815        ) -> Result {
1816            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
1817            let secret = SecretKey::generate(&mut rng);
1818            let ep = Endpoint::builder()
1819                .secret_key(secret)
1820                .alpns(vec![TEST_ALPN.to_vec()])
1821                .insecure_skip_relay_cert_verify(true)
1822                .relay_mode(RelayMode::Custom(relay_map))
1823                .clear_ip_transports()
1824                .bind()
1825                .await?;
1826            ep.online().await;
1827            let node_addr = ep.addr();
1828            node_addr_tx.send(node_addr).unwrap();
1829
1830            info!(me = %ep.id().fmt_short(), "server starting");
1831            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1832            // let node_id = conn.remote_node_id()?;
1833            // assert_eq!(node_id, src);
1834            let mut recv = conn.accept_uni().await.anyerr()?;
1835            let mut msg = [0u8; 5];
1836            recv.read_exact(&mut msg).await.anyerr()?;
1837            assert_eq!(&msg, b"hello");
1838            info!("received hello");
1839            let msg = recv.read_to_end(100).await.anyerr()?;
1840            assert_eq!(msg, b"close please");
1841            info!("received 'close please'");
1842            // Dropping the connection closes it just fine.
1843            Ok(())
1844        }
1845
1846        let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx));
1847        let client_task = tokio::spawn(connect(relay_map, node_addr_rx));
1848
1849        server_task.await.anyerr()??;
1850        let conn_closed = dbg!(client_task.await.anyerr()??);
1851        assert!(matches!(
1852            conn_closed,
1853            ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
1854        ));
1855
1856        Ok(())
1857    }
1858
1859    #[tokio::test]
1860    #[traced_test]
1861    async fn endpoint_two_direct_add_relay() -> Result {
1862        // Connect two endpoints on the same network, without relay server and without
1863        // discovery.  Add a relay connection later.
1864        let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
1865        let (node_addr_tx, node_addr_rx) = oneshot::channel();
1866
1867        #[instrument(name = "client", skip_all)]
1868        async fn connect(
1869            relay_map: RelayMap,
1870            node_addr_rx: oneshot::Receiver<EndpointAddr>,
1871        ) -> Result<()> {
1872            let secret = SecretKey::from([0u8; 32]);
1873            let ep = Endpoint::builder()
1874                .secret_key(secret)
1875                .alpns(vec![TEST_ALPN.to_vec()])
1876                .insecure_skip_relay_cert_verify(true)
1877                .relay_mode(RelayMode::Custom(relay_map))
1878                .bind()
1879                .await?;
1880            info!(me = %ep.id().fmt_short(), "client starting");
1881            let dst = node_addr_rx.await.anyerr()?;
1882
1883            info!(me = %ep.id().fmt_short(), "client connecting");
1884            let conn = ep.connect(dst, TEST_ALPN).await?;
1885            info!(me = %ep.id().fmt_short(), "client connected");
1886
1887            // We should be connected via IP, because it is faster than the relay server.
1888            // TODO: Maybe not panic if this is not true?
1889            let path_info = conn.paths().get();
1890            assert_eq!(path_info.len(), 1);
1891            assert!(path_info.iter().next().unwrap().is_ip());
1892
1893            let mut paths = conn.paths().stream();
1894            time::timeout(Duration::from_secs(5), async move {
1895                while let Some(infos) = paths.next().await {
1896                    info!(?infos, "new PathInfos");
1897                    if infos.iter().any(|info| info.is_relay()) {
1898                        info!("client has a relay path");
1899                        break;
1900                    }
1901                }
1902            })
1903            .await
1904            .anyerr()?;
1905
1906            // wait for the server to signal it has the relay connection
1907            let mut stream = conn.accept_uni().await.anyerr()?;
1908            stream.read_to_end(100).await.anyerr()?;
1909
1910            info!("client closing");
1911            conn.close(0u8.into(), b"");
1912            ep.close().await;
1913            Ok(())
1914        }
1915
1916        #[instrument(name = "server", skip_all)]
1917        async fn accept(
1918            relay_map: RelayMap,
1919            node_addr_tx: oneshot::Sender<EndpointAddr>,
1920        ) -> Result<quinn::ConnectionError> {
1921            let secret = SecretKey::from([1u8; 32]);
1922            let ep = Endpoint::builder()
1923                .secret_key(secret)
1924                .alpns(vec![TEST_ALPN.to_vec()])
1925                .insecure_skip_relay_cert_verify(true)
1926                .relay_mode(RelayMode::Custom(relay_map))
1927                .bind()
1928                .await?;
1929            ep.online().await;
1930            let node_addr = ep.addr();
1931            node_addr_tx.send(node_addr).unwrap();
1932
1933            info!(me = %ep.id().fmt_short(), "server starting");
1934            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1935            info!(me = %ep.id().fmt_short(), "server accepted connection");
1936
1937            // Wait for a relay connection to be added.  Client does all the asserting here,
1938            // we just want to wait so we get to see all the mechanics of the connection
1939            // being added on this side too.
1940            let mut paths = conn.paths().stream();
1941            time::timeout(Duration::from_secs(5), async move {
1942                while let Some(infos) = paths.next().await {
1943                    info!(?infos, "new PathInfos");
1944                    if infos.iter().any(|path| path.is_relay()) {
1945                        info!("server has a relay path");
1946                        break;
1947                    }
1948                }
1949            })
1950            .await
1951            .anyerr()?;
1952
1953            let mut stream = conn.open_uni().await.anyerr()?;
1954            stream.write_all(b"have relay").await.anyerr()?;
1955            stream.finish().anyerr()?;
1956            info!("waiting conn.closed()");
1957
1958            Ok(conn.closed().await)
1959        }
1960
1961        let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx));
1962        let client_task = tokio::spawn(connect(relay_map, node_addr_rx));
1963
1964        client_task.await.anyerr()??;
1965        let conn_closed = dbg!(server_task.await.anyerr()??);
1966        assert!(matches!(
1967            conn_closed,
1968            ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
1969        ));
1970
1971        Ok(())
1972    }
1973
1974    #[tokio::test]
1975    #[traced_test]
1976    async fn endpoint_relay_map_change() -> Result {
1977        let (relay_map, relay_url, _guard1) = run_relay_server().await?;
1978        let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1979            .insecure_skip_relay_cert_verify(true)
1980            .bind()
1981            .await?;
1982        let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1983            .insecure_skip_relay_cert_verify(true)
1984            .alpns(vec![TEST_ALPN.to_vec()])
1985            .bind()
1986            .await?;
1987
1988        let task = tokio::spawn({
1989            let server = server.clone();
1990            async move {
1991                for i in 0..2 {
1992                    println!("accept: round {i}");
1993                    let Some(conn) = server.accept().await else {
1994                        n0_error::bail_any!("Expected an incoming connection");
1995                    };
1996                    let conn = conn.await.anyerr()?;
1997                    let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
1998                    let data = recv.read_to_end(1000).await.anyerr()?;
1999                    send.write_all(&data).await.anyerr()?;
2000                    send.finish().anyerr()?;
2001                    conn.closed().await;
2002                }
2003                Ok::<_, Error>(())
2004            }
2005        });
2006
2007        server.online().await;
2008
2009        let mut addr = server.addr();
2010        println!("round1: {:?}", addr);
2011
2012        // remove direct addrs to force relay usage
2013        addr.addrs
2014            .retain(|addr| !matches!(addr, TransportAddr::Ip(_)));
2015
2016        let conn = client.connect(addr, TEST_ALPN).await?;
2017        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2018        send.write_all(b"Hello, world!").await.anyerr()?;
2019        send.finish().anyerr()?;
2020        let data = recv.read_to_end(1000).await.anyerr()?;
2021        conn.close(0u32.into(), b"bye!");
2022
2023        assert_eq!(&data, b"Hello, world!");
2024
2025        // setup a second relay server
2026        let (new_relay_map, new_relay_url, _guard2) = run_relay_server().await?;
2027        let new_endpoint = new_relay_map
2028            .get(&new_relay_url)
2029            .expect("missing endpoint")
2030            .clone();
2031        dbg!(&new_relay_map);
2032
2033        let addr_watcher = server.watch_addr();
2034
2035        // add new new relay
2036        assert!(
2037            server
2038                .insert_relay(new_relay_url.clone(), new_endpoint.clone())
2039                .await
2040                .is_none()
2041        );
2042        // remove the old relay
2043        assert!(server.remove_relay(&relay_url).await.is_some());
2044
2045        println!("------- changed ----- ");
2046
2047        let mut addr = tokio::time::timeout(Duration::from_secs(10), async move {
2048            let mut stream = addr_watcher.stream();
2049            while let Some(addr) = stream.next().await {
2050                if addr.relay_urls().next() != Some(&relay_url) {
2051                    return addr;
2052                }
2053            }
2054            panic!("failed to change relay");
2055        })
2056        .await
2057        .anyerr()?;
2058
2059        println!("round2: {:?}", addr);
2060        assert_eq!(addr.relay_urls().next(), Some(&new_relay_url));
2061
2062        // remove direct addrs to force relay usage
2063        addr.addrs
2064            .retain(|addr| !matches!(addr, TransportAddr::Ip(_)));
2065
2066        let conn = client.connect(addr, TEST_ALPN).await?;
2067        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2068        send.write_all(b"Hello, world!").await.anyerr()?;
2069        send.finish().anyerr()?;
2070        let data = recv.read_to_end(1000).await.anyerr()?;
2071        conn.close(0u32.into(), b"bye!");
2072
2073        task.await.anyerr()??;
2074
2075        client.close().await;
2076        server.close().await;
2077
2078        assert_eq!(&data, b"Hello, world!");
2079
2080        Ok(())
2081    }
2082
2083    #[tokio::test]
2084    #[traced_test]
2085    async fn endpoint_bidi_send_recv() -> Result {
2086        let disco = StaticProvider::new();
2087        let ep1 = Endpoint::empty_builder(RelayMode::Disabled)
2088            .discovery(disco.clone())
2089            .alpns(vec![TEST_ALPN.to_vec()])
2090            .bind()
2091            .await?;
2092
2093        let ep2 = Endpoint::empty_builder(RelayMode::Disabled)
2094            .discovery(disco.clone())
2095            .alpns(vec![TEST_ALPN.to_vec()])
2096            .bind()
2097            .await?;
2098
2099        disco.add_endpoint_info(ep1.addr());
2100        disco.add_endpoint_info(ep2.addr());
2101
2102        let ep1_endpointid = ep1.id();
2103        let ep2_endpointid = ep2.id();
2104        eprintln!("endpoint id 1 {ep1_endpointid}");
2105        eprintln!("endpoint id 2 {ep2_endpointid}");
2106
2107        async fn connect_hello(ep: Endpoint, dst: EndpointId) -> Result {
2108            let conn = ep.connect(dst, TEST_ALPN).await?;
2109            let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2110            info!("sending hello");
2111            send.write_all(b"hello").await.anyerr()?;
2112            send.finish().anyerr()?;
2113            info!("receiving world");
2114            let m = recv.read_to_end(100).await.anyerr()?;
2115            assert_eq!(m, b"world");
2116            conn.close(1u8.into(), b"done");
2117            Ok(())
2118        }
2119
2120        async fn accept_world(ep: Endpoint, src: EndpointId) -> Result {
2121            let incoming = ep.accept().await.anyerr()?;
2122            let mut iconn = incoming.accept().anyerr()?;
2123            let alpn = iconn.alpn().await?;
2124            let conn = iconn.await.anyerr()?;
2125            let endpoint_id = conn.remote_id();
2126            assert_eq!(endpoint_id, src);
2127            assert_eq!(alpn, TEST_ALPN);
2128            let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
2129            info!("receiving hello");
2130            let m = recv.read_to_end(100).await.anyerr()?;
2131            assert_eq!(m, b"hello");
2132            info!("sending hello");
2133            send.write_all(b"world").await.anyerr()?;
2134            send.finish().anyerr()?;
2135            match conn.closed().await {
2136                ConnectionError::ApplicationClosed(closed) => {
2137                    assert_eq!(closed.error_code, 1u8.into());
2138                    Ok(())
2139                }
2140                _ => panic!("wrong close error"),
2141            }
2142        }
2143
2144        let p1_accept = tokio::spawn(accept_world(ep1.clone(), ep2_endpointid).instrument(
2145            info_span!(
2146                "p1_accept",
2147                ep1 = %ep1.id().fmt_short(),
2148                dst = %ep2_endpointid.fmt_short(),
2149            ),
2150        ));
2151        let p2_accept = tokio::spawn(accept_world(ep2.clone(), ep1_endpointid).instrument(
2152            info_span!(
2153                "p2_accept",
2154                ep2 = %ep2.id().fmt_short(),
2155                dst = %ep1_endpointid.fmt_short(),
2156            ),
2157        ));
2158        let p1_connect = tokio::spawn(connect_hello(ep1.clone(), ep2_endpointid).instrument(
2159            info_span!(
2160                "p1_connect",
2161                ep1 = %ep1.id().fmt_short(),
2162                dst = %ep2_endpointid.fmt_short(),
2163            ),
2164        ));
2165        let p2_connect = tokio::spawn(connect_hello(ep2.clone(), ep1_endpointid).instrument(
2166            info_span!(
2167                "p2_connect",
2168                ep2 = %ep2.id().fmt_short(),
2169                dst = %ep1_endpointid.fmt_short(),
2170            ),
2171        ));
2172
2173        p1_accept.await.anyerr()??;
2174        p2_accept.await.anyerr()??;
2175        p1_connect.await.anyerr()??;
2176        p2_connect.await.anyerr()??;
2177
2178        Ok(())
2179    }
2180
2181    #[tokio::test]
2182    #[traced_test]
2183    async fn test_direct_addresses_no_qad_relay() -> Result {
2184        let (relay_map, _, _guard) = run_relay_server_with(false).await.unwrap();
2185
2186        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
2187            .alpns(vec![TEST_ALPN.to_vec()])
2188            .insecure_skip_relay_cert_verify(true)
2189            .bind()
2190            .await?;
2191
2192        assert!(ep.addr().ip_addrs().count() > 0);
2193
2194        Ok(())
2195    }
2196
2197    #[cfg_attr(target_os = "windows", ignore = "flaky")]
2198    #[tokio::test]
2199    #[traced_test]
2200    async fn graceful_close() -> Result {
2201        let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
2202        let server = Endpoint::empty_builder(RelayMode::Disabled)
2203            .alpns(vec![TEST_ALPN.to_vec()])
2204            .bind()
2205            .await?;
2206        let server_addr = server.addr();
2207        let server_task = tokio::spawn(async move {
2208            let incoming = server.accept().await.anyerr()?;
2209            let conn = incoming.await.anyerr()?;
2210            let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
2211            let msg = recv.read_to_end(1_000).await.anyerr()?;
2212            send.write_all(&msg).await.anyerr()?;
2213            send.finish().anyerr()?;
2214            let close_reason = conn.closed().await;
2215            Ok::<_, Error>(close_reason)
2216        });
2217
2218        let conn = client.connect(server_addr, TEST_ALPN).await?;
2219        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2220        send.write_all(b"Hello, world!").await.anyerr()?;
2221        send.finish().anyerr()?;
2222        recv.read_to_end(1_000).await.anyerr()?;
2223        conn.close(42u32.into(), b"thanks, bye!");
2224        client.close().await;
2225
2226        let close_err = server_task.await.anyerr()??;
2227        let ConnectionError::ApplicationClosed(app_close) = close_err else {
2228            panic!("Unexpected close reason: {close_err:?}");
2229        };
2230
2231        assert_eq!(app_close.error_code, 42u32.into());
2232        assert_eq!(app_close.reason.as_ref(), b"thanks, bye!");
2233
2234        Ok(())
2235    }
2236
2237    #[cfg(feature = "metrics")]
2238    #[tokio::test]
2239    #[traced_test]
2240    async fn metrics_smoke() -> Result {
2241        use iroh_metrics::Registry;
2242
2243        let secret_key = SecretKey::from_bytes(&[0u8; 32]);
2244        let client = Endpoint::empty_builder(RelayMode::Disabled)
2245            .secret_key(secret_key)
2246            .bind()
2247            .await?;
2248        let secret_key = SecretKey::from_bytes(&[1u8; 32]);
2249        let server = Endpoint::empty_builder(RelayMode::Disabled)
2250            .secret_key(secret_key)
2251            .alpns(vec![TEST_ALPN.to_vec()])
2252            .bind()
2253            .await?;
2254        let server_addr = server.addr();
2255        let server_task = tokio::task::spawn(async move {
2256            let conn = server.accept().await.anyerr()?.await.anyerr()?;
2257            let mut uni = conn.accept_uni().await.anyerr()?;
2258            uni.read_to_end(10).await.anyerr()?;
2259            drop(conn);
2260            Ok::<_, Error>(server)
2261        });
2262        let conn = client.connect(server_addr, TEST_ALPN).await?;
2263        let mut uni = conn.open_uni().await.anyerr()?;
2264        uni.write_all(b"helloworld").await.anyerr()?;
2265        uni.finish().anyerr()?;
2266        conn.closed().await;
2267        drop(conn);
2268        let server = server_task.await.anyerr()??;
2269
2270        let m = client.metrics();
2271        // assert_eq!(m.magicsock.num_direct_conns_added.get(), 1);
2272        // assert_eq!(m.magicsock.connection_became_direct.get(), 1);
2273        // assert_eq!(m.magicsock.connection_handshake_success.get(), 1);
2274        // assert_eq!(m.magicsock.endpoints_contacted_directly.get(), 1);
2275        assert!(m.magicsock.recv_datagrams.get() > 0);
2276
2277        let m = server.metrics();
2278        // assert_eq!(m.magicsock.num_direct_conns_added.get(), 1);
2279        // assert_eq!(m.magicsock.connection_became_direct.get(), 1);
2280        // assert_eq!(m.magicsock.endpoints_contacted_directly.get(), 1);
2281        // assert_eq!(m.magicsock.connection_handshake_success.get(), 1);
2282        assert!(m.magicsock.recv_datagrams.get() > 0);
2283
2284        // test openmetrics encoding with labeled subregistries per endpoint
2285        fn register_endpoint(registry: &mut Registry, endpoint: &Endpoint) {
2286            let id = endpoint.id().fmt_short();
2287            let sub_registry = registry.sub_registry_with_label("id", id.to_string());
2288            sub_registry.register_all(endpoint.metrics());
2289        }
2290        let mut registry = Registry::default();
2291        register_endpoint(&mut registry, &client);
2292        register_endpoint(&mut registry, &server);
2293        // let s = registry.encode_openmetrics_to_string().anyerr()?;
2294        // assert!(s.contains(r#"magicsock_endpoints_contacted_directly_total{id="3b6a27bcce"} 1"#));
2295        // assert!(s.contains(r#"magicsock_endpoints_contacted_directly_total{id="8a88e3dd74"} 1"#));
2296        Ok(())
2297    }
2298
2299    /// Configures the accept side to take `accept_alpns` ALPNs, then connects to it with `primary_connect_alpn`
2300    /// with `secondary_connect_alpns` set, and finally returns the negotiated ALPN.
2301    async fn alpn_connection_test(
2302        accept_alpns: Vec<Vec<u8>>,
2303        primary_connect_alpn: &[u8],
2304        secondary_connect_alpns: Vec<Vec<u8>>,
2305    ) -> Result<Vec<u8>> {
2306        let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
2307        let server = Endpoint::empty_builder(RelayMode::Disabled)
2308            .alpns(accept_alpns)
2309            .bind()
2310            .await?;
2311        let server_addr = server.addr();
2312        let server_task = tokio::spawn({
2313            let server = server.clone();
2314            async move {
2315                let incoming = server.accept().await.anyerr()?;
2316                let conn = incoming.await.anyerr()?;
2317                conn.close(0u32.into(), b"bye!");
2318                n0_error::Ok(conn.alpn().to_vec())
2319            }
2320        });
2321
2322        let conn = client
2323            .connect_with_opts(
2324                server_addr,
2325                primary_connect_alpn,
2326                ConnectOptions::new().with_additional_alpns(secondary_connect_alpns),
2327            )
2328            .await?;
2329        let conn = conn.await.anyerr()?;
2330        let client_alpn = conn.alpn();
2331        conn.closed().await;
2332        client.close().await;
2333        server.close().await;
2334
2335        let server_alpn = server_task.await.anyerr()??;
2336
2337        assert_eq!(client_alpn, server_alpn);
2338
2339        Ok(server_alpn.to_vec())
2340    }
2341
2342    #[tokio::test]
2343    #[traced_test]
2344    async fn connect_multiple_alpn_negotiated() -> Result {
2345        const ALPN_ONE: &[u8] = b"alpn/1";
2346        const ALPN_TWO: &[u8] = b"alpn/2";
2347
2348        assert_eq!(
2349            alpn_connection_test(
2350                // Prefer version 2 over version 1 on the accept side
2351                vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()],
2352                ALPN_TWO,
2353                vec![ALPN_ONE.to_vec()],
2354            )
2355            .await?,
2356            ALPN_TWO.to_vec(),
2357            "accept side prefers version 2 over 1"
2358        );
2359
2360        assert_eq!(
2361            alpn_connection_test(
2362                // Only support the old version
2363                vec![ALPN_ONE.to_vec()],
2364                ALPN_TWO,
2365                vec![ALPN_ONE.to_vec()],
2366            )
2367            .await?,
2368            ALPN_ONE.to_vec(),
2369            "accept side only supports the old version"
2370        );
2371
2372        assert_eq!(
2373            alpn_connection_test(
2374                vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()],
2375                ALPN_ONE,
2376                vec![ALPN_TWO.to_vec()],
2377            )
2378            .await?,
2379            ALPN_TWO.to_vec(),
2380            "connect side ALPN order doesn't matter"
2381        );
2382
2383        assert_eq!(
2384            alpn_connection_test(vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()], ALPN_ONE, vec![],)
2385                .await?,
2386            ALPN_ONE.to_vec(),
2387            "connect side only supports the old version"
2388        );
2389
2390        Ok(())
2391    }
2392
2393    #[tokio::test]
2394    #[traced_test]
2395    async fn watch_net_report() -> Result {
2396        let endpoint = Endpoint::empty_builder(RelayMode::Staging).bind().await?;
2397
2398        // can get a first report
2399        endpoint.net_report().updated().await.anyerr()?;
2400
2401        Ok(())
2402    }
2403
2404    /// Tests that initial connection establishment isn't extremely slow compared
2405    /// to subsequent connections.
2406    ///
2407    /// This is a time based test, but uses a very large ratio to reduce flakiness.
2408    /// It also does a number of connections to average out any anomalies.
2409    #[tokio::test]
2410    #[traced_test]
2411    async fn connect_multi_time() -> Result {
2412        let n = 32;
2413
2414        const NOOP_ALPN: &[u8] = b"noop";
2415
2416        #[derive(Debug, Clone)]
2417        struct Noop;
2418
2419        impl ProtocolHandler for Noop {
2420            async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
2421                connection.closed().await;
2422                Ok(())
2423            }
2424        }
2425
2426        async fn noop_server() -> Result<(Router, EndpointAddr)> {
2427            let endpoint = Endpoint::empty_builder(RelayMode::Disabled)
2428                .bind()
2429                .await
2430                .anyerr()?;
2431            let addr = endpoint.addr();
2432            let router = Router::builder(endpoint).accept(NOOP_ALPN, Noop).spawn();
2433            Ok((router, addr))
2434        }
2435
2436        let routers = stream::iter(0..n)
2437            .map(|_| noop_server())
2438            .buffered_unordered(32)
2439            .collect::<Vec<_>>()
2440            .await
2441            .into_iter()
2442            .collect::<Result<Vec<_>, _>>()
2443            .anyerr()?;
2444
2445        let addrs = routers
2446            .iter()
2447            .map(|(_, addr)| addr.clone())
2448            .collect::<Vec<_>>();
2449        let ids = addrs.iter().map(|addr| addr.id).collect::<Vec<_>>();
2450        let discovery = StaticProvider::from_endpoint_info(addrs);
2451        let endpoint = Endpoint::empty_builder(RelayMode::Disabled)
2452            .discovery(discovery)
2453            .bind()
2454            .await
2455            .anyerr()?;
2456        // wait for the endpoint to be initialized. This should not be needed,
2457        // but we don't want to measure endpoint init time but connection time
2458        // from a fully initialized endpoint.
2459        endpoint.addr();
2460        let t0 = Instant::now();
2461        for id in &ids {
2462            let conn = endpoint.connect(*id, NOOP_ALPN).await?;
2463            conn.close(0u32.into(), b"done");
2464        }
2465        let dt0 = t0.elapsed().as_secs_f64();
2466        let t1 = Instant::now();
2467        for id in &ids {
2468            let conn = endpoint.connect(*id, NOOP_ALPN).await?;
2469            conn.close(0u32.into(), b"done");
2470        }
2471        let dt1 = t1.elapsed().as_secs_f64();
2472
2473        assert!(dt0 / dt1 < 20.0, "First round: {dt0}s, second round {dt1}s");
2474        Ok(())
2475    }
2476}