iroh_quinn_proto/connection/
mod.rs

1use std::{
2    cmp,
3    collections::{BTreeMap, VecDeque, btree_map},
4    convert::TryFrom,
5    fmt, io, mem,
6    net::{IpAddr, SocketAddr},
7    num::{NonZeroU32, NonZeroUsize},
8    ops::Not,
9    sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21    Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
22    MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
23    TransportErrorCode, VarInt,
24    cid_generator::ConnectionIdGenerator,
25    cid_queue::CidQueue,
26    coding::{BufMutExt, Encodable},
27    config::{ServerConfig, TransportConfig},
28    congestion::Controller,
29    connection::{
30        qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
31        spaces::LostPacket,
32        timer::{ConnTimer, PathTimer},
33    },
34    crypto::{self, KeyPair, Keys, PacketKey},
35    frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
36    iroh_hp,
37    packet::{
38        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
39        PacketNumber, PartialDecode, SpaceId,
40    },
41    range_set::ArrayRangeSet,
42    shared::{
43        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
44        EndpointEvent, EndpointEventInner,
45    },
46    token::{ResetToken, Token, TokenPayload},
47    transport_parameters::TransportParameters,
48};
49
50mod ack_frequency;
51use ack_frequency::AckFrequencyState;
52
53mod assembler;
54pub use assembler::Chunk;
55
56mod cid_state;
57use cid_state::CidState;
58
59mod datagrams;
60use datagrams::DatagramState;
61pub use datagrams::{Datagrams, SendDatagramError};
62
63mod mtud;
64mod pacing;
65
66mod packet_builder;
67use packet_builder::{PacketBuilder, PadDatagram};
68
69mod packet_crypto;
70use packet_crypto::{PrevCrypto, ZeroRttCrypto};
71
72mod paths;
73pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
74use paths::{PathData, PathState};
75
76pub(crate) mod qlog;
77pub(crate) mod send_buffer;
78
79mod spaces;
80#[cfg(fuzzing)]
81pub use spaces::Retransmits;
82#[cfg(not(fuzzing))]
83use spaces::Retransmits;
84use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
85
86mod stats;
87pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
88
89mod streams;
90#[cfg(fuzzing)]
91pub use streams::StreamsState;
92#[cfg(not(fuzzing))]
93use streams::StreamsState;
94pub use streams::{
95    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
96    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
97};
98
99mod timer;
100use timer::{Timer, TimerTable};
101
102mod transmit_buf;
103use transmit_buf::TransmitBuf;
104
105mod state;
106
107#[cfg(not(fuzzing))]
108use state::State;
109#[cfg(fuzzing)]
110pub use state::State;
111use state::StateType;
112
/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below, however as per the design of this library none of the
///   functions actually perform system-level I/O. For example, [`read`](RecvStream::read) and
///   [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing it to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be made to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
pub struct Connection {
    /// Endpoint-level configuration shared with the endpoint that owns this connection
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration this connection was created with
    config: Arc<TransportConfig>,
    /// Deterministic PRNG, seeded from the `rng_seed` passed to [`Connection::new`]
    rng: StdRng,
    /// Cryptographic session (e.g. TLS) driving the handshake and providing packet keys
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The "real" local IP address which was used to receive the initial packet.
    /// This is only populated for the server case, and if known
    local_ip: Option<IpAddr>,
    /// The [`PathData`] for each path
    ///
    /// This needs to be ordered because [`Connection::poll_transmit`] needs to
    /// deterministically select the next PathId to send on.
    // TODO(flub): well does it really? But deterministic is nice for now.
    paths: BTreeMap<PathId, PathState>,
    /// Incremented every time we see a new path
    ///
    /// Stored separately from `path.generation` to account for aborted migrations
    path_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    /// Current high-level state of the connection (see [`State`])
    state: State,
    /// Side-specific state; `side.side()` tells whether we are client or server
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Current key phase bit used for 1-RTT packet protection
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Events returned by [`Connection::poll`]
    events: VecDeque<Event>,
    /// Events returned by [`Connection::poll_endpoint_events`]
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable [`SpaceId`]
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Whether our 0-RTT data was accepted (distinct from `zero_rtt_enabled`)
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    /// Pending connection and per-path timers (see [`TimerTable`])
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// If the CONNECTION_CLOSE frame needs to be sent
    close: bool,

    //
    // ACK frequency
    //
    /// State for the ACK frequency extension (see [`AckFrequencyState`])
    ack_frequency: AckFrequencyState,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    /// Stream-level state, including flow control windows (see [`StreamsState`])
    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    ///
    /// These are given out before multiple paths exist, also for paths that will never
    /// exist.  So if multipath is supported the number of paths here will be higher than
    /// the actual number of paths in use.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Attributes of CIDs generated by local endpoint
    ///
    /// Any path that is allowed to be opened is present in this map, as well as the already
    /// opened paths. However since CIDs are issued async by the endpoint driver via
    /// connection events it can not be used to know if CIDs have been issued for a path or
    /// not. See [`Connection::max_path_id_with_cids`] for this.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// Path level statistics
    path_stats: FxHashMap<PathId, PathStats>,
    /// QUIC version used for the connection.
    version: u32,

    //
    // Multipath
    //
    /// Maximum number of concurrent paths
    ///
    /// Initially set from the [`TransportConfig::max_concurrent_multipath_paths`]. Even
    /// when multipath is disabled this will be set to 1, it is not used in that case
    /// though.
    max_concurrent_paths: NonZeroU32,
    /// Local maximum [`PathId`] to be used
    ///
    /// This is initially set to [`TransportConfig::get_initial_max_path_id`] when multipath
    /// is negotiated, or to [`PathId::ZERO`] otherwise. This is essentially the value of
    /// the highest MAX_PATH_ID frame sent.
    ///
    /// Any path with an ID equal or below this [`PathId`] is either:
    ///
    /// - Abandoned, if it is also in [`Connection::abandoned_paths`].
    /// - Open, in this case it is present in [`Connection::paths`]
    /// - Not yet opened, if it is in neither of these two places.
    ///
    /// Note that for not-yet-open there may or may not be any CIDs issued. See
    /// [`Connection::max_path_id_with_cids`].
    local_max_path_id: PathId,
    /// Remote's maximum [`PathId`] to be used
    ///
    /// This is initially set to the peer's [`TransportParameters::initial_max_path_id`] when
    /// multipath is negotiated, or to [`PathId::ZERO`] otherwise. A peer may increase this limit
    /// by sending [`Frame::MaxPathId`] frames.
    remote_max_path_id: PathId,
    /// The greatest [`PathId`] we have issued CIDs for
    ///
    /// CIDs are only issued for `min(local_max_path_id, remote_max_path_id)`. It is not
    /// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued
    /// since they are issued asynchronously by the endpoint driver.
    max_path_id_with_cids: PathId,
    /// The paths already abandoned
    ///
    /// They may still have some state left in [`Connection::paths`] or
    /// [`Connection::local_cid_state`] since some of this has to be kept around for some
    /// time after a path is abandoned.
    // TODO(flub): Make this a more efficient data structure.  Like ranges of abandoned
    //    paths.  Or a set together with a minimum.  Or something.
    abandoned_paths: FxHashSet<PathId>,

    /// State for iroh's NAT traversal extension
    iroh_hp: iroh_hp::State,
    /// Sink for qlog events emitted by this connection
    qlog: QlogSink,
}
320
321impl Connection {
    /// Creates a new connection in its initial, handshaking state
    ///
    /// Sets up the three packet number spaces, derives the Initial packet protection keys
    /// from `init_cid`, and registers the first path ([`PathId::ZERO`]) towards `remote`.
    /// For clients, the first crypto flight is queued and 0-RTT state initialized
    /// immediately.
    ///
    /// * `init_cid` - destination CID of the client's first Initial packet; input to the
    ///   initial key derivation
    /// * `loc_cid` / `rem_cid` - CIDs chosen locally and by the peer for the handshake
    /// * `local_ip` - local address the first packet was received on (server only, if known)
    /// * `rng_seed` - seed for this connection's deterministic RNG
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = {
            let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
            // Initial keys are derived deterministically from the client's first DCID
            space.crypto = Some(crypto.initial_keys(init_cid, side));
            space
        };
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        // Tests may opt into deterministic packet numbers for reproducibility
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        // A preferred-address CID (server option) requires issuing one extra CID up front
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
        // TODO(@divma): consider if we want to delay this until the path is validated
        path.open = true;
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_counter: 0,
            allow_mtud,
            local_ip,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            close: false,

            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            // peer params are not yet known, so multipath is not enabled
            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            // iroh's nat traversal
            iroh_hp: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
        this
    }
474
    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        // Earliest deadline among all pending connection and per-path timers, if any.
        self.timers.peek()
    }
486
487    /// Returns application-facing events
488    ///
489    /// Connections should be polled for events after:
490    /// - a call was made to `handle_event`
491    /// - a call was made to `handle_timeout`
492    #[must_use]
493    pub fn poll(&mut self) -> Option<Event> {
494        if let Some(x) = self.events.pop_front() {
495            return Some(x);
496        }
497
498        if let Some(event) = self.streams.poll() {
499            return Some(Event::Stream(event));
500        }
501
502        if let Some(reason) = self.state.take_error() {
503            return Some(Event::ConnectionLost { reason });
504        }
505
506        None
507    }
508
509    /// Return endpoint-facing events
510    #[must_use]
511    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
512        self.endpoint_events.pop_front().map(EndpointEvent)
513    }
514
    /// Provide control over streams
    ///
    /// Returns a short-lived handle borrowing this connection's stream state; see
    /// [`Streams`] for the available operations.
    #[must_use]
    pub fn streams(&mut self) -> Streams<'_> {
        Streams {
            state: &mut self.streams,
            conn_state: &self.state,
        }
    }
523
    /// Provide control over the receiving side of a stream
    ///
    /// # Panics
    ///
    /// Panics if `id` is a locally-initiated unidirectional stream: such a stream has no
    /// receive half on this side.
    #[must_use]
    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
        // A stream is receivable here iff it is bidirectional or was initiated by the peer.
        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
        RecvStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
        }
    }
534
    /// Provide control over the sending side of a stream
    ///
    /// # Panics
    ///
    /// Panics if `id` is a peer-initiated unidirectional stream: such a stream has no
    /// send half on this side.
    #[must_use]
    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
        // A stream is sendable here iff it is bidirectional or was initiated locally.
        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
        SendStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
            conn_state: &self.state,
        }
    }
546
547    /// Opens a new path only if no path to the remote address exists so far
548    ///
549    /// See [`open_path`]. Returns `(path_id, true)` if the path already existed. `(path_id,
550    /// false)` if was opened.
551    ///
552    /// [`open_path`]: Connection::open_path
553    pub fn open_path_ensure(
554        &mut self,
555        remote: SocketAddr,
556        initial_status: PathStatus,
557        now: Instant,
558    ) -> Result<(PathId, bool), PathError> {
559        match self
560            .paths
561            .iter()
562            .find(|(_id, path)| path.data.remote == remote)
563        {
564            Some((path_id, _state)) => Ok((*path_id, true)),
565            None => self
566                .open_path(remote, initial_status, now)
567                .map(|id| (id, false)),
568        }
569    }
570
571    /// Opens a new path
572    ///
573    /// Further errors might occur and they will be emitted in [`PathEvent::LocallyClosed`] events.
574    /// When the path is opened it will be reported as an [`PathEvent::Opened`].
575    pub fn open_path(
576        &mut self,
577        remote: SocketAddr,
578        initial_status: PathStatus,
579        now: Instant,
580    ) -> Result<PathId, PathError> {
581        if !self.is_multipath_negotiated() {
582            return Err(PathError::MultipathNotNegotiated);
583        }
584        if self.side().is_server() {
585            return Err(PathError::ServerSideNotAllowed);
586        }
587
588        let max_abandoned = self.abandoned_paths.iter().max().copied();
589        let max_used = self.paths.keys().last().copied();
590        let path_id = max_abandoned
591            .max(max_used)
592            .unwrap_or(PathId::ZERO)
593            .saturating_add(1u8);
594
595        if Some(path_id) > self.max_path_id() {
596            return Err(PathError::MaxPathIdReached);
597        }
598        if path_id > self.remote_max_path_id {
599            self.spaces[SpaceId::Data].pending.paths_blocked = true;
600            return Err(PathError::MaxPathIdReached);
601        }
602        if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
603            self.spaces[SpaceId::Data]
604                .pending
605                .path_cids_blocked
606                .push(path_id);
607            return Err(PathError::RemoteCidsExhausted);
608        }
609
610        let path = self.ensure_path(path_id, remote, now, None);
611        path.status.local_update(initial_status);
612
613        Ok(path_id)
614    }
615
    /// Closes a path by sending a PATH_ABANDON frame
    ///
    /// This will not allow closing the last path. It does allow closing paths which have
    /// not yet been opened, as e.g. is the case when receiving a PATH_ABANDON from the peer
    /// for a path that was never opened locally.
    ///
    /// # Errors
    ///
    /// - [`ClosePathError::ClosedPath`] if the path is already abandoned, exceeds the
    ///   current maximum path ID, or is unknown.
    /// - [`ClosePathError::LastOpenPath`] if no other validated, non-abandoned path would
    ///   remain.
    pub fn close_path(
        &mut self,
        now: Instant,
        path_id: PathId,
        error_code: VarInt,
    ) -> Result<(), ClosePathError> {
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }
        if self
            .paths
            .iter()
            // Would there be any remaining, non-abandoned, validated paths
            .any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            })
            .not()
        {
            return Err(ClosePathError::LastOpenPath);
        }

        // Send PATH_ABANDON
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, error_code.into());

        // Remove pending NEW CIDs for this path
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Cleanup retransmits across ALL paths (CIDs for path_id may have been transmitted on other paths)
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Consider remotely issued CIDs as retired.
        // Technically we don't have to do this just yet.  We only need to do this *after*
        // the ABANDON_PATH frame is sent, allowing us to still send it on the
        // to-be-abandoned path.  However it is recommended to send it on another path, and
        // we do not allow abandoning the last path anyway.
        self.rem_cids.remove(&path_id);
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        let pto = self.pto_max_path(SpaceId::Data, false);

        let path = self.paths.get_mut(&path_id).expect("checked above");

        // We record the time after which receiving data on this path generates a transport error.
        path.data.last_allowed_receive = Some(now + 3 * pto);
        self.abandoned_paths.insert(path_id);

        // Bump our advertised MAX_PATH_ID so the abandoned path's slot can be replaced.
        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));

        // The peer MUST respond with a corresponding PATH_ABANDON frame.
        // If we receive packets on the abandoned path after 3 * PTO we trigger a transport error.
        // In any case, we completely discard the path after 6 * PTO whether we receive a
        // PATH_ABANDON or not.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::DiscardPath),
            now + 6 * pto,
            self.qlog.with_time(now),
        );
        Ok(())
    }
698
699    /// Gets the [`PathData`] for a known [`PathId`].
700    ///
701    /// Will panic if the path_id does not reference any known path.
702    #[track_caller]
703    fn path_data(&self, path_id: PathId) -> &PathData {
704        if let Some(data) = self.paths.get(&path_id) {
705            &data.data
706        } else {
707            panic!(
708                "unknown path: {path_id}, currently known paths: {:?}",
709                self.paths.keys().collect::<Vec<_>>()
710            );
711        }
712    }
713
714    /// Gets a reference to the [`PathData`] for a [`PathId`]
715    fn path(&self, path_id: PathId) -> Option<&PathData> {
716        self.paths.get(&path_id).map(|path_state| &path_state.data)
717    }
718
719    /// Gets a mutable reference to the [`PathData`] for a [`PathId`]
720    fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
721        self.paths
722            .get_mut(&path_id)
723            .map(|path_state| &mut path_state.data)
724    }
725
726    /// Returns all known paths.
727    ///
728    /// There is no guarantee any of these paths are open or usable.
729    pub fn paths(&self) -> Vec<PathId> {
730        self.paths.keys().copied().collect()
731    }
732
733    /// Gets the local [`PathStatus`] for a known [`PathId`]
734    pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
735        self.path(path_id)
736            .map(PathData::local_status)
737            .ok_or(ClosedPath { _private: () })
738    }
739
740    /// Returns the path's remote socket address
741    pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
742        self.path(path_id)
743            .map(|path| path.remote)
744            .ok_or(ClosedPath { _private: () })
745    }
746
747    /// Sets the [`PathStatus`] for a known [`PathId`]
748    ///
749    /// Returns the previous path status on success.
750    pub fn set_path_status(
751        &mut self,
752        path_id: PathId,
753        status: PathStatus,
754    ) -> Result<PathStatus, SetPathStatusError> {
755        if !self.is_multipath_negotiated() {
756            return Err(SetPathStatusError::MultipathNotNegotiated);
757        }
758        let path = self
759            .path_mut(path_id)
760            .ok_or(SetPathStatusError::ClosedPath)?;
761        let prev = match path.status.local_update(status) {
762            Some(prev) => {
763                self.spaces[SpaceId::Data]
764                    .pending
765                    .path_status
766                    .insert(path_id);
767                prev
768            }
769            None => path.local_status(),
770        };
771        Ok(prev)
772    }
773
774    /// Returns the remote path status
775    // TODO(flub): Probably should also be some kind of path event?  Not even sure if I like
776    //    this as an API, but for now it allows me to write a test easily.
777    // TODO(flub): Technically this should be a Result<Option<PathSTatus>>?
778    pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
779        self.path(path_id).and_then(|path| path.remote_status())
780    }
781
782    /// Sets the max_idle_timeout for a specific path
783    ///
784    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
785    ///
786    /// Returns the previous value of the setting.
787    pub fn set_path_max_idle_timeout(
788        &mut self,
789        path_id: PathId,
790        timeout: Option<Duration>,
791    ) -> Result<Option<Duration>, ClosedPath> {
792        let path = self
793            .paths
794            .get_mut(&path_id)
795            .ok_or(ClosedPath { _private: () })?;
796        Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
797    }
798
799    /// Sets the keep_alive_interval for a specific path
800    ///
801    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
802    ///
803    /// Returns the previous value of the setting.
804    pub fn set_path_keep_alive_interval(
805        &mut self,
806        path_id: PathId,
807        interval: Option<Duration>,
808    ) -> Result<Option<Duration>, ClosedPath> {
809        let path = self
810            .paths
811            .get_mut(&path_id)
812            .ok_or(ClosedPath { _private: () })?;
813        Ok(std::mem::replace(&mut path.data.keep_alive, interval))
814    }
815
816    /// Gets the [`PathData`] for a known [`PathId`].
817    ///
818    /// Will panic if the path_id does not reference any known path.
819    #[track_caller]
820    fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
821        &mut self.paths.get_mut(&path_id).expect("known path").data
822    }
823
    /// Returns a mutable reference to the [`PathData`] for `path_id`, creating it if needed.
    ///
    /// If the path already exists its data is returned unchanged.  Otherwise the
    /// per-path state is initialized: a new [`PathData`] towards `remote`, a
    /// `PathOpen` timer, and a fresh packet number space for this path in the
    /// Data space.  If `pn` is given it is recorded in the new space's dedup
    /// state, so the packet that triggered path creation is not accepted twice.
    fn ensure_path(
        &mut self,
        path_id: PathId,
        remote: SocketAddr,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        // Fast path: the path is already known, hand out its data directly.
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        // TODO(matheus23): Add back short-circuiting path.validated = true, if we know that the
        // path's four-tuple was already validated.
        debug!(%path_id, ?remote, "path added");
        // Clamp the peer's max_udp_payload_size transport parameter into u16 range.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        // Wrapping counter passed into PathData::new for each newly created path.
        self.path_counter = self.path_counter.wrapping_add(1);
        let mut data = PathData::new(
            remote,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_counter,
            now,
            &self.config,
        );

        // Arm the PathOpen timer three PTOs from now; the PTO is derived from the
        // new path's initial RTT estimate plus the peer's max ACK delay.
        let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathOpen),
            now + 3 * pto,
            self.qlog.with_time(now),
        );

        // for the path to be opened we need to send a packet on the path. Sending a challenge
        // guarantees this
        data.send_new_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        // Each path gets its own packet number space in the Data space; seed its
        // dedup state with the packet number that created the path, if any.
        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, remote, now);
        &mut path.data
    }
876
877    /// Returns packets to transmit
878    ///
879    /// Connections should be polled for transmit after:
880    /// - the application performed some I/O on the connection
881    /// - a call was made to `handle_event`
882    /// - a call was made to `handle_timeout`
883    ///
884    /// `max_datagrams` specifies how many datagrams can be returned inside a
885    /// single Transmit using GSO. This must be at least 1.
886    #[must_use]
887    pub fn poll_transmit(
888        &mut self,
889        now: Instant,
890        max_datagrams: NonZeroUsize,
891        buf: &mut Vec<u8>,
892    ) -> Option<Transmit> {
893        if let Some(probing) = self
894            .iroh_hp
895            .server_side_mut()
896            .ok()
897            .and_then(iroh_hp::ServerState::next_probe)
898        {
899            let destination = probing.remote();
900            trace!(%destination, "RAND_DATA packet");
901            let token: u64 = self.rng.random();
902            buf.put_u64(token);
903            probing.finish(token);
904            return Some(Transmit {
905                destination,
906                ecn: None,
907                size: 8,
908                segment_size: None,
909                src_ip: None,
910            });
911        }
912
913        let max_datagrams = match self.config.enable_segmentation_offload {
914            false => NonZeroUsize::MIN,
915            true => max_datagrams,
916        };
917
918        // Each call to poll_transmit can only send datagrams to one destination, because
919        // all datagrams in a GSO batch are for the same destination.  Therefore only
920        // datagrams for one Path ID are produced for each poll_transmit call.
921
922        // TODO(flub): this is wishful thinking and not actually implemented, but perhaps it
923        //   should be:
924
925        // First, if we have to send a close, select a path for that.
926        // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending.
927
928        // For all, open, validated and AVAILABLE paths:
929        // - Is the path congestion blocked or pacing blocked?
930        // - call maybe_queue_ to ensure a tail-loss probe would be sent?
931        // - do we need to send a close message?
932        // - call can_send
933        // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths
934
935        // Check whether we need to send a close message
936        let close = match self.state.as_type() {
937            StateType::Drained => {
938                self.app_limited = true;
939                return None;
940            }
941            StateType::Draining | StateType::Closed => {
942                // self.close is only reset once the associated packet had been
943                // encoded successfully
944                if !self.close {
945                    self.app_limited = true;
946                    return None;
947                }
948                true
949            }
950            _ => false,
951        };
952
953        // Check whether we need to send an ACK_FREQUENCY frame
954        if let Some(config) = &self.config.ack_frequency_config {
955            let rtt = self
956                .paths
957                .values()
958                .map(|p| p.data.rtt.get())
959                .min()
960                .expect("one path exists");
961            self.spaces[SpaceId::Data].pending.ack_frequency = self
962                .ack_frequency
963                .should_send_ack_frequency(rtt, config, &self.peer_params)
964                && self.highest_space == SpaceId::Data
965                && self.peer_supports_ack_frequency();
966        }
967
968        // Whether this packet can be coalesced with another one in the same datagram.
969        let mut coalesce = true;
970
971        // Whether the last packet in the datagram must be padded so the datagram takes up
972        // to at least MIN_INITIAL_SIZE, or to the maximum segment size if this is smaller.
973        let mut pad_datagram = PadDatagram::No;
974
975        // Whether congestion control stopped the next packet from being sent. Further
976        // packets could still be built, as e.g. tail-loss probes are not congestion
977        // limited.
978        let mut congestion_blocked = false;
979
980        // The packet number of the last built packet.
981        let mut last_packet_number = None;
982
983        let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
984
985        // If there is any open, validated and available path we only want to send frames to
986        // any backup path that must be sent on that backup path exclusively.
987        let have_available_path = self.paths.iter().any(|(id, path)| {
988            path.data.validated
989                && path.data.local_status() == PathStatus::Available
990                && self.rem_cids.contains_key(id)
991        });
992
993        // Setup for the first path_id
994        let mut transmit = TransmitBuf::new(
995            buf,
996            max_datagrams,
997            self.path_data(path_id).current_mtu().into(),
998        );
999        if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
1000            return Some(challenge);
1001        }
1002        let mut space_id = match path_id {
1003            PathId::ZERO => SpaceId::Initial,
1004            _ => SpaceId::Data,
1005        };
1006
1007        loop {
1008            // check if there is at least one active CID to use for sending
1009            let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1010                let err = PathError::RemoteCidsExhausted;
1011                if !self.abandoned_paths.contains(&path_id) {
1012                    debug!(?err, %path_id, "no active CID for path");
1013                    self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1014                        id: path_id,
1015                        error: err,
1016                    }));
1017                    // Locally we should have refused to open this path, the remote should
1018                    // have given us CIDs for this path before opening it.  So we can always
1019                    // abandon this here.
1020                    self.close_path(
1021                        now,
1022                        path_id,
1023                        TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1024                    )
1025                    .ok();
1026                    self.spaces[SpaceId::Data]
1027                        .pending
1028                        .path_cids_blocked
1029                        .push(path_id);
1030                } else {
1031                    trace!(%path_id, "remote CIDs retired for abandoned path");
1032                }
1033
1034                match self.paths.keys().find(|&&next| next > path_id) {
1035                    Some(next_path_id) => {
1036                        // See if this next path can send anything.
1037                        path_id = *next_path_id;
1038                        space_id = SpaceId::Data;
1039
1040                        // update per path state
1041                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1042                        if let Some(challenge) =
1043                            self.send_prev_path_challenge(now, &mut transmit, path_id)
1044                        {
1045                            return Some(challenge);
1046                        }
1047
1048                        continue;
1049                    }
1050                    None => {
1051                        // Nothing more to send.
1052                        trace!(
1053                            ?space_id,
1054                            %path_id,
1055                            "no CIDs to send on path, no more paths"
1056                        );
1057                        break;
1058                    }
1059                }
1060            };
1061
1062            // Determine if anything can be sent in this packet number space (SpaceId +
1063            // PathId).
1064            let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1065                // We are trying to coalesce another packet into this datagram.
1066                transmit.datagram_remaining_mut()
1067            } else {
1068                // A new datagram needs to be started.
1069                transmit.segment_size()
1070            };
1071            let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1072            let path_should_send = {
1073                let path_exclusive_only = space_id == SpaceId::Data
1074                    && have_available_path
1075                    && self.path_data(path_id).local_status() == PathStatus::Backup;
1076                let path_should_send = if path_exclusive_only {
1077                    can_send.path_exclusive
1078                } else {
1079                    !can_send.is_empty()
1080                };
1081                let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1082                path_should_send || needs_loss_probe
1083            };
1084
1085            if !path_should_send && space_id < SpaceId::Data {
1086                if self.spaces[space_id].crypto.is_some() {
1087                    trace!(?space_id, %path_id, "nothing to send in space");
1088                }
1089                space_id = space_id.next();
1090                continue;
1091            }
1092
1093            let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1094                // Only check congestion control if a new datagram is needed.
1095                self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1096            } else {
1097                PathBlocked::No
1098            };
1099            if send_blocked != PathBlocked::No {
1100                trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1101                congestion_blocked = true;
1102            }
1103            if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1104                // Higher spaces might still have tail-loss probes to send, which are not
1105                // congestion blocked.
1106                space_id = space_id.next();
1107                continue;
1108            }
1109            if !path_should_send || send_blocked != PathBlocked::No {
1110                // Nothing more to send on this path, check the next path if possible.
1111
1112                // If there are any datagrams in the transmit, packets for another path can
1113                // not be built.
1114                if transmit.num_datagrams() > 0 {
1115                    break;
1116                }
1117
1118                match self.paths.keys().find(|&&next| next > path_id) {
1119                    Some(next_path_id) => {
1120                        // See if this next path can send anything.
1121                        trace!(
1122                            ?space_id,
1123                            %path_id,
1124                            %next_path_id,
1125                            "nothing to send on path"
1126                        );
1127                        path_id = *next_path_id;
1128                        space_id = SpaceId::Data;
1129
1130                        // update per path state
1131                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1132                        if let Some(challenge) =
1133                            self.send_prev_path_challenge(now, &mut transmit, path_id)
1134                        {
1135                            return Some(challenge);
1136                        }
1137
1138                        continue;
1139                    }
1140                    None => {
1141                        // Nothing more to send.
1142                        trace!(
1143                            ?space_id,
1144                            %path_id,
1145                            next_path_id=?None::<PathId>,
1146                            "nothing to send on path"
1147                        );
1148                        break;
1149                    }
1150                }
1151            }
1152
1153            // If the datagram is full, we need to start a new one.
1154            if transmit.datagram_remaining_mut() == 0 {
1155                if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1156                    // No more datagrams allowed
1157                    break;
1158                }
1159
1160                match self.spaces[space_id].for_path(path_id).loss_probes {
1161                    0 => transmit.start_new_datagram(),
1162                    _ => {
1163                        // We need something to send for a tail-loss probe.
1164                        let request_immediate_ack =
1165                            space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1166                        self.spaces[space_id].maybe_queue_probe(
1167                            path_id,
1168                            request_immediate_ack,
1169                            &self.streams,
1170                        );
1171
1172                        self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1173
1174                        // Clamp the datagram to at most the minimum MTU to ensure that loss
1175                        // probes can get through and enable recovery even if the path MTU
1176                        // has shrank unexpectedly.
1177                        transmit.start_new_datagram_with_size(std::cmp::min(
1178                            usize::from(INITIAL_MTU),
1179                            transmit.segment_size(),
1180                        ));
1181                    }
1182                }
1183                trace!(count = transmit.num_datagrams(), "new datagram started");
1184                coalesce = true;
1185                pad_datagram = PadDatagram::No;
1186            }
1187
1188            // If coalescing another packet into the existing datagram, there should
1189            // still be enough space for a whole packet.
1190            if transmit.datagram_start_offset() < transmit.len() {
1191                debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1192            }
1193
1194            //
1195            // From here on, we've determined that a packet will definitely be sent.
1196            //
1197
1198            if self.spaces[SpaceId::Initial].crypto.is_some()
1199                && space_id == SpaceId::Handshake
1200                && self.side.is_client()
1201            {
1202                // A client stops both sending and processing Initial packets when it
1203                // sends its first Handshake packet.
1204                self.discard_space(now, SpaceId::Initial);
1205            }
1206            if let Some(ref mut prev) = self.prev_crypto {
1207                prev.update_unacked = false;
1208            }
1209
1210            let mut qlog = QlogSentPacket::default();
1211            let mut builder = PacketBuilder::new(
1212                now,
1213                space_id,
1214                path_id,
1215                remote_cid,
1216                &mut transmit,
1217                can_send.other,
1218                self,
1219                &mut qlog,
1220            )?;
1221            last_packet_number = Some(builder.exact_number);
1222            coalesce = coalesce && !builder.short_header;
1223
1224            if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1225                // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1
1226                pad_datagram |= PadDatagram::ToMinMtu;
1227            }
1228            if space_id == SpaceId::Data && self.config.pad_to_mtu {
1229                pad_datagram |= PadDatagram::ToSegmentSize;
1230            }
1231
1232            if can_send.close {
1233                trace!("sending CONNECTION_CLOSE");
1234                // Encode ACKs before the ConnectionClose message, to give the receiver
1235                // a better approximate on what data has been processed. This is
1236                // especially important with ack delay, since the peer might not
1237                // have gotten any other ACK for the data earlier on.
1238                let mut sent_frames = SentFrames::default();
1239                let is_multipath_negotiated = self.is_multipath_negotiated();
1240                for path_id in self.spaces[space_id]
1241                    .number_spaces
1242                    .iter()
1243                    .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1244                    .map(|(&path_id, _)| path_id)
1245                    .collect::<Vec<_>>()
1246                {
1247                    Self::populate_acks(
1248                        now,
1249                        self.receiving_ecn,
1250                        &mut sent_frames,
1251                        path_id,
1252                        space_id,
1253                        &mut self.spaces[space_id],
1254                        is_multipath_negotiated,
1255                        &mut builder.frame_space_mut(),
1256                        &mut self.stats,
1257                        &mut qlog,
1258                    );
1259                }
1260
1261                // Since there only 64 ACK frames there will always be enough space
1262                // to encode the ConnectionClose frame too. However we still have the
1263                // check here to prevent crashes if something changes.
1264                debug_assert!(
1265                    builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1266                    "ACKs should leave space for ConnectionClose"
1267                );
1268                if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1269                    let max_frame_size = builder.frame_space_remaining();
1270                    match self.state.as_type() {
1271                        StateType::Closed => {
1272                            let reason: Close =
1273                                self.state.as_closed().expect("checked").clone().into();
1274                            if space_id == SpaceId::Data || reason.is_transport_layer() {
1275                                reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1276                                qlog.frame(&Frame::Close(reason));
1277                            } else {
1278                                let frame = frame::ConnectionClose {
1279                                    error_code: TransportErrorCode::APPLICATION_ERROR,
1280                                    frame_type: frame::MaybeFrame::None,
1281                                    reason: Bytes::new(),
1282                                };
1283                                frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1284                                qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1285                            }
1286                        }
1287                        StateType::Draining => {
1288                            let frame = frame::ConnectionClose {
1289                                error_code: TransportErrorCode::NO_ERROR,
1290                                frame_type: frame::MaybeFrame::None,
1291                                reason: Bytes::new(),
1292                            };
1293                            frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1294                            qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1295                        }
1296                        _ => unreachable!(
1297                            "tried to make a close packet when the connection wasn't closed"
1298                        ),
1299                    };
1300                }
1301                builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1302                if space_id == self.highest_space {
1303                    // Don't send another close packet. Even with multipath we only send
1304                    // CONNECTION_CLOSE on a single path since we expect our paths to work.
1305                    self.close = false;
1306                    // `CONNECTION_CLOSE` is the final packet
1307                    break;
1308                } else {
1309                    // Send a close frame in every possible space for robustness, per
1310                    // RFC9000 "Immediate Close during the Handshake". Don't bother trying
1311                    // to send anything else.
1312                    space_id = space_id.next();
1313                    continue;
1314                }
1315            }
1316
1317            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that
1318            // path validation can occur while the link is saturated.
1319            if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1320                let path = self.path_data_mut(path_id);
1321                if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1322                    // TODO(flub): We need to use the right CID!  We shouldn't use the same
1323                    //    CID as the current active one for the path.  Though see also
1324                    //    https://github.com/quinn-rs/quinn/issues/2184
1325                    let response = frame::PathResponse(token);
1326                    trace!(%response, "(off-path)");
1327                    builder.frame_space_mut().write(response);
1328                    qlog.frame(&Frame::PathResponse(response));
1329                    self.stats.frame_tx.path_response += 1;
1330                    builder.finish_and_track(
1331                        now,
1332                        self,
1333                        path_id,
1334                        SentFrames {
1335                            non_retransmits: true,
1336                            ..SentFrames::default()
1337                        },
1338                        PadDatagram::ToMinMtu,
1339                        qlog,
1340                    );
1341                    self.stats.udp_tx.on_sent(1, transmit.len());
1342                    return Some(Transmit {
1343                        destination: remote,
1344                        size: transmit.len(),
1345                        ecn: None,
1346                        segment_size: None,
1347                        src_ip: self.local_ip,
1348                    });
1349                }
1350            }
1351
1352            let sent_frames = {
1353                let path_exclusive_only = have_available_path
1354                    && self.path_data(path_id).local_status() == PathStatus::Backup;
1355                let pn = builder.exact_number;
1356                self.populate_packet(
1357                    now,
1358                    space_id,
1359                    path_id,
1360                    path_exclusive_only,
1361                    &mut builder.frame_space_mut(),
1362                    pn,
1363                    &mut qlog,
1364                )
1365            };
1366
1367            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
1368            // any other reason, there is a bug which leads to one component announcing write
1369            // readiness while not writing any data. This degrades performance. The condition is
1370            // only checked if the full MTU is available and when potentially large fixed-size
1371            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
1372            // writing ACKs.
1373            debug_assert!(
1374                !(sent_frames.is_ack_only(&self.streams)
1375                    && !can_send.acks
1376                    && can_send.other
1377                    && builder.buf.segment_size()
1378                        == self.path_data(path_id).current_mtu() as usize
1379                    && self.datagrams.outgoing.is_empty()),
1380                "SendableFrames was {can_send:?}, but only ACKs have been written"
1381            );
1382            if sent_frames.requires_padding {
1383                pad_datagram |= PadDatagram::ToMinMtu;
1384            }
1385
1386            for (path_id, _pn) in sent_frames.largest_acked.iter() {
1387                self.spaces[space_id]
1388                    .for_path(*path_id)
1389                    .pending_acks
1390                    .acks_sent();
1391                self.timers.stop(
1392                    Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1393                    self.qlog.with_time(now),
1394                );
1395            }
1396
1397            // Now we need to finish the packet.  Before we do so we need to know if we will
1398            // be coalescing the next packet into this one, or will be ending the datagram
1399            // as well.  Because if this is the last packet in the datagram more padding
1400            // might be needed because of the packet type, or to fill the GSO segment size.
1401
1402            // Are we allowed to coalesce AND is there enough space for another *packet* in
1403            // this datagram AND is there another packet to send in this or the next space?
1404            if coalesce
1405                && builder
1406                    .buf
1407                    .datagram_remaining_mut()
1408                    .saturating_sub(builder.predict_packet_end())
1409                    > MIN_PACKET_SPACE
1410                && self
1411                    .next_send_space(space_id, path_id, builder.buf, close)
1412                    .is_some()
1413            {
1414                // We can append/coalesce the next packet into the current
1415                // datagram. Finish the current packet without adding extra padding.
1416                builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1417            } else {
1418                // We need a new datagram for the next packet.  Finish the current
1419                // packet with padding.
1420                if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1421                    // If too many padding bytes would be required to continue the
1422                    // GSO batch after this packet, end the GSO batch here. Ensures
1423                    // that fixed-size frames with heterogeneous sizes
1424                    // (e.g. application datagrams) won't inadvertently waste large
1425                    // amounts of bandwidth. The exact threshold is a bit arbitrary
1426                    // and might benefit from further tuning, though there's no
1427                    // universally optimal value.
1428                    const MAX_PADDING: usize = 32;
1429                    if builder.buf.datagram_remaining_mut()
1430                        > builder.predict_packet_end() + MAX_PADDING
1431                    {
1432                        trace!(
1433                            "GSO truncated by demand for {} padding bytes",
1434                            builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1435                        );
1436                        builder.finish_and_track(
1437                            now,
1438                            self,
1439                            path_id,
1440                            sent_frames,
1441                            PadDatagram::No,
1442                            qlog,
1443                        );
1444                        break;
1445                    }
1446
1447                    // Pad the current datagram to GSO segment size so it can be
1448                    // included in the GSO batch.
1449                    builder.finish_and_track(
1450                        now,
1451                        self,
1452                        path_id,
1453                        sent_frames,
1454                        PadDatagram::ToSegmentSize,
1455                        qlog,
1456                    );
1457                } else {
1458                    builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1459                }
1460                if transmit.num_datagrams() == 1 {
1461                    transmit.clip_datagram_size();
1462                }
1463            }
1464        }
1465
1466        if let Some(last_packet_number) = last_packet_number {
1467            // Note that when sending in multiple packet spaces the last packet number will
1468            // be the one from the highest packet space.
1469            self.path_data_mut(path_id).congestion.on_sent(
1470                now,
1471                transmit.len() as u64,
1472                last_packet_number,
1473            );
1474        }
1475
1476        self.qlog.emit_recovery_metrics(
1477            path_id,
1478            &mut self.paths.get_mut(&path_id).unwrap().data,
1479            now,
1480        );
1481
1482        self.app_limited = transmit.is_empty() && !congestion_blocked;
1483
1484        // Send MTU probe if necessary
1485        if transmit.is_empty() && self.state.is_established() {
1486            // MTU probing happens only in Data space.
1487            let space_id = SpaceId::Data;
1488            path_id = *self.paths.first_key_value().expect("one path must exist").0;
1489            let probe_data = loop {
1490                // We MTU probe all paths for which all of the following is true:
1491                // - We have an active destination CID for the path.
1492                // - The remote address *and* path are validated.
1493                // - The path is not abandoned.
1494                // - The MTU Discovery subsystem wants to probe the path.
1495                let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1496                let eligible = self.path_data(path_id).validated
1497                    && !self.path_data(path_id).is_validating_path()
1498                    && !self.abandoned_paths.contains(&path_id);
1499                let probe_size = eligible
1500                    .then(|| {
1501                        let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1502                        self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1503                    })
1504                    .flatten();
1505                match (active_cid, probe_size) {
1506                    (Some(active_cid), Some(probe_size)) => {
1507                        // Let's send an MTUD probe!
1508                        break Some((active_cid, probe_size));
1509                    }
1510                    _ => {
1511                        // Find the next path to check if it needs an MTUD probe.
1512                        match self.paths.keys().find(|&&next| next > path_id) {
1513                            Some(next) => {
1514                                path_id = *next;
1515                                continue;
1516                            }
1517                            None => break None,
1518                        }
1519                    }
1520                }
1521            };
1522            if let Some((active_cid, probe_size)) = probe_data {
1523                // We are definitely sending a DPLPMTUD probe.
1524                debug_assert_eq!(transmit.num_datagrams(), 0);
1525                transmit.start_new_datagram_with_size(probe_size as usize);
1526
1527                let mut qlog = QlogSentPacket::default();
1528                let mut builder = PacketBuilder::new(
1529                    now,
1530                    space_id,
1531                    path_id,
1532                    active_cid,
1533                    &mut transmit,
1534                    true,
1535                    self,
1536                    &mut qlog,
1537                )?;
1538
1539                // We implement MTU probes as ping packets padded up to the probe size
1540                trace!(?probe_size, "writing MTUD probe");
1541                trace!("PING");
1542                builder.frame_space_mut().write(frame::FrameType::Ping);
1543                qlog.frame(&Frame::Ping);
1544                self.stats.frame_tx.ping += 1;
1545
1546                // If supported by the peer, we want no delays to the probe's ACK
1547                if self.peer_supports_ack_frequency() {
1548                    trace!("IMMEDIATE_ACK");
1549                    builder
1550                        .frame_space_mut()
1551                        .write(frame::FrameType::ImmediateAck);
1552                    self.stats.frame_tx.immediate_ack += 1;
1553                    qlog.frame(&Frame::ImmediateAck);
1554                }
1555
1556                let sent_frames = SentFrames {
1557                    non_retransmits: true,
1558                    ..Default::default()
1559                };
1560                builder.finish_and_track(
1561                    now,
1562                    self,
1563                    path_id,
1564                    sent_frames,
1565                    PadDatagram::ToSize(probe_size),
1566                    qlog,
1567                );
1568
1569                self.path_stats
1570                    .entry(path_id)
1571                    .or_default()
1572                    .sent_plpmtud_probes += 1;
1573            }
1574        }
1575
1576        if transmit.is_empty() {
1577            return None;
1578        }
1579
1580        let destination = self.path_data(path_id).remote;
1581        trace!(
1582            segment_size = transmit.segment_size(),
1583            last_datagram_len = transmit.len() % transmit.segment_size(),
1584            ?destination,
1585            "sending {} bytes in {} datagrams",
1586            transmit.len(),
1587            transmit.num_datagrams()
1588        );
1589        self.path_data_mut(path_id)
1590            .inc_total_sent(transmit.len() as u64);
1591
1592        self.stats
1593            .udp_tx
1594            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1595
1596        Some(Transmit {
1597            destination,
1598            size: transmit.len(),
1599            ecn: if self.path_data(path_id).sending_ecn {
1600                Some(EcnCodepoint::Ect0)
1601            } else {
1602                None
1603            },
1604            segment_size: match transmit.num_datagrams() {
1605                1 => None,
1606                _ => Some(transmit.segment_size()),
1607            },
1608            src_ip: self.local_ip,
1609        })
1610    }
1611
1612    /// Returns the [`SpaceId`] of the next packet space which has data to send
1613    ///
1614    /// This takes into account the space available to frames in the next datagram.
1615    // TODO(flub): This duplication is not nice.
1616    fn next_send_space(
1617        &mut self,
1618        current_space_id: SpaceId,
1619        path_id: PathId,
1620        buf: &TransmitBuf<'_>,
1621        close: bool,
1622    ) -> Option<SpaceId> {
1623        // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed
1624        // to be able to send an individual frame at least this large in the next 1-RTT
1625        // packet. This could be generalized to support every space, but it's only needed to
1626        // handle large fixed-size frames, which only exist in 1-RTT (application
1627        // datagrams). We don't account for coalesced packets potentially occupying space
1628        // because frames can always spill into the next datagram.
1629        let mut space_id = current_space_id;
1630        loop {
1631            let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1632            if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1633                return Some(space_id);
1634            }
1635            space_id = match space_id {
1636                SpaceId::Initial => SpaceId::Handshake,
1637                SpaceId::Handshake => SpaceId::Data,
1638                SpaceId::Data => break,
1639            }
1640        }
1641        None
1642    }
1643
    /// Checks if creating a new datagram would be blocked by congestion control
    ///
    /// Checks the limits in order: anti-amplification (servers only), the congestion
    /// window, and pacing.  Returns the first limit that blocks sending, or
    /// [`PathBlocked::No`] if a new datagram may be created.
    fn path_congestion_check(
        &mut self,
        space_id: SpaceId,
        path_id: PathId,
        transmit: &TransmitBuf<'_>,
        can_send: &SendableFrames,
        now: Instant,
    ) -> PathBlocked {
        // Anti-amplification is only based on `total_sent`, which gets updated after
        // the transmit is sent. Therefore we pass the amount of bytes for datagrams
        // that are already created, as well as 1 byte for starting another datagram. If
        // there is any anti-amplification budget left, we always allow a full MTU to be
        // sent (see https://github.com/quinn-rs/quinn/issues/1082).
        if self.side().is_server()
            && self
                .path_data(path_id)
                .anti_amplification_blocked(transmit.len() as u64 + 1)
        {
            trace!(?space_id, %path_id, "blocked by anti-amplification");
            return PathBlocked::AntiAmplification;
        }

        // Congestion control check.
        // Tail loss probes must not be blocked by congestion, or a deadlock could arise.
        let bytes_to_send = transmit.segment_size() as u64;
        let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;

        // Only "other" frames are congestion controlled; loss probes and
        // CONNECTION_CLOSE bypass the window check.
        if can_send.other && !need_loss_probe && !can_send.close {
            let path = self.path_data(path_id);
            if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
                trace!(?space_id, %path_id, "blocked by congestion control");
                return PathBlocked::Congestion;
            }
        }

        // Pacing check.
        if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
            // Arm the pacing timer so transmission resumes once the delay elapses.
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::Pacing),
                delay,
                self.qlog.with_time(now),
            );
            // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though
            // they are not congestion controlled.
            trace!(?space_id, %path_id, "blocked by pacing");
            return PathBlocked::Pacing;
        }

        PathBlocked::No
    }
1695
    /// Send PATH_CHALLENGE for a previous path if necessary
    ///
    /// QUIC-TRANSPORT section 9.3.3
    /// <https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding>
    ///
    /// Returns `None` unless this path has previous-path state with a challenge queued
    /// (`send_new_challenge` set).  Otherwise builds and returns a single 1-RTT datagram
    /// carrying a PATH_CHALLENGE, addressed to the previous remote address, using the
    /// previous connection ID and padded to at least `MIN_INITIAL_SIZE`.
    fn send_prev_path_challenge(
        &mut self,
        now: Instant,
        buf: &mut TransmitBuf<'_>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
        // TODO (matheus23): We could use !prev_path.is_validating() here instead to
        // (possibly) also re-send challenges when they get lost.
        if !prev_path.send_new_challenge {
            return None;
        };
        // Consume the flag so this challenge is only sent once per request.
        prev_path.send_new_challenge = false;
        let destination = prev_path.remote;
        let token = self.rng.random();
        // Record when and to where the challenge was sent, so the matching
        // PATH_RESPONSE can be validated later.
        let info = paths::SentChallengeInfo {
            sent_instant: now,
            remote: destination,
        };
        prev_path.challenges_sent.insert(token, info);
        debug_assert_eq!(
            self.highest_space,
            SpaceId::Data,
            "PATH_CHALLENGE queued without 1-RTT keys"
        );
        buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);

        // Use the previous CID to avoid linking the new path with the previous path. We
        // don't bother accounting for possible retirement of that prev_cid because this is
        // sent once, immediately after migration, when the CID is known to be valid. Even
        // if a post-migration packet caused the CID to be retired, it's fair to pretend
        // this is sent first.
        debug_assert_eq!(buf.datagram_start_offset(), 0);
        let mut qlog = QlogSentPacket::default();
        let mut builder = PacketBuilder::new(
            now,
            SpaceId::Data,
            path_id,
            *prev_cid,
            buf,
            false,
            self,
            &mut qlog,
        )?;
        let challenge = frame::PathChallenge(token);
        trace!(%challenge, "validating previous path");
        qlog.frame(&Frame::PathChallenge(challenge));
        builder.frame_space_mut().write(challenge);
        self.stats.frame_tx.path_challenge += 1;

        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
        // to at least the smallest allowed maximum datagram size of 1200 bytes,
        // unless the anti-amplification limit for the path does not permit
        // sending a datagram of this size
        builder.pad_to(MIN_INITIAL_SIZE);

        builder.finish(self, now, qlog);
        self.stats.udp_tx.on_sent(1, buf.len());

        Some(Transmit {
            destination,
            size: buf.len(),
            ecn: None,
            segment_size: None,
            src_ip: self.local_ip,
        })
    }
1767
    /// Indicate what types of frames are ready to send for the given space
    ///
    /// *packet_size* is the number of bytes available to build the next packet.  *close*
    /// indicates whether a CONNECTION_CLOSE frame needs to be sent.
    fn space_can_send(
        &mut self,
        space_id: SpaceId,
        path_id: PathId,
        packet_size: usize,
        close: bool,
    ) -> SendableFrames {
        // Bytes left for frames in a 1-RTT packet of `packet_size` bytes, after
        // subtracting the predicted overhead for the next 1-RTT packet number.
        let pn = self.spaces[SpaceId::Data]
            .for_path(path_id)
            .peek_tx_number();
        let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
        if self.spaces[space_id].crypto.is_none()
            && (space_id != SpaceId::Data
                || self.zero_rtt_crypto.is_none()
                || self.side.is_server())
        {
            // No keys available for this space
            return SendableFrames::empty();
        }
        let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
        if space_id == SpaceId::Data {
            // 1-RTT-only frames additionally depend on the available frame space.
            can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
        }

        // A CONNECTION_CLOSE can only be written in a space we hold keys for.
        can_send.close = close && self.spaces[space_id].crypto.is_some();

        can_send
    }
1800
    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
    ///
    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
    /// extracted through the relevant methods.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            // An incoming UDP datagram, possibly containing further coalesced packets.
            Datagram(DatagramConnectionEvent {
                now,
                remote,
                path_id,
                ecn,
                first_decode,
                remaining,
            }) => {
                let span = trace_span!("pkt", %path_id);
                let _guard = span.enter();
                // If this packet could initiate a migration and we're a client or a server that
                // forbids migration, drop the datagram. This could be relaxed to heuristically
                // permit NAT-rebinding-like migration.
                if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
                    if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
                        trace!(
                            %path_id,
                            ?remote,
                            path_remote = ?self.path(path_id).map(|p| p.remote),
                            "discarding packet from unrecognized peer"
                        );
                        return;
                    }
                }

                // Snapshot the anti-amplification state *before* processing this datagram,
                // since receiving data may unblock sending and we re-arm timers below.
                let was_anti_amplification_blocked = self
                    .path(path_id)
                    .map(|path| path.anti_amplification_blocked(1))
                    .unwrap_or(true); // if we don't know about this path it's eagerly considered as unvalidated
                // TODO(@divma): revisit this

                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                let data_len = first_decode.len();

                self.handle_decode(now, remote, path_id, ecn, first_decode);
                // The current `path` might have changed inside `handle_decode` since the packet
                // could have triggered a migration. The packet might also belong to an unknown
                // path and have been rejected. Make sure the data received is accounted for the
                // most recent path by accessing `path` after `handle_decode`.
                if let Some(path) = self.path_mut(path_id) {
                    path.inc_total_recvd(data_len as u64);
                }

                // Process any remaining packets coalesced into the same datagram.
                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, remote, path_id, ecn, data);
                }

                if let Some(path) = self.paths.get_mut(&path_id) {
                    self.qlog
                        .emit_recovery_metrics(path_id, &mut path.data, now);
                }

                if was_anti_amplification_blocked {
                    // A prior attempt to set the loss detection timer may have failed due to
                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
                    // the server's first flight is lost.
                    self.set_loss_detection_timer(now, path_id);
                }
            }
            // The endpoint issued fresh local connection IDs for one path.
            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
                // All issued CIDs in one event belong to the same path.
                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
                let cid_state = self
                    .local_cid_state
                    .entry(path_id)
                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
                cid_state.new_cids(&ids, now);

                // Queue NEW_CONNECTION_ID frames announcing the new CIDs to the peer.
                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                // Always update Timer::PushNewCid
                self.reset_cid_retirement(now);
            }
        }
    }
1887
    /// Process timer expirations
    ///
    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
    /// methods.
    ///
    /// It is most efficient to call this immediately after the system clock reaches the latest
    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
    /// no-op and therefore are safe.
    pub fn handle_timeout(&mut self, now: Instant) {
        // Drain every timer that has already expired; each arm performs the protocol
        // action associated with that timer.
        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
            // TODO(@divma): remove `at` when the unicorn is born
            trace!(?timer, at=?now, "timeout");
            match timer {
                // Connection-wide timers.
                Timer::Conn(timer) => match timer {
                    // The closing/draining period has elapsed; signal the endpoint.
                    ConnTimer::Close => {
                        self.state.move_to_drained(None);
                        self.endpoint_events.push_back(EndpointEventInner::Drained);
                    }
                    // Idle timeout: terminate the connection.
                    ConnTimer::Idle => {
                        self.kill(ConnectionError::TimedOut);
                    }
                    ConnTimer::KeepAlive => {
                        trace!("sending keep-alive");
                        self.ping();
                    }
                    // Discard 0-RTT and previous-generation 1-RTT keys.
                    ConnTimer::KeyDiscard => {
                        self.zero_rtt_crypto = None;
                        self.prev_crypto = None;
                    }
                    // Rotate local CIDs whose configured lifetime has expired.
                    ConnTimer::PushNewCid => {
                        while let Some((path_id, when)) = self.next_cid_retirement() {
                            if when > now {
                                break;
                            }
                            match self.local_cid_state.get_mut(&path_id) {
                                None => error!(%path_id, "No local CID state for path"),
                                Some(cid_state) => {
                                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
                                    let num_new_cid = cid_state.on_cid_timeout().into();
                                    if !self.state.is_closed() {
                                        trace!(
                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                            cid_state.retire_prior_to()
                                        );
                                        self.endpoint_events.push_back(
                                            EndpointEventInner::NeedIdentifiers(
                                                path_id,
                                                now,
                                                num_new_cid,
                                            ),
                                        );
                                    }
                                }
                            }
                        }
                    }
                },
                // TODO: add path_id as span somehow
                Timer::PerPath(path_id, timer) => {
                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
                    let _guard = span.enter();
                    match timer {
                        // The path has been idle for too long; close it.
                        PathTimer::PathIdle => {
                            self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
                                .ok();
                        }

                        PathTimer::PathKeepAlive => {
                            trace!("sending keep-alive on path");
                            self.ping_path(path_id).ok();
                        }
                        PathTimer::LossDetection => {
                            self.on_loss_detection_timeout(now, path_id);
                            self.qlog.emit_recovery_metrics(
                                path_id,
                                &mut self.paths.get_mut(&path_id).unwrap().data,
                                now,
                            );
                        }
                        // Path validation timed out: roll back to the previous path data
                        // (if any) and clear the outstanding challenge state.
                        PathTimer::PathValidation => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("path validation failed");
                            if let Some((_, prev)) = path.prev.take() {
                                path.data = prev;
                            }
                            path.data.challenges_sent.clear();
                            path.data.send_new_challenge = false;
                        }
                        // A PATH_CHALLENGE is presumed lost; queue a fresh one.
                        PathTimer::PathChallengeLost => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            trace!("path challenge deemed lost");
                            path.data.send_new_challenge = true;
                        }
                        // Opening a new path failed validation: close it and notify the
                        // application.
                        PathTimer::PathOpen => {
                            let Some(path) = self.path_mut(path_id) else {
                                continue;
                            };
                            path.challenges_sent.clear();
                            path.send_new_challenge = false;
                            debug!("new path validation failed");
                            if let Err(err) = self.close_path(
                                now,
                                path_id,
                                TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
                            ) {
                                warn!(?err, "failed closing path");
                            }

                            self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                                id: path_id,
                                error: PathError::ValidationFailed,
                            }));
                        }
                        // Pacing merely needed to wake us up so sending can resume.
                        PathTimer::Pacing => trace!("pacing timer expired"),
                        PathTimer::MaxAckDelay => {
                            trace!("max ack delay reached");
                            // This timer is only armed in the Data space
                            self.spaces[SpaceId::Data]
                                .for_path(path_id)
                                .pending_acks
                                .on_max_ack_delay_timeout()
                        }
                        PathTimer::DiscardPath => {
                            // The path was abandoned and 3*PTO has expired since.  Clean up all
                            // remaining state and install stateless reset token.
                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                            if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
                                let (min_seq, max_seq) = loc_cid_state.active_seq();
                                for seq in min_seq..=max_seq {
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::RetireConnectionId(
                                            now, path_id, seq, false,
                                        ),
                                    );
                                }
                            }
                            self.discard_path(path_id, now);
                        }
                    }
                }
            }
        }
    }
2040
2041    /// Close a connection immediately
2042    ///
2043    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
2044    /// call this only when all important communications have been completed, e.g. by calling
2045    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
2046    /// [`StreamEvent::Finished`] event.
2047    ///
2048    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
2049    /// delivered. There may still be data from the peer that has not been received.
2050    ///
2051    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
2052    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2053        self.close_inner(
2054            now,
2055            Close::Application(frame::ApplicationClose { error_code, reason }),
2056        )
2057    }
2058
2059    fn close_inner(&mut self, now: Instant, reason: Close) {
2060        let was_closed = self.state.is_closed();
2061        if !was_closed {
2062            self.close_common();
2063            self.set_close_timer(now);
2064            self.close = true;
2065            self.state.move_to_closed_local(reason);
2066        }
2067    }
2068
    /// Control datagrams
    ///
    /// Returns a handle borrowing this connection that exposes its datagram
    /// operations; see [`Datagrams`].
    pub fn datagrams(&mut self) -> Datagrams<'_> {
        Datagrams { conn: self }
    }
2073
    /// Returns connection statistics
    ///
    /// Returns a clone of the running [`ConnectionStats`] counters at the time of the
    /// call; the returned value is a snapshot and does not update further.
    pub fn stats(&mut self) -> ConnectionStats {
        self.stats.clone()
    }
2078
2079    /// Returns path statistics
2080    pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2081        let path = self.paths.get(&path_id)?;
2082        let stats = self.path_stats.entry(path_id).or_default();
2083        stats.rtt = path.data.rtt.get();
2084        stats.cwnd = path.data.congestion.window();
2085        stats.current_mtu = path.data.mtud.current_mtu();
2086        Some(*stats)
2087    }
2088
2089    /// Ping the remote endpoint
2090    ///
2091    /// Causes an ACK-eliciting packet to be transmitted on the connection.
2092    pub fn ping(&mut self) {
2093        // TODO(flub): This is very brute-force: it pings *all* the paths.  Instead it would
2094        //    be nice if we could only send a single packet for this.
2095        for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2096            path_data.ping_pending = true;
2097        }
2098    }
2099
2100    /// Ping the remote endpoint over a specific path
2101    ///
2102    /// Causes an ACK-eliciting packet to be transmitted on the path.
2103    pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2104        let path_data = self.spaces[self.highest_space]
2105            .number_spaces
2106            .get_mut(&path)
2107            .ok_or(ClosedPath { _private: () })?;
2108        path_data.ping_pending = true;
2109        Ok(())
2110    }
2111
2112    /// Update traffic keys spontaneously
2113    ///
2114    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
2115    pub fn force_key_update(&mut self) {
2116        if !self.state.is_established() {
2117            debug!("ignoring forced key update in illegal state");
2118            return;
2119        }
2120        if self.prev_crypto.is_some() {
2121            // We already just updated, or are currently updating, the keys. Concurrent key updates
2122            // are illegal.
2123            debug!("ignoring redundant forced key update");
2124            return;
2125        }
2126        self.update_keys(None, false);
2127    }
2128
    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
    #[doc(hidden)]
    #[deprecated]
    pub fn initiate_key_update(&mut self) {
        // Delegates to the renamed replacement.
        self.force_key_update();
    }
2135
2136    /// Get a session reference
2137    pub fn crypto_session(&self) -> &dyn crypto::Session {
2138        &*self.crypto
2139    }
2140
    /// Whether the connection is in the process of being established
    ///
    /// If this returns `false`, the connection may be either established or closed, signaled by the
    /// emission of a `Connected` or `ConnectionLost` message respectively.
    pub fn is_handshaking(&self) -> bool {
        // Delegates to the connection state machine.
        self.state.is_handshake()
    }
2148
    /// Whether the connection is closed
    ///
    /// Closed connections cannot transport any further data. A connection becomes closed when
    /// either peer application intentionally closes it, or when either transport layer detects an
    /// error such as a time-out or certificate validation failure.
    ///
    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
    pub fn is_closed(&self) -> bool {
        // Delegates to the connection state machine.
        self.state.is_closed()
    }
2159
2160    /// Whether there is no longer any need to keep the connection around
2161    ///
2162    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
2163    /// packets from the peer. All drained connections have been closed.
2164    pub fn is_drained(&self) -> bool {
2165        self.state.is_drained()
2166    }
2167
2168    /// For clients, if the peer accepted the 0-RTT data packets
2169    ///
2170    /// The value is meaningless until after the handshake completes.
2171    pub fn accepted_0rtt(&self) -> bool {
2172        self.accepted_0rtt
2173    }
2174
2175    /// Whether 0-RTT is/was possible during the handshake
2176    pub fn has_0rtt(&self) -> bool {
2177        self.zero_rtt_enabled
2178    }
2179
2180    /// Whether there are any pending retransmits
2181    pub fn has_pending_retransmits(&self) -> bool {
2182        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2183    }
2184
2185    /// Look up whether we're the client or server of this Connection
2186    pub fn side(&self) -> Side {
2187        self.side.side()
2188    }
2189
2190    /// Get the address observed by the remote over the given path
2191    pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2192        self.path(path_id)
2193            .map(|path_data| {
2194                path_data
2195                    .last_observed_addr_report
2196                    .as_ref()
2197                    .map(|observed| observed.socket_addr())
2198            })
2199            .ok_or(ClosedPath { _private: () })
2200    }
2201
2202    /// The local IP address which was used when the peer established
2203    /// the connection
2204    ///
2205    /// This can be different from the address the endpoint is bound to, in case
2206    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
2207    ///
2208    /// This will return `None` for clients, or when no `local_ip` was passed to
2209    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
2210    /// connection.
2211    pub fn local_ip(&self) -> Option<IpAddr> {
2212        self.local_ip
2213    }
2214
2215    /// Current best estimate of this connection's latency (round-trip-time)
2216    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2217        self.path(path_id).map(|d| d.rtt.get())
2218    }
2219
2220    /// Current state of this connection's congestion controller, for debugging purposes
2221    pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2222        self.path(path_id).map(|d| d.congestion.as_ref())
2223    }
2224
2225    /// Modify the number of remotely initiated streams that may be concurrently open
2226    ///
2227    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
2228    /// `count`s increase both minimum and worst-case memory consumption.
2229    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2230        self.streams.set_max_concurrent(dir, count);
2231        // If the limit was reduced, then a flow control update previously deemed insignificant may
2232        // now be significant.
2233        let pending = &mut self.spaces[SpaceId::Data].pending;
2234        self.streams.queue_max_stream_id(pending);
2235    }
2236
2237    /// Modify the number of open paths allowed when multipath is enabled
2238    ///
2239    /// When reducing the number of concurrent paths this will only affect delaying sending
2240    /// new MAX_PATH_ID frames until fewer than this number of paths are possible.  To
2241    /// actively reduce paths they must be closed using [`Connection::close_path`], which
2242    /// can also be used to close not-yet-opened paths.
2243    ///
2244    /// If multipath is not negotiated (see the [`TransportConfig`]) this can not enable
2245    /// multipath and will fail.
2246    pub fn set_max_concurrent_paths(
2247        &mut self,
2248        now: Instant,
2249        count: NonZeroU32,
2250    ) -> Result<(), MultipathNotNegotiated> {
2251        if !self.is_multipath_negotiated() {
2252            return Err(MultipathNotNegotiated { _private: () });
2253        }
2254        self.max_concurrent_paths = count;
2255
2256        let in_use_count = self
2257            .local_max_path_id
2258            .next()
2259            .saturating_sub(self.abandoned_paths.len() as u32)
2260            .as_u32();
2261        let extra_needed = count.get().saturating_sub(in_use_count);
2262        let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2263
2264        self.set_max_path_id(now, new_max_path_id);
2265
2266        Ok(())
2267    }
2268
2269    /// If needed, issues a new MAX_PATH_ID frame and new CIDs for any newly allowed paths
2270    fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2271        if max_path_id <= self.local_max_path_id {
2272            return;
2273        }
2274
2275        self.local_max_path_id = max_path_id;
2276        self.spaces[SpaceId::Data].pending.max_path_id = true;
2277
2278        self.issue_first_path_cids(now);
2279    }
2280
2281    /// Current number of remotely initiated streams that may be concurrently open
2282    ///
2283    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
2284    /// it will not change immediately, even if fewer streams are open. Instead, it will
2285    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
2286    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2287        self.streams.max_concurrent(dir)
2288    }
2289
2290    /// See [`TransportConfig::send_window()`]
2291    pub fn set_send_window(&mut self, send_window: u64) {
2292        self.streams.set_send_window(send_window);
2293    }
2294
2295    /// See [`TransportConfig::receive_window()`]
2296    pub fn set_receive_window(&mut self, receive_window: VarInt) {
2297        if self.streams.set_receive_window(receive_window) {
2298            self.spaces[SpaceId::Data].pending.max_data = true;
2299        }
2300    }
2301
2302    /// Whether the Multipath for QUIC extension is enabled.
2303    ///
2304    /// Multipath is only enabled after the handshake is completed and if it was enabled by both
2305    /// peers.
2306    pub fn is_multipath_negotiated(&self) -> bool {
2307        !self.is_handshaking()
2308            && self.config.max_concurrent_multipath_paths.is_some()
2309            && self.peer_params.initial_max_path_id.is_some()
2310    }
2311
2312    fn on_ack_received(
2313        &mut self,
2314        now: Instant,
2315        space: SpaceId,
2316        ack: frame::Ack,
2317    ) -> Result<(), TransportError> {
2318        // All ACKs are referencing path 0
2319        let path = PathId::ZERO;
2320        self.inner_on_ack_received(now, space, path, ack)
2321    }
2322
2323    fn on_path_ack_received(
2324        &mut self,
2325        now: Instant,
2326        space: SpaceId,
2327        path_ack: frame::PathAck,
2328    ) -> Result<(), TransportError> {
2329        let (ack, path) = path_ack.into_ack();
2330        self.inner_on_ack_received(now, space, path, ack)
2331    }
2332
    /// Handles an ACK frame acknowledging packets sent on *path*.
    ///
    /// Shared implementation behind ACK (always path 0) and PATH_ACK handling.
    /// Updates largest-acked bookkeeping, spurious-loss and loss detection, MTU
    /// discovery, the RTT estimate, ECN validation, and the loss detection timer.
    fn inner_on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        if self.abandoned_paths.contains(&path) {
            // See also https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.4.3-3
            // > PATH_ACK frames received with an abandoned path ID are silently ignored, as specified in Section 4.
            trace!("silently ignoring PATH_ACK on abandoned path");
            return Ok(());
        }
        // Acknowledging a packet number we have not yet sent is a protocol violation.
        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        // Whether this frame advances the largest acknowledged packet number;
        // RTT sampling and ECN processing below only apply in that case.
        let new_largest = {
            let space = &mut self.spaces[space].for_path(path);
            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
                space.largest_acked_packet = Some(ack.largest);
                if let Some(info) = space.sent_packets.get(ack.largest) {
                    // This should always succeed, but a misbehaving peer might ACK a packet we
                    // haven't sent. At worst, that will result in us spuriously reducing the
                    // congestion window.
                    space.largest_acked_packet_sent = info.time_sent;
                }
                true
            } else {
                false
            }
        };

        // If every packet we had previously declared lost is now acknowledged, the
        // earlier congestion response was spurious; let the controller compensate.
        if self.detect_spurious_loss(&ack, space, path) {
            self.path_data_mut(path)
                .congestion
                .on_spurious_congestion_event();
        }

        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
        let mut newly_acked = ArrayRangeSet::new();
        for range in ack.iter() {
            self.spaces[space].for_path(path).check_ack(range.clone())?;
            for (pn, _) in self.spaces[space]
                .for_path(path)
                .sent_packets
                .iter_range(range)
            {
                newly_acked.insert_one(pn);
            }
        }

        // Nothing new acknowledged: no state to update.
        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                    // Assume ACKs for all packets below the largest acknowledged in
                    // `packet` have been received. This can cause the peer to spuriously
                    // retransmit if some of our earlier ACKs were lost, but allows for
                    // simpler state tracking. See discussion at
                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                        pns.pending_acks.subtract_below(*acked_pn);
                    }
                }
                ack_eliciting_acked |= info.ack_eliciting;

                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
                let path_data = self.path_data_mut(path);
                let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
                if mtu_updated {
                    path_data
                        .congestion
                        .on_mtu_update(path_data.mtud.current_mtu());
                }

                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
                self.ack_frequency.on_acked(path, packet);

                self.on_packet_acked(now, path, info);
            }
        }

        // Signal the congestion controller that this batch of ACK processing is done.
        let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
        let app_limited = self.app_limited;
        let path_data = self.path_data_mut(path);
        let in_flight = path_data.in_flight.bytes;

        path_data
            .congestion
            .on_end_acks(now, in_flight, app_limited, largest_ackd);

        if new_largest && ack_eliciting_acked {
            // Take an RTT sample from the newly advanced largest acknowledged packet.
            // Peer-reported ACK delay is only credited in the Data space.
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = now.saturating_duration_since(
                self.spaces[space].for_path(path).largest_acked_packet_sent,
            );

            let next_pn = self.spaces[space].for_path(path).next_packet_number;
            let path_data = self.path_data_mut(path);
            // TODO(@divma): should be a method of path, should be contained in a single place
            path_data.rtt.update(ack_delay, rtt);
            if path_data.first_packet_after_rtt_sample.is_none() {
                path_data.first_packet_after_rtt_sample = Some((space, next_pn));
            }
        }

        // Must be called before crypto/pto_count are clobbered
        self.detect_lost_packets(now, space, path, true);

        if self.peer_completed_address_validation(path) {
            self.path_data_mut(path).pto_count = 0;
        }

        // Explicit congestion notification
        // TODO(@divma): this code is a good example of logic that should be contained in a single
        // place but it's split between the path data and the packet number space data, we should
        // find a way to make this work without two lookups
        if self.path_data(path).sending_ecn {
            if let Some(ecn) = ack.ecn {
                // We only examine ECN counters from ACKs that we are certain we received in transmit
                // order, allowing us to compute an increase in ECN counts to compare against the number
                // of newly acked packets that remains well-defined in the presence of arbitrary packet
                // reordering.
                if new_largest {
                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
                }
            } else {
                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
                debug!("ECN not acknowledged by peer");
                self.path_data_mut(path).sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now, path);
        Ok(())
    }
2482
2483    fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2484        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2485
2486        if lost_packets.is_empty() {
2487            return false;
2488        }
2489
2490        for range in ack.iter() {
2491            let spurious_losses: Vec<u64> = lost_packets
2492                .iter_range(range.clone())
2493                .map(|(pn, _info)| pn)
2494                .collect();
2495
2496            for pn in spurious_losses {
2497                lost_packets.remove(pn);
2498            }
2499        }
2500
2501        // If this ACK frame acknowledged all deemed lost packets,
2502        // then we have raised a spurious congestion event in the past.
2503        // We cannot conclude when there are remaining packets,
2504        // but future ACK frames might indicate a spurious loss detection.
2505        lost_packets.is_empty()
2506    }
2507
2508    /// Drain lost packets that we reasonably think will never arrive
2509    ///
2510    /// The current criterion is copied from `msquic`:
2511    /// discard packets that were sent earlier than 2 probe timeouts ago.
2512    fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2513        let two_pto = 2 * self.path_data(path).rtt.pto_base();
2514
2515        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2516        lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2517    }
2518
2519    /// Process a new ECN block from an in-order ACK
2520    fn process_ecn(
2521        &mut self,
2522        now: Instant,
2523        space: SpaceId,
2524        path: PathId,
2525        newly_acked: u64,
2526        ecn: frame::EcnCounts,
2527        largest_sent_time: Instant,
2528    ) {
2529        match self.spaces[space]
2530            .for_path(path)
2531            .detect_ecn(newly_acked, ecn)
2532        {
2533            Err(e) => {
2534                debug!("halting ECN due to verification failure: {}", e);
2535
2536                self.path_data_mut(path).sending_ecn = false;
2537                // Wipe out the existing value because it might be garbage and could interfere with
2538                // future attempts to use ECN on new paths.
2539                self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2540            }
2541            Ok(false) => {}
2542            Ok(true) => {
2543                self.path_stats.entry(path).or_default().congestion_events += 1;
2544                self.path_data_mut(path).congestion.on_congestion_event(
2545                    now,
2546                    largest_sent_time,
2547                    false,
2548                    true,
2549                    0,
2550                );
2551            }
2552        }
2553    }
2554
    // Not timing-aware, so it's safe to call this for inferred acks, such as arise from
    // high-latency handshakes
    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
        // The packet is no longer in flight on its path.
        self.paths
            .get_mut(&path_id)
            .expect("known path")
            .remove_in_flight(&info);
        let app_limited = self.app_limited;
        let path = self.path_data_mut(path_id);
        if info.ack_eliciting && !path.is_validating_path() {
            // Only pass ACKs to the congestion controller if we are not validating the current
            // path, so as to ignore any ACKs from older paths still coming in.
            let rtt = path.rtt;
            path.congestion
                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
        }

        // Update state for confirmed delivery of frames
        if let Some(retransmits) = info.retransmits.get() {
            // RESET_STREAM delivery is now confirmed; stop retransmitting it.
            for (id, _) in retransmits.reset_stream.iter() {
                self.streams.reset_acked(*id);
            }
        }

        // Tell the stream state machine which STREAM frames were delivered.
        for frame in info.stream_frames {
            self.streams.received_ack_of(frame);
        }
    }
2583
2584    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2585        let start = if self.zero_rtt_crypto.is_some() {
2586            now
2587        } else {
2588            self.prev_crypto
2589                .as_ref()
2590                .expect("no previous keys")
2591                .end_packet
2592                .as_ref()
2593                .expect("update not acknowledged yet")
2594                .1
2595        };
2596
2597        // QUIC-MULTIPATH § 2.5 Key Phase Update Process: use largest PTO off all paths.
2598        self.timers.set(
2599            Timer::Conn(ConnTimer::KeyDiscard),
2600            start + self.pto_max_path(space, false) * 3,
2601            self.qlog.with_time(now),
2602        );
2603    }
2604
2605    /// Handle a [`PathTimer::LossDetection`] timeout.
2606    ///
2607    /// This timer expires for two reasons:
2608    /// - An ACK-eliciting packet we sent should be considered lost.
2609    /// - The PTO may have expired and a tail-loss probe needs to be scheduled.
2610    ///
2611    /// The former needs us to schedule re-transmission of the lost data.
2612    ///
2613    /// The latter means we have not received an ACK for an ack-eliciting packet we sent
2614    /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
2615    /// packet, to try and elicit new acknowledgements. These new acknowledgements will
2616    /// indicate whether the previously sent packets were lost or not.
2617    fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2618        if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2619            // Time threshold loss Detection
2620            self.detect_lost_packets(now, pn_space, path_id, false);
2621            self.set_loss_detection_timer(now, path_id);
2622            return;
2623        }
2624
2625        let (_, space) = match self.pto_time_and_space(now, path_id) {
2626            Some(x) => x,
2627            None => {
2628                error!(%path_id, "PTO expired while unset");
2629                return;
2630            }
2631        };
2632        trace!(
2633            in_flight = self.path_data(path_id).in_flight.bytes,
2634            count = self.path_data(path_id).pto_count,
2635            ?space,
2636            %path_id,
2637            "PTO fired"
2638        );
2639
2640        let count = match self.path_data(path_id).in_flight.ack_eliciting {
2641            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
2642            // deadlock preventions
2643            0 => {
2644                debug_assert!(!self.peer_completed_address_validation(path_id));
2645                1
2646            }
2647            // Conventional loss probe
2648            _ => 2,
2649        };
2650        let pns = self.spaces[space].for_path(path_id);
2651        pns.loss_probes = pns.loss_probes.saturating_add(count);
2652        let path_data = self.path_data_mut(path_id);
2653        path_data.pto_count = path_data.pto_count.saturating_add(1);
2654        self.set_loss_detection_timer(now, path_id);
2655    }
2656
    /// Detect any lost packets
    ///
    /// There are two cases in which we detects lost packets:
    ///
    /// - We received an ACK packet.
    /// - The [`PathTimer::LossDetection`] timer expired. So there is an un-acknowledged packet
    ///   that was followed by an acknowledged packet. The loss timer for this
    ///   un-acknowledged packet expired and we need to detect that packet as lost.
    ///
    /// Packets are lost if they are both (See RFC9002 §6.1):
    ///
    /// - Unacknowledged, in flight and sent prior to an acknowledged packet.
    /// - Old enough by either:
    ///   - Having a packet number [`TransportConfig::packet_threshold`] lower then the last
    ///     acknowledged packet.
    ///   - Being sent [`TransportConfig::time_threshold`] * RTT in the past.
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        // Reset the loss time; it is re-armed below for any packet that is not yet
        // old enough to be declared lost.
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        // Find all the lost packets, populating all variables initialised above.

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        // Time threshold: anything sent more than time_threshold * RTT ago (but at
        // least TIMER_GRANULARITY) counts as lost.
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // InPersistentCongestion: Determine if all packets in the time period before the newest
        // lost packet, including the edges, are marked lost. PTO computation must always
        // include max ACK delay, i.e. operate as if in Data space (see RFC9001 §7.6.1).
        let congestion_period = self
            .pto(SpaceId::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        // Scan everything still unacknowledged that was sent before the largest
        // acknowledged packet, in ascending packet-number order.
        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
            if prev_packet != Some(packet.wrapping_sub(1)) {
                // An intervening packet was acknowledged
                persistent_congestion_start = None;
            }

            // Packets sent before now - loss_delay are deemed lost.
            // However, we avoid subtraction as it can panic and there's no
            // saturating equivalent of this subtraction operation with a Duration.
            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
                // The packet should be declared lost.
                if Some(packet) == in_flight_mtu_probe {
                    // Lost MTU probes are not included in `lost_packets`, because they
                    // should not trigger a congestion control response
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // Two ACK-eliciting packets lost more than
                            // congestion_period apart, with no ACKed packets in between
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // Persistent congestion must start after the first RTT sample
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space, packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // The packet should not yet be declared lost.
                if space.loss_time.is_none() {
                    // Since we iterate in order the lowest packet number's loss time will
                    // always be the earliest.
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }
2773
2774    /// Drops the path state, declaring any remaining in-flight packets as lost
2775    fn discard_path(&mut self, path_id: PathId, now: Instant) {
2776        trace!(%path_id, "dropping path state");
2777        let path = self.path_data(path_id);
2778        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2779
2780        let mut size_of_lost_packets = 0u64; // add to path_stats.lost_bytes;
2781        let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2782            .for_path(path_id)
2783            .sent_packets
2784            .iter()
2785            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2786            .map(|(pn, info)| {
2787                size_of_lost_packets += info.size as u64;
2788                pn
2789            })
2790            .collect();
2791
2792        if !lost_pns.is_empty() {
2793            trace!(
2794                %path_id,
2795                count = lost_pns.len(),
2796                lost_bytes = size_of_lost_packets,
2797                "packets lost on path abandon"
2798            );
2799            self.handle_lost_packets(
2800                SpaceId::Data,
2801                path_id,
2802                now,
2803                lost_pns,
2804                in_flight_mtu_probe,
2805                Duration::ZERO,
2806                false,
2807                size_of_lost_packets,
2808            );
2809        }
2810        self.paths.remove(&path_id);
2811        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2812
2813        let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2814        self.events.push_back(
2815            PathEvent::Abandoned {
2816                id: path_id,
2817                path_stats,
2818            }
2819            .into(),
2820        );
2821    }
2822
    /// Removes packets newly declared lost from the in-flight set and reacts to the loss.
    ///
    /// For each lost packet this queues its stream frames and other retransmittable
    /// frames for retransmission, informs MTU discovery, updates per-path loss
    /// statistics, and records the packet in the space's `lost_packets` set. If any
    /// ack-eliciting data was lost (detected via the change in bytes in flight), the
    /// congestion controller is notified once. A lost MTU probe is handled separately
    /// at the end: it only updates MTU discovery and statistics and carries no
    /// congestion penalty.
    ///
    /// `lost_packets` must be sorted in ascending packet-number order and must not
    /// contain `lost_mtu_probe`.
    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(
            {
                let mut sorted = lost_packets.clone();
                sorted.sort();
                sorted == lost_packets
            },
            "lost_packets must be sorted"
        );

        self.drain_lost_packets(now, pn_space, path_id);

        // OnPacketsLost
        if let Some(largest_lost) = lost_packets.last().cloned() {
            // Snapshot bytes in flight so we can later tell whether any lost packet
            // was ack-eliciting (i.e. counted towards bytes in flight).
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.entry(path_id).or_default();
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                // Queue the lost frames for retransmission.
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                // Remember the loss so a late ACK for this packet can still be
                // correlated with its original send time.
                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                // MTU discovery decided the path cannot carry the probed size;
                // update the congestion controller and drop queued datagrams that
                // no longer fit the (reduced) maximum size.
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                if let Some(max_datagram_size) = self.datagrams().max_size() {
                    self.datagrams.drop_oversized(max_datagram_size);
                }
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .black_holes_detected += 1;
            }

            // Don't apply congestion penalty for lost ack-only packets
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        // Handle a lost MTU probe
        if let Some(packet) = lost_mtu_probe {
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and
            // therefore must not have been removed yet
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats
                .entry(path_id)
                .or_default()
                .lost_plpmtud_probes += 1;
        }
    }
2940
2941    /// Returns the earliest time packets should be declared lost for all spaces on a path.
2942    ///
2943    /// If a path has an acknowledged packet with any prior un-acknowledged packets, the
2944    /// earliest un-acknowledged packet can be declared lost after a timeout has elapsed.
2945    /// The time returned is when this packet should be declared lost.
2946    fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2947        SpaceId::iter()
2948            .filter_map(|id| {
2949                self.spaces[id]
2950                    .number_spaces
2951                    .get(&path_id)
2952                    .and_then(|pns| pns.loss_time)
2953                    .map(|time| (time, id))
2954            })
2955            .min_by_key(|&(time, _)| time)
2956    }
2957
    /// Returns the earliest time the next PTO should fire for all spaces on a path.
    ///
    /// Returns `None` when no PTO needs to be armed, e.g. when the path does not
    /// exist or nothing ack-eliciting is in flight on it.
    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
        let path = self.path(path_id)?;
        let pto_count = path.pto_count;
        // Exponential backoff on successive PTOs, with a capped exponent.
        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
        let mut duration = path.rtt.pto_base() * backoff;

        if path_id == PathId::ZERO
            && path.in_flight.ack_eliciting == 0
            && !self.peer_completed_address_validation(PathId::ZERO)
        {
            // Address Validation during Connection Establishment:
            // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
            // deadlock if an Initial or Handshake packet from the server is lost and the
            // server can not send more due to its anti-amplification limit the client must
            // send another packet on PTO.
            let space = match self.highest_space {
                SpaceId::Handshake => SpaceId::Handshake,
                _ => SpaceId::Initial,
            };

            return Some((now + duration, space));
        }

        // Find the packet number space with the earliest PTO expiry on this path.
        let mut result = None;
        for space in SpaceId::iter() {
            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
                continue;
            };

            if !pns.has_in_flight() {
                continue;
            }
            if space == SpaceId::Data {
                // Skip ApplicationData until handshake completes.
                if self.is_handshaking() {
                    return result;
                }
                // Include max_ack_delay and backoff for ApplicationData.
                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
            }
            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
                continue;
            };
            let pto = last_ack_eliciting + duration;
            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
                if path.anti_amplification_blocked(1) {
                    // Nothing would be able to be sent.
                    continue;
                }
                if path.in_flight.ack_eliciting == 0 {
                    // Nothing ack-eliciting, no PTO to arm/fire.
                    continue;
                }
                result = Some((pto, space));
            }
        }
        result
    }
3017
3018    fn peer_completed_address_validation(&self, path: PathId) -> bool {
3019        // TODO(flub): This logic needs updating for multipath
3020        if self.side.is_server() || self.state.is_closed() {
3021            return true;
3022        }
3023        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
3024        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
3025        self.spaces[SpaceId::Handshake]
3026            .path_space(PathId::ZERO)
3027            .and_then(|pns| pns.largest_acked_packet)
3028            .is_some()
3029            || self.spaces[SpaceId::Data]
3030                .path_space(path)
3031                .and_then(|pns| pns.largest_acked_packet)
3032                .is_some()
3033            || (self.spaces[SpaceId::Data].crypto.is_some()
3034                && self.spaces[SpaceId::Handshake].crypto.is_none())
3035    }
3036
3037    /// Resets the the [`PathTimer::LossDetection`] timer to the next instant it may be needed
3038    ///
3039    /// The timer must fire if either:
3040    /// - An ack-eliciting packet we sent needs to be declared lost.
3041    /// - A tail-loss probe needs to be sent.
3042    ///
3043    /// See [`Connection::on_loss_detection_timeout`] for details.
3044    fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3045        if self.state.is_closed() {
3046            // No loss detection takes place on closed connections, and `close_common` already
3047            // stopped time timer. Ensure we don't restart it inadvertently, e.g. in response to a
3048            // reordered packet being handled by state-insensitive code.
3049            return;
3050        }
3051
3052        if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3053            // Time threshold loss detection.
3054            self.timers.set(
3055                Timer::PerPath(path_id, PathTimer::LossDetection),
3056                loss_time,
3057                self.qlog.with_time(now),
3058            );
3059            return;
3060        }
3061
3062        // Determine which PN space to arm PTO for.
3063        // Calculate PTO duration
3064        if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3065            self.timers.set(
3066                Timer::PerPath(path_id, PathTimer::LossDetection),
3067                timeout,
3068                self.qlog.with_time(now),
3069            );
3070        } else {
3071            self.timers.stop(
3072                Timer::PerPath(path_id, PathTimer::LossDetection),
3073                self.qlog.with_time(now),
3074            );
3075        }
3076    }
3077
3078    /// The maximum probe timeout across all paths
3079    ///
3080    /// If `is_closing` is set to `true` it will filter out paths that have not yet been used.
3081    ///
3082    /// See [`Connection::pto`]
3083    fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
3084        match space {
3085            SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3086            SpaceId::Data => self
3087                .paths
3088                .iter()
3089                .filter_map(|(path_id, state)| {
3090                    if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
3091                        // If we are closing and haven't sent anything yet, do not include
3092                        None
3093                    } else {
3094                        let pto = self.pto(space, *path_id);
3095                        Some(pto)
3096                    }
3097                })
3098                .max()
3099                .expect("there should be one at least path"),
3100        }
3101    }
3102
3103    /// Probe Timeout
3104    ///
3105    /// The PTO is logically the time in which you'd expect to receive an acknowledgement
3106    /// for a packet. So approximately RTT + max_ack_delay.
3107    fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3108        let max_ack_delay = match space {
3109            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3110            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3111        };
3112        self.path_data(path_id).rtt.pto_base() + max_ack_delay
3113    }
3114
    /// Bookkeeping performed after a packet has been successfully authenticated.
    ///
    /// Resets keep-alive and idle timers, records ECN counters, discards obsolete key
    /// material (Initial keys on the server after the first Handshake packet; 0-RTT
    /// keys shortly after the first 1-RTT packet), queues an acknowledgement for the
    /// packet and updates the spin bit. If the packet arrived on a path we abandoned
    /// more than the allowed grace period ago, the connection is closed instead.
    ///
    /// `packet` is `None` for packets without a packet number visible to us (nothing
    /// ack-related is done in that case).
    fn on_packet_authenticated(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<u64>,
        spin: bool,
        is_1rtt: bool,
    ) {
        self.total_authed_packets += 1;
        if let Some(last_allowed_receive) = self
            .paths
            .get(&path_id)
            .and_then(|path| path.data.last_allowed_receive)
        {
            if now > last_allowed_receive {
                warn!("received data on path which we abandoned more than 3 * PTO ago");
                // The peer failed to respond with a PATH_ABANDON in time.
                if !self.state.is_closed() {
                    // TODO(flub): What should the error code be?
                    self.state.move_to_closed(TransportError::NO_ERROR(
                        "peer failed to respond with PATH_ABANDON in time",
                    ));
                    self.close_common();
                    self.set_close_timer(now);
                    self.close = true;
                }
                return;
            }
        }

        self.reset_keep_alive(path_id, now);
        self.reset_idle_timeout(now, space_id, path_id);
        self.permit_idle_reset = true;
        self.receiving_ecn |= ecn.is_some();
        if let Some(x) = ecn {
            let space = &mut self.spaces[space_id];
            space.for_path(path_id).ecn_counters += x;

            // A Congestion Experienced mark should be reported to the peer promptly.
            if x.is_ce() {
                space
                    .for_path(path_id)
                    .pending_acks
                    .set_immediate_ack_required();
            }
        }

        let packet = match packet {
            Some(x) => x,
            None => return,
        };
        match &self.side {
            ConnectionSide::Client { .. } => {
                // If we received a handshake packet that authenticated, then we're talking to
                // the real server.  From now on we should no longer allow the server to migrate
                // its address.
                if space_id == SpaceId::Handshake {
                    if let Some(hs) = self.state.as_handshake_mut() {
                        hs.allow_server_migration = false;
                    }
                }
            }
            ConnectionSide::Server { .. } => {
                if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
                {
                    // A server stops sending and processing Initial packets when it receives its first Handshake packet.
                    self.discard_space(now, SpaceId::Initial);
                }
                if self.zero_rtt_crypto.is_some() && is_1rtt {
                    // Discard 0-RTT keys soon after receiving a 1-RTT packet
                    self.set_key_discard_timer(now, space_id)
                }
            }
        }
        let space = self.spaces[space_id].for_path(path_id);
        space.pending_acks.insert_one(packet, now);
        if packet >= space.rx_packet.unwrap_or_default() {
            space.rx_packet = Some(packet);
            // Update outgoing spin bit, inverting iff we're the client
            self.spin = self.side.is_client() ^ spin;
        }
    }
3198
3199    /// Resets the idle timeout timers
3200    ///
3201    /// Without multipath there is only the connection-wide idle timeout. When multipath is
3202    /// enabled there is an additional per-path idle timeout.
3203    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3204        // First reset the global idle timeout.
3205        if let Some(timeout) = self.idle_timeout {
3206            if self.state.is_closed() {
3207                self.timers
3208                    .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3209            } else {
3210                let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
3211                self.timers.set(
3212                    Timer::Conn(ConnTimer::Idle),
3213                    now + dt,
3214                    self.qlog.with_time(now),
3215                );
3216            }
3217        }
3218
3219        // Now handle the per-path state
3220        if let Some(timeout) = self.path_data(path_id).idle_timeout {
3221            if self.state.is_closed() {
3222                self.timers.stop(
3223                    Timer::PerPath(path_id, PathTimer::PathIdle),
3224                    self.qlog.with_time(now),
3225                );
3226            } else {
3227                let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3228                self.timers.set(
3229                    Timer::PerPath(path_id, PathTimer::PathIdle),
3230                    now + dt,
3231                    self.qlog.with_time(now),
3232                );
3233            }
3234        }
3235    }
3236
3237    /// Resets both the [`ConnTimer::KeepAlive`] and [`PathTimer::PathKeepAlive`] timers
3238    fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3239        if !self.state.is_established() {
3240            return;
3241        }
3242
3243        if let Some(interval) = self.config.keep_alive_interval {
3244            self.timers.set(
3245                Timer::Conn(ConnTimer::KeepAlive),
3246                now + interval,
3247                self.qlog.with_time(now),
3248            );
3249        }
3250
3251        if let Some(interval) = self.path_data(path_id).keep_alive {
3252            self.timers.set(
3253                Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3254                now + interval,
3255                self.qlog.with_time(now),
3256            );
3257        }
3258    }
3259
3260    /// Sets the timer for when a previously issued CID should be retired next
3261    fn reset_cid_retirement(&mut self, now: Instant) {
3262        if let Some((_path, t)) = self.next_cid_retirement() {
3263            self.timers.set(
3264                Timer::Conn(ConnTimer::PushNewCid),
3265                t,
3266                self.qlog.with_time(now),
3267            );
3268        }
3269    }
3270
3271    /// The next time when a previously issued CID should be retired
3272    fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3273        self.local_cid_state
3274            .iter()
3275            .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3276            .min_by_key(|(_path_id, timeout)| *timeout)
3277    }
3278
    /// Handle the already-decrypted first packet from the client
    ///
    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
    /// efficient.
    ///
    /// Records the received byte count on path 0, stores the client's token for later
    /// comparison, performs the usual post-authentication bookkeeping, and then
    /// processes the packet (plus any coalesced packets in `remaining`).
    ///
    /// # Errors
    ///
    /// Propagates any [`ConnectionError`] from processing the decrypted packet.
    pub(crate) fn handle_first_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        ecn: Option<EcnCodepoint>,
        packet_number: u64,
        packet: InitialPacket,
        remaining: Option<BytesMut>,
    ) -> Result<(), ConnectionError> {
        let span = trace_span!("first recv");
        let _guard = span.enter();
        debug_assert!(self.side.is_server());
        let len = packet.header_data.len() + packet.payload.len();
        let path_id = PathId::ZERO;
        self.path_data_mut(path_id).total_recvd = len as u64;

        if let Some(hs) = self.state.as_handshake_mut() {
            hs.expected_token = packet.header.token.clone();
        } else {
            unreachable!("first packet must be delivered in Handshake state");
        }

        // The first packet is always on PathId::ZERO
        self.on_packet_authenticated(
            now,
            SpaceId::Initial,
            path_id,
            ecn,
            Some(packet_number),
            false,
            false,
        );

        let packet: Packet = packet.into();

        let mut qlog = QlogRecvPacket::new(len);
        qlog.header(&packet.header, Some(packet_number), path_id);

        self.process_decrypted_packet(
            now,
            remote,
            path_id,
            Some(packet_number),
            packet,
            &mut qlog,
        )?;
        self.qlog.emit_packet_received(qlog, now);
        // Any bytes coalesced after the Initial packet are handled as further packets.
        if let Some(data) = remaining {
            self.handle_coalesced(now, remote, path_id, ecn, data);
        }

        self.qlog.emit_recovery_metrics(
            path_id,
            &mut self.paths.get_mut(&path_id).unwrap().data,
            now,
        );

        Ok(())
    }
3342
    /// Enables 0-RTT if the crypto layer provides early keys.
    ///
    /// On the client the cached transport parameters from the session ticket are
    /// applied first, with values that must not be reused across connections
    /// (CIDs, preferred address, reset token, ack-delay settings, multipath limit)
    /// explicitly cleared or reset to defaults. Does nothing when no early crypto
    /// is available, or when the cached parameters are malformed.
    fn init_0rtt(&mut self, now: Instant) {
        let (header, packet) = match self.crypto.early_crypto() {
            Some(x) => x,
            None => return,
        };
        if self.side.is_client() {
            match self.crypto.transport_parameters() {
                Ok(params) => {
                    let params = params
                        .expect("crypto layer didn't supply transport parameters with ticket");
                    // Certain values must not be cached
                    let params = TransportParameters {
                        initial_src_cid: None,
                        original_dst_cid: None,
                        preferred_address: None,
                        retry_src_cid: None,
                        stateless_reset_token: None,
                        min_ack_delay: None,
                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
                        max_ack_delay: TransportParameters::default().max_ack_delay,
                        initial_max_path_id: None,
                        ..params
                    };
                    self.set_peer_params(params);
                    self.qlog.emit_peer_transport_params_restored(self, now);
                }
                Err(e) => {
                    error!("session ticket has malformed transport parameters: {}", e);
                    return;
                }
            }
        }
        trace!("0-RTT enabled");
        self.zero_rtt_enabled = true;
        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
    }
3379
    /// Buffers received CRYPTO data and feeds complete chunks to the TLS layer.
    ///
    /// Rejects new CRYPTO data arriving at an earlier-than-expected encryption level
    /// with `PROTOCOL_VIOLATION`, and bounds buffering of out-of-order data by
    /// `crypto_buffer_size` (`CRYPTO_BUFFER_EXCEEDED` otherwise). Emits
    /// [`Event::HandshakeDataReady`] when the TLS layer reports handshake data.
    ///
    /// # Errors
    ///
    /// Returns a [`TransportError`] on protocol violations, buffer overflow, or TLS
    /// handshake failure.
    fn read_crypto(
        &mut self,
        space: SpaceId,
        crypto: &frame::Crypto,
        payload_len: usize,
    ) -> Result<(), TransportError> {
        let expected = if !self.state.is_handshake() {
            SpaceId::Data
        } else if self.highest_space == SpaceId::Initial {
            SpaceId::Initial
        } else {
            // On the server, self.highest_space can be Data after receiving the client's first
            // flight, but we expect Handshake CRYPTO until the handshake is complete.
            SpaceId::Handshake
        };
        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
        // packets are illegal, and we don't process 1-RTT packets until the handshake is
        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
        debug_assert!(space <= expected, "received out-of-order CRYPTO data");

        let end = crypto.offset + crypto.data.len() as u64;
        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
            warn!(
                "received new {:?} CRYPTO data when expecting {:?}",
                space, expected
            );
            return Err(TransportError::PROTOCOL_VIOLATION(
                "new data at unexpected encryption level",
            ));
        }

        let space = &mut self.spaces[space];
        // Limit how much not-yet-consumed CRYPTO data we are willing to buffer.
        let max = end.saturating_sub(space.crypto_stream.bytes_read());
        if max > self.config.crypto_buffer_size as u64 {
            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
        }

        space
            .crypto_stream
            .insert(crypto.offset, crypto.data.clone(), payload_len);
        // Drain every contiguous chunk into the TLS handshake state machine.
        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
            if self.crypto.read_handshake(&chunk.bytes)? {
                self.events.push_back(Event::HandshakeDataReady);
            }
        }

        Ok(())
    }
3429
    /// Drains outgoing handshake bytes from the TLS layer into pending CRYPTO frames.
    ///
    /// Loops because writing handshake data can also yield new keys: when
    /// `write_handshake` returns keys, the next packet space is activated via
    /// [`Connection::upgrade_crypto`] and the loop continues so data for the new
    /// space is written too. The loop ends once a pass produces no output and no
    /// key upgrade. On the client, the very first Initial flight is also stashed
    /// as `client_hello` for later use.
    fn write_crypto(&mut self) {
        loop {
            let space = self.highest_space;
            let mut outgoing = Vec::new();
            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
                match space {
                    SpaceId::Initial => {
                        self.upgrade_crypto(SpaceId::Handshake, crypto);
                    }
                    SpaceId::Handshake => {
                        self.upgrade_crypto(SpaceId::Data, crypto);
                    }
                    _ => unreachable!("got updated secrets during 1-RTT"),
                }
            }
            if outgoing.is_empty() {
                if space == self.highest_space {
                    break;
                } else {
                    // Keys updated, check for more data to send
                    continue;
                }
            }
            let offset = self.spaces[space].crypto_offset;
            let outgoing = Bytes::from(outgoing);
            if let Some(hs) = self.state.as_handshake_mut() {
                // The client's first Initial CRYPTO write is the ClientHello.
                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
                    hs.client_hello = Some(outgoing.clone());
                }
            }
            self.spaces[space].crypto_offset += outgoing.len() as u64;
            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
            self.spaces[space].pending.crypto.push_back(frame::Crypto {
                offset,
                data: outgoing,
            });
        }
    }
3468
3469    /// Switch to stronger cryptography during handshake
3470    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3471        debug_assert!(
3472            self.spaces[space].crypto.is_none(),
3473            "already reached packet space {space:?}"
3474        );
3475        trace!("{:?} keys ready", space);
3476        if space == SpaceId::Data {
3477            // Precompute the first key update
3478            self.next_crypto = Some(
3479                self.crypto
3480                    .next_1rtt_keys()
3481                    .expect("handshake should be complete"),
3482            );
3483        }
3484
3485        self.spaces[space].crypto = Some(crypto);
3486        debug_assert!(space as usize > self.highest_space as usize);
3487        self.highest_space = space;
3488        if space == SpaceId::Data && self.side.is_client() {
3489            // Discard 0-RTT keys because 1-RTT keys are available.
3490            self.zero_rtt_crypto = None;
3491        }
3492    }
3493
3494    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3495        debug_assert!(space_id != SpaceId::Data);
3496        trace!("discarding {:?} keys", space_id);
3497        if space_id == SpaceId::Initial {
3498            // No longer needed
3499            if let ConnectionSide::Client { token, .. } = &mut self.side {
3500                *token = Bytes::new();
3501            }
3502        }
3503        let space = &mut self.spaces[space_id];
3504        space.crypto = None;
3505        let pns = space.for_path(PathId::ZERO);
3506        pns.time_of_last_ack_eliciting_packet = None;
3507        pns.loss_time = None;
3508        pns.loss_probes = 0;
3509        let sent_packets = mem::take(&mut pns.sent_packets);
3510        let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3511        for (_, packet) in sent_packets.into_iter() {
3512            path.data.remove_in_flight(&packet);
3513        }
3514
3515        self.set_loss_detection_timer(now, PathId::ZERO)
3516    }
3517
3518    fn handle_coalesced(
3519        &mut self,
3520        now: Instant,
3521        remote: SocketAddr,
3522        path_id: PathId,
3523        ecn: Option<EcnCodepoint>,
3524        data: BytesMut,
3525    ) {
3526        self.path_data_mut(path_id)
3527            .inc_total_recvd(data.len() as u64);
3528        let mut remaining = Some(data);
3529        let cid_len = self
3530            .local_cid_state
3531            .values()
3532            .map(|cid_state| cid_state.cid_len())
3533            .next()
3534            .expect("one cid_state must exist");
3535        while let Some(data) = remaining {
3536            match PartialDecode::new(
3537                data,
3538                &FixedLengthConnectionIdParser::new(cid_len),
3539                &[self.version],
3540                self.endpoint_config.grease_quic_bit,
3541            ) {
3542                Ok((partial_decode, rest)) => {
3543                    remaining = rest;
3544                    self.handle_decode(now, remote, path_id, ecn, partial_decode);
3545                }
3546                Err(e) => {
3547                    trace!("malformed header: {}", e);
3548                    return;
3549                }
3550            }
3551        }
3552    }
3553
3554    fn handle_decode(
3555        &mut self,
3556        now: Instant,
3557        remote: SocketAddr,
3558        path_id: PathId,
3559        ecn: Option<EcnCodepoint>,
3560        partial_decode: PartialDecode,
3561    ) {
3562        let qlog = QlogRecvPacket::new(partial_decode.len());
3563        if let Some(decoded) = packet_crypto::unprotect_header(
3564            partial_decode,
3565            &self.spaces,
3566            self.zero_rtt_crypto.as_ref(),
3567            self.peer_params.stateless_reset_token,
3568        ) {
3569            self.handle_packet(
3570                now,
3571                remote,
3572                path_id,
3573                ecn,
3574                decoded.packet,
3575                decoded.stateless_reset,
3576                qlog,
3577            );
3578        }
3579    }
3580
3581    fn handle_packet(
3582        &mut self,
3583        now: Instant,
3584        remote: SocketAddr,
3585        path_id: PathId,
3586        ecn: Option<EcnCodepoint>,
3587        packet: Option<Packet>,
3588        stateless_reset: bool,
3589        mut qlog: QlogRecvPacket,
3590    ) {
3591        self.stats.udp_rx.ios += 1;
3592        if let Some(ref packet) = packet {
3593            trace!(
3594                "got {:?} packet ({} bytes) from {} using id {}",
3595                packet.header.space(),
3596                packet.payload.len() + packet.header_data.len(),
3597                remote,
3598                packet.header.dst_cid(),
3599            );
3600        }
3601
3602        if self.is_handshaking() {
3603            if path_id != PathId::ZERO {
3604                debug!(%remote, %path_id, "discarding multipath packet during handshake");
3605                return;
3606            }
3607            if remote != self.path_data_mut(path_id).remote {
3608                if let Some(hs) = self.state.as_handshake() {
3609                    if hs.allow_server_migration {
3610                        trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
3611                        self.path_data_mut(path_id).remote = remote;
3612                        self.qlog.emit_tuple_assigned(path_id, remote, now);
3613                    } else {
3614                        debug!("discarding packet with unexpected remote during handshake");
3615                        return;
3616                    }
3617                } else {
3618                    debug!("discarding packet with unexpected remote during handshake");
3619                    return;
3620                }
3621            }
3622        }
3623
3624        let was_closed = self.state.is_closed();
3625        let was_drained = self.state.is_drained();
3626
3627        let decrypted = match packet {
3628            None => Err(None),
3629            Some(mut packet) => self
3630                .decrypt_packet(now, path_id, &mut packet)
3631                .map(move |number| (packet, number)),
3632        };
3633        let result = match decrypted {
3634            _ if stateless_reset => {
3635                debug!("got stateless reset");
3636                Err(ConnectionError::Reset)
3637            }
3638            Err(Some(e)) => {
3639                warn!("illegal packet: {}", e);
3640                Err(e.into())
3641            }
3642            Err(None) => {
3643                debug!("failed to authenticate packet");
3644                self.authentication_failures += 1;
3645                let integrity_limit = self.spaces[self.highest_space]
3646                    .crypto
3647                    .as_ref()
3648                    .unwrap()
3649                    .packet
3650                    .local
3651                    .integrity_limit();
3652                if self.authentication_failures > integrity_limit {
3653                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3654                } else {
3655                    return;
3656                }
3657            }
3658            Ok((packet, number)) => {
3659                qlog.header(&packet.header, number, path_id);
3660                let span = match number {
3661                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3662                    None => trace_span!("recv", space = ?packet.header.space()),
3663                };
3664                let _guard = span.enter();
3665
3666                let dedup = self.spaces[packet.header.space()]
3667                    .path_space_mut(path_id)
3668                    .map(|pns| &mut pns.dedup);
3669                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
3670                    debug!("discarding possible duplicate packet");
3671                    self.qlog.emit_packet_received(qlog, now);
3672                    return;
3673                } else if self.state.is_handshake() && packet.header.is_short() {
3674                    // TODO: SHOULD buffer these to improve reordering tolerance.
3675                    trace!("dropping short packet during handshake");
3676                    self.qlog.emit_packet_received(qlog, now);
3677                    return;
3678                } else {
3679                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
3680                        if let Some(hs) = self.state.as_handshake() {
3681                            if self.side.is_server() && token != &hs.expected_token {
3682                                // Clients must send the same retry token in every Initial. Initial
3683                                // packets can be spoofed, so we discard rather than killing the
3684                                // connection.
3685                                warn!("discarding Initial with invalid retry token");
3686                                self.qlog.emit_packet_received(qlog, now);
3687                                return;
3688                            }
3689                        }
3690                    }
3691
3692                    if !self.state.is_closed() {
3693                        let spin = match packet.header {
3694                            Header::Short { spin, .. } => spin,
3695                            _ => false,
3696                        };
3697
3698                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
3699                            // Only the client is allowed to open paths
3700                            self.ensure_path(path_id, remote, now, number);
3701                        }
3702                        if self.paths.contains_key(&path_id) {
3703                            self.on_packet_authenticated(
3704                                now,
3705                                packet.header.space(),
3706                                path_id,
3707                                ecn,
3708                                number,
3709                                spin,
3710                                packet.header.is_1rtt(),
3711                            );
3712                        }
3713                    }
3714
3715                    let res = self
3716                        .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);
3717
3718                    self.qlog.emit_packet_received(qlog, now);
3719                    res
3720                }
3721            }
3722        };
3723
3724        // State transitions for error cases
3725        if let Err(conn_err) = result {
3726            match conn_err {
3727                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
3728                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
3729                ConnectionError::Reset
3730                | ConnectionError::TransportError(TransportError {
3731                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
3732                    ..
3733                }) => {
3734                    self.state.move_to_drained(Some(conn_err));
3735                }
3736                ConnectionError::TimedOut => {
3737                    unreachable!("timeouts aren't generated by packet processing");
3738                }
3739                ConnectionError::TransportError(err) => {
3740                    debug!("closing connection due to transport error: {}", err);
3741                    self.state.move_to_closed(err);
3742                }
3743                ConnectionError::VersionMismatch => {
3744                    self.state.move_to_draining(Some(conn_err));
3745                }
3746                ConnectionError::LocallyClosed => {
3747                    unreachable!("LocallyClosed isn't generated by packet processing");
3748                }
3749                ConnectionError::CidsExhausted => {
3750                    unreachable!("CidsExhausted isn't generated by packet processing");
3751                }
3752            };
3753        }
3754
3755        if !was_closed && self.state.is_closed() {
3756            self.close_common();
3757            if !self.state.is_drained() {
3758                self.set_close_timer(now);
3759            }
3760        }
3761        if !was_drained && self.state.is_drained() {
3762            self.endpoint_events.push_back(EndpointEventInner::Drained);
3763            // Close timer may have been started previously, e.g. if we sent a close and got a
3764            // stateless reset in response
3765            self.timers
3766                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
3767        }
3768
3769        // Transmit CONNECTION_CLOSE if necessary
3770        if matches!(self.state.as_type(), StateType::Closed) {
3771            // If there is no PathData for this PathId the packet was for a brand new
3772            // path. It was a valid packet however, so the remote is valid and we want to
3773            // send CONNECTION_CLOSE.
3774            let path_remote = self
3775                .paths
3776                .get(&path_id)
3777                .map(|p| p.data.remote)
3778                .unwrap_or(remote);
3779            self.close = remote == path_remote;
3780        }
3781    }
3782
    /// Dispatches a decrypted (or decryption-exempt) packet by connection state and header type.
    ///
    /// Behavior by state:
    /// * Unknown `path_id`: the packet is dropped (most likely a pruned path).
    /// * `Established`: Data-space payloads go to `process_payload`, earlier spaces with
    ///   frames go to `process_early_payload`, anything else is discarded.
    /// * `Closed`: the payload is only scanned for a CLOSE frame, which moves us to draining.
    /// * `Draining`/`Drained`: everything is ignored.
    /// * `Handshake`: the header type is matched below to drive the handshake forward
    ///   (Retry, Initial, Handshake, 0-RTT, Version Negotiation).
    ///
    /// Returns an error only for conditions that terminate the connection; packets that
    /// are merely invalid or stale are silently discarded with `Ok(())`.
    fn process_decrypted_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        path_id: PathId,
        number: Option<u64>,
        packet: Packet,
        qlog: &mut QlogRecvPacket,
    ) -> Result<(), ConnectionError> {
        if !self.paths.contains_key(&path_id) {
            // There is a chance this is a server side, first (for this path) packet, which would
            // be a protocol violation. It's more likely, however, that this is a packet of a
            // pruned path
            trace!(%path_id, ?number, "discarding packet for unknown path");
            return Ok(());
        }
        // Non-handshake states are handled entirely inside this match (each arm returns);
        // only the Handshake arm falls through, yielding mutable access to handshake state.
        let state = match self.state.as_type() {
            StateType::Established => {
                match packet.header.space() {
                    SpaceId::Data => {
                        // `number` is always present for Data-space packets once established.
                        self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
                    }
                    _ if packet.header.has_frames() => {
                        self.process_early_payload(now, path_id, packet, qlog)?
                    }
                    _ => {
                        trace!("discarding unexpected pre-handshake packet");
                    }
                }
                return Ok(());
            }
            StateType::Closed => {
                // While closed we only care whether the peer acknowledged the close:
                // a CLOSE frame lets us stop retransmitting ours and drain.
                for result in frame::Iter::new(packet.payload.freeze())? {
                    let frame = match result {
                        Ok(frame) => frame,
                        Err(err) => {
                            debug!("frame decoding error: {err:?}");
                            continue;
                        }
                    };
                    qlog.frame(&frame);

                    if let Frame::Padding = frame {
                        continue;
                    };

                    self.stats.frame_rx.record(&frame);

                    if let Frame::Close(_error) = frame {
                        self.state.move_to_draining(None);
                        break;
                    }
                }
                return Ok(());
            }
            StateType::Draining | StateType::Drained => return Ok(()),
            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
        };

        match packet.header {
            // Retry (client only): restart the handshake using the server-provided
            // source CID and retry token.
            Header::Retry {
                src_cid: rem_cid, ..
            } => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if self.side.is_server() {
                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
                }

                // Validate the Retry Integrity Tag against the CID we originally sent to.
                let is_valid_retry = self
                    .rem_cids
                    .get(&path_id)
                    .map(|cids| cids.active())
                    .map(|orig_dst_cid| {
                        self.crypto.is_valid_retry(
                            orig_dst_cid,
                            &packet.header_data,
                            &packet.payload,
                        )
                    })
                    .unwrap_or_default();
                if self.total_authed_packets > 1
                            || packet.payload.len() <= 16 // token + 16 byte tag
                            || !is_valid_retry
                {
                    trace!("discarding invalid Retry");
                    // - After the client has received and processed an Initial or Retry
                    //   packet from the server, it MUST discard any subsequent Retry
                    //   packets that it receives.
                    // - A client MUST discard a Retry packet with a zero-length Retry Token
                    //   field.
                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
                    //   that cannot be validated
                    return Ok(());
                }

                trace!("retrying with CID {}", rem_cid);
                let client_hello = state.client_hello.take().unwrap();
                self.retry_src_cid = Some(rem_cid);
                self.rem_cids
                    .get_mut(&path_id)
                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
                    .update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;

                // If our first Initial is still tracked as in flight, treat it as acked:
                // the Retry proves the server received it.
                let space = &mut self.spaces[SpaceId::Initial];
                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
                    self.on_packet_acked(now, PathId::ZERO, info);
                };

                self.discard_space(now, SpaceId::Initial); // Make sure we clean up after
                // any retransmitted Initials
                // Rebuild the Initial space with keys derived from the new CID and requeue
                // the ClientHello, preserving the packet number sequence.
                self.spaces[SpaceId::Initial] = {
                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
                    space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
                    space.crypto_offset = client_hello.len() as u64;
                    space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
                        .for_path(path_id)
                        .next_packet_number;
                    space.pending.crypto.push_back(frame::Crypto {
                        offset: 0,
                        data: client_hello,
                    });
                    space
                };

                // Retransmit all 0-RTT data
                let zero_rtt = mem::take(
                    &mut self.spaces[SpaceId::Data]
                        .for_path(PathId::ZERO)
                        .sent_packets,
                );
                for (_, info) in zero_rtt.into_iter() {
                    self.paths
                        .get_mut(&PathId::ZERO)
                        .unwrap()
                        .remove_in_flight(&info);
                    self.spaces[SpaceId::Data].pending |= info.retransmits;
                }
                self.streams.retransmit_all_for_0rtt();

                // The payload is <token><16-byte integrity tag>; store the token for
                // inclusion in the retried Initials.
                let token_len = packet.payload.len() - 16;
                let ConnectionSide::Client { ref mut token, .. } = self.side else {
                    unreachable!("we already short-circuited if we're server");
                };
                *token = packet.payload.freeze().split_to(token_len);

                // Restart the handshake from scratch; `rem_cid_set: false` lets the next
                // Initial from the server set the remote CID again.
                self.state = State::handshake(state::Handshake {
                    expected_token: Bytes::new(),
                    rem_cid_set: false,
                    client_hello: None,
                    allow_server_migration: true,
                });
                Ok(())
            }
            Header::Long {
                ty: LongType::Handshake,
                src_cid: rem_cid,
                dst_cid: loc_cid,
                ..
            } => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }
                self.on_path_validated(path_id);

                self.process_early_payload(now, path_id, packet, qlog)?;
                if self.state.is_closed() {
                    return Ok(());
                }

                if self.crypto.is_handshaking() {
                    trace!("handshake ongoing");
                    return Ok(());
                }

                // Crypto handshake just completed: finish connection establishment.
                if self.side.is_client() {
                    // Client-only because server params were set from the client's Initial
                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                        TransportError::new(
                            TransportErrorCode::crypto(0x6d),
                            "transport parameters missing".to_owned(),
                        )
                    })?;

                    if self.has_0rtt() {
                        if !self.crypto.early_data_accepted().unwrap() {
                            debug_assert!(self.side.is_client());
                            debug!("0-RTT rejected");
                            self.accepted_0rtt = false;
                            self.streams.zero_rtt_rejected();

                            // Discard already-queued frames
                            self.spaces[SpaceId::Data].pending = Retransmits::default();

                            // Discard 0-RTT packets
                            let sent_packets = mem::take(
                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
                            );
                            for (_, packet) in sent_packets.into_iter() {
                                self.paths
                                    .get_mut(&path_id)
                                    .unwrap()
                                    .remove_in_flight(&packet);
                            }
                        } else {
                            self.accepted_0rtt = true;
                            params.validate_resumption_from(&self.peer_params)?;
                        }
                    }
                    if let Some(token) = params.stateless_reset_token {
                        let remote = self.path_data(path_id).remote;
                        self.endpoint_events
                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
                    }
                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                    self.issue_first_cids(now);
                } else {
                    // Server-only
                    self.spaces[SpaceId::Data].pending.handshake_done = true;
                    self.discard_space(now, SpaceId::Handshake);
                    self.events.push_back(Event::HandshakeConfirmed);
                    trace!("handshake confirmed");
                }

                self.events.push_back(Event::Connected);
                self.state.move_to_established();
                trace!("established");

                // Multipath can only be enabled after the state has reached Established.
                // So this can not happen any earlier.
                self.issue_first_path_cids(now);
                Ok(())
            }
            Header::Initial(InitialHeader {
                src_cid: rem_cid,
                dst_cid: loc_cid,
                ..
            }) => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if !state.rem_cid_set {
                    // First server Initial: adopt the server-chosen CID. Later Initials
                    // must keep using it or be discarded below.
                    trace!("switching remote CID to {}", rem_cid);
                    let mut state = state.clone();
                    self.rem_cids
                        .get_mut(&path_id)
                        .expect("PathId::ZERO not yet abandoned")
                        .update_initial_cid(rem_cid);
                    self.rem_handshake_cid = rem_cid;
                    self.orig_rem_cid = rem_cid;
                    state.rem_cid_set = true;
                    self.state.move_to_handshake(state);
                } else if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }

                let starting_space = self.highest_space;
                self.process_early_payload(now, path_id, packet, qlog)?;

                // Server side: if this packet moved us out of the Initial space, the
                // client's transport parameters are now available.
                if self.side.is_server()
                    && starting_space == SpaceId::Initial
                    && self.highest_space != SpaceId::Initial
                {
                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                        TransportError::new(
                            TransportErrorCode::crypto(0x6d),
                            "transport parameters missing".to_owned(),
                        )
                    })?;
                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                    self.issue_first_cids(now);
                    self.init_0rtt(now);
                }
                Ok(())
            }
            Header::Long {
                ty: LongType::ZeroRtt,
                ..
            } => {
                self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
                Ok(())
            }
            Header::VersionNegotiate { .. } => {
                // Only actionable before any other authenticated packet arrived.
                if self.total_authed_packets > 1 {
                    return Ok(());
                }
                // Payload is a list of big-endian 32-bit versions; ignore the packet if
                // it (nonsensically) lists the version we already offered.
                let supported = packet
                    .payload
                    .chunks(4)
                    .any(|x| match <[u8; 4]>::try_from(x) {
                        Ok(version) => self.version == u32::from_be_bytes(version),
                        Err(_) => false,
                    });
                if supported {
                    return Ok(());
                }
                debug!("remote doesn't support our version");
                Err(ConnectionError::VersionMismatch)
            }
            Header::Short { .. } => unreachable!(
                "short packets received during handshake are discarded in handle_packet"
            ),
        }
    }
4094
4095    /// Process an Initial or Handshake packet payload
4096    fn process_early_payload(
4097        &mut self,
4098        now: Instant,
4099        path_id: PathId,
4100        packet: Packet,
4101        #[allow(unused)] qlog: &mut QlogRecvPacket,
4102    ) -> Result<(), TransportError> {
4103        debug_assert_ne!(packet.header.space(), SpaceId::Data);
4104        debug_assert_eq!(path_id, PathId::ZERO);
4105        let payload_len = packet.payload.len();
4106        let mut ack_eliciting = false;
4107        for result in frame::Iter::new(packet.payload.freeze())? {
4108            let frame = result?;
4109            qlog.frame(&frame);
4110            let span = match frame {
4111                Frame::Padding => continue,
4112                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4113            };
4114
4115            self.stats.frame_rx.record(&frame);
4116
4117            let _guard = span.as_ref().map(|x| x.enter());
4118            ack_eliciting |= frame.is_ack_eliciting();
4119
4120            // Process frames
4121            if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4122                return Err(TransportError::PROTOCOL_VIOLATION(
4123                    "illegal frame type in handshake",
4124                ));
4125            }
4126
4127            match frame {
4128                Frame::Padding | Frame::Ping => {}
4129                Frame::Crypto(frame) => {
4130                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
4131                }
4132                Frame::Ack(ack) => {
4133                    self.on_ack_received(now, packet.header.space(), ack)?;
4134                }
4135                Frame::PathAck(ack) => {
4136                    span.as_ref()
4137                        .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4138                    self.on_path_ack_received(now, packet.header.space(), ack)?;
4139                }
4140                Frame::Close(reason) => {
4141                    self.state.move_to_draining(Some(reason.into()));
4142                    return Ok(());
4143                }
4144                _ => {
4145                    let mut err =
4146                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4147                    err.frame = frame::MaybeFrame::Known(frame.ty());
4148                    return Err(err);
4149                }
4150            }
4151        }
4152
4153        if ack_eliciting {
4154            // In the initial and handshake spaces, ACKs must be sent immediately
4155            self.spaces[packet.header.space()]
4156                .for_path(path_id)
4157                .pending_acks
4158                .set_immediate_ack_required();
4159        }
4160
4161        self.write_crypto();
4162        Ok(())
4163    }
4164
4165    /// Processes the packet payload, always in the data space.
4166    fn process_payload(
4167        &mut self,
4168        now: Instant,
4169        remote: SocketAddr,
4170        path_id: PathId,
4171        number: u64,
4172        packet: Packet,
4173        #[allow(unused)] qlog: &mut QlogRecvPacket,
4174    ) -> Result<(), TransportError> {
4175        let payload = packet.payload.freeze();
4176        let mut is_probing_packet = true;
4177        let mut close = None;
4178        let payload_len = payload.len();
4179        let mut ack_eliciting = false;
4180        // if this packet triggers a path migration and includes a observed address frame, it's
4181        // stored here
4182        let mut migration_observed_addr = None;
4183        for result in frame::Iter::new(payload)? {
4184            let frame = result?;
4185            qlog.frame(&frame);
4186            let span = match frame {
4187                Frame::Padding => continue,
4188                _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4189            };
4190
4191            self.stats.frame_rx.record(&frame);
4192            // Crypto, Stream and Datagram frames are special cased in order no pollute
4193            // the log with payload data
4194            match &frame {
4195                Frame::Crypto(f) => {
4196                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4197                }
4198                Frame::Stream(f) => {
4199                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4200                }
4201                Frame::Datagram(f) => {
4202                    trace!(len = f.data.len(), "got datagram frame");
4203                }
4204                f => {
4205                    trace!("got frame {f}");
4206                }
4207            }
4208
4209            let _guard = span.enter();
4210            if packet.header.is_0rtt() {
4211                match frame {
4212                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4213                        return Err(TransportError::PROTOCOL_VIOLATION(
4214                            "illegal frame type in 0-RTT",
4215                        ));
4216                    }
4217                    _ => {
4218                        if frame.is_1rtt() {
4219                            return Err(TransportError::PROTOCOL_VIOLATION(
4220                                "illegal frame type in 0-RTT",
4221                            ));
4222                        }
4223                    }
4224                }
4225            }
4226            ack_eliciting |= frame.is_ack_eliciting();
4227
4228            // Check whether this could be a probing packet
4229            match frame {
4230                Frame::Padding
4231                | Frame::PathChallenge(_)
4232                | Frame::PathResponse(_)
4233                | Frame::NewConnectionId(_)
4234                | Frame::ObservedAddr(_) => {}
4235                _ => {
4236                    is_probing_packet = false;
4237                }
4238            }
4239
4240            match frame {
4241                Frame::Crypto(frame) => {
4242                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4243                }
4244                Frame::Stream(frame) => {
4245                    if self.streams.received(frame, payload_len)?.should_transmit() {
4246                        self.spaces[SpaceId::Data].pending.max_data = true;
4247                    }
4248                }
4249                Frame::Ack(ack) => {
4250                    self.on_ack_received(now, SpaceId::Data, ack)?;
4251                }
4252                Frame::PathAck(ack) => {
4253                    span.record("path", tracing::field::debug(&ack.path_id));
4254                    self.on_path_ack_received(now, SpaceId::Data, ack)?;
4255                }
4256                Frame::Padding | Frame::Ping => {}
4257                Frame::Close(reason) => {
4258                    close = Some(reason);
4259                }
4260                Frame::PathChallenge(challenge) => {
4261                    let path = &mut self
4262                        .path_mut(path_id)
4263                        .expect("payload is processed only after the path becomes known");
4264                    path.path_responses.push(number, challenge.0, remote);
4265                    if remote == path.remote {
4266                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
4267                        // attack. Send a non-probing packet to recover the active path.
4268                        // TODO(flub): No longer true! We now path_challege also to validate
4269                        //    the path if the path is new, without an RFC9000-style
4270                        //    migration involved. This means we add in an extra
4271                        //    IMMEDIATE_ACK on some challenges. It isn't really wrong to do
4272                        //    so, but it still is something untidy. We should instead
4273                        //    suppress this when we know the remote is still validating the
4274                        //    path.
4275                        match self.peer_supports_ack_frequency() {
4276                            true => self.immediate_ack(path_id),
4277                            false => {
4278                                self.ping_path(path_id).ok();
4279                            }
4280                        }
4281                    }
4282                }
4283                Frame::PathResponse(response) => {
4284                    let path = self
4285                        .paths
4286                        .get_mut(&path_id)
4287                        .expect("payload is processed only after the path becomes known");
4288
4289                    use PathTimer::*;
4290                    use paths::OnPathResponseReceived::*;
4291                    match path.data.on_path_response_received(now, response.0, remote) {
4292                        OnPath { was_open } => {
4293                            let qlog = self.qlog.with_time(now);
4294
4295                            self.timers
4296                                .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4297                            self.timers
4298                                .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4299
4300                            let next_challenge = path
4301                                .data
4302                                .earliest_expiring_challenge()
4303                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4304                            self.timers.set_or_stop(
4305                                Timer::PerPath(path_id, PathChallengeLost),
4306                                next_challenge,
4307                                qlog,
4308                            );
4309
4310                            if !was_open {
4311                                self.events
4312                                    .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4313                                if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4314                                {
4315                                    self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4316                                        id: path_id,
4317                                        addr: observed.socket_addr(),
4318                                    }));
4319                                }
4320                            }
4321                            if let Some((_, ref mut prev)) = path.prev {
4322                                prev.challenges_sent.clear();
4323                                prev.send_new_challenge = false;
4324                            }
4325                        }
4326                        OffPath => {
4327                            debug!("Response to off-path PathChallenge!");
4328                            let next_challenge = path
4329                                .data
4330                                .earliest_expiring_challenge()
4331                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4332                            self.timers.set_or_stop(
4333                                Timer::PerPath(path_id, PathChallengeLost),
4334                                next_challenge,
4335                                self.qlog.with_time(now),
4336                            );
4337                        }
4338                        Invalid { expected } => {
4339                            debug!(%response, from=%remote, %expected, "ignoring invalid PATH_RESPONSE")
4340                        }
4341                        Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4342                    }
4343                }
4344                Frame::MaxData(bytes) => {
4345                    self.streams.received_max_data(bytes);
4346                }
4347                Frame::MaxStreamData { id, offset } => {
4348                    self.streams.received_max_stream_data(id, offset)?;
4349                }
4350                Frame::MaxStreams { dir, count } => {
4351                    self.streams.received_max_streams(dir, count)?;
4352                }
4353                Frame::ResetStream(frame) => {
4354                    if self.streams.received_reset(frame)?.should_transmit() {
4355                        self.spaces[SpaceId::Data].pending.max_data = true;
4356                    }
4357                }
4358                Frame::DataBlocked { offset } => {
4359                    debug!(offset, "peer claims to be blocked at connection level");
4360                }
4361                Frame::StreamDataBlocked { id, offset } => {
4362                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4363                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4364                        return Err(TransportError::STREAM_STATE_ERROR(
4365                            "STREAM_DATA_BLOCKED on send-only stream",
4366                        ));
4367                    }
4368                    debug!(
4369                        stream = %id,
4370                        offset, "peer claims to be blocked at stream level"
4371                    );
4372                }
4373                Frame::StreamsBlocked { dir, limit } => {
4374                    if limit > MAX_STREAM_COUNT {
4375                        return Err(TransportError::FRAME_ENCODING_ERROR(
4376                            "unrepresentable stream limit",
4377                        ));
4378                    }
4379                    debug!(
4380                        "peer claims to be blocked opening more than {} {} streams",
4381                        limit, dir
4382                    );
4383                }
4384                Frame::StopSending(frame::StopSending { id, error_code }) => {
4385                    if id.initiator() != self.side.side() {
4386                        if id.dir() == Dir::Uni {
4387                            debug!("got STOP_SENDING on recv-only {}", id);
4388                            return Err(TransportError::STREAM_STATE_ERROR(
4389                                "STOP_SENDING on recv-only stream",
4390                            ));
4391                        }
4392                    } else if self.streams.is_local_unopened(id) {
4393                        return Err(TransportError::STREAM_STATE_ERROR(
4394                            "STOP_SENDING on unopened stream",
4395                        ));
4396                    }
4397                    self.streams.received_stop_sending(id, error_code);
4398                }
4399                Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4400                    if let Some(ref path_id) = path_id {
4401                        span.record("path", tracing::field::debug(&path_id));
4402                    }
4403                    let path_id = path_id.unwrap_or_default();
4404                    match self.local_cid_state.get_mut(&path_id) {
4405                        None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4406                        Some(cid_state) => {
4407                            let allow_more_cids = cid_state
4408                                .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4409
4410                            // If the path has closed, we do not issue more CIDs for this path
4411                            // For details see  https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.2.2
4412                            // > an endpoint SHOULD provide new connection IDs for that path, if still open, using PATH_NEW_CONNECTION_ID frames.
4413                            let has_path = !self.abandoned_paths.contains(&path_id);
4414                            let allow_more_cids = allow_more_cids && has_path;
4415
4416                            self.endpoint_events
4417                                .push_back(EndpointEventInner::RetireConnectionId(
4418                                    now,
4419                                    path_id,
4420                                    sequence,
4421                                    allow_more_cids,
4422                                ));
4423                        }
4424                    }
4425                }
4426                Frame::NewConnectionId(frame) => {
4427                    let path_id = if let Some(path_id) = frame.path_id {
4428                        if !self.is_multipath_negotiated() {
4429                            return Err(TransportError::PROTOCOL_VIOLATION(
4430                                "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4431                            ));
4432                        }
4433                        if path_id > self.local_max_path_id {
4434                            return Err(TransportError::PROTOCOL_VIOLATION(
4435                                "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4436                            ));
4437                        }
4438                        path_id
4439                    } else {
4440                        PathId::ZERO
4441                    };
4442
4443                    if self.abandoned_paths.contains(&path_id) {
4444                        trace!("ignoring issued CID for abandoned path");
4445                        continue;
4446                    }
4447                    if let Some(ref path_id) = frame.path_id {
4448                        span.record("path", tracing::field::debug(&path_id));
4449                    }
4450                    let rem_cids = self
4451                        .rem_cids
4452                        .entry(path_id)
4453                        .or_insert_with(|| CidQueue::new(frame.id));
4454                    if rem_cids.active().is_empty() {
4455                        return Err(TransportError::PROTOCOL_VIOLATION(
4456                            "NEW_CONNECTION_ID when CIDs aren't in use",
4457                        ));
4458                    }
4459                    if frame.retire_prior_to > frame.sequence {
4460                        return Err(TransportError::PROTOCOL_VIOLATION(
4461                            "NEW_CONNECTION_ID retiring unissued CIDs",
4462                        ));
4463                    }
4464
4465                    use crate::cid_queue::InsertError;
4466                    match rem_cids.insert(frame) {
4467                        Ok(None) if self.path(path_id).is_none() => {
4468                            // if this gives us CIDs to open a new path and a nat traversal attempt
4469                            // is underway we could try to probe a pending remote
4470                            self.continue_nat_traversal_round(now);
4471                        }
4472                        Ok(None) => {}
4473                        Ok(Some((retired, reset_token))) => {
4474                            let pending_retired =
4475                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
4476                            /// Ensure `pending_retired` cannot grow without bound. Limit is
4477                            /// somewhat arbitrary but very permissive.
4478                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4479                            // We don't bother counting in-flight frames because those are bounded
4480                            // by congestion control.
4481                            if (pending_retired.len() as u64)
4482                                .saturating_add(retired.end.saturating_sub(retired.start))
4483                                > MAX_PENDING_RETIRED_CIDS
4484                            {
4485                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4486                                    "queued too many retired CIDs",
4487                                ));
4488                            }
4489                            pending_retired.extend(retired.map(|seq| (path_id, seq)));
4490                            self.set_reset_token(path_id, remote, reset_token);
4491                        }
4492                        Err(InsertError::ExceedsLimit) => {
4493                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4494                        }
4495                        Err(InsertError::Retired) => {
4496                            trace!("discarding already-retired");
4497                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
4498                            // range of connection IDs larger than the active connection ID limit
4499                            // was retired all at once via retire_prior_to.
4500                            self.spaces[SpaceId::Data]
4501                                .pending
4502                                .retire_cids
4503                                .push((path_id, frame.sequence));
4504                            continue;
4505                        }
4506                    };
4507
4508                    if self.side.is_server()
4509                        && path_id == PathId::ZERO
4510                        && self
4511                            .rem_cids
4512                            .get(&PathId::ZERO)
4513                            .map(|cids| cids.active_seq() == 0)
4514                            .unwrap_or_default()
4515                    {
4516                        // We're a server still using the initial remote CID for the client, so
4517                        // let's switch immediately to enable clientside stateless resets.
4518                        self.update_rem_cid(PathId::ZERO);
4519                    }
4520                }
4521                Frame::NewToken(NewToken { token }) => {
4522                    let ConnectionSide::Client {
4523                        token_store,
4524                        server_name,
4525                        ..
4526                    } = &self.side
4527                    else {
4528                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4529                    };
4530                    if token.is_empty() {
4531                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4532                    }
4533                    trace!("got new token");
4534                    token_store.insert(server_name, token);
4535                }
4536                Frame::Datagram(datagram) => {
4537                    if self
4538                        .datagrams
4539                        .received(datagram, &self.config.datagram_receive_buffer_size)?
4540                    {
4541                        self.events.push_back(Event::DatagramReceived);
4542                    }
4543                }
4544                Frame::AckFrequency(ack_frequency) => {
4545                    // This frame can only be sent in the Data space
4546
4547                    if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4548                        // The AckFrequency frame is stale (we have already received a more
4549                        // recent one)
4550                        continue;
4551                    }
4552
4553                    // Update the params for all of our paths
4554                    for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4555                        space.pending_acks.set_ack_frequency_params(&ack_frequency);
4556
4557                        // Our `max_ack_delay` has been updated, so we may need to adjust
4558                        // its associated timeout
4559                        if let Some(timeout) = space
4560                            .pending_acks
4561                            .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4562                        {
4563                            self.timers.set(
4564                                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4565                                timeout,
4566                                self.qlog.with_time(now),
4567                            );
4568                        }
4569                    }
4570                }
4571                Frame::ImmediateAck => {
4572                    // This frame can only be sent in the Data space
4573                    for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4574                        pns.pending_acks.set_immediate_ack_required();
4575                    }
4576                }
4577                Frame::HandshakeDone => {
4578                    if self.side.is_server() {
4579                        return Err(TransportError::PROTOCOL_VIOLATION(
4580                            "client sent HANDSHAKE_DONE",
4581                        ));
4582                    }
4583                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
4584                        self.discard_space(now, SpaceId::Handshake);
4585                    }
4586                    self.events.push_back(Event::HandshakeConfirmed);
4587                    trace!("handshake confirmed");
4588                }
4589                Frame::ObservedAddr(observed) => {
4590                    // check if params allows the peer to send report and this node to receive it
4591                    trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4592                    if !self
4593                        .peer_params
4594                        .address_discovery_role
4595                        .should_report(&self.config.address_discovery_role)
4596                    {
4597                        return Err(TransportError::PROTOCOL_VIOLATION(
4598                            "received OBSERVED_ADDRESS frame when not negotiated",
4599                        ));
4600                    }
4601                    // must only be sent in data space
4602                    if packet.header.space() != SpaceId::Data {
4603                        return Err(TransportError::PROTOCOL_VIOLATION(
4604                            "OBSERVED_ADDRESS frame outside data space",
4605                        ));
4606                    }
4607
4608                    let path = self.path_data_mut(path_id);
4609                    if remote == path.remote {
4610                        if let Some(updated) = path.update_observed_addr_report(observed) {
4611                            if path.open {
4612                                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4613                                    id: path_id,
4614                                    addr: updated,
4615                                }));
4616                            }
4617                            // otherwise the event is reported when the path is deemed open
4618                        }
4619                    } else {
4620                        // include in migration
4621                        migration_observed_addr = Some(observed)
4622                    }
4623                }
4624                Frame::PathAbandon(frame::PathAbandon {
4625                    path_id,
4626                    error_code,
4627                }) => {
4628                    span.record("path", tracing::field::debug(&path_id));
4629                    // TODO(flub): don't really know which error code to use here.
4630                    let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4631                        Ok(()) => {
4632                            trace!("peer abandoned path");
4633                            false
4634                        }
4635                        Err(ClosePathError::LastOpenPath) => {
4636                            trace!("peer abandoned last path, closing connection");
4637                            // TODO(flub): which error code?
4638                            return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4639                        }
4640                        Err(ClosePathError::ClosedPath) => {
4641                            trace!("peer abandoned already closed path");
4642                            true
4643                        }
4644                    };
4645                    // If we receive a retransmit of PATH_ABANDON then we may already have
4646                    // abandoned this path locally.  In that case the DiscardPath timer
4647                    // may already have fired and we no longer have any state for this path.
4648                    // Only set this timer if we still have path state.
4649                    if self.path(path_id).is_some() && !already_abandoned {
4650                        // TODO(flub): Checking is_some() here followed by a number of calls
4651                        //    that would panic if it was None is really ugly.  If only we
4652                        //    could do something like PathData::pto().  One day we'll have
4653                        //    unified SpaceId and PathId and this will be possible.
4654                        let delay = self.pto(SpaceId::Data, path_id) * 3;
4655                        self.timers.set(
4656                            Timer::PerPath(path_id, PathTimer::DiscardPath),
4657                            now + delay,
4658                            self.qlog.with_time(now),
4659                        );
4660                    }
4661                }
4662                Frame::PathStatusAvailable(info) => {
4663                    span.record("path", tracing::field::debug(&info.path_id));
4664                    if self.is_multipath_negotiated() {
4665                        self.on_path_status(
4666                            info.path_id,
4667                            PathStatus::Available,
4668                            info.status_seq_no,
4669                        );
4670                    } else {
4671                        return Err(TransportError::PROTOCOL_VIOLATION(
4672                            "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4673                        ));
4674                    }
4675                }
4676                Frame::PathStatusBackup(info) => {
4677                    span.record("path", tracing::field::debug(&info.path_id));
4678                    if self.is_multipath_negotiated() {
4679                        self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4680                    } else {
4681                        return Err(TransportError::PROTOCOL_VIOLATION(
4682                            "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4683                        ));
4684                    }
4685                }
4686                Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4687                    span.record("path", tracing::field::debug(&path_id));
4688                    if !self.is_multipath_negotiated() {
4689                        return Err(TransportError::PROTOCOL_VIOLATION(
4690                            "received MAX_PATH_ID frame when multipath was not negotiated",
4691                        ));
4692                    }
4693                    // frames that do not increase the path id are ignored
4694                    if path_id > self.remote_max_path_id {
4695                        self.remote_max_path_id = path_id;
4696                        self.issue_first_path_cids(now);
4697                        while let Some(true) = self.continue_nat_traversal_round(now) {}
4698                    }
4699                }
4700                Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4701                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4702                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4703                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4704                    if self.is_multipath_negotiated() {
4705                        if self.local_max_path_id > max_path_id {
4706                            return Err(TransportError::PROTOCOL_VIOLATION(
4707                                "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4708                            ));
4709                        }
4710                        debug!("received PATHS_BLOCKED({:?})", max_path_id);
4711                        // TODO(@divma): ensure max concurrent paths
4712                    } else {
4713                        return Err(TransportError::PROTOCOL_VIOLATION(
4714                            "received PATHS_BLOCKED frame when not multipath was not negotiated",
4715                        ));
4716                    }
4717                }
4718                Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4719                    // Nothing to do.  This is recorded in the frame stats, but otherwise we
4720                    // always issue all CIDs we're allowed to issue, so either this is an
4721                    // impatient peer or a bug on our side.
4722
4723                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4724                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4725                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4726                    if self.is_multipath_negotiated() {
4727                        if path_id > self.local_max_path_id {
4728                            return Err(TransportError::PROTOCOL_VIOLATION(
4729                                "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4730                            ));
4731                        }
4732                        if next_seq.0
4733                            > self
4734                                .local_cid_state
4735                                .get(&path_id)
4736                                .map(|cid_state| cid_state.active_seq().1 + 1)
4737                                .unwrap_or_default()
4738                        {
4739                            return Err(TransportError::PROTOCOL_VIOLATION(
4740                                "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4741                            ));
4742                        }
4743                        debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4744                    } else {
4745                        return Err(TransportError::PROTOCOL_VIOLATION(
4746                            "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4747                        ));
4748                    }
4749                }
4750                Frame::AddAddress(addr) => {
4751                    let client_state = match self.iroh_hp.client_side_mut() {
4752                        Ok(state) => state,
4753                        Err(err) => {
4754                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4755                                "Nat traversal(ADD_ADDRESS): {err}"
4756                            )));
4757                        }
4758                    };
4759
4760                    if !client_state.check_remote_address(&addr) {
4761                        // if the address is not valid we flag it, but update anyway
4762                        warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4763                    }
4764
4765                    match client_state.add_remote_address(addr) {
4766                        Ok(maybe_added) => {
4767                            if let Some(added) = maybe_added {
4768                                self.events.push_back(Event::NatTraversal(
4769                                    iroh_hp::Event::AddressAdded(added),
4770                                ));
4771                            }
4772                        }
4773                        Err(e) => {
4774                            warn!(%e, "failed to add remote address")
4775                        }
4776                    }
4777                }
4778                Frame::RemoveAddress(addr) => {
4779                    let client_state = match self.iroh_hp.client_side_mut() {
4780                        Ok(state) => state,
4781                        Err(err) => {
4782                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4783                                "Nat traversal(REMOVE_ADDRESS): {err}"
4784                            )));
4785                        }
4786                    };
4787                    if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4788                        self.events
4789                            .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4790                                removed_addr,
4791                            )));
4792                    }
4793                }
4794                Frame::ReachOut(reach_out) => {
4795                    let server_state = match self.iroh_hp.server_side_mut() {
4796                        Ok(state) => state,
4797                        Err(err) => {
4798                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4799                                "Nat traversal(REACH_OUT): {err}"
4800                            )));
4801                        }
4802                    };
4803
4804                    if let Err(err) = server_state.handle_reach_out(reach_out) {
4805                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
4806                            "Nat traversal(REACH_OUT): {err}"
4807                        )));
4808                    }
4809                }
4810            }
4811        }
4812
4813        let space = self.spaces[SpaceId::Data].for_path(path_id);
4814        if space
4815            .pending_acks
4816            .packet_received(now, number, ack_eliciting, &space.dedup)
4817        {
4818            if self.abandoned_paths.contains(&path_id) {
4819                // § 3.4.3 QUIC-MULTIPATH: promptly send ACKs for packets received from
4820                // abandoned paths.
4821                space.pending_acks.set_immediate_ack_required();
4822            } else {
4823                self.timers.set(
4824                    Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4825                    now + self.ack_frequency.max_ack_delay,
4826                    self.qlog.with_time(now),
4827                );
4828            }
4829        }
4830
4831        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
4832        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
4833        // are only freed, and hence only issue credit, once the application has been notified
4834        // during a read on the stream.
4835        let pending = &mut self.spaces[SpaceId::Data].pending;
4836        self.streams.queue_max_stream_id(pending);
4837
4838        if let Some(reason) = close {
4839            self.state.move_to_draining(Some(reason.into()));
4840            self.close = true;
4841        }
4842
4843        if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4844            && !is_probing_packet
4845            && remote != self.path_data(path_id).remote
4846        {
4847            let ConnectionSide::Server { ref server_config } = self.side else {
4848                panic!("packets from unknown remote should be dropped by clients");
4849            };
4850            debug_assert!(
4851                server_config.migration,
4852                "migration-initiating packets should have been dropped immediately"
4853            );
4854            self.migrate(path_id, now, remote, migration_observed_addr);
4855            // Break linkability, if possible
4856            self.update_rem_cid(path_id);
4857            self.spin = false;
4858        }
4859
4860        Ok(())
4861    }
4862
    /// Migrates the path identified by `path_id` to a new `remote` address.
    ///
    /// Invoked when a non-probing packet with the highest seen packet number arrives
    /// from an address that differs from the path's current remote. Replaces the
    /// path's `PathData` with fresh state — or state derived from the old path when
    /// the change looks like an IPv4 NAT rebinding — queues a new PATH_CHALLENGE,
    /// and arms the per-path validation timer.
    ///
    /// `observed_addr` is an OBSERVED_ADDRESS report carried by the migrating packet,
    /// if any; applying it may emit a [`PathEvent::ObservedAddr`] event.
    fn migrate(
        &mut self,
        path_id: PathId,
        now: Instant,
        remote: SocketAddr,
        observed_addr: Option<ObservedAddr>,
    ) {
        trace!(%remote, %path_id, "migration initiated");
        // Monotonically (wrapping) increasing counter tagging each distinct path incarnation.
        self.path_counter = self.path_counter.wrapping_add(1);
        // TODO(@divma): conditions for path migration in multipath are very specific, check them
        // again to prevent path migrations that should actually create a new path

        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
        // Note that the congestion window will not grow until validation terminates. Helps mitigate
        // amplification attacks performed by spoofing source addresses.
        //
        // Capture the old path's PTO *before* its state is replaced below: the validation
        // timeout at the end of this method must cover both the old and new RTT estimates.
        let prev_pto = self.pto(SpaceId::Data, path_id);
        let known_path = self.paths.get_mut(&path_id).expect("known path");
        let path = &mut known_path.data;
        // Same IPv4 address with (presumably) only a port change is treated as a NAT
        // rebinding: derive the new path from the previous one. Any other change gets
        // completely fresh path state (rtt, congestion, MTU discovery).
        let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
            PathData::from_previous(remote, path, self.path_counter, now)
        } else {
            // Clamp the peer's advertised max UDP payload size into u16 range for MTUD.
            let peer_max_udp_payload_size =
                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
                    .unwrap_or(u16::MAX);
            PathData::new(
                remote,
                self.allow_mtud,
                Some(peer_max_udp_payload_size),
                self.path_counter,
                now,
                &self.config,
            )
        };
        // Carry the last OBSERVED_ADDRESS report over so a fresh report on the new path
        // can be compared against it.
        new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
        if let Some(report) = observed_addr {
            if let Some(updated) = new_path.update_observed_addr_report(report) {
                tracing::info!("adding observed addr event from migration");
                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
                    id: path_id,
                    addr: updated,
                }));
            }
        }
        // Schedule a PATH_CHALLENGE so the new address gets validated.
        new_path.send_new_challenge = true;

        let mut prev = mem::replace(path, new_path);
        // Don't clobber the original path if the previous one hasn't been validated yet
        if !prev.is_validating_path() {
            // Keep challenging the previous address too, in case the migration was spurious.
            prev.send_new_challenge = true;
            // We haven't updated the remote CID yet, this captures the remote CID we were using on
            // the previous path.

            known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
        }

        // We need to re-assign the correct remote to this path in qlog
        self.qlog.emit_tuple_assigned(path_id, remote, now);

        // Arm the validation deadline generously: three times the larger of the old and
        // new PTO estimates.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathValidation),
            now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
            self.qlog.with_time(now),
        );
    }
4927
    /// Handle a change in the local address, i.e. an active migration
    ///
    /// Switches to a fresh remote CID on the initial path (to avoid linkability across
    /// the address change) and sends a PING to elicit a response from the new address.
    pub fn local_address_changed(&mut self) {
        // TODO(flub): if multipath is enabled this needs to create a new path entirely.
        self.update_rem_cid(PathId::ZERO);
        self.ping();
    }
4934
4935    /// Switch to a previously unused remote connection ID, if possible
4936    fn update_rem_cid(&mut self, path_id: PathId) {
4937        let Some((reset_token, retired)) =
4938            self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4939        else {
4940            return;
4941        };
4942
4943        // Retire the current remote CID and any CIDs we had to skip.
4944        self.spaces[SpaceId::Data]
4945            .pending
4946            .retire_cids
4947            .extend(retired.map(|seq| (path_id, seq)));
4948        let remote = self.path_data(path_id).remote;
4949        self.set_reset_token(path_id, remote, reset_token);
4950    }
4951
4952    /// Sends this reset token to the endpoint
4953    ///
4954    /// The endpoint needs to know the reset tokens issued by the peer, so that if the peer
4955    /// sends a reset token it knows to route it to this connection. See RFC 9000 section
4956    /// 10.3. Stateless Reset.
4957    ///
4958    /// Reset tokens are different for each path, the endpoint identifies paths by peer
4959    /// socket address however, not by path ID.
4960    fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4961        self.endpoint_events
4962            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4963
4964        // During the handshake the server sends a reset token in the transport
4965        // parameters. When we are the client and we receive the reset token during the
4966        // handshake we want this to affect our peer transport parameters.
4967        // TODO(flub): Pretty sure this is pointless, the entire params is overwritten
4968        //    shortly after this was called.  And then the params don't have this anymore.
4969        if path_id == PathId::ZERO {
4970            self.peer_params.stateless_reset_token = Some(reset_token);
4971        }
4972    }
4973
4974    /// Issue an initial set of connection IDs to the peer upon connection
4975    fn issue_first_cids(&mut self, now: Instant) {
4976        if self
4977            .local_cid_state
4978            .get(&PathId::ZERO)
4979            .expect("PathId::ZERO exists when the connection is created")
4980            .cid_len()
4981            == 0
4982        {
4983            return;
4984        }
4985
4986        // Subtract 1 to account for the CID we supplied while handshaking
4987        let mut n = self.peer_params.issue_cids_limit() - 1;
4988        if let ConnectionSide::Server { server_config } = &self.side {
4989            if server_config.has_preferred_address() {
4990                // We also sent a CID in the transport parameters
4991                n -= 1;
4992            }
4993        }
4994        self.endpoint_events
4995            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
4996    }
4997
4998    /// Issues an initial set of CIDs for paths that have not yet had any CIDs issued
4999    ///
5000    /// Later CIDs are issued when CIDs expire or are retired by the peer.
5001    fn issue_first_path_cids(&mut self, now: Instant) {
5002        if let Some(max_path_id) = self.max_path_id() {
5003            let mut path_id = self.max_path_id_with_cids.next();
5004            while path_id <= max_path_id {
5005                self.endpoint_events
5006                    .push_back(EndpointEventInner::NeedIdentifiers(
5007                        path_id,
5008                        now,
5009                        self.peer_params.issue_cids_limit(),
5010                    ));
5011                path_id = path_id.next();
5012            }
5013            self.max_path_id_with_cids = max_path_id;
5014        }
5015    }
5016
5017    /// Populates a packet with frames
5018    ///
5019    /// This tries to fit as many frames as possible into the packet.
5020    ///
5021    /// *path_exclusive_only* means to only build frames which can only be sent on this
5022    /// *path.  This is used in multipath for backup paths while there is still an active
5023    /// *path.
5024    fn populate_packet(
5025        &mut self,
5026        now: Instant,
5027        space_id: SpaceId,
5028        path_id: PathId,
5029        path_exclusive_only: bool,
5030        buf: &mut impl BufMut,
5031        pn: u64,
5032        #[allow(unused)] qlog: &mut QlogSentPacket,
5033    ) -> SentFrames {
5034        let mut sent = SentFrames::default();
5035        let is_multipath_negotiated = self.is_multipath_negotiated();
5036        let space = &mut self.spaces[space_id];
5037        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5038        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5039        space
5040            .for_path(path_id)
5041            .pending_acks
5042            .maybe_ack_non_eliciting();
5043
5044        // HANDSHAKE_DONE
5045        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5046            trace!("HANDSHAKE_DONE");
5047            buf.write(frame::FrameType::HandshakeDone);
5048            qlog.frame(&Frame::HandshakeDone);
5049            sent.retransmits.get_or_create().handshake_done = true;
5050            // This is just a u8 counter and the frame is typically just sent once
5051            self.stats.frame_tx.handshake_done =
5052                self.stats.frame_tx.handshake_done.saturating_add(1);
5053        }
5054
5055        // REACH_OUT
5056        // TODO(@divma): path explusive considerations
5057        if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5058            while let Some(local_addr) = addresses.pop() {
5059                let reach_out = frame::ReachOut::new(*round, local_addr);
5060                if buf.remaining_mut() > reach_out.size() {
5061                    trace!(%round, ?local_addr, "REACH_OUT");
5062                    reach_out.encode(buf);
5063                    let sent_reachouts = sent
5064                        .retransmits
5065                        .get_or_create()
5066                        .reach_out
5067                        .get_or_insert_with(|| (*round, Default::default()));
5068                    sent_reachouts.1.push(local_addr);
5069                    self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5070                    qlog.frame(&Frame::ReachOut(reach_out));
5071                } else {
5072                    addresses.push(local_addr);
5073                    break;
5074                }
5075            }
5076            if addresses.is_empty() {
5077                space.pending.reach_out = None;
5078            }
5079        }
5080
5081        // OBSERVED_ADDR
5082        if !path_exclusive_only
5083            && space_id == SpaceId::Data
5084            && self
5085                .config
5086                .address_discovery_role
5087                .should_report(&self.peer_params.address_discovery_role)
5088            && (!path.observed_addr_sent || space.pending.observed_addr)
5089        {
5090            let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5091            if buf.remaining_mut() > frame.size() {
5092                trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5093                frame.encode(buf);
5094
5095                self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5096                path.observed_addr_sent = true;
5097
5098                self.stats.frame_tx.observed_addr += 1;
5099                sent.retransmits.get_or_create().observed_addr = true;
5100                space.pending.observed_addr = false;
5101                qlog.frame(&Frame::ObservedAddr(frame));
5102            }
5103        }
5104
5105        // PING
5106        if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5107            trace!("PING");
5108            buf.write(frame::FrameType::Ping);
5109            sent.non_retransmits = true;
5110            self.stats.frame_tx.ping += 1;
5111            qlog.frame(&Frame::Ping);
5112        }
5113
5114        // IMMEDIATE_ACK
5115        if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5116            debug_assert_eq!(
5117                space_id,
5118                SpaceId::Data,
5119                "immediate acks must be sent in the data space"
5120            );
5121            trace!("IMMEDIATE_ACK");
5122            buf.write(frame::FrameType::ImmediateAck);
5123            sent.non_retransmits = true;
5124            self.stats.frame_tx.immediate_ack += 1;
5125            qlog.frame(&Frame::ImmediateAck);
5126        }
5127
5128        // ACK
5129        // TODO(flub): Should this send acks for this path anyway?
5130
5131        if !path_exclusive_only {
5132            for path_id in space
5133                .number_spaces
5134                .iter_mut()
5135                .filter(|(_, pns)| pns.pending_acks.can_send())
5136                .map(|(&path_id, _)| path_id)
5137                .collect::<Vec<_>>()
5138            {
5139                Self::populate_acks(
5140                    now,
5141                    self.receiving_ecn,
5142                    &mut sent,
5143                    path_id,
5144                    space_id,
5145                    space,
5146                    is_multipath_negotiated,
5147                    buf,
5148                    &mut self.stats,
5149                    qlog,
5150                );
5151            }
5152        }
5153
5154        // ACK_FREQUENCY
5155        if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5156            let sequence_number = self.ack_frequency.next_sequence_number();
5157
5158            // Safe to unwrap because this is always provided when ACK frequency is enabled
5159            let config = self.config.ack_frequency_config.as_ref().unwrap();
5160
5161            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
5162            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5163                path.rtt.get(),
5164                config,
5165                &self.peer_params,
5166            );
5167
5168            trace!(?max_ack_delay, "ACK_FREQUENCY");
5169
5170            let frame = frame::AckFrequency {
5171                sequence: sequence_number,
5172                ack_eliciting_threshold: config.ack_eliciting_threshold,
5173                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5174                reordering_threshold: config.reordering_threshold,
5175            };
5176            frame.encode(buf);
5177            qlog.frame(&Frame::AckFrequency(frame));
5178
5179            sent.retransmits.get_or_create().ack_frequency = true;
5180
5181            self.ack_frequency
5182                .ack_frequency_sent(path_id, pn, max_ack_delay);
5183            self.stats.frame_tx.ack_frequency += 1;
5184        }
5185
5186        // PATH_CHALLENGE
5187        if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5188            && space_id == SpaceId::Data
5189            && path.send_new_challenge
5190            && !self.state.is_closed()
5191        // we don't want to send new challenges if we are already closing
5192        {
5193            path.send_new_challenge = false;
5194
5195            // Generate a new challenge every time we send a new PATH_CHALLENGE
5196            let token = self.rng.random();
5197            let info = paths::SentChallengeInfo {
5198                sent_instant: now,
5199                remote: path.remote,
5200            };
5201            path.challenges_sent.insert(token, info);
5202            sent.non_retransmits = true;
5203            sent.requires_padding = true;
5204            let challenge = frame::PathChallenge(token);
5205            trace!(frame = %challenge);
5206            buf.write(challenge);
5207            qlog.frame(&Frame::PathChallenge(challenge));
5208            self.stats.frame_tx.path_challenge += 1;
5209            let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5210            self.timers.set(
5211                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5212                now + pto,
5213                self.qlog.with_time(now),
5214            );
5215
5216            if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5217                // queue informing the path status along with the challenge
5218                space.pending.path_status.insert(path_id);
5219            }
5220
5221            // Always include an OBSERVED_ADDR frame with a PATH_CHALLENGE, regardless
5222            // of whether one has already been sent on this path.
5223            if space_id == SpaceId::Data
5224                && self
5225                    .config
5226                    .address_discovery_role
5227                    .should_report(&self.peer_params.address_discovery_role)
5228            {
5229                let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5230                if buf.remaining_mut() > frame.size() {
5231                    frame.encode(buf);
5232                    qlog.frame(&Frame::ObservedAddr(frame));
5233
5234                    self.next_observed_addr_seq_no =
5235                        self.next_observed_addr_seq_no.saturating_add(1u8);
5236                    path.observed_addr_sent = true;
5237
5238                    self.stats.frame_tx.observed_addr += 1;
5239                    sent.retransmits.get_or_create().observed_addr = true;
5240                    space.pending.observed_addr = false;
5241                }
5242            }
5243        }
5244
5245        // PATH_RESPONSE
5246        if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5247            if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5248                sent.non_retransmits = true;
5249                sent.requires_padding = true;
5250                let response = frame::PathResponse(token);
5251                trace!(frame = %response);
5252                buf.write(response);
5253                qlog.frame(&Frame::PathResponse(response));
5254                self.stats.frame_tx.path_response += 1;
5255
5256                // NOTE: this is technically not required but might be useful to ride the
5257                // request/response nature of path challenges to refresh an observation
5258                // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
5259                if space_id == SpaceId::Data
5260                    && self
5261                        .config
5262                        .address_discovery_role
5263                        .should_report(&self.peer_params.address_discovery_role)
5264                {
5265                    let frame =
5266                        frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5267                    if buf.remaining_mut() > frame.size() {
5268                        frame.encode(buf);
5269                        qlog.frame(&Frame::ObservedAddr(frame));
5270
5271                        self.next_observed_addr_seq_no =
5272                            self.next_observed_addr_seq_no.saturating_add(1u8);
5273                        path.observed_addr_sent = true;
5274
5275                        self.stats.frame_tx.observed_addr += 1;
5276                        sent.retransmits.get_or_create().observed_addr = true;
5277                        space.pending.observed_addr = false;
5278                    }
5279                }
5280            }
5281        }
5282
5283        // CRYPTO
5284        while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5285            let mut frame = match space.pending.crypto.pop_front() {
5286                Some(x) => x,
5287                None => break,
5288            };
5289
5290            // Calculate the maximum amount of crypto data we can store in the buffer.
5291            // Since the offset is known, we can reserve the exact size required to encode it.
5292            // For length we reserve 2bytes which allows to encode up to 2^14,
5293            // which is more than what fits into normally sized QUIC frames.
5294            let max_crypto_data_size = buf.remaining_mut()
5295                - 1 // Frame Type
5296                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5297                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
5298
5299            let len = frame
5300                .data
5301                .len()
5302                .min(2usize.pow(14) - 1)
5303                .min(max_crypto_data_size);
5304
5305            let data = frame.data.split_to(len);
5306            let truncated = frame::Crypto {
5307                offset: frame.offset,
5308                data,
5309            };
5310            trace!(
5311                "CRYPTO: off {} len {}",
5312                truncated.offset,
5313                truncated.data.len()
5314            );
5315            truncated.encode(buf);
5316            self.stats.frame_tx.crypto += 1;
5317
5318            // The clone is cheap but we still cfg it out if qlog is disabled.
5319            #[cfg(feature = "qlog")]
5320            qlog.frame(&Frame::Crypto(truncated.clone()));
5321            sent.retransmits.get_or_create().crypto.push_back(truncated);
5322            if !frame.data.is_empty() {
5323                frame.offset += len as u64;
5324                space.pending.crypto.push_front(frame);
5325            }
5326        }
5327
5328        // TODO(flub): maybe this is much higher priority?
5329        // PATH_ABANDON
5330        while !path_exclusive_only
5331            && space_id == SpaceId::Data
5332            && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5333        {
5334            let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5335                break;
5336            };
5337            let frame = frame::PathAbandon {
5338                path_id,
5339                error_code,
5340            };
5341            frame.encode(buf);
5342            qlog.frame(&Frame::PathAbandon(frame));
5343            self.stats.frame_tx.path_abandon += 1;
5344            trace!(%path_id, "PATH_ABANDON");
5345            sent.retransmits
5346                .get_or_create()
5347                .path_abandon
5348                .entry(path_id)
5349                .or_insert(error_code);
5350        }
5351
5352        // PATH_STATUS_AVAILABLE & PATH_STATUS_BACKUP
5353        while !path_exclusive_only
5354            && space_id == SpaceId::Data
5355            && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5356        {
5357            let Some(path_id) = space.pending.path_status.pop_first() else {
5358                break;
5359            };
5360            let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5361                trace!(%path_id, "discarding queued path status for unknown path");
5362                continue;
5363            };
5364
5365            let seq = path.status.seq();
5366            sent.retransmits.get_or_create().path_status.insert(path_id);
5367            match path.local_status() {
5368                PathStatus::Available => {
5369                    let frame = frame::PathStatusAvailable {
5370                        path_id,
5371                        status_seq_no: seq,
5372                    };
5373                    frame.encode(buf);
5374                    qlog.frame(&Frame::PathStatusAvailable(frame));
5375                    self.stats.frame_tx.path_status_available += 1;
5376                    trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5377                }
5378                PathStatus::Backup => {
5379                    let frame = frame::PathStatusBackup {
5380                        path_id,
5381                        status_seq_no: seq,
5382                    };
5383                    frame.encode(buf);
5384                    qlog.frame(&Frame::PathStatusBackup(frame));
5385                    self.stats.frame_tx.path_status_backup += 1;
5386                    trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5387                }
5388            }
5389        }
5390
5391        // MAX_PATH_ID
5392        if space_id == SpaceId::Data
5393            && space.pending.max_path_id
5394            && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5395        {
5396            let frame = frame::MaxPathId(self.local_max_path_id);
5397            frame.encode(buf);
5398            qlog.frame(&Frame::MaxPathId(frame));
5399            space.pending.max_path_id = false;
5400            sent.retransmits.get_or_create().max_path_id = true;
5401            trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5402            self.stats.frame_tx.max_path_id += 1;
5403        }
5404
5405        // PATHS_BLOCKED
5406        if space_id == SpaceId::Data
5407            && space.pending.paths_blocked
5408            && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5409        {
5410            let frame = frame::PathsBlocked(self.remote_max_path_id);
5411            frame.encode(buf);
5412            qlog.frame(&Frame::PathsBlocked(frame));
5413            space.pending.paths_blocked = false;
5414            sent.retransmits.get_or_create().paths_blocked = true;
5415            trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5416            self.stats.frame_tx.paths_blocked += 1;
5417        }
5418
5419        // PATH_CIDS_BLOCKED
5420        while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5421        {
5422            let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5423                break;
5424            };
5425            let next_seq = match self.rem_cids.get(&path_id) {
5426                Some(cid_queue) => cid_queue.active_seq() + 1,
5427                None => 0,
5428            };
5429            let frame = frame::PathCidsBlocked {
5430                path_id,
5431                next_seq: VarInt(next_seq),
5432            };
5433            frame.encode(buf);
5434            qlog.frame(&Frame::PathCidsBlocked(frame));
5435            sent.retransmits
5436                .get_or_create()
5437                .path_cids_blocked
5438                .push(path_id);
5439            trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5440            self.stats.frame_tx.path_cids_blocked += 1;
5441        }
5442
5443        // RESET_STREAM, STOP_SENDING, MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS
5444        if space_id == SpaceId::Data {
5445            self.streams.write_control_frames(
5446                buf,
5447                &mut space.pending,
5448                &mut sent.retransmits,
5449                &mut self.stats.frame_tx,
5450                qlog,
5451            );
5452        }
5453
5454        // NEW_CONNECTION_ID
5455        let cid_len = self
5456            .local_cid_state
5457            .values()
5458            .map(|cid_state| cid_state.cid_len())
5459            .max()
5460            .expect("some local CID state must exist");
5461        let new_cid_size_bound =
5462            frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5463        while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5464            let issued = match space.pending.new_cids.pop() {
5465                Some(x) => x,
5466                None => break,
5467            };
5468            let retire_prior_to = self
5469                .local_cid_state
5470                .get(&issued.path_id)
5471                .map(|cid_state| cid_state.retire_prior_to())
5472                .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5473
5474            let cid_path_id = match is_multipath_negotiated {
5475                true => {
5476                    trace!(
5477                        path_id = ?issued.path_id,
5478                        sequence = issued.sequence,
5479                        id = %issued.id,
5480                        "PATH_NEW_CONNECTION_ID",
5481                    );
5482                    self.stats.frame_tx.path_new_connection_id += 1;
5483                    Some(issued.path_id)
5484                }
5485                false => {
5486                    trace!(
5487                        sequence = issued.sequence,
5488                        id = %issued.id,
5489                        "NEW_CONNECTION_ID"
5490                    );
5491                    debug_assert_eq!(issued.path_id, PathId::ZERO);
5492                    self.stats.frame_tx.new_connection_id += 1;
5493                    None
5494                }
5495            };
5496            let frame = frame::NewConnectionId {
5497                path_id: cid_path_id,
5498                sequence: issued.sequence,
5499                retire_prior_to,
5500                id: issued.id,
5501                reset_token: issued.reset_token,
5502            };
5503            frame.encode(buf);
5504            sent.retransmits.get_or_create().new_cids.push(issued);
5505            qlog.frame(&Frame::NewConnectionId(frame));
5506        }
5507
5508        // RETIRE_CONNECTION_ID
5509        let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5510        while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5511            let (path_id, sequence) = match space.pending.retire_cids.pop() {
5512                Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5513                    trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5514                    self.stats.frame_tx.retire_connection_id += 1;
5515                    (None, seq)
5516                }
5517                Some((path_id, seq)) => {
5518                    trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5519                    self.stats.frame_tx.path_retire_connection_id += 1;
5520                    (Some(path_id), seq)
5521                }
5522                None => break,
5523            };
5524            let frame = frame::RetireConnectionId { path_id, sequence };
5525            frame.encode(buf);
5526            qlog.frame(&Frame::RetireConnectionId(frame));
5527            sent.retransmits
5528                .get_or_create()
5529                .retire_cids
5530                .push((path_id.unwrap_or_default(), sequence));
5531        }
5532
5533        // DATAGRAM
5534        let mut sent_datagrams = false;
5535        while !path_exclusive_only
5536            && buf.remaining_mut() > Datagram::SIZE_BOUND
5537            && space_id == SpaceId::Data
5538        {
5539            let prev_remaining = buf.remaining_mut();
5540            match self.datagrams.write(buf) {
5541                true => {
5542                    sent_datagrams = true;
5543                    sent.non_retransmits = true;
5544                    self.stats.frame_tx.datagram += 1;
5545                    qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5546                }
5547                false => break,
5548            }
5549        }
5550        if self.datagrams.send_blocked && sent_datagrams {
5551            self.events.push_back(Event::DatagramsUnblocked);
5552            self.datagrams.send_blocked = false;
5553        }
5554
5555        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5556
5557        // NEW_TOKEN
5558        while let Some(remote_addr) = space.pending.new_tokens.pop() {
5559            if path_exclusive_only {
5560                break;
5561            }
5562            debug_assert_eq!(space_id, SpaceId::Data);
5563            let ConnectionSide::Server { server_config } = &self.side else {
5564                panic!("NEW_TOKEN frames should not be enqueued by clients");
5565            };
5566
5567            if remote_addr != path.remote {
5568                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
5569                // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
5570                // frames upon an path change. Instead, when the new path becomes validated,
5571                // NEW_TOKEN frames may be enqueued for the new path instead.
5572                continue;
5573            }
5574
5575            let token = Token::new(
5576                TokenPayload::Validation {
5577                    ip: remote_addr.ip(),
5578                    issued: server_config.time_source.now(),
5579                },
5580                &mut self.rng,
5581            );
5582            let new_token = NewToken {
5583                token: token.encode(&*server_config.token_key).into(),
5584            };
5585
5586            if buf.remaining_mut() < new_token.size() {
5587                space.pending.new_tokens.push(remote_addr);
5588                break;
5589            }
5590
5591            trace!("NEW_TOKEN");
5592            new_token.encode(buf);
5593            qlog.frame(&Frame::NewToken(new_token));
5594            sent.retransmits
5595                .get_or_create()
5596                .new_tokens
5597                .push(remote_addr);
5598            self.stats.frame_tx.new_token += 1;
5599        }
5600
5601        // STREAM
5602        if !path_exclusive_only && space_id == SpaceId::Data {
5603            sent.stream_frames =
5604                self.streams
5605                    .write_stream_frames(buf, self.config.send_fairness, qlog);
5606            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5607        }
5608
5609        // ADD_ADDRESS
5610        // TODO(@divma): check if we need to do path exclusive filters
5611        while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5612            if let Some(added_address) = space.pending.add_address.pop_last() {
5613                trace!(
5614                    seq = %added_address.seq_no,
5615                    ip = ?added_address.ip,
5616                    port = added_address.port,
5617                    "ADD_ADDRESS",
5618                );
5619                added_address.encode(buf);
5620                sent.retransmits
5621                    .get_or_create()
5622                    .add_address
5623                    .insert(added_address);
5624                self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5625                qlog.frame(&Frame::AddAddress(added_address));
5626            } else {
5627                break;
5628            }
5629        }
5630
5631        // REMOVE_ADDRESS
5632        while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5633            if let Some(removed_address) = space.pending.remove_address.pop_last() {
5634                trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5635                removed_address.encode(buf);
5636                sent.retransmits
5637                    .get_or_create()
5638                    .remove_address
5639                    .insert(removed_address);
5640                self.stats.frame_tx.remove_address =
5641                    self.stats.frame_tx.remove_address.saturating_add(1);
5642                qlog.frame(&Frame::RemoveAddress(removed_address));
5643            } else {
5644                break;
5645            }
5646        }
5647
5648        sent
5649    }
5650
    /// Write pending ACKs into a buffer
    ///
    /// Encodes a PATH_ACK frame when multipath is negotiated and `space_id` is the 1-RTT
    /// space, otherwise a plain ACK frame. Also records the largest acknowledged packet
    /// number for `path_id` into `sent.largest_acked`.
    fn populate_acks(
        now: Instant,
        receiving_ecn: bool,
        sent: &mut SentFrames,
        path_id: PathId,
        space_id: SpaceId,
        space: &mut PacketSpace,
        is_multipath_negotiated: bool,
        buf: &mut impl BufMut,
        stats: &mut ConnectionStats,
        // NOTE(review): `qlog` appears to be used unconditionally below; the allow may
        // exist for some cfg/feature combination — confirm before removing.
        #[allow(unused)] qlog: &mut QlogSentPacket,
    ) {
        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");

        debug_assert!(
            is_multipath_negotiated || path_id == PathId::ZERO,
            "Only PathId::ZERO allowed without multipath (have {path_id:?})"
        );
        if is_multipath_negotiated {
            debug_assert!(
                space_id == SpaceId::Data || path_id == PathId::ZERO,
                "path acks must be sent in 1RTT space (have {space_id:?})"
            );
        }

        let pns = space.for_path(path_id);
        let ranges = pns.pending_acks.ranges();
        debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
        // Only report ECN counts if we have actually been receiving ECN markings.
        let ecn = if receiving_ecn {
            Some(&pns.ecn_counters)
        } else {
            None
        };
        if let Some(max) = ranges.max() {
            sent.largest_acked.insert(path_id, max);
        }

        let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
        // TODO: This should come from `TransportConfig` if that gets configurable.
        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
        // The ACK Delay field is scaled down by 2^ack_delay_exponent on the wire.
        let delay = delay_micros >> ack_delay_exp.into_inner();

        if is_multipath_negotiated && space_id == SpaceId::Data {
            if !ranges.is_empty() {
                trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
                frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
                qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
                stats.frame_tx.path_acks += 1;
            }
        } else {
            trace!("ACK {ranges:?}, Delay = {delay_micros}us");
            frame::Ack::encode(delay as _, ranges, ecn, buf);
            stats.frame_tx.acks += 1;
            qlog.frame_ack(delay, ranges, ecn);
        }
    }
5709
    /// Common teardown performed when the connection is closed
    ///
    /// Cancels all running timers; callers re-arm any close-specific timers they need.
    fn close_common(&mut self) {
        trace!("connection closed");
        self.timers.reset();
    }
5714
    /// Arm the timer that ends the closing/draining period
    fn set_close_timer(&mut self, now: Instant) {
        // QUIC-MULTIPATH § 2.6 Connection Closure: draining for 3*PTO with PTO the max of
        // the PTO for all paths.
        let pto_max = self.pto_max_path(self.highest_space, true);
        self.timers.set(
            Timer::Conn(ConnTimer::Close),
            now + 3 * pto_max,
            self.qlog.with_time(now),
        );
    }
5725
5726    /// Handle transport parameters received from the peer
5727    ///
5728    /// *rem_cid* and *loc_cid* are the source and destination CIDs respectively of the
5729    /// *packet into which the transport parameters arrived.
5730    fn handle_peer_params(
5731        &mut self,
5732        params: TransportParameters,
5733        loc_cid: ConnectionId,
5734        rem_cid: ConnectionId,
5735        now: Instant,
5736    ) -> Result<(), TransportError> {
5737        if Some(self.orig_rem_cid) != params.initial_src_cid
5738            || (self.side.is_client()
5739                && (Some(self.initial_dst_cid) != params.original_dst_cid
5740                    || self.retry_src_cid != params.retry_src_cid))
5741        {
5742            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5743                "CID authentication failure",
5744            ));
5745        }
5746        if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5747            return Err(TransportError::PROTOCOL_VIOLATION(
5748                "multipath must not use zero-length CIDs",
5749            ));
5750        }
5751
5752        self.set_peer_params(params);
5753        self.qlog.emit_peer_transport_params_received(self, now);
5754
5755        Ok(())
5756    }
5757
    /// Apply the peer's transport parameters to local connection state
    ///
    /// Updates stream limits, the negotiated idle timeout, the preferred address CID,
    /// ACK delay, multipath and iroh nat traversal negotiation state, and the
    /// peer-advertised maximum UDP payload size.
    fn set_peer_params(&mut self, params: TransportParameters) {
        self.streams.set_params(&params);
        self.idle_timeout =
            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
        trace!("negotiated max idle timeout {:?}", self.idle_timeout);

        if let Some(ref info) = params.preferred_address {
            // During the handshake PathId::ZERO exists.
            self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
                path_id: None,
                sequence: 1,
                id: info.connection_id,
                reset_token: info.stateless_reset_token,
                retire_prior_to: 0,
            })
            .expect(
                "preferred address CID is the first received, and hence is guaranteed to be legal",
            );
            let remote = self.path_data(PathId::ZERO).remote;
            self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
        }
        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);

        // Multipath is negotiated iff both sides advertised an initial_max_path_id.
        let mut multipath_enabled = None;
        if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
            self.config.get_initial_max_path_id(),
            params.initial_max_path_id,
        ) {
            // multipath is enabled, register the local and remote maximums
            self.local_max_path_id = local_max_path_id;
            self.remote_max_path_id = remote_max_path_id;
            let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
            debug!(%initial_max_path_id, "multipath negotiated");
            multipath_enabled = Some(initial_max_path_id);
        }

        // iroh nat traversal is negotiated iff both sides advertised an address limit;
        // it additionally requires multipath to have been negotiated above.
        if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
            self.config
                .max_remote_nat_traversal_addresses
                .zip(params.max_remote_nat_traversal_addresses)
        {
            if let Some(max_initial_paths) =
                multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
            {
                let max_local_addresses = max_remotely_allowed_remote_addresses.get();
                let max_remote_addresses = max_locally_allowed_remote_addresses.get();
                self.iroh_hp =
                    iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
                debug!(
                    %max_remote_addresses, %max_local_addresses,
                    "iroh hole punching negotiated"
                );

                // Sanity-check the negotiated limits and warn about configurations that
                // are likely to starve nat traversal of paths or connection IDs.
                match self.side() {
                    Side::Client => {
                        if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
                            // in this case the client might try to open `max_remote_addresses` new
                            // paths, but the current multipath configuration will not allow it
                            warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
                        } else if max_local_addresses as u64
                            > params.active_connection_id_limit.into_inner()
                        {
                            // the server allows us to send at most `params.active_connection_id_limit`
                            // but they might need at least `max_local_addresses` to effectively send
                            // `PATH_CHALLENGE` frames to each advertised local address
                            warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
                        }
                    }
                    Side::Server => {
                        if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
                            warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
                        }
                    }
                }
            } else {
                debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
            }
        }

        self.peer_params = params;
        // Clamp to u16::MAX; larger advertised payload sizes are not representable here.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_data_mut(PathId::ZERO)
            .mtud
            .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
    }
5844
5845    /// Decrypts a packet, returning the packet number on success
5846    fn decrypt_packet(
5847        &mut self,
5848        now: Instant,
5849        path_id: PathId,
5850        packet: &mut Packet,
5851    ) -> Result<Option<u64>, Option<TransportError>> {
5852        let result = packet_crypto::decrypt_packet_body(
5853            packet,
5854            path_id,
5855            &self.spaces,
5856            self.zero_rtt_crypto.as_ref(),
5857            self.key_phase,
5858            self.prev_crypto.as_ref(),
5859            self.next_crypto.as_ref(),
5860        )?;
5861
5862        let result = match result {
5863            Some(r) => r,
5864            None => return Ok(None),
5865        };
5866
5867        if result.outgoing_key_update_acked {
5868            if let Some(prev) = self.prev_crypto.as_mut() {
5869                prev.end_packet = Some((result.number, now));
5870                self.set_key_discard_timer(now, packet.header.space());
5871            }
5872        }
5873
5874        if result.incoming_key_update {
5875            trace!("key update authenticated");
5876            self.update_keys(Some((result.number, now)), true);
5877            self.set_key_discard_timer(now, packet.header.space());
5878        }
5879
5880        Ok(Some(result.number))
5881    }
5882
    /// Perform a 1-RTT key update
    ///
    /// `end_packet` bounds the old keys' validity (packet number and time), if known.
    /// `remote` is true when the update was triggered by the peer, in which case our
    /// matching update is still unacknowledged.
    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
        trace!("executing key update");
        // Generate keys for the key phase after the one we're switching to, store them in
        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
        // `prev_crypto`.
        let new = self
            .crypto
            .next_1rtt_keys()
            .expect("only called for `Data` packets");
        // Schedule the next update before the AEAD confidentiality limit is reached.
        self.key_phase_size = new
            .local
            .confidentiality_limit()
            .saturating_sub(KEY_UPDATE_MARGIN);
        let old = mem::replace(
            &mut self.spaces[SpaceId::Data]
                .crypto
                .as_mut()
                .unwrap() // safe because update_keys() can only be triggered by short packets
                .packet,
            mem::replace(self.next_crypto.as_mut().unwrap(), new),
        );
        // Restart the per-path count of packets sent under the (new) current keys.
        self.spaces[SpaceId::Data]
            .iter_paths_mut()
            .for_each(|s| s.sent_with_keys = 0);
        self.prev_crypto = Some(PrevCrypto {
            crypto: old,
            end_packet,
            update_unacked: remote,
        });
        self.key_phase = !self.key_phase;
    }
5914
    /// Whether the peer advertised support for the ACK Frequency extension
    ///
    /// Signalled by the presence of the `min_ack_delay` transport parameter.
    fn peer_supports_ack_frequency(&self) -> bool {
        self.peer_params.min_ack_delay.is_some()
    }
5918
5919    /// Send an IMMEDIATE_ACK frame to the remote endpoint
5920    ///
5921    /// According to the spec, this will result in an error if the remote endpoint does not support
5922    /// the Acknowledgement Frequency extension
5923    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5924        debug_assert_eq!(
5925            self.highest_space,
5926            SpaceId::Data,
5927            "immediate ack must be written in the data space"
5928        );
5929        self.spaces[self.highest_space]
5930            .for_path(path_id)
5931            .immediate_ack_pending = true;
5932    }
5933
5934    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
5935    #[cfg(test)]
5936    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5937        let (path_id, first_decode, remaining) = match &event.0 {
5938            ConnectionEventInner::Datagram(DatagramConnectionEvent {
5939                path_id,
5940                first_decode,
5941                remaining,
5942                ..
5943            }) => (path_id, first_decode, remaining),
5944            _ => return None,
5945        };
5946
5947        if remaining.is_some() {
5948            panic!("Packets should never be coalesced in tests");
5949        }
5950
5951        let decrypted_header = packet_crypto::unprotect_header(
5952            first_decode.clone(),
5953            &self.spaces,
5954            self.zero_rtt_crypto.as_ref(),
5955            self.peer_params.stateless_reset_token,
5956        )?;
5957
5958        let mut packet = decrypted_header.packet?;
5959        packet_crypto::decrypt_packet_body(
5960            &mut packet,
5961            *path_id,
5962            &self.spaces,
5963            self.zero_rtt_crypto.as_ref(),
5964            self.key_phase,
5965            self.prev_crypto.as_ref(),
5966            self.next_crypto.as_ref(),
5967        )
5968        .ok()?;
5969
5970        Some(packet.payload.to_vec())
5971    }
5972
    /// The number of bytes of packets containing retransmittable frames that have not been
    /// acknowledged or declared lost.
    #[cfg(test)]
    pub(crate) fn bytes_in_flight(&self) -> u64 {
        // TODO(@divma): consider including for multipath?
        // Currently only the initial path (PathId::ZERO) is inspected.
        self.path_data(PathId::ZERO).in_flight.bytes
    }
5980
    /// Number of bytes worth of non-ack-only packets that may be sent
    #[cfg(test)]
    pub(crate) fn congestion_window(&self) -> u64 {
        let path = self.path_data(PathId::ZERO);
        // Remaining window = congestion window minus bytes already in flight, saturating
        // at zero when the path is congestion-blocked.
        path.congestion
            .window()
            .saturating_sub(path.in_flight.bytes)
    }
5989
5990    /// Whether no timers but keepalive, idle, rtt, pushnewcid, and key discard are running
5991    #[cfg(test)]
5992    pub(crate) fn is_idle(&self) -> bool {
5993        let current_timers = self.timers.values();
5994        current_timers
5995            .into_iter()
5996            .filter(|(timer, _)| {
5997                !matches!(
5998                    timer,
5999                    Timer::Conn(ConnTimer::KeepAlive)
6000                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
6001                        | Timer::Conn(ConnTimer::PushNewCid)
6002                        | Timer::Conn(ConnTimer::KeyDiscard)
6003                )
6004            })
6005            .min_by_key(|(_, time)| *time)
6006            .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6007    }
6008
    /// Whether explicit congestion notification is in use on outgoing packets.
    #[cfg(test)]
    pub(crate) fn using_ecn(&self) -> bool {
        // Test helper: inspects the initial path only.
        self.path_data(PathId::ZERO).sending_ecn
    }
6014
    /// The number of received bytes in the current path
    #[cfg(test)]
    pub(crate) fn total_recvd(&self) -> u64 {
        // Test helper: inspects the initial path only.
        self.path_data(PathId::ZERO).total_recvd
    }
6020
    /// Test helper: active local CID sequence numbers for `PathId::ZERO`
    ///
    /// Returns the pair reported by `CidState::active_seq` — see the `cid_state` module
    /// for its exact meaning.
    #[cfg(test)]
    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
        self.local_cid_state
            .get(&PathId::ZERO)
            .unwrap()
            .active_seq()
    }
6028
    /// Test helper: active local CID sequence numbers for an arbitrary path
    ///
    /// Panics if no CID state exists for `path_id` (hence `#[track_caller]`).
    #[cfg(test)]
    #[track_caller]
    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
        self.local_cid_state
            .get(&PathId(path_id))
            .unwrap()
            .active_seq()
    }
6037
    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
    /// with updated `retire_prior_to` field set to `v`
    #[cfg(test)]
    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
        let n = self
            .local_cid_state
            .get_mut(&PathId::ZERO)
            .unwrap()
            .assign_retire_seq(v);
        // Ask the endpoint to issue `n` replacement identifiers for the path.
        self.endpoint_events
            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
    }
6050
    /// Check the current active remote CID sequence for `PathId::ZERO`
    #[cfg(test)]
    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
        // Test helper: PathId::ZERO is assumed to still exist (not abandoned).
        self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
    }
6056
    /// Returns the detected maximum udp payload size for the current path
    #[cfg(test)]
    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
        self.path_data(path_id).current_mtu()
    }
6062
6063    /// Triggers path validation on all paths
6064    #[cfg(test)]
6065    pub(crate) fn trigger_path_validation(&mut self) {
6066        for path in self.paths.values_mut() {
6067            path.data.send_new_challenge = true;
6068        }
6069    }
6070
6071    /// Whether we have 1-RTT data to send
6072    ///
6073    /// This checks for frames that can only be sent in the data space (1-RTT):
6074    /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated.
6075    /// - Pending PATH_RESPONSE frames.
6076    /// - Pending data to send in STREAM frames.
6077    /// - Pending DATAGRAM frames to send.
6078    ///
6079    /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that
6080    /// may need to be sent.
6081    fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6082        let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6083            path.data.send_new_challenge
6084                || path
6085                    .prev
6086                    .as_ref()
6087                    .is_some_and(|(_, path)| path.send_new_challenge)
6088                || !path.data.path_responses.is_empty()
6089        });
6090        let other = self.streams.can_send_stream_data()
6091            || self
6092                .datagrams
6093                .outgoing
6094                .front()
6095                .is_some_and(|x| x.size(true) <= max_size);
6096        SendableFrames {
6097            acks: false,
6098            other,
6099            close: false,
6100            path_exclusive,
6101        }
6102    }
6103
    /// Terminate the connection instantly, without sending a close packet
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.state.move_to_drained(Some(reason));
        // Tell the endpoint this connection's state can be freed immediately.
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
6110
6111    /// Storage size required for the largest packet that can be transmitted on all currently
6112    /// available paths
6113    ///
6114    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
6115    ///
6116    /// When multipath is enabled, this value is the minimum MTU across all available paths.
6117    pub fn current_mtu(&self) -> u16 {
6118        self.paths
6119            .iter()
6120            .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6121            .map(|(_path_id, path_state)| path_state.data.current_mtu())
6122            .min()
6123            .expect("There is always at least one available path")
6124    }
6125
6126    /// Size of non-frame data for a 1-RTT packet
6127    ///
6128    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
6129    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
6130    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
6131    /// latency and packet loss.
6132    fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6133        let pn_len = PacketNumber::new(
6134            pn,
6135            self.spaces[SpaceId::Data]
6136                .for_path(path)
6137                .largest_acked_packet
6138                .unwrap_or(0),
6139        )
6140        .len();
6141
6142        // 1 byte for flags
6143        1 + self
6144            .rem_cids
6145            .get(&path)
6146            .map(|cids| cids.active().len())
6147            .unwrap_or(20)      // Max CID len in QUIC v1
6148            + pn_len
6149            + self.tag_len_1rtt()
6150    }
6151
6152    fn predict_1rtt_overhead_no_pn(&self) -> usize {
6153        let pn_len = 4;
6154
6155        let cid_len = self
6156            .rem_cids
6157            .values()
6158            .map(|cids| cids.active().len())
6159            .max()
6160            .unwrap_or(20); // Max CID len in QUIC v1
6161
6162        // 1 byte for flags
6163        1 + cid_len + pn_len + self.tag_len_1rtt()
6164    }
6165
6166    fn tag_len_1rtt(&self) -> usize {
6167        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6168            Some(crypto) => Some(&*crypto.packet.local),
6169            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6170        };
6171        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
6172        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
6173        // but that would needlessly prevent sending datagrams during 0-RTT.
6174        key.map_or(16, |x| x.tag_len())
6175    }
6176
6177    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
6178    fn on_path_validated(&mut self, path_id: PathId) {
6179        self.path_data_mut(path_id).validated = true;
6180        let ConnectionSide::Server { server_config } = &self.side else {
6181            return;
6182        };
6183        let remote_addr = self.path_data(path_id).remote;
6184        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6185        new_tokens.clear();
6186        for _ in 0..server_config.validation_token.sent {
6187            new_tokens.push(remote_addr);
6188        }
6189    }
6190
6191    /// Handle new path status information: PATH_STATUS_AVAILABLE, PATH_STATUS_BACKUP
6192    fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6193        if let Some(path) = self.paths.get_mut(&path_id) {
6194            path.data.status.remote_update(status, status_seq_no);
6195        } else {
6196            debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6197        }
6198        self.events.push_back(
6199            PathEvent::RemoteStatus {
6200                id: path_id,
6201                status,
6202            }
6203            .into(),
6204        );
6205    }
6206
6207    /// Returns the maximum [`PathId`] to be used for sending in this connection.
6208    ///
6209    /// This is calculated as minimum between the local and remote's maximums when multipath is
6210    /// enabled, or `None` when disabled.
6211    ///
6212    /// For data that's received, we should use [`Self::local_max_path_id`] instead.
6213    /// The reasoning is that the remote might already have updated to its own newer
6214    /// [`Self::max_path_id`] after sending out a `MAX_PATH_ID` frame, but it got re-ordered.
6215    fn max_path_id(&self) -> Option<PathId> {
6216        if self.is_multipath_negotiated() {
6217            Some(self.remote_max_path_id.min(self.local_max_path_id))
6218        } else {
6219            None
6220        }
6221    }
6222
6223    /// Add addresses the local endpoint considers are reachable for nat traversal
6224    pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6225        if let Some(added) = self.iroh_hp.add_local_address(address)? {
6226            self.spaces[SpaceId::Data].pending.add_address.insert(added);
6227        };
6228        Ok(())
6229    }
6230
6231    /// Removes an address the endpoing no longer considers reachable for nat traversal
6232    ///
6233    /// Addresses not present in the set will be silently ignored.
6234    pub fn remove_nat_traversal_address(
6235        &mut self,
6236        address: SocketAddr,
6237    ) -> Result<(), iroh_hp::Error> {
6238        if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6239            self.spaces[SpaceId::Data]
6240                .pending
6241                .remove_address
6242                .insert(removed);
6243        }
6244        Ok(())
6245    }
6246
    /// Get the current local nat traversal addresses
    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        self.iroh_hp.get_local_nat_traversal_addresses()
    }
6251
    /// Get the currently advertised nat traversal addresses by the server
    pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        // Only meaningful on a client; `client_side` errors on the server side.
        Ok(self
            .iroh_hp
            .client_side()?
            .get_remote_nat_traversal_addresses())
    }
6259
6260    /// Attempts to open a path for nat traversal.
6261    ///
6262    /// `ipv6` indicates if the path should be opened using an IPV6 remote. If the address is
6263    /// ignored, it will return `None`.
6264    ///
6265    /// On success returns the [`PathId`] and remote address of the path, as well as whether the path
6266    /// existed for the adjusted remote.
6267    fn open_nat_traversal_path(
6268        &mut self,
6269        now: Instant,
6270        (ip, port): (IpAddr, u16),
6271        ipv6: bool,
6272    ) -> Result<Option<(PathId, SocketAddr, bool)>, PathError> {
6273        // If this endpoint is an IPv6 endpoint we use IPv6 addresses for all remotes.
6274        let remote = match ip {
6275            IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6276            IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6277            IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6278            IpAddr::V6(_) => {
6279                trace!("not using IPv6 nat candidate for IPv4 socket");
6280                return Ok(None);
6281            }
6282        };
6283        match self.open_path_ensure(remote, PathStatus::Backup, now) {
6284            Ok((path_id, path_was_known)) => {
6285                if path_was_known {
6286                    trace!(%path_id, %remote, "nat traversal: path existed for remote");
6287                }
6288                Ok(Some((path_id, remote, path_was_known)))
6289            }
6290            Err(e) => {
6291                debug!(%remote, %e, "nat traversal: failed to probe remote");
6292                Err(e)
6293            }
6294        }
6295    }
6296
    /// Initiates a new nat traversal round
    ///
    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
    /// frames, and initiating probing of the known remote addresses. When a new round is
    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
    ///
    /// Returns the server addresses that are now being probed.
    /// If addresses fail due to spurious errors, these might succeed later and not be returned in
    /// this set.
    pub fn initiate_nat_traversal_round(
        &mut self,
        now: Instant,
    ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        if self.state.is_closed() {
            return Err(iroh_hp::Error::Closed);
        }

        // Only clients initiate rounds; `client_side_mut` errors on the server side.
        let client_state = self.iroh_hp.client_side_mut()?;
        let iroh_hp::NatTraversalRound {
            new_round,
            reach_out_at,
            addresses_to_probe,
            prev_round_path_ids,
        } = client_state.initiate_nat_traversal_round()?;

        // Queue REACH_OUT frames advertising our local addresses for this round.
        self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));

        // Close paths left over from the previous round that never got validated.
        for path_id in prev_round_path_ids {
            // TODO(@divma): this sounds reasonable but we need to check if this actually
            // works for the purposes of the protocol
            let validated = self
                .path(path_id)
                .map(|path| path.validated)
                .unwrap_or(false);

            if !validated {
                let _ = self.close_path(
                    now,
                    path_id,
                    TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
                );
            }
        }

        let mut err = None;

        let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
        let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
        // Use IPv6(-mapped) remotes if any existing path already uses IPv6.
        let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());

        for (id, address) in addresses_to_probe {
            match self.open_nat_traversal_path(now, address, ipv6) {
                Ok(None) => {}
                Ok(Some((path_id, remote, path_was_known))) => {
                    // Only count paths newly opened by this round.
                    if !path_was_known {
                        path_ids.push(path_id);
                        probed_addresses.push(remote);
                    }
                }
                Err(e) => {
                    // Record the failure so the round can retry this address later.
                    self.iroh_hp
                        .client_side_mut()
                        .expect("validated")
                        .report_in_continuation(id, e);
                    err.get_or_insert(e);
                }
            }
        }

        if let Some(err) = err {
            // We failed to probe any addresses, bail out
            if probed_addresses.is_empty() {
                return Err(iroh_hp::Error::Multipath(err));
            }
        }

        self.iroh_hp
            .client_side_mut()
            .expect("connection side validated")
            .set_round_path_ids(path_ids);

        Ok(probed_addresses)
    }
6380
6381    /// Attempts to continue a nat traversal round by trying to open paths for pending client probes.
6382    ///
6383    /// If there was nothing to do, it returns `None`. Otherwise it returns whether the path was
6384    /// successfully open.
6385    fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6386        let client_state = self.iroh_hp.client_side_mut().ok()?;
6387        let (id, address) = client_state.continue_nat_traversal_round()?;
6388        let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6389        let open_result = self.open_nat_traversal_path(now, address, ipv6);
6390        let client_state = self.iroh_hp.client_side_mut().expect("validated");
6391        match open_result {
6392            Ok(None) => Some(true),
6393            Ok(Some((path_id, _remote, path_was_known))) => {
6394                if !path_was_known {
6395                    client_state.add_round_path_id(path_id);
6396                }
6397                Some(true)
6398            }
6399            Err(e) => {
6400                client_state.report_in_continuation(id, e);
6401                Some(false)
6402            }
6403        }
6404    }
6405}
6406
6407impl fmt::Debug for Connection {
6408    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6409        f.debug_struct("Connection")
6410            .field("handshake_cid", &self.handshake_cid)
6411            .finish()
6412    }
6413}
6414
/// Why sending on a path is currently blocked, if at all
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// Sending is not blocked
    No,
    /// Blocked by the anti-amplification limit
    AntiAmplification,
    /// Blocked by congestion control
    Congestion,
    /// Blocked by the pacer
    Pacing,
}
6422
/// Fields of `Connection` specific to it being client-side or server-side
enum ConnectionSide {
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
        token: Bytes,
        /// Source of address validation tokens; the initial `token` is taken from it, keyed by
        /// `server_name`
        token_store: Arc<dyn TokenStore>,
        /// Name of the server this client is connecting to
        server_name: String,
    },
    Server {
        /// Server-side configuration, e.g. whether peer migration is permitted
        server_config: Arc<ServerConfig>,
    },
}
6435
6436impl ConnectionSide {
6437    fn remote_may_migrate(&self, state: &State) -> bool {
6438        match self {
6439            Self::Server { server_config } => server_config.migration,
6440            Self::Client { .. } => {
6441                if let Some(hs) = state.as_handshake() {
6442                    hs.allow_server_migration
6443                } else {
6444                    false
6445                }
6446            }
6447        }
6448    }
6449
6450    fn is_client(&self) -> bool {
6451        self.side().is_client()
6452    }
6453
6454    fn is_server(&self) -> bool {
6455        self.side().is_server()
6456    }
6457
6458    fn side(&self) -> Side {
6459        match *self {
6460            Self::Client { .. } => Side::Client,
6461            Self::Server { .. } => Side::Server,
6462        }
6463    }
6464}
6465
6466impl From<SideArgs> for ConnectionSide {
6467    fn from(side: SideArgs) -> Self {
6468        match side {
6469            SideArgs::Client {
6470                token_store,
6471                server_name,
6472            } => Self::Client {
6473                token: token_store.take(&server_name).unwrap_or_default(),
6474                token_store,
6475                server_name,
6476            },
6477            SideArgs::Server {
6478                server_config,
6479                pref_addr_cid: _,
6480                path_validated: _,
6481            } => Self::Server { server_config },
6482        }
6483    }
6484}
6485
/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    Client {
        /// Source of address validation tokens, keyed by server name
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to
        server_name: String,
    },
    Server {
        /// Server-side configuration for the new connection
        server_config: Arc<ServerConfig>,
        /// Connection ID reserved for the server's preferred address, if any
        // NOTE(review): presumably tied to the preferred_address transport parameter — confirm
        // at the call site
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path the connection arrived on is already validated
        path_validated: bool,
    },
}
6498
6499impl SideArgs {
6500    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6501        match *self {
6502            Self::Client { .. } => None,
6503            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6504        }
6505    }
6506
6507    pub(crate) fn path_validated(&self) -> bool {
6508        match *self {
6509            Self::Client { .. } => true,
6510            Self::Server { path_validated, .. } => path_validated,
6511        }
6512    }
6513
6514    pub(crate) fn side(&self) -> Side {
6515        match *self {
6516            Self::Client { .. } => Side::Client,
6517            Self::Server { .. } => Side::Server,
6518        }
6519    }
6520}
6521
/// Reasons why a connection might be lost
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    ///
    /// Carries the close frame received from the peer.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    ///
    /// Carries the application-level close frame received from the peer.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
6556
6557impl From<Close> for ConnectionError {
6558    fn from(x: Close) -> Self {
6559        match x {
6560            Close::Connection(reason) => Self::ConnectionClosed(reason),
6561            Close::Application(reason) => Self::ApplicationClosed(reason),
6562        }
6563    }
6564}
6565
6566// For compatibility with API consumers
6567impl From<ConnectionError> for io::Error {
6568    fn from(x: ConnectionError) -> Self {
6569        use ConnectionError::*;
6570        let kind = match x {
6571            TimedOut => io::ErrorKind::TimedOut,
6572            Reset => io::ErrorKind::ConnectionReset,
6573            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6574            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6575                io::ErrorKind::Other
6576            }
6577        };
6578        Self::new(kind, x)
6579    }
6580}
6581
6582/// Errors that might trigger a path being closed
6583// TODO(@divma): maybe needs to be reworked based on what we want to do with the public API
6584#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6585pub enum PathError {
6586    /// The extension was not negotiated with the peer
6587    #[error("multipath extension not negotiated")]
6588    MultipathNotNegotiated,
6589    /// Paths can only be opened client-side
6590    #[error("the server side may not open a path")]
6591    ServerSideNotAllowed,
6592    /// Current limits do not allow us to open more paths
6593    #[error("maximum number of concurrent paths reached")]
6594    MaxPathIdReached,
6595    /// No remote CIDs available to open a new path
6596    #[error("remoted CIDs exhausted")]
6597    RemoteCidsExhausted,
6598    /// Path could not be validated and will be abandoned
6599    #[error("path validation failed")]
6600    ValidationFailed,
6601    /// The remote address for the path is not supported by the endpoint
6602    #[error("invalid remote address")]
6603    InvalidRemoteAddress(SocketAddr),
6604}
6605
/// Errors triggered when abandoning a path
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The path is already closed or was never opened
    #[error("closed path")]
    ClosedPath,
    /// This is the last path, which can not be abandoned
    ///
    /// Closing it would leave the connection without any usable path.
    #[error("last open path")]
    LastOpenPath,
}
6616
/// Error returned when an operation requires the multipath extension, but it was not
/// negotiated with the peer
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private zero-sized field prevents construction outside this crate
    _private: (),
}
6622
/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The TLS handshake was confirmed
    HandshakeConfirmed,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
    /// (Multi)Path events
    Path(PathEvent),
    /// Iroh's NAT traversal events
    NatTraversal(iroh_hp::Event),
}
6650
6651impl From<PathEvent> for Event {
6652    fn from(source: PathEvent) -> Self {
6653        Self::Path(source)
6654    }
6655}
6656
6657fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6658    Duration::from_micros(params.max_ack_delay.0 * 1000)
6659}
6660
/// Prevents overflow and improves behavior in extreme circumstances
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Excludes packet-type-specific fields such as packet number or Initial token
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Perform key updates this many packets before the AEAD confidentiality limit.
///
/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
const KEY_UPDATE_MARGIN: u64 = 10_000;
6685
/// Summary of the frames written into a single outgoing packet
#[derive(Default)]
struct SentFrames {
    /// Retransmittable frame state recorded for this packet
    // NOTE(review): presumably re-queued when the packet is deemed lost — confirm against the
    // loss-detection path
    retransmits: ThinRetransmits,
    /// The packet number of the largest acknowledged packet for each path
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata for the STREAM frames carried by the packet
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    /// If the datagram containing these frames should be padded to the min MTU
    requires_padding: bool,
}
6697
6698impl SentFrames {
6699    /// Returns whether the packet contains only ACKs
6700    fn is_ack_only(&self, streams: &StreamsState) -> bool {
6701        !self.largest_acked.is_empty()
6702            && !self.non_retransmits
6703            && self.stream_frames.is_empty()
6704            && self.retransmits.is_empty(streams)
6705    }
6706}
6707
6708/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
6709///
6710/// According to the definition of max_idle_timeout, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1.>
6711///
6712/// According to the negotiation procedure, either the minimum of the timeouts or one specified is used as the negotiated value; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2.>
6713///
6714/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
6715fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6716    match (x, y) {
6717        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6718        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6719        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6720        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6721    }
6722}
6723
#[cfg(test)]
mod tests {
    use super::*;

    /// The negotiated idle timeout must not depend on argument order.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        // (left, right, expected) — zero and absent both mean "disabled".
        let cases = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (
                Some(VarInt(2)),
                Some(VarInt(0)),
                Some(Duration::from_millis(2)),
            ),
            (
                Some(VarInt(1)),
                Some(VarInt(4)),
                Some(Duration::from_millis(1)),
            ),
        ];

        for (left, right, expected) in cases {
            // Check both orderings to prove commutativity.
            assert_eq!(negotiate_max_idle_timeout(left, right), expected);
            assert_eq!(negotiate_max_idle_timeout(right, left), expected);
        }
    }
}