iroh_quinn_proto/connection/mod.rs

use std::{
    cmp,
    collections::{BTreeMap, VecDeque, btree_map},
    convert::TryFrom,
    fmt, io, mem,
    net::{IpAddr, SocketAddr},
    num::{NonZeroU32, NonZeroUsize},
    ops::Not,
    sync::Arc,
};

use bytes::{BufMut, Bytes, BytesMut};
use frame::StreamMetaVec;

use rand::{Rng, SeedableRng, rngs::StdRng};
use rustc_hash::{FxHashMap, FxHashSet};
use thiserror::Error;
use tracing::{debug, error, trace, trace_span, warn};

use crate::{
    Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
    MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
    TransportErrorCode, VarInt,
    cid_generator::ConnectionIdGenerator,
    cid_queue::CidQueue,
    coding::BufMutExt,
    config::{ServerConfig, TransportConfig},
    congestion::Controller,
    connection::{
        paths::AbandonState,
        qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
        spaces::LostPacket,
        timer::{ConnTimer, PathTimer},
    },
    crypto::{self, KeyPair, Keys, PacketKey},
    frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
    iroh_hp,
    packet::{
        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
        PacketNumber, PartialDecode, SpaceId,
    },
    range_set::ArrayRangeSet,
    shared::{
        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
        EndpointEvent, EndpointEventInner,
    },
    token::{ResetToken, Token, TokenPayload},
    transport_parameters::TransportParameters,
};

mod ack_frequency;
use ack_frequency::AckFrequencyState;

mod assembler;
pub use assembler::Chunk;

mod cid_state;
use cid_state::CidState;

mod datagrams;
use datagrams::DatagramState;
pub use datagrams::{Datagrams, SendDatagramError};

mod mtud;
mod pacing;

mod packet_builder;
use packet_builder::{PacketBuilder, PadDatagram};

mod packet_crypto;
use packet_crypto::{PrevCrypto, ZeroRttCrypto};

mod paths;
pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
use paths::{PathData, PathState};

pub(crate) mod qlog;

mod send_buffer;

mod spaces;
#[cfg(fuzzing)]
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
use spaces::Retransmits;
use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};

mod stats;
pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};

mod streams;
#[cfg(fuzzing)]
pub use streams::StreamsState;
#[cfg(not(fuzzing))]
use streams::StreamsState;
pub use streams::{
    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
};

mod timer;
use timer::{Timer, TimerTable};

mod transmit_buf;
use transmit_buf::TransmitBuf;

mod state;

#[cfg(not(fuzzing))]
use state::State;
#[cfg(fuzzing)]
pub use state::State;
use state::StateType;
/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application, such as
///   [`read`](RecvStream::read), [`write`](SendStream::write) and [`reset`](SendStream::reset).
///   For convenience we refer to these as "performing I/O" below; however, as per the design of
///   this library, none of these functions actually perform system-level I/O.
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing it to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
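///
/// A minimal sketch of such a driving loop, following the order above; the `now()`,
/// `socket`, `endpoint` and `handle_app_event` plumbing is illustrative only and not
/// part of this API:
///
/// ```ignore
/// let mut buf = Vec::new();
/// loop {
///     // (1) Flush outgoing datagrams first.
///     while let Some(transmit) = conn.poll_transmit(now(), max_datagrams, &mut buf) {
///         socket.send_to(&buf[..transmit.size], transmit.destination)?;
///         buf.clear();
///     }
///     // (2) Re-arm the timer after transmits, since they may change the deadline.
///     let deadline = conn.poll_timeout();
///     // (3) Forward connection events to the endpoint driver.
///     while let Some(event) = conn.poll_endpoint_events() {
///         endpoint.handle_event(connection_handle, event);
///     }
///     // (4) Surface application-facing events.
///     while let Some(event) = conn.poll() {
///         handle_app_event(event);
///     }
///     // Wait for socket readability or `deadline`, then feed the result back in
///     // via handle_event / handle_timeout (B) and repeat.
/// }
/// ```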
pub struct Connection {
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    rng: StdRng,
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The "real" local IP address which was used to receive the initial packet.
    /// This is only populated for the server case, and only if known
    local_ip: Option<IpAddr>,
    /// The [`PathData`] for each path
    ///
    /// This needs to be ordered because [`Connection::poll_transmit`] needs to
    /// deterministically select the next PathId to send on.
    // TODO(flub): well does it really? But deterministic is nice for now.
    paths: BTreeMap<PathId, PathState>,
    /// Incremented every time we see a new path
    ///
    /// Stored separately from `path.generation` to account for aborted migrations
    path_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    state: State,
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Events returned by [`Connection::poll`]
    events: VecDeque<Event>,
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable [`SpaceId`]
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// If the CONNECTION_CLOSE frame needs to be sent
    close: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    ///
    /// These are issued before the corresponding paths exist, including for paths that
    /// will never exist. So when multipath is supported, the number of entries here will
    /// be higher than the actual number of paths in use.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Attributes of CIDs generated by local endpoint
    ///
    /// Any path that is allowed to be opened is present in this map, as well as the already
    /// opened paths. However, since CIDs are issued asynchronously by the endpoint driver via
    /// connection events, this map cannot be used to know whether CIDs have been issued for a
    /// path or not. See [`Connection::max_path_id_with_cids`] for this.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// Path level statistics
    path_stats: FxHashMap<PathId, PathStats>,
    /// QUIC version used for the connection.
    version: u32,

    //
    // Multipath
    //
    /// Maximum number of concurrent paths
    ///
    /// Initially set from the [`TransportConfig::max_concurrent_multipath_paths`]. Even
    /// when multipath is disabled this will be set to 1, though it is not used in that
    /// case.
    max_concurrent_paths: NonZeroU32,
    /// Local maximum [`PathId`] to be used
    ///
    /// This is initially set to [`TransportConfig::get_initial_max_path_id`] when multipath
    /// is negotiated, or to [`PathId::ZERO`] otherwise. This is essentially the value of
    /// the highest MAX_PATH_ID frame sent.
    ///
    /// Any path with an ID equal to or below this [`PathId`] is either:
    ///
    /// - Abandoned, if it is also in [`Connection::abandoned_paths`].
    /// - Open, if it is present in [`Connection::paths`].
    /// - Not yet opened, if it is in neither of these two places.
    ///
    /// Note that for not-yet-open paths there may or may not be any CIDs issued. See
    /// [`Connection::max_path_id_with_cids`].
    local_max_path_id: PathId,
    /// Remote's maximum [`PathId`] to be used
    ///
    /// This is initially set to the peer's [`TransportParameters::initial_max_path_id`] when
    /// multipath is negotiated, or to [`PathId::ZERO`] otherwise. A peer may increase this limit
    /// by sending [`Frame::MaxPathId`] frames.
    remote_max_path_id: PathId,
    /// The greatest [`PathId`] we have issued CIDs for
    ///
    /// CIDs are only issued for `min(local_max_path_id, remote_max_path_id)`. It is not
    /// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued
    /// since they are issued asynchronously by the endpoint driver.
    max_path_id_with_cids: PathId,
    /// The paths already abandoned
    ///
    /// They may still have some state left in [`Connection::paths`] or
    /// [`Connection::local_cid_state`] since some of this has to be kept around for some
    /// time after a path is abandoned.
    // TODO(flub): Make this a more efficient data structure.  Like ranges of abandoned
    //    paths.  Or a set together with a minimum.  Or something.
    abandoned_paths: FxHashSet<PathId>,

    iroh_hp: iroh_hp::State,
    qlog: QlogSink,
}

impl Connection {
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = {
            let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
            space.crypto = Some(crypto.initial_keys(init_cid, side));
            space
        };
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
        // TODO(@divma): consider if we want to delay this until the path is validated
        path.open = true;
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_counter: 0,
            allow_mtud,
            local_ip,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            close: false,

            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            // peer params are not yet known, so multipath is not enabled
            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            // iroh's nat traversal
            iroh_hp: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
        this
    }

    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.peek()
    }

    /// Returns application-facing events
    ///
    /// Connections should be polled for events after:
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll(&mut self) -> Option<Event> {
        if let Some(x) = self.events.pop_front() {
            return Some(x);
        }

        if let Some(event) = self.streams.poll() {
            return Some(Event::Stream(event));
        }

        if let Some(reason) = self.state.take_error() {
            return Some(Event::ConnectionLost { reason });
        }

        None
    }

    /// Return endpoint-facing events
    #[must_use]
    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
        self.endpoint_events.pop_front().map(EndpointEvent)
    }

    /// Provide control over streams
    #[must_use]
    pub fn streams(&mut self) -> Streams<'_> {
        Streams {
            state: &mut self.streams,
            conn_state: &self.state,
        }
    }

    /// Provide control over a receive stream
    #[must_use]
    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
        RecvStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
        }
    }

    /// Provide control over a send stream
    #[must_use]
    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
        SendStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
            conn_state: &self.state,
        }
    }

    /// Opens a new path only if no path to the remote address exists so far
    ///
    /// See [`open_path`]. Returns `(path_id, true)` if the path already existed, or
    /// `(path_id, false)` if it was newly opened.
    ///
    /// [`open_path`]: Connection::open_path
    pub fn open_path_ensure(
        &mut self,
        remote: SocketAddr,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<(PathId, bool), PathError> {
        match self
            .paths
            .iter()
            .find(|(_id, path)| path.data.remote == remote)
        {
            Some((path_id, _state)) => Ok((*path_id, true)),
            None => self
                .open_path(remote, initial_status, now)
                .map(|id| (id, false)),
        }
    }

    /// Opens a new path
    ///
    /// Further errors might occur after this call returns; they will be emitted as
    /// [`PathEvent::LocallyClosed`] events. When the path is opened it will be reported
    /// via a [`PathEvent::Opened`] event.
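    ///
    /// A minimal usage sketch (error handling and the surrounding event loop are
    /// omitted; `remote` and `now` are supplied by the caller):
    ///
    /// ```ignore
    /// // Returns immediately with the new PathId; validation happens
    /// // asynchronously and success is reported via PathEvent::Opened.
    /// let path_id = conn.open_path(remote, PathStatus::Available, now)?;
    /// ```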
    pub fn open_path(
        &mut self,
        remote: SocketAddr,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<PathId, PathError> {
        if !self.is_multipath_negotiated() {
            return Err(PathError::MultipathNotNegotiated);
        }
        if self.side().is_server() {
            return Err(PathError::ServerSideNotAllowed);
        }

        let max_abandoned = self.abandoned_paths.iter().max().copied();
        let max_used = self.paths.keys().last().copied();
        let path_id = max_abandoned
            .max(max_used)
            .unwrap_or(PathId::ZERO)
            .saturating_add(1u8);

        if Some(path_id) > self.max_path_id() {
            return Err(PathError::MaxPathIdReached);
        }
        if path_id > self.remote_max_path_id {
            self.spaces[SpaceId::Data].pending.paths_blocked = true;
            return Err(PathError::MaxPathIdReached);
        }
        if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .push(path_id);
            return Err(PathError::RemoteCidsExhausted);
        }

        let path = self.ensure_path(path_id, remote, now, None);
        path.status.local_update(initial_status);

        Ok(path_id)
    }

    /// Closes a path by sending a PATH_ABANDON frame
    ///
    /// This will not allow closing the last path. It does allow closing paths which have
    /// not yet been opened, as e.g. is the case when receiving a PATH_ABANDON from the peer
    /// for a path that was never opened locally.
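    ///
    /// For example (a sketch; the error code is application-defined):
    ///
    /// ```ignore
    /// conn.close_path(now, path_id, VarInt::from_u32(0))?;
    /// ```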
    pub fn close_path(
        &mut self,
        now: Instant,
        path_id: PathId,
        error_code: VarInt,
    ) -> Result<(), ClosePathError> {
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }
        if self
            .paths
            .iter()
            // Would there be any remaining, non-abandoned, validated paths
            .any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            })
            .not()
        {
            return Err(ClosePathError::LastOpenPath);
        }

        // Send PATH_ABANDON
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, error_code.into());

        // Remove pending NEW CIDs for this path
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Cleanup retransmits across ALL paths (CIDs for path_id may have been transmitted on other paths)
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Consider remotely issued CIDs as retired.
        // Technically we don't have to do this just yet: we only need to do this *after*
        // the PATH_ABANDON frame is sent, which would allow us to still send it on the
        // to-be-abandoned path. However it is recommended to send the frame on another
        // path, and we do not allow abandoning the last path anyway.
        self.rem_cids.remove(&path_id);
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        self.abandoned_paths.insert(path_id);

        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));

        Ok(())
    }

    /// Gets the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the `path_id` does not reference any known path.
    #[track_caller]
    fn path_data(&self, path_id: PathId) -> &PathData {
        if let Some(data) = self.paths.get(&path_id) {
            &data.data
        } else {
            panic!(
                "unknown path: {path_id}, currently known paths: {:?}",
                self.paths.keys().collect::<Vec<_>>()
            );
        }
    }

    /// Gets a reference to the [`PathData`] for a [`PathId`]
    fn path(&self, path_id: PathId) -> Option<&PathData> {
        self.paths.get(&path_id).map(|path_state| &path_state.data)
    }

    /// Gets a mutable reference to the [`PathData`] for a [`PathId`]
    fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
        self.paths
            .get_mut(&path_id)
            .map(|path_state| &mut path_state.data)
    }

    /// Returns all known paths.
    ///
    /// There is no guarantee any of these paths are open or usable.
    pub fn paths(&self) -> Vec<PathId> {
        self.paths.keys().copied().collect()
    }

    /// Gets the local [`PathStatus`] for a known [`PathId`]
    pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
        self.path(path_id)
            .map(PathData::local_status)
            .ok_or(ClosedPath { _private: () })
    }

    /// Returns the path's remote socket address
    pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
        self.path(path_id)
            .map(|path| path.remote)
            .ok_or(ClosedPath { _private: () })
    }

    /// Sets the [`PathStatus`] for a known [`PathId`]
    ///
    /// Returns the previous path status on success.
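    ///
    /// For example, demoting a path to a backup path (a sketch):
    ///
    /// ```ignore
    /// let previous = conn.set_path_status(path_id, PathStatus::Backup)?;
    /// ```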
    pub fn set_path_status(
        &mut self,
        path_id: PathId,
        status: PathStatus,
    ) -> Result<PathStatus, SetPathStatusError> {
        if !self.is_multipath_negotiated() {
            return Err(SetPathStatusError::MultipathNotNegotiated);
        }
        let path = self
            .path_mut(path_id)
            .ok_or(SetPathStatusError::ClosedPath)?;
        let prev = match path.status.local_update(status) {
            Some(prev) => {
                self.spaces[SpaceId::Data]
                    .pending
                    .path_status
                    .insert(path_id);
                prev
            }
            None => path.local_status(),
        };
        Ok(prev)
    }

    /// Returns the remote path status
    // TODO(flub): Probably should also be some kind of path event?  Not even sure if I like
    //    this as an API, but for now it allows me to write a test easily.
    // TODO(flub): Technically this should be a Result<Option<PathStatus>>?
    pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
        self.path(path_id).and_then(|path| path.remote_status())
    }

    /// Sets the max_idle_timeout for a specific path
    ///
    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_max_idle_timeout(
        &mut self,
        path_id: PathId,
        timeout: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
    }

    /// Sets the keep_alive_interval for a specific path
    ///
    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_keep_alive_interval(
        &mut self,
        path_id: PathId,
        interval: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.keep_alive, interval))
    }

    /// Gets the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the `path_id` does not reference any known path.
    #[track_caller]
    fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
        &mut self.paths.get_mut(&path_id).expect("known path").data
    }

    fn ensure_path(
        &mut self,
        path_id: PathId,
        remote: SocketAddr,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        // TODO(matheus23): Add back short-circuiting path.validated = true, if we know that the
        // path's four-tuple was already validated.
        debug!(%path_id, ?remote, "path added");
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_counter = self.path_counter.wrapping_add(1);
        let mut data = PathData::new(
            remote,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_counter,
            now,
            &self.config,
        );

        let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathOpen),
            now + 3 * pto,
            self.qlog.with_time(now),
        );

        // For the path to be opened we need to send a packet on it. Sending a challenge
        // guarantees this.
        data.send_new_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, remote, now);
        &mut path.data
    }

    /// Returns packets to transmit
    ///
    /// Connections should be polled for transmit after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    ///
    /// `max_datagrams` specifies how many datagrams can be returned inside a
    /// single Transmit using GSO. This must be at least 1.
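    ///
    /// A sketch of sending one returned [`Transmit`]; the `socket` and its `send_gso`
    /// helper are illustrative, not part of this crate:
    ///
    /// ```ignore
    /// if let Some(t) = conn.poll_transmit(now, max_datagrams, &mut buf) {
    ///     match t.segment_size {
    ///         // A GSO batch: the buffer holds several datagrams, each `seg` bytes
    ///         // long except possibly the last one.
    ///         Some(seg) => socket.send_gso(&buf[..t.size], seg, t.destination),
    ///         // A single datagram.
    ///         None => socket.send_to(&buf[..t.size], t.destination),
    ///     };
    /// }
    /// ```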
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: NonZeroUsize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        if let Some(probing) = self
            .iroh_hp
            .server_side_mut()
            .ok()
            .and_then(iroh_hp::ServerState::next_probe)
        {
            let destination = probing.remote();
            trace!(%destination, "RAND_DATA packet");
            let token: u64 = self.rng.random();
            buf.put_u64(token);
            probing.finish(token);
            return Some(Transmit {
                destination,
                ecn: None,
                size: 8,
                segment_size: None,
                src_ip: None,
            });
        }

        let max_datagrams = match self.config.enable_segmentation_offload {
            false => NonZeroUsize::MIN,
            true => max_datagrams,
        };

        // Each call to poll_transmit can only send datagrams to one destination, because
        // all datagrams in a GSO batch are for the same destination. Therefore only
        // datagrams for one Path ID are produced for each poll_transmit call.

        // TODO(flub): this is wishful thinking and not actually implemented, but perhaps it
        //   should be:

        // First, if we have to send a close, select a path for that.
        // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending.

        // For all open, validated and AVAILABLE paths:
        // - Is the path congestion blocked or pacing blocked?
        // - call maybe_queue_probe to ensure a tail-loss probe would be sent?
        // - do we need to send a close message?
        // - call can_send
        // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths

        // Check whether we need to send a close message
        let close = match self.state.as_type() {
            StateType::Drained => {
                self.app_limited = true;
                return None;
            }
            StateType::Draining | StateType::Closed => {
                // self.close is only reset once the associated packet has been
                // encoded successfully
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Check whether we need to send an ACK_FREQUENCY frame
        if let Some(config) = &self.config.ack_frequency_config {
            let rtt = self
                .paths
                .values()
                .map(|p| p.data.rtt.get())
                .min()
                .expect("one path exists");
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(rtt, config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Whether this packet can be coalesced with another one in the same datagram.
        let mut coalesce = true;

        // Whether the last packet in the datagram must be padded so the datagram reaches
        // at least MIN_INITIAL_SIZE, or the maximum segment size if that is smaller.
        let mut pad_datagram = PadDatagram::No;

        // Whether congestion control stopped the next packet from being sent. Further
        // packets could still be built, as e.g. tail-loss probes are not congestion
        // limited.
        let mut congestion_blocked = false;

        // The packet number of the last built packet.
        let mut last_packet_number = None;

        let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;

        // If there is any open, validated and available path, we only send frames on a
        // backup path when those frames must be sent on that backup path exclusively.
        let have_available_path = self.paths.iter().any(|(id, path)| {
            path.data.validated
                && path.data.local_status() == PathStatus::Available
                && self.rem_cids.contains_key(id)
        });

        // Setup for the first path_id
        let mut transmit = TransmitBuf::new(
            buf,
            max_datagrams,
            self.path_data(path_id).current_mtu().into(),
        );
        if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
            return Some(challenge);
        }
        let mut space_id = match path_id {
            PathId::ZERO => SpaceId::Initial,
            _ => SpaceId::Data,
        };

        loop {
            // check if there is at least one active CID to use for sending
            let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
                let err = PathError::RemoteCidsExhausted;
                if !self.abandoned_paths.contains(&path_id) {
                    debug!(?err, %path_id, "no active CID for path");
                    self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                        id: path_id,
                        error: err,
                    }));
                    // Locally we should have refused to open this path, and the remote
                    // should have given us CIDs for this path before opening it. So we
                    // can always abandon it here.
                    self.close_path(
                        now,
                        path_id,
                        TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
                    )
                    .ok();
                    self.spaces[SpaceId::Data]
                        .pending
                        .path_cids_blocked
                        .push(path_id);
                } else {
                    trace!(%path_id, "remote CIDs retired for abandoned path");
                }

                match self.paths.keys().find(|&&next| next > path_id) {
                    Some(next_path_id) => {
                        // See if this next path can send anything.
                        path_id = *next_path_id;
                        space_id = SpaceId::Data;

                        // update per path state
                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
                        if let Some(challenge) =
                            self.send_prev_path_challenge(now, &mut transmit, path_id)
                        {
                            return Some(challenge);
                        }

                        continue;
                    }
                    None => {
                        // Nothing more to send.
                        trace!(
                            ?space_id,
                            %path_id,
                            "no CIDs to send on path, no more paths"
                        );
                        break;
                    }
                }
            };

            // Determine if anything can be sent in this packet number space (SpaceId +
            // PathId).
            let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
                // We are trying to coalesce another packet into this datagram.
                transmit.datagram_remaining_mut()
            } else {
                // A new datagram needs to be started.
                transmit.segment_size()
            };
            let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
            let path_should_send = {
                let path_exclusive_only = space_id == SpaceId::Data
                    && have_available_path
                    && self.path_data(path_id).local_status() == PathStatus::Backup;
                let path_should_send = if path_exclusive_only {
                    can_send.path_exclusive
                } else {
                    !can_send.is_empty()
                };
                let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
                path_should_send || needs_loss_probe
            };

            if !path_should_send && space_id < SpaceId::Data {
                if self.spaces[space_id].crypto.is_some() {
                    trace!(?space_id, %path_id, "nothing to send in space");
                }
                space_id = space_id.next();
                continue;
            }

            let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
                // Only check congestion control if a new datagram is needed.
                self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
            } else {
                PathBlocked::No
            };
            if send_blocked != PathBlocked::No {
                trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
                congestion_blocked = true;
            }
            if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
                // Higher spaces might still have tail-loss probes to send, which are not
                // congestion blocked.
                space_id = space_id.next();
                continue;
            }
            if !path_should_send || send_blocked != PathBlocked::No {
                // Nothing more to send on this path, check the next path if possible.

                // If there are any datagrams in the transmit, packets for another path can
                // not be built.
                if transmit.num_datagrams() > 0 {
                    break;
                }

                match self.paths.keys().find(|&&next| next > path_id) {
                    Some(next_path_id) => {
                        // See if this next path can send anything.
                        trace!(
                            ?space_id,
                            %path_id,
                            %next_path_id,
                            "nothing to send on path"
                        );
                        path_id = *next_path_id;
                        space_id = SpaceId::Data;

                        // update per path state
                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
                        if let Some(challenge) =
                            self.send_prev_path_challenge(now, &mut transmit, path_id)
                        {
                            return Some(challenge);
                        }

                        continue;
                    }
                    None => {
                        // Nothing more to send.
                        trace!(
                            ?space_id,
                            %path_id,
                            next_path_id=?None::<PathId>,
                            "nothing to send on path"
                        );
                        break;
                    }
                }
            }

            // If the datagram is full, we need to start a new one.
            if transmit.datagram_remaining_mut() == 0 {
                if transmit.num_datagrams() >= transmit.max_datagrams().get() {
                    // No more datagrams allowed
                    break;
                }

                match self.spaces[space_id].for_path(path_id).loss_probes {
                    0 => transmit.start_new_datagram(),
                    _ => {
                        // We need something to send for a tail-loss probe.
                        let request_immediate_ack =
                            space_id == SpaceId::Data && self.peer_supports_ack_frequency();
                        self.spaces[space_id].maybe_queue_probe(
                            path_id,
                            request_immediate_ack,
                            &self.streams,
                        );

                        self.spaces[space_id].for_path(path_id).loss_probes -= 1;

                        // Clamp the datagram to at most the minimum MTU to ensure that loss
                        // probes can get through and enable recovery even if the path MTU
                        // has shrunk unexpectedly.
                        transmit.start_new_datagram_with_size(std::cmp::min(
                            usize::from(INITIAL_MTU),
                            transmit.segment_size(),
                        ));
                    }
                }
                trace!(count = transmit.num_datagrams(), "new datagram started");
                coalesce = true;
                pad_datagram = PadDatagram::No;
            }

            // If coalescing another packet into the existing datagram, there should
            // still be enough space for a whole packet.
            if transmit.datagram_start_offset() < transmit.len() {
                debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
            }

            //
            // From here on, we've determined that a packet will definitely be sent.
            //

            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                // A client stops both sending and processing Initial packets when it
                // sends its first Handshake packet.
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            let mut qlog = QlogSentPacket::default();
            let mut builder = PacketBuilder::new(
                now,
                space_id,
                path_id,
                remote_cid,
                &mut transmit,
                can_send.other,
                self,
                &mut qlog,
            )?;
            last_packet_number = Some(builder.exact_number);
            coalesce = coalesce && !builder.short_header;

            if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
                // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1
                pad_datagram |= PadDatagram::ToMinMtu;
            }
            if space_id == SpaceId::Data && self.config.pad_to_mtu {
                pad_datagram |= PadDatagram::ToSegmentSize;
            }

            if can_send.close {
                trace!("sending CONNECTION_CLOSE");
                // Encode ACKs before the ConnectionClose message, to give the receiver
                // a better approximation of what data has been processed. This is
                // especially important with ack delay, since the peer might not
                // have gotten any other ACK for the data earlier on.
                let mut sent_frames = SentFrames::default();
                let is_multipath_negotiated = self.is_multipath_negotiated();
                for path_id in self.spaces[space_id]
                    .number_spaces
                    .iter()
                    .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
                    .map(|(&path_id, _)| path_id)
                    .collect::<Vec<_>>()
                {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut sent_frames,
                        path_id,
                        space_id,
                        &mut self.spaces[space_id],
                        is_multipath_negotiated,
                        &mut builder.frame_space_mut(),
                        &mut self.stats,
                        &mut qlog,
                    );
                }

                // Since there are only 64 ACK frames there will always be enough space
                // to encode the ConnectionClose frame too. However we still have the
                // check here to prevent crashes if something changes.
                debug_assert!(
                    builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
                    "ACKs should leave space for ConnectionClose"
                );
                if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
                    let max_frame_size = builder.frame_space_remaining();
                    match self.state.as_type() {
                        StateType::Closed => {
                            let reason: Close =
                                self.state.as_closed().expect("checked").clone().into();
                            if space_id == SpaceId::Data || reason.is_transport_layer() {
                                reason.encode(&mut builder.frame_space_mut(), max_frame_size);
                                qlog.frame(&Frame::Close(reason));
                            } else {
                                let frame = frame::ConnectionClose {
                                    error_code: TransportErrorCode::APPLICATION_ERROR,
                                    frame_type: None,
                                    reason: Bytes::new(),
                                };
                                frame.encode(&mut builder.frame_space_mut(), max_frame_size);
                                qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
                            }
                        }
                        StateType::Draining => {
                            let frame = frame::ConnectionClose {
                                error_code: TransportErrorCode::NO_ERROR,
                                frame_type: None,
                                reason: Bytes::new(),
                            };
                            frame.encode(&mut builder.frame_space_mut(), max_frame_size);
                            qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
                        }
                        _ => unreachable!(
                            "tried to make a close packet when the connection wasn't closed"
                        ),
                    };
                }
                builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
                if space_id == self.highest_space {
                    // Don't send another close packet. Even with multipath we only send
                    // CONNECTION_CLOSE on a single path since we expect our paths to work.
                    self.close = false;
                    // `CONNECTION_CLOSE` is the final packet
                    break;
                } else {
                    // Send a close frame in every possible space for robustness, per
                    // RFC9000 "Immediate Close during the Handshake". Don't bother trying
                    // to send anything else.
                    space_id = space_id.next();
                    continue;
                }
            }

            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that
            // path validation can occur while the link is saturated.
            if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
                let path = self.path_data_mut(path_id);
                if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
                    // TODO(flub): We need to use the right CID!  We shouldn't use the same
                    //    CID as the current active one for the path.  Though see also
                    //    https://github.com/quinn-rs/quinn/issues/2184
                    let response = frame::PathResponse(token);
                    trace!(%response, "(off-path)");
                    builder.frame_space_mut().write(response);
                    qlog.frame(&Frame::PathResponse(response));
                    self.stats.frame_tx.path_response += 1;
                    builder.finish_and_track(
                        now,
                        self,
                        path_id,
                        SentFrames {
                            non_retransmits: true,
                            ..SentFrames::default()
                        },
                        PadDatagram::ToMinMtu,
                        qlog,
                    );
                    self.stats.udp_tx.on_sent(1, transmit.len());
                    return Some(Transmit {
                        destination: remote,
                        size: transmit.len(),
                        ecn: None,
                        segment_size: None,
                        src_ip: self.local_ip,
                    });
                }
            }

1339            let sent_frames = {
1340                let path_exclusive_only = have_available_path
1341                    && self.path_data(path_id).local_status() == PathStatus::Backup;
1342                let pn = builder.exact_number;
1343                self.populate_packet(
1344                    now,
1345                    space_id,
1346                    path_id,
1347                    path_exclusive_only,
1348                    &mut builder.frame_space_mut(),
1349                    pn,
1350                    &mut qlog,
1351                )
1352            };
1353
            // ACK-only packets should only be sent when explicitly allowed. If we write them for
            // any other reason, there is a bug that leads to one component announcing write
            // readiness while not writing any data. This degrades performance. The condition is
            // only checked if the full MTU is available and when potentially large fixed-size
            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
            // writing ACKs.
            debug_assert!(
                !(sent_frames.is_ack_only(&self.streams)
                    && !can_send.acks
                    && can_send.other
                    && builder.buf.segment_size()
                        == self.path_data(path_id).current_mtu() as usize
                    && self.datagrams.outgoing.is_empty()),
                "SendableFrames was {can_send:?}, but only ACKs have been written"
            );
            if sent_frames.requires_padding {
                pad_datagram |= PadDatagram::ToMinMtu;
            }

            for (path_id, _pn) in sent_frames.largest_acked.iter() {
                self.spaces[space_id]
                    .for_path(*path_id)
                    .pending_acks
                    .acks_sent();
                self.timers.stop(
                    Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
                    self.qlog.with_time(now),
                );
            }

            // Now we need to finish the packet.  Before we do so we need to know if we will
            // be coalescing the next packet into this one, or will be ending the datagram
            // as well.  Because if this is the last packet in the datagram more padding
            // might be needed because of the packet type, or to fill the GSO segment size.

            // Are we allowed to coalesce AND is there enough space for another *packet* in
            // this datagram AND is there another packet to send in this or the next space?
            if coalesce
                && builder
                    .buf
                    .datagram_remaining_mut()
                    .saturating_sub(builder.predict_packet_end())
                    > MIN_PACKET_SPACE
                && self
                    .next_send_space(space_id, path_id, builder.buf, close)
                    .is_some()
            {
                // We can append/coalesce the next packet into the current
                // datagram. Finish the current packet without adding extra padding.
                builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
            } else {
                // We need a new datagram for the next packet.  Finish the current
                // packet with padding.
                if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
                    // If too many padding bytes would be required to continue the
                    // GSO batch after this packet, end the GSO batch here. Ensures
                    // that fixed-size frames with heterogeneous sizes
                    // (e.g. application datagrams) won't inadvertently waste large
                    // amounts of bandwidth. The exact threshold is a bit arbitrary
                    // and might benefit from further tuning, though there's no
                    // universally optimal value.
                    const MAX_PADDING: usize = 32;
                    if builder.buf.datagram_remaining_mut()
                        > builder.predict_packet_end() + MAX_PADDING
                    {
                        trace!(
                            "GSO truncated by demand for {} padding bytes",
                            builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
                        );
                        builder.finish_and_track(
                            now,
                            self,
                            path_id,
                            sent_frames,
                            PadDatagram::No,
                            qlog,
                        );
                        break;
                    }

                    // Pad the current datagram to GSO segment size so it can be
                    // included in the GSO batch.
                    builder.finish_and_track(
                        now,
                        self,
                        path_id,
                        sent_frames,
                        PadDatagram::ToSegmentSize,
                        qlog,
                    );
                } else {
                    builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
                }
                if transmit.num_datagrams() == 1 {
                    transmit.clip_datagram_size();
                }
            }
        }

        if let Some(last_packet_number) = last_packet_number {
            // Note that when sending in multiple packet spaces the last packet number will
            // be the one from the highest packet space.
            self.path_data_mut(path_id).congestion.on_sent(
                now,
                transmit.len() as u64,
                last_packet_number,
            );
        }

        self.qlog.emit_recovery_metrics(
            path_id,
            &mut self.paths.get_mut(&path_id).unwrap().data,
            now,
        );

        self.app_limited = transmit.is_empty() && !congestion_blocked;

        // Send MTU probe if necessary
        if transmit.is_empty() && self.state.is_established() {
            // MTU probing happens only in Data space.
            let space_id = SpaceId::Data;
            path_id = *self.paths.first_key_value().expect("one path must exist").0;
            let probe_data = loop {
                // We MTU probe all paths for which all of the following are true:
                // - We have an active destination CID for the path.
                // - The remote address *and* path are validated.
                // - The path is not abandoned.
                // - The MTU Discovery subsystem wants to probe the path.
                let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
                let eligible = self.path_data(path_id).validated
                    && !self.path_data(path_id).is_validating_path()
                    && !self.abandoned_paths.contains(&path_id);
                let probe_size = eligible
                    .then(|| {
                        let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
                        self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
                    })
                    .flatten();
                match (active_cid, probe_size) {
                    (Some(active_cid), Some(probe_size)) => {
                        // Let's send an MTUD probe!
                        break Some((active_cid, probe_size));
                    }
                    _ => {
                        // Find the next path to check if it needs an MTUD probe.
                        match self.paths.keys().find(|&&next| next > path_id) {
                            Some(next) => {
                                path_id = *next;
                                continue;
                            }
                            None => break None,
                        }
                    }
                }
            };
            if let Some((active_cid, probe_size)) = probe_data {
                // We are definitely sending a DPLPMTUD probe.
                debug_assert_eq!(transmit.num_datagrams(), 0);
                transmit.start_new_datagram_with_size(probe_size as usize);

                let mut qlog = QlogSentPacket::default();
                let mut builder = PacketBuilder::new(
                    now,
                    space_id,
                    path_id,
                    active_cid,
                    &mut transmit,
                    true,
                    self,
                    &mut qlog,
                )?;

                // We implement MTU probes as ping packets padded up to the probe size
                trace!(?probe_size, "writing MTUD probe");
                trace!("PING");
                builder.frame_space_mut().write(frame::FrameType::PING);
                qlog.frame(&Frame::Ping);
                self.stats.frame_tx.ping += 1;

                // If supported by the peer, request that the probe be acknowledged without delay
                if self.peer_supports_ack_frequency() {
                    trace!("IMMEDIATE_ACK");
                    builder
                        .frame_space_mut()
                        .write(frame::FrameType::IMMEDIATE_ACK);
                    self.stats.frame_tx.immediate_ack += 1;
                    qlog.frame(&Frame::ImmediateAck);
                }

                let sent_frames = SentFrames {
                    non_retransmits: true,
                    ..Default::default()
                };
                builder.finish_and_track(
                    now,
                    self,
                    path_id,
                    sent_frames,
                    PadDatagram::ToSize(probe_size),
                    qlog,
                );

                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .sent_plpmtud_probes += 1;
            }
        }

        if transmit.is_empty() {
            return None;
        }

        let destination = self.path_data(path_id).remote;
        trace!(
            segment_size = transmit.segment_size(),
            last_datagram_len = transmit.len() % transmit.segment_size(),
            ?destination,
            "sending {} bytes in {} datagrams",
            transmit.len(),
            transmit.num_datagrams()
        );
        self.path_data_mut(path_id)
            .inc_total_sent(transmit.len() as u64);

        self.stats
            .udp_tx
            .on_sent(transmit.num_datagrams() as u64, transmit.len());

        Some(Transmit {
            destination,
            size: transmit.len(),
            ecn: if self.path_data(path_id).sending_ecn {
                Some(EcnCodepoint::Ect0)
            } else {
                None
            },
            segment_size: match transmit.num_datagrams() {
                1 => None,
                _ => Some(transmit.segment_size()),
            },
            src_ip: self.local_ip,
        })
    }

    /// Returns the [`SpaceId`] of the next packet space which has data to send
    ///
    /// This takes into account the space available to frames in the next datagram.
    // TODO(flub): This duplication is not nice.
    fn next_send_space(
        &mut self,
        current_space_id: SpaceId,
        path_id: PathId,
        buf: &TransmitBuf<'_>,
        close: bool,
    ) -> Option<SpaceId> {
        // `space_can_send` derives the number of bytes available for frames from the
        // segment size passed below. For a 1-RTT packet we're guaranteed to be able to
        // send an individual frame at least this large in the next 1-RTT packet. This
        // could be generalized to support every space, but it's only needed to handle
        // large fixed-size frames, which only exist in 1-RTT (application datagrams).
        // We don't account for coalesced packets potentially occupying space because
        // frames can always spill into the next datagram.
        let mut space_id = current_space_id;
        loop {
            let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
            if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
                return Some(space_id);
            }
            space_id = match space_id {
                SpaceId::Initial => SpaceId::Handshake,
                SpaceId::Handshake => SpaceId::Data,
                SpaceId::Data => break,
            }
        }
        None
    }

    /// Checks if creating a new datagram would be blocked by congestion control
    fn path_congestion_check(
        &mut self,
        space_id: SpaceId,
        path_id: PathId,
        transmit: &TransmitBuf<'_>,
        can_send: &SendableFrames,
        now: Instant,
    ) -> PathBlocked {
        // Anti-amplification is only based on `total_sent`, which gets updated after
        // the transmit is sent. Therefore we pass the number of bytes for datagrams
        // that are already created, as well as 1 byte for starting another datagram. If
        // there is any anti-amplification budget left, we always allow a full MTU to be
        // sent (see https://github.com/quinn-rs/quinn/issues/1082).
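        // As a worked example (illustrative numbers, not from this code): with the 3x
        // amplification budget of RFC 9000 §8 and 1200 bytes received on an unvalidated
        // path, up to 3600 bytes may be sent; the `+ 1` below asks whether even one more
        // byte would overrun that budget.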
        if self.side().is_server()
            && self
                .path_data(path_id)
                .anti_amplification_blocked(transmit.len() as u64 + 1)
        {
            trace!(?space_id, %path_id, "blocked by anti-amplification");
            return PathBlocked::AntiAmplification;
        }

        // Congestion control check.
        // Tail loss probes must not be blocked by congestion, or a deadlock could arise.
        let bytes_to_send = transmit.segment_size() as u64;
        let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;

        if can_send.other && !need_loss_probe && !can_send.close {
            let path = self.path_data(path_id);
            if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
                trace!(?space_id, %path_id, "blocked by congestion control");
                return PathBlocked::Congestion;
            }
        }

        // Pacing check.
        if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::Pacing),
                delay,
                self.qlog.with_time(now),
            );
            // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though
            // they are not congestion controlled.
            trace!(?space_id, %path_id, "blocked by pacing");
            return PathBlocked::Pacing;
        }

        PathBlocked::No
    }

    /// Send PATH_CHALLENGE for a previous path if necessary
    ///
    /// QUIC-TRANSPORT section 9.3.3
    /// <https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding>
    fn send_prev_path_challenge(
        &mut self,
        now: Instant,
        buf: &mut TransmitBuf<'_>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
        // TODO (matheus23): We could use !prev_path.is_validating() here instead to
        // (possibly) also re-send challenges when they get lost.
        if !prev_path.send_new_challenge {
            return None;
        };
        prev_path.send_new_challenge = false;
        let destination = prev_path.remote;
        let token = self.rng.random();
        let info = paths::SentChallengeInfo {
            sent_instant: now,
            remote: destination,
        };
        prev_path.challenges_sent.insert(token, info);
        debug_assert_eq!(
            self.highest_space,
            SpaceId::Data,
            "PATH_CHALLENGE queued without 1-RTT keys"
        );
        buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);

        // Use the previous CID to avoid linking the new path with the previous path. We
        // don't bother accounting for possible retirement of that prev_cid because this is
        // sent once, immediately after migration, when the CID is known to be valid. Even
        // if a post-migration packet caused the CID to be retired, it's fair to pretend
        // this is sent first.
        debug_assert_eq!(buf.datagram_start_offset(), 0);
        let mut qlog = QlogSentPacket::default();
        let mut builder = PacketBuilder::new(
            now,
            SpaceId::Data,
            path_id,
            *prev_cid,
            buf,
            false,
            self,
            &mut qlog,
        )?;
        let challenge = frame::PathChallenge(token);
        trace!(%challenge, "validating previous path");
        qlog.frame(&Frame::PathChallenge(challenge));
        builder.frame_space_mut().write(challenge);
        self.stats.frame_tx.path_challenge += 1;

        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
        // to at least the smallest allowed maximum datagram size of 1200 bytes,
        // unless the anti-amplification limit for the path does not permit
        // sending a datagram of this size
        builder.pad_to(MIN_INITIAL_SIZE);

        builder.finish(self, now, qlog);
        self.stats.udp_tx.on_sent(1, buf.len());

        Some(Transmit {
            destination,
            size: buf.len(),
            ecn: None,
            segment_size: None,
            src_ip: self.local_ip,
        })
    }

    /// Indicate what types of frames are ready to send for the given space
    ///
    /// *packet_size* is the number of bytes available to build the next packet. *close*
    /// indicates whether a CONNECTION_CLOSE frame needs to be sent.
    fn space_can_send(
        &mut self,
        space_id: SpaceId,
        path_id: PathId,
        packet_size: usize,
        close: bool,
    ) -> SendableFrames {
        let pn = self.spaces[SpaceId::Data]
            .for_path(path_id)
            .peek_tx_number();
        let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
        if self.spaces[space_id].crypto.is_none()
            && (space_id != SpaceId::Data
                || self.zero_rtt_crypto.is_none()
                || self.side.is_server())
        {
            // No keys available for this space
            return SendableFrames::empty();
        }
        let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
        if space_id == SpaceId::Data {
            can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
        }

        can_send.close = close && self.spaces[space_id].crypto.is_some();

        can_send
    }

    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
    ///
    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
    /// extracted through the relevant methods.
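    ///
    /// # Example
    ///
    /// A minimal sketch of the driving loop, following upstream quinn-proto's shape;
    /// `endpoint`, `connections`, `handle`, and the surrounding I/O are hypothetical
    /// stand-ins supplied by the caller:
    ///
    /// ```ignore
    /// // Events that `Endpoint::handle` produces for an existing connection are fed
    /// // straight into that connection's `handle_event`.
    /// if let Some(DatagramEvent::ConnectionEvent(handle, event)) =
    ///     endpoint.handle(now, remote, None, None, datagram, &mut response_buf)
    /// {
    ///     connections.get_mut(&handle).unwrap().handle_event(event);
    /// }
    /// ```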
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            Datagram(DatagramConnectionEvent {
                now,
                remote,
                path_id,
                ecn,
                first_decode,
                remaining,
            }) => {
                let span = trace_span!("pkt", %path_id);
                let _guard = span.enter();
                // If this packet could initiate a migration and we're a client or a server that
                // forbids migration, drop the datagram. This could be relaxed to heuristically
                // permit NAT-rebinding-like migration.
                if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
                    if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
                        trace!(
                            %path_id,
                            ?remote,
                            path_remote = ?self.path(path_id).map(|p| p.remote),
                            "discarding packet from unrecognized peer"
                        );
                        return;
                    }
                }

                let was_anti_amplification_blocked = self
                    .path(path_id)
                    .map(|path| path.anti_amplification_blocked(1))
                    .unwrap_or(true); // if we don't know about this path, conservatively treat it as unvalidated
                // TODO(@divma): revisit this

                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                let data_len = first_decode.len();

                self.handle_decode(now, remote, path_id, ecn, first_decode);
                // The current `path` might have changed inside `handle_decode` since the packet
                // could have triggered a migration. The packet might also belong to an unknown
                // path and have been rejected. Make sure the received data is accounted to the
                // most recent path by accessing `path` after `handle_decode`.
                if let Some(path) = self.path_mut(path_id) {
                    path.inc_total_recvd(data_len as u64);
                }

                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, remote, path_id, ecn, data);
                }

                if let Some(path) = self.paths.get_mut(&path_id) {
                    self.qlog
                        .emit_recovery_metrics(path_id, &mut path.data, now);
                }

                if was_anti_amplification_blocked {
                    // A prior attempt to set the loss detection timer may have failed due to
                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
                    // the server's first flight is lost.
                    self.set_loss_detection_timer(now, path_id);
                }
            }
            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
                let cid_state = self
                    .local_cid_state
                    .entry(path_id)
                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
                cid_state.new_cids(&ids, now);

                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                // Always update Timer::PushNewCid
                self.reset_cid_retirement(now);
            }
        }
    }

    /// Process timer expirations
    ///
    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
    /// methods.
    ///
    /// It is most efficient to call this immediately after the system clock reaches the latest
    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
    /// no-op and therefore are safe.
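    ///
    /// # Example
    ///
    /// A sketch of the intended timer wiring; `timer` stands in for whatever async or
    /// event-loop timer the caller uses and is not part of this crate:
    ///
    /// ```ignore
    /// // Re-arm the timer whenever the connection's deadline changes ...
    /// if let Some(deadline) = connection.poll_timeout() {
    ///     timer.reset_at(deadline);
    /// }
    /// // ... and notify the connection once it fires.
    /// connection.handle_timeout(Instant::now());
    /// ```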
    pub fn handle_timeout(&mut self, now: Instant) {
        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
            // TODO(@divma): remove `at` when the unicorn is born
            trace!(?timer, at=?now, "timeout");
            match timer {
                Timer::Conn(timer) => match timer {
                    ConnTimer::Close => {
                        self.state.move_to_drained(None);
                        self.endpoint_events.push_back(EndpointEventInner::Drained);
                    }
                    ConnTimer::Idle => {
                        self.kill(ConnectionError::TimedOut);
                    }
                    ConnTimer::KeepAlive => {
                        trace!("sending keep-alive");
                        self.ping();
                    }
                    ConnTimer::KeyDiscard => {
                        self.zero_rtt_crypto = None;
                        self.prev_crypto = None;
                    }
                    ConnTimer::PushNewCid => {
                        while let Some((path_id, when)) = self.next_cid_retirement() {
                            if when > now {
                                break;
                            }
                            match self.local_cid_state.get_mut(&path_id) {
                                None => error!(%path_id, "No local CID state for path"),
                                Some(cid_state) => {
                                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
                                    let num_new_cid = cid_state.on_cid_timeout().into();
                                    if !self.state.is_closed() {
                                        trace!(
                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                            cid_state.retire_prior_to()
                                        );
                                        self.endpoint_events.push_back(
                                            EndpointEventInner::NeedIdentifiers(
                                                path_id,
                                                now,
                                                num_new_cid,
                                            ),
                                        );
                                    }
                                }
                            }
                        }
                    }
                },
                // TODO: add path_id as span somehow
                Timer::PerPath(path_id, timer) => {
                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
                    let _guard = span.enter();
                    match timer {
                        PathTimer::PathIdle => {
                            self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
                                .ok();
                        }

                        PathTimer::PathKeepAlive => {
                            trace!("sending keep-alive on path");
                            self.ping_path(path_id).ok();
                        }
                        PathTimer::LossDetection => {
                            self.on_loss_detection_timeout(now, path_id);
                            self.qlog.emit_recovery_metrics(
                                path_id,
                                &mut self.paths.get_mut(&path_id).unwrap().data,
                                now,
                            );
                        }
                        PathTimer::PathValidation => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("path validation failed");
                            if let Some((_, prev)) = path.prev.take() {
                                path.data = prev;
                            }
                            path.data.challenges_sent.clear();
                            path.data.send_new_challenge = false;
                        }
                        PathTimer::PathChallengeLost => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            trace!("path challenge deemed lost");
                            path.data.send_new_challenge = true;
                        }
                        PathTimer::PathOpen => {
                            let Some(path) = self.path_mut(path_id) else {
                                continue;
                            };
                            path.challenges_sent.clear();
                            path.send_new_challenge = false;
                            debug!("new path validation failed");
                            if let Err(err) = self.close_path(
                                now,
                                path_id,
                                TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
                            ) {
                                warn!(?err, "failed closing path");
                            }

                            self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                                id: path_id,
                                error: PathError::ValidationFailed,
                            }));
                        }
                        PathTimer::Pacing => trace!("pacing timer expired"),
                        PathTimer::MaxAckDelay => {
                            trace!("max ack delay reached");
                            // This timer is only armed in the Data space
                            self.spaces[SpaceId::Data]
                                .for_path(path_id)
                                .pending_acks
                                .on_max_ack_delay_timeout()
                        }
                        PathTimer::DiscardPath => {
                            // The path was abandoned and 3*PTO has expired since.  Clean up all
                            // remaining state and install stateless reset token.
                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                            if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
                                let (min_seq, max_seq) = loc_cid_state.active_seq();
                                for seq in min_seq..=max_seq {
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::RetireConnectionId(
                                            now, path_id, seq, false,
                                        ),
                                    );
                                }
                            }
                            self.discard_path(path_id, now);
                        }
                    }
                }
            }
        }
    }

    /// Close a connection immediately
    ///
    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
    /// call this only when all important communications have been completed, e.g. by calling
    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
    /// [`StreamEvent::Finished`] event.
    ///
    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
    /// delivered. There may still be data from the peer that has not been received.
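    ///
    /// # Example
    ///
    /// A sketch of an orderly application-level close; the error code and reason are
    /// application-defined and purely illustrative:
    ///
    /// ```ignore
    /// connection.close(Instant::now(), VarInt::from_u32(0), Bytes::from_static(b"done"));
    /// ```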
    ///
    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
        self.close_inner(
            now,
            Close::Application(frame::ApplicationClose { error_code, reason }),
        )
    }

    fn close_inner(&mut self, now: Instant, reason: Close) {
        let was_closed = self.state.is_closed();
        if !was_closed {
            self.close_common();
            self.set_close_timer(now);
            self.close = true;
            self.state.move_to_closed_local(reason);
        }
    }

    /// Control datagrams
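    ///
    /// A sketch of sending an unreliable datagram through the returned handle, assuming
    /// the upstream `Datagrams::send(data, drop)` API:
    ///
    /// ```ignore
    /// connection.datagrams().send(Bytes::from_static(b"hello"), true)?;
    /// ```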
    pub fn datagrams(&mut self) -> Datagrams<'_> {
        Datagrams { conn: self }
    }

    /// Returns connection statistics
    pub fn stats(&mut self) -> ConnectionStats {
        self.stats.clone()
    }

    /// Returns path statistics
    pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
        let path = self.paths.get(&path_id)?;
        let stats = self.path_stats.entry(path_id).or_default();
        stats.rtt = path.data.rtt.get();
        stats.cwnd = path.data.congestion.window();
        stats.current_mtu = path.data.mtud.current_mtu();
        Some(*stats)
    }

    /// Ping the remote endpoint
    ///
    /// Causes an ACK-eliciting packet to be transmitted on the connection.
    pub fn ping(&mut self) {
        // TODO(flub): This is very brute-force: it pings *all* the paths.  Instead it would
        //    be nice if we could only send a single packet for this.
        for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
            path_data.ping_pending = true;
        }
    }

    /// Ping the remote endpoint over a specific path
    ///
    /// Causes an ACK-eliciting packet to be transmitted on the path.
    pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
        let path_data = self.spaces[self.highest_space]
            .number_spaces
            .get_mut(&path)
            .ok_or(ClosedPath { _private: () })?;
        path_data.ping_pending = true;
        Ok(())
    }

    /// Update traffic keys spontaneously
    ///
    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
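    ///
    /// ```ignore
    /// // Sketch: force a key update from a test, then keep exercising the connection
    /// // to confirm traffic still flows under the new keys.
    /// connection.force_key_update();
    /// ```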
    pub fn force_key_update(&mut self) {
        if !self.state.is_established() {
            debug!("ignoring forced key update in illegal state");
            return;
        }
        if self.prev_crypto.is_some() {
            // We already just updated, or are currently updating, the keys. Concurrent key updates
            // are illegal.
            debug!("ignoring redundant forced key update");
            return;
        }
        self.update_keys(None, false);
    }

    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
    #[doc(hidden)]
    #[deprecated]
    pub fn initiate_key_update(&mut self) {
        self.force_key_update();
    }

    /// Get a session reference
    pub fn crypto_session(&self) -> &dyn crypto::Session {
        &*self.crypto
    }

    /// Whether the connection is in the process of being established
    ///
    /// If this returns `false`, the connection may be either established or closed, signaled by the
    /// emission of a `Connected` or `ConnectionLost` message respectively.
    pub fn is_handshaking(&self) -> bool {
        self.state.is_handshake()
    }

    /// Whether the connection is closed
    ///
    /// Closed connections cannot transport any further data. A connection becomes closed when
    /// either peer application intentionally closes it, or when either transport layer detects an
    /// error such as a time-out or certificate validation failure.
    ///
    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
    pub fn is_closed(&self) -> bool {
        self.state.is_closed()
    }

    /// Whether there is no longer any need to keep the connection around
    ///
    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
    /// packets from the peer. All drained connections have been closed.
    pub fn is_drained(&self) -> bool {
        self.state.is_drained()
    }

    /// For clients, if the peer accepted the 0-RTT data packets
    ///
    /// The value is meaningless until after the handshake completes.
    pub fn accepted_0rtt(&self) -> bool {
        self.accepted_0rtt
    }

    /// Whether 0-RTT is/was possible during the handshake
    pub fn has_0rtt(&self) -> bool {
        self.zero_rtt_enabled
    }

    /// Whether there are any pending retransmits
    pub fn has_pending_retransmits(&self) -> bool {
        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
    }

    /// Look up whether we're the client or server of this Connection
    pub fn side(&self) -> Side {
        self.side.side()
    }

    /// Get the address observed by the remote over the given path
    pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
        self.path(path_id)
            .map(|path_data| {
                path_data
                    .last_observed_addr_report
                    .as_ref()
                    .map(|observed| observed.socket_addr())
            })
            .ok_or(ClosedPath { _private: () })
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when no `local_ip` was passed to
    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
    /// connection.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.local_ip
    }

    /// Current best estimate of this connection's latency (round-trip-time)
    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
        self.path(path_id).map(|d| d.rtt.get())
    }

    /// Current state of this connection's congestion controller, for debugging purposes
    pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
        self.path(path_id).map(|d| d.congestion.as_ref())
    }

    /// Modify the number of remotely initiated streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
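    ///
    /// ```ignore
    /// // Sketch: allow the peer to keep up to 256 bidirectional streams open at once.
    /// connection.set_max_concurrent_streams(Dir::Bi, VarInt::from_u32(256));
    /// ```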
    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
        self.streams.set_max_concurrent(dir, count);
        // If the limit was reduced, then a flow control update previously deemed insignificant may
        // now be significant.
        let pending = &mut self.spaces[SpaceId::Data].pending;
        self.streams.queue_max_stream_id(pending);
    }

    /// Modify the number of open paths allowed when multipath is enabled
    ///
    /// When reducing the number of concurrent paths, this only delays sending new
    /// MAX_PATH_ID frames until fewer than this number of paths are possible.  To
    /// actively reduce paths they must be closed using [`Connection::close_path`], which
    /// can also be used to close not-yet-opened paths.
    ///
    /// If multipath is not negotiated (see the [`TransportConfig`]) this cannot enable
    /// multipath and will fail.
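    ///
    /// ```ignore
    /// // Sketch: raise the limit to four concurrent paths once multipath is negotiated.
    /// if connection.is_multipath_negotiated() {
    ///     connection.set_max_concurrent_paths(Instant::now(), NonZeroU32::new(4).unwrap())?;
    /// }
    /// ```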
    pub fn set_max_concurrent_paths(
        &mut self,
        now: Instant,
        count: NonZeroU32,
    ) -> Result<(), MultipathNotNegotiated> {
        if !self.is_multipath_negotiated() {
            return Err(MultipathNotNegotiated { _private: () });
        }
        self.max_concurrent_paths = count;

        let in_use_count = self
            .local_max_path_id
            .next()
            .saturating_sub(self.abandoned_paths.len() as u32)
            .as_u32();
        let extra_needed = count.get().saturating_sub(in_use_count);
        let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);

        self.set_max_path_id(now, new_max_path_id);

        Ok(())
    }

    /// If needed, issues a new MAX_PATH_ID frame and new CIDs for any newly allowed paths
    fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
        if max_path_id <= self.local_max_path_id {
            return;
        }

        self.local_max_path_id = max_path_id;
        self.spaces[SpaceId::Data].pending.max_path_id = true;

        self.issue_first_path_cids(now);
    }

    /// Current number of remotely initiated streams that may be concurrently open
    ///
    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
    /// it will not change immediately, even if fewer streams are open. Instead, it will
    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
        self.streams.max_concurrent(dir)
    }

    /// See [`TransportConfig::send_window()`]
    pub fn set_send_window(&mut self, send_window: u64) {
        self.streams.set_send_window(send_window);
    }

    /// See [`TransportConfig::receive_window()`]
    pub fn set_receive_window(&mut self, receive_window: VarInt) {
        if self.streams.set_receive_window(receive_window) {
            self.spaces[SpaceId::Data].pending.max_data = true;
        }
    }

    /// Whether the Multipath for QUIC extension is enabled.
    ///
    /// Multipath is only enabled after the handshake is completed and if it was enabled by both
    /// peers.
    pub fn is_multipath_negotiated(&self) -> bool {
        !self.is_handshaking()
            && self.config.max_concurrent_multipath_paths.is_some()
            && self.peer_params.initial_max_path_id.is_some()
    }

    fn on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        // Plain ACK frames (as opposed to PATH_ACK) always reference path 0
        let path = PathId::ZERO;
        self.inner_on_ack_received(now, space, path, ack)
    }

    fn on_path_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path_ack: frame::PathAck,
    ) -> Result<(), TransportError> {
        let (ack, path) = path_ack.into_ack();
        self.inner_on_ack_received(now, space, path, ack)
    }

    /// Handles an ACK frame acknowledging packets sent on *path*.
    fn inner_on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        if self.abandoned_paths.contains(&path) {
            // See also https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.4.3-3
            // > PATH_ACK frames received with an abandoned path ID are silently ignored, as specified in Section 4.
            trace!("silently ignoring PATH_ACK on abandoned path");
            return Ok(());
        }
        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        let new_largest = {
            let space = &mut self.spaces[space].for_path(path);
            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
                space.largest_acked_packet = Some(ack.largest);
                if let Some(info) = space.sent_packets.get(ack.largest) {
                    // This should always succeed, but a misbehaving peer might ACK a packet we
                    // haven't sent. At worst, that will result in us spuriously reducing the
                    // congestion window.
                    space.largest_acked_packet_sent = info.time_sent;
                }
                true
            } else {
                false
            }
        };

        if self.detect_spurious_loss(&ack, space, path) {
            self.path_data_mut(path)
                .congestion
                .on_spurious_congestion_event();
        }

        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
        let mut newly_acked = ArrayRangeSet::new();
        for range in ack.iter() {
            self.spaces[space].for_path(path).check_ack(range.clone())?;
            for (pn, _) in self.spaces[space]
                .for_path(path)
                .sent_packets
                .iter_range(range)
            {
                newly_acked.insert_one(pn);
            }
        }

        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                    // Assume ACKs for all packets below the largest acknowledged in
                    // `packet` have been received. This can cause the peer to spuriously
                    // retransmit if some of our earlier ACKs were lost, but allows for
                    // simpler state tracking. See discussion at
                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                        pns.pending_acks.subtract_below(*acked_pn);
                    }
                }
                ack_eliciting_acked |= info.ack_eliciting;

                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
                let path_data = self.path_data_mut(path);
                let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
                if mtu_updated {
                    path_data
                        .congestion
                        .on_mtu_update(path_data.mtud.current_mtu());
                }

                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
                self.ack_frequency.on_acked(path, packet);

                self.on_packet_acked(now, path, info);
            }
        }

        let largest_acked = self.spaces[space].for_path(path).largest_acked_packet;
        let app_limited = self.app_limited;
        let path_data = self.path_data_mut(path);
        let in_flight = path_data.in_flight.bytes;

        path_data
            .congestion
            .on_end_acks(now, in_flight, app_limited, largest_acked);

        if new_largest && ack_eliciting_acked {
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = now.saturating_duration_since(
                self.spaces[space].for_path(path).largest_acked_packet_sent,
            );

            let next_pn = self.spaces[space].for_path(path).next_packet_number;
            let path_data = self.path_data_mut(path);
            // TODO(@divma): should be a method of path, should be contained in a single place
            path_data.rtt.update(ack_delay, rtt);
            if path_data.first_packet_after_rtt_sample.is_none() {
                path_data.first_packet_after_rtt_sample = Some((space, next_pn));
            }
        }

        // Must be called before crypto/pto_count are clobbered
        self.detect_lost_packets(now, space, path, true);

        if self.peer_completed_address_validation(path) {
            self.path_data_mut(path).pto_count = 0;
        }

        // Explicit congestion notification
        // TODO(@divma): this code is a good example of logic that should be contained in a single
        // place but it's split between the path data and the packet number space data, we should
        // find a way to make this work without two lookups
        if self.path_data(path).sending_ecn {
            if let Some(ecn) = ack.ecn {
                // We only examine ECN counters from ACKs that we are certain we received in transmit
                // order, allowing us to compute an increase in ECN counts to compare against the number
                // of newly acked packets that remains well-defined in the presence of arbitrary packet
                // reordering.
                if new_largest {
                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
                }
            } else {
                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
                debug!("ECN not acknowledged by peer");
                self.path_data_mut(path).sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now, path);
        Ok(())
    }

    fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;

        if lost_packets.is_empty() {
            return false;
        }

        for range in ack.iter() {
            let spurious_losses: Vec<u64> = lost_packets
                .iter_range(range.clone())
                .map(|(pn, _info)| pn)
                .collect();

            for pn in spurious_losses {
                lost_packets.remove(pn);
            }
        }

        // If this ACK frame acknowledged all packets we had deemed lost, then we raised a
        // spurious congestion event in the past. While packets remain we cannot conclude
        // anything yet, but future ACK frames might still reveal a spurious loss detection.
        lost_packets.is_empty()
    }

    /// Drain lost packets that we reasonably think will never arrive
    ///
    /// The current criterion is copied from `msquic`:
    /// discard packets that were sent earlier than 2 probe timeouts ago.
    fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
        let two_pto = 2 * self.path_data(path).rtt.pto_base();

        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
        lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
    }

    /// Process a new ECN block from an in-order ACK
    fn process_ecn(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        newly_acked: u64,
        ecn: frame::EcnCounts,
        largest_sent_time: Instant,
    ) {
        match self.spaces[space]
            .for_path(path)
            .detect_ecn(newly_acked, ecn)
        {
            Err(e) => {
                debug!("halting ECN due to verification failure: {}", e);

                self.path_data_mut(path).sending_ecn = false;
                // Wipe out the existing value because it might be garbage and could interfere with
                // future attempts to use ECN on new paths.
                self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
            }
            Ok(false) => {}
            Ok(true) => {
                self.path_stats.entry(path).or_default().congestion_events += 1;
                self.path_data_mut(path).congestion.on_congestion_event(
                    now,
                    largest_sent_time,
                    false,
                    true,
                    0,
                );
            }
        }
    }

    // Not timing-aware, so it's safe to call this for inferred acks, such as those that
    // arise from high-latency handshakes
    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
        self.paths
            .get_mut(&path_id)
            .expect("known path")
            .remove_in_flight(&info);
        let app_limited = self.app_limited;
        let path = self.path_data_mut(path_id);
        if info.ack_eliciting && !path.is_validating_path() {
            // Only pass ACKs to the congestion controller if we are not validating the current
            // path, so as to ignore any ACKs from older paths still coming in.
            let rtt = path.rtt;
            path.congestion
                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
        }

        // Update state for confirmed delivery of frames
        if let Some(retransmits) = info.retransmits.get() {
            for (id, _) in retransmits.reset_stream.iter() {
                self.streams.reset_acked(*id);
            }
        }

        for frame in info.stream_frames {
            self.streams.received_ack_of(frame);
        }
    }
2570
2571    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2572        let start = if self.zero_rtt_crypto.is_some() {
2573            now
2574        } else {
2575            self.prev_crypto
2576                .as_ref()
2577                .expect("no previous keys")
2578                .end_packet
2579                .as_ref()
2580                .expect("update not acknowledged yet")
2581                .1
2582        };
2583
2584        // QUIC-MULTIPATH § 2.5 Key Phase Update Process: use largest PTO of all paths.
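        // Retaining the old keys for three PTOs gives reordered packets that are still
        // protected with them a chance to arrive before the keys are discarded.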
2585        self.timers.set(
2586            Timer::Conn(ConnTimer::KeyDiscard),
2587            start + self.pto_max_path(space, false) * 3,
2588            self.qlog.with_time(now),
2589        );
2590    }
2591
2592    /// Handle a [`PathTimer::LossDetection`] timeout.
2593    ///
2594    /// This timer expires for two reasons:
2595    /// - An ACK-eliciting packet we sent should be considered lost.
2596    /// - The PTO may have expired and a tail-loss probe needs to be scheduled.
2597    ///
2598    /// The former needs us to schedule re-transmission of the lost data.
2599    ///
2600    /// The latter means we have not received an ACK for an ack-eliciting packet we sent
2601    /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
2602    /// packet, to try to elicit new acknowledgements. These new acknowledgements will
2603    /// indicate whether the previously sent packets were lost or not.
2604    fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2605        if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2606            // Time threshold loss detection
2607            self.detect_lost_packets(now, pn_space, path_id, false);
2608            self.set_loss_detection_timer(now, path_id);
2609            return;
2610        }
2611
2612        let (_, space) = match self.pto_time_and_space(now, path_id) {
2613            Some(x) => x,
2614            None => {
2615                error!(%path_id, "PTO expired while unset");
2616                return;
2617            }
2618        };
2619        trace!(
2620            in_flight = self.path_data(path_id).in_flight.bytes,
2621            count = self.path_data(path_id).pto_count,
2622            ?space,
2623            %path_id,
2624            "PTO fired"
2625        );
2626
2627        let count = match self.path_data(path_id).in_flight.ack_eliciting {
2628            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
2629            // deadlock prevention
2630            0 => {
2631                debug_assert!(!self.peer_completed_address_validation(path_id));
2632                1
2633            }
2634            // Conventional loss probe
2635            _ => 2,
2636        };
2637        let pns = self.spaces[space].for_path(path_id);
2638        pns.loss_probes = pns.loss_probes.saturating_add(count);
2639        let path_data = self.path_data_mut(path_id);
2640        path_data.pto_count = path_data.pto_count.saturating_add(1);
2641        self.set_loss_detection_timer(now, path_id);
2642    }
2643
2644    /// Detect any lost packets
2645    ///
2646    /// There are two cases in which we detect lost packets:
2647    ///
2648    /// - We received an ACK packet.
2649    /// - The [`PathTimer::LossDetection`] timer expired. There is an un-acknowledged packet
2650    ///   that was followed by an acknowledged packet, and the loss timer for the
2651    ///   un-acknowledged packet has expired, so we must declare it lost.
2652    ///
2653    /// Packets are lost if they are both (See RFC9002 §6.1):
2654    ///
2655    /// - Unacknowledged, in flight and sent prior to an acknowledged packet.
2656    /// - Old enough by either:
2657    ///   - Having a packet number [`TransportConfig::packet_threshold`] lower than the last
2658    ///     acknowledged packet.
2659    ///   - Being sent [`TransportConfig::time_threshold`] * RTT in the past.
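    ///
    /// For example (illustrative numbers): with a packet threshold of 3 and an ACK for
    /// packet 10, unacknowledged packets 7 and below are old enough by packet number
    /// alone; with a time threshold of 9/8 and an RTT of 100ms, any unacknowledged packet
    /// sent more than ~112ms ago is old enough by time.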
2660    fn detect_lost_packets(
2661        &mut self,
2662        now: Instant,
2663        pn_space: SpaceId,
2664        path_id: PathId,
2665        due_to_ack: bool,
2666    ) {
2667        let mut lost_packets = Vec::<u64>::new();
2668        let mut lost_mtu_probe = None;
2669        let mut in_persistent_congestion = false;
2670        let mut size_of_lost_packets = 0u64;
2671        self.spaces[pn_space].for_path(path_id).loss_time = None;
2672
2673        // Find all the lost packets, populating all variables initialised above.
2674
2675        let path = self.path_data(path_id);
2676        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2677        let loss_delay = path
2678            .rtt
2679            .conservative()
2680            .mul_f32(self.config.time_threshold)
2681            .max(TIMER_GRANULARITY);
2682        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
2683
2684        let largest_acked_packet = self.spaces[pn_space]
2685            .for_path(path_id)
2686            .largest_acked_packet
2687            .expect("detect_lost_packets only to be called if path received at least one ACK");
2688        let packet_threshold = self.config.packet_threshold as u64;
2689
2690        // InPersistentCongestion: Determine if all packets in the time period before the newest
2691        // lost packet, including the edges, are marked lost. PTO computation must always
2692        // include max ACK delay, i.e. operate as if in Data space (see RFC9002 §7.6.1).
2693        let congestion_period = self
2694            .pto(SpaceId::Data, path_id)
2695            .saturating_mul(self.config.persistent_congestion_threshold);
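        // E.g. (illustrative numbers): a persistent congestion threshold of 3 and a
        // Data-space PTO of 200ms give a 600ms window; two lost ack-eliciting packets sent
        // further apart than that, with no acknowledged packet in between, establish
        // persistent congestion.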
2696        let mut persistent_congestion_start: Option<Instant> = None;
2697        let mut prev_packet = None;
2698        let space = self.spaces[pn_space].for_path(path_id);
2699
2700        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
2701            if prev_packet != Some(packet.wrapping_sub(1)) {
2702                // An intervening packet was acknowledged
2703                persistent_congestion_start = None;
2704            }
2705
2706            // Packets sent before now - loss_delay are deemed lost.
2707            // However, we avoid subtraction as it can panic and there's no
2708            // saturating equivalent of this subtraction operation with a Duration.
2709            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
2710            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
2711                // The packet should be declared lost.
2712                if Some(packet) == in_flight_mtu_probe {
2713                    // Lost MTU probes are not included in `lost_packets`, because they
2714                    // should not trigger a congestion control response
2715                    lost_mtu_probe = in_flight_mtu_probe;
2716                } else {
2717                    lost_packets.push(packet);
2718                    size_of_lost_packets += info.size as u64;
2719                    if info.ack_eliciting && due_to_ack {
2720                        match persistent_congestion_start {
2721                            // Two ACK-eliciting packets lost more than
2722                            // congestion_period apart, with no ACKed packets in between
2723                            Some(start) if info.time_sent - start > congestion_period => {
2724                                in_persistent_congestion = true;
2725                            }
2726                            // Persistent congestion must start after the first RTT sample
2727                            None if first_packet_after_rtt_sample
2728                                .is_some_and(|x| x < (pn_space, packet)) =>
2729                            {
2730                                persistent_congestion_start = Some(info.time_sent);
2731                            }
2732                            _ => {}
2733                        }
2734                    }
2735                }
2736            } else {
2737                // The packet should not yet be declared lost.
2738                if space.loss_time.is_none() {
2739                    // Since we iterate in order, the lowest packet number's loss time will
2740                    // always be the earliest.
2741                    space.loss_time = Some(info.time_sent + loss_delay);
2742                }
2743                persistent_congestion_start = None;
2744            }
2745
2746            prev_packet = Some(packet);
2747        }
2748
2749        self.handle_lost_packets(
2750            pn_space,
2751            path_id,
2752            now,
2753            lost_packets,
2754            lost_mtu_probe,
2755            loss_delay,
2756            in_persistent_congestion,
2757            size_of_lost_packets,
2758        );
2759    }
2760
2761    /// Drops the path state, declaring any remaining in-flight packets as lost
2762    fn discard_path(&mut self, path_id: PathId, now: Instant) {
2763        trace!(%path_id, "dropping path state");
2764        let path = self.path_data(path_id);
2765        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2766
2767        let mut size_of_lost_packets = 0u64; // accumulated into path_stats.lost_bytes below
2768        let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2769            .for_path(path_id)
2770            .sent_packets
2771            .iter()
2772            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2773            .map(|(pn, info)| {
2774                size_of_lost_packets += info.size as u64;
2775                pn
2776            })
2777            .collect();
2778
2779        if !lost_pns.is_empty() {
2780            trace!(
2781                %path_id,
2782                count = lost_pns.len(),
2783                lost_bytes = size_of_lost_packets,
2784                "packets lost on path abandon"
2785            );
2786            self.handle_lost_packets(
2787                SpaceId::Data,
2788                path_id,
2789                now,
2790                lost_pns,
2791                in_flight_mtu_probe,
2792                Duration::ZERO,
2793                false,
2794                size_of_lost_packets,
2795            );
2796        }
2797        self.paths.remove(&path_id);
2798        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2799
2800        let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2801        self.events.push_back(
2802            PathEvent::Abandoned {
2803                id: path_id,
2804                path_stats,
2805            }
2806            .into(),
2807        );
2808    }
2809
2810    fn handle_lost_packets(
2811        &mut self,
2812        pn_space: SpaceId,
2813        path_id: PathId,
2814        now: Instant,
2815        lost_packets: Vec<u64>,
2816        lost_mtu_probe: Option<u64>,
2817        loss_delay: Duration,
2818        in_persistent_congestion: bool,
2819        size_of_lost_packets: u64,
2820    ) {
2821        debug_assert!(
2822            {
2823                let mut sorted = lost_packets.clone();
2824                sorted.sort();
2825                sorted == lost_packets
2826            },
2827            "lost_packets must be sorted"
2828        );
2829
2830        self.drain_lost_packets(now, pn_space, path_id);
2831
2832        // OnPacketsLost
2833        if let Some(largest_lost) = lost_packets.last().cloned() {
2834            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
2835            let largest_lost_sent = self.spaces[pn_space]
2836                .for_path(path_id)
2837                .sent_packets
2838                .get(largest_lost)
2839                .unwrap()
2840                .time_sent;
2841            let path_stats = self.path_stats.entry(path_id).or_default();
2842            path_stats.lost_packets += lost_packets.len() as u64;
2843            path_stats.lost_bytes += size_of_lost_packets;
2844            trace!(
2845                %path_id,
2846                count = lost_packets.len(),
2847                lost_bytes = size_of_lost_packets,
2848                "packets lost",
2849            );
2850
2851            for &packet in &lost_packets {
2852                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
2853                    continue;
2854                };
2855                self.qlog
2856                    .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
2857                self.paths
2858                    .get_mut(&path_id)
2859                    .unwrap()
2860                    .remove_in_flight(&info);
2861
2862                for frame in info.stream_frames {
2863                    self.streams.retransmit(frame);
2864                }
2865                self.spaces[pn_space].pending |= info.retransmits;
2866                self.path_data_mut(path_id)
2867                    .mtud
2868                    .on_non_probe_lost(packet, info.size);
2869
2870                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
2871                    packet,
2872                    LostPacket {
2873                        time_sent: info.time_sent,
2874                    },
2875                );
2876            }
2877
2878            let path = self.path_data_mut(path_id);
2879            if path.mtud.black_hole_detected(now) {
2880                path.congestion.on_mtu_update(path.mtud.current_mtu());
2881                if let Some(max_datagram_size) = self.datagrams().max_size() {
2882                    self.datagrams.drop_oversized(max_datagram_size);
2883                }
2884                self.path_stats
2885                    .entry(path_id)
2886                    .or_default()
2887                    .black_holes_detected += 1;
2888            }
2889
2890            // Don't apply congestion penalty for lost ack-only packets
2891            let lost_ack_eliciting =
2892                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
2893
2894            if lost_ack_eliciting {
2895                self.path_stats
2896                    .entry(path_id)
2897                    .or_default()
2898                    .congestion_events += 1;
2899                self.path_data_mut(path_id).congestion.on_congestion_event(
2900                    now,
2901                    largest_lost_sent,
2902                    in_persistent_congestion,
2903                    false,
2904                    size_of_lost_packets,
2905                );
2906            }
2907        }
2908
2909        // Handle a lost MTU probe
2910        if let Some(packet) = lost_mtu_probe {
2911            let info = self.spaces[SpaceId::Data]
2912                .for_path(path_id)
2913                .take(packet)
2914                .unwrap(); // safe: lost_mtu_probe is omitted from lost_packets,
2915                           // and therefore must not have been removed yet
2916            self.paths
2917                .get_mut(&path_id)
2918                .unwrap()
2919                .remove_in_flight(&info);
2920            self.path_data_mut(path_id).mtud.on_probe_lost();
2921            self.path_stats
2922                .entry(path_id)
2923                .or_default()
2924                .lost_plpmtud_probes += 1;
2925        }
2926    }
2927
2928    /// Returns the earliest time packets should be declared lost for all spaces on a path.
2929    ///
2930    /// If a path has an acknowledged packet with any prior un-acknowledged packets, the
2931    /// earliest un-acknowledged packet can be declared lost after a timeout has elapsed.
2932    /// The time returned is when this packet should be declared lost.
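    ///
    /// Returns `None` when no space on the path currently has such a packet outstanding.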
2933    fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2934        SpaceId::iter()
2935            .filter_map(|id| {
2936                self.spaces[id]
2937                    .number_spaces
2938                    .get(&path_id)
2939                    .and_then(|pns| pns.loss_time)
2940                    .map(|time| (time, id))
2941            })
2942            .min_by_key(|&(time, _)| time)
2943    }
2944
2945    /// Returns the earliest time the next PTO should fire across all spaces on a path.
2946    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2947        let path = self.path(path_id)?;
2948        let pto_count = path.pto_count;
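        // Exponential PTO backoff: the timeout doubles with every consecutive PTO, capped at
        // a factor of 2^MAX_BACKOFF_EXPONENT. E.g. (illustrative) with a PTO base of 300ms
        // the sequence is 300ms, 600ms, 1.2s, ...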
2949        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2950        let mut duration = path.rtt.pto_base() * backoff;
2951
2952        if path_id == PathId::ZERO
2953            && path.in_flight.ack_eliciting == 0
2954            && !self.peer_completed_address_validation(PathId::ZERO)
2955        {
2956            // Address Validation during Connection Establishment:
2957            // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
2958            // deadlock if an Initial or Handshake packet from the server is lost and the
2959            // server can not send more due to its anti-amplification limit the client must
2960            // send another packet on PTO.
2961            let space = match self.highest_space {
2962                SpaceId::Handshake => SpaceId::Handshake,
2963                _ => SpaceId::Initial,
2964            };
2965
2966            return Some((now + duration, space));
2967        }
2968
2969        let mut result = None;
2970        for space in SpaceId::iter() {
2971            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2972                continue;
2973            };
2974
2975            if !pns.has_in_flight() {
2976                continue;
2977            }
2978            if space == SpaceId::Data {
2979                // Skip ApplicationData until handshake completes.
2980                if self.is_handshaking() {
2981                    return result;
2982                }
2983                // Include max_ack_delay and backoff for ApplicationData.
2984                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2985            }
2986            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
2987                continue;
2988            };
2989            let pto = last_ack_eliciting + duration;
2990            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2991                if path.anti_amplification_blocked(1) {
2992                    // Nothing would be able to be sent.
2993                    continue;
2994                }
2995                if path.in_flight.ack_eliciting == 0 {
2996                    // Nothing ack-eliciting, no PTO to arm/fire.
2997                    continue;
2998                }
2999                result = Some((pto, space));
3000            }
3001        }
3002        result
3003    }
3004
3005    fn peer_completed_address_validation(&self, path: PathId) -> bool {
3006        // TODO(flub): This logic needs updating for multipath
3007        if self.side.is_server() || self.state.is_closed() {
3008            return true;
3009        }
3010        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
3011        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
3012        self.spaces[SpaceId::Handshake]
3013            .path_space(PathId::ZERO)
3014            .and_then(|pns| pns.largest_acked_packet)
3015            .is_some()
3016            || self.spaces[SpaceId::Data]
3017                .path_space(path)
3018                .and_then(|pns| pns.largest_acked_packet)
3019                .is_some()
3020            || (self.spaces[SpaceId::Data].crypto.is_some()
3021                && self.spaces[SpaceId::Handshake].crypto.is_none())
3022    }
3023
3024    /// Resets the [`PathTimer::LossDetection`] timer to the next instant it may be needed
3025    ///
3026    /// The timer must fire if either:
3027    /// - An ack-eliciting packet we sent needs to be declared lost.
3028    /// - A tail-loss probe needs to be sent.
3029    ///
3030    /// See [`Connection::on_loss_detection_timeout`] for details.
3031    fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3032        if self.state.is_closed() {
3033            // No loss detection takes place on closed connections, and `close_common` already
3034            // stopped the timer. Ensure we don't restart it inadvertently, e.g. in response to a
3035            // reordered packet being handled by state-insensitive code.
3036            return;
3037        }
3038
3039        if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3040            // Time threshold loss detection.
3041            self.timers.set(
3042                Timer::PerPath(path_id, PathTimer::LossDetection),
3043                loss_time,
3044                self.qlog.with_time(now),
3045            );
3046            return;
3047        }
3048
3049        // Determine which packet number space to arm the PTO for, and when it
3050        // should fire.
3051        if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3052            self.timers.set(
3053                Timer::PerPath(path_id, PathTimer::LossDetection),
3054                timeout,
3055                self.qlog.with_time(now),
3056            );
3057        } else {
3058            self.timers.stop(
3059                Timer::PerPath(path_id, PathTimer::LossDetection),
3060                self.qlog.with_time(now),
3061            );
3062        }
3063    }
3064
3065    /// The maximum probe timeout across all paths
3066    ///
3067    /// If `is_closing` is set to `true` it will filter out paths that have not yet been used.
3068    ///
3069    /// See [`Connection::pto`]
3070    fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
3071        match space {
3072            SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3073            SpaceId::Data => self
3074                .paths
3075                .iter()
3076                .filter_map(|(path_id, state)| {
3077                    if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
3078                        // If we are closing and this path has seen no traffic yet, do not include it
3079                        None
3080                    } else {
3081                        let pto = self.pto(space, *path_id);
3082                        Some(pto)
3083                    }
3084                })
3085                .max()
3086                .expect("there should be at least one path"),
3087        }
3088    }
3089
3090    /// Probe Timeout
3091    ///
3092    /// The PTO is logically the time within which you'd expect to receive an acknowledgement
3093    /// for a packet: approximately RTT + max_ack_delay.
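    ///
    /// For example (illustrative numbers, assuming the usual `srtt + 4 * rttvar` PTO base):
    /// with a smoothed RTT of 100ms, an RTT variance of 20ms and a peer max_ack_delay of
    /// 25ms, the Data-space PTO is roughly 100 + 4 * 20 + 25 = 205ms.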
3094    fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3095        let max_ack_delay = match space {
3096            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3097            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3098        };
3099        self.path_data(path_id).rtt.pto_base() + max_ack_delay
3100    }
3101
3102    fn on_packet_authenticated(
3103        &mut self,
3104        now: Instant,
3105        space_id: SpaceId,
3106        path_id: PathId,
3107        ecn: Option<EcnCodepoint>,
3108        packet: Option<u64>,
3109        spin: bool,
3110        is_1rtt: bool,
3111    ) {
3112        self.total_authed_packets += 1;
3113        if let Some(AbandonState::ExpectingPathAbandon { deadline }) = self
3114            .paths
3115            .get(&path_id)
3116            .map(|path| &path.data.abandon_state)
3117        {
3118            if now > *deadline {
3119                warn!("received data on path which we abandoned more than 3 * PTO ago");
3120                // The peer failed to respond with a PATH_ABANDON in time.
3121                if !self.state.is_closed() {
3122                    // TODO(flub): What should the error code be?
3123                    self.state.move_to_closed(TransportError::NO_ERROR(
3124                        "peer failed to respond with PATH_ABANDON in time",
3125                    ));
3126                    self.close_common();
3127                    self.set_close_timer(now);
3128                    self.close = true;
3129                }
3130                return;
3131            }
3132        }
3133
3134        self.reset_keep_alive(path_id, now);
3135        self.reset_idle_timeout(now, space_id, path_id);
3136        self.permit_idle_reset = true;
3137        self.receiving_ecn |= ecn.is_some();
3138        if let Some(x) = ecn {
3139            let space = &mut self.spaces[space_id];
3140            space.for_path(path_id).ecn_counters += x;
3141
3142            if x.is_ce() {
3143                space
3144                    .for_path(path_id)
3145                    .pending_acks
3146                    .set_immediate_ack_required();
3147            }
3148        }
3149
3150        let packet = match packet {
3151            Some(x) => x,
3152            None => return,
3153        };
3154        match &self.side {
3155            ConnectionSide::Client { .. } => {
3156                // If we received a handshake packet that authenticated, then we're talking to
3157                // the real server.  From now on we should no longer allow the server to migrate
3158                // its address.
3159                if space_id == SpaceId::Handshake {
3160                    if let Some(hs) = self.state.as_handshake_mut() {
3161                        hs.allow_server_migration = false;
3162                    }
3163                }
3164            }
3165            ConnectionSide::Server { .. } => {
3166                if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3167                {
3168                    // A server stops sending and processing Initial packets when it receives its first Handshake packet.
3169                    self.discard_space(now, SpaceId::Initial);
3170                }
3171                if self.zero_rtt_crypto.is_some() && is_1rtt {
3172                    // Discard 0-RTT keys soon after receiving a 1-RTT packet
3173                    self.set_key_discard_timer(now, space_id)
3174                }
3175            }
3176        }
3177        let space = self.spaces[space_id].for_path(path_id);
3178        space.pending_acks.insert_one(packet, now);
3179        if packet >= space.rx_packet.unwrap_or_default() {
3180            space.rx_packet = Some(packet);
3181            // Update outgoing spin bit, inverting iff we're the client
3182            self.spin = self.side.is_client() ^ spin;
3183        }
3184    }
3185
3186    /// Resets the idle timeout timers
3187    ///
3188    /// Without multipath there is only the connection-wide idle timeout. When multipath is
3189    /// enabled there is an additional per-path idle timeout.
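    ///
    /// In both cases the effective timeout is floored at three PTOs, so the connection (or
    /// path) is not declared idle while a retransmission could still be outstanding.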
3190    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3191        // First reset the global idle timeout.
3192        if let Some(timeout) = self.idle_timeout {
3193            if self.state.is_closed() {
3194                self.timers
3195                    .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3196            } else {
3197                let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
3198                self.timers.set(
3199                    Timer::Conn(ConnTimer::Idle),
3200                    now + dt,
3201                    self.qlog.with_time(now),
3202                );
3203            }
3204        }
3205
3206        // Now handle the per-path state
3207        if let Some(timeout) = self.path_data(path_id).idle_timeout {
3208            if self.state.is_closed() {
3209                self.timers.stop(
3210                    Timer::PerPath(path_id, PathTimer::PathIdle),
3211                    self.qlog.with_time(now),
3212                );
3213            } else {
3214                let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3215                self.timers.set(
3216                    Timer::PerPath(path_id, PathTimer::PathIdle),
3217                    now + dt,
3218                    self.qlog.with_time(now),
3219                );
3220            }
3221        }
3222    }
3223
3224    /// Resets both the [`ConnTimer::KeepAlive`] and [`PathTimer::PathKeepAlive`] timers
3225    fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3226        if !self.state.is_established() {
3227            return;
3228        }
3229
3230        if let Some(interval) = self.config.keep_alive_interval {
3231            self.timers.set(
3232                Timer::Conn(ConnTimer::KeepAlive),
3233                now + interval,
3234                self.qlog.with_time(now),
3235            );
3236        }
3237
3238        if let Some(interval) = self.path_data(path_id).keep_alive {
3239            self.timers.set(
3240                Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3241                now + interval,
3242                self.qlog.with_time(now),
3243            );
3244        }
3245    }
3246
3247    /// Sets the timer for when a previously issued CID should be retired next
3248    fn reset_cid_retirement(&mut self, now: Instant) {
3249        if let Some((_path, t)) = self.next_cid_retirement() {
3250            self.timers.set(
3251                Timer::Conn(ConnTimer::PushNewCid),
3252                t,
3253                self.qlog.with_time(now),
3254            );
3255        }
3256    }
3257
3258    /// The next time when a previously issued CID should be retired
3259    fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3260        self.local_cid_state
3261            .iter()
3262            .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3263            .min_by_key(|(_path_id, timeout)| *timeout)
3264    }
3265
3266    /// Handle the already-decrypted first packet from the client
3267    ///
3268    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
3269    /// efficient.
3270    pub(crate) fn handle_first_packet(
3271        &mut self,
3272        now: Instant,
3273        remote: SocketAddr,
3274        ecn: Option<EcnCodepoint>,
3275        packet_number: u64,
3276        packet: InitialPacket,
3277        remaining: Option<BytesMut>,
3278    ) -> Result<(), ConnectionError> {
3279        let span = trace_span!("first recv");
3280        let _guard = span.enter();
3281        debug_assert!(self.side.is_server());
3282        let len = packet.header_data.len() + packet.payload.len();
3283        let path_id = PathId::ZERO;
3284        self.path_data_mut(path_id).total_recvd = len as u64;
3285
3286        if let Some(hs) = self.state.as_handshake_mut() {
3287            hs.expected_token = packet.header.token.clone();
3288        } else {
3289            unreachable!("first packet must be delivered in Handshake state");
3290        }
3291
3292        // The first packet is always on PathId::ZERO
3293        self.on_packet_authenticated(
3294            now,
3295            SpaceId::Initial,
3296            path_id,
3297            ecn,
3298            Some(packet_number),
3299            false,
3300            false,
3301        );
3302
3303        let packet: Packet = packet.into();
3304
3305        let mut qlog = QlogRecvPacket::new(len);
3306        qlog.header(&packet.header, Some(packet_number), path_id);
3307
3308        self.process_decrypted_packet(
3309            now,
3310            remote,
3311            path_id,
3312            Some(packet_number),
3313            packet,
3314            &mut qlog,
3315        )?;
3316        self.qlog.emit_packet_received(qlog, now);
3317        if let Some(data) = remaining {
3318            self.handle_coalesced(now, remote, path_id, ecn, data);
3319        }
3320
3321        self.qlog.emit_recovery_metrics(
3322            path_id,
3323            &mut self.paths.get_mut(&path_id).unwrap().data,
3324            now,
3325        );
3326
3327        Ok(())
3328    }
3329
3330    fn init_0rtt(&mut self, now: Instant) {
3331        let (header, packet) = match self.crypto.early_crypto() {
3332            Some(x) => x,
3333            None => return,
3334        };
3335        if self.side.is_client() {
3336            match self.crypto.transport_parameters() {
3337                Ok(params) => {
3338                    let params = params
3339                        .expect("crypto layer didn't supply transport parameters with ticket");
3340                    // Certain values must not be cached
3341                    let params = TransportParameters {
3342                        initial_src_cid: None,
3343                        original_dst_cid: None,
3344                        preferred_address: None,
3345                        retry_src_cid: None,
3346                        stateless_reset_token: None,
3347                        min_ack_delay: None,
3348                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3349                        max_ack_delay: TransportParameters::default().max_ack_delay,
3350                        initial_max_path_id: None,
3351                        ..params
3352                    };
3353                    self.set_peer_params(params);
3354                    self.qlog.emit_peer_transport_params_restored(self, now);
3355                }
3356                Err(e) => {
3357                    error!("session ticket has malformed transport parameters: {}", e);
3358                    return;
3359                }
3360            }
3361        }
3362        trace!("0-RTT enabled");
3363        self.zero_rtt_enabled = true;
3364        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3365    }
3366
3367    fn read_crypto(
3368        &mut self,
3369        space: SpaceId,
3370        crypto: &frame::Crypto,
3371        payload_len: usize,
3372    ) -> Result<(), TransportError> {
3373        let expected = if !self.state.is_handshake() {
3374            SpaceId::Data
3375        } else if self.highest_space == SpaceId::Initial {
3376            SpaceId::Initial
3377        } else {
3378            // On the server, self.highest_space can be Data after receiving the client's first
3379            // flight, but we expect Handshake CRYPTO until the handshake is complete.
3380            SpaceId::Handshake
3381        };
3382        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
3383        // packets are illegal, and we don't process 1-RTT packets until the handshake is
3384        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
3385        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3386
3387        let end = crypto.offset + crypto.data.len() as u64;
3388        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3389            warn!(
3390                "received new {:?} CRYPTO data when expecting {:?}",
3391                space, expected
3392            );
3393            return Err(TransportError::PROTOCOL_VIOLATION(
3394                "new data at unexpected encryption level",
3395            ));
3396        }
3397
3398        let space = &mut self.spaces[space];
3399        let max = end.saturating_sub(space.crypto_stream.bytes_read());
3400        if max > self.config.crypto_buffer_size as u64 {
3401            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3402        }
3403
3404        space
3405            .crypto_stream
3406            .insert(crypto.offset, crypto.data.clone(), payload_len);
3407        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3408            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3409            if self.crypto.read_handshake(&chunk.bytes)? {
3410                self.events.push_back(Event::HandshakeDataReady);
3411            }
3412        }
3413
3414        Ok(())
3415    }
3416
3417    fn write_crypto(&mut self) {
3418        loop {
3419            let space = self.highest_space;
3420            let mut outgoing = Vec::new();
3421            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3422                match space {
3423                    SpaceId::Initial => {
3424                        self.upgrade_crypto(SpaceId::Handshake, crypto);
3425                    }
3426                    SpaceId::Handshake => {
3427                        self.upgrade_crypto(SpaceId::Data, crypto);
3428                    }
3429                    _ => unreachable!("got updated secrets during 1-RTT"),
3430                }
3431            }
3432            if outgoing.is_empty() {
3433                if space == self.highest_space {
3434                    break;
3435                } else {
3436                    // Keys updated, check for more data to send
3437                    continue;
3438                }
3439            }
3440            let offset = self.spaces[space].crypto_offset;
3441            let outgoing = Bytes::from(outgoing);
3442            if let Some(hs) = self.state.as_handshake_mut() {
3443                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3444                    hs.client_hello = Some(outgoing.clone());
3445                }
3446            }
3447            self.spaces[space].crypto_offset += outgoing.len() as u64;
3448            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3449            self.spaces[space].pending.crypto.push_back(frame::Crypto {
3450                offset,
3451                data: outgoing,
3452            });
3453        }
3454    }
3455
3456    /// Switch to stronger cryptography during handshake
3457    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3458        debug_assert!(
3459            self.spaces[space].crypto.is_none(),
3460            "already reached packet space {space:?}"
3461        );
3462        trace!("{:?} keys ready", space);
3463        if space == SpaceId::Data {
3464            // Precompute the first key update
3465            self.next_crypto = Some(
3466                self.crypto
3467                    .next_1rtt_keys()
3468                    .expect("handshake should be complete"),
3469            );
3470        }
3471
3472        self.spaces[space].crypto = Some(crypto);
3473        debug_assert!(space as usize > self.highest_space as usize);
3474        self.highest_space = space;
3475        if space == SpaceId::Data && self.side.is_client() {
3476            // Discard 0-RTT keys because 1-RTT keys are available.
3477            self.zero_rtt_crypto = None;
3478        }
3479    }
3480
3481    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3482        debug_assert!(space_id != SpaceId::Data);
3483        trace!("discarding {:?} keys", space_id);
3484        if space_id == SpaceId::Initial {
3485            // No longer needed
3486            if let ConnectionSide::Client { token, .. } = &mut self.side {
3487                *token = Bytes::new();
3488            }
3489        }
3490        let space = &mut self.spaces[space_id];
3491        space.crypto = None;
3492        let pns = space.for_path(PathId::ZERO);
3493        pns.time_of_last_ack_eliciting_packet = None;
3494        pns.loss_time = None;
3495        pns.loss_probes = 0;
3496        let sent_packets = mem::take(&mut pns.sent_packets);
3497        let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3498        for (_, packet) in sent_packets.into_iter() {
3499            path.data.remove_in_flight(&packet);
3500        }
3501
3502        self.set_loss_detection_timer(now, PathId::ZERO)
3503    }
3504
3505    fn handle_coalesced(
3506        &mut self,
3507        now: Instant,
3508        remote: SocketAddr,
3509        path_id: PathId,
3510        ecn: Option<EcnCodepoint>,
3511        data: BytesMut,
3512    ) {
3513        self.path_data_mut(path_id)
3514            .inc_total_recvd(data.len() as u64);
3515        let mut remaining = Some(data);
3516        let cid_len = self
3517            .local_cid_state
3518            .values()
3519            .map(|cid_state| cid_state.cid_len())
3520            .next()
3521            .expect("one cid_state must exist");
3522        while let Some(data) = remaining {
3523            match PartialDecode::new(
3524                data,
3525                &FixedLengthConnectionIdParser::new(cid_len),
3526                &[self.version],
3527                self.endpoint_config.grease_quic_bit,
3528            ) {
3529                Ok((partial_decode, rest)) => {
3530                    remaining = rest;
3531                    self.handle_decode(now, remote, path_id, ecn, partial_decode);
3532                }
3533                Err(e) => {
3534                    trace!("malformed header: {}", e);
3535                    return;
3536                }
3537            }
3538        }
3539    }
3540
3541    fn handle_decode(
3542        &mut self,
3543        now: Instant,
3544        remote: SocketAddr,
3545        path_id: PathId,
3546        ecn: Option<EcnCodepoint>,
3547        partial_decode: PartialDecode,
3548    ) {
3549        let qlog = QlogRecvPacket::new(partial_decode.len());
3550        if let Some(decoded) = packet_crypto::unprotect_header(
3551            partial_decode,
3552            &self.spaces,
3553            self.zero_rtt_crypto.as_ref(),
3554            self.peer_params.stateless_reset_token,
3555        ) {
3556            self.handle_packet(
3557                now,
3558                remote,
3559                path_id,
3560                ecn,
3561                decoded.packet,
3562                decoded.stateless_reset,
3563                qlog,
3564            );
3565        }
3566    }
3567
3568    fn handle_packet(
3569        &mut self,
3570        now: Instant,
3571        remote: SocketAddr,
3572        path_id: PathId,
3573        ecn: Option<EcnCodepoint>,
3574        packet: Option<Packet>,
3575        stateless_reset: bool,
3576        mut qlog: QlogRecvPacket,
3577    ) {
3578        self.stats.udp_rx.ios += 1;
3579        if let Some(ref packet) = packet {
3580            trace!(
3581                "got {:?} packet ({} bytes) from {} using id {}",
3582                packet.header.space(),
3583                packet.payload.len() + packet.header_data.len(),
3584                remote,
3585                packet.header.dst_cid(),
3586            );
3587        }
3588
3589        if self.is_handshaking() {
3590            if path_id != PathId::ZERO {
3591                debug!(%remote, %path_id, "discarding multipath packet during handshake");
3592                return;
3593            }
3594            if remote != self.path_data_mut(path_id).remote {
3595                if let Some(hs) = self.state.as_handshake() {
3596                    if hs.allow_server_migration {
3597                        trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
3598                        self.path_data_mut(path_id).remote = remote;
3599                        self.qlog.emit_tuple_assigned(path_id, remote, now);
3600                    } else {
3601                        debug!("discarding packet with unexpected remote during handshake");
3602                        return;
3603                    }
3604                } else {
3605                    debug!("discarding packet with unexpected remote during handshake");
3606                    return;
3607                }
3608            }
3609        }
3610
3611        let was_closed = self.state.is_closed();
3612        let was_drained = self.state.is_drained();
3613
3614        let decrypted = match packet {
3615            None => Err(None),
3616            Some(mut packet) => self
3617                .decrypt_packet(now, path_id, &mut packet)
3618                .map(move |number| (packet, number)),
3619        };
3620        let result = match decrypted {
3621            _ if stateless_reset => {
3622                debug!("got stateless reset");
3623                Err(ConnectionError::Reset)
3624            }
3625            Err(Some(e)) => {
3626                warn!("illegal packet: {}", e);
3627                Err(e.into())
3628            }
3629            Err(None) => {
3630                debug!("failed to authenticate packet");
3631                self.authentication_failures += 1;
3632                let integrity_limit = self.spaces[self.highest_space]
3633                    .crypto
3634                    .as_ref()
3635                    .unwrap()
3636                    .packet
3637                    .local
3638                    .integrity_limit();
3639                if self.authentication_failures > integrity_limit {
3640                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3641                } else {
3642                    return;
3643                }
3644            }
3645            Ok((packet, number)) => {
3646                qlog.header(&packet.header, number, path_id);
3647                let span = match number {
3648                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3649                    None => trace_span!("recv", space = ?packet.header.space()),
3650                };
3651                let _guard = span.enter();
3652
3653                let dedup = self.spaces[packet.header.space()]
3654                    .path_space_mut(path_id)
3655                    .map(|pns| &mut pns.dedup);
3656                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
3657                    debug!("discarding possible duplicate packet");
3658                    self.qlog.emit_packet_received(qlog, now);
3659                    return;
3660                } else if self.state.is_handshake() && packet.header.is_short() {
3661                    // TODO: SHOULD buffer these to improve reordering tolerance.
3662                    trace!("dropping short packet during handshake");
3663                    self.qlog.emit_packet_received(qlog, now);
3664                    return;
3665                } else {
3666                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
3667                        if let Some(hs) = self.state.as_handshake() {
3668                            if self.side.is_server() && token != &hs.expected_token {
3669                                // Clients must send the same retry token in every Initial. Initial
3670                                // packets can be spoofed, so we discard rather than killing the
3671                                // connection.
3672                                warn!("discarding Initial with invalid retry token");
3673                                self.qlog.emit_packet_received(qlog, now);
3674                                return;
3675                            }
3676                        }
3677                    }
3678
3679                    if !self.state.is_closed() {
3680                        let spin = match packet.header {
3681                            Header::Short { spin, .. } => spin,
3682                            _ => false,
3683                        };
3684
3685                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
3686                            // Only the client is allowed to open paths
3687                            self.ensure_path(path_id, remote, now, number);
3688                        }
3689                        if self.paths.contains_key(&path_id) {
3690                            self.on_packet_authenticated(
3691                                now,
3692                                packet.header.space(),
3693                                path_id,
3694                                ecn,
3695                                number,
3696                                spin,
3697                                packet.header.is_1rtt(),
3698                            );
3699                        }
3700                    }
3701
3702                    let res = self
3703                        .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);
3704
3705                    self.qlog.emit_packet_received(qlog, now);
3706                    res
3707                }
3708            }
3709        };
3710
3711        // State transitions for error cases
3712        if let Err(conn_err) = result {
3713            match conn_err {
3714                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
3715                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
3716                ConnectionError::Reset
3717                | ConnectionError::TransportError(TransportError {
3718                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
3719                    ..
3720                }) => {
3721                    self.state.move_to_drained(Some(conn_err));
3722                }
3723                ConnectionError::TimedOut => {
3724                    unreachable!("timeouts aren't generated by packet processing");
3725                }
3726                ConnectionError::TransportError(err) => {
3727                    debug!("closing connection due to transport error: {}", err);
3728                    self.state.move_to_closed(err);
3729                }
3730                ConnectionError::VersionMismatch => {
3731                    self.state.move_to_draining(Some(conn_err));
3732                }
3733                ConnectionError::LocallyClosed => {
3734                    unreachable!("LocallyClosed isn't generated by packet processing");
3735                }
3736                ConnectionError::CidsExhausted => {
3737                    unreachable!("CidsExhausted isn't generated by packet processing");
3738                }
3739            };
3740        }
3741
3742        if !was_closed && self.state.is_closed() {
3743            self.close_common();
3744            if !self.state.is_drained() {
3745                self.set_close_timer(now);
3746            }
3747        }
3748        if !was_drained && self.state.is_drained() {
3749            self.endpoint_events.push_back(EndpointEventInner::Drained);
3750            // Close timer may have been started previously, e.g. if we sent a close and got a
3751            // stateless reset in response
3752            self.timers
3753                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
3754        }
3755
3756        // Transmit CONNECTION_CLOSE if necessary
3757        if matches!(self.state.as_type(), StateType::Closed) {
3758            // If there is no PathData for this PathId the packet was for a brand new
3759            // path. It was a valid packet however, so the remote is valid and we want to
3760            // send CONNECTION_CLOSE.
3761            let path_remote = self
3762                .paths
3763                .get(&path_id)
3764                .map(|p| p.data.remote)
3765                .unwrap_or(remote);
3766            self.close = remote == path_remote;
3767        }
3768    }
3769
3770    fn process_decrypted_packet(
3771        &mut self,
3772        now: Instant,
3773        remote: SocketAddr,
3774        path_id: PathId,
3775        number: Option<u64>,
3776        packet: Packet,
3777        qlog: &mut QlogRecvPacket,
3778    ) -> Result<(), ConnectionError> {
3779        if !self.paths.contains_key(&path_id) {
3780        // There is a chance this is the first packet the server receives on this path,
3781        // which would be a protocol violation. It's more likely, however, that this is a
3782        // packet for an already-pruned path.
3783            trace!(%path_id, ?number, "discarding packet for unknown path");
3784            return Ok(());
3785        }
3786        let state = match self.state.as_type() {
3787            StateType::Established => {
3788                match packet.header.space() {
3789                    SpaceId::Data => {
3790                        self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
3791                    }
3792                    _ if packet.header.has_frames() => {
3793                        self.process_early_payload(now, path_id, packet, qlog)?
3794                    }
3795                    _ => {
3796                        trace!("discarding unexpected pre-handshake packet");
3797                    }
3798                }
3799                return Ok(());
3800            }
3801            StateType::Closed => {
3802                for result in frame::Iter::new(packet.payload.freeze())? {
3803                    let frame = match result {
3804                        Ok(frame) => frame,
3805                        Err(err) => {
3806                            debug!("frame decoding error: {err:?}");
3807                            continue;
3808                        }
3809                    };
3810                    qlog.frame(&frame);
3811
3812                    if let Frame::Padding = frame {
3813                        continue;
3814                    };
3815
3816                    self.stats.frame_rx.record(&frame);
3817
3818                    if let Frame::Close(_error) = frame {
3819                        self.state.move_to_draining(None);
3820                        break;
3821                    }
3822                }
3823                return Ok(());
3824            }
3825            StateType::Draining | StateType::Drained => return Ok(()),
3826            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
3827        };
3828
3829        match packet.header {
3830            Header::Retry {
3831                src_cid: rem_cid, ..
3832            } => {
3833                debug_assert_eq!(path_id, PathId::ZERO);
3834                if self.side.is_server() {
3835                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3836                }
3837
3838                let is_valid_retry = self
3839                    .rem_cids
3840                    .get(&path_id)
3841                    .map(|cids| cids.active())
3842                    .map(|orig_dst_cid| {
3843                        self.crypto.is_valid_retry(
3844                            orig_dst_cid,
3845                            &packet.header_data,
3846                            &packet.payload,
3847                        )
3848                    })
3849                    .unwrap_or_default();
3850                if self.total_authed_packets > 1
3851                    || packet.payload.len() <= 16 // token + 16 byte tag
3852                    || !is_valid_retry
3853                {
3854                    trace!("discarding invalid Retry");
3855                    // - After the client has received and processed an Initial or Retry
3856                    //   packet from the server, it MUST discard any subsequent Retry
3857                    //   packets that it receives.
3858                    // - A client MUST discard a Retry packet with a zero-length Retry Token
3859                    //   field.
3860                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
3861                    //   that cannot be validated
3862                    return Ok(());
3863                }
3864
3865                trace!("retrying with CID {}", rem_cid);
3866                let client_hello = state.client_hello.take().unwrap();
3867                self.retry_src_cid = Some(rem_cid);
3868                self.rem_cids
3869                    .get_mut(&path_id)
3870                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
3871                    .update_initial_cid(rem_cid);
3872                self.rem_handshake_cid = rem_cid;
3873
3874                let space = &mut self.spaces[SpaceId::Initial];
3875                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
3876                    self.on_packet_acked(now, PathId::ZERO, info);
3877                };
3878
3879                // Make sure we clean up after any retransmitted Initials
3880                self.discard_space(now, SpaceId::Initial);
3881                self.spaces[SpaceId::Initial] = {
3882                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
3883                    space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
3884                    space.crypto_offset = client_hello.len() as u64;
3885                    space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
3886                        .for_path(path_id)
3887                        .next_packet_number;
3888                    space.pending.crypto.push_back(frame::Crypto {
3889                        offset: 0,
3890                        data: client_hello,
3891                    });
3892                    space
3893                };
3894
3895                // Retransmit all 0-RTT data
3896                let zero_rtt = mem::take(
3897                    &mut self.spaces[SpaceId::Data]
3898                        .for_path(PathId::ZERO)
3899                        .sent_packets,
3900                );
3901                for (_, info) in zero_rtt.into_iter() {
3902                    self.paths
3903                        .get_mut(&PathId::ZERO)
3904                        .unwrap()
3905                        .remove_in_flight(&info);
3906                    self.spaces[SpaceId::Data].pending |= info.retransmits;
3907                }
3908                self.streams.retransmit_all_for_0rtt();
3909
3910                let token_len = packet.payload.len() - 16;
3911                let ConnectionSide::Client { ref mut token, .. } = self.side else {
3912                    unreachable!("we already short-circuited if we're server");
3913                };
3914                *token = packet.payload.freeze().split_to(token_len);
3915
3916                self.state = State::handshake(state::Handshake {
3917                    expected_token: Bytes::new(),
3918                    rem_cid_set: false,
3919                    client_hello: None,
3920                    allow_server_migration: true,
3921                });
3922                Ok(())
3923            }
3924            Header::Long {
3925                ty: LongType::Handshake,
3926                src_cid: rem_cid,
3927                dst_cid: loc_cid,
3928                ..
3929            } => {
3930                debug_assert_eq!(path_id, PathId::ZERO);
3931                if rem_cid != self.rem_handshake_cid {
3932                    debug!(
3933                        "discarding packet with mismatched remote CID: {} != {}",
3934                        self.rem_handshake_cid, rem_cid
3935                    );
3936                    return Ok(());
3937                }
3938                self.on_path_validated(path_id);
3939
3940                self.process_early_payload(now, path_id, packet, qlog)?;
3941                if self.state.is_closed() {
3942                    return Ok(());
3943                }
3944
3945                if self.crypto.is_handshaking() {
3946                    trace!("handshake ongoing");
3947                    return Ok(());
3948                }
3949
3950                if self.side.is_client() {
3951                    // Client-only because server params were set from the client's Initial
3952                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
3953                        TransportError::new(
3954                            TransportErrorCode::crypto(0x6d),
3955                            "transport parameters missing".to_owned(),
3956                        )
3957                    })?;
3958
3959                    if self.has_0rtt() {
3960                        if !self.crypto.early_data_accepted().unwrap() {
3961                            debug_assert!(self.side.is_client());
3962                            debug!("0-RTT rejected");
3963                            self.accepted_0rtt = false;
3964                            self.streams.zero_rtt_rejected();
3965
3966                            // Discard already-queued frames
3967                            self.spaces[SpaceId::Data].pending = Retransmits::default();
3968
3969                            // Discard 0-RTT packets
3970                            let sent_packets = mem::take(
3971                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
3972                            );
3973                            for (_, packet) in sent_packets.into_iter() {
3974                                self.paths
3975                                    .get_mut(&path_id)
3976                                    .unwrap()
3977                                    .remove_in_flight(&packet);
3978                            }
3979                        } else {
3980                            self.accepted_0rtt = true;
3981                            params.validate_resumption_from(&self.peer_params)?;
3982                        }
3983                    }
3984                    if let Some(token) = params.stateless_reset_token {
3985                        let remote = self.path_data(path_id).remote;
3986                        self.endpoint_events
3987                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
3988                    }
3989                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
3990                    self.issue_first_cids(now);
3991                } else {
3992                    // Server-only
3993                    self.spaces[SpaceId::Data].pending.handshake_done = true;
3994                    self.discard_space(now, SpaceId::Handshake);
3995                    self.events.push_back(Event::HandshakeConfirmed);
3996                    trace!("handshake confirmed");
3997                }
3998
3999                self.events.push_back(Event::Connected);
4000                self.state.move_to_established();
4001                trace!("established");
4002
4003                // Multipath can only be enabled after the state has reached Established,
4004                // so this cannot happen any earlier.
4005                self.issue_first_path_cids(now);
4006                Ok(())
4007            }
4008            Header::Initial(InitialHeader {
4009                src_cid: rem_cid,
4010                dst_cid: loc_cid,
4011                ..
4012            }) => {
4013                debug_assert_eq!(path_id, PathId::ZERO);
4014                if !state.rem_cid_set {
4015                    trace!("switching remote CID to {}", rem_cid);
4016                    let mut state = state.clone();
4017                    self.rem_cids
4018                        .get_mut(&path_id)
4019                        .expect("PathId::ZERO not yet abandoned")
4020                        .update_initial_cid(rem_cid);
4021                    self.rem_handshake_cid = rem_cid;
4022                    self.orig_rem_cid = rem_cid;
4023                    state.rem_cid_set = true;
4024                    self.state.move_to_handshake(state);
4025                } else if rem_cid != self.rem_handshake_cid {
4026                    debug!(
4027                        "discarding packet with mismatched remote CID: {} != {}",
4028                        self.rem_handshake_cid, rem_cid
4029                    );
4030                    return Ok(());
4031                }
4032
4033                let starting_space = self.highest_space;
4034                self.process_early_payload(now, path_id, packet, qlog)?;
4035
4036                if self.side.is_server()
4037                    && starting_space == SpaceId::Initial
4038                    && self.highest_space != SpaceId::Initial
4039                {
4040                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4041                        TransportError::new(
4042                            TransportErrorCode::crypto(0x6d),
4043                            "transport parameters missing".to_owned(),
4044                        )
4045                    })?;
4046                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4047                    self.issue_first_cids(now);
4048                    self.init_0rtt(now);
4049                }
4050                Ok(())
4051            }
4052            Header::Long {
4053                ty: LongType::ZeroRtt,
4054                ..
4055            } => {
4056                self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
4057                Ok(())
4058            }
4059            Header::VersionNegotiate { .. } => {
4060                if self.total_authed_packets > 1 {
4061                    return Ok(());
4062                }
4063                let supported = packet
4064                    .payload
4065                    .chunks(4)
4066                    .any(|x| match <[u8; 4]>::try_from(x) {
4067                        Ok(version) => self.version == u32::from_be_bytes(version),
4068                        Err(_) => false,
4069                    });
4070                if supported {
4071                    return Ok(());
4072                }
4073                debug!("remote doesn't support our version");
4074                Err(ConnectionError::VersionMismatch)
4075            }
4076            Header::Short { .. } => unreachable!(
4077                "short packets received during handshake are discarded in handle_packet"
4078            ),
4079        }
4080    }
4081
4082    /// Process an Initial or Handshake packet payload
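    ///
    /// Only frames that are permitted before the handshake completes are handled here:
    /// PADDING, PING, CRYPTO, ACK (and the multipath PATH_ACK variant) and
    /// CONNECTION_CLOSE. Any other frame type is rejected as a protocol violation.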
4083    fn process_early_payload(
4084        &mut self,
4085        now: Instant,
4086        path_id: PathId,
4087        packet: Packet,
4088        #[allow(unused)] qlog: &mut QlogRecvPacket,
4089    ) -> Result<(), TransportError> {
4090        debug_assert_ne!(packet.header.space(), SpaceId::Data);
4091        debug_assert_eq!(path_id, PathId::ZERO);
4092        let payload_len = packet.payload.len();
4093        let mut ack_eliciting = false;
4094        for result in frame::Iter::new(packet.payload.freeze())? {
4095            let frame = result?;
4096            qlog.frame(&frame);
4097            let span = match frame {
4098                Frame::Padding => continue,
4099                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4100            };
4101
4102            self.stats.frame_rx.record(&frame);
4103
4104            let _guard = span.as_ref().map(|x| x.enter());
4105            ack_eliciting |= frame.is_ack_eliciting();
4106
4107            // Process frames
4108            if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4109                return Err(TransportError::PROTOCOL_VIOLATION(
4110                    "illegal frame type in handshake",
4111                ));
4112            }
4113
4114            match frame {
4115                Frame::Padding | Frame::Ping => {}
4116                Frame::Crypto(frame) => {
4117                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
4118                }
4119                Frame::Ack(ack) => {
4120                    self.on_ack_received(now, packet.header.space(), ack)?;
4121                }
4122                Frame::PathAck(ack) => {
4123                    span.as_ref()
4124                        .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4125                    self.on_path_ack_received(now, packet.header.space(), ack)?;
4126                }
4127                Frame::Close(reason) => {
4128                    self.state.move_to_draining(Some(reason.into()));
4129                    return Ok(());
4130                }
4131                _ => {
4132                    let mut err =
4133                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4134                    err.frame = Some(frame.ty());
4135                    return Err(err);
4136                }
4137            }
4138        }
4139
4140        if ack_eliciting {
4141            // In the initial and handshake spaces, ACKs must be sent immediately
4142            self.spaces[packet.header.space()]
4143                .for_path(path_id)
4144                .pending_acks
4145                .set_immediate_ack_required();
4146        }
4147
4148        self.write_crypto();
4149        Ok(())
4150    }
4151
4152    /// Processes the packet payload, always in the data space.
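    ///
    /// Besides dispatching each frame, this tracks whether the packet is ack-eliciting,
    /// whether it qualifies as a probing packet (containing only PADDING, PATH_CHALLENGE,
    /// PATH_RESPONSE, NEW_CONNECTION_ID or OBSERVED_ADDRESS frames), and whether it
    /// should trigger a migration to a new remote address.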
4153    fn process_payload(
4154        &mut self,
4155        now: Instant,
4156        remote: SocketAddr,
4157        path_id: PathId,
4158        number: u64,
4159        packet: Packet,
4160        #[allow(unused)] qlog: &mut QlogRecvPacket,
4161    ) -> Result<(), TransportError> {
4162        let payload = packet.payload.freeze();
4163        let mut is_probing_packet = true;
4164        let mut close = None;
4165        let payload_len = payload.len();
4166        let mut ack_eliciting = false;
4167        // If this packet triggers a path migration and includes an OBSERVED_ADDRESS frame,
4168        // the frame is stored here
4169        let mut migration_observed_addr = None;
4170        for result in frame::Iter::new(payload)? {
4171            let frame = result?;
4172            qlog.frame(&frame);
4173            let span = match frame {
4174                Frame::Padding => continue,
4175                _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4176            };
4177
4178            self.stats.frame_rx.record(&frame);
4179            // Crypto, Stream and Datagram frames are special-cased in order not to
4180            // pollute the log with payload data
4181            match &frame {
4182                Frame::Crypto(f) => {
4183                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4184                }
4185                Frame::Stream(f) => {
4186                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4187                }
4188                Frame::Datagram(f) => {
4189                    trace!(len = f.data.len(), "got datagram frame");
4190                }
4191                f => {
4192                    trace!("got frame {f}");
4193                }
4194            }
4195
4196            let _guard = span.enter();
4197            if packet.header.is_0rtt() {
4198                match frame {
4199                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4200                        return Err(TransportError::PROTOCOL_VIOLATION(
4201                            "illegal frame type in 0-RTT",
4202                        ));
4203                    }
4204                    _ => {
4205                        if frame.is_1rtt() {
4206                            return Err(TransportError::PROTOCOL_VIOLATION(
4207                                "illegal frame type in 0-RTT",
4208                            ));
4209                        }
4210                    }
4211                }
4212            }
4213            ack_eliciting |= frame.is_ack_eliciting();
4214
4215            // Check whether this could be a probing packet
4216            match frame {
4217                Frame::Padding
4218                | Frame::PathChallenge(_)
4219                | Frame::PathResponse(_)
4220                | Frame::NewConnectionId(_)
4221                | Frame::ObservedAddr(_) => {}
4222                _ => {
4223                    is_probing_packet = false;
4224                }
4225            }
4226
4227            match frame {
4228                Frame::Crypto(frame) => {
4229                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4230                }
4231                Frame::Stream(frame) => {
4232                    if self.streams.received(frame, payload_len)?.should_transmit() {
4233                        self.spaces[SpaceId::Data].pending.max_data = true;
4234                    }
4235                }
4236                Frame::Ack(ack) => {
4237                    self.on_ack_received(now, SpaceId::Data, ack)?;
4238                }
4239                Frame::PathAck(ack) => {
4240                    span.record("path", tracing::field::debug(&ack.path_id));
4241                    self.on_path_ack_received(now, SpaceId::Data, ack)?;
4242                }
4243                Frame::Padding | Frame::Ping => {}
4244                Frame::Close(reason) => {
4245                    close = Some(reason);
4246                }
4247                Frame::PathChallenge(challenge) => {
4248                    let path = &mut self
4249                        .path_mut(path_id)
4250                        .expect("payload is processed only after the path becomes known");
4251                    path.path_responses.push(number, challenge.0, remote);
4252                    if remote == path.remote {
4253                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
4254                        // attack. Send a non-probing packet to recover the active path.
4255                        // TODO(flub): No longer true! We now also send PATH_CHALLENGE to
4256                        //    validate the path if the path is new, without an RFC 9000-style
4257                        //    migration involved. This means we add in an extra
4258                        //    IMMEDIATE_ACK on some challenges. It isn't really wrong to do
4259                        //    so, but it still is something untidy. We should instead
4260                        //    suppress this when we know the remote is still validating the
4261                        //    path.
4262                        match self.peer_supports_ack_frequency() {
4263                            true => self.immediate_ack(path_id),
4264                            false => {
4265                                self.ping_path(path_id).ok();
4266                            }
4267                        }
4268                    }
4269                }
4270                Frame::PathResponse(response) => {
4271                    let path = self
4272                        .paths
4273                        .get_mut(&path_id)
4274                        .expect("payload is processed only after the path becomes known");
4275
4276                    use PathTimer::*;
4277                    use paths::OnPathResponseReceived::*;
4278                    match path.data.on_path_response_received(now, response.0, remote) {
4279                        OnPath { was_open } => {
4280                            let qlog = self.qlog.with_time(now);
4281
4282                            self.timers
4283                                .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4284                            self.timers
4285                                .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4286
4287                            let next_challenge = path
4288                                .data
4289                                .earliest_expiring_challenge()
4290                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4291                            self.timers.set_or_stop(
4292                                Timer::PerPath(path_id, PathChallengeLost),
4293                                next_challenge,
4294                                qlog,
4295                            );
4296
4297                            if !was_open {
4298                                self.events
4299                                    .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4300                                if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4301                                {
4302                                    self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4303                                        id: path_id,
4304                                        addr: observed.socket_addr(),
4305                                    }));
4306                                }
4307                            }
4308                            if let Some((_, ref mut prev)) = path.prev {
4309                                prev.challenges_sent.clear();
4310                                prev.send_new_challenge = false;
4311                            }
4312                        }
4313                        OffPath => {
4314                            debug!("Response to off-path PathChallenge!");
4315                            let next_challenge = path
4316                                .data
4317                                .earliest_expiring_challenge()
4318                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4319                            self.timers.set_or_stop(
4320                                Timer::PerPath(path_id, PathChallengeLost),
4321                                next_challenge,
4322                                self.qlog.with_time(now),
4323                            );
4324                        }
4325                        Invalid { expected } => {
4326                            debug!(%response, from=%remote, %expected, "ignoring invalid PATH_RESPONSE")
4327                        }
4328                        Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4329                    }
4330                }
4331                Frame::MaxData(bytes) => {
4332                    self.streams.received_max_data(bytes);
4333                }
4334                Frame::MaxStreamData { id, offset } => {
4335                    self.streams.received_max_stream_data(id, offset)?;
4336                }
4337                Frame::MaxStreams { dir, count } => {
4338                    self.streams.received_max_streams(dir, count)?;
4339                }
4340                Frame::ResetStream(frame) => {
4341                    if self.streams.received_reset(frame)?.should_transmit() {
4342                        self.spaces[SpaceId::Data].pending.max_data = true;
4343                    }
4344                }
4345                Frame::DataBlocked { offset } => {
4346                    debug!(offset, "peer claims to be blocked at connection level");
4347                }
4348                Frame::StreamDataBlocked { id, offset } => {
4349                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4350                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4351                        return Err(TransportError::STREAM_STATE_ERROR(
4352                            "STREAM_DATA_BLOCKED on send-only stream",
4353                        ));
4354                    }
4355                    debug!(
4356                        stream = %id,
4357                        offset, "peer claims to be blocked at stream level"
4358                    );
4359                }
4360                Frame::StreamsBlocked { dir, limit } => {
4361                    if limit > MAX_STREAM_COUNT {
4362                        return Err(TransportError::FRAME_ENCODING_ERROR(
4363                            "unrepresentable stream limit",
4364                        ));
4365                    }
4366                    debug!(
4367                        "peer claims to be blocked opening more than {} {} streams",
4368                        limit, dir
4369                    );
4370                }
4371                Frame::StopSending(frame::StopSending { id, error_code }) => {
4372                    if id.initiator() != self.side.side() {
4373                        if id.dir() == Dir::Uni {
4374                            debug!("got STOP_SENDING on recv-only {}", id);
4375                            return Err(TransportError::STREAM_STATE_ERROR(
4376                                "STOP_SENDING on recv-only stream",
4377                            ));
4378                        }
4379                    } else if self.streams.is_local_unopened(id) {
4380                        return Err(TransportError::STREAM_STATE_ERROR(
4381                            "STOP_SENDING on unopened stream",
4382                        ));
4383                    }
4384                    self.streams.received_stop_sending(id, error_code);
4385                }
4386                Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4387                    if let Some(ref path_id) = path_id {
4388                        span.record("path", tracing::field::debug(&path_id));
4389                    }
4390                    let path_id = path_id.unwrap_or_default();
4391                    match self.local_cid_state.get_mut(&path_id) {
4392                        None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4393                        Some(cid_state) => {
4394                            let allow_more_cids = cid_state
4395                                .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4396
4397                            // If the path has been closed, we do not issue more CIDs for it.
4398                            // For details see https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.2.2
4399                            // > an endpoint SHOULD provide new connection IDs for that path, if still open, using PATH_NEW_CONNECTION_ID frames.
4400                            let has_path = !self.abandoned_paths.contains(&path_id);
4401                            let allow_more_cids = allow_more_cids && has_path;
4402
4403                            self.endpoint_events
4404                                .push_back(EndpointEventInner::RetireConnectionId(
4405                                    now,
4406                                    path_id,
4407                                    sequence,
4408                                    allow_more_cids,
4409                                ));
4410                        }
4411                    }
4412                }
4413                Frame::NewConnectionId(frame) => {
4414                    let path_id = if let Some(path_id) = frame.path_id {
4415                        if !self.is_multipath_negotiated() {
4416                            return Err(TransportError::PROTOCOL_VIOLATION(
4417                                "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4418                            ));
4419                        }
4420                        if path_id > self.local_max_path_id {
4421                            return Err(TransportError::PROTOCOL_VIOLATION(
4422                                "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4423                            ));
4424                        }
4425                        path_id
4426                    } else {
4427                        PathId::ZERO
4428                    };
4429
4430                    if self.abandoned_paths.contains(&path_id) {
4431                        trace!("ignoring issued CID for abandoned path");
4432                        continue;
4433                    }
4434                    if let Some(ref path_id) = frame.path_id {
4435                        span.record("path", tracing::field::debug(&path_id));
4436                    }
4437                    let rem_cids = self
4438                        .rem_cids
4439                        .entry(path_id)
4440                        .or_insert_with(|| CidQueue::new(frame.id));
4441                    if rem_cids.active().is_empty() {
4442                        return Err(TransportError::PROTOCOL_VIOLATION(
4443                            "NEW_CONNECTION_ID when CIDs aren't in use",
4444                        ));
4445                    }
4446                    if frame.retire_prior_to > frame.sequence {
4447                        return Err(TransportError::PROTOCOL_VIOLATION(
4448                            "NEW_CONNECTION_ID retiring unissued CIDs",
4449                        ));
4450                    }
4451
4452                    use crate::cid_queue::InsertError;
4453                    match rem_cids.insert(frame) {
4454                        Ok(None) if self.path(path_id).is_none() => {
4455                            // If this gives us CIDs to open a new path and a NAT traversal
4456                            // attempt is underway, try to probe a pending remote.
4457                            self.continue_nat_traversal_round(now);
4458                        }
4459                        Ok(None) => {}
4460                        Ok(Some((retired, reset_token))) => {
4461                            let pending_retired =
4462                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
4463                            /// Ensure `pending_retired` cannot grow without bound. Limit is
4464                            /// somewhat arbitrary but very permissive.
4465                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4466                            // We don't bother counting in-flight frames because those are bounded
4467                            // by congestion control.
4468                            if (pending_retired.len() as u64)
4469                                .saturating_add(retired.end.saturating_sub(retired.start))
4470                                > MAX_PENDING_RETIRED_CIDS
4471                            {
4472                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4473                                    "queued too many retired CIDs",
4474                                ));
4475                            }
4476                            pending_retired.extend(retired.map(|seq| (path_id, seq)));
4477                            self.set_reset_token(path_id, remote, reset_token);
4478                        }
4479                        Err(InsertError::ExceedsLimit) => {
4480                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4481                        }
4482                        Err(InsertError::Retired) => {
4483                            trace!("discarding already-retired");
4484                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
4485                            // range of connection IDs larger than the active connection ID limit
4486                            // was retired all at once via retire_prior_to.
4487                            self.spaces[SpaceId::Data]
4488                                .pending
4489                                .retire_cids
4490                                .push((path_id, frame.sequence));
4491                            continue;
4492                        }
4493                    };
4494
4495                    if self.side.is_server()
4496                        && path_id == PathId::ZERO
4497                        && self
4498                            .rem_cids
4499                            .get(&PathId::ZERO)
4500                            .map(|cids| cids.active_seq() == 0)
4501                            .unwrap_or_default()
4502                    {
4503                        // We're a server still using the initial remote CID for the client, so
4504                        // let's switch immediately to enable client-side stateless resets.
4505                        self.update_rem_cid(PathId::ZERO);
4506                    }
4507                }
4508                Frame::NewToken(NewToken { token }) => {
4509                    let ConnectionSide::Client {
4510                        token_store,
4511                        server_name,
4512                        ..
4513                    } = &self.side
4514                    else {
4515                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4516                    };
4517                    if token.is_empty() {
4518                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4519                    }
4520                    trace!("got new token");
4521                    token_store.insert(server_name, token);
4522                }
4523                Frame::Datagram(datagram) => {
4524                    if self
4525                        .datagrams
4526                        .received(datagram, &self.config.datagram_receive_buffer_size)?
4527                    {
4528                        self.events.push_back(Event::DatagramReceived);
4529                    }
4530                }
4531                Frame::AckFrequency(ack_frequency) => {
4532                    // This frame can only be sent in the Data space
4533
4534                    if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4535                        // The AckFrequency frame is stale (we have already received a more
4536                        // recent one)
4537                        continue;
4538                    }
4539
4540                    // Update the params for all of our paths
4541                    for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4542                        space.pending_acks.set_ack_frequency_params(&ack_frequency);
4543
4544                        // Our `max_ack_delay` has been updated, so we may need to adjust
4545                        // its associated timeout
4546                        if let Some(timeout) = space
4547                            .pending_acks
4548                            .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4549                        {
4550                            self.timers.set(
4551                                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4552                                timeout,
4553                                self.qlog.with_time(now),
4554                            );
4555                        }
4556                    }
4557                }
4558                Frame::ImmediateAck => {
4559                    // This frame can only be sent in the Data space
4560                    for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4561                        pns.pending_acks.set_immediate_ack_required();
4562                    }
4563                }
4564                Frame::HandshakeDone => {
4565                    if self.side.is_server() {
4566                        return Err(TransportError::PROTOCOL_VIOLATION(
4567                            "client sent HANDSHAKE_DONE",
4568                        ));
4569                    }
4570                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
4571                        self.discard_space(now, SpaceId::Handshake);
4572                    }
4573                    self.events.push_back(Event::HandshakeConfirmed);
4574                    trace!("handshake confirmed");
4575                }
4576                Frame::ObservedAddr(observed) => {
4577                    // Check whether the transport params allow the peer to send reports and this node to receive them
4578                    trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4579                    if !self
4580                        .peer_params
4581                        .address_discovery_role
4582                        .should_report(&self.config.address_discovery_role)
4583                    {
4584                        return Err(TransportError::PROTOCOL_VIOLATION(
4585                            "received OBSERVED_ADDRESS frame when not negotiated",
4586                        ));
4587                    }
4588                    // must only be sent in data space
4589                    if packet.header.space() != SpaceId::Data {
4590                        return Err(TransportError::PROTOCOL_VIOLATION(
4591                            "OBSERVED_ADDRESS frame outside data space",
4592                        ));
4593                    }
4594
4595                    let path = self.path_data_mut(path_id);
4596                    if remote == path.remote {
4597                        if let Some(updated) = path.update_observed_addr_report(observed) {
4598                            if path.open {
4599                                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4600                                    id: path_id,
4601                                    addr: updated,
4602                                }));
4603                            }
4604                            // otherwise the event is reported when the path is deemed open
4605                        }
4606                    } else {
4607                        // include in migration
4608                        migration_observed_addr = Some(observed)
4609                    }
4610                }
4611                Frame::PathAbandon(frame::PathAbandon {
4612                    path_id,
4613                    error_code,
4614                }) => {
4615                    span.record("path", tracing::field::debug(&path_id));
4616                    // TODO(flub): don't really know which error code to use here.
4617                    match self.close_path(now, path_id, error_code.into()) {
4618                        Ok(()) => {
4619                            trace!("peer abandoned path");
4620                        }
4621                        Err(ClosePathError::LastOpenPath) => {
4622                            trace!("peer abandoned last path, closing connection");
4623                            // TODO(flub): which error code?
4624                            return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4625                        }
4626                        Err(ClosePathError::ClosedPath) => {
4627                            trace!("peer abandoned already closed path");
4628                        }
4629                    };
4630                    // If we receive a retransmit of PATH_ABANDON then we may already have
4631                    // abandoned this path locally.  In that case the DiscardPath timer
4632                    // may already have fired and we no longer have any state for this path.
4633                    // Only set this timer if we still have path state.
4634                    if let Some(path) = self.paths.get_mut(&path_id) {
4635                        if !matches!(path.data.abandon_state, AbandonState::ReceivedPathAbandon) {
4636                            let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
4637                            let pto = path.data.rtt.pto_base() + ack_delay;
4638                            self.timers.set(
4639                                Timer::PerPath(path_id, PathTimer::DiscardPath),
4640                                now + 3 * pto,
4641                                self.qlog.with_time(now),
4642                            );
4643                            // Record that we received a PATH_ABANDON so a retransmit does not re-arm the timer.
4644                            path.data.abandon_state = AbandonState::ReceivedPathAbandon;
4645                        }
4646                    }
4647                }
4648                Frame::PathStatusAvailable(info) => {
4649                    span.record("path", tracing::field::debug(&info.path_id));
4650                    if self.is_multipath_negotiated() {
4651                        self.on_path_status(
4652                            info.path_id,
4653                            PathStatus::Available,
4654                            info.status_seq_no,
4655                        );
4656                    } else {
4657                        return Err(TransportError::PROTOCOL_VIOLATION(
4658                            "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4659                        ));
4660                    }
4661                }
4662                Frame::PathStatusBackup(info) => {
4663                    span.record("path", tracing::field::debug(&info.path_id));
4664                    if self.is_multipath_negotiated() {
4665                        self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4666                    } else {
4667                        return Err(TransportError::PROTOCOL_VIOLATION(
4668                            "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4669                        ));
4670                    }
4671                }
4672                Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4673                    span.record("path", tracing::field::debug(&path_id));
4674                    if !self.is_multipath_negotiated() {
4675                        return Err(TransportError::PROTOCOL_VIOLATION(
4676                            "received MAX_PATH_ID frame when multipath was not negotiated",
4677                        ));
4678                    }
4679                    // frames that do not increase the path id are ignored
4680                    if path_id > self.remote_max_path_id {
4681                        self.remote_max_path_id = path_id;
4682                        self.issue_first_path_cids(now);
4683                        while let Some(true) = self.continue_nat_traversal_round(now) {}
4684                    }
4685                }
4686                Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4687                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4688                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4689                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4690                    if self.is_multipath_negotiated() {
4691                        if self.local_max_path_id > max_path_id {
4692                            return Err(TransportError::PROTOCOL_VIOLATION(
4693                                "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4694                            ));
4695                        }
4696                        debug!("received PATHS_BLOCKED({:?})", max_path_id);
4697                        // TODO(@divma): ensure max concurrent paths
4698                    } else {
4699                        return Err(TransportError::PROTOCOL_VIOLATION(
4700                            "received PATHS_BLOCKED frame when multipath was not negotiated",
4701                        ));
4702                    }
4703                }
4704                Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4705                    // Nothing to do.  This is recorded in the frame stats, but otherwise we
4706                    // always issue all CIDs we're allowed to issue, so either this is an
4707                    // impatient peer or a bug on our side.
4708
4709                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4710                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4711                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4712                    if self.is_multipath_negotiated() {
4713                        if path_id > self.local_max_path_id {
4714                            return Err(TransportError::PROTOCOL_VIOLATION(
4715                                "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4716                            ));
4717                        }
4718                        if next_seq.0
4719                            > self
4720                                .local_cid_state
4721                                .get(&path_id)
4722                                .map(|cid_state| cid_state.active_seq().1 + 1)
4723                                .unwrap_or_default()
4724                        {
4725                            return Err(TransportError::PROTOCOL_VIOLATION(
4726                                "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4727                            ));
4728                        }
4729                        debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4730                    } else {
4731                        return Err(TransportError::PROTOCOL_VIOLATION(
4732                            "received PATH_CIDS_BLOCKED frame when multipath was not negotiated",
4733                        ));
4734                    }
4735                }
4736                Frame::AddAddress(addr) => {
4737                    let client_state = match self.iroh_hp.client_side_mut() {
4738                        Ok(state) => state,
4739                        Err(err) => {
4740                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4741                                "Nat traversal(ADD_ADDRESS): {err}"
4742                            )));
4743                        }
4744                    };
4745
4746                    if !client_state.check_remote_address(&addr) {
4747                        // If the address is not valid we log a warning, but update anyway
4748                        warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4749                    }
4750
4751                    match client_state.add_remote_address(addr) {
4752                        Ok(maybe_added) => {
4753                            if let Some(added) = maybe_added {
4754                                self.events.push_back(Event::NatTraversal(
4755                                    iroh_hp::Event::AddressAdded(added),
4756                                ));
4757                            }
4758                        }
4759                        Err(e) => {
4760                            warn!(%e, "failed to add remote address")
4761                        }
4762                    }
4763                }
4764                Frame::RemoveAddress(addr) => {
4765                    let client_state = match self.iroh_hp.client_side_mut() {
4766                        Ok(state) => state,
4767                        Err(err) => {
4768                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4769                                "Nat traversal(REMOVE_ADDRESS): {err}"
4770                            )));
4771                        }
4772                    };
4773                    if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4774                        self.events
4775                            .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4776                                removed_addr,
4777                            )));
4778                    }
4779                }
4780                Frame::ReachOut(reach_out) => {
4781                    let server_state = match self.iroh_hp.server_side_mut() {
4782                        Ok(state) => state,
4783                        Err(err) => {
4784                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4785                                "Nat traversal(REACH_OUT): {err}"
4786                            )));
4787                        }
4788                    };
4789
4790                    if let Err(err) = server_state.handle_reach_out(reach_out) {
4791                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
4792                            "Nat traversal(REACH_OUT): {err}"
4793                        )));
4794                    }
4795                }
4796            }
4797        }
4798
4799        let space = self.spaces[SpaceId::Data].for_path(path_id);
4800        if space
4801            .pending_acks
4802            .packet_received(now, number, ack_eliciting, &space.dedup)
4803        {
4804            if self.abandoned_paths.contains(&path_id) {
4805                // § 3.4.3 QUIC-MULTIPATH: promptly send ACKs for packets received from
4806                // abandoned paths.
4807                space.pending_acks.set_immediate_ack_required();
4808            } else {
4809                self.timers.set(
4810                    Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4811                    now + self.ack_frequency.max_ack_delay,
4812                    self.qlog.with_time(now),
4813                );
4814            }
4815        }
4816
4817        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
4818        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
4819        // are only freed, and hence only issue credit, once the application has been notified
4820        // during a read on the stream.
4821        let pending = &mut self.spaces[SpaceId::Data].pending;
4822        self.streams.queue_max_stream_id(pending);
4823
4824        if let Some(reason) = close {
4825            self.state.move_to_draining(Some(reason.into()));
4826            self.close = true;
4827        }
4828
4829        if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4830            && !is_probing_packet
4831            && remote != self.path_data(path_id).remote
4832        {
4833            let ConnectionSide::Server { ref server_config } = self.side else {
4834                panic!("packets from unknown remote should be dropped by clients");
4835            };
4836            debug_assert!(
4837                server_config.migration,
4838                "migration-initiating packets should have been dropped immediately"
4839            );
4840            self.migrate(path_id, now, remote, migration_observed_addr);
4841            // Break linkability, if possible
4842            self.update_rem_cid(path_id);
4843            self.spin = false;
4844        }
4845
4846        Ok(())
4847    }
4848
4849    fn migrate(
4850        &mut self,
4851        path_id: PathId,
4852        now: Instant,
4853        remote: SocketAddr,
4854        observed_addr: Option<ObservedAddr>,
4855    ) {
4856        trace!(%remote, %path_id, "migration initiated");
4857        self.path_counter = self.path_counter.wrapping_add(1);
4858        // TODO(@divma): conditions for path migration in multipath are very specific, check them
4859        // again to prevent path migrations that should actually create a new path
4860
4861        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
4862        // Note that the congestion window will not grow until validation terminates. This
4863        // helps mitigate amplification attacks performed by spoofing source addresses.
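        // A changed port with an unchanged IPv4 address is treated below as a likely NAT
        // rebinding and inherits the previous path's state; any other change gets a fresh
        // path with reset RTT and congestion state.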
4864        let prev_pto = self.pto(SpaceId::Data, path_id);
4865        let known_path = self.paths.get_mut(&path_id).expect("known path");
4866        let path = &mut known_path.data;
4867        let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4868            PathData::from_previous(remote, path, self.path_counter, now)
4869        } else {
4870            let peer_max_udp_payload_size =
4871                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4872                    .unwrap_or(u16::MAX);
4873            PathData::new(
4874                remote,
4875                self.allow_mtud,
4876                Some(peer_max_udp_payload_size),
4877                self.path_counter,
4878                now,
4879                &self.config,
4880            )
4881        };
4882        new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4883        if let Some(report) = observed_addr {
4884            if let Some(updated) = new_path.update_observed_addr_report(report) {
4885                tracing::info!("adding observed addr event from migration");
4886                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4887                    id: path_id,
4888                    addr: updated,
4889                }));
4890            }
4891        }
4892        new_path.send_new_challenge = true;
4893
4894        let mut prev = mem::replace(path, new_path);
4895        // Don't clobber the original path if the previous one hasn't been validated yet
4896        if !prev.is_validating_path() {
4897            prev.send_new_challenge = true;
4898            // We haven't updated the remote CID yet; this captures the remote CID we were
4899            // using on the previous path.
4900
4901            known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4902        }
4903
4904        // We need to re-assign the correct remote to this path in qlog
4905        self.qlog.emit_tuple_assigned(path_id, remote, now);
4906
4907        self.timers.set(
4908            Timer::PerPath(path_id, PathTimer::PathValidation),
4909            now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4910            self.qlog.with_time(now),
4911        );
4912    }
4913
4914    /// Handle a change in the local address, i.e. an active migration
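    ///
    /// A minimal usage sketch, assuming the caller has just rebound its UDP socket
    /// (`rebind` and `new_local_addr` are hypothetical):
    ///
    /// ```ignore
    /// socket.rebind(new_local_addr)?; // hypothetical socket API
    /// // Switch to an unused remote CID and elicit a response on the new path.
    /// conn.local_address_changed();
    /// ```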
4915    pub fn local_address_changed(&mut self) {
4916        // TODO(flub): if multipath is enabled this needs to create a new path entirely.
4917        self.update_rem_cid(PathId::ZERO);
4918        self.ping();
4919    }
4920
4921    /// Switch to a previously unused remote connection ID, if possible
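    ///
    /// The previously active CID, along with any CIDs that had to be skipped, is queued
    /// for retirement, and the new CID's stateless reset token is forwarded to the
    /// endpoint via [`Self::set_reset_token`].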
4922    fn update_rem_cid(&mut self, path_id: PathId) {
4923        let Some((reset_token, retired)) =
4924            self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4925        else {
4926            return;
4927        };
4928
4929        // Retire the current remote CID and any CIDs we had to skip.
4930        self.spaces[SpaceId::Data]
4931            .pending
4932            .retire_cids
4933            .extend(retired.map(|seq| (path_id, seq)));
4934        let remote = self.path_data(path_id).remote;
4935        self.set_reset_token(path_id, remote, reset_token);
4936    }
4937
4938    /// Sends this reset token to the endpoint
4939    ///
4940    /// The endpoint needs to know the reset tokens issued by the peer, so that if the
4941    /// peer sends a stateless reset the endpoint knows to route it to this connection.
4942    /// See RFC 9000, Section 10.3, Stateless Reset.
4943    ///
4944    /// Reset tokens are different for each path; note, however, that the endpoint
4945    /// identifies paths by the peer's socket address, not by path ID.
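    ///
    /// The token is delivered as an [`EndpointEventInner::ResetToken`] event rather than
    /// applied directly, since routing of stateless resets happens in the endpoint, not
    /// in the connection.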
4946    fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4947        self.endpoint_events
4948            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4949
4950        // During the handshake the server sends a reset token in the transport
4951        // parameters. When we are the client and we receive the reset token during the
4952        // handshake we want this to affect our peer transport parameters.
4953        // TODO(flub): Pretty sure this is pointless, the entire params is overwritten
4954        //    shortly after this was called.  And then the params don't have this anymore.
4955        if path_id == PathId::ZERO {
4956            self.peer_params.stateless_reset_token = Some(reset_token);
4957        }
4958    }
4959
4960    /// Issue an initial set of connection IDs to the peer upon connection
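    ///
    /// For example (illustrative numbers only): if the peer's limit lets us issue four
    /// CIDs, one is already accounted for by the CID supplied during the handshake, so
    /// we ask the endpoint for three more; a server that also advertised a
    /// preferred-address CID in its transport parameters asks for only two.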
4961    fn issue_first_cids(&mut self, now: Instant) {
4962        if self
4963            .local_cid_state
4964            .get(&PathId::ZERO)
4965            .expect("PathId::ZERO exists when the connection is created")
4966            .cid_len()
4967            == 0
4968        {
4969            return;
4970        }
4971
4972        // Subtract 1 to account for the CID we supplied while handshaking
4973        let mut n = self.peer_params.issue_cids_limit() - 1;
4974        if let ConnectionSide::Server { server_config } = &self.side {
4975            if server_config.has_preferred_address() {
4976                // We also sent a CID in the transport parameters
4977                n -= 1;
4978            }
4979        }
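        // E.g. (hypothetical numbers) with an issue_cids_limit() of 8, a server that
        // advertised a preferred address issues 8 - 1 - 1 = 6 fresh CIDs here.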
4980        self.endpoint_events
4981            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
4982    }
4983
4984    /// Issues an initial set of CIDs for paths that have not yet had any CIDs issued
4985    ///
4986    /// Later CIDs are issued when CIDs expire or are retired by the peer.
4987    fn issue_first_path_cids(&mut self, now: Instant) {
4988        if let Some(max_path_id) = self.max_path_id() {
4989            let mut path_id = self.max_path_id_with_cids.next();
4990            while path_id <= max_path_id {
4991                self.endpoint_events
4992                    .push_back(EndpointEventInner::NeedIdentifiers(
4993                        path_id,
4994                        now,
4995                        self.peer_params.issue_cids_limit(),
4996                    ));
4997                path_id = path_id.next();
4998            }
4999            self.max_path_id_with_cids = max_path_id;
5000        }
5001    }
5002
5003    /// Populates a packet with frames
5004    ///
5005    /// This tries to fit as many frames as possible into the packet.
5006    ///
5007    /// *path_exclusive_only* means to build only those frames which can only be sent on
5008    /// this path.  This is used in multipath for backup paths while there is still an
5009    /// active path.
5010    fn populate_packet(
5011        &mut self,
5012        now: Instant,
5013        space_id: SpaceId,
5014        path_id: PathId,
5015        path_exclusive_only: bool,
5016        buf: &mut impl BufMut,
5017        pn: u64,
5018        #[allow(unused)] qlog: &mut QlogSentPacket,
5019    ) -> SentFrames {
5020        let mut sent = SentFrames::default();
5021        let is_multipath_negotiated = self.is_multipath_negotiated();
5022        let space = &mut self.spaces[space_id];
5023        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5024        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5025        space
5026            .for_path(path_id)
5027            .pending_acks
5028            .maybe_ack_non_eliciting();
5029
5030        // HANDSHAKE_DONE
5031        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5032            trace!("HANDSHAKE_DONE");
5033            buf.write(frame::FrameType::HANDSHAKE_DONE);
5034            qlog.frame(&Frame::HandshakeDone);
5035            sent.retransmits.get_or_create().handshake_done = true;
5036            // This is just a u8 counter and the frame is typically just sent once
5037            self.stats.frame_tx.handshake_done =
5038                self.stats.frame_tx.handshake_done.saturating_add(1);
5039        }
5040
5041        // REACH_OUT
5042        // TODO(@divma): path exclusive considerations
5043        if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5044            while let Some(local_addr) = addresses.pop() {
5045                let reach_out = frame::ReachOut::new(*round, local_addr);
5046                if buf.remaining_mut() > reach_out.size() {
5047                    trace!(%round, ?local_addr, "REACH_OUT");
5048                    reach_out.write(buf);
5049                    let sent_reachouts = sent
5050                        .retransmits
5051                        .get_or_create()
5052                        .reach_out
5053                        .get_or_insert_with(|| (*round, Default::default()));
5054                    sent_reachouts.1.push(local_addr);
5055                    self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5056                    qlog.frame(&Frame::ReachOut(reach_out));
5057                } else {
5058                    addresses.push(local_addr);
5059                    break;
5060                }
5061            }
5062            if addresses.is_empty() {
5063                space.pending.reach_out = None;
5064            }
5065        }
5066
5067        // OBSERVED_ADDR
5068        if !path_exclusive_only
5069            && space_id == SpaceId::Data
5070            && self
5071                .config
5072                .address_discovery_role
5073                .should_report(&self.peer_params.address_discovery_role)
5074            && (!path.observed_addr_sent || space.pending.observed_addr)
5075        {
5076            let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5077            if buf.remaining_mut() > frame.size() {
5078                trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5079                frame.write(buf);
5080
5081                self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5082                path.observed_addr_sent = true;
5083
5084                self.stats.frame_tx.observed_addr += 1;
5085                sent.retransmits.get_or_create().observed_addr = true;
5086                space.pending.observed_addr = false;
5087                qlog.frame(&Frame::ObservedAddr(frame));
5088            }
5089        }
5090
5091        // PING
5092        if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5093            trace!("PING");
5094            buf.write(frame::FrameType::PING);
5095            sent.non_retransmits = true;
5096            self.stats.frame_tx.ping += 1;
5097            qlog.frame(&Frame::Ping);
5098        }
5099
5100        // IMMEDIATE_ACK
5101        if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5102            debug_assert_eq!(
5103                space_id,
5104                SpaceId::Data,
5105                "immediate acks must be sent in the data space"
5106            );
5107            trace!("IMMEDIATE_ACK");
5108            buf.write(frame::FrameType::IMMEDIATE_ACK);
5109            sent.non_retransmits = true;
5110            self.stats.frame_tx.immediate_ack += 1;
5111            qlog.frame(&Frame::ImmediateAck);
5112        }
5113
5114        // ACK
5115        // TODO(flub): Should this send acks for this path anyway?
5116
5117        if !path_exclusive_only {
5118            for path_id in space
5119                .number_spaces
5120                .iter_mut()
5121                .filter(|(_, pns)| pns.pending_acks.can_send())
5122                .map(|(&path_id, _)| path_id)
5123                .collect::<Vec<_>>()
5124            {
5125                Self::populate_acks(
5126                    now,
5127                    self.receiving_ecn,
5128                    &mut sent,
5129                    path_id,
5130                    space_id,
5131                    space,
5132                    is_multipath_negotiated,
5133                    buf,
5134                    &mut self.stats,
5135                    qlog,
5136                );
5137            }
5138        }
5139
5140        // ACK_FREQUENCY
5141        if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5142            let sequence_number = self.ack_frequency.next_sequence_number();
5143
5144            // Safe to unwrap because this is always provided when ACK frequency is enabled
5145            let config = self.config.ack_frequency_config.as_ref().unwrap();
5146
5147            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
5148            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5149                path.rtt.get(),
5150                config,
5151                &self.peer_params,
5152            );
5153
5154            trace!(?max_ack_delay, "ACK_FREQUENCY");
5155
5156            let frame = frame::AckFrequency {
5157                sequence: sequence_number,
5158                ack_eliciting_threshold: config.ack_eliciting_threshold,
5159                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5160                reordering_threshold: config.reordering_threshold,
5161            };
5162            frame.encode(buf);
5163            qlog.frame(&Frame::AckFrequency(frame));
5164
5165            sent.retransmits.get_or_create().ack_frequency = true;
5166
5167            self.ack_frequency
5168                .ack_frequency_sent(path_id, pn, max_ack_delay);
5169            self.stats.frame_tx.ack_frequency += 1;
5170        }
5171
5172        // PATH_CHALLENGE
5173        if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5174            && space_id == SpaceId::Data
5175            && path.send_new_challenge
5176            && !self.state.is_closed()
5177        // we don't want to send new challenges if we are already closing
5178        {
5179            path.send_new_challenge = false;
5180
5181            // Generate a new challenge every time we send a new PATH_CHALLENGE
5182            let token = self.rng.random();
5183            let info = paths::SentChallengeInfo {
5184                sent_instant: now,
5185                remote: path.remote,
5186            };
5187            path.challenges_sent.insert(token, info);
5188            sent.non_retransmits = true;
5189            sent.requires_padding = true;
5190            let challenge = frame::PathChallenge(token);
5191            trace!(frame = %challenge);
5192            buf.write(challenge);
5193            qlog.frame(&Frame::PathChallenge(challenge));
5194            self.stats.frame_tx.path_challenge += 1;
5195            let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5196            self.timers.set(
5197                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5198                now + pto,
5199                self.qlog.with_time(now),
5200            );
5201
5202            if is_multipath_negotiated && !path.validated {
5203                // queue informing the path status along with the challenge written above
5204                space.pending.path_status.insert(path_id);
5205            }
5206
5207            // Always include an OBSERVED_ADDR frame with a PATH_CHALLENGE, regardless
5208            // of whether one has already been sent on this path.
5209            if space_id == SpaceId::Data
5210                && self
5211                    .config
5212                    .address_discovery_role
5213                    .should_report(&self.peer_params.address_discovery_role)
5214            {
5215                let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5216                if buf.remaining_mut() > frame.size() {
5217                    frame.write(buf);
5218                    qlog.frame(&Frame::ObservedAddr(frame));
5219
5220                    self.next_observed_addr_seq_no =
5221                        self.next_observed_addr_seq_no.saturating_add(1u8);
5222                    path.observed_addr_sent = true;
5223
5224                    self.stats.frame_tx.observed_addr += 1;
5225                    sent.retransmits.get_or_create().observed_addr = true;
5226                    space.pending.observed_addr = false;
5227                }
5228            }
5229        }
5230
5231        // PATH_RESPONSE
5232        if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5233            if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5234                sent.non_retransmits = true;
5235                sent.requires_padding = true;
5236                let response = frame::PathResponse(token);
5237                trace!(frame = %response);
5238                buf.write(response);
5239                qlog.frame(&Frame::PathResponse(response));
5240                self.stats.frame_tx.path_response += 1;
5241
5242                // NOTE: this is technically not required, but it can be useful to ride the
5243                // request/response nature of path challenges to refresh an observation.
5244                // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
5245                if space_id == SpaceId::Data
5246                    && self
5247                        .config
5248                        .address_discovery_role
5249                        .should_report(&self.peer_params.address_discovery_role)
5250                {
5251                    let frame =
5252                        frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5253                    if buf.remaining_mut() > frame.size() {
5254                        frame.write(buf);
5255                        qlog.frame(&Frame::ObservedAddr(frame));
5256
5257                        self.next_observed_addr_seq_no =
5258                            self.next_observed_addr_seq_no.saturating_add(1u8);
5259                        path.observed_addr_sent = true;
5260
5261                        self.stats.frame_tx.observed_addr += 1;
5262                        sent.retransmits.get_or_create().observed_addr = true;
5263                        space.pending.observed_addr = false;
5264                    }
5265                }
5266            }
5267        }
5268
5269        // CRYPTO
5270        while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5271            let mut frame = match space.pending.crypto.pop_front() {
5272                Some(x) => x,
5273                None => break,
5274            };
5275
5276            // Calculate the maximum amount of crypto data we can store in the buffer.
5277            // Since the offset is known, we can reserve the exact size required to encode it.
5278            // For the length we reserve 2 bytes, which allows encoding lengths up to 2^14,
5279            // more than what fits into normally sized QUIC packets.
5280            let max_crypto_data_size = buf.remaining_mut()
5281                - 1 // Frame Type
5282                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5283                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
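            // E.g. (hypothetical numbers) with 1200 bytes remaining and an offset of
            // 70_000 (a 4-byte varint), max_crypto_data_size = 1200 - 1 - 4 - 2 = 1193.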
5284
5285            let len = frame
5286                .data
5287                .len()
5288                .min(2usize.pow(14) - 1)
5289                .min(max_crypto_data_size);
5290
5291            let data = frame.data.split_to(len);
5292            let truncated = frame::Crypto {
5293                offset: frame.offset,
5294                data,
5295            };
5296            trace!(
5297                "CRYPTO: off {} len {}",
5298                truncated.offset,
5299                truncated.data.len()
5300            );
5301            truncated.encode(buf);
5302            self.stats.frame_tx.crypto += 1;
5303
5304            // The clone is cheap but we still cfg it out if qlog is disabled.
5305            #[cfg(feature = "qlog")]
5306            qlog.frame(&Frame::Crypto(truncated.clone()));
5307            sent.retransmits.get_or_create().crypto.push_back(truncated);
5308            if !frame.data.is_empty() {
5309                frame.offset += len as u64;
5310                space.pending.crypto.push_front(frame);
5311            }
5312        }
5313
5314        // TODO(flub): maybe this is much higher priority?
5315        // PATH_ABANDON
5316        while !path_exclusive_only
5317            && space_id == SpaceId::Data
5318            && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5319        {
5320            let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
5321            else {
5322                break;
5323            };
5324            let frame = frame::PathAbandon {
5325                path_id: abandoned_path_id,
5326                error_code,
5327            };
5328            frame.encode(buf);
5329            qlog.frame(&Frame::PathAbandon(frame));
5330            self.stats.frame_tx.path_abandon += 1;
5331            trace!(%abandoned_path_id, "PATH_ABANDON");
5332            sent.retransmits
5333                .get_or_create()
5334                .path_abandon
5335                .entry(abandoned_path_id)
5336                .or_insert(error_code);
5337
5338            let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5339            // We can't access path here anymore due to borrowing issues.
5340            let send_pto = self.paths.get(&path_id).unwrap().data.rtt.pto_base() + ack_delay;
5341            if let Some(abandoned_path) = self.paths.get_mut(&abandoned_path_id) {
5342                // We only want to set the deadline on the *first* PATH_ABANDON we send.
5343                // Retransmits shouldn't run this code again.
5344                if matches!(
5345                    abandoned_path.data.abandon_state,
5346                    AbandonState::NotAbandoned
5347                ) {
5348                    // The peer MUST respond with a corresponding PATH_ABANDON frame.
5349                    // The other peer has 3 * PTO to do that.
5350                    // This uses the PTO of the path we send on!
5351                    // Once the PATH_ABANDON comes in, we set the path_abandon_deadline far
5352                    // into the future, so that receiving data on that path doesn't cause errors.
5353                    abandoned_path.data.abandon_state = AbandonState::ExpectingPathAbandon {
5354                        deadline: now + 3 * send_pto,
5355                    };
5356
5357                    // At some point, we need to forget about the path.
5358                    // If we do so too early, then we'll have discarded the CIDs of that path and won't
5359                    // handle incoming packets on that path correctly.
5360                    // To give this path enough time, we assume that the peer will have received our
5361                    // PATH_ABANDON within 3 * PTO of the path we sent the abandon on,
5362                    // and then we give the path 3 * PTO time to make it very unlikely that there will
5363                    // still be packets incoming on the path at that point.
5364                    // This timer will actually get reset to a value that's likely to be even earlier
5365                    // once we actually receive the PATH_ABANDON frame itself.
5366                    // Use the abandoned path's own PTO for the drain period.
5367                    let abandoned_pto = abandoned_path.data.rtt.pto_base() + ack_delay;
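                    // E.g. (hypothetical values) with send_pto = 200ms and abandoned_pto =
                    // 300ms, the deadline for the peer's PATH_ABANDON is now + 600ms and
                    // the path is discarded at now + 1500ms.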
5368                    self.timers.set(
5369                        Timer::PerPath(abandoned_path_id, PathTimer::DiscardPath),
5370                        now + 3 * send_pto + 3 * abandoned_pto,
5371                        self.qlog.with_time(now),
5372                    );
5373                }
5374            } else {
5375                warn!("sent PATH_ABANDON after path was already discarded");
5376            }
5377        }
5378
5379        // PATH_STATUS_AVAILABLE & PATH_STATUS_BACKUP
5380        while !path_exclusive_only
5381            && space_id == SpaceId::Data
5382            && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5383        {
5384            let Some(path_id) = space.pending.path_status.pop_first() else {
5385                break;
5386            };
5387            let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5388                trace!(%path_id, "discarding queued path status for unknown path");
5389                continue;
5390            };
5391
5392            let seq = path.status.seq();
5393            sent.retransmits.get_or_create().path_status.insert(path_id);
5394            match path.local_status() {
5395                PathStatus::Available => {
5396                    let frame = frame::PathStatusAvailable {
5397                        path_id,
5398                        status_seq_no: seq,
5399                    };
5400                    frame.encode(buf);
5401                    qlog.frame(&Frame::PathStatusAvailable(frame));
5402                    self.stats.frame_tx.path_status_available += 1;
5403                    trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5404                }
5405                PathStatus::Backup => {
5406                    let frame = frame::PathStatusBackup {
5407                        path_id,
5408                        status_seq_no: seq,
5409                    };
5410                    frame.encode(buf);
5411                    qlog.frame(&Frame::PathStatusBackup(frame));
5412                    self.stats.frame_tx.path_status_backup += 1;
5413                    trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5414                }
5415            }
5416        }
5417
5418        // MAX_PATH_ID
5419        if space_id == SpaceId::Data
5420            && space.pending.max_path_id
5421            && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5422        {
5423            let frame = frame::MaxPathId(self.local_max_path_id);
5424            frame.encode(buf);
5425            qlog.frame(&Frame::MaxPathId(frame));
5426            space.pending.max_path_id = false;
5427            sent.retransmits.get_or_create().max_path_id = true;
5428            trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5429            self.stats.frame_tx.max_path_id += 1;
5430        }
5431
5432        // PATHS_BLOCKED
5433        if space_id == SpaceId::Data
5434            && space.pending.paths_blocked
5435            && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5436        {
5437            let frame = frame::PathsBlocked(self.remote_max_path_id);
5438            frame.encode(buf);
5439            qlog.frame(&Frame::PathsBlocked(frame));
5440            space.pending.paths_blocked = false;
5441            sent.retransmits.get_or_create().paths_blocked = true;
5442            trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5443            self.stats.frame_tx.paths_blocked += 1;
5444        }
5445
5446        // PATH_CIDS_BLOCKED
5447        while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5448        {
5449            let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5450                break;
5451            };
5452            let next_seq = match self.rem_cids.get(&path_id) {
5453                Some(cid_queue) => cid_queue.active_seq() + 1,
5454                None => 0,
5455            };
5456            let frame = frame::PathCidsBlocked {
5457                path_id,
5458                next_seq: VarInt(next_seq),
5459            };
5460            frame.encode(buf);
5461            qlog.frame(&Frame::PathCidsBlocked(frame));
5462            sent.retransmits
5463                .get_or_create()
5464                .path_cids_blocked
5465                .push(path_id);
5466            trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5467            self.stats.frame_tx.path_cids_blocked += 1;
5468        }
5469
5470        // RESET_STREAM, STOP_SENDING, MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS
5471        if space_id == SpaceId::Data {
5472            self.streams.write_control_frames(
5473                buf,
5474                &mut space.pending,
5475                &mut sent.retransmits,
5476                &mut self.stats.frame_tx,
5477                qlog,
5478            );
5479        }
5480
5481        // NEW_CONNECTION_ID
5482        let cid_len = self
5483            .local_cid_state
5484            .values()
5485            .map(|cid_state| cid_state.cid_len())
5486            .max()
5487            .expect("some local CID state must exist");
5488        let new_cid_size_bound =
5489            frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5490        while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5491            let issued = match space.pending.new_cids.pop() {
5492                Some(x) => x,
5493                None => break,
5494            };
5495            let retire_prior_to = self
5496                .local_cid_state
5497                .get(&issued.path_id)
5498                .map(|cid_state| cid_state.retire_prior_to())
5499                .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5500
5501            let cid_path_id = match is_multipath_negotiated {
5502                true => {
5503                    trace!(
5504                        path_id = ?issued.path_id,
5505                        sequence = issued.sequence,
5506                        id = %issued.id,
5507                        "PATH_NEW_CONNECTION_ID",
5508                    );
5509                    self.stats.frame_tx.path_new_connection_id += 1;
5510                    Some(issued.path_id)
5511                }
5512                false => {
5513                    trace!(
5514                        sequence = issued.sequence,
5515                        id = %issued.id,
5516                        "NEW_CONNECTION_ID"
5517                    );
5518                    debug_assert_eq!(issued.path_id, PathId::ZERO);
5519                    self.stats.frame_tx.new_connection_id += 1;
5520                    None
5521                }
5522            };
5523            let frame = frame::NewConnectionId {
5524                path_id: cid_path_id,
5525                sequence: issued.sequence,
5526                retire_prior_to,
5527                id: issued.id,
5528                reset_token: issued.reset_token,
5529            };
5530            frame.encode(buf);
5531            sent.retransmits.get_or_create().new_cids.push(issued);
5532            qlog.frame(&Frame::NewConnectionId(frame));
5533        }
5534
5535        // RETIRE_CONNECTION_ID
5536        let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5537        while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5538            let (path_id, sequence) = match space.pending.retire_cids.pop() {
5539                Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5540                    trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5541                    self.stats.frame_tx.retire_connection_id += 1;
5542                    (None, seq)
5543                }
5544                Some((path_id, seq)) => {
5545                    trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5546                    self.stats.frame_tx.path_retire_connection_id += 1;
5547                    (Some(path_id), seq)
5548                }
5549                None => break,
5550            };
5551            let frame = frame::RetireConnectionId { path_id, sequence };
5552            frame.encode(buf);
5553            qlog.frame(&Frame::RetireConnectionId(frame));
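            // `None` here (a non-multipath RETIRE_CONNECTION_ID) maps back to
            // `PathId::ZERO` via `unwrap_or_default` below, matching the entry popped above.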
5554            sent.retransmits
5555                .get_or_create()
5556                .retire_cids
5557                .push((path_id.unwrap_or_default(), sequence));
5558        }
5559
5560        // DATAGRAM
5561        let mut sent_datagrams = false;
5562        while !path_exclusive_only
5563            && buf.remaining_mut() > Datagram::SIZE_BOUND
5564            && space_id == SpaceId::Data
5565        {
5566            let prev_remaining = buf.remaining_mut();
5567            match self.datagrams.write(buf) {
5568                true => {
5569                    sent_datagrams = true;
5570                    sent.non_retransmits = true;
5571                    self.stats.frame_tx.datagram += 1;
5572                    qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5573                }
5574                false => break,
5575            }
5576        }
5577        if self.datagrams.send_blocked && sent_datagrams {
5578            self.events.push_back(Event::DatagramsUnblocked);
5579            self.datagrams.send_blocked = false;
5580        }
5581
5582        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5583
5584        // NEW_TOKEN
5585        if !path_exclusive_only {
5586            while let Some(remote_addr) = space.pending.new_tokens.pop() {
5587                debug_assert_eq!(space_id, SpaceId::Data);
5588                let ConnectionSide::Server { server_config } = &self.side else {
5589                    panic!("NEW_TOKEN frames should not be enqueued by clients");
5590                };
5591
5592                if remote_addr != path.remote {
5593                    // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
5594                    // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
5595                    // frames upon a path change. When the new path becomes validated, NEW_TOKEN
5596                    // frames may be enqueued for the new path instead.
5597                    continue;
5598                }
5599
5600                let token = Token::new(
5601                    TokenPayload::Validation {
5602                        ip: remote_addr.ip(),
5603                        issued: server_config.time_source.now(),
5604                    },
5605                    &mut self.rng,
5606                );
5607                let new_token = NewToken {
5608                    token: token.encode(&*server_config.token_key).into(),
5609                };
5610
5611                if buf.remaining_mut() < new_token.size() {
5612                    space.pending.new_tokens.push(remote_addr);
5613                    break;
5614                }
5615
5616                trace!("NEW_TOKEN");
5617                new_token.encode(buf);
5618                qlog.frame(&Frame::NewToken(new_token));
5619                sent.retransmits
5620                    .get_or_create()
5621                    .new_tokens
5622                    .push(remote_addr);
5623                self.stats.frame_tx.new_token += 1;
5624            }
5625        }
5626
5627        // STREAM
5628        if !path_exclusive_only && space_id == SpaceId::Data {
5629            sent.stream_frames =
5630                self.streams
5631                    .write_stream_frames(buf, self.config.send_fairness, qlog);
5632            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5633        }
5634
5635        // ADD_ADDRESS
5636        // TODO(@divma): check if we need to do path exclusive filters
5637        while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5638            if let Some(added_address) = space.pending.add_address.pop_last() {
5639                trace!(
5640                    seq = %added_address.seq_no,
5641                    ip = ?added_address.ip,
5642                    port = added_address.port,
5643                    "ADD_ADDRESS",
5644                );
5645                added_address.write(buf);
5646                sent.retransmits
5647                    .get_or_create()
5648                    .add_address
5649                    .insert(added_address);
5650                self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5651                qlog.frame(&Frame::AddAddress(added_address));
5652            } else {
5653                break;
5654            }
5655        }
5656
5657        // REMOVE_ADDRESS
5658        while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5659            if let Some(removed_address) = space.pending.remove_address.pop_last() {
5660                trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5661                removed_address.write(buf);
5662                sent.retransmits
5663                    .get_or_create()
5664                    .remove_address
5665                    .insert(removed_address);
5666                self.stats.frame_tx.remove_address =
5667                    self.stats.frame_tx.remove_address.saturating_add(1);
5668                qlog.frame(&Frame::RemoveAddress(removed_address));
5669            } else {
5670                break;
5671            }
5672        }
5673
5674        sent
5675    }
5676
5677    /// Write pending ACKs into a buffer
5678    fn populate_acks(
5679        now: Instant,
5680        receiving_ecn: bool,
5681        sent: &mut SentFrames,
5682        path_id: PathId,
5683        space_id: SpaceId,
5684        space: &mut PacketSpace,
5685        is_multipath_negotiated: bool,
5686        buf: &mut impl BufMut,
5687        stats: &mut ConnectionStats,
5688        #[allow(unused)] qlog: &mut QlogSentPacket,
5689    ) {
5690        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
5691        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5692
5693        debug_assert!(
5694            is_multipath_negotiated || path_id == PathId::ZERO,
5695            "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5696        );
5697        if is_multipath_negotiated {
5698            debug_assert!(
5699                space_id == SpaceId::Data || path_id == PathId::ZERO,
5700                "path acks must be sent in 1RTT space (have {space_id:?})"
5701            );
5702        }
5703
5704        let pns = space.for_path(path_id);
5705        let ranges = pns.pending_acks.ranges();
5706        debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5707        let ecn = if receiving_ecn {
5708            Some(&pns.ecn_counters)
5709        } else {
5710            None
5711        };
5712        if let Some(max) = ranges.max() {
5713            sent.largest_acked.insert(path_id, max);
5714        }
5715
5716        let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5717        // TODO: This should come from `TransportConfig` if that gets configurable.
5718        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5719        let delay = delay_micros >> ack_delay_exp.into_inner();
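        // E.g. with the default ack_delay_exponent of 3, a measured delay of 1000us is
        // encoded as 1000 >> 3 = 125, which the peer decodes as 125 << 3 = 1000us.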
5720
5721        if is_multipath_negotiated && space_id == SpaceId::Data {
5722            if !ranges.is_empty() {
5723                trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5724                frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
5725                qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5726                stats.frame_tx.path_acks += 1;
5727            }
5728        } else {
5729            trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5730            frame::Ack::encode(delay as _, ranges, ecn, buf);
5731            stats.frame_tx.acks += 1;
5732            qlog.frame_ack(delay, ranges, ecn);
5733        }
5734    }
5735
5736    fn close_common(&mut self) {
5737        trace!("connection closed");
5738        self.timers.reset();
5739    }
5740
5741    fn set_close_timer(&mut self, now: Instant) {
5742        // QUIC-MULTIPATH § 2.6 Connection Closure: draining for 3*PTO with PTO the max of
5743        // the PTO for all paths.
5744        let pto_max = self.pto_max_path(self.highest_space, true);
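        // E.g. (hypothetical values) with per-path PTOs of 120ms and 300ms, pto_max is
        // 300ms and the close timer fires after draining for 900ms.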
5745        self.timers.set(
5746            Timer::Conn(ConnTimer::Close),
5747            now + 3 * pto_max,
5748            self.qlog.with_time(now),
5749        );
5750    }
5751
5752    /// Handle transport parameters received from the peer
5753    ///
5754    /// *rem_cid* and *loc_cid* are the source and destination CIDs respectively of the
5755    /// packet into which the transport parameters arrived.
5756    fn handle_peer_params(
5757        &mut self,
5758        params: TransportParameters,
5759        loc_cid: ConnectionId,
5760        rem_cid: ConnectionId,
5761        now: Instant,
5762    ) -> Result<(), TransportError> {
5763        if Some(self.orig_rem_cid) != params.initial_src_cid
5764            || (self.side.is_client()
5765                && (Some(self.initial_dst_cid) != params.original_dst_cid
5766                    || self.retry_src_cid != params.retry_src_cid))
5767        {
5768            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5769                "CID authentication failure",
5770            ));
5771        }
5772        if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5773            return Err(TransportError::PROTOCOL_VIOLATION(
5774                "multipath must not use zero-length CIDs",
5775            ));
5776        }
5777
5778        self.set_peer_params(params);
5779        self.qlog.emit_peer_transport_params_received(self, now);
5780
5781        Ok(())
5782    }
5783
5784    fn set_peer_params(&mut self, params: TransportParameters) {
5785        self.streams.set_params(&params);
5786        self.idle_timeout =
5787            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5788        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5789
5790        if let Some(ref info) = params.preferred_address {
5791            // During the handshake PathId::ZERO exists.
5792            self.rem_cids
                .get_mut(&PathId::ZERO)
                .expect("not yet abandoned")
                .insert(frame::NewConnectionId {
5793                path_id: None,
5794                sequence: 1,
5795                id: info.connection_id,
5796                reset_token: info.stateless_reset_token,
5797                retire_prior_to: 0,
5798            })
5799            .expect(
5800                "preferred address CID is the first received, and hence is guaranteed to be legal",
5801            );
5802            let remote = self.path_data(PathId::ZERO).remote;
5803            self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5804        }
5805        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
5806
5807        let mut multipath_enabled = None;
5808        if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5809            self.config.get_initial_max_path_id(),
5810            params.initial_max_path_id,
5811        ) {
5812            // multipath is enabled, register the local and remote maximums
5813            self.local_max_path_id = local_max_path_id;
5814            self.remote_max_path_id = remote_max_path_id;
5815            let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5816            debug!(%initial_max_path_id, "multipath negotiated");
5817            multipath_enabled = Some(initial_max_path_id);
5818        }
5819
5820        if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5821            self.config
5822                .max_remote_nat_traversal_addresses
5823                .zip(params.max_remote_nat_traversal_addresses)
5824        {
5825            if let Some(max_initial_paths) =
5826                multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5827            {
5828                let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5829                let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5830                self.iroh_hp =
5831                    iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5832                debug!(
5833                    %max_remote_addresses, %max_local_addresses,
5834                    "iroh hole punching negotiated"
5835                );
5836
5837                match self.side() {
5838                    Side::Client => {
5839                        if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5840                            // in this case the client might try to open `max_remote_addresses` new
5841                            // paths, but the current multipath configuration will not allow it
5842                            warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5843                        } else if max_local_addresses as u64
5844                            > params.active_connection_id_limit.into_inner()
5845                        {
5846                            // the server allows us to send at most `params.active_connection_id_limit`
5847                            // but they might need at least `max_local_addresses` to effectively send
5848                            // `PATH_CHALLENGE` frames to each advertised local address
5849                            warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5850                        }
5851                    }
5852                    Side::Server => {
5853                        if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5854                            warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5855                        }
5856                    }
5857                }
5858            } else {
5859                debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5860            }
5861        }
5862
5863        self.peer_params = params;
5864        let peer_max_udp_payload_size =
5865            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5866        self.path_data_mut(PathId::ZERO)
5867            .mtud
5868            .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5869    }
5870
5871    /// Decrypts a packet, returning the packet number on success
5872    fn decrypt_packet(
5873        &mut self,
5874        now: Instant,
5875        path_id: PathId,
5876        packet: &mut Packet,
5877    ) -> Result<Option<u64>, Option<TransportError>> {
5878        let result = packet_crypto::decrypt_packet_body(
5879            packet,
5880            path_id,
5881            &self.spaces,
5882            self.zero_rtt_crypto.as_ref(),
5883            self.key_phase,
5884            self.prev_crypto.as_ref(),
5885            self.next_crypto.as_ref(),
5886        )?;
5887
5888        let result = match result {
5889            Some(r) => r,
5890            None => return Ok(None),
5891        };
5892
5893        if result.outgoing_key_update_acked {
5894            if let Some(prev) = self.prev_crypto.as_mut() {
5895                prev.end_packet = Some((result.number, now));
5896                self.set_key_discard_timer(now, packet.header.space());
5897            }
5898        }
5899
5900        if result.incoming_key_update {
5901            trace!("key update authenticated");
5902            self.update_keys(Some((result.number, now)), true);
5903            self.set_key_discard_timer(now, packet.header.space());
5904        }
5905
5906        Ok(Some(result.number))
5907    }
5908
5909    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5910        trace!("executing key update");
5911        // Generate keys for the key phase after the one we're switching to, store them in
5912        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
5913        // `prev_crypto`.
5914        let new = self
5915            .crypto
5916            .next_1rtt_keys()
5917            .expect("only called for `Data` packets");
5918        self.key_phase_size = new
5919            .local
5920            .confidentiality_limit()
5921            .saturating_sub(KEY_UPDATE_MARGIN);
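        // E.g. for the AES-GCM suites, RFC 9001 §6.6 gives a confidentiality limit of
        // 2^23 packets; the update is scheduled KEY_UPDATE_MARGIN packets before that.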
5922        let old = mem::replace(
5923            &mut self.spaces[SpaceId::Data]
5924                .crypto
5925                .as_mut()
5926                .unwrap() // safe because update_keys() can only be triggered by short packets
5927                .packet,
5928            mem::replace(self.next_crypto.as_mut().unwrap(), new),
5929        );
5930        self.spaces[SpaceId::Data]
5931            .iter_paths_mut()
5932            .for_each(|s| s.sent_with_keys = 0);
5933        self.prev_crypto = Some(PrevCrypto {
5934            crypto: old,
5935            end_packet,
5936            update_unacked: remote,
5937        });
5938        self.key_phase = !self.key_phase;
5939    }
5940
5941    fn peer_supports_ack_frequency(&self) -> bool {
5942        self.peer_params.min_ack_delay.is_some()
5943    }
5944
5945    /// Send an IMMEDIATE_ACK frame to the remote endpoint
5946    ///
5947    /// According to the spec, this will result in an error if the remote endpoint does not support
5948    /// the Acknowledgement Frequency extension
5949    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5950        debug_assert_eq!(
5951            self.highest_space,
5952            SpaceId::Data,
5953            "immediate ack must be written in the data space"
5954        );
5955        self.spaces[self.highest_space]
5956            .for_path(path_id)
5957            .immediate_ack_pending = true;
5958    }
5959
5960    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
5961    #[cfg(test)]
5962    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5963        let (path_id, first_decode, remaining) = match &event.0 {
5964            ConnectionEventInner::Datagram(DatagramConnectionEvent {
5965                path_id,
5966                first_decode,
5967                remaining,
5968                ..
5969            }) => (path_id, first_decode, remaining),
5970            _ => return None,
5971        };
5972
5973        if remaining.is_some() {
5974            panic!("Packets should never be coalesced in tests");
5975        }
5976
5977        let decrypted_header = packet_crypto::unprotect_header(
5978            first_decode.clone(),
5979            &self.spaces,
5980            self.zero_rtt_crypto.as_ref(),
5981            self.peer_params.stateless_reset_token,
5982        )?;
5983
5984        let mut packet = decrypted_header.packet?;
5985        packet_crypto::decrypt_packet_body(
5986            &mut packet,
5987            *path_id,
5988            &self.spaces,
5989            self.zero_rtt_crypto.as_ref(),
5990            self.key_phase,
5991            self.prev_crypto.as_ref(),
5992            self.next_crypto.as_ref(),
5993        )
5994        .ok()?;
5995
5996        Some(packet.payload.to_vec())
5997    }
5998
5999    /// The number of bytes of packets containing retransmittable frames that have not been
6000    /// acknowledged or declared lost.
6001    #[cfg(test)]
6002    pub(crate) fn bytes_in_flight(&self) -> u64 {
6003        // TODO(@divma): consider including for multipath?
6004        self.path_data(PathId::ZERO).in_flight.bytes
6005    }
6006
6007    /// Number of bytes worth of non-ack-only packets that may be sent
6008    #[cfg(test)]
6009    pub(crate) fn congestion_window(&self) -> u64 {
6010        let path = self.path_data(PathId::ZERO);
6011        path.congestion
6012            .window()
6013            .saturating_sub(path.in_flight.bytes)
6014    }
6015
6016    /// Whether no timers but keepalive, idle, rtt, pushnewcid, and key discard are running
6017    #[cfg(test)]
6018    pub(crate) fn is_idle(&self) -> bool {
6019        let current_timers = self.timers.values();
6020        current_timers
6021            .into_iter()
6022            .filter(|(timer, _)| {
6023                !matches!(
6024                    timer,
6025                    Timer::Conn(ConnTimer::KeepAlive)
6026                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
6027                        | Timer::Conn(ConnTimer::PushNewCid)
6028                        | Timer::Conn(ConnTimer::KeyDiscard)
6029                )
6030            })
6031            .min_by_key(|(_, time)| *time)
6032            .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6033    }
6034
6035    /// Whether explicit congestion notification is in use on outgoing packets.
6036    #[cfg(test)]
6037    pub(crate) fn using_ecn(&self) -> bool {
6038        self.path_data(PathId::ZERO).sending_ecn
6039    }
6040
6041    /// The number of received bytes on the current path
6042    #[cfg(test)]
6043    pub(crate) fn total_recvd(&self) -> u64 {
6044        self.path_data(PathId::ZERO).total_recvd
6045    }
6046
6047    #[cfg(test)]
6048    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6049        self.local_cid_state
6050            .get(&PathId::ZERO)
6051            .unwrap()
6052            .active_seq()
6053    }
6054
6055    #[cfg(test)]
6056    #[track_caller]
6057    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6058        self.local_cid_state
6059            .get(&PathId(path_id))
6060            .unwrap()
6061            .active_seq()
6062    }
6063
6064    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
6065    /// with updated `retire_prior_to` field set to `v`
6066    #[cfg(test)]
6067    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6068        let n = self
6069            .local_cid_state
6070            .get_mut(&PathId::ZERO)
6071            .unwrap()
6072            .assign_retire_seq(v);
6073        self.endpoint_events
6074            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6075    }
6076
6077    /// Check the current active remote CID sequence for `PathId::ZERO`
6078    #[cfg(test)]
6079    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
6080        self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
6081    }
6082
6083    /// Returns the detected maximum udp payload size for the current path
6084    #[cfg(test)]
6085    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6086        self.path_data(path_id).current_mtu()
6087    }
6088
6089    /// Triggers path validation on all paths
6090    #[cfg(test)]
6091    pub(crate) fn trigger_path_validation(&mut self) {
6092        for path in self.paths.values_mut() {
6093            path.data.send_new_challenge = true;
6094        }
6095    }
6096
6097    /// Whether we have 1-RTT data to send
6098    ///
6099    /// This checks for frames that can only be sent in the data space (1-RTT):
6100    /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated.
6101    /// - Pending PATH_RESPONSE frames.
6102    /// - Pending data to send in STREAM frames.
6103    /// - Pending DATAGRAM frames to send.
6104    ///
6105    /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that
6106    /// may need to be sent.
6107    fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6108        let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6109            path.data.send_new_challenge
6110                || path
6111                    .prev
6112                    .as_ref()
6113                    .is_some_and(|(_, path)| path.send_new_challenge)
6114                || !path.data.path_responses.is_empty()
6115        });
6116        let other = self.streams.can_send_stream_data()
6117            || self
6118                .datagrams
6119                .outgoing
6120                .front()
6121                .is_some_and(|x| x.size(true) <= max_size);
6122        SendableFrames {
6123            acks: false,
6124            other,
6125            close: false,
6126            path_exclusive,
6127        }
6128    }
6129
6130    /// Terminate the connection instantly, without sending a close packet
6131    fn kill(&mut self, reason: ConnectionError) {
6132        self.close_common();
6133        self.state.move_to_drained(Some(reason));
6134        self.endpoint_events.push_back(EndpointEventInner::Drained);
6135    }
6136
6137    /// Storage size required for the largest packet that can be transmitted on all currently
6138    /// available paths
6139    ///
6140    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
6141    ///
6142    /// When multipath is enabled, this value is the minimum MTU across all available paths.
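    ///
    /// E.g. (hypothetical values) with two live paths whose current MTUs are 1350 and
    /// 1452 bytes, this returns 1350.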
6143    pub fn current_mtu(&self) -> u16 {
6144        self.paths
6145            .iter()
6146            .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6147            .map(|(_path_id, path_state)| path_state.data.current_mtu())
6148            .min()
6149            .expect("There is always at least one available path")
6150    }
6151
6152    /// Size of non-frame data for a 1-RTT packet
6153    ///
6154    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
6155    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
6156    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
6157    /// latency and packet loss.
6158    fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6159        let pn_len = PacketNumber::new(
6160            pn,
6161            self.spaces[SpaceId::Data]
6162                .for_path(path)
6163                .largest_acked_packet
6164                .unwrap_or(0),
6165        )
6166        .len();
6167
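        // E.g. (hypothetical sizes) an 8-byte remote CID, a 2-byte packet number and a
        // 16-byte AEAD tag give 1 + 8 + 2 + 16 = 27 bytes of per-packet overhead.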
6168        // 1 byte for flags
6169        1 + self
6170            .rem_cids
6171            .get(&path)
6172            .map(|cids| cids.active().len())
6173            .unwrap_or(20)      // Max CID len in QUIC v1
6174            + pn_len
6175            + self.tag_len_1rtt()
6176    }
6177
6178    fn predict_1rtt_overhead_no_pn(&self) -> usize {
6179        let pn_len = 4;
6180
6181        let cid_len = self
6182            .rem_cids
6183            .values()
6184            .map(|cids| cids.active().len())
6185            .max()
6186            .unwrap_or(20); // Max CID len in QUIC v1
6187
6188        // 1 byte for flags
6189        1 + cid_len + pn_len + self.tag_len_1rtt()
6190    }
6191
6192    fn tag_len_1rtt(&self) -> usize {
6193        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6194            Some(crypto) => Some(&*crypto.packet.local),
6195            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6196        };
6197        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
6198        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
6199        // but that would needlessly prevent sending datagrams during 0-RTT.
6200        key.map_or(16, |x| x.tag_len())
6201    }
6202
6203    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
6204    fn on_path_validated(&mut self, path_id: PathId) {
6205        self.path_data_mut(path_id).validated = true;
6206        let ConnectionSide::Server { server_config } = &self.side else {
6207            return;
6208        };
6209        let remote_addr = self.path_data(path_id).remote;
6210        let new_tokens = &mut self.spaces[SpaceId::Data].pending.new_tokens;
6211        new_tokens.clear();
6212        for _ in 0..server_config.validation_token.sent {
6213            new_tokens.push(remote_addr);
6214        }
6215    }
6216
6217    /// Handle new path status information: PATH_STATUS_AVAILABLE, PATH_STATUS_BACKUP
6218    fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6219        if let Some(path) = self.paths.get_mut(&path_id) {
6220            path.data.status.remote_update(status, status_seq_no);
6221        } else {
6222            debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6223        }
6224        self.events.push_back(
6225            PathEvent::RemoteStatus {
6226                id: path_id,
6227                status,
6228            }
6229            .into(),
6230        );
6231    }
6232
6233    /// Returns the maximum [`PathId`] to be used for sending in this connection.
6234    ///
6235    /// This is calculated as the minimum of the local and remote maximums when multipath is
6236    /// enabled, or `None` when disabled.
6237    ///
6238    /// For data that's received, we should use [`Self::local_max_path_id`] instead.
6239    /// The reasoning is that the remote might already have updated to its own newer
6240    /// [`Self::max_path_id`] after sending out a `MAX_PATH_ID` frame, but it got re-ordered.
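    ///
    /// E.g. (hypothetical values) with a local maximum of 4 and a remote maximum of 2,
    /// this returns `Some(PathId(2))`: no path with a higher ID may be used for sending.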
6241    fn max_path_id(&self) -> Option<PathId> {
6242        if self.is_multipath_negotiated() {
6243            Some(self.remote_max_path_id.min(self.local_max_path_id))
6244        } else {
6245            None
6246        }
6247    }
6248
6249    /// Adds an address the local endpoint considers reachable for nat traversal
6250    pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6251        if let Some(added) = self.iroh_hp.add_local_address(address)? {
6252            self.spaces[SpaceId::Data].pending.add_address.insert(added);
6253        };
6254        Ok(())
6255    }
6256
6257    /// Removes an address the endpoint no longer considers reachable for nat traversal
6258    ///
6259    /// Addresses not present in the set will be silently ignored.
6260    pub fn remove_nat_traversal_address(
6261        &mut self,
6262        address: SocketAddr,
6263    ) -> Result<(), iroh_hp::Error> {
6264        if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6265            self.spaces[SpaceId::Data]
6266                .pending
6267                .remove_address
6268                .insert(removed);
6269        }
6270        Ok(())
6271    }
6272
    /// Returns the current local NAT traversal addresses
    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        self.iroh_hp.get_local_nat_traversal_addresses()
    }

    /// Returns the NAT traversal addresses currently advertised by the server
    pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        Ok(self
            .iroh_hp
            .client_side()?
            .get_remote_nat_traversal_addresses())
    }

    /// Attempts to open a path for NAT traversal.
    ///
    /// `ipv6` indicates whether the path should be opened using an IPv6 remote. If the
    /// candidate address is ignored (an IPv6 candidate on an IPv4-only socket), `None` is
    /// returned.
    ///
    /// On success, returns the [`PathId`] and remote address of the path, as well as whether
    /// a path already existed for the adjusted remote.
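    ///
    /// For instance (illustrative): when `ipv6` is true, an IPv4 candidate
    /// `192.0.2.1:443` is probed via its mapped form `[::ffff:192.0.2.1]:443`.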
    fn open_nat_traversal_path(
        &mut self,
        now: Instant,
        (ip, port): (IpAddr, u16),
        ipv6: bool,
    ) -> Result<Option<(PathId, SocketAddr, bool)>, PathError> {
        // If this endpoint is an IPv6 endpoint we use IPv6 addresses for all remotes.
        let remote = match ip {
            IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
            IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
            IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
            IpAddr::V6(_) => {
                trace!("not using IPv6 nat candidate for IPv4 socket");
                return Ok(None);
            }
        };
        match self.open_path_ensure(remote, PathStatus::Backup, now) {
            Ok((path_id, path_was_known)) => {
                if path_was_known {
                    trace!(%path_id, %remote, "nat traversal: path existed for remote");
                }
                Ok(Some((path_id, remote, path_was_known)))
            }
            Err(e) => {
                debug!(%remote, %e, "nat traversal: failed to probe remote");
                Err(e)
            }
        }
    }

    /// Initiates a new NAT traversal round
    ///
    /// A NAT traversal round involves advertising the client's local addresses in `REACH_OUT`
    /// frames, and initiating probing of the known remote addresses. When a new round is
    /// initiated, the previous one is cancelled, and its paths that were never validated are
    /// closed.
    ///
    /// Returns the server addresses that are now being probed. Addresses that fail due to
    /// spurious errors are not included in this set, but may still succeed later.
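    ///
    /// A usage sketch (not compiled; `conn` is assumed to be an established client-side
    /// [`Connection`]):
    ///
    /// ```ignore
    /// let probing = conn.initiate_nat_traversal_round(Instant::now())?;
    /// for addr in &probing {
    ///     // a path to `addr` is now being validated
    /// }
    /// ```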
    pub fn initiate_nat_traversal_round(
        &mut self,
        now: Instant,
    ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        if self.state.is_closed() {
            return Err(iroh_hp::Error::Closed);
        }

        let client_state = self.iroh_hp.client_side_mut()?;
        let iroh_hp::NatTraversalRound {
            new_round,
            reach_out_at,
            addresses_to_probe,
            prev_round_path_ids,
        } = client_state.initiate_nat_traversal_round()?;

        self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));

        for path_id in prev_round_path_ids {
            // TODO(@divma): this sounds reasonable, but we need to verify that it actually
            // works for the purposes of the protocol
            let validated = self
                .path(path_id)
                .map(|path| path.validated)
                .unwrap_or(false);

            if !validated {
                let _ = self.close_path(
                    now,
                    path_id,
                    TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
                );
            }
        }

        let mut err = None;

        let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
        let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
        let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());

        for (id, address) in addresses_to_probe {
            match self.open_nat_traversal_path(now, address, ipv6) {
                Ok(None) => {}
                Ok(Some((path_id, remote, path_was_known))) => {
                    if !path_was_known {
                        path_ids.push(path_id);
                        probed_addresses.push(remote);
                    }
                }
                Err(e) => {
                    self.iroh_hp
                        .client_side_mut()
                        .expect("validated")
                        .report_in_continuation(id, e);
                    err.get_or_insert(e);
                }
            }
        }

        if let Some(err) = err {
            // If we failed to probe every address, bail out
            if probed_addresses.is_empty() {
                return Err(iroh_hp::Error::Multipath(err));
            }
        }

        self.iroh_hp
            .client_side_mut()
            .expect("connection side validated")
            .set_round_path_ids(path_ids);

        Ok(probed_addresses)
    }

    /// Attempts to continue a NAT traversal round by trying to open paths for pending client probes.
    ///
    /// Returns `None` if there was nothing to do; otherwise returns whether the path was
    /// successfully opened.
    fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
        let client_state = self.iroh_hp.client_side_mut().ok()?;
        let (id, address) = client_state.continue_nat_traversal_round()?;
        let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
        let open_result = self.open_nat_traversal_path(now, address, ipv6);
        let client_state = self.iroh_hp.client_side_mut().expect("validated");
        match open_result {
            Ok(None) => Some(true),
            Ok(Some((path_id, _remote, path_was_known))) => {
                if !path_was_known {
                    client_state.add_round_path_id(path_id);
                }
                Some(true)
            }
            Err(e) => {
                client_state.report_in_continuation(id, e);
                Some(false)
            }
        }
    }
}

impl fmt::Debug for Connection {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Connection")
            .field("handshake_cid", &self.handshake_cid)
            .finish()
    }
}

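/// Why sending on a path is currently blocked, if at all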
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    No,
    AntiAmplification,
    Congestion,
    Pacing,
}

/// Fields of `Connection` specific to it being client-side or server-side
enum ConnectionSide {
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
        token: Bytes,
        token_store: Arc<dyn TokenStore>,
        server_name: String,
    },
    Server {
        server_config: Arc<ServerConfig>,
    },
}

impl ConnectionSide {
    fn remote_may_migrate(&self, state: &State) -> bool {
        match self {
            Self::Server { server_config } => server_config.migration,
            Self::Client { .. } => {
                if let Some(hs) = state.as_handshake() {
                    hs.allow_server_migration
                } else {
                    false
                }
            }
        }
    }

    fn is_client(&self) -> bool {
        self.side().is_client()
    }

    fn is_server(&self) -> bool {
        self.side().is_server()
    }

    fn side(&self) -> Side {
        match *self {
            Self::Client { .. } => Side::Client,
            Self::Server { .. } => Side::Server,
        }
    }
}

impl From<SideArgs> for ConnectionSide {
    fn from(side: SideArgs) -> Self {
        match side {
            SideArgs::Client {
                token_store,
                server_name,
            } => Self::Client {
                token: token_store.take(&server_name).unwrap_or_default(),
                token_store,
                server_name,
            },
            SideArgs::Server {
                server_config,
                pref_addr_cid: _,
                path_validated: _,
            } => Self::Server { server_config },
        }
    }
}

/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    Client {
        token_store: Arc<dyn TokenStore>,
        server_name: String,
    },
    Server {
        server_config: Arc<ServerConfig>,
        pref_addr_cid: Option<ConnectionId>,
        path_validated: bool,
    },
}

impl SideArgs {
    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
        match *self {
            Self::Client { .. } => None,
            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
        }
    }

    pub(crate) fn path_validated(&self) -> bool {
        match *self {
            Self::Client { .. } => true,
            Self::Server { path_validated, .. } => path_validated,
        }
    }

    pub(crate) fn side(&self) -> Side {
        match *self {
            Self::Client { .. } => Side::Client,
            Self::Server { .. } => Side::Server,
        }
    }
}

/// Reasons why a connection might be lost
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}

impl From<Close> for ConnectionError {
    fn from(x: Close) -> Self {
        match x {
            Close::Connection(reason) => Self::ConnectionClosed(reason),
            Close::Application(reason) => Self::ApplicationClosed(reason),
        }
    }
}

// For compatibility with API consumers
impl From<ConnectionError> for io::Error {
    fn from(x: ConnectionError) -> Self {
        use ConnectionError::*;
        let kind = match x {
            TimedOut => io::ErrorKind::TimedOut,
            Reset => io::ErrorKind::ConnectionReset,
            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
                io::ErrorKind::Other
            }
        };
        Self::new(kind, x)
    }
}

/// Errors that might trigger a path being closed
// TODO(@divma): maybe needs to be reworked based on what we want to do with the public API
#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
pub enum PathError {
    /// The extension was not negotiated with the peer
    #[error("multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// Paths can only be opened client-side
    #[error("the server side may not open a path")]
    ServerSideNotAllowed,
    /// Current limits do not allow us to open more paths
    #[error("maximum number of concurrent paths reached")]
    MaxPathIdReached,
    /// No remote CIDs available to open a new path
    #[error("remote CIDs exhausted")]
    RemoteCidsExhausted,
    /// Path could not be validated and will be abandoned
    #[error("path validation failed")]
    ValidationFailed,
    /// The remote address for the path is not supported by the endpoint
    #[error("invalid remote address")]
    InvalidRemoteAddress(SocketAddr),
}

/// Errors triggered when abandoning a path
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The path is already closed or was never opened
    #[error("closed path")]
    ClosedPath,
    /// This is the last path, which cannot be abandoned
    #[error("last open path")]
    LastOpenPath,
}

/// Error returned when an operation requires multipath, but the multipath extension was not
/// negotiated with the peer
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    _private: (),
}

/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The TLS handshake was confirmed
    HandshakeConfirmed,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
    /// (Multi)Path events
    Path(PathEvent),
    /// Iroh's NAT traversal events
    NatTraversal(iroh_hp::Event),
}

impl From<PathEvent> for Event {
    fn from(source: PathEvent) -> Self {
        Self::Path(source)
    }
}

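/// Converts the peer's `max_ack_delay` transport parameter, specified in milliseconds
/// (RFC 9000 §18.2), into a [`Duration`]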
fn get_max_ack_delay(params: &TransportParameters) -> Duration {
    Duration::from_micros(params.max_ack_delay.0 * 1000)
}

// Prevents overflow and improves behavior in extreme circumstances
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Excludes packet-type-specific fields such as packet number or Initial token
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
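// With MAX_CID_SIZE = 20 (the QUIC v1 maximum) and a 4-byte varint encoding of u16::MAX for
// the length field, this works out to 1 + 4 + 1 + 20 + 1 + 20 + 4 + 4 = 55 bytes.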
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Perform key updates this many packets before the AEAD confidentiality limit.
///
/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
const KEY_UPDATE_MARGIN: u64 = 10_000;

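/// Summary of the frames written into a single outgoing packet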
#[derive(Default)]
struct SentFrames {
    retransmits: ThinRetransmits,
    /// The packet number of the largest acknowledged packet for each path
    largest_acked: FxHashMap<PathId, u64>,
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    /// Whether the datagram containing these frames should be padded to the min MTU
    requires_padding: bool,
}

impl SentFrames {
    /// Returns whether the packet contains only ACKs
    fn is_ack_only(&self, streams: &StreamsState) -> bool {
        !self.largest_acked.is_empty()
            && !self.non_retransmits
            && self.stream_frames.is_empty()
            && self.retransmits.is_empty(streams)
    }
}

/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
///
/// According to the definition of max_idle_timeout, a value of `0` means the timeout is disabled;
/// see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1>.
///
/// According to the negotiation procedure, the minimum of the two timeouts (or whichever one is
/// specified, if only one is) is used as the negotiated value; see
/// <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2>.
///
/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted
/// out of idle timeout.
fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
    match (x, y) {
        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let test_params = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (
                Some(VarInt(2)),
                Some(VarInt(0)),
                Some(Duration::from_millis(2)),
            ),
            (
                Some(VarInt(1)),
                Some(VarInt(4)),
                Some(Duration::from_millis(1)),
            ),
        ];

        for (left, right, result) in test_params {
            assert_eq!(negotiate_max_idle_timeout(left, right), result);
            assert_eq!(negotiate_max_idle_timeout(right, left), result);
        }
    }
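
    // Illustrative sanity check: the `io::Error` conversion maps loss reasons to the
    // expected `ErrorKind`s, per the `From<ConnectionError> for io::Error` impl above.
    #[test]
    fn connection_error_io_error_kind() {
        let err: io::Error = ConnectionError::TimedOut.into();
        assert_eq!(err.kind(), io::ErrorKind::TimedOut);
        let err: io::Error = ConnectionError::Reset.into();
        assert_eq!(err.kind(), io::ErrorKind::ConnectionReset);
    }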
}