// noq_proto/connection/mod.rs

1use std::{
2    cmp,
3    collections::{BTreeMap, VecDeque, btree_map},
4    convert::TryFrom,
5    fmt, io, mem,
6    net::{IpAddr, SocketAddr},
7    num::{NonZeroU32, NonZeroUsize},
8    sync::Arc,
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13
14use rand::{Rng, SeedableRng, rngs::StdRng};
15use rustc_hash::{FxHashMap, FxHashSet};
16use thiserror::Error;
17use tracing::{debug, error, trace, trace_span, warn};
18
19use crate::{
20    Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
21    MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
22    TransportError, TransportErrorCode, VarInt,
23    cid_generator::ConnectionIdGenerator,
24    cid_queue::CidQueue,
25    config::{ServerConfig, TransportConfig},
26    congestion::Controller,
27    connection::{
28        qlog::{QlogRecvPacket, QlogSink},
29        spaces::LostPacket,
30        timer::{ConnTimer, PathTimer},
31    },
32    crypto::{self, Keys},
33    frame::{
34        self, Close, DataBlocked, Datagram, FrameStruct, NewToken, ObservedAddr, StreamDataBlocked,
35        StreamsBlocked,
36    },
37    n0_nat_traversal,
38    packet::{
39        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40        PacketNumber, PartialDecode, SpaceId,
41    },
42    range_set::ArrayRangeSet,
43    shared::{
44        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45        EndpointEvent, EndpointEventInner,
46    },
47    token::{ResetToken, Token, TokenPayload},
48    transport_parameters::TransportParameters,
49};
50
// Submodules making up the connection implementation.  Types needed by callers are
// re-exported publicly; a few internals are additionally exported under `cfg(fuzzing)`
// so that fuzz targets can drive them directly.

// ACK frequency (delayed-ack negotiation) state.
mod ack_frequency;
use ack_frequency::AckFrequencyState;

// Reassembly of received stream data into ordered chunks.
mod assembler;
pub use assembler::Chunk;

// State of connection IDs issued by the local endpoint.
mod cid_state;
use cid_state::CidState;

// Unreliable datagram extension.
mod datagrams;
use datagrams::DatagramState;
pub use datagrams::{Datagrams, SendDatagramError};

// Path MTU discovery and pacing.
mod mtud;
mod pacing;

// Assembly of outgoing packets into datagrams.
mod packet_builder;
use packet_builder::{PacketBuilder, PadDatagram};

// Packet protection (encryption/decryption) state.
mod packet_crypto;
use packet_crypto::CryptoState;
pub(crate) use packet_crypto::EncryptionLevel;

// Per-path state (RTT, validation, status, ...).
mod paths;
pub use paths::{
    ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError,
};
use paths::{PathData, PathState};

pub(crate) mod qlog;
pub(crate) mod send_buffer;

// Packet number spaces and retransmission bookkeeping.
mod spaces;
#[cfg(fuzzing)]
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
use spaces::Retransmits;
pub(crate) use spaces::SpaceKind;
use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};

// Connection- and path-level statistics.
mod stats;
pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};

// Stream state machines and their public handles.
mod streams;
#[cfg(fuzzing)]
pub use streams::StreamsState;
#[cfg(not(fuzzing))]
use streams::StreamsState;
pub use streams::{
    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
};

// Connection and per-path timers.
mod timer;
use timer::{Timer, TimerTable};

// Buffer management for outgoing transmits.
mod transmit_buf;
use transmit_buf::TransmitBuf;

// The connection-level state machine (handshake, established, closed, drained, ...).
mod state;

#[cfg(not(fuzzing))]
use state::State;
#[cfg(fuzzing)]
pub use state::State;
use state::StateType;
117
/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below, however as per the design of this library none of the
///   functions actually perform system-level I/O. For example, [`read`](RecvStream::read) and
///   [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing it to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be made to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
pub struct Connection {
    /// Endpoint-level configuration, shared with the endpoint that created this connection
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration for this connection
    config: Arc<TransportConfig>,
    /// Connection-local source of randomness, seeded via `new`'s `rng_seed`
    rng: StdRng,
    /// Consolidated cryptographic state
    crypto_state: CryptoState,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    remote_handshake_cid: ConnectionId,
    /// The [`PathData`] for each path
    ///
    /// This needs to be ordered because [`Connection::poll_transmit`] needs to
    /// deterministically select the next PathId to send on.
    // TODO(flub): well does it really? But deterministic is nice for now.
    paths: BTreeMap<PathId, PathState>,
    /// Counter to uniquely identify every [`PathData`] created in this connection.
    ///
    /// Each [`PathData`] gets a [`PathData::generation`] that is unique among all
    /// [`PathData`]s created in the lifetime of this connection. This helps identify the
    /// correct path when RFC9000-style migrations happen, even when they are
    /// aborted.
    ///
    /// Multipath does not change this, each path can also undergo RFC9000-style
    /// migrations. So a single multipath path ID could see several [`PathData`]s each with
    /// their unique [`PathData::generation`].
    path_generation_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    /// Current state of the connection-level state machine
    state: State,
    /// Client- or server-specific state
    side: ConnectionSide,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    original_remote_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Events returned by [`Connection::poll`]
    events: VecDeque<Event>,
    /// Events returned by [`Connection::poll_endpoint_events`]
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable packet space.
    highest_space: SpaceKind,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    /// Table of armed connection and per-path timers
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// If the CONNECTION_CLOSE frame needs to be sent
    connection_close_pending: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    /// Stream-related state (send/receive streams, flow control)
    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    ///
    /// These are given out before multiple paths exist, also for paths that will never
    /// exist.  So if multipath is supported the number of paths here will be higher than
    /// the actual number of paths in use.
    remote_cids: FxHashMap<PathId, CidQueue>,
    /// Attributes of CIDs generated by local endpoint
    ///
    /// Any path that is allowed to be opened is present in this map, as well as the already
    /// opened paths. However since CIDs are issued async by the endpoint driver via
    /// connection events it can not be used to know if CIDs have been issued for a path or
    /// not. See [`Connection::max_path_id_with_cids`] for this.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// Path level statistics
    path_stats: FxHashMap<PathId, PathStats>,
    /// QUIC version used for the connection.
    version: u32,

    //
    // Multipath
    //
    /// Maximum number of concurrent paths
    ///
    /// Initially set from the [`TransportConfig::max_concurrent_multipath_paths`]. Even
    /// when multipath is disabled this will be set to 1, it is not used in that case
    /// though.
    max_concurrent_paths: NonZeroU32,
    /// Local maximum [`PathId`] to be used
    ///
    /// This is initially set to [`TransportConfig::get_initial_max_path_id`] when multipath
    /// is negotiated, or to [`PathId::ZERO`] otherwise. This is essentially the value of
    /// the highest MAX_PATH_ID frame sent.
    ///
    /// Any path with an ID equal or below this [`PathId`] is either:
    ///
    /// - Abandoned, if it is also in [`Connection::abandoned_paths`].
    /// - Open, in this case it is present in [`Connection::paths`]
    /// - Not yet opened, if it is in neither of these two places.
    ///
    /// Note that for not-yet-open there may or may not be any CIDs issued. See
    /// [`Connection::max_path_id_with_cids`].
    local_max_path_id: PathId,
    /// Remote's maximum [`PathId`] to be used
    ///
    /// This is initially set to the peer's [`TransportParameters::initial_max_path_id`] when
    /// multipath is negotiated, or to [`PathId::ZERO`] otherwise. A peer may increase this limit
    /// by sending [`Frame::MaxPathId`] frames.
    remote_max_path_id: PathId,
    /// The greatest [`PathId`] we have issued CIDs for
    ///
    /// CIDs are only issued for `min(local_max_path_id, remote_max_path_id)`. It is not
    /// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued
    /// since they are issued asynchronously by the endpoint driver.
    max_path_id_with_cids: PathId,
    /// The paths already abandoned
    ///
    /// They may still have some state left in [`Connection::paths`] or
    /// [`Connection::local_cid_state`] since some of this has to be kept around for some
    /// time after a path is abandoned.
    // TODO(flub): Make this a more efficient data structure.  Like ranges of abandoned
    //    paths.  Or a set together with a minimum.  Or something.
    abandoned_paths: FxHashSet<PathId>,

    /// State for n0's (<https://n0.computer>) nat traversal protocol.
    n0_nat_traversal: n0_nat_traversal::State,
    /// Sink for qlog events emitted by this connection
    qlog: QlogSink,
}
316
317impl Connection {
    /// Creates the protocol state for a new connection
    ///
    /// `init_cid` is the destination CID of the first Initial packet, `local_cid` the CID
    /// we chose for the handshake and `remote_cid` the CID chosen by the peer.
    /// `side_args` carries the client/server-specific arguments; `rng_seed` seeds the
    /// connection-local RNG. For a client this immediately queues the first CRYPTO data
    /// and initializes 0-RTT, kicking off the handshake.
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        local_cid: ConnectionId,
        remote_cid: ConnectionId,
        network_path: FourTuple,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        // In tests the data space's packet numbers can be made deterministic to ease
        // assertions on transmitted packets.
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            remote_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        // Path 0 always exists.  Budget one extra locally-issued CID when a preferred
        // address CID must also be provided.
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
        path.open_status = paths::OpenStatus::Informed;
        let mut this = Self {
            endpoint_config,
            crypto_state: CryptoState::new(crypto, init_cid, side, &mut rng),
            handshake_cid: local_cid,
            remote_handshake_cid: remote_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_generation_counter: 0,
            allow_mtud,
            state,
            side: connection_side,
            peer_params: TransportParameters::default(),
            original_remote_cid: remote_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            // The spin bit is disabled for a random 1-in-8 of connections even when
            // allowed by config.
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceKind::Initial,
            permit_idle_reset: true,
            // A configured timeout of 0 means the idle timeout is disabled.
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            connection_close_pending: false,

            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            // peer params are not yet known, so multipath is not enabled
            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            n0_nat_traversal: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog
            .emit_tuple_assigned(PathId::ZERO, network_path, now);
        this
    }
450
451    /// Returns the next time at which `handle_timeout` should be called
452    ///
453    /// The value returned may change after:
454    /// - the application performed some I/O on the connection
455    /// - a call was made to `handle_event`
456    /// - a call to `poll_transmit` returned `Some`
457    /// - a call was made to `handle_timeout`
458    #[must_use]
459    pub fn poll_timeout(&mut self) -> Option<Instant> {
460        self.timers.peek()
461    }
462
463    /// Returns application-facing events
464    ///
465    /// Connections should be polled for events after:
466    /// - a call was made to `handle_event`
467    /// - a call was made to `handle_timeout`
468    #[must_use]
469    pub fn poll(&mut self) -> Option<Event> {
470        if let Some(x) = self.events.pop_front() {
471            return Some(x);
472        }
473
474        if let Some(event) = self.streams.poll() {
475            return Some(Event::Stream(event));
476        }
477
478        if let Some(reason) = self.state.take_error() {
479            return Some(Event::ConnectionLost { reason });
480        }
481
482        None
483    }
484
485    /// Return endpoint-facing events
486    #[must_use]
487    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
488        self.endpoint_events.pop_front().map(EndpointEvent)
489    }
490
491    /// Provide control over streams
492    #[must_use]
493    pub fn streams(&mut self) -> Streams<'_> {
494        Streams {
495            state: &mut self.streams,
496            conn_state: &self.state,
497        }
498    }
499
500    /// Provide control over streams
501    #[must_use]
502    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
503        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
504        RecvStream {
505            id,
506            state: &mut self.streams,
507            pending: &mut self.spaces[SpaceId::Data].pending,
508        }
509    }
510
511    /// Provide control over streams
512    #[must_use]
513    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
514        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
515        SendStream {
516            id,
517            state: &mut self.streams,
518            pending: &mut self.spaces[SpaceId::Data].pending,
519            conn_state: &self.state,
520        }
521    }
522
523    /// Opens a new path only if no path on the same network path currently exists.
524    ///
525    /// This comparison will use [`FourTuple::is_probably_same_path`] on the given `network_path`
526    /// and pass it existing path's network paths.
527    ///
528    /// This means that you can pass `local_ip: None` to make the comparison only compare
529    /// remote addresses.
530    ///
531    /// This avoids having to guess which local interface will be used to communicate with the
532    /// remote, should it not be known yet. We assume that if we already have a path to the remote,
533    /// the OS is likely to use the same interface to talk to said remote.
534    ///
535    /// See also [`open_path`]. Returns `(path_id, true)` if the path already existed. `(path_id,
536    /// false)` if was opened.
537    ///
538    /// [`open_path`]: Connection::open_path
539    pub fn open_path_ensure(
540        &mut self,
541        network_path: FourTuple,
542        initial_status: PathStatus,
543        now: Instant,
544    ) -> Result<(PathId, bool), PathError> {
545        let existing_open_path = self.paths.iter().find(|(id, path)| {
546            network_path.is_probably_same_path(&path.data.network_path)
547                && !self.abandoned_paths.contains(*id)
548        });
549        match existing_open_path {
550            Some((path_id, _state)) => Ok((*path_id, true)),
551            None => Ok((self.open_path(network_path, initial_status, now)?, false)),
552        }
553    }
554
    /// Opens a new path
    ///
    /// Further errors might occur and they will be emitted in [`PathEvent::Abandoned`] events with this path id.
    /// When the path is opened it will be reported as an [`PathEvent::Opened`].
    pub fn open_path(
        &mut self,
        network_path: FourTuple,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<PathId, PathError> {
        // Only clients may open paths, and only after multipath was negotiated.
        if !self.is_multipath_negotiated() {
            return Err(PathError::MultipathNotNegotiated);
        }
        if self.side().is_server() {
            return Err(PathError::ServerSideNotAllowed);
        }

        // Path IDs are never reused: pick one strictly greater than any path ever used
        // or abandoned so far.
        let max_abandoned = self.abandoned_paths.iter().max().copied();
        let max_used = self.paths.keys().last().copied();
        let path_id = max_abandoned
            .max(max_used)
            .unwrap_or(PathId::ZERO)
            .saturating_add(1u8);

        // `None` orders below any `Some`, so this also rejects opening when no maximum
        // path id is known at all.
        if Some(path_id) > self.max_path_id() {
            return Err(PathError::MaxPathIdReached);
        }
        if path_id > self.remote_max_path_id {
            // Ask the peer to raise its limit by queueing a PATHS_BLOCKED frame.
            self.spaces[SpaceId::Data].pending.paths_blocked = true;
            return Err(PathError::MaxPathIdReached);
        }
        if self
            .remote_cids
            .get(&path_id)
            .map(CidQueue::active)
            .is_none()
        {
            // The peer has not issued a CID usable for this path yet; signal this by
            // queueing a frame for the path and fail for now.
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .insert(path_id);
            return Err(PathError::RemoteCidsExhausted);
        }

        let path = self.ensure_path(path_id, network_path, now, None);
        path.status.local_update(initial_status);

        Ok(path_id)
    }
604
605    /// Closes a path and sends a PATH_ABANDON frame with the passed error code.
606    ///
607    /// This will not allow closing the last path. It does allow closing paths which have
608    /// not yet been opened, as e.g. is the case when receiving a PATH_ABANDON from the peer
609    /// for a path that was never opened locally.
610    pub fn close_path(
611        &mut self,
612        now: Instant,
613        path_id: PathId,
614        error_code: VarInt,
615    ) -> Result<(), ClosePathError> {
616        self.close_path_inner(
617            now,
618            path_id,
619            PathAbandonReason::ApplicationClosed { error_code },
620        )
621    }
622
    /// Closes a path and sends a PATH_ABANDON frame.
    ///
    /// Other than [`Self::close_path`] this allows to specify the reason for the path being closed.
    /// Internally, this should be used over [`Self::close_path`].
    fn close_path_inner(
        &mut self,
        now: Instant,
        path_id: PathId,
        reason: PathAbandonReason,
    ) -> Result<(), ClosePathError> {
        // A drained connection has nothing left to clean up; report success.
        if self.state.is_drained() {
            return Ok(());
        }

        if !self.is_multipath_negotiated() {
            return Err(ClosePathError::MultipathNotNegotiated);
        }
        // Reject paths already abandoned, beyond the negotiated maximum, or unknown.
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }

        if reason.is_locally_initiated() {
            // Locally-initiated close: refuse to abandon the last *validated* path.
            let has_remaining_validated_paths = self.paths.iter().any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            });
            if !has_remaining_validated_paths {
                return Err(ClosePathError::LastOpenPath);
            }
        } else {
            // The remote abandoned this path. We should always "accept" this. Doing so right now,
            // however, breaks assumptions throughout the code. We error instead, for the
            // connection to be killed. See <https://github.com/n0-computer/noq/issues/397>
            let has_remaining_paths = self
                .paths
                .keys()
                .any(|id| *id != path_id && !self.abandoned_paths.contains(id));
            if !has_remaining_paths {
                return Err(ClosePathError::LastOpenPath);
            }
        }

        // Send PATH_ABANDON
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, reason.error_code());

        // Remove pending NEW CIDs for this path
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Cleanup retransmits across ALL paths (CIDs for path_id may have been transmitted on other paths)
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Consider remotely issued CIDs as retired.
        // Technically we don't have to do this just yet.  We only need to do this *after*
        // the ABANDON_PATH frame is sent, allowing us to still send it on the
        // to-be-abandoned path.  However it is recommended to send it on another path, and
        // we do not allow abandoning the last path anyway.
        self.remote_cids.remove(&path_id);
        debug_assert!(!self.state.is_drained()); // requirement for endpoint_events, checked above
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        trace!(%path_id, "abandoning path");
        self.abandoned_paths.insert(path_id);

        // Stop all per-path timers that are no longer meaningful for an abandoned path.
        for timer in timer::PathTimer::VALUES {
            // match for completeness
            let keep_timer = match timer {
                // These timers deal with sending and receiving PATH_CHALLENGE and
                // PATH_RESPONSE, but now that the path is abandoned, we no longer care about
                // these frames or their timing
                PathTimer::PathValidation | PathTimer::PathChallengeLost | PathTimer::PathOpen => {
                    false
                }
                // These timers deal with the lifetime of the path. Now that the path is abandoned,
                // these are not relevant.
                PathTimer::PathKeepAlive | PathTimer::PathIdle => false,
                // The path has already been informed that outstanding acks should be sent
                // immediately
                PathTimer::MaxAckDelay => false,
                // This timer should not be set, for completeness it's not kept as it's set when
                // the PATH_ABANDON frame is sent.
                PathTimer::DiscardPath => false,
                // Sent packets still need to be identified as lost to trigger timely
                // retransmission.
                PathTimer::LossDetection => true,
                // This path should not be used for sending after the PATH_ABANDON frame is sent.
                // However, any outstanding data that should be sent before PATH_ABANDON, should
                // still respect pacing.
                PathTimer::Pacing => true,
            };

            if !keep_timer {
                let qlog = self.qlog.with_time(now);
                self.timers.stop(Timer::PerPath(path_id, timer), qlog);
            }
        }

        // Emit event to the application.
        self.events.push_back(Event::Path(PathEvent::Abandoned {
            id: path_id,
            reason,
        }));

        Ok(())
    }
744
745    /// Gets the [`PathData`] for a known [`PathId`].
746    ///
747    /// Will panic if the path_id does not reference any known path.
748    #[track_caller]
749    fn path_data(&self, path_id: PathId) -> &PathData {
750        if let Some(data) = self.paths.get(&path_id) {
751            &data.data
752        } else {
753            panic!(
754                "unknown path: {path_id}, currently known paths: {:?}",
755                self.paths.keys().collect::<Vec<_>>()
756            );
757        }
758    }
759
760    /// Gets a reference to the [`PathData`] for a [`PathId`]
761    fn path(&self, path_id: PathId) -> Option<&PathData> {
762        self.paths.get(&path_id).map(|path_state| &path_state.data)
763    }
764
765    /// Gets a mutable reference to the [`PathData`] for a [`PathId`]
766    fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
767        self.paths
768            .get_mut(&path_id)
769            .map(|path_state| &mut path_state.data)
770    }
771
772    /// Returns all known paths.
773    ///
774    /// There is no guarantee any of these paths are open or usable.
775    pub fn paths(&self) -> Vec<PathId> {
776        self.paths.keys().copied().collect()
777    }
778
779    /// Gets the local [`PathStatus`] for a known [`PathId`]
780    pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
781        self.path(path_id)
782            .map(PathData::local_status)
783            .ok_or(ClosedPath { _private: () })
784    }
785
786    /// Returns the path's network path represented as a 4-tuple.
787    pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
788        self.path(path_id)
789            .map(|path| path.network_path)
790            .ok_or(ClosedPath { _private: () })
791    }
792
793    /// Sets the [`PathStatus`] for a known [`PathId`]
794    ///
795    /// Returns the previous path status on success.
796    pub fn set_path_status(
797        &mut self,
798        path_id: PathId,
799        status: PathStatus,
800    ) -> Result<PathStatus, SetPathStatusError> {
801        if !self.is_multipath_negotiated() {
802            return Err(SetPathStatusError::MultipathNotNegotiated);
803        }
804        let path = self
805            .path_mut(path_id)
806            .ok_or(SetPathStatusError::ClosedPath)?;
807        let prev = match path.status.local_update(status) {
808            Some(prev) => {
809                self.spaces[SpaceId::Data]
810                    .pending
811                    .path_status
812                    .insert(path_id);
813                prev
814            }
815            None => path.local_status(),
816        };
817        Ok(prev)
818    }
819
820    /// Returns the remote path status
821    // TODO(flub): Probably should also be some kind of path event?  Not even sure if I like
822    //    this as an API, but for now it allows me to write a test easily.
823    // TODO(flub): Technically this should be a Result<Option<PathSTatus>>?
824    pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
825        self.path(path_id).and_then(|path| path.remote_status())
826    }
827
828    /// Sets the max_idle_timeout for a specific path
829    ///
830    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
831    ///
832    /// Returns the previous value of the setting.
833    pub fn set_path_max_idle_timeout(
834        &mut self,
835        path_id: PathId,
836        timeout: Option<Duration>,
837    ) -> Result<Option<Duration>, ClosedPath> {
838        let path = self
839            .paths
840            .get_mut(&path_id)
841            .ok_or(ClosedPath { _private: () })?;
842        Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
843    }
844
845    /// Sets the keep_alive_interval for a specific path
846    ///
847    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
848    ///
849    /// Returns the previous value of the setting.
850    pub fn set_path_keep_alive_interval(
851        &mut self,
852        path_id: PathId,
853        interval: Option<Duration>,
854    ) -> Result<Option<Duration>, ClosedPath> {
855        let path = self
856            .paths
857            .get_mut(&path_id)
858            .ok_or(ClosedPath { _private: () })?;
859        Ok(std::mem::replace(&mut path.data.keep_alive, interval))
860    }
861
862    /// Gets the [`PathData`] for a known [`PathId`].
863    ///
864    /// Will panic if the path_id does not reference any known path.
865    #[track_caller]
866    fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
867        &mut self.paths.get_mut(&path_id).expect("known path").data
868    }
869
870    /// Find an open, validated path that's on the same network path as the given network path.
871    ///
872    /// Returns the first path matching, even if there's multiple.
873    fn find_validated_path_on_network_path(
874        &self,
875        network_path: FourTuple,
876    ) -> Option<(&PathId, &PathState)> {
877        self.paths.iter().find(|(path_id, path_state)| {
878            path_state.data.validated
879                // Would this use the same network path, if network_path were used to send right now?
880                && network_path.is_probably_same_path(&path_state.data.network_path)
881                && !self.abandoned_paths.contains(path_id)
882        })
883        // TODO(@divma): we might want to ensure the path has been recently active to consider the
884        // address validated
885        // matheus23: Perhaps looking at !self.abandoned_paths.contains(path_id) is enough, given keep-alives?
886    }
887
    /// Creates the [`PathData`] for a new [`PathId`], or returns the existing one.
    ///
    /// Called for incoming packets as well as when opening a new path locally.
    ///
    /// `pn` is the packet number of the packet that triggered creation of this path, if
    /// any; it is seeded into the new packet number space's dedup state so the triggering
    /// packet is not treated as a duplicate later.
    fn ensure_path(
        &mut self,
        path_id: PathId,
        network_path: FourTuple,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        // If we already have a validated path on the same 4-tuple, the new path can start
        // out validated and inherit a conservative RTT estimate from it.
        let valid_path = self.find_validated_path_on_network_path(network_path);
        let validated = valid_path.is_some();
        let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
        // Early return if the path already exists; only proceed for a vacant entry.
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        debug!(%validated, %path_id, %network_path, "path added");
        // Peer's advertised max UDP payload size, clamped to u16::MAX.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
        let mut data = PathData::new(
            network_path,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_generation_counter,
            now,
            &self.config,
        );

        data.validated = validated;
        if let Some(initial_rtt) = initial_rtt {
            data.rtt.reset_initial_rtt(initial_rtt);
        }

        // To open a path locally we need to send a packet on the path. Sending a challenge
        // guarantees this.
        data.pending_on_path_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        // Every path gets its own Data-space packet number space.
        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, network_path, now);

        // If the remote opened this path we may not have CIDs for it. For locally opened
        // paths the caller should have already made sure we have CIDs and refused to open
        // it if there were none.
        if !self.remote_cids.contains_key(&path_id) {
            debug!(%path_id, "Remote opened path without issuing CIDs");
            // Queue a PATH_CIDS_BLOCKED-style notification for the peer.
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .insert(path_id);
            // Do not abandon this path right away. CIDs might be in-flight still and arrive
            // soon. It is up to the remote to handle this situation.
        }

        &mut path.data
    }
956
957    /// Returns packets to transmit
958    ///
959    /// Connections should be polled for transmit after:
960    /// - the application performed some I/O on the connection
961    /// - a call was made to `handle_event`
962    /// - a call was made to `handle_timeout`
963    ///
964    /// `max_datagrams` specifies how many datagrams can be returned inside a
965    /// single Transmit using GSO. This must be at least 1.
966    #[must_use]
967    pub fn poll_transmit(
968        &mut self,
969        now: Instant,
970        max_datagrams: NonZeroUsize,
971        buf: &mut Vec<u8>,
972    ) -> Option<Transmit> {
973        let max_datagrams = match self.config.enable_segmentation_offload {
974            false => NonZeroUsize::MIN,
975            true => max_datagrams,
976        };
977
978        // Each call to poll_transmit can only send datagrams to one destination, because
979        // all datagrams in a GSO batch are for the same destination.  Therefore only
980        // datagrams for one destination address are produced for each poll_transmit call.
981
982        // Check whether we need to send a close message
983        let connection_close_pending = match self.state.as_type() {
984            StateType::Drained => {
985                self.app_limited = true;
986                return None;
987            }
988            StateType::Draining | StateType::Closed => {
989                // self.connection_close_pending is only reset once the associated packet
990                // had been encoded successfully
991                if !self.connection_close_pending {
992                    self.app_limited = true;
993                    return None;
994                }
995                true
996            }
997            _ => false,
998        };
999
1000        // Schedule an ACK_FREQUENCY frame if a new one needs to be sent.
1001        if let Some(config) = &self.config.ack_frequency_config {
1002            let rtt = self
1003                .paths
1004                .values()
1005                .map(|p| p.data.rtt.get())
1006                .min()
1007                .expect("one path exists");
1008            self.spaces[SpaceId::Data].pending.ack_frequency = self
1009                .ack_frequency
1010                .should_send_ack_frequency(rtt, config, &self.peer_params)
1011                && self.highest_space == SpaceKind::Data
1012                && self.peer_supports_ack_frequency();
1013        }
1014
1015        // Build up some packet scheduling information about all paths.
1016        let scheduling_info: BTreeMap<PathId, PathSchedulingInfo> = {
1017            let have_validated_status_available_space = self.paths.iter().any(|(path_id, path)| {
1018                self.remote_cids.contains_key(path_id)
1019                    && !self.abandoned_paths.contains(path_id)
1020                    && path.data.validated
1021                    && path.data.local_status() == PathStatus::Available
1022            });
1023            let is_handshaking = self.is_handshaking();
1024            tracing::warn!(?is_handshaking);
1025            self.paths
1026                .iter()
1027                .map(|(path_id, path)| {
1028                    let has_cids = self.remote_cids.contains_key(path_id);
1029                    let validated = path.data.validated;
1030                    let abandoned = self.abandoned_paths.contains(path_id);
1031                    let status = path.data.local_status();
1032
1033                    // This is the core packet scheduling, whether this space ID may send
1034                    // SpaceKind::Data frames.
1035                    let may_send_data = has_cids
1036                        && !abandoned
1037                        && if is_handshaking {
1038                            // There is only one path during the handshake. We want to
1039                            // already send 0-RTT and 0.5-RTT (permitting anti-amplification
1040                            // limit) data.
1041                            true
1042                        } else if !validated {
1043                            // TODO(flub): When we have a network change we might end up
1044                            //    having to abandon all paths and re-open new ones to the
1045                            //    same remotes. This leaves us without any validated
1046                            //    path. Perhaps we should have a way to figure out if the
1047                            //    path is to a previously-validated remote address and allow
1048                            //    sending data to such remotes immediately.
1049                            false
1050                        } else {
1051                            match status {
1052                                PathStatus::Available => {
1053                                    // Best possible space to send data on.
1054                                    true
1055                                }
1056                                PathStatus::Backup => {
1057                                    // If there is an status-available path we prefer that.
1058                                    !have_validated_status_available_space
1059                                }
1060                            }
1061                        };
1062                    // CONNECTION_CLOSE is allowed to be sent on an non-validated
1063                    // path. Particularly during the handshake we want to send it before the
1064                    // path is validated. Later if there is no validated path available we
1065                    // will also accept sending it on an un-validated path.
1066                    let may_send_close = has_cids
1067                        && !abandoned
1068                        && if !validated && have_validated_status_available_space {
1069                            // We have a better space to send on.
1070                            false
1071                        } else {
1072                            // No other validated space, this is as good as it gets.
1073                            true
1074                        };
1075                    (
1076                        *path_id,
1077                        PathSchedulingInfo {
1078                            abandoned,
1079                            may_send_data,
1080                            may_send_close,
1081                        },
1082                    )
1083                })
1084                .collect()
1085        };
1086
1087        // TODO: how to avoid the allocation? Cannot use a for loop because of
1088        // borrowing. Maybe SmallVec or similar.
1089        let path_ids: Vec<_> = self.paths.keys().copied().collect();
1090
1091        // If we end up not sending anything, we need to know if that was because there was
1092        // nothing to send or because we were congestion blocked.
1093        let mut congestion_blocked = false;
1094
1095        for (&path_id, info) in scheduling_info.iter() {
1096            if !connection_close_pending
1097                && let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id)
1098            {
1099                return Some(transmit);
1100            }
1101
1102            // Poll for on-path transmits.
1103            match self.poll_transmit_on_path(
1104                now,
1105                buf,
1106                path_id,
1107                max_datagrams,
1108                info,
1109                connection_close_pending,
1110            ) {
1111                PollPathStatus::Send(transmit) => {
1112                    return Some(transmit);
1113                }
1114                PollPathStatus::NothingToSend {
1115                    congestion_blocked: cb,
1116                } => {
1117                    congestion_blocked |= cb;
1118                    // Continue checking other paths, tail-loss probes may need to be sent
1119                    // in all spaces.
1120                    debug_assert!(
1121                        buf.is_empty(),
1122                        "nothing to send on path but buffer not empty"
1123                    );
1124                }
1125            }
1126        }
1127
1128        // We didn't produce any application data packet
1129        debug_assert!(
1130            buf.is_empty(),
1131            "there was data in the buffer, but it was not sent"
1132        );
1133
1134        self.app_limited = !congestion_blocked;
1135
1136        if self.state.is_established() {
1137            // Try MTU probing now
1138            for path_id in path_ids {
1139                if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) {
1140                    return Some(transmit);
1141                }
1142            }
1143        }
1144
1145        None
1146    }
1147
1148    fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit {
1149        debug_assert!(
1150            !transmit.is_empty(),
1151            "must not be called with an empty transmit buffer"
1152        );
1153
1154        let network_path = self.path_data(path_id).network_path;
1155        trace!(
1156            segment_size = transmit.segment_size(),
1157            last_datagram_len = transmit.len() % transmit.segment_size(),
1158            %network_path,
1159            "sending {} bytes in {} datagrams",
1160            transmit.len(),
1161            transmit.num_datagrams()
1162        );
1163        self.path_data_mut(path_id)
1164            .inc_total_sent(transmit.len() as u64);
1165
1166        self.stats
1167            .udp_tx
1168            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1169        self.path_stats
1170            .entry(path_id)
1171            .or_default()
1172            .udp_tx
1173            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1174
1175        Transmit {
1176            destination: network_path.remote,
1177            size: transmit.len(),
1178            ecn: if self.path_data(path_id).sending_ecn {
1179                Some(EcnCodepoint::Ect0)
1180            } else {
1181                None
1182            },
1183            segment_size: match transmit.num_datagrams() {
1184                1 => None,
1185                _ => Some(transmit.segment_size()),
1186            },
1187            src_ip: network_path.local_ip,
1188        }
1189    }
1190
1191    /// poll_transmit logic for off-path data.
1192    fn poll_transmit_off_path(
1193        &mut self,
1194        now: Instant,
1195        buf: &mut Vec<u8>,
1196        path_id: PathId,
1197    ) -> Option<Transmit> {
1198        if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) {
1199            return Some(challenge);
1200        }
1201        if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
1202            return Some(response);
1203        }
1204        if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
1205            return Some(challenge);
1206        }
1207        None
1208    }
1209
    /// poll_transmit logic for on-path data.
    ///
    /// This is not quite the same as for a multipath packet space, since [`PathId::ZERO`]
    /// has 3 packet spaces, which this handles.
    ///
    /// Iterates over the packet number spaces for this path, coalescing packets from
    /// multiple spaces into the same transmit where possible, and finally informs the
    /// path's congestion controller of the bytes sent.
    ///
    /// See [`Self::poll_transmit_off_path`] for off-path data.
    #[must_use]
    fn poll_transmit_on_path(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
        max_datagrams: NonZeroUsize,
        scheduling_info: &PathSchedulingInfo,
        connection_close_pending: bool,
    ) -> PollPathStatus {
        // Check if there is at least one active CID to use for sending
        let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else {
            // Missing CIDs are expected for abandoned paths, so only log otherwise.
            if !self.abandoned_paths.contains(&path_id) {
                debug!(%path_id, "no remote CIDs for path");
            }
            return PollPathStatus::NothingToSend {
                congestion_blocked: false,
            };
        };

        // Whether the last packet in the datagram must be padded so the datagram takes up
        // an exact size. An earlier space can decide to not fill an entire datagram and
        // require the next space to fill it further. But may need a specific size of the
        // datagram containing the packet. The final packet built in the datagram must pad
        // to this size.
        let mut pad_datagram = PadDatagram::No;

        // The packet number of the last built packet. This is kept across spaces.
        // QUIC is supposed to have a single congestion controller for the Initial,
        // Handshake and Data(PathId::ZERO) spaces.
        let mut last_packet_number = None;

        // If we end up not sending anything, we need to know if that was because there was
        // nothing to send or because we were congestion blocked.
        let mut congestion_blocked = false;

        // Set the segment size to this path's MTU for on-path data.
        let pmtu = self.path_data(path_id).current_mtu().into();
        let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu);

        // Iterate over the available spaces.
        for space_id in SpaceId::iter() {
            // Only PathId::ZERO uses non Data space ids.
            if path_id != PathId::ZERO && space_id != SpaceId::Data {
                continue;
            }
            match self.poll_transmit_path_space(
                now,
                &mut transmit,
                path_id,
                space_id,
                remote_cid,
                scheduling_info,
                connection_close_pending,
                pad_datagram,
            ) {
                PollPathSpaceStatus::NothingToSend {
                    congestion_blocked: cb,
                } => {
                    congestion_blocked |= cb;
                    // Continue checking other spaces, tail-loss probes may need to be sent
                    // in all spaces.
                }
                PollPathSpaceStatus::WrotePacket {
                    last_packet_number: pn,
                    pad_datagram: pad,
                } => {
                    debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                    last_packet_number = Some(pn);
                    // Carry the padding requirement over to the next (coalesced) space.
                    pad_datagram = pad;
                    // Always check higher spaces. If the transmit is full or they have
                    // nothing to send they will not write packets. But if they can, they
                    // must always be allowed to add to this transmit because coalescing may
                    // be required.
                    continue;
                }
                PollPathSpaceStatus::Send {
                    last_packet_number: pn,
                } => {
                    debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                    last_packet_number = Some(pn);
                    break;
                }
            }
        }

        // Emit qlog recovery metrics whenever this poll either sent something or was
        // blocked by congestion control.
        if last_packet_number.is_some() || congestion_blocked {
            self.qlog.emit_recovery_metrics(
                path_id,
                &mut self.paths.get_mut(&path_id).unwrap().data,
                now,
            );
        }

        match last_packet_number {
            Some(last_packet_number) => {
                // Note that when sending in multiple spaces the last packet number will be
                // the one from the highest space.
                self.path_data_mut(path_id).congestion.on_sent(
                    now,
                    transmit.len() as u64,
                    last_packet_number,
                );
                PollPathStatus::Send(self.build_transmit(path_id, transmit))
            }
            None => PollPathStatus::NothingToSend { congestion_blocked },
        }
    }
1324
1325    /// poll_transmit logic for a QUIC-MULTIPATH packet number space (PathID + SpaceId).
1326    #[must_use]
1327    fn poll_transmit_path_space(
1328        &mut self,
1329        now: Instant,
1330        transmit: &mut TransmitBuf<'_>,
1331        path_id: PathId,
1332        space_id: SpaceId,
1333        remote_cid: ConnectionId,
1334        scheduling_info: &PathSchedulingInfo,
1335        // If we need to send a CONNECTION_CLOSE frame.
1336        connection_close_pending: bool,
1337        // Whether the current datagram needs to be padded to a certain size.
1338        mut pad_datagram: PadDatagram,
1339    ) -> PollPathSpaceStatus {
1340        // Keep track of the last packet number we wrote. If None we did not write any
1341        // packets.
1342        let mut last_packet_number = None;
1343
1344        // Each loop of this may build one packet. It works logically as follows:
1345        //
1346        // - Check if something *needs* to be sent in this space and *can* be sent.
1347        //   - If not, return to the caller who will call us again for the next space.
1348        // - Start a new datagram.
1349        //   - Unless coalescing the packet into an existing datagram.
1350        // - Write the packet header and payload.
1351        // - Check if coalescing a next packet into the datagram is possible.
1352        // - If coalescing, finish packet without padding to leave space in the datagram.
1353        // - If not coalescing, complete the datagram:
1354        //   - Finish packet with padding.
1355        //   - Set the transmit segment size if this is the first datagram.
1356        // - Loop: next iteration will exit the loop if nothing more to send in this
1357        //   space. The TransmitBuf will contain a started datagram with space if
1358        //   coalescing, or completely filled datagram if not coalescing.
1359        loop {
1360            // Determine if anything can be sent in this packet number space.
1361            let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1362                // A datagram is started already, we are coalescing another packet into it.
1363                transmit.datagram_remaining_mut()
1364            } else {
1365                // A new datagram needs to be started.
1366                transmit.segment_size()
1367            };
1368            let can_send =
1369                self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending);
1370            let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1371            let space_will_send = {
1372                if scheduling_info.abandoned {
1373                    // Currently we don't send on an abandoned path, PATH_ABANDON is always
1374                    // sent on a different path.
1375                    false
1376                } else if can_send.close && scheduling_info.may_send_close {
1377                    // This is the best path to send a CONNECTION_CLOSE on.
1378                    true
1379                } else if needs_loss_probe {
1380                    // We always send if a loss probe if the path is not abandoned.
1381                    true
1382                } else if can_send.space_id_only {
1383                    // We always send space-specific frames if not abandoned.
1384                    true
1385                } else {
1386                    // Anything else we only send if we're the best path for SpaceKind::Data
1387                    // frames.
1388                    !can_send.is_empty() && scheduling_info.may_send_data
1389                }
1390            };
1391            tracing::warn!(?can_send, ?scheduling_info, ?space_will_send, "checking");
1392
1393            if !space_will_send {
1394                // Nothing more to send. Previous iterations of this loop may have built
1395                // packets already.
1396                return match last_packet_number {
1397                    Some(pn) => PollPathSpaceStatus::WrotePacket {
1398                        last_packet_number: pn,
1399                        pad_datagram,
1400                    },
1401                    None => {
1402                        // Only log for spaces which have crypto.
1403                        if self.crypto_state.has_keys(space_id.encryption_level())
1404                            || (space_id == SpaceId::Data
1405                                && self.crypto_state.has_keys(EncryptionLevel::ZeroRtt))
1406                        {
1407                            trace!(?space_id, %path_id, "nothing to send in space");
1408                        }
1409                        PollPathSpaceStatus::NothingToSend {
1410                            congestion_blocked: false,
1411                        }
1412                    }
1413                };
1414            }
1415
1416            // We want to send on this space, check congestion control if we can. But only
1417            // if we will need to start a new datagram. If we are coalescing into an already
1418            // started datagram we do not need to check congestion control again.
1419            if transmit.datagram_remaining_mut() == 0 {
1420                let congestion_blocked =
1421                    self.path_congestion_check(space_id, path_id, transmit, &can_send, now);
1422                if congestion_blocked != PathBlocked::No {
1423                    // Previous iterations of this loop may have built packets already.
1424                    return match last_packet_number {
1425                        Some(pn) => PollPathSpaceStatus::WrotePacket {
1426                            last_packet_number: pn,
1427                            pad_datagram,
1428                        },
1429                        None => {
1430                            return PollPathSpaceStatus::NothingToSend {
1431                                congestion_blocked: true,
1432                            };
1433                        }
1434                    };
1435                }
1436            }
1437
1438            // If the datagram is full (or there never was one started), we need to start a
1439            // new one.
1440            if transmit.datagram_remaining_mut() == 0 {
1441                if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1442                    // No more datagrams allowed.
1443                    // Previous iterations of this loop may have built packets already.
1444                    return match last_packet_number {
1445                        Some(pn) => PollPathSpaceStatus::WrotePacket {
1446                            last_packet_number: pn,
1447                            pad_datagram,
1448                        },
1449                        None => {
1450                            return PollPathSpaceStatus::NothingToSend {
1451                                congestion_blocked: false,
1452                            };
1453                        }
1454                    };
1455                }
1456
1457                match self.spaces[space_id].for_path(path_id).loss_probes {
1458                    0 => transmit.start_new_datagram(),
1459                    _ => {
1460                        // We need something to send for a tail-loss probe.
1461                        let request_immediate_ack =
1462                            space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1463                        // TODO(flub): this is really scheduling logic hiding here.
1464                        self.spaces[space_id].maybe_queue_probe(
1465                            path_id,
1466                            request_immediate_ack,
1467                            &self.streams,
1468                        );
1469
1470                        self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1471
1472                        // Clamp the datagram to at most the minimum MTU to ensure that loss
1473                        // probes can get through and enable recovery even if the path MTU
1474                        // has shrank unexpectedly.
1475                        transmit.start_new_datagram_with_size(std::cmp::min(
1476                            usize::from(INITIAL_MTU),
1477                            transmit.segment_size(),
1478                        ));
1479                    }
1480                }
1481                trace!(count = transmit.num_datagrams(), "new datagram started");
1482
1483                // We started a new datagram, we decide later if it needs padding.
1484                pad_datagram = PadDatagram::No;
1485            }
1486
1487            // If coalescing another packet into the existing datagram, there should
1488            // still be enough space for a whole packet.
1489            if transmit.datagram_start_offset() < transmit.len() {
1490                debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1491            }
1492
1493            //
1494            // From here on, we've determined that a packet will definitely be sent.
1495            //
1496
1497            if self.crypto_state.has_keys(EncryptionLevel::Initial)
1498                && space_id == SpaceId::Handshake
1499                && self.side.is_client()
1500            {
1501                // A client stops both sending and processing Initial packets when it
1502                // sends its first Handshake packet.
1503                self.discard_space(now, SpaceKind::Initial);
1504            }
1505            if let Some(ref mut prev) = self.crypto_state.prev_crypto {
1506                prev.update_unacked = false;
1507            }
1508
1509            let Some(mut builder) = PacketBuilder::new(
1510                now,
1511                space_id,
1512                path_id,
1513                remote_cid,
1514                transmit,
1515                can_send.is_ack_eliciting(),
1516                self,
1517            ) else {
1518                // Confidentiality limit is exceeded and the connection has been killed. We
1519                // should not send any other packets. This works in a roundabout way: We
1520                // have started a datagram but not written anything into it. So even if we
1521                // get called again for another space we will see an already started
1522                // datagram and try and start another packet here. Then be stopped by the
1523                // same confidentiality limit.
1524                return PollPathSpaceStatus::NothingToSend {
1525                    congestion_blocked: false,
1526                };
1527            };
1528            last_packet_number = Some(builder.packet_number);
1529
1530            if space_id == SpaceId::Initial
1531                && (self.side.is_client() || can_send.is_ack_eliciting())
1532            {
1533                // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1
1534                pad_datagram |= PadDatagram::ToMinMtu;
1535            }
1536            if space_id == SpaceId::Data && self.config.pad_to_mtu {
1537                pad_datagram |= PadDatagram::ToSegmentSize;
1538            }
1539
1540            if can_send.close {
1541                trace!("sending CONNECTION_CLOSE");
1542                // Encode ACKs before the ConnectionClose message, to give the receiver
1543                // a better approximate on what data has been processed. This is
1544                // especially important with ack delay, since the peer might not
1545                // have gotten any other ACK for the data earlier on.
1546                let is_multipath_negotiated = self.is_multipath_negotiated();
1547                for path_id in self.spaces[space_id]
1548                    .number_spaces
1549                    .iter()
1550                    .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1551                    .map(|(&path_id, _)| path_id)
1552                    .collect::<Vec<_>>()
1553                {
1554                    Self::populate_acks(
1555                        now,
1556                        self.receiving_ecn,
1557                        path_id,
1558                        space_id,
1559                        &mut self.spaces[space_id],
1560                        is_multipath_negotiated,
1561                        &mut builder,
1562                        &mut self.stats.frame_tx,
1563                        self.crypto_state.has_keys(space_id.encryption_level()),
1564                    );
1565                }
1566
                // Since there are only 64 ACK frames, there will always be enough space
                // to encode the ConnectionClose frame too. However we still have the
                // check here to prevent crashes if something changes.
1570
1571                // TODO(flub): This needs fixing for multipath, to ensure we can always
1572                //    write the CONNECTION_CLOSE even if we have many PATH_ACKs to send:
1573                //    https://github.com/n0-computer/noq/issues/367.
1574                debug_assert!(
1575                    builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1576                    "ACKs should leave space for ConnectionClose"
1577                );
1578                let stats = &mut self.stats.frame_tx;
1579                if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1580                    let max_frame_size = builder.frame_space_remaining();
1581                    let close: Close = match self.state.as_type() {
1582                        StateType::Closed => {
1583                            let reason: Close =
1584                                self.state.as_closed().expect("checked").clone().into();
1585                            if space_id == SpaceId::Data || reason.is_transport_layer() {
1586                                reason
1587                            } else {
1588                                TransportError::APPLICATION_ERROR("").into()
1589                            }
1590                        }
1591                        StateType::Draining => TransportError::NO_ERROR("").into(),
1592                        _ => unreachable!(
1593                            "tried to make a close packet when the connection wasn't closed"
1594                        ),
1595                    };
1596                    builder.write_frame(close.encoder(max_frame_size), stats);
1597                }
1598                let last_pn = builder.packet_number;
1599                builder.finish_and_track(now, self, path_id, pad_datagram);
1600                if space_id.kind() == self.highest_space {
1601                    // Don't send another close packet. Even with multipath we only send
1602                    // CONNECTION_CLOSE on a single path since we expect our paths to work.
1603                    self.connection_close_pending = false;
1604                }
1605                // Send a close frame in every possible space for robustness, per
1606                // RFC9000 "Immediate Close during the Handshake". Don't bother trying
1607                // to send anything else.
1608                // TODO(flub): This breaks during the handshake if we can not coalesce
1609                //    packets due to space reasons: the next space would either fail a
1610                //    debug_assert checking for enough packet space or produce an invalid
1611                //    packet. We need to keep track of per-space pending CONNECTION_CLOSE to
1612                //    be able to send these across multiple calls to poll_transmit. Then
1613                //    check for coalescing space here because initial packets need to be in
1614                //    padded datagrams. And also add space checks for CONNECTION_CLOSE in
1615                //    space_can_send so it would stop a GSO batch if the datagram is too
1616                //    small for another CONNECTION_CLOSE packet.
1617                return PollPathSpaceStatus::WrotePacket {
1618                    last_packet_number: last_pn,
1619                    pad_datagram,
1620                };
1621            }
1622
1623            self.populate_packet(now, space_id, path_id, scheduling_info, &mut builder);
1624
1625            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
1626            // any other reason, there is a bug which leads to one component announcing write
1627            // readiness while not writing any data. This degrades performance. The condition is
1628            // only checked if the full MTU is available and when potentially large fixed-size
1629            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
1630            // writing ACKs.
1631            debug_assert!(
1632                !(builder.sent_frames().is_ack_only(&self.streams)
1633                    && !can_send.acks
1634                    && (can_send.other || can_send.space_id_only)
1635                    && builder.buf.segment_size()
1636                        == self.path_data(path_id).current_mtu() as usize
1637                    && self.datagrams.outgoing.is_empty()),
1638                "SendableFrames was {can_send:?}, but only ACKs have been written"
1639            );
1640            if builder.sent_frames().requires_padding {
1641                pad_datagram |= PadDatagram::ToMinMtu;
1642            }
1643
1644            for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1645                self.spaces[space_id]
1646                    .for_path(*path_id)
1647                    .pending_acks
1648                    .acks_sent();
1649                self.timers.stop(
1650                    Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1651                    self.qlog.with_time(now),
1652                );
1653            }
1654
1655            // Now we need to finish the packet.  Before we do so we need to know if we will
1656            // be coalescing the next packet into this one, or will be ending the datagram
1657            // as well.  Because if this is the last packet in the datagram more padding
1658            // might be needed because of the packet type, or to fill the GSO segment size.
1659
1660            // Are we allowed to coalesce AND is there enough space for another *packet* in
1661            // this datagram AND will we definitely send another packet?
1662            if builder.can_coalesce && path_id == PathId::ZERO && {
1663                let max_packet_size = builder
1664                    .buf
1665                    .datagram_remaining_mut()
1666                    .saturating_sub(builder.predict_packet_end());
1667                max_packet_size > MIN_PACKET_SPACE
1668                    && self.has_pending_packet(space_id, max_packet_size, connection_close_pending)
1669            } {
1670                // We can append/coalesce the next packet into the current
1671                // datagram. Finish the current packet without adding extra padding.
1672                trace!("will coalesce with next packet");
1673                builder.finish_and_track(now, self, path_id, PadDatagram::No);
1674            } else {
1675                // We need a new datagram for the next packet.  Finish the current
1676                // packet with padding.
1677                // TODO(flub): if there isn't any more data to be sent, this will still pad
1678                //    to the segment size and only discover there is nothing to send before
1679                //    starting the next packet. That is wasting up to 32 bytes.
1680                if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1681                    // If too many padding bytes would be required to continue the
1682                    // GSO batch after this packet, end the GSO batch here. Ensures
1683                    // that fixed-size frames with heterogeneous sizes
1684                    // (e.g. application datagrams) won't inadvertently waste large
1685                    // amounts of bandwidth. The exact threshold is a bit arbitrary
1686                    // and might benefit from further tuning, though there's no
1687                    // universally optimal value.
1688                    const MAX_PADDING: usize = 32;
1689                    if builder.buf.datagram_remaining_mut()
1690                        > builder.predict_packet_end() + MAX_PADDING
1691                    {
1692                        trace!(
1693                            "GSO truncated by demand for {} padding bytes",
1694                            builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1695                        );
1696                        let last_pn = builder.packet_number;
1697                        builder.finish_and_track(now, self, path_id, PadDatagram::No);
1698                        return PollPathSpaceStatus::Send {
1699                            last_packet_number: last_pn,
1700                        };
1701                    }
1702
1703                    // Pad the current datagram to GSO segment size so it can be
1704                    // included in the GSO batch.
1705                    builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1706                } else {
1707                    builder.finish_and_track(now, self, path_id, pad_datagram);
1708                }
1709
1710                // If this is the first datagram we set the segment size to the size of the
1711                // first datagram.
1712                if transmit.num_datagrams() == 1 {
1713                    transmit.clip_segment_size();
1714                }
1715            }
1716        }
1717    }
1718
    /// Builds a DPLPMTUD probe packet for `path_id`, if one is currently needed.
    ///
    /// Returns `None` when the path is not eligible for probing or the MTU discovery
    /// subsystem has no probe scheduled (see [`Connection::get_mtu_probe_data`]), or
    /// when no packet could be built. The probe is a PING frame (plus IMMEDIATE_ACK
    /// when the peer supports the ack-frequency extension) padded up to the candidate
    /// probe size, so an ACK for it proves the path carries datagrams of that size.
    fn poll_transmit_mtu_probe(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?;

        // We are definitely sending a DPLPMTUD probe.
        // A probe is always a single datagram sized exactly to the candidate probe size.
        let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize);
        transmit.start_new_datagram_with_size(probe_size as usize);

        let mut builder = PacketBuilder::new(
            now,
            SpaceId::Data,
            path_id,
            active_cid,
            &mut transmit,
            true, // ack-eliciting: we need an ACK to learn whether the probe arrived
            self,
        )?;

        // We implement MTU probes as ping packets padded up to the probe size
        trace!(?probe_size, "writing MTUD probe");
        builder.write_frame(frame::Ping, &mut self.stats.frame_tx);

        // If supported by the peer, we want no delays to the probe's ACK
        if self.peer_supports_ack_frequency() {
            builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx);
        }

        // Pad to the full probe size: the on-wire datagram size is the whole point.
        builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));

        self.path_stats
            .entry(path_id)
            .or_default()
            .sent_plpmtud_probes += 1;

        Some(self.build_transmit(path_id, transmit))
    }
1759
1760    /// Returns the CID and probe size if a DPLPMTUD probe is needed.
1761    ///
1762    /// We MTU probe all paths for which all of the following is true:
1763    /// - We have an active destination CID for the path.
1764    /// - The remote address *and* path are validated.
1765    /// - The path is not abandoned.
1766    /// - The MTU Discovery subsystem wants to probe the path.
1767    fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> {
1768        let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?;
1769        let is_eligible = self.path_data(path_id).validated
1770            && !self.path_data(path_id).is_validating_path()
1771            && !self.abandoned_paths.contains(&path_id);
1772
1773        if !is_eligible {
1774            return None;
1775        }
1776        let next_pn = self.spaces[SpaceId::Data]
1777            .for_path(path_id)
1778            .peek_tx_number();
1779        let probe_size = self
1780            .path_data_mut(path_id)
1781            .mtud
1782            .poll_transmit(now, next_pn)?;
1783
1784        Some((active_cid, probe_size))
1785    }
1786
1787    /// Returns true if there is a further packet to send on [`PathId::ZERO`].
1788    ///
1789    /// In other words this is predicting whether the next call to
1790    /// [`Connection::space_can_send`] issued will return some frames to be sent. Including
1791    /// having to predict which packet number space it will be invoked with. This depends on
1792    /// how both [`Connection::poll_transmit_path`] and
1793    /// [`Connection::poll_transmit_path_space`] behave.
1794    ///
1795    /// This is needed to determine if packet coalescing can happen. Because the last packet
1796    /// in a datagram may need to be padded and thus we must know if another packet will
1797    /// follow or not.
1798    ///
1799    /// The next packet can be either in the same space, or in one of the following spaces
1800    /// on the same path. Because a 0-RTT packet can be coalesced with a 1-RTT packet and
1801    /// both are in the Data(PathId::ZERO) space. Previous spaces are not checked, because
1802    /// packets are built from Initial to Handshake to Data spaces.
1803    fn has_pending_packet(
1804        &mut self,
1805        current_space_id: SpaceId,
1806        max_packet_size: usize,
1807        connection_close_pending: bool,
1808    ) -> bool {
1809        let mut space_id = current_space_id;
1810        loop {
1811            let can_send = self.space_can_send(
1812                space_id,
1813                PathId::ZERO, // TODO: why only PathId 0?????
1814                max_packet_size,
1815                connection_close_pending,
1816            );
1817            if !can_send.is_empty() {
1818                return true;
1819            }
1820            match space_id.next() {
1821                Some(next_space_id) => space_id = next_space_id,
1822                None => break,
1823            }
1824        }
1825        false
1826    }
1827
    /// Checks if creating a new datagram would be blocked by congestion control
    ///
    /// Three gates are checked in order: server-side anti-amplification, the
    /// congestion window, and pacing. `transmit` describes the datagrams already
    /// built in this batch; `can_send` describes what kinds of frames are waiting.
    /// Returns the first gate that blocks, or [`PathBlocked::No`].
    ///
    /// Note the asymmetry: loss probes and CONNECTION_CLOSE bypass the congestion
    /// window check but are still subject to pacing (which may arm the pacing timer
    /// as a side effect).
    fn path_congestion_check(
        &mut self,
        space_id: SpaceId,
        path_id: PathId,
        transmit: &TransmitBuf<'_>,
        can_send: &SendableFrames,
        now: Instant,
    ) -> PathBlocked {
        // Anti-amplification is only based on `total_sent`, which gets updated after
        // the transmit is sent. Therefore we pass the amount of bytes for datagrams
        // that are already created, as well as 1 byte for starting another datagram. If
        // there is any anti-amplification budget left, we always allow a full MTU to be
        // sent (see https://github.com/quinn-rs/quinn/issues/1082).
        if self.side().is_server()
            && self
                .path_data(path_id)
                .anti_amplification_blocked(transmit.len() as u64 + 1)
        {
            trace!(?space_id, %path_id, "blocked by anti-amplification");
            return PathBlocked::AntiAmplification;
        }

        // Congestion control check.
        // Tail loss probes must not be blocked by congestion, or a deadlock could arise.
        let bytes_to_send = transmit.segment_size() as u64;
        let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;

        // Only ordinary data (`can_send.other`) is congestion controlled; probes and
        // CONNECTION_CLOSE are exempt.
        if can_send.other && !need_loss_probe && !can_send.close {
            let path = self.path_data(path_id);
            if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
                trace!(?space_id, %path_id, "blocked by congestion control");
                return PathBlocked::Congestion;
            }
        }

        // Pacing check.
        if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::Pacing),
                delay,
                self.qlog.with_time(now),
            );
            // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though
            // they are not congestion controlled.
            trace!(?space_id, %path_id, "blocked by pacing");
            return PathBlocked::Pacing;
        }

        PathBlocked::No
    }
1879
    /// Send PATH_CHALLENGE for a previous path if necessary
    ///
    /// QUIC-TRANSPORT section 9.3.3
    /// <https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding>
    ///
    /// Returns `None` when there is no previous path for `path_id` or no challenge is
    /// pending on it. The challenge is sent at most once: the pending flag is cleared
    /// before building the packet.
    fn send_prev_path_challenge(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
        // TODO (matheus23): We could use !prev_path.is_validating() here instead to
        // (possibly) also re-send challenges when they get lost.
        if !prev_path.pending_on_path_challenge {
            return None;
        };
        // One-shot: clear the flag first, then record the outstanding challenge.
        prev_path.pending_on_path_challenge = false;
        let token = self.rng.random();
        let network_path = prev_path.network_path;
        prev_path.record_path_challenge_sent(now, token, network_path);

        debug_assert_eq!(
            self.highest_space,
            SpaceKind::Data,
            "PATH_CHALLENGE queued without 1-RTT keys"
        );
        // Single datagram, sized to the minimum initial size so it can be expanded below.
        let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
        buf.start_new_datagram();

        // Use the previous CID to avoid linking the new path with the previous path. We
        // don't bother accounting for possible retirement of that prev_cid because this is
        // sent once, immediately after migration, when the CID is known to be valid. Even
        // if a post-migration packet caused the CID to be retired, it's fair to pretend
        // this is sent first.
        let mut builder =
            PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, false, self)?;
        let challenge = frame::PathChallenge(token);
        let stats = &mut self.stats.frame_tx;
        builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));

        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
        // to at least the smallest allowed maximum datagram size of 1200 bytes,
        // unless the anti-amplification limit for the path does not permit
        // sending a datagram of this size
        builder.pad_to(MIN_INITIAL_SIZE);

        // NOTE(review): this uses `finish` (no loss/congestion tracking), unlike the
        // other probe senders which use `finish_and_track` — presumably intentional
        // for a packet on the superseded previous path; confirm.
        builder.finish(self, now);
        // Account the transmission both globally and per path.
        self.stats.udp_tx.on_sent(1, buf.len());
        self.path_stats
            .entry(path_id)
            .or_default()
            .udp_tx
            .on_sent(1, buf.len());

        Some(Transmit {
            destination: network_path.remote,
            size: buf.len(),
            ecn: None,
            segment_size: None,
            src_ip: network_path.local_ip,
        })
    }
1942
    /// Sends a PATH_RESPONSE for a PATH_CHALLENGE that arrived from an off-path address.
    ///
    /// Off-path responses are queued in the path's `path_responses` together with the
    /// network path they must be sent back on. Returns `None` when the path or its CID
    /// queue is unknown, no off-path response is pending, or no packet could be built.
    fn send_off_path_path_response(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let path = self.paths.get_mut(&path_id).map(|state| &mut state.data)?;
        let cid_queue = self.remote_cids.get_mut(&path_id)?;
        let (token, network_path) = path.path_responses.pop_off_path(path.network_path)?;

        // Prefer a CID that has not been used yet, falling back to the active one.
        let cid = cid_queue
            .next_reserved()
            .unwrap_or_else(|| cid_queue.active());
        // TODO(@divma): we should take a different approach when there is no fresh CID to use.
        // https://github.com/quinn-rs/quinn/issues/2184

        let frame = frame::PathResponse(token);

        // Single datagram, expanded to the minimum MTU below.
        let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
        buf.start_new_datagram();

        let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, false, self)?;
        let stats = &mut self.stats.frame_tx;
        builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
        // Datagrams containing a PATH_RESPONSE must be expanded to at least 1200 bytes
        // (RFC 9000 section 8.2.2).
        builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);

        let size = buf.len();

        // Account the transmission both globally and per path.
        self.stats.udp_tx.on_sent(1, size);
        self.path_stats
            .entry(path_id)
            .or_default()
            .udp_tx
            .on_sent(1, size);
        Some(Transmit {
            // Reply to the off-path address the challenge arrived from.
            destination: network_path.remote,
            size,
            ecn: None,
            segment_size: None,
            src_ip: network_path.local_ip,
        })
    }
1985
    /// Send a nat traversal challenge (off-path) on this path if possible.
    ///
    /// This will ensure the path still has a remaining CID to use if the active one should be
    /// retired.
    ///
    /// Returns `None` when there is no probe to send, the path is not validated, or
    /// fewer than two unused CIDs remain for the path.
    fn send_nat_traversal_path_challenge(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
    ) -> Option<Transmit> {
        // Only the server side of nat traversal produces these probes.
        let server_side = self.n0_nat_traversal.server_side_mut().ok()?;
        let probe = server_side.next_probe()?;
        if !self.paths.get(&path_id)?.data.validated {
            // Path is not usable for probing
            return None;
        }

        let remote_cids = self.remote_cids.get_mut(&path_id)?;

        // Check if this path has enough CIDs to send a probe. One to be reserved, one in case the
        // active CID needs to be retired.
        if remote_cids.remaining() < 2 {
            return None;
        }

        let cid = remote_cids.next_reserved()?;
        let remote = probe.remote();
        let token = self.rng.random();
        // Mark the probe consumed before building the packet.
        probe.mark_as_sent();

        let frame = frame::PathChallenge(token);

        // Single datagram, expanded to the minimum MTU below.
        let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
        buf.start_new_datagram();

        let mut builder =
            PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, false, self)?;
        let stats = &mut self.stats.frame_tx;
        builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
        // Datagrams containing a PATH_CHALLENGE must be expanded to at least 1200 bytes
        // (RFC 9000 section 8.2.1).
        builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);

        // Record the outstanding challenge against the probed remote address —
        // presumably so an incoming PATH_RESPONSE can be matched against it; the
        // local address is not known at this point.
        let path = &mut self.paths.get_mut(&path_id).expect("checked").data;
        let network_path = FourTuple {
            remote,
            local_ip: None,
        };

        path.record_path_challenge_sent(now, token, network_path);

        let size = buf.len();

        // Account the transmission both globally and per path.
        self.stats.udp_tx.on_sent(1, size);
        self.path_stats
            .entry(path_id)
            .or_default()
            .udp_tx
            .on_sent(1, size);

        Some(Transmit {
            destination: remote,
            size,
            ecn: None,
            segment_size: None,
            src_ip: None,
        })
    }
2052
    /// Indicate what types of frames are ready to send for the given space.
    ///
    /// Only for on-path data.
    ///
    /// *packet_size* is the number of bytes available to build the next packet.
    /// *connection_close_pending* indicates whether a CONNECTION_CLOSE frame needs to be
    /// sent.
    fn space_can_send(
        &mut self,
        space_id: SpaceId,
        path_id: PathId,
        packet_size: usize,
        connection_close_pending: bool,
    ) -> SendableFrames {
        let space = &mut self.spaces[space_id];
        let space_has_crypto = self.crypto_state.has_keys(space_id.encryption_level());

        // Without keys for this encryption level nothing can be sent in this space.
        // The one exception is a client holding 0-RTT keys, which may still send in
        // the Data space before the 1-RTT keys become available.
        if !space_has_crypto
            && (space_id != SpaceId::Data
                || !self.crypto_state.has_keys(EncryptionLevel::ZeroRtt)
                || self.side.is_server())
        {
            // Nothing to send in this space
            return SendableFrames::empty();
        }

        let mut can_send = space.can_send(path_id, &self.streams);

        // Check for 1RTT space.
        if space_id == SpaceId::Data {
            let pn = space.for_path(path_id).peek_tx_number();
            // Number of bytes available for frames if this is a 1-RTT packet. We're
            // guaranteed to be able to send an individual frame at least this large in the
            // next 1-RTT packet. This could be generalized to support every space, but it's
            // only needed to handle large fixed-size frames, which only exist in 1-RTT
            // (application datagrams).
            let frame_space_1rtt =
                packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
            can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
        }

        // A CONNECTION_CLOSE can only go out in a space we hold keys for.
        can_send.close = connection_close_pending && space_has_crypto;

        can_send
    }
2098
    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
    ///
    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
    /// extracted through the relevant methods.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            // An incoming UDP datagram: the first decoded packet plus any coalesced rest.
            Datagram(DatagramConnectionEvent {
                now,
                network_path,
                path_id,
                ecn,
                first_decode,
                remaining,
            }) => {
                let span = trace_span!("pkt", %path_id);
                let _guard = span.enter();

                if self.update_network_path_or_discard(network_path, path_id) {
                    // A return value of true indicates we should discard this packet.
                    return;
                }

                // Snapshot the anti-amplification state before this datagram is
                // accounted; used below to re-arm the loss detection timer.
                let was_anti_amplification_blocked = self
                    .path(path_id)
                    .map(|path| path.anti_amplification_blocked(1))
                    // We never tried to send on a non-existing (new) path so have not been
                    // anti-amplification blocked for it previously.
                    .unwrap_or(false);

                // Account received data both globally and per path.
                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                let rx = &mut self.path_stats.entry(path_id).or_default().udp_rx;
                rx.datagrams += 1;
                rx.bytes += first_decode.len() as u64;
                let data_len = first_decode.len();

                self.handle_decode(now, network_path, path_id, ecn, first_decode);
                // The current `path` might have changed inside `handle_decode` since the packet
                // could have triggered a migration. The packet might also belong to an unknown
                // path and have been rejected. Make sure the data received is accounted for the
                // most recent path by accessing `path` after `handle_decode`.
                if let Some(path) = self.path_mut(path_id) {
                    path.inc_total_recvd(data_len as u64);
                }

                // Process any further QUIC packets coalesced into the same datagram.
                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.path_stats.entry(path_id).or_default().udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, network_path, path_id, ecn, data);
                }

                // Emit qlog recovery metrics for the (possibly updated) path state.
                if let Some(path) = self.paths.get_mut(&path_id) {
                    self.qlog
                        .emit_recovery_metrics(path_id, &mut path.data, now);
                }

                if was_anti_amplification_blocked {
                    // A prior attempt to set the loss detection timer may have failed due to
                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
                    // the server's first flight is lost.
                    self.set_loss_detection_timer(now, path_id);
                }
            }
            // The endpoint issued fresh local connection IDs for one path.
            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
                // All CIDs issued in one event are expected to belong to the same path.
                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
                let cid_state = self
                    .local_cid_state
                    .entry(path_id)
                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
                cid_state.new_cids(&ids, now);

                // Queue the new CIDs as pending NEW_CONNECTION_ID frames for the peer.
                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                // Always update Timer::PushNewCid
                self.reset_cid_retirement(now);
            }
        }
    }
2181
2182    /// Updates the network path for `path_id`.
2183    ///
2184    /// Returns true if a packet coming in for this `path_id` over given `network_path` should be discarded.
2185    /// Returns false if the path was updated and the packet doesn't need to be discarded.
2186    fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
2187        let remote_may_migrate = self.side.remote_may_migrate(&self.state);
2188        let local_ip_may_migrate = self.side.is_client();
2189        // If this packet could initiate a migration and we're a client or a server that
2190        // forbids migration, drop the datagram. This could be relaxed to heuristically
2191        // permit NAT-rebinding-like migration.
2192        if let Some(known_path) = self.path_mut(path_id) {
2193            if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
2194                trace!(
2195                    %path_id,
2196                    %network_path,
2197                    %known_path.network_path,
2198                    "discarding packet from unrecognized peer"
2199                );
2200                return true;
2201            }
2202
2203            if known_path.network_path.local_ip.is_some()
2204                && network_path.local_ip.is_some()
2205                && known_path.network_path.local_ip != network_path.local_ip
2206                && !local_ip_may_migrate
2207            {
2208                trace!(
2209                    %path_id,
2210                    %network_path,
2211                    %known_path.network_path,
2212                    "discarding packet sent to incorrect interface"
2213                );
2214                return true;
2215            }
2216            // If the datagram indicates that we've changed our local IP, we update it.
2217            // This is alluded to in Section 5.2 of the Multipath RFC draft 18:
2218            // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#name-using-multiple-paths-on-the
2219            // > Client receives the packet, recognizes a path migration, updates the source address of path 2 to 192.0.2.1.
2220            if let Some(local_ip) = network_path.local_ip {
2221                if known_path
2222                    .network_path
2223                    .local_ip
2224                    .is_some_and(|ip| ip != local_ip)
2225                {
2226                    debug!(
2227                        %path_id,
2228                        %network_path,
2229                        %known_path.network_path,
2230                        "path's local address seemingly migrated"
2231                    );
2232                }
2233                // We update the address without path validation on the client side.
2234                // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#section-5.1
2235                // > Servers observing a 4-tuple change will perform path validation (see Section 9 of [QUIC-TRANSPORT]).
2236                // This sounds like it's *only* the server endpoints that do this.
2237                // TODO(matheus23): We should still consider doing a proper migration on the client side in the future.
2238                // For now, this preserves the behavior of this code pre 4-tuple tracking.
2239                known_path.network_path.local_ip = Some(local_ip);
2240            }
2241        }
2242        false
2243    }
2244
    /// Process timer expirations
    ///
    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
    /// methods.
    ///
    /// It is most efficient to call this immediately after the system clock reaches the latest
    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
    /// no-op and therefore are safe.
    pub fn handle_timeout(&mut self, now: Instant) {
        // Drain every timer that has already fired; each match arm performs the protocol
        // action associated with that timer.
        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
            // TODO(@divma): remove `at` when the unicorn is born
            trace!(?timer, at=?now, "timeout");
            match timer {
                // Connection-wide timers.
                Timer::Conn(timer) => match timer {
                    ConnTimer::Close => {
                        // The closing/draining period elapsed: the connection is now drained.
                        self.state.move_to_drained(None);
                        // move_to_drained checks that we weren't in drained before.
                        // Adding events to endpoint_events is only legal if `Drained` was never queued before.
                        self.endpoint_events.push_back(EndpointEventInner::Drained);
                    }
                    ConnTimer::Idle => {
                        // No activity within the idle timeout: terminate the connection.
                        self.kill(ConnectionError::TimedOut);
                    }
                    ConnTimer::KeepAlive => {
                        trace!("sending keep-alive");
                        self.ping();
                    }
                    ConnTimer::KeyDiscard => {
                        self.crypto_state.discard_temporary_keys();
                    }
                    ConnTimer::PushNewCid => {
                        // Rotate local CIDs on every path whose retirement deadline passed.
                        while let Some((path_id, when)) = self.next_cid_retirement() {
                            if when > now {
                                break;
                            }
                            match self.local_cid_state.get_mut(&path_id) {
                                None => error!(%path_id, "No local CID state for path"),
                                Some(cid_state) => {
                                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
                                    let num_new_cid = cid_state.on_cid_timeout().into();
                                    if !self.state.is_closed() {
                                        trace!(
                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                            cid_state.retire_prior_to()
                                        );
                                        // Ask the endpoint to mint replacement CIDs for
                                        // this path.
                                        self.endpoint_events.push_back(
                                            EndpointEventInner::NeedIdentifiers(
                                                path_id,
                                                now,
                                                num_new_cid,
                                            ),
                                        );
                                    }
                                }
                            }
                        }
                    }
                },
                // TODO: add path_id as span somehow
                Timer::PerPath(path_id, timer) => {
                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
                    let _guard = span.enter();
                    match timer {
                        PathTimer::PathIdle => {
                            // The individual path (not the whole connection) went idle.
                            if let Err(err) =
                                self.close_path_inner(now, path_id, PathAbandonReason::TimedOut)
                            {
                                warn!(?err, "failed closing path");
                            }
                        }

                        PathTimer::PathKeepAlive => {
                            trace!("sending keep-alive on path");
                            // An error (path already closed) is deliberately ignored.
                            self.ping_path(path_id).ok();
                        }
                        PathTimer::LossDetection => {
                            self.on_loss_detection_timeout(now, path_id);
                            self.qlog.emit_recovery_metrics(
                                path_id,
                                &mut self.paths.get_mut(&path_id).unwrap().data,
                                now,
                            );
                        }
                        PathTimer::PathValidation => {
                            // Path validation timed out; `continue` with the next expired
                            // timer if the path is already gone.
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("path validation failed");
                            // Restore the previous path data if we saved it (e.g. before a
                            // migration attempt).
                            if let Some((_, prev)) = path.prev.take() {
                                path.data = prev;
                            }
                            path.data.reset_on_path_challenges();
                        }
                        PathTimer::PathChallengeLost => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            trace!("path challenge deemed lost");
                            // Queue retransmission of the PATH_CHALLENGE.
                            path.data.pending_on_path_challenge = true;
                        }
                        PathTimer::PathOpen => {
                            // Validation of a newly opened path did not complete in time.
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            path.data.reset_on_path_challenges();
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("new path validation failed");
                            if let Err(err) = self.close_path_inner(
                                now,
                                path_id,
                                PathAbandonReason::ValidationFailed,
                            ) {
                                warn!(?err, "failed closing path");
                            }
                        }
                        PathTimer::Pacing => trace!("pacing timer expired"),
                        PathTimer::MaxAckDelay => {
                            trace!("max ack delay reached");
                            // This timer is only armed in the Data space
                            self.spaces[SpaceId::Data]
                                .for_path(path_id)
                                .pending_acks
                                .on_max_ack_delay_timeout()
                        }
                        PathTimer::DiscardPath => {
                            // The path was abandoned and 3*PTO has expired since.  Clean up all
                            // remaining state and install stateless reset token.
                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                            if let Some(local_cid_state) = self.local_cid_state.remove(&path_id) {
                                debug_assert!(!self.state.is_drained()); // requirement for endpoint_events. All timers should be cleared in drained connections.
                                let (min_seq, max_seq) = local_cid_state.active_seq();
                                // Retire every still-active CID sequence number on this path.
                                for seq in min_seq..=max_seq {
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::RetireConnectionId(
                                            now, path_id, seq, false,
                                        ),
                                    );
                                }
                            }
                            self.discard_path(path_id, now);
                        }
                    }
                }
            }
        }
    }
2399
2400    /// Close a connection immediately
2401    ///
2402    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
2403    /// call this only when all important communications have been completed, e.g. by calling
2404    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
2405    /// [`StreamEvent::Finished`] event.
2406    ///
2407    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
2408    /// delivered. There may still be data from the peer that has not been received.
2409    ///
2410    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
2411    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2412        self.close_inner(
2413            now,
2414            Close::Application(frame::ApplicationClose { error_code, reason }),
2415        )
2416    }
2417
2418    /// Close the connection immediately, initiated by an API call.
2419    ///
2420    /// This will not produce a [`ConnectionLost`] event propagated by the
2421    /// [`Connection::poll`] call, because the API call already propagated the error to the
2422    /// user.
2423    ///
2424    /// Not to be used when entering immediate close due to an internal state change based
2425    /// on an event. See [`State::move_to_closed_local`] for details.
2426    ///
2427    /// This initiates immediate close from
2428    /// <https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2>, moving to the closed
2429    /// state.
2430    ///
2431    /// [`ConnectionLost`]: crate::Event::ConnectionLost
2432    /// [`Connection::poll`]: super::Connection::poll
2433    fn close_inner(&mut self, now: Instant, reason: Close) {
2434        let was_closed = self.state.is_closed();
2435        if !was_closed {
2436            self.close_common();
2437            self.set_close_timer(now);
2438            self.connection_close_pending = true;
2439            self.state.move_to_closed_local(reason);
2440        }
2441    }
2442
2443    /// Control datagrams
2444    pub fn datagrams(&mut self) -> Datagrams<'_> {
2445        Datagrams { conn: self }
2446    }
2447
2448    /// Returns connection statistics
2449    pub fn stats(&mut self) -> ConnectionStats {
2450        self.stats.clone()
2451    }
2452
2453    /// Returns path statistics
2454    pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2455        let path = self.paths.get(&path_id)?;
2456        let stats = self.path_stats.entry(path_id).or_default();
2457        stats.rtt = path.data.rtt.get();
2458        stats.cwnd = path.data.congestion.window();
2459        stats.current_mtu = path.data.mtud.current_mtu();
2460        Some(*stats)
2461    }
2462
2463    /// Ping the remote endpoint
2464    ///
2465    /// Causes an ACK-eliciting packet to be transmitted on the connection.
2466    pub fn ping(&mut self) {
2467        // TODO(flub): This is very brute-force: it pings *all* the paths.  Instead it would
2468        //    be nice if we could only send a single packet for this.
2469        for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2470            path_data.ping_pending = true;
2471        }
2472    }
2473
2474    /// Ping the remote endpoint over a specific path
2475    ///
2476    /// Causes an ACK-eliciting packet to be transmitted on the path.
2477    pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2478        let path_data = self.spaces[self.highest_space]
2479            .number_spaces
2480            .get_mut(&path)
2481            .ok_or(ClosedPath { _private: () })?;
2482        path_data.ping_pending = true;
2483        Ok(())
2484    }
2485
2486    /// Update traffic keys spontaneously
2487    ///
2488    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
2489    pub fn force_key_update(&mut self) {
2490        if !self.state.is_established() {
2491            debug!("ignoring forced key update in illegal state");
2492            return;
2493        }
2494        if self.crypto_state.prev_crypto.is_some() {
2495            // We already just updated, or are currently updating, the keys. Concurrent key updates
2496            // are illegal.
2497            debug!("ignoring redundant forced key update");
2498            return;
2499        }
2500        self.crypto_state.update_keys(None, false);
2501    }
2502
2503    /// Get a session reference
2504    pub fn crypto_session(&self) -> &dyn crypto::Session {
2505        self.crypto_state.session.as_ref()
2506    }
2507
2508    /// Whether the connection is in the process of being established
2509    ///
2510    /// If this returns `false`, the connection may be either established or closed, signaled by the
2511    /// emission of a `Connected` or `ConnectionLost` message respectively.
2512    pub fn is_handshaking(&self) -> bool {
2513        self.state.is_handshake()
2514    }
2515
2516    /// Whether the connection is closed
2517    ///
2518    /// Closed connections cannot transport any further data. A connection becomes closed when
2519    /// either peer application intentionally closes it, or when either transport layer detects an
2520    /// error such as a time-out or certificate validation failure.
2521    ///
2522    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
2523    pub fn is_closed(&self) -> bool {
2524        self.state.is_closed()
2525    }
2526
2527    /// Whether there is no longer any need to keep the connection around
2528    ///
2529    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
2530    /// packets from the peer. All drained connections have been closed.
2531    pub fn is_drained(&self) -> bool {
2532        self.state.is_drained()
2533    }
2534
2535    /// For clients, if the peer accepted the 0-RTT data packets
2536    ///
2537    /// The value is meaningless until after the handshake completes.
2538    pub fn accepted_0rtt(&self) -> bool {
2539        self.crypto_state.accepted_0rtt
2540    }
2541
2542    /// Whether 0-RTT is/was possible during the handshake
2543    pub fn has_0rtt(&self) -> bool {
2544        self.crypto_state.zero_rtt_enabled
2545    }
2546
2547    /// Whether there are any pending retransmits
2548    pub fn has_pending_retransmits(&self) -> bool {
2549        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2550    }
2551
2552    /// Look up whether we're the client or server of this Connection
2553    pub fn side(&self) -> Side {
2554        self.side.side()
2555    }
2556
2557    /// Get the address observed by the remote over the given path
2558    pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2559        self.path(path_id)
2560            .map(|path_data| {
2561                path_data
2562                    .last_observed_addr_report
2563                    .as_ref()
2564                    .map(|observed| observed.socket_addr())
2565            })
2566            .ok_or(ClosedPath { _private: () })
2567    }
2568
2569    /// Current best estimate of this connection's latency (round-trip-time)
2570    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2571        self.path(path_id).map(|d| d.rtt.get())
2572    }
2573
2574    /// Current state of this connection's congestion controller, for debugging purposes
2575    pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2576        self.path(path_id).map(|d| d.congestion.as_ref())
2577    }
2578
2579    /// Modify the number of remotely initiated streams that may be concurrently open
2580    ///
2581    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
2582    /// `count`s increase both minimum and worst-case memory consumption.
2583    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2584        self.streams.set_max_concurrent(dir, count);
2585        // If the limit was reduced, then a flow control update previously deemed insignificant may
2586        // now be significant.
2587        let pending = &mut self.spaces[SpaceId::Data].pending;
2588        self.streams.queue_max_stream_id(pending);
2589    }
2590
2591    /// Modify the number of open paths allowed when multipath is enabled
2592    ///
2593    /// When reducing the number of concurrent paths this will only affect delaying sending
2594    /// new MAX_PATH_ID frames until fewer than this number of paths are possible.  To
2595    /// actively reduce paths they must be closed using [`Connection::close_path`], which
2596    /// can also be used to close not-yet-opened paths.
2597    ///
2598    /// If multipath is not negotiated (see the [`TransportConfig`]) this can not enable
2599    /// multipath and will fail.
2600    pub fn set_max_concurrent_paths(
2601        &mut self,
2602        now: Instant,
2603        count: NonZeroU32,
2604    ) -> Result<(), MultipathNotNegotiated> {
2605        if !self.is_multipath_negotiated() {
2606            return Err(MultipathNotNegotiated { _private: () });
2607        }
2608        self.max_concurrent_paths = count;
2609
2610        let in_use_count = self
2611            .local_max_path_id
2612            .next()
2613            .saturating_sub(self.abandoned_paths.len() as u32)
2614            .as_u32();
2615        let extra_needed = count.get().saturating_sub(in_use_count);
2616        let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2617
2618        self.set_max_path_id(now, new_max_path_id);
2619
2620        Ok(())
2621    }
2622
2623    /// If needed, issues a new MAX_PATH_ID frame and new CIDs for any newly allowed paths
2624    fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2625        if max_path_id <= self.local_max_path_id {
2626            return;
2627        }
2628
2629        self.local_max_path_id = max_path_id;
2630        self.spaces[SpaceId::Data].pending.max_path_id = true;
2631
2632        self.issue_first_path_cids(now);
2633    }
2634
2635    /// Current number of remotely initiated streams that may be concurrently open
2636    ///
2637    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
2638    /// it will not change immediately, even if fewer streams are open. Instead, it will
2639    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
2640    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2641        self.streams.max_concurrent(dir)
2642    }
2643
2644    /// See [`TransportConfig::send_window()`]
2645    pub fn set_send_window(&mut self, send_window: u64) {
2646        self.streams.set_send_window(send_window);
2647    }
2648
2649    /// See [`TransportConfig::receive_window()`]
2650    pub fn set_receive_window(&mut self, receive_window: VarInt) {
2651        if self.streams.set_receive_window(receive_window) {
2652            self.spaces[SpaceId::Data].pending.max_data = true;
2653        }
2654    }
2655
2656    /// Whether the Multipath for QUIC extension is enabled.
2657    ///
2658    /// Multipath is only enabled after the handshake is completed and if it was enabled by both
2659    /// peers.
2660    pub fn is_multipath_negotiated(&self) -> bool {
2661        !self.is_handshaking()
2662            && self.config.max_concurrent_multipath_paths.is_some()
2663            && self.peer_params.initial_max_path_id.is_some()
2664    }
2665
2666    fn on_ack_received(
2667        &mut self,
2668        now: Instant,
2669        space: SpaceId,
2670        ack: frame::Ack,
2671    ) -> Result<(), TransportError> {
2672        // All ACKs are referencing path 0
2673        let path = PathId::ZERO;
2674        self.inner_on_ack_received(now, space, path, ack)
2675    }
2676
2677    fn on_path_ack_received(
2678        &mut self,
2679        now: Instant,
2680        space: SpaceId,
2681        path_ack: frame::PathAck,
2682    ) -> Result<(), TransportError> {
2683        let (ack, path) = path_ack.into_ack();
2684        self.inner_on_ack_received(now, space, path, ack)
2685    }
2686
    /// Handles an ACK frame acknowledging packets sent on *path*.
    ///
    /// Validates the ACK, removes newly acknowledged packets from the sent-packet
    /// tracking, feeds congestion control / MTU discovery / ack-frequency state, takes an
    /// RTT sample when the largest acknowledged packet is new, runs loss detection and
    /// processes ECN counts.
    fn inner_on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        if self.abandoned_paths.contains(&path) {
            // See also https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.4.3-3
            // > PATH_ACK frames received with an abandoned path ID are silently ignored, as specified in Section 4.
            trace!("silently ignoring PATH_ACK on abandoned path");
            return Ok(());
        }
        // Acknowledging a packet number we never sent is a protocol violation.
        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        // Record whether this ACK advances the largest acknowledged packet number.
        let new_largest = {
            let space = &mut self.spaces[space].for_path(path);
            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
                space.largest_acked_packet = Some(ack.largest);
                if let Some(info) = space.sent_packets.get(ack.largest) {
                    // This should always succeed, but a misbehaving peer might ACK a packet we
                    // haven't sent. At worst, that will result in us spuriously reducing the
                    // congestion window.
                    space.largest_acked_packet_sent = info.time_sent;
                }
                true
            } else {
                false
            }
        };

        // Packets we previously declared lost but that are now acknowledged were not
        // actually lost; let congestion control undo its reaction.
        if self.detect_spurious_loss(&ack, space, path) {
            self.path_data_mut(path)
                .congestion
                .on_spurious_congestion_event();
        }

        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
        let mut newly_acked = ArrayRangeSet::new();
        for range in ack.iter() {
            self.spaces[space].for_path(path).check_ack(range.clone())?;
            for (pn, _) in self.spaces[space]
                .for_path(path)
                .sent_packets
                .iter_range(range)
            {
                newly_acked.insert_one(pn);
            }
        }

        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                    // Assume ACKs for all packets below the largest acknowledged in
                    // `packet` have been received. This can cause the peer to spuriously
                    // retransmit if some of our earlier ACKs were lost, but allows for
                    // simpler state tracking. See discussion at
                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                        pns.pending_acks.subtract_below(*acked_pn);
                    }
                }
                ack_eliciting_acked |= info.ack_eliciting;

                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
                let path_data = self.path_data_mut(path);
                let mtu_updated = path_data.mtud.on_acked(space.kind(), packet, info.size);
                if mtu_updated {
                    path_data
                        .congestion
                        .on_mtu_update(path_data.mtud.current_mtu());
                }

                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
                self.ack_frequency.on_acked(path, packet);

                self.on_packet_acked(now, path, info);
            }
        }

        // Tell congestion control the ACK batch is complete.
        let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
        let app_limited = self.app_limited;
        let path_data = self.path_data_mut(path);
        let in_flight = path_data.in_flight.bytes;

        path_data
            .congestion
            .on_end_acks(now, in_flight, app_limited, largest_ackd);

        // Take an RTT sample only when the largest acknowledged packet is new and the ACK
        // covered at least one ack-eliciting packet.
        if new_largest && ack_eliciting_acked {
            // The peer's reported ack delay only applies in the Data space and is capped
            // by the negotiated max_ack_delay.
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = now.saturating_duration_since(
                self.spaces[space].for_path(path).largest_acked_packet_sent,
            );

            let next_pn = self.spaces[space].for_path(path).next_packet_number;
            let path_data = self.path_data_mut(path);
            // TODO(@divma): should be a method of path, should be contained in a single place
            path_data.rtt.update(ack_delay, rtt);
            if path_data.first_packet_after_rtt_sample.is_none() {
                path_data.first_packet_after_rtt_sample = Some((space.kind(), next_pn));
            }
        }

        // Must be called before crypto/pto_count are clobbered
        self.detect_lost_packets(now, space, path, true);

        if self.peer_completed_address_validation(path) {
            self.path_data_mut(path).pto_count = 0;
        }

        // Explicit congestion notification
        // TODO(@divma): this code is a good example of logic that should be contained in a single
        // place but it's split between the path data and the packet number space data, we should
        // find a way to make this work without two lookups
        if self.path_data(path).sending_ecn {
            if let Some(ecn) = ack.ecn {
                // We only examine ECN counters from ACKs that we are certain we received in transmit
                // order, allowing us to compute an increase in ECN counts to compare against the number
                // of newly acked packets that remains well-defined in the presence of arbitrary packet
                // reordering.
                if new_largest {
                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
                }
            } else {
                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
                debug!("ECN not acknowledged by peer");
                self.path_data_mut(path).sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now, path);
        Ok(())
    }
2836
2837    fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2838        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2839
2840        if lost_packets.is_empty() {
2841            return false;
2842        }
2843
2844        for range in ack.iter() {
2845            let spurious_losses: Vec<u64> = lost_packets
2846                .iter_range(range.clone())
2847                .map(|(pn, _info)| pn)
2848                .collect();
2849
2850            for pn in spurious_losses {
2851                lost_packets.remove(pn);
2852            }
2853        }
2854
2855        // If this ACK frame acknowledged all deemed lost packets,
2856        // then we have raised a spurious congestion event in the past.
2857        // We cannot conclude when there are remaining packets,
2858        // but future ACK frames might indicate a spurious loss detection.
2859        lost_packets.is_empty()
2860    }
2861
2862    /// Drain lost packets that we reasonably think will never arrive
2863    ///
2864    /// The current criterion is copied from `msquic`:
2865    /// discard packets that were sent earlier than 2 probe timeouts ago.
2866    fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2867        let two_pto = 2 * self.path_data(path).rtt.pto_base();
2868
2869        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2870        lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2871    }
2872
2873    /// Process a new ECN block from an in-order ACK
2874    fn process_ecn(
2875        &mut self,
2876        now: Instant,
2877        space: SpaceId,
2878        path: PathId,
2879        newly_acked: u64,
2880        ecn: frame::EcnCounts,
2881        largest_sent_time: Instant,
2882    ) {
2883        match self.spaces[space]
2884            .for_path(path)
2885            .detect_ecn(newly_acked, ecn)
2886        {
2887            Err(e) => {
2888                debug!("halting ECN due to verification failure: {}", e);
2889
2890                self.path_data_mut(path).sending_ecn = false;
2891                // Wipe out the existing value because it might be garbage and could interfere with
2892                // future attempts to use ECN on new paths.
2893                self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2894            }
2895            Ok(false) => {}
2896            Ok(true) => {
2897                self.path_stats.entry(path).or_default().congestion_events += 1;
2898                self.path_data_mut(path).congestion.on_congestion_event(
2899                    now,
2900                    largest_sent_time,
2901                    false,
2902                    true,
2903                    0,
2904                );
2905            }
2906        }
2907    }
2908
2909    // Not timing-aware, so it's safe to call this for inferred acks, such as arise from
2910    // high-latency handshakes
2911    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2912        self.paths
2913            .get_mut(&path_id)
2914            .expect("known path")
2915            .remove_in_flight(&info);
2916        let app_limited = self.app_limited;
2917        let path = self.path_data_mut(path_id);
2918        if info.ack_eliciting && !path.is_validating_path() {
2919            // Only pass ACKs to the congestion controller if we are not validating the current
2920            // path, so as to ignore any ACKs from older paths still coming in.
2921            let rtt = path.rtt;
2922            path.congestion
2923                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2924        }
2925
2926        // Update state for confirmed delivery of frames
2927        if let Some(retransmits) = info.retransmits.get() {
2928            for (id, _) in retransmits.reset_stream.iter() {
2929                self.streams.reset_acked(*id);
2930            }
2931        }
2932
2933        for frame in info.stream_frames {
2934            self.streams.received_ack_of(frame);
2935        }
2936    }
2937
2938    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceKind) {
2939        let start = if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) {
2940            now
2941        } else {
2942            self.crypto_state
2943                .prev_crypto
2944                .as_ref()
2945                .expect("no previous keys")
2946                .end_packet
2947                .as_ref()
2948                .expect("update not acknowledged yet")
2949                .1
2950        };
2951
2952        // QUIC-MULTIPATH § 2.5 Key Phase Update Process: use largest PTO of all paths.
2953        self.timers.set(
2954            Timer::Conn(ConnTimer::KeyDiscard),
2955            start + self.max_pto_for_space(space) * 3,
2956            self.qlog.with_time(now),
2957        );
2958    }
2959
2960    /// Handle a [`PathTimer::LossDetection`] timeout.
2961    ///
2962    /// This timer expires for two reasons:
2963    /// - An ACK-eliciting packet we sent should be considered lost.
2964    /// - The PTO may have expired and a tail-loss probe needs to be scheduled.
2965    ///
2966    /// The former needs us to schedule re-transmission of the lost data.
2967    ///
2968    /// The latter means we have not received an ACK for an ack-eliciting packet we sent
2969    /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
2970    /// packet, to try and elicit new acknowledgements. These new acknowledgements will
2971    /// indicate whether the previously sent packets were lost or not.
2972    fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2973        if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2974            // Time threshold loss Detection
2975            self.detect_lost_packets(now, pn_space, path_id, false);
2976            self.set_loss_detection_timer(now, path_id);
2977            return;
2978        }
2979
2980        let (_, space) = match self.pto_time_and_space(now, path_id) {
2981            Some(x) => x,
2982            None => {
2983                error!(%path_id, "PTO expired while unset");
2984                return;
2985            }
2986        };
2987        trace!(
2988            in_flight = self.path_data(path_id).in_flight.bytes,
2989            count = self.path_data(path_id).pto_count,
2990            ?space,
2991            %path_id,
2992            "PTO fired"
2993        );
2994
2995        let count = match self.path_data(path_id).in_flight.ack_eliciting {
2996            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
2997            // deadlock preventions
2998            0 => {
2999                debug_assert!(!self.peer_completed_address_validation(path_id));
3000                1
3001            }
3002            // Conventional loss probe
3003            _ => 2,
3004        };
3005        let pns = self.spaces[space].for_path(path_id);
3006        pns.loss_probes = pns.loss_probes.saturating_add(count);
3007        let path_data = self.path_data_mut(path_id);
3008        path_data.pto_count = path_data.pto_count.saturating_add(1);
3009        self.set_loss_detection_timer(now, path_id);
3010    }
3011
    /// Detect any lost packets
    ///
    /// There are two cases in which we detect lost packets:
    ///
    /// - We received an ACK packet.
    /// - The [`PathTimer::LossDetection`] timer expired. So there is an un-acknowledged packet
    ///   that was followed by an acknowledged packet. The loss timer for this
    ///   un-acknowledged packet expired and we need to detect that packet as lost.
    ///
    /// Packets are lost if they are both (See RFC9002 §6.1):
    ///
    /// - Unacknowledged, in flight and sent prior to an acknowledged packet.
    /// - Old enough by either:
    ///   - Having a packet number [`TransportConfig::packet_threshold`] lower than the last
    ///     acknowledged packet.
    ///   - Being sent [`TransportConfig::time_threshold`] * RTT in the past.
    ///
    /// `due_to_ack` is `true` when called from ACK processing; only then is persistent
    /// congestion evaluated. The actual handling of detected losses is delegated to
    /// [`Connection::handle_lost_packets`].
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        // Clear the loss time; it is re-armed below if a not-yet-lost packet remains.
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        // Find all the lost packets, populating all variables initialised above.

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        // Time threshold: a conservative RTT estimate scaled by the configured factor,
        // but never below the timer granularity.
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // InPersistentCongestion: Determine if all packets in the time period before the newest
        // lost packet, including the edges, are marked lost. PTO computation must always
        // include max ACK delay, i.e. operate as if in Data space (see RFC9002 §7.6.1).
        let congestion_period = self
            .pto(SpaceKind::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        // Only packets sent before the largest acknowledged packet can be declared lost.
        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
            if prev_packet != Some(packet.wrapping_sub(1)) {
                // An intervening packet was acknowledged
                persistent_congestion_start = None;
            }

            // Packets sent before now - loss_delay are deemed lost.
            // However, we avoid subtraction as it can panic and there's no
            // saturating equivalent of this subtraction operation with a Duration.
            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
                // The packet should be declared lost.
                if Some(packet) == in_flight_mtu_probe {
                    // Lost MTU probes are not included in `lost_packets`, because they
                    // should not trigger a congestion control response
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // Two ACK-eliciting packets lost more than
                            // congestion_period apart, with no ACKed packets in between
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // Persistent congestion must start after the first RTT sample
                            // (lexicographic tuple compare: space kind first, then packet number)
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space.kind(), packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // The packet should not yet be declared lost.
                if space.loss_time.is_none() {
                    // Since we iterate in order the lowest packet number's loss time will
                    // always be the earliest.
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }
3128
3129    /// Drops the path state, declaring any remaining in-flight packets as lost
3130    fn discard_path(&mut self, path_id: PathId, now: Instant) {
3131        trace!(%path_id, "dropping path state");
3132        let path = self.path_data(path_id);
3133        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3134
3135        let mut size_of_lost_packets = 0u64; // add to path_stats.lost_bytes;
3136        let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
3137            .for_path(path_id)
3138            .sent_packets
3139            .iter()
3140            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
3141            .map(|(pn, info)| {
3142                size_of_lost_packets += info.size as u64;
3143                pn
3144            })
3145            .collect();
3146
3147        if !lost_pns.is_empty() {
3148            trace!(
3149                %path_id,
3150                count = lost_pns.len(),
3151                lost_bytes = size_of_lost_packets,
3152                "packets lost on path abandon"
3153            );
3154            self.handle_lost_packets(
3155                SpaceId::Data,
3156                path_id,
3157                now,
3158                lost_pns,
3159                in_flight_mtu_probe,
3160                Duration::ZERO,
3161                false,
3162                size_of_lost_packets,
3163            );
3164        }
3165        // Before removing the path, we fetch the final path stats via `Self::path_stats`.
3166        // This updates some values for the last time.
3167        let path_stats = self.path_stats(path_id).unwrap_or_default();
3168        self.path_stats.remove(&path_id);
3169        self.paths.remove(&path_id);
3170        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
3171
3172        self.events.push_back(
3173            PathEvent::Discarded {
3174                id: path_id,
3175                path_stats,
3176            }
3177            .into(),
3178        );
3179    }
3180
    /// Processes packets declared lost: queues retransmission and applies congestion control
    ///
    /// `lost_packets` must be sorted in ascending packet-number order. `lost_mtu_probe`,
    /// if set, must be omitted from `lost_packets`: a lost MTU probe is handled
    /// separately so it does not trigger a congestion response.
    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(
            {
                let mut sorted = lost_packets.clone();
                sorted.sort();
                sorted == lost_packets
            },
            "lost_packets must be sorted"
        );

        // Age out deemed-lost packets we no longer expect to ever be acknowledged.
        self.drain_lost_packets(now, pn_space, path_id);

        // OnPacketsLost
        if let Some(largest_lost) = lost_packets.last().cloned() {
            // Snapshot in-flight bytes so we can later tell whether any ack-eliciting
            // (i.e. counted) packet was among the losses.
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.entry(path_id).or_default();
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space.kind(), now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                // Queue the lost frames for retransmission.
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                // Remember the loss so a later ACK can reveal it as spurious
                // (see `detect_spurious_loss`).
                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                // The reduced MTU may make queued datagrams oversized; drop those and,
                // if a sender was blocked, unblock it.
                if let Some(max_datagram_size) = self.datagrams().max_size()
                    && self.datagrams.drop_oversized(max_datagram_size)
                    && self.datagrams.send_blocked
                {
                    self.datagrams.send_blocked = false;
                    self.events.push_back(Event::DatagramsUnblocked);
                }
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .black_holes_detected += 1;
            }

            // Don't apply congestion penalty for lost ack-only packets
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        // Handle a lost MTU probe
        if let Some(packet) = lost_mtu_probe {
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and
            // therefore must not have been removed yet
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats
                .entry(path_id)
                .or_default()
                .lost_plpmtud_probes += 1;
        }
    }
3302
3303    /// Returns the earliest time packets should be declared lost for all spaces on a path.
3304    ///
3305    /// If a path has an acknowledged packet with any prior un-acknowledged packets, the
3306    /// earliest un-acknowledged packet can be declared lost after a timeout has elapsed.
3307    /// The time returned is when this packet should be declared lost.
3308    fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
3309        SpaceId::iter()
3310            .filter_map(|id| {
3311                self.spaces[id]
3312                    .number_spaces
3313                    .get(&path_id)
3314                    .and_then(|pns| pns.loss_time)
3315                    .map(|time| (time, id))
3316            })
3317            .min_by_key(|&(time, _)| time)
3318    }
3319
    /// Returns the earliest time the next PTO should fire, across all spaces on a path.
    ///
    /// Returns `None` when the path is unknown or no space currently needs a PTO armed.
    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
        let path = self.path(path_id)?;
        let pto_count = path.pto_count;
        // Exponential PTO backoff, with the exponent capped at MAX_BACKOFF_EXPONENT.
        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
        let mut duration = path.rtt.pto_base() * backoff;

        if path_id == PathId::ZERO
            && path.in_flight.ack_eliciting == 0
            && !self.peer_completed_address_validation(PathId::ZERO)
        {
            // Address Validation during Connection Establishment:
            // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
            // deadlock if an Initial or Handshake packet from the server is lost and the
            // server can not send more due to its anti-amplification limit the client must
            // send another packet on PTO.
            let space = match self.highest_space {
                SpaceKind::Handshake => SpaceId::Handshake,
                _ => SpaceId::Initial,
            };

            return Some((now + duration, space));
        }

        let mut result = None;
        for space in SpaceId::iter() {
            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
                continue;
            };

            // A PTO only makes sense while something is awaiting acknowledgement.
            if !pns.has_in_flight() {
                continue;
            }
            if space == SpaceId::Data {
                // Skip ApplicationData until handshake completes.
                if self.is_handshaking() {
                    return result;
                }
                // Include max_ack_delay and backoff for ApplicationData.
                // NOTE(review): this permanently grows `duration`; sound assuming
                // `SpaceId::iter()` yields Data last — TODO confirm.
                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
            }
            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
                continue;
            };
            let pto = last_ack_eliciting + duration;
            // Track the earliest PTO over all spaces, but only for spaces where a
            // probe could actually be sent and would serve a purpose.
            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
                if path.anti_amplification_blocked(1) {
                    // Nothing would be able to be sent.
                    continue;
                }
                if path.in_flight.ack_eliciting == 0 {
                    // Nothing ack-eliciting, no PTO to arm/fire.
                    continue;
                }
                result = Some((pto, space));
            }
        }
        result
    }
3379
3380    fn peer_completed_address_validation(&self, path: PathId) -> bool {
3381        // TODO(flub): This logic needs updating for multipath
3382        if self.side.is_server() || self.state.is_closed() {
3383            return true;
3384        }
3385        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
3386        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
3387        self.spaces[SpaceId::Handshake]
3388            .path_space(PathId::ZERO)
3389            .and_then(|pns| pns.largest_acked_packet)
3390            .is_some()
3391            || self.spaces[SpaceId::Data]
3392                .path_space(path)
3393                .and_then(|pns| pns.largest_acked_packet)
3394                .is_some()
3395            || (self.crypto_state.has_keys(EncryptionLevel::OneRtt)
3396                && !self.crypto_state.has_keys(EncryptionLevel::Handshake))
3397    }
3398
3399    /// Resets the the [`PathTimer::LossDetection`] timer to the next instant it may be needed
3400    ///
3401    /// The timer must fire if either:
3402    /// - An ack-eliciting packet we sent needs to be declared lost.
3403    /// - A tail-loss probe needs to be sent.
3404    ///
3405    /// See [`Connection::on_loss_detection_timeout`] for details.
3406    fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3407        if self.state.is_closed() {
3408            // No loss detection takes place on closed connections, and `close_common` already
3409            // stopped time timer. Ensure we don't restart it inadvertently, e.g. in response to a
3410            // reordered packet being handled by state-insensitive code.
3411            return;
3412        }
3413
3414        if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3415            // Time threshold loss detection.
3416            self.timers.set(
3417                Timer::PerPath(path_id, PathTimer::LossDetection),
3418                loss_time,
3419                self.qlog.with_time(now),
3420            );
3421            return;
3422        }
3423
3424        // Determine which PN space to arm PTO for.
3425        // Calculate PTO duration
3426        if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3427            self.timers.set(
3428                Timer::PerPath(path_id, PathTimer::LossDetection),
3429                timeout,
3430                self.qlog.with_time(now),
3431            );
3432        } else {
3433            self.timers.stop(
3434                Timer::PerPath(path_id, PathTimer::LossDetection),
3435                self.qlog.with_time(now),
3436            );
3437        }
3438    }
3439
3440    /// The maximum probe timeout across all paths
3441    ///
3442    /// See [`Connection::pto`]
3443    fn max_pto_for_space(&self, space: SpaceKind) -> Duration {
3444        self.paths
3445            .keys()
3446            .map(|path_id| self.pto(space, *path_id))
3447            .max()
3448            .expect("there should be at least one path")
3449    }
3450
3451    /// Probe Timeout
3452    ///
3453    /// The PTO is logically the time in which you'd expect to receive an acknowledgement
3454    /// for a packet. So approximately RTT + max_ack_delay.
3455    fn pto(&self, space: SpaceKind, path_id: PathId) -> Duration {
3456        let max_ack_delay = match space {
3457            SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3458            SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3459        };
3460        self.path_data(path_id).rtt.pto_base() + max_ack_delay
3461    }
3462
3463    fn on_packet_authenticated(
3464        &mut self,
3465        now: Instant,
3466        space_id: SpaceKind,
3467        path_id: PathId,
3468        ecn: Option<EcnCodepoint>,
3469        packet: Option<u64>,
3470        spin: bool,
3471        is_1rtt: bool,
3472    ) {
3473        self.total_authed_packets += 1;
3474        self.reset_keep_alive(path_id, now);
3475        self.reset_idle_timeout(now, space_id, path_id);
3476        self.permit_idle_reset = true;
3477        self.receiving_ecn |= ecn.is_some();
3478        if let Some(x) = ecn {
3479            let space = &mut self.spaces[space_id];
3480            space.for_path(path_id).ecn_counters += x;
3481
3482            if x.is_ce() {
3483                space
3484                    .for_path(path_id)
3485                    .pending_acks
3486                    .set_immediate_ack_required();
3487            }
3488        }
3489
3490        let packet = match packet {
3491            Some(x) => x,
3492            None => return,
3493        };
3494        match &self.side {
3495            ConnectionSide::Client { .. } => {
3496                // If we received a handshake packet that authenticated, then we're talking to
3497                // the real server.  From now on we should no longer allow the server to migrate
3498                // its address.
3499                if space_id == SpaceKind::Handshake
3500                    && let Some(hs) = self.state.as_handshake_mut()
3501                {
3502                    hs.allow_server_migration = false;
3503                }
3504            }
3505            ConnectionSide::Server { .. } => {
3506                if self.crypto_state.has_keys(EncryptionLevel::Initial)
3507                    && space_id == SpaceKind::Handshake
3508                {
3509                    // A server stops sending and processing Initial packets when it receives its first Handshake packet.
3510                    self.discard_space(now, SpaceKind::Initial);
3511                }
3512                if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) && is_1rtt {
3513                    // Discard 0-RTT keys soon after receiving a 1-RTT packet
3514                    self.set_key_discard_timer(now, space_id)
3515                }
3516            }
3517        }
3518        let space = self.spaces[space_id].for_path(path_id);
3519        space.pending_acks.insert_one(packet, now);
3520        if packet >= space.rx_packet.unwrap_or_default() {
3521            space.rx_packet = Some(packet);
3522            // Update outgoing spin bit, inverting iff we're the client
3523            self.spin = self.side.is_client() ^ spin;
3524        }
3525    }
3526
3527    /// Resets the idle timeout timers
3528    ///
3529    /// Without multipath there is only the connection-wide idle timeout. When multipath is
3530    /// enabled there is an additional per-path idle timeout.
3531    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceKind, path_id: PathId) {
3532        // First reset the global idle timeout.
3533        if let Some(timeout) = self.idle_timeout {
3534            if self.state.is_closed() {
3535                self.timers
3536                    .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3537            } else {
3538                let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space));
3539                self.timers.set(
3540                    Timer::Conn(ConnTimer::Idle),
3541                    now + dt,
3542                    self.qlog.with_time(now),
3543                );
3544            }
3545        }
3546
3547        // Now handle the per-path state
3548        if let Some(timeout) = self.path_data(path_id).idle_timeout {
3549            if self.state.is_closed() {
3550                self.timers.stop(
3551                    Timer::PerPath(path_id, PathTimer::PathIdle),
3552                    self.qlog.with_time(now),
3553                );
3554            } else {
3555                let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3556                self.timers.set(
3557                    Timer::PerPath(path_id, PathTimer::PathIdle),
3558                    now + dt,
3559                    self.qlog.with_time(now),
3560                );
3561            }
3562        }
3563    }
3564
3565    /// Resets both the [`ConnTimer::KeepAlive`] and [`PathTimer::PathKeepAlive`] timers
3566    fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3567        if !self.state.is_established() {
3568            return;
3569        }
3570
3571        if let Some(interval) = self.config.keep_alive_interval {
3572            self.timers.set(
3573                Timer::Conn(ConnTimer::KeepAlive),
3574                now + interval,
3575                self.qlog.with_time(now),
3576            );
3577        }
3578
3579        if let Some(interval) = self.path_data(path_id).keep_alive {
3580            self.timers.set(
3581                Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3582                now + interval,
3583                self.qlog.with_time(now),
3584            );
3585        }
3586    }
3587
3588    /// Sets the timer for when a previously issued CID should be retired next
3589    fn reset_cid_retirement(&mut self, now: Instant) {
3590        if let Some((_path, t)) = self.next_cid_retirement() {
3591            self.timers.set(
3592                Timer::Conn(ConnTimer::PushNewCid),
3593                t,
3594                self.qlog.with_time(now),
3595            );
3596        }
3597    }
3598
3599    /// The next time when a previously issued CID should be retired
3600    fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3601        self.local_cid_state
3602            .iter()
3603            .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3604            .min_by_key(|(_path_id, timeout)| *timeout)
3605    }
3606
3607    /// Handle the already-decrypted first packet from the client
3608    ///
3609    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
3610    /// efficient.
3611    pub(crate) fn handle_first_packet(
3612        &mut self,
3613        now: Instant,
3614        network_path: FourTuple,
3615        ecn: Option<EcnCodepoint>,
3616        packet_number: u64,
3617        packet: InitialPacket,
3618        remaining: Option<BytesMut>,
3619    ) -> Result<(), ConnectionError> {
3620        let span = trace_span!("first recv");
3621        let _guard = span.enter();
3622        debug_assert!(self.side.is_server());
3623        let len = packet.header_data.len() + packet.payload.len();
3624        let path_id = PathId::ZERO;
3625        self.path_data_mut(path_id).total_recvd = len as u64;
3626
3627        if let Some(hs) = self.state.as_handshake_mut() {
3628            hs.expected_token = packet.header.token.clone();
3629        } else {
3630            unreachable!("first packet must be delivered in Handshake state");
3631        }
3632
3633        // The first packet is always on PathId::ZERO
3634        self.on_packet_authenticated(
3635            now,
3636            SpaceKind::Initial,
3637            path_id,
3638            ecn,
3639            Some(packet_number),
3640            false,
3641            false,
3642        );
3643
3644        let packet: Packet = packet.into();
3645
3646        let mut qlog = QlogRecvPacket::new(len);
3647        qlog.header(&packet.header, Some(packet_number), path_id);
3648
3649        self.process_decrypted_packet(
3650            now,
3651            network_path,
3652            path_id,
3653            Some(packet_number),
3654            packet,
3655            &mut qlog,
3656        )?;
3657        self.qlog.emit_packet_received(qlog, now);
3658        if let Some(data) = remaining {
3659            self.handle_coalesced(now, network_path, path_id, ecn, data);
3660        }
3661
3662        self.qlog.emit_recovery_metrics(
3663            path_id,
3664            &mut self.paths.get_mut(&path_id).unwrap().data,
3665            now,
3666        );
3667
3668        Ok(())
3669    }
3670
3671    fn init_0rtt(&mut self, now: Instant) {
3672        let (header, packet) = match self.crypto_state.session.early_crypto() {
3673            Some(x) => x,
3674            None => return,
3675        };
3676        if self.side.is_client() {
3677            match self.crypto_state.session.transport_parameters() {
3678                Ok(params) => {
3679                    let params = params
3680                        .expect("crypto layer didn't supply transport parameters with ticket");
3681                    // Certain values must not be cached
3682                    let params = TransportParameters {
3683                        initial_src_cid: None,
3684                        original_dst_cid: None,
3685                        preferred_address: None,
3686                        retry_src_cid: None,
3687                        stateless_reset_token: None,
3688                        min_ack_delay: None,
3689                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3690                        max_ack_delay: TransportParameters::default().max_ack_delay,
3691                        initial_max_path_id: None,
3692                        ..params
3693                    };
3694                    self.set_peer_params(params);
3695                    self.qlog.emit_peer_transport_params_restored(self, now);
3696                }
3697                Err(e) => {
3698                    error!("session ticket has malformed transport parameters: {}", e);
3699                    return;
3700                }
3701            }
3702        }
3703        trace!("0-RTT enabled");
3704        self.crypto_state.enable_zero_rtt(header, packet);
3705    }
3706
    /// Feeds a received CRYPTO frame into the handshake layer
    ///
    /// Validates that the data arrives at an acceptable encryption level, enforces the
    /// configured crypto buffer limit, then drains any newly contiguous bytes into the crypto
    /// session. Pushes [`Event::HandshakeDataReady`] whenever the session reports new
    /// handshake data.
    fn read_crypto(
        &mut self,
        space: SpaceId,
        crypto: &frame::Crypto,
        payload_len: usize,
    ) -> Result<(), TransportError> {
        // Determine which space we currently expect CRYPTO data in.
        let expected = if !self.state.is_handshake() {
            SpaceId::Data
        } else if self.highest_space == SpaceKind::Initial {
            SpaceId::Initial
        } else {
            // On the server, self.highest_space can be Data after receiving the client's first
            // flight, but we expect Handshake CRYPTO until the handshake is complete.
            SpaceId::Handshake
        };
        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
        // packets are illegal, and we don't process 1-RTT packets until the handshake is
        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
        debug_assert!(space <= expected, "received out-of-order CRYPTO data");

        // New (not yet read) data in an earlier-than-expected space is a protocol violation;
        // retransmissions of already-consumed data are tolerated.
        let end = crypto.offset + crypto.data.len() as u64;
        if space < expected
            && end
                > self.crypto_state.spaces[space.kind()]
                    .crypto_stream
                    .bytes_read()
        {
            warn!(
                "received new {:?} CRYPTO data when expecting {:?}",
                space, expected
            );
            return Err(TransportError::PROTOCOL_VIOLATION(
                "new data at unexpected encryption level",
            ));
        }

        // Bound how far past the read cursor the peer may buffer data with us.
        let crypto_space = &mut self.crypto_state.spaces[space.kind()];
        let max = end.saturating_sub(crypto_space.crypto_stream.bytes_read());
        if max > self.config.crypto_buffer_size as u64 {
            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
        }

        crypto_space
            .crypto_stream
            .insert(crypto.offset, crypto.data.clone(), payload_len);
        // Drain every contiguous chunk into the crypto session.
        while let Some(chunk) = crypto_space.crypto_stream.read(usize::MAX, true) {
            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
            if self.crypto_state.session.read_handshake(&chunk.bytes)? {
                self.events.push_back(Event::HandshakeDataReady);
            }
        }

        Ok(())
    }
3761
    /// Drains outgoing handshake bytes from the crypto session into CRYPTO frames
    ///
    /// Runs in a loop because writing handshake data can also yield keys for the next
    /// encryption level (via [`Self::upgrade_crypto`]), after which the session may have
    /// further data to send in that new space.
    fn write_crypto(&mut self) {
        loop {
            let space = self.highest_space;
            let mut outgoing = Vec::new();
            if let Some(crypto) = self.crypto_state.session.write_handshake(&mut outgoing) {
                // The session handed us keys for the next encryption level.
                match space {
                    SpaceKind::Initial => {
                        self.upgrade_crypto(SpaceKind::Handshake, crypto);
                    }
                    SpaceKind::Handshake => {
                        self.upgrade_crypto(SpaceKind::Data, crypto);
                    }
                    SpaceKind::Data => unreachable!("got updated secrets during 1-RTT"),
                }
            }
            if outgoing.is_empty() {
                if space == self.highest_space {
                    break;
                } else {
                    // Keys updated, check for more data to send
                    continue;
                }
            }
            let offset = self.crypto_state.spaces[space].crypto_offset;
            let outgoing = Bytes::from(outgoing);
            // Keep a copy of the client's very first Initial flight: it is replayed when the
            // server responds with a Retry (see the Retry handling in packet processing).
            if let Some(hs) = self.state.as_handshake_mut()
                && space == SpaceKind::Initial
                && offset == 0
                && self.side.is_client()
            {
                hs.client_hello = Some(outgoing.clone());
            }
            self.crypto_state.spaces[space].crypto_offset += outgoing.len() as u64;
            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
            self.spaces[space].pending.crypto.push_back(frame::Crypto {
                offset,
                data: outgoing,
            });
        }
    }
3802
3803    /// Switch to stronger cryptography during handshake
3804    fn upgrade_crypto(&mut self, space: SpaceKind, crypto: Keys) {
3805        debug_assert!(
3806            !self.crypto_state.has_keys(space.encryption_level()),
3807            "already reached packet space {space:?}"
3808        );
3809        trace!("{:?} keys ready", space);
3810        if space == SpaceKind::Data {
3811            // Precompute the first key update
3812            self.crypto_state.next_crypto = Some(
3813                self.crypto_state
3814                    .session
3815                    .next_1rtt_keys()
3816                    .expect("handshake should be complete"),
3817            );
3818        }
3819
3820        self.crypto_state.spaces[space].keys = Some(crypto);
3821        debug_assert!(space > self.highest_space);
3822        self.highest_space = space;
3823        if space == SpaceKind::Data && self.side.is_client() {
3824            // Discard 0-RTT keys because 1-RTT keys are available.
3825            self.crypto_state.discard_zero_rtt();
3826        }
3827    }
3828
3829    fn discard_space(&mut self, now: Instant, space: SpaceKind) {
3830        debug_assert!(space != SpaceKind::Data);
3831        trace!("discarding {:?} keys", space);
3832        if space == SpaceKind::Initial {
3833            // No longer needed
3834            if let ConnectionSide::Client { token, .. } = &mut self.side {
3835                *token = Bytes::new();
3836            }
3837        }
3838        self.crypto_state.spaces[space].keys = None;
3839        let space = &mut self.spaces[space];
3840        let pns = space.for_path(PathId::ZERO);
3841        pns.time_of_last_ack_eliciting_packet = None;
3842        pns.loss_time = None;
3843        pns.loss_probes = 0;
3844        let sent_packets = mem::take(&mut pns.sent_packets);
3845        let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3846        for (_, packet) in sent_packets.into_iter() {
3847            path.data.remove_in_flight(&packet);
3848        }
3849
3850        self.set_loss_detection_timer(now, PathId::ZERO)
3851    }
3852
3853    fn handle_coalesced(
3854        &mut self,
3855        now: Instant,
3856        network_path: FourTuple,
3857        path_id: PathId,
3858        ecn: Option<EcnCodepoint>,
3859        data: BytesMut,
3860    ) {
3861        self.path_data_mut(path_id)
3862            .inc_total_recvd(data.len() as u64);
3863        let mut remaining = Some(data);
3864        let cid_len = self
3865            .local_cid_state
3866            .values()
3867            .map(|cid_state| cid_state.cid_len())
3868            .next()
3869            .expect("one cid_state must exist");
3870        while let Some(data) = remaining {
3871            match PartialDecode::new(
3872                data,
3873                &FixedLengthConnectionIdParser::new(cid_len),
3874                &[self.version],
3875                self.endpoint_config.grease_quic_bit,
3876            ) {
3877                Ok((partial_decode, rest)) => {
3878                    remaining = rest;
3879                    self.handle_decode(now, network_path, path_id, ecn, partial_decode);
3880                }
3881                Err(e) => {
3882                    trace!("malformed header: {}", e);
3883                    return;
3884                }
3885            }
3886        }
3887    }
3888
3889    fn handle_decode(
3890        &mut self,
3891        now: Instant,
3892        network_path: FourTuple,
3893        path_id: PathId,
3894        ecn: Option<EcnCodepoint>,
3895        partial_decode: PartialDecode,
3896    ) {
3897        let qlog = QlogRecvPacket::new(partial_decode.len());
3898        if let Some(decoded) = self
3899            .crypto_state
3900            .unprotect_header(partial_decode, self.peer_params.stateless_reset_token)
3901        {
3902            self.handle_packet(
3903                now,
3904                network_path,
3905                path_id,
3906                ecn,
3907                decoded.packet,
3908                decoded.stateless_reset,
3909                qlog,
3910            );
3911        }
3912    }
3913
    /// Runs the full receive pipeline for a single (possibly still protected) packet
    ///
    /// Performs handshake-time path/address filtering, decryption with
    /// authentication-failure accounting, duplicate detection, payload processing, the
    /// resulting state transitions (close/drain), and finally schedules a CONNECTION_CLOSE
    /// response when the connection is already closed and the sender's path is validated.
    fn handle_packet(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<Packet>,
        stateless_reset: bool,
        mut qlog: QlogRecvPacket,
    ) {
        self.stats.udp_rx.ios += 1;
        self.path_stats.entry(path_id).or_default().udp_rx.ios += 1;

        if let Some(ref packet) = packet {
            trace!(
                "got {:?} packet ({} bytes) from {} using id {}",
                packet.header.space(),
                packet.payload.len() + packet.header_data.len(),
                network_path,
                packet.header.dst_cid(),
            );
        }

        // While handshaking, only PathId::ZERO is in use, and packets from an unexpected
        // remote are dropped unless server migration is still allowed.
        if self.is_handshaking() {
            if path_id != PathId::ZERO {
                debug!(%network_path, %path_id, "discarding multipath packet during handshake");
                return;
            }
            if network_path != self.path_data_mut(path_id).network_path {
                if let Some(hs) = self.state.as_handshake() {
                    if hs.allow_server_migration {
                        trace!(%network_path, prev = %self.path_data(path_id).network_path, "server migrated to new remote");
                        self.path_data_mut(path_id).network_path = network_path;
                        self.qlog.emit_tuple_assigned(path_id, network_path, now);
                    } else {
                        debug!("discarding packet with unexpected remote during handshake");
                        return;
                    }
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            }
        }

        // Snapshot the state so we can detect transitions caused by this packet below.
        let was_closed = self.state.is_closed();
        let was_drained = self.state.is_drained();

        // Err(None): authentication failed; Err(Some): decrypted but illegal;
        // Ok: authenticated packet plus its (optional) packet number.
        let decrypted = match packet {
            None => Err(None),
            Some(mut packet) => self
                .decrypt_packet(now, path_id, &mut packet)
                .map(move |number| (packet, number)),
        };
        let result = match decrypted {
            _ if stateless_reset => {
                debug!("got stateless reset");
                Err(ConnectionError::Reset)
            }
            Err(Some(e)) => {
                warn!("illegal packet: {}", e);
                Err(e.into())
            }
            Err(None) => {
                debug!("failed to authenticate packet");
                self.authentication_failures += 1;
                let integrity_limit = self
                    .crypto_state
                    .integrity_limit(self.highest_space)
                    .unwrap();
                // Too many forged packets: give up rather than exceed the AEAD integrity
                // limit.
                if self.authentication_failures > integrity_limit {
                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
                } else {
                    return;
                }
            }
            Ok((packet, number)) => {
                qlog.header(&packet.header, number, path_id);
                let span = match number {
                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                    None => trace_span!("recv", space = ?packet.header.space()),
                };
                let _guard = span.enter();

                // `dedup.insert` returning true marks the packet number as already seen.
                let dedup = self.spaces[packet.header.space()]
                    .path_space_mut(path_id)
                    .map(|pns| &mut pns.dedup);
                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                    debug!("discarding possible duplicate packet");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else if self.state.is_handshake() && packet.header.is_short() {
                    // TODO: SHOULD buffer these to improve reordering tolerance.
                    trace!("dropping short packet during handshake");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else {
                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header
                        && let Some(hs) = self.state.as_handshake()
                        && self.side.is_server()
                        && token != &hs.expected_token
                    {
                        // Clients must send the same retry token in every Initial. Initial
                        // packets can be spoofed, so we discard rather than killing the
                        // connection.
                        warn!("discarding Initial with invalid retry token");
                        self.qlog.emit_packet_received(qlog, now);
                        return;
                    }

                    if !self.state.is_closed() {
                        let spin = match packet.header {
                            Header::Short { spin, .. } => spin,
                            _ => false,
                        };

                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                            // Only the client is allowed to open paths
                            self.ensure_path(path_id, network_path, now, number);
                        }
                        if self.paths.contains_key(&path_id) {
                            self.on_packet_authenticated(
                                now,
                                packet.header.space(),
                                path_id,
                                ecn,
                                number,
                                spin,
                                packet.header.is_1rtt(),
                            );
                        }
                    }

                    let res = self.process_decrypted_packet(
                        now,
                        network_path,
                        path_id,
                        number,
                        packet,
                        &mut qlog,
                    );

                    self.qlog.emit_packet_received(qlog, now);
                    res
                }
            }
        };

        // State transitions for error cases
        if let Err(conn_err) = result {
            match conn_err {
                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::Reset
                | ConnectionError::TransportError(TransportError {
                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
                    ..
                }) => {
                    self.state.move_to_drained(Some(conn_err));
                }
                ConnectionError::TimedOut => {
                    unreachable!("timeouts aren't generated by packet processing");
                }
                ConnectionError::TransportError(err) => {
                    debug!("closing connection due to transport error: {}", err);
                    self.state.move_to_closed(err);
                }
                ConnectionError::VersionMismatch => {
                    self.state.move_to_draining(Some(conn_err));
                }
                ConnectionError::LocallyClosed => {
                    unreachable!("LocallyClosed isn't generated by packet processing");
                }
                ConnectionError::CidsExhausted => {
                    unreachable!("CidsExhausted isn't generated by packet processing");
                }
            };
        }

        // This packet moved us into the closed state: stop regular activity and, unless we
        // went straight to drained, arm the close timer.
        if !was_closed && self.state.is_closed() {
            self.close_common();
            if !self.state.is_drained() {
                self.set_close_timer(now);
            }
        }
        if !was_drained && self.state.is_drained() {
            self.endpoint_events.push_back(EndpointEventInner::Drained);
            // Close timer may have been started previously, e.g. if we sent a close and got a
            // stateless reset in response
            self.timers
                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
        }

        // Transmit CONNECTION_CLOSE if necessary.
        //
        // If we received a valid packet and we are in the closed state we should respond
        // with a CONNECTION_CLOSE frame.
        // TODO: This SHOULD be rate-limited according to §10.2.1 of QUIC-TRANSPORT, but
        //    that does not yet happen. This is triggered by each received packet.
        if matches!(self.state.as_type(), StateType::Closed) {
            // From https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2.1-7
            //
            // While in the closing state we must either:
            // - discard packets coming from an un-validated remote OR
            // - ensure we do not send more than 3 times the received data
            //
            // Doing the 2nd would mean we would be able to send CONNECTION_CLOSE to a peer
            // who was (involuntary) migrated just at the time we initiated immediate
            // close. It is a lot more work though. So while we would like to do this for
            // now we only do 1.
            //
            // Another shortcoming of the current implementation is that when we have a
            // previous PathData which is validated and the remote matches that path, we
            // should schedule CONNECTION_CLOSE on that path. However currently we can not
            // schedule such a packet. We should also fix this some day. This makes us
            // vulnerable to an attacker faking a migration at the right time and then we'd
            // be unable to send the CONNECTION_CLOSE to the real remote.
            if self
                .paths
                .get(&path_id)
                .map(|p| p.data.validated && p.data.network_path == network_path)
                .unwrap_or(false)
            {
                self.connection_close_pending = true;
            }
        }
    }
4141
4142    fn process_decrypted_packet(
4143        &mut self,
4144        now: Instant,
4145        network_path: FourTuple,
4146        path_id: PathId,
4147        number: Option<u64>,
4148        packet: Packet,
4149        qlog: &mut QlogRecvPacket,
4150    ) -> Result<(), ConnectionError> {
4151        if !self.paths.contains_key(&path_id) {
4152            // There is a chance this is a server side, first (for this path) packet, which would
4153            // be a protocol violation. It's more likely, however, that this is a packet of a
4154            // pruned path
4155            trace!(%path_id, ?number, "discarding packet for unknown path");
4156            return Ok(());
4157        }
4158        let state = match self.state.as_type() {
4159            StateType::Established => {
4160                match packet.header.space() {
4161                    SpaceKind::Data => self.process_payload(
4162                        now,
4163                        network_path,
4164                        path_id,
4165                        number.unwrap(),
4166                        packet,
4167                        qlog,
4168                    )?,
4169                    _ if packet.header.has_frames() => {
4170                        self.process_early_payload(now, path_id, packet, qlog)?
4171                    }
4172                    _ => {
4173                        trace!("discarding unexpected pre-handshake packet");
4174                    }
4175                }
4176                return Ok(());
4177            }
4178            StateType::Closed => {
4179                for result in frame::Iter::new(packet.payload.freeze())? {
4180                    let frame = match result {
4181                        Ok(frame) => frame,
4182                        Err(err) => {
4183                            debug!("frame decoding error: {err:?}");
4184                            continue;
4185                        }
4186                    };
4187                    qlog.frame(&frame);
4188
4189                    if let Frame::Padding = frame {
4190                        continue;
4191                    };
4192
4193                    self.stats.frame_rx.record(frame.ty());
4194
4195                    if let Frame::Close(_error) = frame {
4196                        self.state.move_to_draining(None);
4197                        break;
4198                    }
4199                }
4200                return Ok(());
4201            }
4202            StateType::Draining | StateType::Drained => return Ok(()),
4203            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
4204        };
4205
4206        match packet.header {
4207            Header::Retry {
4208                src_cid: remote_cid,
4209                ..
4210            } => {
4211                debug_assert_eq!(path_id, PathId::ZERO);
4212                if self.side.is_server() {
4213                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
4214                }
4215
4216                let is_valid_retry = self
4217                    .remote_cids
4218                    .get(&path_id)
4219                    .map(|cids| cids.active())
4220                    .map(|orig_dst_cid| {
4221                        self.crypto_state.session.is_valid_retry(
4222                            orig_dst_cid,
4223                            &packet.header_data,
4224                            &packet.payload,
4225                        )
4226                    })
4227                    .unwrap_or_default();
4228                if self.total_authed_packets > 1
4229                            || packet.payload.len() <= 16 // token + 16 byte tag
4230                            || !is_valid_retry
4231                {
4232                    trace!("discarding invalid Retry");
4233                    // - After the client has received and processed an Initial or Retry
4234                    //   packet from the server, it MUST discard any subsequent Retry
4235                    //   packets that it receives.
4236                    // - A client MUST discard a Retry packet with a zero-length Retry Token
4237                    //   field.
4238                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
4239                    //   that cannot be validated
4240                    return Ok(());
4241                }
4242
4243                trace!("retrying with CID {}", remote_cid);
4244                let client_hello = state.client_hello.take().unwrap();
4245                self.retry_src_cid = Some(remote_cid);
4246                self.remote_cids
4247                    .get_mut(&path_id)
4248                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
4249                    .update_initial_cid(remote_cid);
4250                self.remote_handshake_cid = remote_cid;
4251
4252                let space = &mut self.spaces[SpaceId::Initial];
4253                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
4254                    self.on_packet_acked(now, PathId::ZERO, info);
4255                };
4256
4257                self.discard_space(now, SpaceKind::Initial); // Make sure we clean up after
4258                // any retransmitted Initials
4259                let crypto_space = &mut self.crypto_state.spaces[SpaceKind::Initial];
4260                crypto_space.keys = Some(
4261                    self.crypto_state
4262                        .session
4263                        .initial_keys(remote_cid, self.side.side()),
4264                );
4265                crypto_space.crypto_offset = client_hello.len() as u64;
4266
4267                let next_pn = self.spaces[SpaceId::Initial]
4268                    .for_path(path_id)
4269                    .next_packet_number;
4270                self.spaces[SpaceId::Initial] = {
4271                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
4272                    space.for_path(path_id).next_packet_number = next_pn;
4273                    space.pending.crypto.push_back(frame::Crypto {
4274                        offset: 0,
4275                        data: client_hello,
4276                    });
4277                    space
4278                };
4279
4280                // Retransmit all 0-RTT data
4281                let zero_rtt = mem::take(
4282                    &mut self.spaces[SpaceId::Data]
4283                        .for_path(PathId::ZERO)
4284                        .sent_packets,
4285                );
4286                for (_, info) in zero_rtt.into_iter() {
4287                    self.paths
4288                        .get_mut(&PathId::ZERO)
4289                        .unwrap()
4290                        .remove_in_flight(&info);
4291                    self.spaces[SpaceId::Data].pending |= info.retransmits;
4292                }
4293                self.streams.retransmit_all_for_0rtt();
4294
4295                let token_len = packet.payload.len() - 16;
4296                let ConnectionSide::Client { ref mut token, .. } = self.side else {
4297                    unreachable!("we already short-circuited if we're server");
4298                };
4299                *token = packet.payload.freeze().split_to(token_len);
4300
4301                self.state = State::handshake(state::Handshake {
4302                    expected_token: Bytes::new(),
4303                    remote_cid_set: false,
4304                    client_hello: None,
4305                    allow_server_migration: true,
4306                });
4307                Ok(())
4308            }
4309            Header::Long {
4310                ty: LongType::Handshake,
4311                src_cid: remote_cid,
4312                dst_cid: local_cid,
4313                ..
4314            } => {
4315                debug_assert_eq!(path_id, PathId::ZERO);
4316                if remote_cid != self.remote_handshake_cid {
4317                    debug!(
4318                        "discarding packet with mismatched remote CID: {} != {}",
4319                        self.remote_handshake_cid, remote_cid
4320                    );
4321                    return Ok(());
4322                }
4323                self.on_path_validated(path_id);
4324
4325                self.process_early_payload(now, path_id, packet, qlog)?;
4326                if self.state.is_closed() {
4327                    return Ok(());
4328                }
4329
4330                if self.crypto_state.session.is_handshaking() {
4331                    trace!("handshake ongoing");
4332                    return Ok(());
4333                }
4334
4335                if self.side.is_client() {
4336                    // Client-only because server params were set from the client's Initial
4337                    let params = self
4338                        .crypto_state
4339                        .session
4340                        .transport_parameters()?
4341                        .ok_or_else(|| {
4342                            TransportError::new(
4343                                TransportErrorCode::crypto(0x6d),
4344                                "transport parameters missing".to_owned(),
4345                            )
4346                        })?;
4347
4348                    if self.has_0rtt() {
4349                        if !self.crypto_state.session.early_data_accepted().unwrap() {
4350                            debug_assert!(self.side.is_client());
4351                            debug!("0-RTT rejected");
4352                            self.crypto_state.accepted_0rtt = false;
4353                            self.streams.zero_rtt_rejected();
4354
4355                            // Discard already-queued frames
4356                            self.spaces[SpaceId::Data].pending = Retransmits::default();
4357
4358                            // Discard 0-RTT packets
4359                            let sent_packets = mem::take(
4360                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
4361                            );
4362                            for (_, packet) in sent_packets.into_iter() {
4363                                self.paths
4364                                    .get_mut(&path_id)
4365                                    .unwrap()
4366                                    .remove_in_flight(&packet);
4367                            }
4368                        } else {
4369                            self.crypto_state.accepted_0rtt = true;
4370                            params.validate_resumption_from(&self.peer_params)?;
4371                        }
4372                    }
4373                    if let Some(token) = params.stateless_reset_token {
4374                        let remote = self.path_data(path_id).network_path.remote;
4375                        debug_assert!(!self.state.is_drained()); // requirement for endpoint events, checked above
4376                        self.endpoint_events
4377                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4378                    }
4379                    self.handle_peer_params(params, local_cid, remote_cid, now)?;
4380                    self.issue_first_cids(now);
4381                } else {
4382                    // Server-only
4383                    self.spaces[SpaceId::Data].pending.handshake_done = true;
4384                    self.discard_space(now, SpaceKind::Handshake);
4385                    self.events.push_back(Event::HandshakeConfirmed);
4386                    trace!("handshake confirmed");
4387                }
4388
4389                self.events.push_back(Event::Connected);
4390                self.state.move_to_established();
4391                trace!("established");
4392
4393                // Multipath can only be enabled after the state has reached Established.
4394                // So this can not happen any earlier.
4395                self.issue_first_path_cids(now);
4396                Ok(())
4397            }
4398            Header::Initial(InitialHeader {
4399                src_cid: remote_cid,
4400                dst_cid: local_cid,
4401                ..
4402            }) => {
4403                debug_assert_eq!(path_id, PathId::ZERO);
4404                if !state.remote_cid_set {
4405                    trace!("switching remote CID to {}", remote_cid);
4406                    let mut state = state.clone();
4407                    self.remote_cids
4408                        .get_mut(&path_id)
4409                        .expect("PathId::ZERO not yet abandoned")
4410                        .update_initial_cid(remote_cid);
4411                    self.remote_handshake_cid = remote_cid;
4412                    self.original_remote_cid = remote_cid;
4413                    state.remote_cid_set = true;
4414                    self.state.move_to_handshake(state);
4415                } else if remote_cid != self.remote_handshake_cid {
4416                    debug!(
4417                        "discarding packet with mismatched remote CID: {} != {}",
4418                        self.remote_handshake_cid, remote_cid
4419                    );
4420                    return Ok(());
4421                }
4422
4423                let starting_space = self.highest_space;
4424                self.process_early_payload(now, path_id, packet, qlog)?;
4425
4426                if self.side.is_server()
4427                    && starting_space == SpaceKind::Initial
4428                    && self.highest_space != SpaceKind::Initial
4429                {
4430                    let params = self
4431                        .crypto_state
4432                        .session
4433                        .transport_parameters()?
4434                        .ok_or_else(|| {
4435                            TransportError::new(
4436                                TransportErrorCode::crypto(0x6d),
4437                                "transport parameters missing".to_owned(),
4438                            )
4439                        })?;
4440                    self.handle_peer_params(params, local_cid, remote_cid, now)?;
4441                    self.issue_first_cids(now);
4442                    self.init_0rtt(now);
4443                }
4444                Ok(())
4445            }
4446            Header::Long {
4447                ty: LongType::ZeroRtt,
4448                ..
4449            } => {
4450                self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4451                Ok(())
4452            }
4453            Header::VersionNegotiate { .. } => {
4454                if self.total_authed_packets > 1 {
4455                    return Ok(());
4456                }
4457                let supported = packet
4458                    .payload
4459                    .chunks(4)
4460                    .any(|x| match <[u8; 4]>::try_from(x) {
4461                        Ok(version) => self.version == u32::from_be_bytes(version),
4462                        Err(_) => false,
4463                    });
4464                if supported {
4465                    return Ok(());
4466                }
4467                debug!("remote doesn't support our version");
4468                Err(ConnectionError::VersionMismatch)
4469            }
4470            Header::Short { .. } => unreachable!(
4471                "short packets received during handshake are discarded in handle_packet"
4472            ),
4473        }
4474    }
4475
4476    /// Process an Initial or Handshake packet payload
4477    fn process_early_payload(
4478        &mut self,
4479        now: Instant,
4480        path_id: PathId,
4481        packet: Packet,
4482        #[allow(unused)] qlog: &mut QlogRecvPacket,
4483    ) -> Result<(), TransportError> {
4484        debug_assert_ne!(packet.header.space(), SpaceKind::Data);
4485        debug_assert_eq!(path_id, PathId::ZERO);
4486        let payload_len = packet.payload.len();
4487        let mut ack_eliciting = false;
4488        for result in frame::Iter::new(packet.payload.freeze())? {
4489            let frame = result?;
4490            qlog.frame(&frame);
4491            let span = match frame {
4492                Frame::Padding => continue,
4493                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4494            };
4495
4496            self.stats.frame_rx.record(frame.ty());
4497
4498            let _guard = span.as_ref().map(|x| x.enter());
4499            ack_eliciting |= frame.is_ack_eliciting();
4500
4501            // Process frames
4502            if frame.is_1rtt() && packet.header.space() != SpaceKind::Data {
4503                return Err(TransportError::PROTOCOL_VIOLATION(
4504                    "illegal frame type in handshake",
4505                ));
4506            }
4507
4508            match frame {
4509                Frame::Padding | Frame::Ping => {}
4510                Frame::Crypto(frame) => {
4511                    self.read_crypto(packet.header.space().into(), &frame, payload_len)?;
4512                }
4513                Frame::Ack(ack) => {
4514                    self.on_ack_received(now, packet.header.space().into(), ack)?;
4515                }
4516                Frame::PathAck(ack) => {
4517                    span.as_ref()
4518                        .map(|span| span.record("path", tracing::field::display(&ack.path_id)));
4519                    self.on_path_ack_received(now, packet.header.space().into(), ack)?;
4520                }
4521                Frame::Close(reason) => {
4522                    self.state.move_to_draining(Some(reason.into()));
4523                    return Ok(());
4524                }
4525                _ => {
4526                    let mut err =
4527                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4528                    err.frame = frame::MaybeFrame::Known(frame.ty());
4529                    return Err(err);
4530                }
4531            }
4532        }
4533
4534        if ack_eliciting {
4535            // In the initial and handshake spaces, ACKs must be sent immediately
4536            self.spaces[packet.header.space()]
4537                .for_path(path_id)
4538                .pending_acks
4539                .set_immediate_ack_required();
4540        }
4541
4542        self.write_crypto();
4543        Ok(())
4544    }
4545
4546    /// Processes the packet payload, always in the data space.
4547    fn process_payload(
4548        &mut self,
4549        now: Instant,
4550        network_path: FourTuple,
4551        path_id: PathId,
4552        number: u64,
4553        packet: Packet,
4554        #[allow(unused)] qlog: &mut QlogRecvPacket,
4555    ) -> Result<(), TransportError> {
4556        let is_multipath_negotiated = self.is_multipath_negotiated();
4557        let payload = packet.payload.freeze();
4558        let mut is_probing_packet = true;
4559        let mut close = None;
4560        let payload_len = payload.len();
4561        let mut ack_eliciting = false;
4562        // if this packet triggers a path migration and includes a observed address frame, it's
4563        // stored here
4564        let mut migration_observed_addr = None;
4565        for result in frame::Iter::new(payload)? {
4566            let frame = result?;
4567            qlog.frame(&frame);
4568            let span = match frame {
4569                Frame::Padding => continue,
4570                _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4571            };
4572
4573            self.stats.frame_rx.record(frame.ty());
4574            // Crypto, Stream and Datagram frames are special cased in order no pollute
4575            // the log with payload data
4576            match &frame {
4577                Frame::Crypto(f) => {
4578                    trace!(offset = f.offset, len = f.data.len(), "got frame CRYPTO");
4579                }
4580                Frame::Stream(f) => {
4581                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got frame STREAM");
4582                }
4583                Frame::Datagram(f) => {
4584                    trace!(len = f.data.len(), "got frame DATAGRAM");
4585                }
4586                f => {
4587                    trace!("got frame {f}");
4588                }
4589            }
4590
4591            let _guard = span.enter();
4592            if packet.header.is_0rtt() {
4593                match frame {
4594                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4595                        return Err(TransportError::PROTOCOL_VIOLATION(
4596                            "illegal frame type in 0-RTT",
4597                        ));
4598                    }
4599                    _ => {
4600                        if frame.is_1rtt() {
4601                            return Err(TransportError::PROTOCOL_VIOLATION(
4602                                "illegal frame type in 0-RTT",
4603                            ));
4604                        }
4605                    }
4606                }
4607            }
4608            ack_eliciting |= frame.is_ack_eliciting();
4609
4610            // Check whether this could be a probing packet
4611            match frame {
4612                Frame::Padding
4613                | Frame::PathChallenge(_)
4614                | Frame::PathResponse(_)
4615                | Frame::NewConnectionId(_)
4616                | Frame::ObservedAddr(_) => {}
4617                _ => {
4618                    is_probing_packet = false;
4619                }
4620            }
4621
4622            match frame {
4623                Frame::Crypto(frame) => {
4624                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4625                }
4626                Frame::Stream(frame) => {
4627                    if self.streams.received(frame, payload_len)?.should_transmit() {
4628                        self.spaces[SpaceId::Data].pending.max_data = true;
4629                    }
4630                }
4631                Frame::Ack(ack) => {
4632                    self.on_ack_received(now, SpaceId::Data, ack)?;
4633                }
4634                Frame::PathAck(ack) => {
4635                    span.record("path", tracing::field::display(&ack.path_id));
4636                    self.on_path_ack_received(now, SpaceId::Data, ack)?;
4637                }
4638                Frame::Padding | Frame::Ping => {}
4639                Frame::Close(reason) => {
4640                    close = Some(reason);
4641                }
4642                Frame::PathChallenge(challenge) => {
4643                    let path = &mut self
4644                        .path_mut(path_id)
4645                        .expect("payload is processed only after the path becomes known");
4646                    path.path_responses.push(number, challenge.0, network_path);
4647                    // At this point, update_network_path_or_discard was already called, so
4648                    // we don't need to be lenient about `local_ip` possibly mis-matching.
4649                    if network_path == path.network_path {
4650                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
4651                        // attack. Send a non-probing packet to recover the active path.
                        // TODO(flub): No longer true! We now also send a PATH_CHALLENGE to validate
4653                        //    the path if the path is new, without an RFC9000-style
4654                        //    migration involved. This means we add in an extra
4655                        //    IMMEDIATE_ACK on some challenges. It isn't really wrong to do
4656                        //    so, but it still is something untidy. We should instead
4657                        //    suppress this when we know the remote is still validating the
4658                        //    path.
4659                        match self.peer_supports_ack_frequency() {
4660                            true => self.immediate_ack(path_id),
4661                            false => {
4662                                self.ping_path(path_id).ok();
4663                            }
4664                        }
4665                    }
4666                }
4667                Frame::PathResponse(response) => {
4668                    let path = self
4669                        .paths
4670                        .get_mut(&path_id)
4671                        .expect("payload is processed only after the path becomes known");
4672
4673                    use PathTimer::*;
4674                    use paths::OnPathResponseReceived::*;
4675                    match path
4676                        .data
4677                        .on_path_response_received(now, response.0, network_path)
4678                    {
4679                        OnPath { was_open } => {
4680                            let qlog = self.qlog.with_time(now);
4681
4682                            self.timers
4683                                .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4684                            self.timers
4685                                .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4686
4687                            let next_challenge = path
4688                                .data
4689                                .earliest_on_path_expiring_challenge()
4690                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4691                            self.timers.set_or_stop(
4692                                Timer::PerPath(path_id, PathChallengeLost),
4693                                next_challenge,
4694                                qlog,
4695                            );
4696
4697                            if !was_open {
4698                                if is_multipath_negotiated {
4699                                    self.events
4700                                        .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4701                                }
4702                                if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4703                                {
4704                                    self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4705                                        id: path_id,
4706                                        addr: observed.socket_addr(),
4707                                    }));
4708                                }
4709                            }
4710                            if let Some((_, ref mut prev)) = path.prev {
4711                                prev.reset_on_path_challenges();
4712                            }
4713                        }
4714                        OffPath => {
4715                            debug!(%response, "Valid response to off-path PATH_CHALLENGE");
4716                        }
4717                        Ignored {
4718                            sent_on,
4719                            current_path,
4720                        } => {
4721                            debug!(%sent_on, %current_path, %response, "ignoring valid PATH_RESPONSE")
4722                        }
4723                        Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4724                    }
4725                }
4726                Frame::MaxData(frame::MaxData(bytes)) => {
4727                    self.streams.received_max_data(bytes);
4728                }
4729                Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
4730                    self.streams.received_max_stream_data(id, offset)?;
4731                }
4732                Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
4733                    self.streams.received_max_streams(dir, count)?;
4734                }
4735                Frame::ResetStream(frame) => {
4736                    if self.streams.received_reset(frame)?.should_transmit() {
4737                        self.spaces[SpaceId::Data].pending.max_data = true;
4738                    }
4739                }
4740                Frame::DataBlocked(DataBlocked(offset)) => {
4741                    debug!(offset, "peer claims to be blocked at connection level");
4742                }
4743                Frame::StreamDataBlocked(StreamDataBlocked { id, offset }) => {
4744                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4745                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4746                        return Err(TransportError::STREAM_STATE_ERROR(
4747                            "STREAM_DATA_BLOCKED on send-only stream",
4748                        ));
4749                    }
4750                    debug!(
4751                        stream = %id,
4752                        offset, "peer claims to be blocked at stream level"
4753                    );
4754                }
4755                Frame::StreamsBlocked(StreamsBlocked { dir, limit }) => {
4756                    if limit > MAX_STREAM_COUNT {
4757                        return Err(TransportError::FRAME_ENCODING_ERROR(
4758                            "unrepresentable stream limit",
4759                        ));
4760                    }
4761                    debug!(
4762                        "peer claims to be blocked opening more than {} {} streams",
4763                        limit, dir
4764                    );
4765                }
4766                Frame::StopSending(frame::StopSending { id, error_code }) => {
4767                    if id.initiator() != self.side.side() {
4768                        if id.dir() == Dir::Uni {
4769                            debug!("got STOP_SENDING on recv-only {}", id);
4770                            return Err(TransportError::STREAM_STATE_ERROR(
4771                                "STOP_SENDING on recv-only stream",
4772                            ));
4773                        }
4774                    } else if self.streams.is_local_unopened(id) {
4775                        return Err(TransportError::STREAM_STATE_ERROR(
4776                            "STOP_SENDING on unopened stream",
4777                        ));
4778                    }
4779                    self.streams.received_stop_sending(id, error_code);
4780                }
4781                Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4782                    if let Some(ref path_id) = path_id {
4783                        span.record("path", tracing::field::display(&path_id));
4784                    }
4785                    let path_id = path_id.unwrap_or_default();
4786                    match self.local_cid_state.get_mut(&path_id) {
4787                        None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4788                        Some(cid_state) => {
4789                            let allow_more_cids = cid_state
4790                                .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4791
4792                            // If the path has closed, we do not issue more CIDs for this path
4793                            // For details see  https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.2.2
4794                            // > an endpoint SHOULD provide new connection IDs for that path, if still open, using PATH_NEW_CONNECTION_ID frames.
4795                            let has_path = !self.abandoned_paths.contains(&path_id);
4796                            let allow_more_cids = allow_more_cids && has_path;
4797
4798                            debug_assert!(!self.state.is_drained()); // required for adding endpoint events, process_payload is never called for drained connections
4799                            self.endpoint_events
4800                                .push_back(EndpointEventInner::RetireConnectionId(
4801                                    now,
4802                                    path_id,
4803                                    sequence,
4804                                    allow_more_cids,
4805                                ));
4806                        }
4807                    }
4808                }
4809                Frame::NewConnectionId(frame) => {
4810                    let path_id = if let Some(path_id) = frame.path_id {
4811                        if !self.is_multipath_negotiated() {
4812                            return Err(TransportError::PROTOCOL_VIOLATION(
4813                                "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4814                            ));
4815                        }
4816                        if path_id > self.local_max_path_id {
4817                            return Err(TransportError::PROTOCOL_VIOLATION(
4818                                "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4819                            ));
4820                        }
4821                        path_id
4822                    } else {
4823                        PathId::ZERO
4824                    };
4825
4826                    if let Some(ref path_id) = frame.path_id {
4827                        span.record("path", tracing::field::display(&path_id));
4828                    }
4829
4830                    if self.abandoned_paths.contains(&path_id) {
4831                        trace!("ignoring issued CID for abandoned path");
4832                        continue;
4833                    }
4834                    let remote_cids = self
4835                        .remote_cids
4836                        .entry(path_id)
4837                        .or_insert_with(|| CidQueue::new(frame.id));
4838                    if remote_cids.active().is_empty() {
4839                        return Err(TransportError::PROTOCOL_VIOLATION(
4840                            "NEW_CONNECTION_ID when CIDs aren't in use",
4841                        ));
4842                    }
4843                    if frame.retire_prior_to > frame.sequence {
4844                        return Err(TransportError::PROTOCOL_VIOLATION(
4845                            "NEW_CONNECTION_ID retiring unissued CIDs",
4846                        ));
4847                    }
4848
4849                    use crate::cid_queue::InsertError;
4850                    match remote_cids.insert(frame) {
4851                        Ok(None) if self.path(path_id).is_none() => {
4852                            // if this gives us CIDs to open a new path and a nat traversal attempt
4853                            // is underway we could try to probe a pending remote
4854                            self.continue_nat_traversal_round(now);
4855                        }
4856                        Ok(None) => {}
4857                        Ok(Some((retired, reset_token))) => {
4858                            let pending_retired =
4859                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
4860                            /// Ensure `pending_retired` cannot grow without bound. Limit is
4861                            /// somewhat arbitrary but very permissive.
4862                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4863                            // We don't bother counting in-flight frames because those are bounded
4864                            // by congestion control.
4865                            if (pending_retired.len() as u64)
4866                                .saturating_add(retired.end.saturating_sub(retired.start))
4867                                > MAX_PENDING_RETIRED_CIDS
4868                            {
4869                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4870                                    "queued too many retired CIDs",
4871                                ));
4872                            }
4873                            pending_retired.extend(retired.map(|seq| (path_id, seq)));
4874                            self.set_reset_token(path_id, network_path.remote, reset_token);
4875                        }
4876                        Err(InsertError::ExceedsLimit) => {
4877                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4878                        }
4879                        Err(InsertError::Retired) => {
4880                            trace!("discarding already-retired");
4881                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
4882                            // range of connection IDs larger than the active connection ID limit
4883                            // was retired all at once via retire_prior_to.
4884                            self.spaces[SpaceId::Data]
4885                                .pending
4886                                .retire_cids
4887                                .push((path_id, frame.sequence));
4888                            continue;
4889                        }
4890                    };
4891
4892                    if self.side.is_server()
4893                        && path_id == PathId::ZERO
4894                        && self
4895                            .remote_cids
4896                            .get(&PathId::ZERO)
4897                            .map(|cids| cids.active_seq() == 0)
4898                            .unwrap_or_default()
4899                    {
4900                        // We're a server still using the initial remote CID for the client, so
4901                        // let's switch immediately to enable clientside stateless resets.
4902                        self.update_remote_cid(PathId::ZERO);
4903                    }
4904                }
4905                Frame::NewToken(NewToken { token }) => {
4906                    let ConnectionSide::Client {
4907                        token_store,
4908                        server_name,
4909                        ..
4910                    } = &self.side
4911                    else {
4912                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4913                    };
4914                    if token.is_empty() {
4915                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4916                    }
4917                    trace!("got new token");
4918                    token_store.insert(server_name, token);
4919                }
4920                Frame::Datagram(datagram) => {
4921                    if self
4922                        .datagrams
4923                        .received(datagram, &self.config.datagram_receive_buffer_size)?
4924                    {
4925                        self.events.push_back(Event::DatagramReceived);
4926                    }
4927                }
4928                Frame::AckFrequency(ack_frequency) => {
4929                    // This frame can only be sent in the Data space
4930
4931                    if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4932                        // The AckFrequency frame is stale (we have already received a more
4933                        // recent one)
4934                        continue;
4935                    }
4936
4937                    // Update the params for all of our paths
4938                    for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4939                        space.pending_acks.set_ack_frequency_params(&ack_frequency);
4940
4941                        // Our `max_ack_delay` has been updated, so we may need to adjust
4942                        // its associated timeout
4943                        if let Some(timeout) = space
4944                            .pending_acks
4945                            .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4946                        {
4947                            self.timers.set(
4948                                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4949                                timeout,
4950                                self.qlog.with_time(now),
4951                            );
4952                        }
4953                    }
4954                }
4955                Frame::ImmediateAck => {
4956                    // This frame can only be sent in the Data space
4957                    for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4958                        pns.pending_acks.set_immediate_ack_required();
4959                    }
4960                }
4961                Frame::HandshakeDone => {
4962                    if self.side.is_server() {
4963                        return Err(TransportError::PROTOCOL_VIOLATION(
4964                            "client sent HANDSHAKE_DONE",
4965                        ));
4966                    }
4967                    if self.crypto_state.has_keys(EncryptionLevel::Handshake) {
4968                        self.discard_space(now, SpaceKind::Handshake);
4969                    }
4970                    self.events.push_back(Event::HandshakeConfirmed);
4971                    trace!("handshake confirmed");
4972                }
4973                Frame::ObservedAddr(observed) => {
4974                    // check if params allows the peer to send report and this node to receive it
4975                    trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4976                    if !self
4977                        .peer_params
4978                        .address_discovery_role
4979                        .should_report(&self.config.address_discovery_role)
4980                    {
4981                        return Err(TransportError::PROTOCOL_VIOLATION(
4982                            "received OBSERVED_ADDRESS frame when not negotiated",
4983                        ));
4984                    }
4985                    // must only be sent in data space
4986                    if packet.header.space() != SpaceKind::Data {
4987                        return Err(TransportError::PROTOCOL_VIOLATION(
4988                            "OBSERVED_ADDRESS frame outside data space",
4989                        ));
4990                    }
4991
4992                    let path = self.path_data_mut(path_id);
4993                    if network_path == path.network_path {
4994                        if let Some(updated) = path.update_observed_addr_report(observed)
4995                            && path.open_status == paths::OpenStatus::Informed
4996                        {
4997                            self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4998                                id: path_id,
4999                                addr: updated,
5000                            }));
5001                            // otherwise the event is reported when the path is deemed open
5002                        }
5003                    } else {
5004                        // include in migration
5005                        migration_observed_addr = Some(observed)
5006                    }
5007                }
5008                Frame::PathAbandon(frame::PathAbandon {
5009                    path_id,
5010                    error_code,
5011                }) => {
5012                    span.record("path", tracing::field::display(&path_id));
5013                    match self.close_path_inner(
5014                        now,
5015                        path_id,
5016                        PathAbandonReason::RemoteAbandoned {
5017                            error_code: error_code.into(),
5018                        },
5019                    ) {
5020                        Ok(()) => {
5021                            trace!("peer abandoned path");
5022                        }
5023                        Err(ClosePathError::LastOpenPath) => {
5024                            trace!("peer abandoned last path, closing connection");
5025                            return Err(TransportError::NO_VIABLE_PATH(
5026                                "last path abandoned by peer",
5027                            ));
5028                        }
5029                        Err(ClosePathError::ClosedPath) => {
5030                            trace!("peer abandoned already closed path");
5031                        }
5032                        Err(ClosePathError::MultipathNotNegotiated) => {
5033                            return Err(TransportError::PROTOCOL_VIOLATION(
5034                                "received PATH_ABANDON frame when multipath was not negotiated",
5035                            ));
5036                        }
5037                    };
5038
5039                    // Start draining the path if it still exists and hasn't started draining yet.
5040                    if let Some(path) = self.paths.get_mut(&path_id)
5041                        && !mem::replace(&mut path.data.draining, true)
5042                    {
5043                        let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5044                        let pto = path.data.rtt.pto_base() + ack_delay;
5045                        self.timers.set(
5046                            Timer::PerPath(path_id, PathTimer::DiscardPath),
5047                            now + 3 * pto,
5048                            self.qlog.with_time(now),
5049                        );
5050
5051                        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
5052                    }
5053                }
5054                Frame::PathStatusAvailable(info) => {
5055                    span.record("path", tracing::field::display(&info.path_id));
5056                    if self.is_multipath_negotiated() {
5057                        self.on_path_status(
5058                            info.path_id,
5059                            PathStatus::Available,
5060                            info.status_seq_no,
5061                        );
5062                    } else {
5063                        return Err(TransportError::PROTOCOL_VIOLATION(
5064                            "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
5065                        ));
5066                    }
5067                }
5068                Frame::PathStatusBackup(info) => {
5069                    span.record("path", tracing::field::display(&info.path_id));
5070                    if self.is_multipath_negotiated() {
5071                        self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
5072                    } else {
5073                        return Err(TransportError::PROTOCOL_VIOLATION(
5074                            "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
5075                        ));
5076                    }
5077                }
5078                Frame::MaxPathId(frame::MaxPathId(path_id)) => {
5079                    span.record("path", tracing::field::display(&path_id));
5080                    if !self.is_multipath_negotiated() {
5081                        return Err(TransportError::PROTOCOL_VIOLATION(
5082                            "received MAX_PATH_ID frame when multipath was not negotiated",
5083                        ));
5084                    }
5085                    // frames that do not increase the path id are ignored
5086                    if path_id > self.remote_max_path_id {
5087                        self.remote_max_path_id = path_id;
5088                        self.issue_first_path_cids(now);
5089                        while let Some(true) = self.continue_nat_traversal_round(now) {}
5090                    }
5091                }
5092                Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
5093                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
5094                    // be treated as a connection error of type PROTOCOL_VIOLATION.
5095                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
5096                    if self.is_multipath_negotiated() {
5097                        if max_path_id > self.local_max_path_id {
5098                            return Err(TransportError::PROTOCOL_VIOLATION(
5099                                "PATHS_BLOCKED maximum path identifier was larger than local maximum",
5100                            ));
5101                        }
5102                        debug!("received PATHS_BLOCKED({:?})", max_path_id);
5103                        // TODO(@divma): ensure max concurrent paths
5104                    } else {
5105                        return Err(TransportError::PROTOCOL_VIOLATION(
5106                            "received PATHS_BLOCKED frame when not multipath was not negotiated",
5107                        ));
5108                    }
5109                }
5110                Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
5111                    // Nothing to do.  This is recorded in the frame stats, but otherwise we
5112                    // always issue all CIDs we're allowed to issue, so either this is an
5113                    // impatient peer or a bug on our side.
5114
5115                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
5116                    // be treated as a connection error of type PROTOCOL_VIOLATION.
5117                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
5118                    if self.is_multipath_negotiated() {
5119                        if path_id > self.local_max_path_id {
5120                            return Err(TransportError::PROTOCOL_VIOLATION(
5121                                "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
5122                            ));
5123                        }
5124                        if next_seq.0
5125                            > self
5126                                .local_cid_state
5127                                .get(&path_id)
5128                                .map(|cid_state| cid_state.active_seq().1 + 1)
5129                                .unwrap_or_default()
5130                        {
5131                            return Err(TransportError::PROTOCOL_VIOLATION(
5132                                "PATH_CIDS_BLOCKED next sequence number larger than in local state",
5133                            ));
5134                        }
5135                        debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
5136                    } else {
5137                        return Err(TransportError::PROTOCOL_VIOLATION(
5138                            "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
5139                        ));
5140                    }
5141                }
5142                Frame::AddAddress(addr) => {
5143                    let client_state = match self.n0_nat_traversal.client_side_mut() {
5144                        Ok(state) => state,
5145                        Err(err) => {
5146                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
5147                                "Nat traversal(ADD_ADDRESS): {err}"
5148                            )));
5149                        }
5150                    };
5151
5152                    if !client_state.check_remote_address(&addr) {
5153                        // if the address is not valid we flag it, but update anyway
5154                        warn!(?addr, "server sent illegal ADD_ADDRESS frame");
5155                    }
5156
5157                    match client_state.add_remote_address(addr) {
5158                        Ok(maybe_added) => {
5159                            if let Some(added) = maybe_added {
5160                                self.events.push_back(Event::NatTraversal(
5161                                    n0_nat_traversal::Event::AddressAdded(added),
5162                                ));
5163                            }
5164                        }
5165                        Err(e) => {
5166                            warn!(%e, "failed to add remote address")
5167                        }
5168                    }
5169                }
5170                Frame::RemoveAddress(addr) => {
5171                    let client_state = match self.n0_nat_traversal.client_side_mut() {
5172                        Ok(state) => state,
5173                        Err(err) => {
5174                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
5175                                "Nat traversal(REMOVE_ADDRESS): {err}"
5176                            )));
5177                        }
5178                    };
5179                    if let Some(removed_addr) = client_state.remove_remote_address(addr) {
5180                        self.events.push_back(Event::NatTraversal(
5181                            n0_nat_traversal::Event::AddressRemoved(removed_addr),
5182                        ));
5183                    }
5184                }
5185                Frame::ReachOut(reach_out) => {
5186                    let ipv6 = self.is_ipv6();
5187                    let server_state = match self.n0_nat_traversal.server_side_mut() {
5188                        Ok(state) => state,
5189                        Err(err) => {
5190                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
5191                                "Nat traversal(REACH_OUT): {err}"
5192                            )));
5193                        }
5194                    };
5195
5196                    if let Err(err) = server_state.handle_reach_out(reach_out, ipv6) {
5197                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
5198                            "Nat traversal(REACH_OUT): {err}"
5199                        )));
5200                    }
5201                }
5202            }
5203        }
5204
5205        let space = self.spaces[SpaceId::Data].for_path(path_id);
5206        if space
5207            .pending_acks
5208            .packet_received(now, number, ack_eliciting, &space.dedup)
5209        {
5210            if self.abandoned_paths.contains(&path_id) {
5211                // § 3.4.3 QUIC-MULTIPATH: promptly send ACKs for packets received from
5212                // abandoned paths.
5213                space.pending_acks.set_immediate_ack_required();
5214            } else {
5215                self.timers.set(
5216                    Timer::PerPath(path_id, PathTimer::MaxAckDelay),
5217                    now + self.ack_frequency.max_ack_delay,
5218                    self.qlog.with_time(now),
5219                );
5220            }
5221        }
5222
5223        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
5224        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
5225        // are only freed, and hence only issue credit, once the application has been notified
5226        // during a read on the stream.
5227        let pending = &mut self.spaces[SpaceId::Data].pending;
5228        self.streams.queue_max_stream_id(pending);
5229
5230        if let Some(reason) = close {
5231            self.state.move_to_draining(Some(reason.into()));
5232            self.connection_close_pending = true;
5233        }
5234
5235        if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
5236            && !is_probing_packet
5237            && network_path != self.path_data(path_id).network_path
5238        {
5239            let ConnectionSide::Server { ref server_config } = self.side else {
5240                panic!("packets from unknown remote should be dropped by clients");
5241            };
5242            debug_assert!(
5243                server_config.migration,
5244                "migration-initiating packets should have been dropped immediately"
5245            );
5246            self.migrate(path_id, now, network_path, migration_observed_addr);
5247            // Break linkability, if possible
5248            self.update_remote_cid(path_id);
5249            self.spin = false;
5250        }
5251
5252        Ok(())
5253    }
5254
    /// Migrates path `path_id` to a new remote address (passive migration / NAT rebinding)
    ///
    /// Called when a non-probing packet for this path arrives from a `network_path` that
    /// differs from the path's current one. Resets RTT and congestion state for the new
    /// tuple (unless the change looks like a NAT rebinding), arms path validation, and —
    /// if the packet carried an OBSERVED_ADDRESS frame — records/report the observed
    /// address on the new path data.
    ///
    /// * `path_id` — the path being migrated; must exist in `self.paths` (panics otherwise).
    /// * `now` — current time, used for timers and qlog.
    /// * `network_path` — the new four-tuple the peer is now using.
    /// * `observed_addr` — OBSERVED_ADDRESS report received in the migrating packet, if any.
    fn migrate(
        &mut self,
        path_id: PathId,
        now: Instant,
        network_path: FourTuple,
        observed_addr: Option<ObservedAddr>,
    ) {
        trace!(%network_path, %path_id, "migration initiated");
        // Bump the generation counter so stale per-path state can be told apart.
        self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
        // TODO(@divma): conditions for path migration in multipath are very specific, check them
        // again to prevent path migrations that should actually create a new path

        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
        // Note that the congestion window will not grow until validation terminates. Helps mitigate
        // amplification attacks performed by spoofing source addresses.
        let prev_pto = self.pto(SpaceKind::Data, path_id);
        let path = self.paths.get_mut(&path_id).expect("known path");
        // Same IPv4 address with a different port is treated as a NAT rebinding and keeps
        // the previous path's RTT/congestion state.
        // NOTE(review): only IPv4 gets the rebinding treatment here; an IPv6 port-only
        // change always resets congestion state — confirm this is intentional.
        let mut new_path_data = if network_path.remote.is_ipv4()
            && network_path.remote.ip() == path.data.network_path.remote.ip()
        {
            PathData::from_previous(network_path, &path.data, self.path_generation_counter, now)
        } else {
            // Entirely new remote: build fresh path state, capped by the peer's advertised
            // max_udp_payload_size (clamped into u16 range).
            let peer_max_udp_payload_size =
                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
                    .unwrap_or(u16::MAX);
            PathData::new(
                network_path,
                self.allow_mtud,
                Some(peer_max_udp_payload_size),
                self.path_generation_counter,
                now,
                &self.config,
            )
        };
        // Carry the last observed-address report over so sequence-number monotonicity is kept.
        new_path_data.last_observed_addr_report = path.data.last_observed_addr_report.clone();
        if let Some(report) = observed_addr
            && let Some(updated) = new_path_data.update_observed_addr_report(report)
        {
            tracing::info!("adding observed addr event from migration");
            self.events.push_back(Event::Path(PathEvent::ObservedAddr {
                id: path_id,
                addr: updated,
            }));
        }
        // The new tuple must be validated via PATH_CHALLENGE before the congestion
        // window may grow.
        new_path_data.pending_on_path_challenge = true;

        let mut prev_path_data = mem::replace(&mut path.data, new_path_data);

        // Only store this as previous path if it was validated. For all we know there could
        // already be a previous path stored which might have been validated in the past,
        // which is more valuable than one that's not yet validated.
        //
        // With multipath it is possible that there are no remote CIDs for the path ID
        // yet. In this case we would never have sent on this path yet and would not be able
        // to send a PATH_CHALLENGE either, which is currently a fire-and-forget affair
        // anyway. So don't store such a path either.
        //
        // NOTE(review): the guard below stores the path only when it is NOT validated,
        // which contradicts the first sentence of the comment above — confirm which is
        // intended.
        if !prev_path_data.validated
            && let Some(cid) = self.remote_cids.get(&path_id).map(CidQueue::active)
        {
            prev_path_data.pending_on_path_challenge = true;
            // We haven't updated the remote CID yet, this captures the remote CID we were using on
            // the previous path.
            path.prev = Some((cid, prev_path_data));
        }

        // We need to re-assign the correct remote to this path in qlog
        self.qlog.emit_tuple_assigned(path_id, network_path, now);

        // Arm path validation for 3 PTOs, using the larger of the old and new PTO estimates
        // so a fresh (default) RTT estimate cannot shorten the validation window.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathValidation),
            now + 3 * cmp::max(self.pto(SpaceKind::Data, path_id), prev_pto),
            self.qlog.with_time(now),
        );
    }
5329
    /// Handle a change in the local address, i.e. an active migration
    ///
    /// In the general (non-multipath) case, paths will perform an RFC 9000 migration and be
    /// pinged for a liveness check. This is the behaviour of a path assumed to be recoverable,
    /// even if this is not the case.
    ///
    /// Clients in a connection in which multipath has been negotiated should migrate paths to new
    /// [`PathId`]s. Paths that are known to be non-recoverable can be migrated to a new
    /// [`PathId`] by closing the current path, and opening a new one to the same remote. Treating
    /// paths as non-recoverable when necessary accelerates connectivity re-establishment, or might
    /// allow it altogether.
    ///
    /// The optional `hint` allows callers to indicate when paths are non-recoverable and should be
    /// migrated to a new [`PathId`].
    // NOTE: only clients are allowed to migrate, but generally dealing with RFC9000 migrations is
    // lacking <https://github.com/n0-computer/noq/issues/364>
    pub fn handle_network_change(&mut self, hint: Option<&dyn NetworkChangeHint>, now: Instant) {
        debug!("network changed");
        // A drained connection emits no endpoint events and needs no path handling.
        if self.state.is_drained() {
            return;
        }
        // Before the handshake completes (no Data space yet) there is only the handshake
        // path: just refresh the local address, rotate the remote CID, and ping.
        if self.highest_space < SpaceKind::Data {
            for path in self.paths.values_mut() {
                // Clear the local address for it to be obtained from the socket again.
                path.data.network_path.local_ip = None;
            }

            self.update_remote_cid(PathId::ZERO);
            self.ping();

            return;
        }

        // Paths that can't recover so a new path should be open instead. If multipath is not
        // negotiated, this will be empty.
        let mut non_recoverable_paths = Vec::default();
        let mut recoverable_paths = Vec::default();
        let mut open_paths = 0;

        let is_multipath_negotiated = self.is_multipath_negotiated();
        let is_client = self.side().is_client();
        let immediate_ack_allowed = self.peer_supports_ack_frequency();

        // Classify every still-open path as recoverable (ping it in place) or
        // non-recoverable (close it and reopen under a fresh PathId).
        for (path_id, path) in self.paths.iter_mut() {
            if self.abandoned_paths.contains(path_id) {
                continue;
            }
            open_paths += 1;

            // Clear the local address for it to be obtained from the socket again. This applies to
            // all paths, regardless of being considered recoverable or not
            path.data.network_path.local_ip = None;

            let network_path = path.data.network_path;
            let remote = network_path.remote;

            // Without multipath, the connection tries to recover the single path, whereas with
            // multipath, even in a single-path scenario, we attempt to migrate the path to a new
            // PathId.
            let attempt_to_recover = if is_multipath_negotiated {
                if is_client {
                    // No hint means we assume the worst: not recoverable.
                    hint.map(|h| h.is_path_recoverable(*path_id, network_path))
                        .unwrap_or(false)
                } else {
                    // Servers should have stable addresses so this scenario is generally discouraged.
                    // There is no way to prevent this, so the best hope is to attempt to recover the
                    // path
                    true
                }
            } else {
                // In the non multipath case, we try to recover the single active path
                true
            };

            if attempt_to_recover {
                recoverable_paths.push((*path_id, remote));
            } else {
                non_recoverable_paths.push((*path_id, remote, path.data.local_status()))
            }
        }

        /* NON RECOVERABLE PATHS */
        // These are handled first, so that in case the treatment intended for them fails, we can
        // go the recoverable route instead.

        // Decide if we need to close first or open first in the multipath case.
        // - Opening first has a higher risk of getting limited by the negotiated MAX_PATH_ID.
        // - Closing first risks this being the only open path.
        // We prefer closing paths first unless we identify this is the last open path.
        let open_first = open_paths == non_recoverable_paths.len();

        for (path_id, remote, status) in non_recoverable_paths.into_iter() {
            let network_path = FourTuple {
                remote,
                local_ip: None, /* allow the local ip to be discovered */
            };

            if open_first && let Err(e) = self.open_path(network_path, status, now) {
                debug!(%e,"Failed to open new path for network change");
                // if this fails, let the path try to recover itself
                recoverable_paths.push((path_id, remote));
                continue;
            }

            if let Err(e) =
                self.close_path_inner(now, path_id, PathAbandonReason::UnusableAfterNetworkChange)
            {
                debug!(%e,"Failed to close unrecoverable path after network change");
                // Closing failed: fall back to treating the path as recoverable.
                recoverable_paths.push((path_id, remote));
                continue;
            }

            if !open_first && let Err(e) = self.open_path(network_path, status, now) {
                // Path has already been closed if we got here. Since the path was not recoverable,
                // this might be desirable in any case, because other paths exist (!open_first) and
                // this one is considered non-recoverable
                debug!(%e,"Failed to open new path for network change");
            }
        }

        /* RECOVERABLE PATHS */

        for (path_id, remote) in recoverable_paths.into_iter() {
            let space = &mut self.spaces[SpaceId::Data];

            // Schedule a Ping for a liveness check.
            if let Some(path_space) = space.number_spaces.get_mut(&path_id) {
                path_space.ping_pending = true;

                // Ask for an immediate ACK when the peer negotiated the ACK-frequency
                // extension, so liveness is confirmed as fast as possible.
                if immediate_ack_allowed {
                    path_space.immediate_ack_pending = true;
                }
            }

            // Rotate to an unused remote CID for this path; skip the path if none is left.
            let Some((reset_token, retired)) =
                self.remote_cids.get_mut(&path_id).and_then(CidQueue::next)
            else {
                continue;
            };

            // Retire the current remote CID and any CIDs we had to skip.
            space
                .pending
                .retire_cids
                .extend(retired.map(|seq| (path_id, seq)));

            debug_assert!(!self.state.is_drained()); // required for endpoint_events, checked above
            self.endpoint_events
                .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
        }
    }
5481
5482    /// Switch to a previously unused remote connection ID, if possible
5483    fn update_remote_cid(&mut self, path_id: PathId) {
5484        let Some((reset_token, retired)) = self
5485            .remote_cids
5486            .get_mut(&path_id)
5487            .and_then(|cids| cids.next())
5488        else {
5489            return;
5490        };
5491
5492        // Retire the current remote CID and any CIDs we had to skip.
5493        self.spaces[SpaceId::Data]
5494            .pending
5495            .retire_cids
5496            .extend(retired.map(|seq| (path_id, seq)));
5497        let remote = self.path_data(path_id).network_path.remote;
5498        self.set_reset_token(path_id, remote, reset_token);
5499    }
5500
5501    /// Sends this reset token to the endpoint
5502    ///
5503    /// The endpoint needs to know the reset tokens issued by the peer, so that if the peer
5504    /// sends a reset token it knows to route it to this connection. See RFC 9000 section
5505    /// 10.3. Stateless Reset.
5506    ///
5507    /// Reset tokens are different for each path, the endpoint identifies paths by peer
5508    /// socket address however, not by path ID.
5509    fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5510        debug_assert!(!self.state.is_drained()); // required for endpoint events, set_reset_token is never called for drained connections
5511        self.endpoint_events
5512            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5513
5514        // During the handshake the server sends a reset token in the transport
5515        // parameters. When we are the client and we receive the reset token during the
5516        // handshake we want this to affect our peer transport parameters.
5517        // TODO(flub): Pretty sure this is pointless, the entire params is overwritten
5518        //    shortly after this was called.  And then the params don't have this anymore.
5519        if path_id == PathId::ZERO {
5520            self.peer_params.stateless_reset_token = Some(reset_token);
5521        }
5522    }
5523
5524    /// Issue an initial set of connection IDs to the peer upon connection
5525    fn issue_first_cids(&mut self, now: Instant) {
5526        if self
5527            .local_cid_state
5528            .get(&PathId::ZERO)
5529            .expect("PathId::ZERO exists when the connection is created")
5530            .cid_len()
5531            == 0
5532        {
5533            return;
5534        }
5535
5536        // Subtract 1 to account for the CID we supplied while handshaking
5537        let mut n = self.peer_params.issue_cids_limit() - 1;
5538        if let ConnectionSide::Server { server_config } = &self.side
5539            && server_config.has_preferred_address()
5540        {
5541            // We also sent a CID in the transport parameters
5542            n -= 1;
5543        }
5544        debug_assert!(!self.state.is_drained()); // requirement for endpoint_events
5545        self.endpoint_events
5546            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5547    }
5548
5549    /// Issues an initial set of CIDs for paths that have not yet had any CIDs issued
5550    ///
5551    /// Later CIDs are issued when CIDs expire or are retired by the peer.
5552    fn issue_first_path_cids(&mut self, now: Instant) {
5553        if let Some(max_path_id) = self.max_path_id() {
5554            let mut path_id = self.max_path_id_with_cids.next();
5555            while path_id <= max_path_id {
5556                self.endpoint_events
5557                    .push_back(EndpointEventInner::NeedIdentifiers(
5558                        path_id,
5559                        now,
5560                        self.peer_params.issue_cids_limit(),
5561                    ));
5562                path_id = path_id.next();
5563            }
5564            self.max_path_id_with_cids = max_path_id;
5565        }
5566    }
5567
    /// Populates a packet with frames
    ///
    /// This tries to fit as many frames as possible into the packet, walking the frame
    /// types roughly in priority order and writing each only while
    /// `builder.frame_space_remaining()` permits.
    ///
    /// `scheduling_info.may_send_data` gates all frames that are not tied to this
    /// specific path; when it is unset only path-exclusive frames (PING,
    /// PATH_CHALLENGE, PATH_RESPONSE, path ACKs bookkeeping, ...) are built. This is
    /// used in multipath for backup paths while there is still an active path.
    fn populate_packet<'a, 'b>(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        path_id: PathId,
        scheduling_info: &PathSchedulingInfo,
        builder: &mut PacketBuilder<'a, 'b>,
    ) {
        let is_multipath_negotiated = self.is_multipath_negotiated();
        let space_has_keys = self.crypto_state.has_keys(space_id.encryption_level());
        // A Data-space packet built before keys for that level exist is a 0-RTT packet.
        let is_0rtt = space_id == SpaceId::Data && !space_has_keys;
        let stats = &mut self.stats.frame_tx;
        let space = &mut self.spaces[space_id];
        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
        space
            .for_path(path_id)
            .pending_acks
            .maybe_ack_non_eliciting();

        // HANDSHAKE_DONE
        if !is_0rtt
            && scheduling_info.may_send_data
            && mem::replace(&mut space.pending.handshake_done, false)
        {
            builder.write_frame(frame::HandshakeDone, stats);
        }

        // REACH_OUT
        if let Some((round, addresses)) = space.pending.reach_out.as_mut()
            && scheduling_info.may_send_data
        {
            // Drain queued addresses one at a time; an address that no longer fits is
            // re-inserted so it is retried in a later packet.
            while let Some(local_addr) = addresses.iter().next().copied() {
                let local_addr = addresses.take(&local_addr).expect("found from iter");
                let reach_out = frame::ReachOut::new(*round, local_addr);
                if builder.frame_space_remaining() > reach_out.size() {
                    builder.write_frame(reach_out, stats);
                } else {
                    addresses.insert(local_addr);
                    break;
                }
            }
            if addresses.is_empty() {
                space.pending.reach_out = None;
            }
        }

        // OBSERVED_ADDR
        // Sent once per path (or again when explicitly re-queued via
        // `space.pending.observed_addr`) when address discovery is negotiated.
        if scheduling_info.may_send_data
            && space_id == SpaceId::Data
            && self
                .config
                .address_discovery_role
                .should_report(&self.peer_params.address_discovery_role)
            && (!path.observed_addr_sent || space.pending.observed_addr)
        {
            let frame =
                frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
            if builder.frame_space_remaining() > frame.size() {
                builder.write_frame(frame, stats);

                self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
                path.observed_addr_sent = true;

                space.pending.observed_addr = false;
            }
        }

        // PING
        if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
            builder.write_frame(frame::Ping, stats);
        }

        // IMMEDIATE_ACK
        if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
            debug_assert_eq!(
                space_id,
                SpaceId::Data,
                "immediate acks must be sent in the data space"
            );
            builder.write_frame(frame::ImmediateAck, stats);
        }

        // ACK
        if scheduling_info.may_send_data {
            // Collect path IDs first: populate_acks needs `&mut space`, so we cannot
            // hold the `number_spaces` iterator across the call.
            for path_id in space
                .number_spaces
                .iter_mut()
                .filter(|(_, pns)| pns.pending_acks.can_send())
                .map(|(&path_id, _)| path_id)
                .collect::<Vec<_>>()
            {
                Self::populate_acks(
                    now,
                    self.receiving_ecn,
                    path_id,
                    space_id,
                    space,
                    is_multipath_negotiated,
                    builder,
                    stats,
                    space_has_keys,
                );
            }
        }

        // ACK_FREQUENCY
        if scheduling_info.may_send_data && mem::replace(&mut space.pending.ack_frequency, false) {
            let sequence_number = self.ack_frequency.next_sequence_number();

            // Safe to unwrap because this is always provided when ACK frequency is enabled
            let config = self.config.ack_frequency_config.as_ref().unwrap();

            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
                path.rtt.get(),
                config,
                &self.peer_params,
            );

            let frame = frame::AckFrequency {
                sequence: sequence_number,
                ack_eliciting_threshold: config.ack_eliciting_threshold,
                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
                reordering_threshold: config.reordering_threshold,
            };
            builder.write_frame(frame, stats);

            self.ack_frequency
                .ack_frequency_sent(path_id, builder.packet_number, max_ack_delay);
        }

        // PATH_CHALLENGE
        if builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
            && space_id == SpaceId::Data
            && path.pending_on_path_challenge
            && !self.state.is_closed()
        // we don't want to send new challenges if we are already closing
        {
            path.pending_on_path_challenge = false;

            let token = self.rng.random();
            path.record_path_challenge_sent(now, token, path.network_path);
            // Generate a new challenge every time we send a new PATH_CHALLENGE
            let challenge = frame::PathChallenge(token);
            builder.write_frame(challenge, stats);
            // PATH_CHALLENGE packets must be padded to at least MIN_INITIAL_SIZE to
            // validate the path's MTU (RFC 9000 §8.2.1).
            builder.require_padding();
            let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
            if path.open_status == paths::OpenStatus::Pending {
                path.open_status = paths::OpenStatus::Sent;
                self.timers.set(
                    Timer::PerPath(path_id, PathTimer::PathOpen),
                    now + 3 * pto,
                    self.qlog.with_time(now),
                );
            }

            self.timers.set(
                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                now + pto,
                self.qlog.with_time(now),
            );

            // NOTE(review): `path.pending_on_path_challenge` was set to `false` at the
            // top of this branch, so this condition can never be true and the path
            // status is never queued here. Presumably the last operand should be
            // dropped (or tested before the flag is cleared) — confirm intent.
            if is_multipath_negotiated && !path.validated && path.pending_on_path_challenge {
                // queue informing the path status along with the challenge
                space.pending.path_status.insert(path_id);
            }

            // Always include an OBSERVED_ADDR frame with a PATH_CHALLENGE, regardless
            // of whether one has already been sent on this path.
            if space_id == SpaceId::Data
                && self
                    .config
                    .address_discovery_role
                    .should_report(&self.peer_params.address_discovery_role)
            {
                let frame = frame::ObservedAddr::new(
                    path.network_path.remote,
                    self.next_observed_addr_seq_no,
                );
                if builder.frame_space_remaining() > frame.size() {
                    builder.write_frame(frame, stats);

                    self.next_observed_addr_seq_no =
                        self.next_observed_addr_seq_no.saturating_add(1u8);
                    path.observed_addr_sent = true;

                    space.pending.observed_addr = false;
                }
            }
        }

        // PATH_RESPONSE
        if builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
            && space_id == SpaceId::Data
            && let Some(token) = path.path_responses.pop_on_path(path.network_path)
        {
            let response = frame::PathResponse(token);
            trace!(frame = %response);
            builder.write_frame(response, stats);
            builder.require_padding();

            // NOTE: this is technically not required but might be useful to ride the
            // request/response nature of path challenges to refresh an observation
            // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
            if space_id == SpaceId::Data
                && self
                    .config
                    .address_discovery_role
                    .should_report(&self.peer_params.address_discovery_role)
            {
                let frame = frame::ObservedAddr::new(
                    path.network_path.remote,
                    self.next_observed_addr_seq_no,
                );
                if builder.frame_space_remaining() > frame.size() {
                    builder.write_frame(frame, stats);

                    self.next_observed_addr_seq_no =
                        self.next_observed_addr_seq_no.saturating_add(1u8);
                    path.observed_addr_sent = true;

                    space.pending.observed_addr = false;
                }
            }
        }

        // CRYPTO
        // Fragments pending crypto data into as many CRYPTO frames as fit; a partially
        // written fragment is pushed back to the front of the queue with its offset
        // advanced.
        while !is_0rtt
            && scheduling_info.may_send_data
            && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
        {
            let mut frame = match space.pending.crypto.pop_front() {
                Some(x) => x,
                None => break,
            };

            // Calculate the maximum amount of crypto data we can store in the buffer.
            // Since the offset is known, we can reserve the exact size required to encode it.
            // For length we reserve 2bytes which allows to encode up to 2^14,
            // which is more than what fits into normally sized QUIC frames.
            // NOTE(review): the unsafe VarInt construction assumes `frame.offset`
            // fits in a varint (< 2^62); not locally verifiable here — confirm.
            let max_crypto_data_size = builder.frame_space_remaining()
                - 1 // Frame Type
                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes

            let len = frame
                .data
                .len()
                .min(2usize.pow(14) - 1)
                .min(max_crypto_data_size);

            let data = frame.data.split_to(len);
            let offset = frame.offset;
            let truncated = frame::Crypto { offset, data };
            builder.write_frame(truncated, stats);

            if !frame.data.is_empty() {
                frame.offset += len as u64;
                space.pending.crypto.push_front(frame);
            }
        }

        // TODO(flub): maybe this is much higher priority?
        // PATH_ABANDON
        while space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
            else {
                break;
            };
            let frame = frame::PathAbandon {
                path_id: abandoned_path_id,
                error_code,
            };
            builder.write_frame(frame, stats);
        }

        // PATH_STATUS_AVAILABLE & PATH_STATUS_BACKUP
        while space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let Some(path_id) = space.pending.path_status.pop_first() else {
                break;
            };
            // The path may have been abandoned since the status was queued.
            let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
                trace!(%path_id, "discarding queued path status for unknown path");
                continue;
            };

            let seq = path.status.seq();
            match path.local_status() {
                PathStatus::Available => {
                    let frame = frame::PathStatusAvailable {
                        path_id,
                        status_seq_no: seq,
                    };
                    builder.write_frame(frame, stats);
                }
                PathStatus::Backup => {
                    let frame = frame::PathStatusBackup {
                        path_id,
                        status_seq_no: seq,
                    };
                    builder.write_frame(frame, stats);
                }
            }
        }

        // MAX_PATH_ID
        if space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && space.pending.max_path_id
            && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let frame = frame::MaxPathId(self.local_max_path_id);
            builder.write_frame(frame, stats);
            space.pending.max_path_id = false;
        }

        // PATHS_BLOCKED
        if space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && space.pending.paths_blocked
            && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let frame = frame::PathsBlocked(self.remote_max_path_id);
            builder.write_frame(frame, stats);
            space.pending.paths_blocked = false;
        }

        // PATH_CIDS_BLOCKED
        while space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
        {
            let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
                break;
            };
            // Report the sequence number after the highest active CID for the path, or
            // 0 when no CIDs for that path are known at all.
            let next_seq = match self.remote_cids.get(&path_id) {
                Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
                None => VarInt(0),
            };
            let frame = frame::PathCidsBlocked { path_id, next_seq };
            builder.write_frame(frame, stats);
        }

        // RESET_STREAM, STOP_SENDING, MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS
        if space_id == SpaceId::Data && scheduling_info.may_send_data {
            self.streams
                .write_control_frames(builder, &mut space.pending, stats);
        }

        // NEW_CONNECTION_ID
        // Use the largest local CID length for the size bound so the check is
        // conservative for every path.
        let cid_len = self
            .local_cid_state
            .values()
            .map(|cid_state| cid_state.cid_len())
            .max()
            .expect("some local CID state must exist");
        let new_cid_size_bound =
            frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
        while scheduling_info.may_send_data && builder.frame_space_remaining() > new_cid_size_bound
        {
            let issued = match space.pending.new_cids.pop() {
                Some(x) => x,
                None => break,
            };
            let retire_prior_to = self
                .local_cid_state
                .get(&issued.path_id)
                .map(|cid_state| cid_state.retire_prior_to())
                .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));

            // Without multipath the frame carries no path ID (and only PathId::ZERO
            // may ever be issued).
            let cid_path_id = match is_multipath_negotiated {
                true => Some(issued.path_id),
                false => {
                    debug_assert_eq!(issued.path_id, PathId::ZERO);
                    None
                }
            };
            let frame = frame::NewConnectionId {
                path_id: cid_path_id,
                sequence: issued.sequence,
                retire_prior_to,
                id: issued.id,
                reset_token: issued.reset_token,
            };
            builder.write_frame(frame, stats);
        }

        // RETIRE_CONNECTION_ID
        let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
        while scheduling_info.may_send_data && builder.frame_space_remaining() > retire_cid_bound {
            let (path_id, sequence) = match space.pending.retire_cids.pop() {
                Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
                Some((path_id, seq)) => (Some(path_id), seq),
                None => break,
            };
            let frame = frame::RetireConnectionId { path_id, sequence };
            builder.write_frame(frame, stats);
        }

        // DATAGRAM
        let mut sent_datagrams = false;
        while scheduling_info.may_send_data
            && builder.frame_space_remaining() > Datagram::SIZE_BOUND
            && space_id == SpaceId::Data
        {
            match self.datagrams.write(builder, stats) {
                true => {
                    sent_datagrams = true;
                }
                false => break,
            }
        }
        // If the application was previously blocked on sending datagrams and we just
        // made progress, notify it.
        if self.datagrams.send_blocked && sent_datagrams {
            self.events.push_back(Event::DatagramsUnblocked);
            self.datagrams.send_blocked = false;
        }

        // Re-borrow the path data: the earlier `path` borrow could not be held across
        // the `&mut self` uses above.
        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;

        // NEW_TOKEN
        if scheduling_info.may_send_data {
            while let Some(network_path) = space.pending.new_tokens.pop() {
                debug_assert_eq!(space_id, SpaceId::Data);
                let ConnectionSide::Server { server_config } = &self.side else {
                    panic!("NEW_TOKEN frames should not be enqueued by clients");
                };

                if !network_path.is_probably_same_path(&path.network_path) {
                    // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
                    // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
                    // frames upon an path change. Instead, when the new path becomes validated,
                    // NEW_TOKEN frames may be enqueued for the new path instead.
                    continue;
                }

                let token = Token::new(
                    TokenPayload::Validation {
                        ip: network_path.remote.ip(),
                        issued: server_config.time_source.now(),
                    },
                    &mut self.rng,
                );
                let new_token = NewToken {
                    token: token.encode(&*server_config.token_key).into(),
                };

                // The token was already minted; if it does not fit, requeue the path so
                // a (fresh) token is emitted in a later packet.
                if builder.frame_space_remaining() < new_token.size() {
                    space.pending.new_tokens.push(network_path);
                    break;
                }

                builder.write_frame(new_token, stats);
                builder.retransmits_mut().new_tokens.push(network_path);
            }
        }

        // STREAM
        if scheduling_info.may_send_data && space_id == SpaceId::Data {
            self.streams
                .write_stream_frames(builder, self.config.send_fairness, stats);
        }

        // ADD_ADDRESS
        while space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
        {
            if let Some(added_address) = space.pending.add_address.pop_last() {
                builder.write_frame(added_address, stats);
            } else {
                break;
            }
        }

        // REMOVE_ADDRESS
        while space_id == SpaceId::Data
            && scheduling_info.may_send_data
            && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
        {
            if let Some(removed_address) = space.pending.remove_address.pop_last() {
                builder.write_frame(removed_address, stats);
            } else {
                break;
            }
        }
    }
6067
6068    /// Write pending ACKs into a buffer
6069    fn populate_acks<'a, 'b>(
6070        now: Instant,
6071        receiving_ecn: bool,
6072        path_id: PathId,
6073        space_id: SpaceId,
6074        space: &mut PacketSpace,
6075        is_multipath_negotiated: bool,
6076        builder: &mut PacketBuilder<'a, 'b>,
6077        stats: &mut FrameStats,
6078        space_has_keys: bool,
6079    ) {
6080        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
6081        debug_assert!(space_has_keys, "tried to send ACK in 0-RTT");
6082
6083        debug_assert!(
6084            is_multipath_negotiated || path_id == PathId::ZERO,
6085            "Only PathId::ZERO allowed without multipath (have {path_id:?})"
6086        );
6087        if is_multipath_negotiated {
6088            debug_assert!(
6089                space_id == SpaceId::Data || path_id == PathId::ZERO,
6090                "path acks must be sent in 1RTT space (have {space_id:?})"
6091            );
6092        }
6093
6094        let pns = space.for_path(path_id);
6095        let ranges = pns.pending_acks.ranges();
6096        debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
6097        let ecn = if receiving_ecn {
6098            Some(&pns.ecn_counters)
6099        } else {
6100            None
6101        };
6102
6103        let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
6104        // TODO: This should come from `TransportConfig` if that gets configurable.
6105        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
6106        let delay = delay_micros >> ack_delay_exp.into_inner();
6107
6108        if is_multipath_negotiated && space_id == SpaceId::Data {
6109            if !ranges.is_empty() {
6110                let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
6111                builder.write_frame(frame, stats);
6112            }
6113        } else {
6114            builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
6115        }
6116    }
6117
    /// Common state teardown performed whenever the connection is closed
    ///
    /// Stops all connection and per-path timers; callers that need a drain period
    /// re-arm the close timer separately.
    fn close_common(&mut self) {
        trace!("connection closed");
        self.timers.reset();
    }
6122
6123    fn set_close_timer(&mut self, now: Instant) {
6124        // QUIC-MULTIPATH § 2.6 Connection Closure: draining for 3*PTO using the max PTO of
6125        // all paths.
6126        let pto_max = self.max_pto_for_space(self.highest_space);
6127        self.timers.set(
6128            Timer::Conn(ConnTimer::Close),
6129            now + 3 * pto_max,
6130            self.qlog.with_time(now),
6131        );
6132    }
6133
6134    /// Handle transport parameters received from the peer
6135    ///
6136    /// *remote_cid* and *local_cid* are the source and destination CIDs respectively of the
6137    /// *packet into which the transport parameters arrived.
6138    fn handle_peer_params(
6139        &mut self,
6140        params: TransportParameters,
6141        local_cid: ConnectionId,
6142        remote_cid: ConnectionId,
6143        now: Instant,
6144    ) -> Result<(), TransportError> {
6145        if Some(self.original_remote_cid) != params.initial_src_cid
6146            || (self.side.is_client()
6147                && (Some(self.initial_dst_cid) != params.original_dst_cid
6148                    || self.retry_src_cid != params.retry_src_cid))
6149        {
6150            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
6151                "CID authentication failure",
6152            ));
6153        }
6154        if params.initial_max_path_id.is_some() && (local_cid.is_empty() || remote_cid.is_empty()) {
6155            return Err(TransportError::PROTOCOL_VIOLATION(
6156                "multipath must not use zero-length CIDs",
6157            ));
6158        }
6159
6160        self.set_peer_params(params);
6161        self.qlog.emit_peer_transport_params_received(self, now);
6162
6163        Ok(())
6164    }
6165
    /// Applies the peer's transport parameters to connection state
    ///
    /// Propagates stream limits, negotiates the idle timeout, records a preferred
    /// address (including its CID and reset token), and negotiates the multipath and
    /// n0 NAT-traversal extensions when both sides advertised support for them.
    fn set_peer_params(&mut self, params: TransportParameters) {
        self.streams.set_params(&params);
        self.idle_timeout =
            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
        trace!("negotiated max idle timeout {:?}", self.idle_timeout);

        if let Some(ref info) = params.preferred_address {
            // The preferred-address CID is implicitly sequence number 1 on the
            // handshake path (RFC 9000 §5.1.1).
            // During the handshake PathId::ZERO exists.
            self.remote_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
                path_id: None,
                sequence: 1,
                id: info.connection_id,
                reset_token: info.stateless_reset_token,
                retire_prior_to: 0,
            })
            .expect(
                "preferred address CID is the first received, and hence is guaranteed to be legal",
            );
            let remote = self.path_data(PathId::ZERO).network_path.remote;
            self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
        }
        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);

        // Multipath is negotiated iff both sides advertised initial_max_path_id.
        let mut multipath_enabled = None;
        if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
            self.config.get_initial_max_path_id(),
            params.initial_max_path_id,
        ) {
            // multipath is enabled, register the local and remote maximums
            self.local_max_path_id = local_max_path_id;
            self.remote_max_path_id = remote_max_path_id;
            let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
            debug!(%initial_max_path_id, "multipath negotiated");
            multipath_enabled = Some(initial_max_path_id);
        }

        // NAT traversal is likewise negotiated iff both sides advertised a limit, and
        // additionally requires multipath; mismatched limits are only warned about.
        if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
            self.config
                .max_remote_nat_traversal_addresses
                .zip(params.max_remote_nat_traversal_addresses)
        {
            if let Some(max_initial_paths) =
                multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
            {
                let max_local_addresses = max_remotely_allowed_remote_addresses.get();
                let max_remote_addresses = max_locally_allowed_remote_addresses.get();
                self.n0_nat_traversal = n0_nat_traversal::State::new(
                    max_remote_addresses,
                    max_local_addresses,
                    self.side(),
                );
                debug!(
                    %max_remote_addresses, %max_local_addresses,
                    "n0's nat traversal negotiated"
                );

                // Sanity-check that path/CID limits are large enough for traversal to
                // actually work; these only log, they do not fail the handshake.
                match self.side() {
                    Side::Client => {
                        if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
                            // in this case the client might try to open `max_remote_addresses` new
                            // paths, but the current multipath configuration will not allow it
                            debug!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
                        } else if max_local_addresses as u64
                            > params.active_connection_id_limit.into_inner()
                        {
                            // the server allows us to send at most `params.active_connection_id_limit`
                            // but they might need at least `max_local_addresses` to effectively send
                            // `PATH_CHALLENGE` frames to each advertised local address
                            debug!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
                        }
                    }
                    Side::Server => {
                        if (max_initial_paths.as_u32() as u64) < crate::LOCAL_CID_COUNT {
                            debug!(%max_initial_paths, local_cid_limit=%crate::LOCAL_CID_COUNT, "local server configuration might cause nat traversal issues")
                        }
                    }
                }
            } else {
                debug!("n0 nat traversal enabled for both endpoints, but multipath is missing")
            }
        }

        self.peer_params = params;
        // Inform MTU discovery on the handshake path of the peer's payload limit.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_data_mut(PathId::ZERO)
            .mtud
            .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
    }
6255
6256    /// Decrypts a packet, returning the packet number on success
6257    fn decrypt_packet(
6258        &mut self,
6259        now: Instant,
6260        path_id: PathId,
6261        packet: &mut Packet,
6262    ) -> Result<Option<u64>, Option<TransportError>> {
6263        let result = self
6264            .crypto_state
6265            .decrypt_packet_body(packet, path_id, &self.spaces)?;
6266
6267        let result = match result {
6268            Some(r) => r,
6269            None => return Ok(None),
6270        };
6271
6272        if result.outgoing_key_update_acked
6273            && let Some(prev) = self.crypto_state.prev_crypto.as_mut()
6274        {
6275            prev.end_packet = Some((result.number, now));
6276            self.set_key_discard_timer(now, packet.header.space());
6277        }
6278
6279        if result.incoming_key_update {
6280            trace!("key update authenticated");
6281            self.crypto_state
6282                .update_keys(Some((result.number, now)), true);
6283            self.set_key_discard_timer(now, packet.header.space());
6284        }
6285
6286        Ok(Some(result.number))
6287    }
6288
    /// Whether the peer supports the QUIC Acknowledgement Frequency extension
    ///
    /// Support is signalled by the peer sending the `min_ack_delay` transport parameter.
    fn peer_supports_ack_frequency(&self) -> bool {
        self.peer_params.min_ack_delay.is_some()
    }
6292
6293    /// Send an IMMEDIATE_ACK frame to the remote endpoint
6294    ///
6295    /// According to the spec, this will result in an error if the remote endpoint does not support
6296    /// the Acknowledgement Frequency extension
6297    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
6298        debug_assert_eq!(
6299            self.highest_space,
6300            SpaceKind::Data,
6301            "immediate ack must be written in the data space"
6302        );
6303        self.spaces[SpaceId::Data]
6304            .for_path(path_id)
6305            .immediate_ack_pending = true;
6306    }
6307
6308    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
6309    #[cfg(test)]
6310    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
6311        let (path_id, first_decode, remaining) = match &event.0 {
6312            ConnectionEventInner::Datagram(DatagramConnectionEvent {
6313                path_id,
6314                first_decode,
6315                remaining,
6316                ..
6317            }) => (path_id, first_decode, remaining),
6318            _ => return None,
6319        };
6320
6321        if remaining.is_some() {
6322            panic!("Packets should never be coalesced in tests");
6323        }
6324
6325        let decrypted_header = self
6326            .crypto_state
6327            .unprotect_header(first_decode.clone(), self.peer_params.stateless_reset_token)?;
6328
6329        let mut packet = decrypted_header.packet?;
6330        self.crypto_state
6331            .decrypt_packet_body(&mut packet, *path_id, &self.spaces)
6332            .ok()?;
6333
6334        Some(packet.payload.to_vec())
6335    }
6336
6337    /// The number of bytes of packets containing retransmittable frames that have not been
6338    /// acknowledged or declared lost.
6339    #[cfg(test)]
6340    pub(crate) fn bytes_in_flight(&self) -> u64 {
6341        // TODO(@divma): consider including for multipath?
6342        self.path_data(PathId::ZERO).in_flight.bytes
6343    }
6344
6345    /// Number of bytes worth of non-ack-only packets that may be sent
6346    #[cfg(test)]
6347    pub(crate) fn congestion_window(&self) -> u64 {
6348        let path = self.path_data(PathId::ZERO);
6349        path.congestion
6350            .window()
6351            .saturating_sub(path.in_flight.bytes)
6352    }
6353
6354    /// Whether no timers but keepalive, idle, rtt, pushnewcid, and key discard are running
6355    #[cfg(test)]
6356    pub(crate) fn is_idle(&self) -> bool {
6357        let current_timers = self.timers.values();
6358        current_timers
6359            .into_iter()
6360            .filter(|(timer, _)| {
6361                !matches!(
6362                    timer,
6363                    Timer::Conn(ConnTimer::KeepAlive)
6364                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
6365                        | Timer::Conn(ConnTimer::PushNewCid)
6366                        | Timer::Conn(ConnTimer::KeyDiscard)
6367                )
6368            })
6369            .min_by_key(|(_, time)| *time)
6370            .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6371    }
6372
6373    /// Whether explicit congestion notification is in use on outgoing packets.
6374    #[cfg(test)]
6375    pub(crate) fn using_ecn(&self) -> bool {
6376        self.path_data(PathId::ZERO).sending_ecn
6377    }
6378
6379    /// The number of received bytes in the current path
6380    #[cfg(test)]
6381    pub(crate) fn total_recvd(&self) -> u64 {
6382        self.path_data(PathId::ZERO).total_recvd
6383    }
6384
6385    #[cfg(test)]
6386    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6387        self.local_cid_state
6388            .get(&PathId::ZERO)
6389            .unwrap()
6390            .active_seq()
6391    }
6392
6393    #[cfg(test)]
6394    #[track_caller]
6395    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6396        self.local_cid_state
6397            .get(&PathId(path_id))
6398            .unwrap()
6399            .active_seq()
6400    }
6401
6402    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
6403    /// with updated `retire_prior_to` field set to `v`
6404    #[cfg(test)]
6405    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6406        let n = self
6407            .local_cid_state
6408            .get_mut(&PathId::ZERO)
6409            .unwrap()
6410            .assign_retire_seq(v);
6411        debug_assert!(!self.state.is_drained()); // requirement for endpoint_events
6412        self.endpoint_events
6413            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6414    }
6415
6416    /// Check the current active remote CID sequence for `PathId::ZERO`
6417    #[cfg(test)]
6418    pub(crate) fn active_remote_cid_seq(&self) -> u64 {
6419        self.remote_cids.get(&PathId::ZERO).unwrap().active_seq()
6420    }
6421
6422    /// Returns the detected maximum udp payload size for the current path
6423    #[cfg(test)]
6424    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6425        self.path_data(path_id).current_mtu()
6426    }
6427
6428    /// Triggers path validation on all paths
6429    #[cfg(test)]
6430    pub(crate) fn trigger_path_validation(&mut self) {
6431        for path in self.paths.values_mut() {
6432            path.data.pending_on_path_challenge = true;
6433        }
6434    }
6435
6436    /// Simulates a protocol violation error for test purposes.
6437    #[cfg(test)]
6438    pub fn simulate_protocol_violation(&mut self, now: Instant) {
6439        if !self.state.is_closed() {
6440            self.state
6441                .move_to_closed(TransportError::PROTOCOL_VIOLATION("simulated violation"));
6442            self.close_common();
6443            if !self.state.is_drained() {
6444                self.set_close_timer(now);
6445            }
6446            self.connection_close_pending = true;
6447        }
6448    }
6449
6450    /// Whether we have on-path 1-RTT data to send.
6451    ///
6452    /// This checks for frames that can only be sent in the data space (1-RTT):
6453    /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated.
6454    /// - Pending PATH_RESPONSE frames.
6455    /// - Pending data to send in STREAM frames.
6456    /// - Pending DATAGRAM frames to send.
6457    ///
6458    /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that
6459    /// may need to be sent.
6460    fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6461        let space_id_only = self.paths.get(&path_id).is_some_and(|path| {
6462            path.data.pending_on_path_challenge || !path.data.path_responses.is_empty()
6463        });
6464
6465        // Stream control frames are checked in PacketSpace::can_send, only check data here.
6466        let other = self.streams.can_send_stream_data()
6467            || self
6468                .datagrams
6469                .outgoing
6470                .front()
6471                .is_some_and(|x| x.size(true) <= max_size);
6472
6473        // All `false` fields are set in PacketSpace::can_send.
6474        SendableFrames {
6475            acks: false,
6476            close: false,
6477            space_id_only,
6478            other,
6479        }
6480    }
6481
    /// Terminate the connection instantly, without sending a close packet
    ///
    /// Performs the shared close teardown and transitions straight to the drained state,
    /// notifying the endpoint so it can release this connection's resources.
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.state.move_to_drained(Some(reason));
        // move_to_drained checks that we were never in drained before, so we
        // never sent a `Drained` event before (it's illegal to send more events after drained).
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
6490
6491    /// Storage size required for the largest packet that can be transmitted on all currently
6492    /// available paths
6493    ///
6494    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
6495    ///
6496    /// When multipath is enabled, this value is the minimum MTU across all available paths.
6497    pub fn current_mtu(&self) -> u16 {
6498        self.paths
6499            .iter()
6500            .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6501            .map(|(_path_id, path_state)| path_state.data.current_mtu())
6502            .min()
6503            .expect("There is always at least one available path")
6504    }
6505
6506    /// Size of non-frame data for a 1-RTT packet
6507    ///
6508    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
6509    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
6510    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
6511    /// latency and packet loss.
6512    fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6513        let pn_len = PacketNumber::new(
6514            pn,
6515            self.spaces[SpaceId::Data]
6516                .for_path(path)
6517                .largest_acked_packet
6518                .unwrap_or(0),
6519        )
6520        .len();
6521
6522        // 1 byte for flags
6523        1 + self
6524            .remote_cids
6525            .get(&path)
6526            .map(|cids| cids.active().len())
6527            .unwrap_or(20)      // Max CID len in QUIC v1
6528            + pn_len
6529            + self.tag_len_1rtt()
6530    }
6531
6532    fn predict_1rtt_overhead_no_pn(&self) -> usize {
6533        let pn_len = 4;
6534
6535        let cid_len = self
6536            .remote_cids
6537            .values()
6538            .map(|cids| cids.active().len())
6539            .max()
6540            .unwrap_or(20); // Max CID len in QUIC v1
6541
6542        // 1 byte for flags
6543        1 + cid_len + pn_len + self.tag_len_1rtt()
6544    }
6545
6546    fn tag_len_1rtt(&self) -> usize {
6547        // encryption_keys for Data space returns 1-RTT keys if available, otherwise 0-RTT keys
6548        let packet_crypto = self
6549            .crypto_state
6550            .encryption_keys(SpaceKind::Data, self.side.side())
6551            .map(|(_header, packet, _level)| packet);
6552        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
6553        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
6554        // but that would needlessly prevent sending datagrams during 0-RTT.
6555        packet_crypto.map_or(16, |x| x.tag_len())
6556    }
6557
6558    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
6559    fn on_path_validated(&mut self, path_id: PathId) {
6560        self.path_data_mut(path_id).validated = true;
6561        let ConnectionSide::Server { server_config } = &self.side else {
6562            return;
6563        };
6564        let network_path = self.path_data(path_id).network_path;
6565        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6566        new_tokens.clear();
6567        for _ in 0..server_config.validation_token.sent {
6568            new_tokens.push(network_path);
6569        }
6570    }
6571
6572    /// Handle new path status information: PATH_STATUS_AVAILABLE, PATH_STATUS_BACKUP
6573    fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6574        if let Some(path) = self.paths.get_mut(&path_id) {
6575            path.data.status.remote_update(status, status_seq_no);
6576        } else {
6577            debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6578        }
6579        self.events.push_back(
6580            PathEvent::RemoteStatus {
6581                id: path_id,
6582                status,
6583            }
6584            .into(),
6585        );
6586    }
6587
6588    /// Returns the maximum [`PathId`] to be used for sending in this connection.
6589    ///
6590    /// This is calculated as minimum between the local and remote's maximums when multipath is
6591    /// enabled, or `None` when disabled.
6592    ///
6593    /// For data that's received, we should use [`Self::local_max_path_id`] instead.
6594    /// The reasoning is that the remote might already have updated to its own newer
6595    /// [`Self::max_path_id`] after sending out a `MAX_PATH_ID` frame, but it got re-ordered.
6596    fn max_path_id(&self) -> Option<PathId> {
6597        if self.is_multipath_negotiated() {
6598            Some(self.remote_max_path_id.min(self.local_max_path_id))
6599        } else {
6600            None
6601        }
6602    }
6603
6604    /// Returns whether this connection has a socket that supports IPv6.
6605    ///
6606    /// TODO(matheus23): This is related to noq endpoint state's `ipv6` bool. We should move that info
6607    /// here instead of trying to hack around not knowing it exactly.
6608    fn is_ipv6(&self) -> bool {
6609        self.paths
6610            .values()
6611            .any(|p| p.data.network_path.remote.is_ipv6())
6612    }
6613
6614    /// Add addresses the local endpoint considers are reachable for nat traversal.
6615    pub fn add_nat_traversal_address(
6616        &mut self,
6617        address: SocketAddr,
6618    ) -> Result<(), n0_nat_traversal::Error> {
6619        if let Some(added) = self.n0_nat_traversal.add_local_address(address)? {
6620            self.spaces[SpaceId::Data].pending.add_address.insert(added);
6621        };
6622        Ok(())
6623    }
6624
6625    /// Removes an address the endpoing no longer considers reachable for nat traversal
6626    ///
6627    /// Addresses not present in the set will be silently ignored.
6628    pub fn remove_nat_traversal_address(
6629        &mut self,
6630        address: SocketAddr,
6631    ) -> Result<(), n0_nat_traversal::Error> {
6632        if let Some(removed) = self.n0_nat_traversal.remove_local_address(address)? {
6633            self.spaces[SpaceId::Data]
6634                .pending
6635                .remove_address
6636                .insert(removed);
6637        }
6638        Ok(())
6639    }
6640
    /// Get the current local nat traversal addresses
    ///
    /// Pure delegation to the nat traversal state; errors bubble up from there
    /// (presumably when nat traversal was not negotiated — TODO confirm).
    pub fn get_local_nat_traversal_addresses(
        &self,
    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
        self.n0_nat_traversal.get_local_nat_traversal_addresses()
    }
6647
6648    /// Get the currently advertised nat traversal addresses by the server
6649    pub fn get_remote_nat_traversal_addresses(
6650        &self,
6651    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6652        Ok(self
6653            .n0_nat_traversal
6654            .client_side()?
6655            .get_remote_nat_traversal_addresses())
6656    }
6657
6658    /// Attempts to open a path for nat traversal.
6659    ///
6660    /// On success returns the [`PathId`] and remote address of the path.
6661    fn open_nat_traversal_path(
6662        &mut self,
6663        now: Instant,
6664        ip_port: (IpAddr, u16),
6665    ) -> Result<Option<(PathId, SocketAddr)>, PathError> {
6666        let remote = ip_port.into();
6667        // TODO(matheus23): Probe the correct 4-tuple, instead of only a remote address?
6668        // By specifying None for `local_ip`, we do two things: 1. open_path_ensure won't
6669        // generate two paths to the same remote and 2. we let the OS choose which
6670        // interface to use for sending on that path.
6671        let network_path = FourTuple {
6672            remote,
6673            local_ip: None,
6674        };
6675        match self.open_path_ensure(network_path, PathStatus::Backup, now) {
6676            Ok((path_id, path_was_known)) => {
6677                if path_was_known {
6678                    trace!(%path_id, %remote, "nat traversal: path existed for remote");
6679                }
6680                Ok(Some((path_id, remote)))
6681            }
6682            Err(e) => {
6683                debug!(%remote, %e, "nat traversal: failed to probe remote");
6684                Err(e)
6685            }
6686        }
6687    }
6688
    /// Initiates a new nat traversal round
    ///
    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
    /// frames, and initiating probing of the known remote addresses. When a new round is
    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
    ///
    /// Returns the server addresses that are now being probed.
    /// If addresses fail due to spurious errors, these might succeed later and not be returned in
    /// this set.
    pub fn initiate_nat_traversal_round(
        &mut self,
        now: Instant,
    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
        if self.state.is_closed() {
            return Err(n0_nat_traversal::Error::Closed);
        }

        // Start a new round in the client-side nat traversal state; errors on the server side.
        let ipv6 = self.is_ipv6();
        let client_state = self.n0_nat_traversal.client_side_mut()?;
        let n0_nat_traversal::NatTraversalRound {
            new_round,
            reach_out_at,
            addresses_to_probe,
            prev_round_path_ids,
        } = client_state.initiate_nat_traversal_round(ipv6)?;

        trace!(%new_round, reach_out=reach_out_at.len(), to_probe=addresses_to_probe.len(),
            "initiating nat traversal round");

        // Queue REACH_OUT frames advertising our local addresses for this round.
        self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));

        // Clean up paths left over from the previous round.
        for path_id in prev_round_path_ids {
            let Some(path) = self.path(path_id) else {
                continue;
            };
            let ip = path.network_path.remote.ip();
            let port = path.network_path.remote.port();

            // We only close paths that aren't validated (validated paths are working)
            // that we opened in a previous round.
            // And we only close paths that we don't want to probe anyways.
            if !addresses_to_probe
                .iter()
                .any(|(_, probe)| *probe == (ip, port))
                && !path.validated
                && !self.abandoned_paths.contains(&path_id)
            {
                trace!(%path_id, "closing path from previous round");
                let _ =
                    self.close_path_inner(now, path_id, PathAbandonReason::NatTraversalRoundEnded);
            }
        }

        // First error encountered while probing; only surfaced if nothing succeeded.
        let mut err = None;

        let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
        let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());

        for (id, address) in addresses_to_probe {
            match self.open_nat_traversal_path(now, address) {
                Ok(None) => {}
                Ok(Some((path_id, remote))) => {
                    path_ids.push(path_id);
                    probed_addresses.push(remote);
                }
                Err(e) => {
                    // Record the failure so the round can retry this address later
                    // via continue_nat_traversal_round.
                    self.n0_nat_traversal
                        .client_side_mut()
                        .expect("validated")
                        .report_in_continuation(id, e);
                    err.get_or_insert(e);
                }
            }
        }

        if let Some(err) = err {
            // We failed to probe any addresses, bail out
            if probed_addresses.is_empty() {
                return Err(n0_nat_traversal::Error::Multipath(err));
            }
        }

        self.n0_nat_traversal
            .client_side_mut()
            .expect("connection side validated")
            .set_round_path_ids(path_ids);

        Ok(probed_addresses)
    }
6778
    /// Attempts to continue a nat traversal round by trying to open paths for pending client probes.
    ///
    /// If there was nothing to do, it returns `None`. Otherwise it returns whether the path was
    /// successfully open.
    fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
        let ipv6 = self.is_ipv6();
        // Only meaningful on the client side; on the server this yields `None`.
        let client_state = self.n0_nat_traversal.client_side_mut().ok()?;
        let (id, address) = client_state.continue_nat_traversal_round(ipv6)?;
        // The client_state borrow must end before open_nat_traversal_path (needs
        // `&mut self`); re-acquire it afterwards to record the outcome.
        let open_result = self.open_nat_traversal_path(now, address);
        let client_state = self.n0_nat_traversal.client_side_mut().expect("validated");
        match open_result {
            // Ok(None) is currently never produced by open_nat_traversal_path;
            // treated as success for forward compatibility.
            Ok(None) => Some(true),
            Ok(Some((path_id, _remote))) => {
                client_state.add_round_path_id(path_id);
                Some(true)
            }
            Err(e) => {
                // Record the failure so it can be retried or reported later.
                client_state.report_in_continuation(id, e);
                Some(false)
            }
        }
    }
6801}
6802
6803impl fmt::Debug for Connection {
6804    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
6805        f.debug_struct("Connection")
6806            .field("handshake_cid", &self.handshake_cid)
6807            .finish()
6808    }
6809}
6810
/// Hints when the caller identifies a network change.
///
/// Intended to be implemented by API consumers that can judge path recoverability
/// — TODO confirm intended implementors.
pub trait NetworkChangeHint: std::fmt::Debug + 'static {
    /// Inform the connection if a path may recover after a network change.
    ///
    /// After network changes, paths may not be recoverable. In this case, waiting for the path to
    /// become idle may take longer than what is desirable. If [`Self::is_path_recoverable`]
    /// returns `false`, a multipath-enabled, client-side connection will establish a new path to
    /// the same remote, closing the current one, instead of migrating the path.
    ///
    /// Paths that are deemed recoverable will simply be sent a PING for a liveness check.
    fn is_path_recoverable(&self, path_id: PathId, network_path: FourTuple) -> bool;
}
6823
/// Return value for [`Connection::poll_transmit_path`].
#[derive(Debug)]
enum PollPathStatus {
    /// Nothing to send on the path, nothing was written into the [`TransmitBuf`].
    NothingToSend {
        /// If true there was data to send but congestion control did not allow so.
        congestion_blocked: bool,
    },
    /// The transmit is ready to be sent as-is.
    Send(Transmit),
}
6835
/// Return value for [`Connection::poll_transmit_path_space`].
#[derive(Debug)]
enum PollPathSpaceStatus {
    /// Nothing to send in the space, nothing was written into the [`TransmitBuf`].
    NothingToSend {
        /// If true there was data to send but congestion control did not allow so.
        congestion_blocked: bool,
    },
    /// One or more packets have been written into the [`TransmitBuf`].
    WrotePacket {
        /// The highest packet number.
        last_packet_number: u64,
        /// Whether to pad an already started datagram in the next packet.
        ///
        /// When packets in Initial, 0-RTT or Handshake packet do not fill the entire
        /// datagram they may decide to coalesce with the next packet from a higher
        /// encryption level on the same path. But the earlier packet may impose specific
        /// size requirements on the datagram they are sent in.
        ///
        /// If a space did not complete the datagram, they use this to request the correct
        /// padding in the final packet of the datagram so that the final datagram will have
        /// the correct size.
        ///
        /// If a space did fill an entire datagram, it leaves this to the default of
        /// [`PadDatagram::No`].
        pad_datagram: PadDatagram,
    },
    /// Send the contents of the transmit immediately.
    ///
    /// Packets were written and the GSO batch must end now, regardless of whether higher
    /// spaces still have frames to write. This is used when the last datagram written would
    /// require too much padding to continue a GSO batch, which would waste space on the
    /// wire.
    Send {
        /// The highest packet number written into the transmit.
        last_packet_number: u64,
    },
}
6874
/// Information used to decide what frames to schedule into which packets.
///
/// Primarily used by [`Connection::poll_transmit_on_path`] and the functions that help
/// building packets for it: [`Connection::poll_transmit_path_space`] and
/// [`Connection::populate_packet`].
#[derive(Debug, Copy, Clone)]
struct PathSchedulingInfo {
    /// Whether the path is abandoned.
    ///
    /// Note that a path that is abandoned but still has CIDs can still send a packet. After
    /// sending that packet the CIDs have to be considered retired as well and
    /// [`Self::has_cids`] should turn `false`.
    abandoned: bool,
    /// Whether the path may send [`SpaceKind::Data`] frames.
    ///
    /// Some paths should only send frames from [`SendableFrames::space_id_only`]. All other
    /// frames are essentially frames that can be sent on any [`SpaceKind::Data`] space. For
    /// those we want to respect packet scheduling rules however.
    ///
    /// Roughly speaking data frames are only sent on spaces that have CIDs, are not
    /// abandoned and have no *better* spaces. However, see the comments where this is
    /// populated for the exact packet scheduling implementation.
    ///
    /// This essentially marks this path as the best validated space ID, though other
    /// paths could be equally good. Except during the handshake in which case it does not
    /// need to be validated. Note that once in the closed or draining states this will
    /// never be true.
    may_send_data: bool,
    /// Whether the path may send a CONNECTION_CLOSE frame.
    ///
    /// This essentially marks this path as the best validated space ID with a fallback
    /// to unvalidated spaces if there are no validated spaces. Though other paths could be
    /// equally good.
    may_send_close: bool,
}
6910
/// Reasons why sending on a path may currently be blocked
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// Sending is not blocked
    No,
    /// Blocked by the anti-amplification limit
    AntiAmplification,
    /// Blocked by the congestion controller
    Congestion,
    /// Blocked by the pacer
    Pacing,
}
6918
/// Fields of `Connection` specific to it being client-side or server-side
enum ConnectionSide {
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
        token: Bytes,
        /// Store of address validation tokens, looked up by server name
        /// (see the `From<SideArgs>` impl)
        token_store: Arc<dyn TokenStore>,
        /// Name of the server this client is connecting to
        server_name: String,
    },
    Server {
        /// Shared server-wide configuration
        server_config: Arc<ServerConfig>,
    },
}
6931
6932impl ConnectionSide {
6933    fn remote_may_migrate(&self, state: &State) -> bool {
6934        match self {
6935            Self::Server { server_config } => server_config.migration,
6936            Self::Client { .. } => {
6937                if let Some(hs) = state.as_handshake() {
6938                    hs.allow_server_migration
6939                } else {
6940                    false
6941                }
6942            }
6943        }
6944    }
6945
6946    fn is_client(&self) -> bool {
6947        self.side().is_client()
6948    }
6949
6950    fn is_server(&self) -> bool {
6951        self.side().is_server()
6952    }
6953
6954    fn side(&self) -> Side {
6955        match *self {
6956            Self::Client { .. } => Side::Client,
6957            Self::Server { .. } => Side::Server,
6958        }
6959    }
6960}
6961
6962impl From<SideArgs> for ConnectionSide {
6963    fn from(side: SideArgs) -> Self {
6964        match side {
6965            SideArgs::Client {
6966                token_store,
6967                server_name,
6968            } => Self::Client {
6969                token: token_store.take(&server_name).unwrap_or_default(),
6970                token_store,
6971                server_name,
6972            },
6973            SideArgs::Server {
6974                server_config,
6975                pref_addr_cid: _,
6976                path_validated: _,
6977            } => Self::Server { server_config },
6978        }
6979    }
6980}
6981
/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    Client {
        /// Source of address validation tokens, looked up by server name
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to
        server_name: String,
    },
    Server {
        /// Shared server-wide configuration
        server_config: Arc<ServerConfig>,
        /// Connection ID reserved for the preferred-address mechanism, if any
        /// — TODO confirm exact use
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the client's network path is already validated at creation
        path_validated: bool,
    },
}
6994
6995impl SideArgs {
6996    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6997        match *self {
6998            Self::Client { .. } => None,
6999            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
7000        }
7001    }
7002
7003    pub(crate) fn path_validated(&self) -> bool {
7004        match *self {
7005            Self::Client { .. } => true,
7006            Self::Server { path_validated, .. } => path_validated,
7007        }
7008    }
7009
7010    pub(crate) fn side(&self) -> Side {
7011        match *self {
7012            Self::Client { .. } => Side::Client,
7013            Self::Server { .. } => Side::Server,
7014        }
7015    }
7016}
7017
/// Reasons why a connection might be lost
///
/// Delivered to the application via [`Event::ConnectionLost`].
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    ///
    /// Carries the transport-level CONNECTION_CLOSE received from the peer.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    ///
    /// Carries the application-level close received from the peer.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
7052
7053impl From<Close> for ConnectionError {
7054    fn from(x: Close) -> Self {
7055        match x {
7056            Close::Connection(reason) => Self::ConnectionClosed(reason),
7057            Close::Application(reason) => Self::ApplicationClosed(reason),
7058        }
7059    }
7060}
7061
7062// For compatibility with API consumers
7063impl From<ConnectionError> for io::Error {
7064    fn from(x: ConnectionError) -> Self {
7065        use ConnectionError::*;
7066        let kind = match x {
7067            TimedOut => io::ErrorKind::TimedOut,
7068            Reset => io::ErrorKind::ConnectionReset,
7069            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
7070            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
7071                io::ErrorKind::Other
7072            }
7073        };
7074        Self::new(kind, x)
7075    }
7076}
7077
7078/// Errors that might trigger a path being closed
7079// TODO(@divma): maybe needs to be reworked based on what we want to do with the public API
7080#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
7081pub enum PathError {
7082    /// The extension was not negotiated with the peer
7083    #[error("multipath extension not negotiated")]
7084    MultipathNotNegotiated,
7085    /// Paths can only be opened client-side
7086    #[error("the server side may not open a path")]
7087    ServerSideNotAllowed,
7088    /// Current limits do not allow us to open more paths
7089    #[error("maximum number of concurrent paths reached")]
7090    MaxPathIdReached,
7091    /// No remote CIDs available to open a new path
7092    #[error("remoted CIDs exhausted")]
7093    RemoteCidsExhausted,
7094    /// Path could not be validated and will be abandoned
7095    #[error("path validation failed")]
7096    ValidationFailed,
7097    /// The remote address for the path is not supported by the endpoint
7098    #[error("invalid remote address")]
7099    InvalidRemoteAddress(SocketAddr),
7100}
7101
/// Errors triggered when abandoning a path
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// Multipath is not negotiated
    #[error("Multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// The path is already closed or was never opened
    #[error("closed path")]
    ClosedPath,
    /// This is the last path, which cannot be abandoned
    #[error("last open path")]
    LastOpenPath,
}
7115
/// Error when the multipath extension was not negotiated, but attempted to be used.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private unit field keeps the type nameable by API consumers while preventing
    // construction outside this crate.
    _private: (),
}
7122
/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The TLS handshake was confirmed
    HandshakeConfirmed,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
    /// (Multi)Path events; see [`PathEvent`]
    Path(PathEvent),
    /// n0's nat traversal events; see [`n0_nat_traversal::Event`]
    NatTraversal(n0_nat_traversal::Event),
}
7150
7151impl From<PathEvent> for Event {
7152    fn from(source: PathEvent) -> Self {
7153        Self::Path(source)
7154    }
7155}
7156
7157fn get_max_ack_delay(params: &TransportParameters) -> Duration {
7158    Duration::from_micros(params.max_ack_delay.0 * 1000)
7159}
7160
// Prevents overflow and improves behavior in extreme circumstances
//
// NOTE(review): presumably caps the exponent used for exponential backoff (2^n) of
// retransmission timers — confirm against the PTO/backoff call sites.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
//
// The `+ 32` leaves room for at least a small frame payload beyond the header.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Excludes packet-type-specific fields such as packet number or Initial token
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
7180
/// Record of the frames written into a single packet
///
/// Used to decide what must be revived for retransmission if the packet is lost,
/// and whether the packet counts as ack-only.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable frame state to restore if this packet is declared lost
    retransmits: ThinRetransmits,
    /// The packet number of the largest acknowledged packet for each path
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata for the STREAM frames carried by this packet
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    /// If the datagram containing these frames should be padded to the min MTU
    requires_padding: bool,
}
7192
7193impl SentFrames {
7194    /// Returns whether the packet contains only ACKs
7195    fn is_ack_only(&self, streams: &StreamsState) -> bool {
7196        !self.largest_acked.is_empty()
7197            && !self.non_retransmits
7198            && self.stream_frames.is_empty()
7199            && self.retransmits.is_empty(streams)
7200    }
7201
7202    fn retransmits_mut(&mut self) -> &mut Retransmits {
7203        self.retransmits.get_or_create()
7204    }
7205
7206    fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
7207        use frame::EncodableFrame::*;
7208        match frame {
7209            PathAck(path_ack_encoder) => {
7210                if let Some(max) = path_ack_encoder.ranges.max() {
7211                    self.largest_acked.insert(path_ack_encoder.path_id, max);
7212                }
7213            }
7214            Ack(ack_encoder) => {
7215                if let Some(max) = ack_encoder.ranges.max() {
7216                    self.largest_acked.insert(PathId::ZERO, max);
7217                }
7218            }
7219            Close(_) => { /* non retransmittable, but after this we don't really care */ }
7220            PathResponse(_) => self.non_retransmits = true,
7221            HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
7222            ReachOut(frame::ReachOut { round, ip, port }) => {
7223                let (recorded_round, reach_outs) = self
7224                    .retransmits_mut()
7225                    .reach_out
7226                    .get_or_insert_with(|| (round, FxHashSet::default()));
7227                // Only record reach outs for the current round or a newer than the recorded one.
7228                if *recorded_round == round {
7229                    // Same round, simply append.
7230                    reach_outs.insert((ip, port));
7231                } else if *recorded_round < round {
7232                    // New round.
7233                    *recorded_round = round;
7234                    reach_outs.drain();
7235                    reach_outs.insert((ip, port));
7236                } else {
7237                    // ignore old reach out that was sent
7238                }
7239            }
7240
7241            ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
7242            Ping(_) => self.non_retransmits = true,
7243            ImmediateAck(_) => self.non_retransmits = true,
7244            AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
7245            PathChallenge(_) => self.non_retransmits = true,
7246            Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
7247            PathAbandon(path_abandon) => {
7248                self.retransmits_mut()
7249                    .path_abandon
7250                    .entry(path_abandon.path_id)
7251                    .or_insert(path_abandon.error_code);
7252            }
7253            PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
7254            | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
7255                self.retransmits_mut().path_status.insert(path_id);
7256            }
7257            MaxPathId(_) => self.retransmits_mut().max_path_id = true,
7258            PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
7259            PathCidsBlocked(path_cids_blocked) => {
7260                self.retransmits_mut()
7261                    .path_cids_blocked
7262                    .insert(path_cids_blocked.path_id);
7263            }
7264            ResetStream(reset) => self
7265                .retransmits_mut()
7266                .reset_stream
7267                .push((reset.id, reset.error_code)),
7268            StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
7269            NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
7270            RetireConnectionId(retire_cid) => self
7271                .retransmits_mut()
7272                .retire_cids
7273                .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
7274            Datagram(_) => self.non_retransmits = true,
7275            NewToken(_) => {}
7276            AddAddress(add_address) => {
7277                self.retransmits_mut().add_address.insert(add_address);
7278            }
7279            RemoveAddress(remove_address) => {
7280                self.retransmits_mut().remove_address.insert(remove_address);
7281            }
7282            StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
7283            MaxData(_) => self.retransmits_mut().max_data = true,
7284            MaxStreamData(max) => {
7285                self.retransmits_mut().max_stream_data.insert(max.id);
7286            }
7287            MaxStreams(max_streams) => {
7288                self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
7289            }
7290        }
7291    }
7292}
7293
7294/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
7295///
7296/// According to the definition of max_idle_timeout, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1.>
7297///
7298/// According to the negotiation procedure, either the minimum of the timeouts or one specified is used as the negotiated value; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2.>
7299///
7300/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
7301fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
7302    match (x, y) {
7303        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
7304        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
7305        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
7306        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
7307    }
7308}
7309
#[cfg(test)]
mod tests {
    use super::*;

    /// The negotiated idle timeout must not depend on which peer supplied which value.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let cases = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (Some(VarInt(2)), Some(VarInt(0)), Some(Duration::from_millis(2))),
            (Some(VarInt(1)), Some(VarInt(4)), Some(Duration::from_millis(1))),
        ];

        for (a, b, expected) in cases {
            // Check both argument orders against the same expected result.
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}