iroh_quinn/connection.rs
1use std::{
2 any::Any,
3 fmt,
4 future::Future,
5 io,
6 net::{IpAddr, SocketAddr},
7 pin::Pin,
8 sync::{Arc, Weak},
9 task::{Context, Poll, Waker, ready},
10};
11
12use bytes::Bytes;
13use pin_project_lite::pin_project;
14use rustc_hash::FxHashMap;
15use thiserror::Error;
16use tokio::sync::{Notify, futures::Notified, mpsc, oneshot, watch};
17use tracing::{Instrument, Span, debug_span};
18
19use crate::{
20 ConnectionEvent, Duration, Instant, Path, VarInt,
21 endpoint::ensure_ipv6,
22 mutex::Mutex,
23 path::OpenPath,
24 recv_stream::RecvStream,
25 runtime::{AsyncTimer, Runtime, UdpSender},
26 send_stream::SendStream,
27 udp_transmit,
28};
29use proto::{
30 ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, PathError, PathEvent,
31 PathId, PathStats, PathStatus, Side, StreamEvent, StreamId, TransportError, TransportErrorCode,
32 congestion::Controller, iroh_hp,
33};
34
/// In-progress connection attempt future
///
/// Resolves to a [`Connection`] once the handshake completes, or to a
/// [`ConnectionError`] if it fails. See [`Connecting::into_0rtt`] for sending
/// data before the handshake finishes.
#[derive(Debug)]
pub struct Connecting {
    // Present until the future yields `Ready` (or `into_0rtt` converts it);
    // accessor methods panic once it has been taken.
    conn: Option<ConnectionRef>,
    // Signaled by the driver when the handshake completes; the payload reports
    // whether 0-RTT was accepted (meaningful for clients).
    connected: oneshot::Receiver<bool>,
    // Consumed by `handshake_data()`; signaled once handshake data is available.
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}
42
43impl Connecting {
44 pub(crate) fn new(
45 handle: ConnectionHandle,
46 conn: proto::Connection,
47 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
48 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
49 sender: Pin<Box<dyn UdpSender>>,
50 runtime: Arc<dyn Runtime>,
51 ) -> Self {
52 let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
53 let (on_connected_send, on_connected_recv) = oneshot::channel();
54
55 let conn = ConnectionRef(Arc::new(ConnectionInner {
56 state: Mutex::new(State::new(
57 conn,
58 handle,
59 endpoint_events,
60 conn_events,
61 on_handshake_data_send,
62 on_connected_send,
63 sender,
64 runtime.clone(),
65 )),
66 shared: Shared::default(),
67 }));
68
69 let driver = ConnectionDriver(conn.clone());
70 runtime.spawn(Box::pin(
71 async {
72 if let Err(e) = driver.await {
73 tracing::error!("I/O error: {e}");
74 }
75 }
76 .instrument(Span::current()),
77 ));
78
79 Self {
80 conn: Some(conn),
81 connected: on_connected_recv,
82 handshake_data_ready: Some(on_handshake_data_recv),
83 }
84 }
85
86 /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
87 ///
88 /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
89 /// If so, the returned [`Connection`] can be used to send application data without waiting for
90 /// the rest of the handshake to complete, at the cost of weakened cryptographic security
91 /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
92 /// complete, at which point subsequently opened streams and written data will have full
93 /// cryptographic protection.
94 ///
95 /// ## Outgoing
96 ///
97 /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
98 /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
99 /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
100 /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
101 /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
102 /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
103 /// If it was rejected, the existence of streams opened and other application data sent prior
104 /// to the handshake completing will not be conveyed to the remote application, and local
105 /// operations on them will return `ZeroRttRejected` errors.
106 ///
107 /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
108 /// relevant resumption state to be stored in the server, which servers may limit or lose for
109 /// various reasons including not persisting resumption state across server restarts.
110 ///
111 /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
112 /// implementation's docs for 0-RTT pitfalls.
113 ///
114 /// ## Incoming
115 ///
116 /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
117 /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
118 ///
119 /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
120 /// implementation's docs for 0-RTT pitfalls.
121 ///
122 /// ## Security
123 ///
124 /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
125 /// replay attacks, and should therefore never invoke non-idempotent operations.
126 ///
127 /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
128 /// before TLS client authentication has occurred, and should therefore not be used to send
129 /// data for which client authentication is being used.
130 pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
131 // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
132 // have to release it explicitly before returning `self` by value.
133 let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt");
134
135 let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
136 drop(conn);
137
138 if is_ok {
139 let conn = self.conn.take().unwrap();
140 Ok((Connection(conn), ZeroRttAccepted(self.connected)))
141 } else {
142 Err(self)
143 }
144 }
145
146 /// Parameters negotiated during the handshake
147 ///
148 /// The dynamic type returned is determined by the configured
149 /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
150 /// be [`downcast`](Box::downcast) to a
151 /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
152 pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
153 // Taking &mut self allows us to use a single oneshot channel rather than dealing with
154 // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
155 // simple.
156 if let Some(x) = self.handshake_data_ready.take() {
157 let _ = x.await;
158 }
159 let conn = self.conn.as_ref().unwrap();
160 let inner = conn.state.lock("handshake");
161 inner
162 .inner
163 .crypto_session()
164 .handshake_data()
165 .ok_or_else(|| {
166 inner
167 .error
168 .clone()
169 .expect("spurious handshake data ready notification")
170 })
171 }
172
173 /// The local IP address which was used when the peer established
174 /// the connection
175 ///
176 /// This can be different from the address the endpoint is bound to, in case
177 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
178 ///
179 /// This will return `None` for clients, or when the platform does not expose this
180 /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
181 /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
182 ///
183 /// Will panic if called after `poll` has returned `Ready`.
184 pub fn local_ip(&self) -> Option<IpAddr> {
185 let conn = self.conn.as_ref().unwrap();
186 let inner = conn.state.lock("local_ip");
187
188 inner.inner.local_ip()
189 }
190
191 /// The peer's UDP address
192 ///
193 /// Will panic if called after `poll` has returned `Ready`.
194 pub fn remote_address(&self) -> SocketAddr {
195 let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
196 // TODO: another unwrap
197 conn_ref
198 .state
199 .lock("remote_address")
200 .inner
201 .path_remote_address(PathId::ZERO)
202 .expect("path exists when connecting")
203 }
204}
205
206impl Future for Connecting {
207 type Output = Result<Connection, ConnectionError>;
208 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
209 Pin::new(&mut self.connected).poll(cx).map(|_| {
210 let conn = self.conn.take().unwrap();
211 let inner = conn.state.lock("connecting");
212 if inner.connected {
213 drop(inner);
214 Ok(Connection(conn))
215 } else {
216 Err(inner
217 .error
218 .clone()
219 .expect("connected signaled without connection success or error"))
220 }
221 })
222 }
223}
224
/// Future that completes when a connection is fully established
///
/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
/// value is meaningless.
///
/// Obtained from [`Connecting::into_0rtt`]. Resolves to `false` if the sending
/// side is dropped before signaling completion.
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
230
231impl Future for ZeroRttAccepted {
232 type Output = bool;
233 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
234 Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
235 }
236}
237
/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events from the
/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// If the connection encounters an error condition, this future will yield an error. It will
/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
/// connection-related futures, this waits for the draining period to complete to ensure that
/// packets still in flight from the peer are handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
// Holds a strong `ConnectionRef`, keeping the connection state alive for as
// long as the driver task runs.
struct ConnectionDriver(ConnectionRef);

impl Future for ConnectionDriver {
    type Output = Result<(), io::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // The state lock is held for the entire poll; all other methods on
        // `Connection` contend with this.
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        // Process events sent to us by the endpoint. An error here terminates
        // the connection, which is a normal (Ok) exit for the driver.
        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        // `?` propagates I/O errors to the spawner, which logs them.
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. Hence, we must loop until neither happens.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                // Park until something (events, API calls) wakes us via `conn.driver`.
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        // Drained: the connection is fully closed and `error` records why.
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}
287
/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
/// incoming streams, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
///
/// Obtained by polling a [`Connecting`] future, or from [`Connecting::into_0rtt`].
///
/// [`Connection::close()`]: Connection::close
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);
305
306impl Connection {
307 /// Returns a weak reference to the inner connection struct.
308 pub fn weak_handle(&self) -> WeakConnectionHandle {
309 WeakConnectionHandle(Arc::downgrade(&self.0.0))
310 }
311
312 /// Initiate a new outgoing unidirectional stream.
313 ///
314 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
315 /// consequence, the peer won't be notified that a stream has been opened until the stream is
316 /// actually used.
317 pub fn open_uni(&self) -> OpenUni<'_> {
318 OpenUni {
319 conn: &self.0,
320 notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
321 }
322 }
323
324 /// Initiate a new outgoing bidirectional stream.
325 ///
326 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
327 /// consequence, the peer won't be notified that a stream has been opened until the stream is
328 /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
329 /// anything to [`SendStream`] will never succeed.
330 ///
331 /// [`open_bi()`]: crate::Connection::open_bi
332 /// [`SendStream`]: crate::SendStream
333 /// [`RecvStream`]: crate::RecvStream
334 pub fn open_bi(&self) -> OpenBi<'_> {
335 OpenBi {
336 conn: &self.0,
337 notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
338 }
339 }
340
341 /// Accept the next incoming uni-directional stream
342 pub fn accept_uni(&self) -> AcceptUni<'_> {
343 AcceptUni {
344 conn: &self.0,
345 notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
346 }
347 }
348
349 /// Accept the next incoming bidirectional stream
350 ///
351 /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
352 /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
353 /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
354 ///
355 /// [`accept_bi()`]: crate::Connection::accept_bi
356 /// [`open_bi()`]: crate::Connection::open_bi
357 /// [`SendStream`]: crate::SendStream
358 /// [`RecvStream`]: crate::RecvStream
359 pub fn accept_bi(&self) -> AcceptBi<'_> {
360 AcceptBi {
361 conn: &self.0,
362 notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
363 }
364 }
365
366 /// Receive an application datagram
367 pub fn read_datagram(&self) -> ReadDatagram<'_> {
368 ReadDatagram {
369 conn: &self.0,
370 notify: self.0.shared.datagram_received.notified(),
371 }
372 }
373
374 /// Opens a new path if no path exists yet for the remote address.
375 ///
376 /// Otherwise behaves exactly as [`open_path`].
377 ///
378 /// [`open_path`]: Self::open_path
379 pub fn open_path_ensure(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath {
380 let mut state = self.0.state.lock("open_path");
381
382 // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6.
383 // If not, we do not support IPv6. We can not access endpoint::State from here
384 // however, but either all our paths use an IPv6 address, or all our paths use an
385 // IPv4 address. So we can use that information.
386 let ipv6 = state
387 .inner
388 .paths()
389 .iter()
390 .filter_map(|id| {
391 state
392 .inner
393 .path_remote_address(*id)
394 .map(|ip| ip.is_ipv6())
395 .ok()
396 })
397 .next()
398 .unwrap_or_default();
399 if addr.is_ipv6() && !ipv6 {
400 return OpenPath::rejected(PathError::InvalidRemoteAddress(addr));
401 }
402 let addr = if ipv6 {
403 SocketAddr::V6(ensure_ipv6(addr))
404 } else {
405 addr
406 };
407
408 let now = state.runtime.now();
409 let open_res = state.inner.open_path_ensure(addr, initial_status, now);
410 state.wake();
411 match open_res {
412 Ok((path_id, existed)) if existed => {
413 match state.open_path.get(&path_id).map(|tx| tx.subscribe()) {
414 Some(recv) => OpenPath::new(path_id, recv, self.0.clone()),
415 None => OpenPath::ready(path_id, self.0.clone()),
416 }
417 }
418 Ok((path_id, _)) => {
419 let (tx, rx) = watch::channel(Ok(()));
420 state.open_path.insert(path_id, tx);
421 drop(state);
422 OpenPath::new(path_id, rx, self.0.clone())
423 }
424 Err(err) => OpenPath::rejected(err),
425 }
426 }
427
428 /// Opens an additional path if the multipath extension is negotiated.
429 ///
430 /// The returned future completes once the path is either fully opened and ready to
431 /// carry application data, or if there was an error.
432 ///
433 /// Dropping the returned future does not cancel the opening of the path, the
434 /// [`PathEvent::Opened`] event will still be emitted from [`Self::path_events`] if the
435 /// path opens. The [`PathId`] for the events can be extracted from
436 /// [`OpenPath::path_id`].
437 ///
438 /// Failure to open a path can either occur immediately, before polling the returned
439 /// future, or at a later time. If the failure is immediate [`OpenPath::path_id`] will
440 /// return `None` and the future will be ready immediately. If the failure happens
441 /// later, a [`PathEvent`] will be emitted.
442 pub fn open_path(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath {
443 let mut state = self.0.state.lock("open_path");
444
445 // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6.
446 // If not, we do not support IPv6. We can not access endpoint::State from here
447 // however, but either all our paths use an IPv6 address, or all our paths use an
448 // IPv4 address. So we can use that information.
449 let ipv6 = state
450 .inner
451 .paths()
452 .iter()
453 .filter_map(|id| {
454 state
455 .inner
456 .path_remote_address(*id)
457 .map(|ip| ip.is_ipv6())
458 .ok()
459 })
460 .next()
461 .unwrap_or_default();
462 if addr.is_ipv6() && !ipv6 {
463 return OpenPath::rejected(PathError::InvalidRemoteAddress(addr));
464 }
465 let addr = if ipv6 {
466 SocketAddr::V6(ensure_ipv6(addr))
467 } else {
468 addr
469 };
470
471 let (on_open_path_send, on_open_path_recv) = watch::channel(Ok(()));
472 let now = state.runtime.now();
473 let open_res = state.inner.open_path(addr, initial_status, now);
474 state.wake();
475 match open_res {
476 Ok(path_id) => {
477 state.open_path.insert(path_id, on_open_path_send);
478 drop(state);
479 OpenPath::new(path_id, on_open_path_recv, self.0.clone())
480 }
481 Err(err) => OpenPath::rejected(err),
482 }
483 }
484
485 /// Returns the [`Path`] structure of an open path
486 pub fn path(&self, id: PathId) -> Option<Path> {
487 // TODO(flub): Using this to know if the path still exists is... hacky.
488 self.0.state.lock("path").inner.path_status(id).ok()?;
489 Some(Path {
490 id,
491 conn: self.0.clone(),
492 })
493 }
494
495 /// A broadcast receiver of [`PathEvent`]s for all paths in this connection
496 pub fn path_events(&self) -> tokio::sync::broadcast::Receiver<PathEvent> {
497 self.0.state.lock("path_events").path_events.subscribe()
498 }
499
500 /// A broadcast receiver of [`iroh_hp::Event`]s for updates about server addresses
501 pub fn nat_traversal_updates(&self) -> tokio::sync::broadcast::Receiver<iroh_hp::Event> {
502 self.0
503 .state
504 .lock("nat_traversal_updates")
505 .nat_traversal_updates
506 .subscribe()
507 }
508
    /// Wait for the connection to be closed for any reason
    ///
    /// Despite the return type's name, closed connections are often not an error condition at the
    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
    /// and [`ConnectionError::ApplicationClosed`].
    pub async fn closed(&self) -> ConnectionError {
        {
            let conn = self.0.state.lock("closed");
            // Fast path: the connection has already been closed.
            if let Some(error) = conn.error.as_ref() {
                return error.clone();
            }
            // Construct the future while the lock is held to ensure we can't miss a wakeup if
            // the `Notify` is signaled immediately after we release the lock. `await` it after
            // the lock guard is out of scope.
            self.0.shared.closed.notified()
        }
        .await;
        // NOTE(review): relies on `error` always being set before `closed` is
        // notified, per the `expect` invariant below.
        self.0
            .state
            .lock("closed")
            .error
            .as_ref()
            .expect("closed without an error")
            .clone()
    }
534
535 /// Wait for the connection to be closed without keeping a strong reference to the connection
536 ///
537 /// Returns a future that resolves, once the connection is closed, to a tuple of
538 /// ([`ConnectionError`], [`ConnectionStats`]).
539 ///
540 /// Calling [`Self::closed`] keeps the connection alive until it is either closed locally via [`Connection::close`]
541 /// or closed by the remote peer. This function instead does not keep the connection itself alive,
542 /// so if all *other* clones of the connection are dropped, the connection will be closed implicitly even
543 /// if there are futures returned from this function still being awaited.
544 pub fn on_closed(&self) -> OnClosed {
545 let (tx, rx) = oneshot::channel();
546 self.0.state.lock("on_closed").on_closed.push(tx);
547 OnClosed {
548 conn: self.weak_handle(),
549 rx,
550 }
551 }
552
553 /// If the connection is closed, the reason why.
554 ///
555 /// Returns `None` if the connection is still open.
556 pub fn close_reason(&self) -> Option<ConnectionError> {
557 self.0.state.lock("close_reason").error.clone()
558 }
559
560 /// Close the connection immediately.
561 ///
562 /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
563 /// more data is sent to the peer and the peer may drop buffered data upon receiving
564 /// the CONNECTION_CLOSE frame.
565 ///
566 /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
567 ///
568 /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
569 /// is preserved in full, it should be kept under 1KiB.
570 ///
571 /// # Gracefully closing a connection
572 ///
573 /// Only the peer last receiving application data can be certain that all data is
574 /// delivered. The only reliable action it can then take is to close the connection,
575 /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
576 /// frame is very likely if both endpoints stay online long enough, and
577 /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
578 /// remote peer will time out the connection, provided that the idle timeout is not
579 /// disabled.
580 ///
581 /// The sending side can not guarantee all stream data is delivered to the remote
582 /// application. It only knows the data is delivered to the QUIC stack of the remote
583 /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
584 /// [`close()`] the remote endpoint may drop any data it received but is as yet
585 /// undelivered to the application, including data that was acknowledged as received to
586 /// the local endpoint.
587 ///
588 /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
589 /// [`Endpoint::wait_idle()`]: crate::Endpoint::wait_idle
590 /// [`close()`]: Connection::close
591 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
592 let conn = &mut *self.0.state.lock("close");
593 conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
594 }
595
    /// Wait for the handshake to be confirmed.
    ///
    /// As a server, who must be authenticated by clients,
    /// this happens when the handshake completes
    /// upon receiving a TLS Finished message from the client.
    /// In return, the server sends a HANDSHAKE_DONE frame.
    ///
    /// As a client, this happens when receiving a HANDSHAKE_DONE frame.
    /// At this point, the server has either accepted our authentication,
    /// or, if client authentication is not required, accepted our lack of authentication.
    pub async fn handshake_confirmed(&self) -> Result<(), ConnectionError> {
        {
            let conn = self.0.state.lock("handshake_confirmed");
            // Fast path: the connection has already failed.
            if let Some(error) = conn.error.as_ref() {
                return Err(error.clone());
            }
            // Fast path: confirmation already happened.
            if conn.handshake_confirmed {
                return Ok(());
            }
            // Construct the future while the lock is held to ensure we can't miss a wakeup if
            // the `Notify` is signaled immediately after we release the lock. `await` it after
            // the lock guard is out of scope.
            self.0.shared.handshake_confirmed.notified()
        }
        .await;
        // Re-check: the notification may also fire because the connection errored.
        if let Some(error) = self.0.state.lock("handshake_confirmed").error.as_ref() {
            Err(error.clone())
        } else {
            Ok(())
        }
    }
627
628 /// Transmit `data` as an unreliable, unordered application datagram
629 ///
630 /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
631 /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
632 /// dictated by the peer.
633 ///
634 /// Previously queued datagrams which are still unsent may be discarded to make space for this
635 /// datagram, in order of oldest to newest.
636 pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
637 let conn = &mut *self.0.state.lock("send_datagram");
638 if let Some(ref x) = conn.error {
639 return Err(SendDatagramError::ConnectionLost(x.clone()));
640 }
641 use proto::SendDatagramError::*;
642 match conn.inner.datagrams().send(data, true) {
643 Ok(()) => {
644 conn.wake();
645 Ok(())
646 }
647 Err(e) => Err(match e {
648 Blocked(..) => unreachable!(),
649 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
650 Disabled => SendDatagramError::Disabled,
651 TooLarge => SendDatagramError::TooLarge,
652 }),
653 }
654 }
655
656 /// Transmit `data` as an unreliable, unordered application datagram
657 ///
658 /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
659 /// conditions, which effectively prioritizes old datagrams over new datagrams.
660 ///
661 /// See [`send_datagram()`] for details.
662 ///
663 /// [`send_datagram()`]: Connection::send_datagram
664 pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
665 SendDatagram {
666 conn: &self.0,
667 data: Some(data),
668 notify: self.0.shared.datagrams_unblocked.notified(),
669 }
670 }
671
672 /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
673 ///
674 /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
675 ///
676 /// This may change over the lifetime of a connection according to variation in the path MTU
677 /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
678 /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
679 ///
680 /// Not necessarily the maximum size of received datagrams.
681 ///
682 /// [`send_datagram()`]: Connection::send_datagram
683 pub fn max_datagram_size(&self) -> Option<usize> {
684 self.0
685 .state
686 .lock("max_datagram_size")
687 .inner
688 .datagrams()
689 .max_size()
690 }
691
692 /// Bytes available in the outgoing datagram buffer
693 ///
694 /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
695 /// at most this size is guaranteed not to cause older datagrams to be dropped.
696 pub fn datagram_send_buffer_space(&self) -> usize {
697 self.0
698 .state
699 .lock("datagram_send_buffer_space")
700 .inner
701 .datagrams()
702 .send_buffer_space()
703 }
704
705 /// The side of the connection (client or server)
706 pub fn side(&self) -> Side {
707 self.0.state.lock("side").inner.side()
708 }
709
710 /// The peer's UDP address
711 ///
712 /// If [`ServerConfig::migration`] is `true`, clients may change addresses at will,
713 /// e.g. when switching to a cellular internet connection.
714 ///
715 /// If [`multipath`] is enabled this will return the address of *any*
716 /// path, and may not be consistent. Prefer [`Path::remote_address`] instead.
717 ///
718 /// [`ServerConfig::migration`]: crate::ServerConfig::migration
719 /// [`multipath`]: crate::TransportConfig::max_concurrent_multipath_paths
720 pub fn remote_address(&self) -> SocketAddr {
721 // TODO: an unwrap again
722 let state = self.0.state.lock("remote_address");
723 state
724 .inner
725 .paths()
726 .iter()
727 .filter_map(|id| state.inner.path_remote_address(*id).ok())
728 .next()
729 .unwrap()
730 }
731
732 /// The local IP address which was used when the peer established
733 /// the connection
734 ///
735 /// This can be different from the address the endpoint is bound to, in case
736 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
737 ///
738 /// This will return `None` for clients, or when the platform does not expose this
739 /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
740 /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
741 pub fn local_ip(&self) -> Option<IpAddr> {
742 self.0.state.lock("local_ip").inner.local_ip()
743 }
744
745 /// Current best estimate of this connection's latency (round-trip-time)
746 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
747 self.0.state.lock("rtt").inner.rtt(path_id)
748 }
749
750 /// Returns connection statistics
751 pub fn stats(&self) -> ConnectionStats {
752 self.0.state.lock("stats").inner.stats()
753 }
754
755 /// Returns path statistics
756 pub fn path_stats(&self, path_id: PathId) -> Option<PathStats> {
757 self.0.state.lock("path_stats").inner.path_stats(path_id)
758 }
759
760 /// Current state of the congestion control algorithm, for debugging purposes
761 pub fn congestion_state(&self, path_id: PathId) -> Option<Box<dyn Controller>> {
762 self.0
763 .state
764 .lock("congestion_state")
765 .inner
766 .congestion_state(path_id)
767 .map(|c| c.clone_box())
768 }
769
    /// Parameters negotiated during the handshake
    ///
    /// Guaranteed to return `Some` on fully established connections or after
    /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for details on
    /// the returned value.
    ///
    /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("handshake_data")
            .inner
            .crypto_session()
            .handshake_data()
    }
785
786 /// Cryptographic identity of the peer
787 ///
788 /// The dynamic type returned is determined by the configured
789 /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
790 /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
791 pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
792 self.0
793 .state
794 .lock("peer_identity")
795 .inner
796 .crypto_session()
797 .peer_identity()
798 }
799
800 /// A stable identifier for this connection
801 ///
802 /// Peer addresses and connection IDs can change, but this value will remain
803 /// fixed for the lifetime of the connection.
804 pub fn stable_id(&self) -> usize {
805 self.0.stable_id()
806 }
807
808 /// Update traffic keys spontaneously
809 ///
810 /// This primarily exists for testing purposes.
811 pub fn force_key_update(&self) {
812 self.0
813 .state
814 .lock("force_key_update")
815 .inner
816 .force_key_update()
817 }
818
819 /// Derive keying material from this connection's TLS session secrets.
820 ///
821 /// When both peers call this method with the same `label` and `context`
822 /// arguments and `output` buffers of equal length, they will get the
823 /// same sequence of bytes in `output`. These bytes are cryptographically
824 /// strong and pseudorandom, and are suitable for use as keying material.
825 ///
826 /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
827 pub fn export_keying_material(
828 &self,
829 output: &mut [u8],
830 label: &[u8],
831 context: &[u8],
832 ) -> Result<(), proto::crypto::ExportKeyingMaterialError> {
833 self.0
834 .state
835 .lock("export_keying_material")
836 .inner
837 .crypto_session()
838 .export_keying_material(output, label, context)
839 }
840
841 /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
842 ///
843 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
844 /// `count`s increase both minimum and worst-case memory consumption.
845 pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
846 let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
847 conn.inner.set_max_concurrent_streams(Dir::Uni, count);
848 // May need to send MAX_STREAMS to make progress
849 conn.wake();
850 }
851
852 /// See [`proto::TransportConfig::send_window()`]
853 pub fn set_send_window(&self, send_window: u64) {
854 let mut conn = self.0.state.lock("set_send_window");
855 conn.inner.set_send_window(send_window);
856 conn.wake();
857 }
858
859 /// See [`proto::TransportConfig::receive_window()`]
860 pub fn set_receive_window(&self, receive_window: VarInt) {
861 let mut conn = self.0.state.lock("set_receive_window");
862 conn.inner.set_receive_window(receive_window);
863 conn.wake();
864 }
865
866 /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
867 ///
868 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
869 /// `count`s increase both minimum and worst-case memory consumption.
870 pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
871 let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
872 conn.inner.set_max_concurrent_streams(Dir::Bi, count);
873 // May need to send MAX_STREAMS to make progress
874 conn.wake();
875 }
876
877 /// Track changed on our external address as reported by the peer.
878 pub fn observed_external_addr(&self) -> watch::Receiver<Option<SocketAddr>> {
879 let conn = self.0.state.lock("external_addr");
880 conn.observed_external_addr.subscribe()
881 }
882
883 /// Is multipath enabled?
884 // TODO(flub): not a useful API, once we do real things with multipath we can remove
885 // this again.
886 pub fn is_multipath_enabled(&self) -> bool {
887 let conn = self.0.state.lock("is_multipath_enabled");
888 conn.inner.is_multipath_negotiated()
889 }
890
891 /// Registers one address at which this endpoint might be reachable
892 ///
893 /// When the NAT traversal extension is negotiated, servers send these addresses to clients in
894 /// `ADD_ADDRESS` frames. This allows clients to obtain server address candidates to initiate
895 /// NAT traversal attempts. Clients provide their own reachable addresses in `REACH_OUT` frames
896 /// when [`Self::initiate_nat_traversal_round`] is called.
897 pub fn add_nat_traversal_address(&self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
898 let mut conn = self.0.state.lock("add_nat_traversal_addresses");
899 conn.inner.add_nat_traversal_address(address)
900 }
901
902 /// Removes one or more addresses from the set of addresses at which this endpoint is reachable
903 ///
904 /// When the NAT traversal extension is negotiated, servers send address removals to
905 /// clients in `REMOVE_ADDRESS` frames. This allows clients to stop using outdated
906 /// server address candidates that are no longer valid for NAT traversal.
907 ///
908 /// For clients, removed addresses will no longer be advertised in `REACH_OUT` frames.
909 ///
910 /// Addresses not present in the set will be silently ignored.
911 pub fn remove_nat_traversal_address(&self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
912 let mut conn = self.0.state.lock("remove_nat_traversal_addresses");
913 conn.inner.remove_nat_traversal_address(address)
914 }
915
916 /// Get the current local nat traversal addresses
917 pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
918 let conn = self.0.state.lock("get_local_nat_traversal_addresses");
919 conn.inner.get_local_nat_traversal_addresses()
920 }
921
922 /// Get the currently advertised nat traversal addresses by the server
923 pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
924 let conn = self.0.state.lock("get_remote_nat_traversal_addresses");
925 conn.inner.get_remote_nat_traversal_addresses()
926 }
927
928 /// Initiates a new nat traversal round
929 ///
930 /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
931 /// frames, and initiating probing of the known remote addresses. When a new round is
932 /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
933 ///
934 /// Returns the server addresses that are now being probed.
935 pub fn initiate_nat_traversal_round(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
936 let mut conn = self.0.state.lock("initiate_nat_traversal_round");
937 let now = conn.runtime.now();
938 conn.inner.initiate_nat_traversal_round(now)
939 }
940}
941
pin_project! {
    /// Future produced by [`Connection::open_uni`]
    pub struct OpenUni<'a> {
        /// Borrowed connection handle; the stream is created on it when ready
        conn: &'a ConnectionRef,
        /// Re-armed by `poll_open` against `stream_budget_available` on spurious wakeups
        #[pin]
        notify: Notified<'a>,
    }
}
950
951impl Future for OpenUni<'_> {
952 type Output = Result<SendStream, ConnectionError>;
953 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
954 let this = self.project();
955 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
956 Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
957 }
958}
959
pin_project! {
    /// Future produced by [`Connection::open_bi`]
    pub struct OpenBi<'a> {
        /// Borrowed connection handle; the stream pair is created on it when ready
        conn: &'a ConnectionRef,
        /// Re-armed by `poll_open` against `stream_budget_available` on spurious wakeups
        #[pin]
        notify: Notified<'a>,
    }
}
968
969impl Future for OpenBi<'_> {
970 type Output = Result<(SendStream, RecvStream), ConnectionError>;
971 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
972 let this = self.project();
973 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
974
975 Poll::Ready(Ok((
976 SendStream::new(conn.clone(), id, is_0rtt),
977 RecvStream::new(conn, id, is_0rtt),
978 )))
979 }
980}
981
/// Shared stream-opening poll logic for [`OpenUni`] and [`OpenBi`]
///
/// On success, returns a cloned [`ConnectionRef`], the newly allocated [`StreamId`],
/// and whether the stream is 0-RTT (opened client-side while still handshaking).
/// When no stream ID budget is available, `notify` is re-armed against the
/// per-direction `stream_budget_available` notifier and `Poll::Pending` is returned.
/// The `state` lock is held while `notify` is polled, so budget becoming available
/// between the check and the poll cannot be missed.
fn poll_open<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_open");
    if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    } else if let Some(id) = state.inner.streams().open(dir) {
        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => {
                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
            }
        }
    }
}
1007
pin_project! {
    /// Future produced by [`Connection::accept_uni`]
    pub struct AcceptUni<'a> {
        /// Borrowed connection handle; the accepted stream is created on it
        conn: &'a ConnectionRef,
        /// Re-armed by `poll_accept` against `stream_incoming` on spurious wakeups
        #[pin]
        notify: Notified<'a>,
    }
}
1016
1017impl Future for AcceptUni<'_> {
1018 type Output = Result<RecvStream, ConnectionError>;
1019
1020 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1021 let this = self.project();
1022 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
1023 Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
1024 }
1025}
1026
pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    pub struct AcceptBi<'a> {
        /// Borrowed connection handle; the accepted stream pair is created on it
        conn: &'a ConnectionRef,
        /// Re-armed by `poll_accept` against `stream_incoming` on spurious wakeups
        #[pin]
        notify: Notified<'a>,
    }
}
1035
1036impl Future for AcceptBi<'_> {
1037 type Output = Result<(SendStream, RecvStream), ConnectionError>;
1038
1039 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1040 let this = self.project();
1041 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
1042 Poll::Ready(Ok((
1043 SendStream::new(conn.clone(), id, is_0rtt),
1044 RecvStream::new(conn, id, is_0rtt),
1045 )))
1046 }
1047}
1048
/// Shared stream-accepting poll logic for [`AcceptUni`] and [`AcceptBi`]
///
/// On success, returns a cloned [`ConnectionRef`], the accepted [`StreamId`], and
/// whether the stream was accepted during the handshake (0-RTT). When no incoming
/// stream is available, `notify` is re-armed against the per-direction
/// `stream_incoming` notifier and `Poll::Pending` is returned. The `state` lock is
/// held while `notify` is polled, so a stream arriving between the check and the
/// poll cannot be missed.
fn poll_accept<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_accept");
    // Check for incoming streams before checking `state.error` so that already-received streams,
    // which are necessarily finite, can be drained from a closed connection.
    if let Some(id) = state.inner.streams().accept(dir) {
        let is_0rtt = state.inner.is_handshaking();
        state.wake(); // To send additional stream ID credit
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    } else if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
        }
    }
}
1075
pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    pub struct ReadDatagram<'a> {
        /// Borrowed connection handle the datagram is read from
        conn: &'a ConnectionRef,
        /// Re-armed against `datagram_received` on spurious wakeups
        #[pin]
        notify: Notified<'a>,
    }
}
1084
impl Future for ReadDatagram<'_> {
    type Output = Result<Bytes, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        // The `state` lock is held across the notify poll below so a datagram
        // arriving between the check and the poll cannot be missed.
        let mut state = this.conn.state.lock("ReadDatagram::poll");
        // Check for buffered datagrams before checking `state.error` so that already-received
        // datagrams, which are necessarily finite, can be drained from a closed connection.
        if let Some(x) = state.inner.datagrams().recv() {
            return Poll::Ready(Ok(x));
        } else if let Some(ref e) = state.error {
            return Poll::Ready(Err(e.clone()));
        }
        loop {
            match this.notify.as_mut().poll(ctx) {
                // `state` lock ensures we didn't race with readiness
                Poll::Pending => return Poll::Pending,
                // Spurious wakeup, get a new future
                Poll::Ready(()) => this
                    .notify
                    .set(this.conn.shared.datagram_received.notified()),
            }
        }
    }
}
1109
pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    pub struct SendDatagram<'a> {
        /// Borrowed connection handle the datagram is sent on
        conn: &'a ConnectionRef,
        /// Payload; taken on each send attempt and restored if the send is blocked
        data: Option<Bytes>,
        /// Re-armed against `datagrams_unblocked` while the send is blocked
        #[pin]
        notify: Notified<'a>,
    }
}
1119
impl Future for SendDatagram<'_> {
    type Output = Result<(), SendDatagramError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("SendDatagram::poll");
        if let Some(ref e) = state.error {
            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
        }
        use proto::SendDatagramError::*;
        // `data` is `Some` on every poll before completion: it is only taken here,
        // and restored in the `Blocked` arm before the future yields.
        match state
            .inner
            .datagrams()
            .send(this.data.take().unwrap(), false)
        {
            Ok(()) => {
                // Wake the driver so the queued datagram actually gets transmitted.
                state.wake();
                Poll::Ready(Ok(()))
            }
            Err(e) => Poll::Ready(Err(match e {
                Blocked(data) => {
                    // Put the payload back for the next poll, then wait for capacity.
                    this.data.replace(data);
                    // This loop never produces a value (it either returns Pending or
                    // re-arms `notify`), so it type-checks inside `Poll::Ready(Err(..))`.
                    loop {
                        match this.notify.as_mut().poll(ctx) {
                            Poll::Pending => return Poll::Pending,
                            // Spurious wakeup, get a new future
                            Poll::Ready(()) => this
                                .notify
                                .set(this.conn.shared.datagrams_unblocked.notified()),
                        }
                    }
                }
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            })),
        }
    }
}
1158
/// Future returned by [`Connection::on_closed`]
///
/// Resolves to a tuple of ([`ConnectionError`], [`ConnectionStats`]).
pub struct OnClosed {
    /// Receives the close reason and final stats from the connection `State`
    rx: oneshot::Receiver<(ConnectionError, ConnectionStats)>,
    /// Weak handle used by `Drop` to deregister our sender without keeping the
    /// connection alive
    conn: WeakConnectionHandle,
}
1166
impl Drop for OnClosed {
    fn drop(&mut self) {
        // Already resolved (or sender gone): nothing left registered to clean up.
        if self.rx.is_terminated() {
            return;
        };
        // If the connection is still alive, close our receiver and prune every
        // now-closed sender from `State::on_closed`, so dropped `OnClosed` futures
        // don't accumulate on a long-lived connection.
        if let Some(conn) = self.conn.upgrade() {
            self.rx.close();
            conn.0
                .state
                .lock("OnClosed::drop")
                .on_closed
                .retain(|tx| !tx.is_closed());
        }
    }
}
1182
1183impl Future for OnClosed {
1184 type Output = (ConnectionError, ConnectionStats);
1185
1186 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1187 let this = self.get_mut();
1188 // The `expect` is safe because `State::drop` ensures that all senders are triggered
1189 // before being dropped.
1190 Pin::new(&mut this.rx)
1191 .poll(cx)
1192 .map(|x| x.expect("on_close sender is never dropped before sending"))
1193 }
1194}
1195
/// Reference-counted handle to [`ConnectionInner`], shared by API objects and the driver
///
/// `Clone`/`Drop` maintain `State::ref_count` so the connection can be implicitly
/// closed once no user-facing handles remain.
#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
1198
impl ConnectionRef {
    /// Wraps `inner`, counting the new user-facing handle in `State::ref_count`
    fn from_arc(inner: Arc<ConnectionInner>) -> Self {
        inner.state.lock("from_arc").ref_count += 1;
        Self(inner)
    }

    /// Address of the shared `ConnectionInner`, stable for the connection's lifetime
    fn stable_id(&self) -> usize {
        &*self.0 as *const _ as usize
    }
}
1209
1210impl Clone for ConnectionRef {
1211 fn clone(&self) -> Self {
1212 Self::from_arc(Arc::clone(&self.0))
1213 }
1214}
1215
impl Drop for ConnectionRef {
    fn drop(&mut self) {
        let conn = &mut *self.state.lock("drop");
        // `checked_sub` tolerates a count of zero: the very first reference is built
        // directly in `Connecting::new` without going through `from_arc`, so its drop
        // must not underflow.
        if let Some(x) = conn.ref_count.checked_sub(1) {
            conn.ref_count = x;
            if x == 0 && !conn.inner.is_closed() {
                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
                // not, we can't do any harm. If there were any streams being opened, then either
                // the connection will be closed for an unrelated reason or a fresh reference will
                // be constructed for the newly opened stream.
                conn.implicit_close(&self.shared);
            }
        }
    }
}
1231
1232impl std::ops::Deref for ConnectionRef {
1233 type Target = ConnectionInner;
1234 fn deref(&self) -> &Self::Target {
1235 &self.0
1236 }
1237}
1238
/// Shared connection internals: the lock-protected [`State`] plus lock-free notifiers
#[derive(Debug)]
pub(crate) struct ConnectionInner {
    /// All mutable connection state, guarded by a labelled mutex
    pub(crate) state: Mutex<State>,
    /// Wakeup primitives usable without taking the `state` lock
    pub(crate) shared: Shared,
}
1244
/// A handle to some connection internals, use with care.
///
/// This contains a weak reference to the connection so will not itself keep the connection
/// alive.
#[derive(Debug, Clone)]
pub struct WeakConnectionHandle(Weak<ConnectionInner>);
1251
1252impl WeakConnectionHandle {
1253 /// Returns `true` if the [`Connection`] associated with this handle is still alive.
1254 pub fn is_alive(&self) -> bool {
1255 self.0.upgrade().is_some()
1256 }
1257
1258 /// Upgrade the handle to a full `Connection`
1259 pub fn upgrade(&self) -> Option<Connection> {
1260 self.0
1261 .upgrade()
1262 .map(|inner| Connection(ConnectionRef::from_arc(inner)))
1263 }
1264}
1265
/// Wakeup primitives shared between the driver and user-facing futures
///
/// All fields are [`Notify`]s signalled via `notify_waiters`, so only tasks already
/// waiting when an event fires are woken; waiters re-check state under the `State` lock.
#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified when the handshake has been confirmed (or the connection terminated)
    handshake_confirmed: Notify,
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    stream_incoming: [Notify; 2],
    /// Notified when a datagram has been received
    datagram_received: Notify,
    /// Notified when previously-blocked datagram sends may be able to proceed
    datagrams_unblocked: Notify,
    /// Notified when the connection has been closed
    closed: Notify,
}
1278
/// Lock-protected connection state, driven by the connection driver task
pub(crate) struct State {
    /// The protocol-level connection state machine
    pub(crate) inner: proto::Connection,
    /// Waker for the driver task; taken and woken by [`State::wake`]
    driver: Option<Waker>,
    /// Our handle in the endpoint's connection table, attached to endpoint events
    handle: ConnectionHandle,
    /// Triggered once when handshake data becomes ready (also on termination)
    on_handshake_data: Option<oneshot::Sender<()>>,
    /// Triggered once on establishment with whether 0-RTT was accepted
    on_connected: Option<oneshot::Sender<bool>>,
    /// Whether the `Connected` event has been observed
    connected: bool,
    /// Whether the `HandshakeConfirmed` event has been observed
    handshake_confirmed: bool,
    /// Timer driving `inner`'s timeouts; created lazily by `drive_timer`
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    /// Deadline the timer is currently armed for, used to skip redundant resets
    timer_deadline: Option<Instant>,
    /// Events arriving from the endpoint driver
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    /// Events forwarded to the endpoint driver
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    /// Tasks blocked writing to a stream, woken on `StreamEvent::Writable`
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    /// Tasks blocked reading from a stream, woken on `StreamEvent::Readable`
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    /// Tasks waiting for a stream to be finished or stopped
    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Tracks paths being opened
    open_path: FxHashMap<PathId, watch::Sender<Result<(), PathError>>>,
    /// Tracks paths being closed
    pub(crate) close_path: FxHashMap<PathId, oneshot::Sender<VarInt>>,
    /// Broadcasts path lifecycle events to subscribers
    pub(crate) path_events: tokio::sync::broadcast::Sender<PathEvent>,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: usize,
    /// Sink for outgoing UDP datagrams
    sender: Pin<Box<dyn UdpSender>>,
    /// Runtime providing timers and timestamps
    pub(crate) runtime: Arc<dyn Runtime>,
    /// Scratch buffer reused across `drive_transmit` calls
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<proto::Transmit>,
    /// Our last external address reported by the peer. When multipath is enabled, this will be the
    /// last report across all paths.
    pub(crate) observed_external_addr: watch::Sender<Option<SocketAddr>>,
    /// Broadcasts NAT traversal updates to subscribers
    pub(crate) nat_traversal_updates: tokio::sync::broadcast::Sender<iroh_hp::Event>,
    /// Senders fired with the close reason and final stats on termination or drop
    on_closed: Vec<oneshot::Sender<(ConnectionError, ConnectionStats)>>,
}
1314
1315impl State {
1316 #[allow(clippy::too_many_arguments)]
1317 fn new(
1318 inner: proto::Connection,
1319 handle: ConnectionHandle,
1320 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1321 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1322 on_handshake_data: oneshot::Sender<()>,
1323 on_connected: oneshot::Sender<bool>,
1324 sender: Pin<Box<dyn UdpSender>>,
1325 runtime: Arc<dyn Runtime>,
1326 ) -> Self {
1327 Self {
1328 inner,
1329 driver: None,
1330 handle,
1331 on_handshake_data: Some(on_handshake_data),
1332 on_connected: Some(on_connected),
1333 connected: false,
1334 handshake_confirmed: false,
1335 timer: None,
1336 timer_deadline: None,
1337 conn_events,
1338 endpoint_events,
1339 blocked_writers: FxHashMap::default(),
1340 blocked_readers: FxHashMap::default(),
1341 stopped: FxHashMap::default(),
1342 open_path: FxHashMap::default(),
1343 close_path: FxHashMap::default(),
1344 error: None,
1345 ref_count: 0,
1346 sender,
1347 runtime,
1348 send_buffer: Vec::new(),
1349 buffered_transmit: None,
1350 path_events: tokio::sync::broadcast::channel(32).0,
1351 observed_external_addr: watch::Sender::new(None),
1352 nat_traversal_updates: tokio::sync::broadcast::channel(32).0,
1353 on_closed: Vec::new(),
1354 }
1355 }
1356
    /// Polls `inner` for outgoing datagrams and pushes them to the UDP sender
    ///
    /// Returns `Ok(true)` when the per-call transmit budget (`MAX_TRANSMIT_DATAGRAMS`)
    /// was exhausted (more work may remain), `Ok(false)` when everything currently
    /// available was sent or the sender returned `Pending` (the unsent transmit is
    /// stashed in `buffered_transmit` and retried on the next call).
    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
        let now = self.runtime.now();
        let mut transmits = 0;

        // GSO segment budget per transmit, capped to limit allocation size.
        let max_datagrams = self
            .sender
            .max_transmit_segments()
            .min(MAX_TRANSMIT_SEGMENTS);

        loop {
            // Retry the last transmit, or get a new one.
            let t = match self.buffered_transmit.take() {
                Some(t) => t,
                None => {
                    self.send_buffer.clear();
                    match self
                        .inner
                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
                    {
                        Some(t) => {
                            // Count one per segment so the budget reflects datagrams,
                            // not transmit batches.
                            transmits += match t.segment_size {
                                None => 1,
                                Some(s) => t.size.div_ceil(s), // round up
                            };
                            t
                        }
                        None => break,
                    }
                }
            };

            let len = t.size;
            match self
                .sender
                .as_mut()
                .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx)
            {
                Poll::Pending => {
                    // Sender is blocked: keep the transmit for the next call.
                    self.buffered_transmit = Some(t);
                    return Ok(false);
                }
                Poll::Ready(Err(e)) => return Err(e),
                Poll::Ready(Ok(())) => {}
            }

            if transmits >= MAX_TRANSMIT_DATAGRAMS {
                // TODO: What isn't ideal here yet is that if we don't poll all
                // datagrams that could be sent we don't go into the `app_limited`
                // state and CWND continues to grow until we get here the next time.
                // See https://github.com/quinn-rs/quinn/issues/1126
                return Ok(true);
            }
        }

        Ok(false)
    }
1413
1414 fn forward_endpoint_events(&mut self) {
1415 while let Some(event) = self.inner.poll_endpoint_events() {
1416 // If the endpoint driver is gone, noop.
1417 let _ = self.endpoint_events.send((self.handle, event));
1418 }
1419 }
1420
    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
    ///
    /// Drains all currently available events from the endpoint driver, returning
    /// `Ok(())` once the channel is exhausted for now.
    fn process_conn_events(
        &mut self,
        shared: &Shared,
        cx: &mut Context,
    ) -> Result<(), ConnectionError> {
        loop {
            match self.conn_events.poll_recv(cx) {
                // The endpoint rebound its socket: swap in the new sender and tell
                // the protocol state machine the local address changed.
                Poll::Ready(Some(ConnectionEvent::Rebind(sender))) => {
                    self.sender = sender;
                    self.inner.local_address_changed();
                }
                // Protocol-level event, handled entirely by `inner`.
                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
                    self.inner.handle_event(event);
                }
                // Close requested on our behalf (e.g. by the endpoint shutting down).
                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
                    self.close(error_code, reason, shared);
                }
                // Channel closed: the endpoint driver future itself was dropped.
                Poll::Ready(None) => {
                    return Err(ConnectionError::TransportError(TransportError::new(
                        TransportErrorCode::INTERNAL_ERROR,
                        "endpoint driver future was dropped".to_string(),
                    )));
                }
                Poll::Pending => {
                    return Ok(());
                }
            }
        }
    }
1451
    /// Drains application-facing events from `inner` and dispatches the matching
    /// wakeups, notifications, and broadcasts
    fn forward_app_events(&mut self, shared: &Shared) {
        while let Some(event) = self.inner.poll() {
            use proto::Event::*;
            match event {
                HandshakeDataReady => {
                    if let Some(x) = self.on_handshake_data.take() {
                        let _ = x.send(());
                    }
                }
                Connected => {
                    self.connected = true;
                    if let Some(x) = self.on_connected.take() {
                        // We don't care if the on-connected future was dropped
                        let _ = x.send(self.inner.accepted_0rtt());
                    }
                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
                        // Wake up rejected 0-RTT streams so they can fail immediately with
                        // `ZeroRttRejected` errors.
                        wake_all(&mut self.blocked_writers);
                        wake_all(&mut self.blocked_readers);
                        wake_all_notify(&mut self.stopped);
                    }
                }
                HandshakeConfirmed => {
                    self.handshake_confirmed = true;
                    shared.handshake_confirmed.notify_waiters();
                }
                ConnectionLost { reason } => {
                    // Records the error and wakes everything that could be blocked.
                    self.terminate(reason, shared);
                }
                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
                }
                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
                }
                DatagramReceived => {
                    shared.datagram_received.notify_waiters();
                }
                DatagramsUnblocked => {
                    shared.datagrams_unblocked.notify_waiters();
                }
                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
                Stream(StreamEvent::Available { dir }) => {
                    // Might mean any number of streams are ready, so we wake up everyone
                    shared.stream_budget_available[dir as usize].notify_waiters();
                }
                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
                Stream(StreamEvent::Stopped { id, .. }) => {
                    wake_stream_notify(id, &mut self.stopped);
                    wake_stream(id, &mut self.blocked_writers);
                }
                Path(ref evt @ PathEvent::ObservedAddr { addr: observed, .. }) => {
                    self.path_events.send(evt.clone()).ok();
                    // Only notify watchers when the observation actually changed.
                    self.observed_external_addr.send_if_modified(|addr| {
                        let old = addr.replace(observed);
                        old != *addr
                    });
                }
                Path(ref evt @ PathEvent::Opened { id }) => {
                    self.path_events.send(evt.clone)).ok();
                    if let Some(sender) = self.open_path.remove(&id) {
                        sender.send_modify(|value| *value = Ok(()));
                    }
                }
                Path(ref evt @ PathEvent::Closed { id, error_code }) => {
                    self.path_events.send(evt.clone()).ok();
                    if let Some(sender) = self.close_path.remove(&id) {
                        let _ = sender.send(error_code);
                    }
                }
                Path(evt @ PathEvent::Abandoned { .. }) => {
                    self.path_events.send(evt).ok();
                }
                Path(ref evt @ PathEvent::LocallyClosed { id, error }) => {
                    self.path_events.send(evt.clone()).ok();
                    if let Some(sender) = self.open_path.remove(&id) {
                        sender.send_modify(|value| *value = Err(error));
                    }
                    // this will happen also for already opened paths
                }
                Path(evt @ PathEvent::RemoteStatus { .. }) => {
                    self.path_events.send(evt).ok();
                }
                NatTraversal(update) => {
                    self.nat_traversal_updates.send(update).ok();
                }
            }
        }
    }
1543
1544 fn drive_timer(&mut self, cx: &mut Context) -> bool {
1545 // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1546 // timer is registered with the runtime (and check whether it's already
1547 // expired).
1548 match self.inner.poll_timeout() {
1549 Some(deadline) => {
1550 if let Some(delay) = &mut self.timer {
1551 // There is no need to reset the tokio timer if the deadline
1552 // did not change
1553 if self
1554 .timer_deadline
1555 .map(|current_deadline| current_deadline != deadline)
1556 .unwrap_or(true)
1557 {
1558 delay.as_mut().reset(deadline);
1559 }
1560 } else {
1561 self.timer = Some(self.runtime.new_timer(deadline));
1562 }
1563 // Store the actual expiration time of the timer
1564 self.timer_deadline = Some(deadline);
1565 }
1566 None => {
1567 self.timer_deadline = None;
1568 return false;
1569 }
1570 }
1571
1572 if self.timer_deadline.is_none() {
1573 return false;
1574 }
1575
1576 let delay = self
1577 .timer
1578 .as_mut()
1579 .expect("timer must exist in this state")
1580 .as_mut();
1581 if delay.poll(cx).is_pending() {
1582 // Since there wasn't a timeout event, there is nothing new
1583 // for the connection to do
1584 return false;
1585 }
1586
1587 // A timer expired, so the caller needs to check for
1588 // new transmits, which might cause new timers to be set.
1589 self.inner.handle_timeout(self.runtime.now());
1590 self.timer_deadline = None;
1591 true
1592 }
1593
1594 /// Wake up a blocked `Driver` task to process I/O
1595 pub(crate) fn wake(&mut self) {
1596 if let Some(x) = self.driver.take() {
1597 x.wake();
1598 }
1599 }
1600
    /// Used to wake up all blocked futures when the connection becomes closed for any reason
    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
        // Record the error before waking anyone: woken tasks re-check `error`
        // under the state lock to learn why they were woken.
        self.error = Some(reason.clone());
        if let Some(x) = self.on_handshake_data.take() {
            let _ = x.send(());
        }
        wake_all(&mut self.blocked_writers);
        wake_all(&mut self.blocked_readers);
        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
        shared.datagram_received.notify_waiters();
        shared.datagrams_unblocked.notify_waiters();
        if let Some(x) = self.on_connected.take() {
            // `false`: the connection terminated before establishment completed.
            let _ = x.send(false);
        }
        shared.handshake_confirmed.notify_waiters();
        wake_all_notify(&mut self.stopped);
        shared.closed.notify_waiters();

        // Send to the registered on_closed futures.
        let stats = self.inner.stats();
        for tx in self.on_closed.drain(..) {
            tx.send((reason.clone(), stats.clone())).ok();
        }
    }
1628
    /// Closes the connection with the given application error code and reason
    ///
    /// Terminates all pending futures with [`ConnectionError::LocallyClosed`] and
    /// wakes the driver so the close gets transmitted.
    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
        self.inner.close(self.runtime.now(), error_code, reason);
        self.terminate(ConnectionError::LocallyClosed, shared);
        self.wake();
    }
1634
1635 /// Close for a reason other than the application's explicit request
1636 pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1637 self.close(0u32.into(), Bytes::new(), shared);
1638 }
1639
1640 pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1641 if self.inner.is_handshaking()
1642 || self.inner.accepted_0rtt()
1643 || self.inner.side().is_server()
1644 {
1645 Ok(())
1646 } else {
1647 Err(())
1648 }
1649 }
1650}
1651
impl Drop for State {
    fn drop(&mut self) {
        if !self.inner.is_drained() {
            // Ensure the endpoint can tidy up
            let _ = self
                .endpoint_events
                .send((self.handle, proto::EndpointEvent::drained()));
        }

        if !self.on_closed.is_empty() {
            // Ensure that all on_closed oneshot senders are triggered before dropping.
            // `OnClosed::poll` relies on this invariant to justify its `expect`.
            let reason = self.error.as_ref().expect("closed without error reason");
            let stats = self.inner.stats();
            for tx in self.on_closed.drain(..) {
                tx.send((reason.clone(), stats.clone())).ok();
            }
        }
    }
}
1671
1672impl fmt::Debug for State {
1673 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1674 f.debug_struct("State").field("inner", &self.inner).finish()
1675 }
1676}
1677
1678fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1679 if let Some(waker) = wakers.remove(&stream_id) {
1680 waker.wake();
1681 }
1682}
1683
1684fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1685 wakers.drain().for_each(|(_, waker)| waker.wake())
1686}
1687
1688fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1689 if let Some(notify) = wakers.remove(&stream_id) {
1690 notify.notify_waiters()
1691 }
1692}
1693
1694fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1695 wakers
1696 .drain()
1697 .for_each(|(_, notify)| notify.notify_waiters())
1698}
1699
/// Errors that can arise when sending a datagram
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost
    ///
    /// `#[from]` lets `?` convert a [`ConnectionError`] into this variant directly.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}
1719
/// The maximum amount of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
/// and allows other tasks (like receiving ACKs) to run in between.
/// `State::drive_transmit` returns early (with `Ok(true)`) once this budget is reached.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1725
/// The maximum amount of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise.
/// Used by `State::drive_transmit` to cap the sender's `max_transmit_segments()`.
const MAX_TRANSMIT_SEGMENTS: usize = 10;