1use std::{
6 net::SocketAddr,
7 pin::Pin,
8 sync::Arc,
9 task::{self, Poll},
10};
11
12use conn::Conn;
13use iroh_base::{RelayUrl, SecretKey};
14use n0_error::{e, stack_error};
15use n0_future::{
16 Sink, Stream,
17 split::{SplitSink, SplitStream, split},
18 time,
19};
20#[cfg(any(test, feature = "test-utils"))]
21use tracing::warn;
22use tracing::{Level, debug, event, trace};
23use url::Url;
24
25pub use self::conn::{RecvError, SendError};
26#[cfg(not(wasm_browser))]
27use crate::dns::{DnsError, DnsResolver};
28use crate::{
29 KeyCache,
30 http::RELAY_PATH,
31 protos::{
32 handshake,
33 relay::{ClientToRelayMsg, RelayToClientMsg},
34 },
35};
36
37pub(crate) mod conn;
38#[cfg(not(wasm_browser))]
39pub(crate) mod streams;
40#[cfg(not(wasm_browser))]
41mod tls;
42#[cfg(not(wasm_browser))]
43mod util;
44
45#[stack_error(derive, add_meta, from_sources)]
50#[allow(missing_docs)]
51#[non_exhaustive]
52pub enum ConnectError {
53 #[error("Invalid URL for websocket: {url}")]
54 InvalidWebsocketUrl { url: Url },
55 #[error("Invalid relay URL: {url}")]
56 InvalidRelayUrl { url: Url },
57 #[error(transparent)]
58 Websocket {
59 #[cfg(not(wasm_browser))]
60 #[error(std_err)]
61 source: tokio_websockets::Error,
62 #[cfg(wasm_browser)]
63 #[error(std_err)]
64 source: ws_stream_wasm::WsErr,
65 },
66 #[error(transparent)]
67 Handshake {
68 #[error(std_err)]
69 source: handshake::Error,
70 },
71 #[error(transparent)]
72 Dial { source: DialError },
73 #[error("Unexpected status during upgrade: {code}")]
74 UnexpectedUpgradeStatus { code: hyper::StatusCode },
75 #[error("Failed to upgrade response")]
76 Upgrade {
77 #[error(std_err)]
78 source: hyper::Error,
79 },
80 #[error("Invalid TLS servername")]
81 InvalidTlsServername {},
82 #[error("No local address available")]
83 NoLocalAddr {},
84 #[error("tls connection failed")]
85 Tls {
86 #[error(std_err)]
87 source: std::io::Error,
88 },
89 #[cfg(wasm_browser)]
90 #[error("The relay protocol is not available in browsers")]
91 RelayProtoNotAvailable {},
92}
93
94#[stack_error(derive, add_meta, from_sources)]
96#[allow(missing_docs)]
97#[non_exhaustive]
98pub enum DialError {
99 #[error("Invalid target port")]
100 InvalidTargetPort {},
101 #[error(transparent)]
102 #[cfg(not(wasm_browser))]
103 Dns { source: DnsError },
104 #[error(transparent)]
105 Timeout {
106 #[error(std_err)]
107 source: time::Elapsed,
108 },
109 #[error(transparent)]
110 Io {
111 #[error(std_err)]
112 source: std::io::Error,
113 },
114 #[error("Invalid URL: {url}")]
115 InvalidUrl { url: Url },
116 #[error("Failed proxy connection: {status}")]
117 ProxyConnectInvalidStatus { status: hyper::StatusCode },
118 #[error("Invalid Proxy URL {proxy_url}")]
119 ProxyInvalidUrl { proxy_url: Url },
120 #[error("failed to establish proxy connection")]
121 ProxyConnect {
122 #[error(std_err)]
123 source: hyper::Error,
124 },
125 #[error("Invalid proxy TLS servername: {proxy_hostname}")]
126 ProxyInvalidTlsServername { proxy_hostname: String },
127 #[error("Invalid proxy target port")]
128 ProxyInvalidTargetPort {},
129}
130
131#[derive(derive_more::Debug, Clone)]
133pub struct ClientBuilder {
134 #[debug("address family selector callback")]
136 address_family_selector: Option<Arc<dyn Fn() -> bool + Send + Sync>>,
137 url: RelayUrl,
139 tls_config: Option<rustls::ClientConfig>,
141 proxy_url: Option<Url>,
143 secret_key: SecretKey,
145 #[cfg(not(wasm_browser))]
147 dns_resolver: DnsResolver,
148 key_cache: KeyCache,
150}
151
152impl ClientBuilder {
153 pub fn new(
155 url: impl Into<RelayUrl>,
156 secret_key: SecretKey,
157 #[cfg(not(wasm_browser))] dns_resolver: DnsResolver,
158 ) -> Self {
159 ClientBuilder {
160 address_family_selector: None,
161 url: url.into(),
162 tls_config: None,
163 proxy_url: None,
164 secret_key,
165 #[cfg(not(wasm_browser))]
166 dns_resolver,
167 key_cache: KeyCache::new(128),
168 }
169 }
170
171 pub fn tls_client_config(mut self, tls_config: rustls::ClientConfig) -> Self {
173 self.tls_config = Some(tls_config);
174 self
175 }
176
177 pub fn address_family_selector<S>(mut self, selector: S) -> Self
184 where
185 S: Fn() -> bool + Send + Sync + 'static,
186 {
187 self.address_family_selector = Some(Arc::new(selector));
188 self
189 }
190
191 pub fn proxy_url(mut self, url: Url) -> Self {
193 self.proxy_url.replace(url);
194 self
195 }
196
197 pub fn key_cache_capacity(mut self, capacity: usize) -> Self {
199 self.key_cache = KeyCache::new(capacity);
200 self
201 }
202
203 #[cfg(not(wasm_browser))]
205 pub async fn connect(&self) -> Result<Client, ConnectError> {
206 use http::header::SEC_WEBSOCKET_PROTOCOL;
207 use tls::MaybeTlsStreamBuilder;
208
209 use crate::{
210 http::{CLIENT_AUTH_HEADER, RELAY_PROTOCOL_VERSION},
211 protos::{handshake::KeyMaterialClientAuth, relay::MAX_FRAME_SIZE},
212 tls::{CaRootsConfig, default_provider},
213 };
214
215 let mut dial_url = (*self.url).clone();
216 dial_url.set_path(RELAY_PATH);
217 dial_url
220 .set_scheme(match self.url.scheme() {
221 "http" => "ws",
222 "ws" => "ws",
223 _ => "wss",
224 })
225 .map_err(|_| {
226 e!(ConnectError::InvalidWebsocketUrl {
227 url: dial_url.clone()
228 })
229 })?;
230
231 debug!(%dial_url, "Dialing relay by websocket");
232
233 let tls_config = match self.tls_config.clone() {
234 Some(config) => config,
235 None => CaRootsConfig::default().client_config(default_provider())?,
236 };
237
238 #[allow(unused_mut)]
239 let mut builder =
240 MaybeTlsStreamBuilder::new(dial_url.clone(), self.dns_resolver.clone(), tls_config)
241 .prefer_ipv6(self.prefer_ipv6())
242 .proxy_url(self.proxy_url.clone());
243
244 let stream = builder.connect().await?;
245 let local_addr = stream
246 .as_ref()
247 .local_addr()
248 .map_err(|_| e!(ConnectError::NoLocalAddr))?;
249 let mut builder = tokio_websockets::ClientBuilder::new()
250 .uri(dial_url.as_str())
251 .map_err(|_| {
252 e!(ConnectError::InvalidRelayUrl {
253 url: dial_url.clone()
254 })
255 })?
256 .add_header(
257 SEC_WEBSOCKET_PROTOCOL,
258 http::HeaderValue::from_static(RELAY_PROTOCOL_VERSION),
259 )
260 .expect("valid header name and value")
261 .limits(tokio_websockets::Limits::default().max_payload_len(Some(MAX_FRAME_SIZE)))
262 .config(tokio_websockets::Config::default().flush_threshold(usize::MAX));
266 if let Some(client_auth) = KeyMaterialClientAuth::new(&self.secret_key, &stream) {
267 debug!("Using TLS key export for relay client authentication");
268 builder = builder
269 .add_header(CLIENT_AUTH_HEADER, client_auth.into_header_value())
270 .expect(
271 "impossible: CLIENT_AUTH_HEADER isn't a disallowed header value for websockets",
272 );
273 }
274 let (conn, response) = builder.connect_on(stream).await?;
275
276 n0_error::ensure!(
277 response.status() == hyper::StatusCode::SWITCHING_PROTOCOLS,
278 ConnectError::UnexpectedUpgradeStatus {
279 code: response.status()
280 }
281 );
282
283 let conn = Conn::new(conn, self.key_cache.clone(), &self.secret_key).await?;
284
285 event!(
286 target: "iroh::_events::net::relay::connected",
287 Level::DEBUG,
288 url = %self.url,
289 );
290
291 trace!("connect done");
292
293 Ok(Client {
294 conn,
295 local_addr: Some(local_addr),
296 })
297 }
298
299 #[cfg(not(wasm_browser))]
305 fn prefer_ipv6(&self) -> bool {
306 match self.address_family_selector {
307 Some(ref selector) => selector(),
308 None => false,
309 }
310 }
311
312 #[cfg(wasm_browser)]
314 pub async fn connect(&self) -> Result<Client, ConnectError> {
315 use crate::http::RELAY_PROTOCOL_VERSION;
316
317 let mut dial_url = (*self.url).clone();
318 dial_url.set_path(RELAY_PATH);
319 dial_url
322 .set_scheme(match self.url.scheme() {
323 "http" => "ws",
324 "ws" => "ws",
325 _ => "wss",
326 })
327 .map_err(|_| {
328 e!(ConnectError::InvalidWebsocketUrl {
329 url: dial_url.clone()
330 })
331 })?;
332
333 debug!(%dial_url, "Dialing relay by websocket");
334
335 let (_, ws_stream) =
336 ws_stream_wasm::WsMeta::connect(dial_url.as_str(), Some(vec![RELAY_PROTOCOL_VERSION]))
337 .await?;
338 let conn = Conn::new(ws_stream, self.key_cache.clone(), &self.secret_key).await?;
339
340 event!(
341 target: "iroh::_events::net::relay::connected",
342 Level::DEBUG,
343 url = %self.url,
344 );
345
346 trace!("connect done");
347
348 Ok(Client {
349 conn,
350 local_addr: None,
351 })
352 }
353}
354
355#[derive(Debug)]
357pub struct Client {
358 conn: Conn,
359 local_addr: Option<SocketAddr>,
360}
361
362impl Client {
363 pub fn split(self) -> (ClientStream, ClientSink) {
365 let (sink, stream) = split(self.conn);
366 (
367 ClientStream {
368 stream,
369 local_addr: self.local_addr,
370 },
371 ClientSink { sink },
372 )
373 }
374}
375
376impl Stream for Client {
377 type Item = Result<RelayToClientMsg, RecvError>;
378
379 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
380 Pin::new(&mut self.conn).poll_next(cx)
381 }
382}
383
384impl Sink<ClientToRelayMsg> for Client {
385 type Error = SendError;
386
387 fn poll_ready(
388 mut self: Pin<&mut Self>,
389 cx: &mut task::Context<'_>,
390 ) -> Poll<Result<(), Self::Error>> {
391 Pin::new(&mut self.conn).poll_ready(cx)
392 }
393
394 fn start_send(mut self: Pin<&mut Self>, item: ClientToRelayMsg) -> Result<(), Self::Error> {
395 Pin::new(&mut self.conn).start_send(item)
396 }
397
398 fn poll_flush(
399 mut self: Pin<&mut Self>,
400 cx: &mut task::Context<'_>,
401 ) -> Poll<Result<(), Self::Error>> {
402 Pin::new(&mut self.conn).poll_flush(cx)
403 }
404
405 fn poll_close(
406 mut self: Pin<&mut Self>,
407 cx: &mut task::Context<'_>,
408 ) -> Poll<Result<(), Self::Error>> {
409 Pin::new(&mut self.conn).poll_close(cx)
410 }
411}
412
413#[derive(Debug)]
415pub struct ClientSink {
416 sink: SplitSink<Conn, ClientToRelayMsg>,
417}
418
419impl Sink<ClientToRelayMsg> for ClientSink {
420 type Error = SendError;
421
422 fn poll_ready(
423 mut self: Pin<&mut Self>,
424 cx: &mut task::Context<'_>,
425 ) -> Poll<Result<(), Self::Error>> {
426 Pin::new(&mut self.sink).poll_ready(cx)
427 }
428
429 fn start_send(mut self: Pin<&mut Self>, item: ClientToRelayMsg) -> Result<(), Self::Error> {
430 Pin::new(&mut self.sink).start_send(item)
431 }
432
433 fn poll_flush(
434 mut self: Pin<&mut Self>,
435 cx: &mut task::Context<'_>,
436 ) -> Poll<Result<(), Self::Error>> {
437 Pin::new(&mut self.sink).poll_flush(cx)
438 }
439
440 fn poll_close(
441 mut self: Pin<&mut Self>,
442 cx: &mut task::Context<'_>,
443 ) -> Poll<Result<(), Self::Error>> {
444 Pin::new(&mut self.sink).poll_close(cx)
445 }
446}
447
448#[derive(Debug)]
450pub struct ClientStream {
451 stream: SplitStream<Conn>,
452 local_addr: Option<SocketAddr>,
453}
454
455impl ClientStream {
456 pub fn local_addr(&self) -> Option<SocketAddr> {
458 self.local_addr
459 }
460}
461
462impl Stream for ClientStream {
463 type Item = Result<RelayToClientMsg, RecvError>;
464
465 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
466 Pin::new(&mut self.stream).poll_next(cx)
467 }
468}
469
470#[cfg(any(test, feature = "test-utils"))]
471pub fn make_dangerous_client_config() -> rustls::ClientConfig {
475 warn!(
476 "Insecure config: SSL certificates from relay servers will be trusted without verification"
477 );
478 rustls::client::ClientConfig::builder_with_provider(Arc::new(
479 rustls::crypto::ring::default_provider(),
480 ))
481 .with_protocol_versions(&[&rustls::version::TLS13])
482 .expect("protocols supported by ring")
483 .dangerous()
484 .with_custom_certificate_verifier(Arc::new(NoCertVerifier))
485 .with_no_client_auth()
486}
487
488#[cfg(any(test, feature = "test-utils"))]
490#[derive(Debug)]
491struct NoCertVerifier;
492
493#[cfg(any(test, feature = "test-utils"))]
494impl rustls::client::danger::ServerCertVerifier for NoCertVerifier {
495 fn verify_server_cert(
496 &self,
497 _end_entity: &rustls::pki_types::CertificateDer,
498 _intermediates: &[rustls::pki_types::CertificateDer],
499 _server_name: &rustls::pki_types::ServerName,
500 _ocsp_response: &[u8],
501 _now: rustls::pki_types::UnixTime,
502 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
503 Ok(rustls::client::danger::ServerCertVerified::assertion())
504 }
505 fn verify_tls12_signature(
506 &self,
507 _message: &[u8],
508 _cert: &rustls::pki_types::CertificateDer<'_>,
509 _dss: &rustls::DigitallySignedStruct,
510 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
511 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
512 }
513
514 fn verify_tls13_signature(
515 &self,
516 _message: &[u8],
517 _cert: &rustls::pki_types::CertificateDer<'_>,
518 _dss: &rustls::DigitallySignedStruct,
519 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
520 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
521 }
522
523 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
524 rustls::crypto::ring::default_provider()
525 .signature_verification_algorithms
526 .supported_schemes()
527 }
528}