noq/
connection.rs

1use std::{
2    any::Any,
3    fmt,
4    future::Future,
5    io,
6    net::{IpAddr, SocketAddr},
7    num::NonZeroUsize,
8    pin::Pin,
9    sync::{
10        Arc, Weak,
11        atomic::{AtomicUsize, Ordering},
12    },
13    task::{Context, Poll, Waker, ready},
14};
15
16use bytes::Bytes;
17use pin_project_lite::pin_project;
18use rustc_hash::FxHashMap;
19use thiserror::Error;
20use tokio::sync::{Notify, futures::Notified, mpsc, oneshot, watch};
21use tracing::{Instrument, Span, debug_span};
22
23use crate::{
24    ConnectionEvent, Duration, Instant, Path, VarInt,
25    endpoint::ensure_ipv6,
26    mutex::{Mutex, MutexGuard},
27    path::{OpenPath, PathRef, PathRefOwner},
28    recv_stream::RecvStream,
29    runtime::{AsyncTimer, Runtime, UdpSender},
30    send_stream::SendStream,
31    udp_transmit,
32};
33use proto::{
34    ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, FourTuple, PathError,
35    PathEvent, PathId, PathStats, PathStatus, Side, StreamEvent, StreamId, TransportError,
36    TransportErrorCode, congestion::Controller, n0_nat_traversal,
37};
38
/// In-progress connection attempt future
///
/// Awaiting it yields the established [`Connection`], or the [`ConnectionError`] that ended the
/// attempt.
#[derive(Debug)]
pub struct Connecting {
    // `None` once the connection has been handed out, either by `into_0rtt` succeeding or by
    // `poll` completing.
    conn: Option<ConnectionRef>,
    // Completion signal polled by the `Future` impl; `into_0rtt` moves it into the returned
    // `ZeroRttAccepted`.
    connected: oneshot::Receiver<bool>,
    // Taken and awaited by the first call to `handshake_data`; later calls skip the wait.
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}
46
47impl Connecting {
48    pub(crate) fn new(
49        handle: ConnectionHandle,
50        conn: proto::Connection,
51        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
52        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
53        sender: Pin<Box<dyn UdpSender>>,
54        runtime: Arc<dyn Runtime>,
55    ) -> Self {
56        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
57        let (on_connected_send, on_connected_recv) = oneshot::channel();
58
59        let conn = ConnectionRef(Arc::new(Arc::new(ConnectionInner {
60            state: Mutex::new(State::new(
61                conn,
62                handle,
63                endpoint_events,
64                conn_events,
65                on_handshake_data_send,
66                on_connected_send,
67                sender,
68                runtime.clone(),
69            )),
70            shared: Shared::default(),
71        })));
72
73        let driver = ConnectionDriver(conn.clone());
74        runtime.spawn(Box::pin(
75            async {
76                if let Err(e) = driver.await {
77                    tracing::error!("I/O error: {e}");
78                }
79            }
80            .instrument(Span::current()),
81        ));
82
83        Self {
84            conn: Some(conn),
85            connected: on_connected_recv,
86            handshake_data_ready: Some(on_handshake_data_recv),
87        }
88    }
89
90    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
91    ///
92    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
93    /// If so, the returned [`Connection`] can be used to send application data without waiting for
94    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
95    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
96    /// complete, at which point subsequently opened streams and written data will have full
97    /// cryptographic protection.
98    ///
99    /// ## Outgoing
100    ///
101    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
102    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
103    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
104    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
105    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
106    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
107    /// If it was rejected, the existence of streams opened and other application data sent prior
108    /// to the handshake completing will not be conveyed to the remote application, and local
109    /// operations on them will return `ZeroRttRejected` errors.
110    ///
111    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
112    /// relevant resumption state to be stored in the server, which servers may limit or lose for
113    /// various reasons including not persisting resumption state across server restarts.
114    ///
115    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
116    /// implementation's docs for 0-RTT pitfalls.
117    ///
118    /// ## Incoming
119    ///
120    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
121    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
122    ///
123    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
124    /// implementation's docs for 0-RTT pitfalls.
125    ///
126    /// ## Security
127    ///
128    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
129    /// replay attacks, and should therefore never invoke non-idempotent operations.
130    ///
131    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
132    /// before TLS client authentication has occurred, and should therefore not be used to send
133    /// data for which client authentication is being used.
134    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
135        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
136        // have to release it explicitly before returning `self` by value.
137        let conn = (self.conn.as_mut().unwrap()).lock_without_waking("into_0rtt");
138
139        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
140        drop(conn);
141
142        if is_ok {
143            let conn = self.conn.take().unwrap();
144            Ok((Connection(conn), ZeroRttAccepted(self.connected)))
145        } else {
146            Err(self)
147        }
148    }
149
150    /// Parameters negotiated during the handshake
151    ///
152    /// The dynamic type returned is determined by the configured
153    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
154    /// be [`downcast`](Box::downcast) to a
155    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
156    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
157        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
158        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
159        // simple.
160        if let Some(x) = self.handshake_data_ready.take() {
161            let _ = x.await;
162        }
163        let conn = self.conn.as_ref().unwrap();
164        let inner = conn.lock_without_waking("handshake");
165        inner
166            .inner
167            .crypto_session()
168            .handshake_data()
169            .ok_or_else(|| {
170                inner
171                    .error
172                    .clone()
173                    .expect("spurious handshake data ready notification")
174            })
175    }
176
177    /// The local IP address which was used when the peer established
178    /// the connection
179    ///
180    /// This can be different from the address the endpoint is bound to, in case
181    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
182    ///
183    /// This will return `None` for clients, or when the platform does not expose this
184    /// information. See [`noq_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
185    /// supported platforms when using [`noq_udp`](udp) for I/O, which is the default.
186    ///
187    /// Will panic if called after `poll` has returned `Ready`.
188    pub fn local_ip(&self) -> Option<IpAddr> {
189        let conn = self.conn.as_ref().expect("used after yielding Ready");
190        let inner = conn.lock_without_waking("local_ip");
191
192        inner
193            .inner
194            .network_path(PathId::ZERO)
195            .expect("PathId::ZERO is the only path during the handshake")
196            .local_ip()
197    }
198
199    /// The peer's UDP addresses
200    ///
201    /// Will panic if called after `poll` has returned `Ready`.
202    pub fn remote_address(&self) -> SocketAddr {
203        let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
204        conn_ref
205            .lock_without_waking("remote_address")
206            .inner
207            .network_path(PathId::ZERO)
208            .expect("PathId::ZERO is the only path during the handshake")
209            .remote()
210    }
211}
212
213impl Future for Connecting {
214    type Output = Result<Connection, ConnectionError>;
215    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
216        Pin::new(&mut self.connected).poll(cx).map(|_| {
217            let conn = self.conn.take().unwrap();
218            let inner = conn.lock_without_waking("connecting");
219            if inner.connected {
220                drop(inner);
221                Ok(Connection(conn))
222            } else {
223                Err(inner
224                    .error
225                    .clone()
226                    .expect("connected signaled without connection success or error"))
227            }
228        })
229    }
230}
231
232/// Future that completes when a connection is fully established
233///
234/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
235/// value is meaningless.
236pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
237
238impl Future for ZeroRttAccepted {
239    type Output = bool;
240    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
241        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
242    }
243}
244
/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events from the
/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// The future resolves with `Ok(())` once the connection is drained, or as soon as processing
/// connection events fails and the connection has been terminated. Only an I/O error raised
/// while driving transmission is yielded as `Err`. Unlike other connection-related futures, this
/// waits for the draining period to complete to ensure that packets still in flight from the
/// peer are handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
struct ConnectionDriver(ConnectionRef);
258
259impl Future for ConnectionDriver {
260    type Output = Result<(), io::Error>;
261
262    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
263        let conn = &mut *self.0.lock_without_waking("poll");
264
265        let span = debug_span!("drive", id = conn.handle.0);
266        let _guard = span.enter();
267
268        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
269            conn.terminate(e, &self.0.shared);
270            return Poll::Ready(Ok(()));
271        }
272        let mut keep_going = conn.drive_transmit(cx)?;
273        // If a timer expires, there might be more to transmit. When we transmit something, we
274        // might need to reset a timer. Hence, we must loop until neither happens.
275        keep_going |= conn.drive_timer(cx);
276        conn.forward_endpoint_events();
277        conn.forward_app_events(&self.0.shared);
278
279        if !conn.inner.is_drained() {
280            if keep_going {
281                // If the connection hasn't processed all tasks, schedule it again
282                cx.waker().wake_by_ref();
283            } else {
284                conn.driver = Some(cx.waker().clone());
285            }
286            return Poll::Pending;
287        }
288        if conn.error.is_none() {
289            unreachable!("drained connections always have an error");
290        }
291        Poll::Ready(Ok(()))
292    }
293}
294
/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
/// incoming streams, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer.  Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
///
/// [`Connection::close()`]: Connection::close
#[derive(Debug, Clone)]
// Newtype over the shared connection reference; the methods in this file all go through it.
pub struct Connection(ConnectionRef);
312
313impl Connection {
    /// Returns a weak reference to the inner connection struct.
    ///
    /// Delegates to the underlying connection reference.
    // NOTE(review): presumably the returned handle does not keep the connection alive (it is
    // what `on_closed` stores for that purpose); confirm against `WeakConnectionHandle`.
    pub fn weak_handle(&self) -> WeakConnectionHandle {
        self.0.weak_handle()
    }
318
319    /// Initiate a new outgoing unidirectional stream.
320    ///
321    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
322    /// consequence, the peer won't be notified that a stream has been opened until the stream is
323    /// actually used.
324    pub fn open_uni(&self) -> OpenUni<'_> {
325        OpenUni {
326            conn: &self.0,
327            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
328        }
329    }
330
331    /// Initiate a new outgoing bidirectional stream.
332    ///
333    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
334    /// consequence, the peer won't be notified that a stream has been opened until the stream is
335    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
336    /// anything to [`SendStream`] will never succeed.
337    ///
338    /// [`open_bi()`]: crate::Connection::open_bi
339    /// [`SendStream`]: crate::SendStream
340    /// [`RecvStream`]: crate::RecvStream
341    pub fn open_bi(&self) -> OpenBi<'_> {
342        OpenBi {
343            conn: &self.0,
344            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
345        }
346    }
347
348    /// Accept the next incoming uni-directional stream
349    pub fn accept_uni(&self) -> AcceptUni<'_> {
350        AcceptUni {
351            conn: &self.0,
352            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
353        }
354    }
355
356    /// Accept the next incoming bidirectional stream
357    ///
358    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
359    /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
360    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
361    ///
362    /// [`accept_bi()`]: crate::Connection::accept_bi
363    /// [`open_bi()`]: crate::Connection::open_bi
364    /// [`SendStream`]: crate::SendStream
365    /// [`RecvStream`]: crate::RecvStream
366    pub fn accept_bi(&self) -> AcceptBi<'_> {
367        AcceptBi {
368            conn: &self.0,
369            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
370        }
371    }
372
373    /// Receive an application datagram
374    pub fn read_datagram(&self) -> ReadDatagram<'_> {
375        ReadDatagram {
376            conn: &self.0,
377            notify: self.0.shared.datagram_received.notified(),
378        }
379    }
380
381    /// Opens a new path if no path exists yet for `network_path`.
382    ///
383    /// If `network_path` has no local IP set, then this will open a new path
384    /// if no path exists for this remote address, independent of the existing
385    /// path's local IP. If a local IP is set, it will match against the full
386    /// four-tuple of existing paths.
387    ///
388    /// Otherwise behaves exactly as [`open_path`].
389    ///
390    /// [`open_path`]: Self::open_path
391    pub fn open_path_ensure(
392        &self,
393        network_path: impl Into<FourTuple>,
394        initial_status: PathStatus,
395    ) -> OpenPath {
396        let network_path = network_path.into();
397        let mut state = self.0.lock_and_wake("open_path");
398
399        let network_path = match normalize_network_path(network_path, &state.inner) {
400            Ok(network_path) => network_path,
401            Err(err) => return OpenPath::rejected(err),
402        };
403
404        let now = state.runtime.now();
405        let open_res = state
406            .inner
407            .open_path_ensure(network_path, initial_status, now);
408        match open_res {
409            Ok((path_id, existed)) if existed => {
410                let recv = state.open_path.get(&path_id).map(|tx| tx.subscribe());
411                drop(state);
412                match recv {
413                    Some(recv) => OpenPath::new(path_id, recv, self.0.clone()),
414                    None => OpenPath::ready(path_id, self.0.clone()),
415                }
416            }
417            Ok((path_id, _)) => {
418                let (tx, rx) = watch::channel(Ok(()));
419                state.open_path.insert(path_id, tx);
420                drop(state);
421                OpenPath::new(path_id, rx, self.0.clone())
422            }
423            Err(err) => OpenPath::rejected(err),
424        }
425    }
426
427    /// Opens an additional path if the multipath extension is negotiated.
428    ///
429    /// This function takes a [`FourTuple`], which contains the remote address and an optional
430    /// local IP. If the local IP is set, the path will be opened with this source address,
431    /// and the endpoint must support sending from that IP address. You can also pass a
432    /// [`SocketAddr`] to only set the remote address and leave the local IP interface unspecified.
433    ///
434    /// The returned future completes once the path is either fully opened and ready to
435    /// carry application data, or if there was an error.
436    ///
437    /// Dropping the returned future does not cancel the opening of the path, the
438    /// [`PathEvent::Established`] event will still be emitted from [`Self::path_events`] if
439    /// the path opens.  The [`PathId`] for the events can be extracted from
440    /// [`OpenPath::path_id`].
441    ///
442    /// Failure to open a path can either occur immediately, before polling the returned
443    /// future, or at a later time.  If the failure is immediate [`OpenPath::path_id`] will
444    /// return `None` and the future will be ready immediately.  If the failure happens
445    /// later, a [`PathEvent`] will be emitted.
446    pub fn open_path(
447        &self,
448        network_path: impl Into<FourTuple>,
449        initial_status: PathStatus,
450    ) -> OpenPath {
451        let network_path = network_path.into();
452        let mut state = self.0.lock_and_wake("open_path");
453
454        let network_path = match normalize_network_path(network_path, &state.inner) {
455            Ok(network_path) => network_path,
456            Err(err) => return OpenPath::rejected(err),
457        };
458
459        let (on_open_path_send, on_open_path_recv) = watch::channel(Ok(()));
460        let now = state.runtime.now();
461        let open_res = state.inner.open_path(network_path, initial_status, now);
462        match open_res {
463            Ok(path_id) => {
464                state.open_path.insert(path_id, on_open_path_send);
465                drop(state);
466                OpenPath::new(path_id, on_open_path_recv, self.0.clone())
467            }
468            Err(err) => OpenPath::rejected(err),
469        }
470    }
471
    /// Returns the [`Path`] structure of an open path
    ///
    /// Delegates to `Path::new` with this connection's reference and `id`.
    // NOTE(review): `None` presumably means `id` does not name an open path; confirm in `Path::new`.
    pub fn path(&self, id: PathId) -> Option<Path> {
        Path::new(&self.0, id)
    }
476
477    /// A stream of [`PathEvent`]s for all paths in this connection.
478    ///
479    /// The stream will yield a [`PathEvent`] whenever there is a change in the state of any path in this connection.
480    /// The events need to be processed immediately, since there isn't an unbounded buffer for them.
481    ///
482    /// If processing of events lags behind too much, you will get an error of type [`crate::Lagged`] indicating
483    /// how many events were lost. The stream continues after a lag, delivering the oldest retained message next.
484    pub fn path_events(&self) -> crate::PathEvents {
485        crate::PathEvents::new(
486            self.0
487                .lock_without_waking("path_events")
488                .path_events
489                .subscribe(),
490        )
491    }
492
493    /// A stream of NAT traversal updates for this connection.
494    ///
495    /// The events need to be processed immediately, since there isn't an unbounded buffer for them.
496    ///
497    /// If processing of events lags behind too much, you will get an error of type [`crate::Lagged`] indicating
498    /// how many events were lost. The stream continues after a lag, delivering the oldest retained message next.
499    pub fn nat_traversal_updates(&self) -> crate::NatTraversalUpdates {
500        crate::NatTraversalUpdates::new(
501            self.0
502                .lock_without_waking("nat_traversal_updates")
503                .nat_traversal_updates
504                .subscribe(),
505        )
506    }
507
508    /// Wait for the connection to be closed for any reason
509    ///
510    /// Despite the return type's name, closed connections are often not an error condition at the
511    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
512    /// and [`ConnectionError::ApplicationClosed`].
513    pub async fn closed(&self) -> ConnectionError {
514        {
515            let conn = self.0.lock_without_waking("closed");
516            if let Some(error) = conn.error.as_ref() {
517                return error.clone();
518            }
519            // Construct the future while the lock is held to ensure we can't miss a wakeup if
520            // the `Notify` is signaled immediately after we release the lock. `await` it after
521            // the lock guard is out of scope.
522            self.0.shared.closed.notified()
523        }
524        .await;
525        self.0
526            .lock_without_waking("closed")
527            .error
528            .as_ref()
529            .expect("closed without an error")
530            .clone()
531    }
532
533    /// Wait for the connection to be closed without keeping a strong reference to the connection
534    ///
535    /// Returns a future that resolves, once the connection is closed, to a [`Closed`] struct
536    /// describing the close reason and final connection and per-path statistics.
537    ///
538    /// Calling [`Self::closed`] keeps the connection alive until it is either closed locally via [`Connection::close`]
539    /// or closed by the remote peer. This function instead does not keep the connection itself alive,
540    /// so if all *other* clones of the connection are dropped, the connection will be closed implicitly even
541    /// if there are futures returned from this function still being awaited.
542    pub fn on_closed(&self) -> OnClosed {
543        let (tx, rx) = oneshot::channel();
544        let mut state = self.0.lock_without_waking("on_closed");
545        if let Some(reason) = state.error.clone() {
546            // Connection already closed, send immediately
547            let _ = tx.send(Closed::new(&mut state, reason));
548        } else {
549            state.on_closed.push(tx);
550        }
551        drop(state);
552        OnClosed {
553            conn: self.weak_handle(),
554            rx,
555        }
556    }
557
558    /// Whether the connection is closed, and why.
559    ///
560    /// The close_reason is always set to `Some(ConnectionError)` when a socket is
561    /// closed; whether it was closed manually by calling [`Connection::close()`] or due to
562    /// an internal error (such as an idle timeout or the peer closing the
563    /// connection).
564    ///
565    /// Note: when the connection is closed, `connection.close_reason().is_some()` will always be true.
566    pub fn close_reason(&self) -> Option<ConnectionError> {
567        self.0.lock_without_waking("close_reason").error.clone()
568    }
569
570    /// Close the connection immediately.
571    ///
572    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
573    /// more data is sent to the peer and the peer may drop buffered data upon receiving
574    /// the CONNECTION_CLOSE frame.
575    ///
576    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
577    ///
578    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
579    /// is preserved in full, it should be kept under 1KiB.
580    ///
581    /// # Gracefully closing a connection
582    ///
583    /// Only the peer last receiving application data can be certain that all data is
584    /// delivered. The only reliable action it can then take is to close the connection,
585    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
586    /// frame is very likely if both endpoints stay online long enough, and
587    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
588    /// remote peer will time out the connection, provided that the idle timeout is not
589    /// disabled.
590    ///
591    /// The sending side can not guarantee all stream data is delivered to the remote
592    /// application. It only knows the data is delivered to the QUIC stack of the remote
593    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
594    /// [`close()`] the remote endpoint may drop any data it received but is as yet
595    /// undelivered to the application, including data that was acknowledged as received to
596    /// the local endpoint.
597    ///
598    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
599    /// [`Endpoint::wait_idle()`]: crate::Endpoint::wait_idle
600    /// [`close()`]: Connection::close
601    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
602        let conn = &mut *self.0.lock_without_waking("close"); // conn.close self-wakes
603        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
604    }
605
606    /// Wait for the handshake to be confirmed.
607    ///
608    /// As a server, who must be authenticated by clients,
609    /// this happens when the handshake completes
610    /// upon receiving a TLS Finished message from the client.
611    /// In return, the server send a HANDSHAKE_DONE frame.
612    ///
613    /// As a client, this happens when receiving a HANDSHAKE_DONE frame.
614    /// At this point, the server has either accepted our authentication,
615    /// or, if client authentication is not required, accepted our lack of authentication.
616    pub async fn handshake_confirmed(&self) -> Result<(), ConnectionError> {
617        {
618            let conn = self.0.lock_without_waking("handshake_confirmed");
619            if let Some(error) = conn.error.as_ref() {
620                return Err(error.clone());
621            }
622            if conn.handshake_confirmed {
623                return Ok(());
624            }
625            // Construct the future while the lock is held to ensure we can't miss a wakeup if
626            // the `Notify` is signaled immediately after we release the lock. `await` it after
627            // the lock guard is out of scope.
628            self.0.shared.handshake_confirmed.notified()
629        }
630        .await;
631        if let Some(error) = self
632            .0
633            .lock_without_waking("handshake_confirmed")
634            .error
635            .as_ref()
636        {
637            Err(error.clone())
638        } else {
639            Ok(())
640        }
641    }
642
643    /// Transmit `data` as an unreliable, unordered application datagram
644    ///
645    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
646    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
647    /// dictated by the peer.
648    ///
649    /// Previously queued datagrams which are still unsent may be discarded to make space for this
650    /// datagram, in order of oldest to newest.
651    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
652        let conn = &mut *self.0.lock_and_wake("send_datagram");
653        if let Some(ref x) = conn.error {
654            return Err(SendDatagramError::ConnectionLost(x.clone()));
655        }
656        use proto::SendDatagramError::*;
657        match conn.inner.datagrams().send(data, true) {
658            Ok(()) => Ok(()),
659            Err(e) => Err(match e {
660                Blocked(..) => unreachable!(),
661                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
662                Disabled => SendDatagramError::Disabled,
663                TooLarge => SendDatagramError::TooLarge,
664            }),
665        }
666    }
667
668    /// Transmit `data` as an unreliable, unordered application datagram
669    ///
670    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
671    /// conditions, which effectively prioritizes old datagrams over new datagrams.
672    ///
673    /// See [`send_datagram()`] for details.
674    ///
675    /// [`send_datagram()`]: Connection::send_datagram
676    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
677        SendDatagram {
678            conn: &self.0,
679            data: Some(data),
680            notify: self.0.shared.datagrams_unblocked.notified(),
681        }
682    }
683
684    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
685    ///
686    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
687    ///
688    /// This may change over the lifetime of a connection according to variation in the path MTU
689    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
690    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
691    ///
692    /// Not necessarily the maximum size of received datagrams.
693    ///
694    /// [`send_datagram()`]: Connection::send_datagram
695    pub fn max_datagram_size(&self) -> Option<usize> {
696        self.0
697            .lock_without_waking("max_datagram_size")
698            .inner
699            .datagrams()
700            .max_size()
701    }
702
703    /// Bytes available in the outgoing datagram buffer
704    ///
705    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
706    /// at most this size is guaranteed not to cause older datagrams to be dropped.
707    pub fn datagram_send_buffer_space(&self) -> usize {
708        self.0
709            .lock_without_waking("datagram_send_buffer_space")
710            .inner
711            .datagrams()
712            .send_buffer_space()
713    }
714
715    /// The side of the connection (client or server)
716    pub fn side(&self) -> Side {
717        self.0.lock_without_waking("side").inner.side()
718    }
719
    /// Current best estimate of this connection's latency (round-trip-time) on path `path_id`
    ///
    /// The value is read from the protocol state while holding the connection lock; the
    /// connection driver is not woken.
    // NOTE(review): presumably `None` means `path_id` is not a known path — confirm in `proto`.
    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
        self.0.lock_without_waking("rtt").inner.rtt(path_id)
    }
724
    /// Returns connection statistics
    ///
    /// The statistics are read from the protocol state while holding the connection lock; the
    /// connection driver is not woken.
    pub fn stats(&self) -> ConnectionStats {
        self.0.lock_without_waking("stats").inner.stats()
    }
729
    /// Returns statistics for the path identified by `path_id`
    ///
    /// Delegates to `State::path_stats` while holding the connection lock; the connection driver
    /// is not woken.
    // NOTE(review): presumably `None` means no stats are available for `path_id` — confirm
    // against `State::path_stats`.
    pub fn path_stats(&self, path_id: PathId) -> Option<PathStats> {
        self.0.lock_without_waking("path_stats").path_stats(path_id)
    }
734
    /// Current state of the congestion control algorithm on path `path_id`, for debugging purposes
    ///
    /// The controller is cloned (`clone_box`) while the connection lock is held, so the returned
    /// value is a snapshot that is independent of the live connection.
    pub fn congestion_state(&self, path_id: PathId) -> Option<Box<dyn Controller>> {
        self.0
            .lock_without_waking("congestion_state")
            .inner
            .congestion_state(path_id)
            .map(|c| c.clone_box())
    }
743
744    /// Parameters negotiated during the handshake
745    ///
746    /// Guaranteed to return `Some` on fully established connections or after
747    /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for details on
748    /// the returned value.
749    ///
750    /// [`Connection::handshake_data()`]: crate::Connecting::handshake_data
751    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
752        self.0
753            .lock_without_waking("handshake_data")
754            .inner
755            .crypto_session()
756            .handshake_data()
757    }
758
759    /// Cryptographic identity of the peer
760    ///
761    /// The dynamic type returned is determined by the configured
762    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
763    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
764    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
765        self.0
766            .lock_without_waking("peer_identity")
767            .inner
768            .crypto_session()
769            .peer_identity()
770    }
771
772    /// A stable identifier for this connection
773    ///
774    /// Peer addresses and connection IDs can change, but this value will remain
775    /// fixed for the lifetime of the connection.
776    pub fn stable_id(&self) -> usize {
777        self.0.stable_id()
778    }
779
780    /// Update traffic keys spontaneously
781    ///
782    /// This primarily exists for testing purposes.
783    pub fn force_key_update(&self) {
784        self.0
785            .lock_and_wake("force_key_update")
786            .inner
787            .force_key_update()
788    }
789
790    /// Derive keying material from this connection's TLS session secrets.
791    ///
792    /// When both peers call this method with the same `label` and `context`
793    /// arguments and `output` buffers of equal length, they will get the
794    /// same sequence of bytes in `output`. These bytes are cryptographically
795    /// strong and pseudorandom, and are suitable for use as keying material.
796    ///
797    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
798    pub fn export_keying_material(
799        &self,
800        output: &mut [u8],
801        label: &[u8],
802        context: &[u8],
803    ) -> Result<(), proto::crypto::ExportKeyingMaterialError> {
804        self.0
805            .lock_without_waking("export_keying_material")
806            .inner
807            .crypto_session()
808            .export_keying_material(output, label, context)
809    }
810
811    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
812    ///
813    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
814    /// `count`s increase both minimum and worst-case memory consumption.
815    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
816        let mut conn = self.0.lock_and_wake("set_max_concurrent_uni_streams");
817        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
818    }
819
820    /// See [`proto::TransportConfig::send_window()`]
821    pub fn set_send_window(&self, send_window: u64) {
822        let mut conn = self.0.lock_and_wake("set_send_window");
823        conn.inner.set_send_window(send_window);
824    }
825
826    /// See [`proto::TransportConfig::receive_window()`]
827    pub fn set_receive_window(&self, receive_window: VarInt) {
828        let mut conn = self.0.lock_and_wake("set_receive_window");
829        conn.inner.set_receive_window(receive_window);
830    }
831
832    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
833    ///
834    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
835    /// `count`s increase both minimum and worst-case memory consumption.
836    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
837        let mut conn = self.0.lock_and_wake("set_max_concurrent_bi_streams");
838        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
839    }
840
841    /// Track changes on our external address as reported by the peer.
842    pub fn observed_external_addr(&self) -> crate::ObservedExternalAddr {
843        let conn = self.0.lock_without_waking("external_addr");
844        crate::ObservedExternalAddr::new(conn.observed_external_addr.subscribe())
845    }
846
847    /// Is multipath enabled?
848    // TODO(flub): not a useful API, once we do real things with multipath we can remove
849    // this again.
850    pub fn is_multipath_enabled(&self) -> bool {
851        let conn = self.0.lock_without_waking("is_multipath_enabled");
852        conn.inner.is_multipath_negotiated()
853    }
854
855    /// Registers one address at which this endpoint might be reachable
856    ///
857    /// When the NAT traversal extension is negotiated, servers send these addresses to clients in
858    /// `ADD_ADDRESS` frames. This allows clients to obtain server address candidates to initiate
859    /// NAT traversal attempts. Clients provide their own reachable addresses in `REACH_OUT` frames
860    /// when [`Self::initiate_nat_traversal_round`] is called.
861    pub fn add_nat_traversal_address(
862        &self,
863        address: SocketAddr,
864    ) -> Result<(), n0_nat_traversal::Error> {
865        let mut conn = self.0.lock_and_wake("add_nat_traversal_addresses");
866        conn.inner.add_nat_traversal_address(address)
867    }
868
869    /// Removes one or more addresses from the set of addresses at which this endpoint is reachable
870    ///
871    /// When the NAT traversal extension is negotiated, servers send address removals to
872    /// clients in `REMOVE_ADDRESS` frames. This allows clients to stop using outdated
873    /// server address candidates that are no longer valid for NAT traversal.
874    ///
875    /// For clients, removed addresses will no longer be advertised in `REACH_OUT` frames.
876    ///
877    /// Addresses not present in the set will be silently ignored.
878    pub fn remove_nat_traversal_address(
879        &self,
880        address: SocketAddr,
881    ) -> Result<(), n0_nat_traversal::Error> {
882        let mut conn = self.0.lock_and_wake("remove_nat_traversal_addresses");
883        conn.inner.remove_nat_traversal_address(address)
884    }
885
886    /// Get the current local nat traversal addresses
887    pub fn get_local_nat_traversal_addresses(
888        &self,
889    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
890        let conn = self
891            .0
892            .lock_without_waking("get_local_nat_traversal_addresses");
893        conn.inner.get_local_nat_traversal_addresses()
894    }
895
896    /// Get the currently advertised nat traversal addresses by the server
897    pub fn get_remote_nat_traversal_addresses(
898        &self,
899    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
900        let conn = self
901            .0
902            .lock_without_waking("get_remote_nat_traversal_addresses");
903        conn.inner.get_remote_nat_traversal_addresses()
904    }
905
906    /// Initiates a new nat traversal round
907    ///
908    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
909    /// frames, and initiating probing of the known remote addresses. When a new round is
910    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
911    ///
912    /// Returns the server addresses that are now being probed.
913    pub fn initiate_nat_traversal_round(&self) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
914        let mut conn = self.0.lock_and_wake("initiate_nat_traversal_round");
915        let now = conn.runtime.now();
916        conn.inner.initiate_nat_traversal_round(now)
917    }
918}
919
920/// Normalizes a [`FourTuple`] against the connection's address family.
921///
922/// If the connection already uses IPv6 paths, the remote is canonicalised via
923/// [`ensure_ipv6`]. If it uses IPv4 and the requested remote is IPv6, this returns
924/// [`PathError::InvalidRemoteAddress`].
925fn normalize_network_path(
926    network_path: FourTuple,
927    conn: &proto::Connection,
928) -> Result<FourTuple, PathError> {
929    // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6.
930    // If not, we do not support IPv6.  We can not access endpoint::State from here
931    // however, but either all our paths use an IPv6 address, or all our paths use an
932    // IPv4 address.  So we can use that information.
933    let ipv6 = conn
934        .paths()
935        .iter()
936        .filter_map(|id| {
937            conn.network_path(*id)
938                .map(|addrs| addrs.remote().is_ipv6())
939                .ok()
940        })
941        .next()
942        .unwrap_or_default();
943    let remote = network_path.remote();
944    if remote.is_ipv6() && !ipv6 {
945        Err(PathError::InvalidRemoteAddress(remote))
946    } else if ipv6 {
947        let remote = SocketAddr::V6(ensure_ipv6(remote));
948        Ok(FourTuple::new(remote, network_path.local_ip()))
949    } else {
950        Ok(network_path)
951    }
952}
953
pin_project! {
    /// Future produced by [`Connection::open_uni`]
    ///
    /// Resolves to a [`SendStream`] once a unidirectional stream could be opened, or to the
    /// [`ConnectionError`] stored in the connection state.
    pub struct OpenUni<'a> {
        // Connection the stream is opened on.
        conn: &'a ConnectionRef,
        // Notification polled while no stream can be opened; `poll_open` replaces it with a
        // fresh `stream_budget_available` notification after a spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
962
963impl Future for OpenUni<'_> {
964    type Output = Result<SendStream, ConnectionError>;
965    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
966        let this = self.project();
967        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
968        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
969    }
970}
971
pin_project! {
    /// Future produced by [`Connection::open_bi`]
    ///
    /// Resolves to the [`SendStream`] / [`RecvStream`] pair of a newly opened bidirectional
    /// stream, or to the [`ConnectionError`] stored in the connection state.
    pub struct OpenBi<'a> {
        // Connection the stream is opened on.
        conn: &'a ConnectionRef,
        // Notification polled while no stream can be opened; `poll_open` replaces it with a
        // fresh `stream_budget_available` notification after a spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
980
981impl Future for OpenBi<'_> {
982    type Output = Result<(SendStream, RecvStream), ConnectionError>;
983    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
984        let this = self.project();
985        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
986
987        Poll::Ready(Ok((
988            SendStream::new(conn.clone(), id, is_0rtt),
989            RecvStream::new(conn, id, is_0rtt),
990        )))
991    }
992}
993
994fn poll_open<'a>(
995    ctx: &mut Context<'_>,
996    conn: &'a ConnectionRef,
997    mut notify: Pin<&mut Notified<'a>>,
998    dir: Dir,
999) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1000    let mut state = conn.lock_without_waking("poll_open");
1001    if let Some(ref e) = state.error {
1002        return Poll::Ready(Err(e.clone()));
1003    } else if let Some(id) = state.inner.streams().open(dir) {
1004        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
1005        drop(state); // Release the lock so clone can take it
1006        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1007    }
1008    loop {
1009        match notify.as_mut().poll(ctx) {
1010            // `state` lock ensures we didn't race with readiness
1011            Poll::Pending => return Poll::Pending,
1012            // Spurious wakeup, get a new future
1013            Poll::Ready(()) => {
1014                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
1015            }
1016        }
1017    }
1018}
1019
pin_project! {
    /// Future produced by [`Connection::accept_uni`]
    ///
    /// Resolves to the [`RecvStream`] of a unidirectional stream initiated by the peer, or to
    /// the [`ConnectionError`] stored in the connection state once no stream is left to accept.
    pub struct AcceptUni<'a> {
        // Connection on which incoming streams are accepted.
        conn: &'a ConnectionRef,
        // Notification polled while no stream is available; `poll_accept` replaces it with a
        // fresh `stream_incoming` notification after a spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
1028
1029impl Future for AcceptUni<'_> {
1030    type Output = Result<RecvStream, ConnectionError>;
1031
1032    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1033        let this = self.project();
1034        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
1035        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
1036    }
1037}
1038
pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    ///
    /// Resolves to the [`SendStream`] / [`RecvStream`] pair of a bidirectional stream initiated
    /// by the peer, or to the [`ConnectionError`] stored in the connection state once no stream
    /// is left to accept.
    pub struct AcceptBi<'a> {
        // Connection on which incoming streams are accepted.
        conn: &'a ConnectionRef,
        // Notification polled while no stream is available; `poll_accept` replaces it with a
        // fresh `stream_incoming` notification after a spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
1047
1048impl Future for AcceptBi<'_> {
1049    type Output = Result<(SendStream, RecvStream), ConnectionError>;
1050
1051    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1052        let this = self.project();
1053        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
1054        Poll::Ready(Ok((
1055            SendStream::new(conn.clone(), id, is_0rtt),
1056            RecvStream::new(conn, id, is_0rtt),
1057        )))
1058    }
1059}
1060
1061fn poll_accept<'a>(
1062    ctx: &mut Context<'_>,
1063    conn: &'a ConnectionRef,
1064    mut notify: Pin<&mut Notified<'a>>,
1065    dir: Dir,
1066) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1067    let mut state = conn.lock_and_wake("poll_accept");
1068    // Check for incoming streams before checking `state.error` so that already-received streams,
1069    // which are necessarily finite, can be drained from a closed connection.
1070    if let Some(id) = state.inner.streams().accept(dir) {
1071        let is_0rtt = state.inner.is_handshaking();
1072        drop(state); // Release the lock (wake on drop) so clone can take it
1073        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1074    } else if let Some(ref e) = state.error {
1075        return Poll::Ready(Err(e.clone()));
1076    }
1077    loop {
1078        match notify.as_mut().poll(ctx) {
1079            // `state` lock ensures we didn't race with readiness
1080            Poll::Pending => return Poll::Pending,
1081            // Spurious wakeup, get a new future
1082            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
1083        }
1084    }
1085}
1086
pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    ///
    /// Resolves to the next buffered datagram, or to the [`ConnectionError`] stored in the
    /// connection state once no datagram is left to read.
    pub struct ReadDatagram<'a> {
        // Connection the datagram is read from.
        conn: &'a ConnectionRef,
        // Notification polled while no datagram is buffered; replaced with a fresh
        // `datagram_received` notification after a spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
1095
1096impl Future for ReadDatagram<'_> {
1097    type Output = Result<Bytes, ConnectionError>;
1098    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1099        let mut this = self.project();
1100        let mut state = this.conn.lock_without_waking("ReadDatagram::poll");
1101        // Check for buffered datagrams before checking `state.error` so that already-received
1102        // datagrams, which are necessarily finite, can be drained from a closed connection.
1103        if let Some(x) = state.inner.datagrams().recv() {
1104            return Poll::Ready(Ok(x));
1105        } else if let Some(ref e) = state.error {
1106            return Poll::Ready(Err(e.clone()));
1107        }
1108        loop {
1109            match this.notify.as_mut().poll(ctx) {
1110                // `state` lock ensures we didn't race with readiness
1111                Poll::Pending => return Poll::Pending,
1112                // Spurious wakeup, get a new future
1113                Poll::Ready(()) => this
1114                    .notify
1115                    .set(this.conn.shared.datagram_received.notified()),
1116            }
1117        }
1118    }
1119}
1120
pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    ///
    /// Resolves once the datagram was handed to the connection or sending failed.
    pub struct SendDatagram<'a> {
        // Connection the datagram is sent on.
        conn: &'a ConnectionRef,
        // Datagram still to be sent; taken for each send attempt and put back when the attempt
        // reports `Blocked`.
        data: Option<Bytes>,
        // Notification polled while sending is blocked; replaced with a fresh
        // `datagrams_unblocked` notification after a spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
1130
1131impl Future for SendDatagram<'_> {
1132    type Output = Result<(), SendDatagramError>;
1133    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1134        let mut this = self.project();
1135        let mut state = this.conn.lock_and_wake("SendDatagram::poll");
1136        if let Some(ref e) = state.error {
1137            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
1138        }
1139        use proto::SendDatagramError::*;
1140        match state
1141            .inner
1142            .datagrams()
1143            .send(this.data.take().unwrap(), false)
1144        {
1145            Ok(()) => Poll::Ready(Ok(())),
1146            Err(e) => Poll::Ready(Err(match e {
1147                Blocked(data) => {
1148                    this.data.replace(data);
1149                    loop {
1150                        match this.notify.as_mut().poll(ctx) {
1151                            Poll::Pending => return Poll::Pending,
1152                            // Spurious wakeup, get a new future
1153                            Poll::Ready(()) => this
1154                                .notify
1155                                .set(this.conn.shared.datagrams_unblocked.notified()),
1156                        }
1157                    }
1158                }
1159                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1160                Disabled => SendDatagramError::Disabled,
1161                TooLarge => SendDatagramError::TooLarge,
1162            })),
1163        }
1164    }
1165}
1166
/// State of a [`Connection`] at the moment it was closed.
///
/// Returned by the [`OnClosed`] future from [`Connection::on_closed`]. The value is a snapshot:
/// it owns copies of the statistics rather than referring back to the connection.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct Closed {
    /// The reason the connection was closed.
    pub reason: ConnectionError,
    /// Aggregate connection statistics at the moment of close.
    pub stats: ConnectionStats,
    /// Per-path statistics for every path the connection knew about at close time.
    ///
    /// This includes paths that haven't been discarded at close time, plus any
    /// already-discarded paths whose final stats had been retained because a [`Path`]
    /// or [`WeakPathHandle`] handle was kept alive.
    ///
    /// Entries for non-discarded paths come first, followed by the retained entries of
    /// discarded paths.
    ///
    /// [`WeakPathHandle`]: crate::WeakPathHandle
    pub path_stats: Vec<(PathId, PathStats)>,
}
1186
1187impl Closed {
1188    /// Snapshot the current connection state into a [`Closed`] value.
1189    ///
1190    /// Must only be called once `state.error` has been set.
1191    pub(crate) fn new(state: &mut State, reason: ConnectionError) -> Self {
1192        let stats = state.inner.stats();
1193
1194        let non_discarded_paths = state.inner.paths();
1195        let mut path_stats =
1196            Vec::with_capacity(non_discarded_paths.len() + state.final_path_stats.len());
1197
1198        // Non-discarded paths are tracked by proto::Connection.
1199        path_stats.extend(
1200            non_discarded_paths
1201                .into_iter()
1202                .filter_map(|id| state.inner.path_stats(id).map(|stats| (id, stats))),
1203        );
1204        // Already-discarded paths whose final stats we kept around.
1205        path_stats.extend(
1206            state
1207                .final_path_stats
1208                .iter()
1209                .map(|(id, stats)| (*id, *stats)),
1210        );
1211        Self {
1212            reason,
1213            stats,
1214            path_stats,
1215        }
1216    }
1217}
1218
/// Future returned by [`Connection::on_closed`]
///
/// Resolves to [`Closed`].
///
/// Dropping the future before it resolves removes its sender from the connection's `on_closed`
/// list, provided the connection is still alive.
pub struct OnClosed {
    /// Receives the [`Closed`] snapshot.
    rx: oneshot::Receiver<Closed>,
    /// Weak handle used on drop to reach the connection state without keeping it alive.
    conn: WeakConnectionHandle,
}
1226
1227impl Drop for OnClosed {
1228    fn drop(&mut self) {
1229        if self.rx.is_terminated() {
1230            return;
1231        };
1232        if let Some(conn) = self.conn.upgrade() {
1233            self.rx.close();
1234            conn.0
1235                .lock_without_waking("OnClosed::drop")
1236                .on_closed
1237                .retain(|tx| !tx.is_closed());
1238        }
1239    }
1240}
1241
1242impl Future for OnClosed {
1243    type Output = Closed;
1244
1245    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1246        let this = self.get_mut();
1247        // The `expect` is safe because `State::drop` ensures that all senders are triggered
1248        // before being dropped.
1249        Pin::new(&mut this.rx)
1250            .poll(cx)
1251            .map(|x| x.expect("on_close sender is never dropped before sending"))
1252    }
1253}
1254
/// Counted reference to the shared connection internals.
///
/// Creating a reference through `from_arc` / `clone` increments `Shared::ref_count` and dropping
/// one decrements it; when the last reference goes away the connection is implicitly closed
/// unless it is already closed.
// NOTE(review): the nested `Arc` looks deliberate (the lint is explicitly allowed) — the reason
// is not visible in this part of the file; confirm before flattening.
#[derive(Debug)]
#[allow(clippy::redundant_allocation)]
pub(crate) struct ConnectionRef(Arc<Arc<ConnectionInner>>);
1258
1259impl ConnectionRef {
1260    #[allow(clippy::redundant_allocation)]
1261    fn from_arc(inner: Arc<Arc<ConnectionInner>>) -> Self {
1262        inner.shared.ref_count.fetch_add(1, Ordering::Relaxed);
1263        Self(inner)
1264    }
1265
1266    pub(crate) fn stable_id(&self) -> usize {
1267        &*self.0 as *const _ as usize
1268    }
1269
1270    pub(crate) fn weak_handle(&self) -> WeakConnectionHandle {
1271        WeakConnectionHandle(Arc::downgrade(&self.0))
1272    }
1273}
1274
1275impl Clone for ConnectionRef {
1276    fn clone(&self) -> Self {
1277        Self::from_arc(Arc::clone(&self.0))
1278    }
1279}
1280
1281impl Drop for ConnectionRef {
1282    fn drop(&mut self) {
1283        if self.shared.ref_count.fetch_sub(1, Ordering::Relaxed) > 1 {
1284            return;
1285        }
1286
1287        let conn = &mut *self.lock_without_waking("drop");
1288
1289        if !conn.inner.is_closed() {
1290            // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
1291            // not, we can't do any harm. If there were any streams being opened, then either
1292            // the connection will be closed for an unrelated reason or a fresh reference will
1293            // be constructed for the newly opened stream.
1294            conn.implicit_close(&self.shared);
1295        }
1296    }
1297}
1298
1299impl std::ops::Deref for ConnectionRef {
1300    type Target = ConnectionInner;
1301    fn deref(&self) -> &Self::Target {
1302        &self.0
1303    }
1304}
1305
/// Connection internals shared between all [`ConnectionRef`]s.
#[derive(Debug)]
pub(crate) struct ConnectionInner {
    /// Kept private intentionally, use [`Self::lock_and_wake`] or
    /// [`Self::lock_without_waking`].
    state: Mutex<State>,
    /// Notifiers and the reference count, accessible without locking `state`.
    pub(crate) shared: Shared,
}
1312
1313impl ConnectionInner {
1314    /// Lock the state and return a guard that wakes the connection driver on drop.
1315    ///
1316    /// Use this for operations that may queue frames. The wake ensures the driver sends queued frames.
1317    /// If that's not needed, use [`Self::lock_without_waking`].
1318    pub(crate) fn lock_and_wake(&self, purpose: &'static str) -> WakeGuard<'_> {
1319        WakeGuard {
1320            guard: self.state.lock(purpose),
1321            wake: true,
1322        }
1323    }
1324
1325    /// Lock the state and return a guard that unlocks once dropped.
1326    ///
1327    /// Use this for operations that don't require any action from the connection driver.
1328    /// Otherwise, use [`Self::lock_and_wake`] instead.
1329    pub(crate) fn lock_without_waking(&self, purpose: &'static str) -> WakeGuard<'_> {
1330        WakeGuard {
1331            guard: self.state.lock(purpose),
1332            wake: false,
1333        }
1334    }
1335}
1336
/// [`MutexGuard`] wrapper that calls [`State::wake`] on drop.
///
/// The wake only happens while `wake` is `true`; see [`WakeGuard::skip_waking`]. The guard
/// dereferences to the locked [`State`].
#[derive(derive_more::Deref, derive_more::DerefMut)]
pub(crate) struct WakeGuard<'a> {
    /// The held lock on the connection state.
    #[deref]
    #[deref_mut]
    guard: MutexGuard<'a, State>,
    /// Whether dropping this guard calls [`State::wake`].
    wake: bool,
}
1345
impl WakeGuard<'_> {
    /// Disables the wake-on-drop behavior for this guard.
    ///
    /// After this call, dropping the guard only releases the lock.
    pub(crate) fn skip_waking(&mut self) {
        self.wake = false;
    }
}
1351
1352impl Drop for WakeGuard<'_> {
1353    fn drop(&mut self) {
1354        if self.wake {
1355            self.guard.wake();
1356        }
1357    }
1358}
1359
/// A handle to some connection internals, use with care.
///
/// This contains a weak reference to the connection so will not itself keep the connection
/// alive. Use [`WeakConnectionHandle::upgrade`] to obtain a full [`Connection`] while it is
/// still alive.
#[derive(Debug, Clone)]
pub struct WeakConnectionHandle(Weak<Arc<ConnectionInner>>);
1366
1367impl WeakConnectionHandle {
1368    /// Returns `true` if the [`Connection`] associated with this handle is still alive.
1369    pub fn is_alive(&self) -> bool {
1370        self.0.upgrade().is_some()
1371    }
1372
1373    /// Upgrade the handle to a full `Connection`
1374    pub fn upgrade(&self) -> Option<Connection> {
1375        self.upgrade_to_ref().map(Connection)
1376    }
1377
1378    pub(crate) fn upgrade_to_ref(&self) -> Option<ConnectionRef> {
1379        self.0.upgrade().map(ConnectionRef::from_arc)
1380    }
1381
1382    /// Returns `true` if the two [`WeakConnectionHandle`] point at the same connection.
1383    pub fn is_same_connection(&self, other: &Self) -> bool {
1384        self.0.ptr_eq(&other.0)
1385    }
1386}
1387
/// Connection data that can be used without locking the connection [`State`].
#[derive(Debug, Default)]
pub(crate) struct Shared {
    // NOTE(review): presumably notified once the handshake is confirmed — the notifying code is
    // outside this part of the file.
    handshake_confirmed: Notify,
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    ///
    /// Indexed by `Dir as usize`.
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    ///
    /// Indexed by `Dir as usize`.
    stream_incoming: [Notify; 2],
    /// Awaited by [`ReadDatagram`] while no datagram is buffered
    datagram_received: Notify,
    /// Awaited by [`SendDatagram`] while sending is blocked
    datagrams_unblocked: Notify,
    // NOTE(review): presumably notified when the connection closes — confirm against the driver.
    closed: Notify,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: AtomicUsize,
}
1402
/// Connection state protected by the mutex in [`ConnectionInner`].
pub(crate) struct State {
    /// The protocol state machine of this connection.
    pub(crate) inner: proto::Connection,
    // NOTE(review): presumably the waker of the connection driver task, used by `State::wake`
    // — confirm against the driver implementation.
    driver: Option<Waker>,
    /// Handle identifying this connection in events sent to the endpoint.
    handle: ConnectionHandle,
    /// Sender half of the handshake-data channel; the receiver is held by [`Connecting`].
    on_handshake_data: Option<oneshot::Sender<()>>,
    /// Sender half of the connected channel; the receiver is held by [`Connecting`].
    on_connected: Option<oneshot::Sender<bool>>,
    connected: bool,
    handshake_confirmed: bool,
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    timer_deadline: Option<Instant>,
    /// Receiver for [`ConnectionEvent`]s addressed to this connection.
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    /// Channel on which this connection's [`EndpointEvent`]s are forwarded, tagged with `handle`.
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Tracks paths being opened
    open_path: FxHashMap<PathId, watch::Sender<Result<(), PathError>>>,
    /// Tracks reference counts for paths.
    ///
    /// I.e. how many [`Path`] and [`WeakPathHandle`] structs are alive for a path.
    /// Each entry's [`PathRefOwner`] holds an [`AtomicUsize`] so that cloning or
    /// dropping a [`PathRef`] (held by [`Path`] or [`WeakPathHandle`]) does not need
    /// to lock this state.
    ///
    /// [`WeakPathHandle`]: crate::path::WeakPathHandle
    pub(crate) path_refs: FxHashMap<PathId, PathRefOwner>,
    /// Final path stats for discarded paths.
    ///
    /// We only insert entries if the discarded path has a non-zero reference count in [`Self::path_refs`].
    /// When the last reference to a path is dropped its entry is removed from both maps.
    pub(crate) final_path_stats: FxHashMap<PathId, PathStats>,
    /// Broadcast channel for [`PathEvent`]s.
    pub(crate) path_events: tokio::sync::broadcast::Sender<PathEvent>,
    /// UDP sender used to transmit the datagrams produced by `inner`.
    sender: Pin<Box<dyn UdpSender>>,
    /// Runtime abstraction; provides the current time for protocol operations.
    pub(crate) runtime: Arc<dyn Runtime>,
    /// Scratch buffer that `inner.poll_transmit` writes outgoing datagrams into.
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<proto::Transmit>,
    /// Our last external address reported by the peer. When multipath is enabled, this will be the
    /// last report across all paths.
    pub(crate) observed_external_addr: watch::Sender<Option<SocketAddr>>,
    /// Broadcast channel for NAT traversal events.
    pub(crate) nat_traversal_updates: tokio::sync::broadcast::Sender<n0_nat_traversal::Event>,
    /// Senders of the pending [`OnClosed`] futures.
    on_closed: Vec<oneshot::Sender<Closed>>,
}
1448
1449impl State {
1450    #[allow(clippy::too_many_arguments)]
1451    fn new(
1452        inner: proto::Connection,
1453        handle: ConnectionHandle,
1454        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1455        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1456        on_handshake_data: oneshot::Sender<()>,
1457        on_connected: oneshot::Sender<bool>,
1458        sender: Pin<Box<dyn UdpSender>>,
1459        runtime: Arc<dyn Runtime>,
1460    ) -> Self {
1461        Self {
1462            inner,
1463            driver: None,
1464            handle,
1465            on_handshake_data: Some(on_handshake_data),
1466            on_connected: Some(on_connected),
1467            connected: false,
1468            handshake_confirmed: false,
1469            timer: None,
1470            timer_deadline: None,
1471            conn_events,
1472            endpoint_events,
1473            blocked_writers: FxHashMap::default(),
1474            blocked_readers: FxHashMap::default(),
1475            stopped: FxHashMap::default(),
1476            open_path: FxHashMap::default(),
1477            error: None,
1478            sender,
1479            runtime,
1480            send_buffer: Vec::new(),
1481            buffered_transmit: None,
1482            path_events: tokio::sync::broadcast::channel(32).0,
1483            observed_external_addr: watch::Sender::new(None),
1484            nat_traversal_updates: tokio::sync::broadcast::channel(32).0,
1485            on_closed: Vec::new(),
1486            final_path_stats: Default::default(),
1487            path_refs: Default::default(),
1488        }
1489    }
1490
    /// Sends pending transmits through the UDP sender.
    ///
    /// Returns `Ok(true)` if it stopped because `MAX_TRANSMIT_DATAGRAMS` was reached, so more
    /// data may still be waiting. Returns `Ok(false)` if the protocol state had nothing more to
    /// transmit, or if the sender was not ready; in the latter case the transmit is kept in
    /// `buffered_transmit` and retried on the next call.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the sender.
    fn drive_transmit(&mut self, cx: &mut Context<'_>) -> io::Result<bool> {
        let now = self.runtime.now();
        // Datagrams produced during this call; a retried buffered transmit is not counted again.
        let mut transmits = 0;

        // Segments per transmit are capped by both the sender and `MAX_TRANSMIT_SEGMENTS`.
        let max_datagrams = self
            .sender
            .max_transmit_segments()
            .min(MAX_TRANSMIT_SEGMENTS);

        loop {
            // Retry the last transmit, or get a new one.
            let t = match self.buffered_transmit.take() {
                // `send_buffer` is left untouched here, so it still holds this transmit's bytes.
                Some(t) => t,
                None => {
                    self.send_buffer.clear();
                    match self
                        .inner
                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
                    {
                        Some(t) => {
                            // Without a segment size the transmit is one datagram; otherwise it
                            // is as many datagrams as segments needed for `t.size` bytes.
                            transmits += match t.segment_size {
                                None => 1,
                                Some(s) => t.size.div_ceil(s), // round up
                            };
                            t
                        }
                        // Nothing left to send.
                        None => break,
                    }
                }
            };

            let len = t.size;
            match self
                .sender
                .as_mut()
                .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx)
            {
                Poll::Pending => {
                    // Keep the transmit so the next call retries it before polling a new one.
                    self.buffered_transmit = Some(t);
                    return Ok(false);
                }
                Poll::Ready(Err(e)) => return Err(e),
                Poll::Ready(Ok(())) => {}
            }

            if transmits >= MAX_TRANSMIT_DATAGRAMS {
                // TODO: What isn't ideal here yet is that if we don't poll all
                // datagrams that could be sent we don't go into the `app_limited`
                // state and CWND continues to grow until we get here the next time.
                // See https://github.com/quinn-rs/quinn/issues/1126
                return Ok(true);
            }
        }

        Ok(false)
    }
1547
1548    fn forward_endpoint_events(&mut self) {
1549        while let Some(event) = self.inner.poll_endpoint_events() {
1550            // If the endpoint driver is gone, noop.
1551            let _ = self.endpoint_events.send((self.handle, event));
1552        }
1553    }
1554
1555    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1556    fn process_conn_events(
1557        &mut self,
1558        shared: &Shared,
1559        cx: &mut Context<'_>,
1560    ) -> Result<(), ConnectionError> {
1561        loop {
1562            match self.conn_events.poll_recv(cx) {
1563                Poll::Ready(Some(ConnectionEvent::Rebind(sender))) => {
1564                    self.sender = sender;
1565                    self.inner.handle_network_change(None, self.runtime.now());
1566                }
1567                Poll::Ready(Some(ConnectionEvent::LocalAddressChanged(hint))) => {
1568                    self.inner
1569                        .handle_network_change(hint.as_deref().map(|x| x as _), self.runtime.now());
1570                }
1571                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1572                    self.inner.handle_event(event);
1573                }
1574                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1575                    self.close(error_code, reason, shared);
1576                }
1577                Poll::Ready(None) => {
1578                    return Err(ConnectionError::TransportError(TransportError::new(
1579                        TransportErrorCode::INTERNAL_ERROR,
1580                        "endpoint driver future was dropped".to_string(),
1581                    )));
1582                }
1583                Poll::Pending => {
1584                    return Ok(());
1585                }
1586            }
1587        }
1588    }
1589
1590    fn forward_app_events(&mut self, shared: &Shared) {
1591        while let Some(event) = self.inner.poll() {
1592            use proto::Event::*;
1593            match event {
1594                HandshakeDataReady => {
1595                    if let Some(x) = self.on_handshake_data.take() {
1596                        let _ = x.send(());
1597                    }
1598                }
1599                Connected => {
1600                    self.connected = true;
1601                    if let Some(x) = self.on_connected.take() {
1602                        // We don't care if the on-connected future was dropped
1603                        let _ = x.send(self.inner.accepted_0rtt());
1604                    }
1605                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1606                        // Wake up rejected 0-RTT streams so they can fail immediately with
1607                        // `ZeroRttRejected` errors.
1608                        wake_all(&mut self.blocked_writers);
1609                        wake_all(&mut self.blocked_readers);
1610                        wake_all_notify(&mut self.stopped);
1611                    }
1612                }
1613                HandshakeConfirmed => {
1614                    self.handshake_confirmed = true;
1615                    shared.handshake_confirmed.notify_waiters();
1616                }
1617                ConnectionLost { reason } => {
1618                    self.terminate(reason, shared);
1619                }
1620                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1621                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1622                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1623                }
1624                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1625                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1626                }
1627                DatagramReceived => {
1628                    shared.datagram_received.notify_waiters();
1629                }
1630                DatagramsUnblocked => {
1631                    shared.datagrams_unblocked.notify_waiters();
1632                }
1633                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1634                Stream(StreamEvent::Available { dir }) => {
1635                    // Might mean any number of streams are ready, so we wake up everyone
1636                    shared.stream_budget_available[dir as usize].notify_waiters();
1637                }
1638                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1639                Stream(StreamEvent::Stopped { id, .. }) => {
1640                    wake_stream_notify(id, &mut self.stopped);
1641                    wake_stream(id, &mut self.blocked_writers);
1642                }
1643                Path(ref evt @ PathEvent::ObservedAddr { addr: observed, .. }) => {
1644                    self.path_events.send(evt.clone()).ok();
1645                    self.observed_external_addr.send_if_modified(|addr| {
1646                        let old = addr.replace(observed);
1647                        old != *addr
1648                    });
1649                }
1650                Path(ref evt @ PathEvent::Established { id, .. }) => {
1651                    self.path_events.send(evt.clone()).ok();
1652                    if let Some(sender) = self.open_path.remove(&id) {
1653                        sender.send_modify(|value| *value = Ok(()));
1654                    }
1655                }
1656                Path(
1657                    ref evt @ PathEvent::Discarded {
1658                        id, ref path_stats, ..
1659                    },
1660                ) => {
1661                    if self.path_refs.contains_key(&id) {
1662                        self.final_path_stats.insert(id, *path_stats.clone());
1663                    }
1664                    self.path_events.send(evt.clone()).ok();
1665                }
1666                Path(ref evt @ PathEvent::Abandoned { id, .. }) => {
1667                    if let Some(sender) = self.open_path.remove(&id) {
1668                        // We don't care for the reason why this path was closed here, because semantically
1669                        // all close reasons for a path that has not yet been opened equals to `ValidationFailed`.
1670                        // With the noq API, there is no way to application-close a not-yet-opened path, so
1671                        // `ApplicationClosed` cannot occur. And all other variants will only occur for paths
1672                        // that have already been opened.
1673                        // The previous iteration of this code had another event `PathEvent::LocallyClosed` which
1674                        // contained a `PathError`, but that was only ever set to `ValidationFailed`.
1675                        let error = PathError::ValidationFailed;
1676                        sender.send_modify(|value| *value = Err(error));
1677                    }
1678                    // this will happen also for already opened paths
1679                    self.path_events.send(evt.clone()).ok();
1680                }
1681                Path(evt @ PathEvent::RemoteStatus { .. }) => {
1682                    self.path_events.send(evt).ok();
1683                }
1684                NatTraversal(update) => {
1685                    self.nat_traversal_updates.send(update).ok();
1686                }
1687                _ => {
1688                    // PathEvent is #[non_exhaustive].
1689                    // It's possible that noq is built against a newer noq-proto version.
1690                    // In that case, we need to ignore path events we can't handle yet.
1691                    // But for tests, we expect noq and noq-proto to be in sync, so we
1692                    // should panic in case we don't actually handle new cases.
1693                    #[cfg(test)]
1694                    panic!("Unhandled PathEvent variant: {event:?}");
1695                }
1696            }
1697        }
1698    }
1699
1700    fn drive_timer(&mut self, cx: &mut Context<'_>) -> bool {
1701        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1702        // timer is registered with the runtime (and check whether it's already
1703        // expired).
1704        match self.inner.poll_timeout() {
1705            Some(deadline) => {
1706                if let Some(delay) = &mut self.timer {
1707                    // There is no need to reset the tokio timer if the deadline
1708                    // did not change
1709                    if self
1710                        .timer_deadline
1711                        .map(|current_deadline| current_deadline != deadline)
1712                        .unwrap_or(true)
1713                    {
1714                        delay.as_mut().reset(deadline);
1715                    }
1716                } else {
1717                    self.timer = Some(self.runtime.new_timer(deadline));
1718                }
1719                // Store the actual expiration time of the timer
1720                self.timer_deadline = Some(deadline);
1721            }
1722            None => {
1723                self.timer_deadline = None;
1724                return false;
1725            }
1726        }
1727
1728        if self.timer_deadline.is_none() {
1729            return false;
1730        }
1731
1732        let delay = self
1733            .timer
1734            .as_mut()
1735            .expect("timer must exist in this state")
1736            .as_mut();
1737        if delay.poll(cx).is_pending() {
1738            // Since there wasn't a timeout event, there is nothing new
1739            // for the connection to do
1740            return false;
1741        }
1742
1743        // A timer expired, so the caller needs to check for
1744        // new transmits, which might cause new timers to be set.
1745        self.inner.handle_timeout(self.runtime.now());
1746        self.timer_deadline = None;
1747        true
1748    }
1749
1750    /// Wake up a blocked `Driver` task to process I/O
1751    pub(crate) fn wake(&mut self) {
1752        if let Some(x) = self.driver.take() {
1753            x.wake();
1754        }
1755    }
1756
1757    /// Used to wake up all blocked futures when the connection becomes closed for any reason
1758    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1759        self.error = Some(reason.clone());
1760        if let Some(x) = self.on_handshake_data.take() {
1761            let _ = x.send(());
1762        }
1763        wake_all(&mut self.blocked_writers);
1764        wake_all(&mut self.blocked_readers);
1765        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1766        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1767        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1768        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1769        shared.datagram_received.notify_waiters();
1770        shared.datagrams_unblocked.notify_waiters();
1771        if let Some(x) = self.on_connected.take() {
1772            let _ = x.send(false);
1773        }
1774        shared.handshake_confirmed.notify_waiters();
1775        wake_all_notify(&mut self.stopped);
1776        shared.closed.notify_waiters();
1777
1778        // Send to the registered on_closed futures.
1779        if !self.on_closed.is_empty() {
1780            let closed = Closed::new(self, reason);
1781            for tx in self.on_closed.drain(..) {
1782                tx.send(closed.clone()).ok();
1783            }
1784        }
1785    }
1786
1787    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1788        self.inner.close(self.runtime.now(), error_code, reason);
1789        self.terminate(ConnectionError::LocallyClosed, shared);
1790        self.wake();
1791    }
1792
1793    /// Close for a reason other than the application's explicit request
1794    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1795        self.close(0u32.into(), Bytes::new(), shared);
1796    }
1797
1798    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1799        if self.inner.is_handshaking()
1800            || self.inner.accepted_0rtt()
1801            || self.inner.side().is_server()
1802        {
1803            Ok(())
1804        } else {
1805            Err(())
1806        }
1807    }
1808
1809    /// Returns [`PathStats`] for a path, if available.
1810    ///
1811    /// This gets the stats from [`proto::Connection`]. If that returns `None`
1812    /// it gets them from `Self::final_path_stats` instead.
1813    pub(crate) fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
1814        self.inner
1815            .path_stats(path_id)
1816            .or_else(|| self.final_path_stats.get(&path_id).copied())
1817    }
1818
1819    /// Acquire a new [`PathRef`] for a path id, bumping its reference counter by 1.
1820    ///
1821    /// The returned [`PathRef`] is intended to be stored on a [`Path`] or [`WeakPathHandle`].
1822    /// Its reference count is automatically increased when cloned. When its owner is dropped,
1823    /// [`PathRef::on_drop`] must be called to decrement the refcount.
1824    ///
1825    /// [`WeakPathHandle`]: crate::path::WeakPathHandle
1826    pub(crate) fn acquire_path_ref(&mut self, path_id: PathId) -> PathRef {
1827        self.path_refs.entry(path_id).or_default().acquire(path_id)
1828    }
1829}
1830
1831impl Drop for State {
1832    fn drop(&mut self) {
1833        if !self.inner.is_drained() {
1834            // Ensure the endpoint can tidy up
1835            let _ = self
1836                .endpoint_events
1837                .send((self.handle, proto::EndpointEvent::drained()));
1838        }
1839
1840        if !self.on_closed.is_empty()
1841            && let Some(reason) = self.error.clone()
1842        {
1843            // Ensure that all on_closed oneshot senders are triggered before dropping.
1844            let closed = Closed::new(self, reason);
1845            for tx in self.on_closed.drain(..) {
1846                tx.send(closed.clone()).ok();
1847            }
1848        }
1849    }
1850}
1851
1852impl fmt::Debug for State {
1853    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1854        f.debug_struct("State").field("inner", &self.inner).finish()
1855    }
1856}
1857
1858fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1859    if let Some(waker) = wakers.remove(&stream_id) {
1860        waker.wake();
1861    }
1862}
1863
1864fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1865    wakers.drain().for_each(|(_, waker)| waker.wake())
1866}
1867
1868fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1869    if let Some(notify) = wakers.remove(&stream_id) {
1870        notify.notify_waiters()
1871    }
1872}
1873
1874fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1875    wakers
1876        .drain()
1877        .for_each(|(_, notify)| notify.notify_waiters())
1878}
1879
1880/// Errors that can arise when sending a datagram
1881#[derive(Debug, Error, Clone, Eq, PartialEq)]
1882pub enum SendDatagramError {
1883    /// The peer does not support receiving datagram frames
1884    #[error("datagrams not supported by peer")]
1885    UnsupportedByPeer,
1886    /// Datagram support is disabled locally
1887    #[error("datagram support disabled")]
1888    Disabled,
1889    /// The datagram is larger than the connection can currently accommodate
1890    ///
1891    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
1892    /// exceeded.
1893    #[error("datagram too large")]
1894    TooLarge,
1895    /// The connection was lost
1896    #[error("connection lost")]
1897    ConnectionLost(#[from] ConnectionError),
1898}
1899
/// The maximum number of datagrams produced by a single `drive_transmit` call
///
/// Capping this bounds the CPU time spent generating datagrams in one go, and gives
/// other tasks (like receiving ACKs) a chance to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1905
/// The maximum number of datagrams sent in a single transmit
///
/// This may be lower than what the platform is capable of, to avoid excessive memory
/// allocations when calling `poll_transmit()`. Benchmarks have shown that numbers
/// around 10 are a good compromise.
const MAX_TRANSMIT_SEGMENTS: NonZeroUsize = NonZeroUsize::new(10).expect("known");