noq/connection.rs
1use std::{
2 any::Any,
3 fmt,
4 future::Future,
5 io,
6 net::{IpAddr, SocketAddr},
7 num::NonZeroUsize,
8 pin::Pin,
9 sync::{
10 Arc, Weak,
11 atomic::{AtomicUsize, Ordering},
12 },
13 task::{Context, Poll, Waker, ready},
14};
15
16use bytes::Bytes;
17use pin_project_lite::pin_project;
18use rustc_hash::FxHashMap;
19use thiserror::Error;
20use tokio::sync::{Notify, futures::Notified, mpsc, oneshot, watch};
21use tracing::{Instrument, Span, debug_span};
22
23use crate::{
24 ConnectionEvent, Duration, Instant, Path, VarInt,
25 endpoint::ensure_ipv6,
26 mutex::{Mutex, MutexGuard},
27 path::{OpenPath, PathRef, PathRefOwner},
28 recv_stream::RecvStream,
29 runtime::{AsyncTimer, Runtime, UdpSender},
30 send_stream::SendStream,
31 udp_transmit,
32};
33use proto::{
34 ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, FourTuple, PathError,
35 PathEvent, PathId, PathStats, PathStatus, Side, StreamEvent, StreamId, TransportError,
36 TransportErrorCode, congestion::Controller, n0_nat_traversal,
37};
38
39/// In-progress connection attempt future
40#[derive(Debug)]
41pub struct Connecting {
42 conn: Option<ConnectionRef>,
43 connected: oneshot::Receiver<bool>,
44 handshake_data_ready: Option<oneshot::Receiver<()>>,
45}
46
47impl Connecting {
48 pub(crate) fn new(
49 handle: ConnectionHandle,
50 conn: proto::Connection,
51 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
52 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
53 sender: Pin<Box<dyn UdpSender>>,
54 runtime: Arc<dyn Runtime>,
55 ) -> Self {
56 let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
57 let (on_connected_send, on_connected_recv) = oneshot::channel();
58
59 let conn = ConnectionRef(Arc::new(Arc::new(ConnectionInner {
60 state: Mutex::new(State::new(
61 conn,
62 handle,
63 endpoint_events,
64 conn_events,
65 on_handshake_data_send,
66 on_connected_send,
67 sender,
68 runtime.clone(),
69 )),
70 shared: Shared::default(),
71 })));
72
73 let driver = ConnectionDriver(conn.clone());
74 runtime.spawn(Box::pin(
75 async {
76 if let Err(e) = driver.await {
77 tracing::error!("I/O error: {e}");
78 }
79 }
80 .instrument(Span::current()),
81 ));
82
83 Self {
84 conn: Some(conn),
85 connected: on_connected_recv,
86 handshake_data_ready: Some(on_handshake_data_recv),
87 }
88 }
89
90 /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
91 ///
92 /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
93 /// If so, the returned [`Connection`] can be used to send application data without waiting for
94 /// the rest of the handshake to complete, at the cost of weakened cryptographic security
95 /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
96 /// complete, at which point subsequently opened streams and written data will have full
97 /// cryptographic protection.
98 ///
99 /// ## Outgoing
100 ///
101 /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
102 /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
103 /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
104 /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
105 /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
106 /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
107 /// If it was rejected, the existence of streams opened and other application data sent prior
108 /// to the handshake completing will not be conveyed to the remote application, and local
109 /// operations on them will return `ZeroRttRejected` errors.
110 ///
111 /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
112 /// relevant resumption state to be stored in the server, which servers may limit or lose for
113 /// various reasons including not persisting resumption state across server restarts.
114 ///
115 /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
116 /// implementation's docs for 0-RTT pitfalls.
117 ///
118 /// ## Incoming
119 ///
120 /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
121 /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
122 ///
123 /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
124 /// implementation's docs for 0-RTT pitfalls.
125 ///
126 /// ## Security
127 ///
128 /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
129 /// replay attacks, and should therefore never invoke non-idempotent operations.
130 ///
131 /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
132 /// before TLS client authentication has occurred, and should therefore not be used to send
133 /// data for which client authentication is being used.
134 pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
135 // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
136 // have to release it explicitly before returning `self` by value.
137 let conn = (self.conn.as_mut().unwrap()).lock_without_waking("into_0rtt");
138
139 let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
140 drop(conn);
141
142 if is_ok {
143 let conn = self.conn.take().unwrap();
144 Ok((Connection(conn), ZeroRttAccepted(self.connected)))
145 } else {
146 Err(self)
147 }
148 }
149
150 /// Parameters negotiated during the handshake
151 ///
152 /// The dynamic type returned is determined by the configured
153 /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
154 /// be [`downcast`](Box::downcast) to a
155 /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
156 pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
157 // Taking &mut self allows us to use a single oneshot channel rather than dealing with
158 // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
159 // simple.
160 if let Some(x) = self.handshake_data_ready.take() {
161 let _ = x.await;
162 }
163 let conn = self.conn.as_ref().unwrap();
164 let inner = conn.lock_without_waking("handshake");
165 inner
166 .inner
167 .crypto_session()
168 .handshake_data()
169 .ok_or_else(|| {
170 inner
171 .error
172 .clone()
173 .expect("spurious handshake data ready notification")
174 })
175 }
176
177 /// The local IP address which was used when the peer established
178 /// the connection
179 ///
180 /// This can be different from the address the endpoint is bound to, in case
181 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
182 ///
183 /// This will return `None` for clients, or when the platform does not expose this
184 /// information. See [`noq_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
185 /// supported platforms when using [`noq_udp`](udp) for I/O, which is the default.
186 ///
187 /// Will panic if called after `poll` has returned `Ready`.
188 pub fn local_ip(&self) -> Option<IpAddr> {
189 let conn = self.conn.as_ref().expect("used after yielding Ready");
190 let inner = conn.lock_without_waking("local_ip");
191
192 inner
193 .inner
194 .network_path(PathId::ZERO)
195 .expect("PathId::ZERO is the only path during the handshake")
196 .local_ip()
197 }
198
199 /// The peer's UDP addresses
200 ///
201 /// Will panic if called after `poll` has returned `Ready`.
202 pub fn remote_address(&self) -> SocketAddr {
203 let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
204 conn_ref
205 .lock_without_waking("remote_address")
206 .inner
207 .network_path(PathId::ZERO)
208 .expect("PathId::ZERO is the only path during the handshake")
209 .remote()
210 }
211}
212
213impl Future for Connecting {
214 type Output = Result<Connection, ConnectionError>;
215 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
216 Pin::new(&mut self.connected).poll(cx).map(|_| {
217 let conn = self.conn.take().unwrap();
218 let inner = conn.lock_without_waking("connecting");
219 if inner.connected {
220 drop(inner);
221 Ok(Connection(conn))
222 } else {
223 Err(inner
224 .error
225 .clone()
226 .expect("connected signaled without connection success or error"))
227 }
228 })
229 }
230}
231
232/// Future that completes when a connection is fully established
233///
234/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
235/// value is meaningless.
236pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
237
238impl Future for ZeroRttAccepted {
239 type Output = bool;
240 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
241 Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
242 }
243}
244
245/// A future that drives protocol logic for a connection
246///
247/// This future handles the protocol logic for a single connection, routing events from the
248/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
249/// It also keeps track of outstanding timeouts for the `Connection`.
250///
251/// If the connection encounters an error condition, this future will yield an error. It will
252/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
253/// connection-related futures, this waits for the draining period to complete to ensure that
254/// packets still in flight from the peer are handled gracefully.
255#[must_use = "connection drivers must be spawned for their connections to function"]
256#[derive(Debug)]
257struct ConnectionDriver(ConnectionRef);
258
259impl Future for ConnectionDriver {
260 type Output = Result<(), io::Error>;
261
262 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
263 let conn = &mut *self.0.lock_without_waking("poll");
264
265 let span = debug_span!("drive", id = conn.handle.0);
266 let _guard = span.enter();
267
268 if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
269 conn.terminate(e, &self.0.shared);
270 return Poll::Ready(Ok(()));
271 }
272 let mut keep_going = conn.drive_transmit(cx)?;
273 // If a timer expires, there might be more to transmit. When we transmit something, we
274 // might need to reset a timer. Hence, we must loop until neither happens.
275 keep_going |= conn.drive_timer(cx);
276 conn.forward_endpoint_events();
277 conn.forward_app_events(&self.0.shared);
278
279 if !conn.inner.is_drained() {
280 if keep_going {
281 // If the connection hasn't processed all tasks, schedule it again
282 cx.waker().wake_by_ref();
283 } else {
284 conn.driver = Some(cx.waker().clone());
285 }
286 return Poll::Pending;
287 }
288 if conn.error.is_none() {
289 unreachable!("drained connections always have an error");
290 }
291 Poll::Ready(Ok(()))
292 }
293}
294
295/// A QUIC connection.
296///
297/// If all references to a connection (including every clone of the `Connection` handle, streams of
298/// incoming streams, and the various stream types) have been dropped, then the connection will be
299/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
300/// connection explicitly by calling [`Connection::close()`].
301///
302/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
303/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
304/// application. [`Connection::close()`] describes in more detail how to gracefully close a
305/// connection without losing application data.
306///
307/// May be cloned to obtain another handle to the same connection.
308///
309/// [`Connection::close()`]: Connection::close
310#[derive(Debug, Clone)]
311pub struct Connection(ConnectionRef);
312
313impl Connection {
314 /// Returns a weak reference to the inner connection struct.
315 pub fn weak_handle(&self) -> WeakConnectionHandle {
316 self.0.weak_handle()
317 }
318
319 /// Initiate a new outgoing unidirectional stream.
320 ///
321 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
322 /// consequence, the peer won't be notified that a stream has been opened until the stream is
323 /// actually used.
324 pub fn open_uni(&self) -> OpenUni<'_> {
325 OpenUni {
326 conn: &self.0,
327 notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
328 }
329 }
330
331 /// Initiate a new outgoing bidirectional stream.
332 ///
333 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
334 /// consequence, the peer won't be notified that a stream has been opened until the stream is
335 /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
336 /// anything to [`SendStream`] will never succeed.
337 ///
338 /// [`open_bi()`]: crate::Connection::open_bi
339 /// [`SendStream`]: crate::SendStream
340 /// [`RecvStream`]: crate::RecvStream
341 pub fn open_bi(&self) -> OpenBi<'_> {
342 OpenBi {
343 conn: &self.0,
344 notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
345 }
346 }
347
348 /// Accept the next incoming uni-directional stream
349 pub fn accept_uni(&self) -> AcceptUni<'_> {
350 AcceptUni {
351 conn: &self.0,
352 notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
353 }
354 }
355
356 /// Accept the next incoming bidirectional stream
357 ///
358 /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
359 /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
360 /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
361 ///
362 /// [`accept_bi()`]: crate::Connection::accept_bi
363 /// [`open_bi()`]: crate::Connection::open_bi
364 /// [`SendStream`]: crate::SendStream
365 /// [`RecvStream`]: crate::RecvStream
366 pub fn accept_bi(&self) -> AcceptBi<'_> {
367 AcceptBi {
368 conn: &self.0,
369 notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
370 }
371 }
372
373 /// Receive an application datagram
374 pub fn read_datagram(&self) -> ReadDatagram<'_> {
375 ReadDatagram {
376 conn: &self.0,
377 notify: self.0.shared.datagram_received.notified(),
378 }
379 }
380
381 /// Opens a new path if no path exists yet for `network_path`.
382 ///
383 /// If `network_path` has no local IP set, then this will open a new path
384 /// if no path exists for this remote address, independent of the existing
385 /// path's local IP. If a local IP is set, it will match against the full
386 /// four-tuple of existing paths.
387 ///
388 /// Otherwise behaves exactly as [`open_path`].
389 ///
390 /// [`open_path`]: Self::open_path
391 pub fn open_path_ensure(
392 &self,
393 network_path: impl Into<FourTuple>,
394 initial_status: PathStatus,
395 ) -> OpenPath {
396 let network_path = network_path.into();
397 let mut state = self.0.lock_and_wake("open_path");
398
399 let network_path = match normalize_network_path(network_path, &state.inner) {
400 Ok(network_path) => network_path,
401 Err(err) => return OpenPath::rejected(err),
402 };
403
404 let now = state.runtime.now();
405 let open_res = state
406 .inner
407 .open_path_ensure(network_path, initial_status, now);
408 match open_res {
409 Ok((path_id, existed)) if existed => {
410 let recv = state.open_path.get(&path_id).map(|tx| tx.subscribe());
411 drop(state);
412 match recv {
413 Some(recv) => OpenPath::new(path_id, recv, self.0.clone()),
414 None => OpenPath::ready(path_id, self.0.clone()),
415 }
416 }
417 Ok((path_id, _)) => {
418 let (tx, rx) = watch::channel(Ok(()));
419 state.open_path.insert(path_id, tx);
420 drop(state);
421 OpenPath::new(path_id, rx, self.0.clone())
422 }
423 Err(err) => OpenPath::rejected(err),
424 }
425 }
426
427 /// Opens an additional path if the multipath extension is negotiated.
428 ///
429 /// This function takes a [`FourTuple`], which contains the remote address and an optional
430 /// local IP. If the local IP is set, the path will be opened with this source address,
431 /// and the endpoint must support sending from that IP address. You can also pass a
432 /// [`SocketAddr`] to only set the remote address and leave the local IP interface unspecified.
433 ///
434 /// The returned future completes once the path is either fully opened and ready to
435 /// carry application data, or if there was an error.
436 ///
437 /// Dropping the returned future does not cancel the opening of the path, the
438 /// [`PathEvent::Established`] event will still be emitted from [`Self::path_events`] if
439 /// the path opens. The [`PathId`] for the events can be extracted from
440 /// [`OpenPath::path_id`].
441 ///
442 /// Failure to open a path can either occur immediately, before polling the returned
443 /// future, or at a later time. If the failure is immediate [`OpenPath::path_id`] will
444 /// return `None` and the future will be ready immediately. If the failure happens
445 /// later, a [`PathEvent`] will be emitted.
446 pub fn open_path(
447 &self,
448 network_path: impl Into<FourTuple>,
449 initial_status: PathStatus,
450 ) -> OpenPath {
451 let network_path = network_path.into();
452 let mut state = self.0.lock_and_wake("open_path");
453
454 let network_path = match normalize_network_path(network_path, &state.inner) {
455 Ok(network_path) => network_path,
456 Err(err) => return OpenPath::rejected(err),
457 };
458
459 let (on_open_path_send, on_open_path_recv) = watch::channel(Ok(()));
460 let now = state.runtime.now();
461 let open_res = state.inner.open_path(network_path, initial_status, now);
462 match open_res {
463 Ok(path_id) => {
464 state.open_path.insert(path_id, on_open_path_send);
465 drop(state);
466 OpenPath::new(path_id, on_open_path_recv, self.0.clone())
467 }
468 Err(err) => OpenPath::rejected(err),
469 }
470 }
471
472 /// Returns the [`Path`] structure of an open path
473 pub fn path(&self, id: PathId) -> Option<Path> {
474 Path::new(&self.0, id)
475 }
476
477 /// A stream of [`PathEvent`]s for all paths in this connection.
478 ///
479 /// The stream will yield a [`PathEvent`] whenever there is a change in the state of any path in this connection.
480 /// The events need to be processed immediately, since there isn't an unbounded buffer for them.
481 ///
482 /// If processing of events lags behind too much, you will get an error of type [`crate::Lagged`] indicating
483 /// how many events were lost. The stream continues after a lag, delivering the oldest retained message next.
484 pub fn path_events(&self) -> crate::PathEvents {
485 crate::PathEvents::new(
486 self.0
487 .lock_without_waking("path_events")
488 .path_events
489 .subscribe(),
490 )
491 }
492
493 /// A stream of NAT traversal updates for this connection.
494 ///
495 /// The events need to be processed immediately, since there isn't an unbounded buffer for them.
496 ///
497 /// If processing of events lags behind too much, you will get an error of type [`crate::Lagged`] indicating
498 /// how many events were lost. The stream continues after a lag, delivering the oldest retained message next.
499 pub fn nat_traversal_updates(&self) -> crate::NatTraversalUpdates {
500 crate::NatTraversalUpdates::new(
501 self.0
502 .lock_without_waking("nat_traversal_updates")
503 .nat_traversal_updates
504 .subscribe(),
505 )
506 }
507
508 /// Wait for the connection to be closed for any reason
509 ///
510 /// Despite the return type's name, closed connections are often not an error condition at the
511 /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
512 /// and [`ConnectionError::ApplicationClosed`].
513 pub async fn closed(&self) -> ConnectionError {
514 {
515 let conn = self.0.lock_without_waking("closed");
516 if let Some(error) = conn.error.as_ref() {
517 return error.clone();
518 }
519 // Construct the future while the lock is held to ensure we can't miss a wakeup if
520 // the `Notify` is signaled immediately after we release the lock. `await` it after
521 // the lock guard is out of scope.
522 self.0.shared.closed.notified()
523 }
524 .await;
525 self.0
526 .lock_without_waking("closed")
527 .error
528 .as_ref()
529 .expect("closed without an error")
530 .clone()
531 }
532
533 /// Wait for the connection to be closed without keeping a strong reference to the connection
534 ///
535 /// Returns a future that resolves, once the connection is closed, to a [`Closed`] struct
536 /// describing the close reason and final connection and per-path statistics.
537 ///
538 /// Calling [`Self::closed`] keeps the connection alive until it is either closed locally via [`Connection::close`]
539 /// or closed by the remote peer. This function instead does not keep the connection itself alive,
540 /// so if all *other* clones of the connection are dropped, the connection will be closed implicitly even
541 /// if there are futures returned from this function still being awaited.
542 pub fn on_closed(&self) -> OnClosed {
543 let (tx, rx) = oneshot::channel();
544 let mut state = self.0.lock_without_waking("on_closed");
545 if let Some(reason) = state.error.clone() {
546 // Connection already closed, send immediately
547 let _ = tx.send(Closed::new(&mut state, reason));
548 } else {
549 state.on_closed.push(tx);
550 }
551 drop(state);
552 OnClosed {
553 conn: self.weak_handle(),
554 rx,
555 }
556 }
557
558 /// Whether the connection is closed, and why.
559 ///
560 /// The close_reason is always set to `Some(ConnectionError)` when a socket is
561 /// closed; whether it was closed manually by calling [`Connection::close()`] or due to
562 /// an internal error (such as an idle timeout or the peer closing the
563 /// connection).
564 ///
565 /// Note: when the connection is closed, `connection.close_reason().is_some()` will always be true.
566 pub fn close_reason(&self) -> Option<ConnectionError> {
567 self.0.lock_without_waking("close_reason").error.clone()
568 }
569
570 /// Close the connection immediately.
571 ///
572 /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
573 /// more data is sent to the peer and the peer may drop buffered data upon receiving
574 /// the CONNECTION_CLOSE frame.
575 ///
576 /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
577 ///
578 /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
579 /// is preserved in full, it should be kept under 1KiB.
580 ///
581 /// # Gracefully closing a connection
582 ///
583 /// Only the peer last receiving application data can be certain that all data is
584 /// delivered. The only reliable action it can then take is to close the connection,
585 /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
586 /// frame is very likely if both endpoints stay online long enough, and
587 /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
588 /// remote peer will time out the connection, provided that the idle timeout is not
589 /// disabled.
590 ///
591 /// The sending side can not guarantee all stream data is delivered to the remote
592 /// application. It only knows the data is delivered to the QUIC stack of the remote
593 /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
594 /// [`close()`] the remote endpoint may drop any data it received but is as yet
595 /// undelivered to the application, including data that was acknowledged as received to
596 /// the local endpoint.
597 ///
598 /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
599 /// [`Endpoint::wait_idle()`]: crate::Endpoint::wait_idle
600 /// [`close()`]: Connection::close
601 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
602 let conn = &mut *self.0.lock_without_waking("close"); // conn.close self-wakes
603 conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
604 }
605
606 /// Wait for the handshake to be confirmed.
607 ///
608 /// As a server, who must be authenticated by clients,
609 /// this happens when the handshake completes
610 /// upon receiving a TLS Finished message from the client.
611 /// In return, the server send a HANDSHAKE_DONE frame.
612 ///
613 /// As a client, this happens when receiving a HANDSHAKE_DONE frame.
614 /// At this point, the server has either accepted our authentication,
615 /// or, if client authentication is not required, accepted our lack of authentication.
616 pub async fn handshake_confirmed(&self) -> Result<(), ConnectionError> {
617 {
618 let conn = self.0.lock_without_waking("handshake_confirmed");
619 if let Some(error) = conn.error.as_ref() {
620 return Err(error.clone());
621 }
622 if conn.handshake_confirmed {
623 return Ok(());
624 }
625 // Construct the future while the lock is held to ensure we can't miss a wakeup if
626 // the `Notify` is signaled immediately after we release the lock. `await` it after
627 // the lock guard is out of scope.
628 self.0.shared.handshake_confirmed.notified()
629 }
630 .await;
631 if let Some(error) = self
632 .0
633 .lock_without_waking("handshake_confirmed")
634 .error
635 .as_ref()
636 {
637 Err(error.clone())
638 } else {
639 Ok(())
640 }
641 }
642
643 /// Transmit `data` as an unreliable, unordered application datagram
644 ///
645 /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
646 /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
647 /// dictated by the peer.
648 ///
649 /// Previously queued datagrams which are still unsent may be discarded to make space for this
650 /// datagram, in order of oldest to newest.
651 pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
652 let conn = &mut *self.0.lock_and_wake("send_datagram");
653 if let Some(ref x) = conn.error {
654 return Err(SendDatagramError::ConnectionLost(x.clone()));
655 }
656 use proto::SendDatagramError::*;
657 match conn.inner.datagrams().send(data, true) {
658 Ok(()) => Ok(()),
659 Err(e) => Err(match e {
660 Blocked(..) => unreachable!(),
661 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
662 Disabled => SendDatagramError::Disabled,
663 TooLarge => SendDatagramError::TooLarge,
664 }),
665 }
666 }
667
668 /// Transmit `data` as an unreliable, unordered application datagram
669 ///
670 /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
671 /// conditions, which effectively prioritizes old datagrams over new datagrams.
672 ///
673 /// See [`send_datagram()`] for details.
674 ///
675 /// [`send_datagram()`]: Connection::send_datagram
676 pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
677 SendDatagram {
678 conn: &self.0,
679 data: Some(data),
680 notify: self.0.shared.datagrams_unblocked.notified(),
681 }
682 }
683
684 /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
685 ///
686 /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
687 ///
688 /// This may change over the lifetime of a connection according to variation in the path MTU
689 /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
690 /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
691 ///
692 /// Not necessarily the maximum size of received datagrams.
693 ///
694 /// [`send_datagram()`]: Connection::send_datagram
695 pub fn max_datagram_size(&self) -> Option<usize> {
696 self.0
697 .lock_without_waking("max_datagram_size")
698 .inner
699 .datagrams()
700 .max_size()
701 }
702
703 /// Bytes available in the outgoing datagram buffer
704 ///
705 /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
706 /// at most this size is guaranteed not to cause older datagrams to be dropped.
707 pub fn datagram_send_buffer_space(&self) -> usize {
708 self.0
709 .lock_without_waking("datagram_send_buffer_space")
710 .inner
711 .datagrams()
712 .send_buffer_space()
713 }
714
715 /// The side of the connection (client or server)
716 pub fn side(&self) -> Side {
717 self.0.lock_without_waking("side").inner.side()
718 }
719
720 /// Current best estimate of this connection's latency (round-trip-time)
721 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
722 self.0.lock_without_waking("rtt").inner.rtt(path_id)
723 }
724
725 /// Returns connection statistics
726 pub fn stats(&self) -> ConnectionStats {
727 self.0.lock_without_waking("stats").inner.stats()
728 }
729
730 /// Returns path statistics
731 pub fn path_stats(&self, path_id: PathId) -> Option<PathStats> {
732 self.0.lock_without_waking("path_stats").path_stats(path_id)
733 }
734
735 /// Current state of the congestion control algorithm, for debugging purposes
736 pub fn congestion_state(&self, path_id: PathId) -> Option<Box<dyn Controller>> {
737 self.0
738 .lock_without_waking("congestion_state")
739 .inner
740 .congestion_state(path_id)
741 .map(|c| c.clone_box())
742 }
743
744 /// Parameters negotiated during the handshake
745 ///
746 /// Guaranteed to return `Some` on fully established connections or after
747 /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for details on
748 /// the returned value.
749 ///
750 /// [`Connection::handshake_data()`]: crate::Connecting::handshake_data
751 pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
752 self.0
753 .lock_without_waking("handshake_data")
754 .inner
755 .crypto_session()
756 .handshake_data()
757 }
758
759 /// Cryptographic identity of the peer
760 ///
761 /// The dynamic type returned is determined by the configured
762 /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
763 /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
764 pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
765 self.0
766 .lock_without_waking("peer_identity")
767 .inner
768 .crypto_session()
769 .peer_identity()
770 }
771
772 /// A stable identifier for this connection
773 ///
774 /// Peer addresses and connection IDs can change, but this value will remain
775 /// fixed for the lifetime of the connection.
776 pub fn stable_id(&self) -> usize {
777 self.0.stable_id()
778 }
779
780 /// Update traffic keys spontaneously
781 ///
782 /// This primarily exists for testing purposes.
783 pub fn force_key_update(&self) {
784 self.0
785 .lock_and_wake("force_key_update")
786 .inner
787 .force_key_update()
788 }
789
790 /// Derive keying material from this connection's TLS session secrets.
791 ///
792 /// When both peers call this method with the same `label` and `context`
793 /// arguments and `output` buffers of equal length, they will get the
794 /// same sequence of bytes in `output`. These bytes are cryptographically
795 /// strong and pseudorandom, and are suitable for use as keying material.
796 ///
797 /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
798 pub fn export_keying_material(
799 &self,
800 output: &mut [u8],
801 label: &[u8],
802 context: &[u8],
803 ) -> Result<(), proto::crypto::ExportKeyingMaterialError> {
804 self.0
805 .lock_without_waking("export_keying_material")
806 .inner
807 .crypto_session()
808 .export_keying_material(output, label, context)
809 }
810
811 /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
812 ///
813 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
814 /// `count`s increase both minimum and worst-case memory consumption.
815 pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
816 let mut conn = self.0.lock_and_wake("set_max_concurrent_uni_streams");
817 conn.inner.set_max_concurrent_streams(Dir::Uni, count);
818 }
819
820 /// See [`proto::TransportConfig::send_window()`]
821 pub fn set_send_window(&self, send_window: u64) {
822 let mut conn = self.0.lock_and_wake("set_send_window");
823 conn.inner.set_send_window(send_window);
824 }
825
826 /// See [`proto::TransportConfig::receive_window()`]
827 pub fn set_receive_window(&self, receive_window: VarInt) {
828 let mut conn = self.0.lock_and_wake("set_receive_window");
829 conn.inner.set_receive_window(receive_window);
830 }
831
832 /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
833 ///
834 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
835 /// `count`s increase both minimum and worst-case memory consumption.
836 pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
837 let mut conn = self.0.lock_and_wake("set_max_concurrent_bi_streams");
838 conn.inner.set_max_concurrent_streams(Dir::Bi, count);
839 }
840
841 /// Track changes on our external address as reported by the peer.
842 pub fn observed_external_addr(&self) -> crate::ObservedExternalAddr {
843 let conn = self.0.lock_without_waking("external_addr");
844 crate::ObservedExternalAddr::new(conn.observed_external_addr.subscribe())
845 }
846
847 /// Is multipath enabled?
848 // TODO(flub): not a useful API, once we do real things with multipath we can remove
849 // this again.
850 pub fn is_multipath_enabled(&self) -> bool {
851 let conn = self.0.lock_without_waking("is_multipath_enabled");
852 conn.inner.is_multipath_negotiated()
853 }
854
855 /// Registers one address at which this endpoint might be reachable
856 ///
857 /// When the NAT traversal extension is negotiated, servers send these addresses to clients in
858 /// `ADD_ADDRESS` frames. This allows clients to obtain server address candidates to initiate
859 /// NAT traversal attempts. Clients provide their own reachable addresses in `REACH_OUT` frames
860 /// when [`Self::initiate_nat_traversal_round`] is called.
861 pub fn add_nat_traversal_address(
862 &self,
863 address: SocketAddr,
864 ) -> Result<(), n0_nat_traversal::Error> {
865 let mut conn = self.0.lock_and_wake("add_nat_traversal_addresses");
866 conn.inner.add_nat_traversal_address(address)
867 }
868
869 /// Removes one or more addresses from the set of addresses at which this endpoint is reachable
870 ///
871 /// When the NAT traversal extension is negotiated, servers send address removals to
872 /// clients in `REMOVE_ADDRESS` frames. This allows clients to stop using outdated
873 /// server address candidates that are no longer valid for NAT traversal.
874 ///
875 /// For clients, removed addresses will no longer be advertised in `REACH_OUT` frames.
876 ///
877 /// Addresses not present in the set will be silently ignored.
878 pub fn remove_nat_traversal_address(
879 &self,
880 address: SocketAddr,
881 ) -> Result<(), n0_nat_traversal::Error> {
882 let mut conn = self.0.lock_and_wake("remove_nat_traversal_addresses");
883 conn.inner.remove_nat_traversal_address(address)
884 }
885
886 /// Get the current local nat traversal addresses
887 pub fn get_local_nat_traversal_addresses(
888 &self,
889 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
890 let conn = self
891 .0
892 .lock_without_waking("get_local_nat_traversal_addresses");
893 conn.inner.get_local_nat_traversal_addresses()
894 }
895
896 /// Get the currently advertised nat traversal addresses by the server
897 pub fn get_remote_nat_traversal_addresses(
898 &self,
899 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
900 let conn = self
901 .0
902 .lock_without_waking("get_remote_nat_traversal_addresses");
903 conn.inner.get_remote_nat_traversal_addresses()
904 }
905
906 /// Initiates a new nat traversal round
907 ///
908 /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
909 /// frames, and initiating probing of the known remote addresses. When a new round is
910 /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
911 ///
912 /// Returns the server addresses that are now being probed.
913 pub fn initiate_nat_traversal_round(&self) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
914 let mut conn = self.0.lock_and_wake("initiate_nat_traversal_round");
915 let now = conn.runtime.now();
916 conn.inner.initiate_nat_traversal_round(now)
917 }
918}
919
920/// Normalizes a [`FourTuple`] against the connection's address family.
921///
922/// If the connection already uses IPv6 paths, the remote is canonicalised via
923/// [`ensure_ipv6`]. If it uses IPv4 and the requested remote is IPv6, this returns
924/// [`PathError::InvalidRemoteAddress`].
925fn normalize_network_path(
926 network_path: FourTuple,
927 conn: &proto::Connection,
928) -> Result<FourTuple, PathError> {
929 // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6.
930 // If not, we do not support IPv6. We can not access endpoint::State from here
931 // however, but either all our paths use an IPv6 address, or all our paths use an
932 // IPv4 address. So we can use that information.
933 let ipv6 = conn
934 .paths()
935 .iter()
936 .filter_map(|id| {
937 conn.network_path(*id)
938 .map(|addrs| addrs.remote().is_ipv6())
939 .ok()
940 })
941 .next()
942 .unwrap_or_default();
943 let remote = network_path.remote();
944 if remote.is_ipv6() && !ipv6 {
945 Err(PathError::InvalidRemoteAddress(remote))
946 } else if ipv6 {
947 let remote = SocketAddr::V6(ensure_ipv6(remote));
948 Ok(FourTuple::new(remote, network_path.local_ip()))
949 } else {
950 Ok(network_path)
951 }
952}
953
954pin_project! {
955 /// Future produced by [`Connection::open_uni`]
956 pub struct OpenUni<'a> {
957 conn: &'a ConnectionRef,
958 #[pin]
959 notify: Notified<'a>,
960 }
961}
962
963impl Future for OpenUni<'_> {
964 type Output = Result<SendStream, ConnectionError>;
965 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
966 let this = self.project();
967 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
968 Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
969 }
970}
971
972pin_project! {
973 /// Future produced by [`Connection::open_bi`]
974 pub struct OpenBi<'a> {
975 conn: &'a ConnectionRef,
976 #[pin]
977 notify: Notified<'a>,
978 }
979}
980
981impl Future for OpenBi<'_> {
982 type Output = Result<(SendStream, RecvStream), ConnectionError>;
983 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
984 let this = self.project();
985 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
986
987 Poll::Ready(Ok((
988 SendStream::new(conn.clone(), id, is_0rtt),
989 RecvStream::new(conn, id, is_0rtt),
990 )))
991 }
992}
993
994fn poll_open<'a>(
995 ctx: &mut Context<'_>,
996 conn: &'a ConnectionRef,
997 mut notify: Pin<&mut Notified<'a>>,
998 dir: Dir,
999) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1000 let mut state = conn.lock_without_waking("poll_open");
1001 if let Some(ref e) = state.error {
1002 return Poll::Ready(Err(e.clone()));
1003 } else if let Some(id) = state.inner.streams().open(dir) {
1004 let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
1005 drop(state); // Release the lock so clone can take it
1006 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1007 }
1008 loop {
1009 match notify.as_mut().poll(ctx) {
1010 // `state` lock ensures we didn't race with readiness
1011 Poll::Pending => return Poll::Pending,
1012 // Spurious wakeup, get a new future
1013 Poll::Ready(()) => {
1014 notify.set(conn.shared.stream_budget_available[dir as usize].notified())
1015 }
1016 }
1017 }
1018}
1019
1020pin_project! {
1021 /// Future produced by [`Connection::accept_uni`]
1022 pub struct AcceptUni<'a> {
1023 conn: &'a ConnectionRef,
1024 #[pin]
1025 notify: Notified<'a>,
1026 }
1027}
1028
1029impl Future for AcceptUni<'_> {
1030 type Output = Result<RecvStream, ConnectionError>;
1031
1032 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1033 let this = self.project();
1034 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
1035 Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
1036 }
1037}
1038
1039pin_project! {
1040 /// Future produced by [`Connection::accept_bi`]
1041 pub struct AcceptBi<'a> {
1042 conn: &'a ConnectionRef,
1043 #[pin]
1044 notify: Notified<'a>,
1045 }
1046}
1047
1048impl Future for AcceptBi<'_> {
1049 type Output = Result<(SendStream, RecvStream), ConnectionError>;
1050
1051 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1052 let this = self.project();
1053 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
1054 Poll::Ready(Ok((
1055 SendStream::new(conn.clone(), id, is_0rtt),
1056 RecvStream::new(conn, id, is_0rtt),
1057 )))
1058 }
1059}
1060
1061fn poll_accept<'a>(
1062 ctx: &mut Context<'_>,
1063 conn: &'a ConnectionRef,
1064 mut notify: Pin<&mut Notified<'a>>,
1065 dir: Dir,
1066) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1067 let mut state = conn.lock_and_wake("poll_accept");
1068 // Check for incoming streams before checking `state.error` so that already-received streams,
1069 // which are necessarily finite, can be drained from a closed connection.
1070 if let Some(id) = state.inner.streams().accept(dir) {
1071 let is_0rtt = state.inner.is_handshaking();
1072 drop(state); // Release the lock (wake on drop) so clone can take it
1073 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1074 } else if let Some(ref e) = state.error {
1075 return Poll::Ready(Err(e.clone()));
1076 }
1077 loop {
1078 match notify.as_mut().poll(ctx) {
1079 // `state` lock ensures we didn't race with readiness
1080 Poll::Pending => return Poll::Pending,
1081 // Spurious wakeup, get a new future
1082 Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
1083 }
1084 }
1085}
1086
1087pin_project! {
1088 /// Future produced by [`Connection::read_datagram`]
1089 pub struct ReadDatagram<'a> {
1090 conn: &'a ConnectionRef,
1091 #[pin]
1092 notify: Notified<'a>,
1093 }
1094}
1095
1096impl Future for ReadDatagram<'_> {
1097 type Output = Result<Bytes, ConnectionError>;
1098 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1099 let mut this = self.project();
1100 let mut state = this.conn.lock_without_waking("ReadDatagram::poll");
1101 // Check for buffered datagrams before checking `state.error` so that already-received
1102 // datagrams, which are necessarily finite, can be drained from a closed connection.
1103 if let Some(x) = state.inner.datagrams().recv() {
1104 return Poll::Ready(Ok(x));
1105 } else if let Some(ref e) = state.error {
1106 return Poll::Ready(Err(e.clone()));
1107 }
1108 loop {
1109 match this.notify.as_mut().poll(ctx) {
1110 // `state` lock ensures we didn't race with readiness
1111 Poll::Pending => return Poll::Pending,
1112 // Spurious wakeup, get a new future
1113 Poll::Ready(()) => this
1114 .notify
1115 .set(this.conn.shared.datagram_received.notified()),
1116 }
1117 }
1118 }
1119}
1120
1121pin_project! {
1122 /// Future produced by [`Connection::send_datagram_wait`]
1123 pub struct SendDatagram<'a> {
1124 conn: &'a ConnectionRef,
1125 data: Option<Bytes>,
1126 #[pin]
1127 notify: Notified<'a>,
1128 }
1129}
1130
1131impl Future for SendDatagram<'_> {
1132 type Output = Result<(), SendDatagramError>;
1133 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1134 let mut this = self.project();
1135 let mut state = this.conn.lock_and_wake("SendDatagram::poll");
1136 if let Some(ref e) = state.error {
1137 return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
1138 }
1139 use proto::SendDatagramError::*;
1140 match state
1141 .inner
1142 .datagrams()
1143 .send(this.data.take().unwrap(), false)
1144 {
1145 Ok(()) => Poll::Ready(Ok(())),
1146 Err(e) => Poll::Ready(Err(match e {
1147 Blocked(data) => {
1148 this.data.replace(data);
1149 loop {
1150 match this.notify.as_mut().poll(ctx) {
1151 Poll::Pending => return Poll::Pending,
1152 // Spurious wakeup, get a new future
1153 Poll::Ready(()) => this
1154 .notify
1155 .set(this.conn.shared.datagrams_unblocked.notified()),
1156 }
1157 }
1158 }
1159 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1160 Disabled => SendDatagramError::Disabled,
1161 TooLarge => SendDatagramError::TooLarge,
1162 })),
1163 }
1164 }
1165}
1166
1167/// State of a [`Connection`] at the moment it was closed.
1168///
1169/// Returned by the [`OnClosed`] future from [`Connection::on_closed`].
1170#[derive(Debug, Clone)]
1171#[non_exhaustive]
1172pub struct Closed {
1173 /// The reason the connection was closed.
1174 pub reason: ConnectionError,
1175 /// Aggregate connection statistics at the moment of close.
1176 pub stats: ConnectionStats,
1177 /// Per-path statistics for every path the connection knew about at close time.
1178 ///
1179 /// This includes paths that haven't been discarded at close time, plus any
1180 /// already-discarded paths whose final stats had been retained because a [`Path`]
1181 /// or [`WeakPathHandle`] handle was kept alive.
1182 ///
1183 /// [`WeakPathHandle`]: crate::WeakPathHandle
1184 pub path_stats: Vec<(PathId, PathStats)>,
1185}
1186
1187impl Closed {
1188 /// Snapshot the current connection state into a [`Closed`] value.
1189 ///
1190 /// Must only be called once `state.error` has been set.
1191 pub(crate) fn new(state: &mut State, reason: ConnectionError) -> Self {
1192 let stats = state.inner.stats();
1193
1194 let non_discarded_paths = state.inner.paths();
1195 let mut path_stats =
1196 Vec::with_capacity(non_discarded_paths.len() + state.final_path_stats.len());
1197
1198 // Non-discarded paths are tracked by proto::Connection.
1199 path_stats.extend(
1200 non_discarded_paths
1201 .into_iter()
1202 .filter_map(|id| state.inner.path_stats(id).map(|stats| (id, stats))),
1203 );
1204 // Already-discarded paths whose final stats we kept around.
1205 path_stats.extend(
1206 state
1207 .final_path_stats
1208 .iter()
1209 .map(|(id, stats)| (*id, *stats)),
1210 );
1211 Self {
1212 reason,
1213 stats,
1214 path_stats,
1215 }
1216 }
1217}
1218
1219/// Future returned by [`Connection::on_closed`]
1220///
1221/// Resolves to [`Closed`].
1222pub struct OnClosed {
1223 rx: oneshot::Receiver<Closed>,
1224 conn: WeakConnectionHandle,
1225}
1226
1227impl Drop for OnClosed {
1228 fn drop(&mut self) {
1229 if self.rx.is_terminated() {
1230 return;
1231 };
1232 if let Some(conn) = self.conn.upgrade() {
1233 self.rx.close();
1234 conn.0
1235 .lock_without_waking("OnClosed::drop")
1236 .on_closed
1237 .retain(|tx| !tx.is_closed());
1238 }
1239 }
1240}
1241
1242impl Future for OnClosed {
1243 type Output = Closed;
1244
1245 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1246 let this = self.get_mut();
1247 // The `expect` is safe because `State::drop` ensures that all senders are triggered
1248 // before being dropped.
1249 Pin::new(&mut this.rx)
1250 .poll(cx)
1251 .map(|x| x.expect("on_close sender is never dropped before sending"))
1252 }
1253}
1254
1255#[derive(Debug)]
1256#[allow(clippy::redundant_allocation)]
1257pub(crate) struct ConnectionRef(Arc<Arc<ConnectionInner>>);
1258
1259impl ConnectionRef {
1260 #[allow(clippy::redundant_allocation)]
1261 fn from_arc(inner: Arc<Arc<ConnectionInner>>) -> Self {
1262 inner.shared.ref_count.fetch_add(1, Ordering::Relaxed);
1263 Self(inner)
1264 }
1265
1266 pub(crate) fn stable_id(&self) -> usize {
1267 &*self.0 as *const _ as usize
1268 }
1269
1270 pub(crate) fn weak_handle(&self) -> WeakConnectionHandle {
1271 WeakConnectionHandle(Arc::downgrade(&self.0))
1272 }
1273}
1274
1275impl Clone for ConnectionRef {
1276 fn clone(&self) -> Self {
1277 Self::from_arc(Arc::clone(&self.0))
1278 }
1279}
1280
1281impl Drop for ConnectionRef {
1282 fn drop(&mut self) {
1283 if self.shared.ref_count.fetch_sub(1, Ordering::Relaxed) > 1 {
1284 return;
1285 }
1286
1287 let conn = &mut *self.lock_without_waking("drop");
1288
1289 if !conn.inner.is_closed() {
1290 // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
1291 // not, we can't do any harm. If there were any streams being opened, then either
1292 // the connection will be closed for an unrelated reason or a fresh reference will
1293 // be constructed for the newly opened stream.
1294 conn.implicit_close(&self.shared);
1295 }
1296 }
1297}
1298
1299impl std::ops::Deref for ConnectionRef {
1300 type Target = ConnectionInner;
1301 fn deref(&self) -> &Self::Target {
1302 &self.0
1303 }
1304}
1305
1306#[derive(Debug)]
1307pub(crate) struct ConnectionInner {
1308 /// Kept private intentionally, use [`Self::lock_and_wake`].
1309 state: Mutex<State>,
1310 pub(crate) shared: Shared,
1311}
1312
1313impl ConnectionInner {
1314 /// Lock the state and return a guard that wakes the connection driver on drop.
1315 ///
1316 /// Use this for operations that may queue frames. The wake ensures the driver sends queued frames.
1317 /// If that's not needed, use [`Self::lock_without_waking`].
1318 pub(crate) fn lock_and_wake(&self, purpose: &'static str) -> WakeGuard<'_> {
1319 WakeGuard {
1320 guard: self.state.lock(purpose),
1321 wake: true,
1322 }
1323 }
1324
1325 /// Lock the state and return a guard that unlocks once dropped.
1326 ///
1327 /// Use this for operations that don't require any action from the connection driver.
1328 /// Otherwise, use [`Self::lock_and_wake`] instead.
1329 pub(crate) fn lock_without_waking(&self, purpose: &'static str) -> WakeGuard<'_> {
1330 WakeGuard {
1331 guard: self.state.lock(purpose),
1332 wake: false,
1333 }
1334 }
1335}
1336
1337/// [`MutexGuard`] wrapper that calls [`State::wake`] on drop.
1338#[derive(derive_more::Deref, derive_more::DerefMut)]
1339pub(crate) struct WakeGuard<'a> {
1340 #[deref]
1341 #[deref_mut]
1342 guard: MutexGuard<'a, State>,
1343 wake: bool,
1344}
1345
1346impl WakeGuard<'_> {
1347 pub(crate) fn skip_waking(&mut self) {
1348 self.wake = false;
1349 }
1350}
1351
1352impl Drop for WakeGuard<'_> {
1353 fn drop(&mut self) {
1354 if self.wake {
1355 self.guard.wake();
1356 }
1357 }
1358}
1359
1360/// A handle to some connection internals, use with care.
1361///
1362/// This contains a weak reference to the connection so will not itself keep the connection
1363/// alive.
1364#[derive(Debug, Clone)]
1365pub struct WeakConnectionHandle(Weak<Arc<ConnectionInner>>);
1366
1367impl WeakConnectionHandle {
1368 /// Returns `true` if the [`Connection`] associated with this handle is still alive.
1369 pub fn is_alive(&self) -> bool {
1370 self.0.upgrade().is_some()
1371 }
1372
1373 /// Upgrade the handle to a full `Connection`
1374 pub fn upgrade(&self) -> Option<Connection> {
1375 self.upgrade_to_ref().map(Connection)
1376 }
1377
1378 pub(crate) fn upgrade_to_ref(&self) -> Option<ConnectionRef> {
1379 self.0.upgrade().map(ConnectionRef::from_arc)
1380 }
1381
1382 /// Returns `true` if the two [`WeakConnectionHandle`] point at the same connection.
1383 pub fn is_same_connection(&self, other: &Self) -> bool {
1384 self.0.ptr_eq(&other.0)
1385 }
1386}
1387
1388#[derive(Debug, Default)]
1389pub(crate) struct Shared {
1390 handshake_confirmed: Notify,
1391 /// Notified when new streams may be locally initiated due to an increase in stream ID flow
1392 /// control budget
1393 stream_budget_available: [Notify; 2],
1394 /// Notified when the peer has initiated a new stream
1395 stream_incoming: [Notify; 2],
1396 datagram_received: Notify,
1397 datagrams_unblocked: Notify,
1398 closed: Notify,
1399 /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
1400 ref_count: AtomicUsize,
1401}
1402
1403pub(crate) struct State {
1404 pub(crate) inner: proto::Connection,
1405 driver: Option<Waker>,
1406 handle: ConnectionHandle,
1407 on_handshake_data: Option<oneshot::Sender<()>>,
1408 on_connected: Option<oneshot::Sender<bool>>,
1409 connected: bool,
1410 handshake_confirmed: bool,
1411 timer: Option<Pin<Box<dyn AsyncTimer>>>,
1412 timer_deadline: Option<Instant>,
1413 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1414 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1415 pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
1416 pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
1417 pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
1418 /// Always set to Some before the connection becomes drained
1419 pub(crate) error: Option<ConnectionError>,
1420 /// Tracks paths being opened
1421 open_path: FxHashMap<PathId, watch::Sender<Result<(), PathError>>>,
1422 /// Tracks reference counts for paths.
1423 ///
1424 /// I.e. how many [`Path`] and [`WeakPathHandle`] structs are alive for a path.
1425 /// Each entry's [`PathRefOwner`] holds an [`AtomicUsize`] so that cloning or
1426 /// dropping a [`PathRef`] (held by [`Path`] or [`WeakPathHandle`]) does not need
1427 /// to lock this state.
1428 ///
1429 /// [`WeakPathHandle`]: crate::path::WeakPathHandle
1430 pub(crate) path_refs: FxHashMap<PathId, PathRefOwner>,
1431 /// Final path stats for discarded paths.
1432 ///
1433 /// We only insert entries if the discarded path has a non-zero reference count in [`Self::path_refs`].
1434 /// When the last reference to a path is dropped its entry is removed from both maps.
1435 pub(crate) final_path_stats: FxHashMap<PathId, PathStats>,
1436 pub(crate) path_events: tokio::sync::broadcast::Sender<PathEvent>,
1437 sender: Pin<Box<dyn UdpSender>>,
1438 pub(crate) runtime: Arc<dyn Runtime>,
1439 send_buffer: Vec<u8>,
1440 /// We buffer a transmit when the underlying I/O would block
1441 buffered_transmit: Option<proto::Transmit>,
1442 /// Our last external address reported by the peer. When multipath is enabled, this will be the
1443 /// last report across all paths.
1444 pub(crate) observed_external_addr: watch::Sender<Option<SocketAddr>>,
1445 pub(crate) nat_traversal_updates: tokio::sync::broadcast::Sender<n0_nat_traversal::Event>,
1446 on_closed: Vec<oneshot::Sender<Closed>>,
1447}
1448
1449impl State {
1450 #[allow(clippy::too_many_arguments)]
1451 fn new(
1452 inner: proto::Connection,
1453 handle: ConnectionHandle,
1454 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1455 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1456 on_handshake_data: oneshot::Sender<()>,
1457 on_connected: oneshot::Sender<bool>,
1458 sender: Pin<Box<dyn UdpSender>>,
1459 runtime: Arc<dyn Runtime>,
1460 ) -> Self {
1461 Self {
1462 inner,
1463 driver: None,
1464 handle,
1465 on_handshake_data: Some(on_handshake_data),
1466 on_connected: Some(on_connected),
1467 connected: false,
1468 handshake_confirmed: false,
1469 timer: None,
1470 timer_deadline: None,
1471 conn_events,
1472 endpoint_events,
1473 blocked_writers: FxHashMap::default(),
1474 blocked_readers: FxHashMap::default(),
1475 stopped: FxHashMap::default(),
1476 open_path: FxHashMap::default(),
1477 error: None,
1478 sender,
1479 runtime,
1480 send_buffer: Vec::new(),
1481 buffered_transmit: None,
1482 path_events: tokio::sync::broadcast::channel(32).0,
1483 observed_external_addr: watch::Sender::new(None),
1484 nat_traversal_updates: tokio::sync::broadcast::channel(32).0,
1485 on_closed: Vec::new(),
1486 final_path_stats: Default::default(),
1487 path_refs: Default::default(),
1488 }
1489 }
1490
1491 fn drive_transmit(&mut self, cx: &mut Context<'_>) -> io::Result<bool> {
1492 let now = self.runtime.now();
1493 let mut transmits = 0;
1494
1495 let max_datagrams = self
1496 .sender
1497 .max_transmit_segments()
1498 .min(MAX_TRANSMIT_SEGMENTS);
1499
1500 loop {
1501 // Retry the last transmit, or get a new one.
1502 let t = match self.buffered_transmit.take() {
1503 Some(t) => t,
1504 None => {
1505 self.send_buffer.clear();
1506 match self
1507 .inner
1508 .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1509 {
1510 Some(t) => {
1511 transmits += match t.segment_size {
1512 None => 1,
1513 Some(s) => t.size.div_ceil(s), // round up
1514 };
1515 t
1516 }
1517 None => break,
1518 }
1519 }
1520 };
1521
1522 let len = t.size;
1523 match self
1524 .sender
1525 .as_mut()
1526 .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx)
1527 {
1528 Poll::Pending => {
1529 self.buffered_transmit = Some(t);
1530 return Ok(false);
1531 }
1532 Poll::Ready(Err(e)) => return Err(e),
1533 Poll::Ready(Ok(())) => {}
1534 }
1535
1536 if transmits >= MAX_TRANSMIT_DATAGRAMS {
1537 // TODO: What isn't ideal here yet is that if we don't poll all
1538 // datagrams that could be sent we don't go into the `app_limited`
1539 // state and CWND continues to grow until we get here the next time.
1540 // See https://github.com/quinn-rs/quinn/issues/1126
1541 return Ok(true);
1542 }
1543 }
1544
1545 Ok(false)
1546 }
1547
1548 fn forward_endpoint_events(&mut self) {
1549 while let Some(event) = self.inner.poll_endpoint_events() {
1550 // If the endpoint driver is gone, noop.
1551 let _ = self.endpoint_events.send((self.handle, event));
1552 }
1553 }
1554
1555 /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1556 fn process_conn_events(
1557 &mut self,
1558 shared: &Shared,
1559 cx: &mut Context<'_>,
1560 ) -> Result<(), ConnectionError> {
1561 loop {
1562 match self.conn_events.poll_recv(cx) {
1563 Poll::Ready(Some(ConnectionEvent::Rebind(sender))) => {
1564 self.sender = sender;
1565 self.inner.handle_network_change(None, self.runtime.now());
1566 }
1567 Poll::Ready(Some(ConnectionEvent::LocalAddressChanged(hint))) => {
1568 self.inner
1569 .handle_network_change(hint.as_deref().map(|x| x as _), self.runtime.now());
1570 }
1571 Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1572 self.inner.handle_event(event);
1573 }
1574 Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1575 self.close(error_code, reason, shared);
1576 }
1577 Poll::Ready(None) => {
1578 return Err(ConnectionError::TransportError(TransportError::new(
1579 TransportErrorCode::INTERNAL_ERROR,
1580 "endpoint driver future was dropped".to_string(),
1581 )));
1582 }
1583 Poll::Pending => {
1584 return Ok(());
1585 }
1586 }
1587 }
1588 }
1589
1590 fn forward_app_events(&mut self, shared: &Shared) {
1591 while let Some(event) = self.inner.poll() {
1592 use proto::Event::*;
1593 match event {
1594 HandshakeDataReady => {
1595 if let Some(x) = self.on_handshake_data.take() {
1596 let _ = x.send(());
1597 }
1598 }
1599 Connected => {
1600 self.connected = true;
1601 if let Some(x) = self.on_connected.take() {
1602 // We don't care if the on-connected future was dropped
1603 let _ = x.send(self.inner.accepted_0rtt());
1604 }
1605 if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1606 // Wake up rejected 0-RTT streams so they can fail immediately with
1607 // `ZeroRttRejected` errors.
1608 wake_all(&mut self.blocked_writers);
1609 wake_all(&mut self.blocked_readers);
1610 wake_all_notify(&mut self.stopped);
1611 }
1612 }
1613 HandshakeConfirmed => {
1614 self.handshake_confirmed = true;
1615 shared.handshake_confirmed.notify_waiters();
1616 }
1617 ConnectionLost { reason } => {
1618 self.terminate(reason, shared);
1619 }
1620 Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1621 Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1622 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1623 }
1624 Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1625 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1626 }
1627 DatagramReceived => {
1628 shared.datagram_received.notify_waiters();
1629 }
1630 DatagramsUnblocked => {
1631 shared.datagrams_unblocked.notify_waiters();
1632 }
1633 Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1634 Stream(StreamEvent::Available { dir }) => {
1635 // Might mean any number of streams are ready, so we wake up everyone
1636 shared.stream_budget_available[dir as usize].notify_waiters();
1637 }
1638 Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1639 Stream(StreamEvent::Stopped { id, .. }) => {
1640 wake_stream_notify(id, &mut self.stopped);
1641 wake_stream(id, &mut self.blocked_writers);
1642 }
1643 Path(ref evt @ PathEvent::ObservedAddr { addr: observed, .. }) => {
1644 self.path_events.send(evt.clone()).ok();
1645 self.observed_external_addr.send_if_modified(|addr| {
1646 let old = addr.replace(observed);
1647 old != *addr
1648 });
1649 }
1650 Path(ref evt @ PathEvent::Established { id, .. }) => {
1651 self.path_events.send(evt.clone()).ok();
1652 if let Some(sender) = self.open_path.remove(&id) {
1653 sender.send_modify(|value| *value = Ok(()));
1654 }
1655 }
1656 Path(
1657 ref evt @ PathEvent::Discarded {
1658 id, ref path_stats, ..
1659 },
1660 ) => {
1661 if self.path_refs.contains_key(&id) {
1662 self.final_path_stats.insert(id, *path_stats.clone());
1663 }
1664 self.path_events.send(evt.clone()).ok();
1665 }
1666 Path(ref evt @ PathEvent::Abandoned { id, .. }) => {
1667 if let Some(sender) = self.open_path.remove(&id) {
1668 // We don't care for the reason why this path was closed here, because semantically
1669 // all close reasons for a path that has not yet been opened equals to `ValidationFailed`.
1670 // With the noq API, there is no way to application-close a not-yet-opened path, so
1671 // `ApplicationClosed` cannot occur. And all other variants will only occur for paths
1672 // that have already been opened.
1673 // The previous iteration of this code had another event `PathEvent::LocallyClosed` which
1674 // contained a `PathError`, but that was only ever set to `ValidationFailed`.
1675 let error = PathError::ValidationFailed;
1676 sender.send_modify(|value| *value = Err(error));
1677 }
1678 // this will happen also for already opened paths
1679 self.path_events.send(evt.clone()).ok();
1680 }
1681 Path(evt @ PathEvent::RemoteStatus { .. }) => {
1682 self.path_events.send(evt).ok();
1683 }
1684 NatTraversal(update) => {
1685 self.nat_traversal_updates.send(update).ok();
1686 }
1687 _ => {
1688 // PathEvent is #[non_exhaustive].
1689 // It's possible that noq is built against a newer noq-proto version.
1690 // In that case, we need to ignore path events we can't handle yet.
1691 // But for tests, we expect noq and noq-proto to be in sync, so we
1692 // should panic in case we don't actually handle new cases.
1693 #[cfg(test)]
1694 panic!("Unhandled PathEvent variant: {event:?}");
1695 }
1696 }
1697 }
1698 }
1699
1700 fn drive_timer(&mut self, cx: &mut Context<'_>) -> bool {
1701 // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1702 // timer is registered with the runtime (and check whether it's already
1703 // expired).
1704 match self.inner.poll_timeout() {
1705 Some(deadline) => {
1706 if let Some(delay) = &mut self.timer {
1707 // There is no need to reset the tokio timer if the deadline
1708 // did not change
1709 if self
1710 .timer_deadline
1711 .map(|current_deadline| current_deadline != deadline)
1712 .unwrap_or(true)
1713 {
1714 delay.as_mut().reset(deadline);
1715 }
1716 } else {
1717 self.timer = Some(self.runtime.new_timer(deadline));
1718 }
1719 // Store the actual expiration time of the timer
1720 self.timer_deadline = Some(deadline);
1721 }
1722 None => {
1723 self.timer_deadline = None;
1724 return false;
1725 }
1726 }
1727
1728 if self.timer_deadline.is_none() {
1729 return false;
1730 }
1731
1732 let delay = self
1733 .timer
1734 .as_mut()
1735 .expect("timer must exist in this state")
1736 .as_mut();
1737 if delay.poll(cx).is_pending() {
1738 // Since there wasn't a timeout event, there is nothing new
1739 // for the connection to do
1740 return false;
1741 }
1742
1743 // A timer expired, so the caller needs to check for
1744 // new transmits, which might cause new timers to be set.
1745 self.inner.handle_timeout(self.runtime.now());
1746 self.timer_deadline = None;
1747 true
1748 }
1749
1750 /// Wake up a blocked `Driver` task to process I/O
1751 pub(crate) fn wake(&mut self) {
1752 if let Some(x) = self.driver.take() {
1753 x.wake();
1754 }
1755 }
1756
1757 /// Used to wake up all blocked futures when the connection becomes closed for any reason
1758 fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1759 self.error = Some(reason.clone());
1760 if let Some(x) = self.on_handshake_data.take() {
1761 let _ = x.send(());
1762 }
1763 wake_all(&mut self.blocked_writers);
1764 wake_all(&mut self.blocked_readers);
1765 shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1766 shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1767 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1768 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1769 shared.datagram_received.notify_waiters();
1770 shared.datagrams_unblocked.notify_waiters();
1771 if let Some(x) = self.on_connected.take() {
1772 let _ = x.send(false);
1773 }
1774 shared.handshake_confirmed.notify_waiters();
1775 wake_all_notify(&mut self.stopped);
1776 shared.closed.notify_waiters();
1777
1778 // Send to the registered on_closed futures.
1779 if !self.on_closed.is_empty() {
1780 let closed = Closed::new(self, reason);
1781 for tx in self.on_closed.drain(..) {
1782 tx.send(closed.clone()).ok();
1783 }
1784 }
1785 }
1786
1787 fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1788 self.inner.close(self.runtime.now(), error_code, reason);
1789 self.terminate(ConnectionError::LocallyClosed, shared);
1790 self.wake();
1791 }
1792
1793 /// Close for a reason other than the application's explicit request
1794 pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1795 self.close(0u32.into(), Bytes::new(), shared);
1796 }
1797
1798 pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1799 if self.inner.is_handshaking()
1800 || self.inner.accepted_0rtt()
1801 || self.inner.side().is_server()
1802 {
1803 Ok(())
1804 } else {
1805 Err(())
1806 }
1807 }
1808
1809 /// Returns [`PathStats`] for a path, if available.
1810 ///
1811 /// This gets the stats from [`proto::Connection`]. If that returns `None`
1812 /// it gets them from `Self::final_path_stats` instead.
1813 pub(crate) fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
1814 self.inner
1815 .path_stats(path_id)
1816 .or_else(|| self.final_path_stats.get(&path_id).copied())
1817 }
1818
1819 /// Acquire a new [`PathRef`] for a path id, bumping its reference counter by 1.
1820 ///
1821 /// The returned [`PathRef`] is intended to be stored on a [`Path`] or [`WeakPathHandle`].
1822 /// Its reference count is automatically increased when cloned. When its owner is dropped,
1823 /// [`PathRef::on_drop`] must be called to decrement the refcount.
1824 ///
1825 /// [`WeakPathHandle`]: crate::path::WeakPathHandle
1826 pub(crate) fn acquire_path_ref(&mut self, path_id: PathId) -> PathRef {
1827 self.path_refs.entry(path_id).or_default().acquire(path_id)
1828 }
1829}
1830
1831impl Drop for State {
1832 fn drop(&mut self) {
1833 if !self.inner.is_drained() {
1834 // Ensure the endpoint can tidy up
1835 let _ = self
1836 .endpoint_events
1837 .send((self.handle, proto::EndpointEvent::drained()));
1838 }
1839
1840 if !self.on_closed.is_empty()
1841 && let Some(reason) = self.error.clone()
1842 {
1843 // Ensure that all on_closed oneshot senders are triggered before dropping.
1844 let closed = Closed::new(self, reason);
1845 for tx in self.on_closed.drain(..) {
1846 tx.send(closed.clone()).ok();
1847 }
1848 }
1849 }
1850}
1851
1852impl fmt::Debug for State {
1853 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1854 f.debug_struct("State").field("inner", &self.inner).finish()
1855 }
1856}
1857
1858fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1859 if let Some(waker) = wakers.remove(&stream_id) {
1860 waker.wake();
1861 }
1862}
1863
1864fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1865 wakers.drain().for_each(|(_, waker)| waker.wake())
1866}
1867
1868fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1869 if let Some(notify) = wakers.remove(&stream_id) {
1870 notify.notify_waiters()
1871 }
1872}
1873
1874fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1875 wakers
1876 .drain()
1877 .for_each(|(_, notify)| notify.notify_waiters())
1878}
1879
/// Errors that can arise when sending a datagram
///
/// The `Display` text of each variant is given by its `#[error(...)]` attribute.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost
    ///
    /// Carries the underlying [`ConnectionError`]; because of `#[from]`, a
    /// [`ConnectionError`] converts into this variant (for example via `?`).
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}
1899
/// The maximum number of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the CPU time consumed by datagram generation, and allows other tasks
/// (like receiving ACKs) to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1905
/// The maximum number of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise.
// `10` is non-zero, so `NonZeroUsize::new` yields `Some` and the `expect` cannot fail.
const MAX_TRANSMIT_SEGMENTS: NonZeroUsize = NonZeroUsize::new(10).expect("known");