// noq_proto/connection/mod.rs

1use std::{
2    cmp,
3    collections::{BTreeMap, VecDeque, btree_map},
4    convert::TryFrom,
5    fmt, io, mem,
6    net::{IpAddr, SocketAddr},
7    num::{NonZeroU32, NonZeroUsize},
8    sync::Arc,
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13
14use rand::{Rng, SeedableRng, rngs::StdRng};
15use rustc_hash::{FxHashMap, FxHashSet};
16use thiserror::Error;
17use tracing::{debug, error, trace, trace_span, warn};
18
19use crate::{
20    Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
21    MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
22    TransportError, TransportErrorCode, VarInt,
23    cid_generator::ConnectionIdGenerator,
24    cid_queue::CidQueue,
25    config::{ServerConfig, TransportConfig},
26    congestion::Controller,
27    connection::{
28        qlog::{QlogRecvPacket, QlogSink},
29        spaces::LostPacket,
30        timer::{ConnTimer, PathTimer},
31    },
32    crypto::{self, Keys},
33    frame::{
34        self, Close, DataBlocked, Datagram, FrameStruct, NewToken, ObservedAddr, StreamDataBlocked,
35        StreamsBlocked,
36    },
37    n0_nat_traversal,
38    packet::{
39        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40        PacketNumber, PartialDecode, SpaceId,
41    },
42    range_set::ArrayRangeSet,
43    shared::{
44        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45        EndpointEvent, EndpointEventInner,
46    },
47    token::{ResetToken, Token, TokenPayload},
48    transport_parameters::TransportParameters,
49};
50
51mod ack_frequency;
52use ack_frequency::AckFrequencyState;
53
54mod assembler;
55pub use assembler::Chunk;
56
57mod cid_state;
58use cid_state::CidState;
59
60mod datagrams;
61use datagrams::DatagramState;
62pub use datagrams::{Datagrams, SendDatagramError};
63
64mod mtud;
65mod pacing;
66
67mod packet_builder;
68use packet_builder::{PacketBuilder, PadDatagram};
69
70mod packet_crypto;
71use packet_crypto::CryptoState;
72pub(crate) use packet_crypto::EncryptionLevel;
73
74mod paths;
75pub use paths::{
76    ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError,
77};
78use paths::{PathData, PathState};
79
80pub(crate) mod qlog;
81pub(crate) mod send_buffer;
82
83mod spaces;
84#[cfg(fuzzing)]
85pub use spaces::Retransmits;
86#[cfg(not(fuzzing))]
87use spaces::Retransmits;
88pub(crate) use spaces::SpaceKind;
89use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
90
91mod stats;
92pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
93
94mod streams;
95#[cfg(fuzzing)]
96pub use streams::StreamsState;
97#[cfg(not(fuzzing))]
98use streams::StreamsState;
99pub use streams::{
100    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
101    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
102};
103
104mod timer;
105use timer::{Timer, TimerTable};
106
107mod transmit_buf;
108use transmit_buf::TransmitBuf;
109
110mod state;
111
112#[cfg(not(fuzzing))]
113use state::State;
114#[cfg(fuzzing)]
115pub use state::State;
116use state::StateType;
117
118/// Protocol state and logic for a single QUIC connection
119///
120/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
121/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
122/// expects timeouts through various methods. A number of simple getter methods are exposed
123/// to allow callers to inspect some of the connection state.
124///
125/// `Connection` has roughly 4 types of methods:
126///
127/// - A. Simple getters, taking `&self`
128/// - B. Handlers for incoming events from the network or system, named `handle_*`.
129/// - C. State machine mutators, for incoming commands from the application. For convenience we
130///   refer to this as "performing I/O" below, however as per the design of this library none of the
131///   functions actually perform system-level I/O. For example, [`read`](RecvStream::read) and
132///   [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
133/// - D. Polling functions for outgoing events or actions for the caller to
134///   take, named `poll_*`.
135///
136/// The simplest way to use this API correctly is to call (B) and (C) whenever
137/// appropriate, then after each of those calls, as soon as feasible call all
138/// polling methods (D) and deal with their outputs appropriately, e.g. by
139/// passing it to the application or by making a system-level I/O call. You
140/// should call the polling functions in this order:
141///
142/// 1. [`poll_transmit`](Self::poll_transmit)
143/// 2. [`poll_timeout`](Self::poll_timeout)
144/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
145/// 4. [`poll`](Self::poll)
146///
147/// Currently the only actual dependency is from (2) to (1), however additional
148/// dependencies may be added in future, so the above order is recommended.
149///
150/// (A) may be called whenever desired.
151///
152/// Care should be made to ensure that the input events represent monotonically
153/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
154/// with events of the same [`Instant`] may be interleaved in any order with a
155/// call to [`handle_event`](Self::handle_event) at that same instant; however
156/// events or timeouts with different instants must not be interleaved.
pub struct Connection {
    /// Endpoint-wide configuration shared by all connections on the endpoint
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration this connection was created with
    config: Arc<TransportConfig>,
    /// Deterministic RNG, seeded from the `rng_seed` passed to [`Connection::new`]
    rng: StdRng,
    /// Consolidated cryptographic state
    crypto_state: CryptoState,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    remote_handshake_cid: ConnectionId,
    /// The [`PathData`] for each path
    ///
    /// This needs to be ordered because [`Connection::poll_transmit`] needs to
    /// deterministically select the next PathId to send on.
    // TODO(flub): well does it really? But deterministic is nice for now.
    paths: BTreeMap<PathId, PathState>,
    /// Counter to uniquely identify every [`PathData`] created in this connection.
    ///
    /// Each [`PathData`] gets a [`PathData::generation`] that is unique among all
    /// [`PathData`]s created in the lifetime of this connection. This helps identify the
    /// correct path when RFC9000-style migrations happen, even when they are
    /// aborted.
    ///
    /// Multipath does not change this, each path can also undergo RFC9000-style
    /// migrations. So a single multipath path ID could see several [`PathData`]s each with
    /// their unique [`PathData::generation`].
    path_generation_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    /// Connection-level state machine (see [`State`])
    state: State,
    /// Which side of the connection we are, plus side-specific state
    side: ConnectionSide,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    original_remote_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Events returned by [`Connection::poll`]
    events: VecDeque<Event>,
    /// Events returned by [`Connection::poll_endpoint_events`]
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable packet space.
    highest_space: SpaceKind,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    /// Pending connection-level and per-path timers
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// If the CONNECTION_CLOSE frame needs to be sent
    connection_close_pending: bool,

    //
    // ACK frequency
    //
    /// State for the ACK frequency extension
    ack_frequency: AckFrequencyState,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    /// Stream-level state (see [`StreamsState`])
    streams: StreamsState,
    /// Active and surplus CIDs issued by the remote, for future use on new paths.
    ///
    /// These are given out before multiple paths exist, also for paths that will never
    /// exist.  So if multipath is supported the number of paths here will be higher than
    /// the actual number of paths in use.
    remote_cids: FxHashMap<PathId, CidQueue>,
    /// Attributes of CIDs generated by local endpoint
    ///
    /// Any path that is allowed to be opened is present in this map, as well as the already
    /// opened paths. However since CIDs are issued async by the endpoint driver via
    /// connection events it can not be used to know if CIDs have been issued for a path or
    /// not. See [`Connection::max_path_id_with_cids`] for this.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// Path level statistics
    path_stats: FxHashMap<PathId, PathStats>,
    /// QUIC version used for the connection.
    version: u32,

    //
    // Multipath
    //
    /// Maximum number of concurrent paths
    ///
    /// Initially set from the [`TransportConfig::max_concurrent_multipath_paths`]. Even
    /// when multipath is disabled this will be set to 1, it is not used in that case
    /// though.
    max_concurrent_paths: NonZeroU32,
    /// Local maximum [`PathId`] to be used
    ///
    /// This is initially set to [`TransportConfig::get_initial_max_path_id`] when multipath
    /// is negotiated, or to [`PathId::ZERO`] otherwise. This is essentially the value of
    /// the highest MAX_PATH_ID frame sent.
    ///
    /// Any path with an ID equal or below this [`PathId`] is either:
    ///
    /// - Abandoned, if it is also in [`Connection::abandoned_paths`].
    /// - Open, in this case it is present in [`Connection::paths`]
    /// - Not yet opened, if it is in neither of these two places.
    ///
    /// Note that for not-yet-open there may or may not be any CIDs issued. See
    /// [`Connection::max_path_id_with_cids`].
    local_max_path_id: PathId,
    /// Remote's maximum [`PathId`] to be used
    ///
    /// This is initially set to the peer's [`TransportParameters::initial_max_path_id`] when
    /// multipath is negotiated, or to [`PathId::ZERO`] otherwise. A peer may increase this limit
    /// by sending [`Frame::MaxPathId`] frames.
    remote_max_path_id: PathId,
    /// The greatest [`PathId`] we have issued CIDs for
    ///
    /// CIDs are only issued for `min(local_max_path_id, remote_max_path_id)`. It is not
    /// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued
    /// since they are issued asynchronously by the endpoint driver.
    max_path_id_with_cids: PathId,
    /// The paths already abandoned
    ///
    /// They may still have some state left in [`Connection::paths`] or
    /// [`Connection::local_cid_state`] since some of this has to be kept around for some
    /// time after a path is abandoned.
    // TODO(flub): Make this a more efficient data structure.  Like ranges of abandoned
    //    paths.  Or a set together with a minimum.  Or something.
    abandoned_paths: FxHashSet<PathId>,

    /// State for n0's (<https://n0.computer>) nat traversal protocol.
    n0_nat_traversal: n0_nat_traversal::State,
    /// Sink for qlog events emitted by this connection
    qlog: QlogSink,
}
316
317impl Connection {
    /// Creates a new `Connection`
    ///
    /// `now` seeds all time-based state (packet spaces, CID lifetimes). `rng_seed` makes
    /// the connection's randomness reproducible for a given seed. `side_args` carries the
    /// side-specific (client/server) construction data and is consumed below.
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        local_cid: ConnectionId,
        remote_cid: ConnectionId,
        network_path: FourTuple,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        // Pull side-specific data out before `side_args` is consumed by the conversion.
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        // In tests the 1-RTT space can use deterministic packet numbers for reproducibility.
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            remote_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        // CID state for the initial path (PathId::ZERO).
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                // One extra CID when a preferred address will be advertised
                // (NOTE(review): this arg appears to be the initial CID count — confirm
                // against `CidState::new`).
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
        path.open_status = paths::OpenStatus::Informed;
        let mut this = Self {
            endpoint_config,
            crypto_state: CryptoState::new(crypto, init_cid, side, &mut rng),
            handshake_cid: local_cid,
            remote_handshake_cid: remote_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_generation_counter: 0,
            allow_mtud,
            state,
            side: connection_side,
            peer_params: TransportParameters::default(),
            original_remote_cid: remote_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            // Spin bit is randomly disabled on 1/8th of connections (see RFC 9000 §17.4).
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceKind::Initial,
            permit_idle_reset: true,
            // A configured timeout of 0 means "no idle timeout".
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            connection_close_pending: false,

            // Based on defaults until the peer's transport parameters arrive.
            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            // peer params are not yet known, so multipath is not enabled
            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            n0_nat_traversal: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog
            .emit_tuple_assigned(PathId::ZERO, network_path, now);
        this
    }
450
451    /// Returns the next time at which `handle_timeout` should be called
452    ///
453    /// The value returned may change after:
454    /// - the application performed some I/O on the connection
455    /// - a call was made to `handle_event`
456    /// - a call to `poll_transmit` returned `Some`
457    /// - a call was made to `handle_timeout`
458    #[must_use]
459    pub fn poll_timeout(&mut self) -> Option<Instant> {
460        self.timers.peek()
461    }
462
463    /// Returns application-facing events
464    ///
465    /// Connections should be polled for events after:
466    /// - a call was made to `handle_event`
467    /// - a call was made to `handle_timeout`
468    #[must_use]
469    pub fn poll(&mut self) -> Option<Event> {
470        if let Some(x) = self.events.pop_front() {
471            return Some(x);
472        }
473
474        if let Some(event) = self.streams.poll() {
475            return Some(Event::Stream(event));
476        }
477
478        if let Some(reason) = self.state.take_error() {
479            return Some(Event::ConnectionLost { reason });
480        }
481
482        None
483    }
484
485    /// Return endpoint-facing events
486    #[must_use]
487    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
488        self.endpoint_events.pop_front().map(EndpointEvent)
489    }
490
491    /// Provide control over streams
492    #[must_use]
493    pub fn streams(&mut self) -> Streams<'_> {
494        Streams {
495            state: &mut self.streams,
496            conn_state: &self.state,
497        }
498    }
499
500    /// Provide control over streams
501    #[must_use]
502    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
503        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
504        RecvStream {
505            id,
506            state: &mut self.streams,
507            pending: &mut self.spaces[SpaceId::Data].pending,
508        }
509    }
510
511    /// Provide control over streams
512    #[must_use]
513    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
514        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
515        SendStream {
516            id,
517            state: &mut self.streams,
518            pending: &mut self.spaces[SpaceId::Data].pending,
519            conn_state: &self.state,
520        }
521    }
522
523    /// Opens a new path only if no path on the same network path currently exists.
524    ///
525    /// This comparison will use [`FourTuple::is_probably_same_path`] on the given `network_path`
526    /// and pass it existing path's network paths.
527    ///
528    /// This means that you can pass `local_ip: None` to make the comparison only compare
529    /// remote addresses.
530    ///
531    /// This avoids having to guess which local interface will be used to communicate with the
532    /// remote, should it not be known yet. We assume that if we already have a path to the remote,
533    /// the OS is likely to use the same interface to talk to said remote.
534    ///
535    /// See also [`open_path`]. Returns `(path_id, true)` if the path already existed. `(path_id,
536    /// false)` if was opened.
537    ///
538    /// [`open_path`]: Connection::open_path
539    pub fn open_path_ensure(
540        &mut self,
541        network_path: FourTuple,
542        initial_status: PathStatus,
543        now: Instant,
544    ) -> Result<(PathId, bool), PathError> {
545        let existing_open_path = self.paths.iter().find(|(id, path)| {
546            network_path.is_probably_same_path(&path.data.network_path)
547                && !self.abandoned_paths.contains(*id)
548        });
549        match existing_open_path {
550            Some((path_id, _state)) => Ok((*path_id, true)),
551            None => Ok((self.open_path(network_path, initial_status, now)?, false)),
552        }
553    }
554
555    /// Opens a new path
556    ///
557    /// Further errors might occur and they will be emitted in [`PathEvent::Abandoned`] events with this path id.
558    /// When the path is opened it will be reported as an [`PathEvent::Opened`].
559    pub fn open_path(
560        &mut self,
561        network_path: FourTuple,
562        initial_status: PathStatus,
563        now: Instant,
564    ) -> Result<PathId, PathError> {
565        if !self.is_multipath_negotiated() {
566            return Err(PathError::MultipathNotNegotiated);
567        }
568        if self.side().is_server() {
569            return Err(PathError::ServerSideNotAllowed);
570        }
571
572        let max_abandoned = self.abandoned_paths.iter().max().copied();
573        let max_used = self.paths.keys().last().copied();
574        let path_id = max_abandoned
575            .max(max_used)
576            .unwrap_or(PathId::ZERO)
577            .saturating_add(1u8);
578
579        if Some(path_id) > self.max_path_id() {
580            return Err(PathError::MaxPathIdReached);
581        }
582        if path_id > self.remote_max_path_id {
583            self.spaces[SpaceId::Data].pending.paths_blocked = true;
584            return Err(PathError::MaxPathIdReached);
585        }
586        if self
587            .remote_cids
588            .get(&path_id)
589            .map(CidQueue::active)
590            .is_none()
591        {
592            self.spaces[SpaceId::Data]
593                .pending
594                .path_cids_blocked
595                .insert(path_id);
596            return Err(PathError::RemoteCidsExhausted);
597        }
598
599        let path = self.ensure_path(path_id, network_path, now, None);
600        path.status.local_update(initial_status);
601
602        Ok(path_id)
603    }
604
605    /// Closes a path and sends a PATH_ABANDON frame with the passed error code.
606    ///
607    /// This will not allow closing the last path. It does allow closing paths which have
608    /// not yet been opened, as e.g. is the case when receiving a PATH_ABANDON from the peer
609    /// for a path that was never opened locally.
610    pub fn close_path(
611        &mut self,
612        now: Instant,
613        path_id: PathId,
614        error_code: VarInt,
615    ) -> Result<(), ClosePathError> {
616        self.close_path_inner(
617            now,
618            path_id,
619            PathAbandonReason::ApplicationClosed { error_code },
620        )
621    }
622
    /// Closes a path and sends a PATH_ABANDON frame.
    ///
    /// Other than [`Self::close_path`] this allows to specify the reason for the path being closed.
    /// Internally, this should be used over [`Self::close_path`].
    ///
    /// Queues the PATH_ABANDON frame, scrubs path-specific pending frames and
    /// retransmit state, retires the remotely-issued CIDs for the path, stops the
    /// per-path timers that no longer matter, and emits [`PathEvent::Abandoned`].
    fn close_path_inner(
        &mut self,
        now: Instant,
        path_id: PathId,
        reason: PathAbandonReason,
    ) -> Result<(), ClosePathError> {
        // Once drained nothing can be sent anymore; closing a path is a no-op.
        if self.state.is_drained() {
            return Ok(());
        }

        if !self.is_multipath_negotiated() {
            return Err(ClosePathError::MultipathNotNegotiated);
        }
        // Reject paths already abandoned, above the negotiated maximum, or unknown.
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }

        if reason.is_locally_initiated() {
            // Locally we refuse to close the last *validated* path.
            let has_remaining_validated_paths = self.paths.iter().any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            });
            if !has_remaining_validated_paths {
                return Err(ClosePathError::LastOpenPath);
            }
        } else {
            // The remote abandoned this path. We should always "accept" this. Doing so right now,
            // however, breaks assumptions throughout the code. We error instead, for the
            // connection to be killed. See <https://github.com/n0-computer/noq/issues/397>
            let has_remaining_paths = self
                .paths
                .keys()
                .any(|id| *id != path_id && !self.abandoned_paths.contains(id));
            if !has_remaining_paths {
                return Err(ClosePathError::LastOpenPath);
            }
        }

        trace!(%path_id, ?reason, "closing path");

        // Send PATH_ABANDON
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, reason.error_code());

        // Remove pending NEW CIDs for this path
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Cleanup retransmits across ALL paths (CIDs for path_id may have been transmitted on other paths)
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Consider remotely issued CIDs as retired.
        // Technically we don't have to do this just yet.  We only need to do this *after*
        // the ABANDON_PATH frame is sent, allowing us to still send it on the
        // to-be-abandoned path.  However it is recommended to send it on another path, and
        // we do not allow abandoning the last path anyway.
        self.remote_cids.remove(&path_id);
        debug_assert!(!self.state.is_drained()); // requirement for endpoint_events, checked above
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        trace!(%path_id, "abandoning path");
        self.abandoned_paths.insert(path_id);

        // Decide per timer whether it survives abandonment; exhaustive match so that
        // adding a new PathTimer variant forces a decision here.
        for timer in timer::PathTimer::VALUES {
            // match for completeness
            let keep_timer = match timer {
                // These timers deal with sending and receiving PATH_CHALLENGE and
                // PATH_RESPONSE, but now that the path is abandoned, we no longer care about
                // these frames or their timing
                PathTimer::PathValidation | PathTimer::PathChallengeLost | PathTimer::PathOpen => {
                    false
                }
                // These timers deal with the lifetime of the path. Now that the path is abandoned,
                // these are not relevant.
                PathTimer::PathKeepAlive | PathTimer::PathIdle => false,
                // The path has already been informed that outstanding acks should be sent
                // immediately
                PathTimer::MaxAckDelay => false,
                // This timer should not be set, for completeness it's not kept as it's set when
                // the PATH_ABANDON frame is sent.
                PathTimer::DiscardPath => false,
                // Sent packets still need to be identified as lost to trigger timely
                // retransmission.
                PathTimer::LossDetection => true,
                // This path should not be used for sending after the PATH_ABANDON frame is sent.
                // However, any outstanding data that should be sent before PATH_ABANDON, should
                // still respect pacing.
                PathTimer::Pacing => true,
            };

            if !keep_timer {
                let qlog = self.qlog.with_time(now);
                self.timers.stop(Timer::PerPath(path_id, timer), qlog);
            }
        }

        // Emit event to the application.
        self.events.push_back(Event::Path(PathEvent::Abandoned {
            id: path_id,
            reason,
        }));

        Ok(())
    }
746
747    /// Gets the [`PathData`] for a known [`PathId`].
748    ///
749    /// Will panic if the path_id does not reference any known path.
750    #[track_caller]
751    fn path_data(&self, path_id: PathId) -> &PathData {
752        if let Some(data) = self.paths.get(&path_id) {
753            &data.data
754        } else {
755            panic!(
756                "unknown path: {path_id}, currently known paths: {:?}",
757                self.paths.keys().collect::<Vec<_>>()
758            );
759        }
760    }
761
762    /// Gets a reference to the [`PathData`] for a [`PathId`]
763    fn path(&self, path_id: PathId) -> Option<&PathData> {
764        self.paths.get(&path_id).map(|path_state| &path_state.data)
765    }
766
767    /// Gets a mutable reference to the [`PathData`] for a [`PathId`]
768    fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
769        self.paths
770            .get_mut(&path_id)
771            .map(|path_state| &mut path_state.data)
772    }
773
774    /// Returns all known paths.
775    ///
776    /// There is no guarantee any of these paths are open or usable.
777    pub fn paths(&self) -> Vec<PathId> {
778        self.paths.keys().copied().collect()
779    }
780
781    /// Gets the local [`PathStatus`] for a known [`PathId`]
782    pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
783        self.path(path_id)
784            .map(PathData::local_status)
785            .ok_or(ClosedPath { _private: () })
786    }
787
788    /// Returns the path's network path represented as a 4-tuple.
789    pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
790        self.path(path_id)
791            .map(|path| path.network_path)
792            .ok_or(ClosedPath { _private: () })
793    }
794
795    /// Sets the [`PathStatus`] for a known [`PathId`]
796    ///
797    /// Returns the previous path status on success.
798    pub fn set_path_status(
799        &mut self,
800        path_id: PathId,
801        status: PathStatus,
802    ) -> Result<PathStatus, SetPathStatusError> {
803        if !self.is_multipath_negotiated() {
804            return Err(SetPathStatusError::MultipathNotNegotiated);
805        }
806        let path = self
807            .path_mut(path_id)
808            .ok_or(SetPathStatusError::ClosedPath)?;
809        let prev = match path.status.local_update(status) {
810            Some(prev) => {
811                self.spaces[SpaceId::Data]
812                    .pending
813                    .path_status
814                    .insert(path_id);
815                prev
816            }
817            None => path.local_status(),
818        };
819        Ok(prev)
820    }
821
822    /// Returns the remote path status
823    // TODO(flub): Probably should also be some kind of path event?  Not even sure if I like
824    //    this as an API, but for now it allows me to write a test easily.
825    // TODO(flub): Technically this should be a Result<Option<PathSTatus>>?
826    pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
827        self.path(path_id).and_then(|path| path.remote_status())
828    }
829
830    /// Sets the max_idle_timeout for a specific path
831    ///
832    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
833    ///
834    /// Returns the previous value of the setting.
835    pub fn set_path_max_idle_timeout(
836        &mut self,
837        path_id: PathId,
838        timeout: Option<Duration>,
839    ) -> Result<Option<Duration>, ClosedPath> {
840        let path = self
841            .paths
842            .get_mut(&path_id)
843            .ok_or(ClosedPath { _private: () })?;
844        Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
845    }
846
847    /// Sets the keep_alive_interval for a specific path
848    ///
849    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
850    ///
851    /// Returns the previous value of the setting.
852    pub fn set_path_keep_alive_interval(
853        &mut self,
854        path_id: PathId,
855        interval: Option<Duration>,
856    ) -> Result<Option<Duration>, ClosedPath> {
857        let path = self
858            .paths
859            .get_mut(&path_id)
860            .ok_or(ClosedPath { _private: () })?;
861        Ok(std::mem::replace(&mut path.data.keep_alive, interval))
862    }
863
864    /// Gets the [`PathData`] for a known [`PathId`].
865    ///
866    /// Will panic if the path_id does not reference any known path.
867    #[track_caller]
868    fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
869        &mut self.paths.get_mut(&path_id).expect("known path").data
870    }
871
872    /// Find an open, validated path that's on the same network path as the given network path.
873    ///
874    /// Returns the first path matching, even if there's multiple.
875    fn find_validated_path_on_network_path(
876        &self,
877        network_path: FourTuple,
878    ) -> Option<(&PathId, &PathState)> {
879        self.paths.iter().find(|(path_id, path_state)| {
880            path_state.data.validated
881                // Would this use the same network path, if network_path were used to send right now?
882                && network_path.is_probably_same_path(&path_state.data.network_path)
883                && !self.abandoned_paths.contains(path_id)
884        })
885        // TODO(@divma): we might want to ensure the path has been recently active to consider the
886        // address validated
887        // matheus23: Perhaps looking at !self.abandoned_paths.contains(path_id) is enough, given keep-alives?
888    }
889
    /// Creates the [`PathData`] for a new [`PathId`].
    ///
    /// Called for incoming packets as well as when opening a new path locally.
    ///
    /// If the path already exists its [`PathData`] is returned unchanged. Otherwise a
    /// new path is created:
    ///
    /// - It inherits validation status, and a conservative RTT estimate, from any
    ///   existing non-abandoned validated path on the same network path.
    /// - A fresh Data-space packet number space is created for it; `pn`, if given, is
    ///   recorded in that space's dedup state (the number of the packet that triggered
    ///   the creation).
    /// - A PATH_CHALLENGE is scheduled, guaranteeing at least one packet is sent on the
    ///   path.
    fn ensure_path(
        &mut self,
        path_id: PathId,
        network_path: FourTuple,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        // If another validated path already uses the same network path, treat this one
        // as validated too and seed its RTT estimator from that path.
        let valid_path = self.find_validated_path_on_network_path(network_path);
        let validated = valid_path.is_some();
        let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            // Fast path: the path already exists, nothing to create.
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        debug!(%validated, %path_id, %network_path, "path added");
        // Clamp the peer's max_udp_payload_size transport parameter into u16 range.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        // Each created path gets a distinct generation number.
        self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
        let mut data = PathData::new(
            network_path,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_generation_counter,
            now,
            &self.config,
        );

        data.validated = validated;
        if let Some(initial_rtt) = initial_rtt {
            data.rtt.reset_initial_rtt(initial_rtt);
        }

        // To open a path locally we need to send a packet on the path. Sending a challenge
        // guarantees this.
        data.pending_on_path_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        // Every path gets its own Data-space packet number space.
        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            // Remember the packet number that triggered creation for deduplication.
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, network_path, now);

        // If the remote opened this path we may not have CIDs for it. For locally opened
        // paths the caller should have already made sure we have CIDs and refused to open
        // it if there were none.
        if !self.remote_cids.contains_key(&path_id) {
            debug!(%path_id, "Remote opened path without issuing CIDs");
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .insert(path_id);
            // Do not abandon this path right away. CIDs might be in-flight still and arrive
            // soon. It is up to the remote to handle this situation.
        }

        &mut path.data
    }
958
    /// Returns packets to transmit
    ///
    /// Connections should be polled for transmit after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    ///
    /// `max_datagrams` specifies how many datagrams can be returned inside a
    /// single Transmit using GSO. This must be at least 1.
    ///
    /// Returns `None` when there is nothing to send right now; `self.app_limited`
    /// records whether that was because we ran out of data rather than because
    /// congestion control blocked sending.
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: NonZeroUsize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        // Without segmentation offload each transmit carries only a single datagram.
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => NonZeroUsize::MIN,
            true => max_datagrams,
        };

        // Each call to poll_transmit can only send datagrams to one destination, because
        // all datagrams in a GSO batch are for the same destination.  Therefore only
        // datagrams for one destination address are produced for each poll_transmit call.

        // Check whether we need to send a close message
        let connection_close_pending = match self.state.as_type() {
            StateType::Drained => {
                self.app_limited = true;
                return None;
            }
            StateType::Draining | StateType::Closed => {
                // self.connection_close_pending is only reset once the associated packet
                // had been encoded successfully
                if !self.connection_close_pending {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Schedule an ACK_FREQUENCY frame if a new one needs to be sent.
        if let Some(config) = &self.config.ack_frequency_config {
            // Use the smallest RTT across all paths as the reference RTT.
            let rtt = self
                .paths
                .values()
                .map(|p| p.data.rtt.get())
                .min()
                .expect("one path exists");
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(rtt, config, &self.peer_params)
                && self.highest_space == SpaceKind::Data
                && self.peer_supports_ack_frequency();
        }

        // If we end up not sending anything, we need to know if that was because there was
        // nothing to send or because we were congestion blocked.
        let mut congestion_blocked = false;

        // Walk the paths in ascending PathId order. The next id is re-looked-up at the
        // end of each iteration rather than held in an iterator, since the per-path
        // calls take `&mut self` and may change the path set.
        let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
        while let Some(path_id) = next_path_id {
            // Off-path probes/responses take precedence, but never while closing.
            if !connection_close_pending
                && let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id)
            {
                return Some(transmit);
            }

            let info = self.scheduling_info(path_id);
            match self.poll_transmit_on_path(
                now,
                buf,
                path_id,
                max_datagrams,
                &info,
                connection_close_pending,
            ) {
                PollPathStatus::Send(transmit) => {
                    return Some(transmit);
                }
                PollPathStatus::NothingToSend {
                    congestion_blocked: cb,
                } => {
                    congestion_blocked |= cb;
                    // Continue checking other paths, tail-loss probes may need to be sent
                    // in all spaces.
                    debug_assert!(
                        buf.is_empty(),
                        "nothing to send on path but buffer not empty"
                    );
                }
            }

            next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
        }

        // We didn't produce any application data packet
        debug_assert!(
            buf.is_empty(),
            "there was data in the buffer, but it was not sent"
        );

        // Record why nothing was sent: app-limited unless congestion control stopped us.
        self.app_limited = !congestion_blocked;

        if self.state.is_established() {
            // Try MTU probing now
            let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
            while let Some(path_id) = next_path_id {
                if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) {
                    return Some(transmit);
                }
                next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
            }
        }

        None
    }
1078
1079    /// Computes the packet scheduling information for this path.
1080    ///
1081    /// While this information is only returned for a single path, it is important to know
1082    /// that this information remains static for the entire span of a single
1083    /// [`Connection::poll_transmit`] call. In other words, the return value is purely
1084    /// functional and only depends on the [`PathId`] **during a single** `poll_transmit`
1085    /// call. It can be computed up-front for all paths but we don't do that because it
1086    /// involves an allocation.
1087    ///
1088    /// See the inline comments for how the  packet scheduling works.
1089    ///
1090    /// # Panics
1091    ///
1092    /// This will panic if called for a path for which we do not have any [`PathData`], like
1093    /// so many other functions we have. But this is the only one to document this in its
1094    /// doc comment. Maybe that should change. Eventually we'll refactor things for this
1095    /// panic to go away.
1096    fn scheduling_info(&self, path_id: PathId) -> PathSchedulingInfo {
1097        let have_validated_status_available_space = self.paths.iter().any(|(path_id, path)| {
1098            self.remote_cids.contains_key(path_id)
1099                && !self.abandoned_paths.contains(path_id)
1100                && path.data.validated
1101                && path.data.local_status() == PathStatus::Available
1102        });
1103        let is_handshaking = self.is_handshaking();
1104        let has_cids = self.remote_cids.contains_key(&path_id);
1105        let abandoned = self.abandoned_paths.contains(&path_id);
1106        let path_data = self.path_data(path_id);
1107        let validated = path_data.validated;
1108        let status = path_data.local_status();
1109
1110        // This is the core packet scheduling, whether this space ID may send
1111        // SpaceKind::Data frames.
1112        let may_send_data = has_cids
1113            && !abandoned
1114            && if is_handshaking {
1115                // There is only one path during the handshake. We want to
1116                // already send 0-RTT and 0.5-RTT (permitting anti-amplification
1117                // limit) data.
1118                true
1119            } else if !validated {
1120                // TODO(flub): When we have a network change we might end up
1121                //    having to abandon all paths and re-open new ones to the
1122                //    same remotes. This leaves us without any validated
1123                //    path. Perhaps we should have a way to figure out if the
1124                //    path is to a previously-validated remote address and allow
1125                //    sending data to such remotes immediately.
1126                false
1127            } else {
1128                match status {
1129                    PathStatus::Available => {
1130                        // Best possible space to send data on.
1131                        true
1132                    }
1133                    PathStatus::Backup => {
1134                        // If there is a status-available path we prefer that.
1135                        !have_validated_status_available_space
1136                    }
1137                }
1138            };
1139
1140        // CONNECTION_CLOSE is allowed to be sent on a non-validated
1141        // path. Particularly during the handshake we want to send it before the
1142        // path is validated. Later if there is no validated path available we
1143        // will also accept sending it on an un-validated path.
1144        let may_send_close = has_cids
1145            && !abandoned
1146            && if !validated && have_validated_status_available_space {
1147                // We have a better space to send on.
1148                false
1149            } else {
1150                // No other validated space, this is as good as it gets.
1151                true
1152            };
1153
1154        PathSchedulingInfo {
1155            abandoned,
1156            may_send_data,
1157            may_send_close,
1158        }
1159    }
1160
1161    fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit {
1162        debug_assert!(
1163            !transmit.is_empty(),
1164            "must not be called with an empty transmit buffer"
1165        );
1166
1167        let network_path = self.path_data(path_id).network_path;
1168        trace!(
1169            segment_size = transmit.segment_size(),
1170            last_datagram_len = transmit.len() % transmit.segment_size(),
1171            %network_path,
1172            "sending {} bytes in {} datagrams",
1173            transmit.len(),
1174            transmit.num_datagrams()
1175        );
1176        self.path_data_mut(path_id)
1177            .inc_total_sent(transmit.len() as u64);
1178
1179        self.stats
1180            .udp_tx
1181            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1182        self.path_stats
1183            .entry(path_id)
1184            .or_default()
1185            .udp_tx
1186            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1187
1188        Transmit {
1189            destination: network_path.remote,
1190            size: transmit.len(),
1191            ecn: if self.path_data(path_id).sending_ecn {
1192                Some(EcnCodepoint::Ect0)
1193            } else {
1194                None
1195            },
1196            segment_size: match transmit.num_datagrams() {
1197                1 => None,
1198                _ => Some(transmit.segment_size()),
1199            },
1200            src_ip: network_path.local_ip,
1201        }
1202    }
1203
1204    /// poll_transmit logic for off-path data.
1205    fn poll_transmit_off_path(
1206        &mut self,
1207        now: Instant,
1208        buf: &mut Vec<u8>,
1209        path_id: PathId,
1210    ) -> Option<Transmit> {
1211        if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) {
1212            return Some(challenge);
1213        }
1214        if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
1215            return Some(response);
1216        }
1217        if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
1218            return Some(challenge);
1219        }
1220        None
1221    }
1222
    /// poll_transmit logic for on-path data.
    ///
    /// This is not quite the same as for a multipath packet space, since [`PathId::ZERO`]
    /// has 3 packet spaces, which this handles.
    ///
    /// See [`Self::poll_transmit_off_path`] for off-path data.
    ///
    /// Returns [`PollPathStatus::Send`] with a finished [`Transmit`] when at least one
    /// packet was built for this path, otherwise [`PollPathStatus::NothingToSend`]
    /// recording whether congestion control was the reason nothing was sent.
    #[must_use]
    fn poll_transmit_on_path(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
        max_datagrams: NonZeroUsize,
        scheduling_info: &PathSchedulingInfo,
        connection_close_pending: bool,
    ) -> PollPathStatus {
        // Check if there is at least one active CID to use for sending
        let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else {
            if !self.abandoned_paths.contains(&path_id) {
                debug!(%path_id, "no remote CIDs for path");
            }
            return PollPathStatus::NothingToSend {
                congestion_blocked: false,
            };
        };

        // Whether the last packet in the datagram must be padded so the datagram takes up
        // an exact size. An earlier space can decide to not fill an entire datagram and
        // require the next space to fill it further. But may need a specific size of the
        // datagram containing the packet. The final packet built in the datagram must pad
        // to this size.
        let mut pad_datagram = PadDatagram::No;

        // The packet number of the last built packet. This is kept across spaces.
        // QUIC is supposed to have a single congestion controller for the Initial,
        // Handshake and Data(PathId::ZERO) spaces.
        let mut last_packet_number = None;

        // If we end up not sending anything, we need to know if that was because there was
        // nothing to send or because we were congestion blocked.
        let mut congestion_blocked = false;

        // Set the segment size to this path's MTU for on-path data.
        let pmtu = self.path_data(path_id).current_mtu().into();
        let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu);

        // Iterate over the available spaces.
        for space_id in SpaceId::iter() {
            // Only PathId::ZERO uses non Data space ids.
            if path_id != PathId::ZERO && space_id != SpaceId::Data {
                continue;
            }
            match self.poll_transmit_path_space(
                now,
                &mut transmit,
                path_id,
                space_id,
                remote_cid,
                scheduling_info,
                connection_close_pending,
                pad_datagram,
            ) {
                PollPathSpaceStatus::NothingToSend {
                    congestion_blocked: cb,
                } => {
                    congestion_blocked |= cb;
                    // Continue checking other spaces, tail-loss probes may need to be sent
                    // in all spaces.
                }
                PollPathSpaceStatus::WrotePacket {
                    last_packet_number: pn,
                    pad_datagram: pad,
                } => {
                    debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                    last_packet_number = Some(pn);
                    pad_datagram = pad;
                    // Always check higher spaces. If the transmit is full or they have
                    // nothing to send they will not write packets. But if they can, they
                    // must always be allowed to add to this transmit because coalescing may
                    // be required.
                    continue;
                }
                PollPathSpaceStatus::Send {
                    last_packet_number: pn,
                } => {
                    debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                    last_packet_number = Some(pn);
                    break;
                }
            }
        }

        // Only emit qlog recovery metrics when something happened on this path: either
        // a packet was built or congestion control blocked us.
        if last_packet_number.is_some() || congestion_blocked {
            self.qlog.emit_recovery_metrics(
                path_id,
                &mut self.paths.get_mut(&path_id).unwrap().data,
                now,
            );
        }

        match last_packet_number {
            Some(last_packet_number) => {
                // Note that when sending in multiple spaces the last packet number will be
                // the one from the highest space.
                self.path_data_mut(path_id).congestion.on_sent(
                    now,
                    transmit.len() as u64,
                    last_packet_number,
                );
                PollPathStatus::Send(self.build_transmit(path_id, transmit))
            }
            None => PollPathStatus::NothingToSend { congestion_blocked },
        }
    }
1337
1338    /// poll_transmit logic for a QUIC-MULTIPATH packet number space (PathID + SpaceId).
1339    #[must_use]
1340    fn poll_transmit_path_space(
1341        &mut self,
1342        now: Instant,
1343        transmit: &mut TransmitBuf<'_>,
1344        path_id: PathId,
1345        space_id: SpaceId,
1346        remote_cid: ConnectionId,
1347        scheduling_info: &PathSchedulingInfo,
1348        // If we need to send a CONNECTION_CLOSE frame.
1349        connection_close_pending: bool,
1350        // Whether the current datagram needs to be padded to a certain size.
1351        mut pad_datagram: PadDatagram,
1352    ) -> PollPathSpaceStatus {
1353        // Keep track of the last packet number we wrote. If None we did not write any
1354        // packets.
1355        let mut last_packet_number = None;
1356
1357        // Each loop of this may build one packet. It works logically as follows:
1358        //
1359        // - Check if something *needs* to be sent in this space and *can* be sent.
1360        //   - If not, return to the caller who will call us again for the next space.
1361        // - Start a new datagram.
1362        //   - Unless coalescing the packet into an existing datagram.
1363        // - Write the packet header and payload.
1364        // - Check if coalescing a next packet into the datagram is possible.
1365        // - If coalescing, finish packet without padding to leave space in the datagram.
1366        // - If not coalescing, complete the datagram:
1367        //   - Finish packet with padding.
1368        //   - Set the transmit segment size if this is the first datagram.
1369        // - Loop: next iteration will exit the loop if nothing more to send in this
1370        //   space. The TransmitBuf will contain a started datagram with space if
1371        //   coalescing, or completely filled datagram if not coalescing.
1372        loop {
1373            // Determine if anything can be sent in this packet number space.
1374            let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1375                // A datagram is started already, we are coalescing another packet into it.
1376                transmit.datagram_remaining_mut()
1377            } else {
1378                // A new datagram needs to be started.
1379                transmit.segment_size()
1380            };
1381            let can_send =
1382                self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending);
1383            let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1384            let space_will_send = {
1385                if scheduling_info.abandoned {
1386                    // Currently we don't send on an abandoned path, PATH_ABANDON is always
1387                    // sent on a different path.
1388                    false
1389                } else if can_send.close && scheduling_info.may_send_close {
1390                    // This is the best path to send a CONNECTION_CLOSE on.
1391                    true
1392                } else if needs_loss_probe || can_send.space_specific {
1393                    // We always send a loss probe or space-specific frames if the path is
1394                    // not abandoned.
1395                    true
1396                } else {
1397                    // Anything else we only send if we're the best path for SpaceKind::Data
1398                    // frames.
1399                    !can_send.is_empty() && scheduling_info.may_send_data
1400                }
1401            };
1402
1403            if !space_will_send {
1404                // Nothing more to send. Previous iterations of this loop may have built
1405                // packets already.
1406                return match last_packet_number {
1407                    Some(pn) => PollPathSpaceStatus::WrotePacket {
1408                        last_packet_number: pn,
1409                        pad_datagram,
1410                    },
1411                    None => {
1412                        // Only log for spaces which have crypto.
1413                        if self.crypto_state.has_keys(space_id.encryption_level())
1414                            || (space_id == SpaceId::Data
1415                                && self.crypto_state.has_keys(EncryptionLevel::ZeroRtt))
1416                        {
1417                            trace!(?space_id, %path_id, "nothing to send in space");
1418                        }
1419                        PollPathSpaceStatus::NothingToSend {
1420                            congestion_blocked: false,
1421                        }
1422                    }
1423                };
1424            }
1425
1426            // We want to send on this space, check congestion control if we can. But only
1427            // if we will need to start a new datagram. If we are coalescing into an already
1428            // started datagram we do not need to check congestion control again.
1429            if transmit.datagram_remaining_mut() == 0 {
1430                let congestion_blocked =
1431                    self.path_congestion_check(space_id, path_id, transmit, &can_send, now);
1432                if congestion_blocked != PathBlocked::No {
1433                    // Previous iterations of this loop may have built packets already.
1434                    return match last_packet_number {
1435                        Some(pn) => PollPathSpaceStatus::WrotePacket {
1436                            last_packet_number: pn,
1437                            pad_datagram,
1438                        },
1439                        None => {
1440                            return PollPathSpaceStatus::NothingToSend {
1441                                congestion_blocked: true,
1442                            };
1443                        }
1444                    };
1445                }
1446            }
1447
1448            // If the datagram is full (or there never was one started), we need to start a
1449            // new one.
1450            if transmit.datagram_remaining_mut() == 0 {
1451                if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1452                    // No more datagrams allowed.
1453                    // Previous iterations of this loop may have built packets already.
1454                    return match last_packet_number {
1455                        Some(pn) => PollPathSpaceStatus::WrotePacket {
1456                            last_packet_number: pn,
1457                            pad_datagram,
1458                        },
1459                        None => {
1460                            return PollPathSpaceStatus::NothingToSend {
1461                                congestion_blocked: false,
1462                            };
1463                        }
1464                    };
1465                }
1466
1467                match self.spaces[space_id].for_path(path_id).loss_probes {
1468                    0 => transmit.start_new_datagram(),
1469                    _ => {
1470                        // We need something to send for a tail-loss probe.
1471                        let request_immediate_ack =
1472                            space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1473                        self.spaces[space_id].maybe_queue_probe(
1474                            path_id,
1475                            request_immediate_ack,
1476                            &self.streams,
1477                        );
1478
1479                        self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1480
1481                        // Clamp the datagram to at most the minimum MTU to ensure that loss
1482                        // probes can get through and enable recovery even if the path MTU
1483                        // has shrank unexpectedly.
1484                        transmit.start_new_datagram_with_size(std::cmp::min(
1485                            usize::from(INITIAL_MTU),
1486                            transmit.segment_size(),
1487                        ));
1488                    }
1489                }
1490                trace!(count = transmit.num_datagrams(), "new datagram started");
1491
1492                // We started a new datagram, we decide later if it needs padding.
1493                pad_datagram = PadDatagram::No;
1494            }
1495
1496            // If coalescing another packet into the existing datagram, there should
1497            // still be enough space for a whole packet.
1498            if transmit.datagram_start_offset() < transmit.len() {
1499                debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1500            }
1501
1502            //
1503            // From here on, we've determined that a packet will definitely be sent.
1504            //
1505
1506            if self.crypto_state.has_keys(EncryptionLevel::Initial)
1507                && space_id == SpaceId::Handshake
1508                && self.side.is_client()
1509            {
1510                // A client stops both sending and processing Initial packets when it
1511                // sends its first Handshake packet.
1512                self.discard_space(now, SpaceKind::Initial);
1513            }
1514            if let Some(ref mut prev) = self.crypto_state.prev_crypto {
1515                prev.update_unacked = false;
1516            }
1517
1518            let Some(mut builder) = PacketBuilder::new(
1519                now,
1520                space_id,
1521                path_id,
1522                remote_cid,
1523                transmit,
1524                can_send.is_ack_eliciting(),
1525                self,
1526            ) else {
1527                // Confidentiality limit is exceeded and the connection has been killed. We
1528                // should not send any other packets. This works in a roundabout way: We
1529                // have started a datagram but not written anything into it. So even if we
1530                // get called again for another space we will see an already started
1531                // datagram and try and start another packet here. Then be stopped by the
1532                // same confidentiality limit.
1533                return PollPathSpaceStatus::NothingToSend {
1534                    congestion_blocked: false,
1535                };
1536            };
1537            last_packet_number = Some(builder.packet_number);
1538
1539            if space_id == SpaceId::Initial
1540                && (self.side.is_client() || can_send.is_ack_eliciting())
1541            {
1542                // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1
1543                pad_datagram |= PadDatagram::ToMinMtu;
1544            }
1545            if space_id == SpaceId::Data && self.config.pad_to_mtu {
1546                pad_datagram |= PadDatagram::ToSegmentSize;
1547            }
1548
1549            if can_send.close {
1550                trace!("sending CONNECTION_CLOSE");
1551                // Encode ACKs before the ConnectionClose message, to give the receiver
                // a better approximation of what data has been processed. This is
1553                // especially important with ack delay, since the peer might not
1554                // have gotten any other ACK for the data earlier on.
1555                let is_multipath_negotiated = self.is_multipath_negotiated();
1556                for path_id in self.spaces[space_id]
1557                    .number_spaces
1558                    .iter()
1559                    .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1560                    .map(|(&path_id, _)| path_id)
1561                    .collect::<Vec<_>>()
1562                {
1563                    Self::populate_acks(
1564                        now,
1565                        self.receiving_ecn,
1566                        path_id,
1567                        space_id,
1568                        &mut self.spaces[space_id],
1569                        is_multipath_negotiated,
1570                        &mut builder,
1571                        &mut self.stats.frame_tx,
1572                        self.crypto_state.has_keys(space_id.encryption_level()),
1573                    );
1574                }
1575
                // Since there are only 64 ACK frames, there will always be enough space
1577                // to encode the ConnectionClose frame too. However we still have the
1578                // check here to prevent crashes if something changes.
1579
1580                // TODO(flub): This needs fixing for multipath, to ensure we can always
1581                //    write the CONNECTION_CLOSE even if we have many PATH_ACKs to send:
1582                //    https://github.com/n0-computer/noq/issues/367.
1583                debug_assert!(
1584                    builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1585                    "ACKs should leave space for ConnectionClose"
1586                );
1587                let stats = &mut self.stats.frame_tx;
1588                if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1589                    let max_frame_size = builder.frame_space_remaining();
1590                    let close: Close = match self.state.as_type() {
1591                        StateType::Closed => {
1592                            let reason: Close =
1593                                self.state.as_closed().expect("checked").clone().into();
1594                            if space_id == SpaceId::Data || reason.is_transport_layer() {
1595                                reason
1596                            } else {
1597                                TransportError::APPLICATION_ERROR("").into()
1598                            }
1599                        }
1600                        StateType::Draining => TransportError::NO_ERROR("").into(),
1601                        _ => unreachable!(
1602                            "tried to make a close packet when the connection wasn't closed"
1603                        ),
1604                    };
1605                    builder.write_frame(close.encoder(max_frame_size), stats);
1606                }
1607                let last_pn = builder.packet_number;
1608                builder.finish_and_track(now, self, path_id, pad_datagram);
1609                if space_id.kind() == self.highest_space {
1610                    // Don't send another close packet. Even with multipath we only send
1611                    // CONNECTION_CLOSE on a single path since we expect our paths to work.
1612                    self.connection_close_pending = false;
1613                }
1614                // Send a close frame in every possible space for robustness, per
1615                // RFC9000 "Immediate Close during the Handshake". Don't bother trying
1616                // to send anything else.
1617                // TODO(flub): This breaks during the handshake if we can not coalesce
1618                //    packets due to space reasons: the next space would either fail a
1619                //    debug_assert checking for enough packet space or produce an invalid
1620                //    packet. We need to keep track of per-space pending CONNECTION_CLOSE to
1621                //    be able to send these across multiple calls to poll_transmit. Then
1622                //    check for coalescing space here because initial packets need to be in
1623                //    padded datagrams. And also add space checks for CONNECTION_CLOSE in
1624                //    space_can_send so it would stop a GSO batch if the datagram is too
1625                //    small for another CONNECTION_CLOSE packet.
1626                return PollPathSpaceStatus::WrotePacket {
1627                    last_packet_number: last_pn,
1628                    pad_datagram,
1629                };
1630            }
1631
1632            self.populate_packet(now, space_id, path_id, scheduling_info, &mut builder);
1633
1634            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
1635            // any other reason, there is a bug which leads to one component announcing write
1636            // readiness while not writing any data. This degrades performance. The condition is
1637            // only checked if the full MTU is available and when potentially large fixed-size
1638            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
1639            // writing ACKs.
1640            debug_assert!(
1641                !(builder.sent_frames().is_ack_only(&self.streams)
1642                    && !can_send.acks
1643                    && (can_send.other || can_send.space_specific)
1644                    && builder.buf.segment_size()
1645                        == self.path_data(path_id).current_mtu() as usize
1646                    && self.datagrams.outgoing.is_empty()),
1647                "SendableFrames was {can_send:?}, but only ACKs have been written"
1648            );
1649            if builder.sent_frames().requires_padding {
1650                pad_datagram |= PadDatagram::ToMinMtu;
1651            }
1652
1653            for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1654                self.spaces[space_id]
1655                    .for_path(*path_id)
1656                    .pending_acks
1657                    .acks_sent();
1658                self.timers.stop(
1659                    Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1660                    self.qlog.with_time(now),
1661                );
1662            }
1663
1664            // Now we need to finish the packet.  Before we do so we need to know if we will
1665            // be coalescing the next packet into this one, or will be ending the datagram
1666            // as well.  Because if this is the last packet in the datagram more padding
1667            // might be needed because of the packet type, or to fill the GSO segment size.
1668
1669            // Are we allowed to coalesce AND is there enough space for another *packet* in
1670            // this datagram AND will we definitely send another packet?
1671            if builder.can_coalesce && path_id == PathId::ZERO && {
1672                let max_packet_size = builder
1673                    .buf
1674                    .datagram_remaining_mut()
1675                    .saturating_sub(builder.predict_packet_end());
1676                max_packet_size > MIN_PACKET_SPACE
1677                    && self.has_pending_packet(space_id, max_packet_size, connection_close_pending)
1678            } {
1679                // We can append/coalesce the next packet into the current
1680                // datagram. Finish the current packet without adding extra padding.
1681                trace!("will coalesce with next packet");
1682                builder.finish_and_track(now, self, path_id, PadDatagram::No);
1683            } else {
1684                // We need a new datagram for the next packet.  Finish the current
1685                // packet with padding.
1686                // TODO(flub): if there isn't any more data to be sent, this will still pad
1687                //    to the segment size and only discover there is nothing to send before
1688                //    starting the next packet. That is wasting up to 32 bytes.
1689                if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1690                    // If too many padding bytes would be required to continue the
1691                    // GSO batch after this packet, end the GSO batch here. Ensures
1692                    // that fixed-size frames with heterogeneous sizes
1693                    // (e.g. application datagrams) won't inadvertently waste large
1694                    // amounts of bandwidth. The exact threshold is a bit arbitrary
1695                    // and might benefit from further tuning, though there's no
1696                    // universally optimal value.
1697                    const MAX_PADDING: usize = 32;
1698                    if builder.buf.datagram_remaining_mut()
1699                        > builder.predict_packet_end() + MAX_PADDING
1700                    {
1701                        trace!(
1702                            "GSO truncated by demand for {} padding bytes",
1703                            builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1704                        );
1705                        let last_pn = builder.packet_number;
1706                        builder.finish_and_track(now, self, path_id, PadDatagram::No);
1707                        return PollPathSpaceStatus::Send {
1708                            last_packet_number: last_pn,
1709                        };
1710                    }
1711
1712                    // Pad the current datagram to GSO segment size so it can be
1713                    // included in the GSO batch.
1714                    builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1715                } else {
1716                    builder.finish_and_track(now, self, path_id, pad_datagram);
1717                }
1718
1719                // If this is the first datagram we set the segment size to the size of the
1720                // first datagram.
1721                if transmit.num_datagrams() == 1 {
1722                    transmit.clip_segment_size();
1723                }
1724            }
1725        }
1726    }
1727
    /// Builds a DPLPMTUD probe packet for `path_id`, if one is currently due.
    ///
    /// The probe is a single datagram of exactly the size suggested by the MTU discovery
    /// subsystem (see [`Connection::get_mtu_probe_data`]): a PING frame — plus an
    /// IMMEDIATE_ACK frame when the peer supports the ack-frequency extension — padded up
    /// to the probe size. Returns `None` when no probe is needed or the packet could not
    /// be built.
    fn poll_transmit_mtu_probe(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?;

        // We are definitely sending a DPLPMTUD probe.
        // A single datagram sized to the probe; no GSO batching for probes.
        let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize);
        transmit.start_new_datagram_with_size(probe_size as usize);

        let mut builder = PacketBuilder::new(
            now,
            SpaceId::Data,
            path_id,
            active_cid,
            &mut transmit,
            true, // ack-eliciting: we need the peer's ACK to learn the probe's fate
            self,
        )?;

        // We implement MTU probes as ping packets padded up to the probe size
        trace!(?probe_size, "writing MTUD probe");
        builder.write_frame(frame::Ping, &mut self.stats.frame_tx);

        // If supported by the peer, we want no delays to the probe's ACK
        if self.peer_supports_ack_frequency() {
            builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx);
        }

        // The padding is what actually exercises the path at the probed size.
        builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));

        // Account the probe in per-path statistics.
        self.path_stats
            .entry(path_id)
            .or_default()
            .sent_plpmtud_probes += 1;

        Some(self.build_transmit(path_id, transmit))
    }
1768
1769    /// Returns the CID and probe size if a DPLPMTUD probe is needed.
1770    ///
1771    /// We MTU probe all paths for which all of the following is true:
1772    /// - We have an active destination CID for the path.
1773    /// - The remote address *and* path are validated.
1774    /// - The path is not abandoned.
1775    /// - The MTU Discovery subsystem wants to probe the path.
1776    fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> {
1777        let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?;
1778        let is_eligible = self.path_data(path_id).validated
1779            && !self.path_data(path_id).is_validating_path()
1780            && !self.abandoned_paths.contains(&path_id);
1781
1782        if !is_eligible {
1783            return None;
1784        }
1785        let next_pn = self.spaces[SpaceId::Data]
1786            .for_path(path_id)
1787            .peek_tx_number();
1788        let probe_size = self
1789            .path_data_mut(path_id)
1790            .mtud
1791            .poll_transmit(now, next_pn)?;
1792
1793        Some((active_cid, probe_size))
1794    }
1795
1796    /// Returns true if there is a further packet to send on [`PathId::ZERO`].
1797    ///
1798    /// In other words this is predicting whether the next call to
1799    /// [`Connection::space_can_send`] issued will return some frames to be sent. Including
1800    /// having to predict which packet number space it will be invoked with. This depends on
1801    /// how both [`Connection::poll_transmit_on_path`] and
1802    /// [`Connection::poll_transmit_path_space`] behave.
1803    ///
1804    /// This is needed to determine if packet coalescing can happen. Because the last packet
1805    /// in a datagram may need to be padded and thus we must know if another packet will
1806    /// follow or not.
1807    ///
1808    /// The next packet can be either in the same space, or in one of the following spaces
1809    /// on the same path. Because a 0-RTT packet can be coalesced with a 1-RTT packet and
1810    /// both are in the Data(PathId::ZERO) space. Previous spaces are not checked, because
1811    /// packets are built from Initial to Handshake to Data spaces.
1812    fn has_pending_packet(
1813        &mut self,
1814        current_space_id: SpaceId,
1815        max_packet_size: usize,
1816        connection_close_pending: bool,
1817    ) -> bool {
1818        let mut space_id = current_space_id;
1819        loop {
1820            let can_send = self.space_can_send(
1821                space_id,
1822                PathId::ZERO,
1823                max_packet_size,
1824                connection_close_pending,
1825            );
1826            if !can_send.is_empty() {
1827                return true;
1828            }
1829            match space_id.next() {
1830                Some(next_space_id) => space_id = next_space_id,
1831                None => break,
1832            }
1833        }
1834        false
1835    }
1836
1837    /// Checks if creating a new datagram would be blocked by congestion control
1838    fn path_congestion_check(
1839        &mut self,
1840        space_id: SpaceId,
1841        path_id: PathId,
1842        transmit: &TransmitBuf<'_>,
1843        can_send: &SendableFrames,
1844        now: Instant,
1845    ) -> PathBlocked {
1846        // Anti-amplification is only based on `total_sent`, which gets updated after
1847        // the transmit is sent. Therefore we pass the amount of bytes for datagrams
1848        // that are already created, as well as 1 byte for starting another datagram. If
1849        // there is any anti-amplification budget left, we always allow a full MTU to be
1850        // sent (see https://github.com/quinn-rs/quinn/issues/1082).
1851        if self.side().is_server()
1852            && self
1853                .path_data(path_id)
1854                .anti_amplification_blocked(transmit.len() as u64 + 1)
1855        {
1856            trace!(?space_id, %path_id, "blocked by anti-amplification");
1857            return PathBlocked::AntiAmplification;
1858        }
1859
1860        // Congestion control check.
1861        // Tail loss probes must not be blocked by congestion, or a deadlock could arise.
1862        let bytes_to_send = transmit.segment_size() as u64;
1863        let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1864
1865        if can_send.other && !need_loss_probe && !can_send.close {
1866            let path = self.path_data(path_id);
1867            if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1868                trace!(
1869                    ?space_id,
1870                    %path_id,
1871                    in_flight=%path.in_flight.bytes,
1872                    congestion_window=%path.congestion.window(),
1873                    "blocked by congestion control",
1874                );
1875                return PathBlocked::Congestion;
1876            }
1877        }
1878
1879        // Pacing check.
1880        if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1881            self.timers.set(
1882                Timer::PerPath(path_id, PathTimer::Pacing),
1883                delay,
1884                self.qlog.with_time(now),
1885            );
1886            // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though
1887            // they are not congestion controlled.
1888            trace!(?space_id, %path_id, "blocked by pacing");
1889            return PathBlocked::Pacing;
1890        }
1891
1892        PathBlocked::No
1893    }
1894
    /// Send PATH_CHALLENGE for a previous path if necessary
    ///
    /// Validates the pre-migration path so that off-path packet forwarding can be
    /// detected. Returns `None` when there is no previous path or no challenge is
    /// pending for it.
    ///
    /// QUIC-TRANSPORT section 9.3.3
    /// <https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding>
    fn send_prev_path_challenge(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
        // TODO (matheus23): We could use !prev_path.is_validating() here instead to
        // (possibly) also re-send challenges when they get lost.
        if !prev_path.pending_on_path_challenge {
            return None;
        };
        // One-shot: clear the flag before sending so this challenge is not re-queued.
        prev_path.pending_on_path_challenge = false;
        let token = self.rng.random();
        let network_path = prev_path.network_path;
        prev_path.record_path_challenge_sent(now, token, network_path);

        debug_assert_eq!(
            self.highest_space,
            SpaceKind::Data,
            "PATH_CHALLENGE queued without 1-RTT keys"
        );
        // Single datagram sized at the minimum initial size; shadows the raw byte buffer.
        let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
        buf.start_new_datagram();

        // Use the previous CID to avoid linking the new path with the previous path. We
        // don't bother accounting for possible retirement of that prev_cid because this is
        // sent once, immediately after migration, when the CID is known to be valid. Even
        // if a post-migration packet caused the CID to be retired, it's fair to pretend
        // this is sent first.
        let mut builder =
            PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, false, self)?;
        let challenge = frame::PathChallenge(token);
        let stats = &mut self.stats.frame_tx;
        builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));

        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
        // to at least the smallest allowed maximum datagram size of 1200 bytes,
        // unless the anti-amplification limit for the path does not permit
        // sending a datagram of this size
        builder.pad_to(MIN_INITIAL_SIZE);

        // NOTE: `finish` rather than `finish_and_track` — the packet is not tracked for
        // loss recovery; the TODO above covers possible retransmission of challenges.
        builder.finish(self, now);
        // Account the datagram in both connection-wide and per-path UDP tx stats.
        self.stats.udp_tx.on_sent(1, buf.len());
        self.path_stats
            .entry(path_id)
            .or_default()
            .udp_tx
            .on_sent(1, buf.len());

        Some(Transmit {
            destination: network_path.remote,
            size: buf.len(),
            ecn: None,
            segment_size: None,
            src_ip: network_path.local_ip,
        })
    }
1957
    /// Sends a queued PATH_RESPONSE to an off-path address, if one is pending.
    ///
    /// Pops the next off-path response token recorded for `path_id` and answers it with
    /// a min-MTU-padded 1-RTT packet addressed to the network path the corresponding
    /// PATH_CHALLENGE arrived from. Returns `None` when there is nothing to answer or
    /// the packet could not be built.
    fn send_off_path_path_response(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let path = self.paths.get_mut(&path_id).map(|state| &mut state.data)?;
        let cid_queue = self.remote_cids.get_mut(&path_id)?;
        // Token and the (off-path) network path the challenge came from.
        let (token, network_path) = path.path_responses.pop_off_path(path.network_path)?;

        // Prefer an unused CID to avoid linkability; fall back to the active one.
        let cid = cid_queue
            .next_reserved()
            .unwrap_or_else(|| cid_queue.active());
        // TODO(@divma): we should take a different approach when there is no fresh CID to use.
        // https://github.com/quinn-rs/quinn/issues/2184

        let frame = frame::PathResponse(token);

        // Single datagram sized at the minimum initial size; shadows the raw byte buffer.
        let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
        buf.start_new_datagram();

        let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, false, self)?;
        let stats = &mut self.stats.frame_tx;
        builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
        builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);

        let size = buf.len();

        // Account the datagram in both connection-wide and per-path UDP tx stats.
        self.stats.udp_tx.on_sent(1, size);
        self.path_stats
            .entry(path_id)
            .or_default()
            .udp_tx
            .on_sent(1, size);
        Some(Transmit {
            destination: network_path.remote,
            size,
            ecn: None,
            segment_size: None,
            src_ip: network_path.local_ip,
        })
    }
2000
2001    /// Send a nat traversal challenge (off-path) on this path if possible.
2002    ///
2003    /// This will ensure the path still has a remaining CID to use if the active one should be
2004    /// retired.
2005    fn send_nat_traversal_path_challenge(
2006        &mut self,
2007        now: Instant,
2008        buf: &mut Vec<u8>,
2009        path_id: PathId,
2010    ) -> Option<Transmit> {
2011        let server_side = self.n0_nat_traversal.server_side_mut().ok()?;
2012        let probe = server_side.next_probe()?;
2013        if !self.paths.get(&path_id)?.data.validated {
2014            // Path is not usable for probing
2015            return None;
2016        }
2017
2018        let remote_cids = self.remote_cids.get_mut(&path_id)?;
2019
2020        // Check if this path has enough CIDs to send a probe. One to be reserved, one in case the
2021        // active CID needs to be retired.
2022        if remote_cids.remaining() < 2 {
2023            return None;
2024        }
2025
2026        let cid = remote_cids.next_reserved()?;
2027        let remote = probe.remote();
2028        let token = self.rng.random();
2029        probe.mark_as_sent();
2030
2031        let frame = frame::PathChallenge(token);
2032
2033        let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2034        buf.start_new_datagram();
2035
2036        let mut builder =
2037            PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, false, self)?;
2038        let stats = &mut self.stats.frame_tx;
2039        builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
2040        builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
2041
2042        let path = &mut self.paths.get_mut(&path_id).expect("checked").data;
2043        let network_path = FourTuple {
2044            remote,
2045            local_ip: None,
2046        };
2047
2048        path.record_path_challenge_sent(now, token, network_path);
2049
2050        let size = buf.len();
2051
2052        self.stats.udp_tx.on_sent(1, size);
2053        self.path_stats
2054            .entry(path_id)
2055            .or_default()
2056            .udp_tx
2057            .on_sent(1, size);
2058
2059        Some(Transmit {
2060            destination: remote,
2061            size,
2062            ecn: None,
2063            segment_size: None,
2064            src_ip: None,
2065        })
2066    }
2067
2068    /// Indicate what types of frames are ready to send for the given space.
2069    ///
2070    /// Only for on-path data.
2071    ///
2072    /// *packet_size* is the number of bytes available to build the next packet.
2073    /// *connection_close_pending* indicates whether a CONNECTION_CLOSE frame needs to be
2074    /// sent.
2075    fn space_can_send(
2076        &mut self,
2077        space_id: SpaceId,
2078        path_id: PathId,
2079        packet_size: usize,
2080        connection_close_pending: bool,
2081    ) -> SendableFrames {
2082        let space = &mut self.spaces[space_id];
2083        let space_has_crypto = self.crypto_state.has_keys(space_id.encryption_level());
2084
2085        if !space_has_crypto
2086            && (space_id != SpaceId::Data
2087                || !self.crypto_state.has_keys(EncryptionLevel::ZeroRtt)
2088                || self.side.is_server())
2089        {
2090            // Nothing to send in this space
2091            return SendableFrames::empty();
2092        }
2093
2094        let mut can_send = space.can_send(path_id, &self.streams);
2095
2096        // Check for 1RTT space.
2097        if space_id == SpaceId::Data {
2098            let pn = space.for_path(path_id).peek_tx_number();
2099            // Number of bytes available for frames if this is a 1-RTT packet. We're
2100            // guaranteed to be able to send an individual frame at least this large in the
2101            // next 1-RTT packet. This could be generalized to support every space, but it's
2102            // only needed to handle large fixed-size frames, which only exist in 1-RTT
2103            // (application datagrams).
2104            let frame_space_1rtt =
2105                packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
2106            can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
2107        }
2108
2109        can_send.close = connection_close_pending && space_has_crypto;
2110
2111        can_send
2112    }
2113
    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
    ///
    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
    /// extracted through the relevant methods.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            // An incoming UDP datagram, possibly containing several coalesced packets.
            Datagram(DatagramConnectionEvent {
                now,
                network_path,
                path_id,
                ecn,
                first_decode,
                remaining,
            }) => {
                let span = trace_span!("pkt", %path_id);
                let _guard = span.enter();

                if self.update_network_path_or_discard(network_path, path_id) {
                    // A return value of true indicates we should discard this packet.
                    return;
                }

                // Snapshot the blocked state *before* accounting the received bytes;
                // receiving data may unblock anti-amplification below.
                let was_anti_amplification_blocked = self
                    .path(path_id)
                    .map(|path| path.anti_amplification_blocked(1))
                    // We never tried to send on a non-existing (new) path so have not been
                    // anti-amplification blocked for it previously.
                    .unwrap_or(false);

                // Account the first datagram in connection-wide and per-path rx stats.
                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                let rx = &mut self.path_stats.entry(path_id).or_default().udp_rx;
                rx.datagrams += 1;
                rx.bytes += first_decode.len() as u64;
                // Remember the length before `first_decode` is consumed below.
                let data_len = first_decode.len();

                self.handle_decode(now, network_path, path_id, ecn, first_decode);
                // The current `path` might have changed inside `handle_decode` since the packet
                // could have triggered a migration. The packet might also belong to an unknown
                // path and have been rejected. Make sure the data received is accounted for the
                // most recent path by accessing `path` after `handle_decode`.
                if let Some(path) = self.path_mut(path_id) {
                    path.inc_total_recvd(data_len as u64);
                }

                // Any bytes beyond the first packet are coalesced packets; process them too.
                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.path_stats.entry(path_id).or_default().udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, network_path, path_id, ecn, data);
                }

                if let Some(path) = self.paths.get_mut(&path_id) {
                    self.qlog
                        .emit_recovery_metrics(path_id, &mut path.data, now);
                }

                if was_anti_amplification_blocked {
                    // A prior attempt to set the loss detection timer may have failed due to
                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
                    // the server's first flight is lost.
                    self.set_loss_detection_timer(now, path_id);
                }
            }
            // The endpoint issued fresh local CIDs for one path: record them and queue
            // NEW_CONNECTION_ID frames for the peer.
            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
                // All issued CIDs in one event belong to the same path.
                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
                let cid_state = self
                    .local_cid_state
                    .entry(path_id)
                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
                cid_state.new_cids(&ids, now);

                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                // Always update Timer::PushNewCid
                self.reset_cid_retirement(now);
            }
        }
    }
2196
2197    /// Updates the network path for `path_id`.
2198    ///
2199    /// Returns true if a packet coming in for this `path_id` over given `network_path` should be discarded.
2200    /// Returns false if the path was updated and the packet doesn't need to be discarded.
2201    fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
2202        let remote_may_migrate = self.side.remote_may_migrate(&self.state);
2203        let local_ip_may_migrate = self.side.is_client();
2204        // If this packet could initiate a migration and we're a client or a server that
2205        // forbids migration, drop the datagram. This could be relaxed to heuristically
2206        // permit NAT-rebinding-like migration.
2207        if let Some(known_path) = self.path_mut(path_id) {
2208            if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
2209                trace!(
2210                    %path_id,
2211                    %network_path,
2212                    %known_path.network_path,
2213                    "discarding packet from unrecognized peer"
2214                );
2215                return true;
2216            }
2217
2218            if known_path.network_path.local_ip.is_some()
2219                && network_path.local_ip.is_some()
2220                && known_path.network_path.local_ip != network_path.local_ip
2221                && !local_ip_may_migrate
2222            {
2223                trace!(
2224                    %path_id,
2225                    %network_path,
2226                    %known_path.network_path,
2227                    "discarding packet sent to incorrect interface"
2228                );
2229                return true;
2230            }
2231            // If the datagram indicates that we've changed our local IP, we update it.
2232            // This is alluded to in Section 5.2 of the Multipath RFC draft 18:
2233            // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#name-using-multiple-paths-on-the
2234            // > Client receives the packet, recognizes a path migration, updates the source address of path 2 to 192.0.2.1.
2235            if let Some(local_ip) = network_path.local_ip {
2236                if known_path
2237                    .network_path
2238                    .local_ip
2239                    .is_some_and(|ip| ip != local_ip)
2240                {
2241                    debug!(
2242                        %path_id,
2243                        %network_path,
2244                        %known_path.network_path,
2245                        "path's local address seemingly migrated"
2246                    );
2247                }
2248                // We update the address without path validation on the client side.
2249                // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#section-5.1
2250                // > Servers observing a 4-tuple change will perform path validation (see Section 9 of [QUIC-TRANSPORT]).
2251                // This sounds like it's *only* the server endpoints that do this.
2252                // TODO(matheus23): We should still consider doing a proper migration on the client side in the future.
2253                // For now, this preserves the behavior of this code pre 4-tuple tracking.
2254                known_path.network_path.local_ip = Some(local_ip);
2255            }
2256        }
2257        false
2258    }
2259
    /// Process timer expirations
    ///
    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
    /// methods.
    ///
    /// It is most efficient to call this immediately after the system clock reaches the latest
    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
    /// no-op and therefore are safe.
    pub fn handle_timeout(&mut self, now: Instant) {
        // Drain every timer that has already fired; each match arm performs the
        // protocol action associated with its timer kind.
        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
            // TODO(@divma): remove `at` when the unicorn is born
            trace!(?timer, at=?now, "timeout");
            match timer {
                // Timers scoped to the connection as a whole.
                Timer::Conn(timer) => match timer {
                    ConnTimer::Close => {
                        self.state.move_to_drained(None);
                        // move_to_drained checks that we weren't in drained before.
                        // Adding events to endpoint_events is only legal if `Drained` was never queued before.
                        self.endpoint_events.push_back(EndpointEventInner::Drained);
                    }
                    ConnTimer::Idle => {
                        // Idle timeout expired: terminate the whole connection.
                        self.kill(ConnectionError::TimedOut);
                    }
                    ConnTimer::KeepAlive => {
                        trace!("sending keep-alive");
                        self.ping();
                    }
                    ConnTimer::KeyDiscard => {
                        self.crypto_state.discard_temporary_keys();
                    }
                    ConnTimer::PushNewCid => {
                        // Rotate every local CID whose retirement deadline has passed,
                        // asking the endpoint for replacements per affected path.
                        while let Some((path_id, when)) = self.next_cid_retirement() {
                            if when > now {
                                break;
                            }
                            match self.local_cid_state.get_mut(&path_id) {
                                None => error!(%path_id, "No local CID state for path"),
                                Some(cid_state) => {
                                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
                                    let num_new_cid = cid_state.on_cid_timeout().into();
                                    if !self.state.is_closed() {
                                        trace!(
                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                            cid_state.retire_prior_to()
                                        );
                                        self.endpoint_events.push_back(
                                            EndpointEventInner::NeedIdentifiers(
                                                path_id,
                                                now,
                                                num_new_cid,
                                            ),
                                        );
                                    }
                                }
                            }
                        }
                    }
                },
                // TODO: add path_id as span somehow
                Timer::PerPath(path_id, timer) => {
                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
                    let _guard = span.enter();
                    match timer {
                        PathTimer::PathIdle => {
                            if let Err(err) =
                                self.close_path_inner(now, path_id, PathAbandonReason::TimedOut)
                            {
                                warn!(?err, "failed closing path");
                            }
                        }

                        PathTimer::PathKeepAlive => {
                            trace!("sending keep-alive on path");
                            self.ping_path(path_id).ok();
                        }
                        PathTimer::LossDetection => {
                            self.on_loss_detection_timeout(now, path_id);
                            self.qlog.emit_recovery_metrics(
                                path_id,
                                &mut self.paths.get_mut(&path_id).unwrap().data,
                                now,
                            );
                        }
                        PathTimer::PathValidation => {
                            // Validation of a migrated path timed out: roll back to the
                            // previous path data (if any) and clear challenge state.
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("path validation failed");
                            if let Some((_, prev)) = path.prev.take() {
                                path.data = prev;
                            }
                            path.data.reset_on_path_challenges();
                        }
                        PathTimer::PathChallengeLost => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            trace!("path challenge deemed lost");
                            // Flag the challenge for retransmission on the next send.
                            path.data.pending_on_path_challenge = true;
                        }
                        PathTimer::PathOpen => {
                            // Validation of a newly opened path timed out: abandon it.
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            path.data.reset_on_path_challenges();
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("new path validation failed");
                            if let Err(err) = self.close_path_inner(
                                now,
                                path_id,
                                PathAbandonReason::ValidationFailed,
                            ) {
                                warn!(?err, "failed closing path");
                            }
                        }
                        PathTimer::Pacing => trace!("pacing timer expired"),
                        PathTimer::MaxAckDelay => {
                            trace!("max ack delay reached");
                            // This timer is only armed in the Data space
                            self.spaces[SpaceId::Data]
                                .for_path(path_id)
                                .pending_acks
                                .on_max_ack_delay_timeout()
                        }
                        PathTimer::DiscardPath => {
                            // The path was abandoned and 3*PTO has expired since.  Clean up all
                            // remaining state and install stateless reset token.
                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                            if let Some(local_cid_state) = self.local_cid_state.remove(&path_id) {
                                debug_assert!(!self.state.is_drained()); // requirement for endpoint_events. All timers should be cleared in drained connections.
                                let (min_seq, max_seq) = local_cid_state.active_seq();
                                // Tell the endpoint to retire every CID still active on
                                // this path.
                                for seq in min_seq..=max_seq {
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::RetireConnectionId(
                                            now, path_id, seq, false,
                                        ),
                                    );
                                }
                            }
                            self.discard_path(path_id, now);
                        }
                    }
                }
            }
        }
    }
2414
2415    /// Close a connection immediately
2416    ///
2417    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
2418    /// call this only when all important communications have been completed, e.g. by calling
2419    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
2420    /// [`StreamEvent::Finished`] event.
2421    ///
2422    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
2423    /// delivered. There may still be data from the peer that has not been received.
2424    ///
2425    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
2426    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2427        self.close_inner(
2428            now,
2429            Close::Application(frame::ApplicationClose { error_code, reason }),
2430        )
2431    }
2432
2433    /// Close the connection immediately, initiated by an API call.
2434    ///
2435    /// This will not produce a [`ConnectionLost`] event propagated by the
2436    /// [`Connection::poll`] call, because the API call already propagated the error to the
2437    /// user.
2438    ///
2439    /// Not to be used when entering immediate close due to an internal state change based
2440    /// on an event. See [`State::move_to_closed_local`] for details.
2441    ///
2442    /// This initiates immediate close from
2443    /// <https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2>, moving to the closed
2444    /// state.
2445    ///
2446    /// [`ConnectionLost`]: crate::Event::ConnectionLost
2447    /// [`Connection::poll`]: super::Connection::poll
2448    fn close_inner(&mut self, now: Instant, reason: Close) {
2449        let was_closed = self.state.is_closed();
2450        if !was_closed {
2451            self.close_common();
2452            self.set_close_timer(now);
2453            self.connection_close_pending = true;
2454            self.state.move_to_closed_local(reason);
2455        }
2456    }
2457
    /// Control datagrams
    ///
    /// Returns a handle borrowing this connection; see [`Datagrams`] for the
    /// operations it provides.
    pub fn datagrams(&mut self) -> Datagrams<'_> {
        Datagrams { conn: self }
    }
2462
2463    /// Returns connection statistics
2464    pub fn stats(&mut self) -> ConnectionStats {
2465        self.stats.clone()
2466    }
2467
2468    /// Returns path statistics
2469    pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2470        let path = self.paths.get(&path_id)?;
2471        let stats = self.path_stats.entry(path_id).or_default();
2472        stats.rtt = path.data.rtt.get();
2473        stats.cwnd = path.data.congestion.window();
2474        stats.current_mtu = path.data.mtud.current_mtu();
2475        Some(*stats)
2476    }
2477
2478    /// Ping the remote endpoint
2479    ///
2480    /// Causes an ACK-eliciting packet to be transmitted on the connection.
2481    pub fn ping(&mut self) {
2482        // TODO(flub): This is very brute-force: it pings *all* the paths.  Instead it would
2483        //    be nice if we could only send a single packet for this.
2484        for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2485            path_data.ping_pending = true;
2486        }
2487    }
2488
2489    /// Ping the remote endpoint over a specific path
2490    ///
2491    /// Causes an ACK-eliciting packet to be transmitted on the path.
2492    pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2493        let path_data = self.spaces[self.highest_space]
2494            .number_spaces
2495            .get_mut(&path)
2496            .ok_or(ClosedPath { _private: () })?;
2497        path_data.ping_pending = true;
2498        Ok(())
2499    }
2500
2501    /// Update traffic keys spontaneously
2502    ///
2503    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
2504    pub fn force_key_update(&mut self) {
2505        if !self.state.is_established() {
2506            debug!("ignoring forced key update in illegal state");
2507            return;
2508        }
2509        if self.crypto_state.prev_crypto.is_some() {
2510            // We already just updated, or are currently updating, the keys. Concurrent key updates
2511            // are illegal.
2512            debug!("ignoring redundant forced key update");
2513            return;
2514        }
2515        self.crypto_state.update_keys(None, false);
2516    }
2517
    /// Get a session reference
    ///
    /// Grants read access to the crypto session driving this connection; the
    /// available queries depend on the concrete [`crypto::Session`] implementation.
    pub fn crypto_session(&self) -> &dyn crypto::Session {
        self.crypto_state.session.as_ref()
    }
2522
    /// Whether the connection is in the process of being established
    ///
    /// If this returns `false`, the connection may be either established or closed, signaled by the
    /// emission of a `Connected` or `ConnectionLost` message respectively.
    pub fn is_handshaking(&self) -> bool {
        self.state.is_handshake()
    }
2530
    /// Whether the connection is closed
    ///
    /// Closed connections cannot transport any further data. A connection becomes closed when
    /// either peer application intentionally closes it, or when either transport layer detects an
    /// error such as a time-out or certificate validation failure.
    ///
    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
    pub fn is_closed(&self) -> bool {
        self.state.is_closed()
    }
2541
    /// Whether there is no longer any need to keep the connection around
    ///
    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
    /// packets from the peer. All drained connections have been closed.
    pub fn is_drained(&self) -> bool {
        self.state.is_drained()
    }
2549
    /// For clients, if the peer accepted the 0-RTT data packets
    ///
    /// The value is meaningless until after the handshake completes.
    pub fn accepted_0rtt(&self) -> bool {
        // Recorded by the crypto layer during the handshake.
        self.crypto_state.accepted_0rtt
    }
2556
    /// Whether 0-RTT is/was possible during the handshake
    pub fn has_0rtt(&self) -> bool {
        // Set when 0-RTT keys were made available for this connection.
        self.crypto_state.zero_rtt_enabled
    }
2561
    /// Whether there are any pending retransmits
    pub fn has_pending_retransmits(&self) -> bool {
        // Frames queued for (re)transmission live in the Data space's pending set;
        // stream state is consulted because some pending entries reference streams.
        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
    }
2566
    /// Look up whether we're the client or server of this Connection
    pub fn side(&self) -> Side {
        // `self.side` is a richer per-side state type; expose only the plain `Side`.
        self.side.side()
    }
2571
2572    /// Get the address observed by the remote over the given path
2573    pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2574        self.path(path_id)
2575            .map(|path_data| {
2576                path_data
2577                    .last_observed_addr_report
2578                    .as_ref()
2579                    .map(|observed| observed.socket_addr())
2580            })
2581            .ok_or(ClosedPath { _private: () })
2582    }
2583
    /// Current best estimate of this connection's latency (round-trip-time)
    ///
    /// Returns `None` if `path_id` does not refer to a currently known path.
    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
        self.path(path_id).map(|d| d.rtt.get())
    }
2588
    /// Current state of this connection's congestion controller, for debugging purposes
    ///
    /// Returns `None` if `path_id` does not refer to a currently known path.
    pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
        self.path(path_id).map(|d| d.congestion.as_ref())
    }
2593
2594    /// Modify the number of remotely initiated streams that may be concurrently open
2595    ///
2596    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
2597    /// `count`s increase both minimum and worst-case memory consumption.
2598    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2599        self.streams.set_max_concurrent(dir, count);
2600        // If the limit was reduced, then a flow control update previously deemed insignificant may
2601        // now be significant.
2602        let pending = &mut self.spaces[SpaceId::Data].pending;
2603        self.streams.queue_max_stream_id(pending);
2604    }
2605
2606    /// Modify the number of open paths allowed when multipath is enabled
2607    ///
2608    /// When reducing the number of concurrent paths this will only affect delaying sending
2609    /// new MAX_PATH_ID frames until fewer than this number of paths are possible.  To
2610    /// actively reduce paths they must be closed using [`Connection::close_path`], which
2611    /// can also be used to close not-yet-opened paths.
2612    ///
2613    /// If multipath is not negotiated (see the [`TransportConfig`]) this can not enable
2614    /// multipath and will fail.
2615    pub fn set_max_concurrent_paths(
2616        &mut self,
2617        now: Instant,
2618        count: NonZeroU32,
2619    ) -> Result<(), MultipathNotNegotiated> {
2620        if !self.is_multipath_negotiated() {
2621            return Err(MultipathNotNegotiated { _private: () });
2622        }
2623        self.max_concurrent_paths = count;
2624
2625        let in_use_count = self
2626            .local_max_path_id
2627            .next()
2628            .saturating_sub(self.abandoned_paths.len() as u32)
2629            .as_u32();
2630        let extra_needed = count.get().saturating_sub(in_use_count);
2631        let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2632
2633        self.set_max_path_id(now, new_max_path_id);
2634
2635        Ok(())
2636    }
2637
2638    /// If needed, issues a new MAX_PATH_ID frame and new CIDs for any newly allowed paths
2639    fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2640        if max_path_id <= self.local_max_path_id {
2641            return;
2642        }
2643
2644        self.local_max_path_id = max_path_id;
2645        self.spaces[SpaceId::Data].pending.max_path_id = true;
2646
2647        self.issue_first_path_cids(now);
2648    }
2649
    /// Current number of remotely initiated streams that may be concurrently open
    ///
    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
    /// it will not change immediately, even if fewer streams are open. Instead, it will
    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
    ///
    /// Returns the currently effective limit for the given direction.
    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
        self.streams.max_concurrent(dir)
    }
2658
    /// See [`TransportConfig::send_window()`]
    pub fn set_send_window(&mut self, send_window: u64) {
        // Delegates to the stream state, which enforces the connection-wide send limit.
        self.streams.set_send_window(send_window);
    }
2663
    /// See [`TransportConfig::receive_window()`]
    pub fn set_receive_window(&mut self, receive_window: VarInt) {
        // When the streams state signals that the advertised flow-control limit
        // changed, schedule a MAX_DATA frame to inform the peer.
        if self.streams.set_receive_window(receive_window) {
            self.spaces[SpaceId::Data].pending.max_data = true;
        }
    }
2670
2671    /// Whether the Multipath for QUIC extension is enabled.
2672    ///
2673    /// Multipath is only enabled after the handshake is completed and if it was enabled by both
2674    /// peers.
2675    pub fn is_multipath_negotiated(&self) -> bool {
2676        !self.is_handshaking()
2677            && self.config.max_concurrent_multipath_paths.is_some()
2678            && self.peer_params.initial_max_path_id.is_some()
2679    }
2680
2681    fn on_ack_received(
2682        &mut self,
2683        now: Instant,
2684        space: SpaceId,
2685        ack: frame::Ack,
2686    ) -> Result<(), TransportError> {
2687        // All ACKs are referencing path 0
2688        let path = PathId::ZERO;
2689        self.inner_on_ack_received(now, space, path, ack)
2690    }
2691
2692    fn on_path_ack_received(
2693        &mut self,
2694        now: Instant,
2695        space: SpaceId,
2696        path_ack: frame::PathAck,
2697    ) -> Result<(), TransportError> {
2698        let (ack, path) = path_ack.into_ack();
2699        self.inner_on_ack_received(now, space, path, ack)
2700    }
2701
    /// Handles an ACK frame acknowledging packets sent on *path*.
    ///
    /// Processing happens in stages: sanity checks, tracking of the largest acknowledged
    /// packet, spurious-loss detection, per-packet acknowledgement handling (MTU
    /// discovery, ACK frequency, congestion control), RTT sampling, loss detection and
    /// ECN validation. The ordering of these stages matters; see inline comments.
    fn inner_on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        if self.abandoned_paths.contains(&path) {
            // See also https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.4.3-3
            // > PATH_ACK frames received with an abandoned path ID are silently ignored, as specified in Section 4.
            trace!("silently ignoring PATH_ACK on abandoned path");
            return Ok(());
        }
        // Acknowledging a packet number we never sent is a protocol violation.
        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        // Record whether this ACK advances the largest acknowledged packet number;
        // several later stages (RTT sampling, ECN validation) only run if it does.
        let new_largest = {
            let space = &mut self.spaces[space].for_path(path);
            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
                space.largest_acked_packet = Some(ack.largest);
                if let Some(info) = space.sent_packets.get(ack.largest) {
                    // This should always succeed, but a misbehaving peer might ACK a packet we
                    // haven't sent. At worst, that will result in us spuriously reducing the
                    // congestion window.
                    space.largest_acked_packet_sent = info.time_sent;
                }
                true
            } else {
                false
            }
        };

        // Packets previously declared lost that turn out to be acknowledged indicate a
        // spurious congestion event; let the controller undo its reaction.
        if self.detect_spurious_loss(&ack, space, path) {
            self.path_data_mut(path)
                .congestion
                .on_spurious_congestion_event();
        }

        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
        let mut newly_acked = ArrayRangeSet::new();
        for range in ack.iter() {
            self.spaces[space].for_path(path).check_ack(range.clone())?;
            for (pn, _) in self.spaces[space]
                .for_path(path)
                .sent_packets
                .iter_range(range)
            {
                newly_acked.insert_one(pn);
            }
        }

        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                    // Assume ACKs for all packets below the largest acknowledged in
                    // `packet` have been received. This can cause the peer to spuriously
                    // retransmit if some of our earlier ACKs were lost, but allows for
                    // simpler state tracking. See discussion at
                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                        pns.pending_acks.subtract_below(*acked_pn);
                    }
                }
                ack_eliciting_acked |= info.ack_eliciting;

                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
                let path_data = self.path_data_mut(path);
                let mtu_updated = path_data.mtud.on_acked(space.kind(), packet, info.size);
                if mtu_updated {
                    path_data
                        .congestion
                        .on_mtu_update(path_data.mtud.current_mtu());
                }

                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
                self.ack_frequency.on_acked(path, packet);

                self.on_packet_acked(now, path, info);
            }
        }

        // Tell the congestion controller this batch of ACK processing is complete.
        let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
        let app_limited = self.app_limited;
        let path_data = self.path_data_mut(path);
        let in_flight = path_data.in_flight.bytes;

        path_data
            .congestion
            .on_end_acks(now, in_flight, app_limited, largest_ackd);

        // Only take an RTT sample when the largest acked packet is new and at least one
        // newly acked packet was ack-eliciting.
        if new_largest && ack_eliciting_acked {
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                // The peer encodes its ack delay scaled by its ack_delay_exponent; cap
                // it at the negotiated max_ack_delay.
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = now.saturating_duration_since(
                self.spaces[space].for_path(path).largest_acked_packet_sent,
            );

            let next_pn = self.spaces[space].for_path(path).next_packet_number;
            let path_data = self.path_data_mut(path);
            // TODO(@divma): should be a method of path, should be contained in a single place
            path_data.rtt.update(ack_delay, rtt);
            if path_data.first_packet_after_rtt_sample.is_none() {
                path_data.first_packet_after_rtt_sample = Some((space.kind(), next_pn));
            }
        }

        // Must be called before crypto/pto_count are clobbered
        self.detect_lost_packets(now, space, path, true);

        if self.peer_completed_address_validation(path) {
            self.path_data_mut(path).pto_count = 0;
        }

        // Explicit congestion notification
        // TODO(@divma): this code is a good example of logic that should be contained in a single
        // place but it's split between the path data and the packet number space data, we should
        // find a way to make this work without two lookups
        if self.path_data(path).sending_ecn {
            if let Some(ecn) = ack.ecn {
                // We only examine ECN counters from ACKs that we are certain we received in transmit
                // order, allowing us to compute an increase in ECN counts to compare against the number
                // of newly acked packets that remains well-defined in the presence of arbitrary packet
                // reordering.
                if new_largest {
                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
                }
            } else {
                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
                debug!("ECN not acknowledged by peer");
                self.path_data_mut(path).sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now, path);
        Ok(())
    }
2851
2852    fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2853        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2854
2855        if lost_packets.is_empty() {
2856            return false;
2857        }
2858
2859        for range in ack.iter() {
2860            let spurious_losses: Vec<u64> = lost_packets
2861                .iter_range(range.clone())
2862                .map(|(pn, _info)| pn)
2863                .collect();
2864
2865            for pn in spurious_losses {
2866                lost_packets.remove(pn);
2867            }
2868        }
2869
2870        // If this ACK frame acknowledged all deemed lost packets,
2871        // then we have raised a spurious congestion event in the past.
2872        // We cannot conclude when there are remaining packets,
2873        // but future ACK frames might indicate a spurious loss detection.
2874        lost_packets.is_empty()
2875    }
2876
2877    /// Drain lost packets that we reasonably think will never arrive
2878    ///
2879    /// The current criterion is copied from `msquic`:
2880    /// discard packets that were sent earlier than 2 probe timeouts ago.
2881    fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2882        let two_pto = 2 * self.path_data(path).rtt.pto_base();
2883
2884        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2885        lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2886    }
2887
2888    /// Process a new ECN block from an in-order ACK
2889    fn process_ecn(
2890        &mut self,
2891        now: Instant,
2892        space: SpaceId,
2893        path: PathId,
2894        newly_acked: u64,
2895        ecn: frame::EcnCounts,
2896        largest_sent_time: Instant,
2897    ) {
2898        match self.spaces[space]
2899            .for_path(path)
2900            .detect_ecn(newly_acked, ecn)
2901        {
2902            Err(e) => {
2903                debug!("halting ECN due to verification failure: {}", e);
2904
2905                self.path_data_mut(path).sending_ecn = false;
2906                // Wipe out the existing value because it might be garbage and could interfere with
2907                // future attempts to use ECN on new paths.
2908                self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2909            }
2910            Ok(false) => {}
2911            Ok(true) => {
2912                self.path_stats.entry(path).or_default().congestion_events += 1;
2913                self.path_data_mut(path).congestion.on_congestion_event(
2914                    now,
2915                    largest_sent_time,
2916                    false,
2917                    true,
2918                    0,
2919                );
2920            }
2921        }
2922    }
2923
2924    // Not timing-aware, so it's safe to call this for inferred acks, such as arise from
2925    // high-latency handshakes
2926    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2927        self.paths
2928            .get_mut(&path_id)
2929            .expect("known path")
2930            .remove_in_flight(&info);
2931        let app_limited = self.app_limited;
2932        let path = self.path_data_mut(path_id);
2933        if info.ack_eliciting && !path.is_validating_path() {
2934            // Only pass ACKs to the congestion controller if we are not validating the current
2935            // path, so as to ignore any ACKs from older paths still coming in.
2936            let rtt = path.rtt;
2937            path.congestion
2938                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2939        }
2940
2941        // Update state for confirmed delivery of frames
2942        if let Some(retransmits) = info.retransmits.get() {
2943            for (id, _) in retransmits.reset_stream.iter() {
2944                self.streams.reset_acked(*id);
2945            }
2946        }
2947
2948        for frame in info.stream_frames {
2949            self.streams.received_ack_of(frame);
2950        }
2951    }
2952
2953    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceKind) {
2954        let start = if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) {
2955            now
2956        } else {
2957            self.crypto_state
2958                .prev_crypto
2959                .as_ref()
2960                .expect("no previous keys")
2961                .end_packet
2962                .as_ref()
2963                .expect("update not acknowledged yet")
2964                .1
2965        };
2966
2967        // QUIC-MULTIPATH § 2.5 Key Phase Update Process: use largest PTO of all paths.
2968        self.timers.set(
2969            Timer::Conn(ConnTimer::KeyDiscard),
2970            start + self.max_pto_for_space(space) * 3,
2971            self.qlog.with_time(now),
2972        );
2973    }
2974
2975    /// Handle a [`PathTimer::LossDetection`] timeout.
2976    ///
2977    /// This timer expires for two reasons:
2978    /// - An ACK-eliciting packet we sent should be considered lost.
2979    /// - The PTO may have expired and a tail-loss probe needs to be scheduled.
2980    ///
2981    /// The former needs us to schedule re-transmission of the lost data.
2982    ///
2983    /// The latter means we have not received an ACK for an ack-eliciting packet we sent
2984    /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
2985    /// packet, to try and elicit new acknowledgements. These new acknowledgements will
2986    /// indicate whether the previously sent packets were lost or not.
2987    fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2988        if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2989            // Time threshold loss Detection
2990            self.detect_lost_packets(now, pn_space, path_id, false);
2991            self.set_loss_detection_timer(now, path_id);
2992            return;
2993        }
2994
2995        let Some((_, space)) = self.pto_time_and_space(now, path_id) else {
2996            error!(%path_id, "PTO expired while unset");
2997            return;
2998        };
2999        trace!(
3000            in_flight = self.path_data(path_id).in_flight.bytes,
3001            count = self.path_data(path_id).pto_count,
3002            ?space,
3003            %path_id,
3004            "PTO fired"
3005        );
3006
3007        let count = match self.path_data(path_id).in_flight.ack_eliciting {
3008            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
3009            // deadlock preventions
3010            0 => {
3011                debug_assert!(!self.peer_completed_address_validation(path_id));
3012                1
3013            }
3014            // Conventional loss probe
3015            _ => 2,
3016        };
3017        let pns = self.spaces[space].for_path(path_id);
3018        pns.loss_probes = pns.loss_probes.saturating_add(count);
3019        let path_data = self.path_data_mut(path_id);
3020        path_data.pto_count = path_data.pto_count.saturating_add(1);
3021        self.set_loss_detection_timer(now, path_id);
3022    }
3023
    /// Detect any lost packets
    ///
    /// There are two cases in which we detect lost packets:
    ///
    /// - We received an ACK packet (`due_to_ack` is `true`).
    /// - The [`PathTimer::LossDetection`] timer expired. So there is an un-acknowledged packet
    ///   that was followed by an acknowledged packet. The loss timer for this
    ///   un-acknowledged packet expired and we need to detect that packet as lost.
    ///
    /// Packets are lost if they are both (See RFC9002 §6.1):
    ///
    /// - Unacknowledged, in flight and sent prior to an acknowledged packet.
    /// - Old enough by either:
    ///   - Having a packet number [`TransportConfig::packet_threshold`] lower than the last
    ///     acknowledged packet.
    ///   - Being sent [`TransportConfig::time_threshold`] * RTT in the past.
    ///
    /// The results (lost packets, a possibly lost MTU probe, and whether persistent
    /// congestion was entered) are applied via [`Connection::handle_lost_packets`].
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        // Clear the loss timer; it is re-armed below if a not-yet-lost candidate remains.
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        // Find all the lost packets, populating all variables initialised above.

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        // Time threshold: a conservative RTT estimate scaled by the configured factor,
        // never below the timer granularity.
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // InPersistentCongestion: Determine if all packets in the time period before the newest
        // lost packet, including the edges, are marked lost. PTO computation must always
        // include max ACK delay, i.e. operate as if in Data space (see RFC9001 §7.6.1).
        let congestion_period = self
            .pto(SpaceKind::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        // Only packets sent before the largest acknowledged one are loss candidates.
        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
            if prev_packet != Some(packet.wrapping_sub(1)) {
                // An intervening packet was acknowledged
                persistent_congestion_start = None;
            }

            // Packets sent before now - loss_delay are deemed lost.
            // However, we avoid subtraction as it can panic and there's no
            // saturating equivalent of this subtraction operation with a Duration.
            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
                // The packet should be declared lost.
                if Some(packet) == in_flight_mtu_probe {
                    // Lost MTU probes are not included in `lost_packets`, because they
                    // should not trigger a congestion control response
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    // Persistent congestion is only evaluated for real ACK-driven
                    // detection, never for timer-driven detection.
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // Two ACK-eliciting packets lost more than
                            // congestion_period apart, with no ACKed packets in between
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // Persistent congestion must start after the first RTT sample
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space.kind(), packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // The packet should not yet be declared lost.
                if space.loss_time.is_none() {
                    // Since we iterate in order the lowest packet number's loss time will
                    // always be the earliest.
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        // Apply the results: stats, qlog, retransmit queuing and congestion response.
        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }
3140
3141    /// Drops the path state, declaring any remaining in-flight packets as lost
3142    fn discard_path(&mut self, path_id: PathId, now: Instant) {
3143        trace!(%path_id, "dropping path state");
3144        let path = self.path_data(path_id);
3145        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3146
3147        let mut size_of_lost_packets = 0u64; // add to path_stats.lost_bytes;
3148        let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
3149            .for_path(path_id)
3150            .sent_packets
3151            .iter()
3152            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
3153            .map(|(pn, info)| {
3154                size_of_lost_packets += info.size as u64;
3155                pn
3156            })
3157            .collect();
3158
3159        if !lost_pns.is_empty() {
3160            trace!(
3161                %path_id,
3162                count = lost_pns.len(),
3163                lost_bytes = size_of_lost_packets,
3164                "packets lost on path abandon"
3165            );
3166            self.handle_lost_packets(
3167                SpaceId::Data,
3168                path_id,
3169                now,
3170                lost_pns,
3171                in_flight_mtu_probe,
3172                Duration::ZERO,
3173                false,
3174                size_of_lost_packets,
3175            );
3176        }
3177        // Before removing the path, we fetch the final path stats via `Self::path_stats`.
3178        // This updates some values for the last time.
3179        let path_stats = self.path_stats(path_id).unwrap_or_default();
3180        self.path_stats.remove(&path_id);
3181        self.paths.remove(&path_id);
3182        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
3183
3184        self.events.push_back(
3185            PathEvent::Discarded {
3186                id: path_id,
3187                path_stats,
3188            }
3189            .into(),
3190        );
3191    }
3192
    /// Responds to a set of packets having been declared lost
    ///
    /// Updates loss statistics and qlog, queues the lost frames for retransmission,
    /// informs MTU discovery, and raises a congestion event when ack-eliciting data
    /// was lost. A lost MTU probe (`lost_mtu_probe`) is handled separately and does
    /// not trigger a congestion response.
    ///
    /// `lost_packets` must be sorted in ascending order and must not contain
    /// `lost_mtu_probe`.
    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        // Cheap sortedness check, active in debug builds only.
        debug_assert!(
            {
                let mut sorted = lost_packets.clone();
                sorted.sort();
                sorted == lost_packets
            },
            "lost_packets must be sorted"
        );

        // Age out old entries before recording the new losses below.
        self.drain_lost_packets(now, pn_space, path_id);

        // OnPacketsLost
        if let Some(largest_lost) = lost_packets.last().cloned() {
            // Snapshot bytes in flight so we can tell later whether any of the lost
            // packets were counted as in flight (i.e. ack-eliciting).
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.entry(path_id).or_default();
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                // Remove the packet from the sent-packet tracking; it may already be
                // gone if a racing ACK arrived.
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space.kind(), now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                // Queue the carried stream data and other frames for retransmission.
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                // Remember the loss so a later ACK can prove it spurious
                // (see `detect_spurious_loss`).
                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                // The MTU shrank; drop queued datagrams that no longer fit and
                // unblock senders that were waiting for space.
                if let Some(max_datagram_size) = self.datagrams().max_size()
                    && self.datagrams.drop_oversized(max_datagram_size)
                    && self.datagrams.send_blocked
                {
                    self.datagrams.send_blocked = false;
                    self.events.push_back(Event::DatagramsUnblocked);
                }
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .black_holes_detected += 1;
            }

            // Don't apply congestion penalty for lost ack-only packets
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        // Handle a lost MTU probe
        if let Some(packet) = lost_mtu_probe {
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and
            // therefore must not have been removed yet
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats
                .entry(path_id)
                .or_default()
                .lost_plpmtud_probes += 1;
        }
    }
3314
3315    /// Returns the earliest time packets should be declared lost for all spaces on a path.
3316    ///
3317    /// If a path has an acknowledged packet with any prior un-acknowledged packets, the
3318    /// earliest un-acknowledged packet can be declared lost after a timeout has elapsed.
3319    /// The time returned is when this packet should be declared lost.
3320    fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
3321        SpaceId::iter()
3322            .filter_map(|id| {
3323                self.spaces[id]
3324                    .number_spaces
3325                    .get(&path_id)
3326                    .and_then(|pns| pns.loss_time)
3327                    .map(|time| (time, id))
3328            })
3329            .min_by_key(|&(time, _)| time)
3330    }
3331
    /// Returns the earliest next PTO should fire for all spaces on a path.
    ///
    /// Returns `None` when the path is unknown or no packet number space on it has an
    /// armable PTO (nothing ack-eliciting in flight, or sending is blocked by the
    /// anti-amplification limit).
    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
        let path = self.path(path_id)?;
        let pto_count = path.pto_count;
        // Exponential backoff, exponent capped to keep the multiplication in range.
        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
        let mut duration = path.rtt.pto_base() * backoff;

        if path_id == PathId::ZERO
            && path.in_flight.ack_eliciting == 0
            && !self.peer_completed_address_validation(PathId::ZERO)
        {
            // Address Validation during Connection Establishment:
            // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
            // deadlock if an Initial or Handshake packet from the server is lost and the
            // server can not send more due to its anti-amplification limit the client must
            // send another packet on PTO.
            let space = match self.highest_space {
                SpaceKind::Handshake => SpaceId::Handshake,
                _ => SpaceId::Initial,
            };

            return Some((now + duration, space));
        }

        // Walk the spaces in order and keep the earliest candidate PTO.
        let mut result = None;
        for space in SpaceId::iter() {
            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
                continue;
            };

            if !pns.has_in_flight() {
                continue;
            }
            if space == SpaceId::Data {
                // Skip ApplicationData until handshake completes.
                // (Data is the last space iterated, so returning equals breaking.)
                if self.is_handshaking() {
                    return result;
                }
                // Include max_ack_delay and backoff for ApplicationData.
                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
            }
            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
                continue;
            };
            let pto = last_ack_eliciting + duration;
            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
                if path.anti_amplification_blocked(1) {
                    // Nothing would be able to be sent.
                    continue;
                }
                if path.in_flight.ack_eliciting == 0 {
                    // Nothing ack-eliciting, no PTO to arm/fire.
                    continue;
                }
                result = Some((pto, space));
            }
        }
        result
    }
3391
3392    fn peer_completed_address_validation(&self, path: PathId) -> bool {
3393        // TODO(flub): This logic needs updating for multipath
3394        if self.side.is_server() || self.state.is_closed() {
3395            return true;
3396        }
3397        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
3398        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
3399        self.spaces[SpaceId::Handshake]
3400            .path_space(PathId::ZERO)
3401            .and_then(|pns| pns.largest_acked_packet)
3402            .is_some()
3403            || self.spaces[SpaceId::Data]
3404                .path_space(path)
3405                .and_then(|pns| pns.largest_acked_packet)
3406                .is_some()
3407            || (self.crypto_state.has_keys(EncryptionLevel::OneRtt)
3408                && !self.crypto_state.has_keys(EncryptionLevel::Handshake))
3409    }
3410
3411    /// Resets the the [`PathTimer::LossDetection`] timer to the next instant it may be needed
3412    ///
3413    /// The timer must fire if either:
3414    /// - An ack-eliciting packet we sent needs to be declared lost.
3415    /// - A tail-loss probe needs to be sent.
3416    ///
3417    /// See [`Connection::on_loss_detection_timeout`] for details.
3418    fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3419        if self.state.is_closed() {
3420            // No loss detection takes place on closed connections, and `close_common` already
3421            // stopped time timer. Ensure we don't restart it inadvertently, e.g. in response to a
3422            // reordered packet being handled by state-insensitive code.
3423            return;
3424        }
3425
3426        if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3427            // Time threshold loss detection.
3428            self.timers.set(
3429                Timer::PerPath(path_id, PathTimer::LossDetection),
3430                loss_time,
3431                self.qlog.with_time(now),
3432            );
3433            return;
3434        }
3435
3436        // Determine which PN space to arm PTO for.
3437        // Calculate PTO duration
3438        if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3439            self.timers.set(
3440                Timer::PerPath(path_id, PathTimer::LossDetection),
3441                timeout,
3442                self.qlog.with_time(now),
3443            );
3444        } else {
3445            self.timers.stop(
3446                Timer::PerPath(path_id, PathTimer::LossDetection),
3447                self.qlog.with_time(now),
3448            );
3449        }
3450    }
3451
3452    /// The maximum probe timeout across all paths
3453    ///
3454    /// See [`Connection::pto`]
3455    fn max_pto_for_space(&self, space: SpaceKind) -> Duration {
3456        self.paths
3457            .keys()
3458            .map(|path_id| self.pto(space, *path_id))
3459            .max()
3460            .expect("there should be at least one path")
3461    }
3462
3463    /// Probe Timeout
3464    ///
3465    /// The PTO is logically the time in which you'd expect to receive an acknowledgement
3466    /// for a packet. So approximately RTT + max_ack_delay.
3467    fn pto(&self, space: SpaceKind, path_id: PathId) -> Duration {
3468        let max_ack_delay = match space {
3469            SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3470            SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3471        };
3472        self.path_data(path_id).rtt.pto_base() + max_ack_delay
3473    }
3474
3475    fn on_packet_authenticated(
3476        &mut self,
3477        now: Instant,
3478        space_id: SpaceKind,
3479        path_id: PathId,
3480        ecn: Option<EcnCodepoint>,
3481        packet: Option<u64>,
3482        spin: bool,
3483        is_1rtt: bool,
3484    ) {
3485        self.total_authed_packets += 1;
3486        self.reset_keep_alive(path_id, now);
3487        self.reset_idle_timeout(now, space_id, path_id);
3488        self.permit_idle_reset = true;
3489        self.receiving_ecn |= ecn.is_some();
3490        if let Some(x) = ecn {
3491            let space = &mut self.spaces[space_id];
3492            space.for_path(path_id).ecn_counters += x;
3493
3494            if x.is_ce() {
3495                space
3496                    .for_path(path_id)
3497                    .pending_acks
3498                    .set_immediate_ack_required();
3499            }
3500        }
3501
3502        let Some(packet) = packet else {
3503            return;
3504        };
3505        match &self.side {
3506            ConnectionSide::Client { .. } => {
3507                // If we received a handshake packet that authenticated, then we're talking to
3508                // the real server.  From now on we should no longer allow the server to migrate
3509                // its address.
3510                if space_id == SpaceKind::Handshake
3511                    && let Some(hs) = self.state.as_handshake_mut()
3512                {
3513                    hs.allow_server_migration = false;
3514                }
3515            }
3516            ConnectionSide::Server { .. } => {
3517                if self.crypto_state.has_keys(EncryptionLevel::Initial)
3518                    && space_id == SpaceKind::Handshake
3519                {
3520                    // A server stops sending and processing Initial packets when it receives its first Handshake packet.
3521                    self.discard_space(now, SpaceKind::Initial);
3522                }
3523                if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) && is_1rtt {
3524                    // Discard 0-RTT keys soon after receiving a 1-RTT packet
3525                    self.set_key_discard_timer(now, space_id)
3526                }
3527            }
3528        }
3529        let space = self.spaces[space_id].for_path(path_id);
3530        space.pending_acks.insert_one(packet, now);
3531        if packet >= space.rx_packet.unwrap_or_default() {
3532            space.rx_packet = Some(packet);
3533            // Update outgoing spin bit, inverting iff we're the client
3534            self.spin = self.side.is_client() ^ spin;
3535        }
3536    }
3537
3538    /// Resets the idle timeout timers
3539    ///
3540    /// Without multipath there is only the connection-wide idle timeout. When multipath is
3541    /// enabled there is an additional per-path idle timeout.
3542    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceKind, path_id: PathId) {
3543        // First reset the global idle timeout.
3544        if let Some(timeout) = self.idle_timeout {
3545            if self.state.is_closed() {
3546                self.timers
3547                    .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3548            } else {
3549                let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space));
3550                self.timers.set(
3551                    Timer::Conn(ConnTimer::Idle),
3552                    now + dt,
3553                    self.qlog.with_time(now),
3554                );
3555            }
3556        }
3557
3558        // Now handle the per-path state
3559        if let Some(timeout) = self.path_data(path_id).idle_timeout {
3560            if self.state.is_closed() {
3561                self.timers.stop(
3562                    Timer::PerPath(path_id, PathTimer::PathIdle),
3563                    self.qlog.with_time(now),
3564                );
3565            } else {
3566                let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3567                self.timers.set(
3568                    Timer::PerPath(path_id, PathTimer::PathIdle),
3569                    now + dt,
3570                    self.qlog.with_time(now),
3571                );
3572            }
3573        }
3574    }
3575
3576    /// Resets both the [`ConnTimer::KeepAlive`] and [`PathTimer::PathKeepAlive`] timers
3577    fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3578        if !self.state.is_established() {
3579            return;
3580        }
3581
3582        if let Some(interval) = self.config.keep_alive_interval {
3583            self.timers.set(
3584                Timer::Conn(ConnTimer::KeepAlive),
3585                now + interval,
3586                self.qlog.with_time(now),
3587            );
3588        }
3589
3590        if let Some(interval) = self.path_data(path_id).keep_alive {
3591            self.timers.set(
3592                Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3593                now + interval,
3594                self.qlog.with_time(now),
3595            );
3596        }
3597    }
3598
3599    /// Sets the timer for when a previously issued CID should be retired next
3600    fn reset_cid_retirement(&mut self, now: Instant) {
3601        if let Some((_path, t)) = self.next_cid_retirement() {
3602            self.timers.set(
3603                Timer::Conn(ConnTimer::PushNewCid),
3604                t,
3605                self.qlog.with_time(now),
3606            );
3607        }
3608    }
3609
3610    /// The next time when a previously issued CID should be retired
3611    fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3612        self.local_cid_state
3613            .iter()
3614            .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3615            .min_by_key(|(_path_id, timeout)| *timeout)
3616    }
3617
    /// Handle the already-decrypted first packet from the client
    ///
    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
    /// efficient.
    ///
    /// Records the received bytes on [`PathId::ZERO`], remembers the Initial's token so that
    /// subsequent Initials can be checked against it, processes the decrypted payload, and
    /// finally handles any further packets coalesced into the same datagram (`remaining`).
    pub(crate) fn handle_first_packet(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        ecn: Option<EcnCodepoint>,
        packet_number: u64,
        packet: InitialPacket,
        remaining: Option<BytesMut>,
    ) -> Result<(), ConnectionError> {
        let span = trace_span!("first recv");
        let _guard = span.enter();
        // Only servers receive a client's first Initial.
        debug_assert!(self.side.is_server());
        // Account for the full packet size (header + payload) on the initial path.
        let len = packet.header_data.len() + packet.payload.len();
        let path_id = PathId::ZERO;
        self.path_data_mut(path_id).total_recvd = len as u64;

        // Remember this token: later Initials must carry the same one (checked in
        // packet handling) or they are discarded.
        if let Some(hs) = self.state.as_handshake_mut() {
            hs.expected_token = packet.header.token.clone();
        } else {
            unreachable!("first packet must be delivered in Handshake state");
        }

        // The first packet is always on PathId::ZERO
        self.on_packet_authenticated(
            now,
            SpaceKind::Initial,
            path_id,
            ecn,
            Some(packet_number),
            false,
            false,
        );

        let packet: Packet = packet.into();

        let mut qlog = QlogRecvPacket::new(len);
        qlog.header(&packet.header, Some(packet_number), path_id);

        self.process_decrypted_packet(
            now,
            network_path,
            path_id,
            Some(packet_number),
            packet,
            &mut qlog,
        )?;
        self.qlog.emit_packet_received(qlog, now);
        // Any bytes after the Initial in the same datagram are further coalesced packets.
        if let Some(data) = remaining {
            self.handle_coalesced(now, network_path, path_id, ecn, data);
        }

        self.qlog.emit_recovery_metrics(
            path_id,
            &mut self.paths.get_mut(&path_id).unwrap().data,
            now,
        );

        Ok(())
    }
3681
3682    fn init_0rtt(&mut self, now: Instant) {
3683        let Some((header, packet)) = self.crypto_state.session.early_crypto() else {
3684            return;
3685        };
3686        if self.side.is_client() {
3687            match self.crypto_state.session.transport_parameters() {
3688                Ok(params) => {
3689                    let params = params
3690                        .expect("crypto layer didn't supply transport parameters with ticket");
3691                    // Certain values must not be cached
3692                    let params = TransportParameters {
3693                        initial_src_cid: None,
3694                        original_dst_cid: None,
3695                        preferred_address: None,
3696                        retry_src_cid: None,
3697                        stateless_reset_token: None,
3698                        min_ack_delay: None,
3699                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3700                        max_ack_delay: TransportParameters::default().max_ack_delay,
3701                        initial_max_path_id: None,
3702                        ..params
3703                    };
3704                    self.set_peer_params(params);
3705                    self.qlog.emit_peer_transport_params_restored(self, now);
3706                }
3707                Err(e) => {
3708                    error!("session ticket has malformed transport parameters: {}", e);
3709                    return;
3710                }
3711            }
3712        }
3713        trace!("0-RTT enabled");
3714        self.crypto_state.enable_zero_rtt(header, packet);
3715    }
3716
3717    fn read_crypto(
3718        &mut self,
3719        space: SpaceId,
3720        crypto: &frame::Crypto,
3721        payload_len: usize,
3722    ) -> Result<(), TransportError> {
3723        let expected = if !self.state.is_handshake() {
3724            SpaceId::Data
3725        } else if self.highest_space == SpaceKind::Initial {
3726            SpaceId::Initial
3727        } else {
3728            // On the server, self.highest_space can be Data after receiving the client's first
3729            // flight, but we expect Handshake CRYPTO until the handshake is complete.
3730            SpaceId::Handshake
3731        };
3732        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
3733        // packets are illegal, and we don't process 1-RTT packets until the handshake is
3734        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
3735        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3736
3737        let end = crypto.offset + crypto.data.len() as u64;
3738        if space < expected
3739            && end
3740                > self.crypto_state.spaces[space.kind()]
3741                    .crypto_stream
3742                    .bytes_read()
3743        {
3744            warn!(
3745                "received new {:?} CRYPTO data when expecting {:?}",
3746                space, expected
3747            );
3748            return Err(TransportError::PROTOCOL_VIOLATION(
3749                "new data at unexpected encryption level",
3750            ));
3751        }
3752
3753        let crypto_space = &mut self.crypto_state.spaces[space.kind()];
3754        let max = end.saturating_sub(crypto_space.crypto_stream.bytes_read());
3755        if max > self.config.crypto_buffer_size as u64 {
3756            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3757        }
3758
3759        crypto_space
3760            .crypto_stream
3761            .insert(crypto.offset, crypto.data.clone(), payload_len);
3762        while let Some(chunk) = crypto_space.crypto_stream.read(usize::MAX, true) {
3763            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3764            if self.crypto_state.session.read_handshake(&chunk.bytes)? {
3765                self.events.push_back(Event::HandshakeDataReady);
3766            }
3767        }
3768
3769        Ok(())
3770    }
3771
    /// Drains outgoing handshake data from the crypto session into CRYPTO frames
    ///
    /// Loops because writing handshake data can yield new key material (applied via
    /// [`Self::upgrade_crypto`], which advances `self.highest_space`), after which the
    /// session may have more data to send at the new encryption level.
    fn write_crypto(&mut self) {
        loop {
            // Snapshot the space before writing; upgrade_crypto below may advance it.
            let space = self.highest_space;
            let mut outgoing = Vec::new();
            if let Some(crypto) = self.crypto_state.session.write_handshake(&mut outgoing) {
                // New keys returned alongside the data: install them for the next space up.
                match space {
                    SpaceKind::Initial => {
                        self.upgrade_crypto(SpaceKind::Handshake, crypto);
                    }
                    SpaceKind::Handshake => {
                        self.upgrade_crypto(SpaceKind::Data, crypto);
                    }
                    SpaceKind::Data => unreachable!("got updated secrets during 1-RTT"),
                }
            }
            if outgoing.is_empty() {
                if space == self.highest_space {
                    break;
                } else {
                    // Keys updated, check for more data to send
                    continue;
                }
            }
            let offset = self.crypto_state.spaces[space].crypto_offset;
            let outgoing = Bytes::from(outgoing);
            // Keep a copy of the client's first Initial flight (the ClientHello): it is
            // retransmitted verbatim if the server responds with a Retry.
            if let Some(hs) = self.state.as_handshake_mut()
                && space == SpaceKind::Initial
                && offset == 0
                && self.side.is_client()
            {
                hs.client_hello = Some(outgoing.clone());
            }
            self.crypto_state.spaces[space].crypto_offset += outgoing.len() as u64;
            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
            self.spaces[space].pending.crypto.push_back(frame::Crypto {
                offset,
                data: outgoing,
            });
        }
    }
3812
3813    /// Switch to stronger cryptography during handshake
3814    fn upgrade_crypto(&mut self, space: SpaceKind, crypto: Keys) {
3815        debug_assert!(
3816            !self.crypto_state.has_keys(space.encryption_level()),
3817            "already reached packet space {space:?}"
3818        );
3819        trace!("{:?} keys ready", space);
3820        if space == SpaceKind::Data {
3821            // Precompute the first key update
3822            self.crypto_state.next_crypto = Some(
3823                self.crypto_state
3824                    .session
3825                    .next_1rtt_keys()
3826                    .expect("handshake should be complete"),
3827            );
3828        }
3829
3830        self.crypto_state.spaces[space].keys = Some(crypto);
3831        debug_assert!(space > self.highest_space);
3832        self.highest_space = space;
3833        if space == SpaceKind::Data && self.side.is_client() {
3834            // Discard 0-RTT keys because 1-RTT keys are available.
3835            self.crypto_state.discard_zero_rtt();
3836        }
3837    }
3838
3839    fn discard_space(&mut self, now: Instant, space: SpaceKind) {
3840        debug_assert!(space != SpaceKind::Data);
3841        trace!("discarding {:?} keys", space);
3842        if space == SpaceKind::Initial {
3843            // No longer needed
3844            if let ConnectionSide::Client { token, .. } = &mut self.side {
3845                *token = Bytes::new();
3846            }
3847        }
3848        self.crypto_state.spaces[space].keys = None;
3849        let space = &mut self.spaces[space];
3850        let pns = space.for_path(PathId::ZERO);
3851        pns.time_of_last_ack_eliciting_packet = None;
3852        pns.loss_time = None;
3853        pns.loss_probes = 0;
3854        let sent_packets = mem::take(&mut pns.sent_packets);
3855        let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3856        for (_, packet) in sent_packets.into_iter() {
3857            path.data.remove_in_flight(&packet);
3858        }
3859
3860        self.set_loss_detection_timer(now, PathId::ZERO)
3861    }
3862
3863    fn handle_coalesced(
3864        &mut self,
3865        now: Instant,
3866        network_path: FourTuple,
3867        path_id: PathId,
3868        ecn: Option<EcnCodepoint>,
3869        data: BytesMut,
3870    ) {
3871        self.path_data_mut(path_id)
3872            .inc_total_recvd(data.len() as u64);
3873        let mut remaining = Some(data);
3874        let cid_len = self
3875            .local_cid_state
3876            .values()
3877            .map(|cid_state| cid_state.cid_len())
3878            .next()
3879            .expect("one cid_state must exist");
3880        while let Some(data) = remaining {
3881            match PartialDecode::new(
3882                data,
3883                &FixedLengthConnectionIdParser::new(cid_len),
3884                &[self.version],
3885                self.endpoint_config.grease_quic_bit,
3886            ) {
3887                Ok((partial_decode, rest)) => {
3888                    remaining = rest;
3889                    self.handle_decode(now, network_path, path_id, ecn, partial_decode);
3890                }
3891                Err(e) => {
3892                    trace!("malformed header: {}", e);
3893                    return;
3894                }
3895            }
3896        }
3897    }
3898
3899    fn handle_decode(
3900        &mut self,
3901        now: Instant,
3902        network_path: FourTuple,
3903        path_id: PathId,
3904        ecn: Option<EcnCodepoint>,
3905        partial_decode: PartialDecode,
3906    ) {
3907        let qlog = QlogRecvPacket::new(partial_decode.len());
3908        if let Some(decoded) = self
3909            .crypto_state
3910            .unprotect_header(partial_decode, self.peer_params.stateless_reset_token)
3911        {
3912            self.handle_packet(
3913                now,
3914                network_path,
3915                path_id,
3916                ecn,
3917                decoded.packet,
3918                decoded.stateless_reset,
3919                qlog,
3920            );
3921        }
3922    }
3923
    /// Decrypts a packet and drives the resulting connection state transitions
    ///
    /// Performs packet-level bookkeeping (stats, handshake-time migration rules, dedup,
    /// spin bit, server-side path creation), hands the payload to
    /// [`Self::process_decrypted_packet`], then maps any resulting error into a state
    /// transition (closed / draining / drained). Finally, while in the closed state, a
    /// CONNECTION_CLOSE is scheduled in response to a packet from a validated remote.
    ///
    /// `packet` is `None` when header unprotection yielded no packet (e.g. a possible
    /// stateless reset, signalled via `stateless_reset`).
    fn handle_packet(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<Packet>,
        stateless_reset: bool,
        mut qlog: QlogRecvPacket,
    ) {
        // Count the datagram for connection-wide and per-path stats.
        self.stats.udp_rx.ios += 1;
        self.path_stats.entry(path_id).or_default().udp_rx.ios += 1;

        if let Some(ref packet) = packet {
            trace!(
                "got {:?} packet ({} bytes) from {} using id {}",
                packet.header.space(),
                packet.payload.len() + packet.header_data.len(),
                network_path,
                packet.header.dst_cid(),
            );
        }

        // During the handshake only PathId::ZERO exists, and the remote address may only
        // change while the handshake state still allows server migration.
        if self.is_handshaking() {
            if path_id != PathId::ZERO {
                debug!(%network_path, %path_id, "discarding multipath packet during handshake");
                return;
            }
            if network_path != self.path_data_mut(path_id).network_path {
                if let Some(hs) = self.state.as_handshake() {
                    if hs.allow_server_migration {
                        trace!(%network_path, prev = %self.path_data(path_id).network_path, "server migrated to new remote");
                        self.path_data_mut(path_id).network_path = network_path;
                        self.qlog.emit_tuple_assigned(path_id, network_path, now);
                    } else {
                        debug!("discarding packet with unexpected remote during handshake");
                        return;
                    }
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            }
        }

        // Snapshot closed/drained so transitions caused by this packet can be detected below.
        let was_closed = self.state.is_closed();
        let was_drained = self.state.is_drained();

        let decrypted = match packet {
            None => Err(None),
            Some(mut packet) => self
                .decrypt_packet(now, path_id, &mut packet)
                .map(move |number| (packet, number)),
        };
        let result = match decrypted {
            _ if stateless_reset => {
                debug!("got stateless reset");
                Err(ConnectionError::Reset)
            }
            Err(Some(e)) => {
                warn!("illegal packet: {}", e);
                Err(e.into())
            }
            Err(None) => {
                // Authentication failure: tolerated until the AEAD integrity limit is hit,
                // after which the connection must be torn down.
                debug!("failed to authenticate packet");
                self.authentication_failures += 1;
                let integrity_limit = self
                    .crypto_state
                    .integrity_limit(self.highest_space)
                    .unwrap();
                if self.authentication_failures > integrity_limit {
                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
                } else {
                    return;
                }
            }
            Ok((packet, number)) => {
                qlog.header(&packet.header, number, path_id);
                let span = match number {
                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                    None => trace_span!("recv", space = ?packet.header.space()),
                };
                let _guard = span.enter();

                // Drop packets whose number we have already processed in this space/path.
                let dedup = self.spaces[packet.header.space()]
                    .path_space_mut(path_id)
                    .map(|pns| &mut pns.dedup);
                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                    debug!("discarding possible duplicate packet");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else if self.state.is_handshake() && packet.header.is_short() {
                    // TODO: SHOULD buffer these to improve reordering tolerance.
                    trace!("dropping short packet during handshake");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else {
                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header
                        && let Some(hs) = self.state.as_handshake()
                        && self.side.is_server()
                        && token != &hs.expected_token
                    {
                        // Clients must send the same retry token in every Initial. Initial
                        // packets can be spoofed, so we discard rather than killing the
                        // connection.
                        warn!("discarding Initial with invalid retry token");
                        self.qlog.emit_packet_received(qlog, now);
                        return;
                    }

                    if !self.state.is_closed() {
                        let spin = match packet.header {
                            Header::Short { spin, .. } => spin,
                            _ => false,
                        };

                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                            // Only the client is allowed to open paths
                            self.ensure_path(path_id, network_path, now, number);
                        }
                        if self.paths.contains_key(&path_id) {
                            self.on_packet_authenticated(
                                now,
                                packet.header.space(),
                                path_id,
                                ecn,
                                number,
                                spin,
                                packet.header.is_1rtt(),
                            );
                        }
                    }

                    let res = self.process_decrypted_packet(
                        now,
                        network_path,
                        path_id,
                        number,
                        packet,
                        &mut qlog,
                    );

                    self.qlog.emit_packet_received(qlog, now);
                    res
                }
            }
        };

        // State transitions for error cases
        if let Err(conn_err) = result {
            match conn_err {
                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::Reset
                | ConnectionError::TransportError(TransportError {
                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
                    ..
                }) => {
                    self.state.move_to_drained(Some(conn_err));
                }
                ConnectionError::TimedOut => {
                    unreachable!("timeouts aren't generated by packet processing");
                }
                ConnectionError::TransportError(err) => {
                    debug!("closing connection due to transport error: {}", err);
                    self.state.move_to_closed(err);
                }
                ConnectionError::VersionMismatch => {
                    self.state.move_to_draining(Some(conn_err));
                }
                ConnectionError::LocallyClosed => {
                    unreachable!("LocallyClosed isn't generated by packet processing");
                }
                ConnectionError::CidsExhausted => {
                    unreachable!("CidsExhausted isn't generated by packet processing");
                }
            };
        }

        // React to a transition into the closed state caused by this packet.
        if !was_closed && self.state.is_closed() {
            self.close_common();
            if !self.state.is_drained() {
                self.set_close_timer(now);
            }
        }
        if !was_drained && self.state.is_drained() {
            self.endpoint_events.push_back(EndpointEventInner::Drained);
            // Close timer may have been started previously, e.g. if we sent a close and got a
            // stateless reset in response
            self.timers
                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
        }

        // Transmit CONNECTION_CLOSE if necessary.
        //
        // If we received a valid packet and we are in the closed state we should respond
        // with a CONNECTION_CLOSE frame.
        // TODO: This SHOULD be rate-limited according to §10.2.1 of QUIC-TRANSPORT, but
        //    that does not yet happen. This is triggered by each received packet.
        if matches!(self.state.as_type(), StateType::Closed) {
            // From https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2.1-7
            //
            // While in the closing state we must either:
            // - discard packets coming from an un-validated remote OR
            // - ensure we do not send more than 3 times the received data
            //
            // Doing the 2nd would mean we would be able to send CONNECTION_CLOSE to a peer
            // who was (involuntary) migrated just at the time we initiated immediate
            // close. It is a lot more work though. So while we would like to do this for
            // now we only do 1.
            //
            // Another shortcoming of the current implementation is that when we have a
            // previous PathData which is validated and the remote matches that path, we
            // should schedule CONNECTION_CLOSE on that path. However currently we can not
            // schedule such a packet. We should also fix this some day. This makes us
            // vulnerable to an attacker faking a migration at the right time and then we'd
            // be unable to send the CONNECTION_CLOSE to the real remote.
            if self
                .paths
                .get(&path_id)
                .map(|p| p.data.validated && p.data.network_path == network_path)
                .unwrap_or(false)
            {
                self.connection_close_pending = true;
            }
        }
    }
4151
4152    fn process_decrypted_packet(
4153        &mut self,
4154        now: Instant,
4155        network_path: FourTuple,
4156        path_id: PathId,
4157        number: Option<u64>,
4158        packet: Packet,
4159        qlog: &mut QlogRecvPacket,
4160    ) -> Result<(), ConnectionError> {
4161        if !self.paths.contains_key(&path_id) {
4162            // There is a chance this is a server side, first (for this path) packet, which would
4163            // be a protocol violation. It's more likely, however, that this is a packet of a
4164            // pruned path
4165            trace!(%path_id, ?number, "discarding packet for unknown path");
4166            return Ok(());
4167        }
4168        let state = match self.state.as_type() {
4169            StateType::Established => {
4170                match packet.header.space() {
4171                    SpaceKind::Data => self.process_payload(
4172                        now,
4173                        network_path,
4174                        path_id,
4175                        number.unwrap(),
4176                        packet,
4177                        qlog,
4178                    )?,
4179                    _ if packet.header.has_frames() => {
4180                        self.process_early_payload(now, path_id, packet, qlog)?
4181                    }
4182                    _ => {
4183                        trace!("discarding unexpected pre-handshake packet");
4184                    }
4185                }
4186                return Ok(());
4187            }
4188            StateType::Closed => {
4189                for result in frame::Iter::new(packet.payload.freeze())? {
4190                    let frame = match result {
4191                        Ok(frame) => frame,
4192                        Err(err) => {
4193                            debug!("frame decoding error: {err:?}");
4194                            continue;
4195                        }
4196                    };
4197                    qlog.frame(&frame);
4198
4199                    if let Frame::Padding = frame {
4200                        continue;
4201                    };
4202
4203                    self.stats.frame_rx.record(frame.ty());
4204
4205                    if let Frame::Close(_error) = frame {
4206                        self.state.move_to_draining(None);
4207                        break;
4208                    }
4209                }
4210                return Ok(());
4211            }
4212            StateType::Draining | StateType::Drained => return Ok(()),
4213            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
4214        };
4215
4216        match packet.header {
4217            Header::Retry {
4218                src_cid: remote_cid,
4219                ..
4220            } => {
4221                debug_assert_eq!(path_id, PathId::ZERO);
4222                if self.side.is_server() {
4223                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
4224                }
4225
4226                let is_valid_retry = self
4227                    .remote_cids
4228                    .get(&path_id)
4229                    .map(|cids| cids.active())
4230                    .map(|orig_dst_cid| {
4231                        self.crypto_state.session.is_valid_retry(
4232                            orig_dst_cid,
4233                            &packet.header_data,
4234                            &packet.payload,
4235                        )
4236                    })
4237                    .unwrap_or_default();
4238                if self.total_authed_packets > 1
4239                            || packet.payload.len() <= 16 // token + 16 byte tag
4240                            || !is_valid_retry
4241                {
4242                    trace!("discarding invalid Retry");
4243                    // - After the client has received and processed an Initial or Retry
4244                    //   packet from the server, it MUST discard any subsequent Retry
4245                    //   packets that it receives.
4246                    // - A client MUST discard a Retry packet with a zero-length Retry Token
4247                    //   field.
4248                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
4249                    //   that cannot be validated
4250                    return Ok(());
4251                }
4252
4253                trace!("retrying with CID {}", remote_cid);
4254                let client_hello = state.client_hello.take().unwrap();
4255                self.retry_src_cid = Some(remote_cid);
4256                self.remote_cids
4257                    .get_mut(&path_id)
4258                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
4259                    .update_initial_cid(remote_cid);
4260                self.remote_handshake_cid = remote_cid;
4261
4262                let space = &mut self.spaces[SpaceId::Initial];
4263                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
4264                    self.on_packet_acked(now, PathId::ZERO, info);
4265                };
4266
4267                self.discard_space(now, SpaceKind::Initial); // Make sure we clean up after
4268                // any retransmitted Initials
4269                let crypto_space = &mut self.crypto_state.spaces[SpaceKind::Initial];
4270                crypto_space.keys = Some(
4271                    self.crypto_state
4272                        .session
4273                        .initial_keys(remote_cid, self.side.side()),
4274                );
4275                crypto_space.crypto_offset = client_hello.len() as u64;
4276
4277                let next_pn = self.spaces[SpaceId::Initial]
4278                    .for_path(path_id)
4279                    .next_packet_number;
4280                self.spaces[SpaceId::Initial] = {
4281                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
4282                    space.for_path(path_id).next_packet_number = next_pn;
4283                    space.pending.crypto.push_back(frame::Crypto {
4284                        offset: 0,
4285                        data: client_hello,
4286                    });
4287                    space
4288                };
4289
4290                // Retransmit all 0-RTT data
4291                let zero_rtt = mem::take(
4292                    &mut self.spaces[SpaceId::Data]
4293                        .for_path(PathId::ZERO)
4294                        .sent_packets,
4295                );
4296                for (_, info) in zero_rtt.into_iter() {
4297                    self.paths
4298                        .get_mut(&PathId::ZERO)
4299                        .unwrap()
4300                        .remove_in_flight(&info);
4301                    self.spaces[SpaceId::Data].pending |= info.retransmits;
4302                }
4303                self.streams.retransmit_all_for_0rtt();
4304
4305                let token_len = packet.payload.len() - 16;
4306                let ConnectionSide::Client { ref mut token, .. } = self.side else {
4307                    unreachable!("we already short-circuited if we're server");
4308                };
4309                *token = packet.payload.freeze().split_to(token_len);
4310
4311                self.state = State::handshake(state::Handshake {
4312                    expected_token: Bytes::new(),
4313                    remote_cid_set: false,
4314                    client_hello: None,
4315                    allow_server_migration: true,
4316                });
4317                Ok(())
4318            }
4319            Header::Long {
4320                ty: LongType::Handshake,
4321                src_cid: remote_cid,
4322                dst_cid: local_cid,
4323                ..
4324            } => {
4325                debug_assert_eq!(path_id, PathId::ZERO);
4326                if remote_cid != self.remote_handshake_cid {
4327                    debug!(
4328                        "discarding packet with mismatched remote CID: {} != {}",
4329                        self.remote_handshake_cid, remote_cid
4330                    );
4331                    return Ok(());
4332                }
4333                self.on_path_validated(path_id);
4334
4335                self.process_early_payload(now, path_id, packet, qlog)?;
4336                if self.state.is_closed() {
4337                    return Ok(());
4338                }
4339
4340                if self.crypto_state.session.is_handshaking() {
4341                    trace!("handshake ongoing");
4342                    return Ok(());
4343                }
4344
4345                if self.side.is_client() {
4346                    // Client-only because server params were set from the client's Initial
4347                    let params = self
4348                        .crypto_state
4349                        .session
4350                        .transport_parameters()?
4351                        .ok_or_else(|| {
4352                            TransportError::new(
4353                                TransportErrorCode::crypto(0x6d),
4354                                "transport parameters missing".to_owned(),
4355                            )
4356                        })?;
4357
4358                    if self.has_0rtt() {
4359                        if !self.crypto_state.session.early_data_accepted().unwrap() {
4360                            debug_assert!(self.side.is_client());
4361                            debug!("0-RTT rejected");
4362                            self.crypto_state.accepted_0rtt = false;
4363                            self.streams.zero_rtt_rejected();
4364
4365                            // Discard already-queued frames
4366                            self.spaces[SpaceId::Data].pending = Retransmits::default();
4367
4368                            // Discard 0-RTT packets
4369                            let sent_packets = mem::take(
4370                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
4371                            );
4372                            for (_, packet) in sent_packets.into_iter() {
4373                                self.paths
4374                                    .get_mut(&path_id)
4375                                    .unwrap()
4376                                    .remove_in_flight(&packet);
4377                            }
4378                        } else {
4379                            self.crypto_state.accepted_0rtt = true;
4380                            params.validate_resumption_from(&self.peer_params)?;
4381                        }
4382                    }
4383                    if let Some(token) = params.stateless_reset_token {
4384                        let remote = self.path_data(path_id).network_path.remote;
4385                        debug_assert!(!self.state.is_drained()); // requirement for endpoint events, checked above
4386                        self.endpoint_events
4387                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4388                    }
4389                    self.handle_peer_params(params, local_cid, remote_cid, now)?;
4390                    self.issue_first_cids(now);
4391                } else {
4392                    // Server-only
4393                    self.spaces[SpaceId::Data].pending.handshake_done = true;
4394                    self.discard_space(now, SpaceKind::Handshake);
4395                    self.events.push_back(Event::HandshakeConfirmed);
4396                    trace!("handshake confirmed");
4397                }
4398
4399                self.events.push_back(Event::Connected);
4400                self.state.move_to_established();
4401                trace!("established");
4402
4403                // Multipath can only be enabled after the state has reached Established.
4404                // So this can not happen any earlier.
4405                self.issue_first_path_cids(now);
4406                Ok(())
4407            }
4408            Header::Initial(InitialHeader {
4409                src_cid: remote_cid,
4410                dst_cid: local_cid,
4411                ..
4412            }) => {
4413                debug_assert_eq!(path_id, PathId::ZERO);
4414                if !state.remote_cid_set {
4415                    trace!("switching remote CID to {}", remote_cid);
4416                    let mut state = state.clone();
4417                    self.remote_cids
4418                        .get_mut(&path_id)
4419                        .expect("PathId::ZERO not yet abandoned")
4420                        .update_initial_cid(remote_cid);
4421                    self.remote_handshake_cid = remote_cid;
4422                    self.original_remote_cid = remote_cid;
4423                    state.remote_cid_set = true;
4424                    self.state.move_to_handshake(state);
4425                } else if remote_cid != self.remote_handshake_cid {
4426                    debug!(
4427                        "discarding packet with mismatched remote CID: {} != {}",
4428                        self.remote_handshake_cid, remote_cid
4429                    );
4430                    return Ok(());
4431                }
4432
4433                let starting_space = self.highest_space;
4434                self.process_early_payload(now, path_id, packet, qlog)?;
4435
4436                if self.side.is_server()
4437                    && starting_space == SpaceKind::Initial
4438                    && self.highest_space != SpaceKind::Initial
4439                {
4440                    let params = self
4441                        .crypto_state
4442                        .session
4443                        .transport_parameters()?
4444                        .ok_or_else(|| {
4445                            TransportError::new(
4446                                TransportErrorCode::crypto(0x6d),
4447                                "transport parameters missing".to_owned(),
4448                            )
4449                        })?;
4450                    self.handle_peer_params(params, local_cid, remote_cid, now)?;
4451                    self.issue_first_cids(now);
4452                    self.init_0rtt(now);
4453                }
4454                Ok(())
4455            }
4456            Header::Long {
4457                ty: LongType::ZeroRtt,
4458                ..
4459            } => {
4460                self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4461                Ok(())
4462            }
4463            Header::VersionNegotiate { .. } => {
4464                if self.total_authed_packets > 1 {
4465                    return Ok(());
4466                }
4467                let supported = packet
4468                    .payload
4469                    .chunks(4)
4470                    .any(|x| match <[u8; 4]>::try_from(x) {
4471                        Ok(version) => self.version == u32::from_be_bytes(version),
4472                        Err(_) => false,
4473                    });
4474                if supported {
4475                    return Ok(());
4476                }
4477                debug!("remote doesn't support our version");
4478                Err(ConnectionError::VersionMismatch)
4479            }
4480            Header::Short { .. } => unreachable!(
4481                "short packets received during handshake are discarded in handle_packet"
4482            ),
4483        }
4484    }
4485
4486    /// Process an Initial or Handshake packet payload
4487    fn process_early_payload(
4488        &mut self,
4489        now: Instant,
4490        path_id: PathId,
4491        packet: Packet,
4492        #[allow(unused)] qlog: &mut QlogRecvPacket,
4493    ) -> Result<(), TransportError> {
4494        debug_assert_ne!(packet.header.space(), SpaceKind::Data);
4495        debug_assert_eq!(path_id, PathId::ZERO);
4496        let payload_len = packet.payload.len();
4497        let mut ack_eliciting = false;
4498        for result in frame::Iter::new(packet.payload.freeze())? {
4499            let frame = result?;
4500            qlog.frame(&frame);
4501            let span = match frame {
4502                Frame::Padding => continue,
4503                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4504            };
4505
4506            self.stats.frame_rx.record(frame.ty());
4507
4508            let _guard = span.as_ref().map(|x| x.enter());
4509            ack_eliciting |= frame.is_ack_eliciting();
4510
4511            // Process frames
4512            if frame.is_1rtt() && packet.header.space() != SpaceKind::Data {
4513                return Err(TransportError::PROTOCOL_VIOLATION(
4514                    "illegal frame type in handshake",
4515                ));
4516            }
4517
4518            match frame {
4519                Frame::Padding | Frame::Ping => {}
4520                Frame::Crypto(frame) => {
4521                    self.read_crypto(packet.header.space().into(), &frame, payload_len)?;
4522                }
4523                Frame::Ack(ack) => {
4524                    self.on_ack_received(now, packet.header.space().into(), ack)?;
4525                }
4526                Frame::PathAck(ack) => {
4527                    span.as_ref()
4528                        .map(|span| span.record("path", tracing::field::display(&ack.path_id)));
4529                    self.on_path_ack_received(now, packet.header.space().into(), ack)?;
4530                }
4531                Frame::Close(reason) => {
4532                    self.state.move_to_draining(Some(reason.into()));
4533                    return Ok(());
4534                }
4535                _ => {
4536                    let mut err =
4537                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4538                    err.frame = frame::MaybeFrame::Known(frame.ty());
4539                    return Err(err);
4540                }
4541            }
4542        }
4543
4544        if ack_eliciting {
4545            // In the initial and handshake spaces, ACKs must be sent immediately
4546            self.spaces[packet.header.space()]
4547                .for_path(path_id)
4548                .pending_acks
4549                .set_immediate_ack_required();
4550        }
4551
4552        self.write_crypto();
4553        Ok(())
4554    }
4555
4556    /// Processes the packet payload, always in the data space.
4557    fn process_payload(
4558        &mut self,
4559        now: Instant,
4560        network_path: FourTuple,
4561        path_id: PathId,
4562        number: u64,
4563        packet: Packet,
4564        #[allow(unused)] qlog: &mut QlogRecvPacket,
4565    ) -> Result<(), TransportError> {
4566        let is_multipath_negotiated = self.is_multipath_negotiated();
4567        let payload = packet.payload.freeze();
4568        let mut is_probing_packet = true;
4569        let mut close = None;
4570        let payload_len = payload.len();
4571        let mut ack_eliciting = false;
4572        // if this packet triggers a path migration and includes a observed address frame, it's
4573        // stored here
4574        let mut migration_observed_addr = None;
4575        for result in frame::Iter::new(payload)? {
4576            let frame = result?;
4577            qlog.frame(&frame);
4578            let span = match frame {
4579                Frame::Padding => continue,
4580                _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4581            };
4582
4583            self.stats.frame_rx.record(frame.ty());
            // Crypto, Stream and Datagram frames are special-cased in order not to
            // pollute the log with payload data
4586            match &frame {
4587                Frame::Crypto(f) => {
4588                    trace!(offset = f.offset, len = f.data.len(), "got frame CRYPTO");
4589                }
4590                Frame::Stream(f) => {
4591                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got frame STREAM");
4592                }
4593                Frame::Datagram(f) => {
4594                    trace!(len = f.data.len(), "got frame DATAGRAM");
4595                }
4596                f => {
4597                    trace!("got frame {f}");
4598                }
4599            }
4600
4601            let _guard = span.enter();
4602            if packet.header.is_0rtt() {
4603                match frame {
4604                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4605                        return Err(TransportError::PROTOCOL_VIOLATION(
4606                            "illegal frame type in 0-RTT",
4607                        ));
4608                    }
4609                    _ => {
4610                        if frame.is_1rtt() {
4611                            return Err(TransportError::PROTOCOL_VIOLATION(
4612                                "illegal frame type in 0-RTT",
4613                            ));
4614                        }
4615                    }
4616                }
4617            }
4618            ack_eliciting |= frame.is_ack_eliciting();
4619
4620            // Check whether this could be a probing packet
4621            match frame {
4622                Frame::Padding
4623                | Frame::PathChallenge(_)
4624                | Frame::PathResponse(_)
4625                | Frame::NewConnectionId(_)
4626                | Frame::ObservedAddr(_) => {}
4627                _ => {
4628                    is_probing_packet = false;
4629                }
4630            }
4631
4632            match frame {
4633                Frame::Crypto(frame) => {
4634                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4635                }
4636                Frame::Stream(frame) => {
4637                    if self.streams.received(frame, payload_len)?.should_transmit() {
4638                        self.spaces[SpaceId::Data].pending.max_data = true;
4639                    }
4640                }
4641                Frame::Ack(ack) => {
4642                    self.on_ack_received(now, SpaceId::Data, ack)?;
4643                }
4644                Frame::PathAck(ack) => {
4645                    span.record("path", tracing::field::display(&ack.path_id));
4646                    self.on_path_ack_received(now, SpaceId::Data, ack)?;
4647                }
4648                Frame::Padding | Frame::Ping => {}
4649                Frame::Close(reason) => {
4650                    close = Some(reason);
4651                }
4652                Frame::PathChallenge(challenge) => {
4653                    let path = &mut self
4654                        .path_mut(path_id)
4655                        .expect("payload is processed only after the path becomes known");
4656                    path.path_responses.push(number, challenge.0, network_path);
4657                    // At this point, update_network_path_or_discard was already called, so
4658                    // we don't need to be lenient about `local_ip` possibly mis-matching.
4659                    if network_path == path.network_path {
4660                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
4661                        // attack. Send a non-probing packet to recover the active path.
                        // TODO(flub): No longer true! We now also send PATH_CHALLENGE to validate
4663                        //    the path if the path is new, without an RFC9000-style
4664                        //    migration involved. This means we add in an extra
4665                        //    IMMEDIATE_ACK on some challenges. It isn't really wrong to do
4666                        //    so, but it still is something untidy. We should instead
4667                        //    suppress this when we know the remote is still validating the
4668                        //    path.
4669                        match self.peer_supports_ack_frequency() {
4670                            true => self.immediate_ack(path_id),
4671                            false => {
4672                                self.ping_path(path_id).ok();
4673                            }
4674                        }
4675                    }
4676                }
4677                Frame::PathResponse(response) => {
4678                    let path = self
4679                        .paths
4680                        .get_mut(&path_id)
4681                        .expect("payload is processed only after the path becomes known");
4682
4683                    use PathTimer::*;
4684                    use paths::OnPathResponseReceived::*;
4685                    match path
4686                        .data
4687                        .on_path_response_received(now, response.0, network_path)
4688                    {
4689                        OnPath { was_open } => {
4690                            let qlog = self.qlog.with_time(now);
4691
4692                            self.timers
4693                                .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4694                            self.timers
4695                                .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4696
4697                            let next_challenge = path
4698                                .data
4699                                .earliest_on_path_expiring_challenge()
4700                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4701                            self.timers.set_or_stop(
4702                                Timer::PerPath(path_id, PathChallengeLost),
4703                                next_challenge,
4704                                qlog,
4705                            );
4706
4707                            if !was_open {
4708                                if is_multipath_negotiated {
4709                                    self.events
4710                                        .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4711                                }
4712                                if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4713                                {
4714                                    self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4715                                        id: path_id,
4716                                        addr: observed.socket_addr(),
4717                                    }));
4718                                }
4719                            }
4720                            if let Some((_, ref mut prev)) = path.prev {
4721                                prev.reset_on_path_challenges();
4722                            }
4723                        }
4724                        OffPath => {
4725                            debug!(%response, "Valid response to off-path PATH_CHALLENGE");
4726                        }
4727                        Ignored {
4728                            sent_on,
4729                            current_path,
4730                        } => {
4731                            debug!(%sent_on, %current_path, %response, "ignoring valid PATH_RESPONSE")
4732                        }
4733                        Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4734                    }
4735                }
4736                Frame::MaxData(frame::MaxData(bytes)) => {
4737                    self.streams.received_max_data(bytes);
4738                }
4739                Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
4740                    self.streams.received_max_stream_data(id, offset)?;
4741                }
4742                Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
4743                    self.streams.received_max_streams(dir, count)?;
4744                }
4745                Frame::ResetStream(frame) => {
4746                    if self.streams.received_reset(frame)?.should_transmit() {
4747                        self.spaces[SpaceId::Data].pending.max_data = true;
4748                    }
4749                }
4750                Frame::DataBlocked(DataBlocked(offset)) => {
4751                    debug!(offset, "peer claims to be blocked at connection level");
4752                }
4753                Frame::StreamDataBlocked(StreamDataBlocked { id, offset }) => {
4754                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4755                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4756                        return Err(TransportError::STREAM_STATE_ERROR(
4757                            "STREAM_DATA_BLOCKED on send-only stream",
4758                        ));
4759                    }
4760                    debug!(
4761                        stream = %id,
4762                        offset, "peer claims to be blocked at stream level"
4763                    );
4764                }
4765                Frame::StreamsBlocked(StreamsBlocked { dir, limit }) => {
4766                    if limit > MAX_STREAM_COUNT {
4767                        return Err(TransportError::FRAME_ENCODING_ERROR(
4768                            "unrepresentable stream limit",
4769                        ));
4770                    }
4771                    debug!(
4772                        "peer claims to be blocked opening more than {} {} streams",
4773                        limit, dir
4774                    );
4775                }
4776                Frame::StopSending(frame::StopSending { id, error_code }) => {
4777                    if id.initiator() != self.side.side() {
4778                        if id.dir() == Dir::Uni {
4779                            debug!("got STOP_SENDING on recv-only {}", id);
4780                            return Err(TransportError::STREAM_STATE_ERROR(
4781                                "STOP_SENDING on recv-only stream",
4782                            ));
4783                        }
4784                    } else if self.streams.is_local_unopened(id) {
4785                        return Err(TransportError::STREAM_STATE_ERROR(
4786                            "STOP_SENDING on unopened stream",
4787                        ));
4788                    }
4789                    self.streams.received_stop_sending(id, error_code);
4790                }
4791                Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4792                    if let Some(ref path_id) = path_id {
4793                        span.record("path", tracing::field::display(&path_id));
4794                    }
4795                    let path_id = path_id.unwrap_or_default();
4796                    match self.local_cid_state.get_mut(&path_id) {
4797                        None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4798                        Some(cid_state) => {
4799                            let allow_more_cids = cid_state
4800                                .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4801
4802                            // If the path has closed, we do not issue more CIDs for this path
4803                            // For details see  https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.2.2
4804                            // > an endpoint SHOULD provide new connection IDs for that path, if still open, using PATH_NEW_CONNECTION_ID frames.
4805                            let has_path = !self.abandoned_paths.contains(&path_id);
4806                            let allow_more_cids = allow_more_cids && has_path;
4807
4808                            debug_assert!(!self.state.is_drained()); // required for adding endpoint events, process_payload is never called for drained connections
4809                            self.endpoint_events
4810                                .push_back(EndpointEventInner::RetireConnectionId(
4811                                    now,
4812                                    path_id,
4813                                    sequence,
4814                                    allow_more_cids,
4815                                ));
4816                        }
4817                    }
4818                }
4819                Frame::NewConnectionId(frame) => {
4820                    let path_id = if let Some(path_id) = frame.path_id {
4821                        if !self.is_multipath_negotiated() {
4822                            return Err(TransportError::PROTOCOL_VIOLATION(
4823                                "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4824                            ));
4825                        }
4826                        if path_id > self.local_max_path_id {
4827                            return Err(TransportError::PROTOCOL_VIOLATION(
4828                                "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4829                            ));
4830                        }
4831                        path_id
4832                    } else {
4833                        PathId::ZERO
4834                    };
4835
4836                    if let Some(ref path_id) = frame.path_id {
4837                        span.record("path", tracing::field::display(&path_id));
4838                    }
4839
4840                    if self.abandoned_paths.contains(&path_id) {
4841                        trace!("ignoring issued CID for abandoned path");
4842                        continue;
4843                    }
4844                    let remote_cids = self
4845                        .remote_cids
4846                        .entry(path_id)
4847                        .or_insert_with(|| CidQueue::new(frame.id));
4848                    if remote_cids.active().is_empty() {
4849                        return Err(TransportError::PROTOCOL_VIOLATION(
4850                            "NEW_CONNECTION_ID when CIDs aren't in use",
4851                        ));
4852                    }
4853                    if frame.retire_prior_to > frame.sequence {
4854                        return Err(TransportError::PROTOCOL_VIOLATION(
4855                            "NEW_CONNECTION_ID retiring unissued CIDs",
4856                        ));
4857                    }
4858
4859                    use crate::cid_queue::InsertError;
4860                    match remote_cids.insert(frame) {
4861                        Ok(None) if self.path(path_id).is_none() => {
4862                            // if this gives us CIDs to open a new path and a nat traversal attempt
4863                            // is underway we could try to probe a pending remote
4864                            self.continue_nat_traversal_round(now);
4865                        }
4866                        Ok(None) => {}
4867                        Ok(Some((retired, reset_token))) => {
4868                            let pending_retired =
4869                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
4870                            /// Ensure `pending_retired` cannot grow without bound. Limit is
4871                            /// somewhat arbitrary but very permissive.
4872                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4873                            // We don't bother counting in-flight frames because those are bounded
4874                            // by congestion control.
4875                            if (pending_retired.len() as u64)
4876                                .saturating_add(retired.end.saturating_sub(retired.start))
4877                                > MAX_PENDING_RETIRED_CIDS
4878                            {
4879                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4880                                    "queued too many retired CIDs",
4881                                ));
4882                            }
4883                            pending_retired.extend(retired.map(|seq| (path_id, seq)));
4884                            self.set_reset_token(path_id, network_path.remote, reset_token);
4885                        }
4886                        Err(InsertError::ExceedsLimit) => {
4887                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4888                        }
4889                        Err(InsertError::Retired) => {
4890                            trace!("discarding already-retired");
4891                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
4892                            // range of connection IDs larger than the active connection ID limit
4893                            // was retired all at once via retire_prior_to.
4894                            self.spaces[SpaceId::Data]
4895                                .pending
4896                                .retire_cids
4897                                .push((path_id, frame.sequence));
4898                            continue;
4899                        }
4900                    };
4901
4902                    if self.side.is_server()
4903                        && path_id == PathId::ZERO
4904                        && self
4905                            .remote_cids
4906                            .get(&PathId::ZERO)
4907                            .map(|cids| cids.active_seq() == 0)
4908                            .unwrap_or_default()
4909                    {
4910                        // We're a server still using the initial remote CID for the client, so
4911                        // let's switch immediately to enable clientside stateless resets.
4912                        self.update_remote_cid(PathId::ZERO);
4913                    }
4914                }
4915                Frame::NewToken(NewToken { token }) => {
4916                    let ConnectionSide::Client {
4917                        token_store,
4918                        server_name,
4919                        ..
4920                    } = &self.side
4921                    else {
4922                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4923                    };
4924                    if token.is_empty() {
4925                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4926                    }
4927                    trace!("got new token");
4928                    token_store.insert(server_name, token);
4929                }
4930                Frame::Datagram(datagram) => {
4931                    if self
4932                        .datagrams
4933                        .received(datagram, &self.config.datagram_receive_buffer_size)?
4934                    {
4935                        self.events.push_back(Event::DatagramReceived);
4936                    }
4937                }
4938                Frame::AckFrequency(ack_frequency) => {
4939                    // This frame can only be sent in the Data space
4940
4941                    if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4942                        // The AckFrequency frame is stale (we have already received a more
4943                        // recent one)
4944                        continue;
4945                    }
4946
4947                    // Update the params for all of our paths
4948                    for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4949                        space.pending_acks.set_ack_frequency_params(&ack_frequency);
4950
4951                        // Our `max_ack_delay` has been updated, so we may need to adjust
4952                        // its associated timeout
4953                        if let Some(timeout) = space
4954                            .pending_acks
4955                            .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4956                        {
4957                            self.timers.set(
4958                                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4959                                timeout,
4960                                self.qlog.with_time(now),
4961                            );
4962                        }
4963                    }
4964                }
4965                Frame::ImmediateAck => {
4966                    // This frame can only be sent in the Data space
4967                    for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4968                        pns.pending_acks.set_immediate_ack_required();
4969                    }
4970                }
4971                Frame::HandshakeDone => {
4972                    if self.side.is_server() {
4973                        return Err(TransportError::PROTOCOL_VIOLATION(
4974                            "client sent HANDSHAKE_DONE",
4975                        ));
4976                    }
4977                    if self.crypto_state.has_keys(EncryptionLevel::Handshake) {
4978                        self.discard_space(now, SpaceKind::Handshake);
4979                    }
4980                    self.events.push_back(Event::HandshakeConfirmed);
4981                    trace!("handshake confirmed");
4982                }
4983                Frame::ObservedAddr(observed) => {
4984                    // check if params allows the peer to send report and this node to receive it
4985                    trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4986                    if !self
4987                        .peer_params
4988                        .address_discovery_role
4989                        .should_report(&self.config.address_discovery_role)
4990                    {
4991                        return Err(TransportError::PROTOCOL_VIOLATION(
4992                            "received OBSERVED_ADDRESS frame when not negotiated",
4993                        ));
4994                    }
4995                    // must only be sent in data space
4996                    if packet.header.space() != SpaceKind::Data {
4997                        return Err(TransportError::PROTOCOL_VIOLATION(
4998                            "OBSERVED_ADDRESS frame outside data space",
4999                        ));
5000                    }
5001
5002                    let path = self.path_data_mut(path_id);
5003                    if network_path == path.network_path {
5004                        if let Some(updated) = path.update_observed_addr_report(observed)
5005                            && path.open_status == paths::OpenStatus::Informed
5006                        {
5007                            self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5008                                id: path_id,
5009                                addr: updated,
5010                            }));
5011                            // otherwise the event is reported when the path is deemed open
5012                        }
5013                    } else {
5014                        // include in migration
5015                        migration_observed_addr = Some(observed)
5016                    }
5017                }
5018                Frame::PathAbandon(frame::PathAbandon {
5019                    path_id,
5020                    error_code,
5021                }) => {
5022                    span.record("path", tracing::field::display(&path_id));
5023                    match self.close_path_inner(
5024                        now,
5025                        path_id,
5026                        PathAbandonReason::RemoteAbandoned {
5027                            error_code: error_code.into(),
5028                        },
5029                    ) {
5030                        Ok(()) => {
5031                            trace!("peer abandoned path");
5032                        }
5033                        Err(ClosePathError::LastOpenPath) => {
5034                            trace!("peer abandoned last path, closing connection");
5035                            return Err(TransportError::NO_VIABLE_PATH(
5036                                "last path abandoned by peer",
5037                            ));
5038                        }
5039                        Err(ClosePathError::ClosedPath) => {
5040                            trace!("peer abandoned already closed path");
5041                        }
5042                        Err(ClosePathError::MultipathNotNegotiated) => {
5043                            return Err(TransportError::PROTOCOL_VIOLATION(
5044                                "received PATH_ABANDON frame when multipath was not negotiated",
5045                            ));
5046                        }
5047                    };
5048
5049                    // Start draining the path if it still exists and hasn't started draining yet.
5050                    if let Some(path) = self.paths.get_mut(&path_id)
5051                        && !mem::replace(&mut path.data.draining, true)
5052                    {
5053                        let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5054                        let pto = path.data.rtt.pto_base() + ack_delay;
5055                        self.timers.set(
5056                            Timer::PerPath(path_id, PathTimer::DiscardPath),
5057                            now + 3 * pto,
5058                            self.qlog.with_time(now),
5059                        );
5060
5061                        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
5062                    }
5063                }
5064                Frame::PathStatusAvailable(info) => {
5065                    span.record("path", tracing::field::display(&info.path_id));
5066                    if self.is_multipath_negotiated() {
5067                        self.on_path_status(
5068                            info.path_id,
5069                            PathStatus::Available,
5070                            info.status_seq_no,
5071                        );
5072                    } else {
5073                        return Err(TransportError::PROTOCOL_VIOLATION(
5074                            "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
5075                        ));
5076                    }
5077                }
5078                Frame::PathStatusBackup(info) => {
5079                    span.record("path", tracing::field::display(&info.path_id));
5080                    if self.is_multipath_negotiated() {
5081                        self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
5082                    } else {
5083                        return Err(TransportError::PROTOCOL_VIOLATION(
5084                            "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
5085                        ));
5086                    }
5087                }
5088                Frame::MaxPathId(frame::MaxPathId(path_id)) => {
5089                    span.record("path", tracing::field::display(&path_id));
5090                    if !self.is_multipath_negotiated() {
5091                        return Err(TransportError::PROTOCOL_VIOLATION(
5092                            "received MAX_PATH_ID frame when multipath was not negotiated",
5093                        ));
5094                    }
5095                    // frames that do not increase the path id are ignored
5096                    if path_id > self.remote_max_path_id {
5097                        self.remote_max_path_id = path_id;
5098                        self.issue_first_path_cids(now);
5099                        while let Some(true) = self.continue_nat_traversal_round(now) {}
5100                    }
5101                }
5102                Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
5103                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
5104                    // be treated as a connection error of type PROTOCOL_VIOLATION.
5105                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
5106                    if self.is_multipath_negotiated() {
5107                        if max_path_id > self.local_max_path_id {
5108                            return Err(TransportError::PROTOCOL_VIOLATION(
5109                                "PATHS_BLOCKED maximum path identifier was larger than local maximum",
5110                            ));
5111                        }
5112                        debug!("received PATHS_BLOCKED({:?})", max_path_id);
5113                        // TODO(@divma): ensure max concurrent paths
5114                    } else {
5115                        return Err(TransportError::PROTOCOL_VIOLATION(
5116                            "received PATHS_BLOCKED frame when not multipath was not negotiated",
5117                        ));
5118                    }
5119                }
5120                Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
5121                    // Nothing to do.  This is recorded in the frame stats, but otherwise we
5122                    // always issue all CIDs we're allowed to issue, so either this is an
5123                    // impatient peer or a bug on our side.
5124
5125                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
5126                    // be treated as a connection error of type PROTOCOL_VIOLATION.
5127                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
5128                    if self.is_multipath_negotiated() {
5129                        if path_id > self.local_max_path_id {
5130                            return Err(TransportError::PROTOCOL_VIOLATION(
5131                                "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
5132                            ));
5133                        }
5134                        if next_seq.0
5135                            > self
5136                                .local_cid_state
5137                                .get(&path_id)
5138                                .map(|cid_state| cid_state.active_seq().1 + 1)
5139                                .unwrap_or_default()
5140                        {
5141                            return Err(TransportError::PROTOCOL_VIOLATION(
5142                                "PATH_CIDS_BLOCKED next sequence number larger than in local state",
5143                            ));
5144                        }
5145                        debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
5146                    } else {
5147                        return Err(TransportError::PROTOCOL_VIOLATION(
5148                            "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
5149                        ));
5150                    }
5151                }
5152                Frame::AddAddress(addr) => {
5153                    let client_state = match self.n0_nat_traversal.client_side_mut() {
5154                        Ok(state) => state,
5155                        Err(err) => {
5156                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
5157                                "Nat traversal(ADD_ADDRESS): {err}"
5158                            )));
5159                        }
5160                    };
5161
5162                    if !client_state.check_remote_address(&addr) {
5163                        // if the address is not valid we flag it, but update anyway
5164                        warn!(?addr, "server sent illegal ADD_ADDRESS frame");
5165                    }
5166
5167                    match client_state.add_remote_address(addr) {
5168                        Ok(maybe_added) => {
5169                            if let Some(added) = maybe_added {
5170                                self.events.push_back(Event::NatTraversal(
5171                                    n0_nat_traversal::Event::AddressAdded(added),
5172                                ));
5173                            }
5174                        }
5175                        Err(e) => {
5176                            warn!(%e, "failed to add remote address")
5177                        }
5178                    }
5179                }
5180                Frame::RemoveAddress(addr) => {
5181                    let client_state = match self.n0_nat_traversal.client_side_mut() {
5182                        Ok(state) => state,
5183                        Err(err) => {
5184                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
5185                                "Nat traversal(REMOVE_ADDRESS): {err}"
5186                            )));
5187                        }
5188                    };
5189                    if let Some(removed_addr) = client_state.remove_remote_address(addr) {
5190                        self.events.push_back(Event::NatTraversal(
5191                            n0_nat_traversal::Event::AddressRemoved(removed_addr),
5192                        ));
5193                    }
5194                }
5195                Frame::ReachOut(reach_out) => {
5196                    let ipv6 = self.is_ipv6();
5197                    let server_state = match self.n0_nat_traversal.server_side_mut() {
5198                        Ok(state) => state,
5199                        Err(err) => {
5200                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
5201                                "Nat traversal(REACH_OUT): {err}"
5202                            )));
5203                        }
5204                    };
5205
5206                    if let Err(err) = server_state.handle_reach_out(reach_out, ipv6) {
5207                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
5208                            "Nat traversal(REACH_OUT): {err}"
5209                        )));
5210                    }
5211                }
5212            }
5213        }
5214
5215        let space = self.spaces[SpaceId::Data].for_path(path_id);
5216        if space
5217            .pending_acks
5218            .packet_received(now, number, ack_eliciting, &space.dedup)
5219        {
5220            if self.abandoned_paths.contains(&path_id) {
5221                // § 3.4.3 QUIC-MULTIPATH: promptly send ACKs for packets received from
5222                // abandoned paths.
5223                space.pending_acks.set_immediate_ack_required();
5224            } else {
5225                self.timers.set(
5226                    Timer::PerPath(path_id, PathTimer::MaxAckDelay),
5227                    now + self.ack_frequency.max_ack_delay,
5228                    self.qlog.with_time(now),
5229                );
5230            }
5231        }
5232
5233        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
5234        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
5235        // are only freed, and hence only issue credit, once the application has been notified
5236        // during a read on the stream.
5237        let pending = &mut self.spaces[SpaceId::Data].pending;
5238        self.streams.queue_max_stream_id(pending);
5239
5240        if let Some(reason) = close {
5241            self.state.move_to_draining(Some(reason.into()));
5242            self.connection_close_pending = true;
5243        }
5244
5245        if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
5246            && !is_probing_packet
5247            && network_path != self.path_data(path_id).network_path
5248        {
5249            let ConnectionSide::Server { ref server_config } = self.side else {
5250                panic!("packets from unknown remote should be dropped by clients");
5251            };
5252            debug_assert!(
5253                server_config.migration,
5254                "migration-initiating packets should have been dropped immediately"
5255            );
5256            self.migrate(path_id, now, network_path, migration_observed_addr);
5257            // Break linkability, if possible
5258            self.update_remote_cid(path_id);
5259            self.spin = false;
5260        }
5261
5262        Ok(())
5263    }
5264
    /// Migrates `path_id` to a new remote after a peer-initiated address change
    ///
    /// Called when a non-probing packet for an existing path arrives from a new network
    /// path. Replaces the path's per-path state (RTT, congestion, MTU discovery) and arms
    /// the path-validation timer.
    ///
    /// - RTT/congestion state is carried over only when the change looks like a NAT
    ///   rebinding (same IPv4 address); otherwise fresh state is created so a spoofed
    ///   source address cannot inherit an opened congestion window.
    /// - An `ObservedAddr` report received alongside the migrating packet (if any) is
    ///   applied to the new path state and surfaced as a `PathEvent::ObservedAddr` event.
    fn migrate(
        &mut self,
        path_id: PathId,
        now: Instant,
        network_path: FourTuple,
        observed_addr: Option<ObservedAddr>,
    ) {
        trace!(%network_path, %path_id, "migration initiated");
        // Bump the generation counter so state created before and after this migration
        // can be told apart.
        self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
        // TODO(@divma): conditions for path migration in multipath are very specific, check them
        // again to prevent path migrations that should actually create a new path

        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
        // Note that the congestion window will not grow until validation terminates. Helps mitigate
        // amplification attacks performed by spoofing source addresses.
        let prev_pto = self.pto(SpaceKind::Data, path_id);
        let path = self.paths.get_mut(&path_id).expect("known path");
        let mut new_path_data = if network_path.remote.is_ipv4()
            && network_path.remote.ip() == path.data.network_path.remote.ip()
        {
            // Same IPv4 address, so presumably only the port changed (NAT rebinding):
            // carry over the previous path's RTT/congestion state.
            PathData::from_previous(network_path, &path.data, self.path_generation_counter, now)
        } else {
            // Genuinely new remote: start from scratch, bounded by the peer's advertised
            // maximum UDP payload size (clamped into u16 range).
            let peer_max_udp_payload_size =
                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
                    .unwrap_or(u16::MAX);
            PathData::new(
                network_path,
                self.allow_mtud,
                Some(peer_max_udp_payload_size),
                self.path_generation_counter,
                now,
                &self.config,
            )
        };
        // Preserve the last address report so stale reports are still deduplicated.
        new_path_data.last_observed_addr_report = path.data.last_observed_addr_report.clone();
        if let Some(report) = observed_addr
            && let Some(updated) = new_path_data.update_observed_addr_report(report)
        {
            tracing::info!("adding observed addr event from migration");
            self.events.push_back(Event::Path(PathEvent::ObservedAddr {
                id: path_id,
                addr: updated,
            }));
        }
        // The new remote must prove it can receive on this tuple: queue a PATH_CHALLENGE.
        new_path_data.pending_on_path_challenge = true;

        let mut prev_path_data = mem::replace(&mut path.data, new_path_data);

        // Only store this as previous path if it was validated. For all we know there could
        // already be a previous path stored which might have been validated in the past,
        // which is more valuable than one that's not yet validated.
        // NOTE(review): the condition below checks `!prev_path_data.validated`, i.e. it keeps
        // the previous path (and re-challenges it) only when it was NOT validated — the
        // opposite of the sentence above. Confirm which behavior is intended.
        //
        // With multipath it is possible that there are no remote CIDs for the path ID
        // yet. In this case we would never have sent on this path yet and would not be able
        // to send a PATH_CHALLENGE either, which is currently a fire-and-forget affair
        // anyway. So don't store such a path either.
        if !prev_path_data.validated
            && let Some(cid) = self.remote_cids.get(&path_id).map(CidQueue::active)
        {
            prev_path_data.pending_on_path_challenge = true;
            // We haven't updated the remote CID yet, this captures the remote CID we were using on
            // the previous path.
            path.prev = Some((cid, prev_path_data));
        }

        // We need to re-assign the correct remote to this path in qlog
        self.qlog.emit_tuple_assigned(path_id, network_path, now);

        // Abandon validation if it does not complete within three PTOs; use the larger of
        // the old and new PTO estimates since the new path's RTT sample may be unreliable.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathValidation),
            now + 3 * cmp::max(self.pto(SpaceKind::Data, path_id), prev_pto),
            self.qlog.with_time(now),
        );
    }
5339
    /// Handle a change in the local address, i.e. an active migration
    ///
    /// In the general (non-multipath) case, paths will perform a RFC9000 migration and be pinged
    /// for a liveness check. This is the behaviour of a path assumed to be recoverable, even if
    /// this is not the case.
    ///
    /// Clients in a connection in which multipath has been negotiated should migrate paths to new
    /// [`PathId`]s. Paths that are known to be non-recoverable can be migrated to a new
    /// [`PathId`] by closing the current path, and opening a new one to the same remote. Treating
    /// paths as non-recoverable when necessary accelerates connectivity re-establishment, or might
    /// allow it altogether.
    ///
    /// The optional `hint` allows callers to indicate when paths are non-recoverable and should be
    /// migrated to a new [`PathId`].
    // NOTE: only clients are allowed to migrate, but generally dealing with RFC9000 migrations is
    // lacking <https://github.com/n0-computer/noq/issues/364>
    pub fn handle_network_change(&mut self, hint: Option<&dyn NetworkChangeHint>, now: Instant) {
        debug!("network changed");
        // Drained connections emit no further endpoint events; nothing to do.
        if self.state.is_drained() {
            return;
        }
        // Still handshaking: only the initial path exists. Re-discover the local address,
        // rotate the remote CID and ping for a liveness check, then bail.
        if self.highest_space < SpaceKind::Data {
            for path in self.paths.values_mut() {
                // Clear the local address for it to be obtained from the socket again.
                path.data.network_path.local_ip = None;
            }

            self.update_remote_cid(PathId::ZERO);
            self.ping();

            return;
        }

        // Paths that can't recover, so a new path should be opened instead. If multipath is not
        // negotiated, this will be empty.
        let mut non_recoverable_paths = Vec::default();
        let mut recoverable_paths = Vec::default();
        // Count of not-yet-abandoned paths; used below to detect whether the
        // non-recoverable set covers ALL remaining open paths.
        let mut open_paths = 0;

        let is_multipath_negotiated = self.is_multipath_negotiated();
        let is_client = self.side().is_client();
        let immediate_ack_allowed = self.peer_supports_ack_frequency();

        // Triage every open path into recoverable vs non-recoverable.
        for (path_id, path) in self.paths.iter_mut() {
            if self.abandoned_paths.contains(path_id) {
                continue;
            }
            open_paths += 1;

            // Clear the local address for it to be obtained from the socket again. This applies to
            // all paths, regardless of being considered recoverable or not
            path.data.network_path.local_ip = None;

            let network_path = path.data.network_path;
            let remote = network_path.remote;

            // Without multipath, the connection tries to recover the single path, whereas with
            // multipath, even in a single-path scenario, we attempt to migrate the path to a new
            // PathId.
            let attempt_to_recover = if is_multipath_negotiated {
                if is_client {
                    // Without a hint we assume the path is NOT recoverable and migrate it.
                    hint.map(|h| h.is_path_recoverable(*path_id, network_path))
                        .unwrap_or(false)
                } else {
                    // Servers should have stable addresses so this scenario is generally discouraged.
                    // There is no way to prevent this, so the best hope is to attempt to recover the
                    // path
                    true
                }
            } else {
                // In the non multipath case, we try to recover the single active path
                true
            };

            if attempt_to_recover {
                recoverable_paths.push((*path_id, remote));
            } else {
                non_recoverable_paths.push((*path_id, remote, path.data.local_status()))
            }
        }

        /* NON RECOVERABLE PATHS */
        // These are handled first, so that in case the treatment intended for these fails, we can
        // go the recoverable route instead.

        // Decide if we need to close first or open first in the multipath case.
        // - Opening first has a higher risk of getting limited by the negotiated MAX_PATH_ID.
        // - Closing first risks this being the only open path.
        // We prefer closing paths first unless we identify this is the last open path.
        let open_first = open_paths == non_recoverable_paths.len();

        for (path_id, remote, status) in non_recoverable_paths.into_iter() {
            let network_path = FourTuple {
                remote,
                local_ip: None, /* allow the local ip to be discovered */
            };

            if open_first && let Err(e) = self.open_path(network_path, status, now) {
                debug!(%e,"Failed to open new path for network change");
                // if this fails, let the path try to recover itself
                recoverable_paths.push((path_id, remote));
                continue;
            }

            if let Err(e) =
                self.close_path_inner(now, path_id, PathAbandonReason::UnusableAfterNetworkChange)
            {
                debug!(%e,"Failed to close unrecoverable path after network change");
                // Closing failed: fall back to treating the path as recoverable.
                recoverable_paths.push((path_id, remote));
                continue;
            }

            if !open_first && let Err(e) = self.open_path(network_path, status, now) {
                // Path has already been closed if we got here. Since the path was not recoverable,
                // this might be desirable in any case, because other paths exist (!open_first) and
                // this one is considered non-recoverable
                debug!(%e,"Failed to open new path for network change");
            }
        }

        /* RECOVERABLE PATHS */

        for (path_id, remote) in recoverable_paths.into_iter() {
            let space = &mut self.spaces[SpaceId::Data];

            // Schedule a Ping for a liveness check.
            if let Some(path_space) = space.number_spaces.get_mut(&path_id) {
                path_space.ping_pending = true;

                // Ask the peer to acknowledge right away so we learn about liveness ASAP;
                // only allowed when the peer supports the ACK frequency extension.
                if immediate_ack_allowed {
                    path_space.immediate_ack_pending = true;
                }
            }

            // Rotate to an unused remote CID to avoid linkability across the migration.
            let Some((reset_token, retired)) =
                self.remote_cids.get_mut(&path_id).and_then(CidQueue::next)
            else {
                continue;
            };

            // Retire the current remote CID and any CIDs we had to skip.
            space
                .pending
                .retire_cids
                .extend(retired.map(|seq| (path_id, seq)));

            debug_assert!(!self.state.is_drained()); // required for endpoint_events, checked above
            self.endpoint_events
                .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
        }
    }
5491
5492    /// Switch to a previously unused remote connection ID, if possible
5493    fn update_remote_cid(&mut self, path_id: PathId) {
5494        let Some((reset_token, retired)) = self
5495            .remote_cids
5496            .get_mut(&path_id)
5497            .and_then(|cids| cids.next())
5498        else {
5499            return;
5500        };
5501
5502        // Retire the current remote CID and any CIDs we had to skip.
5503        self.spaces[SpaceId::Data]
5504            .pending
5505            .retire_cids
5506            .extend(retired.map(|seq| (path_id, seq)));
5507        let remote = self.path_data(path_id).network_path.remote;
5508        self.set_reset_token(path_id, remote, reset_token);
5509    }
5510
5511    /// Sends this reset token to the endpoint
5512    ///
5513    /// The endpoint needs to know the reset tokens issued by the peer, so that if the peer
5514    /// sends a reset token it knows to route it to this connection. See RFC 9000 section
5515    /// 10.3. Stateless Reset.
5516    ///
5517    /// Reset tokens are different for each path, the endpoint identifies paths by peer
5518    /// socket address however, not by path ID.
5519    fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5520        debug_assert!(!self.state.is_drained()); // required for endpoint events, set_reset_token is never called for drained connections
5521        self.endpoint_events
5522            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5523
5524        // During the handshake the server sends a reset token in the transport
5525        // parameters. When we are the client and we receive the reset token during the
5526        // handshake we want this to affect our peer transport parameters.
5527        // TODO(flub): Pretty sure this is pointless, the entire params is overwritten
5528        //    shortly after this was called.  And then the params don't have this anymore.
5529        if path_id == PathId::ZERO {
5530            self.peer_params.stateless_reset_token = Some(reset_token);
5531        }
5532    }
5533
5534    /// Issue an initial set of connection IDs to the peer upon connection
5535    fn issue_first_cids(&mut self, now: Instant) {
5536        if self
5537            .local_cid_state
5538            .get(&PathId::ZERO)
5539            .expect("PathId::ZERO exists when the connection is created")
5540            .cid_len()
5541            == 0
5542        {
5543            return;
5544        }
5545
5546        // Subtract 1 to account for the CID we supplied while handshaking
5547        let mut n = self.peer_params.issue_cids_limit() - 1;
5548        if let ConnectionSide::Server { server_config } = &self.side
5549            && server_config.has_preferred_address()
5550        {
5551            // We also sent a CID in the transport parameters
5552            n -= 1;
5553        }
5554        debug_assert!(!self.state.is_drained()); // requirement for endpoint_events
5555        self.endpoint_events
5556            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5557    }
5558
5559    /// Issues an initial set of CIDs for paths that have not yet had any CIDs issued
5560    ///
5561    /// Later CIDs are issued when CIDs expire or are retired by the peer.
5562    fn issue_first_path_cids(&mut self, now: Instant) {
5563        if let Some(max_path_id) = self.max_path_id() {
5564            let mut path_id = self.max_path_id_with_cids.next();
5565            while path_id <= max_path_id {
5566                self.endpoint_events
5567                    .push_back(EndpointEventInner::NeedIdentifiers(
5568                        path_id,
5569                        now,
5570                        self.peer_params.issue_cids_limit(),
5571                    ));
5572                path_id = path_id.next();
5573            }
5574            self.max_path_id_with_cids = max_path_id;
5575        }
5576    }
5577
    /// Populates a packet with frames
    ///
    /// This tries to fit as many frames as possible into the packet.  Frames are written
    /// in a fixed priority order, and each is only emitted while `builder` has room left
    /// for it.
    ///
    /// Frames carrying connection-level (rather than path-exclusive) data are gated on
    /// `scheduling_info.may_send_data`; in multipath this keeps backup paths restricted
    /// to path-exclusive frames while an active path exists.
    fn populate_packet<'a, 'b>(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        path_id: PathId,
        scheduling_info: &PathSchedulingInfo,
        builder: &mut PacketBuilder<'a, 'b>,
    ) {
        let is_multipath_negotiated = self.is_multipath_negotiated();
        let space_has_keys = self.crypto_state.has_keys(space_id.encryption_level());
        // A data-space packet built before this space's keys exist must be 0-RTT.
        let is_0rtt = space_id == SpaceId::Data && !space_has_keys;
        let stats = &mut self.stats.frame_tx;
        let space = &mut self.spaces[space_id];
        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
        space
            .for_path(path_id)
            .pending_acks
            .maybe_ack_non_eliciting();

        // HANDSHAKE_DONE
        if !is_0rtt
            && scheduling_info.may_send_data
            && mem::replace(&mut space.pending.handshake_done, false)
        {
            builder.write_frame(frame::HandshakeDone, stats);
        }

        // REACH_OUT
        if let Some((round, addresses)) = space.pending.reach_out.as_mut()
            && scheduling_info.may_send_data
        {
            // Drain the pending addresses one frame at a time; reinsert the address and
            // stop as soon as one no longer fits into the packet.
            while let Some(local_addr) = addresses.iter().next().copied() {
                let local_addr = addresses.take(&local_addr).expect("found from iter");
                let reach_out = frame::ReachOut::new(*round, local_addr);
                if builder.frame_space_remaining() > reach_out.size() {
                    builder.write_frame(reach_out, stats);
                } else {
                    addresses.insert(local_addr);
                    break;
                }
            }
            if addresses.is_empty() {
                space.pending.reach_out = None;
            }
        }

        // OBSERVED_ADDR
        // Sent when we act as address observer for the peer and either have never
        // reported on this path or a (re)transmission is queued.
        if scheduling_info.may_send_data
            && space_id == SpaceId::Data
            && self
                .config
                .address_discovery_role
                .should_report(&self.peer_params.address_discovery_role)
            && (!path.observed_addr_sent || space.pending.observed_addr)
        {
            let frame =
                frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
            if builder.frame_space_remaining() > frame.size() {
                builder.write_frame(frame, stats);

                self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
                path.observed_addr_sent = true;

                space.pending.observed_addr = false;
            }
        }

        // PING
        if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
            builder.write_frame(frame::Ping, stats);
        }

        // IMMEDIATE_ACK
        if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
            debug_assert_eq!(
                space_id,
                SpaceId::Data,
                "immediate acks must be sent in the data space"
            );
            builder.write_frame(frame::ImmediateAck, stats);
        }

        // ACK
        if scheduling_info.may_send_data {
            // Collect the path ids into a Vec first so the borrow of `space` ends
            // before `populate_acks` takes `space` mutably.
            for path_id in space
                .number_spaces
                .iter_mut()
                .filter(|(_, pns)| pns.pending_acks.can_send())
                .map(|(&path_id, _)| path_id)
                .collect::<Vec<_>>()
            {
                Self::populate_acks(
                    now,
                    self.receiving_ecn,
                    path_id,
                    space_id,
                    space,
                    is_multipath_negotiated,
                    builder,
                    stats,
                    space_has_keys,
                );
            }
        }

        // ACK_FREQUENCY
        if scheduling_info.may_send_data && mem::replace(&mut space.pending.ack_frequency, false) {
            let sequence_number = self.ack_frequency.next_sequence_number();

            // Safe to unwrap because this is always provided when ACK frequency is enabled
            let config = self.config.ack_frequency_config.as_ref().unwrap();

            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
                path.rtt.get(),
                config,
                &self.peer_params,
            );

            let frame = frame::AckFrequency {
                sequence: sequence_number,
                ack_eliciting_threshold: config.ack_eliciting_threshold,
                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
                reordering_threshold: config.reordering_threshold,
            };
            builder.write_frame(frame, stats);

            self.ack_frequency
                .ack_frequency_sent(path_id, builder.packet_number, max_ack_delay);
        }

        // PATH_CHALLENGE
        if builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
            && space_id == SpaceId::Data
            && path.pending_on_path_challenge
            && !self.state.is_closed()
        // we don't want to send new challenges if we are already closing
        {
            path.pending_on_path_challenge = false;

            let token = self.rng.random();
            path.record_path_challenge_sent(now, token, path.network_path);
            // Generate a new challenge every time we send a new PATH_CHALLENGE
            let challenge = frame::PathChallenge(token);
            builder.write_frame(challenge, stats);
            // Path validation probes must be padded (RFC 9000 § 8.2.1).
            builder.require_padding();
            let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
            if path.open_status == paths::OpenStatus::Pending {
                path.open_status = paths::OpenStatus::Sent;
                self.timers.set(
                    Timer::PerPath(path_id, PathTimer::PathOpen),
                    now + 3 * pto,
                    self.qlog.with_time(now),
                );
            }

            self.timers.set(
                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                now + pto,
                self.qlog.with_time(now),
            );

            // NOTE(review): `path.pending_on_path_challenge` was cleared at the top of
            // this branch, so unless `record_path_challenge_sent` re-arms it this
            // condition can never hold and the path status is never queued here —
            // verify the intended condition.
            if is_multipath_negotiated && !path.validated && path.pending_on_path_challenge {
                // queue informing the path status along with the challenge
                space.pending.path_status.insert(path_id);
            }

            // Always include an OBSERVED_ADDR frame with a PATH_CHALLENGE, regardless
            // of whether one has already been sent on this path.
            if space_id == SpaceId::Data
                && self
                    .config
                    .address_discovery_role
                    .should_report(&self.peer_params.address_discovery_role)
            {
                let frame = frame::ObservedAddr::new(
                    path.network_path.remote,
                    self.next_observed_addr_seq_no,
                );
                if builder.frame_space_remaining() > frame.size() {
                    builder.write_frame(frame, stats);

                    self.next_observed_addr_seq_no =
                        self.next_observed_addr_seq_no.saturating_add(1u8);
                    path.observed_addr_sent = true;

                    space.pending.observed_addr = false;
                }
            }
        }

        // PATH_RESPONSE
        if builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
            && space_id == SpaceId::Data
            && let Some(token) = path.path_responses.pop_on_path(path.network_path)
        {
            let response = frame::PathResponse(token);
            builder.write_frame(response, stats);
            builder.require_padding();

            // NOTE: this is technically not required but might be useful to ride the
            // request/response nature of path challenges to refresh an observation
            // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
            if space_id == SpaceId::Data
                && self
                    .config
                    .address_discovery_role
                    .should_report(&self.peer_params.address_discovery_role)
            {
                let frame = frame::ObservedAddr::new(
                    path.network_path.remote,
                    self.next_observed_addr_seq_no,
                );
                if builder.frame_space_remaining() > frame.size() {
                    builder.write_frame(frame, stats);

                    self.next_observed_addr_seq_no =
                        self.next_observed_addr_seq_no.saturating_add(1u8);
                    path.observed_addr_sent = true;

                    space.pending.observed_addr = false;
                }
            }
        }

        // CRYPTO
        while !is_0rtt
            && scheduling_info.may_send_data
            && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
        {
            let Some(mut frame) = space.pending.crypto.pop_front() else {
                break;
            };

            // Calculate the maximum amount of crypto data we can store in the buffer.
            // Since the offset is known, we can reserve the exact size required to encode it.
            // For length we reserve 2bytes which allows to encode up to 2^14,
            // which is more than what fits into normally sized QUIC frames.
            // NOTE(review): `from_u64_unchecked` assumes the offset fits in a VarInt
            // (< 2^62); crypto stream offsets are presumably bounded well below that
            // — confirm.
            let max_crypto_data_size = builder.frame_space_remaining()
                - 1 // Frame Type
                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes

            let len = frame
                .data
                .len()
                .min(2usize.pow(14) - 1)
                .min(max_crypto_data_size);

            let data = frame.data.split_to(len);
            let offset = frame.offset;
            let truncated = frame::Crypto { offset, data };
            builder.write_frame(truncated, stats);

            // Requeue the unsent remainder at the front so transmission stays in order.
            if !frame.data.is_empty() {
                frame.offset += len as u64;
                space.pending.crypto.push_front(frame);
            }
        }

        // TODO(flub): maybe this is much higher priority?
        // PATH_ABANDON
        while space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
            else {
                break;
            };
            let frame = frame::PathAbandon {
                path_id: abandoned_path_id,
                error_code,
            };
            builder.write_frame(frame, stats);
        }

        // PATH_STATUS_AVAILABLE & PATH_STATUS_BACKUP
        while space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let Some(path_id) = space.pending.path_status.pop_first() else {
                break;
            };
            let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
                trace!(%path_id, "discarding queued path status for unknown path");
                continue;
            };

            // The sequence number lets the peer discard out-of-date status updates.
            let seq = path.status.seq();
            match path.local_status() {
                PathStatus::Available => {
                    let frame = frame::PathStatusAvailable {
                        path_id,
                        status_seq_no: seq,
                    };
                    builder.write_frame(frame, stats);
                }
                PathStatus::Backup => {
                    let frame = frame::PathStatusBackup {
                        path_id,
                        status_seq_no: seq,
                    };
                    builder.write_frame(frame, stats);
                }
            }
        }

        // MAX_PATH_ID
        if space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && space.pending.max_path_id
            && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let frame = frame::MaxPathId(self.local_max_path_id);
            builder.write_frame(frame, stats);
            space.pending.max_path_id = false;
        }

        // PATHS_BLOCKED
        if space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && space.pending.paths_blocked
            && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let frame = frame::PathsBlocked(self.remote_max_path_id);
            builder.write_frame(frame, stats);
            space.pending.paths_blocked = false;
        }

        // PATH_CIDS_BLOCKED
        while space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
                break;
            };
            let next_seq = match self.remote_cids.get(&path_id) {
                Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
                None => VarInt(0),
            };
            let frame = frame::PathCidsBlocked { path_id, next_seq };
            builder.write_frame(frame, stats);
        }

        // RESET_STREAM, STOP_SENDING, MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS
        if space_id == SpaceId::Data && scheduling_info.may_send_data {
            self.streams
                .write_control_frames(builder, &mut space.pending, stats);
        }

        // NEW_CONNECTION_ID
        // Size the bound by the longest local CID so a frame for any path fits.
        let cid_len = self
            .local_cid_state
            .values()
            .map(|cid_state| cid_state.cid_len())
            .max()
            .expect("some local CID state must exist");
        let new_cid_size_bound =
            frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
        while scheduling_info.may_send_data && builder.frame_space_remaining() > new_cid_size_bound
        {
            let Some(issued) = space.pending.new_cids.pop() else {
                break;
            };
            let retire_prior_to = self
                .local_cid_state
                .get(&issued.path_id)
                .map(|cid_state| cid_state.retire_prior_to())
                .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));

            // Without multipath the frame carries no path id and must be for path zero.
            let cid_path_id = match is_multipath_negotiated {
                true => Some(issued.path_id),
                false => {
                    debug_assert_eq!(issued.path_id, PathId::ZERO);
                    None
                }
            };
            let frame = frame::NewConnectionId {
                path_id: cid_path_id,
                sequence: issued.sequence,
                retire_prior_to,
                id: issued.id,
                reset_token: issued.reset_token,
            };
            builder.write_frame(frame, stats);
        }

        // RETIRE_CONNECTION_ID
        let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
        while scheduling_info.may_send_data && builder.frame_space_remaining() > retire_cid_bound {
            let (path_id, sequence) = match space.pending.retire_cids.pop() {
                Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
                Some((path_id, seq)) => (Some(path_id), seq),
                None => break,
            };
            let frame = frame::RetireConnectionId { path_id, sequence };
            builder.write_frame(frame, stats);
        }

        // DATAGRAM
        let mut sent_datagrams = false;
        while scheduling_info.may_send_data
            && builder.frame_space_remaining() > Datagram::SIZE_BOUND
            && space_id == SpaceId::Data
        {
            match self.datagrams.write(builder, stats) {
                true => {
                    sent_datagrams = true;
                }
                false => break,
            }
        }
        // Wake the application once previously blocked datagrams have gone out.
        if self.datagrams.send_blocked && sent_datagrams {
            self.events.push_back(Event::DatagramsUnblocked);
            self.datagrams.send_blocked = false;
        }

        // Re-borrow the path data: the borrow taken at the top could not be held
        // across the intervening uses of `self`.
        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;

        // NEW_TOKEN
        if scheduling_info.may_send_data {
            while let Some(network_path) = space.pending.new_tokens.pop() {
                debug_assert_eq!(space_id, SpaceId::Data);
                let ConnectionSide::Server { server_config } = &self.side else {
                    panic!("NEW_TOKEN frames should not be enqueued by clients");
                };

                if !network_path.is_probably_same_path(&path.network_path) {
                    // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
                    // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
                    // frames upon an path change. Instead, when the new path becomes validated,
                    // NEW_TOKEN frames may be enqueued for the new path instead.
                    continue;
                }

                let token = Token::new(
                    TokenPayload::Validation {
                        ip: network_path.remote.ip(),
                        issued: server_config.time_source.now(),
                    },
                    &mut self.rng,
                );
                let new_token = NewToken {
                    token: token.encode(&*server_config.token_key).into(),
                };

                // The token was generated before the size check; requeue it if it
                // does not fit and stop trying.
                if builder.frame_space_remaining() < new_token.size() {
                    space.pending.new_tokens.push(network_path);
                    break;
                }

                builder.write_frame(new_token, stats);
                builder.retransmits_mut().new_tokens.push(network_path);
            }
        }

        // STREAM
        if scheduling_info.may_send_data && space_id == SpaceId::Data {
            self.streams
                .write_stream_frames(builder, self.config.send_fairness, stats);
        }

        // ADD_ADDRESS
        while space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
        {
            if let Some(added_address) = space.pending.add_address.pop_last() {
                builder.write_frame(added_address, stats);
            } else {
                break;
            }
        }

        // REMOVE_ADDRESS
        while space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
        {
            if let Some(removed_address) = space.pending.remove_address.pop_last() {
                builder.write_frame(removed_address, stats);
            } else {
                break;
            }
        }
    }
6074
6075    /// Write pending ACKs into a buffer
6076    fn populate_acks<'a, 'b>(
6077        now: Instant,
6078        receiving_ecn: bool,
6079        path_id: PathId,
6080        space_id: SpaceId,
6081        space: &mut PacketSpace,
6082        is_multipath_negotiated: bool,
6083        builder: &mut PacketBuilder<'a, 'b>,
6084        stats: &mut FrameStats,
6085        space_has_keys: bool,
6086    ) {
6087        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
6088        debug_assert!(space_has_keys, "tried to send ACK in 0-RTT");
6089
6090        debug_assert!(
6091            is_multipath_negotiated || path_id == PathId::ZERO,
6092            "Only PathId::ZERO allowed without multipath (have {path_id:?})"
6093        );
6094        if is_multipath_negotiated {
6095            debug_assert!(
6096                space_id == SpaceId::Data || path_id == PathId::ZERO,
6097                "path acks must be sent in 1RTT space (have {space_id:?})"
6098            );
6099        }
6100
6101        let pns = space.for_path(path_id);
6102        let ranges = pns.pending_acks.ranges();
6103        debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
6104        let ecn = if receiving_ecn {
6105            Some(&pns.ecn_counters)
6106        } else {
6107            None
6108        };
6109
6110        let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
6111        // TODO: This should come from `TransportConfig` if that gets configurable.
6112        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
6113        let delay = delay_micros >> ack_delay_exp.into_inner();
6114
6115        if is_multipath_negotiated && space_id == SpaceId::Data {
6116            if !ranges.is_empty() {
6117                let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
6118                builder.write_frame(frame, stats);
6119            }
6120        } else {
6121            builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
6122        }
6123    }
6124
    /// Shared teardown performed whenever the connection enters a closed state
    fn close_common(&mut self) {
        trace!("connection closed");
        // All pending timers are now meaningless; the close/drain timer is armed
        // separately by `set_close_timer`.
        self.timers.reset();
    }
6129
6130    fn set_close_timer(&mut self, now: Instant) {
6131        // QUIC-MULTIPATH § 2.6 Connection Closure: draining for 3*PTO using the max PTO of
6132        // all paths.
6133        let pto_max = self.max_pto_for_space(self.highest_space);
6134        self.timers.set(
6135            Timer::Conn(ConnTimer::Close),
6136            now + 3 * pto_max,
6137            self.qlog.with_time(now),
6138        );
6139    }
6140
6141    /// Handle transport parameters received from the peer
6142    ///
6143    /// *remote_cid* and *local_cid* are the source and destination CIDs respectively of the
6144    /// *packet into which the transport parameters arrived.
6145    fn handle_peer_params(
6146        &mut self,
6147        params: TransportParameters,
6148        local_cid: ConnectionId,
6149        remote_cid: ConnectionId,
6150        now: Instant,
6151    ) -> Result<(), TransportError> {
6152        if Some(self.original_remote_cid) != params.initial_src_cid
6153            || (self.side.is_client()
6154                && (Some(self.initial_dst_cid) != params.original_dst_cid
6155                    || self.retry_src_cid != params.retry_src_cid))
6156        {
6157            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
6158                "CID authentication failure",
6159            ));
6160        }
6161        if params.initial_max_path_id.is_some() && (local_cid.is_empty() || remote_cid.is_empty()) {
6162            return Err(TransportError::PROTOCOL_VIOLATION(
6163                "multipath must not use zero-length CIDs",
6164            ));
6165        }
6166
6167        self.set_peer_params(params);
6168        self.qlog.emit_peer_transport_params_received(self, now);
6169
6170        Ok(())
6171    }
6172
    /// Applies the peer's transport parameters to the connection state
    ///
    /// Updates stream limits, the negotiated idle timeout, the preferred-address CID,
    /// the peer's max ACK delay, multipath negotiation and n0's NAT traversal
    /// negotiation, then stores `params` as `self.peer_params` and informs MTU
    /// discovery of the peer's max UDP payload size.
    fn set_peer_params(&mut self, params: TransportParameters) {
        self.streams.set_params(&params);
        self.idle_timeout =
            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
        trace!("negotiated max idle timeout {:?}", self.idle_timeout);

        if let Some(ref info) = params.preferred_address {
            // The preferred-address transport parameter carries a CID with sequence
            // number 1 plus its stateless reset token (RFC 9000 § 5.1.1).
            // During the handshake PathId::ZERO exists.
            self.remote_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
                path_id: None,
                sequence: 1,
                id: info.connection_id,
                reset_token: info.stateless_reset_token,
                retire_prior_to: 0,
            })
            .expect(
                "preferred address CID is the first received, and hence is guaranteed to be legal",
            );
            let remote = self.path_data(PathId::ZERO).network_path.remote;
            self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
        }
        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);

        // Multipath is negotiated only when both sides advertised initial_max_path_id.
        let mut multipath_enabled = None;
        if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
            self.config.get_initial_max_path_id(),
            params.initial_max_path_id,
        ) {
            // multipath is enabled, register the local and remote maximums
            self.local_max_path_id = local_max_path_id;
            self.remote_max_path_id = remote_max_path_id;
            let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
            debug!(%initial_max_path_id, "multipath negotiated");
            multipath_enabled = Some(initial_max_path_id);
        }

        // n0's NAT traversal is negotiated only when both sides advertised an address
        // limit; it additionally requires multipath, otherwise it stays disabled.
        if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
            self.config
                .max_remote_nat_traversal_addresses
                .zip(params.max_remote_nat_traversal_addresses)
        {
            if let Some(max_initial_paths) =
                multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
            {
                let max_local_addresses = max_remotely_allowed_remote_addresses.get();
                let max_remote_addresses = max_locally_allowed_remote_addresses.get();
                self.n0_nat_traversal = n0_nat_traversal::State::new(
                    max_remote_addresses,
                    max_local_addresses,
                    self.side(),
                );
                debug!(
                    %max_remote_addresses, %max_local_addresses,
                    "n0's nat traversal negotiated"
                );

                // Sanity-check the negotiated limits; these branches only log
                // diagnostics and do not change behavior.
                match self.side() {
                    Side::Client => {
                        if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
                            // in this case the client might try to open `max_remote_addresses` new
                            // paths, but the current multipath configuration will not allow it
                            debug!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
                        } else if max_local_addresses as u64
                            > params.active_connection_id_limit.into_inner()
                        {
                            // the server allows us to send at most `params.active_connection_id_limit`
                            // but they might need at least `max_local_addresses` to effectively send
                            // `PATH_CHALLENGE` frames to each advertised local address
                            debug!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
                        }
                    }
                    Side::Server => {
                        if (max_initial_paths.as_u32() as u64) < crate::LOCAL_CID_COUNT {
                            debug!(%max_initial_paths, local_cid_limit=%crate::LOCAL_CID_COUNT, "local server configuration might cause nat traversal issues")
                        }
                    }
                }
            } else {
                debug!("n0 nat traversal enabled for both endpoints, but multipath is missing")
            }
        }

        self.peer_params = params;
        // Clamp the peer's max UDP payload size to u16::MAX and feed it to MTU discovery.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_data_mut(PathId::ZERO)
            .mtud
            .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
    }
6262
6263    /// Decrypts a packet, returning the packet number on success
6264    fn decrypt_packet(
6265        &mut self,
6266        now: Instant,
6267        path_id: PathId,
6268        packet: &mut Packet,
6269    ) -> Result<Option<u64>, Option<TransportError>> {
6270        let result = self
6271            .crypto_state
6272            .decrypt_packet_body(packet, path_id, &self.spaces)?;
6273
6274        let Some(result) = result else {
6275            return Ok(None);
6276        };
6277
6278        if result.outgoing_key_update_acked
6279            && let Some(prev) = self.crypto_state.prev_crypto.as_mut()
6280        {
6281            prev.end_packet = Some((result.number, now));
6282            self.set_key_discard_timer(now, packet.header.space());
6283        }
6284
6285        if result.incoming_key_update {
6286            trace!("key update authenticated");
6287            self.crypto_state
6288                .update_keys(Some((result.number, now)), true);
6289            self.set_key_discard_timer(now, packet.header.space());
6290        }
6291
6292        Ok(Some(result.number))
6293    }
6294
6295    fn peer_supports_ack_frequency(&self) -> bool {
6296        self.peer_params.min_ack_delay.is_some()
6297    }
6298
6299    /// Send an IMMEDIATE_ACK frame to the remote endpoint
6300    ///
6301    /// According to the spec, this will result in an error if the remote endpoint does not support
6302    /// the Acknowledgement Frequency extension
6303    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
6304        debug_assert_eq!(
6305            self.highest_space,
6306            SpaceKind::Data,
6307            "immediate ack must be written in the data space"
6308        );
6309        self.spaces[SpaceId::Data]
6310            .for_path(path_id)
6311            .immediate_ack_pending = true;
6312    }
6313
6314    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
6315    #[cfg(test)]
6316    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
6317        let ConnectionEventInner::Datagram(DatagramConnectionEvent {
6318            path_id,
6319            first_decode,
6320            remaining,
6321            ..
6322        }) = &event.0
6323        else {
6324            return None;
6325        };
6326
6327        if remaining.is_some() {
6328            panic!("Packets should never be coalesced in tests");
6329        }
6330
6331        let decrypted_header = self
6332            .crypto_state
6333            .unprotect_header(first_decode.clone(), self.peer_params.stateless_reset_token)?;
6334
6335        let mut packet = decrypted_header.packet?;
6336        self.crypto_state
6337            .decrypt_packet_body(&mut packet, *path_id, &self.spaces)
6338            .ok()?;
6339
6340        Some(packet.payload.to_vec())
6341    }
6342
6343    /// The number of bytes of packets containing retransmittable frames that have not been
6344    /// acknowledged or declared lost.
6345    #[cfg(test)]
6346    pub(crate) fn bytes_in_flight(&self) -> u64 {
6347        // TODO(@divma): consider including for multipath?
6348        self.path_data(PathId::ZERO).in_flight.bytes
6349    }
6350
6351    /// Number of bytes worth of non-ack-only packets that may be sent
6352    #[cfg(test)]
6353    pub(crate) fn congestion_window(&self) -> u64 {
6354        let path = self.path_data(PathId::ZERO);
6355        path.congestion
6356            .window()
6357            .saturating_sub(path.in_flight.bytes)
6358    }
6359
6360    /// Whether no timers but keepalive, idle, rtt, pushnewcid, and key discard are running
6361    #[cfg(test)]
6362    pub(crate) fn is_idle(&self) -> bool {
6363        let current_timers = self.timers.values();
6364        current_timers
6365            .into_iter()
6366            .filter(|(timer, _)| {
6367                !matches!(
6368                    timer,
6369                    Timer::Conn(ConnTimer::KeepAlive)
6370                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
6371                        | Timer::Conn(ConnTimer::PushNewCid)
6372                        | Timer::Conn(ConnTimer::KeyDiscard)
6373                )
6374            })
6375            .min_by_key(|(_, time)| *time)
6376            .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6377    }
6378
6379    /// Whether explicit congestion notification is in use on outgoing packets.
6380    #[cfg(test)]
6381    pub(crate) fn using_ecn(&self) -> bool {
6382        self.path_data(PathId::ZERO).sending_ecn
6383    }
6384
6385    /// The number of received bytes in the current path
6386    #[cfg(test)]
6387    pub(crate) fn total_recvd(&self) -> u64 {
6388        self.path_data(PathId::ZERO).total_recvd
6389    }
6390
6391    #[cfg(test)]
6392    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6393        self.local_cid_state
6394            .get(&PathId::ZERO)
6395            .unwrap()
6396            .active_seq()
6397    }
6398
6399    #[cfg(test)]
6400    #[track_caller]
6401    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6402        self.local_cid_state
6403            .get(&PathId(path_id))
6404            .unwrap()
6405            .active_seq()
6406    }
6407
6408    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
6409    /// with updated `retire_prior_to` field set to `v`
6410    #[cfg(test)]
6411    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6412        let n = self
6413            .local_cid_state
6414            .get_mut(&PathId::ZERO)
6415            .unwrap()
6416            .assign_retire_seq(v);
6417        debug_assert!(!self.state.is_drained()); // requirement for endpoint_events
6418        self.endpoint_events
6419            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6420    }
6421
6422    /// Check the current active remote CID sequence for `PathId::ZERO`
6423    #[cfg(test)]
6424    pub(crate) fn active_remote_cid_seq(&self) -> u64 {
6425        self.remote_cids.get(&PathId::ZERO).unwrap().active_seq()
6426    }
6427
6428    /// Returns the detected maximum udp payload size for the current path
6429    #[cfg(test)]
6430    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6431        self.path_data(path_id).current_mtu()
6432    }
6433
6434    /// Triggers path validation on all paths
6435    #[cfg(test)]
6436    pub(crate) fn trigger_path_validation(&mut self) {
6437        for path in self.paths.values_mut() {
6438            path.data.pending_on_path_challenge = true;
6439        }
6440    }
6441
6442    /// Simulates a protocol violation error for test purposes.
6443    #[cfg(test)]
6444    pub fn simulate_protocol_violation(&mut self, now: Instant) {
6445        if !self.state.is_closed() {
6446            self.state
6447                .move_to_closed(TransportError::PROTOCOL_VIOLATION("simulated violation"));
6448            self.close_common();
6449            if !self.state.is_drained() {
6450                self.set_close_timer(now);
6451            }
6452            self.connection_close_pending = true;
6453        }
6454    }
6455
6456    /// Whether we have on-path 1-RTT data to send.
6457    ///
6458    /// This checks for frames that can only be sent in the data space (1-RTT):
6459    /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated.
6460    /// - Pending PATH_RESPONSE frames.
6461    /// - Pending data to send in STREAM frames.
6462    /// - Pending DATAGRAM frames to send.
6463    ///
6464    /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that
6465    /// may need to be sent.
6466    fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6467        let space_specific = self.paths.get(&path_id).is_some_and(|path| {
6468            path.data.pending_on_path_challenge || !path.data.path_responses.is_empty()
6469        });
6470
6471        // Stream control frames are checked in PacketSpace::can_send, only check data here.
6472        let other = self.streams.can_send_stream_data()
6473            || self
6474                .datagrams
6475                .outgoing
6476                .front()
6477                .is_some_and(|x| x.size(true) <= max_size);
6478
6479        // All `false` fields are set in PacketSpace::can_send.
6480        SendableFrames {
6481            acks: false,
6482            close: false,
6483            space_specific,
6484            other,
6485        }
6486    }
6487
    /// Terminate the connection instantly, without sending a close packet
    ///
    /// Transitions straight to the drained state with `reason` and notifies the endpoint
    /// via a `Drained` event so it can release its per-connection state.
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.state.move_to_drained(Some(reason));
        // move_to_drained checks that we were never in drained before, so we
        // never sent a `Drained` event before (it's illegal to send more events after drained).
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
6496
6497    /// Storage size required for the largest packet that can be transmitted on all currently
6498    /// available paths
6499    ///
6500    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
6501    ///
6502    /// When multipath is enabled, this value is the minimum MTU across all available paths.
6503    pub fn current_mtu(&self) -> u16 {
6504        self.paths
6505            .iter()
6506            .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6507            .map(|(_path_id, path_state)| path_state.data.current_mtu())
6508            .min()
6509            .expect("There is always at least one available path")
6510    }
6511
6512    /// Size of non-frame data for a 1-RTT packet
6513    ///
6514    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
6515    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
6516    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
6517    /// latency and packet loss.
6518    fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6519        let pn_len = PacketNumber::new(
6520            pn,
6521            self.spaces[SpaceId::Data]
6522                .for_path(path)
6523                .largest_acked_packet
6524                .unwrap_or(0),
6525        )
6526        .len();
6527
6528        // 1 byte for flags
6529        1 + self
6530            .remote_cids
6531            .get(&path)
6532            .map(|cids| cids.active().len())
6533            .unwrap_or(20)      // Max CID len in QUIC v1
6534            + pn_len
6535            + self.tag_len_1rtt()
6536    }
6537
6538    fn predict_1rtt_overhead_no_pn(&self) -> usize {
6539        let pn_len = 4;
6540
6541        let cid_len = self
6542            .remote_cids
6543            .values()
6544            .map(|cids| cids.active().len())
6545            .max()
6546            .unwrap_or(20); // Max CID len in QUIC v1
6547
6548        // 1 byte for flags
6549        1 + cid_len + pn_len + self.tag_len_1rtt()
6550    }
6551
6552    fn tag_len_1rtt(&self) -> usize {
6553        // encryption_keys for Data space returns 1-RTT keys if available, otherwise 0-RTT keys
6554        let packet_crypto = self
6555            .crypto_state
6556            .encryption_keys(SpaceKind::Data, self.side.side())
6557            .map(|(_header, packet, _level)| packet);
6558        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
6559        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
6560        // but that would needlessly prevent sending datagrams during 0-RTT.
6561        packet_crypto.map_or(16, |x| x.tag_len())
6562    }
6563
6564    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
6565    fn on_path_validated(&mut self, path_id: PathId) {
6566        self.path_data_mut(path_id).validated = true;
6567        let ConnectionSide::Server { server_config } = &self.side else {
6568            return;
6569        };
6570        let network_path = self.path_data(path_id).network_path;
6571        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6572        new_tokens.clear();
6573        for _ in 0..server_config.validation_token.sent {
6574            new_tokens.push(network_path);
6575        }
6576    }
6577
6578    /// Handle new path status information: PATH_STATUS_AVAILABLE, PATH_STATUS_BACKUP
6579    fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6580        if let Some(path) = self.paths.get_mut(&path_id) {
6581            path.data.status.remote_update(status, status_seq_no);
6582        } else {
6583            debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6584        }
6585        self.events.push_back(
6586            PathEvent::RemoteStatus {
6587                id: path_id,
6588                status,
6589            }
6590            .into(),
6591        );
6592    }
6593
6594    /// Returns the maximum [`PathId`] to be used for sending in this connection.
6595    ///
6596    /// This is calculated as minimum between the local and remote's maximums when multipath is
6597    /// enabled, or `None` when disabled.
6598    ///
6599    /// For data that's received, we should use [`Self::local_max_path_id`] instead.
6600    /// The reasoning is that the remote might already have updated to its own newer
6601    /// [`Self::max_path_id`] after sending out a `MAX_PATH_ID` frame, but it got re-ordered.
6602    fn max_path_id(&self) -> Option<PathId> {
6603        if self.is_multipath_negotiated() {
6604            Some(self.remote_max_path_id.min(self.local_max_path_id))
6605        } else {
6606            None
6607        }
6608    }
6609
6610    /// Returns whether this connection has a socket that supports IPv6.
6611    ///
6612    /// TODO(matheus23): This is related to noq endpoint state's `ipv6` bool. We should move that info
6613    /// here instead of trying to hack around not knowing it exactly.
6614    fn is_ipv6(&self) -> bool {
6615        self.paths
6616            .values()
6617            .any(|p| p.data.network_path.remote.is_ipv6())
6618    }
6619
6620    /// Add addresses the local endpoint considers are reachable for nat traversal.
6621    pub fn add_nat_traversal_address(
6622        &mut self,
6623        address: SocketAddr,
6624    ) -> Result<(), n0_nat_traversal::Error> {
6625        if let Some(added) = self.n0_nat_traversal.add_local_address(address)? {
6626            self.spaces[SpaceId::Data].pending.add_address.insert(added);
6627        };
6628        Ok(())
6629    }
6630
6631    /// Removes an address the endpoing no longer considers reachable for nat traversal
6632    ///
6633    /// Addresses not present in the set will be silently ignored.
6634    pub fn remove_nat_traversal_address(
6635        &mut self,
6636        address: SocketAddr,
6637    ) -> Result<(), n0_nat_traversal::Error> {
6638        if let Some(removed) = self.n0_nat_traversal.remove_local_address(address)? {
6639            self.spaces[SpaceId::Data]
6640                .pending
6641                .remove_address
6642                .insert(removed);
6643        }
6644        Ok(())
6645    }
6646
    /// Get the current local nat traversal addresses
    ///
    /// Returns the addresses registered via [`Self::add_nat_traversal_address`]; errors are
    /// surfaced from the nat traversal state (presumably when the extension was not
    /// negotiated — TODO confirm against `n0_nat_traversal::State`).
    pub fn get_local_nat_traversal_addresses(
        &self,
    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
        self.n0_nat_traversal.get_local_nat_traversal_addresses()
    }
6653
6654    /// Get the currently advertised nat traversal addresses by the server
6655    pub fn get_remote_nat_traversal_addresses(
6656        &self,
6657    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6658        Ok(self
6659            .n0_nat_traversal
6660            .client_side()?
6661            .get_remote_nat_traversal_addresses())
6662    }
6663
6664    /// Attempts to open a path for nat traversal.
6665    ///
6666    /// On success returns the [`PathId`] and remote address of the path.
6667    fn open_nat_traversal_path(
6668        &mut self,
6669        now: Instant,
6670        ip_port: (IpAddr, u16),
6671    ) -> Result<Option<(PathId, SocketAddr)>, PathError> {
6672        let remote = ip_port.into();
6673        // TODO(matheus23): Probe the correct 4-tuple, instead of only a remote address?
6674        // By specifying None for `local_ip`, we do two things: 1. open_path_ensure won't
6675        // generate two paths to the same remote and 2. we let the OS choose which
6676        // interface to use for sending on that path.
6677        let network_path = FourTuple {
6678            remote,
6679            local_ip: None,
6680        };
6681        match self.open_path_ensure(network_path, PathStatus::Backup, now) {
6682            Ok((path_id, path_was_known)) => {
6683                if path_was_known {
6684                    trace!(%path_id, %remote, "nat traversal: path existed for remote");
6685                }
6686                Ok(Some((path_id, remote)))
6687            }
6688            Err(e) => {
6689                debug!(%remote, %e, "nat traversal: failed to probe remote");
6690                Err(e)
6691            }
6692        }
6693    }
6694
    /// Initiates a new nat traversal round
    ///
    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
    /// frames, and initiating probing of the known remote addresses. When a new round is
    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
    ///
    /// Returns the server addresses that are now being probed.
    /// If addresses fail due to spurious errors, these might succeed later and not be returned in
    /// this set.
    ///
    /// Errors with [`n0_nat_traversal::Error::Closed`] on a closed connection, with a
    /// client-side error when called on a server, and with
    /// [`n0_nat_traversal::Error::Multipath`] when *no* address could be probed at all.
    pub fn initiate_nat_traversal_round(
        &mut self,
        now: Instant,
    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
        if self.state.is_closed() {
            return Err(n0_nat_traversal::Error::Closed);
        }

        // Only clients run nat traversal rounds; `client_side_mut` errors on servers.
        let ipv6 = self.is_ipv6();
        let client_state = self.n0_nat_traversal.client_side_mut()?;
        let n0_nat_traversal::NatTraversalRound {
            new_round,
            reach_out_at,
            addresses_to_probe,
            prev_round_path_ids,
        } = client_state.initiate_nat_traversal_round(ipv6)?;

        trace!(%new_round, reach_out=reach_out_at.len(), to_probe=addresses_to_probe.len(),
            "initiating nat traversal round");

        // Queue REACH_OUT frames advertising our local addresses for this round.
        self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));

        // Cancel the previous round: close its leftover paths unless they are still useful.
        for path_id in prev_round_path_ids {
            let Some(path) = self.path(path_id) else {
                continue;
            };
            let ip = path.network_path.remote.ip();
            let port = path.network_path.remote.port();

            // We only close paths that aren't validated (thus are working) that we opened
            // in a previous round.
            // And we only close paths that we don't want to probe anyways.
            if !addresses_to_probe
                .iter()
                .any(|(_, probe)| *probe == (ip, port))
                && !path.validated
                && !self.abandoned_paths.contains(&path_id)
            {
                trace!(%path_id, "closing path from previous round");
                let _ =
                    self.close_path_inner(now, path_id, PathAbandonReason::NatTraversalRoundEnded);
            }
        }

        // First error seen while probing; only fatal if *no* address was probed.
        let mut err = None;

        let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
        let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());

        for (id, address) in addresses_to_probe {
            match self.open_nat_traversal_path(now, address) {
                Ok(None) => {}
                Ok(Some((path_id, remote))) => {
                    path_ids.push(path_id);
                    probed_addresses.push(remote);
                }
                Err(e) => {
                    // Re-borrow the client state: `open_nat_traversal_path` needed `&mut self`.
                    self.n0_nat_traversal
                        .client_side_mut()
                        .expect("validated")
                        .report_in_continuation(id, e);
                    err.get_or_insert(e);
                }
            }
        }

        if let Some(err) = err {
            // We failed to probe any addresses, bail out
            if probed_addresses.is_empty() {
                return Err(n0_nat_traversal::Error::Multipath(err));
            }
        }

        self.n0_nat_traversal
            .client_side_mut()
            .expect("connection side validated")
            .set_round_path_ids(path_ids);

        Ok(probed_addresses)
    }
6784
6785    /// Attempts to continue a nat traversal round by trying to open paths for pending client probes.
6786    ///
6787    /// If there was nothing to do, it returns `None`. Otherwise it returns whether the path was
6788    /// successfully open.
6789    fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6790        let ipv6 = self.is_ipv6();
6791        let client_state = self.n0_nat_traversal.client_side_mut().ok()?;
6792        let (id, address) = client_state.continue_nat_traversal_round(ipv6)?;
6793        let open_result = self.open_nat_traversal_path(now, address);
6794        let client_state = self.n0_nat_traversal.client_side_mut().expect("validated");
6795        match open_result {
6796            Ok(None) => Some(true),
6797            Ok(Some((path_id, _remote))) => {
6798                client_state.add_round_path_id(path_id);
6799                Some(true)
6800            }
6801            Err(e) => {
6802                client_state.report_in_continuation(id, e);
6803                Some(false)
6804            }
6805        }
6806    }
6807}
6808
6809impl fmt::Debug for Connection {
6810    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
6811        f.debug_struct("Connection")
6812            .field("handshake_cid", &self.handshake_cid)
6813            .finish()
6814    }
6815}
6816
/// Hints when the caller identifies a network change.
///
/// Implementations are passed to the connection so it can decide, per path, between a
/// cheap liveness check and a full path replacement.
pub trait NetworkChangeHint: std::fmt::Debug + 'static {
    /// Inform the connection if a path may recover after a network change.
    ///
    /// After network changes, paths may not be recoverable. In this case, waiting for the path to
    /// become idle may take longer than what is desirable. If [`Self::is_path_recoverable`]
    /// returns `false`, a multipath-enabled, client-side connection will establish a new path to
    /// the same remote, closing the current one, instead of migrating the path.
    ///
    /// Paths that are deemed recoverable will simply be sent a PING for a liveness check.
    fn is_path_recoverable(&self, path_id: PathId, network_path: FourTuple) -> bool;
}
6829
/// Return value for [`Connection::poll_transmit_on_path`].
#[derive(Debug)]
enum PollPathStatus {
    /// Nothing to send on the path, nothing was written into the [`TransmitBuf`].
    NothingToSend {
        /// If true there was data to send but congestion control did not allow so.
        congestion_blocked: bool,
    },
    /// The transmit is ready to be sent.
    Send(Transmit),
}
6841
/// Return value for [`Connection::poll_transmit_path_space`].
///
/// Unlike [`PollPathStatus`] this is per packet-number space, so it must additionally
/// communicate padding requirements when packets of different spaces are coalesced into
/// one datagram.
#[derive(Debug)]
enum PollPathSpaceStatus {
    /// Nothing to send in the space, nothing was written into the [`TransmitBuf`].
    NothingToSend {
        /// If true there was data to send but congestion control did not allow so.
        congestion_blocked: bool,
    },
    /// One or more packets have been written into the [`TransmitBuf`].
    WrotePacket {
        /// The highest packet number.
        last_packet_number: u64,
        /// Whether to pad an already started datagram in the next packet.
        ///
        /// When packets in Initial, 0-RTT or Handshake packet do not fill the entire
        /// datagram they may decide to coalesce with the next packet from a higher
        /// encryption level on the same path. But the earlier packet may require specific
        /// size requirements for the datagram they are sent in.
        ///
        /// If a space did not complete the datagram, they use this to request the correct
        /// padding in the final packet of the datagram so that the final datagram will have
        /// the correct size.
        ///
        /// If a space did fill an entire datagram, it leaves this to the default of
        /// [`PadDatagram::No`].
        pad_datagram: PadDatagram,
    },
    /// Send the contents of the transmit immediately.
    ///
    /// Packets were written and the GSO batch must end now, regardless from whether higher
    /// spaces still have frames to write. This is used when the last datagram written would
    /// require too much padding to continue a GSO batch, which would waste space on the
    /// wire.
    Send {
        /// The highest packet number written into the transmit.
        last_packet_number: u64,
    },
}
6880
/// Information used to decide what frames to schedule into which packets.
///
/// Primarily used by [`Connection::poll_transmit_on_path`] and the functions that help
/// building packets for it: [`Connection::poll_transmit_path_space`] and
/// [`Connection::populate_packet`].
#[derive(Debug, Copy, Clone)]
struct PathSchedulingInfo {
    /// Whether the path is abandoned.
    ///
    /// Note that a path that is abandoned but still has CIDs can still send a packet. After
    /// sending that packet the CIDs issued by the remote have to be considered retired as
    /// well.
    abandoned: bool,
    /// Whether the path may send [`SpaceKind::Data`] frames.
    ///
    /// Some paths should only send frames from [`SendableFrames::space_specific`]. All other
    /// frames are essentially frames that can be sent on any [`SpaceKind::Data`] space. For
    /// those we want to respect packet scheduling rules however.
    ///
    /// Roughly speaking data frames are only sent on spaces that have CIDs, are not
    /// abandoned and have no *better* spaces. However see the comments where this is
    /// populated for the exact packet scheduling implementation.
    ///
    /// This essentially marks this path as the best validated space ID. Except during
    /// the handshake in which case it does not need to be validated. Several paths could be
    /// equally good and all have this set to `true`, in that case packet scheduling can
    /// choose which path to use. Currently it chooses the lowest path that is not
    /// congestion blocked.
    ///
    /// Note that once in the closed or draining states this will never be true.
    may_send_data: bool,
    /// Whether the path may send a CONNECTION_CLOSE frame.
    ///
    /// This essentially marks this path as the best validated space ID with a fallback
    /// to unvalidated spaces if there are no validated spaces. Like for
    /// [`Self::may_send_data`] other paths could be equally good.
    may_send_close: bool,
}
6919
/// Reason why sending on a path is currently blocked, if any
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// Sending is not blocked
    No,
    /// Blocked by the anti-amplification limit
    AntiAmplification,
    /// Blocked by the congestion controller
    Congestion,
    /// Blocked by the pacer
    Pacing,
}
6927
/// Fields of `Connection` specific to it being client-side or server-side
enum ConnectionSide {
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
        token: Bytes,
        /// Store from which the Initial token for `server_name` is taken (see
        /// `From<SideArgs>` below)
        token_store: Arc<dyn TokenStore>,
        /// Name of the server this connection was opened against
        server_name: String,
    },
    Server {
        /// Shared server configuration (migration policy, validation tokens, ...)
        server_config: Arc<ServerConfig>,
    },
}
6940
6941impl ConnectionSide {
6942    fn remote_may_migrate(&self, state: &State) -> bool {
6943        match self {
6944            Self::Server { server_config } => server_config.migration,
6945            Self::Client { .. } => {
6946                if let Some(hs) = state.as_handshake() {
6947                    hs.allow_server_migration
6948                } else {
6949                    false
6950                }
6951            }
6952        }
6953    }
6954
6955    fn is_client(&self) -> bool {
6956        self.side().is_client()
6957    }
6958
6959    fn is_server(&self) -> bool {
6960        self.side().is_server()
6961    }
6962
6963    fn side(&self) -> Side {
6964        match *self {
6965            Self::Client { .. } => Side::Client,
6966            Self::Server { .. } => Side::Server,
6967        }
6968    }
6969}
6970
6971impl From<SideArgs> for ConnectionSide {
6972    fn from(side: SideArgs) -> Self {
6973        match side {
6974            SideArgs::Client {
6975                token_store,
6976                server_name,
6977            } => Self::Client {
6978                token: token_store.take(&server_name).unwrap_or_default(),
6979                token_store,
6980                server_name,
6981            },
6982            SideArgs::Server {
6983                server_config,
6984                pref_addr_cid: _,
6985                path_validated: _,
6986            } => Self::Server { server_config },
6987        }
6988    }
6989}
6990
/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    Client {
        /// Store from which a previously issued token for `server_name` is taken
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to
        server_name: String,
    },
    Server {
        /// Shared server configuration
        server_config: Arc<ServerConfig>,
        /// CID issued for the server's preferred address, if any
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the client's path is already considered validated
        path_validated: bool,
    },
}
7003
7004impl SideArgs {
7005    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
7006        match *self {
7007            Self::Client { .. } => None,
7008            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
7009        }
7010    }
7011
7012    pub(crate) fn path_validated(&self) -> bool {
7013        match *self {
7014            Self::Client { .. } => true,
7015            Self::Server { path_validated, .. } => path_validated,
7016        }
7017    }
7018
7019    pub(crate) fn side(&self) -> Side {
7020        match *self {
7021            Self::Client { .. } => Side::Client,
7022            Self::Server { .. } => Side::Server,
7023        }
7024    }
7025}
7026
/// Reasons why a connection might be lost
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    ///
    /// Carries the peer-supplied [`frame::ConnectionClose`] reason.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    ///
    /// Carries the peer-supplied [`frame::ApplicationClose`] reason.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
7061
7062impl From<Close> for ConnectionError {
7063    fn from(x: Close) -> Self {
7064        match x {
7065            Close::Connection(reason) => Self::ConnectionClosed(reason),
7066            Close::Application(reason) => Self::ApplicationClosed(reason),
7067        }
7068    }
7069}
7070
7071// For compatibility with API consumers
7072impl From<ConnectionError> for io::Error {
7073    fn from(x: ConnectionError) -> Self {
7074        use ConnectionError::*;
7075        let kind = match x {
7076            TimedOut => io::ErrorKind::TimedOut,
7077            Reset => io::ErrorKind::ConnectionReset,
7078            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
7079            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
7080                io::ErrorKind::Other
7081            }
7082        };
7083        Self::new(kind, x)
7084    }
7085}
7086
7087/// Errors that might trigger a path being closed
7088// TODO(@divma): maybe needs to be reworked based on what we want to do with the public API
7089#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
7090pub enum PathError {
7091    /// The extension was not negotiated with the peer
7092    #[error("multipath extension not negotiated")]
7093    MultipathNotNegotiated,
7094    /// Paths can only be opened client-side
7095    #[error("the server side may not open a path")]
7096    ServerSideNotAllowed,
7097    /// Current limits do not allow us to open more paths
7098    #[error("maximum number of concurrent paths reached")]
7099    MaxPathIdReached,
7100    /// No remote CIDs available to open a new path
7101    #[error("remoted CIDs exhausted")]
7102    RemoteCidsExhausted,
7103    /// Path could not be validated and will be abandoned
7104    #[error("path validation failed")]
7105    ValidationFailed,
7106    /// The remote address for the path is not supported by the endpoint
7107    #[error("invalid remote address")]
7108    InvalidRemoteAddress(SocketAddr),
7109}
7110
/// Errors triggered when abandoning a path
// NOTE(review): message casing here ("Multipath ...") differs from
// `PathError::MultipathNotNegotiated` ("multipath ..."); consider unifying.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// Multipath is not negotiated
    #[error("Multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// The path is already closed or was never opened
    #[error("closed path")]
    ClosedPath,
    /// This is the last path, which can not be abandoned
    #[error("last open path")]
    LastOpenPath,
}
7124
/// Error when the multipath extension was not negotiated, but attempted to be used.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private zero-sized field prevents construction outside this crate while
    // leaving room to add fields later without a breaking change.
    _private: (),
}
7131
/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The TLS handshake was confirmed
    HandshakeConfirmed,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events; see [`StreamEvent`] for the specific kinds
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
    /// (Multi)Path events
    Path(PathEvent),
    /// n0's nat traversal events
    NatTraversal(n0_nat_traversal::Event),
}
7159
7160impl From<PathEvent> for Event {
7161    fn from(source: PathEvent) -> Self {
7162        Self::Path(source)
7163    }
7164}
7165
7166fn get_max_ack_delay(params: &TransportParameters) -> Duration {
7167    Duration::from_micros(params.max_ack_delay.0 * 1000)
7168}
7169
// Prevents overflow and improves behavior in extreme circumstances
//
// NOTE(review): used as a cap on exponential backoff exponents elsewhere in this
// module; the usages are outside this excerpt.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Excludes packet-type-specific fields such as packet number or Initial token
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
// Term-by-term: 1 flags byte, 4 version bytes, 1-byte DCID length + DCID,
// 1-byte SCID length + SCID, the Length varint (sized for u16::MAX), and up to
// 4 bytes of packet number.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
7189
/// Record of the frames written into a single outgoing packet
#[derive(Default)]
struct SentFrames {
    /// State to re-send if this packet is declared lost
    retransmits: ThinRetransmits,
    /// The packet number of the largest acknowledged packet for each path
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata for the STREAM frames carried in this packet
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    /// If the datagram containing these frames should be padded to the min MTU
    requires_padding: bool,
}
7201
impl SentFrames {
    /// Returns whether the packet contains only ACKs
    ///
    /// True when at least one ACK was recorded and nothing else (no stream data,
    /// no retransmittable or non-retransmittable frames) was sent.
    fn is_ack_only(&self, streams: &StreamsState) -> bool {
        !self.largest_acked.is_empty()
            && !self.non_retransmits
            && self.stream_frames.is_empty()
            && self.retransmits.is_empty(streams)
    }

    /// Lazily materializes the retransmit state, creating it on first use
    fn retransmits_mut(&mut self) -> &mut Retransmits {
        self.retransmits.get_or_create()
    }

    /// Records that `frame` was written into this packet
    ///
    /// Each frame is classified into one of: ACK bookkeeping (`largest_acked`),
    /// stream metadata (`stream_frames`), retransmittable state (`retransmits`),
    /// or purely non-retransmittable content (`non_retransmits`), so loss
    /// detection can later decide what must be re-sent if the packet is lost.
    fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
        use frame::EncodableFrame::*;
        match frame {
            // Multipath ACK: remember the largest packet number acked per path.
            PathAck(path_ack_encoder) => {
                if let Some(max) = path_ack_encoder.ranges.max() {
                    self.largest_acked.insert(path_ack_encoder.path_id, max);
                }
            }
            // Plain ACK frames implicitly apply to path 0.
            Ack(ack_encoder) => {
                if let Some(max) = ack_encoder.ranges.max() {
                    self.largest_acked.insert(PathId::ZERO, max);
                }
            }
            Close(_) => { /* non retransmittable, but after this we don't really care */ }
            PathResponse(_) => self.non_retransmits = true,
            HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
            ReachOut(frame::ReachOut { round, ip, port }) => {
                let (recorded_round, reach_outs) = self
                    .retransmits_mut()
                    .reach_out
                    .get_or_insert_with(|| (round, FxHashSet::default()));
                // Only record reach outs for the current round or a newer than the recorded one.
                if *recorded_round == round {
                    // Same round, simply append.
                    reach_outs.insert((ip, port));
                } else if *recorded_round < round {
                    // New round: drop addresses recorded for the superseded round.
                    *recorded_round = round;
                    reach_outs.drain();
                    reach_outs.insert((ip, port));
                } else {
                    // ignore old reach out that was sent
                }
            }

            ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
            Ping(_) => self.non_retransmits = true,
            ImmediateAck(_) => self.non_retransmits = true,
            AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
            PathChallenge(_) => self.non_retransmits = true,
            Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
            PathAbandon(path_abandon) => {
                // Keep the first error code recorded for a given path.
                self.retransmits_mut()
                    .path_abandon
                    .entry(path_abandon.path_id)
                    .or_insert(path_abandon.error_code);
            }
            PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
            | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
                self.retransmits_mut().path_status.insert(path_id);
            }
            MaxPathId(_) => self.retransmits_mut().max_path_id = true,
            PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
            PathCidsBlocked(path_cids_blocked) => {
                self.retransmits_mut()
                    .path_cids_blocked
                    .insert(path_cids_blocked.path_id);
            }
            ResetStream(reset) => self
                .retransmits_mut()
                .reset_stream
                .push((reset.id, reset.error_code)),
            StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
            NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
            // NOTE(review): an absent path ID maps to `PathId::default()` —
            // presumably path 0; confirm against `PathId`'s `Default` impl.
            RetireConnectionId(retire_cid) => self
                .retransmits_mut()
                .retire_cids
                .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
            Datagram(_) => self.non_retransmits = true,
            // NEW_TOKEN frames are neither tracked for retransmission nor
            // counted as non-retransmittable content here.
            NewToken(_) => {}
            AddAddress(add_address) => {
                self.retransmits_mut().add_address.insert(add_address);
            }
            RemoveAddress(remove_address) => {
                self.retransmits_mut().remove_address.insert(remove_address);
            }
            StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
            MaxData(_) => self.retransmits_mut().max_data = true,
            MaxStreamData(max) => {
                self.retransmits_mut().max_stream_data.insert(max.id);
            }
            MaxStreams(max_streams) => {
                self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
            }
        }
    }
}
7302
7303/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
7304///
7305/// According to the definition of max_idle_timeout, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1.>
7306///
7307/// According to the negotiation procedure, either the minimum of the timeouts or one specified is used as the negotiated value; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2.>
7308///
7309/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
7310fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
7311    match (x, y) {
7312        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
7313        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
7314        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
7315        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
7316    }
7317}
7318
#[cfg(test)]
mod tests {
    use super::*;

    /// `negotiate_max_idle_timeout` must be symmetric: swapping the local and
    /// remote parameters yields the same negotiated value.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        // (left parameter, right parameter, expected negotiated timeout)
        let test_params = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (
                Some(VarInt(2)),
                Some(VarInt(0)),
                Some(Duration::from_millis(2)),
            ),
            (
                Some(VarInt(1)),
                Some(VarInt(4)),
                Some(Duration::from_millis(1)),
            ),
        ];

        // Check each case in both argument orders.
        for (left, right, result) in test_params {
            assert_eq!(negotiate_max_idle_timeout(left, right), result);
            assert_eq!(negotiate_max_idle_timeout(right, left), result);
        }
    }
}