iroh_relay/
client.rs

1//! Exposes [`Client`], which allows to establish connections to a relay server.
2//!
3//! Based on tailscale/derp/derphttp/derphttp_client.go
4
5use std::{
6    net::SocketAddr,
7    pin::Pin,
8    sync::Arc,
9    task::{self, Poll},
10};
11
12use conn::Conn;
13use iroh_base::{RelayUrl, SecretKey};
14use n0_error::{e, stack_error};
15use n0_future::{
16    Sink, Stream,
17    split::{SplitSink, SplitStream, split},
18    time,
19};
20#[cfg(any(test, feature = "test-utils"))]
21use tracing::warn;
22use tracing::{Level, debug, event, trace};
23use url::Url;
24
25pub use self::conn::{RecvError, SendError};
26#[cfg(not(wasm_browser))]
27use crate::dns::{DnsError, DnsResolver};
28use crate::{
29    KeyCache,
30    http::RELAY_PATH,
31    protos::{
32        handshake,
33        relay::{ClientToRelayMsg, RelayToClientMsg},
34    },
35};
36
37pub(crate) mod conn;
38#[cfg(not(wasm_browser))]
39pub(crate) mod streams;
40#[cfg(not(wasm_browser))]
41mod tls;
42#[cfg(not(wasm_browser))]
43mod util;
44
45/// Connection errors.
46///
47/// `ConnectError` contains `DialError`, errors that can occur while dialing the
48/// relay, as well as errors that occur while creating or maintaining a connection.
49#[stack_error(derive, add_meta, from_sources)]
50#[allow(missing_docs)]
51#[non_exhaustive]
52pub enum ConnectError {
53    #[error("Invalid URL for websocket: {url}")]
54    InvalidWebsocketUrl { url: Url },
55    #[error("Invalid relay URL: {url}")]
56    InvalidRelayUrl { url: Url },
57    #[error(transparent)]
58    Websocket {
59        #[cfg(not(wasm_browser))]
60        #[error(std_err)]
61        source: tokio_websockets::Error,
62        #[cfg(wasm_browser)]
63        #[error(std_err)]
64        source: ws_stream_wasm::WsErr,
65    },
66    #[error(transparent)]
67    Handshake {
68        #[error(std_err)]
69        source: handshake::Error,
70    },
71    #[error(transparent)]
72    Dial { source: DialError },
73    #[error("Unexpected status during upgrade: {code}")]
74    UnexpectedUpgradeStatus { code: hyper::StatusCode },
75    #[error("Failed to upgrade response")]
76    Upgrade {
77        #[error(std_err)]
78        source: hyper::Error,
79    },
80    #[error("Invalid TLS servername")]
81    InvalidTlsServername {},
82    #[error("No local address available")]
83    NoLocalAddr {},
84    #[error("tls connection failed")]
85    Tls {
86        #[error(std_err)]
87        source: std::io::Error,
88    },
89    #[cfg(wasm_browser)]
90    #[error("The relay protocol is not available in browsers")]
91    RelayProtoNotAvailable {},
92}
93
94/// Errors that can occur while dialing the relay server.
95#[stack_error(derive, add_meta, from_sources)]
96#[allow(missing_docs)]
97#[non_exhaustive]
98pub enum DialError {
99    #[error("Invalid target port")]
100    InvalidTargetPort {},
101    #[error(transparent)]
102    #[cfg(not(wasm_browser))]
103    Dns { source: DnsError },
104    #[error(transparent)]
105    Timeout {
106        #[error(std_err)]
107        source: time::Elapsed,
108    },
109    #[error(transparent)]
110    Io {
111        #[error(std_err)]
112        source: std::io::Error,
113    },
114    #[error("Invalid URL: {url}")]
115    InvalidUrl { url: Url },
116    #[error("Failed proxy connection: {status}")]
117    ProxyConnectInvalidStatus { status: hyper::StatusCode },
118    #[error("Invalid Proxy URL {proxy_url}")]
119    ProxyInvalidUrl { proxy_url: Url },
120    #[error("failed to establish proxy connection")]
121    ProxyConnect {
122        #[error(std_err)]
123        source: hyper::Error,
124    },
125    #[error("Invalid proxy TLS servername: {proxy_hostname}")]
126    ProxyInvalidTlsServername { proxy_hostname: String },
127    #[error("Invalid proxy target port")]
128    ProxyInvalidTargetPort {},
129}
130
131/// Build a Client.
132#[derive(derive_more::Debug, Clone)]
133pub struct ClientBuilder {
134    /// Default is None
135    #[debug("address family selector callback")]
136    address_family_selector: Option<Arc<dyn Fn() -> bool + Send + Sync>>,
137    /// Server url.
138    url: RelayUrl,
139    /// TLS verification config.
140    tls_config: Option<rustls::ClientConfig>,
141    /// HTTP Proxy
142    proxy_url: Option<Url>,
143    /// The secret key of this client.
144    secret_key: SecretKey,
145    /// The DNS resolver to use.
146    #[cfg(not(wasm_browser))]
147    dns_resolver: DnsResolver,
148    /// Cache for public keys of remote endpoints.
149    key_cache: KeyCache,
150}
151
152impl ClientBuilder {
153    /// Create a new [`ClientBuilder`]
154    pub fn new(
155        url: impl Into<RelayUrl>,
156        secret_key: SecretKey,
157        #[cfg(not(wasm_browser))] dns_resolver: DnsResolver,
158    ) -> Self {
159        ClientBuilder {
160            address_family_selector: None,
161            url: url.into(),
162            tls_config: None,
163            proxy_url: None,
164            secret_key,
165            #[cfg(not(wasm_browser))]
166            dns_resolver,
167            key_cache: KeyCache::new(128),
168        }
169    }
170
171    /// Sets a custom TLS config.
172    pub fn tls_client_config(mut self, tls_config: rustls::ClientConfig) -> Self {
173        self.tls_config = Some(tls_config);
174        self
175    }
176
177    /// Returns if we should prefer ipv6
178    /// it replaces the relayhttp.AddressFamilySelector we pass
179    /// It provides the hint as to whether in an IPv4-vs-IPv6 race that
180    /// IPv4 should be held back a bit to give IPv6 a better-than-50/50
181    /// chance of winning. We only return true when we believe IPv6 will
182    /// work anyway, so we don't artificially delay the connection speed.
183    pub fn address_family_selector<S>(mut self, selector: S) -> Self
184    where
185        S: Fn() -> bool + Send + Sync + 'static,
186    {
187        self.address_family_selector = Some(Arc::new(selector));
188        self
189    }
190
191    /// Set an explicit proxy url to proxy all HTTP(S) traffic through.
192    pub fn proxy_url(mut self, url: Url) -> Self {
193        self.proxy_url.replace(url);
194        self
195    }
196
197    /// Set the capacity of the cache for public keys.
198    pub fn key_cache_capacity(mut self, capacity: usize) -> Self {
199        self.key_cache = KeyCache::new(capacity);
200        self
201    }
202
203    /// Establishes a new connection to the relay server.
204    #[cfg(not(wasm_browser))]
205    pub async fn connect(&self) -> Result<Client, ConnectError> {
206        use http::header::SEC_WEBSOCKET_PROTOCOL;
207        use tls::MaybeTlsStreamBuilder;
208
209        use crate::{
210            http::{CLIENT_AUTH_HEADER, RELAY_PROTOCOL_VERSION},
211            protos::{handshake::KeyMaterialClientAuth, relay::MAX_FRAME_SIZE},
212            tls::{CaRootsConfig, default_provider},
213        };
214
215        let mut dial_url = (*self.url).clone();
216        dial_url.set_path(RELAY_PATH);
217        // The relay URL is exchanged with the http(s) scheme in tickets and similar.
218        // We need to use the ws:// or wss:// schemes when connecting with websockets, though.
219        dial_url
220            .set_scheme(match self.url.scheme() {
221                "http" => "ws",
222                "ws" => "ws",
223                _ => "wss",
224            })
225            .map_err(|_| {
226                e!(ConnectError::InvalidWebsocketUrl {
227                    url: dial_url.clone()
228                })
229            })?;
230
231        debug!(%dial_url, "Dialing relay by websocket");
232
233        let tls_config = match self.tls_config.clone() {
234            Some(config) => config,
235            None => CaRootsConfig::default().client_config(default_provider())?,
236        };
237
238        #[allow(unused_mut)]
239        let mut builder =
240            MaybeTlsStreamBuilder::new(dial_url.clone(), self.dns_resolver.clone(), tls_config)
241                .prefer_ipv6(self.prefer_ipv6())
242                .proxy_url(self.proxy_url.clone());
243
244        let stream = builder.connect().await?;
245        let local_addr = stream
246            .as_ref()
247            .local_addr()
248            .map_err(|_| e!(ConnectError::NoLocalAddr))?;
249        let mut builder = tokio_websockets::ClientBuilder::new()
250            .uri(dial_url.as_str())
251            .map_err(|_| {
252                e!(ConnectError::InvalidRelayUrl {
253                    url: dial_url.clone()
254                })
255            })?
256            .add_header(
257                SEC_WEBSOCKET_PROTOCOL,
258                http::HeaderValue::from_static(RELAY_PROTOCOL_VERSION),
259            )
260            .expect("valid header name and value")
261            .limits(tokio_websockets::Limits::default().max_payload_len(Some(MAX_FRAME_SIZE)))
262            // We turn off automatic flushing after a threshold (the default would be after 8KB).
263            // This means we need to flush manually, which we do by calling `Sink::send_all` or
264            // `Sink::send` (which calls `Sink::flush`) in the `ActiveRelayActor`.
265            .config(tokio_websockets::Config::default().flush_threshold(usize::MAX));
266        if let Some(client_auth) = KeyMaterialClientAuth::new(&self.secret_key, &stream) {
267            debug!("Using TLS key export for relay client authentication");
268            builder = builder
269                .add_header(CLIENT_AUTH_HEADER, client_auth.into_header_value())
270                .expect(
271                    "impossible: CLIENT_AUTH_HEADER isn't a disallowed header value for websockets",
272                );
273        }
274        let (conn, response) = builder.connect_on(stream).await?;
275
276        n0_error::ensure!(
277            response.status() == hyper::StatusCode::SWITCHING_PROTOCOLS,
278            ConnectError::UnexpectedUpgradeStatus {
279                code: response.status()
280            }
281        );
282
283        let conn = Conn::new(conn, self.key_cache.clone(), &self.secret_key).await?;
284
285        event!(
286            target: "iroh::_events::net::relay::connected",
287            Level::DEBUG,
288            url = %self.url,
289        );
290
291        trace!("connect done");
292
293        Ok(Client {
294            conn,
295            local_addr: Some(local_addr),
296        })
297    }
298
299    /// Reports whether IPv4 dials should be slightly
300    /// delayed to give IPv6 a better chance of winning dial races.
301    /// Implementations should only return true if IPv6 is expected
302    /// to succeed. (otherwise delaying IPv4 will delay the connection
303    /// overall)
304    #[cfg(not(wasm_browser))]
305    fn prefer_ipv6(&self) -> bool {
306        match self.address_family_selector {
307            Some(ref selector) => selector(),
308            None => false,
309        }
310    }
311
312    /// Establishes a new connection to the relay server.
313    #[cfg(wasm_browser)]
314    pub async fn connect(&self) -> Result<Client, ConnectError> {
315        use crate::http::RELAY_PROTOCOL_VERSION;
316
317        let mut dial_url = (*self.url).clone();
318        dial_url.set_path(RELAY_PATH);
319        // The relay URL is exchanged with the http(s) scheme in tickets and similar.
320        // We need to use the ws:// or wss:// schemes when connecting with websockets, though.
321        dial_url
322            .set_scheme(match self.url.scheme() {
323                "http" => "ws",
324                "ws" => "ws",
325                _ => "wss",
326            })
327            .map_err(|_| {
328                e!(ConnectError::InvalidWebsocketUrl {
329                    url: dial_url.clone()
330                })
331            })?;
332
333        debug!(%dial_url, "Dialing relay by websocket");
334
335        let (_, ws_stream) =
336            ws_stream_wasm::WsMeta::connect(dial_url.as_str(), Some(vec![RELAY_PROTOCOL_VERSION]))
337                .await?;
338        let conn = Conn::new(ws_stream, self.key_cache.clone(), &self.secret_key).await?;
339
340        event!(
341            target: "iroh::_events::net::relay::connected",
342            Level::DEBUG,
343            url = %self.url,
344        );
345
346        trace!("connect done");
347
348        Ok(Client {
349            conn,
350            local_addr: None,
351        })
352    }
353}
354
355/// A relay client.
356#[derive(Debug)]
357pub struct Client {
358    conn: Conn,
359    local_addr: Option<SocketAddr>,
360}
361
362impl Client {
363    /// Splits the client into a sink and a stream.
364    pub fn split(self) -> (ClientStream, ClientSink) {
365        let (sink, stream) = split(self.conn);
366        (
367            ClientStream {
368                stream,
369                local_addr: self.local_addr,
370            },
371            ClientSink { sink },
372        )
373    }
374}
375
376impl Stream for Client {
377    type Item = Result<RelayToClientMsg, RecvError>;
378
379    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
380        Pin::new(&mut self.conn).poll_next(cx)
381    }
382}
383
384impl Sink<ClientToRelayMsg> for Client {
385    type Error = SendError;
386
387    fn poll_ready(
388        mut self: Pin<&mut Self>,
389        cx: &mut task::Context<'_>,
390    ) -> Poll<Result<(), Self::Error>> {
391        Pin::new(&mut self.conn).poll_ready(cx)
392    }
393
394    fn start_send(mut self: Pin<&mut Self>, item: ClientToRelayMsg) -> Result<(), Self::Error> {
395        Pin::new(&mut self.conn).start_send(item)
396    }
397
398    fn poll_flush(
399        mut self: Pin<&mut Self>,
400        cx: &mut task::Context<'_>,
401    ) -> Poll<Result<(), Self::Error>> {
402        Pin::new(&mut self.conn).poll_flush(cx)
403    }
404
405    fn poll_close(
406        mut self: Pin<&mut Self>,
407        cx: &mut task::Context<'_>,
408    ) -> Poll<Result<(), Self::Error>> {
409        Pin::new(&mut self.conn).poll_close(cx)
410    }
411}
412
413/// The send half of a relay client.
414#[derive(Debug)]
415pub struct ClientSink {
416    sink: SplitSink<Conn, ClientToRelayMsg>,
417}
418
419impl Sink<ClientToRelayMsg> for ClientSink {
420    type Error = SendError;
421
422    fn poll_ready(
423        mut self: Pin<&mut Self>,
424        cx: &mut task::Context<'_>,
425    ) -> Poll<Result<(), Self::Error>> {
426        Pin::new(&mut self.sink).poll_ready(cx)
427    }
428
429    fn start_send(mut self: Pin<&mut Self>, item: ClientToRelayMsg) -> Result<(), Self::Error> {
430        Pin::new(&mut self.sink).start_send(item)
431    }
432
433    fn poll_flush(
434        mut self: Pin<&mut Self>,
435        cx: &mut task::Context<'_>,
436    ) -> Poll<Result<(), Self::Error>> {
437        Pin::new(&mut self.sink).poll_flush(cx)
438    }
439
440    fn poll_close(
441        mut self: Pin<&mut Self>,
442        cx: &mut task::Context<'_>,
443    ) -> Poll<Result<(), Self::Error>> {
444        Pin::new(&mut self.sink).poll_close(cx)
445    }
446}
447
448/// The receive half of a relay client.
449#[derive(Debug)]
450pub struct ClientStream {
451    stream: SplitStream<Conn>,
452    local_addr: Option<SocketAddr>,
453}
454
455impl ClientStream {
456    /// Returns the local address of the client.
457    pub fn local_addr(&self) -> Option<SocketAddr> {
458        self.local_addr
459    }
460}
461
462impl Stream for ClientStream {
463    type Item = Result<RelayToClientMsg, RecvError>;
464
465    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
466        Pin::new(&mut self.stream).poll_next(cx)
467    }
468}
469
470#[cfg(any(test, feature = "test-utils"))]
471/// Creates a client config that trusts any servers without verifying their TLS certificate.
472///
473/// Should be used for testing local relay setups only.
474pub fn make_dangerous_client_config() -> rustls::ClientConfig {
475    warn!(
476        "Insecure config: SSL certificates from relay servers will be trusted without verification"
477    );
478    rustls::client::ClientConfig::builder_with_provider(Arc::new(
479        rustls::crypto::ring::default_provider(),
480    ))
481    .with_protocol_versions(&[&rustls::version::TLS13])
482    .expect("protocols supported by ring")
483    .dangerous()
484    .with_custom_certificate_verifier(Arc::new(NoCertVerifier))
485    .with_no_client_auth()
486}
487
488/// Used to allow self signed certificates in tests
489#[cfg(any(test, feature = "test-utils"))]
490#[derive(Debug)]
491struct NoCertVerifier;
492
493#[cfg(any(test, feature = "test-utils"))]
494impl rustls::client::danger::ServerCertVerifier for NoCertVerifier {
495    fn verify_server_cert(
496        &self,
497        _end_entity: &rustls::pki_types::CertificateDer,
498        _intermediates: &[rustls::pki_types::CertificateDer],
499        _server_name: &rustls::pki_types::ServerName,
500        _ocsp_response: &[u8],
501        _now: rustls::pki_types::UnixTime,
502    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
503        Ok(rustls::client::danger::ServerCertVerified::assertion())
504    }
505    fn verify_tls12_signature(
506        &self,
507        _message: &[u8],
508        _cert: &rustls::pki_types::CertificateDer<'_>,
509        _dss: &rustls::DigitallySignedStruct,
510    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
511        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
512    }
513
514    fn verify_tls13_signature(
515        &self,
516        _message: &[u8],
517        _cert: &rustls::pki_types::CertificateDer<'_>,
518        _dss: &rustls::DigitallySignedStruct,
519    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
520        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
521    }
522
523    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
524        rustls::crypto::ring::default_provider()
525            .signature_verification_algorithms
526            .supported_schemes()
527    }
528}