iroh_quinn_proto/connection/mod.rs

use std::{
    cmp,
    collections::{BTreeMap, VecDeque, btree_map},
    convert::TryFrom,
    fmt, io, mem,
    net::{IpAddr, SocketAddr},
    num::NonZeroU32,
    ops::Not,
    sync::Arc,
};

use bytes::{BufMut, Bytes, BytesMut};
use frame::StreamMetaVec;

use rand::{Rng, SeedableRng, rngs::StdRng};
use rustc_hash::{FxHashMap, FxHashSet};
use thiserror::Error;
use tracing::{debug, error, trace, trace_span, warn};

use crate::{
    Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
    MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
    TransportErrorCode, VarInt,
    cid_generator::ConnectionIdGenerator,
    cid_queue::CidQueue,
    coding::BufMutExt,
    config::{ServerConfig, TransportConfig},
    congestion::Controller,
    connection::{
        qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
        spaces::LostPacket,
        timer::{ConnTimer, PathTimer},
    },
    crypto::{self, KeyPair, Keys, PacketKey},
    frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
    iroh_hp,
    packet::{
        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
        PacketNumber, PartialDecode, SpaceId,
    },
    range_set::ArrayRangeSet,
    shared::{
        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
        EndpointEvent, EndpointEventInner,
    },
    token::{ResetToken, Token, TokenPayload},
    transport_parameters::TransportParameters,
};

mod ack_frequency;
use ack_frequency::AckFrequencyState;

mod assembler;
pub use assembler::Chunk;

mod cid_state;
use cid_state::CidState;

mod datagrams;
use datagrams::DatagramState;
pub use datagrams::{Datagrams, SendDatagramError};

mod mtud;
mod pacing;

mod packet_builder;
use packet_builder::{PacketBuilder, PadDatagram};

mod packet_crypto;
use packet_crypto::{PrevCrypto, ZeroRttCrypto};

mod paths;
pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
use paths::{PathData, PathState};

pub(crate) mod qlog;

mod send_buffer;

mod spaces;
#[cfg(fuzzing)]
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
use spaces::Retransmits;
use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};

mod stats;
pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};

mod streams;
#[cfg(fuzzing)]
pub use streams::StreamsState;
#[cfg(not(fuzzing))]
use streams::StreamsState;
pub use streams::{
    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
};

mod timer;
use timer::{Timer, TimerTable};

mod transmit_buf;
use transmit_buf::TransmitBuf;

mod state;

#[cfg(not(fuzzing))]
use state::State;
#[cfg(fuzzing)]
pub use state::State;
use state::StateType;

/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below; however, per the design of this library, none of
///   these functions actually performs system-level I/O. Examples include [`read`](RecvStream::read)
///   and [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing them to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
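///
/// # Example
///
/// A minimal sketch of such a driving loop, in the recommended polling order.
/// The socket, the `endpoint_events` queue and the event handling shown here
/// are placeholder assumptions, not part of this type:
///
/// ```ignore
/// loop {
///     // (1) Flush all pending transmits.
///     while let Some(transmit) = conn.poll_transmit(now, max_gso_segments, &mut buf) {
///         socket.send_to(&buf[..transmit.size], transmit.destination);
///         buf.clear();
///     }
///     // (2) Re-arm the timer; the deadline may have changed after (1).
///     let deadline = conn.poll_timeout();
///     // (3) Drain events destined for the `Endpoint`.
///     while let Some(event) = conn.poll_endpoint_events() {
///         endpoint_events.push(event);
///     }
///     // (4) Drain application-facing events.
///     while let Some(event) = conn.poll() {
///         handle_app_event(event);
///     }
///     // Wait for a datagram or the timer deadline, feed the result back in
///     // via `handle_event` / `handle_timeout`, and repeat.
/// }
/// ```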
pub struct Connection {
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    rng: StdRng,
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The "real" local IP address which was used to receive the initial packet.
    /// This is only populated for the server case, and only if known
    local_ip: Option<IpAddr>,
    /// The [`PathData`] for each path
    ///
    /// This needs to be ordered because [`Connection::poll_transmit`] needs to
    /// deterministically select the next PathId to send on.
    // TODO(flub): well does it really? But deterministic is nice for now.
    paths: BTreeMap<PathId, PathState>,
    /// Incremented every time we see a new path
    ///
    /// Stored separately from `path.generation` to account for aborted migrations
    path_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    state: State,
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Events returned by [`Connection::poll`]
    events: VecDeque<Event>,
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable [`SpaceId`]
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// If the CONNECTION_CLOSE frame needs to be sent
    close: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    ///
    /// These are given out before multiple paths exist, also for paths that will never
    /// exist.  So if multipath is supported the number of paths here will be higher than
    /// the actual number of paths in use.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Attributes of CIDs generated by local endpoint
    ///
    /// Any path that is allowed to be opened is present in this map, as well as the already
    /// opened paths. However since CIDs are issued async by the endpoint driver via
    /// connection events it cannot be used to know if CIDs have been issued for a path or
    /// not. See [`Connection::max_path_id_with_cids`] for this.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// Path level statistics
    path_stats: FxHashMap<PathId, PathStats>,
    /// QUIC version used for the connection.
    version: u32,

    //
    // Multipath
    //
    /// Maximum number of concurrent paths
    ///
    /// Initially set from the [`TransportConfig::max_concurrent_multipath_paths`]. Even
    /// when multipath is disabled this is set to 1, though it is not used in that
    /// case.
    max_concurrent_paths: NonZeroU32,
    /// Local maximum [`PathId`] to be used
    ///
    /// This is initially set to [`TransportConfig::get_initial_max_path_id`] when multipath
    /// is negotiated, or to [`PathId::ZERO`] otherwise. This is essentially the value of
    /// the highest MAX_PATH_ID frame sent.
    ///
    /// Any path with an ID equal to or below this [`PathId`] is either:
    ///
    /// - Abandoned, if it is also in [`Connection::abandoned_paths`].
    /// - Open, in which case it is present in [`Connection::paths`].
    /// - Not yet opened, if it is in neither of these two places.
    ///
    /// Note that for not-yet-open there may or may not be any CIDs issued. See
    /// [`Connection::max_path_id_with_cids`].
    local_max_path_id: PathId,
    /// Remote's maximum [`PathId`] to be used
    ///
    /// This is initially set to the peer's [`TransportParameters::initial_max_path_id`] when
    /// multipath is negotiated, or to [`PathId::ZERO`] otherwise. A peer may increase this limit
    /// by sending [`Frame::MaxPathId`] frames.
    remote_max_path_id: PathId,
    /// The greatest [`PathId`] we have issued CIDs for
    ///
    /// CIDs are only issued for `min(local_max_path_id, remote_max_path_id)`. It is not
    /// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued
    /// since they are issued asynchronously by the endpoint driver.
    max_path_id_with_cids: PathId,
    /// The paths already abandoned
    ///
    /// They may still have some state left in [`Connection::paths`] or
    /// [`Connection::local_cid_state`] since some of this has to be kept around for some
    /// time after a path is abandoned.
    // TODO(flub): Make this a more efficient data structure.  Like ranges of abandoned
    //    paths.  Or a set together with a minimum.  Or something.
    abandoned_paths: FxHashSet<PathId>,

    iroh_hp: iroh_hp::State,
    qlog: QlogSink,
}

impl Connection {
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = {
            let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
            space.crypto = Some(crypto.initial_keys(init_cid, side));
            space
        };
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
        // TODO(@divma): consider if we want to delay this until the path is validated
        path.open = true;
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_counter: 0,
            allow_mtud,
            local_ip,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            close: false,

            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            // peer params are not yet known, so multipath is not enabled
            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            // iroh's nat traversal
            iroh_hp: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
        this
    }

    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.peek()
    }

    /// Returns application-facing events
    ///
    /// Connections should be polled for events after:
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll(&mut self) -> Option<Event> {
        if let Some(x) = self.events.pop_front() {
            return Some(x);
        }

        if let Some(event) = self.streams.poll() {
            return Some(Event::Stream(event));
        }

        if let Some(reason) = self.state.take_error() {
            return Some(Event::ConnectionLost { reason });
        }

        None
    }

    /// Return endpoint-facing events
    #[must_use]
    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
        self.endpoint_events.pop_front().map(EndpointEvent)
    }

    /// Provide control over streams
    #[must_use]
    pub fn streams(&mut self) -> Streams<'_> {
        Streams {
            state: &mut self.streams,
            conn_state: &self.state,
        }
    }

    /// Provide control over a receive stream
    #[must_use]
    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
        RecvStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
        }
    }

    /// Provide control over a send stream
    #[must_use]
    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
        SendStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
            conn_state: &self.state,
        }
    }

    /// Opens a new path only if no path to the remote address exists so far
    ///
    /// See [`open_path`]. Returns `(path_id, true)` if the path already existed, or
    /// `(path_id, false)` if it was newly opened.
    ///
    /// [`open_path`]: Connection::open_path
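    ///
    /// A minimal usage sketch (`remote` and `now` are assumed to be in scope):
    ///
    /// ```ignore
    /// let (path_id, existed) = conn.open_path_ensure(remote, PathStatus::Available, now)?;
    /// if !existed {
    ///     // A new path was opened; a PathEvent::Opened will be emitted once it is usable.
    /// }
    /// ```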
    pub fn open_path_ensure(
        &mut self,
        remote: SocketAddr,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<(PathId, bool), PathError> {
        match self
            .paths
            .iter()
            .find(|(_id, path)| path.data.remote == remote)
        {
            Some((path_id, _state)) => Ok((*path_id, true)),
            None => self
                .open_path(remote, initial_status, now)
                .map(|id| (id, false)),
        }
    }

    /// Opens a new path
    ///
    /// Further errors may occur later; they will be emitted as [`PathEvent::LocallyClosed`] events.
    /// When the path is opened it will be reported as a [`PathEvent::Opened`] event.
    pub fn open_path(
        &mut self,
        remote: SocketAddr,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<PathId, PathError> {
        if !self.is_multipath_negotiated() {
            return Err(PathError::MultipathNotNegotiated);
        }
        if self.side().is_server() {
            return Err(PathError::ServerSideNotAllowed);
        }

        let max_abandoned = self.abandoned_paths.iter().max().copied();
        let max_used = self.paths.keys().last().copied();
        let path_id = max_abandoned
            .max(max_used)
            .unwrap_or(PathId::ZERO)
            .saturating_add(1u8);

        if Some(path_id) > self.max_path_id() {
            return Err(PathError::MaxPathIdReached);
        }
        if path_id > self.remote_max_path_id {
            self.spaces[SpaceId::Data].pending.paths_blocked = true;
            return Err(PathError::MaxPathIdReached);
        }
        if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .push(path_id);
            return Err(PathError::RemoteCidsExhausted);
        }

        let path = self.ensure_path(path_id, remote, now, None);
        path.status.local_update(initial_status);

        Ok(path_id)
    }

    /// Closes a path by sending a PATH_ABANDON frame
    ///
    /// This will not allow closing the last path. It does allow closing paths which have
    /// not yet been opened, as e.g. is the case when receiving a PATH_ABANDON from the peer
    /// for a path that was never opened locally.
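    ///
    /// A minimal usage sketch (the error code is application-chosen):
    ///
    /// ```ignore
    /// match conn.close_path(now, path_id, 0u32.into()) {
    ///     // PATH_ABANDON is queued; the path state is discarded after 6 * PTO.
    ///     Ok(()) => {}
    ///     // At least one other validated, non-abandoned path must remain.
    ///     Err(ClosePathError::LastOpenPath) => {}
    ///     // The path is unknown or was already abandoned.
    ///     Err(ClosePathError::ClosedPath) => {}
    /// }
    /// ```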
    pub fn close_path(
        &mut self,
        now: Instant,
        path_id: PathId,
        error_code: VarInt,
    ) -> Result<(), ClosePathError> {
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }
        if self
            .paths
            .iter()
            // Would there be any remaining, non-abandoned, validated paths
            .any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            })
            .not()
        {
            return Err(ClosePathError::LastOpenPath);
        }

        // Send PATH_ABANDON
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, error_code.into());

        // Remove pending NEW CIDs for this path
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Cleanup retransmits across ALL paths (CIDs for path_id may have been transmitted on other paths)
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Consider remotely issued CIDs as retired.
        // Technically we don't have to do this just yet.  We only need to do this *after*
        // the PATH_ABANDON frame is sent, allowing us to still send it on the
        // to-be-abandoned path.  However it is recommended to send it on another path, and
        // we do not allow abandoning the last path anyway.
        self.rem_cids.remove(&path_id);
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        let pto = self.pto_max_path(SpaceId::Data);

        let path = self.paths.get_mut(&path_id).expect("checked above");

        // We record the time after which receiving data on this path generates a transport error.
        path.data.last_allowed_receive = Some(now + 3 * pto);
        self.abandoned_paths.insert(path_id);

        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));

        // The peer MUST respond with a corresponding PATH_ABANDON frame.
        // If we receive packets on the abandoned path after 3 * PTO we trigger a transport error.
        // In any case, we completely discard the path after 6 * PTO whether we receive a
        // PATH_ABANDON or not.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::DiscardPath),
            now + 6 * pto,
            self.qlog.with_time(now),
        );
        Ok(())
    }

    /// Gets the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the path_id does not reference any known path.
    #[track_caller]
    fn path_data(&self, path_id: PathId) -> &PathData {
        if let Some(data) = self.paths.get(&path_id) {
            &data.data
        } else {
            panic!(
                "unknown path: {path_id}, currently known paths: {:?}",
                self.paths.keys().collect::<Vec<_>>()
            );
        }
    }

    /// Gets a reference to the [`PathData`] for a [`PathId`]
    fn path(&self, path_id: PathId) -> Option<&PathData> {
        self.paths.get(&path_id).map(|path_state| &path_state.data)
    }

    /// Gets a mutable reference to the [`PathData`] for a [`PathId`]
    fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
        self.paths
            .get_mut(&path_id)
            .map(|path_state| &mut path_state.data)
    }

    /// Returns all known paths.
    ///
    /// There is no guarantee any of these paths are open or usable.
    pub fn paths(&self) -> Vec<PathId> {
        self.paths.keys().copied().collect()
    }

    /// Gets the local [`PathStatus`] for a known [`PathId`]
    pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
        self.path(path_id)
            .map(PathData::local_status)
            .ok_or(ClosedPath { _private: () })
    }

    /// Returns the path's remote socket address
    pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
        self.path(path_id)
            .map(|path| path.remote)
            .ok_or(ClosedPath { _private: () })
    }

    /// Sets the [`PathStatus`] for a known [`PathId`]
    ///
    /// Returns the previous path status on success.
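    ///
    /// A minimal usage sketch; the status change is also queued to be announced to the
    /// peer:
    ///
    /// ```ignore
    /// let previous = conn.set_path_status(path_id, PathStatus::Backup)?;
    /// ```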
    pub fn set_path_status(
        &mut self,
        path_id: PathId,
        status: PathStatus,
    ) -> Result<PathStatus, SetPathStatusError> {
        if !self.is_multipath_negotiated() {
            return Err(SetPathStatusError::MultipathNotNegotiated);
        }
        let path = self
            .path_mut(path_id)
            .ok_or(SetPathStatusError::ClosedPath)?;
        let prev = match path.status.local_update(status) {
            Some(prev) => {
                self.spaces[SpaceId::Data]
                    .pending
                    .path_status
                    .insert(path_id);
                prev
            }
            None => path.local_status(),
        };
        Ok(prev)
    }

    /// Returns the remote path status
    // TODO(flub): Probably should also be some kind of path event?  Not even sure if I like
    //    this as an API, but for now it allows me to write a test easily.
    // TODO(flub): Technically this should be a Result<Option<PathStatus>>?
    pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
        self.path(path_id).and_then(|path| path.remote_status())
    }

    /// Sets the max_idle_timeout for a specific path
    ///
    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_max_idle_timeout(
        &mut self,
        path_id: PathId,
        timeout: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
    }

    /// Sets the keep_alive_interval for a specific path
    ///
    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_keep_alive_interval(
        &mut self,
        path_id: PathId,
        interval: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.keep_alive, interval))
    }

    /// Gets the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the path_id does not reference any known path.
    #[track_caller]
    fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
        &mut self.paths.get_mut(&path_id).expect("known path").data
    }

    /// Check if the remote has been validated in any active path
    fn is_remote_validated(&self, remote: SocketAddr) -> bool {
        self.paths
            .values()
            .any(|path_state| path_state.data.remote == remote && path_state.data.validated)
        // TODO(@divma): we might want to ensure the path has been recently active to consider the
        // address validated
    }

    fn ensure_path(
        &mut self,
        path_id: PathId,
        remote: SocketAddr,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        let validated = self.is_remote_validated(remote);
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        debug!(%validated, %path_id, ?remote, "path added");
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_counter = self.path_counter.wrapping_add(1);
        let mut data = PathData::new(
            remote,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_counter,
            now,
            &self.config,
        );

        data.validated = validated;

        let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathOpen),
            now + 3 * pto,
            self.qlog.with_time(now),
        );

        // for the path to be opened we need to send a packet on the path. Sending a challenge
        // guarantees this
        data.send_new_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, remote, now);
        &mut path.data
    }

    /// Returns packets to transmit
    ///
    /// Connections should be polled for transmit after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    ///
    /// `max_datagrams` specifies how many datagrams can be returned inside a
    /// single Transmit using GSO. This must be at least 1.
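    ///
    /// A minimal calling sketch (the send helpers are placeholders): when
    /// `segment_size` is `Some`, the buffer contains multiple GSO segments of
    /// that size.
    ///
    /// ```ignore
    /// let mut buf = Vec::new();
    /// if let Some(t) = conn.poll_transmit(now, max_gso_segments, &mut buf) {
    ///     match t.segment_size {
    ///         Some(seg) => send_gso(&buf[..t.size], seg, t.destination),
    ///         None => send_one(&buf[..t.size], t.destination),
    ///     }
    /// }
    /// ```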
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: usize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        if let Some(probing) = self
            .iroh_hp
            .server_side_mut()
            .ok()
            .and_then(iroh_hp::ServerState::next_probe)
        {
            let destination = probing.remote();
            trace!(%destination, "RAND_DATA packet");
            let token: u64 = self.rng.random();
            buf.put_u64(token);
            probing.finish(token);
            return Some(Transmit {
                destination,
                ecn: None,
                size: 8,
                segment_size: None,
                src_ip: None,
            });
        }

        assert!(max_datagrams != 0);
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => 1,
            true => max_datagrams,
        };

        // Each call to poll_transmit can only send datagrams to one destination, because
        // all datagrams in a GSO batch are for the same destination.  Therefore only
        // datagrams for one Path ID are produced for each poll_transmit call.

        // TODO(flub): this is wishful thinking and not actually implemented, but perhaps it
        //   should be:

        // First, if we have to send a close, select a path for that.
        // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending.

        // For all open, validated and AVAILABLE paths:
        // - Is the path congestion blocked or pacing blocked?
        // - call maybe_queue_probe to ensure a tail-loss probe would be sent?
        // - do we need to send a close message?
        // - call can_send
        // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths

        // Check whether we need to send a close message
        let close = match self.state.as_type() {
            StateType::Drained => {
                self.app_limited = true;
                return None;
            }
            StateType::Draining | StateType::Closed => {
                // self.close is only reset once the associated packet has been
                // encoded successfully
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Check whether we need to send an ACK_FREQUENCY frame
        if let Some(config) = &self.config.ack_frequency_config {
            let rtt = self
                .paths
                .values()
                .map(|p| p.data.rtt.get())
                .min()
                .expect("one path exists");
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(rtt, config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Whether this packet can be coalesced with another one in the same datagram.
        let mut coalesce = true;

        // Whether the last packet in the datagram must be padded so the datagram takes up
        // to at least MIN_INITIAL_SIZE, or to the maximum segment size if this is smaller.
        let mut pad_datagram = PadDatagram::No;

        // Whether congestion control stopped the next packet from being sent. Further
        // packets could still be built, as e.g. tail-loss probes are not congestion
        // limited.
        let mut congestion_blocked = false;

        // The packet number of the last built packet.
        let mut last_packet_number = None;

        let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;

        // If there is any open, validated and available path, a backup path should only
        // carry frames which must be sent on that backup path exclusively.
        let have_available_path = self.paths.iter().any(|(id, path)| {
            path.data.validated
                && path.data.local_status() == PathStatus::Available
                && self.rem_cids.contains_key(id)
        });

        // Setup for the first path_id
        let mut transmit = TransmitBuf::new(
            buf,
            max_datagrams,
            self.path_data(path_id).current_mtu().into(),
        );
        if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
            return Some(challenge);
        }
        let mut space_id = match path_id {
            PathId::ZERO => SpaceId::Initial,
            _ => SpaceId::Data,
        };

        loop {
            // check if there is at least one active CID to use for sending
            let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
                let err = PathError::RemoteCidsExhausted;
                if !self.abandoned_paths.contains(&path_id) {
                    debug!(?err, %path_id, "no active CID for path");
                    self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                        id: path_id,
                        error: err,
                    }));
                    // Locally we should have refused to open this path; the remote should
                    // have given us CIDs for this path before opening it.  So we can always
                    // abandon this here.
                    self.close_path(
                        now,
                        path_id,
                        TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
                    )
                    .ok();
                    self.spaces[SpaceId::Data]
                        .pending
                        .path_cids_blocked
                        .push(path_id);
                } else {
                    trace!(%path_id, "remote CIDs retired for abandoned path");
                }

                match self.paths.keys().find(|&&next| next > path_id) {
                    Some(next_path_id) => {
                        // See if this next path can send anything.
                        path_id = *next_path_id;
                        space_id = SpaceId::Data;

                        // update per path state
                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
                        if let Some(challenge) =
                            self.send_prev_path_challenge(now, &mut transmit, path_id)
                        {
                            return Some(challenge);
                        }

                        continue;
                    }
                    None => {
                        // Nothing more to send.
                        trace!(
                            ?space_id,
                            %path_id,
                            "no CIDs to send on path, no more paths"
                        );
                        break;
                    }
                }
            };

            // Determine if anything can be sent in this packet number space (SpaceId +
            // PathId).
            let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
                // We are trying to coalesce another packet into this datagram.
                transmit.datagram_remaining_mut()
            } else {
                // A new datagram needs to be started.
                transmit.segment_size()
            };
            let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
            let path_should_send = {
                let path_exclusive_only = space_id == SpaceId::Data
                    && have_available_path
                    && self.path_data(path_id).local_status() == PathStatus::Backup;
                let path_should_send = if path_exclusive_only {
                    can_send.path_exclusive
                } else {
                    !can_send.is_empty()
                };
                let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
                path_should_send || needs_loss_probe || can_send.close
            };

            if !path_should_send && space_id < SpaceId::Data {
                if self.spaces[space_id].crypto.is_some() {
                    trace!(?space_id, %path_id, "nothing to send in space");
                }
                space_id = space_id.next();
                continue;
            }

            let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
                // Only check congestion control if a new datagram is needed.
                self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
            } else {
                PathBlocked::No
            };
            if send_blocked != PathBlocked::No {
                trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
                congestion_blocked = true;
            }
            if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
                // Higher spaces might still have tail-loss probes to send, which are not
                // congestion blocked.
                space_id = space_id.next();
                continue;
            }
            if !path_should_send || send_blocked != PathBlocked::No {
                // Nothing more to send on this path, check the next path if possible.

                // If there are any datagrams in the transmit, packets for another path can
                // not be built.
                if transmit.num_datagrams() > 0 {
                    break;
                }

                match self.paths.keys().find(|&&next| next > path_id) {
                    Some(next_path_id) => {
                        // See if this next path can send anything.
                        trace!(
                            ?space_id,
                            %path_id,
                            %next_path_id,
                            "nothing to send on path"
                        );
                        path_id = *next_path_id;
                        space_id = SpaceId::Data;

                        // update per path state
                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
                        if let Some(challenge) =
                            self.send_prev_path_challenge(now, &mut transmit, path_id)
                        {
                            return Some(challenge);
                        }

                        continue;
                    }
                    None => {
                        // Nothing more to send.
                        trace!(
                            ?space_id,
                            %path_id,
                            next_path_id=?None::<PathId>,
                            "nothing to send on path"
                        );
                        break;
                    }
                }
            }

            // If the datagram is full, we need to start a new one.
            if transmit.datagram_remaining_mut() == 0 {
                if transmit.num_datagrams() >= transmit.max_datagrams() {
                    // No more datagrams allowed
                    break;
                }

                match self.spaces[space_id].for_path(path_id).loss_probes {
                    0 => transmit.start_new_datagram(),
                    _ => {
                        // We need something to send for a tail-loss probe.
                        let request_immediate_ack =
                            space_id == SpaceId::Data && self.peer_supports_ack_frequency();
                        self.spaces[space_id].maybe_queue_probe(
                            path_id,
                            request_immediate_ack,
                            &self.streams,
                        );

                        self.spaces[space_id].for_path(path_id).loss_probes -= 1;

                        // Clamp the datagram to at most the minimum MTU to ensure that loss
                        // probes can get through and enable recovery even if the path MTU
                        // has shrunk unexpectedly.
                        transmit.start_new_datagram_with_size(std::cmp::min(
                            usize::from(INITIAL_MTU),
                            transmit.segment_size(),
                        ));
                    }
                }
                trace!(count = transmit.num_datagrams(), "new datagram started");
                coalesce = true;
                pad_datagram = PadDatagram::No;
            }

            // If coalescing another packet into the existing datagram, there should
            // still be enough space for a whole packet.
            if transmit.datagram_start_offset() < transmit.len() {
                debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
            }

            //
            // From here on, we've determined that a packet will definitely be sent.
            //

            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                // A client stops both sending and processing Initial packets when it
                // sends its first Handshake packet.
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            let mut qlog = QlogSentPacket::default();
            let mut builder = PacketBuilder::new(
                now,
                space_id,
                path_id,
                remote_cid,
                &mut transmit,
                can_send.other,
                self,
                &mut qlog,
            )?;
            last_packet_number = Some(builder.exact_number);
            coalesce = coalesce && !builder.short_header;

            if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
                // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1
                pad_datagram |= PadDatagram::ToMinMtu;
            }
            if space_id == SpaceId::Data && self.config.pad_to_mtu {
                pad_datagram |= PadDatagram::ToSegmentSize;
            }

            if can_send.close {
                trace!("sending CONNECTION_CLOSE");
                // Encode ACKs before the ConnectionClose message, to give the receiver
                // a better approximation of what data has been processed. This is
                // especially important with ack delay, since the peer might not
                // have gotten any other ACK for the data earlier on.
                let mut sent_frames = SentFrames::default();
                let is_multipath_negotiated = self.is_multipath_negotiated();
                for path_id in self.spaces[space_id]
                    .number_spaces
                    .iter()
                    .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
                    .map(|(&path_id, _)| path_id)
                    .collect::<Vec<_>>()
                {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut sent_frames,
                        path_id,
                        space_id,
                        &mut self.spaces[space_id],
                        is_multipath_negotiated,
                        &mut builder.frame_space_mut(),
                        &mut self.stats,
                        &mut qlog,
                    );
                }

                // Since there are only 64 ACK frames there will always be enough space
1274                // to encode the ConnectionClose frame too. However we still have the
1275                // check here to prevent crashes if something changes.
1276                debug_assert!(
1277                    builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1278                    "ACKs should leave space for ConnectionClose"
1279                );
1280                if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1281                    let max_frame_size = builder.frame_space_remaining();
1282                    match self.state.as_type() {
1283                        StateType::Closed => {
1284                            let reason: Close =
1285                                self.state.as_closed().expect("checked").clone().into();
1286                            if space_id == SpaceId::Data || reason.is_transport_layer() {
1287                                reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1288                                qlog.frame(&Frame::Close(reason));
1289                            } else {
1290                                let frame = frame::ConnectionClose {
1291                                    error_code: TransportErrorCode::APPLICATION_ERROR,
1292                                    frame_type: None,
1293                                    reason: Bytes::new(),
1294                                };
1295                                frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1296                                qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1297                            }
1298                        }
1299                        StateType::Draining => {
1300                            let frame = frame::ConnectionClose {
1301                                error_code: TransportErrorCode::NO_ERROR,
1302                                frame_type: None,
1303                                reason: Bytes::new(),
1304                            };
1305                            frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1306                            qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1307                        }
1308                        _ => unreachable!(
1309                            "tried to make a close packet when the connection wasn't closed"
1310                        ),
1311                    };
1312                }
1313                builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1314                if space_id == self.highest_space {
1315                    // Don't send another close packet. Even with multipath we only send
1316                    // CONNECTION_CLOSE on a single path since we expect our paths to work.
1317                    self.close = false;
1318                    // `CONNECTION_CLOSE` is the final packet
1319                    break;
1320                } else {
1321                    // Send a close frame in every possible space for robustness, per
1322                    // RFC9000 "Immediate Close during the Handshake". Don't bother trying
1323                    // to send anything else.
1324                    space_id = space_id.next();
1325                    continue;
1326                }
1327            }
1328
1329            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that
1330            // path validation can occur while the link is saturated.
1331            if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1332                let path = self.path_data_mut(path_id);
1333                if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1334                    // TODO(flub): We need to use the right CID!  We shouldn't use the same
1335                    //    CID as the current active one for the path.  Though see also
1336                    //    https://github.com/quinn-rs/quinn/issues/2184
1337                    let response = frame::PathResponse(token);
1338                    trace!(%response, "(off-path)");
1339                    builder.frame_space_mut().write(response);
1340                    qlog.frame(&Frame::PathResponse(response));
1341                    self.stats.frame_tx.path_response += 1;
1342                    builder.finish_and_track(
1343                        now,
1344                        self,
1345                        path_id,
1346                        SentFrames {
1347                            non_retransmits: true,
1348                            ..SentFrames::default()
1349                        },
1350                        PadDatagram::ToMinMtu,
1351                        qlog,
1352                    );
1353                    self.stats.udp_tx.on_sent(1, transmit.len());
1354                    return Some(Transmit {
1355                        destination: remote,
1356                        size: transmit.len(),
1357                        ecn: None,
1358                        segment_size: None,
1359                        src_ip: self.local_ip,
1360                    });
1361                }
1362            }
1363
1364            let sent_frames = {
1365                let path_exclusive_only = have_available_path
1366                    && self.path_data(path_id).local_status() == PathStatus::Backup;
1367                let pn = builder.exact_number;
1368                self.populate_packet(
1369                    now,
1370                    space_id,
1371                    path_id,
1372                    path_exclusive_only,
1373                    &mut builder.frame_space_mut(),
1374                    pn,
1375                    &mut qlog,
1376                )
1377            };
1378
1379            // ACK-only packets should only be sent when explicitly allowed. If we write them for
1380            // any other reason, there is a bug that causes one component to announce write
1381            // readiness while not writing any data. This degrades performance. The condition is
1382            // only checked if the full MTU is available and when potentially large fixed-size
1383            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
1384            // writing ACKs.
1385            debug_assert!(
1386                !(sent_frames.is_ack_only(&self.streams)
1387                    && !can_send.acks
1388                    && can_send.other
1389                    && builder.buf.segment_size()
1390                        == self.path_data(path_id).current_mtu() as usize
1391                    && self.datagrams.outgoing.is_empty()),
1392                "SendableFrames was {can_send:?}, but only ACKs have been written"
1393            );
1394            if sent_frames.requires_padding {
1395                pad_datagram |= PadDatagram::ToMinMtu;
1396            }
1397
1398            for (path_id, _pn) in sent_frames.largest_acked.iter() {
1399                self.spaces[space_id]
1400                    .for_path(*path_id)
1401                    .pending_acks
1402                    .acks_sent();
1403                self.timers.stop(
1404                    Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1405                    self.qlog.with_time(now),
1406                );
1407            }
1408
1409            // Now we need to finish the packet.  Before we do so we need to know if we will
1410            // be coalescing the next packet into this one, or ending the datagram as well:
1411            // if this is the last packet in the datagram, more padding might be needed
1412            // because of the packet type, or to fill the GSO segment size.
1413
1414            // Are we allowed to coalesce AND is there enough space for another *packet* in
1415            // this datagram AND is there another packet to send in this or the next space?
1416            if coalesce
1417                && builder
1418                    .buf
1419                    .datagram_remaining_mut()
1420                    .saturating_sub(builder.predict_packet_end())
1421                    > MIN_PACKET_SPACE
1422                && self
1423                    .next_send_space(space_id, path_id, builder.buf, close)
1424                    .is_some()
1425            {
1426                // We can append/coalesce the next packet into the current
1427                // datagram. Finish the current packet without adding extra padding.
1428                builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1429            } else {
1430                // We need a new datagram for the next packet.  Finish the current
1431                // packet with padding.
1432                if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1433                    // If too many padding bytes would be required to continue the
1434                    // GSO batch after this packet, end the GSO batch here. This
1435                    // ensures that fixed-size frames with heterogeneous sizes
1436                    // (e.g. application datagrams) won't inadvertently waste large
1437                    // amounts of bandwidth. The exact threshold is a bit arbitrary
1438                    // and might benefit from further tuning, though there's no
1439                    // universally optimal value.
1440                    const MAX_PADDING: usize = 32;
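                    // E.g. if the datagram's remaining space exceeds the predicted end
                    // of this packet by 200 bytes, 200 bytes of padding would be
                    // demanded to keep the batch going, well above MAX_PADDING, so the
                    // batch is ended here instead.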
1441                    if builder.buf.datagram_remaining_mut()
1442                        > builder.predict_packet_end() + MAX_PADDING
1443                    {
1444                        trace!(
1445                            "GSO truncated by demand for {} padding bytes",
1446                            builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1447                        );
1448                        builder.finish_and_track(
1449                            now,
1450                            self,
1451                            path_id,
1452                            sent_frames,
1453                            PadDatagram::No,
1454                            qlog,
1455                        );
1456                        break;
1457                    }
1458
1459                    // Pad the current datagram to GSO segment size so it can be
1460                    // included in the GSO batch.
1461                    builder.finish_and_track(
1462                        now,
1463                        self,
1464                        path_id,
1465                        sent_frames,
1466                        PadDatagram::ToSegmentSize,
1467                        qlog,
1468                    );
1469                } else {
1470                    builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1471                }
1472                if transmit.num_datagrams() == 1 {
1473                    transmit.clip_datagram_size();
1474                }
1475            }
1476        }
1477
1478        if let Some(last_packet_number) = last_packet_number {
1479            // Note that when sending in multiple packet spaces the last packet number will
1480            // be the one from the highest packet space.
1481            self.path_data_mut(path_id).congestion.on_sent(
1482                now,
1483                transmit.len() as u64,
1484                last_packet_number,
1485            );
1486        }
1487
1488        self.qlog.emit_recovery_metrics(
1489            path_id,
1490            &mut self.paths.get_mut(&path_id).unwrap().data,
1491            now,
1492        );
1493
1494        self.app_limited = transmit.is_empty() && !congestion_blocked;
1495
1496        // Send MTU probe if necessary
1497        if transmit.is_empty() && self.state.is_established() {
1498            // MTU probing happens only in Data space.
1499            let space_id = SpaceId::Data;
1500            path_id = *self.paths.first_key_value().expect("one path must exist").0;
1501            let probe_data = loop {
1502                // We MTU probe all paths for which all of the following are true:
1503                // - We have an active destination CID for the path.
1504                // - The remote address *and* path are validated.
1505                // - The path is not abandoned.
1506                // - The MTU Discovery subsystem wants to probe the path.
1507                let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1508                let eligible = self.path_data(path_id).validated
1509                    && !self.path_data(path_id).is_validating_path()
1510                    && !self.abandoned_paths.contains(&path_id);
1511                let probe_size = eligible
1512                    .then(|| {
1513                        let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1514                        self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1515                    })
1516                    .flatten();
1517                match (active_cid, probe_size) {
1518                    (Some(active_cid), Some(probe_size)) => {
1519                        // Let's send an MTUD probe!
1520                        break Some((active_cid, probe_size));
1521                    }
1522                    _ => {
1523                        // Find the next path to check if it needs an MTUD probe.
1524                        match self.paths.keys().find(|&&next| next > path_id) {
1525                            Some(next) => {
1526                                path_id = *next;
1527                                continue;
1528                            }
1529                            None => break None,
1530                        }
1531                    }
1532                }
1533            };
1534            if let Some((active_cid, probe_size)) = probe_data {
1535                // We are definitely sending a DPLPMTUD probe.
1536                debug_assert_eq!(transmit.num_datagrams(), 0);
1537                transmit.start_new_datagram_with_size(probe_size as usize);
1538
1539                let mut qlog = QlogSentPacket::default();
1540                let mut builder = PacketBuilder::new(
1541                    now,
1542                    space_id,
1543                    path_id,
1544                    active_cid,
1545                    &mut transmit,
1546                    true,
1547                    self,
1548                    &mut qlog,
1549                )?;
1550
1551                // We implement MTU probes as ping packets padded up to the probe size
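                // (DPLPMTUD per RFC 8899: an acked probe proves the path supports
                // `probe_size`, while a lost probe costs only the probe itself and
                // leaves the current MTU estimate unchanged.)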
1552                trace!(?probe_size, "writing MTUD probe");
1553                trace!("PING");
1554                builder.frame_space_mut().write(frame::FrameType::PING);
1555                qlog.frame(&Frame::Ping);
1556                self.stats.frame_tx.ping += 1;
1557
1558                // If supported by the peer, we want no delays to the probe's ACK
1559                if self.peer_supports_ack_frequency() {
1560                    trace!("IMMEDIATE_ACK");
1561                    builder
1562                        .frame_space_mut()
1563                        .write(frame::FrameType::IMMEDIATE_ACK);
1564                    self.stats.frame_tx.immediate_ack += 1;
1565                    qlog.frame(&Frame::ImmediateAck);
1566                }
1567
1568                let sent_frames = SentFrames {
1569                    non_retransmits: true,
1570                    ..Default::default()
1571                };
1572                builder.finish_and_track(
1573                    now,
1574                    self,
1575                    path_id,
1576                    sent_frames,
1577                    PadDatagram::ToSize(probe_size),
1578                    qlog,
1579                );
1580
1581                self.path_stats
1582                    .entry(path_id)
1583                    .or_default()
1584                    .sent_plpmtud_probes += 1;
1585            }
1586        }
1587
1588        if transmit.is_empty() {
1589            return None;
1590        }
1591
1592        let destination = self.path_data(path_id).remote;
1593        trace!(
1594            segment_size = transmit.segment_size(),
1595            last_datagram_len = transmit.len() % transmit.segment_size(),
1596            ?destination,
1597            "sending {} bytes in {} datagrams",
1598            transmit.len(),
1599            transmit.num_datagrams()
1600        );
1601        self.path_data_mut(path_id)
1602            .inc_total_sent(transmit.len() as u64);
1603
1604        self.stats
1605            .udp_tx
1606            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1607
1608        Some(Transmit {
1609            destination,
1610            size: transmit.len(),
1611            ecn: if self.path_data(path_id).sending_ecn {
1612                Some(EcnCodepoint::Ect0)
1613            } else {
1614                None
1615            },
1616            segment_size: match transmit.num_datagrams() {
1617                1 => None,
1618                _ => Some(transmit.segment_size()),
1619            },
1620            src_ip: self.local_ip,
1621        })
1622    }
1623
1624    /// Returns the [`SpaceId`] of the next packet space which has data to send
1625    ///
1626    /// This takes into account the space available to frames in the next datagram.
1627    // TODO(flub): This duplication is not nice.
1628    fn next_send_space(
1629        &mut self,
1630        current_space_id: SpaceId,
1631        path_id: PathId,
1632        buf: &TransmitBuf<'_>,
1633        close: bool,
1634    ) -> Option<SpaceId> {
1635        // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed
1636        // to be able to send an individual frame at least this large in the next 1-RTT
1637        // packet. This could be generalized to support every space, but it's only needed to
1638        // handle large fixed-size frames, which only exist in 1-RTT (application
1639        // datagrams). We don't account for coalesced packets potentially occupying space
1640        // because frames can always spill into the next datagram.
1641        let mut space_id = current_space_id;
1642        loop {
1643            let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1644            if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1645                return Some(space_id);
1646            }
1647            space_id = match space_id {
1648                SpaceId::Initial => SpaceId::Handshake,
1649                SpaceId::Handshake => SpaceId::Data,
1650                SpaceId::Data => break,
1651            }
1652        }
1653        None
1654    }
1655
1656    /// Checks if creating a new datagram would be blocked by congestion control
1657    fn path_congestion_check(
1658        &mut self,
1659        space_id: SpaceId,
1660        path_id: PathId,
1661        transmit: &TransmitBuf<'_>,
1662        can_send: &SendableFrames,
1663        now: Instant,
1664    ) -> PathBlocked {
1665        // Anti-amplification is only based on `total_sent`, which gets updated after
1666        // the transmit is sent. Therefore we pass the number of bytes for datagrams
1667        // that are already created, as well as 1 byte for starting another datagram. If
1668        // there is any anti-amplification budget left, we always allow a full MTU to be
1669        // sent (see https://github.com/quinn-rs/quinn/issues/1082).
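        // (Per RFC 9000 §8.1 a server may send at most three times the number of
        // bytes received from an address it has not yet validated.)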
1670        if self.side().is_server()
1671            && self
1672                .path_data(path_id)
1673                .anti_amplification_blocked(transmit.len() as u64 + 1)
1674        {
1675            trace!(?space_id, %path_id, "blocked by anti-amplification");
1676            return PathBlocked::AntiAmplification;
1677        }
1678
1679        // Congestion control check.
1680        // Tail loss probes must not be blocked by congestion, or a deadlock could arise.
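        // (If the packets filling the congestion window are themselves lost, only a
        // probe can elicit the ACKs needed to detect the loss and free the window;
        // RFC 9002 §7.5 therefore exempts probe packets from congestion control.)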
1681        let bytes_to_send = transmit.segment_size() as u64;
1682        let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1683
1684        if can_send.other && !need_loss_probe && !can_send.close {
1685            let path = self.path_data(path_id);
1686            if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1687                trace!(?space_id, %path_id, "blocked by congestion control");
1688                return PathBlocked::Congestion;
1689            }
1690        }
1691
1692        // Pacing check.
1693        if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1694            self.timers.set(
1695                Timer::PerPath(path_id, PathTimer::Pacing),
1696                delay,
1697                self.qlog.with_time(now),
1698            );
1699            // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though
1700            // they are not congestion controlled.
1701            trace!(?space_id, %path_id, "blocked by pacing");
1702            return PathBlocked::Pacing;
1703        }
1704
1705        PathBlocked::No
1706    }
1707
1708    /// Send PATH_CHALLENGE for a previous path if necessary
1709    ///
1710    /// QUIC-TRANSPORT section 9.3.3
1711    /// <https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding>
1712    fn send_prev_path_challenge(
1713        &mut self,
1714        now: Instant,
1715        buf: &mut TransmitBuf<'_>,
1716        path_id: PathId,
1717    ) -> Option<Transmit> {
1718        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1719        // TODO (matheus23): We could use !prev_path.is_validating() here instead to
1720        // (possibly) also re-send challenges when they get lost.
1721        if !prev_path.send_new_challenge {
1722            return None;
1723        };
1724        prev_path.send_new_challenge = false;
1725        let destination = prev_path.remote;
1726        let token = self.rng.random();
1727        let info = paths::SentChallengeInfo {
1728            sent_instant: now,
1729            remote: destination,
1730        };
1731        prev_path.challenges_sent.insert(token, info);
1732        debug_assert_eq!(
1733            self.highest_space,
1734            SpaceId::Data,
1735            "PATH_CHALLENGE queued without 1-RTT keys"
1736        );
1737        buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1738
1739        // Use the previous CID to avoid linking the new path with the previous path. We
1740        // don't bother accounting for possible retirement of that prev_cid because this is
1741        // sent once, immediately after migration, when the CID is known to be valid. Even
1742        // if a post-migration packet caused the CID to be retired, it's fair to pretend
1743        // this is sent first.
1744        debug_assert_eq!(buf.datagram_start_offset(), 0);
1745        let mut qlog = QlogSentPacket::default();
1746        let mut builder = PacketBuilder::new(
1747            now,
1748            SpaceId::Data,
1749            path_id,
1750            *prev_cid,
1751            buf,
1752            false,
1753            self,
1754            &mut qlog,
1755        )?;
1756        let challenge = frame::PathChallenge(token);
1757        trace!(%challenge, "validating previous path");
1758        qlog.frame(&Frame::PathChallenge(challenge));
1759        builder.frame_space_mut().write(challenge);
1760        self.stats.frame_tx.path_challenge += 1;
1761
1762        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
1763        // to at least the smallest allowed maximum datagram size of 1200 bytes,
1764        // unless the anti-amplification limit for the path does not permit
1765        // sending a datagram of this size
1766        builder.pad_to(MIN_INITIAL_SIZE);
1767
1768        builder.finish(self, now, qlog);
1769        self.stats.udp_tx.on_sent(1, buf.len());
1770
1771        Some(Transmit {
1772            destination,
1773            size: buf.len(),
1774            ecn: None,
1775            segment_size: None,
1776            src_ip: self.local_ip,
1777        })
1778    }
1779
1780    /// Indicate what types of frames are ready to send for the given space
1781    ///
1782    /// *packet_size* is the number of bytes available to build the next packet.  *close*
1783    /// indicates whether a CONNECTION_CLOSE frame needs to be sent.
1784    fn space_can_send(
1785        &mut self,
1786        space_id: SpaceId,
1787        path_id: PathId,
1788        packet_size: usize,
1789        close: bool,
1790    ) -> SendableFrames {
1791        let pn = self.spaces[SpaceId::Data]
1792            .for_path(path_id)
1793            .peek_tx_number();
1794        let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1795        if self.spaces[space_id].crypto.is_none()
1796            && (space_id != SpaceId::Data
1797                || self.zero_rtt_crypto.is_none()
1798                || self.side.is_server())
1799        {
1800            // No keys available for this space
1801            return SendableFrames::empty();
1802        }
1803        let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1804        if space_id == SpaceId::Data {
1805            can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1806        }
1807
1808        can_send.close = close && self.spaces[space_id].crypto.is_some();
1809
1810        can_send
1811    }
1812
1813    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
1814    ///
1815    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
1816    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
1817    /// extracted through the relevant methods.
1818    pub fn handle_event(&mut self, event: ConnectionEvent) {
1819        use ConnectionEventInner::*;
1820        match event.0 {
1821            Datagram(DatagramConnectionEvent {
1822                now,
1823                remote,
1824                path_id,
1825                ecn,
1826                first_decode,
1827                remaining,
1828            }) => {
1829                let span = trace_span!("pkt", %path_id);
1830                let _guard = span.enter();
1831                // If this packet could initiate a migration and we're a client or a server that
1832                // forbids migration, drop the datagram. This could be relaxed to heuristically
1833                // permit NAT-rebinding-like migration.
1834                if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
1835                    if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
1836                        trace!(
1837                            %path_id,
1838                            ?remote,
1839                            path_remote = ?self.path(path_id).map(|p| p.remote),
1840                            "discarding packet from unrecognized peer"
1841                        );
1842                        return;
1843                    }
1844                }
1845
1846                let was_anti_amplification_blocked = self
1847                    .path(path_id)
1848                    .map(|path| path.anti_amplification_blocked(1))
1849                    .unwrap_or(true); // an unknown path is conservatively treated as unvalidated
1850                // TODO(@divma): revisit this
1851
1852                self.stats.udp_rx.datagrams += 1;
1853                self.stats.udp_rx.bytes += first_decode.len() as u64;
1854                let data_len = first_decode.len();
1855
1856                self.handle_decode(now, remote, path_id, ecn, first_decode);
1857                // The current `path` might have changed inside `handle_decode` since the packet
1858                // could have triggered a migration. The packet might also belong to an unknown
1859                // path and have been rejected. Make sure the data received is accounted for the
1860                // most recent path by accessing `path` after `handle_decode`.
1861                if let Some(path) = self.path_mut(path_id) {
1862                    path.inc_total_recvd(data_len as u64);
1863                }
1864
1865                if let Some(data) = remaining {
1866                    self.stats.udp_rx.bytes += data.len() as u64;
1867                    self.handle_coalesced(now, remote, path_id, ecn, data);
1868                }
1869
1870                if let Some(path) = self.paths.get_mut(&path_id) {
1871                    self.qlog
1872                        .emit_recovery_metrics(path_id, &mut path.data, now);
1873                }
1874
1875                if was_anti_amplification_blocked {
1876                    // A prior attempt to set the loss detection timer may have failed due to
1877                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
1878                    // the server's first flight is lost.
1879                    self.set_loss_detection_timer(now, path_id);
1880                }
1881            }
1882            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1883                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1884                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1885                let cid_state = self
1886                    .local_cid_state
1887                    .entry(path_id)
1888                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1889                cid_state.new_cids(&ids, now);
1890
1891                ids.into_iter().rev().for_each(|frame| {
1892                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1893                });
1894                // Always update Timer::PushNewCid
1895                self.reset_cid_retirement(now);
1896            }
1897        }
1898    }
1899
1900    /// Process timer expirations
1901    ///
1902    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
1903    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
1904    /// methods.
1905    ///
1906    /// It is most efficient to call this immediately after the system clock reaches the latest
1907    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
1908    /// no-op and therefore are safe.
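    ///
    /// # Example
    ///
    /// A minimal driver-loop sketch; the surrounding `conn` setup is assumed:
    ///
    /// ```ignore
    /// if let Some(deadline) = conn.poll_timeout() {
    ///     if Instant::now() >= deadline {
    ///         conn.handle_timeout(Instant::now());
    ///     }
    /// }
    /// ```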
1909    pub fn handle_timeout(&mut self, now: Instant) {
1910        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
1911            // TODO(@divma): remove `at` when the unicorn is born
1912            trace!(?timer, at=?now, "timeout");
1913            match timer {
1914                Timer::Conn(timer) => match timer {
1915                    ConnTimer::Close => {
1916                        self.state.move_to_drained(None);
1917                        self.endpoint_events.push_back(EndpointEventInner::Drained);
1918                    }
1919                    ConnTimer::Idle => {
1920                        self.kill(ConnectionError::TimedOut);
1921                    }
1922                    ConnTimer::KeepAlive => {
1923                        trace!("sending keep-alive");
1924                        self.ping();
1925                    }
1926                    ConnTimer::KeyDiscard => {
1927                        self.zero_rtt_crypto = None;
1928                        self.prev_crypto = None;
1929                    }
1930                    ConnTimer::PushNewCid => {
1931                        while let Some((path_id, when)) = self.next_cid_retirement() {
1932                            if when > now {
1933                                break;
1934                            }
1935                            match self.local_cid_state.get_mut(&path_id) {
1936                                None => error!(%path_id, "No local CID state for path"),
1937                                Some(cid_state) => {
1938                                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
1939                                    let num_new_cid = cid_state.on_cid_timeout().into();
1940                                    if !self.state.is_closed() {
1941                                        trace!(
1942                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
1943                                            cid_state.retire_prior_to()
1944                                        );
1945                                        self.endpoint_events.push_back(
1946                                            EndpointEventInner::NeedIdentifiers(
1947                                                path_id,
1948                                                now,
1949                                                num_new_cid,
1950                                            ),
1951                                        );
1952                                    }
1953                                }
1954                            }
1955                        }
1956                    }
1957                },
1958                // TODO: add path_id as span somehow
1959                Timer::PerPath(path_id, timer) => {
1960                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
1961                    let _guard = span.enter();
1962                    match timer {
1963                        PathTimer::PathIdle => {
1964                            self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
1965                                .ok();
1966                        }
1967
1968                        PathTimer::PathKeepAlive => {
1969                            trace!("sending keep-alive on path");
1970                            self.ping_path(path_id).ok();
1971                        }
1972                        PathTimer::LossDetection => {
1973                            self.on_loss_detection_timeout(now, path_id);
1974                            self.qlog.emit_recovery_metrics(
1975                                path_id,
1976                                &mut self.paths.get_mut(&path_id).unwrap().data,
1977                                now,
1978                            );
1979                        }
1980                        PathTimer::PathValidation => {
1981                            let Some(path) = self.paths.get_mut(&path_id) else {
1982                                continue;
1983                            };
1984                            self.timers.stop(
1985                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
1986                                self.qlog.with_time(now),
1987                            );
1988                            debug!("path validation failed");
1989                            if let Some((_, prev)) = path.prev.take() {
1990                                path.data = prev;
1991                            }
1992                            path.data.challenges_sent.clear();
1993                            path.data.send_new_challenge = false;
1994                        }
1995                        PathTimer::PathChallengeLost => {
1996                            let Some(path) = self.paths.get_mut(&path_id) else {
1997                                continue;
1998                            };
1999                            trace!("path challenge deemed lost");
2000                            path.data.send_new_challenge = true;
2001                        }
2002                        PathTimer::PathOpen => {
2003                            let Some(path) = self.path_mut(path_id) else {
2004                                continue;
2005                            };
2006                            path.challenges_sent.clear();
2007                            path.send_new_challenge = false;
2008                            debug!("new path validation failed");
2009                            if let Err(err) = self.close_path(
2010                                now,
2011                                path_id,
2012                                TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
2013                            ) {
2014                                warn!(?err, "failed closing path");
2015                            }
2016
2017                            self.events.push_back(Event::Path(PathEvent::LocallyClosed {
2018                                id: path_id,
2019                                error: PathError::ValidationFailed,
2020                            }));
2021                        }
2022                        PathTimer::Pacing => trace!("pacing timer expired"),
2023                        PathTimer::MaxAckDelay => {
2024                            trace!("max ack delay reached");
2025                            // This timer is only armed in the Data space
2026                            self.spaces[SpaceId::Data]
2027                                .for_path(path_id)
2028                                .pending_acks
2029                                .on_max_ack_delay_timeout()
2030                        }
2031                        PathTimer::DiscardPath => {
2032                            // The path was abandoned and 3*PTO has expired since.  Clean up all
2033                            // remaining state and install a stateless reset token.
2034                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2035                            if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
2036                                let (min_seq, max_seq) = loc_cid_state.active_seq();
2037                                for seq in min_seq..=max_seq {
2038                                    self.endpoint_events.push_back(
2039                                        EndpointEventInner::RetireConnectionId(
2040                                            now, path_id, seq, false,
2041                                        ),
2042                                    );
2043                                }
2044                            }
2045                            self.discard_path(path_id, now);
2046                        }
2047                    }
2048                }
2049            }
2050        }
2051    }
2052
2053    /// Close a connection immediately
2054    ///
2055    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
2056    /// call this only when all important communications have been completed, e.g. by calling
2057    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
2058    /// [`StreamEvent::Finished`] event.
2059    ///
2060    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
2061    /// delivered. There may still be data from the peer that has not been received.
2062    ///
2063    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
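    ///
    /// # Example
    ///
    /// A minimal sketch; `conn` and `now` are assumed:
    ///
    /// ```ignore
    /// conn.close(now, VarInt::from_u32(0), Bytes::from_static(b"goodbye"));
    /// ```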
2064    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2065        self.close_inner(
2066            now,
2067            Close::Application(frame::ApplicationClose { error_code, reason }),
2068        )
2069    }
2070
2071    fn close_inner(&mut self, now: Instant, reason: Close) {
2072        let was_closed = self.state.is_closed();
2073        if !was_closed {
2074            self.close_common();
2075            self.set_close_timer(now);
2076            self.close = true;
2077            self.state.move_to_closed_local(reason);
2078        }
2079    }
2080
2081    /// Control datagrams
2082    pub fn datagrams(&mut self) -> Datagrams<'_> {
2083        Datagrams { conn: self }
2084    }
2085
2086    /// Returns connection statistics
2087    pub fn stats(&mut self) -> ConnectionStats {
2088        self.stats.clone()
2089    }
2090
2091    /// Returns path statistics
2092    pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2093        let path = self.paths.get(&path_id)?;
2094        let stats = self.path_stats.entry(path_id).or_default();
2095        stats.rtt = path.data.rtt.get();
2096        stats.cwnd = path.data.congestion.window();
2097        stats.current_mtu = path.data.mtud.current_mtu();
2098        Some(*stats)
2099    }
2100
2101    /// Ping the remote endpoint
2102    ///
2103    /// Causes an ACK-eliciting packet to be transmitted on the connection.
2104    pub fn ping(&mut self) {
2105        // TODO(flub): This is very brute-force: it pings *all* the paths.  Instead it would
2106        //    be nice if we could only send a single packet for this.
2107        for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2108            path_data.ping_pending = true;
2109        }
2110    }
2111
2112    /// Ping the remote endpoint over a specific path
2113    ///
2114    /// Causes an ACK-eliciting packet to be transmitted on the path.
2115    pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2116        let path_data = self.spaces[self.highest_space]
2117            .number_spaces
2118            .get_mut(&path)
2119            .ok_or(ClosedPath { _private: () })?;
2120        path_data.ping_pending = true;
2121        Ok(())
2122    }
2123
2124    /// Update traffic keys spontaneously
2125    ///
2126    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
2127    pub fn force_key_update(&mut self) {
2128        if !self.state.is_established() {
2129            debug!("ignoring forced key update in illegal state");
2130            return;
2131        }
2132        if self.prev_crypto.is_some() {
2133            // We already just updated, or are currently updating, the keys. Concurrent key updates
2134            // are illegal.
2135            debug!("ignoring redundant forced key update");
2136            return;
2137        }
2138        self.update_keys(None, false);
2139    }
2140
2141    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
2142    #[doc(hidden)]
2143    #[deprecated]
2144    pub fn initiate_key_update(&mut self) {
2145        self.force_key_update();
2146    }
2147
2148    /// Get a session reference
2149    pub fn crypto_session(&self) -> &dyn crypto::Session {
2150        &*self.crypto
2151    }
2152
2153    /// Whether the connection is in the process of being established
2154    ///
2155    /// If this returns `false`, the connection may be either established or closed, signaled by the
2156    /// emission of a `Connected` or `ConnectionLost` message respectively.
2157    pub fn is_handshaking(&self) -> bool {
2158        self.state.is_handshake()
2159    }
2160
2161    /// Whether the connection is closed
2162    ///
2163    /// Closed connections cannot transport any further data. A connection becomes closed when
2164    /// either peer application intentionally closes it, or when either transport layer detects an
2165    /// error such as a time-out or certificate validation failure.
2166    ///
2167    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
2168    pub fn is_closed(&self) -> bool {
2169        self.state.is_closed()
2170    }
2171
2172    /// Whether there is no longer any need to keep the connection around
2173    ///
2174    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
2175    /// packets from the peer. All drained connections have been closed.
2176    pub fn is_drained(&self) -> bool {
2177        self.state.is_drained()
2178    }
2179
2180    /// For clients, if the peer accepted the 0-RTT data packets
2181    ///
2182    /// The value is meaningless until after the handshake completes.
2183    pub fn accepted_0rtt(&self) -> bool {
2184        self.accepted_0rtt
2185    }
2186
2187    /// Whether 0-RTT is/was possible during the handshake
2188    pub fn has_0rtt(&self) -> bool {
2189        self.zero_rtt_enabled
2190    }
2191
2192    /// Whether there are any pending retransmits
2193    pub fn has_pending_retransmits(&self) -> bool {
2194        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2195    }
2196
2197    /// Look up whether we're the client or server of this Connection
2198    pub fn side(&self) -> Side {
2199        self.side.side()
2200    }
2201
2202    /// Get the address observed by the remote over the given path
2203    pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2204        self.path(path_id)
2205            .map(|path_data| {
2206                path_data
2207                    .last_observed_addr_report
2208                    .as_ref()
2209                    .map(|observed| observed.socket_addr())
2210            })
2211            .ok_or(ClosedPath { _private: () })
2212    }
2213
2214    /// The local IP address which was used when the peer established
2215    /// the connection
2216    ///
2217    /// This can be different from the address the endpoint is bound to, in case
2218    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
2219    ///
2220    /// This will return `None` for clients, or when no `local_ip` was passed to
2221    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
2222    /// connection.
2223    pub fn local_ip(&self) -> Option<IpAddr> {
2224        self.local_ip
2225    }
2226
2227    /// Current best estimate of this connection's latency (round-trip-time)
2228    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2229        self.path(path_id).map(|d| d.rtt.get())
2230    }
2231
2232    /// Current state of this connection's congestion controller, for debugging purposes
2233    pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2234        self.path(path_id).map(|d| d.congestion.as_ref())
2235    }
2236
2237    /// Modify the number of remotely initiated streams that may be concurrently open
2238    ///
2239    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
2240    /// `count`s increase both minimum and worst-case memory consumption.
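    ///
    /// A minimal sketch; `conn` is assumed:
    ///
    /// ```ignore
    /// // Allow the peer at most 32 concurrently open bidirectional streams.
    /// conn.set_max_concurrent_streams(Dir::Bi, VarInt::from_u32(32));
    /// ```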
2241    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2242        self.streams.set_max_concurrent(dir, count);
2243        // If the limit was reduced, then a flow control update previously deemed insignificant may
2244        // now be significant.
2245        let pending = &mut self.spaces[SpaceId::Data].pending;
2246        self.streams.queue_max_stream_id(pending);
2247    }
2248
2249    /// Modify the number of open paths allowed when multipath is enabled
2250    ///
2251    /// When reducing the number of concurrent paths, this only delays sending new
2252    /// MAX_PATH_ID frames until fewer than this number of paths are possible.  To
2253    /// actively reduce paths they must be closed using [`Connection::close_path`], which
2254    /// can also be used to close not-yet-opened paths.
2255    ///
2256    /// If multipath is not negotiated (see the [`TransportConfig`]), this cannot enable
2257    /// multipath and will fail.
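    ///
    /// A minimal sketch; `conn` and `now` are assumed:
    ///
    /// ```ignore
    /// // Allow up to four concurrent paths; fails if multipath was not negotiated.
    /// conn.set_max_concurrent_paths(now, NonZeroU32::new(4).unwrap())?;
    /// ```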
2258    pub fn set_max_concurrent_paths(
2259        &mut self,
2260        now: Instant,
2261        count: NonZeroU32,
2262    ) -> Result<(), MultipathNotNegotiated> {
2263        if !self.is_multipath_negotiated() {
2264            return Err(MultipathNotNegotiated { _private: () });
2265        }
2266        self.max_concurrent_paths = count;
2267
2268        let in_use_count = self
2269            .local_max_path_id
2270            .next()
2271            .saturating_sub(self.abandoned_paths.len() as u32)
2272            .as_u32();
2273        let extra_needed = count.get().saturating_sub(in_use_count);
2274        let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2275
2276        self.set_max_path_id(now, new_max_path_id);
2277
2278        Ok(())
2279    }
2280
2281    /// If needed, issues a new MAX_PATH_ID frame and new CIDs for any newly allowed paths
2282    fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2283        if max_path_id <= self.local_max_path_id {
2284            return;
2285        }
2286
2287        self.local_max_path_id = max_path_id;
2288        self.spaces[SpaceId::Data].pending.max_path_id = true;
2289
2290        self.issue_first_path_cids(now);
2291    }
2292
2293    /// Current number of remotely initiated streams that may be concurrently open
2294    ///
2295    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
2296    /// it will not change immediately, even if fewer streams are open. Instead, it will
2297    /// decrement by one each time a remotely initiated stream of matching directionality is closed.
2298    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2299        self.streams.max_concurrent(dir)
2300    }
2301
2302    /// See [`TransportConfig::send_window()`]
2303    pub fn set_send_window(&mut self, send_window: u64) {
2304        self.streams.set_send_window(send_window);
2305    }
2306
2307    /// See [`TransportConfig::receive_window()`]
2308    pub fn set_receive_window(&mut self, receive_window: VarInt) {
2309        if self.streams.set_receive_window(receive_window) {
2310            self.spaces[SpaceId::Data].pending.max_data = true;
2311        }
2312    }
2313
2314    /// Whether the Multipath for QUIC extension is enabled.
2315    ///
2316    /// Multipath is only enabled after the handshake is completed and if it was enabled by both
2317    /// peers.
2318    pub fn is_multipath_negotiated(&self) -> bool {
2319        !self.is_handshaking()
2320            && self.config.max_concurrent_multipath_paths.is_some()
2321            && self.peer_params.initial_max_path_id.is_some()
2322    }
2323
2324    fn on_ack_received(
2325        &mut self,
2326        now: Instant,
2327        space: SpaceId,
2328        ack: frame::Ack,
2329    ) -> Result<(), TransportError> {
2330        // Plain ACK frames (as opposed to PATH_ACKs) always reference path 0
2331        let path = PathId::ZERO;
2332        self.inner_on_ack_received(now, space, path, ack)
2333    }
2334
2335    fn on_path_ack_received(
2336        &mut self,
2337        now: Instant,
2338        space: SpaceId,
2339        path_ack: frame::PathAck,
2340    ) -> Result<(), TransportError> {
2341        let (ack, path) = path_ack.into_ack();
2342        self.inner_on_ack_received(now, space, path, ack)
2343    }
2344
2345    /// Handles an ACK frame acknowledging packets sent on *path*.
2346    fn inner_on_ack_received(
2347        &mut self,
2348        now: Instant,
2349        space: SpaceId,
2350        path: PathId,
2351        ack: frame::Ack,
2352    ) -> Result<(), TransportError> {
2353        if self.abandoned_paths.contains(&path) {
2354            // See also https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.4.3-3
2355            // > PATH_ACK frames received with an abandoned path ID are silently ignored, as specified in Section 4.
2356            trace!("silently ignoring PATH_ACK on abandoned path");
2357            return Ok(());
2358        }
2359        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2360            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2361        }
2362        let new_largest = {
2363            let space = &mut self.spaces[space].for_path(path);
2364            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
2365                space.largest_acked_packet = Some(ack.largest);
2366                if let Some(info) = space.sent_packets.get(ack.largest) {
2367                    // This should always succeed, but a misbehaving peer might ACK a packet we
2368                    // haven't sent. At worst, that will result in us spuriously reducing the
2369                    // congestion window.
2370                    space.largest_acked_packet_sent = info.time_sent;
2371                }
2372                true
2373            } else {
2374                false
2375            }
2376        };
2377
2378        if self.detect_spurious_loss(&ack, space, path) {
2379            self.path_data_mut(path)
2380                .congestion
2381                .on_spurious_congestion_event();
2382        }
2383
2384        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
2385        let mut newly_acked = ArrayRangeSet::new();
2386        for range in ack.iter() {
2387            self.spaces[space].for_path(path).check_ack(range.clone())?;
2388            for (pn, _) in self.spaces[space]
2389                .for_path(path)
2390                .sent_packets
2391                .iter_range(range)
2392            {
2393                newly_acked.insert_one(pn);
2394            }
2395        }
2396
2397        if newly_acked.is_empty() {
2398            return Ok(());
2399        }
2400
2401        let mut ack_eliciting_acked = false;
2402        for packet in newly_acked.elts() {
2403            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2404                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2405                    // Assume ACKs for all packets below the largest acknowledged in
2406                    // `packet` have been received. This can cause the peer to spuriously
2407                    // retransmit if some of our earlier ACKs were lost, but allows for
2408                    // simpler state tracking. See discussion at
2409                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
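                    // E.g. if the acked packet itself carried an ACK whose largest
                    // acknowledged packet on `acked_path_id` was 41, receipt state for
                    // packets below 41 on that path is dropped.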
2410                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2411                        pns.pending_acks.subtract_below(*acked_pn);
2412                    }
2413                }
2414                ack_eliciting_acked |= info.ack_eliciting;
2415
2416                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
2417                let path_data = self.path_data_mut(path);
2418                let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
2419                if mtu_updated {
2420                    path_data
2421                        .congestion
2422                        .on_mtu_update(path_data.mtud.current_mtu());
2423                }
2424
2425                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
2426                self.ack_frequency.on_acked(path, packet);
2427
2428                self.on_packet_acked(now, path, info);
2429            }
2430        }
2431
2432        let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
2433        let app_limited = self.app_limited;
2434        let path_data = self.path_data_mut(path);
2435        let in_flight = path_data.in_flight.bytes;
2436
2437        path_data
2438            .congestion
2439            .on_end_acks(now, in_flight, app_limited, largest_ackd);
2440
2441        if new_largest && ack_eliciting_acked {
2442            let ack_delay = if space != SpaceId::Data {
2443                Duration::from_micros(0)
2444            } else {
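                // The ACK Delay field is encoded in units of 2^ack_delay_exponent
                // microseconds (RFC 9000 §18.2); the decoded value is capped at the
                // peer's advertised max_ack_delay.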
2445                cmp::min(
2446                    self.ack_frequency.peer_max_ack_delay,
2447                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2448                )
2449            };
2450            let rtt = now.saturating_duration_since(
2451                self.spaces[space].for_path(path).largest_acked_packet_sent,
2452            );
2453
2454            let next_pn = self.spaces[space].for_path(path).next_packet_number;
2455            let path_data = self.path_data_mut(path);
2456            // TODO(@divma): should be a method of path, should be contained in a single place
2457            path_data.rtt.update(ack_delay, rtt);
2458            if path_data.first_packet_after_rtt_sample.is_none() {
2459                path_data.first_packet_after_rtt_sample = Some((space, next_pn));
2460            }
2461        }
2462
2463        // Must be called before crypto/pto_count are clobbered
2464        self.detect_lost_packets(now, space, path, true);
2465
2466        if self.peer_completed_address_validation(path) {
2467            self.path_data_mut(path).pto_count = 0;
2468        }
2469
2470        // Explicit congestion notification
2471        // TODO(@divma): this code is a good example of logic that should be contained in a single
2472        // place but it's split between the path data and the packet number space data, we should
2473        // find a way to make this work without two lookups
2474        if self.path_data(path).sending_ecn {
2475            if let Some(ecn) = ack.ecn {
2476                // We only examine ECN counters from ACKs that we are certain we received in
2477                // transmit order. This lets us compute an increase in ECN counts to compare
2478                // against the number of newly acked packets, a comparison that remains
2479                // well-defined in the presence of arbitrary packet reordering.
2480                if new_largest {
2481                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
2482                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
2483                }
2484            } else {
2485                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
2486                debug!("ECN not acknowledged by peer");
2487                self.path_data_mut(path).sending_ecn = false;
2488            }
2489        }
2490
2491        self.set_loss_detection_timer(now, path);
2492        Ok(())
2493    }
2494
2495    fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2496        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2497
2498        if lost_packets.is_empty() {
2499            return false;
2500        }
2501
2502        for range in ack.iter() {
2503            let spurious_losses: Vec<u64> = lost_packets
2504                .iter_range(range.clone())
2505                .map(|(pn, _info)| pn)
2506                .collect();
2507
2508            for pn in spurious_losses {
2509                lost_packets.remove(pn);
2510            }
2511        }
2512
2513        // If this ACK frame acknowledged all packets we had deemed lost,
2514        // then we raised a spurious congestion event in the past.
2515        // While packets remain in the set we cannot draw that conclusion yet,
2516        // but future ACK frames might still reveal a spurious loss detection.
2517        lost_packets.is_empty()
2518    }
2519
2520    /// Drain lost packets that we reasonably think will never arrive
2521    ///
2522    /// The current criterion is copied from `msquic`:
2523    /// discard packets that were sent earlier than 2 probe timeouts ago.
2524    fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2525        let two_pto = 2 * self.path_data(path).rtt.pto_base();
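        // E.g. with a PTO base of 200ms, entries sent more than 400ms ago are dropped.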
2526
2527        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2528        lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2529    }
2530
2531    /// Process a new ECN block from an in-order ACK
2532    fn process_ecn(
2533        &mut self,
2534        now: Instant,
2535        space: SpaceId,
2536        path: PathId,
2537        newly_acked: u64,
2538        ecn: frame::EcnCounts,
2539        largest_sent_time: Instant,
2540    ) {
2541        match self.spaces[space]
2542            .for_path(path)
2543            .detect_ecn(newly_acked, ecn)
2544        {
2545            Err(e) => {
2546                debug!("halting ECN due to verification failure: {}", e);
2547
2548                self.path_data_mut(path).sending_ecn = false;
2549                // Wipe out the existing value because it might be garbage and could interfere with
2550                // future attempts to use ECN on new paths.
2551                self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2552            }
2553            Ok(false) => {}
2554            Ok(true) => {
2555                self.path_stats.entry(path).or_default().congestion_events += 1;
2556                self.path_data_mut(path).congestion.on_congestion_event(
2557                    now,
2558                    largest_sent_time,
2559                    false,
2560                    true,
2561                    0,
2562                );
2563            }
2564        }
2565    }
2566
2567    // Not timing-aware, so it's safe to call this for inferred acks, such as those arising
2568    // from high-latency handshakes
2569    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2570        self.paths
2571            .get_mut(&path_id)
2572            .expect("known path")
2573            .remove_in_flight(&info);
2574        let app_limited = self.app_limited;
2575        let path = self.path_data_mut(path_id);
2576        if info.ack_eliciting && !path.is_validating_path() {
2577            // Only pass ACKs to the congestion controller if we are not validating the current
2578            // path, so as to ignore any ACKs from older paths still coming in.
2579            let rtt = path.rtt;
2580            path.congestion
2581                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2582        }
2583
2584        // Update state for confirmed delivery of frames
2585        if let Some(retransmits) = info.retransmits.get() {
2586            for (id, _) in retransmits.reset_stream.iter() {
2587                self.streams.reset_acked(*id);
2588            }
2589        }
2590
2591        for frame in info.stream_frames {
2592            self.streams.received_ack_of(frame);
2593        }
2594    }
2595
2596    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2597        let start = if self.zero_rtt_crypto.is_some() {
2598            now
2599        } else {
2600            self.prev_crypto
2601                .as_ref()
2602                .expect("no previous keys")
2603                .end_packet
2604                .as_ref()
2605                .expect("update not acknowledged yet")
2606                .1
2607        };
2608
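        // Retaining old keys for three PTOs mirrors the RFC 9001 key update guidance:
        // it leaves enough time for delayed packets protected under the previous key
        // phase to arrive and still be decrypted.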
2609        // QUIC-MULTIPATH § 2.5 Key Phase Update Process: use the largest PTO of all paths.
2610        self.timers.set(
2611            Timer::Conn(ConnTimer::KeyDiscard),
2612            start + self.pto_max_path(space) * 3,
2613            self.qlog.with_time(now),
2614        );
2615    }
2616
2617    /// Handle a [`PathTimer::LossDetection`] timeout.
2618    ///
2619    /// This timer expires for two reasons:
2620    /// - An ACK-eliciting packet we sent should be considered lost.
2621    /// - The PTO may have expired and a tail-loss probe needs to be scheduled.
2622    ///
2623    /// The former needs us to schedule re-transmission of the lost data.
2624    ///
2625    /// The latter means we have not received an ACK for an ack-eliciting packet we sent
2626    /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
2627    /// packet, to try to elicit new acknowledgements. These new acknowledgements will
2628    /// indicate whether the previously sent packets were lost or not.
2629    fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2630        if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2631            // Time threshold loss detection
2632            self.detect_lost_packets(now, pn_space, path_id, false);
2633            self.set_loss_detection_timer(now, path_id);
2634            return;
2635        }
2636
2637        let (_, space) = match self.pto_time_and_space(now, path_id) {
2638            Some(x) => x,
2639            None => {
2640                error!(%path_id, "PTO expired while unset");
2641                return;
2642            }
2643        };
2644        trace!(
2645            in_flight = self.path_data(path_id).in_flight.bytes,
2646            count = self.path_data(path_id).pto_count,
2647            ?space,
2648            %path_id,
2649            "PTO fired"
2650        );
2651
2652        let count = match self.path_data(path_id).in_flight.ack_eliciting {
2653            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
2654            // deadlock prevention
2655            0 => {
2656                debug_assert!(!self.peer_completed_address_validation(path_id));
2657                1
2658            }
2659            // Conventional loss probe
2660            _ => 2,
2661        };
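        // RFC 9002 §6.2.4 permits up to two probe datagrams on PTO; sending two
        // improves the odds that at least one survives when the tail of a flight is lost.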
2662        let pns = self.spaces[space].for_path(path_id);
2663        pns.loss_probes = pns.loss_probes.saturating_add(count);
2664        let path_data = self.path_data_mut(path_id);
2665        path_data.pto_count = path_data.pto_count.saturating_add(1);
2666        self.set_loss_detection_timer(now, path_id);
2667    }
2668
2669    /// Detect any lost packets
2670    ///
2671    /// There are two cases in which we detect lost packets:
2672    ///
2673    /// - We received an ACK packet.
2674    /// - The [`PathTimer::LossDetection`] timer expired. So there is an un-acknowledged packet
2675    ///   that was followed by an acknowledged packet. The loss timer for this
2676    ///   un-acknowledged packet expired and we need to detect that packet as lost.
2677    ///
2678    /// Packets are lost if they are both (See RFC9002 §6.1):
2679    ///
2680    /// - Unacknowledged, in flight and sent prior to an acknowledged packet.
2681    /// - Old enough by either:
2682    ///   - Having a packet number [`TransportConfig::packet_threshold`] lower than the last
2683    ///     acknowledged packet.
2684    ///   - Being sent [`TransportConfig::time_threshold`] * RTT in the past.
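    ///
    /// For example (illustrative numbers): with a packet threshold of 3 and a largest
    /// acknowledged packet of 10, any unacknowledged packet numbered 7 or below is
    /// declared lost immediately, while packets 8 and 9 only become lost once they are
    /// older than the time threshold allows.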
2685    fn detect_lost_packets(
2686        &mut self,
2687        now: Instant,
2688        pn_space: SpaceId,
2689        path_id: PathId,
2690        due_to_ack: bool,
2691    ) {
2692        let mut lost_packets = Vec::<u64>::new();
2693        let mut lost_mtu_probe = None;
2694        let mut in_persistent_congestion = false;
2695        let mut size_of_lost_packets = 0u64;
2696        self.spaces[pn_space].for_path(path_id).loss_time = None;
2697
2698        // Find all the lost packets, populating all variables initialised above.
2699
2700        let path = self.path_data(path_id);
2701        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2702        let loss_delay = path
2703            .rtt
2704            .conservative()
2705            .mul_f32(self.config.time_threshold)
2706            .max(TIMER_GRANULARITY);
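        // Illustrative numbers, assuming the RFC 9002 default time threshold of 9/8:
        // with a conservative RTT estimate of 40ms the loss delay is 45ms, so any
        // in-flight packet sent more than 45ms ago is old enough to be declared lost.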
2707        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
2708
2709        let largest_acked_packet = self.spaces[pn_space]
2710            .for_path(path_id)
2711            .largest_acked_packet
2712            .expect("detect_lost_packets is only called if the path received at least one ACK");
2713        let packet_threshold = self.config.packet_threshold as u64;
2714
2715        // InPersistentCongestion: Determine if all packets in the time period before the newest
2716        // lost packet, including the edges, are marked lost. PTO computation must always
2717        // include max ACK delay, i.e. operate as if in Data space (see RFC9002 §7.6.1).
2718        let congestion_period = self
2719            .pto(SpaceId::Data, path_id)
2720            .saturating_mul(self.config.persistent_congestion_threshold);
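        // Illustrative sketch, assuming the RFC 9002 default threshold of 3: with a
        // Data-space PTO of 200ms the congestion period is 600ms, so losing two
        // ack-eliciting packets sent more than 600ms apart, with nothing acknowledged
        // in between, counts as persistent congestion.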
2721        let mut persistent_congestion_start: Option<Instant> = None;
2722        let mut prev_packet = None;
2723        let space = self.spaces[pn_space].for_path(path_id);
2724
2725        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
2726            if prev_packet != Some(packet.wrapping_sub(1)) {
2727                // An intervening packet was acknowledged
2728                persistent_congestion_start = None;
2729            }
2730
2731            // Packets sent before now - loss_delay are deemed lost.
2732            // However, we avoid subtraction as it can panic and there's no
2733            // saturating equivalent of this subtraction operation with a Duration.
2734            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
2735            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
2736                // The packet should be declared lost.
2737                if Some(packet) == in_flight_mtu_probe {
2738                    // Lost MTU probes are not included in `lost_packets`, because they
2739                    // should not trigger a congestion control response
2740                    lost_mtu_probe = in_flight_mtu_probe;
2741                } else {
2742                    lost_packets.push(packet);
2743                    size_of_lost_packets += info.size as u64;
2744                    if info.ack_eliciting && due_to_ack {
2745                        match persistent_congestion_start {
2746                            // Two ACK-eliciting packets lost more than
2747                            // congestion_period apart, with no ACKed packets in between
2748                            Some(start) if info.time_sent - start > congestion_period => {
2749                                in_persistent_congestion = true;
2750                            }
2751                            // Persistent congestion must start after the first RTT sample
2752                            None if first_packet_after_rtt_sample
2753                                .is_some_and(|x| x < (pn_space, packet)) =>
2754                            {
2755                                persistent_congestion_start = Some(info.time_sent);
2756                            }
2757                            _ => {}
2758                        }
2759                    }
2760                }
2761            } else {
2762                // The packet should not yet be declared lost.
2763                if space.loss_time.is_none() {
2764                    // Since we iterate in order, the lowest packet number's loss time will
2765                    // always be the earliest.
2766                    space.loss_time = Some(info.time_sent + loss_delay);
2767                }
2768                persistent_congestion_start = None;
2769            }
2770
2771            prev_packet = Some(packet);
2772        }
2773
2774        self.handle_lost_packets(
2775            pn_space,
2776            path_id,
2777            now,
2778            lost_packets,
2779            lost_mtu_probe,
2780            loss_delay,
2781            in_persistent_congestion,
2782            size_of_lost_packets,
2783        );
2784    }
2785
2786    /// Drops the path state, declaring any remaining in-flight packets as lost
2787    fn discard_path(&mut self, path_id: PathId, now: Instant) {
2788        trace!(%path_id, "dropping path state");
2789        let path = self.path_data(path_id);
2790        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2791
2792        let mut size_of_lost_packets = 0u64; // add to path_stats.lost_bytes;
2793        let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2794            .for_path(path_id)
2795            .sent_packets
2796            .iter()
2797            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2798            .map(|(pn, info)| {
2799                size_of_lost_packets += info.size as u64;
2800                pn
2801            })
2802            .collect();
2803
2804        if !lost_pns.is_empty() {
2805            trace!(
2806                %path_id,
2807                count = lost_pns.len(),
2808                lost_bytes = size_of_lost_packets,
2809                "packets lost on path abandon"
2810            );
2811            self.handle_lost_packets(
2812                SpaceId::Data,
2813                path_id,
2814                now,
2815                lost_pns,
2816                in_flight_mtu_probe,
2817                Duration::ZERO,
2818                false,
2819                size_of_lost_packets,
2820            );
2821        }
2822        self.paths.remove(&path_id);
2823        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2824
2825        let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2826        self.events.push_back(
2827            PathEvent::Abandoned {
2828                id: path_id,
2829                path_stats,
2830            }
2831            .into(),
2832        );
2833    }
2834
2835    fn handle_lost_packets(
2836        &mut self,
2837        pn_space: SpaceId,
2838        path_id: PathId,
2839        now: Instant,
2840        lost_packets: Vec<u64>,
2841        lost_mtu_probe: Option<u64>,
2842        loss_delay: Duration,
2843        in_persistent_congestion: bool,
2844        size_of_lost_packets: u64,
2845    ) {
2846        debug_assert!(
2847            {
2848                let mut sorted = lost_packets.clone();
2849                sorted.sort();
2850                sorted == lost_packets
2851            },
2852            "lost_packets must be sorted"
2853        );
2854
2855        self.drain_lost_packets(now, pn_space, path_id);
2856
2857        // OnPacketsLost
2858        if let Some(largest_lost) = lost_packets.last().cloned() {
2859            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
2860            let largest_lost_sent = self.spaces[pn_space]
2861                .for_path(path_id)
2862                .sent_packets
2863                .get(largest_lost)
2864                .unwrap()
2865                .time_sent;
2866            let path_stats = self.path_stats.entry(path_id).or_default();
2867            path_stats.lost_packets += lost_packets.len() as u64;
2868            path_stats.lost_bytes += size_of_lost_packets;
2869            trace!(
2870                %path_id,
2871                count = lost_packets.len(),
2872                lost_bytes = size_of_lost_packets,
2873                "packets lost",
2874            );
2875
2876            for &packet in &lost_packets {
2877                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
2878                    continue;
2879                };
2880                self.qlog
2881                    .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
2882                self.paths
2883                    .get_mut(&path_id)
2884                    .unwrap()
2885                    .remove_in_flight(&info);
2886
2887                for frame in info.stream_frames {
2888                    self.streams.retransmit(frame);
2889                }
2890                self.spaces[pn_space].pending |= info.retransmits;
2891                self.path_data_mut(path_id)
2892                    .mtud
2893                    .on_non_probe_lost(packet, info.size);
2894
2895                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
2896                    packet,
2897                    LostPacket {
2898                        time_sent: info.time_sent,
2899                    },
2900                );
2901            }
2902
2903            let path = self.path_data_mut(path_id);
2904            if path.mtud.black_hole_detected(now) {
2905                path.congestion.on_mtu_update(path.mtud.current_mtu());
2906                if let Some(max_datagram_size) = self.datagrams().max_size() {
2907                    self.datagrams.drop_oversized(max_datagram_size);
2908                }
2909                self.path_stats
2910                    .entry(path_id)
2911                    .or_default()
2912                    .black_holes_detected += 1;
2913            }
2914
2915            // Don't apply congestion penalty for lost ack-only packets
2916            let lost_ack_eliciting =
2917                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
2918
2919            if lost_ack_eliciting {
2920                self.path_stats
2921                    .entry(path_id)
2922                    .or_default()
2923                    .congestion_events += 1;
2924                self.path_data_mut(path_id).congestion.on_congestion_event(
2925                    now,
2926                    largest_lost_sent,
2927                    in_persistent_congestion,
2928                    false,
2929                    size_of_lost_packets,
2930                );
2931            }
2932        }
2933
2934        // Handle a lost MTU probe
2935        if let Some(packet) = lost_mtu_probe {
2936            let info = self.spaces[SpaceId::Data]
2937                .for_path(path_id)
2938                .take(packet)
2939                .unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and
2940            // therefore must not have been removed yet
2941            self.paths
2942                .get_mut(&path_id)
2943                .unwrap()
2944                .remove_in_flight(&info);
2945            self.path_data_mut(path_id).mtud.on_probe_lost();
2946            self.path_stats
2947                .entry(path_id)
2948                .or_default()
2949                .lost_plpmtud_probes += 1;
2950        }
2951    }
2952
2953    /// Returns the earliest time packets should be declared lost for all spaces on a path.
2954    ///
2955    /// If a path has an acknowledged packet with any prior un-acknowledged packets, the
2956    /// earliest un-acknowledged packet can be declared lost after a timeout has elapsed.
2957    /// The time returned is when this packet should be declared lost.
2958    fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2959        SpaceId::iter()
2960            .filter_map(|id| {
2961                self.spaces[id]
2962                    .number_spaces
2963                    .get(&path_id)
2964                    .and_then(|pns| pns.loss_time)
2965                    .map(|time| (time, id))
2966            })
2967            .min_by_key(|&(time, _)| time)
2968    }
2969
2970    /// Returns the earliest time the next PTO should fire across all spaces on a path.
2971    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2972        let path = self.path(path_id)?;
2973        let pto_count = path.pto_count;
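        // Illustrative numbers: with pto_base() = 300ms the timeout doubles on each
        // consecutive PTO (300ms, 600ms, 1.2s, ...), with the exponent capped at
        // MAX_BACKOFF_EXPONENT to keep the shift from overflowing.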
2974        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2975        let mut duration = path.rtt.pto_base() * backoff;
2976
2977        if path_id == PathId::ZERO
2978            && path.in_flight.ack_eliciting == 0
2979            && !self.peer_completed_address_validation(PathId::ZERO)
2980        {
2981            // Address Validation during Connection Establishment:
2982            // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
2983            // deadlock if an Initial or Handshake packet from the server is lost and the
2984            // server cannot send more due to its anti-amplification limit, the client must
2985            // send another packet on PTO.
2986            let space = match self.highest_space {
2987                SpaceId::Handshake => SpaceId::Handshake,
2988                _ => SpaceId::Initial,
2989            };
2990
2991            return Some((now + duration, space));
2992        }
2993
2994        let mut result = None;
2995        for space in SpaceId::iter() {
2996            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2997                continue;
2998            };
2999
3000            if !pns.has_in_flight() {
3001                continue;
3002            }
3003            if space == SpaceId::Data {
3004                // Skip ApplicationData until handshake completes.
3005                if self.is_handshaking() {
3006                    return result;
3007                }
3008                // Include max_ack_delay and backoff for ApplicationData.
3009                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
3010            }
3011            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3012                continue;
3013            };
3014            let pto = last_ack_eliciting + duration;
3015            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3016                if path.anti_amplification_blocked(1) {
3017                    // Nothing would be able to be sent.
3018                    continue;
3019                }
3020                if path.in_flight.ack_eliciting == 0 {
3021                    // Nothing ack-eliciting, no PTO to arm/fire.
3022                    continue;
3023                }
3024                result = Some((pto, space));
3025            }
3026        }
3027        result
3028    }
3029
3030    fn peer_completed_address_validation(&self, path: PathId) -> bool {
3031        // TODO(flub): This logic needs updating for multipath
3032        if self.side.is_server() || self.state.is_closed() {
3033            return true;
3034        }
3035        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
3036        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
3037        self.spaces[SpaceId::Handshake]
3038            .path_space(PathId::ZERO)
3039            .and_then(|pns| pns.largest_acked_packet)
3040            .is_some()
3041            || self.spaces[SpaceId::Data]
3042                .path_space(path)
3043                .and_then(|pns| pns.largest_acked_packet)
3044                .is_some()
3045            || (self.spaces[SpaceId::Data].crypto.is_some()
3046                && self.spaces[SpaceId::Handshake].crypto.is_none())
3047    }
3048
3049    /// Resets the [`PathTimer::LossDetection`] timer to the next instant it may be needed
3050    ///
3051    /// The timer must fire if either:
3052    /// - An ack-eliciting packet we sent needs to be declared lost.
3053    /// - A tail-loss probe needs to be sent.
3054    ///
3055    /// See [`Connection::on_loss_detection_timeout`] for details.
3056    fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3057        if self.state.is_closed() {
3058            // No loss detection takes place on closed connections, and `close_common` already
3059            // stopped the timer. Ensure we don't restart it inadvertently, e.g. in response to a
3060            // reordered packet being handled by state-insensitive code.
3061            return;
3062        }
3063
3064        if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3065            // Time threshold loss detection.
3066            self.timers.set(
3067                Timer::PerPath(path_id, PathTimer::LossDetection),
3068                loss_time,
3069                self.qlog.with_time(now),
3070            );
3071            return;
3072        }
3073
3074        // Determine which PN space to arm PTO for.
3075        // Calculate PTO duration
3076        if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3077            self.timers.set(
3078                Timer::PerPath(path_id, PathTimer::LossDetection),
3079                timeout,
3080                self.qlog.with_time(now),
3081            );
3082        } else {
3083            self.timers.stop(
3084                Timer::PerPath(path_id, PathTimer::LossDetection),
3085                self.qlog.with_time(now),
3086            );
3087        }
3088    }
3089
3090    /// The maximum probe timeout across all paths
3091    ///
3092    /// See [`Connection::pto`]
3093    fn pto_max_path(&self, space: SpaceId) -> Duration {
3094        match space {
3095            SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3096            SpaceId::Data => self
3097                .paths
3098                .keys()
3099                .map(|path_id| self.pto(space, *path_id))
3100                .max()
3101                .expect("there should be at least one path"),
3102        }
3103    }
3104
3105    /// Probe Timeout
3106    ///
3107    /// The PTO is logically the time within which you'd expect to receive an acknowledgement
3108    /// for a packet: approximately RTT + max_ack_delay.
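    ///
    /// For example (illustrative numbers): with a smoothed RTT near 100ms, a modest
    /// RTT variance and a 25ms max ACK delay, the Data-space PTO lands somewhat above
    /// 125ms, while Initial and Handshake spaces omit the ACK delay term entirely.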
3109    fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3110        let max_ack_delay = match space {
3111            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3112            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3113        };
3114        self.path_data(path_id).rtt.pto_base() + max_ack_delay
3115    }
3116
3117    fn on_packet_authenticated(
3118        &mut self,
3119        now: Instant,
3120        space_id: SpaceId,
3121        path_id: PathId,
3122        ecn: Option<EcnCodepoint>,
3123        packet: Option<u64>,
3124        spin: bool,
3125        is_1rtt: bool,
3126    ) {
3127        self.total_authed_packets += 1;
3128        if let Some(last_allowed_receive) = self
3129            .paths
3130            .get(&path_id)
3131            .and_then(|path| path.data.last_allowed_receive)
3132        {
3133            if now > last_allowed_receive {
3134                warn!("received data on path which we abandoned more than 3 * PTO ago");
3135                // The peer failed to respond with a PATH_ABANDON in time.
3136                if !self.state.is_closed() {
3137                    // TODO(flub): What should the error code be?
3138                    self.state.move_to_closed(TransportError::NO_ERROR(
3139                        "peer failed to respond with PATH_ABANDON in time",
3140                    ));
3141                    self.close_common();
3142                    self.set_close_timer(now);
3143                    self.close = true;
3144                }
3145                return;
3146            }
3147        }
3148
3149        self.reset_keep_alive(path_id, now);
3150        self.reset_idle_timeout(now, space_id, path_id);
3151        self.permit_idle_reset = true;
3152        self.receiving_ecn |= ecn.is_some();
3153        if let Some(x) = ecn {
3154            let space = &mut self.spaces[space_id];
3155            space.for_path(path_id).ecn_counters += x;
3156
3157            if x.is_ce() {
3158                space
3159                    .for_path(path_id)
3160                    .pending_acks
3161                    .set_immediate_ack_required();
3162            }
3163        }
3164
3165        let packet = match packet {
3166            Some(x) => x,
3167            None => return,
3168        };
3169        match &self.side {
3170            ConnectionSide::Client { .. } => {
3171                // If we received a handshake packet that authenticated, then we're talking to
3172                // the real server.  From now on we should no longer allow the server to migrate
3173                // its address.
3174                if space_id == SpaceId::Handshake {
3175                    if let Some(hs) = self.state.as_handshake_mut() {
3176                        hs.allow_server_migration = false;
3177                    }
3178                }
3179            }
3180            ConnectionSide::Server { .. } => {
3181                if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3182                {
3183                    // A server stops sending and processing Initial packets when it receives its first Handshake packet.
3184                    self.discard_space(now, SpaceId::Initial);
3185                }
3186                if self.zero_rtt_crypto.is_some() && is_1rtt {
3187                    // Discard 0-RTT keys soon after receiving a 1-RTT packet
3188                    self.set_key_discard_timer(now, space_id)
3189                }
3190            }
3191        }
3192        let space = self.spaces[space_id].for_path(path_id);
3193        space.pending_acks.insert_one(packet, now);
3194        if packet >= space.rx_packet.unwrap_or_default() {
3195            space.rx_packet = Some(packet);
3196            // Update outgoing spin bit, inverting iff we're the client
3197            self.spin = self.side.is_client() ^ spin;
3198        }
3199    }
3200
3201    /// Resets the idle timeout timers
3202    ///
3203    /// Without multipath there is only the connection-wide idle timeout. When multipath is
3204    /// enabled there is an additional per-path idle timeout.
3205    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3206        // First reset the global idle timeout.
3207        if let Some(timeout) = self.idle_timeout {
3208            if self.state.is_closed() {
3209                self.timers
3210                    .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3211            } else {
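                // RFC 9000 §10.1 requires the effective idle timeout to be at least
                // three times the current PTO, so a single lost probe cannot tear
                // down an otherwise healthy connection.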
3212                let dt = cmp::max(timeout, 3 * self.pto_max_path(space));
3213                self.timers.set(
3214                    Timer::Conn(ConnTimer::Idle),
3215                    now + dt,
3216                    self.qlog.with_time(now),
3217                );
3218            }
3219        }
3220
3221        // Now handle the per-path state
3222        if let Some(timeout) = self.path_data(path_id).idle_timeout {
3223            if self.state.is_closed() {
3224                self.timers.stop(
3225                    Timer::PerPath(path_id, PathTimer::PathIdle),
3226                    self.qlog.with_time(now),
3227                );
3228            } else {
3229                let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3230                self.timers.set(
3231                    Timer::PerPath(path_id, PathTimer::PathIdle),
3232                    now + dt,
3233                    self.qlog.with_time(now),
3234                );
3235            }
3236        }
3237    }
3238
3239    /// Resets both the [`ConnTimer::KeepAlive`] and [`PathTimer::PathKeepAlive`] timers
3240    fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3241        if !self.state.is_established() {
3242            return;
3243        }
3244
3245        if let Some(interval) = self.config.keep_alive_interval {
3246            self.timers.set(
3247                Timer::Conn(ConnTimer::KeepAlive),
3248                now + interval,
3249                self.qlog.with_time(now),
3250            );
3251        }
3252
3253        if let Some(interval) = self.path_data(path_id).keep_alive {
3254            self.timers.set(
3255                Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3256                now + interval,
3257                self.qlog.with_time(now),
3258            );
3259        }
3260    }
3261
3262    /// Sets the timer for when a previously issued CID should be retired next
3263    fn reset_cid_retirement(&mut self, now: Instant) {
3264        if let Some((_path, t)) = self.next_cid_retirement() {
3265            self.timers.set(
3266                Timer::Conn(ConnTimer::PushNewCid),
3267                t,
3268                self.qlog.with_time(now),
3269            );
3270        }
3271    }
3272
3273    /// The next time when a previously issued CID should be retired
3274    fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3275        self.local_cid_state
3276            .iter()
3277            .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3278            .min_by_key(|(_path_id, timeout)| *timeout)
3279    }
3280
3281    /// Handle the already-decrypted first packet from the client
3282    ///
3283    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
3284    /// efficient.
3285    pub(crate) fn handle_first_packet(
3286        &mut self,
3287        now: Instant,
3288        remote: SocketAddr,
3289        ecn: Option<EcnCodepoint>,
3290        packet_number: u64,
3291        packet: InitialPacket,
3292        remaining: Option<BytesMut>,
3293    ) -> Result<(), ConnectionError> {
3294        let span = trace_span!("first recv");
3295        let _guard = span.enter();
3296        debug_assert!(self.side.is_server());
3297        let len = packet.header_data.len() + packet.payload.len();
3298        let path_id = PathId::ZERO;
3299        self.path_data_mut(path_id).total_recvd = len as u64;
3300
3301        if let Some(hs) = self.state.as_handshake_mut() {
3302            hs.expected_token = packet.header.token.clone();
3303        } else {
3304            unreachable!("first packet must be delivered in Handshake state");
3305        }
3306
3307        // The first packet is always on PathId::ZERO
3308        self.on_packet_authenticated(
3309            now,
3310            SpaceId::Initial,
3311            path_id,
3312            ecn,
3313            Some(packet_number),
3314            false,
3315            false,
3316        );
3317
3318        let packet: Packet = packet.into();
3319
3320        let mut qlog = QlogRecvPacket::new(len);
3321        qlog.header(&packet.header, Some(packet_number), path_id);
3322
3323        self.process_decrypted_packet(
3324            now,
3325            remote,
3326            path_id,
3327            Some(packet_number),
3328            packet,
3329            &mut qlog,
3330        )?;
3331        self.qlog.emit_packet_received(qlog, now);
3332        if let Some(data) = remaining {
3333            self.handle_coalesced(now, remote, path_id, ecn, data);
3334        }
3335
3336        self.qlog.emit_recovery_metrics(
3337            path_id,
3338            &mut self.paths.get_mut(&path_id).unwrap().data,
3339            now,
3340        );
3341
3342        Ok(())
3343    }
3344
3345    fn init_0rtt(&mut self, now: Instant) {
3346        let (header, packet) = match self.crypto.early_crypto() {
3347            Some(x) => x,
3348            None => return,
3349        };
3350        if self.side.is_client() {
3351            match self.crypto.transport_parameters() {
3352                Ok(params) => {
3353                    let params = params
3354                        .expect("crypto layer didn't supply transport parameters with ticket");
3355                    // Certain values must not be cached
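                    // RFC 9000 §7.4.1 forbids remembering connection-specific values
                    // such as CIDs, the preferred address, reset tokens and ACK delay
                    // settings; they must come from the new handshake instead.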
3356                    let params = TransportParameters {
3357                        initial_src_cid: None,
3358                        original_dst_cid: None,
3359                        preferred_address: None,
3360                        retry_src_cid: None,
3361                        stateless_reset_token: None,
3362                        min_ack_delay: None,
3363                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3364                        max_ack_delay: TransportParameters::default().max_ack_delay,
3365                        initial_max_path_id: None,
3366                        ..params
3367                    };
3368                    self.set_peer_params(params);
3369                    self.qlog.emit_peer_transport_params_restored(self, now);
3370                }
3371                Err(e) => {
3372                    error!("session ticket has malformed transport parameters: {}", e);
3373                    return;
3374                }
3375            }
3376        }
3377        trace!("0-RTT enabled");
3378        self.zero_rtt_enabled = true;
3379        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3380    }
3381
3382    fn read_crypto(
3383        &mut self,
3384        space: SpaceId,
3385        crypto: &frame::Crypto,
3386        payload_len: usize,
3387    ) -> Result<(), TransportError> {
3388        let expected = if !self.state.is_handshake() {
3389            SpaceId::Data
3390        } else if self.highest_space == SpaceId::Initial {
3391            SpaceId::Initial
3392        } else {
3393            // On the server, self.highest_space can be Data after receiving the client's first
3394            // flight, but we expect Handshake CRYPTO until the handshake is complete.
3395            SpaceId::Handshake
3396        };
3397        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
3398        // packets are illegal, and we don't process 1-RTT packets until the handshake is
3399        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
3400        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3401
3402        let end = crypto.offset + crypto.data.len() as u64;
3403        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3404            warn!(
3405                "received new {:?} CRYPTO data when expecting {:?}",
3406                space, expected
3407            );
3408            return Err(TransportError::PROTOCOL_VIOLATION(
3409                "new data at unexpected encryption level",
3410            ));
3411        }
3412
3413        let space = &mut self.spaces[space];
3414        let max = end.saturating_sub(space.crypto_stream.bytes_read());
3415        if max > self.config.crypto_buffer_size as u64 {
3416            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3417        }
3418
3419        space
3420            .crypto_stream
3421            .insert(crypto.offset, crypto.data.clone(), payload_len);
3422        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3423            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3424            if self.crypto.read_handshake(&chunk.bytes)? {
3425                self.events.push_back(Event::HandshakeDataReady);
3426            }
3427        }
3428
3429        Ok(())
3430    }
3431
3432    fn write_crypto(&mut self) {
3433        loop {
3434            let space = self.highest_space;
3435            let mut outgoing = Vec::new();
3436            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3437                match space {
3438                    SpaceId::Initial => {
3439                        self.upgrade_crypto(SpaceId::Handshake, crypto);
3440                    }
3441                    SpaceId::Handshake => {
3442                        self.upgrade_crypto(SpaceId::Data, crypto);
3443                    }
3444                    _ => unreachable!("got updated secrets during 1-RTT"),
3445                }
3446            }
3447            if outgoing.is_empty() {
3448                if space == self.highest_space {
3449                    break;
3450                } else {
3451                    // Keys updated, check for more data to send
3452                    continue;
3453                }
3454            }
3455            let offset = self.spaces[space].crypto_offset;
3456            let outgoing = Bytes::from(outgoing);
3457            if let Some(hs) = self.state.as_handshake_mut() {
3458                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3459                    hs.client_hello = Some(outgoing.clone());
3460                }
3461            }
3462            self.spaces[space].crypto_offset += outgoing.len() as u64;
3463            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3464            self.spaces[space].pending.crypto.push_back(frame::Crypto {
3465                offset,
3466                data: outgoing,
3467            });
3468        }
3469    }
3470
3471    /// Switch to stronger cryptography during handshake
3472    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3473        debug_assert!(
3474            self.spaces[space].crypto.is_none(),
3475            "already reached packet space {space:?}"
3476        );
3477        trace!("{:?} keys ready", space);
3478        if space == SpaceId::Data {
3479            // Precompute the first key update
3480            self.next_crypto = Some(
3481                self.crypto
3482                    .next_1rtt_keys()
3483                    .expect("handshake should be complete"),
3484            );
3485        }
3486
3487        self.spaces[space].crypto = Some(crypto);
3488        debug_assert!(space as usize > self.highest_space as usize);
3489        self.highest_space = space;
3490        if space == SpaceId::Data && self.side.is_client() {
3491            // Discard 0-RTT keys because 1-RTT keys are available.
3492            self.zero_rtt_crypto = None;
3493        }
3494    }
3495
3496    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3497        debug_assert!(space_id != SpaceId::Data);
3498        trace!("discarding {:?} keys", space_id);
3499        if space_id == SpaceId::Initial {
3500            // No longer needed
3501            if let ConnectionSide::Client { token, .. } = &mut self.side {
3502                *token = Bytes::new();
3503            }
3504        }
3505        let space = &mut self.spaces[space_id];
3506        space.crypto = None;
3507        let pns = space.for_path(PathId::ZERO);
3508        pns.time_of_last_ack_eliciting_packet = None;
3509        pns.loss_time = None;
3510        pns.loss_probes = 0;
3511        let sent_packets = mem::take(&mut pns.sent_packets);
3512        let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3513        for (_, packet) in sent_packets.into_iter() {
3514            path.data.remove_in_flight(&packet);
3515        }
3516
3517        self.set_loss_detection_timer(now, PathId::ZERO)
3518    }
3519
3520    fn handle_coalesced(
3521        &mut self,
3522        now: Instant,
3523        remote: SocketAddr,
3524        path_id: PathId,
3525        ecn: Option<EcnCodepoint>,
3526        data: BytesMut,
3527    ) {
3528        self.path_data_mut(path_id)
3529            .inc_total_recvd(data.len() as u64);
3530        let mut remaining = Some(data);
3531        let cid_len = self
3532            .local_cid_state
3533            .values()
3534            .map(|cid_state| cid_state.cid_len())
3535            .next()
3536            .expect("one cid_state must exist");
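        // A single UDP datagram may coalesce several QUIC packets (e.g. Initial,
        // Handshake and 1-RTT back to back); each iteration decodes one packet and
        // `remaining` carries the rest of the datagram, if any.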
3537        while let Some(data) = remaining {
3538            match PartialDecode::new(
3539                data,
3540                &FixedLengthConnectionIdParser::new(cid_len),
3541                &[self.version],
3542                self.endpoint_config.grease_quic_bit,
3543            ) {
3544                Ok((partial_decode, rest)) => {
3545                    remaining = rest;
3546                    self.handle_decode(now, remote, path_id, ecn, partial_decode);
3547                }
3548                Err(e) => {
3549                    trace!("malformed header: {}", e);
3550                    return;
3551                }
3552            }
3553        }
3554    }
3555
3556    fn handle_decode(
3557        &mut self,
3558        now: Instant,
3559        remote: SocketAddr,
3560        path_id: PathId,
3561        ecn: Option<EcnCodepoint>,
3562        partial_decode: PartialDecode,
3563    ) {
3564        let qlog = QlogRecvPacket::new(partial_decode.len());
3565        if let Some(decoded) = packet_crypto::unprotect_header(
3566            partial_decode,
3567            &self.spaces,
3568            self.zero_rtt_crypto.as_ref(),
3569            self.peer_params.stateless_reset_token,
3570        ) {
3571            self.handle_packet(
3572                now,
3573                remote,
3574                path_id,
3575                ecn,
3576                decoded.packet,
3577                decoded.stateless_reset,
3578                qlog,
3579            );
3580        }
3581    }
3582
3583    fn handle_packet(
3584        &mut self,
3585        now: Instant,
3586        remote: SocketAddr,
3587        path_id: PathId,
3588        ecn: Option<EcnCodepoint>,
3589        packet: Option<Packet>,
3590        stateless_reset: bool,
3591        mut qlog: QlogRecvPacket,
3592    ) {
3593        self.stats.udp_rx.ios += 1;
3594        if let Some(ref packet) = packet {
3595            trace!(
3596                "got {:?} packet ({} bytes) from {} using id {}",
3597                packet.header.space(),
3598                packet.payload.len() + packet.header_data.len(),
3599                remote,
3600                packet.header.dst_cid(),
3601            );
3602        }
3603
3604        if self.is_handshaking() {
3605            if path_id != PathId::ZERO {
3606                debug!(%remote, %path_id, "discarding multipath packet during handshake");
3607                return;
3608            }
3609            if remote != self.path_data_mut(path_id).remote {
3610                if let Some(hs) = self.state.as_handshake() {
3611                    if hs.allow_server_migration {
3612                        trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
3613                        self.path_data_mut(path_id).remote = remote;
3614                        self.qlog.emit_tuple_assigned(path_id, remote, now);
3615                    } else {
3616                        debug!("discarding packet with unexpected remote during handshake");
3617                        return;
3618                    }
3619                } else {
3620                    debug!("discarding packet with unexpected remote during handshake");
3621                    return;
3622                }
3623            }
3624        }
3625
3626        let was_closed = self.state.is_closed();
3627        let was_drained = self.state.is_drained();
3628
3629        let decrypted = match packet {
3630            None => Err(None),
3631            Some(mut packet) => self
3632                .decrypt_packet(now, path_id, &mut packet)
3633                .map(move |number| (packet, number)),
3634        };
3635        let result = match decrypted {
3636            _ if stateless_reset => {
3637                debug!("got stateless reset");
3638                Err(ConnectionError::Reset)
3639            }
3640            Err(Some(e)) => {
3641                warn!("illegal packet: {}", e);
3642                Err(e.into())
3643            }
3644            Err(None) => {
3645                debug!("failed to authenticate packet");
3646                self.authentication_failures += 1;
3647                let integrity_limit = self.spaces[self.highest_space]
3648                    .crypto
3649                    .as_ref()
3650                    .unwrap()
3651                    .packet
3652                    .local
3653                    .integrity_limit();
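                // The AEAD integrity limit (RFC 9001 §6.6) bounds how many forgeries a
                // key may reject before the connection must close to stay within the
                // cipher's security margin.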
3654                if self.authentication_failures > integrity_limit {
3655                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3656                } else {
3657                    return;
3658                }
3659            }
3660            Ok((packet, number)) => {
3661                qlog.header(&packet.header, number, path_id);
3662                let span = match number {
3663                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3664                    None => trace_span!("recv", space = ?packet.header.space()),
3665                };
3666                let _guard = span.enter();
3667
3668                let dedup = self.spaces[packet.header.space()]
3669                    .path_space_mut(path_id)
3670                    .map(|pns| &mut pns.dedup);
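                // `insert` reports whether the packet number may have been seen before,
                // so an authenticated but already-processed packet is dropped here.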
3671                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
3672                    debug!("discarding possible duplicate packet");
3673                    self.qlog.emit_packet_received(qlog, now);
3674                    return;
3675                } else if self.state.is_handshake() && packet.header.is_short() {
3676                    // TODO: SHOULD buffer these to improve reordering tolerance.
3677                    trace!("dropping short packet during handshake");
3678                    self.qlog.emit_packet_received(qlog, now);
3679                    return;
3680                } else {
3681                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
3682                        if let Some(hs) = self.state.as_handshake() {
3683                            if self.side.is_server() && token != &hs.expected_token {
3684                                // Clients must send the same retry token in every Initial. Initial
3685                                // packets can be spoofed, so we discard rather than killing the
3686                                // connection.
3687                                warn!("discarding Initial with invalid retry token");
3688                                self.qlog.emit_packet_received(qlog, now);
3689                                return;
3690                            }
3691                        }
3692                    }
3693
3694                    if !self.state.is_closed() {
3695                        let spin = match packet.header {
3696                            Header::Short { spin, .. } => spin,
3697                            _ => false,
3698                        };
3699
3700                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
3701                            // Only the client is allowed to open paths
3702                            self.ensure_path(path_id, remote, now, number);
3703                        }
3704                        if self.paths.contains_key(&path_id) {
3705                            self.on_packet_authenticated(
3706                                now,
3707                                packet.header.space(),
3708                                path_id,
3709                                ecn,
3710                                number,
3711                                spin,
3712                                packet.header.is_1rtt(),
3713                            );
3714                        }
3715                    }
3716
3717                    let res = self
3718                        .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);
3719
3720                    self.qlog.emit_packet_received(qlog, now);
3721                    res
3722                }
3723            }
3724        };
3725
3726        // State transitions for error cases
3727        if let Err(conn_err) = result {
3728            match conn_err {
3729                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
3730                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
3731                ConnectionError::Reset
3732                | ConnectionError::TransportError(TransportError {
3733                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
3734                    ..
3735                }) => {
3736                    self.state.move_to_drained(Some(conn_err));
3737                }
3738                ConnectionError::TimedOut => {
3739                    unreachable!("timeouts aren't generated by packet processing");
3740                }
3741                ConnectionError::TransportError(err) => {
3742                    debug!("closing connection due to transport error: {}", err);
3743                    self.state.move_to_closed(err);
3744                }
3745                ConnectionError::VersionMismatch => {
3746                    self.state.move_to_draining(Some(conn_err));
3747                }
3748                ConnectionError::LocallyClosed => {
3749                    unreachable!("LocallyClosed isn't generated by packet processing");
3750                }
3751                ConnectionError::CidsExhausted => {
3752                    unreachable!("CidsExhausted isn't generated by packet processing");
3753                }
3754            };
3755        }
3756
3757        if !was_closed && self.state.is_closed() {
3758            self.close_common();
3759            if !self.state.is_drained() {
3760                self.set_close_timer(now);
3761            }
3762        }
3763        if !was_drained && self.state.is_drained() {
3764            self.endpoint_events.push_back(EndpointEventInner::Drained);
3765            // Close timer may have been started previously, e.g. if we sent a close and got a
3766            // stateless reset in response
3767            self.timers
3768                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
3769        }
3770
3771        // Transmit CONNECTION_CLOSE if necessary
3772        if matches!(self.state.as_type(), StateType::Closed) {
3773            // If there is no PathData for this PathId, the packet was for a brand new
3774            // path. It was a valid packet, however, so the remote is valid and we want to
3775            // send CONNECTION_CLOSE.
3776            let path_remote = self
3777                .paths
3778                .get(&path_id)
3779                .map(|p| p.data.remote)
3780                .unwrap_or(remote);
3781            self.close = remote == path_remote;
3782        }
3783    }
3784
3785    fn process_decrypted_packet(
3786        &mut self,
3787        now: Instant,
3788        remote: SocketAddr,
3789        path_id: PathId,
3790        number: Option<u64>,
3791        packet: Packet,
3792        qlog: &mut QlogRecvPacket,
3793    ) -> Result<(), ConnectionError> {
3794        if !self.paths.contains_key(&path_id) {
3795            // There is a chance this is the server side's first packet for this path, which
3796            // would be a protocol violation. It's more likely, however, that this is a packet
3797            // for a pruned path.
3798            trace!(%path_id, ?number, "discarding packet for unknown path");
3799            return Ok(());
3800        }
3801        let state = match self.state.as_type() {
3802            StateType::Established => {
3803                match packet.header.space() {
3804                    SpaceId::Data => {
3805                        self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
3806                    }
3807                    _ if packet.header.has_frames() => {
3808                        self.process_early_payload(now, path_id, packet, qlog)?
3809                    }
3810                    _ => {
3811                        trace!("discarding unexpected pre-handshake packet");
3812                    }
3813                }
3814                return Ok(());
3815            }
3816            StateType::Closed => {
3817                for result in frame::Iter::new(packet.payload.freeze())? {
3818                    let frame = match result {
3819                        Ok(frame) => frame,
3820                        Err(err) => {
3821                            debug!("frame decoding error: {err:?}");
3822                            continue;
3823                        }
3824                    };
3825                    qlog.frame(&frame);
3826
3827                    if let Frame::Padding = frame {
3828                        continue;
3829                    };
3830
3831                    self.stats.frame_rx.record(&frame);
3832
3833                    if let Frame::Close(_error) = frame {
3834                        trace!("draining");
3835                        self.state.move_to_draining(None);
3836                        break;
3837                    }
3838                }
3839                return Ok(());
3840            }
3841            StateType::Draining | StateType::Drained => return Ok(()),
3842            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
3843        };
3844
3845        match packet.header {
3846            Header::Retry {
3847                src_cid: rem_cid, ..
3848            } => {
3849                debug_assert_eq!(path_id, PathId::ZERO);
3850                if self.side.is_server() {
3851                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3852                }
3853
3854                let is_valid_retry = self
3855                    .rem_cids
3856                    .get(&path_id)
3857                    .map(|cids| cids.active())
3858                    .map(|orig_dst_cid| {
3859                        self.crypto.is_valid_retry(
3860                            orig_dst_cid,
3861                            &packet.header_data,
3862                            &packet.payload,
3863                        )
3864                    })
3865                    .unwrap_or_default();
3866                if self.total_authed_packets > 1
3867                            || packet.payload.len() <= 16 // token + 16 byte tag
3868                            || !is_valid_retry
3869                {
3870                    trace!("discarding invalid Retry");
3871                    // - After the client has received and processed an Initial or Retry
3872                    //   packet from the server, it MUST discard any subsequent Retry
3873                    //   packets that it receives.
3874                    // - A client MUST discard a Retry packet with a zero-length Retry Token
3875                    //   field.
3876                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
3877                    //   that cannot be validated
3878                    return Ok(());
3879                }
3880
3881                trace!("retrying with CID {}", rem_cid);
3882                let client_hello = state.client_hello.take().unwrap();
3883                self.retry_src_cid = Some(rem_cid);
3884                self.rem_cids
3885                    .get_mut(&path_id)
3886                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
3887                    .update_initial_cid(rem_cid);
3888                self.rem_handshake_cid = rem_cid;
3889
3890                let space = &mut self.spaces[SpaceId::Initial];
3891                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
3892                    self.on_packet_acked(now, PathId::ZERO, info);
3893                };
3894
3895                // Make sure we clean up after any retransmitted Initials
3896                self.discard_space(now, SpaceId::Initial);
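                // Rebuild the Initial space: keys are re-derived from the Retry's source
                // CID and the ClientHello is queued for retransmission, while the packet
                // number sequence continues rather than restarting.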
3897                self.spaces[SpaceId::Initial] = {
3898                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
3899                    space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
3900                    space.crypto_offset = client_hello.len() as u64;
3901                    space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
3902                        .for_path(path_id)
3903                        .next_packet_number;
3904                    space.pending.crypto.push_back(frame::Crypto {
3905                        offset: 0,
3906                        data: client_hello,
3907                    });
3908                    space
3909                };
3910
3911                // Retransmit all 0-RTT data
3912                let zero_rtt = mem::take(
3913                    &mut self.spaces[SpaceId::Data]
3914                        .for_path(PathId::ZERO)
3915                        .sent_packets,
3916                );
3917                for (_, info) in zero_rtt.into_iter() {
3918                    self.paths
3919                        .get_mut(&PathId::ZERO)
3920                        .unwrap()
3921                        .remove_in_flight(&info);
3922                    self.spaces[SpaceId::Data].pending |= info.retransmits;
3923                }
3924                self.streams.retransmit_all_for_0rtt();
3925
3926                let token_len = packet.payload.len() - 16;
3927                let ConnectionSide::Client { ref mut token, .. } = self.side else {
3928                    unreachable!("we already short-circuited if we're server");
3929                };
3930                *token = packet.payload.freeze().split_to(token_len);
3931
3932                self.state = State::handshake(state::Handshake {
3933                    expected_token: Bytes::new(),
3934                    rem_cid_set: false,
3935                    client_hello: None,
3936                    allow_server_migration: true,
3937                });
3938                Ok(())
3939            }
3940            Header::Long {
3941                ty: LongType::Handshake,
3942                src_cid: rem_cid,
3943                dst_cid: loc_cid,
3944                ..
3945            } => {
3946                debug_assert_eq!(path_id, PathId::ZERO);
3947                if rem_cid != self.rem_handshake_cid {
3948                    debug!(
3949                        "discarding packet with mismatched remote CID: {} != {}",
3950                        self.rem_handshake_cid, rem_cid
3951                    );
3952                    return Ok(());
3953                }
3954                self.on_path_validated(path_id);
3955
3956                self.process_early_payload(now, path_id, packet, qlog)?;
3957                if self.state.is_closed() {
3958                    return Ok(());
3959                }
3960
3961                if self.crypto.is_handshaking() {
3962                    trace!("handshake ongoing");
3963                    return Ok(());
3964                }
3965
3966                if self.side.is_client() {
3967                    // Client-only because server params were set from the client's Initial
3968                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
3969                        TransportError::new(
3970                            TransportErrorCode::crypto(0x6d),
3971                            "transport parameters missing".to_owned(),
3972                        )
3973                    })?;
3974
3975                    if self.has_0rtt() {
3976                        if !self.crypto.early_data_accepted().unwrap() {
3977                            debug_assert!(self.side.is_client());
3978                            debug!("0-RTT rejected");
3979                            self.accepted_0rtt = false;
3980                            self.streams.zero_rtt_rejected();
3981
3982                            // Discard already-queued frames
3983                            self.spaces[SpaceId::Data].pending = Retransmits::default();
3984
3985                            // Discard 0-RTT packets
3986                            let sent_packets = mem::take(
3987                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
3988                            );
3989                            for (_, packet) in sent_packets.into_iter() {
3990                                self.paths
3991                                    .get_mut(&path_id)
3992                                    .unwrap()
3993                                    .remove_in_flight(&packet);
3994                            }
3995                        } else {
3996                            self.accepted_0rtt = true;
3997                            params.validate_resumption_from(&self.peer_params)?;
3998                        }
3999                    }
4000                    if let Some(token) = params.stateless_reset_token {
4001                        let remote = self.path_data(path_id).remote;
4002                        self.endpoint_events
4003                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4004                    }
4005                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4006                    self.issue_first_cids(now);
4007                } else {
4008                    // Server-only
4009                    self.spaces[SpaceId::Data].pending.handshake_done = true;
4010                    self.discard_space(now, SpaceId::Handshake);
4011                    self.events.push_back(Event::HandshakeConfirmed);
4012                    trace!("handshake confirmed");
4013                }
4014
4015                self.events.push_back(Event::Connected);
4016                self.state.move_to_established();
4017                trace!("established");
4018
4019                // Multipath can only be enabled after the state has reached Established,
4020                // so this cannot happen any earlier.
4021                self.issue_first_path_cids(now);
4022                Ok(())
4023            }
4024            Header::Initial(InitialHeader {
4025                src_cid: rem_cid,
4026                dst_cid: loc_cid,
4027                ..
4028            }) => {
4029                debug_assert_eq!(path_id, PathId::ZERO);
4030                if !state.rem_cid_set {
4031                    trace!("switching remote CID to {}", rem_cid);
4032                    let mut state = state.clone();
4033                    self.rem_cids
4034                        .get_mut(&path_id)
4035                        .expect("PathId::ZERO not yet abandoned")
4036                        .update_initial_cid(rem_cid);
4037                    self.rem_handshake_cid = rem_cid;
4038                    self.orig_rem_cid = rem_cid;
4039                    state.rem_cid_set = true;
4040                    self.state.move_to_handshake(state);
4041                } else if rem_cid != self.rem_handshake_cid {
4042                    debug!(
4043                        "discarding packet with mismatched remote CID: {} != {}",
4044                        self.rem_handshake_cid, rem_cid
4045                    );
4046                    return Ok(());
4047                }
4048
4049                let starting_space = self.highest_space;
4050                self.process_early_payload(now, path_id, packet, qlog)?;
4051
4052                if self.side.is_server()
4053                    && starting_space == SpaceId::Initial
4054                    && self.highest_space != SpaceId::Initial
4055                {
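                    // The server can read the client's transport parameters as soon as the
                    // CRYPTO data in this Initial has carried the handshake past the
                    // Initial space.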
4056                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4057                        TransportError::new(
4058                            TransportErrorCode::crypto(0x6d),
4059                            "transport parameters missing".to_owned(),
4060                        )
4061                    })?;
4062                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4063                    self.issue_first_cids(now);
4064                    self.init_0rtt(now);
4065                }
4066                Ok(())
4067            }
4068            Header::Long {
4069                ty: LongType::ZeroRtt,
4070                ..
4071            } => {
4072                self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
4073                Ok(())
4074            }
4075            Header::VersionNegotiate { .. } => {
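                // Version Negotiation packets are not authenticated; once we have
                // successfully processed packets from the peer they must be ignored
                // (RFC 9000 §6.2).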
4076                if self.total_authed_packets > 1 {
4077                    return Ok(());
4078                }
4079                let supported = packet
4080                    .payload
4081                    .chunks(4)
4082                    .any(|x| match <[u8; 4]>::try_from(x) {
4083                        Ok(version) => self.version == u32::from_be_bytes(version),
4084                        Err(_) => false,
4085                    });
4086                if supported {
4087                    return Ok(());
4088                }
4089                debug!("remote doesn't support our version");
4090                Err(ConnectionError::VersionMismatch)
4091            }
4092            Header::Short { .. } => unreachable!(
4093                "short packets received during handshake are discarded in handle_packet"
4094            ),
4095        }
4096    }
4097
4098    /// Process an Initial or Handshake packet payload
4099    fn process_early_payload(
4100        &mut self,
4101        now: Instant,
4102        path_id: PathId,
4103        packet: Packet,
4104        #[allow(unused)] qlog: &mut QlogRecvPacket,
4105    ) -> Result<(), TransportError> {
4106        debug_assert_ne!(packet.header.space(), SpaceId::Data);
4107        debug_assert_eq!(path_id, PathId::ZERO);
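        // Initial and Handshake packets only ever flow on path 0; additional paths can
        // only exist in the data space, after multipath is negotiated.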
4108        let payload_len = packet.payload.len();
4109        let mut ack_eliciting = false;
4110        for result in frame::Iter::new(packet.payload.freeze())? {
4111            let frame = result?;
4112            qlog.frame(&frame);
4113            let span = match frame {
4114                Frame::Padding => continue,
4115                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4116            };
4117
4118            self.stats.frame_rx.record(&frame);
4119
4120            let _guard = span.as_ref().map(|x| x.enter());
4121            ack_eliciting |= frame.is_ack_eliciting();
4122
4123            // Process frames
4124            if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4125                return Err(TransportError::PROTOCOL_VIOLATION(
4126                    "illegal frame type in handshake",
4127                ));
4128            }
4129
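            // Only PADDING, PING, CRYPTO, ACK (plus the multipath PATH_ACK) and
            // CONNECTION_CLOSE are valid in Initial and Handshake packets (RFC 9000
            // §12.4); anything else hits the catch-all arm below as a protocol violation.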
4130            match frame {
4131                Frame::Padding | Frame::Ping => {}
4132                Frame::Crypto(frame) => {
4133                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
4134                }
4135                Frame::Ack(ack) => {
4136                    self.on_ack_received(now, packet.header.space(), ack)?;
4137                }
4138                Frame::PathAck(ack) => {
4139                    span.as_ref()
4140                        .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4141                    self.on_path_ack_received(now, packet.header.space(), ack)?;
4142                }
4143                Frame::Close(reason) => {
4144                    self.state.move_to_draining(Some(reason.into()));
4145                    return Ok(());
4146                }
4147                _ => {
4148                    let mut err =
4149                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4150                    err.frame = Some(frame.ty());
4151                    return Err(err);
4152                }
4153            }
4154        }
4155
4156        if ack_eliciting {
4157            // In the initial and handshake spaces, ACKs must be sent immediately
4158            self.spaces[packet.header.space()]
4159                .for_path(path_id)
4160                .pending_acks
4161                .set_immediate_ack_required();
4162        }
4163
4164        self.write_crypto();
4165        Ok(())
4166    }
4167
4168    /// Processes the packet payload, always in the data space.
4169    fn process_payload(
4170        &mut self,
4171        now: Instant,
4172        remote: SocketAddr,
4173        path_id: PathId,
4174        number: u64,
4175        packet: Packet,
4176        #[allow(unused)] qlog: &mut QlogRecvPacket,
4177    ) -> Result<(), TransportError> {
4178        let payload = packet.payload.freeze();
4179        let mut is_probing_packet = true;
4180        let mut close = None;
4181        let payload_len = payload.len();
4182        let mut ack_eliciting = false;
4183        // If this packet triggers a path migration and includes an OBSERVED_ADDRESS frame,
4184        // the frame is stored here.
4185        let mut migration_observed_addr = None;
4186        for result in frame::Iter::new(payload)? {
4187            let frame = result?;
4188            qlog.frame(&frame);
4189            let span = match frame {
4190                Frame::Padding => continue,
4191                _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4192            };
4193
4194            self.stats.frame_rx.record(&frame);
4195            // Crypto, Stream and Datagram frames are special-cased in order not to pollute
4196            // the log with payload data.
4197            match &frame {
4198                Frame::Crypto(f) => {
4199                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4200                }
4201                Frame::Stream(f) => {
4202                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4203                }
4204                Frame::Datagram(f) => {
4205                    trace!(len = f.data.len(), "got datagram frame");
4206                }
4207                f => {
4208                    trace!("got frame {:?}", f);
4209                }
4210            }
4211
4212            let _guard = span.enter();
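            // Enforce the 0-RTT frame restrictions: CRYPTO and application-level
            // CONNECTION_CLOSE are prohibited, as is any frame that is only valid in
            // 1-RTT packets.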
4213            if packet.header.is_0rtt() {
4214                match frame {
4215                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4216                        return Err(TransportError::PROTOCOL_VIOLATION(
4217                            "illegal frame type in 0-RTT",
4218                        ));
4219                    }
4220                    _ => {
4221                        if frame.is_1rtt() {
4222                            return Err(TransportError::PROTOCOL_VIOLATION(
4223                                "illegal frame type in 0-RTT",
4224                            ));
4225                        }
4226                    }
4227                }
4228            }
4229            ack_eliciting |= frame.is_ack_eliciting();
4230
4231            // Check whether this could be a probing packet
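            // RFC 9000 §9.1 defines PADDING, PATH_CHALLENGE, PATH_RESPONSE and
            // NEW_CONNECTION_ID as probing frames; any other frame makes the packet
            // non-probing, allowing it to trigger migration below. OBSERVED_ADDRESS is
            // treated as probing here as well.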
4232            match frame {
4233                Frame::Padding
4234                | Frame::PathChallenge(_)
4235                | Frame::PathResponse(_)
4236                | Frame::NewConnectionId(_)
4237                | Frame::ObservedAddr(_) => {}
4238                _ => {
4239                    is_probing_packet = false;
4240                }
4241            }
4242
4243            match frame {
4244                Frame::Crypto(frame) => {
4245                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4246                }
4247                Frame::Stream(frame) => {
4248                    if self.streams.received(frame, payload_len)?.should_transmit() {
4249                        self.spaces[SpaceId::Data].pending.max_data = true;
4250                    }
4251                }
4252                Frame::Ack(ack) => {
4253                    self.on_ack_received(now, SpaceId::Data, ack)?;
4254                }
4255                Frame::PathAck(ack) => {
4256                    span.record("path", tracing::field::debug(&ack.path_id));
4257                    self.on_path_ack_received(now, SpaceId::Data, ack)?;
4258                }
4259                Frame::Padding | Frame::Ping => {}
4260                Frame::Close(reason) => {
4261                    close = Some(reason);
4262                }
4263                Frame::PathChallenge(challenge) => {
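                    // Queue a PATH_RESPONSE; it is sent back to the address the challenge
                    // arrived from, which may differ from the active path's remote.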
4264                    let path = &mut self
4265                        .path_mut(path_id)
4266                        .expect("payload is processed only after the path becomes known");
4267                    path.path_responses.push(number, challenge.0, remote);
4268                    if remote == path.remote {
4269                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
4270                        // attack. Send a non-probing packet to recover the active path.
4271                        // TODO(flub): No longer true! We now also send PATH_CHALLENGE to
4272                        //    validate the path when it is new, without an RFC 9000-style
4273                        //    migration involved. This means we add an extra
4274                        //    IMMEDIATE_ACK on some challenges. It isn't really wrong to
4275                        //    do so, but it still is untidy. We should instead suppress
4276                        //    this when we know the remote is still validating the
4277                        //    path.
4278                        match self.peer_supports_ack_frequency() {
4279                            true => self.immediate_ack(path_id),
4280                            false => {
4281                                self.ping_path(path_id).ok();
4282                            }
4283                        }
4284                    }
4285                }
4286                Frame::PathResponse(response) => {
4287                    let path = self
4288                        .paths
4289                        .get_mut(&path_id)
4290                        .expect("payload is processed only after the path becomes known");
4291
4292                    match path.data.challenges_sent.get(&response.0) {
4293                        // Response to an on-path PathChallenge
4294                        Some(info) if info.remote == remote && path.data.remote == remote => {
4295                            let sent_instant = info.sent_instant;
4296                            // TODO(@divma): reset timers using the remaining off-path challenges
4297                            self.timers.stop(
4298                                Timer::PerPath(path_id, PathTimer::PathValidation),
4299                                self.qlog.with_time(now),
4300                            );
4301                            self.timers.stop(
4302                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
4303                                self.qlog.with_time(now),
4304                            );
4305                            if !path.data.validated {
4306                                trace!("new path validated");
4307                            }
4308                            self.timers.stop(
4309                                Timer::PerPath(path_id, PathTimer::PathOpen),
4310                                self.qlog.with_time(now),
4311                            );
4312                            // Clear any other on-path sent challenge.
4313                            path.data
4314                                .challenges_sent
4315                                .retain(|_token, info| info.remote != remote);
4316                            path.data.send_new_challenge = false;
4317                            path.data.validated = true;
4318
4319                            // This RTT can only be used for the initial RTT, not as a normal
4320                            // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
4321                            let rtt = now.saturating_duration_since(sent_instant);
4322                            path.data.rtt.reset_initial_rtt(rtt);
4323
4324                            self.events
4325                                .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4326                            // Mark the path as open from the application perspective now
4327                            // that the Opened event has been queued.
4328                            if !std::mem::replace(&mut path.data.open, true) {
4329                                trace!("path opened");
4330                                if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4331                                {
4332                                    self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4333                                        id: path_id,
4334                                        addr: observed.socket_addr(),
4335                                    }));
4336                                }
4337                            }
4338                            if let Some((_, ref mut prev)) = path.prev {
4339                                prev.challenges_sent.clear();
4340                                prev.send_new_challenge = false;
4341                            }
4342                        }
4343                        // Response to an off-path PathChallenge
4344                        Some(info) if info.remote == remote => {
4345                            debug!("Response to off-path PathChallenge!");
4346                            path.data
4347                                .challenges_sent
4348                                .retain(|_token, info| info.remote != remote);
4349                        }
4350                        // Response to a PathChallenge we recognize, but from an invalid remote
4351                        Some(info) => {
4352                            debug!(%response, from=%remote, expected=%info.remote, "ignoring invalid PATH_RESPONSE")
4353                        }
4354                        // Response to an unknown PathChallenge
4355                        None => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4356                    }
4357                }
4358                Frame::MaxData(bytes) => {
4359                    self.streams.received_max_data(bytes);
4360                }
4361                Frame::MaxStreamData { id, offset } => {
4362                    self.streams.received_max_stream_data(id, offset)?;
4363                }
4364                Frame::MaxStreams { dir, count } => {
4365                    self.streams.received_max_streams(dir, count)?;
4366                }
4367                Frame::ResetStream(frame) => {
4368                    if self.streams.received_reset(frame)?.should_transmit() {
4369                        self.spaces[SpaceId::Data].pending.max_data = true;
4370                    }
4371                }
4372                Frame::DataBlocked { offset } => {
4373                    debug!(offset, "peer claims to be blocked at connection level");
4374                }
4375                Frame::StreamDataBlocked { id, offset } => {
4376                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4377                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4378                        return Err(TransportError::STREAM_STATE_ERROR(
4379                            "STREAM_DATA_BLOCKED on send-only stream",
4380                        ));
4381                    }
4382                    debug!(
4383                        stream = %id,
4384                        offset, "peer claims to be blocked at stream level"
4385                    );
4386                }
4387                Frame::StreamsBlocked { dir, limit } => {
4388                    if limit > MAX_STREAM_COUNT {
4389                        return Err(TransportError::FRAME_ENCODING_ERROR(
4390                            "unrepresentable stream limit",
4391                        ));
4392                    }
4393                    debug!(
4394                        "peer claims to be blocked opening more than {} {} streams",
4395                        limit, dir
4396                    );
4397                }
4398                Frame::StopSending(frame::StopSending { id, error_code }) => {
4399                    if id.initiator() != self.side.side() {
4400                        if id.dir() == Dir::Uni {
4401                            debug!("got STOP_SENDING on recv-only {}", id);
4402                            return Err(TransportError::STREAM_STATE_ERROR(
4403                                "STOP_SENDING on recv-only stream",
4404                            ));
4405                        }
4406                    } else if self.streams.is_local_unopened(id) {
4407                        return Err(TransportError::STREAM_STATE_ERROR(
4408                            "STOP_SENDING on unopened stream",
4409                        ));
4410                    }
4411                    self.streams.received_stop_sending(id, error_code);
4412                }
4413                Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4414                    if let Some(ref path_id) = path_id {
4415                        span.record("path", tracing::field::debug(&path_id));
4416                    }
4417                    let path_id = path_id.unwrap_or_default();
4418                    match self.local_cid_state.get_mut(&path_id) {
4419                        None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4420                        Some(cid_state) => {
4421                            let allow_more_cids = cid_state
4422                                .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4423
4424                            // If the path has been closed, we do not issue more CIDs for it.
4425                            // For details see https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.2.2
4426                            // > an endpoint SHOULD provide new connection IDs for that path, if still open, using PATH_NEW_CONNECTION_ID frames.
4427                            let has_path = !self.abandoned_paths.contains(&path_id);
4428                            let allow_more_cids = allow_more_cids && has_path;
4429
4430                            self.endpoint_events
4431                                .push_back(EndpointEventInner::RetireConnectionId(
4432                                    now,
4433                                    path_id,
4434                                    sequence,
4435                                    allow_more_cids,
4436                                ));
4437                        }
4438                    }
4439                }
4440                Frame::NewConnectionId(frame) => {
4441                    let path_id = if let Some(path_id) = frame.path_id {
4442                        if !self.is_multipath_negotiated() {
4443                            return Err(TransportError::PROTOCOL_VIOLATION(
4444                                "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4445                            ));
4446                        }
4447                        if path_id > self.local_max_path_id {
4448                            return Err(TransportError::PROTOCOL_VIOLATION(
4449                                "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4450                            ));
4451                        }
4452                        path_id
4453                    } else {
4454                        PathId::ZERO
4455                    };
4456
4457                    if self.abandoned_paths.contains(&path_id) {
4458                        trace!("ignoring issued CID for abandoned path");
4459                        continue;
4460                    }
4461                    if let Some(ref path_id) = frame.path_id {
4462                        span.record("path", tracing::field::debug(&path_id));
4463                    }
4464                    let rem_cids = self
4465                        .rem_cids
4466                        .entry(path_id)
4467                        .or_insert_with(|| CidQueue::new(frame.id));
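                    // An endpoint that uses zero-length CIDs cannot be issued new ones;
                    // receiving NEW_CONNECTION_ID in that case is a protocol violation
                    // (RFC 9000 §19.15).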
4468                    if rem_cids.active().is_empty() {
4469                        // TODO(@divma): is the entry removed later? (rem_cids.entry)
4470                        return Err(TransportError::PROTOCOL_VIOLATION(
4471                            "NEW_CONNECTION_ID when CIDs aren't in use",
4472                        ));
4473                    }
4474                    if frame.retire_prior_to > frame.sequence {
4475                        return Err(TransportError::PROTOCOL_VIOLATION(
4476                            "NEW_CONNECTION_ID retiring unissued CIDs",
4477                        ));
4478                    }
4479
4480                    use crate::cid_queue::InsertError;
4481                    match rem_cids.insert(frame) {
4482                        Ok(None) => {}
4483                        Ok(Some((retired, reset_token))) => {
4484                            let pending_retired =
4485                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
4486                            /// Ensure `pending_retired` cannot grow without bound. Limit is
4487                            /// somewhat arbitrary but very permissive.
4488                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4489                            // We don't bother counting in-flight frames because those are bounded
4490                            // by congestion control.
4491                            if (pending_retired.len() as u64)
4492                                .saturating_add(retired.end.saturating_sub(retired.start))
4493                                > MAX_PENDING_RETIRED_CIDS
4494                            {
4495                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4496                                    "queued too many retired CIDs",
4497                                ));
4498                            }
4499                            pending_retired.extend(retired.map(|seq| (path_id, seq)));
4500                            self.set_reset_token(path_id, remote, reset_token);
4501                        }
4502                        Err(InsertError::ExceedsLimit) => {
4503                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4504                        }
4505                        Err(InsertError::Retired) => {
4506                            trace!("discarding already-retired CID");
4507                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
4508                            // range of connection IDs larger than the active connection ID limit
4509                            // was retired all at once via retire_prior_to.
4510                            self.spaces[SpaceId::Data]
4511                                .pending
4512                                .retire_cids
4513                                .push((path_id, frame.sequence));
4514                            continue;
4515                        }
4516                    };
4517
4518                    if self.side.is_server()
4519                        && path_id == PathId::ZERO
4520                        && self
4521                            .rem_cids
4522                            .get(&PathId::ZERO)
4523                            .map(|cids| cids.active_seq() == 0)
4524                            .unwrap_or_default()
4525                    {
4526                        // We're a server still using the initial remote CID for the client, so
4527                        // let's switch immediately to enable clientside stateless resets.
4528                        self.update_rem_cid(PathId::ZERO);
4529                    }
4530                }
4531                Frame::NewToken(NewToken { token }) => {
4532                    let ConnectionSide::Client {
4533                        token_store,
4534                        server_name,
4535                        ..
4536                    } = &self.side
4537                    else {
4538                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4539                    };
4540                    if token.is_empty() {
4541                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4542                    }
4543                    trace!("got new token");
4544                    token_store.insert(server_name, token);
4545                }
4546                Frame::Datagram(datagram) => {
4547                    if self
4548                        .datagrams
4549                        .received(datagram, &self.config.datagram_receive_buffer_size)?
4550                    {
4551                        self.events.push_back(Event::DatagramReceived);
4552                    }
4553                }
4554                Frame::AckFrequency(ack_frequency) => {
4555                    // This frame can only be sent in the Data space
4556
4557                    if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4558                        // The AckFrequency frame is stale (we have already received a more
4559                        // recent one)
4560                        continue;
4561                    }
4562
4563                    // Update the params for all of our paths
4564                    for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4565                        space.pending_acks.set_ack_frequency_params(&ack_frequency);
4566
4567                        // Our `max_ack_delay` has been updated, so we may need to adjust
4568                        // its associated timeout
4569                        if let Some(timeout) = space
4570                            .pending_acks
4571                            .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4572                        {
4573                            self.timers.set(
4574                                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4575                                timeout,
4576                                self.qlog.with_time(now),
4577                            );
4578                        }
4579                    }
4580                }
4581                Frame::ImmediateAck => {
4582                    // This frame can only be sent in the Data space
4583                    for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4584                        pns.pending_acks.set_immediate_ack_required();
4585                    }
4586                }
4587                Frame::HandshakeDone => {
4588                    if self.side.is_server() {
4589                        return Err(TransportError::PROTOCOL_VIOLATION(
4590                            "client sent HANDSHAKE_DONE",
4591                        ));
4592                    }
4593                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
4594                        self.discard_space(now, SpaceId::Handshake);
4595                    }
4596                    self.events.push_back(Event::HandshakeConfirmed);
4597                    trace!("handshake confirmed");
4598                }
4599                Frame::ObservedAddr(observed) => {
4600                    // Check that the params allow the peer to send reports and us to receive them
4601                    trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4602                    if !self
4603                        .peer_params
4604                        .address_discovery_role
4605                        .should_report(&self.config.address_discovery_role)
4606                    {
4607                        return Err(TransportError::PROTOCOL_VIOLATION(
4608                            "received OBSERVED_ADDRESS frame when not negotiated",
4609                        ));
4610                    }
4611                    // must only be sent in data space
4612                    if packet.header.space() != SpaceId::Data {
4613                        return Err(TransportError::PROTOCOL_VIOLATION(
4614                            "OBSERVED_ADDRESS frame outside data space",
4615                        ));
4616                    }
4617
4618                    let path = self.path_data_mut(path_id);
4619                    if remote == path.remote {
4620                        if let Some(updated) = path.update_observed_addr_report(observed) {
4621                            if path.open {
4622                                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4623                                    id: path_id,
4624                                    addr: updated,
4625                                }));
4626                            }
4627                            // otherwise the event is reported when the path is deemed open
4628                        }
4629                    } else {
4630                        // include in migration
4631                        migration_observed_addr = Some(observed)
4632                    }
4633                }
4634                Frame::PathAbandon(frame::PathAbandon {
4635                    path_id,
4636                    error_code,
4637                }) => {
4638                    span.record("path", tracing::field::debug(&path_id));
4639                    // TODO(flub): don't really know which error code to use here.
4640                    let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4641                        Ok(()) => {
4642                            trace!("peer abandoned path");
4643                            false
4644                        }
4645                        Err(ClosePathError::LastOpenPath) => {
4646                            trace!("peer abandoned last path, closing connection");
4647                            // TODO(flub): which error code?
4648                            return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4649                        }
4650                        Err(ClosePathError::ClosedPath) => {
4651                            trace!("peer abandoned already closed path");
4652                            true
4653                        }
4654                    };
4655                    // If we receive a retransmit of PATH_ABANDON then we may already have
4656                    // abandoned this path locally.  In that case the DiscardPath timer
4657                    // may already have fired and we no longer have any state for this path.
4658                    // Only set this timer if we still have path state.
4659                    if self.path(path_id).is_some() && !already_abandoned {
4660                        // TODO(flub): Checking is_some() here followed by a number of calls
4661                        //    that would panic if it was None is really ugly.  If only we
4662                        //    could do something like PathData::pto().  One day we'll have
4663                        //    unified SpaceId and PathId and this will be possible.
4664                        let delay = self.pto(SpaceId::Data, path_id) * 3;
4665                        self.timers.set(
4666                            Timer::PerPath(path_id, PathTimer::DiscardPath),
4667                            now + delay,
4668                            self.qlog.with_time(now),
4669                        );
4670                    }
4671                }
4672                Frame::PathStatusAvailable(info) => {
4673                    span.record("path", tracing::field::debug(&info.path_id));
4674                    if self.is_multipath_negotiated() {
4675                        self.on_path_status(
4676                            info.path_id,
4677                            PathStatus::Available,
4678                            info.status_seq_no,
4679                        );
4680                    } else {
4681                        return Err(TransportError::PROTOCOL_VIOLATION(
4682                            "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4683                        ));
4684                    }
4685                }
4686                Frame::PathStatusBackup(info) => {
4687                    span.record("path", tracing::field::debug(&info.path_id));
4688                    if self.is_multipath_negotiated() {
4689                        self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4690                    } else {
4691                        return Err(TransportError::PROTOCOL_VIOLATION(
4692                            "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4693                        ));
4694                    }
4695                }
4696                Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4697                    span.record("path", tracing::field::debug(&path_id));
4698                    if !self.is_multipath_negotiated() {
4699                        return Err(TransportError::PROTOCOL_VIOLATION(
4700                            "received MAX_PATH_ID frame when multipath was not negotiated",
4701                        ));
4702                    }
4703                    // frames that do not increase the path id are ignored
4704                    if path_id > self.remote_max_path_id {
4705                        self.remote_max_path_id = path_id;
4706                        self.issue_first_path_cids(now);
4707                    }
4708                }
4709                Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4710                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4711                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4712                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4713                    if self.is_multipath_negotiated() {
4714                        if self.local_max_path_id > max_path_id {
4715                            return Err(TransportError::PROTOCOL_VIOLATION(
4716                                "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4717                            ));
4718                        }
4719                        debug!("received PATHS_BLOCKED({:?})", max_path_id);
4720                        // TODO(@divma): ensure max concurrent paths
4721                    } else {
4722                        return Err(TransportError::PROTOCOL_VIOLATION(
4723                            "received PATHS_BLOCKED frame when multipath was not negotiated",
4724                        ));
4725                    }
4726                }
4727                Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4728                    // Nothing to do.  This is recorded in the frame stats, but otherwise we
4729                    // always issue all CIDs we're allowed to issue, so either this is an
4730                    // impatient peer or a bug on our side.
4731
4732                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4733                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4734                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4735                    if self.is_multipath_negotiated() {
4736                        if path_id > self.local_max_path_id {
4737                            return Err(TransportError::PROTOCOL_VIOLATION(
4738                                "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4739                            ));
4740                        }
4741                        if next_seq.0
4742                            > self
4743                                .local_cid_state
4744                                .get(&path_id)
4745                                .map(|cid_state| cid_state.active_seq().1 + 1)
4746                                .unwrap_or_default()
4747                        {
4748                            return Err(TransportError::PROTOCOL_VIOLATION(
4749                                "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4750                            ));
4751                        }
4752                        debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4753                    } else {
4754                        return Err(TransportError::PROTOCOL_VIOLATION(
4755                            "received PATH_CIDS_BLOCKED frame when multipath was not negotiated",
4756                        ));
4757                    }
4758                }
4759                Frame::AddAddress(addr) => {
4760                    let client_state = match self.iroh_hp.client_side_mut() {
4761                        Ok(state) => state,
4762                        Err(err) => {
4763                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4764                                "Nat traversal(ADD_ADDRESS): {err}"
4765                            )));
4766                        }
4767                    };
4768
4769                    if !client_state.check_remote_address(&addr) {
4770                        // if the address is not valid we flag it, but update anyway
4771                        warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4772                    }
4773
4774                    match client_state.add_remote_address(addr) {
4775                        Ok(maybe_added) => {
4776                            if let Some(added) = maybe_added {
4777                                self.events.push_back(Event::NatTraversal(
4778                                    iroh_hp::Event::AddressAdded(added),
4779                                ));
4780                            }
4781                        }
4782                        Err(e) => {
4783                            warn!(%e, "failed to add remote address")
4784                        }
4785                    }
4786                }
4787                Frame::RemoveAddress(addr) => {
4788                    let client_state = match self.iroh_hp.client_side_mut() {
4789                        Ok(state) => state,
4790                        Err(err) => {
4791                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4792                                "Nat traversal(REMOVE_ADDRESS): {err}"
4793                            )));
4794                        }
4795                    };
4796                    if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4797                        self.events
4798                            .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4799                                removed_addr,
4800                            )));
4801                    }
4802                }
4803                Frame::ReachOut(reach_out) => {
4804                    let server_state = match self.iroh_hp.server_side_mut() {
4805                        Ok(state) => state,
4806                        Err(err) => {
4807                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4808                                "Nat traversal(REACH_OUT): {err}"
4809                            )));
4810                        }
4811                    };
4812
4813                    if let Err(err) = server_state.handle_reach_out(reach_out) {
4814                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
4815                            "Nat traversal(REACH_OUT): {err}"
4816                        )));
4817                    }
4818                }
4819            }
4820        }
4821
4822        let space = self.spaces[SpaceId::Data].for_path(path_id);
4823        if space
4824            .pending_acks
4825            .packet_received(now, number, ack_eliciting, &space.dedup)
4826        {
4827            if self.abandoned_paths.contains(&path_id) {
4828                // § 3.4.3 QUIC-MULTIPATH: promptly send ACKs for packets received from
4829                // abandoned paths.
4830                space.pending_acks.set_immediate_ack_required();
4831            } else {
4832                self.timers.set(
4833                    Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4834                    now + self.ack_frequency.max_ack_delay,
4835                    self.qlog.with_time(now),
4836                );
4837            }
4838        }
4839
4840        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
4841        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
4842        // are only freed, and hence only issue credit, once the application has been notified
4843        // during a read on the stream.
4844        let pending = &mut self.spaces[SpaceId::Data].pending;
4845        self.streams.queue_max_stream_id(pending);
4846
4847        if let Some(reason) = close {
4848            self.state.move_to_draining(Some(reason.into()));
4849            self.close = true;
4850        }
4851
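        // Only the highest-numbered non-probing packet from a new remote address may
        // initiate connection migration (RFC 9000 §9.3); probing or reordered packets
        // must not.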
4852        if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4853            && !is_probing_packet
4854            && remote != self.path_data(path_id).remote
4855        {
4856            let ConnectionSide::Server { ref server_config } = self.side else {
4857                panic!("packets from unknown remote should be dropped by clients");
4858            };
4859            debug_assert!(
4860                server_config.migration,
4861                "migration-initiating packets should have been dropped immediately"
4862            );
4863            self.migrate(path_id, now, remote, migration_observed_addr);
4864            // Break linkability, if possible
4865            self.update_rem_cid(path_id);
4866            self.spin = false;
4867        }
4868
4869        Ok(())
4870    }
4871
4872    fn migrate(
4873        &mut self,
4874        path_id: PathId,
4875        now: Instant,
4876        remote: SocketAddr,
4877        observed_addr: Option<ObservedAddr>,
4878    ) {
4879        trace!(%remote, %path_id, "migration initiated");
4880        self.path_counter = self.path_counter.wrapping_add(1);
4881        // TODO(@divma): conditions for path migration in multipath are very specific, check them
4882        // again to prevent path migrations that should actually create a new path
4883
4884        // Reset RTT/congestion state for the new path unless it looks like a NAT rebinding.
4885        // Note that the congestion window will not grow until validation completes; this helps
4886        // mitigate amplification attacks performed by spoofing source addresses.
4887        let prev_pto = self.pto(SpaceId::Data, path_id);
4888        let known_path = self.paths.get_mut(&path_id).expect("known path");
4889        let path = &mut known_path.data;
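        // An unchanged IP address with a new port looks like a NAT rebinding, so RTT and
        // MTU state carry over; any other change starts the path from scratch. (Only
        // applied to IPv4 here.)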
4890        let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4891            PathData::from_previous(remote, path, self.path_counter, now)
4892        } else {
4893            let peer_max_udp_payload_size =
4894                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4895                    .unwrap_or(u16::MAX);
4896            PathData::new(
4897                remote,
4898                self.allow_mtud,
4899                Some(peer_max_udp_payload_size),
4900                self.path_counter,
4901                now,
4902                &self.config,
4903            )
4904        };
4905        new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4906        if let Some(report) = observed_addr {
4907            if let Some(updated) = new_path.update_observed_addr_report(report) {
4908                tracing::info!("adding observed addr event from migration");
4909                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4910                    id: path_id,
4911                    addr: updated,
4912                }));
4913            }
4914        }
4915        new_path.send_new_challenge = true;
4916
4917        let mut prev = mem::replace(path, new_path);
4918        // Don't clobber the original path if the previous one hasn't been validated yet
4919        if !prev.is_validating_path() {
4920            prev.send_new_challenge = true;
4921            // We haven't updated the remote CID yet, so this captures the remote CID we
4922            // were using on the previous path.
4924            known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4925        }
4926
4927        // We need to re-assign the correct remote to this path in qlog
4928        self.qlog.emit_tuple_assigned(path_id, remote, now);
4929
4930        self.timers.set(
4931            Timer::PerPath(path_id, PathTimer::PathValidation),
4932            now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4933            self.qlog.with_time(now),
4934        );
4935    }
4936
4937    /// Handle a change in the local address, i.e. an active migration
4938    pub fn local_address_changed(&mut self) {
4939        // TODO(flub): if multipath is enabled this needs to create a new path entirely.
4940        self.update_rem_cid(PathId::ZERO);
4941        self.ping();
4942    }
4943
4944    /// Switch to a previously unused remote connection ID, if possible
4945    fn update_rem_cid(&mut self, path_id: PathId) {
4946        let Some((reset_token, retired)) =
4947            self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4948        else {
4949            return;
4950        };
4951
4952        // Retire the current remote CID and any CIDs we had to skip.
4953        self.spaces[SpaceId::Data]
4954            .pending
4955            .retire_cids
4956            .extend(retired.map(|seq| (path_id, seq)));
4957        let remote = self.path_data(path_id).remote;
4958        self.set_reset_token(path_id, remote, reset_token);
4959    }
4960
4961    /// Sends this reset token to the endpoint
4962    ///
4963    /// The endpoint needs to know the reset tokens issued by the peer, so that if the peer
4964    /// sends a reset token it knows to route it to this connection. See RFC 9000 section
4965    /// 10.3. Stateless Reset.
4966    ///
4967    /// Reset tokens are different for each path. Note, however, that the endpoint
4968    /// identifies paths by the peer's socket address, not by path ID.
4969    fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4970        self.endpoint_events
4971            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4972
4973        // During the handshake the server sends a reset token in the transport
4974        // parameters. When we are the client and we receive the reset token during the
4975        // handshake we want this to affect our peer transport parameters.
4976        // TODO(flub): Pretty sure this is pointless, the entire params is overwritten
4977        //    shortly after this was called.  And then the params don't have this anymore.
4978        if path_id == PathId::ZERO {
4979            self.peer_params.stateless_reset_token = Some(reset_token);
4980        }
4981    }
4982
4983    /// Issue an initial set of connection IDs to the peer upon connection
4984    fn issue_first_cids(&mut self, now: Instant) {
4985        if self
4986            .local_cid_state
4987            .get(&PathId::ZERO)
4988            .expect("PathId::ZERO exists when the connection is created")
4989            .cid_len()
4990            == 0
4991        {
4992            return;
4993        }
4994
4995        // Subtract 1 to account for the CID we supplied while handshaking
4996        let mut n = self.peer_params.issue_cids_limit() - 1;
4997        if let ConnectionSide::Server { server_config } = &self.side {
4998            if server_config.has_preferred_address() {
4999                // We also sent a CID in the transport parameters
5000                n -= 1;
5001            }
5002        }
5003        self.endpoint_events
5004            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5005    }
5006
5007    /// Issues an initial set of CIDs for paths that have not yet had any CIDs issued
5008    ///
5009    /// Later CIDs are issued when CIDs expire or are retired by the peer.
5010    fn issue_first_path_cids(&mut self, now: Instant) {
5011        if let Some(max_path_id) = self.max_path_id() {
5012            let mut path_id = self.max_path_id_with_cids.next();
5013            while path_id <= max_path_id {
5014                self.endpoint_events
5015                    .push_back(EndpointEventInner::NeedIdentifiers(
5016                        path_id,
5017                        now,
5018                        self.peer_params.issue_cids_limit(),
5019                    ));
5020                path_id = path_id.next();
5021            }
5022            self.max_path_id_with_cids = max_path_id;
5023        }
5024    }
5025
5026    /// Populates a packet with frames
5027    ///
5028    /// This tries to fit as many frames as possible into the packet.
5029    ///
5030    /// *path_exclusive_only* means to only build frames which can only be sent on this
5031    /// *path.  This is used in multipath for backup paths while there is still an active
5032    /// *path.
5033    fn populate_packet(
5034        &mut self,
5035        now: Instant,
5036        space_id: SpaceId,
5037        path_id: PathId,
5038        path_exclusive_only: bool,
5039        buf: &mut impl BufMut,
5040        pn: u64,
5041        #[allow(unused)] qlog: &mut QlogSentPacket,
5042    ) -> SentFrames {
5043        let mut sent = SentFrames::default();
5044        let is_multipath_negotiated = self.is_multipath_negotiated();
5045        let space = &mut self.spaces[space_id];
5046        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5047        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5048        space
5049            .for_path(path_id)
5050            .pending_acks
5051            .maybe_ack_non_eliciting();
5052
5053        // HANDSHAKE_DONE
5054        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5055            trace!("HANDSHAKE_DONE");
5056            buf.write(frame::FrameType::HANDSHAKE_DONE);
5057            qlog.frame(&Frame::HandshakeDone);
5058            sent.retransmits.get_or_create().handshake_done = true;
5059            // This is just a u8 counter and the frame is typically just sent once
5060            self.stats.frame_tx.handshake_done =
5061                self.stats.frame_tx.handshake_done.saturating_add(1);
5062        }
5063
5064        // REACH_OUT
5065        // TODO(@divma): path explusive considerations
5066        if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5067            while let Some(local_addr) = addresses.pop() {
5068                let reach_out = frame::ReachOut::new(*round, local_addr);
5069                if buf.remaining_mut() > reach_out.size() {
5070                    trace!(%round, ?local_addr, "REACH_OUT");
5071                    reach_out.write(buf);
5072                    let sent_reachouts = sent
5073                        .retransmits
5074                        .get_or_create()
5075                        .reach_out
5076                        .get_or_insert_with(|| (*round, Default::default()));
5077                    sent_reachouts.1.push(local_addr);
5078                    self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5079                    qlog.frame(&Frame::ReachOut(reach_out));
5080                } else {
5081                    addresses.push(local_addr);
5082                    break;
5083                }
5084            }
5085            if addresses.is_empty() {
5086                space.pending.reach_out = None;
5087            }
5088        }
5089
5090        // OBSERVED_ADDR
5091        if !path_exclusive_only
5092            && space_id == SpaceId::Data
5093            && self
5094                .config
5095                .address_discovery_role
5096                .should_report(&self.peer_params.address_discovery_role)
5097            && (!path.observed_addr_sent || space.pending.observed_addr)
5098        {
5099            let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5100            if buf.remaining_mut() > frame.size() {
5101                trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5102                frame.write(buf);
5103
5104                self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5105                path.observed_addr_sent = true;
5106
5107                self.stats.frame_tx.observed_addr += 1;
5108                sent.retransmits.get_or_create().observed_addr = true;
5109                space.pending.observed_addr = false;
5110                qlog.frame(&Frame::ObservedAddr(frame));
5111            }
5112        }
5113
5114        // PING
5115        if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5116            trace!("PING");
5117            buf.write(frame::FrameType::PING);
5118            sent.non_retransmits = true;
5119            self.stats.frame_tx.ping += 1;
5120            qlog.frame(&Frame::Ping);
5121        }
5122
5123        // IMMEDIATE_ACK
5124        if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5125            debug_assert_eq!(
5126                space_id,
5127                SpaceId::Data,
5128                "immediate acks must be sent in the data space"
5129            );
5130            trace!("IMMEDIATE_ACK");
5131            buf.write(frame::FrameType::IMMEDIATE_ACK);
5132            sent.non_retransmits = true;
5133            self.stats.frame_tx.immediate_ack += 1;
5134            qlog.frame(&Frame::ImmediateAck);
5135        }
5136
5137        // ACK
5138        // TODO(flub): Should this send acks for this path anyway?
5139
5140        if !path_exclusive_only {
5141            for path_id in space
5142                .number_spaces
5143                .iter_mut()
5144                .filter(|(_, pns)| pns.pending_acks.can_send())
5145                .map(|(&path_id, _)| path_id)
5146                .collect::<Vec<_>>()
5147            {
5148                Self::populate_acks(
5149                    now,
5150                    self.receiving_ecn,
5151                    &mut sent,
5152                    path_id,
5153                    space_id,
5154                    space,
5155                    is_multipath_negotiated,
5156                    buf,
5157                    &mut self.stats,
5158                    qlog,
5159                );
5160            }
5161        }
5162
5163        // ACK_FREQUENCY
5164        if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5165            let sequence_number = self.ack_frequency.next_sequence_number();
5166
5167            // Safe to unwrap because this is always provided when ACK frequency is enabled
5168            let config = self.config.ack_frequency_config.as_ref().unwrap();
5169
5170            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
5171            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5172                path.rtt.get(),
5173                config,
5174                &self.peer_params,
5175            );
5176
5177            trace!(?max_ack_delay, "ACK_FREQUENCY");
5178
5179            let frame = frame::AckFrequency {
5180                sequence: sequence_number,
5181                ack_eliciting_threshold: config.ack_eliciting_threshold,
5182                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5183                reordering_threshold: config.reordering_threshold,
5184            };
5185            frame.encode(buf);
5186            qlog.frame(&Frame::AckFrequency(frame));
5187
5188            sent.retransmits.get_or_create().ack_frequency = true;
5189
5190            self.ack_frequency
5191                .ack_frequency_sent(path_id, pn, max_ack_delay);
5192            self.stats.frame_tx.ack_frequency += 1;
5193        }
5194
5195        // PATH_CHALLENGE
5196        if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5197            && space_id == SpaceId::Data
5198            && path.send_new_challenge
5199        {
5200            path.send_new_challenge = false;
5201
5202            // Generate a new challenge every time we send a new PATH_CHALLENGE
5203            let token = self.rng.random();
5204            let info = paths::SentChallengeInfo {
5205                sent_instant: now,
5206                remote: path.remote,
5207            };
5208            path.challenges_sent.insert(token, info);
5209            sent.non_retransmits = true;
5210            sent.requires_padding = true;
5211            let challenge = frame::PathChallenge(token);
5212            trace!(%challenge, "sending new challenge");
5213            buf.write(challenge);
5214            qlog.frame(&Frame::PathChallenge(challenge));
5215            self.stats.frame_tx.path_challenge += 1;
5216            let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5217            self.timers.set(
5218                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5219                now + pto,
5220                self.qlog.with_time(now),
5221            );
5222
5223            if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5224                // queue informing the path status along with the challenge
5225                space.pending.path_status.insert(path_id);
5226            }
5227
5228            // Always include an OBSERVED_ADDR frame with a PATH_CHALLENGE, regardless
5229            // of whether one has already been sent on this path.
5230            if space_id == SpaceId::Data
5231                && self
5232                    .config
5233                    .address_discovery_role
5234                    .should_report(&self.peer_params.address_discovery_role)
5235            {
5236                let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5237                if buf.remaining_mut() > frame.size() {
5238                    frame.write(buf);
5239                    qlog.frame(&Frame::ObservedAddr(frame));
5240
5241                    self.next_observed_addr_seq_no =
5242                        self.next_observed_addr_seq_no.saturating_add(1u8);
5243                    path.observed_addr_sent = true;
5244
5245                    self.stats.frame_tx.observed_addr += 1;
5246                    sent.retransmits.get_or_create().observed_addr = true;
5247                    space.pending.observed_addr = false;
5248                }
5249            }
5250        }
5251
5252        // PATH_RESPONSE
5253        if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5254            if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5255                sent.non_retransmits = true;
5256                sent.requires_padding = true;
5257                let response = frame::PathResponse(token);
5258                trace!(%response, "sending response");
5259                buf.write(response);
5260                qlog.frame(&Frame::PathResponse(response));
5261                self.stats.frame_tx.path_response += 1;
5262
5263                // NOTE: this is technically not required but might be useful to ride the
5264                // request/response nature of path challenges to refresh an observation
5265                // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
5266                if space_id == SpaceId::Data
5267                    && self
5268                        .config
5269                        .address_discovery_role
5270                        .should_report(&self.peer_params.address_discovery_role)
5271                {
5272                    let frame =
5273                        frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5274                    if buf.remaining_mut() > frame.size() {
5275                        frame.write(buf);
5276                        qlog.frame(&Frame::ObservedAddr(frame));
5277
5278                        self.next_observed_addr_seq_no =
5279                            self.next_observed_addr_seq_no.saturating_add(1u8);
5280                        path.observed_addr_sent = true;
5281
5282                        self.stats.frame_tx.observed_addr += 1;
5283                        sent.retransmits.get_or_create().observed_addr = true;
5284                        space.pending.observed_addr = false;
5285                    }
5286                }
5287            }
5288        }
5289
5290        // CRYPTO
5291        while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5292            let mut frame = match space.pending.crypto.pop_front() {
5293                Some(x) => x,
5294                None => break,
5295            };
5296
5297            // Calculate the maximum amount of crypto data we can store in the buffer.
5298            // Since the offset is known, we can reserve the exact size required to encode it.
5299            // For length we reserve 2bytes which allows to encode up to 2^14,
5300            // which is more than what fits into normally sized QUIC frames.
5301            let max_crypto_data_size = buf.remaining_mut()
5302                - 1 // Frame Type
5303                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5304                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
5305
5306            let len = frame
5307                .data
5308                .len()
5309                .min(2usize.pow(14) - 1)
5310                .min(max_crypto_data_size);
5311
5312            let data = frame.data.split_to(len);
5313            let truncated = frame::Crypto {
5314                offset: frame.offset,
5315                data,
5316            };
5317            trace!(
5318                "CRYPTO: off {} len {}",
5319                truncated.offset,
5320                truncated.data.len()
5321            );
5322            truncated.encode(buf);
5323            self.stats.frame_tx.crypto += 1;
5324
5325            // The clone is cheap but we still cfg it out if qlog is disabled.
5326            #[cfg(feature = "qlog")]
5327            qlog.frame(&Frame::Crypto(truncated.clone()));
5328            sent.retransmits.get_or_create().crypto.push_back(truncated);
5329            if !frame.data.is_empty() {
5330                frame.offset += len as u64;
5331                space.pending.crypto.push_front(frame);
5332            }
5333        }
5334
5335        // TODO(flub): maybe this is much higher priority?
5336        // PATH_ABANDON
5337        while !path_exclusive_only
5338            && space_id == SpaceId::Data
5339            && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5340        {
5341            let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5342                break;
5343            };
5344            let frame = frame::PathAbandon {
5345                path_id,
5346                error_code,
5347            };
5348            frame.encode(buf);
5349            qlog.frame(&Frame::PathAbandon(frame));
5350            self.stats.frame_tx.path_abandon += 1;
5351            trace!(%path_id, "PATH_ABANDON");
5352            sent.retransmits
5353                .get_or_create()
5354                .path_abandon
5355                .entry(path_id)
5356                .or_insert(error_code);
5357        }
5358
5359        // PATH_STATUS_AVAILABLE & PATH_STATUS_BACKUP
5360        while !path_exclusive_only
5361            && space_id == SpaceId::Data
5362            && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5363        {
5364            let Some(path_id) = space.pending.path_status.pop_first() else {
5365                break;
5366            };
5367            let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5368                trace!(%path_id, "discarding queued path status for unknown path");
5369                continue;
5370            };
5371
5372            let seq = path.status.seq();
5373            sent.retransmits.get_or_create().path_status.insert(path_id);
5374            match path.local_status() {
5375                PathStatus::Available => {
5376                    let frame = frame::PathStatusAvailable {
5377                        path_id,
5378                        status_seq_no: seq,
5379                    };
5380                    frame.encode(buf);
5381                    qlog.frame(&Frame::PathStatusAvailable(frame));
5382                    self.stats.frame_tx.path_status_available += 1;
5383                    trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5384                }
5385                PathStatus::Backup => {
5386                    let frame = frame::PathStatusBackup {
5387                        path_id,
5388                        status_seq_no: seq,
5389                    };
5390                    frame.encode(buf);
5391                    qlog.frame(&Frame::PathStatusBackup(frame));
5392                    self.stats.frame_tx.path_status_backup += 1;
5393                    trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5394                }
5395            }
5396        }
5397
5398        // MAX_PATH_ID
5399        if space_id == SpaceId::Data
5400            && space.pending.max_path_id
5401            && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5402        {
5403            let frame = frame::MaxPathId(self.local_max_path_id);
5404            frame.encode(buf);
5405            qlog.frame(&Frame::MaxPathId(frame));
5406            space.pending.max_path_id = false;
5407            sent.retransmits.get_or_create().max_path_id = true;
5408            trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5409            self.stats.frame_tx.max_path_id += 1;
5410        }
5411
5412        // PATHS_BLOCKED
5413        if space_id == SpaceId::Data
5414            && space.pending.paths_blocked
5415            && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5416        {
5417            let frame = frame::PathsBlocked(self.remote_max_path_id);
5418            frame.encode(buf);
5419            qlog.frame(&Frame::PathsBlocked(frame));
5420            space.pending.paths_blocked = false;
5421            sent.retransmits.get_or_create().paths_blocked = true;
5422            trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5423            self.stats.frame_tx.paths_blocked += 1;
5424        }
5425
5426        // PATH_CIDS_BLOCKED
5427        while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5428        {
5429            let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5430                break;
5431            };
5432            let next_seq = match self.rem_cids.get(&path_id) {
5433                Some(cid_queue) => cid_queue.active_seq() + 1,
5434                None => 0,
5435            };
5436            let frame = frame::PathCidsBlocked {
5437                path_id,
5438                next_seq: VarInt(next_seq),
5439            };
5440            frame.encode(buf);
5441            qlog.frame(&Frame::PathCidsBlocked(frame));
5442            sent.retransmits
5443                .get_or_create()
5444                .path_cids_blocked
5445                .push(path_id);
5446            trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5447            self.stats.frame_tx.path_cids_blocked += 1;
5448        }
5449
5450        // RESET_STREAM, STOP_SENDING, MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS
5451        if space_id == SpaceId::Data {
5452            self.streams.write_control_frames(
5453                buf,
5454                &mut space.pending,
5455                &mut sent.retransmits,
5456                &mut self.stats.frame_tx,
5457                qlog,
5458            );
5459        }
5460
5461        // NEW_CONNECTION_ID
5462        let cid_len = self
5463            .local_cid_state
5464            .values()
5465            .map(|cid_state| cid_state.cid_len())
5466            .max()
5467            .expect("some local CID state must exist");
5468        let new_cid_size_bound =
5469            frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5470        while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5471            let issued = match space.pending.new_cids.pop() {
5472                Some(x) => x,
5473                None => break,
5474            };
5475            let retire_prior_to = self
5476                .local_cid_state
5477                .get(&issued.path_id)
5478                .map(|cid_state| cid_state.retire_prior_to())
5479                .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5480
5481            let cid_path_id = match is_multipath_negotiated {
5482                true => {
5483                    trace!(
5484                        path_id = ?issued.path_id,
5485                        sequence = issued.sequence,
5486                        id = %issued.id,
5487                        "PATH_NEW_CONNECTION_ID",
5488                    );
5489                    self.stats.frame_tx.path_new_connection_id += 1;
5490                    Some(issued.path_id)
5491                }
5492                false => {
5493                    trace!(
5494                        sequence = issued.sequence,
5495                        id = %issued.id,
5496                        "NEW_CONNECTION_ID"
5497                    );
5498                    debug_assert_eq!(issued.path_id, PathId::ZERO);
5499                    self.stats.frame_tx.new_connection_id += 1;
5500                    None
5501                }
5502            };
5503            let frame = frame::NewConnectionId {
5504                path_id: cid_path_id,
5505                sequence: issued.sequence,
5506                retire_prior_to,
5507                id: issued.id,
5508                reset_token: issued.reset_token,
5509            };
5510            frame.encode(buf);
5511            sent.retransmits.get_or_create().new_cids.push(issued);
5512            qlog.frame(&Frame::NewConnectionId(frame));
5513        }
5514
5515        // RETIRE_CONNECTION_ID
5516        let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5517        while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5518            let (path_id, sequence) = match space.pending.retire_cids.pop() {
5519                Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5520                    trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5521                    self.stats.frame_tx.retire_connection_id += 1;
5522                    (None, seq)
5523                }
5524                Some((path_id, seq)) => {
5525                    trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5526                    self.stats.frame_tx.path_retire_connection_id += 1;
5527                    (Some(path_id), seq)
5528                }
5529                None => break,
5530            };
5531            let frame = frame::RetireConnectionId { path_id, sequence };
5532            frame.encode(buf);
5533            qlog.frame(&Frame::RetireConnectionId(frame));
5534            sent.retransmits
5535                .get_or_create()
5536                .retire_cids
5537                .push((path_id.unwrap_or_default(), sequence));
5538        }
5539
5540        // DATAGRAM
5541        let mut sent_datagrams = false;
5542        while !path_exclusive_only
5543            && buf.remaining_mut() > Datagram::SIZE_BOUND
5544            && space_id == SpaceId::Data
5545        {
5546            let prev_remaining = buf.remaining_mut();
5547            match self.datagrams.write(buf) {
5548                true => {
5549                    sent_datagrams = true;
5550                    sent.non_retransmits = true;
5551                    self.stats.frame_tx.datagram += 1;
5552                    qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5553                }
5554                false => break,
5555            }
5556        }
5557        if self.datagrams.send_blocked && sent_datagrams {
5558            self.events.push_back(Event::DatagramsUnblocked);
5559            self.datagrams.send_blocked = false;
5560        }
5561
5562        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5563
5564        // NEW_TOKEN
5565        while let Some(remote_addr) = space.pending.new_tokens.pop() {
5566            if path_exclusive_only {
5567                break;
5568            }
5569            debug_assert_eq!(space_id, SpaceId::Data);
5570            let ConnectionSide::Server { server_config } = &self.side else {
5571                panic!("NEW_TOKEN frames should not be enqueued by clients");
5572            };
5573
5574            if remote_addr != path.remote {
5575                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
5576                // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
5577                // frames upon an path change. Instead, when the new path becomes validated,
5578                // NEW_TOKEN frames may be enqueued for the new path instead.
5579                continue;
5580            }
5581
5582            let token = Token::new(
5583                TokenPayload::Validation {
5584                    ip: remote_addr.ip(),
5585                    issued: server_config.time_source.now(),
5586                },
5587                &mut self.rng,
5588            );
5589            let new_token = NewToken {
5590                token: token.encode(&*server_config.token_key).into(),
5591            };
5592
5593            if buf.remaining_mut() < new_token.size() {
5594                space.pending.new_tokens.push(remote_addr);
5595                break;
5596            }
5597
5598            trace!("NEW_TOKEN");
5599            new_token.encode(buf);
5600            qlog.frame(&Frame::NewToken(new_token));
5601            sent.retransmits
5602                .get_or_create()
5603                .new_tokens
5604                .push(remote_addr);
5605            self.stats.frame_tx.new_token += 1;
5606        }
5607
5608        // STREAM
5609        if !path_exclusive_only && space_id == SpaceId::Data {
5610            sent.stream_frames =
5611                self.streams
5612                    .write_stream_frames(buf, self.config.send_fairness, qlog);
5613            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5614        }
5615
5616        // ADD_ADDRESS
5617        // TODO(@divma): check if we need to do path exclusive filters
5618        while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5619            if let Some(added_address) = space.pending.add_address.pop_last() {
5620                trace!(
5621                    seq = %added_address.seq_no,
5622                    ip = ?added_address.ip,
5623                    port = added_address.port,
5624                    "ADD_ADDRESS",
5625                );
5626                added_address.write(buf);
5627                sent.retransmits
5628                    .get_or_create()
5629                    .add_address
5630                    .insert(added_address);
5631                self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5632                qlog.frame(&Frame::AddAddress(added_address));
5633            } else {
5634                break;
5635            }
5636        }
5637
5638        // REMOVE_ADDRESS
5639        while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5640            if let Some(removed_address) = space.pending.remove_address.pop_last() {
5641                trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5642                removed_address.write(buf);
5643                sent.retransmits
5644                    .get_or_create()
5645                    .remove_address
5646                    .insert(removed_address);
5647                self.stats.frame_tx.remove_address =
5648                    self.stats.frame_tx.remove_address.saturating_add(1);
5649                qlog.frame(&Frame::RemoveAddress(removed_address));
5650            } else {
5651                break;
5652            }
5653        }
5654
5655        sent
5656    }
5657
5658    /// Write pending ACKs into a buffer
5659    fn populate_acks(
5660        now: Instant,
5661        receiving_ecn: bool,
5662        sent: &mut SentFrames,
5663        path_id: PathId,
5664        space_id: SpaceId,
5665        space: &mut PacketSpace,
5666        is_multipath_negotiated: bool,
5667        buf: &mut impl BufMut,
5668        stats: &mut ConnectionStats,
5669        #[allow(unused)] qlog: &mut QlogSentPacket,
5670    ) {
5671        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
5672        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5673
5674        debug_assert!(
5675            is_multipath_negotiated || path_id == PathId::ZERO,
5676            "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5677        );
5678        if is_multipath_negotiated {
5679            debug_assert!(
5680                space_id == SpaceId::Data || path_id == PathId::ZERO,
5681                "path acks must be sent in 1RTT space (have {space_id:?})"
5682            );
5683        }
5684
5685        let pns = space.for_path(path_id);
5686        let ranges = pns.pending_acks.ranges();
5687        debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5688        let ecn = if receiving_ecn {
5689            Some(&pns.ecn_counters)
5690        } else {
5691            None
5692        };
5693        if let Some(max) = ranges.max() {
5694            sent.largest_acked.insert(path_id, max);
5695        }
5696
5697        let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5698        // TODO: This should come from `TransportConfig` if that gets configurable.
5699        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5700        let delay = delay_micros >> ack_delay_exp.into_inner();
5701
5702        if is_multipath_negotiated && space_id == SpaceId::Data {
5703            if !ranges.is_empty() {
5704                trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5705                frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
5706                qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5707                stats.frame_tx.path_acks += 1;
5708            }
5709        } else {
5710            trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5711            frame::Ack::encode(delay as _, ranges, ecn, buf);
5712            stats.frame_tx.acks += 1;
5713            qlog.frame_ack(delay, ranges, ecn);
5714        }
5715    }
5716
5717    fn close_common(&mut self) {
5718        trace!("connection closed");
5719        self.timers.reset();
5720    }
5721
5722    fn set_close_timer(&mut self, now: Instant) {
5723        // QUIC-MULTIPATH § 2.6 Connection Closure: draining for 3*PTO with PTO the max of
5724        // the PTO for all paths.
5725        self.timers.set(
5726            Timer::Conn(ConnTimer::Close),
5727            now + 3 * self.pto_max_path(self.highest_space),
5728            self.qlog.with_time(now),
5729        );
5730    }
5731
5732    /// Handle transport parameters received from the peer
5733    ///
5734    /// *rem_cid* and *loc_cid* are the source and destination CIDs respectively of the
5735    /// *packet into which the transport parameters arrived.
5736    fn handle_peer_params(
5737        &mut self,
5738        params: TransportParameters,
5739        loc_cid: ConnectionId,
5740        rem_cid: ConnectionId,
5741        now: Instant,
5742    ) -> Result<(), TransportError> {
5743        if Some(self.orig_rem_cid) != params.initial_src_cid
5744            || (self.side.is_client()
5745                && (Some(self.initial_dst_cid) != params.original_dst_cid
5746                    || self.retry_src_cid != params.retry_src_cid))
5747        {
5748            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5749                "CID authentication failure",
5750            ));
5751        }
5752        if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5753            return Err(TransportError::PROTOCOL_VIOLATION(
5754                "multipath must not use zero-length CIDs",
5755            ));
5756        }
5757
5758        self.set_peer_params(params);
5759        self.qlog.emit_peer_transport_params_received(self, now);
5760
5761        Ok(())
5762    }
5763
5764    fn set_peer_params(&mut self, params: TransportParameters) {
5765        self.streams.set_params(&params);
5766        self.idle_timeout =
5767            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5768        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5769
5770        if let Some(ref info) = params.preferred_address {
5771            // During the handshake PathId::ZERO exists.
5772            self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
5773                path_id: None,
5774                sequence: 1,
5775                id: info.connection_id,
5776                reset_token: info.stateless_reset_token,
5777                retire_prior_to: 0,
5778            })
5779            .expect(
5780                "preferred address CID is the first received, and hence is guaranteed to be legal",
5781            );
5782            let remote = self.path_data(PathId::ZERO).remote;
5783            self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5784        }
5785        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
5786
5787        let mut multipath_enabled = None;
5788        if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5789            self.config.get_initial_max_path_id(),
5790            params.initial_max_path_id,
5791        ) {
5792            // multipath is enabled, register the local and remote maximums
5793            self.local_max_path_id = local_max_path_id;
5794            self.remote_max_path_id = remote_max_path_id;
5795            let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5796            debug!(%initial_max_path_id, "multipath negotiated");
5797            multipath_enabled = Some(initial_max_path_id);
5798        }
5799
5800        if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5801            self.config
5802                .max_remote_nat_traversal_addresses
5803                .zip(params.max_remote_nat_traversal_addresses)
5804        {
5805            if let Some(max_initial_paths) =
5806                multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5807            {
5808                let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5809                let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5810                self.iroh_hp =
5811                    iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5812                debug!(
5813                    %max_remote_addresses, %max_local_addresses,
5814                    "iroh hole punching negotiated"
5815                );
5816
5817                match self.side() {
5818                    Side::Client => {
5819                        if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5820                            // in this case the client might try to open `max_remote_addresses` new
5821                            // paths, but the current multipath configuration will not allow it
5822                            warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5823                        } else if max_local_addresses as u64
5824                            > params.active_connection_id_limit.into_inner()
5825                        {
5826                            // the server allows us to send at most `params.active_connection_id_limit`
5827                            // but they might need at least `max_local_addresses` to effectively send
5828                            // `PATH_CHALLENGE` frames to each advertised local address
5829                            warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5830                        }
5831                    }
5832                    Side::Server => {
5833                        if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5834                            warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5835                        }
5836                    }
5837                }
5838            } else {
5839                debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5840            }
5841        }
5842
5843        self.peer_params = params;
5844        let peer_max_udp_payload_size =
5845            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5846        self.path_data_mut(PathId::ZERO)
5847            .mtud
5848            .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5849    }
5850
5851    /// Decrypts a packet, returning the packet number on success
5852    fn decrypt_packet(
5853        &mut self,
5854        now: Instant,
5855        path_id: PathId,
5856        packet: &mut Packet,
5857    ) -> Result<Option<u64>, Option<TransportError>> {
5858        let result = packet_crypto::decrypt_packet_body(
5859            packet,
5860            path_id,
5861            &self.spaces,
5862            self.zero_rtt_crypto.as_ref(),
5863            self.key_phase,
5864            self.prev_crypto.as_ref(),
5865            self.next_crypto.as_ref(),
5866        )?;
5867
5868        let result = match result {
5869            Some(r) => r,
5870            None => return Ok(None),
5871        };
5872
5873        if result.outgoing_key_update_acked {
5874            if let Some(prev) = self.prev_crypto.as_mut() {
5875                prev.end_packet = Some((result.number, now));
5876                self.set_key_discard_timer(now, packet.header.space());
5877            }
5878        }
5879
5880        if result.incoming_key_update {
5881            trace!("key update authenticated");
5882            self.update_keys(Some((result.number, now)), true);
5883            self.set_key_discard_timer(now, packet.header.space());
5884        }
5885
5886        Ok(Some(result.number))
5887    }
5888
5889    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5890        trace!("executing key update");
5891        // Generate keys for the key phase after the one we're switching to, store them in
5892        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
5893        // `prev_crypto`.
5894        let new = self
5895            .crypto
5896            .next_1rtt_keys()
5897            .expect("only called for `Data` packets");
5898        self.key_phase_size = new
5899            .local
5900            .confidentiality_limit()
5901            .saturating_sub(KEY_UPDATE_MARGIN);
5902        let old = mem::replace(
5903            &mut self.spaces[SpaceId::Data]
5904                .crypto
5905                .as_mut()
5906                .unwrap() // safe because update_keys() can only be triggered by short packets
5907                .packet,
5908            mem::replace(self.next_crypto.as_mut().unwrap(), new),
5909        );
5910        self.spaces[SpaceId::Data]
5911            .iter_paths_mut()
5912            .for_each(|s| s.sent_with_keys = 0);
5913        self.prev_crypto = Some(PrevCrypto {
5914            crypto: old,
5915            end_packet,
5916            update_unacked: remote,
5917        });
5918        self.key_phase = !self.key_phase;
5919    }
5920
5921    fn peer_supports_ack_frequency(&self) -> bool {
5922        self.peer_params.min_ack_delay.is_some()
5923    }
5924
5925    /// Send an IMMEDIATE_ACK frame to the remote endpoint
5926    ///
5927    /// According to the spec, this will result in an error if the remote endpoint does not support
5928    /// the Acknowledgement Frequency extension
5929    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5930        debug_assert_eq!(
5931            self.highest_space,
5932            SpaceId::Data,
5933            "immediate ack must be written in the data space"
5934        );
5935        self.spaces[self.highest_space]
5936            .for_path(path_id)
5937            .immediate_ack_pending = true;
5938    }
5939
5940    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
5941    #[cfg(test)]
5942    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5943        let (path_id, first_decode, remaining) = match &event.0 {
5944            ConnectionEventInner::Datagram(DatagramConnectionEvent {
5945                path_id,
5946                first_decode,
5947                remaining,
5948                ..
5949            }) => (path_id, first_decode, remaining),
5950            _ => return None,
5951        };
5952
5953        if remaining.is_some() {
5954            panic!("Packets should never be coalesced in tests");
5955        }
5956
5957        let decrypted_header = packet_crypto::unprotect_header(
5958            first_decode.clone(),
5959            &self.spaces,
5960            self.zero_rtt_crypto.as_ref(),
5961            self.peer_params.stateless_reset_token,
5962        )?;
5963
5964        let mut packet = decrypted_header.packet?;
5965        packet_crypto::decrypt_packet_body(
5966            &mut packet,
5967            *path_id,
5968            &self.spaces,
5969            self.zero_rtt_crypto.as_ref(),
5970            self.key_phase,
5971            self.prev_crypto.as_ref(),
5972            self.next_crypto.as_ref(),
5973        )
5974        .ok()?;
5975
5976        Some(packet.payload.to_vec())
5977    }
5978
5979    /// The number of bytes of packets containing retransmittable frames that have not been
5980    /// acknowledged or declared lost.
5981    #[cfg(test)]
5982    pub(crate) fn bytes_in_flight(&self) -> u64 {
5983        // TODO(@divma): consider including for multipath?
5984        self.path_data(PathId::ZERO).in_flight.bytes
5985    }
5986
5987    /// Number of bytes worth of non-ack-only packets that may be sent
5988    #[cfg(test)]
5989    pub(crate) fn congestion_window(&self) -> u64 {
5990        let path = self.path_data(PathId::ZERO);
5991        path.congestion
5992            .window()
5993            .saturating_sub(path.in_flight.bytes)
5994    }
5995
5996    /// Whether no timers but keepalive, idle, rtt, pushnewcid, and key discard are running
5997    #[cfg(test)]
5998    pub(crate) fn is_idle(&self) -> bool {
5999        let current_timers = self.timers.values();
6000        current_timers
6001            .into_iter()
6002            .filter(|(timer, _)| {
6003                !matches!(
6004                    timer,
6005                    Timer::Conn(ConnTimer::KeepAlive)
6006                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
6007                        | Timer::Conn(ConnTimer::PushNewCid)
6008                        | Timer::Conn(ConnTimer::KeyDiscard)
6009                )
6010            })
6011            .min_by_key(|(_, time)| *time)
6012            .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6013    }
6014
6015    /// Whether explicit congestion notification is in use on outgoing packets.
6016    #[cfg(test)]
6017    pub(crate) fn using_ecn(&self) -> bool {
6018        self.path_data(PathId::ZERO).sending_ecn
6019    }
6020
6021    /// The number of received bytes in the current path
6022    #[cfg(test)]
6023    pub(crate) fn total_recvd(&self) -> u64 {
6024        self.path_data(PathId::ZERO).total_recvd
6025    }
6026
6027    #[cfg(test)]
6028    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6029        self.local_cid_state
6030            .get(&PathId::ZERO)
6031            .unwrap()
6032            .active_seq()
6033    }
6034
6035    #[cfg(test)]
6036    #[track_caller]
6037    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6038        self.local_cid_state
6039            .get(&PathId(path_id))
6040            .unwrap()
6041            .active_seq()
6042    }
6043
6044    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
6045    /// with updated `retire_prior_to` field set to `v`
6046    #[cfg(test)]
6047    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6048        let n = self
6049            .local_cid_state
6050            .get_mut(&PathId::ZERO)
6051            .unwrap()
6052            .assign_retire_seq(v);
6053        self.endpoint_events
6054            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6055    }
6056
6057    /// Check the current active remote CID sequence for `PathId::ZERO`
6058    #[cfg(test)]
6059    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
6060        self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
6061    }
6062
6063    /// Returns the detected maximum udp payload size for the current path
6064    #[cfg(test)]
6065    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6066        self.path_data(path_id).current_mtu()
6067    }
6068
6069    /// Triggers path validation on all paths
6070    #[cfg(test)]
6071    pub(crate) fn trigger_path_validation(&mut self) {
6072        for path in self.paths.values_mut() {
6073            path.data.send_new_challenge = true;
6074        }
6075    }
6076
6077    /// Whether we have 1-RTT data to send
6078    ///
6079    /// This checks for frames that can only be sent in the data space (1-RTT):
6080    /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated.
6081    /// - Pending PATH_RESPONSE frames.
6082    /// - Pending data to send in STREAM frames.
6083    /// - Pending DATAGRAM frames to send.
6084    ///
6085    /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that
6086    /// may need to be sent.
6087    fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6088        let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6089            path.data.send_new_challenge
6090                || path
6091                    .prev
6092                    .as_ref()
6093                    .is_some_and(|(_, path)| path.send_new_challenge)
6094                || !path.data.path_responses.is_empty()
6095        });
6096        let other = self.streams.can_send_stream_data()
6097            || self
6098                .datagrams
6099                .outgoing
6100                .front()
6101                .is_some_and(|x| x.size(true) <= max_size);
6102        SendableFrames {
6103            acks: false,
6104            other,
6105            close: false,
6106            path_exclusive,
6107        }
6108    }
6109
6110    /// Terminate the connection instantly, without sending a close packet
6111    fn kill(&mut self, reason: ConnectionError) {
6112        self.close_common();
6113        self.state.move_to_drained(Some(reason));
6114        self.endpoint_events.push_back(EndpointEventInner::Drained);
6115    }
6116
6117    /// Storage size required for the largest packet that can be transmitted on all currently
6118    /// available paths
6119    ///
6120    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
6121    ///
6122    /// When multipath is enabled, this value is the minimum MTU across all available paths.
6123    pub fn current_mtu(&self) -> u16 {
6124        self.paths
6125            .iter()
6126            .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6127            .map(|(_path_id, path_state)| path_state.data.current_mtu())
6128            .min()
6129            .expect("There is always at least one available path")
6130    }
6131
6132    /// Size of non-frame data for a 1-RTT packet
6133    ///
6134    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
6135    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
6136    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
6137    /// latency and packet loss.
6138    fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6139        let pn_len = PacketNumber::new(
6140            pn,
6141            self.spaces[SpaceId::Data]
6142                .for_path(path)
6143                .largest_acked_packet
6144                .unwrap_or(0),
6145        )
6146        .len();
6147
6148        // 1 byte for flags
6149        1 + self
6150            .rem_cids
6151            .get(&path)
6152            .map(|cids| cids.active().len())
6153            .unwrap_or(20)      // Max CID len in QUIC v1
6154            + pn_len
6155            + self.tag_len_1rtt()
6156    }
6157
6158    fn predict_1rtt_overhead_no_pn(&self) -> usize {
6159        let pn_len = 4;
6160
6161        let cid_len = self
6162            .rem_cids
6163            .values()
6164            .map(|cids| cids.active().len())
6165            .max()
6166            .unwrap_or(20); // Max CID len in QUIC v1
6167
6168        // 1 byte for flags
6169        1 + cid_len + pn_len + self.tag_len_1rtt()
6170    }
6171
6172    fn tag_len_1rtt(&self) -> usize {
6173        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6174            Some(crypto) => Some(&*crypto.packet.local),
6175            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6176        };
6177        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
6178        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
6179        // but that would needlessly prevent sending datagrams during 0-RTT.
6180        key.map_or(16, |x| x.tag_len())
6181    }
6182
6183    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
6184    fn on_path_validated(&mut self, path_id: PathId) {
6185        self.path_data_mut(path_id).validated = true;
6186        let ConnectionSide::Server { server_config } = &self.side else {
6187            return;
6188        };
6189        let remote_addr = self.path_data(path_id).remote;
6190        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6191        new_tokens.clear();
6192        for _ in 0..server_config.validation_token.sent {
6193            new_tokens.push(remote_addr);
6194        }
6195    }
6196
6197    /// Handle new path status information: PATH_STATUS_AVAILABLE, PATH_STATUS_BACKUP
6198    fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6199        if let Some(path) = self.paths.get_mut(&path_id) {
6200            path.data.status.remote_update(status, status_seq_no);
6201        } else {
6202            debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6203        }
6204        self.events.push_back(
6205            PathEvent::RemoteStatus {
6206                id: path_id,
6207                status,
6208            }
6209            .into(),
6210        );
6211    }
6212
6213    /// Returns the maximum [`PathId`] to be used for sending in this connection.
6214    ///
6215    /// This is calculated as minimum between the local and remote's maximums when multipath is
6216    /// enabled, or `None` when disabled.
6217    ///
6218    /// For data that's received, we should use [`Self::local_max_path_id`] instead.
6219    /// The reasoning is that the remote might already have updated to its own newer
6220    /// [`Self::max_path_id`] after sending out a `MAX_PATH_ID` frame, but it got re-ordered.
6221    fn max_path_id(&self) -> Option<PathId> {
6222        if self.is_multipath_negotiated() {
6223            Some(self.remote_max_path_id.min(self.local_max_path_id))
6224        } else {
6225            None
6226        }
6227    }
6228
6229    /// Add addresses the local endpoint considers are reachable for nat traversal
6230    pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6231        if let Some(added) = self.iroh_hp.add_local_address(address)? {
6232            self.spaces[SpaceId::Data].pending.add_address.insert(added);
6233        };
6234        Ok(())
6235    }
6236
6237    /// Removes an address the endpoing no longer considers reachable for nat traversal
6238    ///
6239    /// Addresses not present in the set will be silently ignored.
6240    pub fn remove_nat_traversal_address(
6241        &mut self,
6242        address: SocketAddr,
6243    ) -> Result<(), iroh_hp::Error> {
6244        if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6245            self.spaces[SpaceId::Data]
6246                .pending
6247                .remove_address
6248                .insert(removed);
6249        }
6250        Ok(())
6251    }
6252
6253    /// Get the current local nat traversal addresses
6254    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6255        self.iroh_hp.get_local_nat_traversal_addresses()
6256    }
6257
6258    /// Get the currently advertised nat traversal addresses by the server
6259    pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6260        Ok(self
6261            .iroh_hp
6262            .client_side()?
6263            .get_remote_nat_traversal_addresses())
6264    }
6265
6266    /// Initiates a new nat traversal round
6267    ///
6268    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
6269    /// frames, and initiating probing of the known remote addresses. When a new round is
6270    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
6271    ///
6272    /// Returns the server addresses that are now being probed.
6273    pub fn initiate_nat_traversal_round(
6274        &mut self,
6275        now: Instant,
6276    ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6277        let client_state = self.iroh_hp.client_side_mut()?;
6278        let iroh_hp::NatTraversalRound {
6279            new_round,
6280            reach_out_at,
6281            addresses_to_probe,
6282            prev_round_path_ids,
6283        } = client_state.initiate_nat_traversal_round()?;
6284
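        // Queue this round's REACH_OUT frames, advertising our local addresses to the server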
6285        self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6286
6287        for path_id in prev_round_path_ids {
            // TODO(@divma): this sounds reasonable but we need to verify that it actually
            // works for the purposes of the protocol
6290            let validated = self
6291                .path(path_id)
6292                .map(|path| path.validated)
6293                .unwrap_or(false);
6294
6295            if !validated {
6296                let _ = self.close_path(
6297                    now,
6298                    path_id,
6299                    TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6300                );
6301            }
6302        }
6303
6304        let mut err = None;
6305
6306        let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6307        let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
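        // Whether our socket handles IPv6, inferred from the remote addresses of existing paths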
6308        let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6309
6310        for (ip, port) in addresses_to_probe {
6311            // If this endpoint is an IPv6 endpoint we use IPv6 addresses for all remotes.
6312            let remote = match ip {
6313                IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6314                IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6315                IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6316                IpAddr::V6(_) => {
6317                    trace!("not using IPv6 nat candidate for IPv4 socket");
6318                    continue;
6319                }
6320            };
6321            match self.open_path_ensure(remote, PathStatus::Backup, now) {
6322                Ok((path_id, path_was_known)) if !path_was_known => {
6323                    path_ids.push(path_id);
6324                    probed_addresses.push(remote);
6325                }
6326                Ok((path_id, _)) => {
                    trace!(%path_id, %remote, "nat traversal: path existed for remote")
6328                }
6329                Err(e) => {
                    debug!(%remote, %e, "nat traversal: failed to probe remote");
6331                    err.get_or_insert(e);
6332                }
6333            }
6334        }
6335
        if let Some(err) = err {
            if probed_addresses.is_empty() {
                // We failed to probe any address at all, bail out
                return Err(iroh_hp::Error::Multipath(err));
            }
        }
6342
6343        self.iroh_hp
6344            .client_side_mut()
6345            .expect("connection side validated")
6346            .set_round_path_ids(path_ids);
6347
6348        Ok(probed_addresses)
6349    }
6350}
6351
6352impl fmt::Debug for Connection {
6353    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6354        f.debug_struct("Connection")
6355            .field("handshake_cid", &self.handshake_cid)
6356            .finish()
6357    }
6358}
6359
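/// Whether, and why, sending on a path is currently blocked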
6360#[derive(Debug, Copy, Clone, PartialEq, Eq)]
6361enum PathBlocked {
6362    No,
6363    AntiAmplification,
6364    Congestion,
6365    Pacing,
6366}
6367
6368/// Fields of `Connection` specific to it being client-side or server-side
6369enum ConnectionSide {
6370    Client {
6371        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
6372        token: Bytes,
6373        token_store: Arc<dyn TokenStore>,
6374        server_name: String,
6375    },
6376    Server {
6377        server_config: Arc<ServerConfig>,
6378    },
6379}
6380
6381impl ConnectionSide {
6382    fn remote_may_migrate(&self, state: &State) -> bool {
6383        match self {
6384            Self::Server { server_config } => server_config.migration,
6385            Self::Client { .. } => {
6386                if let Some(hs) = state.as_handshake() {
6387                    hs.allow_server_migration
6388                } else {
6389                    false
6390                }
6391            }
6392        }
6393    }
6394
6395    fn is_client(&self) -> bool {
6396        self.side().is_client()
6397    }
6398
6399    fn is_server(&self) -> bool {
6400        self.side().is_server()
6401    }
6402
6403    fn side(&self) -> Side {
6404        match *self {
6405            Self::Client { .. } => Side::Client,
6406            Self::Server { .. } => Side::Server,
6407        }
6408    }
6409}
6410
6411impl From<SideArgs> for ConnectionSide {
6412    fn from(side: SideArgs) -> Self {
6413        match side {
6414            SideArgs::Client {
6415                token_store,
6416                server_name,
6417            } => Self::Client {
6418                token: token_store.take(&server_name).unwrap_or_default(),
6419                token_store,
6420                server_name,
6421            },
6422            SideArgs::Server {
6423                server_config,
6424                pref_addr_cid: _,
6425                path_validated: _,
6426            } => Self::Server { server_config },
6427        }
6428    }
6429}
6430
6431/// Parameters to `Connection::new` specific to it being client-side or server-side
6432pub(crate) enum SideArgs {
6433    Client {
6434        token_store: Arc<dyn TokenStore>,
6435        server_name: String,
6436    },
6437    Server {
6438        server_config: Arc<ServerConfig>,
6439        pref_addr_cid: Option<ConnectionId>,
6440        path_validated: bool,
6441    },
6442}
6443
6444impl SideArgs {
6445    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6446        match *self {
6447            Self::Client { .. } => None,
6448            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6449        }
6450    }
6451
6452    pub(crate) fn path_validated(&self) -> bool {
6453        match *self {
6454            Self::Client { .. } => true,
6455            Self::Server { path_validated, .. } => path_validated,
6456        }
6457    }
6458
6459    pub(crate) fn side(&self) -> Side {
6460        match *self {
6461            Self::Client { .. } => Side::Client,
6462            Self::Server { .. } => Side::Server,
6463        }
6464    }
6465}
6466
6467/// Reasons why a connection might be lost
6468#[derive(Debug, Error, Clone, PartialEq, Eq)]
6469pub enum ConnectionError {
6470    /// The peer doesn't implement any supported version
6471    #[error("peer doesn't implement any supported version")]
6472    VersionMismatch,
6473    /// The peer violated the QUIC specification as understood by this implementation
6474    #[error(transparent)]
6475    TransportError(#[from] TransportError),
6476    /// The peer's QUIC stack aborted the connection automatically
6477    #[error("aborted by peer: {0}")]
6478    ConnectionClosed(frame::ConnectionClose),
6479    /// The peer closed the connection
6480    #[error("closed by peer: {0}")]
6481    ApplicationClosed(frame::ApplicationClose),
6482    /// The peer is unable to continue processing this connection, usually due to having restarted
6483    #[error("reset by peer")]
6484    Reset,
6485    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
6486    ///
6487    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
6488    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
6489    /// and [`TransportConfig::keep_alive_interval()`].
6490    #[error("timed out")]
6491    TimedOut,
6492    /// The local application closed the connection
6493    #[error("closed")]
6494    LocallyClosed,
6495    /// The connection could not be created because not enough of the CID space is available
6496    ///
6497    /// Try using longer connection IDs.
6498    #[error("CIDs exhausted")]
6499    CidsExhausted,
6500}
6501
6502impl From<Close> for ConnectionError {
6503    fn from(x: Close) -> Self {
6504        match x {
6505            Close::Connection(reason) => Self::ConnectionClosed(reason),
6506            Close::Application(reason) => Self::ApplicationClosed(reason),
6507        }
6508    }
6509}
6510
6511// For compatibility with API consumers
6512impl From<ConnectionError> for io::Error {
6513    fn from(x: ConnectionError) -> Self {
6514        use ConnectionError::*;
6515        let kind = match x {
6516            TimedOut => io::ErrorKind::TimedOut,
6517            Reset => io::ErrorKind::ConnectionReset,
6518            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6519            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6520                io::ErrorKind::Other
6521            }
6522        };
6523        Self::new(kind, x)
6524    }
6525}
6526
6527/// Errors that might trigger a path being closed
6528// TODO(@divma): maybe needs to be reworked based on what we want to do with the public API
6529#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6530pub enum PathError {
6531    /// The extension was not negotiated with the peer
6532    #[error("multipath extension not negotiated")]
6533    MultipathNotNegotiated,
6534    /// Paths can only be opened client-side
6535    #[error("the server side may not open a path")]
6536    ServerSideNotAllowed,
6537    /// Current limits do not allow us to open more paths
6538    #[error("maximum number of concurrent paths reached")]
6539    MaxPathIdReached,
6540    /// No remote CIDs available to open a new path
6541    #[error("remoted CIDs exhausted")]
6542    RemoteCidsExhausted,
6543    /// Path could not be validated and will be abandoned
6544    #[error("path validation failed")]
6545    ValidationFailed,
6546    /// The remote address for the path is not supported by the endpoint
6547    #[error("invalid remote address")]
6548    InvalidRemoteAddress(SocketAddr),
6549}
6550
6551/// Errors triggered when abandoning a path
6552#[derive(Debug, Error, Clone, Eq, PartialEq)]
6553pub enum ClosePathError {
6554    /// The path is already closed or was never opened
6555    #[error("closed path")]
6556    ClosedPath,
    /// This is the last path, which cannot be abandoned
6558    #[error("last open path")]
6559    LastOpenPath,
6560}
6561
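/// Error returned when an operation requires the multipath extension, but it was not negotiated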
6562#[derive(Debug, Error, Clone, Copy)]
6563#[error("Multipath extension not negotiated")]
6564pub struct MultipathNotNegotiated {
6565    _private: (),
6566}
6567
6568/// Events of interest to the application
6569#[derive(Debug)]
6570pub enum Event {
6571    /// The connection's handshake data is ready
6572    HandshakeDataReady,
6573    /// The connection was successfully established
6574    Connected,
6575    /// The TLS handshake was confirmed
6576    HandshakeConfirmed,
6577    /// The connection was lost
6578    ///
6579    /// Emitted if the peer closes the connection or an error is encountered.
6580    ConnectionLost {
6581        /// Reason that the connection was closed
6582        reason: ConnectionError,
6583    },
6584    /// Stream events
6585    Stream(StreamEvent),
6586    /// One or more application datagrams have been received
6587    DatagramReceived,
6588    /// One or more application datagrams have been sent after blocking
6589    DatagramsUnblocked,
6590    /// (Multi)Path events
6591    Path(PathEvent),
6592    /// Iroh's nat traversal events
6593    NatTraversal(iroh_hp::Event),
6594}
6595
6596impl From<PathEvent> for Event {
6597    fn from(source: PathEvent) -> Self {
6598        Self::Path(source)
6599    }
6600}
6601
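/// Returns the peer's `max_ack_delay` transport parameter, which is encoded in milliseconds,
/// as a [`Duration`]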
6602fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6603    Duration::from_micros(params.max_ack_delay.0 * 1000)
6604}
6605
6606// Prevents overflow and improves behavior in extreme circumstances
6607const MAX_BACKOFF_EXPONENT: u32 = 16;
6608
6609/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
6610///
6611/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
6612/// plus some space for frames. We only care about handshake headers because short header packets
6613/// necessarily have smaller headers, and initial packets are only ever the first packet in a
6614/// datagram (because we coalesce in ascending packet space order and the only reason to split a
6615/// packet is when packet space changes).
6616const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6617
6618/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
6619///
6620/// Excludes packet-type-specific fields such as packet number or Initial token
6621// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
6622// scid len + scid + length + pn
6623const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6624    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6625
6626/// Perform key updates this many packets before the AEAD confidentiality limit.
6627///
6628/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
6629const KEY_UPDATE_MARGIN: u64 = 10_000;
6630
6631#[derive(Default)]
6632struct SentFrames {
6633    retransmits: ThinRetransmits,
6634    /// The packet number of the largest acknowledged packet for each path
6635    largest_acked: FxHashMap<PathId, u64>,
6636    stream_frames: StreamMetaVec,
6637    /// Whether the packet contains non-retransmittable frames (like datagrams)
6638    non_retransmits: bool,
6639    /// If the datagram containing these frames should be padded to the min MTU
6640    requires_padding: bool,
6641}
6642
6643impl SentFrames {
6644    /// Returns whether the packet contains only ACKs
6645    fn is_ack_only(&self, streams: &StreamsState) -> bool {
6646        !self.largest_acked.is_empty()
6647            && !self.non_retransmits
6648            && self.stream_frames.is_empty()
6649            && self.retransmits.is_empty(streams)
6650    }
6651}
6652
6653/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
6654///
/// Per the definition of `max_idle_timeout`, a value of `0` means the timeout is disabled; see
/// <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1>.
///
/// Per the negotiation procedure, the effective value is the minimum of the two advertised
/// timeouts, ignoring any endpoint that disabled its timeout; see
/// <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2>.
6658///
6659/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
6660fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6661    match (x, y) {
6662        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6663        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6664        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6665        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6666    }
6667}
6668
6669#[cfg(test)]
6670mod tests {
6671    use super::*;
6672
6673    #[test]
6674    fn negotiate_max_idle_timeout_commutative() {
6675        let test_params = [
6676            (None, None, None),
6677            (None, Some(VarInt(0)), None),
6678            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
6679            (Some(VarInt(0)), Some(VarInt(0)), None),
6680            (
6681                Some(VarInt(2)),
6682                Some(VarInt(0)),
6683                Some(Duration::from_millis(2)),
6684            ),
6685            (
6686                Some(VarInt(1)),
6687                Some(VarInt(4)),
6688                Some(Duration::from_millis(1)),
6689            ),
6690        ];
6691
6692        for (left, right, result) in test_params {
6693            assert_eq!(negotiate_max_idle_timeout(left, right), result);
6694            assert_eq!(negotiate_max_idle_timeout(right, left), result);
6695        }
6696    }
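
    /// A few spot-checks of the `ConnectionError` -> `io::Error` kind mapping above
    #[test]
    fn connection_error_to_io_error_kind() {
        let err: io::Error = ConnectionError::TimedOut.into();
        assert_eq!(err.kind(), io::ErrorKind::TimedOut);
        let err: io::Error = ConnectionError::Reset.into();
        assert_eq!(err.kind(), io::ErrorKind::ConnectionReset);
        let err: io::Error = ConnectionError::LocallyClosed.into();
        assert_eq!(err.kind(), io::ErrorKind::Other);
    }

    /// Sanity-check of the arithmetic behind `MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE`:
    /// flags (1) + version (4) + DCID length (1) + DCID + SCID length (1) + SCID
    /// + length varint (4 bytes, since `u16::MAX` is at least 2^14 and so needs a
    /// four-byte varint) + packet number (4)
    #[test]
    fn max_handshake_or_0rtt_header_size_breakdown() {
        assert_eq!(VarInt::from_u32(u16::MAX as u32).size(), 4);
        let expected = 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + 4 + 4;
        assert_eq!(MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE, expected);
    }

    /// `max_ack_delay` is carried in milliseconds, so `get_max_ack_delay` must scale it
    /// accordingly (sketch; assumes `TransportParameters` implements `Default` with the
    /// spec defaults, as in upstream quinn-proto)
    #[test]
    fn max_ack_delay_is_in_milliseconds() {
        let params = TransportParameters {
            max_ack_delay: VarInt(40),
            ..TransportParameters::default()
        };
        assert_eq!(get_max_ack_delay(&params), Duration::from_millis(40));
    }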
6697}