iroh_quinn_proto/connection/mod.rs

use std::{
    cmp,
    collections::{BTreeMap, VecDeque, btree_map},
    convert::TryFrom,
    fmt, io, mem,
    net::{IpAddr, SocketAddr},
    num::{NonZeroU32, NonZeroUsize},
    ops::Not,
    sync::Arc,
};

use bytes::{BufMut, Bytes, BytesMut};
use frame::StreamMetaVec;

use rand::{Rng, SeedableRng, rngs::StdRng};
use rustc_hash::{FxHashMap, FxHashSet};
use thiserror::Error;
use tracing::{debug, error, trace, trace_span, warn};

use crate::{
    Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
    MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
    TransportErrorCode, VarInt,
    cid_generator::ConnectionIdGenerator,
    cid_queue::CidQueue,
    coding::BufMutExt,
    config::{ServerConfig, TransportConfig},
    congestion::Controller,
    connection::{
        qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
        spaces::LostPacket,
        timer::{ConnTimer, PathTimer},
    },
    crypto::{self, KeyPair, Keys, PacketKey},
    frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
    iroh_hp,
    packet::{
        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
        PacketNumber, PartialDecode, SpaceId,
    },
    range_set::ArrayRangeSet,
    shared::{
        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
        EndpointEvent, EndpointEventInner,
    },
    token::{ResetToken, Token, TokenPayload},
    transport_parameters::TransportParameters,
};

mod ack_frequency;
use ack_frequency::AckFrequencyState;

mod assembler;
pub use assembler::Chunk;

mod cid_state;
use cid_state::CidState;

mod datagrams;
use datagrams::DatagramState;
pub use datagrams::{Datagrams, SendDatagramError};

mod mtud;
mod pacing;

mod packet_builder;
use packet_builder::{PacketBuilder, PadDatagram};

mod packet_crypto;
use packet_crypto::{PrevCrypto, ZeroRttCrypto};

mod paths;
pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
use paths::{PathData, PathState};

pub(crate) mod qlog;

mod send_buffer;

mod spaces;
#[cfg(fuzzing)]
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
use spaces::Retransmits;
use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};

mod stats;
pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};

mod streams;
#[cfg(fuzzing)]
pub use streams::StreamsState;
#[cfg(not(fuzzing))]
use streams::StreamsState;
pub use streams::{
    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
};

mod timer;
use timer::{Timer, TimerTable};

mod transmit_buf;
use transmit_buf::TransmitBuf;

mod state;

#[cfg(not(fuzzing))]
use state::State;
#[cfg(fuzzing)]
pub use state::State;
use state::StateType;

/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below; however, by the design of this library none of
///   these functions actually perform system-level I/O. Examples are [`read`](RecvStream::read)
///   and [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible, call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing them to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is of (2) on (1), however additional
/// dependencies may be added in the future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
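///
/// As a rough, hedged sketch of such a driving loop (`conn`, `now`, `max_datagrams` and the
/// `send_udp`, `deliver_to_endpoint` and `handle_app_event` helpers are assumptions supplied
/// by the caller, not part of this API):
///
/// ```ignore
/// let mut buf = Vec::new();
/// loop {
///     // (D1) Flush outgoing packets first.
///     while let Some(transmit) = conn.poll_transmit(now, max_datagrams, &mut buf) {
///         send_udp(&transmit, &buf[..transmit.size]);
///         buf.clear();
///     }
///     // (D2) Re-arm the timer; transmits may have changed the deadline.
///     let deadline = conn.poll_timeout();
///     // (D3) Forward events destined for the endpoint.
///     while let Some(event) = conn.poll_endpoint_events() {
///         deliver_to_endpoint(event);
///     }
///     // (D4) Surface application-facing events.
///     while let Some(event) = conn.poll() {
///         handle_app_event(event);
///     }
///     // Wait for `deadline` or network input, then feed it back in via
///     // handle_timeout / handle_event (B) and repeat.
/// }
/// ```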
pub struct Connection {
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    rng: StdRng,
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The "real" local IP address which was used to receive the initial packet.
    /// This is only populated for the server case, and only if known
    local_ip: Option<IpAddr>,
    /// The [`PathData`] for each path
    ///
    /// This needs to be ordered because [`Connection::poll_transmit`] needs to
    /// deterministically select the next PathId to send on.
    // TODO(flub): well does it really? But deterministic is nice for now.
    paths: BTreeMap<PathId, PathState>,
    /// Incremented every time we see a new path
    ///
    /// Stored separately from `path.generation` to account for aborted migrations
    path_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    state: State,
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Events returned by [`Connection::poll`]
    events: VecDeque<Event>,
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable [`SpaceId`]
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// Whether the CONNECTION_CLOSE frame needs to be sent
    close: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    ///
    /// These are handed out before multiple paths exist, including for paths that will
    /// never exist.  So if multipath is supported, the number of entries here will be
    /// higher than the actual number of paths in use.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Attributes of CIDs generated by the local endpoint
    ///
    /// Any path that is allowed to be opened is present in this map, as well as the already
    /// opened paths. However, since CIDs are issued asynchronously by the endpoint driver via
    /// connection events, this map cannot be used to know whether CIDs have been issued for a
    /// path or not. See [`Connection::max_path_id_with_cids`] for this.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// Path level statistics
    path_stats: FxHashMap<PathId, PathStats>,
    /// QUIC version used for the connection.
    version: u32,

    //
    // Multipath
    //
    /// Maximum number of concurrent paths
    ///
    /// Initially set from [`TransportConfig::max_concurrent_multipath_paths`]. Even when
    /// multipath is disabled this is set to 1, though it is not used in that case.
    max_concurrent_paths: NonZeroU32,
    /// Local maximum [`PathId`] to be used
    ///
    /// This is initially set to [`TransportConfig::get_initial_max_path_id`] when multipath
    /// is negotiated, or to [`PathId::ZERO`] otherwise. This is essentially the value of
    /// the highest MAX_PATH_ID frame sent.
    ///
    /// Any path with an ID equal to or below this [`PathId`] is either:
    ///
    /// - Abandoned, if it is also in [`Connection::abandoned_paths`].
    /// - Open, in which case it is present in [`Connection::paths`].
    /// - Not yet opened, if it is in neither of these two places.
    ///
    /// Note that for not-yet-opened paths there may or may not be any CIDs issued. See
    /// [`Connection::max_path_id_with_cids`].
    local_max_path_id: PathId,
    /// Remote's maximum [`PathId`] to be used
    ///
    /// This is initially set to the peer's [`TransportParameters::initial_max_path_id`] when
    /// multipath is negotiated, or to [`PathId::ZERO`] otherwise. A peer may increase this limit
    /// by sending [`Frame::MaxPathId`] frames.
    remote_max_path_id: PathId,
    /// The greatest [`PathId`] we have issued CIDs for
    ///
    /// CIDs are only issued for `min(local_max_path_id, remote_max_path_id)`. It is not
    /// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued
    /// since they are issued asynchronously by the endpoint driver.
    max_path_id_with_cids: PathId,
    /// The paths already abandoned
    ///
    /// They may still have some state left in [`Connection::paths`] or
    /// [`Connection::local_cid_state`] since some of this has to be kept around for some
    /// time after a path is abandoned.
    // TODO(flub): Make this a more efficient data structure.  Like ranges of abandoned
    //    paths.  Or a set together with a minimum.  Or something.
    abandoned_paths: FxHashSet<PathId>,

    iroh_hp: iroh_hp::State,
    qlog: QlogSink,
}

impl Connection {
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = {
            let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
            space.crypto = Some(crypto.initial_keys(init_cid, side));
            space
        };
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
        // TODO(@divma): consider if we want to delay this until the path is validated
        path.open = true;
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_counter: 0,
            allow_mtud,
            local_ip,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            close: false,

            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            // peer params are not yet known, so multipath is not enabled
            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            // iroh's nat traversal
            iroh_hp: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
        this
    }

    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.peek()
    }

    /// Returns application-facing events
    ///
    /// Connections should be polled for events after:
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll(&mut self) -> Option<Event> {
        if let Some(x) = self.events.pop_front() {
            return Some(x);
        }

        if let Some(event) = self.streams.poll() {
            return Some(Event::Stream(event));
        }

        if let Some(reason) = self.state.take_error() {
            return Some(Event::ConnectionLost { reason });
        }

        None
    }

    /// Return endpoint-facing events
    #[must_use]
    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
        self.endpoint_events.pop_front().map(EndpointEvent)
    }

    /// Provide control over streams
    #[must_use]
    pub fn streams(&mut self) -> Streams<'_> {
        Streams {
            state: &mut self.streams,
            conn_state: &self.state,
        }
    }

    /// Provide control over a receive stream
    #[must_use]
    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
        RecvStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
        }
    }

    /// Provide control over a send stream
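    ///
    /// For instance, a hedged sketch of writing to a locally opened stream (`id` as obtained
    /// from [`Streams::open`], error handling elided):
    ///
    /// ```ignore
    /// let bytes_written = conn.send_stream(id).write(b"hello")?;
    /// ```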
    #[must_use]
    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
        SendStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
            conn_state: &self.state,
        }
    }

    /// Opens a new path only if no path to the remote address exists so far
    ///
    /// See [`open_path`]. Returns `(path_id, true)` if a path to the remote address already
    /// existed, and `(path_id, false)` if a new path was opened.
    ///
    /// [`open_path`]: Connection::open_path
    pub fn open_path_ensure(
        &mut self,
        remote: SocketAddr,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<(PathId, bool), PathError> {
        match self
            .paths
            .iter()
            .find(|(_id, path)| path.data.remote == remote)
        {
            Some((path_id, _state)) => Ok((*path_id, true)),
            None => self
                .open_path(remote, initial_status, now)
                .map(|id| (id, false)),
        }
    }

    /// Opens a new path
    ///
    /// Further errors may occur later and will be emitted as [`PathEvent::LocallyClosed`] events.
    /// When the path is opened it is reported as a [`PathEvent::Opened`] event.
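    ///
    /// A hedged usage sketch (`conn`, `remote` and `now` are assumed to be supplied by the
    /// caller):
    ///
    /// ```ignore
    /// match conn.open_path(remote, PathStatus::Available, now) {
    ///     // Success is provisional: watch subsequent poll() calls for
    ///     // PathEvent::Opened or PathEvent::LocallyClosed with this id.
    ///     Ok(path_id) => println!("opening path {path_id}"),
    ///     // Blocked on the peer's path limit; retry after the peer raises
    ///     // it via a MAX_PATH_ID frame.
    ///     Err(PathError::MaxPathIdReached) => {}
    ///     // Multipath not negotiated, server side, no remote CIDs, ...
    ///     Err(err) => println!("cannot open path: {err}"),
    /// }
    /// ```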
    pub fn open_path(
        &mut self,
        remote: SocketAddr,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<PathId, PathError> {
        if !self.is_multipath_negotiated() {
            return Err(PathError::MultipathNotNegotiated);
        }
        if self.side().is_server() {
            return Err(PathError::ServerSideNotAllowed);
        }

        let max_abandoned = self.abandoned_paths.iter().max().copied();
        let max_used = self.paths.keys().last().copied();
        let path_id = max_abandoned
            .max(max_used)
            .unwrap_or(PathId::ZERO)
            .saturating_add(1u8);

        if Some(path_id) > self.max_path_id() {
            return Err(PathError::MaxPathIdReached);
        }
        if path_id > self.remote_max_path_id {
            self.spaces[SpaceId::Data].pending.paths_blocked = true;
            return Err(PathError::MaxPathIdReached);
        }
        if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .push(path_id);
            return Err(PathError::RemoteCidsExhausted);
        }

        let path = self.ensure_path(path_id, remote, now, None);
        path.status.local_update(initial_status);

        Ok(path_id)
    }

    /// Closes a path by sending a PATH_ABANDON frame
    ///
    /// This will not allow closing the last path. It does allow closing paths which have
    /// not yet been opened, as is e.g. the case when receiving a PATH_ABANDON from the peer
    /// for a path that was never opened locally.
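    ///
    /// A minimal sketch, assuming the application picked `42` as its error code:
    ///
    /// ```ignore
    /// // Queues a PATH_ABANDON frame; the path's remaining state is dropped
    /// // after 6 * PTO.
    /// conn.close_path(now, path_id, VarInt::from_u32(42))?;
    /// ```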
    pub fn close_path(
        &mut self,
        now: Instant,
        path_id: PathId,
        error_code: VarInt,
    ) -> Result<(), ClosePathError> {
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }
        if self
            .paths
            .iter()
            // Would there be any remaining, non-abandoned, validated paths?
            .any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            })
            .not()
        {
            return Err(ClosePathError::LastOpenPath);
        }

        // Send PATH_ABANDON
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, error_code.into());

        // Remove pending NEW CIDs for this path
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Cleanup retransmits across ALL paths (CIDs for path_id may have been transmitted on other paths)
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Consider remotely issued CIDs as retired.
        // Technically we don't have to do this just yet.  We only need to do this *after*
        // the PATH_ABANDON frame is sent, allowing us to still send it on the
        // to-be-abandoned path.  However it is recommended to send it on another path, and
        // we do not allow abandoning the last path anyway.
        self.rem_cids.remove(&path_id);
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        let pto = self.pto_max_path(SpaceId::Data, false);

        let path = self.paths.get_mut(&path_id).expect("checked above");

        // We record the time after which receiving data on this path generates a transport error.
        path.data.last_allowed_receive = Some(now + 3 * pto);
        self.abandoned_paths.insert(path_id);

        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));

        // The peer MUST respond with a corresponding PATH_ABANDON frame.
        // If we receive packets on the abandoned path after 3 * PTO we trigger a transport error.
        // In any case, we completely discard the path after 6 * PTO whether we receive a
        // PATH_ABANDON or not.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::DiscardPath),
            now + 6 * pto,
            self.qlog.with_time(now),
        );
        Ok(())
    }

    /// Gets the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the path_id does not reference any known path.
    #[track_caller]
    fn path_data(&self, path_id: PathId) -> &PathData {
        if let Some(data) = self.paths.get(&path_id) {
            &data.data
        } else {
            panic!(
                "unknown path: {path_id}, currently known paths: {:?}",
                self.paths.keys().collect::<Vec<_>>()
            );
        }
    }

    /// Gets a reference to the [`PathData`] for a [`PathId`]
    fn path(&self, path_id: PathId) -> Option<&PathData> {
        self.paths.get(&path_id).map(|path_state| &path_state.data)
    }

    /// Gets a mutable reference to the [`PathData`] for a [`PathId`]
    fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
        self.paths
            .get_mut(&path_id)
            .map(|path_state| &mut path_state.data)
    }

    /// Returns all known paths.
    ///
    /// There is no guarantee any of these paths are open or usable.
    pub fn paths(&self) -> Vec<PathId> {
        self.paths.keys().copied().collect()
    }

    /// Gets the local [`PathStatus`] for a known [`PathId`]
    pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
        self.path(path_id)
            .map(PathData::local_status)
            .ok_or(ClosedPath { _private: () })
    }

    /// Returns the path's remote socket address
    pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
        self.path(path_id)
            .map(|path| path.remote)
            .ok_or(ClosedPath { _private: () })
    }

    /// Sets the [`PathStatus`] for a known [`PathId`]
    ///
    /// Returns the previous path status on success.
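    ///
    /// For example, demoting a path to a standby (a sketch; `path_id` is assumed to refer
    /// to an open path):
    ///
    /// ```ignore
    /// let prev = conn.set_path_status(path_id, PathStatus::Backup)?;
    /// // If the status actually changed, a status frame announcing it to the
    /// // peer is queued for the next poll_transmit.
    /// ```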
    pub fn set_path_status(
        &mut self,
        path_id: PathId,
        status: PathStatus,
    ) -> Result<PathStatus, SetPathStatusError> {
        if !self.is_multipath_negotiated() {
            return Err(SetPathStatusError::MultipathNotNegotiated);
        }
        let path = self
            .path_mut(path_id)
            .ok_or(SetPathStatusError::ClosedPath)?;
        let prev = match path.status.local_update(status) {
            Some(prev) => {
                self.spaces[SpaceId::Data]
                    .pending
                    .path_status
                    .insert(path_id);
                prev
            }
            None => path.local_status(),
        };
        Ok(prev)
    }

    /// Returns the remote path status
    // TODO(flub): Probably should also be some kind of path event?  Not even sure if I like
    //    this as an API, but for now it allows me to write a test easily.
    // TODO(flub): Technically this should be a Result<Option<PathStatus>>?
    pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
        self.path(path_id).and_then(|path| path.remote_status())
    }

    /// Sets the max_idle_timeout for a specific path
    ///
    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_max_idle_timeout(
        &mut self,
        path_id: PathId,
        timeout: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
    }

    /// Sets the keep_alive_interval for a specific path
    ///
    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_keep_alive_interval(
        &mut self,
        path_id: PathId,
        interval: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.keep_alive, interval))
    }

    /// Gets the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the path_id does not reference any known path.
    #[track_caller]
    fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
        &mut self.paths.get_mut(&path_id).expect("known path").data
    }

    fn ensure_path(
        &mut self,
        path_id: PathId,
        remote: SocketAddr,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        // TODO(matheus23): Add back short-circuiting path.validated = true, if we know that the
        // path's four-tuple was already validated.
        debug!(%path_id, ?remote, "path added");
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_counter = self.path_counter.wrapping_add(1);
        let mut data = PathData::new(
            remote,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_counter,
            now,
            &self.config,
        );

        let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathOpen),
            now + 3 * pto,
            self.qlog.with_time(now),
        );

        // For the path to be opened we need to send a packet on it; sending a challenge
        // guarantees this.
        data.send_new_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, remote, now);
        &mut path.data
    }

    /// Returns packets to transmit
    ///
    /// Connections should be polled for transmit after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    ///
    /// `max_datagrams` specifies how many datagrams can be returned inside a
    /// single Transmit using GSO. This must be at least 1.
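    ///
    /// A hedged sketch of draining all pending transmits (the `send_udp` helper is an
    /// assumption):
    ///
    /// ```ignore
    /// let mut buf = Vec::new();
    /// let max_datagrams = NonZeroUsize::new(10).unwrap();
    /// while let Some(transmit) = conn.poll_transmit(now, max_datagrams, &mut buf) {
    ///     // `buf[..transmit.size]` may contain several GSO segments of
    ///     // `transmit.segment_size` bytes each.
    ///     send_udp(transmit.destination, &buf[..transmit.size], transmit.segment_size);
    ///     buf.clear();
    /// }
    /// ```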
887    #[must_use]
888    pub fn poll_transmit(
889        &mut self,
890        now: Instant,
891        max_datagrams: NonZeroUsize,
892        buf: &mut Vec<u8>,
893    ) -> Option<Transmit> {
894        if let Some(probing) = self
895            .iroh_hp
896            .server_side_mut()
897            .ok()
898            .and_then(iroh_hp::ServerState::next_probe)
899        {
900            let destination = probing.remote();
901            trace!(%destination, "RAND_DATA packet");
902            let token: u64 = self.rng.random();
903            buf.put_u64(token);
904            probing.finish(token);
905            return Some(Transmit {
906                destination,
907                ecn: None,
908                size: 8,
909                segment_size: None,
910                src_ip: None,
911            });
912        }
913
914        let max_datagrams = match self.config.enable_segmentation_offload {
915            false => NonZeroUsize::MIN,
916            true => max_datagrams,
917        };
918
919        // Each call to poll_transmit can only send datagrams to one destination, because
920        // all datagrams in a GSO batch are for the same destination.  Therefore only
921        // datagrams for one Path ID are produced for each poll_transmit call.
922
923        // TODO(flub): this is wishful thinking and not actually implemented, but perhaps it
924        //   should be:
925
926        // First, if we have to send a close, select a path for that.
927        // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending.
928
929        // For all, open, validated and AVAILABLE paths:
930        // - Is the path congestion blocked or pacing blocked?
931        // - call maybe_queue_ to ensure a tail-loss probe would be sent?
932        // - do we need to send a close message?
933        // - call can_send
934        // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths
935
936        // Check whether we need to send a close message
937        let close = match self.state.as_type() {
938            StateType::Drained => {
939                self.app_limited = true;
940                return None;
941            }
942            StateType::Draining | StateType::Closed => {
943                // self.close is only reset once the associated packet had been
944                // encoded successfully
945                if !self.close {
946                    self.app_limited = true;
947                    return None;
948                }
949                true
950            }
951            _ => false,
952        };
953
954        // Check whether we need to send an ACK_FREQUENCY frame
955        if let Some(config) = &self.config.ack_frequency_config {
956            let rtt = self
957                .paths
958                .values()
959                .map(|p| p.data.rtt.get())
960                .min()
961                .expect("one path exists");
962            self.spaces[SpaceId::Data].pending.ack_frequency = self
963                .ack_frequency
964                .should_send_ack_frequency(rtt, config, &self.peer_params)
965                && self.highest_space == SpaceId::Data
966                && self.peer_supports_ack_frequency();
967        }
968
969        // Whether this packet can be coalesced with another one in the same datagram.
970        let mut coalesce = true;
971
972        // Whether the last packet in the datagram must be padded so the datagram takes up
973        // to at least MIN_INITIAL_SIZE, or to the maximum segment size if this is smaller.
974        let mut pad_datagram = PadDatagram::No;
975
976        // Whether congestion control stopped the next packet from being sent. Further
977        // packets could still be built, as e.g. tail-loss probes are not congestion
978        // limited.
979        let mut congestion_blocked = false;
980
981        // The packet number of the last built packet.
982        let mut last_packet_number = None;
983
984        let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
985
986        // If there is any open, validated and available path we only want to send frames to
987        // any backup path that must be sent on that backup path exclusively.
988        let have_available_path = self.paths.iter().any(|(id, path)| {
989            path.data.validated
990                && path.data.local_status() == PathStatus::Available
991                && self.rem_cids.contains_key(id)
992        });
993
994        // Setup for the first path_id
995        let mut transmit = TransmitBuf::new(
996            buf,
997            max_datagrams,
998            self.path_data(path_id).current_mtu().into(),
999        );
1000        if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
1001            return Some(challenge);
1002        }
1003        let mut space_id = match path_id {
1004            PathId::ZERO => SpaceId::Initial,
1005            _ => SpaceId::Data,
1006        };
1007
1008        loop {
1009            // check if there is at least one active CID to use for sending
1010            let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1011                let err = PathError::RemoteCidsExhausted;
1012                if !self.abandoned_paths.contains(&path_id) {
1013                    debug!(?err, %path_id, "no active CID for path");
1014                    self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1015                        id: path_id,
1016                        error: err,
1017                    }));
1018                    // Locally we should have refused to open this path, the remote should
1019                    // have given us CIDs for this path before opening it.  So we can always
1020                    // abandon this here.
1021                    self.close_path(
1022                        now,
1023                        path_id,
1024                        TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1025                    )
1026                    .ok();
1027                    self.spaces[SpaceId::Data]
1028                        .pending
1029                        .path_cids_blocked
1030                        .push(path_id);
1031                } else {
1032                    trace!(%path_id, "remote CIDs retired for abandoned path");
1033                }
1034
1035                match self.paths.keys().find(|&&next| next > path_id) {
1036                    Some(next_path_id) => {
1037                        // See if this next path can send anything.
1038                        path_id = *next_path_id;
1039                        space_id = SpaceId::Data;
1040
1041                        // update per path state
1042                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1043                        if let Some(challenge) =
1044                            self.send_prev_path_challenge(now, &mut transmit, path_id)
1045                        {
1046                            return Some(challenge);
1047                        }
1048
1049                        continue;
1050                    }
1051                    None => {
1052                        // Nothing more to send.
1053                        trace!(
1054                            ?space_id,
1055                            %path_id,
1056                            "no CIDs to send on path, no more paths"
1057                        );
1058                        break;
1059                    }
1060                }
1061            };
1062
1063            // Determine if anything can be sent in this packet number space (SpaceId +
1064            // PathId).
1065            let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1066                // We are trying to coalesce another packet into this datagram.
1067                transmit.datagram_remaining_mut()
1068            } else {
1069                // A new datagram needs to be started.
1070                transmit.segment_size()
1071            };
1072            let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1073            let path_should_send = {
1074                let path_exclusive_only = space_id == SpaceId::Data
1075                    && have_available_path
1076                    && self.path_data(path_id).local_status() == PathStatus::Backup;
1077                let path_should_send = if path_exclusive_only {
1078                    can_send.path_exclusive
1079                } else {
1080                    !can_send.is_empty()
1081                };
1082                let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1083                path_should_send || needs_loss_probe
1084            };
1085
1086            if !path_should_send && space_id < SpaceId::Data {
1087                if self.spaces[space_id].crypto.is_some() {
1088                    trace!(?space_id, %path_id, "nothing to send in space");
1089                }
1090                space_id = space_id.next();
1091                continue;
1092            }
1093
1094            let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1095                // Only check congestion control if a new datagram is needed.
1096                self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1097            } else {
1098                PathBlocked::No
1099            };
1100            if send_blocked != PathBlocked::No {
1101                trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1102                congestion_blocked = true;
1103            }
1104            if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1105                // Higher spaces might still have tail-loss probes to send, which are not
1106                // congestion blocked.
1107                space_id = space_id.next();
1108                continue;
1109            }
1110            if !path_should_send || send_blocked != PathBlocked::No {
1111                // Nothing more to send on this path, check the next path if possible.
1112
1113                // If there are any datagrams in the transmit, packets for another path can
1114                // not be built.
1115                if transmit.num_datagrams() > 0 {
1116                    break;
1117                }
1118
1119                match self.paths.keys().find(|&&next| next > path_id) {
1120                    Some(next_path_id) => {
1121                        // See if this next path can send anything.
1122                        trace!(
1123                            ?space_id,
1124                            %path_id,
1125                            %next_path_id,
1126                            "nothing to send on path"
1127                        );
1128                        path_id = *next_path_id;
1129                        space_id = SpaceId::Data;
1130
1131                        // update per path state
1132                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1133                        if let Some(challenge) =
1134                            self.send_prev_path_challenge(now, &mut transmit, path_id)
1135                        {
1136                            return Some(challenge);
1137                        }
1138
1139                        continue;
1140                    }
1141                    None => {
1142                        // Nothing more to send.
1143                        trace!(
1144                            ?space_id,
1145                            %path_id,
1146                            next_path_id=?None::<PathId>,
1147                            "nothing to send on path"
1148                        );
1149                        break;
1150                    }
1151                }
1152            }
1153
1154            // If the datagram is full, we need to start a new one.
1155            if transmit.datagram_remaining_mut() == 0 {
1156                if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1157                    // No more datagrams allowed
1158                    break;
1159                }
1160
1161                match self.spaces[space_id].for_path(path_id).loss_probes {
1162                    0 => transmit.start_new_datagram(),
1163                    _ => {
1164                        // We need something to send for a tail-loss probe.
1165                        let request_immediate_ack =
1166                            space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1167                        self.spaces[space_id].maybe_queue_probe(
1168                            path_id,
1169                            request_immediate_ack,
1170                            &self.streams,
1171                        );
1172
1173                        self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1174
1175                        // Clamp the datagram to at most the minimum MTU to ensure that loss
1176                        // probes can get through and enable recovery even if the path MTU
1177                        // has shrank unexpectedly.
1178                        transmit.start_new_datagram_with_size(std::cmp::min(
1179                            usize::from(INITIAL_MTU),
1180                            transmit.segment_size(),
1181                        ));
1182                    }
1183                }
1184                trace!(count = transmit.num_datagrams(), "new datagram started");
1185                coalesce = true;
1186                pad_datagram = PadDatagram::No;
1187            }
1188
1189            // If coalescing another packet into the existing datagram, there should
1190            // still be enough space for a whole packet.
1191            if transmit.datagram_start_offset() < transmit.len() {
1192                debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1193            }
1194
1195            //
1196            // From here on, we've determined that a packet will definitely be sent.
1197            //
1198
1199            if self.spaces[SpaceId::Initial].crypto.is_some()
1200                && space_id == SpaceId::Handshake
1201                && self.side.is_client()
1202            {
1203                // A client stops both sending and processing Initial packets when it
1204                // sends its first Handshake packet.
1205                self.discard_space(now, SpaceId::Initial);
1206            }
1207            if let Some(ref mut prev) = self.prev_crypto {
1208                prev.update_unacked = false;
1209            }
1210
1211            let mut qlog = QlogSentPacket::default();
1212            let mut builder = PacketBuilder::new(
1213                now,
1214                space_id,
1215                path_id,
1216                remote_cid,
1217                &mut transmit,
1218                can_send.other,
1219                self,
1220                &mut qlog,
1221            )?;
1222            last_packet_number = Some(builder.exact_number);
1223            coalesce = coalesce && !builder.short_header;
1224
1225            if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1226                // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1
1227                pad_datagram |= PadDatagram::ToMinMtu;
1228            }
1229            if space_id == SpaceId::Data && self.config.pad_to_mtu {
1230                pad_datagram |= PadDatagram::ToSegmentSize;
1231            }
1232
1233            if can_send.close {
1234                trace!("sending CONNECTION_CLOSE");
1235                // Encode ACKs before the ConnectionClose message, to give the receiver
1236                // a better approximate on what data has been processed. This is
1237                // especially important with ack delay, since the peer might not
1238                // have gotten any other ACK for the data earlier on.
1239                let mut sent_frames = SentFrames::default();
1240                let is_multipath_negotiated = self.is_multipath_negotiated();
1241                for path_id in self.spaces[space_id]
1242                    .number_spaces
1243                    .iter()
1244                    .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1245                    .map(|(&path_id, _)| path_id)
1246                    .collect::<Vec<_>>()
1247                {
1248                    Self::populate_acks(
1249                        now,
1250                        self.receiving_ecn,
1251                        &mut sent_frames,
1252                        path_id,
1253                        space_id,
1254                        &mut self.spaces[space_id],
1255                        is_multipath_negotiated,
1256                        &mut builder.frame_space_mut(),
1257                        &mut self.stats,
1258                        &mut qlog,
1259                    );
1260                }
1261
1262                // Since there only 64 ACK frames there will always be enough space
1263                // to encode the ConnectionClose frame too. However we still have the
1264                // check here to prevent crashes if something changes.
1265                debug_assert!(
1266                    builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1267                    "ACKs should leave space for ConnectionClose"
1268                );
1269                if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1270                    let max_frame_size = builder.frame_space_remaining();
1271                    match self.state.as_type() {
1272                        StateType::Closed => {
1273                            let reason: Close =
1274                                self.state.as_closed().expect("checked").clone().into();
1275                            if space_id == SpaceId::Data || reason.is_transport_layer() {
1276                                reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1277                                qlog.frame(&Frame::Close(reason));
1278                            } else {
1279                                let frame = frame::ConnectionClose {
1280                                    error_code: TransportErrorCode::APPLICATION_ERROR,
1281                                    frame_type: None,
1282                                    reason: Bytes::new(),
1283                                };
1284                                frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1285                                qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1286                            }
1287                        }
1288                        StateType::Draining => {
1289                            let frame = frame::ConnectionClose {
1290                                error_code: TransportErrorCode::NO_ERROR,
1291                                frame_type: None,
1292                                reason: Bytes::new(),
1293                            };
1294                            frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1295                            qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1296                        }
1297                        _ => unreachable!(
1298                            "tried to make a close packet when the connection wasn't closed"
1299                        ),
1300                    };
1301                }
1302                builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1303                if space_id == self.highest_space {
1304                    // Don't send another close packet. Even with multipath we only send
1305                    // CONNECTION_CLOSE on a single path since we expect our paths to work.
1306                    self.close = false;
1307                    // `CONNECTION_CLOSE` is the final packet
1308                    break;
1309                } else {
1310                    // Send a close frame in every possible space for robustness, per
1311                    // RFC9000 "Immediate Close during the Handshake". Don't bother trying
1312                    // to send anything else.
                    space_id = space_id.next();
                    continue;
                }
            }

            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that
            // path validation can occur while the link is saturated.
            if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
                let path = self.path_data_mut(path_id);
                if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
                    // TODO(flub): We need to use the right CID!  We shouldn't use the same
                    //    CID as the current active one for the path.  Though see also
                    //    https://github.com/quinn-rs/quinn/issues/2184
                    let response = frame::PathResponse(token);
                    trace!(%response, "(off-path)");
                    builder.frame_space_mut().write(response);
                    qlog.frame(&Frame::PathResponse(response));
                    self.stats.frame_tx.path_response += 1;
                    builder.finish_and_track(
                        now,
                        self,
                        path_id,
                        SentFrames {
                            non_retransmits: true,
                            ..SentFrames::default()
                        },
                        PadDatagram::ToMinMtu,
                        qlog,
                    );
                    self.stats.udp_tx.on_sent(1, transmit.len());
                    return Some(Transmit {
                        destination: remote,
                        size: transmit.len(),
                        ecn: None,
                        segment_size: None,
                        src_ip: self.local_ip,
                    });
                }
            }

            let sent_frames = {
                let path_exclusive_only = have_available_path
                    && self.path_data(path_id).local_status() == PathStatus::Backup;
                let pn = builder.exact_number;
                self.populate_packet(
                    now,
                    space_id,
                    path_id,
                    path_exclusive_only,
                    &mut builder.frame_space_mut(),
                    pn,
                    &mut qlog,
                )
            };

            // ACK-only packets should only be sent when explicitly allowed. If we write them
            // for any other reason, there is a bug which leads to one component announcing write
            // readiness while not writing any data. This degrades performance. The condition is
            // only checked if the full MTU is available and when potentially large fixed-size
            // frames aren't queued, so that lack of space in the datagram isn't the reason for
            // just writing ACKs.
            debug_assert!(
                !(sent_frames.is_ack_only(&self.streams)
                    && !can_send.acks
                    && can_send.other
                    && builder.buf.segment_size()
                        == self.path_data(path_id).current_mtu() as usize
                    && self.datagrams.outgoing.is_empty()),
                "SendableFrames was {can_send:?}, but only ACKs have been written"
            );
            if sent_frames.requires_padding {
                pad_datagram |= PadDatagram::ToMinMtu;
            }

            for (path_id, _pn) in sent_frames.largest_acked.iter() {
                self.spaces[space_id]
                    .for_path(*path_id)
                    .pending_acks
                    .acks_sent();
                self.timers.stop(
                    Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
                    self.qlog.with_time(now),
                );
            }

            // Now we need to finish the packet.  Before we do so we need to know whether we
            // will be coalescing the next packet into this one, or ending the datagram here.
            // Because if this is the last packet in the datagram, more padding might be
            // needed due to the packet type, or to fill the GSO segment size.

            // Are we allowed to coalesce AND is there enough space for another *packet* in
            // this datagram AND is there another packet to send in this or the next space?
            if coalesce
                && builder
                    .buf
                    .datagram_remaining_mut()
                    .saturating_sub(builder.predict_packet_end())
                    > MIN_PACKET_SPACE
                && self
                    .next_send_space(space_id, path_id, builder.buf, close)
                    .is_some()
            {
                // We can append/coalesce the next packet into the current
                // datagram. Finish the current packet without adding extra padding.
                builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
            } else {
                // We need a new datagram for the next packet.  Finish the current
                // packet with padding.
                if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
                    // If too many padding bytes would be required to continue the
                    // GSO batch after this packet, end the GSO batch here. Ensures
                    // that fixed-size frames with heterogeneous sizes
                    // (e.g. application datagrams) won't inadvertently waste large
                    // amounts of bandwidth. The exact threshold is a bit arbitrary
                    // and might benefit from further tuning, though there's no
                    // universally optimal value.
                    const MAX_PADDING: usize = 32;
                    if builder.buf.datagram_remaining_mut()
                        > builder.predict_packet_end() + MAX_PADDING
                    {
                        trace!(
                            "GSO truncated by demand for {} padding bytes",
                            builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
                        );
                        builder.finish_and_track(
                            now,
                            self,
                            path_id,
                            sent_frames,
                            PadDatagram::No,
                            qlog,
                        );
                        break;
                    }

                    // Pad the current datagram to GSO segment size so it can be
                    // included in the GSO batch.
                    builder.finish_and_track(
                        now,
                        self,
                        path_id,
                        sent_frames,
                        PadDatagram::ToSegmentSize,
                        qlog,
                    );
                } else {
                    builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
                }
                if transmit.num_datagrams() == 1 {
                    transmit.clip_datagram_size();
                }
            }
        }

        if let Some(last_packet_number) = last_packet_number {
            // Note that when sending in multiple packet spaces the last packet number will
            // be the one from the highest packet space.
            self.path_data_mut(path_id).congestion.on_sent(
                now,
                transmit.len() as u64,
                last_packet_number,
            );
        }

        self.qlog.emit_recovery_metrics(
            path_id,
            &mut self.paths.get_mut(&path_id).unwrap().data,
            now,
        );

        self.app_limited = transmit.is_empty() && !congestion_blocked;

        // Send MTU probe if necessary
        if transmit.is_empty() && self.state.is_established() {
            // MTU probing happens only in Data space.
            let space_id = SpaceId::Data;
            path_id = *self.paths.first_key_value().expect("one path must exist").0;
            let probe_data = loop {
                // We MTU probe all paths for which all of the following are true:
                // - We have an active destination CID for the path.
                // - The remote address *and* path are validated.
                // - The path is not abandoned.
                // - The MTU Discovery subsystem wants to probe the path.
                let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
                let eligible = self.path_data(path_id).validated
                    && !self.path_data(path_id).is_validating_path()
                    && !self.abandoned_paths.contains(&path_id);
                let probe_size = eligible
                    .then(|| {
                        let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
                        self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
                    })
                    .flatten();
                match (active_cid, probe_size) {
                    (Some(active_cid), Some(probe_size)) => {
                        // Let's send an MTUD probe!
                        break Some((active_cid, probe_size));
                    }
                    _ => {
                        // Find the next path to check if it needs an MTUD probe.
                        match self.paths.keys().find(|&&next| next > path_id) {
                            Some(next) => {
                                path_id = *next;
                                continue;
                            }
                            None => break None,
                        }
                    }
                }
            };
            if let Some((active_cid, probe_size)) = probe_data {
                // We are definitely sending a DPLPMTUD probe.
                debug_assert_eq!(transmit.num_datagrams(), 0);
                transmit.start_new_datagram_with_size(probe_size as usize);

                let mut qlog = QlogSentPacket::default();
                let mut builder = PacketBuilder::new(
                    now,
                    space_id,
                    path_id,
                    active_cid,
                    &mut transmit,
                    true,
                    self,
                    &mut qlog,
                )?;

                // We implement MTU probes as ping packets padded up to the probe size
                trace!(?probe_size, "writing MTUD probe");
                trace!("PING");
                builder.frame_space_mut().write(frame::FrameType::PING);
                qlog.frame(&Frame::Ping);
                self.stats.frame_tx.ping += 1;

                // If supported by the peer, we want no delays to the probe's ACK
                if self.peer_supports_ack_frequency() {
                    trace!("IMMEDIATE_ACK");
                    builder
                        .frame_space_mut()
                        .write(frame::FrameType::IMMEDIATE_ACK);
                    self.stats.frame_tx.immediate_ack += 1;
                    qlog.frame(&Frame::ImmediateAck);
                }

                let sent_frames = SentFrames {
                    non_retransmits: true,
                    ..Default::default()
                };
                builder.finish_and_track(
                    now,
                    self,
                    path_id,
                    sent_frames,
                    PadDatagram::ToSize(probe_size),
                    qlog,
                );

                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .sent_plpmtud_probes += 1;
            }
        }

        if transmit.is_empty() {
            return None;
        }

        let destination = self.path_data(path_id).remote;
        trace!(
            segment_size = transmit.segment_size(),
            last_datagram_len = transmit.len() % transmit.segment_size(),
            ?destination,
            "sending {} bytes in {} datagrams",
            transmit.len(),
            transmit.num_datagrams()
        );
        self.path_data_mut(path_id)
            .inc_total_sent(transmit.len() as u64);

        self.stats
            .udp_tx
            .on_sent(transmit.num_datagrams() as u64, transmit.len());

        Some(Transmit {
            destination,
            size: transmit.len(),
            ecn: if self.path_data(path_id).sending_ecn {
                Some(EcnCodepoint::Ect0)
            } else {
                None
            },
            segment_size: match transmit.num_datagrams() {
                1 => None,
                _ => Some(transmit.segment_size()),
            },
            src_ip: self.local_ip,
        })
    }

    /// Returns the [`SpaceId`] of the next packet space which has data to send
    ///
    /// This takes into account the space available to frames in the next datagram.
    // TODO(flub): This duplication is not nice.
    fn next_send_space(
        &mut self,
        current_space_id: SpaceId,
        path_id: PathId,
        buf: &TransmitBuf<'_>,
        close: bool,
    ) -> Option<SpaceId> {
        // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed
        // to be able to send an individual frame at least this large in the next 1-RTT
        // packet. This could be generalized to support every space, but it's only needed to
        // handle large fixed-size frames, which only exist in 1-RTT (application
        // datagrams). We don't account for coalesced packets potentially occupying space
        // because frames can always spill into the next datagram.
        let mut space_id = current_space_id;
        loop {
            let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
            if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
                return Some(space_id);
            }
            space_id = match space_id {
                SpaceId::Initial => SpaceId::Handshake,
                SpaceId::Handshake => SpaceId::Data,
                SpaceId::Data => break,
            }
        }
        None
    }

    /// Checks if creating a new datagram would be blocked by congestion control
    fn path_congestion_check(
        &mut self,
        space_id: SpaceId,
        path_id: PathId,
        transmit: &TransmitBuf<'_>,
        can_send: &SendableFrames,
        now: Instant,
    ) -> PathBlocked {
        // Anti-amplification is only based on `total_sent`, which gets updated after
        // the transmit is sent. Therefore we pass the amount of bytes for datagrams
        // that are already created, as well as 1 byte for starting another datagram. If
        // there is any anti-amplification budget left, we always allow a full MTU to be
        // sent (see https://github.com/quinn-rs/quinn/issues/1082).
        if self.side().is_server()
            && self
                .path_data(path_id)
                .anti_amplification_blocked(transmit.len() as u64 + 1)
        {
            trace!(?space_id, %path_id, "blocked by anti-amplification");
            return PathBlocked::AntiAmplification;
        }

        // Congestion control check.
        // Tail loss probes must not be blocked by congestion, or a deadlock could arise.
        let bytes_to_send = transmit.segment_size() as u64;
        let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;

        if can_send.other && !need_loss_probe && !can_send.close {
            let path = self.path_data(path_id);
            if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
                trace!(?space_id, %path_id, "blocked by congestion control");
                return PathBlocked::Congestion;
            }
        }

        // Pacing check.
        if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::Pacing),
                delay,
                self.qlog.with_time(now),
            );
            // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though
            // they are not congestion controlled.
            trace!(?space_id, %path_id, "blocked by pacing");
            return PathBlocked::Pacing;
        }

        PathBlocked::No
    }

    /// Send PATH_CHALLENGE for a previous path if necessary
    ///
    /// QUIC-TRANSPORT section 9.3.3
    /// <https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding>
    fn send_prev_path_challenge(
        &mut self,
        now: Instant,
        buf: &mut TransmitBuf<'_>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
        // TODO (matheus23): We could use !prev_path.is_validating() here instead to
        // (possibly) also re-send challenges when they get lost.
        if !prev_path.send_new_challenge {
            return None;
        };
        prev_path.send_new_challenge = false;
        let destination = prev_path.remote;
        let token = self.rng.random();
        let info = paths::SentChallengeInfo {
            sent_instant: now,
            remote: destination,
        };
        prev_path.challenges_sent.insert(token, info);
        debug_assert_eq!(
            self.highest_space,
            SpaceId::Data,
            "PATH_CHALLENGE queued without 1-RTT keys"
        );
        buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);

        // Use the previous CID to avoid linking the new path with the previous path. We
        // don't bother accounting for possible retirement of that prev_cid because this is
        // sent once, immediately after migration, when the CID is known to be valid. Even
        // if a post-migration packet caused the CID to be retired, it's fair to pretend
        // this is sent first.
        debug_assert_eq!(buf.datagram_start_offset(), 0);
        let mut qlog = QlogSentPacket::default();
        let mut builder = PacketBuilder::new(
            now,
            SpaceId::Data,
            path_id,
            *prev_cid,
            buf,
            false,
            self,
            &mut qlog,
        )?;
        let challenge = frame::PathChallenge(token);
        trace!(%challenge, "validating previous path");
        qlog.frame(&Frame::PathChallenge(challenge));
        builder.frame_space_mut().write(challenge);
        self.stats.frame_tx.path_challenge += 1;

        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
        // to at least the smallest allowed maximum datagram size of 1200 bytes,
        // unless the anti-amplification limit for the path does not permit
        // sending a datagram of this size
        builder.pad_to(MIN_INITIAL_SIZE);

        builder.finish(self, now, qlog);
        self.stats.udp_tx.on_sent(1, buf.len());

        Some(Transmit {
            destination,
            size: buf.len(),
            ecn: None,
            segment_size: None,
            src_ip: self.local_ip,
        })
    }

    /// Indicate what types of frames are ready to send for the given space
    ///
    /// *packet_size* is the number of bytes available to build the next packet.  *close*
    /// indicates whether a CONNECTION_CLOSE frame needs to be sent.
    fn space_can_send(
        &mut self,
        space_id: SpaceId,
        path_id: PathId,
        packet_size: usize,
        close: bool,
    ) -> SendableFrames {
        let pn = self.spaces[SpaceId::Data]
            .for_path(path_id)
            .peek_tx_number();
        let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
        if self.spaces[space_id].crypto.is_none()
            && (space_id != SpaceId::Data
                || self.zero_rtt_crypto.is_none()
                || self.side.is_server())
        {
            // No keys available for this space
            return SendableFrames::empty();
        }
        let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
        if space_id == SpaceId::Data {
            can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
        }

        can_send.close = close && self.spaces[space_id].crypto.is_some();

        can_send
    }

    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
    ///
    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
    /// extracted through the relevant methods.
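    ///
    /// A minimal sketch of a driver loop; `events` stands in for whatever queue the
    /// surrounding endpoint code uses to deliver `ConnectionEvent`s and is not part of
    /// this API:
    ///
    /// ```ignore
    /// // Feed endpoint-generated events into the connection, then drain its
    /// // outputs (transmits, timer updates, endpoint and application events).
    /// while let Some(event) = events.pop_front() {
    ///     connection.handle_event(event);
    /// }
    /// ```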
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            Datagram(DatagramConnectionEvent {
                now,
                remote,
                path_id,
                ecn,
                first_decode,
                remaining,
            }) => {
                let span = trace_span!("pkt", %path_id);
                let _guard = span.enter();
                // If this packet could initiate a migration and we're a client or a server that
                // forbids migration, drop the datagram. This could be relaxed to heuristically
                // permit NAT-rebinding-like migration.
                if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
                    if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
                        trace!(
                            %path_id,
                            ?remote,
                            path_remote = ?self.path(path_id).map(|p| p.remote),
                            "discarding packet from unrecognized peer"
                        );
                        return;
                    }
                }

                let was_anti_amplification_blocked = self
                    .path(path_id)
                    .map(|path| path.anti_amplification_blocked(1))
                    .unwrap_or(true); // if we don't know about this path it's eagerly considered unvalidated
                // TODO(@divma): revisit this

                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                let data_len = first_decode.len();

                self.handle_decode(now, remote, path_id, ecn, first_decode);
                // The current `path` might have changed inside `handle_decode` since the packet
                // could have triggered a migration. The packet might also belong to an unknown
                // path and have been rejected. Make sure the data received is accounted for the
                // most recent path by accessing `path` after `handle_decode`.
                if let Some(path) = self.path_mut(path_id) {
                    path.inc_total_recvd(data_len as u64);
                }

                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, remote, path_id, ecn, data);
                }

                if let Some(path) = self.paths.get_mut(&path_id) {
                    self.qlog
                        .emit_recovery_metrics(path_id, &mut path.data, now);
                }

                if was_anti_amplification_blocked {
                    // A prior attempt to set the loss detection timer may have failed due to
                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
                    // the server's first flight is lost.
                    self.set_loss_detection_timer(now, path_id);
                }
            }
            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
                let cid_state = self
                    .local_cid_state
                    .entry(path_id)
                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
                cid_state.new_cids(&ids, now);

                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                // Always update Timer::PushNewCid
                self.reset_cid_retirement(now);
            }
        }
    }

    /// Process timer expirations
    ///
    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
    /// methods.
    ///
    /// It is most efficient to call this immediately after the system clock reaches the latest
    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
    /// no-op and therefore are safe.
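    ///
    /// A minimal sketch (assuming a `connection` handle driven by the surrounding
    /// event loop, with the deadline obtained from `poll_timeout`):
    ///
    /// ```ignore
    /// // Once the clock reaches the deadline, report the timeout back.
    /// if let Some(deadline) = connection.poll_timeout() {
    ///     let now = Instant::now();
    ///     if now >= deadline {
    ///         connection.handle_timeout(now);
    ///     }
    /// }
    /// ```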
    pub fn handle_timeout(&mut self, now: Instant) {
        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
            // TODO(@divma): remove `at` when the unicorn is born
            trace!(?timer, at=?now, "timeout");
            match timer {
                Timer::Conn(timer) => match timer {
                    ConnTimer::Close => {
                        self.state.move_to_drained(None);
                        self.endpoint_events.push_back(EndpointEventInner::Drained);
                    }
                    ConnTimer::Idle => {
                        self.kill(ConnectionError::TimedOut);
                    }
                    ConnTimer::KeepAlive => {
                        trace!("sending keep-alive");
                        self.ping();
                    }
                    ConnTimer::KeyDiscard => {
                        self.zero_rtt_crypto = None;
                        self.prev_crypto = None;
                    }
                    ConnTimer::PushNewCid => {
                        while let Some((path_id, when)) = self.next_cid_retirement() {
                            if when > now {
                                break;
                            }
                            match self.local_cid_state.get_mut(&path_id) {
                                None => error!(%path_id, "No local CID state for path"),
                                Some(cid_state) => {
                                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
                                    let num_new_cid = cid_state.on_cid_timeout().into();
                                    if !self.state.is_closed() {
                                        trace!(
                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                            cid_state.retire_prior_to()
                                        );
                                        self.endpoint_events.push_back(
                                            EndpointEventInner::NeedIdentifiers(
                                                path_id,
                                                now,
                                                num_new_cid,
                                            ),
                                        );
                                    }
                                }
                            }
                        }
                    }
                },
                // TODO: add path_id as span somehow
                Timer::PerPath(path_id, timer) => {
                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
                    let _guard = span.enter();
                    match timer {
                        PathTimer::PathIdle => {
                            self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
                                .ok();
                        }

                        PathTimer::PathKeepAlive => {
                            trace!("sending keep-alive on path");
                            self.ping_path(path_id).ok();
                        }
                        PathTimer::LossDetection => {
                            self.on_loss_detection_timeout(now, path_id);
                            self.qlog.emit_recovery_metrics(
                                path_id,
                                &mut self.paths.get_mut(&path_id).unwrap().data,
                                now,
                            );
                        }
                        PathTimer::PathValidation => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("path validation failed");
                            if let Some((_, prev)) = path.prev.take() {
                                path.data = prev;
                            }
                            path.data.challenges_sent.clear();
                            path.data.send_new_challenge = false;
                        }
                        PathTimer::PathChallengeLost => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            trace!("path challenge deemed lost");
                            path.data.send_new_challenge = true;
                        }
                        PathTimer::PathOpen => {
                            let Some(path) = self.path_mut(path_id) else {
                                continue;
                            };
                            path.challenges_sent.clear();
                            path.send_new_challenge = false;
                            debug!("new path validation failed");
                            if let Err(err) = self.close_path(
                                now,
                                path_id,
                                TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
                            ) {
                                warn!(?err, "failed closing path");
                            }

                            self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                                id: path_id,
                                error: PathError::ValidationFailed,
                            }));
                        }
                        PathTimer::Pacing => trace!("pacing timer expired"),
                        PathTimer::MaxAckDelay => {
                            trace!("max ack delay reached");
                            // This timer is only armed in the Data space
                            self.spaces[SpaceId::Data]
                                .for_path(path_id)
                                .pending_acks
                                .on_max_ack_delay_timeout()
                        }
                        PathTimer::DiscardPath => {
                            // The path was abandoned and 3*PTO has expired since.  Clean up all
                            // remaining state and install stateless reset token.
                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                            if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
                                let (min_seq, max_seq) = loc_cid_state.active_seq();
                                for seq in min_seq..=max_seq {
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::RetireConnectionId(
                                            now, path_id, seq, false,
                                        ),
                                    );
                                }
                            }
                            self.discard_path(path_id, now);
                        }
                    }
                }
            }
        }
    }

    /// Close a connection immediately
    ///
    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
    /// call this only when all important communications have been completed, e.g. by calling
    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
    /// [`StreamEvent::Finished`] event.
    ///
    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
    /// delivered. There may still be data from the peer that has not been received.
    ///
    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
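    ///
    /// A minimal sketch of a graceful shutdown; `connection`, `now` and the `streams()`
    /// accessor are assumed:
    ///
    /// ```ignore
    /// // Once all locally initiated streams have finished, close with an
    /// // application error code of 0 and a short reason.
    /// if connection.streams().send_streams() == 0 {
    ///     connection.close(now, VarInt::from_u32(0), Bytes::from_static(b"done"));
    /// }
    /// ```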
    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
        self.close_inner(
            now,
            Close::Application(frame::ApplicationClose { error_code, reason }),
        )
    }

    fn close_inner(&mut self, now: Instant, reason: Close) {
        let was_closed = self.state.is_closed();
        if !was_closed {
            self.close_common();
            self.set_close_timer(now);
            self.close = true;
            self.state.move_to_closed_local(reason);
        }
    }

    /// Control datagrams
    pub fn datagrams(&mut self) -> Datagrams<'_> {
        Datagrams { conn: self }
    }

    /// Returns connection statistics
    pub fn stats(&mut self) -> ConnectionStats {
        self.stats.clone()
    }

    /// Returns path statistics
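    ///
    /// A minimal sketch (`connection` is assumed; returns `None` for unknown paths):
    ///
    /// ```ignore
    /// if let Some(stats) = connection.path_stats(PathId::ZERO) {
    ///     println!("rtt={:?} cwnd={} mtu={}", stats.rtt, stats.cwnd, stats.current_mtu);
    /// }
    /// ```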
    pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
        let path = self.paths.get(&path_id)?;
        let stats = self.path_stats.entry(path_id).or_default();
        stats.rtt = path.data.rtt.get();
        stats.cwnd = path.data.congestion.window();
        stats.current_mtu = path.data.mtud.current_mtu();
        Some(*stats)
    }

    /// Ping the remote endpoint
    ///
    /// Causes an ACK-eliciting packet to be transmitted on the connection.
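    ///
    /// A minimal sketch of an application-level liveness check (`connection` is assumed):
    ///
    /// ```ignore
    /// // Queue a PING; the next poll for outgoing data will produce an
    /// // ACK-eliciting packet on every open path.
    /// connection.ping();
    /// ```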
    pub fn ping(&mut self) {
        // TODO(flub): This is very brute-force: it pings *all* the paths.  Instead it would
        //    be nice if we could only send a single packet for this.
        for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
            path_data.ping_pending = true;
        }
    }

    /// Ping the remote endpoint over a specific path
    ///
    /// Causes an ACK-eliciting packet to be transmitted on the path.
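    ///
    /// A minimal sketch (`connection` and `path_id` are assumed):
    ///
    /// ```ignore
    /// if connection.ping_path(path_id).is_err() {
    ///     // The path is closed; fall back to pinging the whole connection.
    ///     connection.ping();
    /// }
    /// ```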
    pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
        let path_data = self.spaces[self.highest_space]
            .number_spaces
            .get_mut(&path)
            .ok_or(ClosedPath { _private: () })?;
        path_data.ping_pending = true;
        Ok(())
    }

    /// Update traffic keys spontaneously
    ///
    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
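    ///
    /// A minimal sketch for a test harness (`connection` is assumed):
    ///
    /// ```ignore
    /// // Trigger a key update; the call is ignored while handshaking or while
    /// // another update is still in flight.
    /// connection.force_key_update();
    /// ```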
    pub fn force_key_update(&mut self) {
        if !self.state.is_established() {
            debug!("ignoring forced key update in illegal state");
            return;
        }
        if self.prev_crypto.is_some() {
            // We already just updated, or are currently updating, the keys. Concurrent key updates
            // are illegal.
            debug!("ignoring redundant forced key update");
            return;
        }
        self.update_keys(None, false);
    }

    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
    #[doc(hidden)]
    #[deprecated]
    pub fn initiate_key_update(&mut self) {
        self.force_key_update();
    }

    /// Get a session reference
    pub fn crypto_session(&self) -> &dyn crypto::Session {
        &*self.crypto
    }

    /// Whether the connection is in the process of being established
    ///
    /// If this returns `false`, the connection may be either established or closed, signaled by the
    /// emission of a `Connected` or `ConnectionLost` message respectively.
    pub fn is_handshaking(&self) -> bool {
        self.state.is_handshake()
    }

    /// Whether the connection is closed
    ///
    /// Closed connections cannot transport any further data. A connection becomes closed when
    /// either peer application intentionally closes it, or when either transport layer detects an
    /// error such as a time-out or certificate validation failure.
    ///
    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
    pub fn is_closed(&self) -> bool {
        self.state.is_closed()
    }

    /// Whether there is no longer any need to keep the connection around
    ///
    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
    /// packets from the peer. All drained connections have been closed.
    pub fn is_drained(&self) -> bool {
        self.state.is_drained()
    }

    /// For clients, if the peer accepted the 0-RTT data packets
    ///
    /// The value is meaningless until after the handshake completes.
    pub fn accepted_0rtt(&self) -> bool {
        self.accepted_0rtt
    }

    /// Whether 0-RTT is/was possible during the handshake
    pub fn has_0rtt(&self) -> bool {
        self.zero_rtt_enabled
    }

    /// Whether there are any pending retransmits
    pub fn has_pending_retransmits(&self) -> bool {
        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
    }

    /// Look up whether we're the client or server of this Connection
    pub fn side(&self) -> Side {
        self.side.side()
    }

    /// Get the address observed by the remote over the given path
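    ///
    /// A minimal sketch (`connection` is assumed):
    ///
    /// ```ignore
    /// match connection.path_observed_address(PathId::ZERO) {
    ///     Ok(Some(addr)) => println!("peer observed us at {addr}"),
    ///     Ok(None) => println!("no observed-address report yet"),
    ///     Err(_) => println!("path is closed"),
    /// }
    /// ```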
    pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
        self.path(path_id)
            .map(|path_data| {
                path_data
                    .last_observed_addr_report
                    .as_ref()
                    .map(|observed| observed.socket_addr())
            })
            .ok_or(ClosedPath { _private: () })
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when no `local_ip` was passed to
    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
    /// connection.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.local_ip
    }

    /// Current best estimate of this connection's latency (round-trip-time)
    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
        self.path(path_id).map(|d| d.rtt.get())
    }

    /// Current state of this connection's congestion controller, for debugging purposes
    pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
        self.path(path_id).map(|d| d.congestion.as_ref())
    }

    /// Modify the number of remotely initiated streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
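    ///
    /// A minimal sketch (`connection` is assumed):
    ///
    /// ```ignore
    /// // Allow the peer to keep up to 256 bidirectional streams open at once.
    /// connection.set_max_concurrent_streams(Dir::Bi, VarInt::from_u32(256));
    /// ```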
    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
        self.streams.set_max_concurrent(dir, count);
        // If the limit was reduced, then a flow control update previously deemed insignificant may
        // now be significant.
        let pending = &mut self.spaces[SpaceId::Data].pending;
        self.streams.queue_max_stream_id(pending);
    }

    /// Modify the number of open paths allowed when multipath is enabled
    ///
    /// When reducing the number of concurrent paths this only delays sending new
    /// MAX_PATH_ID frames until fewer than this number of paths are possible.  To
    /// actively reduce paths they must be closed using [`Connection::close_path`], which
    /// can also be used to close not-yet-opened paths.
    ///
    /// If multipath is not negotiated (see the [`TransportConfig`]) this cannot enable
    /// multipath and will fail.
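    ///
    /// A minimal sketch (`connection` is assumed):
    ///
    /// ```ignore
    /// // Allow up to four concurrent paths; fails if multipath was not negotiated.
    /// connection.set_max_concurrent_paths(Instant::now(), NonZeroU32::new(4).unwrap())?;
    /// ```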
    pub fn set_max_concurrent_paths(
        &mut self,
        now: Instant,
        count: NonZeroU32,
    ) -> Result<(), MultipathNotNegotiated> {
        if !self.is_multipath_negotiated() {
            return Err(MultipathNotNegotiated { _private: () });
        }
        self.max_concurrent_paths = count;

        let in_use_count = self
            .local_max_path_id
            .next()
            .saturating_sub(self.abandoned_paths.len() as u32)
            .as_u32();
        let extra_needed = count.get().saturating_sub(in_use_count);
        let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);

        self.set_max_path_id(now, new_max_path_id);

        Ok(())
    }

    /// If needed, issues a new MAX_PATH_ID frame and new CIDs for any newly allowed paths
    fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
        if max_path_id <= self.local_max_path_id {
            return;
        }

        self.local_max_path_id = max_path_id;
        self.spaces[SpaceId::Data].pending.max_path_id = true;

        self.issue_first_path_cids(now);
    }

    /// Current number of remotely initiated streams that may be concurrently open
    ///
    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
    /// it will not change immediately, even if fewer streams are open. Instead, it will
    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
        self.streams.max_concurrent(dir)
    }

    /// See [`TransportConfig::send_window()`]
    pub fn set_send_window(&mut self, send_window: u64) {
        self.streams.set_send_window(send_window);
    }

    /// See [`TransportConfig::receive_window()`]
    pub fn set_receive_window(&mut self, receive_window: VarInt) {
        if self.streams.set_receive_window(receive_window) {
            self.spaces[SpaceId::Data].pending.max_data = true;
        }
    }

    /// Whether the Multipath for QUIC extension is enabled.
    ///
    /// Multipath is only enabled after the handshake is completed and if it was enabled by both
    /// peers.
    pub fn is_multipath_negotiated(&self) -> bool {
        !self.is_handshaking()
            && self.config.max_concurrent_multipath_paths.is_some()
            && self.peer_params.initial_max_path_id.is_some()
    }

    fn on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        // Plain ACK frames (as opposed to PATH_ACK) always reference path 0
        let path = PathId::ZERO;
        self.inner_on_ack_received(now, space, path, ack)
    }

    fn on_path_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path_ack: frame::PathAck,
    ) -> Result<(), TransportError> {
        let (ack, path) = path_ack.into_ack();
        self.inner_on_ack_received(now, space, path, ack)
    }

    /// Handles an ACK frame acknowledging packets sent on *path*.
    fn inner_on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        if self.abandoned_paths.contains(&path) {
            // See also https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.4.3-3
            // > PATH_ACK frames received with an abandoned path ID are silently ignored, as specified in Section 4.
            trace!("silently ignoring PATH_ACK on abandoned path");
            return Ok(());
        }
        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        let new_largest = {
            let space = &mut self.spaces[space].for_path(path);
            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
                space.largest_acked_packet = Some(ack.largest);
                if let Some(info) = space.sent_packets.get(ack.largest) {
                    // This should always succeed, but a misbehaving peer might ACK a packet we
                    // haven't sent. At worst, that will result in us spuriously reducing the
                    // congestion window.
                    space.largest_acked_packet_sent = info.time_sent;
                }
                true
            } else {
                false
            }
        };

        if self.detect_spurious_loss(&ack, space, path) {
            self.path_data_mut(path)
                .congestion
                .on_spurious_congestion_event();
        }

        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
        let mut newly_acked = ArrayRangeSet::new();
        for range in ack.iter() {
            self.spaces[space].for_path(path).check_ack(range.clone())?;
            for (pn, _) in self.spaces[space]
                .for_path(path)
                .sent_packets
                .iter_range(range)
            {
                newly_acked.insert_one(pn);
            }
        }

        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                    // Assume ACKs for all packets below the largest acknowledged in
                    // `packet` have been received. This can cause the peer to spuriously
                    // retransmit if some of our earlier ACKs were lost, but allows for
                    // simpler state tracking. See discussion at
                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                        pns.pending_acks.subtract_below(*acked_pn);
                    }
                }
                ack_eliciting_acked |= info.ack_eliciting;

                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
                let path_data = self.path_data_mut(path);
                let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
                if mtu_updated {
                    path_data
                        .congestion
                        .on_mtu_update(path_data.mtud.current_mtu());
                }

                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
                self.ack_frequency.on_acked(path, packet);

                self.on_packet_acked(now, path, info);
            }
        }

        let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
        let app_limited = self.app_limited;
        let path_data = self.path_data_mut(path);
        let in_flight = path_data.in_flight.bytes;

        path_data
            .congestion
            .on_end_acks(now, in_flight, app_limited, largest_ackd);

        if new_largest && ack_eliciting_acked {
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = now.saturating_duration_since(
                self.spaces[space].for_path(path).largest_acked_packet_sent,
            );

            let next_pn = self.spaces[space].for_path(path).next_packet_number;
            let path_data = self.path_data_mut(path);
            // TODO(@divma): should be a method of path, should be contained in a single place
            path_data.rtt.update(ack_delay, rtt);
            if path_data.first_packet_after_rtt_sample.is_none() {
                path_data.first_packet_after_rtt_sample = Some((space, next_pn));
            }
        }

        // Must be called before crypto/pto_count are clobbered
        self.detect_lost_packets(now, space, path, true);

        if self.peer_completed_address_validation(path) {
            self.path_data_mut(path).pto_count = 0;
        }

        // Explicit congestion notification
        // TODO(@divma): this code is a good example of logic that should be contained in a single
        // place but it's split between the path data and the packet number space data, we should
        // find a way to make this work without two lookups
        if self.path_data(path).sending_ecn {
            if let Some(ecn) = ack.ecn {
                // We only examine ECN counters from ACKs that we are certain we received in transmit
                // order, allowing us to compute an increase in ECN counts to compare against the number
                // of newly acked packets that remains well-defined in the presence of arbitrary packet
                // reordering.
                if new_largest {
                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
                }
            } else {
                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
                debug!("ECN not acknowledged by peer");
                self.path_data_mut(path).sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now, path);
        Ok(())
    }

    fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;

        if lost_packets.is_empty() {
            return false;
        }

        for range in ack.iter() {
            let spurious_losses: Vec<u64> = lost_packets
                .iter_range(range.clone())
                .map(|(pn, _info)| pn)
                .collect();

            for pn in spurious_losses {
                lost_packets.remove(pn);
            }
        }

        // If this ACK frame acknowledged all packets previously deemed lost, then we raised
        // a spurious congestion event in the past. We cannot conclude anything while some of
        // those packets remain unacknowledged, but future ACK frames might still reveal a
        // spurious loss detection.
        lost_packets.is_empty()
    }

    /// Drain lost packets that we reasonably think will never arrive
    ///
    /// The current criterion is copied from `msquic`:
    /// discard packets that were sent earlier than 2 probe timeouts ago.
    fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
        let two_pto = 2 * self.path_data(path).rtt.pto_base();

        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
        lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
    }

    /// Process a new ECN block from an in-order ACK
    fn process_ecn(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        newly_acked: u64,
        ecn: frame::EcnCounts,
        largest_sent_time: Instant,
    ) {
        match self.spaces[space]
            .for_path(path)
            .detect_ecn(newly_acked, ecn)
        {
            Err(e) => {
                debug!("halting ECN due to verification failure: {}", e);

                self.path_data_mut(path).sending_ecn = false;
                // Wipe out the existing value because it might be garbage and could interfere with
                // future attempts to use ECN on new paths.
                self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
            }
            Ok(false) => {}
            Ok(true) => {
                self.path_stats.entry(path).or_default().congestion_events += 1;
                self.path_data_mut(path).congestion.on_congestion_event(
                    now,
                    largest_sent_time,
                    false,
                    true,
                    0,
                );
            }
        }
    }

2556    // Not timing-aware, so it's safe to call this for inferred acks, such as arise from
2557    // high-latency handshakes
2558    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2559        self.paths
2560            .get_mut(&path_id)
2561            .expect("known path")
2562            .remove_in_flight(&info);
2563        let app_limited = self.app_limited;
2564        let path = self.path_data_mut(path_id);
2565        if info.ack_eliciting && !path.is_validating_path() {
2566            // Only pass ACKs to the congestion controller if we are not validating the current
2567            // path, so as to ignore any ACKs from older paths still coming in.
2568            let rtt = path.rtt;
2569            path.congestion
2570                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2571        }
2572
2573        // Update state for confirmed delivery of frames
2574        if let Some(retransmits) = info.retransmits.get() {
2575            for (id, _) in retransmits.reset_stream.iter() {
2576                self.streams.reset_acked(*id);
2577            }
2578        }
2579
2580        for frame in info.stream_frames {
2581            self.streams.received_ack_of(frame);
2582        }
2583    }
2584
2585    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2586        let start = if self.zero_rtt_crypto.is_some() {
2587            now
2588        } else {
2589            self.prev_crypto
2590                .as_ref()
2591                .expect("no previous keys")
2592                .end_packet
2593                .as_ref()
2594                .expect("update not acknowledged yet")
2595                .1
2596        };
2597
2598        // QUIC-MULTIPATH § 2.5 Key Phase Update Process: use largest PTO off all paths.
2599        self.timers.set(
2600            Timer::Conn(ConnTimer::KeyDiscard),
2601            start + self.pto_max_path(space, false) * 3,
2602            self.qlog.with_time(now),
2603        );
2604    }
2605
2606    /// Handle a [`PathTimer::LossDetection`] timeout.
2607    ///
2608    /// This timer expires for two reasons:
2609    /// - An ACK-eliciting packet we sent should be considered lost.
2610    /// - The PTO may have expired and a tail-loss probe needs to be scheduled.
2611    ///
2612    /// The former needs us to schedule re-transmission of the lost data.
2613    ///
2614    /// The latter means we have not received an ACK for an ack-eliciting packet we sent
2615    /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
2616    /// packet, to try and elicit new acknowledgements. These new acknowledgements will
2617    /// indicate whether the previously sent packets were lost or not.
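    ///
    /// A hedged sketch of the dispatch order (simplified from the body below;
    /// `schedule_loss_probes` is a hypothetical helper standing in for the inline
    /// probe-scheduling code):
    ///
    /// ```ignore
    /// match self.loss_time_and_space(path_id) {
    ///     // Case 1: a loss timer elapsed; declare losses and queue retransmissions.
    ///     Some((_, space)) => self.detect_lost_packets(now, space, path_id, false),
    ///     // Case 2: the PTO fired; queue tail-loss probes in the affected space.
    ///     None => self.schedule_loss_probes(now, path_id),
    /// }
    /// ```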
    fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
        if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
            // Time threshold loss detection
            self.detect_lost_packets(now, pn_space, path_id, false);
            self.set_loss_detection_timer(now, path_id);
            return;
        }

        let (_, space) = match self.pto_time_and_space(now, path_id) {
            Some(x) => x,
            None => {
                error!(%path_id, "PTO expired while unset");
                return;
            }
        };
        trace!(
            in_flight = self.path_data(path_id).in_flight.bytes,
            count = self.path_data(path_id).pto_count,
            ?space,
            %path_id,
            "PTO fired"
        );

        let count = match self.path_data(path_id).in_flight.ack_eliciting {
            // A PTO when we're not expecting any ACKs must be due to handshake
            // anti-amplification deadlock prevention
            0 => {
                debug_assert!(!self.peer_completed_address_validation(path_id));
                1
            }
            // Conventional loss probe
            _ => 2,
        };
        let pns = self.spaces[space].for_path(path_id);
        pns.loss_probes = pns.loss_probes.saturating_add(count);
        let path_data = self.path_data_mut(path_id);
        path_data.pto_count = path_data.pto_count.saturating_add(1);
        self.set_loss_detection_timer(now, path_id);
    }

    /// Detect any lost packets
    ///
    /// There are two cases in which we detect lost packets:
    ///
    /// - We received an ACK packet.
    /// - The [`PathTimer::LossDetection`] timer expired: there is an un-acknowledged packet
    ///   that was followed by an acknowledged packet, and the loss timer for that
    ///   un-acknowledged packet elapsed, so we need to declare it lost.
    ///
    /// Packets are lost if they are both (see RFC 9002 §6.1):
    ///
    /// - Unacknowledged, in flight and sent prior to an acknowledged packet.
    /// - Old enough, by either:
    ///   - Having a packet number [`TransportConfig::packet_threshold`] lower than the last
    ///     acknowledged packet.
    ///   - Being sent [`TransportConfig::time_threshold`] * RTT in the past.
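    ///
    /// A hedged sketch of the per-packet loss test (simplified from the loop below;
    /// `loss_delay` is `time_threshold` * RTT, clamped to the timer granularity):
    ///
    /// ```ignore
    /// let too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
    /// let too_far_behind = largest_acked_packet >= packet + packet_threshold;
    /// let lost = too_old || too_far_behind;
    /// ```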
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        // Find all the lost packets, populating all variables initialised above.

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // InPersistentCongestion: Determine if all packets in the time period before the newest
        // lost packet, including the edges, are marked lost. PTO computation must always
        // include max ACK delay, i.e. operate as if in Data space (see RFC 9002 §7.6.1).
        let congestion_period = self
            .pto(SpaceId::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
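        // Illustrative numbers: with a Data-space PTO of 300ms and a
        // persistent_congestion_threshold of 3, congestion_period is 900ms; persistent
        // congestion is declared only when two lost ack-eliciting packets were sent
        // more than 900ms apart with no acknowledged packet in between.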
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
            if prev_packet != Some(packet.wrapping_sub(1)) {
                // An intervening packet was acknowledged
                persistent_congestion_start = None;
            }

            // Packets sent before now - loss_delay are deemed lost.
            // However, we avoid subtraction as it can panic and there's no
            // saturating equivalent of this subtraction operation with a Duration.
            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
                // The packet should be declared lost.
                if Some(packet) == in_flight_mtu_probe {
                    // Lost MTU probes are not included in `lost_packets`, because they
                    // should not trigger a congestion control response
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // Two ACK-eliciting packets lost more than
                            // congestion_period apart, with no ACKed packets in between
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // Persistent congestion must start after the first RTT sample
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space, packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // The packet should not yet be declared lost.
                if space.loss_time.is_none() {
                    // Since we iterate in order, the lowest packet number's loss time will
                    // always be the earliest.
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }

    /// Drops the path state, declaring any remaining in-flight packets as lost
    fn discard_path(&mut self, path_id: PathId, now: Instant) {
        trace!(%path_id, "dropping path state");
        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();

        let mut size_of_lost_packets = 0u64; // added to path_stats.lost_bytes
        let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
            .for_path(path_id)
            .sent_packets
            .iter()
            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
            .map(|(pn, info)| {
                size_of_lost_packets += info.size as u64;
                pn
            })
            .collect();

        if !lost_pns.is_empty() {
            trace!(
                %path_id,
                count = lost_pns.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost on path abandon"
            );
            self.handle_lost_packets(
                SpaceId::Data,
                path_id,
                now,
                lost_pns,
                in_flight_mtu_probe,
                Duration::ZERO,
                false,
                size_of_lost_packets,
            );
        }
        self.paths.remove(&path_id);
        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);

        let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
        self.events.push_back(
            PathEvent::Abandoned {
                id: path_id,
                path_stats,
            }
            .into(),
        );
    }

    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(
            {
                let mut sorted = lost_packets.clone();
                sorted.sort();
                sorted == lost_packets
            },
            "lost_packets must be sorted"
        );

        self.drain_lost_packets(now, pn_space, path_id);

        // OnPacketsLost
        if let Some(largest_lost) = lost_packets.last().cloned() {
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.entry(path_id).or_default();
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                if let Some(max_datagram_size) = self.datagrams().max_size() {
                    self.datagrams.drop_oversized(max_datagram_size);
                }
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .black_holes_detected += 1;
            }

            // Don't apply congestion penalty for lost ack-only packets
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        // Handle a lost MTU probe
        if let Some(packet) = lost_mtu_probe {
            // The take() is safe: lost_mtu_probe is omitted from lost_packets, and
            // therefore must not have been removed yet.
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap();
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats
                .entry(path_id)
                .or_default()
                .lost_plpmtud_probes += 1;
        }
    }

    /// Returns the earliest time packets should be declared lost for all spaces on a path.
    ///
    /// If a path has an acknowledged packet with any prior un-acknowledged packets, the
    /// earliest un-acknowledged packet can be declared lost after a timeout has elapsed.
    /// The time returned is when this packet should be declared lost.
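    ///
    /// Concretely, this is the minimum of the per-space `loss_time` values that
    /// [`Connection::detect_lost_packets`] recorded for this path.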
    fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
        SpaceId::iter()
            .filter_map(|id| {
                self.spaces[id]
                    .number_spaces
                    .get(&path_id)
                    .and_then(|pns| pns.loss_time)
                    .map(|time| (time, id))
            })
            .min_by_key(|&(time, _)| time)
    }

    /// Returns the earliest time the next PTO should fire across all spaces on a path.
    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
        let path = self.path(path_id)?;
        let pto_count = path.pto_count;
        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
        let mut duration = path.rtt.pto_base() * backoff;

        if path_id == PathId::ZERO
            && path.in_flight.ack_eliciting == 0
            && !self.peer_completed_address_validation(PathId::ZERO)
        {
            // Address Validation during Connection Establishment:
            // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
            // deadlock if an Initial or Handshake packet from the server is lost and the
            // server cannot send more due to its anti-amplification limit, the client must
            // send another packet on PTO.
            let space = match self.highest_space {
                SpaceId::Handshake => SpaceId::Handshake,
                _ => SpaceId::Initial,
            };

            return Some((now + duration, space));
        }

        let mut result = None;
        for space in SpaceId::iter() {
            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
                continue;
            };

            if !pns.has_in_flight() {
                continue;
            }
            if space == SpaceId::Data {
                // Skip ApplicationData until handshake completes.
                if self.is_handshaking() {
                    return result;
                }
                // Include max_ack_delay and backoff for ApplicationData.
                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
            }
            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
                continue;
            };
            let pto = last_ack_eliciting + duration;
            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
                if path.anti_amplification_blocked(1) {
                    // Nothing would be able to be sent.
                    continue;
                }
                if path.in_flight.ack_eliciting == 0 {
                    // Nothing ack-eliciting, no PTO to arm/fire.
                    continue;
                }
                result = Some((pto, space));
            }
        }
        result
    }

    fn peer_completed_address_validation(&self, path: PathId) -> bool {
        // TODO(flub): This logic needs updating for multipath
        if self.side.is_server() || self.state.is_closed() {
            return true;
        }
        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
        self.spaces[SpaceId::Handshake]
            .path_space(PathId::ZERO)
            .and_then(|pns| pns.largest_acked_packet)
            .is_some()
            || self.spaces[SpaceId::Data]
                .path_space(path)
                .and_then(|pns| pns.largest_acked_packet)
                .is_some()
            || (self.spaces[SpaceId::Data].crypto.is_some()
                && self.spaces[SpaceId::Handshake].crypto.is_none())
    }

    /// Resets the [`PathTimer::LossDetection`] timer to the next instant it may be needed
    ///
    /// The timer must fire if either:
    /// - An ack-eliciting packet we sent needs to be declared lost.
    /// - A tail-loss probe needs to be sent.
    ///
    /// See [`Connection::on_loss_detection_timeout`] for details.
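    ///
    /// A hedged sketch of the arming priority (illustration of the branches below;
    /// `timer` stands in for this path's `LossDetection` timer):
    ///
    /// ```ignore
    /// match (self.loss_time_and_space(path_id), self.pto_time_and_space(now, path_id)) {
    ///     (Some((loss_time, _)), _) => timer.set(loss_time), // earliest loss time wins
    ///     (None, Some((pto, _))) => timer.set(pto),          // otherwise arm the PTO
    ///     (None, None) => timer.stop(),                      // nothing to wait for
    /// }
    /// ```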
    fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
        if self.state.is_closed() {
            // No loss detection takes place on closed connections, and `close_common` already
            // stopped the timer. Ensure we don't restart it inadvertently, e.g. in response to a
            // reordered packet being handled by state-insensitive code.
            return;
        }

        if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
            // Time threshold loss detection.
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::LossDetection),
                loss_time,
                self.qlog.with_time(now),
            );
            return;
        }

        // Determine which PN space to arm the PTO for and calculate the PTO duration.
        if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::LossDetection),
                timeout,
                self.qlog.with_time(now),
            );
        } else {
            self.timers.stop(
                Timer::PerPath(path_id, PathTimer::LossDetection),
                self.qlog.with_time(now),
            );
        }
    }

    /// The maximum probe timeout across all paths
    ///
    /// If `is_closing` is set to `true`, paths that have not yet been used are filtered out.
    ///
    /// See [`Connection::pto`]
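    ///
    /// For the Initial and Handshake spaces only [`PathId::ZERO`] carries data, so its
    /// PTO is used directly; in the Data space this is the maximum of
    /// [`Connection::pto`] over all remaining paths.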
    fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
        match space {
            SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
            SpaceId::Data => self
                .paths
                .iter()
                .filter_map(|(path_id, state)| {
                    if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
                        // If we are closing and haven't sent anything yet, do not include
                        None
                    } else {
                        let pto = self.pto(space, *path_id);
                        Some(pto)
                    }
                })
                .max()
                .expect("there should be at least one path"),
        }
    }

    /// Probe Timeout
    ///
    /// The PTO is logically the time within which you'd expect to receive an acknowledgement
    /// for a packet. So approximately RTT + max_ack_delay.
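    ///
    /// A hedged sketch with illustrative numbers, assuming `pto_base` follows RFC 9002
    /// (smoothed RTT plus the RTT variance term):
    ///
    /// ```ignore
    /// // srtt = 100ms, 4 * rttvar = 40ms, max_ack_delay = 25ms (Data space)
    /// let pto = rtt.pto_base() + max_ack_delay; // (100ms + 40ms) + 25ms = 165ms
    /// ```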
    fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
        let max_ack_delay = match space {
            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
        };
        self.path_data(path_id).rtt.pto_base() + max_ack_delay
    }

    fn on_packet_authenticated(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<u64>,
        spin: bool,
        is_1rtt: bool,
    ) {
        self.total_authed_packets += 1;
        if let Some(last_allowed_receive) = self
            .paths
            .get(&path_id)
            .and_then(|path| path.data.last_allowed_receive)
        {
            if now > last_allowed_receive {
                warn!("received data on path which we abandoned more than 3 * PTO ago");
                // The peer failed to respond with a PATH_ABANDON in time.
                if !self.state.is_closed() {
                    // TODO(flub): What should the error code be?
                    self.state.move_to_closed(TransportError::NO_ERROR(
                        "peer failed to respond with PATH_ABANDON in time",
                    ));
                    self.close_common();
                    self.set_close_timer(now);
                    self.close = true;
                }
                return;
            }
        }

        self.reset_keep_alive(path_id, now);
        self.reset_idle_timeout(now, space_id, path_id);
        self.permit_idle_reset = true;
        self.receiving_ecn |= ecn.is_some();
        if let Some(x) = ecn {
            let space = &mut self.spaces[space_id];
            space.for_path(path_id).ecn_counters += x;

            if x.is_ce() {
                space
                    .for_path(path_id)
                    .pending_acks
                    .set_immediate_ack_required();
            }
        }

        let packet = match packet {
            Some(x) => x,
            None => return,
        };
        match &self.side {
            ConnectionSide::Client { .. } => {
                // If we received a handshake packet that authenticated, then we're talking to
                // the real server. From now on we should no longer allow the server to migrate
                // its address.
                if space_id == SpaceId::Handshake {
                    if let Some(hs) = self.state.as_handshake_mut() {
                        hs.allow_server_migration = false;
                    }
                }
            }
            ConnectionSide::Server { .. } => {
                if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
                {
                    // A server stops sending and processing Initial packets when it receives
                    // its first Handshake packet.
                    self.discard_space(now, SpaceId::Initial);
                }
                if self.zero_rtt_crypto.is_some() && is_1rtt {
                    // Discard 0-RTT keys soon after receiving a 1-RTT packet
                    self.set_key_discard_timer(now, space_id)
                }
            }
        }
        let space = self.spaces[space_id].for_path(path_id);
        space.pending_acks.insert_one(packet, now);
        if packet >= space.rx_packet.unwrap_or_default() {
            space.rx_packet = Some(packet);
            // Update outgoing spin bit, inverting iff we're the client
            self.spin = self.side.is_client() ^ spin;
        }
    }

    /// Resets the idle timeout timers
    ///
    /// Without multipath there is only the connection-wide idle timeout. When multipath is
    /// enabled there is an additional per-path idle timeout.
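    ///
    /// Both timers are armed at `now + max(configured_timeout, 3 * PTO)`, so a
    /// connection or path is never declared idle while loss recovery might still be
    /// exchanging packets (see RFC 9000 §10.1).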
    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
        // First reset the global idle timeout.
        if let Some(timeout) = self.idle_timeout {
            if self.state.is_closed() {
                self.timers
                    .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
            } else {
                let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
                self.timers.set(
                    Timer::Conn(ConnTimer::Idle),
                    now + dt,
                    self.qlog.with_time(now),
                );
            }
        }

        // Now handle the per-path state
        if let Some(timeout) = self.path_data(path_id).idle_timeout {
            if self.state.is_closed() {
                self.timers.stop(
                    Timer::PerPath(path_id, PathTimer::PathIdle),
                    self.qlog.with_time(now),
                );
            } else {
                let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
                self.timers.set(
                    Timer::PerPath(path_id, PathTimer::PathIdle),
                    now + dt,
                    self.qlog.with_time(now),
                );
            }
        }
    }

    /// Resets both the [`ConnTimer::KeepAlive`] and [`PathTimer::PathKeepAlive`] timers
    fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
        if !self.state.is_established() {
            return;
        }

        if let Some(interval) = self.config.keep_alive_interval {
            self.timers.set(
                Timer::Conn(ConnTimer::KeepAlive),
                now + interval,
                self.qlog.with_time(now),
            );
        }

        if let Some(interval) = self.path_data(path_id).keep_alive {
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::PathKeepAlive),
                now + interval,
                self.qlog.with_time(now),
            );
        }
    }

    /// Sets the timer for when a previously issued CID should be retired next
    fn reset_cid_retirement(&mut self, now: Instant) {
        if let Some((_path, t)) = self.next_cid_retirement() {
            self.timers.set(
                Timer::Conn(ConnTimer::PushNewCid),
                t,
                self.qlog.with_time(now),
            );
        }
    }

    /// The next time when a previously issued CID should be retired
    fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
        self.local_cid_state
            .iter()
            .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
            .min_by_key(|(_path_id, timeout)| *timeout)
    }

    /// Handle the already-decrypted first packet from the client
    ///
    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
    /// efficient.
    pub(crate) fn handle_first_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        ecn: Option<EcnCodepoint>,
        packet_number: u64,
        packet: InitialPacket,
        remaining: Option<BytesMut>,
    ) -> Result<(), ConnectionError> {
        let span = trace_span!("first recv");
        let _guard = span.enter();
        debug_assert!(self.side.is_server());
        let len = packet.header_data.len() + packet.payload.len();
        let path_id = PathId::ZERO;
        self.path_data_mut(path_id).total_recvd = len as u64;

        if let Some(hs) = self.state.as_handshake_mut() {
            hs.expected_token = packet.header.token.clone();
        } else {
            unreachable!("first packet must be delivered in Handshake state");
        }

        // The first packet is always on PathId::ZERO
        self.on_packet_authenticated(
            now,
            SpaceId::Initial,
            path_id,
            ecn,
            Some(packet_number),
            false,
            false,
        );

        let packet: Packet = packet.into();

        let mut qlog = QlogRecvPacket::new(len);
        qlog.header(&packet.header, Some(packet_number), path_id);

        self.process_decrypted_packet(
            now,
            remote,
            path_id,
            Some(packet_number),
            packet,
            &mut qlog,
        )?;
        self.qlog.emit_packet_received(qlog, now);
        if let Some(data) = remaining {
            self.handle_coalesced(now, remote, path_id, ecn, data);
        }

        self.qlog.emit_recovery_metrics(
            path_id,
            &mut self.paths.get_mut(&path_id).unwrap().data,
            now,
        );

        Ok(())
    }

    fn init_0rtt(&mut self, now: Instant) {
        let (header, packet) = match self.crypto.early_crypto() {
            Some(x) => x,
            None => return,
        };
        if self.side.is_client() {
            match self.crypto.transport_parameters() {
                Ok(params) => {
                    let params = params
                        .expect("crypto layer didn't supply transport parameters with ticket");
                    // Certain values must not be cached
                    let params = TransportParameters {
                        initial_src_cid: None,
                        original_dst_cid: None,
                        preferred_address: None,
                        retry_src_cid: None,
                        stateless_reset_token: None,
                        min_ack_delay: None,
                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
                        max_ack_delay: TransportParameters::default().max_ack_delay,
                        initial_max_path_id: None,
                        ..params
                    };
                    self.set_peer_params(params);
                    self.qlog.emit_peer_transport_params_restored(self, now);
                }
                Err(e) => {
                    error!("session ticket has malformed transport parameters: {}", e);
                    return;
                }
            }
        }
        trace!("0-RTT enabled");
        self.zero_rtt_enabled = true;
        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
    }

    fn read_crypto(
        &mut self,
        space: SpaceId,
        crypto: &frame::Crypto,
        payload_len: usize,
    ) -> Result<(), TransportError> {
        let expected = if !self.state.is_handshake() {
            SpaceId::Data
        } else if self.highest_space == SpaceId::Initial {
            SpaceId::Initial
        } else {
            // On the server, self.highest_space can be Data after receiving the client's first
            // flight, but we expect Handshake CRYPTO until the handshake is complete.
            SpaceId::Handshake
        };
        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
        // packets are illegal, and we don't process 1-RTT packets until the handshake is
        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
        debug_assert!(space <= expected, "received out-of-order CRYPTO data");

        let end = crypto.offset + crypto.data.len() as u64;
        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
            warn!(
                "received new {:?} CRYPTO data when expecting {:?}",
                space, expected
            );
            return Err(TransportError::PROTOCOL_VIOLATION(
                "new data at unexpected encryption level",
            ));
        }

        let space = &mut self.spaces[space];
        let max = end.saturating_sub(space.crypto_stream.bytes_read());
        if max > self.config.crypto_buffer_size as u64 {
            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
        }

        space
            .crypto_stream
            .insert(crypto.offset, crypto.data.clone(), payload_len);
        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
            if self.crypto.read_handshake(&chunk.bytes)? {
                self.events.push_back(Event::HandshakeDataReady);
            }
        }

        Ok(())
    }

    fn write_crypto(&mut self) {
        loop {
            let space = self.highest_space;
            let mut outgoing = Vec::new();
            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
                match space {
                    SpaceId::Initial => {
                        self.upgrade_crypto(SpaceId::Handshake, crypto);
                    }
                    SpaceId::Handshake => {
                        self.upgrade_crypto(SpaceId::Data, crypto);
                    }
                    _ => unreachable!("got updated secrets during 1-RTT"),
                }
            }
            if outgoing.is_empty() {
                if space == self.highest_space {
                    break;
                } else {
                    // Keys updated, check for more data to send
                    continue;
                }
            }
            let offset = self.spaces[space].crypto_offset;
            let outgoing = Bytes::from(outgoing);
            if let Some(hs) = self.state.as_handshake_mut() {
                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
                    hs.client_hello = Some(outgoing.clone());
                }
            }
            self.spaces[space].crypto_offset += outgoing.len() as u64;
            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
            self.spaces[space].pending.crypto.push_back(frame::Crypto {
                offset,
                data: outgoing,
            });
        }
    }

    /// Switch to stronger cryptography during handshake
    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
        debug_assert!(
            self.spaces[space].crypto.is_none(),
            "already reached packet space {space:?}"
        );
        trace!("{:?} keys ready", space);
        if space == SpaceId::Data {
            // Precompute the first key update
            self.next_crypto = Some(
                self.crypto
                    .next_1rtt_keys()
                    .expect("handshake should be complete"),
            );
        }

        self.spaces[space].crypto = Some(crypto);
        debug_assert!(space as usize > self.highest_space as usize);
        self.highest_space = space;
        if space == SpaceId::Data && self.side.is_client() {
            // Discard 0-RTT keys because 1-RTT keys are available.
            self.zero_rtt_crypto = None;
        }
    }

    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
        debug_assert!(space_id != SpaceId::Data);
        trace!("discarding {:?} keys", space_id);
        if space_id == SpaceId::Initial {
            // No longer needed
            if let ConnectionSide::Client { token, .. } = &mut self.side {
                *token = Bytes::new();
            }
        }
        let space = &mut self.spaces[space_id];
        space.crypto = None;
        let pns = space.for_path(PathId::ZERO);
        pns.time_of_last_ack_eliciting_packet = None;
        pns.loss_time = None;
        pns.loss_probes = 0;
        let sent_packets = mem::take(&mut pns.sent_packets);
        let path = self.paths.get_mut(&PathId::ZERO).unwrap();
        for (_, packet) in sent_packets.into_iter() {
            path.data.remove_in_flight(&packet);
        }

        self.set_loss_detection_timer(now, PathId::ZERO)
    }

    fn handle_coalesced(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        data: BytesMut,
    ) {
        self.path_data_mut(path_id)
            .inc_total_recvd(data.len() as u64);
        let mut remaining = Some(data);
        let cid_len = self
            .local_cid_state
            .values()
            .map(|cid_state| cid_state.cid_len())
            .next()
            .expect("one cid_state must exist");
        while let Some(data) = remaining {
            match PartialDecode::new(
                data,
                &FixedLengthConnectionIdParser::new(cid_len),
                &[self.version],
                self.endpoint_config.grease_quic_bit,
            ) {
                Ok((partial_decode, rest)) => {
                    remaining = rest;
                    self.handle_decode(now, remote, path_id, ecn, partial_decode);
                }
                Err(e) => {
                    trace!("malformed header: {}", e);
                    return;
                }
            }
        }
    }

    fn handle_decode(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        partial_decode: PartialDecode,
    ) {
        let qlog = QlogRecvPacket::new(partial_decode.len());
        if let Some(decoded) = packet_crypto::unprotect_header(
            partial_decode,
            &self.spaces,
            self.zero_rtt_crypto.as_ref(),
            self.peer_params.stateless_reset_token,
        ) {
            self.handle_packet(
                now,
                remote,
                path_id,
                ecn,
                decoded.packet,
                decoded.stateless_reset,
                qlog,
            );
        }
    }

    fn handle_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<Packet>,
        stateless_reset: bool,
        mut qlog: QlogRecvPacket,
    ) {
        self.stats.udp_rx.ios += 1;
        if let Some(ref packet) = packet {
            trace!(
                "got {:?} packet ({} bytes) from {} using id {}",
                packet.header.space(),
                packet.payload.len() + packet.header_data.len(),
                remote,
                packet.header.dst_cid(),
            );
        }

        if self.is_handshaking() {
            if path_id != PathId::ZERO {
                debug!(%remote, %path_id, "discarding multipath packet during handshake");
                return;
            }
            if remote != self.path_data_mut(path_id).remote {
                if let Some(hs) = self.state.as_handshake() {
                    if hs.allow_server_migration {
                        trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
                        self.path_data_mut(path_id).remote = remote;
                        self.qlog.emit_tuple_assigned(path_id, remote, now);
                    } else {
                        debug!("discarding packet with unexpected remote during handshake");
                        return;
                    }
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            }
        }

        let was_closed = self.state.is_closed();
        let was_drained = self.state.is_drained();

        let decrypted = match packet {
            None => Err(None),
            Some(mut packet) => self
                .decrypt_packet(now, path_id, &mut packet)
                .map(move |number| (packet, number)),
        };
        let result = match decrypted {
            _ if stateless_reset => {
                debug!("got stateless reset");
                Err(ConnectionError::Reset)
            }
            Err(Some(e)) => {
                warn!("illegal packet: {}", e);
                Err(e.into())
            }
            Err(None) => {
                debug!("failed to authenticate packet");
                self.authentication_failures += 1;
                let integrity_limit = self.spaces[self.highest_space]
                    .crypto
                    .as_ref()
                    .unwrap()
                    .packet
                    .local
                    .integrity_limit();
                if self.authentication_failures > integrity_limit {
                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
                } else {
                    return;
                }
            }
            Ok((packet, number)) => {
                qlog.header(&packet.header, number, path_id);
                let span = match number {
                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                    None => trace_span!("recv", space = ?packet.header.space()),
                };
                let _guard = span.enter();

                let dedup = self.spaces[packet.header.space()]
                    .path_space_mut(path_id)
                    .map(|pns| &mut pns.dedup);
                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                    debug!("discarding possible duplicate packet");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else if self.state.is_handshake() && packet.header.is_short() {
                    // TODO: SHOULD buffer these to improve reordering tolerance.
                    trace!("dropping short packet during handshake");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else {
                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                        if let Some(hs) = self.state.as_handshake() {
                            if self.side.is_server() && token != &hs.expected_token {
                                // Clients must send the same retry token in every Initial. Initial
                                // packets can be spoofed, so we discard rather than killing the
                                // connection.
                                warn!("discarding Initial with invalid retry token");
                                self.qlog.emit_packet_received(qlog, now);
                                return;
                            }
                        }
                    }

                    if !self.state.is_closed() {
                        let spin = match packet.header {
                            Header::Short { spin, .. } => spin,
                            _ => false,
                        };

                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                            // Only the client is allowed to open paths
                            self.ensure_path(path_id, remote, now, number);
                        }
                        if self.paths.contains_key(&path_id) {
                            self.on_packet_authenticated(
                                now,
                                packet.header.space(),
                                path_id,
                                ecn,
                                number,
                                spin,
                                packet.header.is_1rtt(),
                            );
                        }
                    }

                    let res = self
                        .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);

                    self.qlog.emit_packet_received(qlog, now);
                    res
                }
            }
        };

        // State transitions for error cases
        if let Err(conn_err) = result {
            match conn_err {
                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::Reset
                | ConnectionError::TransportError(TransportError {
                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
                    ..
                }) => {
                    self.state.move_to_drained(Some(conn_err));
                }
                ConnectionError::TimedOut => {
                    unreachable!("timeouts aren't generated by packet processing");
                }
                ConnectionError::TransportError(err) => {
                    debug!("closing connection due to transport error: {}", err);
                    self.state.move_to_closed(err);
                }
                ConnectionError::VersionMismatch => {
                    self.state.move_to_draining(Some(conn_err));
                }
                ConnectionError::LocallyClosed => {
                    unreachable!("LocallyClosed isn't generated by packet processing");
                }
                ConnectionError::CidsExhausted => {
                    unreachable!("CidsExhausted isn't generated by packet processing");
                }
            };
        }

        if !was_closed && self.state.is_closed() {
            self.close_common();
            if !self.state.is_drained() {
                self.set_close_timer(now);
            }
        }
        if !was_drained && self.state.is_drained() {
            self.endpoint_events.push_back(EndpointEventInner::Drained);
            // Close timer may have been started previously, e.g. if we sent a close and got a
            // stateless reset in response
            self.timers
                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
        }

        // Transmit CONNECTION_CLOSE if necessary
        if matches!(self.state.as_type(), StateType::Closed) {
            // If there is no PathData for this PathId the packet was for a brand new
            // path. It was a valid packet however, so the remote is valid and we want to
            // send CONNECTION_CLOSE.
            let path_remote = self
                .paths
                .get(&path_id)
                .map(|p| p.data.remote)
                .unwrap_or(remote);
            self.close = remote == path_remote;
        }
    }
3783
3784    fn process_decrypted_packet(
3785        &mut self,
3786        now: Instant,
3787        remote: SocketAddr,
3788        path_id: PathId,
3789        number: Option<u64>,
3790        packet: Packet,
3791        qlog: &mut QlogRecvPacket,
3792    ) -> Result<(), ConnectionError> {
3793        if !self.paths.contains_key(&path_id) {
3794            // There is a chance this is a server side, first (for this path) packet, which would
3795            // be a protocol violation. It's more likely, however, that this is a packet of a
3796            // pruned path
3797            trace!(%path_id, ?number, "discarding packet for unknown path");
3798            return Ok(());
3799        }
3800        let state = match self.state.as_type() {
3801            StateType::Established => {
3802                match packet.header.space() {
3803                    SpaceId::Data => {
3804                        self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
3805                    }
3806                    _ if packet.header.has_frames() => {
3807                        self.process_early_payload(now, path_id, packet, qlog)?
3808                    }
3809                    _ => {
3810                        trace!("discarding unexpected pre-handshake packet");
3811                    }
3812                }
3813                return Ok(());
3814            }
3815            StateType::Closed => {
3816                for result in frame::Iter::new(packet.payload.freeze())? {
3817                    let frame = match result {
3818                        Ok(frame) => frame,
3819                        Err(err) => {
3820                            debug!("frame decoding error: {err:?}");
3821                            continue;
3822                        }
3823                    };
3824                    qlog.frame(&frame);
3825
3826                    if let Frame::Padding = frame {
3827                        continue;
3828                    };
3829
3830                    self.stats.frame_rx.record(&frame);
3831
3832                    if let Frame::Close(_error) = frame {
3833                        self.state.move_to_draining(None);
3834                        break;
3835                    }
3836                }
3837                return Ok(());
3838            }
3839            StateType::Draining | StateType::Drained => return Ok(()),
3840            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
3841        };
3842
3843        match packet.header {
3844            Header::Retry {
3845                src_cid: rem_cid, ..
3846            } => {
3847                debug_assert_eq!(path_id, PathId::ZERO);
3848                if self.side.is_server() {
3849                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3850                }
3851
3852                let is_valid_retry = self
3853                    .rem_cids
3854                    .get(&path_id)
3855                    .map(|cids| cids.active())
3856                    .map(|orig_dst_cid| {
3857                        self.crypto.is_valid_retry(
3858                            orig_dst_cid,
3859                            &packet.header_data,
3860                            &packet.payload,
3861                        )
3862                    })
3863                    .unwrap_or_default();
3864                if self.total_authed_packets > 1
3865                            || packet.payload.len() <= 16 // token + 16 byte tag
3866                            || !is_valid_retry
3867                {
3868                    trace!("discarding invalid Retry");
3869                    // - After the client has received and processed an Initial or Retry
3870                    //   packet from the server, it MUST discard any subsequent Retry
3871                    //   packets that it receives.
3872                    // - A client MUST discard a Retry packet with a zero-length Retry Token
3873                    //   field.
3874                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
3875                    //   that cannot be validated
3876                    return Ok(());
3877                }
3878
3879                trace!("retrying with CID {}", rem_cid);
3880                let client_hello = state.client_hello.take().unwrap();
3881                self.retry_src_cid = Some(rem_cid);
3882                self.rem_cids
3883                    .get_mut(&path_id)
3884                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
3885                    .update_initial_cid(rem_cid);
3886                self.rem_handshake_cid = rem_cid;
3887
3888                let space = &mut self.spaces[SpaceId::Initial];
3889                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
3890                    self.on_packet_acked(now, PathId::ZERO, info);
3891                };
3892
3893                // Make sure we clean up after any retransmitted Initials
3894                self.discard_space(now, SpaceId::Initial);
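                // Rebuild the Initial space: keys are re-derived from the server-chosen
                // CID (RFC 9001 §5.2) and the ClientHello is queued for retransmission,
                // carrying the packet number sequence forward.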
3895                self.spaces[SpaceId::Initial] = {
3896                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
3897                    space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
3898                    space.crypto_offset = client_hello.len() as u64;
3899                    space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
3900                        .for_path(path_id)
3901                        .next_packet_number;
3902                    space.pending.crypto.push_back(frame::Crypto {
3903                        offset: 0,
3904                        data: client_hello,
3905                    });
3906                    space
3907                };
3908
3909                // Retransmit all 0-RTT data
3910                let zero_rtt = mem::take(
3911                    &mut self.spaces[SpaceId::Data]
3912                        .for_path(PathId::ZERO)
3913                        .sent_packets,
3914                );
3915                for (_, info) in zero_rtt.into_iter() {
3916                    self.paths
3917                        .get_mut(&PathId::ZERO)
3918                        .unwrap()
3919                        .remove_in_flight(&info);
3920                    self.spaces[SpaceId::Data].pending |= info.retransmits;
3921                }
3922                self.streams.retransmit_all_for_0rtt();
3923
3924                let token_len = packet.payload.len() - 16;
3925                let ConnectionSide::Client { ref mut token, .. } = self.side else {
3926                    unreachable!("we already short-circuited if we're server");
3927                };
3928                *token = packet.payload.freeze().split_to(token_len);
3929
3930                self.state = State::handshake(state::Handshake {
3931                    expected_token: Bytes::new(),
3932                    rem_cid_set: false,
3933                    client_hello: None,
3934                    allow_server_migration: true,
3935                });
3936                Ok(())
3937            }
3938            Header::Long {
3939                ty: LongType::Handshake,
3940                src_cid: rem_cid,
3941                dst_cid: loc_cid,
3942                ..
3943            } => {
3944                debug_assert_eq!(path_id, PathId::ZERO);
3945                if rem_cid != self.rem_handshake_cid {
3946                    debug!(
3947                        "discarding packet with mismatched remote CID: {} != {}",
3948                        self.rem_handshake_cid, rem_cid
3949                    );
3950                    return Ok(());
3951                }
3952                self.on_path_validated(path_id);
3953
3954                self.process_early_payload(now, path_id, packet, qlog)?;
3955                if self.state.is_closed() {
3956                    return Ok(());
3957                }
3958
3959                if self.crypto.is_handshaking() {
3960                    trace!("handshake ongoing");
3961                    return Ok(());
3962                }
3963
3964                if self.side.is_client() {
3965                    // Client-only because server params were set from the client's Initial
3966                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
3967                        TransportError::new(
3968                            TransportErrorCode::crypto(0x6d),
3969                            "transport parameters missing".to_owned(),
3970                        )
3971                    })?;
3972
3973                    if self.has_0rtt() {
3974                        if !self.crypto.early_data_accepted().unwrap() {
3975                            debug_assert!(self.side.is_client());
3976                            debug!("0-RTT rejected");
3977                            self.accepted_0rtt = false;
3978                            self.streams.zero_rtt_rejected();
3979
3980                            // Discard already-queued frames
3981                            self.spaces[SpaceId::Data].pending = Retransmits::default();
3982
3983                            // Discard 0-RTT packets
3984                            let sent_packets = mem::take(
3985                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
3986                            );
3987                            for (_, packet) in sent_packets.into_iter() {
3988                                self.paths
3989                                    .get_mut(&path_id)
3990                                    .unwrap()
3991                                    .remove_in_flight(&packet);
3992                            }
3993                        } else {
3994                            self.accepted_0rtt = true;
3995                            params.validate_resumption_from(&self.peer_params)?;
3996                        }
3997                    }
3998                    if let Some(token) = params.stateless_reset_token {
3999                        let remote = self.path_data(path_id).remote;
4000                        self.endpoint_events
4001                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4002                    }
4003                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4004                    self.issue_first_cids(now);
4005                } else {
4006                    // Server-only
4007                    self.spaces[SpaceId::Data].pending.handshake_done = true;
4008                    self.discard_space(now, SpaceId::Handshake);
4009                    self.events.push_back(Event::HandshakeConfirmed);
4010                    trace!("handshake confirmed");
4011                }
4012
4013                self.events.push_back(Event::Connected);
4014                self.state.move_to_established();
4015                trace!("established");
4016
4017                // Multipath can only be enabled after the state has reached
4018                // Established, so this cannot happen any earlier.
4019                self.issue_first_path_cids(now);
4020                Ok(())
4021            }
4022            Header::Initial(InitialHeader {
4023                src_cid: rem_cid,
4024                dst_cid: loc_cid,
4025                ..
4026            }) => {
4027                debug_assert_eq!(path_id, PathId::ZERO);
4028                if !state.rem_cid_set {
4029                    trace!("switching remote CID to {}", rem_cid);
4030                    let mut state = state.clone();
4031                    self.rem_cids
4032                        .get_mut(&path_id)
4033                        .expect("PathId::ZERO not yet abandoned")
4034                        .update_initial_cid(rem_cid);
4035                    self.rem_handshake_cid = rem_cid;
4036                    self.orig_rem_cid = rem_cid;
4037                    state.rem_cid_set = true;
4038                    self.state.move_to_handshake(state);
4039                } else if rem_cid != self.rem_handshake_cid {
4040                    debug!(
4041                        "discarding packet with mismatched remote CID: {} != {}",
4042                        self.rem_handshake_cid, rem_cid
4043                    );
4044                    return Ok(());
4045                }
4046
4047                let starting_space = self.highest_space;
4048                self.process_early_payload(now, path_id, packet, qlog)?;
4049
4050                if self.side.is_server()
4051                    && starting_space == SpaceId::Initial
4052                    && self.highest_space != SpaceId::Initial
4053                {
4054                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4055                        TransportError::new(
4056                            TransportErrorCode::crypto(0x6d),
4057                            "transport parameters missing".to_owned(),
4058                        )
4059                    })?;
4060                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4061                    self.issue_first_cids(now);
4062                    self.init_0rtt(now);
4063                }
4064                Ok(())
4065            }
4066            Header::Long {
4067                ty: LongType::ZeroRtt,
4068                ..
4069            } => {
4070                self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
4071                Ok(())
4072            }
4073            Header::VersionNegotiate { .. } => {
4074                if self.total_authed_packets > 1 {
4075                    return Ok(());
4076                }
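                // A Version Negotiation packet lists the server's supported versions as
                // 32-bit big-endian values; if our version is present the packet is
                // spurious and must be ignored (RFC 9000 §6.2).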
4077                let supported = packet
4078                    .payload
4079                    .chunks(4)
4080                    .any(|x| match <[u8; 4]>::try_from(x) {
4081                        Ok(version) => self.version == u32::from_be_bytes(version),
4082                        Err(_) => false,
4083                    });
4084                if supported {
4085                    return Ok(());
4086                }
4087                debug!("remote doesn't support our version");
4088                Err(ConnectionError::VersionMismatch)
4089            }
4090            Header::Short { .. } => unreachable!(
4091                "short packets received during handshake are discarded in handle_packet"
4092            ),
4093        }
4094    }
4095
4096    /// Process an Initial or Handshake packet payload
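    ///
    /// Only CRYPTO, ACK/PATH_ACK, CONNECTION_CLOSE, PADDING and PING frames are
    /// accepted here; any other frame type is rejected as a PROTOCOL_VIOLATION.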
4097    fn process_early_payload(
4098        &mut self,
4099        now: Instant,
4100        path_id: PathId,
4101        packet: Packet,
4102        #[allow(unused)] qlog: &mut QlogRecvPacket,
4103    ) -> Result<(), TransportError> {
4104        debug_assert_ne!(packet.header.space(), SpaceId::Data);
4105        debug_assert_eq!(path_id, PathId::ZERO);
4106        let payload_len = packet.payload.len();
4107        let mut ack_eliciting = false;
4108        for result in frame::Iter::new(packet.payload.freeze())? {
4109            let frame = result?;
4110            qlog.frame(&frame);
4111            let span = match frame {
4112                Frame::Padding => continue,
4113                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4114            };
4115
4116            self.stats.frame_rx.record(&frame);
4117
4118            let _guard = span.as_ref().map(|x| x.enter());
4119            ack_eliciting |= frame.is_ack_eliciting();
4120
4121            // Process frames
4122            if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4123                return Err(TransportError::PROTOCOL_VIOLATION(
4124                    "illegal frame type in handshake",
4125                ));
4126            }
4127
4128            match frame {
4129                Frame::Padding | Frame::Ping => {}
4130                Frame::Crypto(frame) => {
4131                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
4132                }
4133                Frame::Ack(ack) => {
4134                    self.on_ack_received(now, packet.header.space(), ack)?;
4135                }
4136                Frame::PathAck(ack) => {
4137                    span.as_ref()
4138                        .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4139                    self.on_path_ack_received(now, packet.header.space(), ack)?;
4140                }
4141                Frame::Close(reason) => {
4142                    self.state.move_to_draining(Some(reason.into()));
4143                    return Ok(());
4144                }
4145                _ => {
4146                    let mut err =
4147                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4148                    err.frame = Some(frame.ty());
4149                    return Err(err);
4150                }
4151            }
4152        }
4153
4154        if ack_eliciting {
4155            // In the initial and handshake spaces, ACKs must be sent immediately
4156            self.spaces[packet.header.space()]
4157                .for_path(path_id)
4158                .pending_acks
4159                .set_immediate_ack_required();
4160        }
4161
4162        self.write_crypto();
4163        Ok(())
4164    }
4165
4166    /// Processes the packet payload, always in the data space.
4167    fn process_payload(
4168        &mut self,
4169        now: Instant,
4170        remote: SocketAddr,
4171        path_id: PathId,
4172        number: u64,
4173        packet: Packet,
4174        #[allow(unused)] qlog: &mut QlogRecvPacket,
4175    ) -> Result<(), TransportError> {
4176        let payload = packet.payload.freeze();
4177        let mut is_probing_packet = true;
4178        let mut close = None;
4179        let payload_len = payload.len();
4180        let mut ack_eliciting = false;
4181        // if this packet triggers a path migration and includes an observed address
4182        // frame, it is stored here
4183        let mut migration_observed_addr = None;
4184        for result in frame::Iter::new(payload)? {
4185            let frame = result?;
4186            qlog.frame(&frame);
4187            let span = match frame {
4188                Frame::Padding => continue,
4189                _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4190            };
4191
4192            self.stats.frame_rx.record(&frame);
4193            // Crypto, Stream and Datagram frames are special-cased in order not to
4194            // pollute the log with payload data
4195            match &frame {
4196                Frame::Crypto(f) => {
4197                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4198                }
4199                Frame::Stream(f) => {
4200                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4201                }
4202                Frame::Datagram(f) => {
4203                    trace!(len = f.data.len(), "got datagram frame");
4204                }
4205                f => {
4206                    trace!("got frame {f}");
4207                }
4208            }
4209
4210            let _guard = span.enter();
4211            if packet.header.is_0rtt() {
4212                match frame {
4213                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4214                        return Err(TransportError::PROTOCOL_VIOLATION(
4215                            "illegal frame type in 0-RTT",
4216                        ));
4217                    }
4218                    _ => {
4219                        if frame.is_1rtt() {
4220                            return Err(TransportError::PROTOCOL_VIOLATION(
4221                                "illegal frame type in 0-RTT",
4222                            ));
4223                        }
4224                    }
4225                }
4226            }
4227            ack_eliciting |= frame.is_ack_eliciting();
4228
4229            // Check whether this could be a probing packet
4230            match frame {
4231                Frame::Padding
4232                | Frame::PathChallenge(_)
4233                | Frame::PathResponse(_)
4234                | Frame::NewConnectionId(_)
4235                | Frame::ObservedAddr(_) => {}
4236                _ => {
4237                    is_probing_packet = false;
4238                }
4239            }
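            // PADDING, PATH_CHALLENGE, PATH_RESPONSE and NEW_CONNECTION_ID are the
            // probing frames of RFC 9000 §9.1; OBSERVED_ADDR is treated the same way
            // here.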
4240
4241            match frame {
4242                Frame::Crypto(frame) => {
4243                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4244                }
4245                Frame::Stream(frame) => {
4246                    if self.streams.received(frame, payload_len)?.should_transmit() {
4247                        self.spaces[SpaceId::Data].pending.max_data = true;
4248                    }
4249                }
4250                Frame::Ack(ack) => {
4251                    self.on_ack_received(now, SpaceId::Data, ack)?;
4252                }
4253                Frame::PathAck(ack) => {
4254                    span.record("path", tracing::field::debug(&ack.path_id));
4255                    self.on_path_ack_received(now, SpaceId::Data, ack)?;
4256                }
4257                Frame::Padding | Frame::Ping => {}
4258                Frame::Close(reason) => {
4259                    close = Some(reason);
4260                }
4261                Frame::PathChallenge(challenge) => {
4262                    let path = &mut self
4263                        .path_mut(path_id)
4264                        .expect("payload is processed only after the path becomes known");
4265                    path.path_responses.push(number, challenge.0, remote);
4266                    if remote == path.remote {
4267                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
4268                        // attack. Send a non-probing packet to recover the active path.
4269                        // TODO(flub): No longer true! We now also send a PATH_CHALLENGE to validate
4270                        //    the path if the path is new, without an RFC9000-style
4271                        //    migration involved. This means we add in an extra
4272                        //    IMMEDIATE_ACK on some challenges. It isn't really wrong to do
4273                        //    so, but it still is something untidy. We should instead
4274                        //    suppress this when we know the remote is still validating the
4275                        //    path.
4276                        match self.peer_supports_ack_frequency() {
4277                            true => self.immediate_ack(path_id),
4278                            false => {
4279                                self.ping_path(path_id).ok();
4280                            }
4281                        }
4282                    }
4283                }
4284                Frame::PathResponse(response) => {
4285                    let path = self
4286                        .paths
4287                        .get_mut(&path_id)
4288                        .expect("payload is processed only after the path becomes known");
4289
4290                    use PathTimer::*;
4291                    use paths::OnPathResponseReceived::*;
4292                    match path.data.on_path_response_received(now, response.0, remote) {
4293                        OnPath { was_open } => {
4294                            let qlog = self.qlog.with_time(now);
4295
4296                            self.timers
4297                                .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4298                            self.timers
4299                                .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4300
4301                            let next_challenge = path
4302                                .data
4303                                .earliest_expiring_challenge()
4304                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4305                            self.timers.set_or_stop(
4306                                Timer::PerPath(path_id, PathChallengeLost),
4307                                next_challenge,
4308                                qlog,
4309                            );
4310
4311                            if !was_open {
4312                                self.events
4313                                    .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4314                                if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4315                                {
4316                                    self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4317                                        id: path_id,
4318                                        addr: observed.socket_addr(),
4319                                    }));
4320                                }
4321                            }
4322                            if let Some((_, ref mut prev)) = path.prev {
4323                                prev.challenges_sent.clear();
4324                                prev.send_new_challenge = false;
4325                            }
4326                        }
4327                        OffPath => {
4328                            debug!("Response to off-path PathChallenge!");
4329                            let next_challenge = path
4330                                .data
4331                                .earliest_expiring_challenge()
4332                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4333                            self.timers.set_or_stop(
4334                                Timer::PerPath(path_id, PathChallengeLost),
4335                                next_challenge,
4336                                self.qlog.with_time(now),
4337                            );
4338                        }
4339                        Invalid { expected } => {
4340                            debug!(%response, from=%remote, %expected, "ignoring invalid PATH_RESPONSE")
4341                        }
4342                        Unknown => debug!(%response, "ignoring unknown PATH_RESPONSE"),
4343                    }
4344                }
4345                Frame::MaxData(bytes) => {
4346                    self.streams.received_max_data(bytes);
4347                }
4348                Frame::MaxStreamData { id, offset } => {
4349                    self.streams.received_max_stream_data(id, offset)?;
4350                }
4351                Frame::MaxStreams { dir, count } => {
4352                    self.streams.received_max_streams(dir, count)?;
4353                }
4354                Frame::ResetStream(frame) => {
4355                    if self.streams.received_reset(frame)?.should_transmit() {
4356                        self.spaces[SpaceId::Data].pending.max_data = true;
4357                    }
4358                }
4359                Frame::DataBlocked { offset } => {
4360                    debug!(offset, "peer claims to be blocked at connection level");
4361                }
4362                Frame::StreamDataBlocked { id, offset } => {
4363                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4364                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4365                        return Err(TransportError::STREAM_STATE_ERROR(
4366                            "STREAM_DATA_BLOCKED on send-only stream",
4367                        ));
4368                    }
4369                    debug!(
4370                        stream = %id,
4371                        offset, "peer claims to be blocked at stream level"
4372                    );
4373                }
4374                Frame::StreamsBlocked { dir, limit } => {
4375                    if limit > MAX_STREAM_COUNT {
4376                        return Err(TransportError::FRAME_ENCODING_ERROR(
4377                            "unrepresentable stream limit",
4378                        ));
4379                    }
4380                    debug!(
4381                        "peer claims to be blocked opening more than {} {} streams",
4382                        limit, dir
4383                    );
4384                }
4385                Frame::StopSending(frame::StopSending { id, error_code }) => {
4386                    if id.initiator() != self.side.side() {
4387                        if id.dir() == Dir::Uni {
4388                            debug!("got STOP_SENDING on recv-only {}", id);
4389                            return Err(TransportError::STREAM_STATE_ERROR(
4390                                "STOP_SENDING on recv-only stream",
4391                            ));
4392                        }
4393                    } else if self.streams.is_local_unopened(id) {
4394                        return Err(TransportError::STREAM_STATE_ERROR(
4395                            "STOP_SENDING on unopened stream",
4396                        ));
4397                    }
4398                    self.streams.received_stop_sending(id, error_code);
4399                }
4400                Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4401                    if let Some(ref path_id) = path_id {
4402                        span.record("path", tracing::field::debug(&path_id));
4403                    }
4404                    let path_id = path_id.unwrap_or_default();
4405                    match self.local_cid_state.get_mut(&path_id) {
4406                        None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4407                        Some(cid_state) => {
4408                            let allow_more_cids = cid_state
4409                                .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4410
4411                            // If the path has closed, we do not issue more CIDs for this path.
4412                            // For details see https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.2.2
4413                            // > an endpoint SHOULD provide new connection IDs for that path, if still open, using PATH_NEW_CONNECTION_ID frames.
4414                            let has_path = !self.abandoned_paths.contains(&path_id);
4415                            let allow_more_cids = allow_more_cids && has_path;
4416
4417                            self.endpoint_events
4418                                .push_back(EndpointEventInner::RetireConnectionId(
4419                                    now,
4420                                    path_id,
4421                                    sequence,
4422                                    allow_more_cids,
4423                                ));
4424                        }
4425                    }
4426                }
4427                Frame::NewConnectionId(frame) => {
4428                    let path_id = if let Some(path_id) = frame.path_id {
4429                        if !self.is_multipath_negotiated() {
4430                            return Err(TransportError::PROTOCOL_VIOLATION(
4431                                "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4432                            ));
4433                        }
4434                        if path_id > self.local_max_path_id {
4435                            return Err(TransportError::PROTOCOL_VIOLATION(
4436                                "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4437                            ));
4438                        }
4439                        path_id
4440                    } else {
4441                        PathId::ZERO
4442                    };
4443
4444                    if self.abandoned_paths.contains(&path_id) {
4445                        trace!("ignoring issued CID for abandoned path");
4446                        continue;
4447                    }
4448                    if let Some(ref path_id) = frame.path_id {
4449                        span.record("path", tracing::field::debug(&path_id));
4450                    }
4451                    let rem_cids = self
4452                        .rem_cids
4453                        .entry(path_id)
4454                        .or_insert_with(|| CidQueue::new(frame.id));
4455                    if rem_cids.active().is_empty() {
4456                        return Err(TransportError::PROTOCOL_VIOLATION(
4457                            "NEW_CONNECTION_ID when CIDs aren't in use",
4458                        ));
4459                    }
4460                    if frame.retire_prior_to > frame.sequence {
4461                        return Err(TransportError::PROTOCOL_VIOLATION(
4462                            "NEW_CONNECTION_ID retiring unissued CIDs",
4463                        ));
4464                    }
4465
4466                    use crate::cid_queue::InsertError;
4467                    match rem_cids.insert(frame) {
4468                        Ok(None) if self.path(path_id).is_none() => {
4469                            // if this gives us CIDs to open a new path and a nat traversal attempt
4470                            // is underway we could try to probe a pending remote
4471                            self.continue_nat_traversal_round(now);
4472                        }
4473                        Ok(None) => {}
4474                        Ok(Some((retired, reset_token))) => {
4475                            let pending_retired =
4476                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
4477                            /// Ensure `pending_retired` cannot grow without bound. Limit is
4478                            /// somewhat arbitrary but very permissive.
4479                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4480                            // We don't bother counting in-flight frames because those are bounded
4481                            // by congestion control.
4482                            if (pending_retired.len() as u64)
4483                                .saturating_add(retired.end.saturating_sub(retired.start))
4484                                > MAX_PENDING_RETIRED_CIDS
4485                            {
4486                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4487                                    "queued too many retired CIDs",
4488                                ));
4489                            }
4490                            pending_retired.extend(retired.map(|seq| (path_id, seq)));
4491                            self.set_reset_token(path_id, remote, reset_token);
4492                        }
4493                        Err(InsertError::ExceedsLimit) => {
4494                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4495                        }
4496                        Err(InsertError::Retired) => {
4497                            trace!("discarding already-retired CID");
4498                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
4499                            // range of connection IDs larger than the active connection ID limit
4500                            // was retired all at once via retire_prior_to.
4501                            self.spaces[SpaceId::Data]
4502                                .pending
4503                                .retire_cids
4504                                .push((path_id, frame.sequence));
4505                            continue;
4506                        }
4507                    };
4508
4509                    if self.side.is_server()
4510                        && path_id == PathId::ZERO
4511                        && self
4512                            .rem_cids
4513                            .get(&PathId::ZERO)
4514                            .map(|cids| cids.active_seq() == 0)
4515                            .unwrap_or_default()
4516                    {
4517                        // We're a server still using the initial remote CID for the client, so
4518                        // let's switch immediately to enable clientside stateless resets.
4519                        self.update_rem_cid(PathId::ZERO);
4520                    }
4521                }
4522                Frame::NewToken(NewToken { token }) => {
4523                    let ConnectionSide::Client {
4524                        token_store,
4525                        server_name,
4526                        ..
4527                    } = &self.side
4528                    else {
4529                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4530                    };
4531                    if token.is_empty() {
4532                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4533                    }
4534                    trace!("got new token");
4535                    token_store.insert(server_name, token);
4536                }
4537                Frame::Datagram(datagram) => {
4538                    if self
4539                        .datagrams
4540                        .received(datagram, &self.config.datagram_receive_buffer_size)?
4541                    {
4542                        self.events.push_back(Event::DatagramReceived);
4543                    }
4544                }
4545                Frame::AckFrequency(ack_frequency) => {
4546                    // This frame can only be sent in the Data space
4547
4548                    if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4549                        // The AckFrequency frame is stale (we have already received a more
4550                        // recent one)
4551                        continue;
4552                    }
4553
4554                    // Update the params for all of our paths
4555                    for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4556                        space.pending_acks.set_ack_frequency_params(&ack_frequency);
4557
4558                        // Our `max_ack_delay` has been updated, so we may need to adjust
4559                        // its associated timeout
4560                        if let Some(timeout) = space
4561                            .pending_acks
4562                            .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4563                        {
4564                            self.timers.set(
4565                                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4566                                timeout,
4567                                self.qlog.with_time(now),
4568                            );
4569                        }
4570                    }
4571                }
4572                Frame::ImmediateAck => {
4573                    // This frame can only be sent in the Data space
4574                    for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4575                        pns.pending_acks.set_immediate_ack_required();
4576                    }
4577                }
4578                Frame::HandshakeDone => {
4579                    if self.side.is_server() {
4580                        return Err(TransportError::PROTOCOL_VIOLATION(
4581                            "client sent HANDSHAKE_DONE",
4582                        ));
4583                    }
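                    // HANDSHAKE_DONE confirms the handshake for the client, which may
                    // then discard its Handshake keys (RFC 9001 §4.9.2).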
4584                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
4585                        self.discard_space(now, SpaceId::Handshake);
4586                    }
4587                    self.events.push_back(Event::HandshakeConfirmed);
4588                    trace!("handshake confirmed");
4589                }
4590                Frame::ObservedAddr(observed) => {
4591                    // check if the params allow the peer to send reports and this node to receive them
4592                    trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4593                    if !self
4594                        .peer_params
4595                        .address_discovery_role
4596                        .should_report(&self.config.address_discovery_role)
4597                    {
4598                        return Err(TransportError::PROTOCOL_VIOLATION(
4599                            "received OBSERVED_ADDRESS frame when not negotiated",
4600                        ));
4601                    }
4602                    // must only be sent in data space
4603                    if packet.header.space() != SpaceId::Data {
4604                        return Err(TransportError::PROTOCOL_VIOLATION(
4605                            "OBSERVED_ADDRESS frame outside data space",
4606                        ));
4607                    }
4608
4609                    let path = self.path_data_mut(path_id);
4610                    if remote == path.remote {
4611                        if let Some(updated) = path.update_observed_addr_report(observed) {
4612                            if path.open {
4613                                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4614                                    id: path_id,
4615                                    addr: updated,
4616                                }));
4617                            }
4618                            // otherwise the event is reported when the path is deemed open
4619                        }
4620                    } else {
4621                        // include in migration
4622                        migration_observed_addr = Some(observed)
4623                    }
4624                }
4625                Frame::PathAbandon(frame::PathAbandon {
4626                    path_id,
4627                    error_code,
4628                }) => {
4629                    span.record("path", tracing::field::debug(&path_id));
4630                    // TODO(flub): don't really know which error code to use here.
4631                    let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4632                        Ok(()) => {
4633                            trace!("peer abandoned path");
4634                            false
4635                        }
4636                        Err(ClosePathError::LastOpenPath) => {
4637                            trace!("peer abandoned last path, closing connection");
4638                            // TODO(flub): which error code?
4639                            return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4640                        }
4641                        Err(ClosePathError::ClosedPath) => {
4642                            trace!("peer abandoned already closed path");
4643                            true
4644                        }
4645                    };
4646                    // If we receive a retransmit of PATH_ABANDON then we may already have
4647                    // abandoned this path locally.  In that case the DiscardPath timer
4648                    // may already have fired and we no longer have any state for this path.
4649                    // Only set this timer if we still have path state.
4650                    if self.path(path_id).is_some() && !already_abandoned {
4651                        // TODO(flub): Checking is_some() here followed by a number of calls
4652                        //    that would panic if it was None is really ugly.  If only we
4653                        //    could do something like PathData::pto().  One day we'll have
4654                        //    unified SpaceId and PathId and this will be possible.
4655                        let delay = self.pto(SpaceId::Data, path_id) * 3;
4656                        self.timers.set(
4657                            Timer::PerPath(path_id, PathTimer::DiscardPath),
4658                            now + delay,
4659                            self.qlog.with_time(now),
4660                        );
4661                    }
4662                }
4663                Frame::PathStatusAvailable(info) => {
4664                    span.record("path", tracing::field::debug(&info.path_id));
4665                    if self.is_multipath_negotiated() {
4666                        self.on_path_status(
4667                            info.path_id,
4668                            PathStatus::Available,
4669                            info.status_seq_no,
4670                        );
4671                    } else {
4672                        return Err(TransportError::PROTOCOL_VIOLATION(
4673                            "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4674                        ));
4675                    }
4676                }
4677                Frame::PathStatusBackup(info) => {
4678                    span.record("path", tracing::field::debug(&info.path_id));
4679                    if self.is_multipath_negotiated() {
4680                        self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4681                    } else {
4682                        return Err(TransportError::PROTOCOL_VIOLATION(
4683                            "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4684                        ));
4685                    }
4686                }
4687                Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4688                    span.record("path", tracing::field::debug(&path_id));
4689                    if !self.is_multipath_negotiated() {
4690                        return Err(TransportError::PROTOCOL_VIOLATION(
4691                            "received MAX_PATH_ID frame when multipath was not negotiated",
4692                        ));
4693                    }
4694                    // frames that do not increase the path id are ignored
4695                    if path_id > self.remote_max_path_id {
4696                        self.remote_max_path_id = path_id;
4697                        self.issue_first_path_cids(now);
4698                        while let Some(true) = self.continue_nat_traversal_round(now) {}
4699                    }
4700                }
4701                Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4702                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4703                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4704                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4705                    if self.is_multipath_negotiated() {
4706                        if self.local_max_path_id > max_path_id {
4707                            return Err(TransportError::PROTOCOL_VIOLATION(
4708                                "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4709                            ));
4710                        }
4711                        debug!("received PATHS_BLOCKED({:?})", max_path_id);
4712                        // TODO(@divma): ensure max concurrent paths
4713                    } else {
4714                        return Err(TransportError::PROTOCOL_VIOLATION(
4715                            "received PATHS_BLOCKED frame when multipath was not negotiated",
4716                        ));
4717                    }
4718                }
4719                Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4720                    // Nothing to do.  This is recorded in the frame stats, but otherwise we
4721                    // always issue all CIDs we're allowed to issue, so either this is an
4722                    // impatient peer or a bug on our side.
4723
4724                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4725                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4726                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4727                    if self.is_multipath_negotiated() {
4728                        if path_id > self.local_max_path_id {
4729                            return Err(TransportError::PROTOCOL_VIOLATION(
4730                                "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4731                            ));
4732                        }
4733                        if next_seq.0
4734                            > self
4735                                .local_cid_state
4736                                .get(&path_id)
4737                                .map(|cid_state| cid_state.active_seq().1 + 1)
4738                                .unwrap_or_default()
4739                        {
4740                            return Err(TransportError::PROTOCOL_VIOLATION(
4741                                "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4742                            ));
4743                        }
4744                        debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4745                    } else {
4746                        return Err(TransportError::PROTOCOL_VIOLATION(
4747                            "received PATH_CIDS_BLOCKED frame when multipath was not negotiated",
4748                        ));
4749                    }
4750                }
4751                Frame::AddAddress(addr) => {
4752                    let client_state = match self.iroh_hp.client_side_mut() {
4753                        Ok(state) => state,
4754                        Err(err) => {
4755                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4756                                "Nat traversal(ADD_ADDRESS): {err}"
4757                            )));
4758                        }
4759                    };
4760
4761                    if !client_state.check_remote_address(&addr) {
4762                        // if the address is not valid we log a warning, but apply the update anyway
4763                        warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4764                    }
4765
4766                    match client_state.add_remote_address(addr) {
4767                        Ok(maybe_added) => {
4768                            if let Some(added) = maybe_added {
4769                                self.events.push_back(Event::NatTraversal(
4770                                    iroh_hp::Event::AddressAdded(added),
4771                                ));
4772                            }
4773                        }
4774                        Err(e) => {
4775                            warn!(%e, "failed to add remote address")
4776                        }
4777                    }
4778                }
4779                Frame::RemoveAddress(addr) => {
4780                    let client_state = match self.iroh_hp.client_side_mut() {
4781                        Ok(state) => state,
4782                        Err(err) => {
4783                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4784                                "Nat traversal(REMOVE_ADDRESS): {err}"
4785                            )));
4786                        }
4787                    };
4788                    if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4789                        self.events
4790                            .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4791                                removed_addr,
4792                            )));
4793                    }
4794                }
4795                Frame::ReachOut(reach_out) => {
4796                    let server_state = match self.iroh_hp.server_side_mut() {
4797                        Ok(state) => state,
4798                        Err(err) => {
4799                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4800                                "Nat traversal(REACH_OUT): {err}"
4801                            )));
4802                        }
4803                    };
4804
4805                    if let Err(err) = server_state.handle_reach_out(reach_out) {
4806                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
4807                            "Nat traversal(REACH_OUT): {err}"
4808                        )));
4809                    }
4810                }
4811            }
4812        }
4813
4814        let space = self.spaces[SpaceId::Data].for_path(path_id);
4815        if space
4816            .pending_acks
4817            .packet_received(now, number, ack_eliciting, &space.dedup)
4818        {
4819            if self.abandoned_paths.contains(&path_id) {
4820                // § 3.4.3 QUIC-MULTIPATH: promptly send ACKs for packets received from
4821                // abandoned paths.
4822                space.pending_acks.set_immediate_ack_required();
4823            } else {
4824                self.timers.set(
4825                    Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4826                    now + self.ack_frequency.max_ack_delay,
4827                    self.qlog.with_time(now),
4828                );
4829            }
4830        }
4831
4832        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
4833        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
4834        // are only freed, and hence only issue credit, once the application has been notified
4835        // during a read on the stream.
4836        let pending = &mut self.spaces[SpaceId::Data].pending;
4837        self.streams.queue_max_stream_id(pending);
4838
4839        if let Some(reason) = close {
4840            self.state.move_to_draining(Some(reason.into()));
4841            self.close = true;
4842        }
4843
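        // A changed source address on the latest non-probing packet initiates
        // migration (RFC 9000 §9.3); when migration is disabled such packets are
        // dropped before reaching this point.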
4844        if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4845            && !is_probing_packet
4846            && remote != self.path_data(path_id).remote
4847        {
4848            let ConnectionSide::Server { ref server_config } = self.side else {
4849                panic!("packets from unknown remote should be dropped by clients");
4850            };
4851            debug_assert!(
4852                server_config.migration,
4853                "migration-initiating packets should have been dropped immediately"
4854            );
4855            self.migrate(path_id, now, remote, migration_observed_addr);
4856            // Break linkability, if possible
4857            self.update_rem_cid(path_id);
4858            self.spin = false;
4859        }
4860
4861        Ok(())
4862    }
4863
4864    fn migrate(
4865        &mut self,
4866        path_id: PathId,
4867        now: Instant,
4868        remote: SocketAddr,
4869        observed_addr: Option<ObservedAddr>,
4870    ) {
4871        trace!(%remote, %path_id, "migration initiated");
4872        self.path_counter = self.path_counter.wrapping_add(1);
4873        // TODO(@divma): conditions for path migration in multipath are very specific, check them
4874        // again to prevent path migrations that should actually create a new path
4875
4876        // Reset RTT/congestion state for the new path unless it looks like a NAT rebinding.
4877        // Note that the congestion window will not grow until validation terminates. This
4878        // helps mitigate amplification attacks performed by spoofing source addresses.
4879        let prev_pto = self.pto(SpaceId::Data, path_id);
4880        let known_path = self.paths.get_mut(&path_id).expect("known path");
4881        let path = &mut known_path.data;
4882        let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4883            PathData::from_previous(remote, path, self.path_counter, now)
4884        } else {
4885            let peer_max_udp_payload_size =
4886                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4887                    .unwrap_or(u16::MAX);
4888            PathData::new(
4889                remote,
4890                self.allow_mtud,
4891                Some(peer_max_udp_payload_size),
4892                self.path_counter,
4893                now,
4894                &self.config,
4895            )
4896        };
4897        new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4898        if let Some(report) = observed_addr {
4899            if let Some(updated) = new_path.update_observed_addr_report(report) {
4900                trace!("adding observed addr event from migration");
4901                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4902                    id: path_id,
4903                    addr: updated,
4904                }));
4905            }
4906        }
4907        new_path.send_new_challenge = true;
4908
4909        let mut prev = mem::replace(path, new_path);
4910        // Don't clobber the original path if the previous one hasn't been validated yet
4911        if !prev.is_validating_path() {
4912            prev.send_new_challenge = true;
4913            // We haven't updated the remote CID yet, this captures the remote CID we were using on
4914            // the previous path.
4915
4916            known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4917        }
4918
4919        // We need to re-assign the correct remote to this path in qlog
4920        self.qlog.emit_tuple_assigned(path_id, remote, now);
4921
4922        self.timers.set(
4923            Timer::PerPath(path_id, PathTimer::PathValidation),
4924            now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4925            self.qlog.with_time(now),
4926        );
4927    }
4928
4929    /// Handle a change in the local address, i.e. an active migration
4930    pub fn local_address_changed(&mut self) {
4931        // TODO(flub): if multipath is enabled this needs to create a new path entirely.
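        // Rotate to a fresh remote CID to avoid linkability across addresses and
        // ping so the peer promptly learns about the new address.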
4932        self.update_rem_cid(PathId::ZERO);
4933        self.ping();
4934    }
4935
4936    /// Switch to a previously unused remote connection ID, if possible
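    ///
    /// The replaced CID and any skipped sequence numbers are queued for
    /// RETIRE_CONNECTION_ID, and the new CID's stateless reset token is installed.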
4937    fn update_rem_cid(&mut self, path_id: PathId) {
4938        let Some((reset_token, retired)) =
4939            self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4940        else {
4941            return;
4942        };
4943
4944        // Retire the current remote CID and any CIDs we had to skip.
4945        self.spaces[SpaceId::Data]
4946            .pending
4947            .retire_cids
4948            .extend(retired.map(|seq| (path_id, seq)));
4949        let remote = self.path_data(path_id).remote;
4950        self.set_reset_token(path_id, remote, reset_token);
4951    }
4952
4953    /// Sends this reset token to the endpoint
4954    ///
4955    /// The endpoint needs to know the reset tokens issued by the peer, so that if the peer
4956    /// sends a reset token it knows to route it to this connection. See RFC 9000 section
4957    /// 10.3. Stateless Reset.
4958    ///
4959    /// Reset tokens are different for each path; the endpoint, however, identifies
4960    /// paths by peer socket address, not by path ID.
4961    fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4962        self.endpoint_events
4963            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4964
4965        // During the handshake the server sends a reset token in the transport
4966        // parameters. When we are the client and we receive the reset token during the
4967        // handshake we want this to affect our peer transport parameters.
4968        // TODO(flub): Pretty sure this is pointless, the entire params is overwritten
4969        //    shortly after this was called.  And then the params don't have this anymore.
4970        if path_id == PathId::ZERO {
4971            self.peer_params.stateless_reset_token = Some(reset_token);
4972        }
4973    }
4974
4975    /// Issue an initial set of connection IDs to the peer upon connection
4976    fn issue_first_cids(&mut self, now: Instant) {
4977        if self
4978            .local_cid_state
4979            .get(&PathId::ZERO)
4980            .expect("PathId::ZERO exists when the connection is created")
4981            .cid_len()
4982            == 0
4983        {
4984            return;
4985        }
4986
4987        // Subtract 1 to account for the CID we supplied while handshaking
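        // (Worked example: with an issue_cids_limit() of 8, we request 8 - 1 = 7 new
        // CIDs here, or 8 - 1 - 1 = 6 for a server that also sent a CID in its
        // preferred_address transport parameter.)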
4988        let mut n = self.peer_params.issue_cids_limit() - 1;
4989        if let ConnectionSide::Server { server_config } = &self.side {
4990            if server_config.has_preferred_address() {
4991                // We also sent a CID in the transport parameters
4992                n -= 1;
4993            }
4994        }
4995        self.endpoint_events
4996            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
4997    }
4998
4999    /// Issues an initial set of CIDs for paths that have not yet had any CIDs issued
5000    ///
5001    /// Later CIDs are issued when CIDs expire or are retired by the peer.
5002    fn issue_first_path_cids(&mut self, now: Instant) {
5003        if let Some(max_path_id) = self.max_path_id() {
5004            let mut path_id = self.max_path_id_with_cids.next();
5005            while path_id <= max_path_id {
5006                self.endpoint_events
5007                    .push_back(EndpointEventInner::NeedIdentifiers(
5008                        path_id,
5009                        now,
5010                        self.peer_params.issue_cids_limit(),
5011                    ));
5012                path_id = path_id.next();
5013            }
5014            self.max_path_id_with_cids = max_path_id;
5015        }
5016    }
5017
5018    /// Populates a packet with frames
5019    ///
5020    /// This tries to fit as many frames as possible into the packet.
5021    ///
5022    /// If *path_exclusive_only* is set, only frames which can exclusively be sent on
5023    /// this path are built. This is used in multipath for backup paths while there is
5024    /// still an active path.
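    ///
    /// Frames are written in a fixed priority order, from HANDSHAKE_DONE down to STREAM
    /// and the nat traversal address frames, so control frames take precedence over
    /// application data when space runs short.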
5025    fn populate_packet(
5026        &mut self,
5027        now: Instant,
5028        space_id: SpaceId,
5029        path_id: PathId,
5030        path_exclusive_only: bool,
5031        buf: &mut impl BufMut,
5032        pn: u64,
5033        #[allow(unused)] qlog: &mut QlogSentPacket,
5034    ) -> SentFrames {
5035        let mut sent = SentFrames::default();
5036        let is_multipath_negotiated = self.is_multipath_negotiated();
5037        let space = &mut self.spaces[space_id];
5038        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5039        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5040        space
5041            .for_path(path_id)
5042            .pending_acks
5043            .maybe_ack_non_eliciting();
5044
5045        // HANDSHAKE_DONE
5046        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5047            trace!("HANDSHAKE_DONE");
5048            buf.write(frame::FrameType::HANDSHAKE_DONE);
5049            qlog.frame(&Frame::HandshakeDone);
5050            sent.retransmits.get_or_create().handshake_done = true;
5051            // This is only a u8 counter and the frame is typically sent just once
5052            self.stats.frame_tx.handshake_done =
5053                self.stats.frame_tx.handshake_done.saturating_add(1);
5054        }
5055
5056        // REACH_OUT
5057            // TODO(@divma): path exclusive considerations
5058        if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5059            while let Some(local_addr) = addresses.pop() {
5060                let reach_out = frame::ReachOut::new(*round, local_addr);
5061                if buf.remaining_mut() > reach_out.size() {
5062                    trace!(%round, ?local_addr, "REACH_OUT");
5063                    reach_out.write(buf);
5064                    let sent_reachouts = sent
5065                        .retransmits
5066                        .get_or_create()
5067                        .reach_out
5068                        .get_or_insert_with(|| (*round, Default::default()));
5069                    sent_reachouts.1.push(local_addr);
5070                    self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5071                    qlog.frame(&Frame::ReachOut(reach_out));
5072                } else {
5073                    addresses.push(local_addr);
5074                    break;
5075                }
5076            }
5077            if addresses.is_empty() {
5078                space.pending.reach_out = None;
5079            }
5080        }
5081
5082        // OBSERVED_ADDR
5083        if !path_exclusive_only
5084            && space_id == SpaceId::Data
5085            && self
5086                .config
5087                .address_discovery_role
5088                .should_report(&self.peer_params.address_discovery_role)
5089            && (!path.observed_addr_sent || space.pending.observed_addr)
5090        {
5091            let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5092            if buf.remaining_mut() > frame.size() {
5093                trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5094                frame.write(buf);
5095
5096                self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5097                path.observed_addr_sent = true;
5098
5099                self.stats.frame_tx.observed_addr += 1;
5100                sent.retransmits.get_or_create().observed_addr = true;
5101                space.pending.observed_addr = false;
5102                qlog.frame(&Frame::ObservedAddr(frame));
5103            }
5104        }
5105
5106        // PING
5107        if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5108            trace!("PING");
5109            buf.write(frame::FrameType::PING);
5110            sent.non_retransmits = true;
5111            self.stats.frame_tx.ping += 1;
5112            qlog.frame(&Frame::Ping);
5113        }
5114
5115        // IMMEDIATE_ACK
5116        if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5117            debug_assert_eq!(
5118                space_id,
5119                SpaceId::Data,
5120                "immediate acks must be sent in the data space"
5121            );
5122            trace!("IMMEDIATE_ACK");
5123            buf.write(frame::FrameType::IMMEDIATE_ACK);
5124            sent.non_retransmits = true;
5125            self.stats.frame_tx.immediate_ack += 1;
5126            qlog.frame(&Frame::ImmediateAck);
5127        }
5128
5129        // ACK
5130        // TODO(flub): Should this send acks for this path anyway?
5131
5132        if !path_exclusive_only {
5133            for path_id in space
5134                .number_spaces
5135                .iter_mut()
5136                .filter(|(_, pns)| pns.pending_acks.can_send())
5137                .map(|(&path_id, _)| path_id)
5138                .collect::<Vec<_>>()
5139            {
5140                Self::populate_acks(
5141                    now,
5142                    self.receiving_ecn,
5143                    &mut sent,
5144                    path_id,
5145                    space_id,
5146                    space,
5147                    is_multipath_negotiated,
5148                    buf,
5149                    &mut self.stats,
5150                    qlog,
5151                );
5152            }
5153        }
5154
5155        // ACK_FREQUENCY
5156        if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5157            let sequence_number = self.ack_frequency.next_sequence_number();
5158
5159            // Safe to unwrap because this is always provided when ACK frequency is enabled
5160            let config = self.config.ack_frequency_config.as_ref().unwrap();
5161
5162            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
5163            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5164                path.rtt.get(),
5165                config,
5166                &self.peer_params,
5167            );
5168
5169            trace!(?max_ack_delay, "ACK_FREQUENCY");
5170
5171            let frame = frame::AckFrequency {
5172                sequence: sequence_number,
5173                ack_eliciting_threshold: config.ack_eliciting_threshold,
5174                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5175                reordering_threshold: config.reordering_threshold,
5176            };
5177            frame.encode(buf);
5178            qlog.frame(&Frame::AckFrequency(frame));
5179
5180            sent.retransmits.get_or_create().ack_frequency = true;
5181
5182            self.ack_frequency
5183                .ack_frequency_sent(path_id, pn, max_ack_delay);
5184            self.stats.frame_tx.ack_frequency += 1;
5185        }
5186
5187        // PATH_CHALLENGE
5188        if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5189            && space_id == SpaceId::Data
5190            && path.send_new_challenge
5191            && !self.state.is_closed()
5192        // we don't want to send new challenges if we are already closing
5193        {
5194            path.send_new_challenge = false;
5195
5196            // Generate a new challenge every time we send a new PATH_CHALLENGE
5197            let token = self.rng.random();
5198            let info = paths::SentChallengeInfo {
5199                sent_instant: now,
5200                remote: path.remote,
5201            };
5202            path.challenges_sent.insert(token, info);
5203            sent.non_retransmits = true;
5204            sent.requires_padding = true;
5205            let challenge = frame::PathChallenge(token);
5206            trace!(frame = %challenge);
5207            buf.write(challenge);
5208            qlog.frame(&Frame::PathChallenge(challenge));
5209            self.stats.frame_tx.path_challenge += 1;
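            // Arm the PathChallengeLost timer one PTO from now. (Illustrative numbers:
            // a pto_base of 300ms plus a peer max_ack_delay of 25ms fires it 325ms
            // from now.)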
5210            let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5211            self.timers.set(
5212                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5213                now + pto,
5214                self.qlog.with_time(now),
5215            );
5216
5217            if is_multipath_negotiated && !path.validated {
5218                // queue informing the peer of the path status along with the challenge
5219                space.pending.path_status.insert(path_id);
5220            }
5221
5222            // Always include an OBSERVED_ADDR frame with a PATH_CHALLENGE, regardless
5223            // of whether one has already been sent on this path.
5224            // space_id == SpaceId::Data already holds due to the enclosing check
5225            if self
5226                .config
5227                .address_discovery_role
5228                .should_report(&self.peer_params.address_discovery_role)
5229            {
5230                let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5231                if buf.remaining_mut() > frame.size() {
5232                    frame.write(buf);
5233                    qlog.frame(&Frame::ObservedAddr(frame));
5234
5235                    self.next_observed_addr_seq_no =
5236                        self.next_observed_addr_seq_no.saturating_add(1u8);
5237                    path.observed_addr_sent = true;
5238
5239                    self.stats.frame_tx.observed_addr += 1;
5240                    sent.retransmits.get_or_create().observed_addr = true;
5241                    space.pending.observed_addr = false;
5242                }
5243            }
5244        }
5245
5246        // PATH_RESPONSE
5247        if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5248            if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5249                sent.non_retransmits = true;
5250                sent.requires_padding = true;
5251                let response = frame::PathResponse(token);
5252                trace!(frame = %response);
5253                buf.write(response);
5254                qlog.frame(&Frame::PathResponse(response));
5255                self.stats.frame_tx.path_response += 1;
5256
5257                // NOTE: this is technically not required, but it is useful to piggyback on
5258                // the request/response nature of path challenges to refresh an observation.
5259                // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
5260                // (space_id == SpaceId::Data already holds due to the enclosing check.)
5261                if self
5262                    .config
5263                    .address_discovery_role
5264                    .should_report(&self.peer_params.address_discovery_role)
5265                {
5266                    let frame =
5267                        frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5268                    if buf.remaining_mut() > frame.size() {
5269                        frame.write(buf);
5270                        qlog.frame(&Frame::ObservedAddr(frame));
5271
5272                        self.next_observed_addr_seq_no =
5273                            self.next_observed_addr_seq_no.saturating_add(1u8);
5274                        path.observed_addr_sent = true;
5275
5276                        self.stats.frame_tx.observed_addr += 1;
5277                        sent.retransmits.get_or_create().observed_addr = true;
5278                        space.pending.observed_addr = false;
5279                    }
5280                }
5281            }
5282        }
5283
5284        // CRYPTO
5285        while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5286            let mut frame = match space.pending.crypto.pop_front() {
5287                Some(x) => x,
5288                None => break,
5289            };
5290
5291            // Calculate the maximum amount of crypto data we can store in the buffer.
5292            // Since the offset is known, we can reserve the exact size required to encode it.
5293            // For the length we reserve 2 bytes, which allows encoding values up to
5294            // 2^14 - 1: more than what fits into a normally sized QUIC packet.
5295            let max_crypto_data_size = buf.remaining_mut()
5296                - 1 // Frame Type
5297                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5298                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
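            // (Worked example: with 1200 bytes remaining and an offset below 2^6, i.e. a
            // 1-byte varint, up to 1200 - 1 - 1 - 2 = 1196 bytes of crypto data fit.)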
5299
5300            let len = frame
5301                .data
5302                .len()
5303                .min(2usize.pow(14) - 1)
5304                .min(max_crypto_data_size);
5305
5306            let data = frame.data.split_to(len);
5307            let truncated = frame::Crypto {
5308                offset: frame.offset,
5309                data,
5310            };
5311            trace!(
5312                "CRYPTO: off {} len {}",
5313                truncated.offset,
5314                truncated.data.len()
5315            );
5316            truncated.encode(buf);
5317            self.stats.frame_tx.crypto += 1;
5318
5319            // The clone is cheap but we still cfg it out if qlog is disabled.
5320            #[cfg(feature = "qlog")]
5321            qlog.frame(&Frame::Crypto(truncated.clone()));
5322            sent.retransmits.get_or_create().crypto.push_back(truncated);
5323            if !frame.data.is_empty() {
5324                frame.offset += len as u64;
5325                space.pending.crypto.push_front(frame);
5326            }
5327        }
5328
5329        // TODO(flub): maybe this is much higher priority?
5330        // PATH_ABANDON
5331        while !path_exclusive_only
5332            && space_id == SpaceId::Data
5333            && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5334        {
5335            let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5336                break;
5337            };
5338            let frame = frame::PathAbandon {
5339                path_id,
5340                error_code,
5341            };
5342            frame.encode(buf);
5343            qlog.frame(&Frame::PathAbandon(frame));
5344            self.stats.frame_tx.path_abandon += 1;
5345            trace!(%path_id, "PATH_ABANDON");
5346            sent.retransmits
5347                .get_or_create()
5348                .path_abandon
5349                .entry(path_id)
5350                .or_insert(error_code);
5351        }
5352
5353        // PATH_STATUS_AVAILABLE & PATH_STATUS_BACKUP
5354        while !path_exclusive_only
5355            && space_id == SpaceId::Data
5356            && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5357        {
5358            let Some(path_id) = space.pending.path_status.pop_first() else {
5359                break;
5360            };
5361            let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5362                trace!(%path_id, "discarding queued path status for unknown path");
5363                continue;
5364            };
5365
5366            let seq = path.status.seq();
5367            sent.retransmits.get_or_create().path_status.insert(path_id);
5368            match path.local_status() {
5369                PathStatus::Available => {
5370                    let frame = frame::PathStatusAvailable {
5371                        path_id,
5372                        status_seq_no: seq,
5373                    };
5374                    frame.encode(buf);
5375                    qlog.frame(&Frame::PathStatusAvailable(frame));
5376                    self.stats.frame_tx.path_status_available += 1;
5377                    trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5378                }
5379                PathStatus::Backup => {
5380                    let frame = frame::PathStatusBackup {
5381                        path_id,
5382                        status_seq_no: seq,
5383                    };
5384                    frame.encode(buf);
5385                    qlog.frame(&Frame::PathStatusBackup(frame));
5386                    self.stats.frame_tx.path_status_backup += 1;
5387                    trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5388                }
5389            }
5390        }
5391
5392        // MAX_PATH_ID
5393        if space_id == SpaceId::Data
5394            && space.pending.max_path_id
5395            && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5396        {
5397            let frame = frame::MaxPathId(self.local_max_path_id);
5398            frame.encode(buf);
5399            qlog.frame(&Frame::MaxPathId(frame));
5400            space.pending.max_path_id = false;
5401            sent.retransmits.get_or_create().max_path_id = true;
5402            trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5403            self.stats.frame_tx.max_path_id += 1;
5404        }
5405
5406        // PATHS_BLOCKED
5407        if space_id == SpaceId::Data
5408            && space.pending.paths_blocked
5409            && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5410        {
5411            let frame = frame::PathsBlocked(self.remote_max_path_id);
5412            frame.encode(buf);
5413            qlog.frame(&Frame::PathsBlocked(frame));
5414            space.pending.paths_blocked = false;
5415            sent.retransmits.get_or_create().paths_blocked = true;
5416            trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5417            self.stats.frame_tx.paths_blocked += 1;
5418        }
5419
5420        // PATH_CIDS_BLOCKED
5421        while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5422        {
5423            let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5424                break;
5425            };
5426            let next_seq = match self.rem_cids.get(&path_id) {
5427                Some(cid_queue) => cid_queue.active_seq() + 1,
5428                None => 0,
5429            };
5430            let frame = frame::PathCidsBlocked {
5431                path_id,
5432                next_seq: VarInt(next_seq),
5433            };
5434            frame.encode(buf);
5435            qlog.frame(&Frame::PathCidsBlocked(frame));
5436            sent.retransmits
5437                .get_or_create()
5438                .path_cids_blocked
5439                .push(path_id);
5440            trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5441            self.stats.frame_tx.path_cids_blocked += 1;
5442        }
5443
5444        // RESET_STREAM, STOP_SENDING, MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS
5445        if space_id == SpaceId::Data {
5446            self.streams.write_control_frames(
5447                buf,
5448                &mut space.pending,
5449                &mut sent.retransmits,
5450                &mut self.stats.frame_tx,
5451                qlog,
5452            );
5453        }
5454
5455        // NEW_CONNECTION_ID
5456        let cid_len = self
5457            .local_cid_state
5458            .values()
5459            .map(|cid_state| cid_state.cid_len())
5460            .max()
5461            .expect("some local CID state must exist");
5462        let new_cid_size_bound =
5463            frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5464        while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5465            let issued = match space.pending.new_cids.pop() {
5466                Some(x) => x,
5467                None => break,
5468            };
5469            let retire_prior_to = self
5470                .local_cid_state
5471                .get(&issued.path_id)
5472                .map(|cid_state| cid_state.retire_prior_to())
5473                .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5474
5475            let cid_path_id = match is_multipath_negotiated {
5476                true => {
5477                    trace!(
5478                        path_id = ?issued.path_id,
5479                        sequence = issued.sequence,
5480                        id = %issued.id,
5481                        "PATH_NEW_CONNECTION_ID",
5482                    );
5483                    self.stats.frame_tx.path_new_connection_id += 1;
5484                    Some(issued.path_id)
5485                }
5486                false => {
5487                    trace!(
5488                        sequence = issued.sequence,
5489                        id = %issued.id,
5490                        "NEW_CONNECTION_ID"
5491                    );
5492                    debug_assert_eq!(issued.path_id, PathId::ZERO);
5493                    self.stats.frame_tx.new_connection_id += 1;
5494                    None
5495                }
5496            };
5497            let frame = frame::NewConnectionId {
5498                path_id: cid_path_id,
5499                sequence: issued.sequence,
5500                retire_prior_to,
5501                id: issued.id,
5502                reset_token: issued.reset_token,
5503            };
5504            frame.encode(buf);
5505            sent.retransmits.get_or_create().new_cids.push(issued);
5506            qlog.frame(&Frame::NewConnectionId(frame));
5507        }
5508
5509        // RETIRE_CONNECTION_ID
5510        let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5511        while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5512            let (path_id, sequence) = match space.pending.retire_cids.pop() {
5513                Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5514                    trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5515                    self.stats.frame_tx.retire_connection_id += 1;
5516                    (None, seq)
5517                }
5518                Some((path_id, seq)) => {
5519                    trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5520                    self.stats.frame_tx.path_retire_connection_id += 1;
5521                    (Some(path_id), seq)
5522                }
5523                None => break,
5524            };
5525            let frame = frame::RetireConnectionId { path_id, sequence };
5526            frame.encode(buf);
5527            qlog.frame(&Frame::RetireConnectionId(frame));
5528            sent.retransmits
5529                .get_or_create()
5530                .retire_cids
5531                .push((path_id.unwrap_or_default(), sequence));
5532        }
5533
5534        // DATAGRAM
5535        let mut sent_datagrams = false;
5536        while !path_exclusive_only
5537            && buf.remaining_mut() > Datagram::SIZE_BOUND
5538            && space_id == SpaceId::Data
5539        {
5540            let prev_remaining = buf.remaining_mut();
5541            match self.datagrams.write(buf) {
5542                true => {
5543                    sent_datagrams = true;
5544                    sent.non_retransmits = true;
5545                    self.stats.frame_tx.datagram += 1;
5546                    qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5547                }
5548                false => break,
5549            }
5550        }
5551        if self.datagrams.send_blocked && sent_datagrams {
5552            self.events.push_back(Event::DatagramsUnblocked);
5553            self.datagrams.send_blocked = false;
5554        }
5555
5556        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5557
5558        // NEW_TOKEN
5559        while let Some(remote_addr) = space.pending.new_tokens.pop() {
5560            if path_exclusive_only {
5561                // Put the popped address back: NEW_TOKEN frames are not path-exclusive
5562                // and should be kept pending for a later packet.
5563                space.pending.new_tokens.push(remote_addr);
5564                break;
5565            }
5563            debug_assert_eq!(space_id, SpaceId::Data);
5564            let ConnectionSide::Server { server_config } = &self.side else {
5565                panic!("NEW_TOKEN frames should not be enqueued by clients");
5566            };
5567
5568            if remote_addr != path.remote {
5569                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
5570                // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
5571                // frames upon a path change; once the new path becomes validated, fresh
5572                // NEW_TOKEN frames may be enqueued for it instead.
5573                continue;
5574            }
5575
5576            let token = Token::new(
5577                TokenPayload::Validation {
5578                    ip: remote_addr.ip(),
5579                    issued: server_config.time_source.now(),
5580                },
5581                &mut self.rng,
5582            );
5583            let new_token = NewToken {
5584                token: token.encode(&*server_config.token_key).into(),
5585            };
5586
5587            if buf.remaining_mut() < new_token.size() {
5588                space.pending.new_tokens.push(remote_addr);
5589                break;
5590            }
5591
5592            trace!("NEW_TOKEN");
5593            new_token.encode(buf);
5594            qlog.frame(&Frame::NewToken(new_token));
5595            sent.retransmits
5596                .get_or_create()
5597                .new_tokens
5598                .push(remote_addr);
5599            self.stats.frame_tx.new_token += 1;
5600        }
5601
5602        // STREAM
5603        if !path_exclusive_only && space_id == SpaceId::Data {
5604            sent.stream_frames =
5605                self.streams
5606                    .write_stream_frames(buf, self.config.send_fairness, qlog);
5607            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5608        }
5609
5610        // ADD_ADDRESS
5611        // TODO(@divma): check if we need to do path exclusive filters
5612        while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5613            if let Some(added_address) = space.pending.add_address.pop_last() {
5614                trace!(
5615                    seq = %added_address.seq_no,
5616                    ip = ?added_address.ip,
5617                    port = added_address.port,
5618                    "ADD_ADDRESS",
5619                );
5620                added_address.write(buf);
5621                sent.retransmits
5622                    .get_or_create()
5623                    .add_address
5624                    .insert(added_address);
5625                self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5626                qlog.frame(&Frame::AddAddress(added_address));
5627            } else {
5628                break;
5629            }
5630        }
5631
5632        // REMOVE_ADDRESS
5633        while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5634            if let Some(removed_address) = space.pending.remove_address.pop_last() {
5635                trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5636                removed_address.write(buf);
5637                sent.retransmits
5638                    .get_or_create()
5639                    .remove_address
5640                    .insert(removed_address);
5641                self.stats.frame_tx.remove_address =
5642                    self.stats.frame_tx.remove_address.saturating_add(1);
5643                qlog.frame(&Frame::RemoveAddress(removed_address));
5644            } else {
5645                break;
5646            }
5647        }
5648
5649        sent
5650    }
5651
5652    /// Write pending ACKs into a buffer
5653    fn populate_acks(
5654        now: Instant,
5655        receiving_ecn: bool,
5656        sent: &mut SentFrames,
5657        path_id: PathId,
5658        space_id: SpaceId,
5659        space: &mut PacketSpace,
5660        is_multipath_negotiated: bool,
5661        buf: &mut impl BufMut,
5662        stats: &mut ConnectionStats,
5663        #[allow(unused)] qlog: &mut QlogSentPacket,
5664    ) {
5665        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
5666        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5667
5668        debug_assert!(
5669            is_multipath_negotiated || path_id == PathId::ZERO,
5670            "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5671        );
5672        if is_multipath_negotiated {
5673            debug_assert!(
5674                space_id == SpaceId::Data || path_id == PathId::ZERO,
5675                "path acks must be sent in 1RTT space (have {space_id:?})"
5676            );
5677        }
5678
5679        let pns = space.for_path(path_id);
5680        let ranges = pns.pending_acks.ranges();
5681        debug_assert!(!ranges.is_empty(), "cannot send an empty ACK range");
5682        let ecn = if receiving_ecn {
5683            Some(&pns.ecn_counters)
5684        } else {
5685            None
5686        };
5687        if let Some(max) = ranges.max() {
5688            sent.largest_acked.insert(path_id, max);
5689        }
5690
5691        let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5692        // TODO: This should come from `TransportConfig` if that gets configurable.
5693        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5694        let delay = delay_micros >> ack_delay_exp.into_inner();
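        // (Worked example: a 25_000us delay with the default ack_delay_exponent of 3 is
        // encoded as 25_000 >> 3 = 3_125; the peer scales it back up by 2^3.)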
5695
5696        if is_multipath_negotiated && space_id == SpaceId::Data {
5697            if !ranges.is_empty() {
5698                trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5699                frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
5700                qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5701                stats.frame_tx.path_acks += 1;
5702            }
5703        } else {
5704            trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5705            frame::Ack::encode(delay as _, ranges, ecn, buf);
5706            stats.frame_tx.acks += 1;
5707            qlog.frame_ack(delay, ranges, ecn);
5708        }
5709    }
5710
5711    fn close_common(&mut self) {
5712        trace!("connection closed");
5713        self.timers.reset();
5714    }
5715
5716    fn set_close_timer(&mut self, now: Instant) {
5717        // QUIC-MULTIPATH § 2.6 Connection Closure: draining for 3*PTO with PTO the max of
5718        // the PTO for all paths.
5719        let pto_max = self.pto_max_path(self.highest_space, true);
5720        self.timers.set(
5721            Timer::Conn(ConnTimer::Close),
5722            now + 3 * pto_max,
5723            self.qlog.with_time(now),
5724        );
5725    }
5726
5727    /// Handle transport parameters received from the peer
5728    ///
5729    /// *rem_cid* and *loc_cid* are the source and destination CIDs, respectively, of
5730    /// the packet in which the transport parameters arrived.
5731    fn handle_peer_params(
5732        &mut self,
5733        params: TransportParameters,
5734        loc_cid: ConnectionId,
5735        rem_cid: ConnectionId,
5736        now: Instant,
5737    ) -> Result<(), TransportError> {
5738        if Some(self.orig_rem_cid) != params.initial_src_cid
5739            || (self.side.is_client()
5740                && (Some(self.initial_dst_cid) != params.original_dst_cid
5741                    || self.retry_src_cid != params.retry_src_cid))
5742        {
5743            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5744                "CID authentication failure",
5745            ));
5746        }
5747        if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5748            return Err(TransportError::PROTOCOL_VIOLATION(
5749                "multipath must not use zero-length CIDs",
5750            ));
5751        }
5752
5753        self.set_peer_params(params);
5754        self.qlog.emit_peer_transport_params_received(self, now);
5755
5756        Ok(())
5757    }
5758
5759    fn set_peer_params(&mut self, params: TransportParameters) {
5760        self.streams.set_params(&params);
5761        self.idle_timeout =
5762            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5763        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5764
5765        if let Some(ref info) = params.preferred_address {
5766            // During the handshake PathId::ZERO exists.
5767            self.rem_cids
5768                .get_mut(&PathId::ZERO)
5769                .expect("not yet abandoned")
5770                .insert(frame::NewConnectionId {
5771                    path_id: None,
5772                    sequence: 1,
5773                    id: info.connection_id,
5774                    reset_token: info.stateless_reset_token,
5775                    retire_prior_to: 0,
5776                })
5777                .expect(
5778                    "preferred address CID is the first received, and hence is guaranteed to be legal",
5779                );
5777            let remote = self.path_data(PathId::ZERO).remote;
5778            self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5779        }
5780        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
5781
5782        let mut multipath_enabled = None;
5783        if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5784            self.config.get_initial_max_path_id(),
5785            params.initial_max_path_id,
5786        ) {
5787            // multipath is enabled, register the local and remote maximums
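            // (e.g. a local maximum path id of 4 and a remote maximum of 2 negotiate
            // an initial maximum path id of 2)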
5788            self.local_max_path_id = local_max_path_id;
5789            self.remote_max_path_id = remote_max_path_id;
5790            let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5791            debug!(%initial_max_path_id, "multipath negotiated");
5792            multipath_enabled = Some(initial_max_path_id);
5793        }
5794
5795        if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5796            self.config
5797                .max_remote_nat_traversal_addresses
5798                .zip(params.max_remote_nat_traversal_addresses)
5799        {
5800            if let Some(max_initial_paths) =
5801                multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5802            {
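                // Note the deliberate cross-over: how many addresses we may advertise
                // locally is capped by the remote's limit, while how many remote
                // addresses we accept is capped by our own configured limit.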
5803                let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5804                let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5805                self.iroh_hp =
5806                    iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5807                debug!(
5808                    %max_remote_addresses, %max_local_addresses,
5809                    "iroh hole punching negotiated"
5810                );
5811
5812                match self.side() {
5813                    Side::Client => {
5814                        if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5815                            // in this case the client might try to open `max_remote_addresses` new
5816                            // paths, but the current multipath configuration will not allow it
5817                            warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5818                        } else if max_local_addresses as u64
5819                            > params.active_connection_id_limit.into_inner()
5820                        {
5821                            // the server allows us to send at most `params.active_connection_id_limit`
5822                            // but they might need at least `max_local_addresses` to effectively send
5823                            // `PATH_CHALLENGE` frames to each advertised local address
5824                            warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5825                        }
5826                    }
5827                    Side::Server => {
5828                        if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5829                            warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5830                        }
5831                    }
5832                }
5833            } else {
5834                debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5835            }
5836        }
5837
5838        self.peer_params = params;
5839        let peer_max_udp_payload_size =
5840            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5841        self.path_data_mut(PathId::ZERO)
5842            .mtud
5843            .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5844    }
5845
5846    /// Decrypts a packet, returning the packet number on success
5847    fn decrypt_packet(
5848        &mut self,
5849        now: Instant,
5850        path_id: PathId,
5851        packet: &mut Packet,
5852    ) -> Result<Option<u64>, Option<TransportError>> {
5853        let result = packet_crypto::decrypt_packet_body(
5854            packet,
5855            path_id,
5856            &self.spaces,
5857            self.zero_rtt_crypto.as_ref(),
5858            self.key_phase,
5859            self.prev_crypto.as_ref(),
5860            self.next_crypto.as_ref(),
5861        )?;
5862
5863        let result = match result {
5864            Some(r) => r,
5865            None => return Ok(None),
5866        };
5867
5868        if result.outgoing_key_update_acked {
5869            if let Some(prev) = self.prev_crypto.as_mut() {
5870                prev.end_packet = Some((result.number, now));
5871                self.set_key_discard_timer(now, packet.header.space());
5872            }
5873        }
5874
5875        if result.incoming_key_update {
5876            trace!("key update authenticated");
5877            self.update_keys(Some((result.number, now)), true);
5878            self.set_key_discard_timer(now, packet.header.space());
5879        }
5880
5881        Ok(Some(result.number))
5882    }
5883
5884    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5885        trace!("executing key update");
5886        // Generate keys for the key phase after the one we're switching to, store them in
5887        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
5888        // `prev_crypto`.
5889        let new = self
5890            .crypto
5891            .next_1rtt_keys()
5892            .expect("only called for `Data` packets");
5893        self.key_phase_size = new
5894            .local
5895            .confidentiality_limit()
5896            .saturating_sub(KEY_UPDATE_MARGIN);
5897        let old = mem::replace(
5898            &mut self.spaces[SpaceId::Data]
5899                .crypto
5900                .as_mut()
5901                .unwrap() // safe because update_keys() can only be triggered by short packets
5902                .packet,
5903            mem::replace(self.next_crypto.as_mut().unwrap(), new),
5904        );
5905        self.spaces[SpaceId::Data]
5906            .iter_paths_mut()
5907            .for_each(|s| s.sent_with_keys = 0);
5908        self.prev_crypto = Some(PrevCrypto {
5909            crypto: old,
5910            end_packet,
5911            update_unacked: remote,
5912        });
5913        self.key_phase = !self.key_phase;
5914    }
5915
5916    fn peer_supports_ack_frequency(&self) -> bool {
5917        self.peer_params.min_ack_delay.is_some()
5918    }
5919
5920    /// Send an IMMEDIATE_ACK frame to the remote endpoint
5921    ///
5922    /// According to the spec, this will result in an error if the remote endpoint does not support
5923    /// the Acknowledgement Frequency extension
5924    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5925        debug_assert_eq!(
5926            self.highest_space,
5927            SpaceId::Data,
5928            "immediate ack must be written in the data space"
5929        );
5930        self.spaces[self.highest_space]
5931            .for_path(path_id)
5932            .immediate_ack_pending = true;
5933    }
5934
5935    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
5936    #[cfg(test)]
5937    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5938        let (path_id, first_decode, remaining) = match &event.0 {
5939            ConnectionEventInner::Datagram(DatagramConnectionEvent {
5940                path_id,
5941                first_decode,
5942                remaining,
5943                ..
5944            }) => (path_id, first_decode, remaining),
5945            _ => return None,
5946        };
5947
5948        if remaining.is_some() {
5949            panic!("Packets should never be coalesced in tests");
5950        }
5951
5952        let decrypted_header = packet_crypto::unprotect_header(
5953            first_decode.clone(),
5954            &self.spaces,
5955            self.zero_rtt_crypto.as_ref(),
5956            self.peer_params.stateless_reset_token,
5957        )?;
5958
5959        let mut packet = decrypted_header.packet?;
5960        packet_crypto::decrypt_packet_body(
5961            &mut packet,
5962            *path_id,
5963            &self.spaces,
5964            self.zero_rtt_crypto.as_ref(),
5965            self.key_phase,
5966            self.prev_crypto.as_ref(),
5967            self.next_crypto.as_ref(),
5968        )
5969        .ok()?;
5970
5971        Some(packet.payload.to_vec())
5972    }
5973
5974    /// The number of bytes of packets containing retransmittable frames that have not been
5975    /// acknowledged or declared lost.
5976    #[cfg(test)]
5977    pub(crate) fn bytes_in_flight(&self) -> u64 {
5978        // TODO(@divma): consider including for multipath?
5979        self.path_data(PathId::ZERO).in_flight.bytes
5980    }
5981
5982    /// Number of bytes worth of non-ack-only packets that may be sent
5983    #[cfg(test)]
5984    pub(crate) fn congestion_window(&self) -> u64 {
5985        let path = self.path_data(PathId::ZERO);
5986        path.congestion
5987            .window()
5988            .saturating_sub(path.in_flight.bytes)
5989    }
5990
5991    /// Whether no timers but keep-alive, path keep-alive, push-new-CID, key discard, and idle are running
5992    #[cfg(test)]
5993    pub(crate) fn is_idle(&self) -> bool {
5994        let current_timers = self.timers.values();
5995        current_timers
5996            .into_iter()
5997            .filter(|(timer, _)| {
5998                !matches!(
5999                    timer,
6000                    Timer::Conn(ConnTimer::KeepAlive)
6001                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
6002                        | Timer::Conn(ConnTimer::PushNewCid)
6003                        | Timer::Conn(ConnTimer::KeyDiscard)
6004                )
6005            })
6006            .min_by_key(|(_, time)| *time)
6007            .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6008    }
6009
6010    /// Whether explicit congestion notification is in use on outgoing packets.
6011    #[cfg(test)]
6012    pub(crate) fn using_ecn(&self) -> bool {
6013        self.path_data(PathId::ZERO).sending_ecn
6014    }
6015
6016    /// The number of bytes received on the current path
6017    #[cfg(test)]
6018    pub(crate) fn total_recvd(&self) -> u64 {
6019        self.path_data(PathId::ZERO).total_recvd
6020    }
6021
6022    #[cfg(test)]
6023    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6024        self.local_cid_state
6025            .get(&PathId::ZERO)
6026            .unwrap()
6027            .active_seq()
6028    }
6029
6030    #[cfg(test)]
6031    #[track_caller]
6032    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6033        self.local_cid_state
6034            .get(&PathId(path_id))
6035            .unwrap()
6036            .active_seq()
6037    }
6038
6039    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
6040    /// with updated `retire_prior_to` field set to `v`
6041    #[cfg(test)]
6042    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6043        let n = self
6044            .local_cid_state
6045            .get_mut(&PathId::ZERO)
6046            .unwrap()
6047            .assign_retire_seq(v);
6048        self.endpoint_events
6049            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6050    }
6051
6052    /// Check the current active remote CID sequence for `PathId::ZERO`
6053    #[cfg(test)]
6054    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
6055        self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
6056    }
6057
6058    /// Returns the detected maximum UDP payload size for the given path
6059    #[cfg(test)]
6060    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6061        self.path_data(path_id).current_mtu()
6062    }
6063
6064    /// Triggers path validation on all paths
6065    #[cfg(test)]
6066    pub(crate) fn trigger_path_validation(&mut self) {
6067        for path in self.paths.values_mut() {
6068            path.data.send_new_challenge = true;
6069        }
6070    }
6071
6072    /// Whether we have 1-RTT data to send
6073    ///
6074    /// This checks for frames that can only be sent in the data space (1-RTT):
6075    /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated.
6076    /// - Pending PATH_RESPONSE frames.
6077    /// - Pending data to send in STREAM frames.
6078    /// - Pending DATAGRAM frames to send.
6079    ///
6080    /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that
6081    /// may need to be sent.
6082    fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6083        let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6084            path.data.send_new_challenge
6085                || path
6086                    .prev
6087                    .as_ref()
6088                    .is_some_and(|(_, path)| path.send_new_challenge)
6089                || !path.data.path_responses.is_empty()
6090        });
6091        let other = self.streams.can_send_stream_data()
6092            || self
6093                .datagrams
6094                .outgoing
6095                .front()
6096                .is_some_and(|x| x.size(true) <= max_size);
6097        SendableFrames {
6098            acks: false,
6099            other,
6100            close: false,
6101            path_exclusive,
6102        }
6103    }
6104
6105    /// Terminate the connection instantly, without sending a close packet
6106    fn kill(&mut self, reason: ConnectionError) {
6107        self.close_common();
6108        self.state.move_to_drained(Some(reason));
6109        self.endpoint_events.push_back(EndpointEventInner::Drained);
6110    }
6111
6112    /// Storage size required for the largest packet that can be transmitted on all currently
6113    /// available paths
6114    ///
6115    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
6116    ///
6117    /// When multipath is enabled, this value is the minimum MTU across all available paths.
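    ///
    /// A minimal caller sketch (hypothetical `conn`, not part of this API):
    ///
    /// ```ignore
    /// // Size the transmit buffer for the largest packet any current path can carry.
    /// let mut buf = Vec::with_capacity(conn.current_mtu() as usize);
    /// ```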
6118    pub fn current_mtu(&self) -> u16 {
6119        self.paths
6120            .iter()
6121            .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6122            .map(|(_path_id, path_state)| path_state.data.current_mtu())
6123            .min()
6124            .expect("There is always at least one available path")
6125    }
6126
6127    /// Size of non-frame data for a 1-RTT packet
6128    ///
6129    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
6130    /// frames. Changes if the length of the remote connection ID changes, which is expected to
6131    /// be rare. It may additionally change unpredictably with the encoded packet number length,
6132    /// which varies with latency and packet loss.
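    ///
    /// (Illustrative arithmetic: 1 flags byte + an 8-byte remote CID + a 2-byte packet
    /// number + a 16-byte AEAD tag gives 27 bytes of overhead.)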
6133    fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6134        let pn_len = PacketNumber::new(
6135            pn,
6136            self.spaces[SpaceId::Data]
6137                .for_path(path)
6138                .largest_acked_packet
6139                .unwrap_or(0),
6140        )
6141        .len();
6142
6143        // 1 byte for flags
6144        1 + self
6145            .rem_cids
6146            .get(&path)
6147            .map(|cids| cids.active().len())
6148            .unwrap_or(20)      // Max CID len in QUIC v1
6149            + pn_len
6150            + self.tag_len_1rtt()
6151    }
6152
6153    fn predict_1rtt_overhead_no_pn(&self) -> usize {
6154        let pn_len = 4;
6155
6156        let cid_len = self
6157            .rem_cids
6158            .values()
6159            .map(|cids| cids.active().len())
6160            .max()
6161            .unwrap_or(20); // Max CID len in QUIC v1
6162
6163        // 1 byte for flags
6164        1 + cid_len + pn_len + self.tag_len_1rtt()
6165    }
6166
6167    fn tag_len_1rtt(&self) -> usize {
6168        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6169            Some(crypto) => Some(&*crypto.packet.local),
6170            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6171        };
6172        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
6173        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
6174        // but that would needlessly prevent sending datagrams during 0-RTT.
6175        key.map_or(16, |x| x.tag_len())
6176    }
6177
6178    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
6179    fn on_path_validated(&mut self, path_id: PathId) {
6180        self.path_data_mut(path_id).validated = true;
6181        let ConnectionSide::Server { server_config } = &self.side else {
6182            return;
6183        };
6184        let remote_addr = self.path_data(path_id).remote;
6185        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6186        new_tokens.clear();
6187        for _ in 0..server_config.validation_token.sent {
6188            new_tokens.push(remote_addr);
6189        }
6190    }
6191
6192    /// Handle new path status information: PATH_STATUS_AVAILABLE, PATH_STATUS_BACKUP
6193    fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6194        if let Some(path) = self.paths.get_mut(&path_id) {
6195            path.data.status.remote_update(status, status_seq_no);
6196        } else {
6197            debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6198        }
6199        self.events.push_back(
6200            PathEvent::RemoteStatus {
6201                id: path_id,
6202                status,
6203            }
6204            .into(),
6205        );
6206    }
6207
6208    /// Returns the maximum [`PathId`] to be used for sending in this connection.
6209    ///
6210    /// This is calculated as the minimum of the local and remote maximums when multipath is
6211    /// enabled, or `None` when disabled.
6212    ///
6213    /// For data that's received, we should use [`Self::local_max_path_id`] instead: the
6214    /// remote might already have moved on to a newer maximum after sending out a
6215    /// `MAX_PATH_ID` frame that was re-ordered and has not reached us yet.
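    ///
    /// (e.g. a local maximum of 5 and a remote maximum of 3 yield `Some(PathId(3))`;
    /// without multipath this returns `None`)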
6216    fn max_path_id(&self) -> Option<PathId> {
6217        if self.is_multipath_negotiated() {
6218            Some(self.remote_max_path_id.min(self.local_max_path_id))
6219        } else {
6220            None
6221        }
6222    }
6223
6224    /// Adds an address the local endpoint considers reachable for nat traversal
6225    pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6226        if let Some(added) = self.iroh_hp.add_local_address(address)? {
6227            self.spaces[SpaceId::Data].pending.add_address.insert(added);
6228        };
6229        Ok(())
6230    }
6231
6232    /// Removes an address the endpoint no longer considers reachable for nat traversal
6233    ///
6234    /// Addresses not present in the set will be silently ignored.
6235    pub fn remove_nat_traversal_address(
6236        &mut self,
6237        address: SocketAddr,
6238    ) -> Result<(), iroh_hp::Error> {
6239        if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6240            self.spaces[SpaceId::Data]
6241                .pending
6242                .remove_address
6243                .insert(removed);
6244        }
6245        Ok(())
6246    }
6247
6248    /// Get the current local nat traversal addresses
6249    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6250        self.iroh_hp.get_local_nat_traversal_addresses()
6251    }
6252
6253    /// Get the currently advertised nat traversal addresses by the server
6254    pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6255        Ok(self
6256            .iroh_hp
6257            .client_side()?
6258            .get_remote_nat_traversal_addresses())
6259    }
6260
6261    /// Attempts to open a path for nat traversal.
6262    ///
6263    /// `ipv6` indicates whether this endpoint's socket is IPv6, in which case IPv4 remotes
6264    /// are mapped to IPv6 addresses. An IPv6 candidate on an IPv4-only socket is ignored,
6265    /// and `None` is returned.
6265    ///
6266    /// On success returns the [`PathId`] and remote address of the path, as well as whether the path
6267    /// existed for the adjusted remote.
6268    fn open_nat_traversal_path(
6269        &mut self,
6270        now: Instant,
6271        (ip, port): (IpAddr, u16),
6272        ipv6: bool,
6273    ) -> Result<Option<(PathId, SocketAddr, bool)>, PathError> {
6274        // If this endpoint is an IPv6 endpoint we use IPv6 addresses for all remotes.
6275        let remote = match ip {
6276            IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6277            IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6278            IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6279            IpAddr::V6(_) => {
6280                trace!("not using IPv6 nat candidate for IPv4 socket");
6281                return Ok(None);
6282            }
6283        };
6284        match self.open_path_ensure(remote, PathStatus::Backup, now) {
6285            Ok((path_id, path_was_known)) => {
6286                if path_was_known {
6287                    trace!(%path_id, %remote, "nat traversal: path existed for remote");
6288                }
6289                Ok(Some((path_id, remote, path_was_known)))
6290            }
6291            Err(e) => {
6292                debug!(%remote, %e, "nat traversal: failed to probe remote");
6293                Err(e)
6294            }
6295        }
6296    }
6297
    /// Initiates a new nat traversal round
    ///
    /// A nat traversal round involves advertising the client's local addresses in
    /// `REACH_OUT` frames and initiating probing of the known remote addresses. When a
    /// new round is initiated the previous one is cancelled, and its paths that were
    /// never validated are closed.
    ///
    /// Returns the server addresses that are now being probed. Addresses that fail with
    /// transient errors are not included in this set, but probing them may still
    /// succeed later.
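    ///
    /// A minimal driving sketch (illustrative only; connection setup and event handling
    /// are elided):
    ///
    /// ```ignore
    /// let probed = conn.initiate_nat_traversal_round(Instant::now())?;
    /// // Paths to each address in `probed` are now being validated.
    /// ```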
6307    pub fn initiate_nat_traversal_round(
6308        &mut self,
6309        now: Instant,
6310    ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6311        if self.state.is_closed() {
6312            return Err(iroh_hp::Error::Closed);
6313        }
6314
6315        let client_state = self.iroh_hp.client_side_mut()?;
6316        let iroh_hp::NatTraversalRound {
6317            new_round,
6318            reach_out_at,
6319            addresses_to_probe,
6320            prev_round_path_ids,
6321        } = client_state.initiate_nat_traversal_round()?;
6322
6323        self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6324
6325        for path_id in prev_round_path_ids {
            // TODO(@divma): this sounds reasonable but we need to verify that it
            // actually works for the purposes of the protocol
6328            let validated = self
6329                .path(path_id)
6330                .map(|path| path.validated)
6331                .unwrap_or(false);
6332
6333            if !validated {
6334                let _ = self.close_path(
6335                    now,
6336                    path_id,
6337                    TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6338                );
6339            }
6340        }
6341
6342        let mut err = None;
6343
6344        let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6345        let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6346        let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6347
6348        for (id, address) in addresses_to_probe {
6349            match self.open_nat_traversal_path(now, address, ipv6) {
6350                Ok(None) => {}
6351                Ok(Some((path_id, remote, path_was_known))) => {
6352                    if !path_was_known {
6353                        path_ids.push(path_id);
6354                        probed_addresses.push(remote);
6355                    }
6356                }
6357                Err(e) => {
6358                    self.iroh_hp
6359                        .client_side_mut()
6360                        .expect("validated")
6361                        .report_in_continuation(id, e);
6362                    err.get_or_insert(e);
6363                }
6364            }
6365        }
6366
6367        if let Some(err) = err {
6368            // We failed to probe any addresses, bail out
6369            if probed_addresses.is_empty() {
6370                return Err(iroh_hp::Error::Multipath(err));
6371            }
6372        }
6373
6374        self.iroh_hp
6375            .client_side_mut()
6376            .expect("connection side validated")
6377            .set_round_path_ids(path_ids);
6378
6379        Ok(probed_addresses)
6380    }
6381
    /// Attempts to continue a nat traversal round by trying to open paths for pending
    /// client probes.
    ///
    /// Returns `None` if there was nothing to do. Otherwise returns whether the attempt
    /// succeeded; a candidate address that had to be skipped counts as success.
6386    fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6387        let client_state = self.iroh_hp.client_side_mut().ok()?;
6388        let (id, address) = client_state.continue_nat_traversal_round()?;
6389        let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6390        let open_result = self.open_nat_traversal_path(now, address, ipv6);
6391        let client_state = self.iroh_hp.client_side_mut().expect("validated");
6392        match open_result {
6393            Ok(None) => Some(true),
6394            Ok(Some((path_id, _remote, path_was_known))) => {
6395                if !path_was_known {
6396                    client_state.add_round_path_id(path_id);
6397                }
6398                Some(true)
6399            }
6400            Err(e) => {
6401                client_state.report_in_continuation(id, e);
6402                Some(false)
6403            }
6404        }
6405    }
6406}
6407
6408impl fmt::Debug for Connection {
6409    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6410        f.debug_struct("Connection")
6411            .field("handshake_cid", &self.handshake_cid)
6412            .finish()
6413    }
6414}
6415
6416#[derive(Debug, Copy, Clone, PartialEq, Eq)]
6417enum PathBlocked {
6418    No,
6419    AntiAmplification,
6420    Congestion,
6421    Pacing,
6422}
6423
6424/// Fields of `Connection` specific to it being client-side or server-side
6425enum ConnectionSide {
6426    Client {
6427        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
6428        token: Bytes,
6429        token_store: Arc<dyn TokenStore>,
6430        server_name: String,
6431    },
6432    Server {
6433        server_config: Arc<ServerConfig>,
6434    },
6435}
6436
6437impl ConnectionSide {
6438    fn remote_may_migrate(&self, state: &State) -> bool {
6439        match self {
6440            Self::Server { server_config } => server_config.migration,
6441            Self::Client { .. } => {
6442                if let Some(hs) = state.as_handshake() {
6443                    hs.allow_server_migration
6444                } else {
6445                    false
6446                }
6447            }
6448        }
6449    }
6450
6451    fn is_client(&self) -> bool {
6452        self.side().is_client()
6453    }
6454
6455    fn is_server(&self) -> bool {
6456        self.side().is_server()
6457    }
6458
6459    fn side(&self) -> Side {
6460        match *self {
6461            Self::Client { .. } => Side::Client,
6462            Self::Server { .. } => Side::Server,
6463        }
6464    }
6465}
6466
6467impl From<SideArgs> for ConnectionSide {
6468    fn from(side: SideArgs) -> Self {
6469        match side {
6470            SideArgs::Client {
6471                token_store,
6472                server_name,
6473            } => Self::Client {
6474                token: token_store.take(&server_name).unwrap_or_default(),
6475                token_store,
6476                server_name,
6477            },
6478            SideArgs::Server {
6479                server_config,
6480                pref_addr_cid: _,
6481                path_validated: _,
6482            } => Self::Server { server_config },
6483        }
6484    }
6485}
6486
6487/// Parameters to `Connection::new` specific to it being client-side or server-side
6488pub(crate) enum SideArgs {
6489    Client {
6490        token_store: Arc<dyn TokenStore>,
6491        server_name: String,
6492    },
6493    Server {
6494        server_config: Arc<ServerConfig>,
6495        pref_addr_cid: Option<ConnectionId>,
6496        path_validated: bool,
6497    },
6498}
6499
6500impl SideArgs {
6501    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6502        match *self {
6503            Self::Client { .. } => None,
6504            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6505        }
6506    }
6507
6508    pub(crate) fn path_validated(&self) -> bool {
6509        match *self {
6510            Self::Client { .. } => true,
6511            Self::Server { path_validated, .. } => path_validated,
6512        }
6513    }
6514
6515    pub(crate) fn side(&self) -> Side {
6516        match *self {
6517            Self::Client { .. } => Side::Client,
6518            Self::Server { .. } => Side::Server,
6519        }
6520    }
6521}
6522
6523/// Reasons why a connection might be lost
6524#[derive(Debug, Error, Clone, PartialEq, Eq)]
6525pub enum ConnectionError {
6526    /// The peer doesn't implement any supported version
6527    #[error("peer doesn't implement any supported version")]
6528    VersionMismatch,
6529    /// The peer violated the QUIC specification as understood by this implementation
6530    #[error(transparent)]
6531    TransportError(#[from] TransportError),
6532    /// The peer's QUIC stack aborted the connection automatically
6533    #[error("aborted by peer: {0}")]
6534    ConnectionClosed(frame::ConnectionClose),
6535    /// The peer closed the connection
6536    #[error("closed by peer: {0}")]
6537    ApplicationClosed(frame::ApplicationClose),
6538    /// The peer is unable to continue processing this connection, usually due to having restarted
6539    #[error("reset by peer")]
6540    Reset,
6541    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
6542    ///
6543    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
6544    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
6545    /// and [`TransportConfig::keep_alive_interval()`].
6546    #[error("timed out")]
6547    TimedOut,
6548    /// The local application closed the connection
6549    #[error("closed")]
6550    LocallyClosed,
6551    /// The connection could not be created because not enough of the CID space is available
6552    ///
6553    /// Try using longer connection IDs.
6554    #[error("CIDs exhausted")]
6555    CidsExhausted,
6556}
6557
6558impl From<Close> for ConnectionError {
6559    fn from(x: Close) -> Self {
6560        match x {
6561            Close::Connection(reason) => Self::ConnectionClosed(reason),
6562            Close::Application(reason) => Self::ApplicationClosed(reason),
6563        }
6564    }
6565}
6566
6567// For compatibility with API consumers
6568impl From<ConnectionError> for io::Error {
6569    fn from(x: ConnectionError) -> Self {
6570        use ConnectionError::*;
6571        let kind = match x {
6572            TimedOut => io::ErrorKind::TimedOut,
6573            Reset => io::ErrorKind::ConnectionReset,
6574            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6575            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6576                io::ErrorKind::Other
6577            }
6578        };
6579        Self::new(kind, x)
6580    }
6581}
6582
6583/// Errors that might trigger a path being closed
6584// TODO(@divma): maybe needs to be reworked based on what we want to do with the public API
6585#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6586pub enum PathError {
6587    /// The extension was not negotiated with the peer
6588    #[error("multipath extension not negotiated")]
6589    MultipathNotNegotiated,
6590    /// Paths can only be opened client-side
6591    #[error("the server side may not open a path")]
6592    ServerSideNotAllowed,
6593    /// Current limits do not allow us to open more paths
6594    #[error("maximum number of concurrent paths reached")]
6595    MaxPathIdReached,
6596    /// No remote CIDs available to open a new path
    #[error("remote CIDs exhausted")]
6598    RemoteCidsExhausted,
6599    /// Path could not be validated and will be abandoned
6600    #[error("path validation failed")]
6601    ValidationFailed,
6602    /// The remote address for the path is not supported by the endpoint
    #[error("invalid remote address: {0}")]
6604    InvalidRemoteAddress(SocketAddr),
6605}
6606
6607/// Errors triggered when abandoning a path
6608#[derive(Debug, Error, Clone, Eq, PartialEq)]
6609pub enum ClosePathError {
6610    /// The path is already closed or was never opened
6611    #[error("closed path")]
6612    ClosedPath,
    /// This is the last path, which cannot be abandoned
6614    #[error("last open path")]
6615    LastOpenPath,
6616}
6617
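/// Error returned when an operation requires the multipath extension, but it was not
/// negotiated with the peer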
6618#[derive(Debug, Error, Clone, Copy)]
#[error("multipath extension not negotiated")]
6620pub struct MultipathNotNegotiated {
6621    _private: (),
6622}
6623
6624/// Events of interest to the application
6625#[derive(Debug)]
6626pub enum Event {
6627    /// The connection's handshake data is ready
6628    HandshakeDataReady,
6629    /// The connection was successfully established
6630    Connected,
6631    /// The TLS handshake was confirmed
6632    HandshakeConfirmed,
6633    /// The connection was lost
6634    ///
6635    /// Emitted if the peer closes the connection or an error is encountered.
6636    ConnectionLost {
6637        /// Reason that the connection was closed
6638        reason: ConnectionError,
6639    },
6640    /// Stream events
6641    Stream(StreamEvent),
6642    /// One or more application datagrams have been received
6643    DatagramReceived,
6644    /// One or more application datagrams have been sent after blocking
6645    DatagramsUnblocked,
6646    /// (Multi)Path events
6647    Path(PathEvent),
6648    /// Iroh's nat traversal events
6649    NatTraversal(iroh_hp::Event),
6650}
6651
6652impl From<PathEvent> for Event {
6653    fn from(source: PathEvent) -> Self {
6654        Self::Path(source)
6655    }
6656}
6657
6658fn get_max_ack_delay(params: &TransportParameters) -> Duration {
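    // max_ack_delay is a count of milliseconds (RFC 9000 §18.2); convert to
    // microseconds to build the Duration.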
6659    Duration::from_micros(params.max_ack_delay.0 * 1000)
6660}
6661
6662// Prevents overflow and improves behavior in extreme circumstances
6663const MAX_BACKOFF_EXPONENT: u32 = 16;
6664
6665/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
6666///
6667/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
6668/// plus some space for frames. We only care about handshake headers because short header packets
6669/// necessarily have smaller headers, and initial packets are only ever the first packet in a
6670/// datagram (because we coalesce in ascending packet space order and the only reason to split a
6671/// packet is when packet space changes).
6672const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6673
6674/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
6675///
6676/// Excludes packet-type-specific fields such as packet number or Initial token
6677// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
6678// scid len + scid + length + pn
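// With MAX_CID_SIZE = 20 and the 4-byte length varint below, this totals 55 bytes.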
6679const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6680    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6681
6682/// Perform key updates this many packets before the AEAD confidentiality limit.
6683///
6684/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
6685const KEY_UPDATE_MARGIN: u64 = 10_000;
6686
6687#[derive(Default)]
6688struct SentFrames {
6689    retransmits: ThinRetransmits,
6690    /// The packet number of the largest acknowledged packet for each path
6691    largest_acked: FxHashMap<PathId, u64>,
6692    stream_frames: StreamMetaVec,
6693    /// Whether the packet contains non-retransmittable frames (like datagrams)
6694    non_retransmits: bool,
    /// Whether the datagram containing these frames should be padded to the min MTU
6696    requires_padding: bool,
6697}
6698
6699impl SentFrames {
6700    /// Returns whether the packet contains only ACKs
6701    fn is_ack_only(&self, streams: &StreamsState) -> bool {
6702        !self.largest_acked.is_empty()
6703            && !self.non_retransmits
6704            && self.stream_frames.is_empty()
6705            && self.retransmits.is_empty(streams)
6706    }
6707}
6708
6709/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
6710///
/// According to the definition of max_idle_timeout, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1>.
///
/// According to the negotiation procedure, the effective value is the minimum of the two advertised timeouts, or the only specified one when the other endpoint disables its timeout; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2>.
6714///
6715/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
6716fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6717    match (x, y) {
6718        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6719        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6720        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6721        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6722    }
6723}
6724
6725#[cfg(test)]
6726mod tests {
6727    use super::*;
6728
6729    #[test]
6730    fn negotiate_max_idle_timeout_commutative() {
6731        let test_params = [
6732            (None, None, None),
6733            (None, Some(VarInt(0)), None),
6734            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
6735            (Some(VarInt(0)), Some(VarInt(0)), None),
6736            (
6737                Some(VarInt(2)),
6738                Some(VarInt(0)),
6739                Some(Duration::from_millis(2)),
6740            ),
6741            (
6742                Some(VarInt(1)),
6743                Some(VarInt(4)),
6744                Some(Duration::from_millis(1)),
6745            ),
6746        ];
6747
6748        for (left, right, result) in test_params {
6749            assert_eq!(negotiate_max_idle_timeout(left, right), result);
6750            assert_eq!(negotiate_max_idle_timeout(right, left), result);
6751        }
6752    }
6753}