noq_proto/connection/
mod.rs

1use std::{
2    cmp,
3    collections::{BTreeMap, VecDeque, btree_map},
4    convert::TryFrom,
5    fmt, io, mem,
6    net::{IpAddr, SocketAddr},
7    num::{NonZeroU32, NonZeroUsize},
8    sync::Arc,
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13
14use rand::{RngExt, SeedableRng, rngs::StdRng};
15use rustc_hash::{FxHashMap, FxHashSet};
16use thiserror::Error;
17use tracing::{debug, error, trace, trace_span, warn};
18
19use crate::{
20    Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
21    MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
22    TransportError, TransportErrorCode, VarInt,
23    cid_generator::ConnectionIdGenerator,
24    cid_queue::CidQueue,
25    config::{ServerConfig, TransportConfig},
26    congestion::Controller,
27    connection::{
28        qlog::{QlogRecvPacket, QlogSink},
29        spaces::LostPacket,
30        timer::{ConnTimer, PathTimer},
31    },
32    crypto::{self, Keys},
33    frame::{
34        self, Close, DataBlocked, Datagram, FrameStruct, NewToken, ObservedAddr, StreamDataBlocked,
35        StreamsBlocked,
36    },
37    n0_nat_traversal,
38    packet::{
39        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40        PacketNumber, PartialDecode, SpaceId,
41    },
42    range_set::ArrayRangeSet,
43    shared::{
44        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45        EndpointEvent, EndpointEventInner,
46    },
47    token::{ResetToken, Token, TokenPayload},
48    transport_parameters::TransportParameters,
49};
50
51mod ack_frequency;
52use ack_frequency::AckFrequencyState;
53
54mod assembler;
55pub use assembler::Chunk;
56
57mod cid_state;
58use cid_state::CidState;
59
60mod datagrams;
61use datagrams::DatagramState;
62pub use datagrams::{Datagrams, SendDatagramError};
63
64mod mtud;
65mod pacing;
66
67mod packet_builder;
68use packet_builder::{PacketBuilder, PadDatagram};
69
70mod packet_crypto;
71use packet_crypto::CryptoState;
72pub(crate) use packet_crypto::EncryptionLevel;
73
74mod paths;
75pub use paths::{
76    ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError,
77};
78use paths::{PathData, PathState};
79
80pub(crate) mod qlog;
81pub(crate) mod send_buffer;
82
83mod spaces;
84#[cfg(fuzzing)]
85pub use spaces::Retransmits;
86#[cfg(not(fuzzing))]
87use spaces::Retransmits;
88pub(crate) use spaces::SpaceKind;
89use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
90
91mod stats;
92pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
93
94mod streams;
95#[cfg(fuzzing)]
96pub use streams::StreamsState;
97#[cfg(not(fuzzing))]
98use streams::StreamsState;
99pub use streams::{
100    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
101    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
102};
103
104mod timer;
105use timer::{Timer, TimerTable};
106
107mod transmit_buf;
108use transmit_buf::TransmitBuf;
109
110mod state;
111
112#[cfg(not(fuzzing))]
113use state::State;
114#[cfg(fuzzing)]
115pub use state::State;
116use state::StateType;
117
/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below, however as per the design of this library none of the
///   functions actually perform system-level I/O. For example, [`read`](RecvStream::read) and
///   [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing them to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
pub struct Connection {
    /// Endpoint-level configuration, shared via `Arc`
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration of this connection, shared via `Arc`
    ///
    /// Among other things it supplies the stream limits and flow-control windows used to
    /// build the stream state in `Connection::new`.
    config: Arc<TransportConfig>,
    /// Connection-local RNG, seeded from the `rng_seed` given to `Connection::new`
    rng: StdRng,
    /// Consolidated cryptographic state
    crypto_state: CryptoState,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    remote_handshake_cid: ConnectionId,
    /// The [`PathData`] for each path
    ///
    /// This needs to be ordered because [`Connection::poll_transmit`] needs to
    /// deterministically select the next PathId to send on.
    // TODO(flub): well does it really? But deterministic is nice for now.
    paths: BTreeMap<PathId, PathState>,
    /// Counter to uniquely identify every [`PathData`] created in this connection.
    ///
    /// Each [`PathData`] gets a [`PathData::generation`] that is unique among all
    /// [`PathData`]s created in the lifetime of this connection. This helps identify the
    /// correct path when RFC9000-style migrations happen, even when they are
    /// aborted.
    ///
    /// Multipath does not change this, each path can also undergo RFC9000-style
    /// migrations. So a single multipath path ID could see several [`PathData`]s each with
    /// their unique [`PathData::generation`].
    path_generation_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    /// Lifecycle state of the connection; starts in the handshake state
    state: State,
    /// Client- or server-specific state; [`ConnectionSide::side`] tells which side we are
    side: ConnectionSide,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    original_remote_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Events returned by [`Connection::poll`]
    events: VecDeque<Event>,
    /// Events returned by [`Connection::poll_endpoint_events`]
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable packet space.
    highest_space: SpaceKind,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    /// Armed timers; the next deadline is reported by [`Connection::poll_timeout`]
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// If the CONNECTION_CLOSE frame needs to be sent
    connection_close_pending: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    /// Stream state, exposed through [`Connection::streams`], [`Connection::recv_stream`]
    /// and [`Connection::send_stream`]
    streams: StreamsState,
    /// Active and surplus CIDs issued by the remote, for future use on new paths.
    ///
    /// These are given out before multiple paths exist, also for paths that will never
    /// exist.  So if multipath is supported the number of paths here will be higher than
    /// the actual number of paths in use.
    remote_cids: FxHashMap<PathId, CidQueue>,
    /// Attributes of CIDs generated by local endpoint
    ///
    /// Any path that is allowed to be opened is present in this map, as well as the already
    /// opened paths. However since CIDs are issued async by the endpoint driver via
    /// connection events it can not be used to know if CIDs have been issued for a path or
    /// not. See [`Connection::max_path_id_with_cids`] for this.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// Path level statistics
    path_stats: FxHashMap<PathId, PathStats>,
    /// QUIC version used for the connection.
    version: u32,

    //
    // Multipath
    //
    /// Maximum number of concurrent paths
    ///
    /// Initially set from the [`TransportConfig::max_concurrent_multipath_paths`]. Even
    /// when multipath is disabled this will be set to 1, it is not used in that case
    /// though.
    max_concurrent_paths: NonZeroU32,
    /// Local maximum [`PathId`] to be used
    ///
    /// This is initially set to [`TransportConfig::get_initial_max_path_id`] when multipath
    /// is negotiated, or to [`PathId::ZERO`] otherwise. This is essentially the value of
    /// the highest MAX_PATH_ID frame sent.
    ///
    /// Any path with an ID equal or below this [`PathId`] is either:
    ///
    /// - Abandoned, if it is also in [`Connection::abandoned_paths`].
    /// - Open, in this case it is present in [`Connection::paths`]
    /// - Not yet opened, if it is in neither of these two places.
    ///
    /// Note that for not-yet-open there may or may not be any CIDs issued. See
    /// [`Connection::max_path_id_with_cids`].
    local_max_path_id: PathId,
    /// Remote's maximum [`PathId`] to be used
    ///
    /// This is initially set to the peer's [`TransportParameters::initial_max_path_id`] when
    /// multipath is negotiated, or to [`PathId::ZERO`] otherwise. A peer may increase this limit
    /// by sending [`Frame::MaxPathId`] frames.
    remote_max_path_id: PathId,
    /// The greatest [`PathId`] we have issued CIDs for
    ///
    /// CIDs are only issued for `min(local_max_path_id, remote_max_path_id)`. It is not
    /// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued
    /// since they are issued asynchronously by the endpoint driver.
    max_path_id_with_cids: PathId,
    /// The paths already abandoned
    ///
    /// They may still have some state left in [`Connection::paths`] or
    /// [`Connection::local_cid_state`] since some of this has to be kept around for some
    /// time after a path is abandoned.
    // TODO(flub): Make this a more efficient data structure.  Like ranges of abandoned
    //    paths.  Or a set together with a minimum.  Or something.
    abandoned_paths: FxHashSet<PathId>,

    /// State for n0's (<https://n0.computer>) nat traversal protocol.
    n0_nat_traversal: n0_nat_traversal::State,
    /// Sink that qlog events of this connection are emitted to
    qlog: QlogSink,
}
316
317impl Connection {
    /// Creates a new connection, starting in the handshake state.
    ///
    /// - `init_cid`: recorded as the initial destination CID and handed to the crypto state.
    /// - `local_cid` / `remote_cid`: the handshake CIDs chosen by us and by the peer.
    ///   `remote_cid` also seeds the peer CID queue of [`PathId::ZERO`].
    /// - `network_path`: the 4-tuple of the first path, [`PathId::ZERO`].
    /// - `cid_gen`: only its CID length and CID lifetime are read here.
    /// - `rng_seed`: seeds the connection RNG, from which the packet spaces, the crypto
    ///   state and the spin-bit decision draw.
    /// - `side_args`: selects client or server behaviour. It also says whether the initial
    ///   path is already validated and whether a preferred-address CID exists.
    ///
    /// A client immediately writes its first crypto data and initialises 0-RTT.
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        local_cid: ConnectionId,
        remote_cid: ConnectionId,
        network_path: FourTuple,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        // Extract what we need from `side_args` before it is consumed by the conversion.
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        // Tests may ask for deterministic packet numbers in the data space; that variant
        // does not draw from the RNG.
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            remote_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        // The last argument is 2 when a preferred-address CID exists, otherwise 1.
        // NOTE(review): presumably the number of CIDs initially issued for path 0 — confirm
        // against `CidState::new`.
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
        path.open_status = paths::OpenStatus::Informed;
        // NOTE: struct field initialisers are evaluated in the order written. `crypto_state`
        // and `spin_enabled` draw from `rng` before it is moved in below, and `config` is
        // read by `streams` before it is moved. Reordering fields changes the RNG draws or
        // breaks compilation.
        let mut this = Self {
            endpoint_config,
            crypto_state: CryptoState::new(crypto, init_cid, side, &mut rng),
            handshake_cid: local_cid,
            remote_handshake_cid: remote_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_generation_counter: 0,
            allow_mtud,
            state,
            side: connection_side,
            peer_params: TransportParameters::default(),
            original_remote_cid: remote_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            // When allowed by config, the spin bit is enabled with probability 7/8.
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceKind::Initial,
            permit_idle_reset: true,
            // A missing or zero max_idle_timeout disables the idle timeout.
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            connection_close_pending: false,

            // Peer transport parameters are not known yet, so the default max ACK delay is used.
            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            // peer params are not yet known, so multipath is not enabled
            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            n0_nat_traversal: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog
            .emit_tuple_assigned(PathId::ZERO, network_path, now);
        this
    }
450
451    /// Returns the next time at which `handle_timeout` should be called
452    ///
453    /// The value returned may change after:
454    /// - the application performed some I/O on the connection
455    /// - a call was made to `handle_event`
456    /// - a call to `poll_transmit` returned `Some`
457    /// - a call was made to `handle_timeout`
458    #[must_use]
459    pub fn poll_timeout(&mut self) -> Option<Instant> {
460        self.timers.peek()
461    }
462
463    /// Returns application-facing events
464    ///
465    /// Connections should be polled for events after:
466    /// - a call was made to `handle_event`
467    /// - a call was made to `handle_timeout`
468    #[must_use]
469    pub fn poll(&mut self) -> Option<Event> {
470        if let Some(x) = self.events.pop_front() {
471            return Some(x);
472        }
473
474        if let Some(event) = self.streams.poll() {
475            return Some(Event::Stream(event));
476        }
477
478        if let Some(reason) = self.state.take_error() {
479            return Some(Event::ConnectionLost { reason });
480        }
481
482        None
483    }
484
485    /// Return endpoint-facing events
486    #[must_use]
487    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
488        self.endpoint_events.pop_front().map(EndpointEvent)
489    }
490
491    /// Provide control over streams
492    #[must_use]
493    pub fn streams(&mut self) -> Streams<'_> {
494        Streams {
495            state: &mut self.streams,
496            conn_state: &self.state,
497        }
498    }
499
500    /// Provide control over streams
501    #[must_use]
502    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
503        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
504        RecvStream {
505            id,
506            state: &mut self.streams,
507            pending: &mut self.spaces[SpaceId::Data].pending,
508        }
509    }
510
511    /// Provide control over streams
512    #[must_use]
513    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
514        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
515        SendStream {
516            id,
517            state: &mut self.streams,
518            pending: &mut self.spaces[SpaceId::Data].pending,
519            conn_state: &self.state,
520        }
521    }
522
523    /// Opens a new path only if no path on the same network path currently exists.
524    ///
525    /// This comparison will use [`FourTuple::is_probably_same_path`] on the given `network_path`
526    /// and pass it existing path's network paths.
527    ///
528    /// This means that you can pass `local_ip: None` to make the comparison only compare
529    /// remote addresses.
530    ///
531    /// This avoids having to guess which local interface will be used to communicate with the
532    /// remote, should it not be known yet. We assume that if we already have a path to the remote,
533    /// the OS is likely to use the same interface to talk to said remote.
534    ///
535    /// See also [`open_path`]. Returns `(path_id, true)` if the path already existed. `(path_id,
536    /// false)` if was opened.
537    ///
538    /// [`open_path`]: Connection::open_path
539    pub fn open_path_ensure(
540        &mut self,
541        network_path: FourTuple,
542        initial_status: PathStatus,
543        now: Instant,
544    ) -> Result<(PathId, bool), PathError> {
545        let existing_open_path = self.paths.iter().find(|(id, path)| {
546            network_path.is_probably_same_path(&path.data.network_path)
547                && !self.abandoned_paths.contains(*id)
548        });
549        match existing_open_path {
550            Some((path_id, _state)) => Ok((*path_id, true)),
551            None => Ok((self.open_path(network_path, initial_status, now)?, false)),
552        }
553    }
554
555    /// Opens a new path
556    ///
557    /// Further errors might occur and they will be emitted in [`PathEvent::Abandoned`] events with this path id.
558    /// When the path is opened it will be reported as an [`PathEvent::Opened`].
559    pub fn open_path(
560        &mut self,
561        network_path: FourTuple,
562        initial_status: PathStatus,
563        now: Instant,
564    ) -> Result<PathId, PathError> {
565        if !self.is_multipath_negotiated() {
566            return Err(PathError::MultipathNotNegotiated);
567        }
568        if self.side().is_server() {
569            return Err(PathError::ServerSideNotAllowed);
570        }
571
572        let max_abandoned = self.abandoned_paths.iter().max().copied();
573        let max_used = self.paths.keys().last().copied();
574        let path_id = max_abandoned
575            .max(max_used)
576            .unwrap_or(PathId::ZERO)
577            .saturating_add(1u8);
578
579        if Some(path_id) > self.max_path_id() {
580            return Err(PathError::MaxPathIdReached);
581        }
582        if path_id > self.remote_max_path_id {
583            self.spaces[SpaceId::Data].pending.paths_blocked = true;
584            return Err(PathError::MaxPathIdReached);
585        }
586        if self
587            .remote_cids
588            .get(&path_id)
589            .map(CidQueue::active)
590            .is_none()
591        {
592            self.spaces[SpaceId::Data]
593                .pending
594                .path_cids_blocked
595                .insert(path_id);
596            return Err(PathError::RemoteCidsExhausted);
597        }
598
599        let path = self.ensure_path(path_id, network_path, now, None);
600        path.status.local_update(initial_status);
601
602        Ok(path_id)
603    }
604
605    /// Closes a path and sends a PATH_ABANDON frame with the passed error code.
606    ///
607    /// This will not allow closing the last path. It does allow closing paths which have
608    /// not yet been opened, as e.g. is the case when receiving a PATH_ABANDON from the peer
609    /// for a path that was never opened locally.
610    pub fn close_path(
611        &mut self,
612        now: Instant,
613        path_id: PathId,
614        error_code: VarInt,
615    ) -> Result<(), ClosePathError> {
616        self.close_path_inner(
617            now,
618            path_id,
619            PathAbandonReason::ApplicationClosed { error_code },
620        )
621    }
622
    /// Closes a path and queues a PATH_ABANDON frame for it.
    ///
    /// Unlike [`Self::close_path`], this allows specifying the reason for the path being
    /// closed. Internally, this should be used over [`Self::close_path`].
    ///
    /// Does nothing and returns `Ok(())` if the connection is already drained.
    ///
    /// # Errors
    ///
    /// - [`ClosePathError::MultipathNotNegotiated`] if multipath was not negotiated.
    /// - [`ClosePathError::ClosedPath`] if the path is already abandoned, is above
    ///   `max_path_id()`, or is not present in `paths`.
    /// - [`ClosePathError::LastOpenPath`] if no other non-abandoned path would remain. For
    ///   a locally initiated close, that remaining path must also be validated.
    ///
    /// On success:
    /// - the path is marked abandoned;
    /// - its pending NEW CIDs, PATH_CIDS_BLOCKED and path-status entries are dropped, both
    ///   from the pending queue and from the retransmission state of sent packets;
    /// - all its timers except loss detection and pacing are stopped;
    /// - a [`PathEvent::Abandoned`] event is queued for the application.
    fn close_path_inner(
        &mut self,
        now: Instant,
        path_id: PathId,
        reason: PathAbandonReason,
    ) -> Result<(), ClosePathError> {
        if self.state.is_drained() {
            return Ok(());
        }

        if !self.is_multipath_negotiated() {
            return Err(ClosePathError::MultipathNotNegotiated);
        }
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }

        if reason.is_locally_initiated() {
            // We only close our own path if another validated, non-abandoned path remains.
            let has_remaining_validated_paths = self.paths.iter().any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            });
            if !has_remaining_validated_paths {
                return Err(ClosePathError::LastOpenPath);
            }
        } else {
            // The remote abandoned this path. We should always "accept" this. Doing so right now,
            // however, breaks assumptions throughout the code. We error instead, for the
            // connection to be killed. See <https://github.com/n0-computer/noq/issues/397>
            let has_remaining_paths = self
                .paths
                .keys()
                .any(|id| *id != path_id && !self.abandoned_paths.contains(id));
            if !has_remaining_paths {
                return Err(ClosePathError::LastOpenPath);
            }
        }

        trace!(%path_id, ?reason, "closing path");

        // Send PATH_ABANDON
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, reason.error_code());

        // Remove pending NEW CIDs for this path
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Cleanup retransmits across ALL paths (CIDs for path_id may have been transmitted on other paths)
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Notify the endpoint so it can retire the reset token for this path.
        debug_assert!(!self.state.is_drained()); // requirement for endpoint_events, checked above
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        trace!(%path_id, "abandoning path");
        self.abandoned_paths.insert(path_id);

        for timer in timer::PathTimer::VALUES {
            // match for completeness
            let keep_timer = match timer {
                // These timers deal with sending and receiving PATH_CHALLENGE and
                // PATH_RESPONSE, but now that the path is abandoned, we no longer care about
                // these frames or their timing
                PathTimer::PathValidation | PathTimer::PathChallengeLost | PathTimer::PathOpen => {
                    false
                }
                // These timers deal with the lifetime of the path. Now that the path is abandoned,
                // these are not relevant.
                PathTimer::PathKeepAlive | PathTimer::PathIdle => false,
                // The path has already been informed that outstanding acks should be sent
                // immediately
                PathTimer::MaxAckDelay => false,
                // This timer should not be set, for completeness it's not kept as it's set when
                // the PATH_ABANDON frame is sent.
                PathTimer::DiscardPath => false,
                // Sent packets still need to be identified as lost to trigger timely
                // retransmission.
                PathTimer::LossDetection => true,
                // This path should not be used for sending after the PATH_ABANDON frame is sent.
                // However, any outstanding data that should be sent before PATH_ABANDON, should
                // still respect pacing.
                PathTimer::Pacing => true,
            };

            if !keep_timer {
                let qlog = self.qlog.with_time(now);
                self.timers.stop(Timer::PerPath(path_id, timer), qlog);
            }
        }

        // Emit event to the application.
        self.events.push_back(Event::Path(PathEvent::Abandoned {
            id: path_id,
            reason,
        }));

        Ok(())
    }
740
741    /// Gets the [`PathData`] for a known [`PathId`].
742    ///
743    /// Will panic if the path_id does not reference any known path.
744    #[track_caller]
745    fn path_data(&self, path_id: PathId) -> &PathData {
746        if let Some(data) = self.paths.get(&path_id) {
747            &data.data
748        } else {
749            panic!(
750                "unknown path: {path_id}, currently known paths: {:?}",
751                self.paths.keys().collect::<Vec<_>>()
752            );
753        }
754    }
755
756    /// Gets a reference to the [`PathData`] for a [`PathId`]
757    fn path(&self, path_id: PathId) -> Option<&PathData> {
758        self.paths.get(&path_id).map(|path_state| &path_state.data)
759    }
760
761    /// Gets a mutable reference to the [`PathData`] for a [`PathId`]
762    fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
763        self.paths
764            .get_mut(&path_id)
765            .map(|path_state| &mut path_state.data)
766    }
767
768    /// Returns all known paths.
769    ///
770    /// There is no guarantee any of these paths are open or usable.
771    pub fn paths(&self) -> Vec<PathId> {
772        self.paths.keys().copied().collect()
773    }
774
775    /// Gets the local [`PathStatus`] for a known [`PathId`]
776    pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
777        self.path(path_id)
778            .map(PathData::local_status)
779            .ok_or(ClosedPath { _private: () })
780    }
781
782    /// Returns the path's network path represented as a 4-tuple.
783    pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
784        self.path(path_id)
785            .map(|path| path.network_path)
786            .ok_or(ClosedPath { _private: () })
787    }
788
789    /// Sets the [`PathStatus`] for a known [`PathId`]
790    ///
791    /// Returns the previous path status on success.
792    pub fn set_path_status(
793        &mut self,
794        path_id: PathId,
795        status: PathStatus,
796    ) -> Result<PathStatus, SetPathStatusError> {
797        if !self.is_multipath_negotiated() {
798            return Err(SetPathStatusError::MultipathNotNegotiated);
799        }
800        let path = self
801            .path_mut(path_id)
802            .ok_or(SetPathStatusError::ClosedPath)?;
803        let prev = match path.status.local_update(status) {
804            Some(prev) => {
805                self.spaces[SpaceId::Data]
806                    .pending
807                    .path_status
808                    .insert(path_id);
809                prev
810            }
811            None => path.local_status(),
812        };
813        Ok(prev)
814    }
815
816    /// Returns the remote path status
817    // TODO(flub): Probably should also be some kind of path event?  Not even sure if I like
818    //    this as an API, but for now it allows me to write a test easily.
819    // TODO(flub): Technically this should be a Result<Option<PathSTatus>>?
820    pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
821        self.path(path_id).and_then(|path| path.remote_status())
822    }
823
824    /// Sets the max_idle_timeout for a specific path
825    ///
826    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
827    ///
828    /// Returns the previous value of the setting.
829    pub fn set_path_max_idle_timeout(
830        &mut self,
831        path_id: PathId,
832        timeout: Option<Duration>,
833    ) -> Result<Option<Duration>, ClosedPath> {
834        let path = self
835            .paths
836            .get_mut(&path_id)
837            .ok_or(ClosedPath { _private: () })?;
838        Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
839    }
840
841    /// Sets the keep_alive_interval for a specific path
842    ///
843    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
844    ///
845    /// Returns the previous value of the setting.
846    pub fn set_path_keep_alive_interval(
847        &mut self,
848        path_id: PathId,
849        interval: Option<Duration>,
850    ) -> Result<Option<Duration>, ClosedPath> {
851        let path = self
852            .paths
853            .get_mut(&path_id)
854            .ok_or(ClosedPath { _private: () })?;
855        Ok(std::mem::replace(&mut path.data.keep_alive, interval))
856    }
857
858    /// Gets the [`PathData`] for a known [`PathId`].
859    ///
860    /// Will panic if the path_id does not reference any known path.
861    #[track_caller]
862    fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
863        &mut self.paths.get_mut(&path_id).expect("known path").data
864    }
865
866    /// Find an open, validated path that's on the same network path as the given network path.
867    ///
868    /// Returns the first path matching, even if there's multiple.
869    fn find_validated_path_on_network_path(
870        &self,
871        network_path: FourTuple,
872    ) -> Option<(&PathId, &PathState)> {
873        self.paths.iter().find(|(path_id, path_state)| {
874            path_state.data.validated
875                // Would this use the same network path, if network_path were used to send right now?
876                && network_path.is_probably_same_path(&path_state.data.network_path)
877                && !self.abandoned_paths.contains(path_id)
878        })
879        // TODO(@divma): we might want to ensure the path has been recently active to consider the
880        // address validated
881        // matheus23: Perhaps looking at !self.abandoned_paths.contains(path_id) is enough, given keep-alives?
882    }
883
884    /// Creates the [`PathData`] for a new [`PathId`].
885    ///
886    /// Called for incoming packets as well as when opening a new path locally.
887    fn ensure_path(
888        &mut self,
889        path_id: PathId,
890        network_path: FourTuple,
891        now: Instant,
892        pn: Option<u64>,
893    ) -> &mut PathData {
894        let valid_path = self.find_validated_path_on_network_path(network_path);
895        let validated = valid_path.is_some();
896        let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
897        let vacant_entry = match self.paths.entry(path_id) {
898            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
899            btree_map::Entry::Occupied(occupied_entry) => {
900                return &mut occupied_entry.into_mut().data;
901            }
902        };
903
904        debug!(%validated, %path_id, %network_path, "path added");
905        let peer_max_udp_payload_size =
906            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
907        self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
908        let mut data = PathData::new(
909            network_path,
910            self.allow_mtud,
911            Some(peer_max_udp_payload_size),
912            self.path_generation_counter,
913            now,
914            &self.config,
915        );
916
917        data.validated = validated;
918        if let Some(initial_rtt) = initial_rtt {
919            data.rtt.reset_initial_rtt(initial_rtt);
920        }
921
922        // To open a path locally we need to send a packet on the path. Sending a challenge
923        // guarantees this.
924        data.pending_on_path_challenge = true;
925
926        let path = vacant_entry.insert(PathState { data, prev: None });
927
928        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
929        if let Some(pn) = pn {
930            pn_space.dedup.insert(pn);
931        }
932        self.spaces[SpaceId::Data]
933            .number_spaces
934            .insert(path_id, pn_space);
935        self.qlog.emit_tuple_assigned(path_id, network_path, now);
936
937        // If the remote opened this path we may not have CIDs for it. For locally opened
938        // paths the caller should have already made sure we have CIDs and refused to open
939        // it if there were none.
940        if !self.remote_cids.contains_key(&path_id) {
941            debug!(%path_id, "Remote opened path without issuing CIDs");
942            self.spaces[SpaceId::Data]
943                .pending
944                .path_cids_blocked
945                .insert(path_id);
946            // Do not abandon this path right away. CIDs might be in-flight still and arrive
947            // soon. It is up to the remote to handle this situation.
948        }
949
950        &mut path.data
951    }
952
953    /// Returns packets to transmit
954    ///
955    /// Connections should be polled for transmit after:
956    /// - the application performed some I/O on the connection
957    /// - a call was made to `handle_event`
958    /// - a call was made to `handle_timeout`
959    ///
960    /// `max_datagrams` specifies how many datagrams can be returned inside a
961    /// single Transmit using GSO. This must be at least 1.
962    #[must_use]
963    pub fn poll_transmit(
964        &mut self,
965        now: Instant,
966        max_datagrams: NonZeroUsize,
967        buf: &mut Vec<u8>,
968    ) -> Option<Transmit> {
969        let max_datagrams = match self.config.enable_segmentation_offload {
970            false => NonZeroUsize::MIN,
971            true => max_datagrams,
972        };
973
974        // Each call to poll_transmit can only send datagrams to one destination, because
975        // all datagrams in a GSO batch are for the same destination.  Therefore only
976        // datagrams for one destination address are produced for each poll_transmit call.
977
978        // Check whether we need to send a close message
979        let connection_close_pending = match self.state.as_type() {
980            StateType::Drained => {
981                self.app_limited = true;
982                return None;
983            }
984            StateType::Draining | StateType::Closed => {
985                // self.connection_close_pending is only reset once the associated packet
986                // had been encoded successfully
987                if !self.connection_close_pending {
988                    self.app_limited = true;
989                    return None;
990                }
991                true
992            }
993            _ => false,
994        };
995
996        // Schedule an ACK_FREQUENCY frame if a new one needs to be sent.
997        if let Some(config) = &self.config.ack_frequency_config {
998            let rtt = self
999                .paths
1000                .values()
1001                .map(|p| p.data.rtt.get())
1002                .min()
1003                .expect("one path exists");
1004            self.spaces[SpaceId::Data].pending.ack_frequency = self
1005                .ack_frequency
1006                .should_send_ack_frequency(rtt, config, &self.peer_params)
1007                && self.highest_space == SpaceKind::Data
1008                && self.peer_supports_ack_frequency();
1009        }
1010
1011        // If we end up not sending anything, we need to know if that was because there was
1012        // nothing to send or because we were congestion blocked.
1013        let mut congestion_blocked = false;
1014
1015        let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1016        while let Some(path_id) = next_path_id {
1017            if !connection_close_pending
1018                && let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id)
1019            {
1020                return Some(transmit);
1021            }
1022
1023            let info = self.scheduling_info(path_id);
1024            match self.poll_transmit_on_path(
1025                now,
1026                buf,
1027                path_id,
1028                max_datagrams,
1029                &info,
1030                connection_close_pending,
1031            ) {
1032                PollPathStatus::Send(transmit) => {
1033                    return Some(transmit);
1034                }
1035                PollPathStatus::NothingToSend {
1036                    congestion_blocked: cb,
1037                } => {
1038                    congestion_blocked |= cb;
1039                    // Continue checking other paths, tail-loss probes may need to be sent
1040                    // in all spaces.
1041                    debug_assert!(
1042                        buf.is_empty(),
1043                        "nothing to send on path but buffer not empty"
1044                    );
1045                }
1046            }
1047
1048            next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1049        }
1050
1051        // We didn't produce any application data packet
1052        debug_assert!(
1053            buf.is_empty(),
1054            "there was data in the buffer, but it was not sent"
1055        );
1056
1057        self.app_limited = !congestion_blocked;
1058
1059        if self.state.is_established() {
1060            // Try MTU probing now
1061            let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1062            while let Some(path_id) = next_path_id {
1063                if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) {
1064                    return Some(transmit);
1065                }
1066                next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1067            }
1068        }
1069
1070        None
1071    }
1072
1073    /// Computes the packet scheduling information for this path.
1074    ///
1075    /// While this information is only returned for a single path, it is important to know
1076    /// that this information remains static for the entire span of a single
1077    /// [`Connection::poll_transmit`] call. In other words, the return value is purely
1078    /// functional and only depends on the [`PathId`] **during a single** `poll_transmit`
1079    /// call. It can be computed up-front for all paths but we don't do that because it
1080    /// involves an allocation.
1081    ///
1082    /// See the inline comments for how the  packet scheduling works.
1083    ///
1084    /// # Panics
1085    ///
1086    /// This will panic if called for a path for which we do not have any [`PathData`], like
1087    /// so many other functions we have. But this is the only one to document this in its
1088    /// doc comment. Maybe that should change. Eventually we'll refactor things for this
1089    /// panic to go away.
1090    fn scheduling_info(&self, path_id: PathId) -> PathSchedulingInfo {
1091        // Such a space is preferred for SpaceKind::Data frames.
1092        let have_validated_status_available_space = self.paths.iter().any(|(path_id, path)| {
1093            self.remote_cids.contains_key(path_id)
1094                && !self.abandoned_paths.contains(path_id)
1095                && path.data.validated
1096                && path.data.local_status() == PathStatus::Available
1097        });
1098
1099        // Such a space is able to send SpaceKind::Data frames.
1100        let have_validated_space = self.paths.iter().any(|(path_id, path)| {
1101            self.remote_cids.contains_key(path_id)
1102                && !self.abandoned_paths.contains(path_id)
1103                && path.data.validated
1104        });
1105
1106        let is_handshaking = self.is_handshaking();
1107        let has_cids = self.remote_cids.contains_key(&path_id);
1108        let is_abandoned = self.abandoned_paths.contains(&path_id);
1109        let path_data = self.path_data(path_id);
1110        let validated = path_data.validated;
1111        let status = path_data.local_status();
1112
1113        // This is the core packet scheduling, whether this space ID may send
1114        // SpaceKind::Data frames.
1115        let may_send_data = has_cids
1116            && !is_abandoned
1117            && if is_handshaking {
1118                // There is only one path during the handshake. We want to
1119                // already send 0-RTT and 0.5-RTT (permitting anti-amplification
1120                // limit) data.
1121                true
1122            } else if !validated {
1123                // TODO(flub): When we have a network change we might end up
1124                //    having to abandon all paths and re-open new ones to the
1125                //    same remotes. This leaves us without any validated
1126                //    path. Perhaps we should have a way to figure out if the
1127                //    path is to a previously-validated remote address and allow
1128                //    sending data to such remotes immediately.
1129                false
1130            } else {
1131                match status {
1132                    PathStatus::Available => {
1133                        // Best possible space to send data on.
1134                        true
1135                    }
1136                    PathStatus::Backup => {
1137                        // If there is a status-available path we prefer that.
1138                        !have_validated_status_available_space
1139                    }
1140                }
1141            };
1142
1143        // CONNECTION_CLOSE is allowed to be sent on a non-validated
1144        // path. Particularly during the handshake we want to send it before the
1145        // path is validated. Later if there is no validated path available we
1146        // will also accept sending it on an un-validated path.
1147        let may_send_close = has_cids
1148            && !is_abandoned
1149            && if !validated && have_validated_status_available_space {
1150                // We have a better space to send on.
1151                false
1152            } else {
1153                // No other validated space, this is as good as it gets.
1154                true
1155            };
1156
1157        // PATH_ABANDON is normally sent together with other SpaceKind::Data frames. But if
1158        // there is no other validated space to send it on, it can be sent on the path to be
1159        // abandoned itself if that was validated.
1160        let may_self_abandon = has_cids && validated && !have_validated_space;
1161
1162        PathSchedulingInfo {
1163            is_abandoned,
1164            may_send_data,
1165            may_send_close,
1166            may_self_abandon,
1167        }
1168    }
1169
1170    fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit {
1171        debug_assert!(
1172            !transmit.is_empty(),
1173            "must not be called with an empty transmit buffer"
1174        );
1175
1176        let network_path = self.path_data(path_id).network_path;
1177        trace!(
1178            segment_size = transmit.segment_size(),
1179            last_datagram_len = transmit.len() % transmit.segment_size(),
1180            %network_path,
1181            "sending {} bytes in {} datagrams",
1182            transmit.len(),
1183            transmit.num_datagrams()
1184        );
1185        self.path_data_mut(path_id)
1186            .inc_total_sent(transmit.len() as u64);
1187
1188        self.stats
1189            .udp_tx
1190            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1191        self.path_stats
1192            .entry(path_id)
1193            .or_default()
1194            .udp_tx
1195            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1196
1197        Transmit {
1198            destination: network_path.remote,
1199            size: transmit.len(),
1200            ecn: if self.path_data(path_id).sending_ecn {
1201                Some(EcnCodepoint::Ect0)
1202            } else {
1203                None
1204            },
1205            segment_size: match transmit.num_datagrams() {
1206                1 => None,
1207                _ => Some(transmit.segment_size()),
1208            },
1209            src_ip: network_path.local_ip,
1210        }
1211    }
1212
1213    /// poll_transmit logic for off-path data.
1214    fn poll_transmit_off_path(
1215        &mut self,
1216        now: Instant,
1217        buf: &mut Vec<u8>,
1218        path_id: PathId,
1219    ) -> Option<Transmit> {
1220        if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) {
1221            return Some(challenge);
1222        }
1223        if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
1224            return Some(response);
1225        }
1226        if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
1227            return Some(challenge);
1228        }
1229        None
1230    }
1231
    /// poll_transmit logic for on-path data.
    ///
    /// This is not quite the same as for a multipath packet space, since [`PathId::ZERO`]
    /// has 3 packet spaces, which this handles.
    ///
    /// Builds at most one [`Transmit`] for `path_id`, sized by this path's current MTU.
    /// Packets from every packet number space this path owns may be coalesced into it.
    /// Returns [`PollPathStatus::Send`] if anything was written. Otherwise returns
    /// [`PollPathStatus::NothingToSend`], reporting whether any space was congestion
    /// blocked. A path without an active remote CID never sends anything.
    ///
    /// When a transmit is produced, the path's congestion controller is told about the
    /// whole transmit once, using the last packet number written.
    ///
    /// See [`Self::poll_transmit_off_path`] for off-path data.
    #[must_use]
    fn poll_transmit_on_path(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
        max_datagrams: NonZeroUsize,
        scheduling_info: &PathSchedulingInfo,
        connection_close_pending: bool,
    ) -> PollPathStatus {
        // Check if there is at least one active CID to use for sending
        let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else {
            // Missing CIDs are expected for abandoned paths, so only log for live ones.
            if !self.abandoned_paths.contains(&path_id) {
                debug!(%path_id, "no remote CIDs for path");
            }
            return PollPathStatus::NothingToSend {
                congestion_blocked: false,
            };
        };

        // Whether the last packet in the datagram must be padded so the datagram takes up
        // an exact size. An earlier space can decide to not fill an entire datagram and
        // require the next space to fill it further. But may need a specific size of the
        // datagram containing the packet. The final packet built in the datagram must pad
        // to this size.
        let mut pad_datagram = PadDatagram::No;

        // The packet number of the last built packet. This is kept across spaces.
        // QUIC is supposed to have a single congestion controller for the Initial,
        // Handshake and Data(PathId::ZERO) spaces.
        let mut last_packet_number = None;

        // If we end up not sending anything, we need to know if that was because there was
        // nothing to send or because we were congestion blocked.
        let mut congestion_blocked = false;

        // Set the segment size to this path's MTU for on-path data.
        let pmtu = self.path_data(path_id).current_mtu().into();
        let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu);

        // Iterate over the available spaces.
        for space_id in SpaceId::iter() {
            // Only PathId::ZERO uses non Data space ids.
            if path_id != PathId::ZERO && space_id != SpaceId::Data {
                continue;
            }
            match self.poll_transmit_path_space(
                now,
                &mut transmit,
                path_id,
                space_id,
                remote_cid,
                scheduling_info,
                connection_close_pending,
                pad_datagram,
            ) {
                PollPathSpaceStatus::NothingToSend {
                    congestion_blocked: cb,
                } => {
                    congestion_blocked |= cb;
                    // Continue checking other spaces, tail-loss probes may need to be sent
                    // in all spaces.
                }
                PollPathSpaceStatus::WrotePacket {
                    last_packet_number: pn,
                    pad_datagram: pad,
                } => {
                    debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                    last_packet_number = Some(pn);
                    pad_datagram = pad;
                    // Always check higher spaces. If the transmit is full or they have
                    // nothing to send they will not write packets. But if they can, they
                    // must always be allowed to add to this transmit because coalescing may
                    // be required.
                    continue;
                }
                PollPathSpaceStatus::Send {
                    last_packet_number: pn,
                } => {
                    // The space asked for the transmit to be sent now; higher spaces are
                    // not polled for this transmit.
                    debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                    last_packet_number = Some(pn);
                    break;
                }
            }
        }

        // Emit recovery metrics to qlog whenever something was sent or we were blocked.
        // `self.qlog` and `self.paths` are borrowed as separate fields here, which is why
        // `path_data_mut` (borrowing all of `self`) is not used.
        if last_packet_number.is_some() || congestion_blocked {
            self.qlog.emit_recovery_metrics(
                path_id,
                &mut self.paths.get_mut(&path_id).unwrap().data,
                now,
            );
        }

        match last_packet_number {
            Some(last_packet_number) => {
                // Note that when sending in multiple spaces the last packet number will be
                // the one from the highest space.
                self.path_data_mut(path_id).congestion.on_sent(
                    now,
                    transmit.len() as u64,
                    last_packet_number,
                );
                PollPathStatus::Send(self.build_transmit(path_id, transmit))
            }
            None => PollPathStatus::NothingToSend { congestion_blocked },
        }
    }
1346
1347    /// poll_transmit logic for a QUIC-MULTIPATH packet number space (PathID + SpaceId).
1348    #[must_use]
1349    fn poll_transmit_path_space(
1350        &mut self,
1351        now: Instant,
1352        transmit: &mut TransmitBuf<'_>,
1353        path_id: PathId,
1354        space_id: SpaceId,
1355        remote_cid: ConnectionId,
1356        scheduling_info: &PathSchedulingInfo,
1357        // If we need to send a CONNECTION_CLOSE frame.
1358        connection_close_pending: bool,
1359        // Whether the current datagram needs to be padded to a certain size.
1360        mut pad_datagram: PadDatagram,
1361    ) -> PollPathSpaceStatus {
1362        // Keep track of the last packet number we wrote. If None we did not write any
1363        // packets.
1364        let mut last_packet_number = None;
1365
1366        // Each loop of this may build one packet. It works logically as follows:
1367        //
1368        // - Check if something *needs* to be sent in this space and *can* be sent.
1369        //   - If not, return to the caller who will call us again for the next space.
1370        // - Start a new datagram.
1371        //   - Unless coalescing the packet into an existing datagram.
1372        // - Write the packet header and payload.
1373        // - Check if coalescing a next packet into the datagram is possible.
1374        // - If coalescing, finish packet without padding to leave space in the datagram.
1375        // - If not coalescing, complete the datagram:
1376        //   - Finish packet with padding.
1377        //   - Set the transmit segment size if this is the first datagram.
1378        // - Loop: next iteration will exit the loop if nothing more to send in this
1379        //   space. The TransmitBuf will contain a started datagram with space if
1380        //   coalescing, or completely filled datagram if not coalescing.
1381        loop {
1382            // Determine if anything can be sent in this packet number space.
1383            let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1384                // A datagram is started already, we are coalescing another packet into it.
1385                transmit.datagram_remaining_mut()
1386            } else {
1387                // A new datagram needs to be started.
1388                transmit.segment_size()
1389            };
1390            let can_send =
1391                self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending);
1392            let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1393            let space_will_send = {
1394                if scheduling_info.is_abandoned {
1395                    // If this path is abandoned then we might still have to send
1396                    // PATH_ABANDON itself on it if there was no better space
1397                    // available. Otherwise we want to send the PATH_ABANDON as permitted by
1398                    // may_send_data however.
1399                    scheduling_info.may_self_abandon
1400                        && self.spaces[space_id]
1401                            .pending
1402                            .path_abandon
1403                            .contains_key(&path_id)
1404                } else if can_send.close && scheduling_info.may_send_close {
1405                    // This is the best path to send a CONNECTION_CLOSE on.
1406                    true
1407                } else if needs_loss_probe || can_send.space_specific {
1408                    // We always send a loss probe or space-specific frames if the path is
1409                    // not abandoned.
1410                    true
1411                } else {
1412                    // Anything else we only send if we're the best path for SpaceKind::Data
1413                    // frames.
1414                    !can_send.is_empty() && scheduling_info.may_send_data
1415                }
1416            };
1417
1418            if !space_will_send {
1419                // Nothing more to send. Previous iterations of this loop may have built
1420                // packets already.
1421                return match last_packet_number {
1422                    Some(pn) => PollPathSpaceStatus::WrotePacket {
1423                        last_packet_number: pn,
1424                        pad_datagram,
1425                    },
1426                    None => {
1427                        // Only log for spaces which have crypto.
1428                        if self.crypto_state.has_keys(space_id.encryption_level())
1429                            || (space_id == SpaceId::Data
1430                                && self.crypto_state.has_keys(EncryptionLevel::ZeroRtt))
1431                        {
1432                            trace!(?space_id, %path_id, "nothing to send in space");
1433                        }
1434                        PollPathSpaceStatus::NothingToSend {
1435                            congestion_blocked: false,
1436                        }
1437                    }
1438                };
1439            }
1440
1441            // We want to send on this space, check congestion control if we can. But only
1442            // if we will need to start a new datagram. If we are coalescing into an already
1443            // started datagram we do not need to check congestion control again.
1444            if transmit.datagram_remaining_mut() == 0 {
1445                let congestion_blocked =
1446                    self.path_congestion_check(space_id, path_id, transmit, &can_send, now);
1447                if congestion_blocked != PathBlocked::No {
1448                    // Previous iterations of this loop may have built packets already.
1449                    return match last_packet_number {
1450                        Some(pn) => PollPathSpaceStatus::WrotePacket {
1451                            last_packet_number: pn,
1452                            pad_datagram,
1453                        },
1454                        None => {
1455                            return PollPathSpaceStatus::NothingToSend {
1456                                congestion_blocked: true,
1457                            };
1458                        }
1459                    };
1460                }
1461            }
1462
1463            // If the datagram is full (or there never was one started), we need to start a
1464            // new one.
1465            if transmit.datagram_remaining_mut() == 0 {
1466                if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1467                    // No more datagrams allowed.
1468                    // Previous iterations of this loop may have built packets already.
1469                    return match last_packet_number {
1470                        Some(pn) => PollPathSpaceStatus::WrotePacket {
1471                            last_packet_number: pn,
1472                            pad_datagram,
1473                        },
1474                        None => {
1475                            return PollPathSpaceStatus::NothingToSend {
1476                                congestion_blocked: false,
1477                            };
1478                        }
1479                    };
1480                }
1481
1482                match self.spaces[space_id].for_path(path_id).loss_probes {
1483                    0 => transmit.start_new_datagram(),
1484                    _ => {
1485                        // We need something to send for a tail-loss probe.
1486                        let request_immediate_ack =
1487                            space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1488                        self.spaces[space_id].maybe_queue_probe(
1489                            path_id,
1490                            request_immediate_ack,
1491                            &self.streams,
1492                        );
1493
1494                        self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1495
1496                        // Clamp the datagram to at most the minimum MTU to ensure that loss
1497                        // probes can get through and enable recovery even if the path MTU
1498                        // has shrank unexpectedly.
1499                        transmit.start_new_datagram_with_size(std::cmp::min(
1500                            usize::from(INITIAL_MTU),
1501                            transmit.segment_size(),
1502                        ));
1503                    }
1504                }
1505                trace!(count = transmit.num_datagrams(), "new datagram started");
1506
1507                // We started a new datagram, we decide later if it needs padding.
1508                pad_datagram = PadDatagram::No;
1509            }
1510
1511            // If coalescing another packet into the existing datagram, there should
1512            // still be enough space for a whole packet.
1513            if transmit.datagram_start_offset() < transmit.len() {
1514                debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1515            }
1516
1517            //
1518            // From here on, we've determined that a packet will definitely be sent.
1519            //
1520
1521            if self.crypto_state.has_keys(EncryptionLevel::Initial)
1522                && space_id == SpaceId::Handshake
1523                && self.side.is_client()
1524            {
1525                // A client stops both sending and processing Initial packets when it
1526                // sends its first Handshake packet.
1527                self.discard_space(now, SpaceKind::Initial);
1528            }
1529            if let Some(ref mut prev) = self.crypto_state.prev_crypto {
1530                prev.update_unacked = false;
1531            }
1532
1533            let Some(mut builder) = PacketBuilder::new(
1534                now,
1535                space_id,
1536                path_id,
1537                remote_cid,
1538                transmit,
1539                can_send.is_ack_eliciting(),
1540                self,
1541            ) else {
1542                // Confidentiality limit is exceeded and the connection has been killed. We
1543                // should not send any other packets. This works in a roundabout way: We
1544                // have started a datagram but not written anything into it. So even if we
1545                // get called again for another space we will see an already started
1546                // datagram and try and start another packet here. Then be stopped by the
1547                // same confidentiality limit.
1548                return PollPathSpaceStatus::NothingToSend {
1549                    congestion_blocked: false,
1550                };
1551            };
1552            last_packet_number = Some(builder.packet_number);
1553
1554            if space_id == SpaceId::Initial
1555                && (self.side.is_client() || can_send.is_ack_eliciting())
1556            {
1557                // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1
1558                pad_datagram |= PadDatagram::ToMinMtu;
1559            }
1560            if space_id == SpaceId::Data && self.config.pad_to_mtu {
1561                pad_datagram |= PadDatagram::ToSegmentSize;
1562            }
1563
1564            if scheduling_info.may_send_close && can_send.close {
1565                trace!("sending CONNECTION_CLOSE");
1566                // Encode ACKs before the ConnectionClose message, to give the receiver
1567                // a better approximate on what data has been processed. This is
1568                // especially important with ack delay, since the peer might not
1569                // have gotten any other ACK for the data earlier on.
1570                let is_multipath_negotiated = self.is_multipath_negotiated();
1571                for path_id in self.spaces[space_id]
1572                    .number_spaces
1573                    .iter()
1574                    .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1575                    .map(|(&path_id, _)| path_id)
1576                    .collect::<Vec<_>>()
1577                {
1578                    Self::populate_acks(
1579                        now,
1580                        self.receiving_ecn,
1581                        path_id,
1582                        space_id,
1583                        &mut self.spaces[space_id],
1584                        is_multipath_negotiated,
1585                        &mut builder,
1586                        &mut self.stats.frame_tx,
1587                        self.crypto_state.has_keys(space_id.encryption_level()),
1588                    );
1589                }
1590
1591                // Since there only 64 ACK frames there will always be enough space
1592                // to encode the ConnectionClose frame too. However we still have the
1593                // check here to prevent crashes if something changes.
1594
1595                // TODO(flub): This needs fixing for multipath, to ensure we can always
1596                //    write the CONNECTION_CLOSE even if we have many PATH_ACKs to send:
1597                //    https://github.com/n0-computer/noq/issues/367.
1598                debug_assert!(
1599                    builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1600                    "ACKs should leave space for ConnectionClose"
1601                );
1602                let stats = &mut self.stats.frame_tx;
1603                if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1604                    let max_frame_size = builder.frame_space_remaining();
1605                    let close: Close = match self.state.as_type() {
1606                        StateType::Closed => {
1607                            let reason: Close =
1608                                self.state.as_closed().expect("checked").clone().into();
1609                            if space_id == SpaceId::Data || reason.is_transport_layer() {
1610                                reason
1611                            } else {
1612                                TransportError::APPLICATION_ERROR("").into()
1613                            }
1614                        }
1615                        StateType::Draining => TransportError::NO_ERROR("").into(),
1616                        _ => unreachable!(
1617                            "tried to make a close packet when the connection wasn't closed"
1618                        ),
1619                    };
1620                    builder.write_frame(close.encoder(max_frame_size), stats);
1621                }
1622                let last_pn = builder.packet_number;
1623                builder.finish_and_track(now, self, path_id, pad_datagram);
1624                if space_id.kind() == self.highest_space {
1625                    // Don't send another close packet. Even with multipath we only send
1626                    // CONNECTION_CLOSE on a single path since we expect our paths to work.
1627                    self.connection_close_pending = false;
1628                }
1629                // Send a close frame in every possible space for robustness, per
1630                // RFC9000 "Immediate Close during the Handshake". Don't bother trying
1631                // to send anything else.
1632                // TODO(flub): This breaks during the handshake if we can not coalesce
1633                //    packets due to space reasons: the next space would either fail a
1634                //    debug_assert checking for enough packet space or produce an invalid
1635                //    packet. We need to keep track of per-space pending CONNECTION_CLOSE to
1636                //    be able to send these across multiple calls to poll_transmit. Then
1637                //    check for coalescing space here because initial packets need to be in
1638                //    padded datagrams. And also add space checks for CONNECTION_CLOSE in
1639                //    space_can_send so it would stop a GSO batch if the datagram is too
1640                //    small for another CONNECTION_CLOSE packet.
1641                return PollPathSpaceStatus::WrotePacket {
1642                    last_packet_number: last_pn,
1643                    pad_datagram,
1644                };
1645            }
1646
1647            self.populate_packet(now, space_id, path_id, scheduling_info, &mut builder);
1648
1649            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
1650            // any other reason, there is a bug which leads to one component announcing write
1651            // readiness while not writing any data. This degrades performance. The condition is
1652            // only checked if the full MTU is available and when potentially large fixed-size
1653            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
1654            // writing ACKs.
1655            debug_assert!(
1656                !(builder.sent_frames().is_ack_only(&self.streams)
1657                    && !can_send.acks
1658                    && (can_send.other || can_send.space_specific)
1659                    && builder.buf.segment_size()
1660                        == self.path_data(path_id).current_mtu() as usize
1661                    && self.datagrams.outgoing.is_empty()),
1662                "SendableFrames was {can_send:?}, but only ACKs have been written"
1663            );
1664            if builder.sent_frames().requires_padding {
1665                pad_datagram |= PadDatagram::ToMinMtu;
1666            }
1667
1668            for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1669                self.spaces[space_id]
1670                    .for_path(*path_id)
1671                    .pending_acks
1672                    .acks_sent();
1673                self.timers.stop(
1674                    Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1675                    self.qlog.with_time(now),
1676                );
1677            }
1678
1679            // Now we need to finish the packet.  Before we do so we need to know if we will
1680            // be coalescing the next packet into this one, or will be ending the datagram
1681            // as well.  Because if this is the last packet in the datagram more padding
1682            // might be needed because of the packet type, or to fill the GSO segment size.
1683
1684            // Are we allowed to coalesce AND is there enough space for another *packet* in
1685            // this datagram AND will we definitely send another packet?
1686            if builder.can_coalesce && path_id == PathId::ZERO && {
1687                let max_packet_size = builder
1688                    .buf
1689                    .datagram_remaining_mut()
1690                    .saturating_sub(builder.predict_packet_end());
1691                max_packet_size > MIN_PACKET_SPACE
1692                    && self.has_pending_packet(space_id, max_packet_size, connection_close_pending)
1693            } {
1694                // We can append/coalesce the next packet into the current
1695                // datagram. Finish the current packet without adding extra padding.
1696                trace!("will coalesce with next packet");
1697                builder.finish_and_track(now, self, path_id, PadDatagram::No);
1698            } else {
1699                // We need a new datagram for the next packet.  Finish the current
1700                // packet with padding.
1701                // TODO(flub): if there isn't any more data to be sent, this will still pad
1702                //    to the segment size and only discover there is nothing to send before
1703                //    starting the next packet. That is wasting up to 32 bytes.
1704                if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1705                    // If too many padding bytes would be required to continue the
1706                    // GSO batch after this packet, end the GSO batch here. Ensures
1707                    // that fixed-size frames with heterogeneous sizes
1708                    // (e.g. application datagrams) won't inadvertently waste large
1709                    // amounts of bandwidth. The exact threshold is a bit arbitrary
1710                    // and might benefit from further tuning, though there's no
1711                    // universally optimal value.
1712                    const MAX_PADDING: usize = 32;
1713                    if builder.buf.datagram_remaining_mut()
1714                        > builder.predict_packet_end() + MAX_PADDING
1715                    {
1716                        trace!(
1717                            "GSO truncated by demand for {} padding bytes",
1718                            builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1719                        );
1720                        let last_pn = builder.packet_number;
1721                        builder.finish_and_track(now, self, path_id, PadDatagram::No);
1722                        return PollPathSpaceStatus::Send {
1723                            last_packet_number: last_pn,
1724                        };
1725                    }
1726
1727                    // Pad the current datagram to GSO segment size so it can be
1728                    // included in the GSO batch.
1729                    builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1730                } else {
1731                    builder.finish_and_track(now, self, path_id, pad_datagram);
1732                }
1733
1734                // If this is the first datagram we set the segment size to the size of the
1735                // first datagram.
1736                if transmit.num_datagrams() == 1 {
1737                    transmit.clip_segment_size();
1738                }
1739            }
1740        }
1741    }
1742
1743    fn poll_transmit_mtu_probe(
1744        &mut self,
1745        now: Instant,
1746        buf: &mut Vec<u8>,
1747        path_id: PathId,
1748    ) -> Option<Transmit> {
1749        let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?;
1750
1751        // We are definitely sending a DPLPMTUD probe.
1752        let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize);
1753        transmit.start_new_datagram_with_size(probe_size as usize);
1754
1755        let mut builder = PacketBuilder::new(
1756            now,
1757            SpaceId::Data,
1758            path_id,
1759            active_cid,
1760            &mut transmit,
1761            true,
1762            self,
1763        )?;
1764
1765        // We implement MTU probes as ping packets padded up to the probe size
1766        trace!(?probe_size, "writing MTUD probe");
1767        builder.write_frame(frame::Ping, &mut self.stats.frame_tx);
1768
1769        // If supported by the peer, we want no delays to the probe's ACK
1770        if self.peer_supports_ack_frequency() {
1771            builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx);
1772        }
1773
1774        builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1775
1776        self.path_stats
1777            .entry(path_id)
1778            .or_default()
1779            .sent_plpmtud_probes += 1;
1780
1781        Some(self.build_transmit(path_id, transmit))
1782    }
1783
1784    /// Returns the CID and probe size if a DPLPMTUD probe is needed.
1785    ///
1786    /// We MTU probe all paths for which all of the following is true:
1787    /// - We have an active destination CID for the path.
1788    /// - The remote address *and* path are validated.
1789    /// - The path is not abandoned.
1790    /// - The MTU Discovery subsystem wants to probe the path.
1791    fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> {
1792        let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?;
1793        let is_eligible = self.path_data(path_id).validated
1794            && !self.path_data(path_id).is_validating_path()
1795            && !self.abandoned_paths.contains(&path_id);
1796
1797        if !is_eligible {
1798            return None;
1799        }
1800        let next_pn = self.spaces[SpaceId::Data]
1801            .for_path(path_id)
1802            .peek_tx_number();
1803        let probe_size = self
1804            .path_data_mut(path_id)
1805            .mtud
1806            .poll_transmit(now, next_pn)?;
1807
1808        Some((active_cid, probe_size))
1809    }
1810
1811    /// Returns true if there is a further packet to send on [`PathId::ZERO`].
1812    ///
1813    /// In other words this is predicting whether the next call to
1814    /// [`Connection::space_can_send`] issued will return some frames to be sent. Including
1815    /// having to predict which packet number space it will be invoked with. This depends on
1816    /// how both [`Connection::poll_transmit_on_path`] and
1817    /// [`Connection::poll_transmit_path_space`] behave.
1818    ///
1819    /// This is needed to determine if packet coalescing can happen. Because the last packet
1820    /// in a datagram may need to be padded and thus we must know if another packet will
1821    /// follow or not.
1822    ///
1823    /// The next packet can be either in the same space, or in one of the following spaces
1824    /// on the same path. Because a 0-RTT packet can be coalesced with a 1-RTT packet and
1825    /// both are in the Data(PathId::ZERO) space. Previous spaces are not checked, because
1826    /// packets are built from Initial to Handshake to Data spaces.
1827    fn has_pending_packet(
1828        &mut self,
1829        current_space_id: SpaceId,
1830        max_packet_size: usize,
1831        connection_close_pending: bool,
1832    ) -> bool {
1833        let mut space_id = current_space_id;
1834        loop {
1835            let can_send = self.space_can_send(
1836                space_id,
1837                PathId::ZERO,
1838                max_packet_size,
1839                connection_close_pending,
1840            );
1841            if !can_send.is_empty() {
1842                return true;
1843            }
1844            match space_id.next() {
1845                Some(next_space_id) => space_id = next_space_id,
1846                None => break,
1847            }
1848        }
1849        false
1850    }
1851
1852    /// Checks if creating a new datagram would be blocked by congestion control
1853    fn path_congestion_check(
1854        &mut self,
1855        space_id: SpaceId,
1856        path_id: PathId,
1857        transmit: &TransmitBuf<'_>,
1858        can_send: &SendableFrames,
1859        now: Instant,
1860    ) -> PathBlocked {
1861        // Anti-amplification is only based on `total_sent`, which gets updated after
1862        // the transmit is sent. Therefore we pass the amount of bytes for datagrams
1863        // that are already created, as well as 1 byte for starting another datagram. If
1864        // there is any anti-amplification budget left, we always allow a full MTU to be
1865        // sent (see https://github.com/quinn-rs/quinn/issues/1082).
1866        if self.side().is_server()
1867            && self
1868                .path_data(path_id)
1869                .anti_amplification_blocked(transmit.len() as u64 + 1)
1870        {
1871            trace!(?space_id, %path_id, "blocked by anti-amplification");
1872            return PathBlocked::AntiAmplification;
1873        }
1874
1875        // Congestion control check.
1876        // Tail loss probes must not be blocked by congestion, or a deadlock could arise.
1877        let bytes_to_send = transmit.segment_size() as u64;
1878        let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1879
1880        if can_send.other && !need_loss_probe && !can_send.close {
1881            let path = self.path_data(path_id);
1882            if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1883                trace!(
1884                    ?space_id,
1885                    %path_id,
1886                    in_flight=%path.in_flight.bytes,
1887                    congestion_window=%path.congestion.window(),
1888                    "blocked by congestion control",
1889                );
1890                return PathBlocked::Congestion;
1891            }
1892        }
1893
1894        // Pacing check.
1895        if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1896            self.timers.set(
1897                Timer::PerPath(path_id, PathTimer::Pacing),
1898                delay,
1899                self.qlog.with_time(now),
1900            );
1901            // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though
1902            // they are not congestion controlled.
1903            trace!(?space_id, %path_id, "blocked by pacing");
1904            return PathBlocked::Pacing;
1905        }
1906
1907        PathBlocked::No
1908    }
1909
1910    /// Send PATH_CHALLENGE for a previous path if necessary
1911    ///
1912    /// QUIC-TRANSPORT section 9.3.3
1913    /// <https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding>
1914    fn send_prev_path_challenge(
1915        &mut self,
1916        now: Instant,
1917        buf: &mut Vec<u8>,
1918        path_id: PathId,
1919    ) -> Option<Transmit> {
1920        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1921        // TODO (matheus23): We could use !prev_path.is_validating() here instead to
1922        // (possibly) also re-send challenges when they get lost.
1923        if !prev_path.pending_on_path_challenge {
1924            return None;
1925        };
1926        prev_path.pending_on_path_challenge = false;
1927        let token = self.rng.random();
1928        let network_path = prev_path.network_path;
1929        prev_path.record_path_challenge_sent(now, token, network_path);
1930
1931        debug_assert_eq!(
1932            self.highest_space,
1933            SpaceKind::Data,
1934            "PATH_CHALLENGE queued without 1-RTT keys"
1935        );
1936        let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
1937        buf.start_new_datagram();
1938
1939        // Use the previous CID to avoid linking the new path with the previous path. We
1940        // don't bother accounting for possible retirement of that prev_cid because this is
1941        // sent once, immediately after migration, when the CID is known to be valid. Even
1942        // if a post-migration packet caused the CID to be retired, it's fair to pretend
1943        // this is sent first.
1944        let mut builder =
1945            PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, false, self)?;
1946        let challenge = frame::PathChallenge(token);
1947        let stats = &mut self.stats.frame_tx;
1948        builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));
1949
1950        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
1951        // to at least the smallest allowed maximum datagram size of 1200 bytes,
1952        // unless the anti-amplification limit for the path does not permit
1953        // sending a datagram of this size
1954        builder.pad_to(MIN_INITIAL_SIZE);
1955
1956        builder.finish(self, now);
1957        self.stats.udp_tx.on_sent(1, buf.len());
1958        self.path_stats
1959            .entry(path_id)
1960            .or_default()
1961            .udp_tx
1962            .on_sent(1, buf.len());
1963
1964        Some(Transmit {
1965            destination: network_path.remote,
1966            size: buf.len(),
1967            ecn: None,
1968            segment_size: None,
1969            src_ip: network_path.local_ip,
1970        })
1971    }
1972
1973    fn send_off_path_path_response(
1974        &mut self,
1975        now: Instant,
1976        buf: &mut Vec<u8>,
1977        path_id: PathId,
1978    ) -> Option<Transmit> {
1979        let path = self.paths.get_mut(&path_id).map(|state| &mut state.data)?;
1980        let cid_queue = self.remote_cids.get_mut(&path_id)?;
1981        let (token, network_path) = path.path_responses.pop_off_path(path.network_path)?;
1982
1983        let cid = cid_queue
1984            .next_reserved()
1985            .unwrap_or_else(|| cid_queue.active());
1986        // TODO(@divma): we should take a different approach when there is no fresh CID to use.
1987        // https://github.com/quinn-rs/quinn/issues/2184
1988
1989        let frame = frame::PathResponse(token);
1990
1991        let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
1992        buf.start_new_datagram();
1993
1994        let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, false, self)?;
1995        let stats = &mut self.stats.frame_tx;
1996        builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
1997        builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
1998
1999        let size = buf.len();
2000
2001        self.stats.udp_tx.on_sent(1, size);
2002        self.path_stats
2003            .entry(path_id)
2004            .or_default()
2005            .udp_tx
2006            .on_sent(1, size);
2007        Some(Transmit {
2008            destination: network_path.remote,
2009            size,
2010            ecn: None,
2011            segment_size: None,
2012            src_ip: network_path.local_ip,
2013        })
2014    }
2015
2016    /// Send a nat traversal challenge (off-path) on this path if possible.
2017    ///
2018    /// This will ensure the path still has a remaining CID to use if the active one should be
2019    /// retired.
2020    fn send_nat_traversal_path_challenge(
2021        &mut self,
2022        now: Instant,
2023        buf: &mut Vec<u8>,
2024        path_id: PathId,
2025    ) -> Option<Transmit> {
2026        let server_side = self.n0_nat_traversal.server_side_mut().ok()?;
2027        let probe = server_side.next_probe()?;
2028        if !self.paths.get(&path_id)?.data.validated {
2029            // Path is not usable for probing
2030            return None;
2031        }
2032
2033        let remote_cids = self.remote_cids.get_mut(&path_id)?;
2034
2035        // Check if this path has enough CIDs to send a probe. One to be reserved, one in case the
2036        // active CID needs to be retired.
2037        if remote_cids.remaining() < 2 {
2038            return None;
2039        }
2040
2041        let cid = remote_cids.next_reserved()?;
2042        let remote = probe.remote();
2043        let token = self.rng.random();
2044        probe.mark_as_sent();
2045
2046        let frame = frame::PathChallenge(token);
2047
2048        let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2049        buf.start_new_datagram();
2050
2051        let mut builder =
2052            PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, false, self)?;
2053        let stats = &mut self.stats.frame_tx;
2054        builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
2055        builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
2056
2057        let path = &mut self.paths.get_mut(&path_id).expect("checked").data;
2058        let network_path = FourTuple {
2059            remote,
2060            local_ip: None,
2061        };
2062
2063        path.record_path_challenge_sent(now, token, network_path);
2064
2065        let size = buf.len();
2066
2067        self.stats.udp_tx.on_sent(1, size);
2068        self.path_stats
2069            .entry(path_id)
2070            .or_default()
2071            .udp_tx
2072            .on_sent(1, size);
2073
2074        Some(Transmit {
2075            destination: remote,
2076            size,
2077            ecn: None,
2078            segment_size: None,
2079            src_ip: None,
2080        })
2081    }
2082
2083    /// Indicate what types of frames are ready to send for the given space.
2084    ///
2085    /// Only for on-path data.
2086    ///
2087    /// *packet_size* is the number of bytes available to build the next packet.
2088    /// *connection_close_pending* indicates whether a CONNECTION_CLOSE frame needs to be
2089    /// sent.
2090    fn space_can_send(
2091        &mut self,
2092        space_id: SpaceId,
2093        path_id: PathId,
2094        packet_size: usize,
2095        connection_close_pending: bool,
2096    ) -> SendableFrames {
2097        let space = &mut self.spaces[space_id];
2098        let space_has_crypto = self.crypto_state.has_keys(space_id.encryption_level());
2099
2100        if !space_has_crypto
2101            && (space_id != SpaceId::Data
2102                || !self.crypto_state.has_keys(EncryptionLevel::ZeroRtt)
2103                || self.side.is_server())
2104        {
2105            // Nothing to send in this space
2106            return SendableFrames::empty();
2107        }
2108
2109        let mut can_send = space.can_send(path_id, &self.streams);
2110
2111        // Check for 1RTT space.
2112        if space_id == SpaceId::Data {
2113            let pn = space.for_path(path_id).peek_tx_number();
2114            // Number of bytes available for frames if this is a 1-RTT packet. We're
2115            // guaranteed to be able to send an individual frame at least this large in the
2116            // next 1-RTT packet. This could be generalized to support every space, but it's
2117            // only needed to handle large fixed-size frames, which only exist in 1-RTT
2118            // (application datagrams).
2119            let frame_space_1rtt =
2120                packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
2121            can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
2122        }
2123
2124        can_send.close = connection_close_pending && space_has_crypto;
2125
2126        can_send
2127    }
2128
    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
    ///
    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
    /// extracted through the relevant methods.
    ///
    /// Two kinds of event are handled:
    ///
    /// - `Datagram`: an incoming UDP datagram for `path_id`. It is first checked against the
    ///   known network path via `update_network_path_or_discard` (and possibly dropped), then
    ///   counted in the connection-wide and per-path receive statistics. Finally its first
    ///   packet (`first_decode`) and any coalesced remainder (`remaining`) are decoded.
    /// - `NewIdentifiers`: freshly issued local connection IDs. They are recorded in the
    ///   per-path `CidState` and queued as pending NEW_CONNECTION_ID frames in the Data space.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            Datagram(DatagramConnectionEvent {
                now,
                network_path,
                path_id,
                ecn,
                first_decode,
                remaining,
            }) => {
                let span = trace_span!("pkt", %path_id);
                let _guard = span.enter();

                if self.update_network_path_or_discard(network_path, path_id) {
                    // A return value of true indicates we should discard this packet.
                    return;
                }

                // Sampled before the datagram is processed. Processing (e.g. the
                // `inc_total_recvd` call below) changes how much we have received on this
                // path, so the "before" state must be captured here.
                let was_anti_amplification_blocked = self
                    .path(path_id)
                    .map(|path| path.anti_amplification_blocked(1))
                    // We never tried to send on a non-existing (new) path so have not been
                    // anti-amplification blocked for it previously.
                    .unwrap_or(false);

                // Count one datagram plus the bytes of its first packet, both connection-wide
                // and per path. The coalesced remainder below only adds bytes, because it is
                // part of the same datagram.
                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                let rx = &mut self.path_stats.entry(path_id).or_default().udp_rx;
                rx.datagrams += 1;
                rx.bytes += first_decode.len() as u64;
                // `first_decode` is moved into `handle_decode`, so remember its length first.
                let data_len = first_decode.len();

                self.handle_decode(now, network_path, path_id, ecn, first_decode);
                // The current `path` might have changed inside `handle_decode` since the packet
                // could have triggered a migration. The packet might also belong to an unknown
                // path and have been rejected. Make sure the data received is accounted for the
                // most recent path by accessing `path` after `handle_decode`.
                // NOTE(review): only the first packet's length is credited via
                // `inc_total_recvd` here; confirm whether `handle_coalesced` credits the rest.
                if let Some(path) = self.path_mut(path_id) {
                    path.inc_total_recvd(data_len as u64);
                }

                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.path_stats.entry(path_id).or_default().udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, network_path, path_id, ecn, data);
                }

                // The path may not exist (e.g. an unknown path that was rejected), in which
                // case no recovery metrics are emitted.
                if let Some(path) = self.paths.get_mut(&path_id) {
                    self.qlog
                        .emit_recovery_metrics(path_id, &mut path.data, now);
                }

                if was_anti_amplification_blocked {
                    // A prior attempt to set the loss detection timer may have failed due to
                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
                    // the server's first flight is lost.
                    self.set_loss_detection_timer(now, path_id);
                }
            }
            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
                // All IDs in one event are expected to share a path (checked in debug builds).
                // An empty batch falls back to the default `PathId`.
                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
                let cid_state = self
                    .local_cid_state
                    .entry(path_id)
                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
                cid_state.new_cids(&ids, now);

                // NOTE(review): frames are queued in reverse issue order; presumably this
                // matches how `pending.new_cids` is drained. Confirm before changing.
                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                // Always update Timer::PushNewCid
                self.reset_cid_retirement(now);
            }
        }
    }
2211
2212    /// Updates the network path for `path_id`.
2213    ///
2214    /// Returns true if a packet coming in for this `path_id` over given `network_path` should be discarded.
2215    /// Returns false if the path was updated and the packet doesn't need to be discarded.
2216    fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
2217        let remote_may_migrate = self.side.remote_may_migrate(&self.state);
2218        let local_ip_may_migrate = self.side.is_client();
2219        // If this packet could initiate a migration and we're a client or a server that
2220        // forbids migration, drop the datagram. This could be relaxed to heuristically
2221        // permit NAT-rebinding-like migration.
2222        if let Some(known_path) = self.path_mut(path_id) {
2223            if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
2224                trace!(
2225                    %path_id,
2226                    %network_path,
2227                    %known_path.network_path,
2228                    "discarding packet from unrecognized peer"
2229                );
2230                return true;
2231            }
2232
2233            if known_path.network_path.local_ip.is_some()
2234                && network_path.local_ip.is_some()
2235                && known_path.network_path.local_ip != network_path.local_ip
2236                && !local_ip_may_migrate
2237            {
2238                trace!(
2239                    %path_id,
2240                    %network_path,
2241                    %known_path.network_path,
2242                    "discarding packet sent to incorrect interface"
2243                );
2244                return true;
2245            }
2246            // If the datagram indicates that we've changed our local IP, we update it.
2247            // This is alluded to in Section 5.2 of the Multipath RFC draft 18:
2248            // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#name-using-multiple-paths-on-the
2249            // > Client receives the packet, recognizes a path migration, updates the source address of path 2 to 192.0.2.1.
2250            if let Some(local_ip) = network_path.local_ip {
2251                if known_path
2252                    .network_path
2253                    .local_ip
2254                    .is_some_and(|ip| ip != local_ip)
2255                {
2256                    debug!(
2257                        %path_id,
2258                        %network_path,
2259                        %known_path.network_path,
2260                        "path's local address seemingly migrated"
2261                    );
2262                }
2263                // We update the address without path validation on the client side.
2264                // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#section-5.1
2265                // > Servers observing a 4-tuple change will perform path validation (see Section 9 of [QUIC-TRANSPORT]).
2266                // This sounds like it's *only* the server endpoints that do this.
2267                // TODO(matheus23): We should still consider doing a proper migration on the client side in the future.
2268                // For now, this preserves the behavior of this code pre 4-tuple tracking.
2269                known_path.network_path.local_ip = Some(local_ip);
2270            }
2271        }
2272        false
2273    }
2274
2275    /// Process timer expirations
2276    ///
2277    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
2278    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
2279    /// methods.
2280    ///
2281    /// It is most efficient to call this immediately after the system clock reaches the latest
2282    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
2283    /// no-op and therefore are safe.
2284    pub fn handle_timeout(&mut self, now: Instant) {
2285        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
2286            // TODO(@divma): remove `at` when the unicorn is born
2287            trace!(?timer, at=?now, "timeout");
2288            match timer {
2289                Timer::Conn(timer) => match timer {
2290                    ConnTimer::Close => {
2291                        self.state.move_to_drained(None);
2292                        // move_to_drained checks that we weren't in drained before.
2293                        // Adding events to endpoint_events is only legal if `Drained` was never queued before.
2294                        self.endpoint_events.push_back(EndpointEventInner::Drained);
2295                    }
2296                    ConnTimer::Idle => {
2297                        self.kill(ConnectionError::TimedOut);
2298                    }
2299                    ConnTimer::KeepAlive => {
2300                        trace!("sending keep-alive");
2301                        self.ping();
2302                    }
2303                    ConnTimer::KeyDiscard => {
2304                        self.crypto_state.discard_temporary_keys();
2305                    }
2306                    ConnTimer::PushNewCid => {
2307                        while let Some((path_id, when)) = self.next_cid_retirement() {
2308                            if when > now {
2309                                break;
2310                            }
2311                            match self.local_cid_state.get_mut(&path_id) {
2312                                None => error!(%path_id, "No local CID state for path"),
2313                                Some(cid_state) => {
2314                                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
2315                                    let num_new_cid = cid_state.on_cid_timeout().into();
2316                                    if !self.state.is_closed() {
2317                                        trace!(
2318                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
2319                                            cid_state.retire_prior_to()
2320                                        );
2321                                        self.endpoint_events.push_back(
2322                                            EndpointEventInner::NeedIdentifiers(
2323                                                path_id,
2324                                                now,
2325                                                num_new_cid,
2326                                            ),
2327                                        );
2328                                    }
2329                                }
2330                            }
2331                        }
2332                    }
2333                },
2334                // TODO: add path_id as span somehow
2335                Timer::PerPath(path_id, timer) => {
2336                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
2337                    let _guard = span.enter();
2338                    match timer {
2339                        PathTimer::PathIdle => {
2340                            if let Err(err) =
2341                                self.close_path_inner(now, path_id, PathAbandonReason::TimedOut)
2342                            {
2343                                warn!(?err, "failed closing path");
2344                            }
2345                        }
2346
2347                        PathTimer::PathKeepAlive => {
2348                            trace!("sending keep-alive on path");
2349                            self.ping_path(path_id).ok();
2350                        }
2351                        PathTimer::LossDetection => {
2352                            self.on_loss_detection_timeout(now, path_id);
2353                            self.qlog.emit_recovery_metrics(
2354                                path_id,
2355                                &mut self.paths.get_mut(&path_id).unwrap().data,
2356                                now,
2357                            );
2358                        }
2359                        PathTimer::PathValidation => {
2360                            let Some(path) = self.paths.get_mut(&path_id) else {
2361                                continue;
2362                            };
2363                            self.timers.stop(
2364                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
2365                                self.qlog.with_time(now),
2366                            );
2367                            debug!("path validation failed");
2368                            if let Some((_, prev)) = path.prev.take() {
2369                                path.data = prev;
2370                            }
2371                            path.data.reset_on_path_challenges();
2372                        }
2373                        PathTimer::PathChallengeLost => {
2374                            let Some(path) = self.paths.get_mut(&path_id) else {
2375                                continue;
2376                            };
2377                            trace!("path challenge deemed lost");
2378                            path.data.pending_on_path_challenge = true;
2379                        }
2380                        PathTimer::PathOpen => {
2381                            let Some(path) = self.paths.get_mut(&path_id) else {
2382                                continue;
2383                            };
2384                            path.data.reset_on_path_challenges();
2385                            self.timers.stop(
2386                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
2387                                self.qlog.with_time(now),
2388                            );
2389                            debug!("new path validation failed");
2390                            if let Err(err) = self.close_path_inner(
2391                                now,
2392                                path_id,
2393                                PathAbandonReason::ValidationFailed,
2394                            ) {
2395                                warn!(?err, "failed closing path");
2396                            }
2397                        }
2398                        PathTimer::Pacing => trace!("pacing timer expired"),
2399                        PathTimer::MaxAckDelay => {
2400                            trace!("max ack delay reached");
2401                            // This timer is only armed in the Data space
2402                            self.spaces[SpaceId::Data]
2403                                .for_path(path_id)
2404                                .pending_acks
2405                                .on_max_ack_delay_timeout()
2406                        }
2407                        PathTimer::DiscardPath => {
2408                            // The path was abandoned and 3*PTO has expired since.  Clean up all
2409                            // remaining state and install stateless reset token.
2410                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2411                            if let Some(local_cid_state) = self.local_cid_state.remove(&path_id) {
2412                                debug_assert!(!self.state.is_drained()); // requirement for endpoint_events. All timers should be cleared in drained connections.
2413                                let (min_seq, max_seq) = local_cid_state.active_seq();
2414                                for seq in min_seq..=max_seq {
2415                                    self.endpoint_events.push_back(
2416                                        EndpointEventInner::RetireConnectionId(
2417                                            now, path_id, seq, false,
2418                                        ),
2419                                    );
2420                                }
2421                            }
2422                            self.discard_path(path_id, now);
2423                        }
2424                    }
2425                }
2426            }
2427        }
2428    }
2429
2430    /// Close a connection immediately
2431    ///
2432    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
2433    /// call this only when all important communications have been completed, e.g. by calling
2434    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
2435    /// [`StreamEvent::Finished`] event.
2436    ///
2437    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
2438    /// delivered. There may still be data from the peer that has not been received.
2439    ///
2440    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
2441    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2442        self.close_inner(
2443            now,
2444            Close::Application(frame::ApplicationClose { error_code, reason }),
2445        )
2446    }
2447
2448    /// Close the connection immediately, initiated by an API call.
2449    ///
2450    /// This will not produce a [`ConnectionLost`] event propagated by the
2451    /// [`Connection::poll`] call, because the API call already propagated the error to the
2452    /// user.
2453    ///
2454    /// Not to be used when entering immediate close due to an internal state change based
2455    /// on an event. See [`State::move_to_closed_local`] for details.
2456    ///
2457    /// This initiates immediate close from
2458    /// <https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2>, moving to the closed
2459    /// state.
2460    ///
2461    /// [`ConnectionLost`]: crate::Event::ConnectionLost
2462    /// [`Connection::poll`]: super::Connection::poll
2463    fn close_inner(&mut self, now: Instant, reason: Close) {
2464        let was_closed = self.state.is_closed();
2465        if !was_closed {
2466            self.close_common();
2467            self.set_close_timer(now);
2468            self.connection_close_pending = true;
2469            self.state.move_to_closed_local(reason);
2470        }
2471    }
2472
2473    /// Control datagrams
2474    pub fn datagrams(&mut self) -> Datagrams<'_> {
2475        Datagrams { conn: self }
2476    }
2477
2478    /// Returns connection statistics
2479    pub fn stats(&mut self) -> ConnectionStats {
2480        self.stats.clone()
2481    }
2482
2483    /// Returns path statistics
2484    pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2485        let path = self.paths.get(&path_id)?;
2486        let stats = self.path_stats.entry(path_id).or_default();
2487        stats.rtt = path.data.rtt.get();
2488        stats.cwnd = path.data.congestion.window();
2489        stats.current_mtu = path.data.mtud.current_mtu();
2490        Some(*stats)
2491    }
2492
2493    /// Ping the remote endpoint
2494    ///
2495    /// Causes an ACK-eliciting packet to be transmitted on the connection.
2496    pub fn ping(&mut self) {
2497        // TODO(flub): This is very brute-force: it pings *all* the paths.  Instead it would
2498        //    be nice if we could only send a single packet for this.
2499        for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2500            path_data.ping_pending = true;
2501        }
2502    }
2503
2504    /// Ping the remote endpoint over a specific path
2505    ///
2506    /// Causes an ACK-eliciting packet to be transmitted on the path.
2507    pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2508        let path_data = self.spaces[self.highest_space]
2509            .number_spaces
2510            .get_mut(&path)
2511            .ok_or(ClosedPath { _private: () })?;
2512        path_data.ping_pending = true;
2513        Ok(())
2514    }
2515
2516    /// Update traffic keys spontaneously
2517    ///
2518    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
2519    pub fn force_key_update(&mut self) {
2520        if !self.state.is_established() {
2521            debug!("ignoring forced key update in illegal state");
2522            return;
2523        }
2524        if self.crypto_state.prev_crypto.is_some() {
2525            // We already just updated, or are currently updating, the keys. Concurrent key updates
2526            // are illegal.
2527            debug!("ignoring redundant forced key update");
2528            return;
2529        }
2530        self.crypto_state.update_keys(None, false);
2531    }
2532
2533    /// Get a session reference
2534    pub fn crypto_session(&self) -> &dyn crypto::Session {
2535        self.crypto_state.session.as_ref()
2536    }
2537
2538    /// Whether the connection is in the process of being established
2539    ///
2540    /// If this returns `false`, the connection may be either established or closed, signaled by the
2541    /// emission of a `Connected` or `ConnectionLost` message respectively.
2542    pub fn is_handshaking(&self) -> bool {
2543        self.state.is_handshake()
2544    }
2545
2546    /// Whether the connection is closed
2547    ///
2548    /// Closed connections cannot transport any further data. A connection becomes closed when
2549    /// either peer application intentionally closes it, or when either transport layer detects an
2550    /// error such as a time-out or certificate validation failure.
2551    ///
2552    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
2553    pub fn is_closed(&self) -> bool {
2554        self.state.is_closed()
2555    }
2556
2557    /// Whether there is no longer any need to keep the connection around
2558    ///
2559    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
2560    /// packets from the peer. All drained connections have been closed.
2561    pub fn is_drained(&self) -> bool {
2562        self.state.is_drained()
2563    }
2564
2565    /// For clients, if the peer accepted the 0-RTT data packets
2566    ///
2567    /// The value is meaningless until after the handshake completes.
2568    pub fn accepted_0rtt(&self) -> bool {
2569        self.crypto_state.accepted_0rtt
2570    }
2571
2572    /// Whether 0-RTT is/was possible during the handshake
2573    pub fn has_0rtt(&self) -> bool {
2574        self.crypto_state.zero_rtt_enabled
2575    }
2576
2577    /// Whether there are any pending retransmits
2578    pub fn has_pending_retransmits(&self) -> bool {
2579        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2580    }
2581
2582    /// Look up whether we're the client or server of this Connection
2583    pub fn side(&self) -> Side {
2584        self.side.side()
2585    }
2586
2587    /// Get the address observed by the remote over the given path
2588    pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2589        self.path(path_id)
2590            .map(|path_data| {
2591                path_data
2592                    .last_observed_addr_report
2593                    .as_ref()
2594                    .map(|observed| observed.socket_addr())
2595            })
2596            .ok_or(ClosedPath { _private: () })
2597    }
2598
2599    /// Current best estimate of this connection's latency (round-trip-time)
2600    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2601        self.path(path_id).map(|d| d.rtt.get())
2602    }
2603
2604    /// Current state of this connection's congestion controller, for debugging purposes
2605    pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2606        self.path(path_id).map(|d| d.congestion.as_ref())
2607    }
2608
2609    /// Modify the number of remotely initiated streams that may be concurrently open
2610    ///
2611    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
2612    /// `count`s increase both minimum and worst-case memory consumption.
2613    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2614        self.streams.set_max_concurrent(dir, count);
2615        // If the limit was reduced, then a flow control update previously deemed insignificant may
2616        // now be significant.
2617        let pending = &mut self.spaces[SpaceId::Data].pending;
2618        self.streams.queue_max_stream_id(pending);
2619    }
2620
2621    /// Modify the number of open paths allowed when multipath is enabled
2622    ///
2623    /// When reducing the number of concurrent paths this will only affect delaying sending
2624    /// new MAX_PATH_ID frames until fewer than this number of paths are possible.  To
2625    /// actively reduce paths they must be closed using [`Connection::close_path`], which
2626    /// can also be used to close not-yet-opened paths.
2627    ///
2628    /// If multipath is not negotiated (see the [`TransportConfig`]) this can not enable
2629    /// multipath and will fail.
2630    pub fn set_max_concurrent_paths(
2631        &mut self,
2632        now: Instant,
2633        count: NonZeroU32,
2634    ) -> Result<(), MultipathNotNegotiated> {
2635        if !self.is_multipath_negotiated() {
2636            return Err(MultipathNotNegotiated { _private: () });
2637        }
2638        self.max_concurrent_paths = count;
2639
2640        let in_use_count = self
2641            .local_max_path_id
2642            .next()
2643            .saturating_sub(self.abandoned_paths.len() as u32)
2644            .as_u32();
2645        let extra_needed = count.get().saturating_sub(in_use_count);
2646        let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2647
2648        self.set_max_path_id(now, new_max_path_id);
2649
2650        Ok(())
2651    }
2652
2653    /// If needed, issues a new MAX_PATH_ID frame and new CIDs for any newly allowed paths
2654    fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2655        if max_path_id <= self.local_max_path_id {
2656            return;
2657        }
2658
2659        self.local_max_path_id = max_path_id;
2660        self.spaces[SpaceId::Data].pending.max_path_id = true;
2661
2662        self.issue_first_path_cids(now);
2663    }
2664
2665    /// Current number of remotely initiated streams that may be concurrently open
2666    ///
2667    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
2668    /// it will not change immediately, even if fewer streams are open. Instead, it will
2669    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
2670    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2671        self.streams.max_concurrent(dir)
2672    }
2673
2674    /// See [`TransportConfig::send_window()`]
2675    pub fn set_send_window(&mut self, send_window: u64) {
2676        self.streams.set_send_window(send_window);
2677    }
2678
2679    /// See [`TransportConfig::receive_window()`]
2680    pub fn set_receive_window(&mut self, receive_window: VarInt) {
2681        if self.streams.set_receive_window(receive_window) {
2682            self.spaces[SpaceId::Data].pending.max_data = true;
2683        }
2684    }
2685
2686    /// Whether the Multipath for QUIC extension is enabled.
2687    ///
2688    /// Multipath is only enabled after the handshake is completed and if it was enabled by both
2689    /// peers.
2690    pub fn is_multipath_negotiated(&self) -> bool {
2691        !self.is_handshaking()
2692            && self.config.max_concurrent_multipath_paths.is_some()
2693            && self.peer_params.initial_max_path_id.is_some()
2694    }
2695
2696    fn on_ack_received(
2697        &mut self,
2698        now: Instant,
2699        space: SpaceId,
2700        ack: frame::Ack,
2701    ) -> Result<(), TransportError> {
2702        // All ACKs are referencing path 0
2703        let path = PathId::ZERO;
2704        self.inner_on_ack_received(now, space, path, ack)
2705    }
2706
2707    fn on_path_ack_received(
2708        &mut self,
2709        now: Instant,
2710        space: SpaceId,
2711        path_ack: frame::PathAck,
2712    ) -> Result<(), TransportError> {
2713        let (ack, path) = path_ack.into_ack();
2714        self.inner_on_ack_received(now, space, path, ack)
2715    }
2716
2717    /// Handles an ACK frame acknowledging packets sent on *path*.
2718    fn inner_on_ack_received(
2719        &mut self,
2720        now: Instant,
2721        space: SpaceId,
2722        path: PathId,
2723        ack: frame::Ack,
2724    ) -> Result<(), TransportError> {
2725        if self.abandoned_paths.contains(&path) {
2726            // See also https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.4.3-3
2727            // > PATH_ACK frames received with an abandoned path ID are silently ignored, as specified in Section 4.
2728            trace!("silently ignoring PATH_ACK on abandoned path");
2729            return Ok(());
2730        }
2731        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2732            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2733        }
2734        let new_largest = {
2735            let space = &mut self.spaces[space].for_path(path);
2736            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
2737                space.largest_acked_packet = Some(ack.largest);
2738                if let Some(info) = space.sent_packets.get(ack.largest) {
2739                    // This should always succeed, but a misbehaving peer might ACK a packet we
2740                    // haven't sent. At worst, that will result in us spuriously reducing the
2741                    // congestion window.
2742                    space.largest_acked_packet_sent = info.time_sent;
2743                }
2744                true
2745            } else {
2746                false
2747            }
2748        };
2749
2750        if self.detect_spurious_loss(&ack, space, path) {
2751            self.path_data_mut(path)
2752                .congestion
2753                .on_spurious_congestion_event();
2754        }
2755
2756        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
2757        let mut newly_acked = ArrayRangeSet::new();
2758        for range in ack.iter() {
2759            self.spaces[space].for_path(path).check_ack(range.clone())?;
2760            for (pn, _) in self.spaces[space]
2761                .for_path(path)
2762                .sent_packets
2763                .iter_range(range)
2764            {
2765                newly_acked.insert_one(pn);
2766            }
2767        }
2768
2769        if newly_acked.is_empty() {
2770            return Ok(());
2771        }
2772
2773        let mut ack_eliciting_acked = false;
2774        for packet in newly_acked.elts() {
2775            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2776                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2777                    // Assume ACKs for all packets below the largest acknowledged in
2778                    // `packet` have been received. This can cause the peer to spuriously
2779                    // retransmit if some of our earlier ACKs were lost, but allows for
2780                    // simpler state tracking. See discussion at
2781                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
2782                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2783                        pns.pending_acks.subtract_below(*acked_pn);
2784                    }
2785                }
2786                ack_eliciting_acked |= info.ack_eliciting;
2787
2788                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
2789                let path_data = self.path_data_mut(path);
2790                let mtu_updated = path_data.mtud.on_acked(space.kind(), packet, info.size);
2791                if mtu_updated {
2792                    path_data
2793                        .congestion
2794                        .on_mtu_update(path_data.mtud.current_mtu());
2795                }
2796
2797                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
2798                self.ack_frequency.on_acked(path, packet);
2799
2800                self.on_packet_acked(now, path, info);
2801            }
2802        }
2803
2804        let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
2805        let app_limited = self.app_limited;
2806        let path_data = self.path_data_mut(path);
2807        let in_flight = path_data.in_flight.bytes;
2808
2809        path_data
2810            .congestion
2811            .on_end_acks(now, in_flight, app_limited, largest_ackd);
2812
2813        if new_largest && ack_eliciting_acked {
2814            let ack_delay = if space != SpaceId::Data {
2815                Duration::from_micros(0)
2816            } else {
2817                cmp::min(
2818                    self.ack_frequency.peer_max_ack_delay,
2819                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2820                )
2821            };
2822            let rtt = now.saturating_duration_since(
2823                self.spaces[space].for_path(path).largest_acked_packet_sent,
2824            );
2825
2826            let next_pn = self.spaces[space].for_path(path).next_packet_number;
2827            let path_data = self.path_data_mut(path);
2828            // TODO(@divma): should be a method of path, should be contained in a single place
2829            path_data.rtt.update(ack_delay, rtt);
2830            if path_data.first_packet_after_rtt_sample.is_none() {
2831                path_data.first_packet_after_rtt_sample = Some((space.kind(), next_pn));
2832            }
2833        }
2834
2835        // Must be called before crypto/pto_count are clobbered
2836        self.detect_lost_packets(now, space, path, true);
2837
2838        if self.peer_completed_address_validation(path) {
2839            self.path_data_mut(path).pto_count = 0;
2840        }
2841
2842        // Explicit congestion notification
2843        // TODO(@divma): this code is a good example of logic that should be contained in a single
2844        // place but it's split between the path data and the packet number space data, we should
2845        // find a way to make this work without two lookups
2846        if self.path_data(path).sending_ecn {
2847            if let Some(ecn) = ack.ecn {
2848                // We only examine ECN counters from ACKs that we are certain we received in transmit
2849                // order, allowing us to compute an increase in ECN counts to compare against the number
2850                // of newly acked packets that remains well-defined in the presence of arbitrary packet
2851                // reordering.
2852                if new_largest {
2853                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
2854                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
2855                }
2856            } else {
2857                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
2858                debug!("ECN not acknowledged by peer");
2859                self.path_data_mut(path).sending_ecn = false;
2860            }
2861        }
2862
2863        self.set_loss_detection_timer(now, path);
2864        Ok(())
2865    }
2866
2867    fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2868        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2869
2870        if lost_packets.is_empty() {
2871            return false;
2872        }
2873
2874        for range in ack.iter() {
2875            let spurious_losses: Vec<u64> = lost_packets
2876                .iter_range(range.clone())
2877                .map(|(pn, _info)| pn)
2878                .collect();
2879
2880            for pn in spurious_losses {
2881                lost_packets.remove(pn);
2882            }
2883        }
2884
2885        // If this ACK frame acknowledged all deemed lost packets,
2886        // then we have raised a spurious congestion event in the past.
2887        // We cannot conclude when there are remaining packets,
2888        // but future ACK frames might indicate a spurious loss detection.
2889        lost_packets.is_empty()
2890    }
2891
2892    /// Drain lost packets that we reasonably think will never arrive
2893    ///
2894    /// The current criterion is copied from `msquic`:
2895    /// discard packets that were sent earlier than 2 probe timeouts ago.
2896    fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2897        let two_pto = 2 * self.path_data(path).rtt.pto_base();
2898
2899        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2900        lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2901    }
2902
2903    /// Process a new ECN block from an in-order ACK
2904    fn process_ecn(
2905        &mut self,
2906        now: Instant,
2907        space: SpaceId,
2908        path: PathId,
2909        newly_acked: u64,
2910        ecn: frame::EcnCounts,
2911        largest_sent_time: Instant,
2912    ) {
2913        match self.spaces[space]
2914            .for_path(path)
2915            .detect_ecn(newly_acked, ecn)
2916        {
2917            Err(e) => {
2918                debug!("halting ECN due to verification failure: {}", e);
2919
2920                self.path_data_mut(path).sending_ecn = false;
2921                // Wipe out the existing value because it might be garbage and could interfere with
2922                // future attempts to use ECN on new paths.
2923                self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2924            }
2925            Ok(false) => {}
2926            Ok(true) => {
2927                self.path_stats.entry(path).or_default().congestion_events += 1;
2928                self.path_data_mut(path).congestion.on_congestion_event(
2929                    now,
2930                    largest_sent_time,
2931                    false,
2932                    true,
2933                    0,
2934                );
2935            }
2936        }
2937    }
2938
2939    // Not timing-aware, so it's safe to call this for inferred acks, such as arise from
2940    // high-latency handshakes
2941    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2942        let app_limited = self.app_limited;
2943        let path = self.path_data_mut(path_id);
2944        path.remove_in_flight(&info);
2945        if info.ack_eliciting && info.path_generation == path.generation() {
2946            // Only pass ACKs to the congestion controller if it belongs to this exact
2947            // generation of the path. Otherwise we might be feeding ACKs from the previous
2948            // 4-tuple into our congestion controller.
2949            let rtt = path.rtt;
2950            path.congestion
2951                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2952        }
2953
2954        // Update state for confirmed delivery of frames
2955        if let Some(retransmits) = info.retransmits.get() {
2956            for (id, _) in retransmits.reset_stream.iter() {
2957                self.streams.reset_acked(*id);
2958            }
2959        }
2960
2961        for frame in info.stream_frames {
2962            self.streams.received_ack_of(frame);
2963        }
2964    }
2965
2966    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceKind) {
2967        let start = if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) {
2968            now
2969        } else {
2970            self.crypto_state
2971                .prev_crypto
2972                .as_ref()
2973                .expect("no previous keys")
2974                .end_packet
2975                .as_ref()
2976                .expect("update not acknowledged yet")
2977                .1
2978        };
2979
2980        // QUIC-MULTIPATH § 2.5 Key Phase Update Process: use largest PTO of all paths.
2981        self.timers.set(
2982            Timer::Conn(ConnTimer::KeyDiscard),
2983            start + self.max_pto_for_space(space) * 3,
2984            self.qlog.with_time(now),
2985        );
2986    }
2987
2988    /// Handle a [`PathTimer::LossDetection`] timeout.
2989    ///
2990    /// This timer expires for two reasons:
2991    /// - An ACK-eliciting packet we sent should be considered lost.
2992    /// - The PTO may have expired and a tail-loss probe needs to be scheduled.
2993    ///
2994    /// The former needs us to schedule re-transmission of the lost data.
2995    ///
2996    /// The latter means we have not received an ACK for an ack-eliciting packet we sent
2997    /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
2998    /// packet, to try and elicit new acknowledgements. These new acknowledgements will
2999    /// indicate whether the previously sent packets were lost or not.
3000    fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
3001        if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
3002            // Time threshold loss Detection
3003            self.detect_lost_packets(now, pn_space, path_id, false);
3004            self.set_loss_detection_timer(now, path_id);
3005            return;
3006        }
3007
3008        let Some((_, space)) = self.pto_time_and_space(now, path_id) else {
3009            error!(%path_id, "PTO expired while unset");
3010            return;
3011        };
3012        trace!(
3013            in_flight = self.path_data(path_id).in_flight.bytes,
3014            count = self.path_data(path_id).pto_count,
3015            ?space,
3016            %path_id,
3017            "PTO fired"
3018        );
3019
3020        let count = match self.path_data(path_id).in_flight.ack_eliciting {
3021            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
3022            // deadlock preventions
3023            0 => {
3024                debug_assert!(!self.peer_completed_address_validation(path_id));
3025                1
3026            }
3027            // Conventional loss probe
3028            _ => 2,
3029        };
3030        let pns = self.spaces[space].for_path(path_id);
3031        pns.loss_probes = pns.loss_probes.saturating_add(count);
3032        let path_data = self.path_data_mut(path_id);
3033        path_data.pto_count = path_data.pto_count.saturating_add(1);
3034        self.set_loss_detection_timer(now, path_id);
3035    }
3036
3037    /// Detect any lost packets
3038    ///
3039    /// There are two cases in which we detects lost packets:
3040    ///
3041    /// - We received an ACK packet.
3042    /// - The [`PathTimer::LossDetection`] timer expired. So there is an un-acknowledged packet
3043    ///   that was followed by an acknowledged packet. The loss timer for this
3044    ///   un-acknowledged packet expired and we need to detect that packet as lost.
3045    ///
3046    /// Packets are lost if they are both (See RFC9002 §6.1):
3047    ///
3048    /// - Unacknowledged, in flight and sent prior to an acknowledged packet.
3049    /// - Old enough by either:
3050    ///   - Having a packet number [`TransportConfig::packet_threshold`] lower then the last
3051    ///     acknowledged packet.
3052    ///   - Being sent [`TransportConfig::time_threshold`] * RTT in the past.
3053    fn detect_lost_packets(
3054        &mut self,
3055        now: Instant,
3056        pn_space: SpaceId,
3057        path_id: PathId,
3058        due_to_ack: bool,
3059    ) {
3060        let mut lost_packets = Vec::<u64>::new();
3061        let mut lost_mtu_probe = None;
3062        let mut in_persistent_congestion = false;
3063        let mut size_of_lost_packets = 0u64;
3064        self.spaces[pn_space].for_path(path_id).loss_time = None;
3065
3066        // Find all the lost packets, populating all variables initialised above.
3067
3068        let path = self.path_data(path_id);
3069        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3070        let loss_delay = path
3071            .rtt
3072            .conservative()
3073            .mul_f32(self.config.time_threshold)
3074            .max(TIMER_GRANULARITY);
3075        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
3076
3077        let largest_acked_packet = self.spaces[pn_space]
3078            .for_path(path_id)
3079            .largest_acked_packet
3080            .expect("detect_lost_packets only to be called if path received at least one ACK");
3081        let packet_threshold = self.config.packet_threshold as u64;
3082
3083        // InPersistentCongestion: Determine if all packets in the time period before the newest
3084        // lost packet, including the edges, are marked lost. PTO computation must always
3085        // include max ACK delay, i.e. operate as if in Data space (see RFC9001 §7.6.1).
3086        let congestion_period = self
3087            .pto(SpaceKind::Data, path_id)
3088            .saturating_mul(self.config.persistent_congestion_threshold);
3089        let mut persistent_congestion_start: Option<Instant> = None;
3090        let mut prev_packet = None;
3091        let space = self.spaces[pn_space].for_path(path_id);
3092
3093        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
3094            if prev_packet != Some(packet.wrapping_sub(1)) {
3095                // An intervening packet was acknowledged
3096                persistent_congestion_start = None;
3097            }
3098
3099            // Packets sent before now - loss_delay are deemed lost.
3100            // However, we avoid subtraction as it can panic and there's no
3101            // saturating equivalent of this subtraction operation with a Duration.
3102            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
3103            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
3104                // The packet should be declared lost.
3105                if Some(packet) == in_flight_mtu_probe {
3106                    // Lost MTU probes are not included in `lost_packets`, because they
3107                    // should not trigger a congestion control response
3108                    lost_mtu_probe = in_flight_mtu_probe;
3109                } else {
3110                    lost_packets.push(packet);
3111                    size_of_lost_packets += info.size as u64;
3112                    if info.ack_eliciting && due_to_ack {
3113                        match persistent_congestion_start {
3114                            // Two ACK-eliciting packets lost more than
3115                            // congestion_period apart, with no ACKed packets in between
3116                            Some(start) if info.time_sent - start > congestion_period => {
3117                                in_persistent_congestion = true;
3118                            }
3119                            // Persistent congestion must start after the first RTT sample
3120                            None if first_packet_after_rtt_sample
3121                                .is_some_and(|x| x < (pn_space.kind(), packet)) =>
3122                            {
3123                                persistent_congestion_start = Some(info.time_sent);
3124                            }
3125                            _ => {}
3126                        }
3127                    }
3128                }
3129            } else {
3130                // The packet should not yet be declared lost.
3131                if space.loss_time.is_none() {
3132                    // Since we iterate in order the lowest packet number's loss time will
3133                    // always be the earliest.
3134                    space.loss_time = Some(info.time_sent + loss_delay);
3135                }
3136                persistent_congestion_start = None;
3137            }
3138
3139            prev_packet = Some(packet);
3140        }
3141
3142        self.handle_lost_packets(
3143            pn_space,
3144            path_id,
3145            now,
3146            lost_packets,
3147            lost_mtu_probe,
3148            loss_delay,
3149            in_persistent_congestion,
3150            size_of_lost_packets,
3151        );
3152    }
3153
3154    /// Drops the path state, declaring any remaining in-flight packets as lost
3155    fn discard_path(&mut self, path_id: PathId, now: Instant) {
3156        trace!(%path_id, "dropping path state");
3157        let path = self.path_data(path_id);
3158        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3159
3160        let mut size_of_lost_packets = 0u64; // add to path_stats.lost_bytes;
3161        let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
3162            .for_path(path_id)
3163            .sent_packets
3164            .iter()
3165            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
3166            .map(|(pn, info)| {
3167                size_of_lost_packets += info.size as u64;
3168                pn
3169            })
3170            .collect();
3171
3172        if !lost_pns.is_empty() {
3173            trace!(
3174                %path_id,
3175                count = lost_pns.len(),
3176                lost_bytes = size_of_lost_packets,
3177                "packets lost on path abandon"
3178            );
3179            self.handle_lost_packets(
3180                SpaceId::Data,
3181                path_id,
3182                now,
3183                lost_pns,
3184                in_flight_mtu_probe,
3185                Duration::ZERO,
3186                false,
3187                size_of_lost_packets,
3188            );
3189        }
3190        // Before removing the path, we fetch the final path stats via `Self::path_stats`.
3191        // This updates some values for the last time.
3192        let path_stats = self.path_stats(path_id).unwrap_or_default();
3193        self.path_stats.remove(&path_id);
3194        self.paths.remove(&path_id);
3195        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
3196
3197        self.events.push_back(
3198            PathEvent::Discarded {
3199                id: path_id,
3200                path_stats,
3201            }
3202            .into(),
3203        );
3204    }
3205
    /// Applies the consequences of declaring packets lost on `path_id` in `pn_space`.
    ///
    /// First prunes old lost-packet records (see `drain_lost_packets`). Then, for each packet in
    /// `lost_packets` that can still be taken out of the sent packets, it:
    /// - emits a qlog "packet lost" event;
    /// - removes the packet from the path's in-flight accounting;
    /// - queues its stream frames and other retransmittable frames for retransmission;
    /// - notifies MTU discovery;
    /// - records it in the space's `lost_packets`, so that a later ACK can reveal the loss as
    ///   spurious.
    ///
    /// Afterwards it reacts to an MTU black hole detected by MTU discovery. If the losses reduced
    /// the path's bytes in flight, it also signals a congestion event.
    ///
    /// `lost_mtu_probe` is handled separately at the end. It never triggers a congestion event;
    /// it is only counted as a lost PLPMTUD probe.
    ///
    /// - `lost_packets` must be sorted ascending (checked by a debug assertion). Its last entry
    ///   is used as the largest lost packet and must still be present in the sent packets.
    /// - `loss_delay` is only used for the qlog event.
    /// - `size_of_lost_packets` is the byte total computed by the caller. It is added to the
    ///   path stats and passed to the congestion controller.
    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(
            {
                let mut sorted = lost_packets.clone();
                sorted.sort();
                sorted == lost_packets
            },
            "lost_packets must be sorted"
        );

        self.drain_lost_packets(now, pn_space, path_id);

        // OnPacketsLost
        if let Some(largest_lost) = lost_packets.last().cloned() {
            // Snapshot of bytes in flight, compared after processing to tell whether any lost
            // packet counted towards bytes in flight.
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            // Read before the loop below takes the packet out of the sent packets.
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.entry(path_id).or_default();
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                // Packets that can no longer be taken are skipped.
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space.kind(), now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                // Queue the packet's content for retransmission.
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                // Remember the loss so that a later ACK of this packet can be recognised as a
                // spurious loss detection.
                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                // Datagrams no longer fitting the reduced size are dropped; if that frees up
                // room for a blocked sender, tell the application.
                if let Some(max_datagram_size) = self.datagrams().max_size()
                    && self.datagrams.drop_oversized(max_datagram_size)
                    && self.datagrams.send_blocked
                {
                    self.datagrams.send_blocked = false;
                    self.events.push_back(Event::DatagramsUnblocked);
                }
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .black_holes_detected += 1;
            }

            // Don't apply congestion penalty for lost ack-only packets
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        // Handle a lost MTU probe
        if let Some(packet) = lost_mtu_probe {
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and
            // therefore must not have been removed yet
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats
                .entry(path_id)
                .or_default()
                .lost_plpmtud_probes += 1;
        }
    }
3327
3328    /// Returns the earliest time packets should be declared lost for all spaces on a path.
3329    ///
3330    /// If a path has an acknowledged packet with any prior un-acknowledged packets, the
3331    /// earliest un-acknowledged packet can be declared lost after a timeout has elapsed.
3332    /// The time returned is when this packet should be declared lost.
3333    fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
3334        SpaceId::iter()
3335            .filter_map(|id| {
3336                self.spaces[id]
3337                    .number_spaces
3338                    .get(&path_id)
3339                    .and_then(|pns| pns.loss_time)
3340                    .map(|time| (time, id))
3341            })
3342            .min_by_key(|&(time, _)| time)
3343    }
3344
3345    /// Returns the earliest next PTO should fire for all spaces on a path.
3346    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
3347        let path = self.path(path_id)?;
3348        let pto_count = path.pto_count;
3349        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
3350        let mut duration = path.rtt.pto_base() * backoff;
3351
3352        if path_id == PathId::ZERO
3353            && path.in_flight.ack_eliciting == 0
3354            && !self.peer_completed_address_validation(PathId::ZERO)
3355        {
3356            // Address Validation during Connection Establishment:
3357            // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
3358            // deadlock if an Initial or Handshake packet from the server is lost and the
3359            // server can not send more due to its anti-amplification limit the client must
3360            // send another packet on PTO.
3361            let space = match self.highest_space {
3362                SpaceKind::Handshake => SpaceId::Handshake,
3363                _ => SpaceId::Initial,
3364            };
3365
3366            return Some((now + duration, space));
3367        }
3368
3369        let mut result = None;
3370        for space in SpaceId::iter() {
3371            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
3372                continue;
3373            };
3374
3375            if !pns.has_in_flight() {
3376                continue;
3377            }
3378            if space == SpaceId::Data {
3379                // Skip ApplicationData until handshake completes.
3380                if self.is_handshaking() {
3381                    return result;
3382                }
3383                // Include max_ack_delay and backoff for ApplicationData.
3384                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
3385            }
3386            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3387                continue;
3388            };
3389            let pto = last_ack_eliciting + duration;
3390            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3391                if path.anti_amplification_blocked(1) {
3392                    // Nothing would be able to be sent.
3393                    continue;
3394                }
3395                if path.in_flight.ack_eliciting == 0 {
3396                    // Nothing ack-eliciting, no PTO to arm/fire.
3397                    continue;
3398                }
3399                result = Some((pto, space));
3400            }
3401        }
3402        result
3403    }
3404
3405    fn peer_completed_address_validation(&self, path: PathId) -> bool {
3406        // TODO(flub): This logic needs updating for multipath
3407        if self.side.is_server() || self.state.is_closed() {
3408            return true;
3409        }
3410        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
3411        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
3412        self.spaces[SpaceId::Handshake]
3413            .path_space(PathId::ZERO)
3414            .and_then(|pns| pns.largest_acked_packet)
3415            .is_some()
3416            || self.spaces[SpaceId::Data]
3417                .path_space(path)
3418                .and_then(|pns| pns.largest_acked_packet)
3419                .is_some()
3420            || (self.crypto_state.has_keys(EncryptionLevel::OneRtt)
3421                && !self.crypto_state.has_keys(EncryptionLevel::Handshake))
3422    }
3423
3424    /// Resets the the [`PathTimer::LossDetection`] timer to the next instant it may be needed
3425    ///
3426    /// The timer must fire if either:
3427    /// - An ack-eliciting packet we sent needs to be declared lost.
3428    /// - A tail-loss probe needs to be sent.
3429    ///
3430    /// See [`Connection::on_loss_detection_timeout`] for details.
3431    fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3432        if self.state.is_closed() {
3433            // No loss detection takes place on closed connections, and `close_common` already
3434            // stopped time timer. Ensure we don't restart it inadvertently, e.g. in response to a
3435            // reordered packet being handled by state-insensitive code.
3436            return;
3437        }
3438
3439        if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3440            // Time threshold loss detection.
3441            self.timers.set(
3442                Timer::PerPath(path_id, PathTimer::LossDetection),
3443                loss_time,
3444                self.qlog.with_time(now),
3445            );
3446            return;
3447        }
3448
3449        // Determine which PN space to arm PTO for.
3450        // Calculate PTO duration
3451        if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3452            self.timers.set(
3453                Timer::PerPath(path_id, PathTimer::LossDetection),
3454                timeout,
3455                self.qlog.with_time(now),
3456            );
3457        } else {
3458            self.timers.stop(
3459                Timer::PerPath(path_id, PathTimer::LossDetection),
3460                self.qlog.with_time(now),
3461            );
3462        }
3463    }
3464
3465    /// The maximum probe timeout across all paths
3466    ///
3467    /// See [`Connection::pto`]
3468    fn max_pto_for_space(&self, space: SpaceKind) -> Duration {
3469        self.paths
3470            .keys()
3471            .map(|path_id| self.pto(space, *path_id))
3472            .max()
3473            .expect("there should be at least one path")
3474    }
3475
3476    /// Probe Timeout
3477    ///
3478    /// The PTO is logically the time in which you'd expect to receive an acknowledgement
3479    /// for a packet. So approximately RTT + max_ack_delay.
3480    fn pto(&self, space: SpaceKind, path_id: PathId) -> Duration {
3481        let max_ack_delay = match space {
3482            SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3483            SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3484        };
3485        self.path_data(path_id).rtt.pto_base() + max_ack_delay
3486    }
3487
    /// Updates connection state after a packet in `space_id` on `path_id` was authenticated.
    ///
    /// Always:
    /// - counts the packet in `total_authed_packets`;
    /// - resets the keep-alive and idle timers and permits further idle resets;
    /// - records `ecn` (if any) in the space's ECN counters, requesting an immediate ACK on a
    ///   CE mark.
    ///
    /// When `packet` (the packet number) is `None`, nothing else happens. Otherwise:
    /// - a client stops allowing server migration once a Handshake packet authenticates;
    /// - a server discards the Initial space on its first Handshake packet, and arms the key
    ///   discard timer when a 1-RTT packet arrives while 0-RTT keys are still held;
    /// - the packet number is queued for acknowledgement;
    /// - if it is the highest packet number seen so far in the space, the outgoing spin bit is
    ///   updated from `spin`.
    fn on_packet_authenticated(
        &mut self,
        now: Instant,
        space_id: SpaceKind,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<u64>,
        spin: bool,
        is_1rtt: bool,
    ) {
        self.total_authed_packets += 1;
        self.reset_keep_alive(path_id, now);
        self.reset_idle_timeout(now, space_id, path_id);
        self.permit_idle_reset = true;
        self.receiving_ecn |= ecn.is_some();
        if let Some(x) = ecn {
            let space = &mut self.spaces[space_id];
            space.for_path(path_id).ecn_counters += x;

            // A CE-marked packet must be acknowledged immediately.
            if x.is_ce() {
                space
                    .for_path(path_id)
                    .pending_acks
                    .set_immediate_ack_required();
            }
        }

        let Some(packet) = packet else {
            return;
        };
        match &self.side {
            ConnectionSide::Client { .. } => {
                // If we received a handshake packet that authenticated, then we're talking to
                // the real server.  From now on we should no longer allow the server to migrate
                // its address.
                if space_id == SpaceKind::Handshake
                    && let Some(hs) = self.state.as_handshake_mut()
                {
                    hs.allow_server_migration = false;
                }
            }
            ConnectionSide::Server { .. } => {
                if self.crypto_state.has_keys(EncryptionLevel::Initial)
                    && space_id == SpaceKind::Handshake
                {
                    // A server stops sending and processing Initial packets when it receives its first Handshake packet.
                    self.discard_space(now, SpaceKind::Initial);
                }
                if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) && is_1rtt {
                    // Discard 0-RTT keys soon after receiving a 1-RTT packet
                    self.set_key_discard_timer(now, space_id)
                }
            }
        }
        // Queue the packet for acknowledgement and track the highest packet number received.
        let space = self.spaces[space_id].for_path(path_id);
        space.pending_acks.insert_one(packet, now);
        if packet >= space.rx_packet.unwrap_or_default() {
            space.rx_packet = Some(packet);
            // Update outgoing spin bit, inverting iff we're the client
            self.spin = self.side.is_client() ^ spin;
        }
    }
3550
3551    /// Resets the idle timeout timers
3552    ///
3553    /// Without multipath there is only the connection-wide idle timeout. When multipath is
3554    /// enabled there is an additional per-path idle timeout.
3555    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceKind, path_id: PathId) {
3556        // First reset the global idle timeout.
3557        if let Some(timeout) = self.idle_timeout {
3558            if self.state.is_closed() {
3559                self.timers
3560                    .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3561            } else {
3562                let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space));
3563                self.timers.set(
3564                    Timer::Conn(ConnTimer::Idle),
3565                    now + dt,
3566                    self.qlog.with_time(now),
3567                );
3568            }
3569        }
3570
3571        // Now handle the per-path state
3572        if let Some(timeout) = self.path_data(path_id).idle_timeout {
3573            if self.state.is_closed() {
3574                self.timers.stop(
3575                    Timer::PerPath(path_id, PathTimer::PathIdle),
3576                    self.qlog.with_time(now),
3577                );
3578            } else {
3579                let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3580                self.timers.set(
3581                    Timer::PerPath(path_id, PathTimer::PathIdle),
3582                    now + dt,
3583                    self.qlog.with_time(now),
3584                );
3585            }
3586        }
3587    }
3588
3589    /// Resets both the [`ConnTimer::KeepAlive`] and [`PathTimer::PathKeepAlive`] timers
3590    fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3591        if !self.state.is_established() {
3592            return;
3593        }
3594
3595        if let Some(interval) = self.config.keep_alive_interval {
3596            self.timers.set(
3597                Timer::Conn(ConnTimer::KeepAlive),
3598                now + interval,
3599                self.qlog.with_time(now),
3600            );
3601        }
3602
3603        if let Some(interval) = self.path_data(path_id).keep_alive {
3604            self.timers.set(
3605                Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3606                now + interval,
3607                self.qlog.with_time(now),
3608            );
3609        }
3610    }
3611
3612    /// Sets the timer for when a previously issued CID should be retired next
3613    fn reset_cid_retirement(&mut self, now: Instant) {
3614        if let Some((_path, t)) = self.next_cid_retirement() {
3615            self.timers.set(
3616                Timer::Conn(ConnTimer::PushNewCid),
3617                t,
3618                self.qlog.with_time(now),
3619            );
3620        }
3621    }
3622
3623    /// The next time when a previously issued CID should be retired
3624    fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3625        self.local_cid_state
3626            .iter()
3627            .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3628            .min_by_key(|(_path_id, timeout)| *timeout)
3629    }
3630
3631    /// Handle the already-decrypted first packet from the client
3632    ///
3633    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
3634    /// efficient.
3635    pub(crate) fn handle_first_packet(
3636        &mut self,
3637        now: Instant,
3638        network_path: FourTuple,
3639        ecn: Option<EcnCodepoint>,
3640        packet_number: u64,
3641        packet: InitialPacket,
3642        remaining: Option<BytesMut>,
3643    ) -> Result<(), ConnectionError> {
3644        let span = trace_span!("first recv");
3645        let _guard = span.enter();
3646        debug_assert!(self.side.is_server());
3647        let len = packet.header_data.len() + packet.payload.len();
3648        let path_id = PathId::ZERO;
3649        self.path_data_mut(path_id).total_recvd = len as u64;
3650
3651        if let Some(hs) = self.state.as_handshake_mut() {
3652            hs.expected_token = packet.header.token.clone();
3653        } else {
3654            unreachable!("first packet must be delivered in Handshake state");
3655        }
3656
3657        // The first packet is always on PathId::ZERO
3658        self.on_packet_authenticated(
3659            now,
3660            SpaceKind::Initial,
3661            path_id,
3662            ecn,
3663            Some(packet_number),
3664            false,
3665            false,
3666        );
3667
3668        let packet: Packet = packet.into();
3669
3670        let mut qlog = QlogRecvPacket::new(len);
3671        qlog.header(&packet.header, Some(packet_number), path_id);
3672
3673        self.process_decrypted_packet(
3674            now,
3675            network_path,
3676            path_id,
3677            Some(packet_number),
3678            packet,
3679            &mut qlog,
3680        )?;
3681        self.qlog.emit_packet_received(qlog, now);
3682        if let Some(data) = remaining {
3683            self.handle_coalesced(now, network_path, path_id, ecn, data);
3684        }
3685
3686        self.qlog.emit_recovery_metrics(
3687            path_id,
3688            &mut self.paths.get_mut(&path_id).unwrap().data,
3689            now,
3690        );
3691
3692        Ok(())
3693    }
3694
3695    fn init_0rtt(&mut self, now: Instant) {
3696        let Some((header, packet)) = self.crypto_state.session.early_crypto() else {
3697            return;
3698        };
3699        if self.side.is_client() {
3700            match self.crypto_state.session.transport_parameters() {
3701                Ok(params) => {
3702                    let params = params
3703                        .expect("crypto layer didn't supply transport parameters with ticket");
3704                    // Certain values must not be cached
3705                    let params = TransportParameters {
3706                        initial_src_cid: None,
3707                        original_dst_cid: None,
3708                        preferred_address: None,
3709                        retry_src_cid: None,
3710                        stateless_reset_token: None,
3711                        min_ack_delay: None,
3712                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3713                        max_ack_delay: TransportParameters::default().max_ack_delay,
3714                        initial_max_path_id: None,
3715                        ..params
3716                    };
3717                    self.set_peer_params(params);
3718                    self.qlog.emit_peer_transport_params_restored(self, now);
3719                }
3720                Err(e) => {
3721                    error!("session ticket has malformed transport parameters: {}", e);
3722                    return;
3723                }
3724            }
3725        }
3726        trace!("0-RTT enabled");
3727        self.crypto_state.enable_zero_rtt(header, packet);
3728    }
3729
3730    fn read_crypto(
3731        &mut self,
3732        space: SpaceId,
3733        crypto: &frame::Crypto,
3734        payload_len: usize,
3735    ) -> Result<(), TransportError> {
3736        let expected = if !self.state.is_handshake() {
3737            SpaceId::Data
3738        } else if self.highest_space == SpaceKind::Initial {
3739            SpaceId::Initial
3740        } else {
3741            // On the server, self.highest_space can be Data after receiving the client's first
3742            // flight, but we expect Handshake CRYPTO until the handshake is complete.
3743            SpaceId::Handshake
3744        };
3745        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
3746        // packets are illegal, and we don't process 1-RTT packets until the handshake is
3747        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
3748        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3749
3750        let end = crypto.offset + crypto.data.len() as u64;
3751        if space < expected
3752            && end
3753                > self.crypto_state.spaces[space.kind()]
3754                    .crypto_stream
3755                    .bytes_read()
3756        {
3757            warn!(
3758                "received new {:?} CRYPTO data when expecting {:?}",
3759                space, expected
3760            );
3761            return Err(TransportError::PROTOCOL_VIOLATION(
3762                "new data at unexpected encryption level",
3763            ));
3764        }
3765
3766        let crypto_space = &mut self.crypto_state.spaces[space.kind()];
3767        let max = end.saturating_sub(crypto_space.crypto_stream.bytes_read());
3768        if max > self.config.crypto_buffer_size as u64 {
3769            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3770        }
3771
3772        crypto_space
3773            .crypto_stream
3774            .insert(crypto.offset, crypto.data.clone(), payload_len);
3775        while let Some(chunk) = crypto_space.crypto_stream.read(usize::MAX, true) {
3776            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3777            if self.crypto_state.session.read_handshake(&chunk.bytes)? {
3778                self.events.push_back(Event::HandshakeDataReady);
3779            }
3780        }
3781
3782        Ok(())
3783    }
3784
3785    fn write_crypto(&mut self) {
3786        loop {
3787            let space = self.highest_space;
3788            let mut outgoing = Vec::new();
3789            if let Some(crypto) = self.crypto_state.session.write_handshake(&mut outgoing) {
3790                match space {
3791                    SpaceKind::Initial => {
3792                        self.upgrade_crypto(SpaceKind::Handshake, crypto);
3793                    }
3794                    SpaceKind::Handshake => {
3795                        self.upgrade_crypto(SpaceKind::Data, crypto);
3796                    }
3797                    SpaceKind::Data => unreachable!("got updated secrets during 1-RTT"),
3798                }
3799            }
3800            if outgoing.is_empty() {
3801                if space == self.highest_space {
3802                    break;
3803                } else {
3804                    // Keys updated, check for more data to send
3805                    continue;
3806                }
3807            }
3808            let offset = self.crypto_state.spaces[space].crypto_offset;
3809            let outgoing = Bytes::from(outgoing);
3810            if let Some(hs) = self.state.as_handshake_mut()
3811                && space == SpaceKind::Initial
3812                && offset == 0
3813                && self.side.is_client()
3814            {
3815                hs.client_hello = Some(outgoing.clone());
3816            }
3817            self.crypto_state.spaces[space].crypto_offset += outgoing.len() as u64;
3818            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3819            self.spaces[space].pending.crypto.push_back(frame::Crypto {
3820                offset,
3821                data: outgoing,
3822            });
3823        }
3824    }
3825
3826    /// Switch to stronger cryptography during handshake
3827    fn upgrade_crypto(&mut self, space: SpaceKind, crypto: Keys) {
3828        debug_assert!(
3829            !self.crypto_state.has_keys(space.encryption_level()),
3830            "already reached packet space {space:?}"
3831        );
3832        trace!("{:?} keys ready", space);
3833        if space == SpaceKind::Data {
3834            // Precompute the first key update
3835            self.crypto_state.next_crypto = Some(
3836                self.crypto_state
3837                    .session
3838                    .next_1rtt_keys()
3839                    .expect("handshake should be complete"),
3840            );
3841        }
3842
3843        self.crypto_state.spaces[space].keys = Some(crypto);
3844        debug_assert!(space > self.highest_space);
3845        self.highest_space = space;
3846        if space == SpaceKind::Data && self.side.is_client() {
3847            // Discard 0-RTT keys because 1-RTT keys are available.
3848            self.crypto_state.discard_zero_rtt();
3849        }
3850    }
3851
3852    fn discard_space(&mut self, now: Instant, space: SpaceKind) {
3853        debug_assert!(space != SpaceKind::Data);
3854        trace!("discarding {:?} keys", space);
3855        if space == SpaceKind::Initial {
3856            // No longer needed
3857            if let ConnectionSide::Client { token, .. } = &mut self.side {
3858                *token = Bytes::new();
3859            }
3860        }
3861        self.crypto_state.spaces[space].keys = None;
3862        let space = &mut self.spaces[space];
3863        let pns = space.for_path(PathId::ZERO);
3864        pns.time_of_last_ack_eliciting_packet = None;
3865        pns.loss_time = None;
3866        pns.loss_probes = 0;
3867        let sent_packets = mem::take(&mut pns.sent_packets);
3868        let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3869        for (_, packet) in sent_packets.into_iter() {
3870            path.data.remove_in_flight(&packet);
3871        }
3872
3873        self.set_loss_detection_timer(now, PathId::ZERO)
3874    }
3875
3876    fn handle_coalesced(
3877        &mut self,
3878        now: Instant,
3879        network_path: FourTuple,
3880        path_id: PathId,
3881        ecn: Option<EcnCodepoint>,
3882        data: BytesMut,
3883    ) {
3884        self.path_data_mut(path_id)
3885            .inc_total_recvd(data.len() as u64);
3886        let mut remaining = Some(data);
3887        let cid_len = self
3888            .local_cid_state
3889            .values()
3890            .map(|cid_state| cid_state.cid_len())
3891            .next()
3892            .expect("one cid_state must exist");
3893        while let Some(data) = remaining {
3894            match PartialDecode::new(
3895                data,
3896                &FixedLengthConnectionIdParser::new(cid_len),
3897                &[self.version],
3898                self.endpoint_config.grease_quic_bit,
3899            ) {
3900                Ok((partial_decode, rest)) => {
3901                    remaining = rest;
3902                    self.handle_decode(now, network_path, path_id, ecn, partial_decode);
3903                }
3904                Err(e) => {
3905                    trace!("malformed header: {}", e);
3906                    return;
3907                }
3908            }
3909        }
3910    }
3911
3912    fn handle_decode(
3913        &mut self,
3914        now: Instant,
3915        network_path: FourTuple,
3916        path_id: PathId,
3917        ecn: Option<EcnCodepoint>,
3918        partial_decode: PartialDecode,
3919    ) {
3920        let qlog = QlogRecvPacket::new(partial_decode.len());
3921        if let Some(decoded) = self
3922            .crypto_state
3923            .unprotect_header(partial_decode, self.peer_params.stateless_reset_token)
3924        {
3925            self.handle_packet(
3926                now,
3927                network_path,
3928                path_id,
3929                ecn,
3930                decoded.packet,
3931                decoded.stateless_reset,
3932                qlog,
3933            );
3934        }
3935    }
3936
    /// Processes one received packet whose header protection has already been removed
    ///
    /// `packet` may be `None`, which is treated like a packet that failed authentication.
    /// `stateless_reset` takes precedence over the decryption outcome and resets the
    /// connection. An authenticated packet is checked for duplicates and handshake-time
    /// restrictions, then handed to [`Self::process_decrypted_packet`].
    ///
    /// Any resulting [`ConnectionError`] moves the connection to the closed, draining or
    /// drained state. A CONNECTION_CLOSE is scheduled when we are closed and the packet came
    /// over a validated path matching `network_path`. `qlog` is only emitted for packets that
    /// decrypt successfully.
    fn handle_packet(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<Packet>,
        stateless_reset: bool,
        mut qlog: QlogRecvPacket,
    ) {
        // Receive statistics, connection-wide and per path.
        self.stats.udp_rx.ios += 1;
        self.path_stats.entry(path_id).or_default().udp_rx.ios += 1;

        if let Some(ref packet) = packet {
            trace!(
                "got {:?} packet ({} bytes) from {} using id {}",
                packet.header.space(),
                packet.payload.len() + packet.header_data.len(),
                network_path,
                packet.header.dst_cid(),
            );
        }

        // During the handshake only PathId::ZERO is accepted. Packets from an unexpected
        // network path are dropped, unless the handshake state still allows the server to
        // migrate; in that case the path is re-pointed at the new remote.
        if self.is_handshaking() {
            if path_id != PathId::ZERO {
                debug!(%network_path, %path_id, "discarding multipath packet during handshake");
                return;
            }
            if network_path != self.path_data_mut(path_id).network_path {
                if let Some(hs) = self.state.as_handshake() {
                    if hs.allow_server_migration {
                        trace!(%network_path, prev = %self.path_data(path_id).network_path, "server migrated to new remote");
                        self.path_data_mut(path_id).network_path = network_path;
                        self.qlog.emit_tuple_assigned(path_id, network_path, now);
                    } else {
                        debug!("discarding packet with unexpected remote during handshake");
                        return;
                    }
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            }
        }

        // Snapshot the state so that transitions caused by this packet can be detected below.
        let was_closed = self.state.is_closed();
        let was_drained = self.state.is_drained();

        // `Err(None)`: no packet, or authentication failed. `Err(Some(_))`: illegal packet.
        let decrypted = match packet {
            None => Err(None),
            Some(mut packet) => self
                .decrypt_packet(now, path_id, &mut packet)
                .map(move |number| (packet, number)),
        };
        let result = match decrypted {
            // A stateless reset overrides whatever decryption produced.
            _ if stateless_reset => {
                debug!("got stateless reset");
                Err(ConnectionError::Reset)
            }
            Err(Some(e)) => {
                warn!("illegal packet: {}", e);
                Err(e.into())
            }
            Err(None) => {
                debug!("failed to authenticate packet");
                // Authentication failures silently drop the packet until they exceed the
                // integrity limit for the highest packet space's keys.
                self.authentication_failures += 1;
                let integrity_limit = self
                    .crypto_state
                    .integrity_limit(self.highest_space)
                    .unwrap();
                if self.authentication_failures > integrity_limit {
                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
                } else {
                    return;
                }
            }
            Ok((packet, number)) => {
                qlog.header(&packet.header, number, path_id);
                let span = match number {
                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                    None => trace_span!("recv", space = ?packet.header.space()),
                };
                let _guard = span.enter();

                // Duplicate detection state for this path in the packet's space, if any.
                // `insert` reports whether the packet number may already have been received.
                let dedup = self.spaces[packet.header.space()]
                    .path_space_mut(path_id)
                    .map(|pns| &mut pns.dedup);
                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                    debug!("discarding possible duplicate packet");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else if self.state.is_handshake() && packet.header.is_short() {
                    // TODO: SHOULD buffer these to improve reordering tolerance.
                    trace!("dropping short packet during handshake");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else {
                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header
                        && let Some(hs) = self.state.as_handshake()
                        && self.side.is_server()
                        && token != &hs.expected_token
                    {
                        // Clients must send the same retry token in every Initial. Initial
                        // packets can be spoofed, so we discard rather than killing the
                        // connection.
                        warn!("discarding Initial with invalid retry token");
                        self.qlog.emit_packet_received(qlog, now);
                        return;
                    }

                    if !self.state.is_closed() {
                        // Only short-header packets carry a spin bit.
                        let spin = match packet.header {
                            Header::Short { spin, .. } => spin,
                            _ => false,
                        };

                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                            // Only the client is allowed to open paths
                            self.ensure_path(path_id, network_path, now, number);
                        }
                        // The path may still be unknown here (for example on the client, or
                        // for an abandoned path). In that case there is no per-path state to
                        // update.
                        if self.paths.contains_key(&path_id) {
                            self.on_packet_authenticated(
                                now,
                                packet.header.space(),
                                path_id,
                                ecn,
                                number,
                                spin,
                                packet.header.is_1rtt(),
                            );
                        }
                    }

                    let res = self.process_decrypted_packet(
                        now,
                        network_path,
                        path_id,
                        number,
                        packet,
                        &mut qlog,
                    );

                    self.qlog.emit_packet_received(qlog, now);
                    res
                }
            }
        };

        // State transitions for error cases
        // - Peer closes and our own transport errors move to the closed state.
        // - A stateless reset or exceeding the AEAD integrity limit moves straight to drained.
        // - A version mismatch moves to draining.
        if let Err(conn_err) = result {
            match conn_err {
                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::Reset
                | ConnectionError::TransportError(TransportError {
                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
                    ..
                }) => {
                    self.state.move_to_drained(Some(conn_err));
                }
                ConnectionError::TimedOut => {
                    unreachable!("timeouts aren't generated by packet processing");
                }
                ConnectionError::TransportError(err) => {
                    debug!("closing connection due to transport error: {}", err);
                    self.state.move_to_closed(err);
                }
                ConnectionError::VersionMismatch => {
                    self.state.move_to_draining(Some(conn_err));
                }
                ConnectionError::LocallyClosed => {
                    unreachable!("LocallyClosed isn't generated by packet processing");
                }
                ConnectionError::CidsExhausted => {
                    unreachable!("CidsExhausted isn't generated by packet processing");
                }
            };
        }

        // On entering the closed state, run the common close logic. The close timer is only
        // armed if we did not go straight to drained.
        if !was_closed && self.state.is_closed() {
            self.close_common();
            if !self.state.is_drained() {
                self.set_close_timer(now);
            }
        }
        // Report the transition into the drained state to the endpoint.
        if !was_drained && self.state.is_drained() {
            self.endpoint_events.push_back(EndpointEventInner::Drained);
            // Close timer may have been started previously, e.g. if we sent a close and got a
            // stateless reset in response
            self.timers
                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
        }

        // Transmit CONNECTION_CLOSE if necessary.
        //
        // If we received a valid packet and we are in the closed state we should respond
        // with a CONNECTION_CLOSE frame.
        // TODO: This SHOULD be rate-limited according to §10.2.1 of QUIC-TRANSPORT, but
        //    that does not yet happen. This is triggered by each received packet.
        if matches!(self.state.as_type(), StateType::Closed) {
            // From https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2.1-7
            //
            // While in the closing state we must either:
            // - discard packets coming from an un-validated remote OR
            // - ensure we do not send more than 3 times the received data
            //
            // Doing the 2nd would mean we would be able to send CONNECTION_CLOSE to a peer
            // who was (involuntary) migrated just at the time we initiated immediate
            // close. It is a lot more work though. So while we would like to do this for
            // now we only do 1.
            //
            // Another shortcoming of the current implementation is that when we have a
            // previous PathData which is validated and the remote matches that path, we
            // should schedule CONNECTION_CLOSE on that path. However currently we can not
            // schedule such a packet. We should also fix this some day. This makes us
            // vulnerable to an attacker faking a migration at the right time and then we'd
            // be unable to send the CONNECTION_CLOSE to the real remote.
            if self
                .paths
                .get(&path_id)
                .map(|p| p.data.validated && p.data.network_path == network_path)
                .unwrap_or(false)
            {
                self.connection_close_pending = true;
            }
        }
    }
4164
4165    fn process_decrypted_packet(
4166        &mut self,
4167        now: Instant,
4168        network_path: FourTuple,
4169        path_id: PathId,
4170        number: Option<u64>,
4171        packet: Packet,
4172        qlog: &mut QlogRecvPacket,
4173    ) -> Result<(), ConnectionError> {
4174        if !self.paths.contains_key(&path_id) {
4175            // There is a chance this is a server side, first (for this path) packet, which would
4176            // be a protocol violation. It's more likely, however, that this is a packet of a
4177            // pruned path
4178            trace!(%path_id, ?number, "discarding packet for unknown path");
4179            return Ok(());
4180        }
4181        let state = match self.state.as_type() {
4182            StateType::Established => {
4183                match packet.header.space() {
4184                    SpaceKind::Data => self.process_payload(
4185                        now,
4186                        network_path,
4187                        path_id,
4188                        number.unwrap(),
4189                        packet,
4190                        qlog,
4191                    )?,
4192                    _ if packet.header.has_frames() => {
4193                        self.process_early_payload(now, path_id, packet, qlog)?
4194                    }
4195                    _ => {
4196                        trace!("discarding unexpected pre-handshake packet");
4197                    }
4198                }
4199                return Ok(());
4200            }
4201            StateType::Closed => {
4202                for result in frame::Iter::new(packet.payload.freeze())? {
4203                    let frame = match result {
4204                        Ok(frame) => frame,
4205                        Err(err) => {
4206                            debug!("frame decoding error: {err:?}");
4207                            continue;
4208                        }
4209                    };
4210                    qlog.frame(&frame);
4211
4212                    if let Frame::Padding = frame {
4213                        continue;
4214                    };
4215
4216                    self.stats.frame_rx.record(frame.ty());
4217
4218                    if let Frame::Close(_error) = frame {
4219                        self.state.move_to_draining(None);
4220                        break;
4221                    }
4222                }
4223                return Ok(());
4224            }
4225            StateType::Draining | StateType::Drained => return Ok(()),
4226            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
4227        };
4228
4229        match packet.header {
4230            Header::Retry {
4231                src_cid: remote_cid,
4232                ..
4233            } => {
4234                debug_assert_eq!(path_id, PathId::ZERO);
4235                if self.side.is_server() {
4236                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
4237                }
4238
4239                let is_valid_retry = self
4240                    .remote_cids
4241                    .get(&path_id)
4242                    .map(|cids| cids.active())
4243                    .map(|orig_dst_cid| {
4244                        self.crypto_state.session.is_valid_retry(
4245                            orig_dst_cid,
4246                            &packet.header_data,
4247                            &packet.payload,
4248                        )
4249                    })
4250                    .unwrap_or_default();
4251                if self.total_authed_packets > 1
4252                            || packet.payload.len() <= 16 // token + 16 byte tag
4253                            || !is_valid_retry
4254                {
4255                    trace!("discarding invalid Retry");
4256                    // - After the client has received and processed an Initial or Retry
4257                    //   packet from the server, it MUST discard any subsequent Retry
4258                    //   packets that it receives.
4259                    // - A client MUST discard a Retry packet with a zero-length Retry Token
4260                    //   field.
4261                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
4262                    //   that cannot be validated
4263                    return Ok(());
4264                }
4265
4266                trace!("retrying with CID {}", remote_cid);
4267                let client_hello = state.client_hello.take().unwrap();
4268                self.retry_src_cid = Some(remote_cid);
4269                self.remote_cids
4270                    .get_mut(&path_id)
4271                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
4272                    .update_initial_cid(remote_cid);
4273                self.remote_handshake_cid = remote_cid;
4274
4275                let space = &mut self.spaces[SpaceId::Initial];
4276                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
4277                    self.on_packet_acked(now, PathId::ZERO, info);
4278                };
4279
4280                self.discard_space(now, SpaceKind::Initial); // Make sure we clean up after
4281                // any retransmitted Initials
4282                let crypto_space = &mut self.crypto_state.spaces[SpaceKind::Initial];
4283                crypto_space.keys = Some(
4284                    self.crypto_state
4285                        .session
4286                        .initial_keys(remote_cid, self.side.side()),
4287                );
4288                crypto_space.crypto_offset = client_hello.len() as u64;
4289
4290                let next_pn = self.spaces[SpaceId::Initial]
4291                    .for_path(path_id)
4292                    .next_packet_number;
4293                self.spaces[SpaceId::Initial] = {
4294                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
4295                    space.for_path(path_id).next_packet_number = next_pn;
4296                    space.pending.crypto.push_back(frame::Crypto {
4297                        offset: 0,
4298                        data: client_hello,
4299                    });
4300                    space
4301                };
4302
4303                // Retransmit all 0-RTT data
4304                let zero_rtt = mem::take(
4305                    &mut self.spaces[SpaceId::Data]
4306                        .for_path(PathId::ZERO)
4307                        .sent_packets,
4308                );
4309                for (_, info) in zero_rtt.into_iter() {
4310                    self.paths
4311                        .get_mut(&PathId::ZERO)
4312                        .unwrap()
4313                        .remove_in_flight(&info);
4314                    self.spaces[SpaceId::Data].pending |= info.retransmits;
4315                }
4316                self.streams.retransmit_all_for_0rtt();
4317
4318                let token_len = packet.payload.len() - 16;
4319                let ConnectionSide::Client { ref mut token, .. } = self.side else {
4320                    unreachable!("we already short-circuited if we're server");
4321                };
4322                *token = packet.payload.freeze().split_to(token_len);
4323
4324                self.state = State::handshake(state::Handshake {
4325                    expected_token: Bytes::new(),
4326                    remote_cid_set: false,
4327                    client_hello: None,
4328                    allow_server_migration: true,
4329                });
4330                Ok(())
4331            }
4332            Header::Long {
4333                ty: LongType::Handshake,
4334                src_cid: remote_cid,
4335                dst_cid: local_cid,
4336                ..
4337            } => {
4338                debug_assert_eq!(path_id, PathId::ZERO);
4339                if remote_cid != self.remote_handshake_cid {
4340                    debug!(
4341                        "discarding packet with mismatched remote CID: {} != {}",
4342                        self.remote_handshake_cid, remote_cid
4343                    );
4344                    return Ok(());
4345                }
4346                self.on_path_validated(path_id);
4347
4348                self.process_early_payload(now, path_id, packet, qlog)?;
4349                if self.state.is_closed() {
4350                    return Ok(());
4351                }
4352
4353                if self.crypto_state.session.is_handshaking() {
4354                    trace!("handshake ongoing");
4355                    return Ok(());
4356                }
4357
4358                if self.side.is_client() {
4359                    // Client-only because server params were set from the client's Initial
4360                    let params = self
4361                        .crypto_state
4362                        .session
4363                        .transport_parameters()?
4364                        .ok_or_else(|| {
4365                            TransportError::new(
4366                                TransportErrorCode::crypto(0x6d),
4367                                "transport parameters missing".to_owned(),
4368                            )
4369                        })?;
4370
4371                    if self.has_0rtt() {
4372                        if !self.crypto_state.session.early_data_accepted().unwrap() {
4373                            debug_assert!(self.side.is_client());
4374                            debug!("0-RTT rejected");
4375                            self.crypto_state.accepted_0rtt = false;
4376                            self.streams.zero_rtt_rejected();
4377
4378                            // Discard already-queued frames
4379                            self.spaces[SpaceId::Data].pending = Retransmits::default();
4380
4381                            // Discard 0-RTT packets
4382                            let sent_packets = mem::take(
4383                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
4384                            );
4385                            for (_, packet) in sent_packets.into_iter() {
4386                                self.paths
4387                                    .get_mut(&path_id)
4388                                    .unwrap()
4389                                    .remove_in_flight(&packet);
4390                            }
4391                        } else {
4392                            self.crypto_state.accepted_0rtt = true;
4393                            params.validate_resumption_from(&self.peer_params)?;
4394                        }
4395                    }
4396                    if let Some(token) = params.stateless_reset_token {
4397                        let remote = self.path_data(path_id).network_path.remote;
4398                        debug_assert!(!self.state.is_drained()); // requirement for endpoint events, checked above
4399                        self.endpoint_events
4400                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4401                    }
4402                    self.handle_peer_params(params, local_cid, remote_cid, now)?;
4403                    self.issue_first_cids(now);
4404                } else {
4405                    // Server-only
4406                    self.spaces[SpaceId::Data].pending.handshake_done = true;
4407                    self.discard_space(now, SpaceKind::Handshake);
4408                    self.events.push_back(Event::HandshakeConfirmed);
4409                    trace!("handshake confirmed");
4410                }
4411
4412                self.events.push_back(Event::Connected);
4413                self.state.move_to_established();
4414                trace!("established");
4415
4416                // Multipath can only be enabled after the state has reached Established.
4417                // So this can not happen any earlier.
4418                self.issue_first_path_cids(now);
4419                Ok(())
4420            }
4421            Header::Initial(InitialHeader {
4422                src_cid: remote_cid,
4423                dst_cid: local_cid,
4424                ..
4425            }) => {
4426                debug_assert_eq!(path_id, PathId::ZERO);
4427                if !state.remote_cid_set {
4428                    trace!("switching remote CID to {}", remote_cid);
4429                    let mut state = state.clone();
4430                    self.remote_cids
4431                        .get_mut(&path_id)
4432                        .expect("PathId::ZERO not yet abandoned")
4433                        .update_initial_cid(remote_cid);
4434                    self.remote_handshake_cid = remote_cid;
4435                    self.original_remote_cid = remote_cid;
4436                    state.remote_cid_set = true;
4437                    self.state.move_to_handshake(state);
4438                } else if remote_cid != self.remote_handshake_cid {
4439                    debug!(
4440                        "discarding packet with mismatched remote CID: {} != {}",
4441                        self.remote_handshake_cid, remote_cid
4442                    );
4443                    return Ok(());
4444                }
4445
4446                let starting_space = self.highest_space;
4447                self.process_early_payload(now, path_id, packet, qlog)?;
4448
4449                if self.side.is_server()
4450                    && starting_space == SpaceKind::Initial
4451                    && self.highest_space != SpaceKind::Initial
4452                {
4453                    let params = self
4454                        .crypto_state
4455                        .session
4456                        .transport_parameters()?
4457                        .ok_or_else(|| {
4458                            TransportError::new(
4459                                TransportErrorCode::crypto(0x6d),
4460                                "transport parameters missing".to_owned(),
4461                            )
4462                        })?;
4463                    self.handle_peer_params(params, local_cid, remote_cid, now)?;
4464                    self.issue_first_cids(now);
4465                    self.init_0rtt(now);
4466                }
4467                Ok(())
4468            }
4469            Header::Long {
4470                ty: LongType::ZeroRtt,
4471                ..
4472            } => {
4473                self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4474                Ok(())
4475            }
4476            Header::VersionNegotiate { .. } => {
4477                if self.total_authed_packets > 1 {
4478                    return Ok(());
4479                }
4480                let supported = packet
4481                    .payload
4482                    .chunks(4)
4483                    .any(|x| match <[u8; 4]>::try_from(x) {
4484                        Ok(version) => self.version == u32::from_be_bytes(version),
4485                        Err(_) => false,
4486                    });
4487                if supported {
4488                    return Ok(());
4489                }
4490                debug!("remote doesn't support our version");
4491                Err(ConnectionError::VersionMismatch)
4492            }
4493            Header::Short { .. } => unreachable!(
4494                "short packets received during handshake are discarded in handle_packet"
4495            ),
4496        }
4497    }
4498
4499    /// Process an Initial or Handshake packet payload
4500    fn process_early_payload(
4501        &mut self,
4502        now: Instant,
4503        path_id: PathId,
4504        packet: Packet,
4505        #[allow(unused)] qlog: &mut QlogRecvPacket,
4506    ) -> Result<(), TransportError> {
4507        debug_assert_ne!(packet.header.space(), SpaceKind::Data);
4508        debug_assert_eq!(path_id, PathId::ZERO);
4509        let payload_len = packet.payload.len();
4510        let mut ack_eliciting = false;
4511        for result in frame::Iter::new(packet.payload.freeze())? {
4512            let frame = result?;
4513            qlog.frame(&frame);
4514            let span = match frame {
4515                Frame::Padding => continue,
4516                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4517            };
4518
4519            self.stats.frame_rx.record(frame.ty());
4520
4521            let _guard = span.as_ref().map(|x| x.enter());
4522            ack_eliciting |= frame.is_ack_eliciting();
4523
4524            // Process frames
4525            if frame.is_1rtt() && packet.header.space() != SpaceKind::Data {
4526                return Err(TransportError::PROTOCOL_VIOLATION(
4527                    "illegal frame type in handshake",
4528                ));
4529            }
4530
4531            match frame {
4532                Frame::Padding | Frame::Ping => {}
4533                Frame::Crypto(frame) => {
4534                    self.read_crypto(packet.header.space().into(), &frame, payload_len)?;
4535                }
4536                Frame::Ack(ack) => {
4537                    self.on_ack_received(now, packet.header.space().into(), ack)?;
4538                }
4539                Frame::PathAck(ack) => {
4540                    span.as_ref()
4541                        .map(|span| span.record("path", tracing::field::display(&ack.path_id)));
4542                    self.on_path_ack_received(now, packet.header.space().into(), ack)?;
4543                }
4544                Frame::Close(reason) => {
4545                    self.state.move_to_draining(Some(reason.into()));
4546                    return Ok(());
4547                }
4548                _ => {
4549                    let mut err =
4550                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4551                    err.frame = frame::MaybeFrame::Known(frame.ty());
4552                    return Err(err);
4553                }
4554            }
4555        }
4556
4557        if ack_eliciting {
4558            // In the initial and handshake spaces, ACKs must be sent immediately
4559            self.spaces[packet.header.space()]
4560                .for_path(path_id)
4561                .pending_acks
4562                .set_immediate_ack_required();
4563        }
4564
4565        self.write_crypto();
4566        Ok(())
4567    }
4568
4569    /// Processes the packet payload, always in the data space.
4570    fn process_payload(
4571        &mut self,
4572        now: Instant,
4573        network_path: FourTuple,
4574        path_id: PathId,
4575        number: u64,
4576        packet: Packet,
4577        #[allow(unused)] qlog: &mut QlogRecvPacket,
4578    ) -> Result<(), TransportError> {
4579        let is_multipath_negotiated = self.is_multipath_negotiated();
4580        let payload = packet.payload.freeze();
4581        let mut is_probing_packet = true;
4582        let mut close = None;
4583        let payload_len = payload.len();
4584        let mut ack_eliciting = false;
4585        // if this packet triggers a path migration and includes a observed address frame, it's
4586        // stored here
4587        let mut migration_observed_addr = None;
4588        for result in frame::Iter::new(payload)? {
4589            let frame = result?;
4590            qlog.frame(&frame);
4591            let span = match frame {
4592                Frame::Padding => continue,
4593                _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4594            };
4595
4596            self.stats.frame_rx.record(frame.ty());
4597            // Crypto, Stream and Datagram frames are special cased in order no pollute
4598            // the log with payload data
4599            match &frame {
4600                Frame::Crypto(f) => {
4601                    trace!(offset = f.offset, len = f.data.len(), "got frame CRYPTO");
4602                }
4603                Frame::Stream(f) => {
4604                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got frame STREAM");
4605                }
4606                Frame::Datagram(f) => {
4607                    trace!(len = f.data.len(), "got frame DATAGRAM");
4608                }
4609                f => {
4610                    trace!("got frame {f}");
4611                }
4612            }
4613
4614            let _guard = span.enter();
4615            if packet.header.is_0rtt() {
4616                match frame {
4617                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4618                        return Err(TransportError::PROTOCOL_VIOLATION(
4619                            "illegal frame type in 0-RTT",
4620                        ));
4621                    }
4622                    _ => {
4623                        if frame.is_1rtt() {
4624                            return Err(TransportError::PROTOCOL_VIOLATION(
4625                                "illegal frame type in 0-RTT",
4626                            ));
4627                        }
4628                    }
4629                }
4630            }
4631            ack_eliciting |= frame.is_ack_eliciting();
4632
4633            // Check whether this could be a probing packet
4634            match frame {
4635                Frame::Padding
4636                | Frame::PathChallenge(_)
4637                | Frame::PathResponse(_)
4638                | Frame::NewConnectionId(_)
4639                | Frame::ObservedAddr(_) => {}
4640                _ => {
4641                    is_probing_packet = false;
4642                }
4643            }
4644
4645            match frame {
4646                Frame::Crypto(frame) => {
4647                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4648                }
4649                Frame::Stream(frame) => {
4650                    if self.streams.received(frame, payload_len)?.should_transmit() {
4651                        self.spaces[SpaceId::Data].pending.max_data = true;
4652                    }
4653                }
4654                Frame::Ack(ack) => {
4655                    self.on_ack_received(now, SpaceId::Data, ack)?;
4656                }
4657                Frame::PathAck(ack) => {
4658                    span.record("path", tracing::field::display(&ack.path_id));
4659                    self.on_path_ack_received(now, SpaceId::Data, ack)?;
4660                }
4661                Frame::Padding | Frame::Ping => {}
4662                Frame::Close(reason) => {
4663                    close = Some(reason);
4664                }
4665                Frame::PathChallenge(challenge) => {
4666                    let path = &mut self
4667                        .path_mut(path_id)
4668                        .expect("payload is processed only after the path becomes known");
4669                    path.path_responses.push(number, challenge.0, network_path);
4670                    // At this point, update_network_path_or_discard was already called, so
4671                    // we don't need to be lenient about `local_ip` possibly mis-matching.
4672                    if network_path == path.network_path {
4673                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
4674                        // attack. Send a non-probing packet to recover the active path.
4675                        // TODO(flub): No longer true! We now path_challege also to validate
4676                        //    the path if the path is new, without an RFC9000-style
4677                        //    migration involved. This means we add in an extra
4678                        //    IMMEDIATE_ACK on some challenges. It isn't really wrong to do
4679                        //    so, but it still is something untidy. We should instead
4680                        //    suppress this when we know the remote is still validating the
4681                        //    path.
4682                        match self.peer_supports_ack_frequency() {
4683                            true => self.immediate_ack(path_id),
4684                            false => {
4685                                self.ping_path(path_id).ok();
4686                            }
4687                        }
4688                    }
4689                }
4690                Frame::PathResponse(response) => {
4691                    let path = self
4692                        .paths
4693                        .get_mut(&path_id)
4694                        .expect("payload is processed only after the path becomes known");
4695
4696                    use PathTimer::*;
4697                    use paths::OnPathResponseReceived::*;
4698                    match path
4699                        .data
4700                        .on_path_response_received(now, response.0, network_path)
4701                    {
4702                        OnPath { was_open } => {
4703                            let qlog = self.qlog.with_time(now);
4704
4705                            self.timers
4706                                .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4707                            self.timers
4708                                .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4709
4710                            let next_challenge = path
4711                                .data
4712                                .earliest_on_path_expiring_challenge()
4713                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4714                            self.timers.set_or_stop(
4715                                Timer::PerPath(path_id, PathChallengeLost),
4716                                next_challenge,
4717                                qlog,
4718                            );
4719
4720                            if !was_open {
4721                                if is_multipath_negotiated {
4722                                    self.events
4723                                        .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4724                                }
4725                                if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4726                                {
4727                                    self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4728                                        id: path_id,
4729                                        addr: observed.socket_addr(),
4730                                    }));
4731                                }
4732                            }
4733                            if let Some((_, ref mut prev)) = path.prev {
4734                                prev.reset_on_path_challenges();
4735                            }
4736                        }
4737                        OffPath => {
4738                            debug!(%response, "Valid response to off-path PATH_CHALLENGE");
4739                        }
4740                        Ignored {
4741                            sent_on,
4742                            current_path,
4743                        } => {
4744                            debug!(%sent_on, %current_path, %response, "ignoring valid PATH_RESPONSE")
4745                        }
4746                        Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4747                    }
4748                }
4749                Frame::MaxData(frame::MaxData(bytes)) => {
4750                    self.streams.received_max_data(bytes);
4751                }
4752                Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
4753                    self.streams.received_max_stream_data(id, offset)?;
4754                }
4755                Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
4756                    self.streams.received_max_streams(dir, count)?;
4757                }
4758                Frame::ResetStream(frame) => {
4759                    if self.streams.received_reset(frame)?.should_transmit() {
4760                        self.spaces[SpaceId::Data].pending.max_data = true;
4761                    }
4762                }
4763                Frame::DataBlocked(DataBlocked(offset)) => {
4764                    debug!(offset, "peer claims to be blocked at connection level");
4765                }
4766                Frame::StreamDataBlocked(StreamDataBlocked { id, offset }) => {
4767                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4768                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4769                        return Err(TransportError::STREAM_STATE_ERROR(
4770                            "STREAM_DATA_BLOCKED on send-only stream",
4771                        ));
4772                    }
4773                    debug!(
4774                        stream = %id,
4775                        offset, "peer claims to be blocked at stream level"
4776                    );
4777                }
4778                Frame::StreamsBlocked(StreamsBlocked { dir, limit }) => {
4779                    if limit > MAX_STREAM_COUNT {
4780                        return Err(TransportError::FRAME_ENCODING_ERROR(
4781                            "unrepresentable stream limit",
4782                        ));
4783                    }
4784                    debug!(
4785                        "peer claims to be blocked opening more than {} {} streams",
4786                        limit, dir
4787                    );
4788                }
4789                Frame::StopSending(frame::StopSending { id, error_code }) => {
4790                    if id.initiator() != self.side.side() {
4791                        if id.dir() == Dir::Uni {
4792                            debug!("got STOP_SENDING on recv-only {}", id);
4793                            return Err(TransportError::STREAM_STATE_ERROR(
4794                                "STOP_SENDING on recv-only stream",
4795                            ));
4796                        }
4797                    } else if self.streams.is_local_unopened(id) {
4798                        return Err(TransportError::STREAM_STATE_ERROR(
4799                            "STOP_SENDING on unopened stream",
4800                        ));
4801                    }
4802                    self.streams.received_stop_sending(id, error_code);
4803                }
4804                Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4805                    if let Some(ref path_id) = path_id {
4806                        span.record("path", tracing::field::display(&path_id));
4807                    }
4808                    let path_id = path_id.unwrap_or_default();
4809                    match self.local_cid_state.get_mut(&path_id) {
4810                        None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4811                        Some(cid_state) => {
4812                            let allow_more_cids = cid_state
4813                                .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4814
4815                            // If the path has closed, we do not issue more CIDs for this path
4816                            // For details see  https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.2.2
4817                            // > an endpoint SHOULD provide new connection IDs for that path, if still open, using PATH_NEW_CONNECTION_ID frames.
4818                            let has_path = !self.abandoned_paths.contains(&path_id);
4819                            let allow_more_cids = allow_more_cids && has_path;
4820
4821                            debug_assert!(!self.state.is_drained()); // required for adding endpoint events, process_payload is never called for drained connections
4822                            self.endpoint_events
4823                                .push_back(EndpointEventInner::RetireConnectionId(
4824                                    now,
4825                                    path_id,
4826                                    sequence,
4827                                    allow_more_cids,
4828                                ));
4829                        }
4830                    }
4831                }
4832                Frame::NewConnectionId(frame) => {
4833                    let path_id = if let Some(path_id) = frame.path_id {
4834                        if !self.is_multipath_negotiated() {
4835                            return Err(TransportError::PROTOCOL_VIOLATION(
4836                                "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4837                            ));
4838                        }
4839                        if path_id > self.local_max_path_id {
4840                            return Err(TransportError::PROTOCOL_VIOLATION(
4841                                "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4842                            ));
4843                        }
4844                        path_id
4845                    } else {
4846                        PathId::ZERO
4847                    };
4848
4849                    if let Some(ref path_id) = frame.path_id {
4850                        span.record("path", tracing::field::display(&path_id));
4851                    }
4852
4853                    if self.abandoned_paths.contains(&path_id) {
4854                        trace!("ignoring issued CID for abandoned path");
4855                        continue;
4856                    }
4857                    let remote_cids = self
4858                        .remote_cids
4859                        .entry(path_id)
4860                        .or_insert_with(|| CidQueue::new(frame.id));
4861                    if remote_cids.active().is_empty() {
4862                        return Err(TransportError::PROTOCOL_VIOLATION(
4863                            "NEW_CONNECTION_ID when CIDs aren't in use",
4864                        ));
4865                    }
4866                    if frame.retire_prior_to > frame.sequence {
4867                        return Err(TransportError::PROTOCOL_VIOLATION(
4868                            "NEW_CONNECTION_ID retiring unissued CIDs",
4869                        ));
4870                    }
4871
4872                    use crate::cid_queue::InsertError;
4873                    match remote_cids.insert(frame) {
4874                        Ok(None) if self.path(path_id).is_none() => {
4875                            // if this gives us CIDs to open a new path and a nat traversal attempt
4876                            // is underway we could try to probe a pending remote
4877                            self.continue_nat_traversal_round(now);
4878                        }
4879                        Ok(None) => {}
4880                        Ok(Some((retired, reset_token))) => {
4881                            let pending_retired =
4882                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
4883                            /// Ensure `pending_retired` cannot grow without bound. Limit is
4884                            /// somewhat arbitrary but very permissive.
4885                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4886                            // We don't bother counting in-flight frames because those are bounded
4887                            // by congestion control.
4888                            if (pending_retired.len() as u64)
4889                                .saturating_add(retired.end.saturating_sub(retired.start))
4890                                > MAX_PENDING_RETIRED_CIDS
4891                            {
4892                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4893                                    "queued too many retired CIDs",
4894                                ));
4895                            }
4896                            pending_retired.extend(retired.map(|seq| (path_id, seq)));
4897                            self.set_reset_token(path_id, network_path.remote, reset_token);
4898                        }
4899                        Err(InsertError::ExceedsLimit) => {
4900                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4901                        }
4902                        Err(InsertError::Retired) => {
4903                            trace!("discarding already-retired");
4904                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
4905                            // range of connection IDs larger than the active connection ID limit
4906                            // was retired all at once via retire_prior_to.
4907                            self.spaces[SpaceId::Data]
4908                                .pending
4909                                .retire_cids
4910                                .push((path_id, frame.sequence));
4911                            continue;
4912                        }
4913                    };
4914
4915                    if self.side.is_server()
4916                        && path_id == PathId::ZERO
4917                        && self
4918                            .remote_cids
4919                            .get(&PathId::ZERO)
4920                            .map(|cids| cids.active_seq() == 0)
4921                            .unwrap_or_default()
4922                    {
4923                        // We're a server still using the initial remote CID for the client, so
4924                        // let's switch immediately to enable clientside stateless resets.
4925                        self.update_remote_cid(PathId::ZERO);
4926                    }
4927                }
4928                Frame::NewToken(NewToken { token }) => {
4929                    let ConnectionSide::Client {
4930                        token_store,
4931                        server_name,
4932                        ..
4933                    } = &self.side
4934                    else {
4935                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4936                    };
4937                    if token.is_empty() {
4938                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4939                    }
4940                    trace!("got new token");
4941                    token_store.insert(server_name, token);
4942                }
4943                Frame::Datagram(datagram) => {
4944                    if self
4945                        .datagrams
4946                        .received(datagram, &self.config.datagram_receive_buffer_size)?
4947                    {
4948                        self.events.push_back(Event::DatagramReceived);
4949                    }
4950                }
4951                Frame::AckFrequency(ack_frequency) => {
4952                    // This frame can only be sent in the Data space
4953
4954                    if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4955                        // The AckFrequency frame is stale (we have already received a more
4956                        // recent one)
4957                        continue;
4958                    }
4959
4960                    // Update the params for all of our paths
4961                    for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4962                        space.pending_acks.set_ack_frequency_params(&ack_frequency);
4963
4964                        // Our `max_ack_delay` has been updated, so we may need to adjust
4965                        // its associated timeout
4966                        if let Some(timeout) = space
4967                            .pending_acks
4968                            .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4969                        {
4970                            self.timers.set(
4971                                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4972                                timeout,
4973                                self.qlog.with_time(now),
4974                            );
4975                        }
4976                    }
4977                }
4978                Frame::ImmediateAck => {
4979                    // This frame can only be sent in the Data space
4980                    for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4981                        pns.pending_acks.set_immediate_ack_required();
4982                    }
4983                }
4984                Frame::HandshakeDone => {
4985                    if self.side.is_server() {
4986                        return Err(TransportError::PROTOCOL_VIOLATION(
4987                            "client sent HANDSHAKE_DONE",
4988                        ));
4989                    }
4990                    if self.crypto_state.has_keys(EncryptionLevel::Handshake) {
4991                        self.discard_space(now, SpaceKind::Handshake);
4992                    }
4993                    self.events.push_back(Event::HandshakeConfirmed);
4994                    trace!("handshake confirmed");
4995                }
4996                Frame::ObservedAddr(observed) => {
4997                    // check if params allows the peer to send report and this node to receive it
4998                    trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4999                    if !self
5000                        .peer_params
5001                        .address_discovery_role
5002                        .should_report(&self.config.address_discovery_role)
5003                    {
5004                        return Err(TransportError::PROTOCOL_VIOLATION(
5005                            "received OBSERVED_ADDRESS frame when not negotiated",
5006                        ));
5007                    }
5008                    // must only be sent in data space
5009                    if packet.header.space() != SpaceKind::Data {
5010                        return Err(TransportError::PROTOCOL_VIOLATION(
5011                            "OBSERVED_ADDRESS frame outside data space",
5012                        ));
5013                    }
5014
5015                    let path = self.path_data_mut(path_id);
5016                    if network_path == path.network_path {
5017                        if let Some(updated) = path.update_observed_addr_report(observed)
5018                            && path.open_status == paths::OpenStatus::Informed
5019                        {
5020                            self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5021                                id: path_id,
5022                                addr: updated,
5023                            }));
5024                            // otherwise the event is reported when the path is deemed open
5025                        }
5026                    } else {
5027                        // include in migration
5028                        migration_observed_addr = Some(observed)
5029                    }
5030                }
5031                Frame::PathAbandon(frame::PathAbandon {
5032                    path_id,
5033                    error_code,
5034                }) => {
5035                    span.record("path", tracing::field::display(&path_id));
5036                    match self.close_path_inner(
5037                        now,
5038                        path_id,
5039                        PathAbandonReason::RemoteAbandoned {
5040                            error_code: error_code.into(),
5041                        },
5042                    ) {
5043                        Ok(()) => {
5044                            trace!("peer abandoned path");
5045                        }
5046                        Err(ClosePathError::LastOpenPath) => {
5047                            trace!("peer abandoned last path, closing connection");
5048                            return Err(TransportError::NO_VIABLE_PATH(
5049                                "last path abandoned by peer",
5050                            ));
5051                        }
5052                        Err(ClosePathError::ClosedPath) => {
5053                            trace!("peer abandoned already closed path");
5054                        }
5055                        Err(ClosePathError::MultipathNotNegotiated) => {
5056                            return Err(TransportError::PROTOCOL_VIOLATION(
5057                                "received PATH_ABANDON frame when multipath was not negotiated",
5058                            ));
5059                        }
5060                    };
5061
5062                    // Start draining the path if it still exists and hasn't started draining yet.
5063                    if let Some(path) = self.paths.get_mut(&path_id)
5064                        && !mem::replace(&mut path.data.draining, true)
5065                    {
5066                        let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5067                        let pto = path.data.rtt.pto_base() + ack_delay;
5068                        self.timers.set(
5069                            Timer::PerPath(path_id, PathTimer::DiscardPath),
5070                            now + 3 * pto,
5071                            self.qlog.with_time(now),
5072                        );
5073
5074                        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
5075                    }
5076                }
5077                Frame::PathStatusAvailable(info) => {
5078                    span.record("path", tracing::field::display(&info.path_id));
5079                    if self.is_multipath_negotiated() {
5080                        self.on_path_status(
5081                            info.path_id,
5082                            PathStatus::Available,
5083                            info.status_seq_no,
5084                        );
5085                    } else {
5086                        return Err(TransportError::PROTOCOL_VIOLATION(
5087                            "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
5088                        ));
5089                    }
5090                }
5091                Frame::PathStatusBackup(info) => {
5092                    span.record("path", tracing::field::display(&info.path_id));
5093                    if self.is_multipath_negotiated() {
5094                        self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
5095                    } else {
5096                        return Err(TransportError::PROTOCOL_VIOLATION(
5097                            "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
5098                        ));
5099                    }
5100                }
5101                Frame::MaxPathId(frame::MaxPathId(path_id)) => {
5102                    span.record("path", tracing::field::display(&path_id));
5103                    if !self.is_multipath_negotiated() {
5104                        return Err(TransportError::PROTOCOL_VIOLATION(
5105                            "received MAX_PATH_ID frame when multipath was not negotiated",
5106                        ));
5107                    }
5108                    // frames that do not increase the path id are ignored
5109                    if path_id > self.remote_max_path_id {
5110                        self.remote_max_path_id = path_id;
5111                        self.issue_first_path_cids(now);
5112                        while let Some(true) = self.continue_nat_traversal_round(now) {}
5113                    }
5114                }
5115                Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
5116                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
5117                    // be treated as a connection error of type PROTOCOL_VIOLATION.
5118                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
5119                    if self.is_multipath_negotiated() {
5120                        if max_path_id > self.local_max_path_id {
5121                            return Err(TransportError::PROTOCOL_VIOLATION(
5122                                "PATHS_BLOCKED maximum path identifier was larger than local maximum",
5123                            ));
5124                        }
5125                        debug!("received PATHS_BLOCKED({:?})", max_path_id);
5126                        // TODO(@divma): ensure max concurrent paths
5127                    } else {
5128                        return Err(TransportError::PROTOCOL_VIOLATION(
5129                            "received PATHS_BLOCKED frame when not multipath was not negotiated",
5130                        ));
5131                    }
5132                }
5133                Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
5134                    // Nothing to do.  This is recorded in the frame stats, but otherwise we
5135                    // always issue all CIDs we're allowed to issue, so either this is an
5136                    // impatient peer or a bug on our side.
5137
5138                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
5139                    // be treated as a connection error of type PROTOCOL_VIOLATION.
5140                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
5141                    if self.is_multipath_negotiated() {
5142                        if path_id > self.local_max_path_id {
5143                            return Err(TransportError::PROTOCOL_VIOLATION(
5144                                "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
5145                            ));
5146                        }
5147                        if next_seq.0
5148                            > self
5149                                .local_cid_state
5150                                .get(&path_id)
5151                                .map(|cid_state| cid_state.active_seq().1 + 1)
5152                                .unwrap_or_default()
5153                        {
5154                            return Err(TransportError::PROTOCOL_VIOLATION(
5155                                "PATH_CIDS_BLOCKED next sequence number larger than in local state",
5156                            ));
5157                        }
5158                        debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
5159                    } else {
5160                        return Err(TransportError::PROTOCOL_VIOLATION(
5161                            "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
5162                        ));
5163                    }
5164                }
5165                Frame::AddAddress(addr) => {
5166                    let client_state = match self.n0_nat_traversal.client_side_mut() {
5167                        Ok(state) => state,
5168                        Err(err) => {
5169                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
5170                                "Nat traversal(ADD_ADDRESS): {err}"
5171                            )));
5172                        }
5173                    };
5174
5175                    if !client_state.check_remote_address(&addr) {
5176                        // if the address is not valid we flag it, but update anyway
5177                        warn!(?addr, "server sent illegal ADD_ADDRESS frame");
5178                    }
5179
5180                    match client_state.add_remote_address(addr) {
5181                        Ok(maybe_added) => {
5182                            if let Some(added) = maybe_added {
5183                                self.events.push_back(Event::NatTraversal(
5184                                    n0_nat_traversal::Event::AddressAdded(added),
5185                                ));
5186                            }
5187                        }
5188                        Err(e) => {
5189                            warn!(%e, "failed to add remote address")
5190                        }
5191                    }
5192                }
5193                Frame::RemoveAddress(addr) => {
5194                    let client_state = match self.n0_nat_traversal.client_side_mut() {
5195                        Ok(state) => state,
5196                        Err(err) => {
5197                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
5198                                "Nat traversal(REMOVE_ADDRESS): {err}"
5199                            )));
5200                        }
5201                    };
5202                    if let Some(removed_addr) = client_state.remove_remote_address(addr) {
5203                        self.events.push_back(Event::NatTraversal(
5204                            n0_nat_traversal::Event::AddressRemoved(removed_addr),
5205                        ));
5206                    }
5207                }
5208                Frame::ReachOut(reach_out) => {
5209                    let ipv6 = self.is_ipv6();
5210                    let server_state = match self.n0_nat_traversal.server_side_mut() {
5211                        Ok(state) => state,
5212                        Err(err) => {
5213                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
5214                                "Nat traversal(REACH_OUT): {err}"
5215                            )));
5216                        }
5217                    };
5218
5219                    if let Err(err) = server_state.handle_reach_out(reach_out, ipv6) {
5220                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
5221                            "Nat traversal(REACH_OUT): {err}"
5222                        )));
5223                    }
5224                }
5225            }
5226        }
5227
5228        let space = self.spaces[SpaceId::Data].for_path(path_id);
5229        if space
5230            .pending_acks
5231            .packet_received(now, number, ack_eliciting, &space.dedup)
5232        {
5233            if self.abandoned_paths.contains(&path_id) {
5234                // § 3.4.3 QUIC-MULTIPATH: promptly send ACKs for packets received from
5235                // abandoned paths.
5236                space.pending_acks.set_immediate_ack_required();
5237            } else {
5238                self.timers.set(
5239                    Timer::PerPath(path_id, PathTimer::MaxAckDelay),
5240                    now + self.ack_frequency.max_ack_delay,
5241                    self.qlog.with_time(now),
5242                );
5243            }
5244        }
5245
5246        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
5247        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
5248        // are only freed, and hence only issue credit, once the application has been notified
5249        // during a read on the stream.
5250        let pending = &mut self.spaces[SpaceId::Data].pending;
5251        self.streams.queue_max_stream_id(pending);
5252
5253        if let Some(reason) = close {
5254            self.state.move_to_draining(Some(reason.into()));
5255            self.connection_close_pending = true;
5256        }
5257
5258        if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
5259            && !is_probing_packet
5260            && network_path != self.path_data(path_id).network_path
5261        {
5262            let ConnectionSide::Server { ref server_config } = self.side else {
5263                panic!("packets from unknown remote should be dropped by clients");
5264            };
5265            debug_assert!(
5266                server_config.migration,
5267                "migration-initiating packets should have been dropped immediately"
5268            );
5269            self.migrate(path_id, now, network_path, migration_observed_addr);
5270            // Break linkability, if possible
5271            self.update_remote_cid(path_id);
5272            self.spin = false;
5273        }
5274
5275        Ok(())
5276    }
5277
5278    fn migrate(
5279        &mut self,
5280        path_id: PathId,
5281        now: Instant,
5282        network_path: FourTuple,
5283        observed_addr: Option<ObservedAddr>,
5284    ) {
5285        trace!(%network_path, %path_id, "migration initiated");
5286        self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
5287        // TODO(@divma): conditions for path migration in multipath are very specific, check them
5288        // again to prevent path migrations that should actually create a new path
5289
5290        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
5291        // Note that the congestion window will not grow until validation terminates. Helps mitigate
5292        // amplification attacks performed by spoofing source addresses.
5293        let prev_pto = self.pto(SpaceKind::Data, path_id);
5294        let path = self.paths.get_mut(&path_id).expect("known path");
5295        let mut new_path_data = if network_path.remote.is_ipv4()
5296            && network_path.remote.ip() == path.data.network_path.remote.ip()
5297        {
5298            PathData::from_previous(network_path, &path.data, self.path_generation_counter, now)
5299        } else {
5300            let peer_max_udp_payload_size =
5301                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
5302                    .unwrap_or(u16::MAX);
5303            PathData::new(
5304                network_path,
5305                self.allow_mtud,
5306                Some(peer_max_udp_payload_size),
5307                self.path_generation_counter,
5308                now,
5309                &self.config,
5310            )
5311        };
5312        new_path_data.last_observed_addr_report = path.data.last_observed_addr_report.clone();
5313        if let Some(report) = observed_addr
5314            && let Some(updated) = new_path_data.update_observed_addr_report(report)
5315        {
5316            tracing::info!("adding observed addr event from migration");
5317            self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5318                id: path_id,
5319                addr: updated,
5320            }));
5321        }
5322        new_path_data.pending_on_path_challenge = true;
5323
5324        let mut prev_path_data = mem::replace(&mut path.data, new_path_data);
5325
5326        // Only store this as previous path if it was validated. For all we know there could
5327        // already be a previous path stored which might have been validated in the past,
5328        // which is more valuable than one that's not yet validated.
5329        //
5330        // With multipath it is possible that there are no remote CIDs for the path ID
5331        // yet. In this case we would never have sent on this path yet and would not be able
5332        // to send a PATH_CHALLENGE either, which is currently a fire-and-forget affair
5333        // anyway. So don't store such a path either.
5334        if !prev_path_data.validated
5335            && let Some(cid) = self.remote_cids.get(&path_id).map(CidQueue::active)
5336        {
5337            prev_path_data.pending_on_path_challenge = true;
5338            // We haven't updated the remote CID yet, this captures the remote CID we were using on
5339            // the previous path.
5340            path.prev = Some((cid, prev_path_data));
5341        }
5342
5343        // We need to re-assign the correct remote to this path in qlog
5344        self.qlog.emit_tuple_assigned(path_id, network_path, now);
5345
5346        self.timers.set(
5347            Timer::PerPath(path_id, PathTimer::PathValidation),
5348            now + 3 * cmp::max(self.pto(SpaceKind::Data, path_id), prev_pto),
5349            self.qlog.with_time(now),
5350        );
5351    }
5352
5353    /// Handle a change in the local address, i.e. an active migration
5354    ///
5355    /// In the general (non-multipath) case, paths will perform a RFC9000 migration and be pinged
5356    /// for a liveness check. This is the behaviour of a path assumed to be recoverable, even if
5357    /// this is not the case.
5358    ///
5359    /// Clients in a connection in which multipath has been negotiated should migrate paths to new
5360    /// [`PathId`]s. For paths that are known to be non-recoverable can be migrated to a new
5361    /// [`PathId`] by closing the current path, and opening a new one to the same remote. Treating
5362    /// paths as non recoverable when necessary accelerates connectivity re-establishment, or might
5363    /// allow it altogether.
5364    ///
5365    /// The optional `hint` allows callers to indicate when paths are non-recoverable and should be
5366    /// migrated to new a [`PathId`].
5367    // NOTE: only clients are allowed to migrate, but generally dealing with RFC9000 migrations is
5368    // lacking <https://github.com/n0-computer/noq/issues/364>
5369    pub fn handle_network_change(&mut self, hint: Option<&dyn NetworkChangeHint>, now: Instant) {
5370        debug!("network changed");
5371        if self.state.is_drained() {
5372            return;
5373        }
5374        if self.highest_space < SpaceKind::Data {
5375            for path in self.paths.values_mut() {
5376                // Clear the local address for it to be obtained from the socket again.
5377                path.data.network_path.local_ip = None;
5378            }
5379
5380            self.update_remote_cid(PathId::ZERO);
5381            self.ping();
5382
5383            return;
5384        }
5385
5386        // Paths that can't recover so a new path should be open instead. If multipath is not
5387        // negotiated, this will be empty.
5388        let mut non_recoverable_paths = Vec::default();
5389        let mut recoverable_paths = Vec::default();
5390        let mut open_paths = 0;
5391
5392        let is_multipath_negotiated = self.is_multipath_negotiated();
5393        let is_client = self.side().is_client();
5394        let immediate_ack_allowed = self.peer_supports_ack_frequency();
5395
5396        for (path_id, path) in self.paths.iter_mut() {
5397            if self.abandoned_paths.contains(path_id) {
5398                continue;
5399            }
5400            open_paths += 1;
5401
5402            // Clear the local address for it to be obtained from the socket again. This applies to
5403            // all paths, regardless of being considered recoverable or not
5404            path.data.network_path.local_ip = None;
5405
5406            let network_path = path.data.network_path;
5407            let remote = network_path.remote;
5408
5409            // Without multipath, the connection tries to recover the single path, whereas with
5410            // multipath, even in a single-path scenario, we attempt to migrate the path to a new
5411            // PathId.
5412            let attempt_to_recover = if is_multipath_negotiated {
5413                if is_client {
5414                    hint.map(|h| h.is_path_recoverable(*path_id, network_path))
5415                        .unwrap_or(false)
5416                } else {
5417                    // Servers should have stable addresses so this scenario is generally discouraged.
5418                    // There is no way to prevent this, so the best hope is to attempt to recover the
5419                    // path
5420                    true
5421                }
5422            } else {
5423                // In the non multipath case, we try to recover the single active path
5424                true
5425            };
5426
5427            if attempt_to_recover {
5428                recoverable_paths.push((*path_id, remote));
5429            } else {
5430                non_recoverable_paths.push((*path_id, remote, path.data.local_status()))
5431            }
5432        }
5433
5434        /* NON RECOVERABLE PATHS */
5435        // This are handled first, so that in case the treatment intended for these fails, we can
5436        // go the recoverable route instead.
5437
5438        // Decide if we need to close first or open first in the multipath case.
5439        // - Opening first has a higher risk of getting limited by the negotiated MAX_PATH_ID.
5440        // - Closing first risks this being the only open path.
5441        // We prefer closing paths first unless we identify this is the last open path.
5442        let open_first = open_paths == non_recoverable_paths.len();
5443
5444        for (path_id, remote, status) in non_recoverable_paths.into_iter() {
5445            let network_path = FourTuple {
5446                remote,
5447                local_ip: None, /* allow the local ip to be discovered */
5448            };
5449
5450            if open_first && let Err(e) = self.open_path(network_path, status, now) {
5451                debug!(%e,"Failed to open new path for network change");
5452                // if this fails, let the path try to recover itself
5453                recoverable_paths.push((path_id, remote));
5454                continue;
5455            }
5456
5457            if let Err(e) =
5458                self.close_path_inner(now, path_id, PathAbandonReason::UnusableAfterNetworkChange)
5459            {
5460                debug!(%e,"Failed to close unrecoverable path after network change");
5461                recoverable_paths.push((path_id, remote));
5462                continue;
5463            }
5464
5465            if !open_first && let Err(e) = self.open_path(network_path, status, now) {
5466                // Path has already been closed if we got here. Since the path was not recoverable,
5467                // this might be desirable in any case, because other paths exist (!open_first) and
5468                // this was is considered non recoverable
5469                debug!(%e,"Failed to open new path for network change");
5470            }
5471        }
5472
5473        /* RECOVERABLE PATHS */
5474
5475        for (path_id, remote) in recoverable_paths.into_iter() {
5476            let space = &mut self.spaces[SpaceId::Data];
5477
5478            // Schedule a Ping for a liveness check.
5479            if let Some(path_space) = space.number_spaces.get_mut(&path_id) {
5480                path_space.ping_pending = true;
5481
5482                if immediate_ack_allowed {
5483                    path_space.immediate_ack_pending = true;
5484                }
5485            }
5486
5487            let Some((reset_token, retired)) =
5488                self.remote_cids.get_mut(&path_id).and_then(CidQueue::next)
5489            else {
5490                continue;
5491            };
5492
5493            // Retire the current remote CID and any CIDs we had to skip.
5494            space
5495                .pending
5496                .retire_cids
5497                .extend(retired.map(|seq| (path_id, seq)));
5498
5499            debug_assert!(!self.state.is_drained()); // required for endpoint_events, checked above
5500            self.endpoint_events
5501                .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5502        }
5503    }
5504
5505    /// Switch to a previously unused remote connection ID, if possible
5506    fn update_remote_cid(&mut self, path_id: PathId) {
5507        let Some((reset_token, retired)) = self
5508            .remote_cids
5509            .get_mut(&path_id)
5510            .and_then(|cids| cids.next())
5511        else {
5512            return;
5513        };
5514
5515        // Retire the current remote CID and any CIDs we had to skip.
5516        self.spaces[SpaceId::Data]
5517            .pending
5518            .retire_cids
5519            .extend(retired.map(|seq| (path_id, seq)));
5520        let remote = self.path_data(path_id).network_path.remote;
5521        self.set_reset_token(path_id, remote, reset_token);
5522    }
5523
5524    /// Sends this reset token to the endpoint
5525    ///
5526    /// The endpoint needs to know the reset tokens issued by the peer, so that if the peer
5527    /// sends a reset token it knows to route it to this connection. See RFC 9000 section
5528    /// 10.3. Stateless Reset.
5529    ///
5530    /// Reset tokens are different for each path, the endpoint identifies paths by peer
5531    /// socket address however, not by path ID.
5532    fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5533        debug_assert!(!self.state.is_drained()); // required for endpoint events, set_reset_token is never called for drained connections
5534        self.endpoint_events
5535            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5536
5537        // During the handshake the server sends a reset token in the transport
5538        // parameters. When we are the client and we receive the reset token during the
5539        // handshake we want this to affect our peer transport parameters.
5540        // TODO(flub): Pretty sure this is pointless, the entire params is overwritten
5541        //    shortly after this was called.  And then the params don't have this anymore.
5542        if path_id == PathId::ZERO {
5543            self.peer_params.stateless_reset_token = Some(reset_token);
5544        }
5545    }
5546
5547    /// Issue an initial set of connection IDs to the peer upon connection
5548    fn issue_first_cids(&mut self, now: Instant) {
5549        if self
5550            .local_cid_state
5551            .get(&PathId::ZERO)
5552            .expect("PathId::ZERO exists when the connection is created")
5553            .cid_len()
5554            == 0
5555        {
5556            return;
5557        }
5558
5559        // Subtract 1 to account for the CID we supplied while handshaking
5560        let mut n = self.peer_params.issue_cids_limit() - 1;
5561        if let ConnectionSide::Server { server_config } = &self.side
5562            && server_config.has_preferred_address()
5563        {
5564            // We also sent a CID in the transport parameters
5565            n -= 1;
5566        }
5567        debug_assert!(!self.state.is_drained()); // requirement for endpoint_events
5568        self.endpoint_events
5569            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5570    }
5571
5572    /// Issues an initial set of CIDs for paths that have not yet had any CIDs issued
5573    ///
5574    /// Later CIDs are issued when CIDs expire or are retired by the peer.
5575    fn issue_first_path_cids(&mut self, now: Instant) {
5576        if let Some(max_path_id) = self.max_path_id() {
5577            let mut path_id = self.max_path_id_with_cids.next();
5578            while path_id <= max_path_id {
5579                self.endpoint_events
5580                    .push_back(EndpointEventInner::NeedIdentifiers(
5581                        path_id,
5582                        now,
5583                        self.peer_params.issue_cids_limit(),
5584                    ));
5585                path_id = path_id.next();
5586            }
5587            self.max_path_id_with_cids = max_path_id;
5588        }
5589    }
5590
    /// Populates a packet with frames
    ///
    /// This tries to fit as many frames as possible into the packet being assembled by
    /// `builder` for `space_id` on `path_id`. Frame kinds are considered in a fixed order.
    /// Each kind is only written when it is pending and enough space remains in the packet.
    ///
    /// `scheduling_info` restricts what may be written on this path:
    /// - when `is_abandoned` is set, nearly everything is skipped. The only frames still
    ///   considered are this path's own PATH_ABANDON (if `may_self_abandon` is also set) and
    ///   queued PATH_ABANDON frames for any path (if `may_send_data` is set);
    /// - when `may_send_data` is unset, only PING, IMMEDIATE_ACK, PATH_CHALLENGE and
    ///   PATH_RESPONSE (each possibly accompanied by OBSERVED_ADDR) and this path's own
    ///   PATH_ABANDON can be written.
    fn populate_packet<'a, 'b>(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        path_id: PathId,
        scheduling_info: &PathSchedulingInfo,
        builder: &mut PacketBuilder<'a, 'b>,
    ) {
        let is_multipath_negotiated = self.is_multipath_negotiated();
        let space_has_keys = self.crypto_state.has_keys(space_id.encryption_level());
        // A Data-space packet without keys for its encryption level is treated as 0-RTT.
        let is_0rtt = space_id == SpaceId::Data && !space_has_keys;
        let stats = &mut self.stats.frame_tx;
        let space = &mut self.spaces[space_id];
        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
        space
            .for_path(path_id)
            .pending_acks
            .maybe_ack_non_eliciting();

        // HANDSHAKE_DONE
        if !is_0rtt
            && !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && mem::replace(&mut space.pending.handshake_done, false)
        {
            builder.write_frame(frame::HandshakeDone, stats);
        }

        // PING
        if !scheduling_info.is_abandoned
            && mem::replace(&mut space.for_path(path_id).ping_pending, false)
        {
            builder.write_frame(frame::Ping, stats);
        }

        // IMMEDIATE_ACK
        if !scheduling_info.is_abandoned
            && mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false)
        {
            debug_assert_eq!(
                space_id,
                SpaceId::Data,
                "immediate acks must be sent in the data space"
            );
            builder.write_frame(frame::ImmediateAck, stats);
        }

        // ACK
        // Acknowledges every packet number space in this space that has ACKs ready, not only
        // the one for `path_id`. The path IDs are collected first because `populate_acks`
        // takes `space` mutably.
        if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
            for path_id in space
                .number_spaces
                .iter_mut()
                .filter(|(_, pns)| pns.pending_acks.can_send())
                .map(|(&path_id, _)| path_id)
                .collect::<Vec<_>>()
            {
                Self::populate_acks(
                    now,
                    self.receiving_ecn,
                    path_id,
                    space_id,
                    space,
                    is_multipath_negotiated,
                    builder,
                    stats,
                    space_has_keys,
                );
            }
        }

        // ACK_FREQUENCY
        if !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && mem::replace(&mut space.pending.ack_frequency, false)
        {
            let sequence_number = self.ack_frequency.next_sequence_number();

            // Safe to unwrap because this is always provided when ACK frequency is enabled
            let config = self.config.ack_frequency_config.as_ref().unwrap();

            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
                path.rtt.get(),
                config,
                &self.peer_params,
            );

            let frame = frame::AckFrequency {
                sequence: sequence_number,
                ack_eliciting_threshold: config.ack_eliciting_threshold,
                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
                reordering_threshold: config.reordering_threshold,
            };
            builder.write_frame(frame, stats);

            // Record that the frame went out in this packet, with the delay it requested.
            self.ack_frequency
                .ack_frequency_sent(path_id, builder.packet_number, max_ack_delay);
        }

        // PATH_CHALLENGE
        if !scheduling_info.is_abandoned
            && space_id == SpaceId::Data
            && path.pending_on_path_challenge
            && !self.state.is_closed()
            && builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
        // we don't want to send new challenges if we are already closing
        {
            path.pending_on_path_challenge = false;

            let token = self.rng.random();
            path.record_path_challenge_sent(now, token, path.network_path);
            // Generate a new challenge every time we send a new PATH_CHALLENGE
            let challenge = frame::PathChallenge(token);
            builder.write_frame(challenge, stats);
            builder.require_padding();
            // Both timers derive from one PTO. The path-open deadline (3 PTO) is armed only
            // on the Pending -> Sent transition; the challenge-lost deadline (1 PTO) is armed
            // on every challenge.
            let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
            if path.open_status == paths::OpenStatus::Pending {
                path.open_status = paths::OpenStatus::Sent;
                self.timers.set(
                    Timer::PerPath(path_id, PathTimer::PathOpen),
                    now + 3 * pto,
                    self.qlog.with_time(now),
                );
            }

            self.timers.set(
                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                now + pto,
                self.qlog.with_time(now),
            );

            // NOTE(review): `pending_on_path_challenge` was cleared at the top of this branch.
            // Unless `record_path_challenge_sent` sets it again, this condition can never hold
            // and the path status is never queued here. Confirm the intended condition.
            if is_multipath_negotiated && !path.validated && path.pending_on_path_challenge {
                // queue informing the path status along with the challenge
                space.pending.path_status.insert(path_id);
            }

            // Always include an OBSERVED_ADDR frame with a PATH_CHALLENGE, regardless
            // of whether one has already been sent on this path.
            // This only applies when address reports are negotiated and the frame fits.
            if space_id == SpaceId::Data
                && self
                    .config
                    .address_discovery_role
                    .should_report(&self.peer_params.address_discovery_role)
            {
                let frame = frame::ObservedAddr::new(
                    path.network_path.remote,
                    self.next_observed_addr_seq_no,
                );
                if builder.frame_space_remaining() > frame.size() {
                    builder.write_frame(frame, stats);

                    self.next_observed_addr_seq_no =
                        self.next_observed_addr_seq_no.saturating_add(1u8);
                    path.observed_addr_sent = true;

                    space.pending.observed_addr = false;
                }
            }
        }

        // PATH_RESPONSE
        // At most one queued response is taken per packet, via `pop_on_path` for the path's
        // current network path.
        if !scheduling_info.is_abandoned
            && space_id == SpaceId::Data
            && builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
            && let Some(token) = path.path_responses.pop_on_path(path.network_path)
        {
            let response = frame::PathResponse(token);
            builder.write_frame(response, stats);
            builder.require_padding();

            // NOTE: this is technically not required but might be useful to ride the
            // request/response nature of path challenges to refresh an observation
            // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
            if space_id == SpaceId::Data
                && self
                    .config
                    .address_discovery_role
                    .should_report(&self.peer_params.address_discovery_role)
            {
                let frame = frame::ObservedAddr::new(
                    path.network_path.remote,
                    self.next_observed_addr_seq_no,
                );
                if builder.frame_space_remaining() > frame.size() {
                    builder.write_frame(frame, stats);

                    self.next_observed_addr_seq_no =
                        self.next_observed_addr_seq_no.saturating_add(1u8);
                    path.observed_addr_sent = true;

                    space.pending.observed_addr = false;
                }
            }
        }

        // REACH_OUT
        // Writes as many of the round's queued addresses as fit. Leftovers stay queued, and
        // the pending entry is cleared once every address has been sent.
        if !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && let Some((round, addresses)) = space.pending.reach_out.as_mut()
        {
            while let Some(local_addr) = addresses.iter().next().copied() {
                let local_addr = addresses.take(&local_addr).expect("found from iter");
                let reach_out = frame::ReachOut::new(*round, local_addr);
                if builder.frame_space_remaining() > reach_out.size() {
                    builder.write_frame(reach_out, stats);
                } else {
                    // Does not fit: put it back for a later packet.
                    addresses.insert(local_addr);
                    break;
                }
            }
            if addresses.is_empty() {
                space.pending.reach_out = None;
            }
        }

        // PATH_ABANDON
        // First, this path's own abandon, sent on the abandoned path itself when allowed.
        if space_id == SpaceId::Data
            && scheduling_info.is_abandoned
            && scheduling_info.may_self_abandon
            && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
            && let Some(error_code) = space.pending.path_abandon.remove(&path_id)
        {
            let frame = frame::PathAbandon {
                path_id,
                error_code,
            };
            builder.write_frame(frame, stats);

            // Consider remotely issued CIDs as retired now that we have sent this frame at
            // least once.
            self.remote_cids.remove(&path_id);
        }
        // Then any remaining queued abandons (for any path), on a path that may carry data.
        // This loop is not gated on `is_abandoned`.
        while space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
            && let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
        {
            let frame = frame::PathAbandon {
                path_id: abandoned_path_id,
                error_code,
            };
            builder.write_frame(frame, stats);

            // Consider remotely issued CIDs as retired now that we have sent this frame at
            // least once.
            self.remote_cids.remove(&abandoned_path_id);
        }

        // OBSERVED_ADDR
        // Sent when none has been sent on this path yet, or when a fresh report is pending.
        if !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && space_id == SpaceId::Data
            && self
                .config
                .address_discovery_role
                .should_report(&self.peer_params.address_discovery_role)
            && (!path.observed_addr_sent || space.pending.observed_addr)
        {
            let frame =
                frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
            if builder.frame_space_remaining() > frame.size() {
                builder.write_frame(frame, stats);

                self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
                path.observed_addr_sent = true;

                space.pending.observed_addr = false;
            }
        }

        // CRYPTO
        while !is_0rtt
            && !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
        {
            let Some(mut frame) = space.pending.crypto.pop_front() else {
                break;
            };

            // NOTE(review): the unchecked VarInt conversion below assumes `frame.offset` fits
            // in the VarInt range. TODO confirm this invariant and record it as a SAFETY comment.
            // Calculate the maximum amount of crypto data we can store in the buffer.
            // Since the offset is known, we can reserve the exact size required to encode it.
            // For length we reserve 2bytes which allows to encode up to 2^14,
            // which is more than what fits into normally sized QUIC frames.
            let max_crypto_data_size = builder.frame_space_remaining()
                - 1 // Frame Type
                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes

            let len = frame
                .data
                .len()
                .min(2usize.pow(14) - 1)
                .min(max_crypto_data_size);

            let data = frame.data.split_to(len);
            let offset = frame.offset;
            let truncated = frame::Crypto { offset, data };
            builder.write_frame(truncated, stats);

            // If only part of the data fit, requeue the remainder at the front with its
            // offset advanced, so it is sent before any later crypto data.
            if !frame.data.is_empty() {
                frame.offset += len as u64;
                space.pending.crypto.push_front(frame);
            }
        }

        // PATH_STATUS_AVAILABLE & PATH_STATUS_BACKUP
        // Queued entries for paths that no longer exist are dropped. The `path` bound here
        // is a shared lookup that shadows the outer mutable `path`.
        while space_id == SpaceId::Data
            && !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let Some(path_id) = space.pending.path_status.pop_first() else {
                break;
            };
            let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
                trace!(%path_id, "discarding queued path status for unknown path");
                continue;
            };

            let seq = path.status.seq();
            match path.local_status() {
                PathStatus::Available => {
                    let frame = frame::PathStatusAvailable {
                        path_id,
                        status_seq_no: seq,
                    };
                    builder.write_frame(frame, stats);
                }
                PathStatus::Backup => {
                    let frame = frame::PathStatusBackup {
                        path_id,
                        status_seq_no: seq,
                    };
                    builder.write_frame(frame, stats);
                }
            }
        }

        // MAX_PATH_ID
        if space_id == SpaceId::Data
            && !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && space.pending.max_path_id
            && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let frame = frame::MaxPathId(self.local_max_path_id);
            builder.write_frame(frame, stats);
            space.pending.max_path_id = false;
        }

        // PATHS_BLOCKED
        if space_id == SpaceId::Data
            && !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && space.pending.paths_blocked
            && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let frame = frame::PathsBlocked(self.remote_max_path_id);
            builder.write_frame(frame, stats);
            space.pending.paths_blocked = false;
        }

        // PATH_CIDS_BLOCKED
        // `next_seq` is one past the active remote CID sequence for the path, or 0 when no
        // remote CIDs are tracked for it.
        while space_id == SpaceId::Data
            && !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
                break;
            };
            let next_seq = match self.remote_cids.get(&path_id) {
                Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
                None => VarInt(0),
            };
            let frame = frame::PathCidsBlocked { path_id, next_seq };
            builder.write_frame(frame, stats);
        }

        // RESET_STREAM, STOP_SENDING, MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS
        if space_id == SpaceId::Data
            && !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
        {
            self.streams
                .write_control_frames(builder, &mut space.pending, stats);
        }

        // NEW_CONNECTION_ID
        // Queued CIDs may belong to any path, so the size bound uses the longest local CID
        // length across all local CID states.
        let cid_len = self
            .local_cid_state
            .values()
            .map(|cid_state| cid_state.cid_len())
            .max()
            .expect("some local CID state must exist");
        let new_cid_size_bound =
            frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
        while !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && builder.frame_space_remaining() > new_cid_size_bound
        {
            let Some(issued) = space.pending.new_cids.pop() else {
                break;
            };
            let retire_prior_to = self
                .local_cid_state
                .get(&issued.path_id)
                .map(|cid_state| cid_state.retire_prior_to())
                .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));

            // Without multipath the frame carries no path ID, and only PathId::ZERO exists.
            let cid_path_id = match is_multipath_negotiated {
                true => Some(issued.path_id),
                false => {
                    debug_assert_eq!(issued.path_id, PathId::ZERO);
                    None
                }
            };
            let frame = frame::NewConnectionId {
                path_id: cid_path_id,
                sequence: issued.sequence,
                retire_prior_to,
                id: issued.id,
                reset_token: issued.reset_token,
            };
            builder.write_frame(frame, stats);
        }

        // RETIRE_CONNECTION_ID
        // For PathId::ZERO without multipath, the frame is written without a path ID.
        let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
        while !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && builder.frame_space_remaining() > retire_cid_bound
        {
            let (path_id, sequence) = match space.pending.retire_cids.pop() {
                Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
                Some((path_id, seq)) => (Some(path_id), seq),
                None => break,
            };
            let frame = frame::RetireConnectionId { path_id, sequence };
            builder.write_frame(frame, stats);
        }

        // DATAGRAM
        let mut sent_datagrams = false;
        while !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && builder.frame_space_remaining() > Datagram::SIZE_BOUND
            && space_id == SpaceId::Data
        {
            match self.datagrams.write(builder, stats) {
                true => {
                    sent_datagrams = true;
                }
                false => break,
            }
        }
        // If the application was blocked on datagram sends and at least one datagram went
        // out, emit a single DatagramsUnblocked event and clear the blocked flag.
        if self.datagrams.send_blocked && sent_datagrams {
            self.events.push_back(Event::DatagramsUnblocked);
            self.datagrams.send_blocked = false;
        }

        // Re-borrow the path. The first mutable borrow cannot stay alive across the
        // PATH_STATUS loop above, which reads `self.paths` immutably.
        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;

        // NEW_TOKEN
        // Only servers queue NEW_TOKEN. Each token is bound to the client's IP. Tokens that
        // don't fit are requeued, and written ones are recorded in the packet's retransmits.
        if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
            while let Some(network_path) = space.pending.new_tokens.pop() {
                debug_assert_eq!(space_id, SpaceId::Data);
                let ConnectionSide::Server { server_config } = &self.side else {
                    panic!("NEW_TOKEN frames should not be enqueued by clients");
                };

                if !network_path.is_probably_same_path(&path.network_path) {
                    // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
                    // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
                    // frames upon an path change. Instead, when the new path becomes validated,
                    // NEW_TOKEN frames may be enqueued for the new path instead.
                    continue;
                }

                let token = Token::new(
                    TokenPayload::Validation {
                        ip: network_path.remote.ip(),
                        issued: server_config.time_source.now(),
                    },
                    &mut self.rng,
                );
                let new_token = NewToken {
                    token: token.encode(&*server_config.token_key).into(),
                };

                if builder.frame_space_remaining() < new_token.size() {
                    space.pending.new_tokens.push(network_path);
                    break;
                }

                builder.write_frame(new_token, stats);
                builder.retransmits_mut().new_tokens.push(network_path);
            }
        }

        // STREAM
        if !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && space_id == SpaceId::Data
        {
            self.streams
                .write_stream_frames(builder, self.config.send_fairness, stats);
        }

        // ADD_ADDRESS
        while space_id == SpaceId::Data
            && !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
        {
            if let Some(added_address) = space.pending.add_address.pop_last() {
                builder.write_frame(added_address, stats);
            } else {
                break;
            }
        }

        // REMOVE_ADDRESS
        while space_id == SpaceId::Data
            && !scheduling_info.is_abandoned
            && scheduling_info.may_send_data
            && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
        {
            if let Some(removed_address) = space.pending.remove_address.pop_last() {
                builder.write_frame(removed_address, stats);
            } else {
                break;
            }
        }
    }
6134
6135    /// Write pending ACKs into a buffer
6136    fn populate_acks<'a, 'b>(
6137        now: Instant,
6138        receiving_ecn: bool,
6139        path_id: PathId,
6140        space_id: SpaceId,
6141        space: &mut PacketSpace,
6142        is_multipath_negotiated: bool,
6143        builder: &mut PacketBuilder<'a, 'b>,
6144        stats: &mut FrameStats,
6145        space_has_keys: bool,
6146    ) {
6147        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
6148        debug_assert!(space_has_keys, "tried to send ACK in 0-RTT");
6149
6150        debug_assert!(
6151            is_multipath_negotiated || path_id == PathId::ZERO,
6152            "Only PathId::ZERO allowed without multipath (have {path_id:?})"
6153        );
6154        if is_multipath_negotiated {
6155            debug_assert!(
6156                space_id == SpaceId::Data || path_id == PathId::ZERO,
6157                "path acks must be sent in 1RTT space (have {space_id:?})"
6158            );
6159        }
6160
6161        let pns = space.for_path(path_id);
6162        let ranges = pns.pending_acks.ranges();
6163        debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
6164        let ecn = if receiving_ecn {
6165            Some(&pns.ecn_counters)
6166        } else {
6167            None
6168        };
6169
6170        let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
6171        // TODO: This should come from `TransportConfig` if that gets configurable.
6172        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
6173        let delay = delay_micros >> ack_delay_exp.into_inner();
6174
6175        if is_multipath_negotiated && space_id == SpaceId::Data {
6176            if !ranges.is_empty() {
6177                let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
6178                builder.write_frame(frame, stats);
6179            }
6180        } else {
6181            builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
6182        }
6183    }
6184
6185    fn close_common(&mut self) {
6186        trace!("connection closed");
6187        self.timers.reset();
6188    }
6189
6190    fn set_close_timer(&mut self, now: Instant) {
6191        // QUIC-MULTIPATH § 2.6 Connection Closure: draining for 3*PTO using the max PTO of
6192        // all paths.
6193        let pto_max = self.max_pto_for_space(self.highest_space);
6194        self.timers.set(
6195            Timer::Conn(ConnTimer::Close),
6196            now + 3 * pto_max,
6197            self.qlog.with_time(now),
6198        );
6199    }
6200
6201    /// Handle transport parameters received from the peer
6202    ///
6203    /// *remote_cid* and *local_cid* are the source and destination CIDs respectively of the
6204    /// *packet into which the transport parameters arrived.
6205    fn handle_peer_params(
6206        &mut self,
6207        params: TransportParameters,
6208        local_cid: ConnectionId,
6209        remote_cid: ConnectionId,
6210        now: Instant,
6211    ) -> Result<(), TransportError> {
6212        if Some(self.original_remote_cid) != params.initial_src_cid
6213            || (self.side.is_client()
6214                && (Some(self.initial_dst_cid) != params.original_dst_cid
6215                    || self.retry_src_cid != params.retry_src_cid))
6216        {
6217            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
6218                "CID authentication failure",
6219            ));
6220        }
6221        if params.initial_max_path_id.is_some() && (local_cid.is_empty() || remote_cid.is_empty()) {
6222            return Err(TransportError::PROTOCOL_VIOLATION(
6223                "multipath must not use zero-length CIDs",
6224            ));
6225        }
6226
6227        self.set_peer_params(params);
6228        self.qlog.emit_peer_transport_params_received(self, now);
6229
6230        Ok(())
6231    }
6232
    /// Applies the peer's (already validated) transport parameters to the connection state.
    ///
    /// In order, this:
    /// - passes the parameters to the stream state and negotiates the idle timeout;
    /// - if the peer sent a preferred address, registers its CID (sequence 1) on
    ///   `PathId::ZERO` and records its stateless reset token for the current remote;
    /// - records the peer's max ACK delay;
    /// - if both sides advertised `initial_max_path_id`, stores the local and remote maximum
    ///   path IDs (multipath negotiated);
    /// - if both sides advertised a nat traversal address limit and multipath was negotiated,
    ///   (re)initialises the n0 nat traversal state and logs configurations that may limit it;
    /// - stores the parameters in `self.peer_params` and feeds the peer's
    ///   `max_udp_payload_size` (saturated to `u16::MAX`) into MTU discovery on `PathId::ZERO`.
    fn set_peer_params(&mut self, params: TransportParameters) {
        self.streams.set_params(&params);
        self.idle_timeout =
            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
        trace!("negotiated max idle timeout {:?}", self.idle_timeout);

        if let Some(ref info) = params.preferred_address {
            // During the handshake PathId::ZERO exists.
            self.remote_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
                path_id: None,
                sequence: 1,
                id: info.connection_id,
                reset_token: info.stateless_reset_token,
                retire_prior_to: 0,
            })
            .expect(
                "preferred address CID is the first received, and hence is guaranteed to be legal",
            );
            let remote = self.path_data(PathId::ZERO).network_path.remote;
            self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
        }
        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);

        // Holds the negotiated initial max path ID (minimum of both sides) when multipath is
        // enabled on both ends, `None` otherwise.
        let mut multipath_enabled = None;
        if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
            self.config.get_initial_max_path_id(),
            params.initial_max_path_id,
        ) {
            // multipath is enabled, register the local and remote maximums
            self.local_max_path_id = local_max_path_id;
            self.remote_max_path_id = remote_max_path_id;
            let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
            debug!(%initial_max_path_id, "multipath negotiated");
            multipath_enabled = Some(initial_max_path_id);
        }

        // n0 nat traversal is only negotiated when both sides advertised an address limit.
        if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
            self.config
                .max_remote_nat_traversal_addresses
                .zip(params.max_remote_nat_traversal_addresses)
        {
            // Number of usable path IDs: the negotiated max path ID plus one, since path IDs
            // start at zero.
            if let Some(max_initial_paths) =
                multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
            {
                // Note the swap: the peer's limit bounds the local addresses we advertise, our
                // own limit bounds the remote addresses we accept.
                let max_local_addresses = max_remotely_allowed_remote_addresses.get();
                let max_remote_addresses = max_locally_allowed_remote_addresses.get();
                self.n0_nat_traversal = n0_nat_traversal::State::new(
                    max_remote_addresses,
                    max_local_addresses,
                    self.side(),
                );
                debug!(
                    %max_remote_addresses, %max_local_addresses,
                    "n0's nat traversal negotiated"
                );

                // The checks below only emit diagnostics; they never fail the handshake.
                match self.side() {
                    Side::Client => {
                        if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
                            // in this case the client might try to open `max_remote_addresses` new
                            // paths, but the current multipath configuration will not allow it
                            debug!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
                        } else if max_local_addresses as u64
                            > params.active_connection_id_limit.into_inner()
                        {
                            // The server accepts at most `params.active_connection_id_limit` CIDs
                            // from us, but it may need at least `max_local_addresses` of them to
                            // send `PATH_CHALLENGE` frames to each advertised local address.
                            // NOTE(review): because of `else if`, this is not checked when the
                            // previous diagnostic fires — confirm that is intended.
                            debug!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
                        }
                    }
                    Side::Server => {
                        if (max_initial_paths.as_u32() as u64) < crate::LOCAL_CID_COUNT {
                            debug!(%max_initial_paths, local_cid_limit=%crate::LOCAL_CID_COUNT, "local server configuration might cause nat traversal issues")
                        }
                    }
                }
            } else {
                debug!("n0 nat traversal enabled for both endpoints, but multipath is missing")
            }
        }

        self.peer_params = params;
        // Values that do not fit a u16 saturate to u16::MAX before being handed to MTUD.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_data_mut(PathId::ZERO)
            .mtud
            .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
    }
6322
6323    /// Decrypts a packet, returning the packet number on success
6324    fn decrypt_packet(
6325        &mut self,
6326        now: Instant,
6327        path_id: PathId,
6328        packet: &mut Packet,
6329    ) -> Result<Option<u64>, Option<TransportError>> {
6330        let result = self
6331            .crypto_state
6332            .decrypt_packet_body(packet, path_id, &self.spaces)?;
6333
6334        let Some(result) = result else {
6335            return Ok(None);
6336        };
6337
6338        if result.outgoing_key_update_acked
6339            && let Some(prev) = self.crypto_state.prev_crypto.as_mut()
6340        {
6341            prev.end_packet = Some((result.number, now));
6342            self.set_key_discard_timer(now, packet.header.space());
6343        }
6344
6345        if result.incoming_key_update {
6346            trace!("key update authenticated");
6347            self.crypto_state
6348                .update_keys(Some((result.number, now)), true);
6349            self.set_key_discard_timer(now, packet.header.space());
6350        }
6351
6352        Ok(Some(result.number))
6353    }
6354
6355    fn peer_supports_ack_frequency(&self) -> bool {
6356        self.peer_params.min_ack_delay.is_some()
6357    }
6358
6359    /// Send an IMMEDIATE_ACK frame to the remote endpoint
6360    ///
6361    /// According to the spec, this will result in an error if the remote endpoint does not support
6362    /// the Acknowledgement Frequency extension
6363    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
6364        debug_assert_eq!(
6365            self.highest_space,
6366            SpaceKind::Data,
6367            "immediate ack must be written in the data space"
6368        );
6369        self.spaces[SpaceId::Data]
6370            .for_path(path_id)
6371            .immediate_ack_pending = true;
6372    }
6373
6374    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
6375    #[cfg(test)]
6376    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
6377        let ConnectionEventInner::Datagram(DatagramConnectionEvent {
6378            path_id,
6379            first_decode,
6380            remaining,
6381            ..
6382        }) = &event.0
6383        else {
6384            return None;
6385        };
6386
6387        if remaining.is_some() {
6388            panic!("Packets should never be coalesced in tests");
6389        }
6390
6391        let decrypted_header = self
6392            .crypto_state
6393            .unprotect_header(first_decode.clone(), self.peer_params.stateless_reset_token)?;
6394
6395        let mut packet = decrypted_header.packet?;
6396        self.crypto_state
6397            .decrypt_packet_body(&mut packet, *path_id, &self.spaces)
6398            .ok()?;
6399
6400        Some(packet.payload.to_vec())
6401    }
6402
6403    /// The number of bytes of packets containing retransmittable frames that have not been
6404    /// acknowledged or declared lost.
6405    #[cfg(test)]
6406    pub(crate) fn bytes_in_flight(&self) -> u64 {
6407        // TODO(@divma): consider including for multipath?
6408        self.path_data(PathId::ZERO).in_flight.bytes
6409    }
6410
6411    /// Number of bytes worth of non-ack-only packets that may be sent
6412    #[cfg(test)]
6413    pub(crate) fn congestion_window(&self) -> u64 {
6414        let path = self.path_data(PathId::ZERO);
6415        path.congestion
6416            .window()
6417            .saturating_sub(path.in_flight.bytes)
6418    }
6419
6420    /// Whether no timers but keepalive, idle, rtt, pushnewcid, and key discard are running
6421    #[cfg(test)]
6422    pub(crate) fn is_idle(&self) -> bool {
6423        let current_timers = self.timers.values();
6424        current_timers
6425            .into_iter()
6426            .filter(|(timer, _)| {
6427                !matches!(
6428                    timer,
6429                    Timer::Conn(ConnTimer::KeepAlive)
6430                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
6431                        | Timer::Conn(ConnTimer::PushNewCid)
6432                        | Timer::Conn(ConnTimer::KeyDiscard)
6433                )
6434            })
6435            .min_by_key(|(_, time)| *time)
6436            .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6437    }
6438
6439    /// Whether explicit congestion notification is in use on outgoing packets.
6440    #[cfg(test)]
6441    pub(crate) fn using_ecn(&self) -> bool {
6442        self.path_data(PathId::ZERO).sending_ecn
6443    }
6444
6445    /// The number of received bytes in the current path
6446    #[cfg(test)]
6447    pub(crate) fn total_recvd(&self) -> u64 {
6448        self.path_data(PathId::ZERO).total_recvd
6449    }
6450
6451    #[cfg(test)]
6452    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6453        self.local_cid_state
6454            .get(&PathId::ZERO)
6455            .unwrap()
6456            .active_seq()
6457    }
6458
6459    #[cfg(test)]
6460    #[track_caller]
6461    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6462        self.local_cid_state
6463            .get(&PathId(path_id))
6464            .unwrap()
6465            .active_seq()
6466    }
6467
6468    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
6469    /// with updated `retire_prior_to` field set to `v`
6470    #[cfg(test)]
6471    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6472        let n = self
6473            .local_cid_state
6474            .get_mut(&PathId::ZERO)
6475            .unwrap()
6476            .assign_retire_seq(v);
6477        debug_assert!(!self.state.is_drained()); // requirement for endpoint_events
6478        self.endpoint_events
6479            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6480    }
6481
6482    /// Check the current active remote CID sequence for `PathId::ZERO`
6483    #[cfg(test)]
6484    pub(crate) fn active_remote_cid_seq(&self) -> u64 {
6485        self.remote_cids.get(&PathId::ZERO).unwrap().active_seq()
6486    }
6487
6488    /// Returns the detected maximum udp payload size for the current path
6489    #[cfg(test)]
6490    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6491        self.path_data(path_id).current_mtu()
6492    }
6493
6494    /// Triggers path validation on all paths
6495    #[cfg(test)]
6496    pub(crate) fn trigger_path_validation(&mut self) {
6497        for path in self.paths.values_mut() {
6498            path.data.pending_on_path_challenge = true;
6499        }
6500    }
6501
6502    /// Simulates a protocol violation error for test purposes.
6503    #[cfg(test)]
6504    pub fn simulate_protocol_violation(&mut self, now: Instant) {
6505        if !self.state.is_closed() {
6506            self.state
6507                .move_to_closed(TransportError::PROTOCOL_VIOLATION("simulated violation"));
6508            self.close_common();
6509            if !self.state.is_drained() {
6510                self.set_close_timer(now);
6511            }
6512            self.connection_close_pending = true;
6513        }
6514    }
6515
6516    /// Whether we have on-path 1-RTT data to send.
6517    ///
6518    /// This checks for frames that can only be sent in the data space (1-RTT):
6519    /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated.
6520    /// - Pending PATH_RESPONSE frames.
6521    /// - Pending data to send in STREAM frames.
6522    /// - Pending DATAGRAM frames to send.
6523    ///
6524    /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that
6525    /// may need to be sent.
6526    fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6527        let space_specific = self.paths.get(&path_id).is_some_and(|path| {
6528            path.data.pending_on_path_challenge || !path.data.path_responses.is_empty()
6529        });
6530
6531        // Stream control frames are checked in PacketSpace::can_send, only check data here.
6532        let other = self.streams.can_send_stream_data()
6533            || self
6534                .datagrams
6535                .outgoing
6536                .front()
6537                .is_some_and(|x| x.size(true) <= max_size);
6538
6539        // All `false` fields are set in PacketSpace::can_send.
6540        SendableFrames {
6541            acks: false,
6542            close: false,
6543            space_specific,
6544            other,
6545        }
6546    }
6547
    /// Terminate the connection instantly, without sending a close packet.
    ///
    /// Runs the common close handling, moves directly to the drained state recording `reason`,
    /// and queues a `Drained` endpoint event. The order matters: `move_to_drained` must succeed
    /// before the `Drained` event is queued.
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.state.move_to_drained(Some(reason));
        // move_to_drained checks that we were never in drained before, so we
        // never sent a `Drained` event before (it's illegal to send more events after drained).
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
6556
6557    /// Storage size required for the largest packet that can be transmitted on all currently
6558    /// available paths
6559    ///
6560    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
6561    ///
6562    /// When multipath is enabled, this value is the minimum MTU across all available paths.
6563    pub fn current_mtu(&self) -> u16 {
6564        self.paths
6565            .iter()
6566            .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6567            .map(|(_path_id, path_state)| path_state.data.current_mtu())
6568            .min()
6569            .expect("There is always at least one available path")
6570    }
6571
6572    /// Size of non-frame data for a 1-RTT packet
6573    ///
6574    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
6575    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
6576    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
6577    /// latency and packet loss.
6578    fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6579        let pn_len = PacketNumber::new(
6580            pn,
6581            self.spaces[SpaceId::Data]
6582                .for_path(path)
6583                .largest_acked_packet
6584                .unwrap_or(0),
6585        )
6586        .len();
6587
6588        // 1 byte for flags
6589        1 + self
6590            .remote_cids
6591            .get(&path)
6592            .map(|cids| cids.active().len())
6593            .unwrap_or(20)      // Max CID len in QUIC v1
6594            + pn_len
6595            + self.tag_len_1rtt()
6596    }
6597
6598    fn predict_1rtt_overhead_no_pn(&self) -> usize {
6599        let pn_len = 4;
6600
6601        let cid_len = self
6602            .remote_cids
6603            .values()
6604            .map(|cids| cids.active().len())
6605            .max()
6606            .unwrap_or(20); // Max CID len in QUIC v1
6607
6608        // 1 byte for flags
6609        1 + cid_len + pn_len + self.tag_len_1rtt()
6610    }
6611
6612    fn tag_len_1rtt(&self) -> usize {
6613        // encryption_keys for Data space returns 1-RTT keys if available, otherwise 0-RTT keys
6614        let packet_crypto = self
6615            .crypto_state
6616            .encryption_keys(SpaceKind::Data, self.side.side())
6617            .map(|(_header, packet, _level)| packet);
6618        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
6619        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
6620        // but that would needlessly prevent sending datagrams during 0-RTT.
6621        packet_crypto.map_or(16, |x| x.tag_len())
6622    }
6623
6624    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
6625    fn on_path_validated(&mut self, path_id: PathId) {
6626        self.path_data_mut(path_id).validated = true;
6627        let ConnectionSide::Server { server_config } = &self.side else {
6628            return;
6629        };
6630        let network_path = self.path_data(path_id).network_path;
6631        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6632        new_tokens.clear();
6633        for _ in 0..server_config.validation_token.sent {
6634            new_tokens.push(network_path);
6635        }
6636    }
6637
6638    /// Handle new path status information: PATH_STATUS_AVAILABLE, PATH_STATUS_BACKUP
6639    fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6640        if let Some(path) = self.paths.get_mut(&path_id) {
6641            path.data.status.remote_update(status, status_seq_no);
6642        } else {
6643            debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6644        }
6645        self.events.push_back(
6646            PathEvent::RemoteStatus {
6647                id: path_id,
6648                status,
6649            }
6650            .into(),
6651        );
6652    }
6653
6654    /// Returns the maximum [`PathId`] to be used for sending in this connection.
6655    ///
6656    /// This is calculated as minimum between the local and remote's maximums when multipath is
6657    /// enabled, or `None` when disabled.
6658    ///
6659    /// For data that's received, we should use [`Self::local_max_path_id`] instead.
6660    /// The reasoning is that the remote might already have updated to its own newer
6661    /// [`Self::max_path_id`] after sending out a `MAX_PATH_ID` frame, but it got re-ordered.
6662    fn max_path_id(&self) -> Option<PathId> {
6663        if self.is_multipath_negotiated() {
6664            Some(self.remote_max_path_id.min(self.local_max_path_id))
6665        } else {
6666            None
6667        }
6668    }
6669
6670    /// Returns whether this connection has a socket that supports IPv6.
6671    ///
6672    /// TODO(matheus23): This is related to noq endpoint state's `ipv6` bool. We should move that info
6673    /// here instead of trying to hack around not knowing it exactly.
6674    fn is_ipv6(&self) -> bool {
6675        self.paths
6676            .values()
6677            .any(|p| p.data.network_path.remote.is_ipv6())
6678    }
6679
6680    /// Add addresses the local endpoint considers are reachable for nat traversal.
6681    pub fn add_nat_traversal_address(
6682        &mut self,
6683        address: SocketAddr,
6684    ) -> Result<(), n0_nat_traversal::Error> {
6685        if let Some(added) = self.n0_nat_traversal.add_local_address(address)? {
6686            self.spaces[SpaceId::Data].pending.add_address.insert(added);
6687        };
6688        Ok(())
6689    }
6690
6691    /// Removes an address the endpoing no longer considers reachable for nat traversal
6692    ///
6693    /// Addresses not present in the set will be silently ignored.
6694    pub fn remove_nat_traversal_address(
6695        &mut self,
6696        address: SocketAddr,
6697    ) -> Result<(), n0_nat_traversal::Error> {
6698        if let Some(removed) = self.n0_nat_traversal.remove_local_address(address)? {
6699            self.spaces[SpaceId::Data]
6700                .pending
6701                .remove_address
6702                .insert(removed);
6703        }
6704        Ok(())
6705    }
6706
6707    /// Get the current local nat traversal addresses
6708    pub fn get_local_nat_traversal_addresses(
6709        &self,
6710    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6711        self.n0_nat_traversal.get_local_nat_traversal_addresses()
6712    }
6713
6714    /// Get the currently advertised nat traversal addresses by the server
6715    pub fn get_remote_nat_traversal_addresses(
6716        &self,
6717    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6718        Ok(self
6719            .n0_nat_traversal
6720            .client_side()?
6721            .get_remote_nat_traversal_addresses())
6722    }
6723
6724    /// Attempts to open a path for nat traversal.
6725    ///
6726    /// On success returns the [`PathId`] and remote address of the path.
6727    fn open_nat_traversal_path(
6728        &mut self,
6729        now: Instant,
6730        ip_port: (IpAddr, u16),
6731    ) -> Result<Option<(PathId, SocketAddr)>, PathError> {
6732        let remote = ip_port.into();
6733        // TODO(matheus23): Probe the correct 4-tuple, instead of only a remote address?
6734        // By specifying None for `local_ip`, we do two things: 1. open_path_ensure won't
6735        // generate two paths to the same remote and 2. we let the OS choose which
6736        // interface to use for sending on that path.
6737        let network_path = FourTuple {
6738            remote,
6739            local_ip: None,
6740        };
6741        match self.open_path_ensure(network_path, PathStatus::Backup, now) {
6742            Ok((path_id, path_was_known)) => {
6743                if path_was_known {
6744                    trace!(%path_id, %remote, "nat traversal: path existed for remote");
6745                }
6746                Ok(Some((path_id, remote)))
6747            }
6748            Err(e) => {
6749                debug!(%remote, %e, "nat traversal: failed to probe remote");
6750                Err(e)
6751            }
6752        }
6753    }
6754
    /// Initiates a new nat traversal round.
    ///
    /// A round advertises the client's local addresses in `REACH_OUT` frames and starts probing
    /// the known remote addresses. Starting a new round cancels the previous one: paths opened
    /// in the previous round that are still unvalidated, not abandoned and not going to be
    /// probed again are closed.
    ///
    /// Returns the server addresses that are now being probed. Probes that fail are reported
    /// to the client state via `report_in_continuation`. They may succeed later (presumably
    /// through [`Self::continue_nat_traversal_round`]) and are not part of the returned list.
    ///
    /// # Errors
    ///
    /// - [`n0_nat_traversal::Error::Closed`] if the connection is closed.
    /// - Errors from the client-side nat traversal state, e.g. when this is not a client.
    /// - [`n0_nat_traversal::Error::Multipath`] with the first path-open error when at least
    ///   one probe failed and none succeeded.
    pub fn initiate_nat_traversal_round(
        &mut self,
        now: Instant,
    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
        if self.state.is_closed() {
            return Err(n0_nat_traversal::Error::Closed);
        }

        let ipv6 = self.is_ipv6();
        let client_state = self.n0_nat_traversal.client_side_mut()?;
        let n0_nat_traversal::NatTraversalRound {
            new_round,
            reach_out_at,
            addresses_to_probe,
            prev_round_path_ids,
        } = client_state.initiate_nat_traversal_round(ipv6)?;

        trace!(%new_round, reach_out=reach_out_at.len(), to_probe=addresses_to_probe.len(),
            "initiating nat traversal round");

        // Queue the REACH_OUT advertisement for this round in the Data space.
        self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));

        for path_id in prev_round_path_ids {
            let Some(path) = self.path(path_id) else {
                continue;
            };
            let ip = path.network_path.remote.ip();
            let port = path.network_path.remote.port();

            // Only close previous-round paths that:
            // - are not going to be probed again in this round,
            // - are not validated (validated paths are known to work), and
            // - are not already abandoned.
            if !addresses_to_probe
                .iter()
                .any(|(_, probe)| *probe == (ip, port))
                && !path.validated
                && !self.abandoned_paths.contains(&path_id)
            {
                trace!(%path_id, "closing path from previous round");
                let _ =
                    self.close_path_inner(now, path_id, PathAbandonReason::NatTraversalRoundEnded);
            }
        }

        // First path-open error, kept in case no probe succeeds.
        let mut err = None;

        let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
        let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());

        for (id, address) in addresses_to_probe {
            match self.open_nat_traversal_path(now, address) {
                Ok(None) => {}
                Ok(Some((path_id, remote))) => {
                    path_ids.push(path_id);
                    probed_addresses.push(remote);
                }
                Err(e) => {
                    // The client side was checked above via `client_side_mut()?`.
                    self.n0_nat_traversal
                        .client_side_mut()
                        .expect("validated")
                        .report_in_continuation(id, e);
                    err.get_or_insert(e);
                }
            }
        }

        if let Some(err) = err {
            // Every probe failed, so bail out.
            // NOTE(review): on this return the REACH_OUT frames stay queued and
            // `set_round_path_ids` is not called — confirm that is intended.
            if probed_addresses.is_empty() {
                return Err(n0_nat_traversal::Error::Multipath(err));
            }
        }

        self.n0_nat_traversal
            .client_side_mut()
            .expect("connection side validated")
            .set_round_path_ids(path_ids);

        Ok(probed_addresses)
    }
6844
6845    /// Attempts to continue a nat traversal round by trying to open paths for pending client probes.
6846    ///
6847    /// If there was nothing to do, it returns `None`. Otherwise it returns whether the path was
6848    /// successfully open.
6849    fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6850        let ipv6 = self.is_ipv6();
6851        let client_state = self.n0_nat_traversal.client_side_mut().ok()?;
6852        let (id, address) = client_state.continue_nat_traversal_round(ipv6)?;
6853        let open_result = self.open_nat_traversal_path(now, address);
6854        let client_state = self.n0_nat_traversal.client_side_mut().expect("validated");
6855        match open_result {
6856            Ok(None) => Some(true),
6857            Ok(Some((path_id, _remote))) => {
6858                client_state.add_round_path_id(path_id);
6859                Some(true)
6860            }
6861            Err(e) => {
6862                client_state.report_in_continuation(id, e);
6863                Some(false)
6864            }
6865        }
6866    }
6867}
6868
6869impl fmt::Debug for Connection {
6870    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
6871        f.debug_struct("Connection")
6872            .field("handshake_cid", &self.handshake_cid)
6873            .finish()
6874    }
6875}
6876
6877/// Hints when the caller identifies a network change.
6878pub trait NetworkChangeHint: std::fmt::Debug + 'static {
6879    /// Inform the connection if a path may recover after a network change.
6880    ///
6881    /// After network changes, paths may not be recoverable. In this case, waiting for the path to
6882    /// become idle may take longer than what is desirable. If [`Self::is_path_recoverable`]
6883    /// returns `false`, a multipath-enabled, client-side connection will establish a new path to
6884    /// the same remote, closing the current one, instead of migrating the path.
6885    ///
6886    /// Paths that are deemed recoverable will simply be sent a PING for a liveness check.
6887    fn is_path_recoverable(&self, path_id: PathId, network_path: FourTuple) -> bool;
6888}
6889
/// Return value for [`Connection::poll_transmit_on_path`].
#[derive(Debug)]
enum PollPathStatus {
    /// Nothing to send on the path; nothing was written into the [`TransmitBuf`].
    NothingToSend {
        /// If true, there was data to send but congestion control did not allow it.
        congestion_blocked: bool,
    },
    /// The transmit is ready to be sent.
    Send(Transmit),
}
6901
/// Return value for [`Connection::poll_transmit_path_space`].
#[derive(Debug)]
enum PollPathSpaceStatus {
    /// Nothing to send in the space; nothing was written into the [`TransmitBuf`].
    NothingToSend {
        /// If true, there was data to send but congestion control did not allow it.
        congestion_blocked: bool,
    },
    /// One or more packets have been written into the [`TransmitBuf`].
    WrotePacket {
        /// The highest packet number written.
        last_packet_number: u64,
        /// Whether to pad an already started datagram in the next packet.
        ///
        /// When Initial, 0-RTT or Handshake packets do not fill the entire datagram, they
        /// may be coalesced with the next packet from a higher encryption level on the same
        /// path. The earlier packet may, however, impose size requirements on the datagram
        /// it is sent in.
        ///
        /// A space that did not complete the datagram uses this to request the right padding
        /// in the final packet of the datagram, so that the finished datagram has the
        /// correct size.
        ///
        /// A space that filled an entire datagram leaves this at the default of
        /// [`PadDatagram::No`].
        pad_datagram: PadDatagram,
    },
    /// Send the contents of the transmit immediately.
    ///
    /// Packets were written and the GSO batch must end now, regardless of whether higher
    /// spaces still have frames to write. This is used when the last datagram written would
    /// need too much padding to continue a GSO batch, which would waste space on the wire.
    Send {
        /// The highest packet number written into the transmit.
        last_packet_number: u64,
    },
}
6940
/// Information used to decide which frames to schedule into which packets.
///
/// Primarily used by [`Connection::poll_transmit_on_path`] and the functions that help it
/// build packets: [`Connection::poll_transmit_path_space`] and
/// [`Connection::populate_packet`].
#[derive(Debug, Copy, Clone)]
struct PathSchedulingInfo {
    /// Whether the path is abandoned.
    ///
    /// An abandoned path that still has CIDs can still send a packet. After sending that
    /// packet, the CIDs issued by the remote must be considered retired as well.
    is_abandoned: bool,
    /// Whether the path may send [`SpaceKind::Data`] frames.
    ///
    /// Some paths should only send frames from [`SendableFrames::space_specific`]. All other
    /// frames can be sent on any [`SpaceKind::Data`] space, but packet scheduling rules still
    /// apply to them.
    ///
    /// Roughly, data frames are only sent on spaces that have CIDs, are not abandoned and
    /// have no *better* spaces. See the comments where this is populated for the exact
    /// packet scheduling implementation.
    ///
    /// In effect this marks the path as the best validated space ID, except during the
    /// handshake, when it does not need to be validated. Several paths can be equally good
    /// and all have this set to `true`; packet scheduling then picks one. Currently it picks
    /// the lowest path that is not congestion blocked.
    ///
    /// Once in the closed or draining states this is never true.
    may_send_data: bool,
    /// Whether the path may send a CONNECTION_CLOSE frame.
    ///
    /// In effect this marks the path as the best validated space ID, falling back to
    /// unvalidated spaces if there are no validated ones. As with [`Self::may_send_data`],
    /// other paths could be equally good.
    may_send_close: bool,
    /// NOTE(review): undocumented; presumably whether this path may send a PATH_ABANDON for
    /// itself. Confirm against where this is populated.
    may_self_abandon: bool,
}
6980
/// Whether a path is blocked from sending and, if so, by which limit.
///
/// NOTE(review): variant meanings below are inferred from their names; confirm against
/// where this value is produced.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Blocked by the anti-amplification limit.
    AntiAmplification,
    /// Blocked by congestion control.
    Congestion,
    /// Blocked by the pacer.
    Pacing,
}
6988
/// Fields of `Connection` specific to it being client-side or server-side.
enum ConnectionSide {
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded.
        token: Bytes,
        /// Token store the initial `token` was taken from, keyed by `server_name`.
        token_store: Arc<dyn TokenStore>,
        /// Server name; also the key used to look up `token` in `token_store`.
        server_name: String,
    },
    Server {
        /// Server configuration for this connection; consulted e.g. for whether client
        /// migration is allowed and how many validation tokens to send.
        server_config: Arc<ServerConfig>,
    },
}
7001
7002impl ConnectionSide {
7003    fn remote_may_migrate(&self, state: &State) -> bool {
7004        match self {
7005            Self::Server { server_config } => server_config.migration,
7006            Self::Client { .. } => {
7007                if let Some(hs) = state.as_handshake() {
7008                    hs.allow_server_migration
7009                } else {
7010                    false
7011                }
7012            }
7013        }
7014    }
7015
7016    fn is_client(&self) -> bool {
7017        self.side().is_client()
7018    }
7019
7020    fn is_server(&self) -> bool {
7021        self.side().is_server()
7022    }
7023
7024    fn side(&self) -> Side {
7025        match *self {
7026            Self::Client { .. } => Side::Client,
7027            Self::Server { .. } => Side::Server,
7028        }
7029    }
7030}
7031
7032impl From<SideArgs> for ConnectionSide {
7033    fn from(side: SideArgs) -> Self {
7034        match side {
7035            SideArgs::Client {
7036                token_store,
7037                server_name,
7038            } => Self::Client {
7039                token: token_store.take(&server_name).unwrap_or_default(),
7040                token_store,
7041                server_name,
7042            },
7043            SideArgs::Server {
7044                server_config,
7045                pref_addr_cid: _,
7046                path_validated: _,
7047            } => Self::Server { server_config },
7048        }
7049    }
7050}
7051
/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    Client {
        /// Token store; the token for `server_name` is taken from it when the connection is built.
        token_store: Arc<dyn TokenStore>,
        /// Server name, also used as the key into `token_store`.
        server_name: String,
    },
    Server {
        /// Server configuration, kept in the resulting `ConnectionSide::Server`.
        server_config: Arc<ServerConfig>,
        /// Exposed via [`SideArgs::pref_addr_cid`]; not retained by `ConnectionSide`.
        pref_addr_cid: Option<ConnectionId>,
        /// Exposed via [`SideArgs::path_validated`]; not retained by `ConnectionSide`.
        path_validated: bool,
    },
}
7064
7065impl SideArgs {
7066    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
7067        match *self {
7068            Self::Client { .. } => None,
7069            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
7070        }
7071    }
7072
7073    pub(crate) fn path_validated(&self) -> bool {
7074        match *self {
7075            Self::Client { .. } => true,
7076            Self::Server { path_validated, .. } => path_validated,
7077        }
7078    }
7079
7080    pub(crate) fn side(&self) -> Side {
7081        match *self {
7082            Self::Client { .. } => Side::Client,
7083            Self::Server { .. } => Side::Server,
7084        }
7085    }
7086}
7087
/// Reasons why a connection might be lost
///
/// Can be converted into an [`io::Error`] for API consumers that work with I/O errors.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    ///
    /// Carries the peer's transport-level CONNECTION_CLOSE frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    ///
    /// Carries the peer's application-level CONNECTION_CLOSE frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
7122
7123impl From<Close> for ConnectionError {
7124    fn from(x: Close) -> Self {
7125        match x {
7126            Close::Connection(reason) => Self::ConnectionClosed(reason),
7127            Close::Application(reason) => Self::ApplicationClosed(reason),
7128        }
7129    }
7130}
7131
7132// For compatibility with API consumers
7133impl From<ConnectionError> for io::Error {
7134    fn from(x: ConnectionError) -> Self {
7135        use ConnectionError::*;
7136        let kind = match x {
7137            TimedOut => io::ErrorKind::TimedOut,
7138            Reset => io::ErrorKind::ConnectionReset,
7139            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
7140            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
7141                io::ErrorKind::Other
7142            }
7143        };
7144        Self::new(kind, x)
7145    }
7146}
7147
7148/// Errors that might trigger a path being closed
7149// TODO(@divma): maybe needs to be reworked based on what we want to do with the public API
7150#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
7151pub enum PathError {
7152    /// The extension was not negotiated with the peer
7153    #[error("multipath extension not negotiated")]
7154    MultipathNotNegotiated,
7155    /// Paths can only be opened client-side
7156    #[error("the server side may not open a path")]
7157    ServerSideNotAllowed,
7158    /// Current limits do not allow us to open more paths
7159    #[error("maximum number of concurrent paths reached")]
7160    MaxPathIdReached,
7161    /// No remote CIDs available to open a new path
7162    #[error("remoted CIDs exhausted")]
7163    RemoteCidsExhausted,
7164    /// Path could not be validated and will be abandoned
7165    #[error("path validation failed")]
7166    ValidationFailed,
7167    /// The remote address for the path is not supported by the endpoint
7168    #[error("invalid remote address")]
7169    InvalidRemoteAddress(SocketAddr),
7170}
7171
/// Errors triggered when abandoning a path
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// Multipath is not negotiated
    #[error("Multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// The path is already closed or was never opened
    #[error("closed path")]
    ClosedPath,
    /// This is the last path, which can not be abandoned
    ///
    /// A connection must keep at least one open path.
    #[error("last open path")]
    LastOpenPath,
}
7185
/// Error when the multipath extension was not negotiated, but attempted to be used.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private field: the error can only be constructed within this module (and its children).
    _private: (),
}
7192
/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The TLS handshake was confirmed
    HandshakeConfirmed,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
    /// (Multi)Path events
    ///
    /// A [`PathEvent`] can be wrapped into this variant with `Event::from`.
    Path(PathEvent),
    /// n0's nat traversal events
    NatTraversal(n0_nat_traversal::Event),
}
7220
7221impl From<PathEvent> for Event {
7222    fn from(source: PathEvent) -> Self {
7223        Self::Path(source)
7224    }
7225}
7226
7227fn get_max_ack_delay(params: &TransportParameters) -> Duration {
7228    Duration::from_micros(params.max_ack_delay.0 * 1000)
7229}
7230
/// Upper bound on the backoff exponent.
///
/// Prevents overflow and improves behavior in extreme circumstances.
// NOTE(review): presumably applied as `2^exponent` to backoff timers; confirm at the use sites.
const MAX_BACKOFF_EXPONENT: u32 = 16;
7233
/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
///
/// Concretely: the worst-case Handshake/0-RTT header plus 32 bytes of room for frames.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
7242
/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Includes the packet number at its maximum encoded length of 4 bytes. Excludes
/// packet-type-specific fields such as the Initial token, which these packet types do not carry.
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
//
// The length field is sized as the varint encoding of `u16::MAX`.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
7250
/// Bookkeeping for the frames written into a single outgoing packet.
///
/// Populated frame by frame via `SentFrames::record_sent_frame`.
#[derive(Default)]
struct SentFrames {
    /// Frames that are tracked so they can be retransmitted
    retransmits: ThinRetransmits,
    /// The packet number of the largest acknowledged packet for each path
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of the STREAM frames in the packet
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    /// If the datagram containing these frames should be padded to the min MTU
    requires_padding: bool,
}
7262
7263impl SentFrames {
7264    /// Returns whether the packet contains only ACKs
7265    fn is_ack_only(&self, streams: &StreamsState) -> bool {
7266        !self.largest_acked.is_empty()
7267            && !self.non_retransmits
7268            && self.stream_frames.is_empty()
7269            && self.retransmits.is_empty(streams)
7270    }
7271
7272    fn retransmits_mut(&mut self) -> &mut Retransmits {
7273        self.retransmits.get_or_create()
7274    }
7275
7276    fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
7277        use frame::EncodableFrame::*;
7278        match frame {
7279            PathAck(path_ack_encoder) => {
7280                if let Some(max) = path_ack_encoder.ranges.max() {
7281                    self.largest_acked.insert(path_ack_encoder.path_id, max);
7282                }
7283            }
7284            Ack(ack_encoder) => {
7285                if let Some(max) = ack_encoder.ranges.max() {
7286                    self.largest_acked.insert(PathId::ZERO, max);
7287                }
7288            }
7289            Close(_) => { /* non retransmittable, but after this we don't really care */ }
7290            PathResponse(_) => self.non_retransmits = true,
7291            HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
7292            ReachOut(frame::ReachOut { round, ip, port }) => {
7293                let (recorded_round, reach_outs) = self
7294                    .retransmits_mut()
7295                    .reach_out
7296                    .get_or_insert_with(|| (round, FxHashSet::default()));
7297                // Only record reach outs for the current round or a newer than the recorded one.
7298                if *recorded_round == round {
7299                    // Same round, simply append.
7300                    reach_outs.insert((ip, port));
7301                } else if *recorded_round < round {
7302                    // New round.
7303                    *recorded_round = round;
7304                    reach_outs.drain();
7305                    reach_outs.insert((ip, port));
7306                } else {
7307                    // ignore old reach out that was sent
7308                }
7309            }
7310
7311            ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
7312            Ping(_) => self.non_retransmits = true,
7313            ImmediateAck(_) => self.non_retransmits = true,
7314            AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
7315            PathChallenge(_) => self.non_retransmits = true,
7316            Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
7317            PathAbandon(path_abandon) => {
7318                self.retransmits_mut()
7319                    .path_abandon
7320                    .entry(path_abandon.path_id)
7321                    .or_insert(path_abandon.error_code);
7322            }
7323            PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
7324            | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
7325                self.retransmits_mut().path_status.insert(path_id);
7326            }
7327            MaxPathId(_) => self.retransmits_mut().max_path_id = true,
7328            PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
7329            PathCidsBlocked(path_cids_blocked) => {
7330                self.retransmits_mut()
7331                    .path_cids_blocked
7332                    .insert(path_cids_blocked.path_id);
7333            }
7334            ResetStream(reset) => self
7335                .retransmits_mut()
7336                .reset_stream
7337                .push((reset.id, reset.error_code)),
7338            StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
7339            NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
7340            RetireConnectionId(retire_cid) => self
7341                .retransmits_mut()
7342                .retire_cids
7343                .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
7344            Datagram(_) => self.non_retransmits = true,
7345            NewToken(_) => {}
7346            AddAddress(add_address) => {
7347                self.retransmits_mut().add_address.insert(add_address);
7348            }
7349            RemoveAddress(remove_address) => {
7350                self.retransmits_mut().remove_address.insert(remove_address);
7351            }
7352            StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
7353            MaxData(_) => self.retransmits_mut().max_data = true,
7354            MaxStreamData(max) => {
7355                self.retransmits_mut().max_stream_data.insert(max.id);
7356            }
7357            MaxStreams(max_streams) => {
7358                self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
7359            }
7360        }
7361    }
7362}
7363
7364/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
7365///
7366/// According to the definition of max_idle_timeout, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1.>
7367///
7368/// According to the negotiation procedure, either the minimum of the timeouts or one specified is used as the negotiated value; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2.>
7369///
7370/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
7371fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
7372    match (x, y) {
7373        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
7374        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
7375        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
7376        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
7377    }
7378}
7379
#[cfg(test)]
mod tests {
    use super::*;

    /// The negotiated idle timeout must not depend on which side's parameter is passed first.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let ms = Duration::from_millis;
        // (one side's parameter, the other side's parameter, expected negotiated timeout)
        let cases = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(ms(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (Some(VarInt(2)), Some(VarInt(0)), Some(ms(2))),
            (Some(VarInt(1)), Some(VarInt(4)), Some(ms(1))),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}