iroh_quinn_proto/connection/mod.rs

use std::{
    cmp,
    collections::{BTreeMap, VecDeque, btree_map},
    convert::TryFrom,
    fmt, io, mem,
    net::{IpAddr, SocketAddr},
    num::NonZeroU32,
    ops::Not,
    sync::Arc,
};

use bytes::{BufMut, Bytes, BytesMut};
use frame::StreamMetaVec;

use rand::{Rng, SeedableRng, rngs::StdRng};
use rustc_hash::{FxHashMap, FxHashSet};
use thiserror::Error;
use tracing::{debug, error, trace, trace_span, warn};

use crate::{
    Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
    MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
    TransportErrorCode, VarInt,
    cid_generator::ConnectionIdGenerator,
    cid_queue::CidQueue,
    coding::BufMutExt,
    config::{ServerConfig, TransportConfig},
    congestion::Controller,
    connection::{
        qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
        spaces::LostPacket,
        timer::{ConnTimer, PathTimer},
    },
    crypto::{self, KeyPair, Keys, PacketKey},
    frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
    iroh_hp,
    packet::{
        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
        PacketNumber, PartialDecode, SpaceId,
    },
    range_set::ArrayRangeSet,
    shared::{
        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
        EndpointEvent, EndpointEventInner,
    },
    token::{ResetToken, Token, TokenPayload},
    transport_parameters::TransportParameters,
};

mod ack_frequency;
use ack_frequency::AckFrequencyState;

mod assembler;
pub use assembler::Chunk;

mod cid_state;
use cid_state::CidState;

mod datagrams;
use datagrams::DatagramState;
pub use datagrams::{Datagrams, SendDatagramError};

mod mtud;
mod pacing;

mod packet_builder;
use packet_builder::{PacketBuilder, PadDatagram};

mod packet_crypto;
use packet_crypto::{PrevCrypto, ZeroRttCrypto};

mod paths;
pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
use paths::{PathData, PathState};

pub(crate) mod qlog;
pub(crate) mod send_buffer;

mod spaces;
#[cfg(fuzzing)]
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
use spaces::Retransmits;
use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};

mod stats;
pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};

mod streams;
#[cfg(fuzzing)]
pub use streams::StreamsState;
#[cfg(not(fuzzing))]
use streams::StreamsState;
pub use streams::{
    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
};

mod timer;
use timer::{Timer, TimerTable};

mod transmit_buf;
use transmit_buf::TransmitBuf;

mod state;

#[cfg(not(fuzzing))]
use state::State;
#[cfg(fuzzing)]
pub use state::State;
use state::StateType;

/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below, however as per the design of this library none of the
///   functions actually perform system-level I/O. For example, [`read`](RecvStream::read) and
///   [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing it to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
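///
/// A minimal sketch of such a drive loop, assuming caller-provided `conn`, `socket`,
/// `endpoint`, `handle`, `now` and `max_datagrams` (not a doctest):
///
/// ```ignore
/// // (B)/(C) happened; now drain the polling methods in the recommended order.
/// let mut buf = Vec::new();
/// while let Some(transmit) = conn.poll_transmit(now, max_datagrams, &mut buf) {
///     socket.send(transmit.destination, &buf[..transmit.size]); // caller's UDP I/O
///     buf.clear();
/// }
/// let next_timeout = conn.poll_timeout(); // re-arm the caller's timer
/// while let Some(event) = conn.poll_endpoint_events() {
///     endpoint.handle_event(handle, event); // forward to the endpoint driver
/// }
/// while let Some(event) = conn.poll() {
///     // deliver stream/datagram/connection events to the application
/// }
/// ```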
pub struct Connection {
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    rng: StdRng,
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The "real" local IP address which was used to receive the initial packet.
    /// This is only populated for the server case, and if known
    local_ip: Option<IpAddr>,
    /// The [`PathData`] for each path
    ///
    /// This needs to be ordered because [`Connection::poll_transmit`] needs to
    /// deterministically select the next PathId to send on.
    // TODO(flub): well does it really? But deterministic is nice for now.
    paths: BTreeMap<PathId, PathState>,
    /// Incremented every time we see a new path
    ///
    /// Stored separately from `path.generation` to account for aborted migrations
    path_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    state: State,
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Events returned by [`Connection::poll`]
    events: VecDeque<Event>,
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable [`SpaceId`]
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// If the CONNECTION_CLOSE frame needs to be sent
    close: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    ///
    /// These are given out before multiple paths exist, also for paths that will never
    /// exist.  So if multipath is supported the number of paths here will be higher than
    /// the actual number of paths in use.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Attributes of CIDs generated by local endpoint
    ///
    /// Any path that is allowed to be opened is present in this map, as well as the already
    /// opened paths. However, since CIDs are issued asynchronously by the endpoint driver via
    /// connection events, it cannot be used to know whether CIDs have been issued for a path or
    /// not. See [`Connection::max_path_id_with_cids`] for this.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// Path level statistics
    path_stats: FxHashMap<PathId, PathStats>,
    /// QUIC version used for the connection.
    version: u32,

    //
    // Multipath
    //
    /// Maximum number of concurrent paths
    ///
    /// Initially set from the [`TransportConfig::max_concurrent_multipath_paths`]. Even
    /// when multipath is disabled this will be set to 1, though it is not used in that
    /// case.
    max_concurrent_paths: NonZeroU32,
    /// Local maximum [`PathId`] to be used
    ///
    /// This is initially set to [`TransportConfig::get_initial_max_path_id`] when multipath
    /// is negotiated, or to [`PathId::ZERO`] otherwise. This is essentially the value of
    /// the highest MAX_PATH_ID frame sent.
    ///
    /// Any path with an ID equal to or below this [`PathId`] is either:
    ///
    /// - Abandoned, if it is also in [`Connection::abandoned_paths`].
    /// - Open, in this case it is present in [`Connection::paths`]
    /// - Not yet opened, if it is in neither of these two places.
    ///
    /// Note that for not-yet-open paths there may or may not be any CIDs issued. See
    /// [`Connection::max_path_id_with_cids`].
    local_max_path_id: PathId,
    /// Remote's maximum [`PathId`] to be used
    ///
    /// This is initially set to the peer's [`TransportParameters::initial_max_path_id`] when
    /// multipath is negotiated, or to [`PathId::ZERO`] otherwise. A peer may increase this limit
    /// by sending [`Frame::MaxPathId`] frames.
    remote_max_path_id: PathId,
    /// The greatest [`PathId`] we have issued CIDs for
    ///
    /// CIDs are only issued for `min(local_max_path_id, remote_max_path_id)`. It is not
    /// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued
    /// since they are issued asynchronously by the endpoint driver.
    max_path_id_with_cids: PathId,
    /// The paths already abandoned
    ///
    /// They may still have some state left in [`Connection::paths`] or
    /// [`Connection::local_cid_state`] since some of this has to be kept around for some
    /// time after a path is abandoned.
    // TODO(flub): Make this a more efficient data structure.  Like ranges of abandoned
    //    paths.  Or a set together with a minimum.  Or something.
    abandoned_paths: FxHashSet<PathId>,

    iroh_hp: iroh_hp::State,
    qlog: QlogSink,
}

impl Connection {
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = {
            let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
            space.crypto = Some(crypto.initial_keys(init_cid, side));
            space
        };
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
        // TODO(@divma): consider if we want to delay this until the path is validated
        path.open = true;
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_counter: 0,
            allow_mtud,
            local_ip,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            close: false,

            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            // peer params are not yet known, so multipath is not enabled
            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            // iroh's nat traversal
            iroh_hp: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
        this
    }

    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.peek()
    }

    /// Returns application-facing events
    ///
    /// Connections should be polled for events after:
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll(&mut self) -> Option<Event> {
        if let Some(x) = self.events.pop_front() {
            return Some(x);
        }

        if let Some(event) = self.streams.poll() {
            return Some(Event::Stream(event));
        }

        if let Some(reason) = self.state.take_error() {
            return Some(Event::ConnectionLost { reason });
        }

        None
    }

    /// Return endpoint-facing events
    #[must_use]
    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
        self.endpoint_events.pop_front().map(EndpointEvent)
    }

    /// Provide control over streams
    #[must_use]
    pub fn streams(&mut self) -> Streams<'_> {
        Streams {
            state: &mut self.streams,
            conn_state: &self.state,
        }
    }

    /// Provide control over the receive side of a stream
    #[must_use]
    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
        RecvStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
        }
    }

    /// Provide control over the send side of a stream
    #[must_use]
    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
        SendStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
            conn_state: &self.state,
        }
    }

    /// Opens a new path only if no path to the remote address exists so far
    ///
    /// See [`open_path`]. Returns `(path_id, true)` if the path already existed, or
    /// `(path_id, false)` if it was newly opened.
    ///
    /// [`open_path`]: Connection::open_path
    pub fn open_path_ensure(
        &mut self,
        remote: SocketAddr,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<(PathId, bool), PathError> {
        match self
            .paths
            .iter()
            .find(|(_id, path)| path.data.remote == remote)
        {
            Some((path_id, _state)) => Ok((*path_id, true)),
            None => self
                .open_path(remote, initial_status, now)
                .map(|id| (id, false)),
        }
    }

    /// Opens a new path
    ///
    /// Further errors might occur and will be emitted in [`PathEvent::LocallyClosed`] events.
    /// When the path is opened it will be reported as a [`PathEvent::Opened`].
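    ///
    /// A minimal sketch, assuming caller-provided `conn`, `remote` and `now` (not a doctest):
    ///
    /// ```ignore
    /// match conn.open_path(remote, PathStatus::Available, now) {
    ///     // Validation starts; watch for PathEvent::Opened or
    ///     // PathEvent::LocallyClosed in events returned by `poll`.
    ///     Ok(path_id) => println!("opening path {path_id}"),
    ///     // Blocked on the peer's path limit.
    ///     Err(PathError::MaxPathIdReached) => { /* retry after a MAX_PATH_ID update */ }
    ///     // Multipath not negotiated, server side, or no remote CIDs available.
    ///     Err(err) => println!("cannot open path: {err}"),
    /// }
    /// ```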
    pub fn open_path(
        &mut self,
        remote: SocketAddr,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<PathId, PathError> {
        if !self.is_multipath_negotiated() {
            return Err(PathError::MultipathNotNegotiated);
        }
        if self.side().is_server() {
            return Err(PathError::ServerSideNotAllowed);
        }

        let max_abandoned = self.abandoned_paths.iter().max().copied();
        let max_used = self.paths.keys().last().copied();
        let path_id = max_abandoned
            .max(max_used)
            .unwrap_or(PathId::ZERO)
            .saturating_add(1u8);

        if Some(path_id) > self.max_path_id() {
            return Err(PathError::MaxPathIdReached);
        }
        if path_id > self.remote_max_path_id {
            self.spaces[SpaceId::Data].pending.paths_blocked = true;
            return Err(PathError::MaxPathIdReached);
        }
        if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .push(path_id);
            return Err(PathError::RemoteCidsExhausted);
        }

        let path = self.ensure_path(path_id, remote, now, None);
        path.status.local_update(initial_status);

        Ok(path_id)
    }

    /// Closes a path by sending a PATH_ABANDON frame
    ///
    /// This will not allow closing the last path. It does allow closing paths which have
    /// not yet been opened, as e.g. is the case when receiving a PATH_ABANDON from the peer
    /// for a path that was never opened locally.
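    ///
    /// A minimal sketch, assuming caller-provided `conn`, `path_id` and `now` (not a doctest):
    ///
    /// ```ignore
    /// // Abandon the path with an application-chosen error code.
    /// match conn.close_path(now, path_id, VarInt::from_u32(0)) {
    ///     // A PATH_ABANDON frame is queued; some path state is kept around
    ///     // for a few PTOs before the path is discarded entirely.
    ///     Ok(()) => {}
    ///     Err(ClosePathError::LastOpenPath) => { /* cannot close the only usable path */ }
    ///     Err(ClosePathError::ClosedPath) => { /* unknown or already abandoned path */ }
    /// }
    /// ```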
    pub fn close_path(
        &mut self,
        now: Instant,
        path_id: PathId,
        error_code: VarInt,
    ) -> Result<(), ClosePathError> {
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }
        if self
            .paths
            .iter()
            // Would there be any remaining, non-abandoned, validated paths?
            .any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            })
            .not()
        {
            return Err(ClosePathError::LastOpenPath);
        }

        // Send PATH_ABANDON
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, error_code.into());

        // Remove pending NEW CIDs for this path
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Clean up retransmits across ALL paths (CIDs for path_id may have been transmitted on other paths)
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Consider remotely issued CIDs as retired.
        // Technically we don't have to do this just yet.  We only need to do this *after*
        // the PATH_ABANDON frame is sent, allowing us to still send it on the
        // to-be-abandoned path.  However it is recommended to send it on another path, and
        // we do not allow abandoning the last path anyway.
        self.rem_cids.remove(&path_id);
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        let pto = self.pto_max_path(SpaceId::Data);

        let path = self.paths.get_mut(&path_id).expect("checked above");

        // We record the time after which receiving data on this path generates a transport error.
        path.data.last_allowed_receive = Some(now + 3 * pto);
        self.abandoned_paths.insert(path_id);

        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));

        // The peer MUST respond with a corresponding PATH_ABANDON frame.
        // If we receive packets on the abandoned path after 3 * PTO we trigger a transport error.
        // In any case, we completely discard the path after 6 * PTO whether we receive a
        // PATH_ABANDON or not.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::DiscardPath),
            now + 6 * pto,
            self.qlog.with_time(now),
        );
        Ok(())
    }

    /// Gets the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the path_id does not reference any known path.
    #[track_caller]
    fn path_data(&self, path_id: PathId) -> &PathData {
        if let Some(data) = self.paths.get(&path_id) {
            &data.data
        } else {
            panic!(
                "unknown path: {path_id}, currently known paths: {:?}",
                self.paths.keys().collect::<Vec<_>>()
            );
        }
    }

    /// Gets a reference to the [`PathData`] for a [`PathId`]
    fn path(&self, path_id: PathId) -> Option<&PathData> {
        self.paths.get(&path_id).map(|path_state| &path_state.data)
    }

    /// Gets a mutable reference to the [`PathData`] for a [`PathId`]
    fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
        self.paths
            .get_mut(&path_id)
            .map(|path_state| &mut path_state.data)
    }

    /// Returns all known paths.
    ///
    /// There is no guarantee any of these paths are open or usable.
    pub fn paths(&self) -> Vec<PathId> {
        self.paths.keys().copied().collect()
    }

    /// Gets the local [`PathStatus`] for a known [`PathId`]
    pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
        self.path(path_id)
            .map(PathData::local_status)
            .ok_or(ClosedPath { _private: () })
    }

    /// Returns the path's remote socket address
    pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
        self.path(path_id)
            .map(|path| path.remote)
            .ok_or(ClosedPath { _private: () })
    }

    /// Sets the [`PathStatus`] for a known [`PathId`]
    ///
    /// Returns the previous path status on success.
    pub fn set_path_status(
        &mut self,
        path_id: PathId,
        status: PathStatus,
    ) -> Result<PathStatus, SetPathStatusError> {
        if !self.is_multipath_negotiated() {
            return Err(SetPathStatusError::MultipathNotNegotiated);
        }
        let path = self
            .path_mut(path_id)
            .ok_or(SetPathStatusError::ClosedPath)?;
        let prev = match path.status.local_update(status) {
            Some(prev) => {
                self.spaces[SpaceId::Data]
                    .pending
                    .path_status
                    .insert(path_id);
                prev
            }
            None => path.local_status(),
        };
        Ok(prev)
    }

    /// Returns the remote path status
    // TODO(flub): Probably should also be some kind of path event?  Not even sure if I like
    //    this as an API, but for now it allows me to write a test easily.
    // TODO(flub): Technically this should be a Result<Option<PathStatus>>?
    pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
        self.path(path_id).and_then(|path| path.remote_status())
    }

    /// Sets the max_idle_timeout for a specific path
    ///
    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_max_idle_timeout(
        &mut self,
        path_id: PathId,
        timeout: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
    }

    /// Sets the keep_alive_interval for a specific path
    ///
    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_keep_alive_interval(
        &mut self,
        path_id: PathId,
        interval: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.keep_alive, interval))
    }

    /// Gets the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the path_id does not reference any known path.
    #[track_caller]
    fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
        &mut self.paths.get_mut(&path_id).expect("known path").data
    }

    /// Check if the remote has been validated in any active path
    fn is_remote_validated(&self, remote: SocketAddr) -> bool {
        self.paths
            .values()
            .any(|path_state| path_state.data.remote == remote && path_state.data.validated)
        // TODO(@divma): we might want to ensure the path has been recently active to consider the
        // address validated
    }

    fn ensure_path(
        &mut self,
        path_id: PathId,
        remote: SocketAddr,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        let validated = self.is_remote_validated(remote);
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        debug!(%validated, %path_id, ?remote, "path added");
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_counter = self.path_counter.wrapping_add(1);
        let mut data = PathData::new(
            remote,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_counter,
            now,
            &self.config,
        );

        data.validated = validated;

        let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathOpen),
            now + 3 * pto,
            self.qlog.with_time(now),
        );

        // For the path to be opened we need to send a packet on the path. Sending a
        // challenge guarantees this.
        data.send_new_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, remote, now);
        &mut path.data
    }

    /// Returns packets to transmit
    ///
    /// Connections should be polled for transmit after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    ///
    /// `max_datagrams` specifies how many datagrams can be returned inside a
    /// single Transmit using GSO. This must be at least 1.
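    ///
    /// A minimal sketch of the transmit loop, assuming a caller-provided `socket`
    /// wrapper and a `max_datagrams` value from the UDP backend (not a doctest):
    ///
    /// ```ignore
    /// let mut buf = Vec::new();
    /// while let Some(transmit) = conn.poll_transmit(now, max_datagrams, &mut buf) {
    ///     // `buf[..transmit.size]` holds one or more datagrams for a single
    ///     // destination; `transmit.segment_size` is set when GSO should split
    ///     // the buffer into equal-sized segments.
    ///     socket.send(&buf[..transmit.size], &transmit);
    ///     buf.clear();
    /// }
    /// ```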
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: usize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        if let Some(probing) = self
            .iroh_hp
            .server_side_mut()
            .ok()
            .and_then(iroh_hp::ServerState::next_probe)
        {
            let destination = probing.remote();
            trace!(%destination, "RAND_DATA packet");
            let token: u64 = self.rng.random();
            buf.put_u64(token);
            probing.finish(token);
            return Some(Transmit {
                destination,
                ecn: None,
                size: 8,
                segment_size: None,
                src_ip: None,
            });
        }

        assert!(max_datagrams != 0);
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => 1,
            true => max_datagrams,
        };

        // Each call to poll_transmit can only send datagrams to one destination, because
        // all datagrams in a GSO batch are for the same destination.  Therefore only
        // datagrams for one Path ID are produced for each poll_transmit call.

        // TODO(flub): this is wishful thinking and not actually implemented, but perhaps it
        //   should be:

        // First, if we have to send a close, select a path for that.
        // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending.

        // For all open, validated and AVAILABLE paths:
        // - Is the path congestion blocked or pacing blocked?
        // - call maybe_queue_probe to ensure a tail-loss probe would be sent?
        // - do we need to send a close message?
        // - call can_send
        // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths

        // Check whether we need to send a close message
        let close = match self.state.as_type() {
            StateType::Drained => {
                self.app_limited = true;
                return None;
            }
            StateType::Draining | StateType::Closed => {
                // self.close is only reset once the associated packet has been
                // encoded successfully
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Check whether we need to send an ACK_FREQUENCY frame
        if let Some(config) = &self.config.ack_frequency_config {
            let rtt = self
                .paths
                .values()
                .map(|p| p.data.rtt.get())
                .min()
                .expect("one path exists");
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(rtt, config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Whether this packet can be coalesced with another one in the same datagram.
        let mut coalesce = true;

        // Whether the last packet in the datagram must be padded so the datagram takes up
        // to at least MIN_INITIAL_SIZE, or to the maximum segment size if this is smaller.
        let mut pad_datagram = PadDatagram::No;

        // Whether congestion control stopped the next packet from being sent. Further
        // packets could still be built, as e.g. tail-loss probes are not congestion
        // limited.
        let mut congestion_blocked = false;

        // The packet number of the last built packet.
        let mut last_packet_number = None;

        let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;

        // If there is any open, validated and available path, we only want to send frames
        // on a backup path when those frames must be sent on that backup path exclusively.
        let have_available_path = self.paths.iter().any(|(id, path)| {
            path.data.validated
                && path.data.local_status() == PathStatus::Available
                && self.rem_cids.contains_key(id)
        });

        // Setup for the first path_id
        let mut transmit = TransmitBuf::new(
            buf,
            max_datagrams,
            self.path_data(path_id).current_mtu().into(),
        );
        if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
            return Some(challenge);
        }
        let mut space_id = match path_id {
            PathId::ZERO => SpaceId::Initial,
            _ => SpaceId::Data,
        };

        loop {
            // check if there is at least one active CID to use for sending
            let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
                let err = PathError::RemoteCidsExhausted;
                if !self.abandoned_paths.contains(&path_id) {
                    debug!(?err, %path_id, "no active CID for path");
                    self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                        id: path_id,
                        error: err,
                    }));
                    // Locally we should have refused to open this path, and the remote
                    // should have given us CIDs for this path before opening it.  So we
                    // can always abandon it here.
                    self.close_path(
                        now,
                        path_id,
                        TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
                    )
                    .ok();
                    self.spaces[SpaceId::Data]
                        .pending
                        .path_cids_blocked
                        .push(path_id);
                } else {
                    trace!(%path_id, "remote CIDs retired for abandoned path");
                }

                match self.paths.keys().find(|&&next| next > path_id) {
                    Some(next_path_id) => {
                        // See if this next path can send anything.
                        path_id = *next_path_id;
                        space_id = SpaceId::Data;

                        // update per path state
                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
                        if let Some(challenge) =
                            self.send_prev_path_challenge(now, &mut transmit, path_id)
                        {
                            return Some(challenge);
                        }

                        continue;
                    }
                    None => {
                        // Nothing more to send.
                        trace!(
                            ?space_id,
                            %path_id,
                            "no CIDs to send on path, no more paths"
                        );
                        break;
                    }
                }
            };

            // Determine if anything can be sent in this packet number space (SpaceId +
            // PathId).
            let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
                // We are trying to coalesce another packet into this datagram.
                transmit.datagram_remaining_mut()
            } else {
                // A new datagram needs to be started.
                transmit.segment_size()
            };
            let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
            let path_should_send = {
                let path_exclusive_only = space_id == SpaceId::Data
                    && have_available_path
                    && self.path_data(path_id).local_status() == PathStatus::Backup;
                let path_should_send = if path_exclusive_only {
                    can_send.path_exclusive
                } else {
                    !can_send.is_empty()
                };
                let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
                path_should_send || needs_loss_probe || can_send.close
            };

            if !path_should_send && space_id < SpaceId::Data {
                if self.spaces[space_id].crypto.is_some() {
                    trace!(?space_id, %path_id, "nothing to send in space");
                }
                space_id = space_id.next();
                continue;
            }

            let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
                // Only check congestion control if a new datagram is needed.
                self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
            } else {
                PathBlocked::No
            };
            if send_blocked != PathBlocked::No {
                trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
                congestion_blocked = true;
            }
            if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
                // Higher spaces might still have tail-loss probes to send, which are not
                // congestion blocked.
                space_id = space_id.next();
                continue;
            }
            if !path_should_send || send_blocked != PathBlocked::No {
                // Nothing more to send on this path, check the next path if possible.

                // If there are any datagrams in the transmit, packets for another path can
                // not be built.
                if transmit.num_datagrams() > 0 {
                    break;
                }

                match self.paths.keys().find(|&&next| next > path_id) {
                    Some(next_path_id) => {
                        // See if this next path can send anything.
                        trace!(
                            ?space_id,
                            %path_id,
                            %next_path_id,
                            "nothing to send on path"
                        );
                        path_id = *next_path_id;
                        space_id = SpaceId::Data;

                        // update per path state
                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
                        if let Some(challenge) =
                            self.send_prev_path_challenge(now, &mut transmit, path_id)
                        {
                            return Some(challenge);
                        }

                        continue;
                    }
                    None => {
                        // Nothing more to send.
                        trace!(
                            ?space_id,
                            %path_id,
                            next_path_id=?None::<PathId>,
                            "nothing to send on path"
                        );
                        break;
                    }
                }
            }

            // If the datagram is full, we need to start a new one.
            if transmit.datagram_remaining_mut() == 0 {
                if transmit.num_datagrams() >= transmit.max_datagrams() {
                    // No more datagrams allowed
                    break;
                }

                match self.spaces[space_id].for_path(path_id).loss_probes {
                    0 => transmit.start_new_datagram(),
                    _ => {
                        // We need something to send for a tail-loss probe.
                        let request_immediate_ack =
                            space_id == SpaceId::Data && self.peer_supports_ack_frequency();
                        self.spaces[space_id].maybe_queue_probe(
                            path_id,
                            request_immediate_ack,
                            &self.streams,
                        );

                        self.spaces[space_id].for_path(path_id).loss_probes -= 1;

                        // Clamp the datagram to at most the minimum MTU to ensure that loss
                        // probes can get through and enable recovery even if the path MTU
                        // has shrunk unexpectedly.
                        transmit.start_new_datagram_with_size(std::cmp::min(
                            usize::from(INITIAL_MTU),
                            transmit.segment_size(),
                        ));
                    }
                }
                trace!(count = transmit.num_datagrams(), "new datagram started");
                coalesce = true;
                pad_datagram = PadDatagram::No;
            }

            // If coalescing another packet into the existing datagram, there should
            // still be enough space for a whole packet.
            if transmit.datagram_start_offset() < transmit.len() {
                debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
            }

            //
            // From here on, we've determined that a packet will definitely be sent.
            //

            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                // A client stops both sending and processing Initial packets when it
                // sends its first Handshake packet.
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            let mut qlog = QlogSentPacket::default();
            let mut builder = PacketBuilder::new(
                now,
                space_id,
                path_id,
                remote_cid,
                &mut transmit,
                can_send.other,
                self,
                &mut qlog,
            )?;
            last_packet_number = Some(builder.exact_number);
            coalesce = coalesce && !builder.short_header;

            if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
                // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1
                pad_datagram |= PadDatagram::ToMinMtu;
            }
            if space_id == SpaceId::Data && self.config.pad_to_mtu {
                pad_datagram |= PadDatagram::ToSegmentSize;
            }

            if can_send.close {
                trace!("sending CONNECTION_CLOSE");
                // Encode ACKs before the ConnectionClose message, to give the receiver
                // a better approximation of what data has been processed. This is
                // especially important with ack delay, since the peer might not
                // have gotten any other ACK for the data earlier on.
                let mut sent_frames = SentFrames::default();
                let is_multipath_negotiated = self.is_multipath_negotiated();
                for path_id in self.spaces[space_id]
                    .number_spaces
                    .iter()
                    .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
                    .map(|(&path_id, _)| path_id)
                    .collect::<Vec<_>>()
                {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut sent_frames,
                        path_id,
                        space_id,
                        &mut self.spaces[space_id],
                        is_multipath_negotiated,
                        &mut builder.frame_space_mut(),
                        &mut self.stats,
                        &mut qlog,
                    );
                }

                // Since there are only 64 ACK frames there will always be enough space
                // to encode the ConnectionClose frame too. However we still have the
                // check here to prevent crashes if something changes.
1275                debug_assert!(
1276                    builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1277                    "ACKs should leave space for ConnectionClose"
1278                );
1279                if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1280                    let max_frame_size = builder.frame_space_remaining();
1281                    match self.state.as_type() {
1282                        StateType::Closed => {
1283                            let reason: Close =
1284                                self.state.as_closed().expect("checked").clone().into();
1285                            if space_id == SpaceId::Data || reason.is_transport_layer() {
1286                                reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1287                                qlog.frame(&Frame::Close(reason));
1288                            } else {
1289                                let frame = frame::ConnectionClose {
1290                                    error_code: TransportErrorCode::APPLICATION_ERROR,
1291                                    frame_type: None,
1292                                    reason: Bytes::new(),
1293                                };
1294                                frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1295                                qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1296                            }
1297                        }
1298                        StateType::Draining => {
1299                            let frame = frame::ConnectionClose {
1300                                error_code: TransportErrorCode::NO_ERROR,
1301                                frame_type: None,
1302                                reason: Bytes::new(),
1303                            };
1304                            frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1305                            qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1306                        }
1307                        _ => unreachable!(
1308                            "tried to make a close packet when the connection wasn't closed"
1309                        ),
1310                    };
1311                }
1312                builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1313                if space_id == self.highest_space {
1314                    // Don't send another close packet. Even with multipath we only send
1315                    // CONNECTION_CLOSE on a single path since we expect our paths to work.
1316                    self.close = false;
1317                    // `CONNECTION_CLOSE` is the final packet
1318                    break;
1319                } else {
1320                    // Send a close frame in every possible space for robustness, per
1321                    // RFC9000 "Immediate Close during the Handshake". Don't bother trying
1322                    // to send anything else.
1323                    space_id = space_id.next();
1324                    continue;
1325                }
1326            }
1327
1328            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that
1329            // path validation can occur while the link is saturated.
1330            if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1331                let path = self.path_data_mut(path_id);
1332                if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1333                    // TODO(flub): We need to use the right CID!  We shouldn't use the same
1334                    //    CID as the current active one for the path.  Though see also
1335                    //    https://github.com/quinn-rs/quinn/issues/2184
1336                    let response = frame::PathResponse(token);
1337                    trace!(%response, "(off-path)");
1338                    builder.frame_space_mut().write(response);
1339                    qlog.frame(&Frame::PathResponse(response));
1340                    self.stats.frame_tx.path_response += 1;
1341                    builder.finish_and_track(
1342                        now,
1343                        self,
1344                        path_id,
1345                        SentFrames {
1346                            non_retransmits: true,
1347                            ..SentFrames::default()
1348                        },
1349                        PadDatagram::ToMinMtu,
1350                        qlog,
1351                    );
1352                    self.stats.udp_tx.on_sent(1, transmit.len());
1353                    return Some(Transmit {
1354                        destination: remote,
1355                        size: transmit.len(),
1356                        ecn: None,
1357                        segment_size: None,
1358                        src_ip: self.local_ip,
1359                    });
1360                }
1361            }
1362
1363            let sent_frames = {
1364                let path_exclusive_only = have_available_path
1365                    && self.path_data(path_id).local_status() == PathStatus::Backup;
1366                let pn = builder.exact_number;
1367                self.populate_packet(
1368                    now,
1369                    space_id,
1370                    path_id,
1371                    path_exclusive_only,
1372                    &mut builder.frame_space_mut(),
1373                    pn,
1374                    &mut qlog,
1375                )
1376            };
1377
1378            // ACK-only packets should only be sent when explicitly allowed. If we write them for
1379            // any other reason, there is a bug that leads to one component announcing write
1380            // readiness while not writing any data. This degrades performance. The condition is
1381            // only checked if the full MTU is available and when potentially large fixed-size
1382            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
1383            // writing ACKs.
1384            debug_assert!(
1385                !(sent_frames.is_ack_only(&self.streams)
1386                    && !can_send.acks
1387                    && can_send.other
1388                    && builder.buf.segment_size()
1389                        == self.path_data(path_id).current_mtu() as usize
1390                    && self.datagrams.outgoing.is_empty()),
1391                "SendableFrames was {can_send:?}, but only ACKs have been written"
1392            );
1393            if sent_frames.requires_padding {
1394                pad_datagram |= PadDatagram::ToMinMtu;
1395            }
1396
1397            for (path_id, _pn) in sent_frames.largest_acked.iter() {
1398                self.spaces[space_id]
1399                    .for_path(*path_id)
1400                    .pending_acks
1401                    .acks_sent();
1402                self.timers.stop(
1403                    Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1404                    self.qlog.with_time(now),
1405                );
1406            }
1407
1408            // Now we need to finish the packet.  Before we do so we need to know whether
1409            // the next packet will be coalesced into this datagram or whether the datagram
1410            // ends here.  If this is the last packet in the datagram, more padding might be
1411            // needed because of the packet type, or to fill the GSO segment size.
1412
1413            // Are we allowed to coalesce AND is there enough space for another *packet* in
1414            // this datagram AND is there another packet to send in this or the next space?
1415            if coalesce
1416                && builder
1417                    .buf
1418                    .datagram_remaining_mut()
1419                    .saturating_sub(builder.predict_packet_end())
1420                    > MIN_PACKET_SPACE
1421                && self
1422                    .next_send_space(space_id, path_id, builder.buf, close)
1423                    .is_some()
1424            {
1425                // We can append/coalesce the next packet into the current
1426                // datagram. Finish the current packet without adding extra padding.
1427                builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1428            } else {
1429                // We need a new datagram for the next packet.  Finish the current
1430                // packet with padding.
1431                if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1432                    // If too many padding bytes would be required to continue the
1433                    // GSO batch after this packet, end the GSO batch here. Ensures
1434                    // that fixed-size frames with heterogeneous sizes
1435                    // (e.g. application datagrams) won't inadvertently waste large
1436                    // amounts of bandwidth. The exact threshold is a bit arbitrary
1437                    // and might benefit from further tuning, though there's no
1438                    // universally optimal value.
1439                    const MAX_PADDING: usize = 32;
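                    // Illustrative numbers (an assumption, not tuned values): with a
                    // 1472-byte GSO segment and a packet ending at byte 1200,
                    // continuing the batch would require 272 padding bytes
                    // (> MAX_PADDING), so the batch is ended here instead.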
1440                    if builder.buf.datagram_remaining_mut()
1441                        > builder.predict_packet_end() + MAX_PADDING
1442                    {
1443                        trace!(
1444                            "GSO truncated by demand for {} padding bytes",
1445                            builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1446                        );
1447                        builder.finish_and_track(
1448                            now,
1449                            self,
1450                            path_id,
1451                            sent_frames,
1452                            PadDatagram::No,
1453                            qlog,
1454                        );
1455                        break;
1456                    }
1457
1458                    // Pad the current datagram to GSO segment size so it can be
1459                    // included in the GSO batch.
1460                    builder.finish_and_track(
1461                        now,
1462                        self,
1463                        path_id,
1464                        sent_frames,
1465                        PadDatagram::ToSegmentSize,
1466                        qlog,
1467                    );
1468                } else {
1469                    builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1470                }
1471                if transmit.num_datagrams() == 1 {
1472                    transmit.clip_datagram_size();
1473                }
1474            }
1475        }
1476
1477        if let Some(last_packet_number) = last_packet_number {
1478            // Note that when sending in multiple packet spaces the last packet number will
1479            // be the one from the highest packet space.
1480            self.path_data_mut(path_id).congestion.on_sent(
1481                now,
1482                transmit.len() as u64,
1483                last_packet_number,
1484            );
1485        }
1486
1487        self.qlog.emit_recovery_metrics(
1488            path_id,
1489            &mut self.paths.get_mut(&path_id).unwrap().data,
1490            now,
1491        );
1492
1493        self.app_limited = transmit.is_empty() && !congestion_blocked;
1494
1495        // Send MTU probe if necessary
1496        if transmit.is_empty() && self.state.is_established() {
1497            // MTU probing happens only in Data space.
1498            let space_id = SpaceId::Data;
1499            path_id = *self.paths.first_key_value().expect("one path must exist").0;
1500            let probe_data = loop {
1501                // We MTU probe all paths for which all of the following is true:
1502                // - We have an active destination CID for the path.
1503                // - The remote address *and* path are validated.
1504                // - The path is not abandoned.
1505                // - The MTU Discovery subsystem wants to probe the path.
1506                let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1507                let eligible = self.path_data(path_id).validated
1508                    && !self.path_data(path_id).is_validating_path()
1509                    && !self.abandoned_paths.contains(&path_id);
1510                let probe_size = eligible
1511                    .then(|| {
1512                        let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1513                        self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1514                    })
1515                    .flatten();
1516                match (active_cid, probe_size) {
1517                    (Some(active_cid), Some(probe_size)) => {
1518                        // Let's send an MTUD probe!
1519                        break Some((active_cid, probe_size));
1520                    }
1521                    _ => {
1522                        // Find the next path to check if it needs an MTUD probe.
1523                        match self.paths.keys().find(|&&next| next > path_id) {
1524                            Some(next) => {
1525                                path_id = *next;
1526                                continue;
1527                            }
1528                            None => break None,
1529                        }
1530                    }
1531                }
1532            };
1533            if let Some((active_cid, probe_size)) = probe_data {
1534                // We are definitely sending a DPLPMTUD probe.
1535                debug_assert_eq!(transmit.num_datagrams(), 0);
1536                transmit.start_new_datagram_with_size(probe_size as usize);
1537
1538                let mut qlog = QlogSentPacket::default();
1539                let mut builder = PacketBuilder::new(
1540                    now,
1541                    space_id,
1542                    path_id,
1543                    active_cid,
1544                    &mut transmit,
1545                    true,
1546                    self,
1547                    &mut qlog,
1548                )?;
1549
1550                // We implement MTU probes as ping packets padded up to the probe size
1551                trace!(?probe_size, "writing MTUD probe");
1552                trace!("PING");
1553                builder.frame_space_mut().write(frame::FrameType::PING);
1554                qlog.frame(&Frame::Ping);
1555                self.stats.frame_tx.ping += 1;
1556
1557                // If supported by the peer, we want no delays to the probe's ACK
1558                if self.peer_supports_ack_frequency() {
1559                    trace!("IMMEDIATE_ACK");
1560                    builder
1561                        .frame_space_mut()
1562                        .write(frame::FrameType::IMMEDIATE_ACK);
1563                    self.stats.frame_tx.immediate_ack += 1;
1564                    qlog.frame(&Frame::ImmediateAck);
1565                }
1566
1567                let sent_frames = SentFrames {
1568                    non_retransmits: true,
1569                    ..Default::default()
1570                };
1571                builder.finish_and_track(
1572                    now,
1573                    self,
1574                    path_id,
1575                    sent_frames,
1576                    PadDatagram::ToSize(probe_size),
1577                    qlog,
1578                );
1579
1580                self.path_stats
1581                    .entry(path_id)
1582                    .or_default()
1583                    .sent_plpmtud_probes += 1;
1584            }
1585        }
1586
1587        if transmit.is_empty() {
1588            return None;
1589        }
1590
1591        let destination = self.path_data(path_id).remote;
1592        trace!(
1593            segment_size = transmit.segment_size(),
1594            last_datagram_len = transmit.len() % transmit.segment_size(),
1595            ?destination,
1596            "sending {} bytes in {} datagrams",
1597            transmit.len(),
1598            transmit.num_datagrams()
1599        );
1600        self.path_data_mut(path_id)
1601            .inc_total_sent(transmit.len() as u64);
1602
1603        self.stats
1604            .udp_tx
1605            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1606
1607        Some(Transmit {
1608            destination,
1609            size: transmit.len(),
1610            ecn: if self.path_data(path_id).sending_ecn {
1611                Some(EcnCodepoint::Ect0)
1612            } else {
1613                None
1614            },
1615            segment_size: match transmit.num_datagrams() {
1616                1 => None,
1617                _ => Some(transmit.segment_size()),
1618            },
1619            src_ip: self.local_ip,
1620        })
1621    }
1622
1623    /// Returns the [`SpaceId`] of the next packet space which has data to send
1624    ///
1625    /// This takes into account the space available to frames in the next datagram.
1626    // TODO(flub): This duplication is not nice.
1627    fn next_send_space(
1628        &mut self,
1629        current_space_id: SpaceId,
1630        path_id: PathId,
1631        buf: &TransmitBuf<'_>,
1632        close: bool,
1633    ) -> Option<SpaceId> {
1634        // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed
1635        // to be able to send an individual frame at least this large in the next 1-RTT
1636        // packet. This could be generalized to support every space, but it's only needed to
1637        // handle large fixed-size frames, which only exist in 1-RTT (application
1638        // datagrams). We don't account for coalesced packets potentially occupying space
1639        // because frames can always spill into the next datagram.
1640        let mut space_id = current_space_id;
1641        loop {
1642            let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1643            if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1644                return Some(space_id);
1645            }
1646            space_id = match space_id {
1647                SpaceId::Initial => SpaceId::Handshake,
1648                SpaceId::Handshake => SpaceId::Data,
1649                SpaceId::Data => break,
1650            }
1651        }
1652        None
1653    }
1654
1655    /// Checks if creating a new datagram would be blocked by congestion control
1656    fn path_congestion_check(
1657        &mut self,
1658        space_id: SpaceId,
1659        path_id: PathId,
1660        transmit: &TransmitBuf<'_>,
1661        can_send: &SendableFrames,
1662        now: Instant,
1663    ) -> PathBlocked {
1664        // Anti-amplification is only based on `total_sent`, which gets updated after
1665        // the transmit is sent. Therefore we pass the number of bytes for datagrams
1666        // that are already created, as well as 1 byte for starting another datagram. If
1667        // there is any anti-amplification budget left, we always allow a full MTU to be
1668        // sent (see https://github.com/quinn-rs/quinn/issues/1082).
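        // Illustrative sketch of the rule above, assuming the usual three-times
        // amplification factor of RFC 9000 § 8.1: a server that has received 1200
        // bytes on an unvalidated path and already sent 3500 has 3 * 1200 - 3500 =
        // 100 bytes of budget left; because the budget is non-zero, a full
        // MTU-sized datagram may still be sent.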
1669        if self.side().is_server()
1670            && self
1671                .path_data(path_id)
1672                .anti_amplification_blocked(transmit.len() as u64 + 1)
1673        {
1674            trace!(?space_id, %path_id, "blocked by anti-amplification");
1675            return PathBlocked::AntiAmplification;
1676        }
1677
1678        // Congestion control check.
1679        // Tail loss probes must not be blocked by congestion, or a deadlock could arise.
1680        let bytes_to_send = transmit.segment_size() as u64;
1681        let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1682
1683        if can_send.other && !need_loss_probe && !can_send.close {
1684            let path = self.path_data(path_id);
1685            if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1686                trace!(?space_id, %path_id, "blocked by congestion control");
1687                return PathBlocked::Congestion;
1688            }
1689        }
1690
1691        // Pacing check.
1692        if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1693            self.timers.set(
1694                Timer::PerPath(path_id, PathTimer::Pacing),
1695                delay,
1696                self.qlog.with_time(now),
1697            );
1698            // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though
1699            // they are not congestion controlled.
1700            trace!(?space_id, %path_id, "blocked by pacing");
1701            return PathBlocked::Pacing;
1702        }
1703
1704        PathBlocked::No
1705    }
1706
1707    /// Send PATH_CHALLENGE for a previous path if necessary
1708    ///
1709    /// QUIC-TRANSPORT section 9.3.3
1710    /// <https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding>
1711    fn send_prev_path_challenge(
1712        &mut self,
1713        now: Instant,
1714        buf: &mut TransmitBuf<'_>,
1715        path_id: PathId,
1716    ) -> Option<Transmit> {
1717        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1718        // TODO (matheus23): We could use !prev_path.is_validating() here instead to
1719        // (possibly) also re-send challenges when they get lost.
1720        if !prev_path.send_new_challenge {
1721            return None;
1722        };
1723        prev_path.send_new_challenge = false;
1724        let destination = prev_path.remote;
1725        let token = self.rng.random();
1726        let info = paths::SentChallengeInfo {
1727            sent_instant: now,
1728            remote: destination,
1729        };
1730        prev_path.challenges_sent.insert(token, info);
1731        debug_assert_eq!(
1732            self.highest_space,
1733            SpaceId::Data,
1734            "PATH_CHALLENGE queued without 1-RTT keys"
1735        );
1736        buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1737
1738        // Use the previous CID to avoid linking the new path with the previous path. We
1739        // don't bother accounting for possible retirement of that prev_cid because this is
1740        // sent once, immediately after migration, when the CID is known to be valid. Even
1741        // if a post-migration packet caused the CID to be retired, it's fair to pretend
1742        // this is sent first.
1743        debug_assert_eq!(buf.datagram_start_offset(), 0);
1744        let mut qlog = QlogSentPacket::default();
1745        let mut builder = PacketBuilder::new(
1746            now,
1747            SpaceId::Data,
1748            path_id,
1749            *prev_cid,
1750            buf,
1751            false,
1752            self,
1753            &mut qlog,
1754        )?;
1755        let challenge = frame::PathChallenge(token);
1756        trace!(%challenge, "validating previous path");
1757        qlog.frame(&Frame::PathChallenge(challenge));
1758        builder.frame_space_mut().write(challenge);
1759        self.stats.frame_tx.path_challenge += 1;
1760
1761        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
1762        // to at least the smallest allowed maximum datagram size of 1200 bytes,
1763        // unless the anti-amplification limit for the path does not permit
1764        // sending a datagram of this size
1765        builder.pad_to(MIN_INITIAL_SIZE);
1766
1767        builder.finish(self, now, qlog);
1768        self.stats.udp_tx.on_sent(1, buf.len());
1769
1770        Some(Transmit {
1771            destination,
1772            size: buf.len(),
1773            ecn: None,
1774            segment_size: None,
1775            src_ip: self.local_ip,
1776        })
1777    }
1778
1779    /// Indicate what types of frames are ready to send for the given space
1780    ///
1781    /// *packet_size* is the number of bytes available to build the next packet.  *close*
1782    /// indicates whether a CONNECTION_CLOSE frame needs to be sent.
1783    fn space_can_send(
1784        &mut self,
1785        space_id: SpaceId,
1786        path_id: PathId,
1787        packet_size: usize,
1788        close: bool,
1789    ) -> SendableFrames {
1790        let pn = self.spaces[SpaceId::Data]
1791            .for_path(path_id)
1792            .peek_tx_number();
1793        let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1794        if self.spaces[space_id].crypto.is_none()
1795            && (space_id != SpaceId::Data
1796                || self.zero_rtt_crypto.is_none()
1797                || self.side.is_server())
1798        {
1799            // No keys available for this space
1800            return SendableFrames::empty();
1801        }
1802        let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1803        if space_id == SpaceId::Data {
1804            can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1805        }
1806
1807        can_send.close = close && self.spaces[space_id].crypto.is_some();
1808
1809        can_send
1810    }
1811
1812    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
1813    ///
1814    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
1815    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
1816    /// extracted through the relevant methods.
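    ///
    /// # Example
    ///
    /// A minimal sketch of the driving loop; the `events` queue and the `conn`
    /// binding are assumptions, real code receives [`ConnectionEvent`]s from the
    /// endpoint:
    ///
    /// ```ignore
    /// while let Some(event) = events.pop_front() {
    ///     conn.handle_event(event);
    /// }
    /// // Afterwards, drain timer updates, transmits and application events.
    /// ```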
1817    pub fn handle_event(&mut self, event: ConnectionEvent) {
1818        use ConnectionEventInner::*;
1819        match event.0 {
1820            Datagram(DatagramConnectionEvent {
1821                now,
1822                remote,
1823                path_id,
1824                ecn,
1825                first_decode,
1826                remaining,
1827            }) => {
1828                let span = trace_span!("pkt", %path_id);
1829                let _guard = span.enter();
1830                // If this packet could initiate a migration and we're a client or a server that
1831                // forbids migration, drop the datagram. This could be relaxed to heuristically
1832                // permit NAT-rebinding-like migration.
1833                if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
1834                    if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
1835                        trace!(
1836                            %path_id,
1837                            ?remote,
1838                            path_remote = ?self.path(path_id).map(|p| p.remote),
1839                            "discarding packet from unrecognized peer"
1840                        );
1841                        return;
1842                    }
1843                }
1844
1845                let was_anti_amplification_blocked = self
1846                    .path(path_id)
1847                    .map(|path| path.anti_amplification_blocked(1))
1848                    .unwrap_or(true); // an unknown path is conservatively treated as unvalidated
1849                // TODO(@divma): revisit this
1850
1851                self.stats.udp_rx.datagrams += 1;
1852                self.stats.udp_rx.bytes += first_decode.len() as u64;
1853                let data_len = first_decode.len();
1854
1855                self.handle_decode(now, remote, path_id, ecn, first_decode);
1856                // The current `path` might have changed inside `handle_decode` since the packet
1857                // could have triggered a migration. The packet might also belong to an unknown
1858                // path and have been rejected. Make sure the data received is accounted for the
1859                // most recent path by accessing `path` after `handle_decode`.
1860                if let Some(path) = self.path_mut(path_id) {
1861                    path.inc_total_recvd(data_len as u64);
1862                }
1863
1864                if let Some(data) = remaining {
1865                    self.stats.udp_rx.bytes += data.len() as u64;
1866                    self.handle_coalesced(now, remote, path_id, ecn, data);
1867                }
1868
1869                if let Some(path) = self.paths.get_mut(&path_id) {
1870                    self.qlog
1871                        .emit_recovery_metrics(path_id, &mut path.data, now);
1872                }
1873
1874                if was_anti_amplification_blocked {
1875                    // A prior attempt to set the loss detection timer may have failed due to
1876                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
1877                    // the server's first flight is lost.
1878                    self.set_loss_detection_timer(now, path_id);
1879                }
1880            }
1881            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1882                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1883                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1884                let cid_state = self
1885                    .local_cid_state
1886                    .entry(path_id)
1887                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1888                cid_state.new_cids(&ids, now);
1889
1890                ids.into_iter().rev().for_each(|frame| {
1891                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1892                });
1893                // Always update Timer::PushNewCid
1894                self.reset_cid_retirement(now);
1895            }
1896        }
1897    }
1898
1899    /// Process timer expirations
1900    ///
1901    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
1902    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
1903    /// methods.
1904    ///
1905    /// It is most efficient to call this immediately after the system clock reaches the latest
1906    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
1907    /// no-op and therefore are safe.
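    ///
    /// # Example
    ///
    /// A sketch of driving timers; `conn` and the use of `Instant::now()` as the
    /// clock source are assumptions:
    ///
    /// ```ignore
    /// if let Some(deadline) = conn.poll_timeout() {
    ///     let now = Instant::now();
    ///     if now >= deadline {
    ///         conn.handle_timeout(now);
    ///     }
    /// }
    /// ```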
1908    pub fn handle_timeout(&mut self, now: Instant) {
1909        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
1910            // TODO(@divma): remove `at` when the unicorn is born
1911            trace!(?timer, at=?now, "timeout");
1912            match timer {
1913                Timer::Conn(timer) => match timer {
1914                    ConnTimer::Close => {
1915                        self.state.move_to_drained(None);
1916                        self.endpoint_events.push_back(EndpointEventInner::Drained);
1917                    }
1918                    ConnTimer::Idle => {
1919                        self.kill(ConnectionError::TimedOut);
1920                    }
1921                    ConnTimer::KeepAlive => {
1922                        trace!("sending keep-alive");
1923                        self.ping();
1924                    }
1925                    ConnTimer::KeyDiscard => {
1926                        self.zero_rtt_crypto = None;
1927                        self.prev_crypto = None;
1928                    }
1929                    ConnTimer::PushNewCid => {
1930                        while let Some((path_id, when)) = self.next_cid_retirement() {
1931                            if when > now {
1932                                break;
1933                            }
1934                            match self.local_cid_state.get_mut(&path_id) {
1935                                None => error!(%path_id, "No local CID state for path"),
1936                                Some(cid_state) => {
1937                                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
1938                                    let num_new_cid = cid_state.on_cid_timeout().into();
1939                                    if !self.state.is_closed() {
1940                                        trace!(
1941                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
1942                                            cid_state.retire_prior_to()
1943                                        );
1944                                        self.endpoint_events.push_back(
1945                                            EndpointEventInner::NeedIdentifiers(
1946                                                path_id,
1947                                                now,
1948                                                num_new_cid,
1949                                            ),
1950                                        );
1951                                    }
1952                                }
1953                            }
1954                        }
1955                    }
1956                },
1957                // TODO: add path_id as span somehow
1958                Timer::PerPath(path_id, timer) => {
1959                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
1960                    let _guard = span.enter();
1961                    match timer {
1962                        PathTimer::PathIdle => {
1963                            self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
1964                                .ok();
1965                        }
1966
1967                        PathTimer::PathKeepAlive => {
1968                            trace!("sending keep-alive on path");
1969                            self.ping_path(path_id).ok();
1970                        }
1971                        PathTimer::LossDetection => {
1972                            self.on_loss_detection_timeout(now, path_id);
1973                            self.qlog.emit_recovery_metrics(
1974                                path_id,
1975                                &mut self.paths.get_mut(&path_id).unwrap().data,
1976                                now,
1977                            );
1978                        }
1979                        PathTimer::PathValidation => {
1980                            let Some(path) = self.paths.get_mut(&path_id) else {
1981                                continue;
1982                            };
1983                            self.timers.stop(
1984                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
1985                                self.qlog.with_time(now),
1986                            );
1987                            debug!("path validation failed");
1988                            if let Some((_, prev)) = path.prev.take() {
1989                                path.data = prev;
1990                            }
1991                            path.data.challenges_sent.clear();
1992                            path.data.send_new_challenge = false;
1993                        }
1994                        PathTimer::PathChallengeLost => {
1995                            let Some(path) = self.paths.get_mut(&path_id) else {
1996                                continue;
1997                            };
1998                            trace!("path challenge deemed lost");
1999                            path.data.send_new_challenge = true;
2000                        }
2001                        PathTimer::PathOpen => {
2002                            let Some(path) = self.path_mut(path_id) else {
2003                                continue;
2004                            };
2005                            path.challenges_sent.clear();
2006                            path.send_new_challenge = false;
2007                            debug!("new path validation failed");
2008                            if let Err(err) = self.close_path(
2009                                now,
2010                                path_id,
2011                                TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
2012                            ) {
2013                                warn!(?err, "failed closing path");
2014                            }
2015
2016                            self.events.push_back(Event::Path(PathEvent::LocallyClosed {
2017                                id: path_id,
2018                                error: PathError::ValidationFailed,
2019                            }));
2020                        }
2021                        PathTimer::Pacing => trace!("pacing timer expired"),
2022                        PathTimer::MaxAckDelay => {
2023                            trace!("max ack delay reached");
2024                            // This timer is only armed in the Data space
2025                            self.spaces[SpaceId::Data]
2026                                .for_path(path_id)
2027                                .pending_acks
2028                                .on_max_ack_delay_timeout()
2029                        }
2030                        PathTimer::DiscardPath => {
2031                            // The path was abandoned and 3*PTO has expired since.  Clean up all
2032                            // remaining state and install a stateless reset token.
2033                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2034                            if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
2035                                let (min_seq, max_seq) = loc_cid_state.active_seq();
2036                                for seq in min_seq..=max_seq {
2037                                    self.endpoint_events.push_back(
2038                                        EndpointEventInner::RetireConnectionId(
2039                                            now, path_id, seq, false,
2040                                        ),
2041                                    );
2042                                }
2043                            }
2044                            self.discard_path(path_id, now);
2045                        }
2046                    }
2047                }
2048            }
2049        }
2050    }
2051
2052    /// Close a connection immediately
2053    ///
2054    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
2055    /// call this only when all important communications have been completed, e.g. by calling
2056    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
2057    /// [`StreamEvent::Finished`] event.
2058    ///
2059    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
2060    /// delivered. There may still be data from the peer that has not been received.
2061    ///
2062    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
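    ///
    /// # Example
    ///
    /// A minimal sketch; `conn` and the application error code `0` are assumptions:
    ///
    /// ```ignore
    /// conn.close(Instant::now(), VarInt::from_u32(0), Bytes::from_static(b"bye"));
    /// ```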
2063    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2064        self.close_inner(
2065            now,
2066            Close::Application(frame::ApplicationClose { error_code, reason }),
2067        )
2068    }
2069
2070    fn close_inner(&mut self, now: Instant, reason: Close) {
2071        let was_closed = self.state.is_closed();
2072        if !was_closed {
2073            self.close_common();
2074            self.set_close_timer(now);
2075            self.close = true;
2076            self.state.move_to_closed_local(reason);
2077        }
2078    }
2079
2080    /// Control datagrams
2081    pub fn datagrams(&mut self) -> Datagrams<'_> {
2082        Datagrams { conn: self }
2083    }
2084
2085    /// Returns connection statistics
2086    pub fn stats(&mut self) -> ConnectionStats {
2087        self.stats.clone()
2088    }
2089
2090    /// Returns path statistics
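    ///
    /// A sketch of inspecting a path; `conn` is an assumption and the fields shown
    /// are the ones refreshed below:
    ///
    /// ```ignore
    /// if let Some(stats) = conn.path_stats(PathId::ZERO) {
    ///     println!("rtt={:?} cwnd={} mtu={}", stats.rtt, stats.cwnd, stats.current_mtu);
    /// }
    /// ```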
2091    pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2092        let path = self.paths.get(&path_id)?;
2093        let stats = self.path_stats.entry(path_id).or_default();
2094        stats.rtt = path.data.rtt.get();
2095        stats.cwnd = path.data.congestion.window();
2096        stats.current_mtu = path.data.mtud.current_mtu();
2097        Some(*stats)
2098    }
2099
2100    /// Ping the remote endpoint
2101    ///
2102    /// Causes an ACK-eliciting packet to be transmitted on the connection.
2103    pub fn ping(&mut self) {
2104        // TODO(flub): This is very brute-force: it pings *all* the paths.  Instead it would
2105        //    be nice if we could only send a single packet for this.
2106        for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2107            path_data.ping_pending = true;
2108        }
2109    }
2110
2111    /// Ping the remote endpoint over a specific path
2112    ///
2113    /// Causes an ACK-eliciting packet to be transmitted on the path.
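    ///
    /// # Example
    ///
    /// A sketch; `conn` is an assumption and `PathId::ZERO` is the initial path:
    ///
    /// ```ignore
    /// if conn.ping_path(PathId::ZERO).is_err() {
    ///     // The path has already been closed; nothing to keep alive.
    /// }
    /// ```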
2114    pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2115        let path_data = self.spaces[self.highest_space]
2116            .number_spaces
2117            .get_mut(&path)
2118            .ok_or(ClosedPath { _private: () })?;
2119        path_data.ping_pending = true;
2120        Ok(())
2121    }
2122
2123    /// Update traffic keys spontaneously
2124    ///
2125    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
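    ///
    /// A sketch for tests; `conn` is an assumption:
    ///
    /// ```ignore
    /// conn.force_key_update();
    /// // Packets sent from here on use the next key phase.
    /// ```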
2126    pub fn force_key_update(&mut self) {
2127        if !self.state.is_established() {
2128            debug!("ignoring forced key update in illegal state");
2129            return;
2130        }
2131        if self.prev_crypto.is_some() {
2132            // We already just updated, or are currently updating, the keys. Concurrent key updates
2133            // are illegal.
2134            debug!("ignoring redundant forced key update");
2135            return;
2136        }
2137        self.update_keys(None, false);
2138    }
2139
2140    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
2141    #[doc(hidden)]
2142    #[deprecated]
2143    pub fn initiate_key_update(&mut self) {
2144        self.force_key_update();
2145    }
2146
2147    /// Get a session reference
2148    pub fn crypto_session(&self) -> &dyn crypto::Session {
2149        &*self.crypto
2150    }
2151
2152    /// Whether the connection is in the process of being established
2153    ///
2154    /// If this returns `false`, the connection may be either established or closed, signaled by the
2155    /// emission of a `Connected` or `ConnectionLost` message respectively.
2156    pub fn is_handshaking(&self) -> bool {
2157        self.state.is_handshake()
2158    }
2159
2160    /// Whether the connection is closed
2161    ///
2162    /// Closed connections cannot transport any further data. A connection becomes closed when
2163    /// either peer application intentionally closes it, or when either transport layer detects an
2164    /// error such as a time-out or certificate validation failure.
2165    ///
2166    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
2167    pub fn is_closed(&self) -> bool {
2168        self.state.is_closed()
2169    }
2170
2171    /// Whether there is no longer any need to keep the connection around
2172    ///
2173    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
2174    /// packets from the peer. All drained connections have been closed.
2175    pub fn is_drained(&self) -> bool {
2176        self.state.is_drained()
2177    }
2178
2179    /// For clients, if the peer accepted the 0-RTT data packets
2180    ///
2181    /// The value is meaningless until after the handshake completes.
2182    pub fn accepted_0rtt(&self) -> bool {
2183        self.accepted_0rtt
2184    }
2185
2186    /// Whether 0-RTT is/was possible during the handshake
2187    pub fn has_0rtt(&self) -> bool {
2188        self.zero_rtt_enabled
2189    }
2190
2191    /// Whether there are any pending retransmits
2192    pub fn has_pending_retransmits(&self) -> bool {
2193        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2194    }
2195
2196    /// Look up whether we're the client or server of this Connection
2197    pub fn side(&self) -> Side {
2198        self.side.side()
2199    }
2200
2201    /// Get the address observed by the remote over the given path
2202    pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2203        self.path(path_id)
2204            .map(|path_data| {
2205                path_data
2206                    .last_observed_addr_report
2207                    .as_ref()
2208                    .map(|observed| observed.socket_addr())
2209            })
2210            .ok_or(ClosedPath { _private: () })
2211    }
2212
2213    /// The local IP address which was used when the peer established
2214    /// the connection
2215    ///
2216    /// This can be different from the address the endpoint is bound to, in case
2217    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
2218    ///
2219    /// This will return `None` for clients, or when no `local_ip` was passed to
2220    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
2221    /// connection.
2222    pub fn local_ip(&self) -> Option<IpAddr> {
2223        self.local_ip
2224    }
2225
2226    /// Current best estimate of the given path's latency (round-trip time)
2227    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2228        self.path(path_id).map(|d| d.rtt.get())
2229    }
2230
2231    /// Current state of the given path's congestion controller, for debugging purposes
2232    pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2233        self.path(path_id).map(|d| d.congestion.as_ref())
2234    }
2235
2236    /// Modify the number of remotely initiated streams that may be concurrently open
2237    ///
2238    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
2239    /// `count`s increase both minimum and worst-case memory consumption.
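    ///
    /// # Example
    ///
    /// A sketch with an illustrative limit; `conn` is an assumption:
    ///
    /// ```ignore
    /// // Allow the peer up to 32 concurrently open unidirectional streams.
    /// conn.set_max_concurrent_streams(Dir::Uni, VarInt::from_u32(32));
    /// ```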
2240    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2241        self.streams.set_max_concurrent(dir, count);
2242        // If the limit was reduced, then a flow control update previously deemed insignificant may
2243        // now be significant.
2244        let pending = &mut self.spaces[SpaceId::Data].pending;
2245        self.streams.queue_max_stream_id(pending);
2246    }
2247
2248    /// Modify the number of open paths allowed when multipath is enabled
2249    ///
2250    /// When reducing the number of concurrent paths, this only delays sending new
2251    /// MAX_PATH_ID frames until fewer than this number of paths are in use.  To
2252    /// actively reduce paths they must be closed using [`Connection::close_path`], which
2253    /// can also be used to close not-yet-opened paths.
2254    ///
2255    /// If multipath is not negotiated (see the [`TransportConfig`]) this cannot enable
2256    /// multipath and will fail.
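    ///
    /// # Example
    ///
    /// A sketch with an illustrative count; `conn` is an assumption and multipath
    /// must have been negotiated:
    ///
    /// ```ignore
    /// use std::num::NonZeroU32;
    /// conn.set_max_concurrent_paths(Instant::now(), NonZeroU32::new(4).unwrap())?;
    /// ```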
2257    pub fn set_max_concurrent_paths(
2258        &mut self,
2259        now: Instant,
2260        count: NonZeroU32,
2261    ) -> Result<(), MultipathNotNegotiated> {
2262        if !self.is_multipath_negotiated() {
2263            return Err(MultipathNotNegotiated { _private: () });
2264        }
2265        self.max_concurrent_paths = count;
2266
2267        let in_use_count = self
2268            .local_max_path_id
2269            .next()
2270            .saturating_sub(self.abandoned_paths.len() as u32)
2271            .as_u32();
2272        let extra_needed = count.get().saturating_sub(in_use_count);
2273        let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2274
2275        self.set_max_path_id(now, new_max_path_id);
2276
2277        Ok(())
2278    }
2279
2280    /// If needed, issues a new MAX_PATH_ID frame and new CIDs for any newly allowed paths
2281    fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2282        if max_path_id <= self.local_max_path_id {
2283            return;
2284        }
2285
2286        self.local_max_path_id = max_path_id;
2287        self.spaces[SpaceId::Data].pending.max_path_id = true;
2288
2289        self.issue_first_path_cids(now);
2290    }
2291
2292    /// Current number of remotely initiated streams that may be concurrently open
2293    ///
2294    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
2295    /// it will not change immediately, even if fewer streams are open. Instead, it will
2296    /// decrement by one each time a remotely initiated stream of matching directionality is closed.
2297    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2298        self.streams.max_concurrent(dir)
2299    }
2300
2301    /// See [`TransportConfig::send_window()`]
2302    pub fn set_send_window(&mut self, send_window: u64) {
2303        self.streams.set_send_window(send_window);
2304    }
2305
2306    /// See [`TransportConfig::receive_window()`]
2307    pub fn set_receive_window(&mut self, receive_window: VarInt) {
2308        if self.streams.set_receive_window(receive_window) {
2309            self.spaces[SpaceId::Data].pending.max_data = true;
2310        }
2311    }
2312
2313    /// Whether the Multipath for QUIC extension was negotiated
2314    ///
2315    /// Multipath is only available after the handshake completes, and only if it was
2316    /// enabled by both peers.
2317    pub fn is_multipath_negotiated(&self) -> bool {
2318        !self.is_handshaking()
2319            && self.config.max_concurrent_multipath_paths.is_some()
2320            && self.peer_params.initial_max_path_id.is_some()
2321    }
2322
2323    fn on_ack_received(
2324        &mut self,
2325        now: Instant,
2326        space: SpaceId,
2327        ack: frame::Ack,
2328    ) -> Result<(), TransportError> {
2329        // Plain ACK frames (as opposed to PATH_ACK) always reference path 0
2330        let path = PathId::ZERO;
2331        self.inner_on_ack_received(now, space, path, ack)
2332    }
2333
2334    fn on_path_ack_received(
2335        &mut self,
2336        now: Instant,
2337        space: SpaceId,
2338        path_ack: frame::PathAck,
2339    ) -> Result<(), TransportError> {
2340        let (ack, path) = path_ack.into_ack();
2341        self.inner_on_ack_received(now, space, path, ack)
2342    }
2343
2344    /// Handles an ACK frame acknowledging packets sent on *path*.
2345    fn inner_on_ack_received(
2346        &mut self,
2347        now: Instant,
2348        space: SpaceId,
2349        path: PathId,
2350        ack: frame::Ack,
2351    ) -> Result<(), TransportError> {
2352        if self.abandoned_paths.contains(&path) {
2353            // See also https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.4.3-3
2354            // > PATH_ACK frames received with an abandoned path ID are silently ignored, as specified in Section 4.
2355            trace!("silently ignoring PATH_ACK on abandoned path");
2356            return Ok(());
2357        }
2358        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2359            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2360        }
2361        let new_largest = {
2362            let space = &mut self.spaces[space].for_path(path);
2363            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
2364                space.largest_acked_packet = Some(ack.largest);
2365                if let Some(info) = space.sent_packets.get(ack.largest) {
2366                    // This should always succeed, but a misbehaving peer might ACK a packet we
2367                    // haven't sent. At worst, that will result in us spuriously reducing the
2368                    // congestion window.
2369                    space.largest_acked_packet_sent = info.time_sent;
2370                }
2371                true
2372            } else {
2373                false
2374            }
2375        };
2376
2377        if self.detect_spurious_loss(&ack, space, path) {
2378            self.path_data_mut(path)
2379                .congestion
2380                .on_spurious_congestion_event();
2381        }
2382
2383        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
2384        let mut newly_acked = ArrayRangeSet::new();
2385        for range in ack.iter() {
2386            self.spaces[space].for_path(path).check_ack(range.clone())?;
2387            for (pn, _) in self.spaces[space]
2388                .for_path(path)
2389                .sent_packets
2390                .iter_range(range)
2391            {
2392                newly_acked.insert_one(pn);
2393            }
2394        }
2395
2396        if newly_acked.is_empty() {
2397            return Ok(());
2398        }
2399
2400        let mut ack_eliciting_acked = false;
2401        for packet in newly_acked.elts() {
2402            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2403                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2404                    // Assume ACKs for all packets below the largest acknowledged in
2405                    // `packet` have been received. This can cause the peer to spuriously
2406                    // retransmit if some of our earlier ACKs were lost, but allows for
2407                    // simpler state tracking. See discussion at
2408                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
2409                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2410                        pns.pending_acks.subtract_below(*acked_pn);
2411                    }
2412                }
2413                ack_eliciting_acked |= info.ack_eliciting;
2414
2415                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
2416                let path_data = self.path_data_mut(path);
2417                let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
2418                if mtu_updated {
2419                    path_data
2420                        .congestion
2421                        .on_mtu_update(path_data.mtud.current_mtu());
2422                }
2423
2424                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
2425                self.ack_frequency.on_acked(path, packet);
2426
2427                self.on_packet_acked(now, path, info);
2428            }
2429        }
2430
2431        let largest_acked = self.spaces[space].for_path(path).largest_acked_packet;
2432        let app_limited = self.app_limited;
2433        let path_data = self.path_data_mut(path);
2434        let in_flight = path_data.in_flight.bytes;
2435
2436        path_data
2437            .congestion
2438            .on_end_acks(now, in_flight, app_limited, largest_acked);
2439
2440        if new_largest && ack_eliciting_acked {
2441            let ack_delay = if space != SpaceId::Data {
2442                Duration::from_micros(0)
2443            } else {
2444                cmp::min(
2445                    self.ack_frequency.peer_max_ack_delay,
2446                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2447                )
2448            };
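            // Decoding example (values illustrative): with the default
            // ack_delay_exponent of 3, an encoded `ack.delay` of 100 represents
            // 100 << 3 = 800 microseconds of peer-reported ACK delay.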
2449            let rtt = now.saturating_duration_since(
2450                self.spaces[space].for_path(path).largest_acked_packet_sent,
2451            );
2452
2453            let next_pn = self.spaces[space].for_path(path).next_packet_number;
2454            let path_data = self.path_data_mut(path);
2455            // TODO(@divma): should be a method of path, should be contained in a single place
2456            path_data.rtt.update(ack_delay, rtt);
2457            if path_data.first_packet_after_rtt_sample.is_none() {
2458                path_data.first_packet_after_rtt_sample = Some((space, next_pn));
2459            }
2460        }
2461
2462        // Must be called before crypto/pto_count are clobbered
2463        self.detect_lost_packets(now, space, path, true);
2464
2465        if self.peer_completed_address_validation(path) {
2466            self.path_data_mut(path).pto_count = 0;
2467        }
2468
2469        // Explicit congestion notification
2470        // TODO(@divma): this code is a good example of logic that should be contained in a single
2471        // place but it's split between the path data and the packet number space data, we should
2472        // find a way to make this work without two lookups
2473        if self.path_data(path).sending_ecn {
2474            if let Some(ecn) = ack.ecn {
2475                // We only examine ECN counters from ACKs that we are certain we received in transmit
2476                // order, allowing us to compute an increase in ECN counts to compare against the number
2477                // of newly acked packets that remains well-defined in the presence of arbitrary packet
2478                // reordering.
2479                if new_largest {
2480                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
2481                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
2482                }
2483            } else {
2484                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
2485                debug!("ECN not acknowledged by peer");
2486                self.path_data_mut(path).sending_ecn = false;
2487            }
2488        }
2489
2490        self.set_loss_detection_timer(now, path);
2491        Ok(())
2492    }
2493
2494    fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2495        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2496
2497        if lost_packets.is_empty() {
2498            return false;
2499        }
2500
2501        for range in ack.iter() {
2502            let spurious_losses: Vec<u64> = lost_packets
2503                .iter_range(range.clone())
2504                .map(|(pn, _info)| pn)
2505                .collect();
2506
2507            for pn in spurious_losses {
2508                lost_packets.remove(pn);
2509            }
2510        }
2511
2512        // If this ACK frame acknowledged all packets we had deemed lost, we raised a
2513        // spurious congestion event in the past. If some of those packets remain
2514        // unacknowledged we cannot conclude anything yet, but future ACK frames might
2515        // still reveal a spurious loss detection.
2516        lost_packets.is_empty()
2517    }
2518
2519    /// Drain lost packets that we reasonably think will never arrive
2520    ///
2521    /// The current criterion is copied from `msquic`:
2522    /// discard packets that were sent earlier than 2 probe timeouts ago.
2523    fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2524        let two_pto = 2 * self.path_data(path).rtt.pto_base();
2525
2526        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2527        lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2528    }
2529
2530    /// Process a new ECN block from an in-order ACK
2531    fn process_ecn(
2532        &mut self,
2533        now: Instant,
2534        space: SpaceId,
2535        path: PathId,
2536        newly_acked: u64,
2537        ecn: frame::EcnCounts,
2538        largest_sent_time: Instant,
2539    ) {
2540        match self.spaces[space]
2541            .for_path(path)
2542            .detect_ecn(newly_acked, ecn)
2543        {
2544            Err(e) => {
2545                debug!("halting ECN due to verification failure: {}", e);
2546
2547                self.path_data_mut(path).sending_ecn = false;
2548                // Wipe out the existing value because it might be garbage and could interfere with
2549                // future attempts to use ECN on new paths.
2550                self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2551            }
2552            Ok(false) => {}
2553            Ok(true) => {
2554                self.path_stats.entry(path).or_default().congestion_events += 1;
2555                self.path_data_mut(path).congestion.on_congestion_event(
2556                    now,
2557                    largest_sent_time,
2558                    false,
2559                    true,
2560                    0,
2561                );
2562            }
2563        }
2564    }
2565
2566    // Not timing-aware, so it's safe to call this for inferred acks, such as those
2567    // arising from high-latency handshakes
2568    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2569        self.paths
2570            .get_mut(&path_id)
2571            .expect("known path")
2572            .remove_in_flight(&info);
2573        let app_limited = self.app_limited;
2574        let path = self.path_data_mut(path_id);
2575        if info.ack_eliciting && !path.is_validating_path() {
2576            // Only pass ACKs to the congestion controller if we are not validating the current
2577            // path, so as to ignore any ACKs from older paths still coming in.
2578            let rtt = path.rtt;
2579            path.congestion
2580                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2581        }
2582
2583        // Update state for confirmed delivery of frames
2584        if let Some(retransmits) = info.retransmits.get() {
2585            for (id, _) in retransmits.reset_stream.iter() {
2586                self.streams.reset_acked(*id);
2587            }
2588        }
2589
2590        for frame in info.stream_frames {
2591            self.streams.received_ack_of(frame);
2592        }
2593    }
2594
2595    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2596        let start = if self.zero_rtt_crypto.is_some() {
2597            now
2598        } else {
2599            self.prev_crypto
2600                .as_ref()
2601                .expect("no previous keys")
2602                .end_packet
2603                .as_ref()
2604                .expect("update not acknowledged yet")
2605                .1
2606        };
2607
2608        // QUIC-MULTIPATH § 2.5 Key Phase Update Process: use the largest PTO of all paths.
2609        self.timers.set(
2610            Timer::Conn(ConnTimer::KeyDiscard),
2611            start + self.pto_max_path(space) * 3,
2612            self.qlog.with_time(now),
2613        );
2614    }
2615
2616    /// Handle a [`PathTimer::LossDetection`] timeout.
2617    ///
2618    /// This timer expires for two reasons:
2619    /// - An ACK-eliciting packet we sent should be considered lost.
2620    /// - The PTO may have expired and a tail-loss probe needs to be scheduled.
2621    ///
2622    /// The former needs us to schedule re-transmission of the lost data.
2623    ///
2624    /// The latter means we have not received an ACK for an ack-eliciting packet we sent
2625    /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
2626    /// packet, to try to elicit new acknowledgements. These new acknowledgements will
2627    /// indicate whether the previously sent packets were lost or not.
2628    fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2629        if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2630            // Time threshold loss detection
2631            self.detect_lost_packets(now, pn_space, path_id, false);
2632            self.set_loss_detection_timer(now, path_id);
2633            return;
2634        }
2635
2636        let (_, space) = match self.pto_time_and_space(now, path_id) {
2637            Some(x) => x,
2638            None => {
2639                error!(%path_id, "PTO expired while unset");
2640                return;
2641            }
2642        };
2643        trace!(
2644            in_flight = self.path_data(path_id).in_flight.bytes,
2645            count = self.path_data(path_id).pto_count,
2646            ?space,
2647            %path_id,
2648            "PTO fired"
2649        );
2650
2651        let count = match self.path_data(path_id).in_flight.ack_eliciting {
2652            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
2653            // deadlock prevention
2654            0 => {
2655                debug_assert!(!self.peer_completed_address_validation(path_id));
2656                1
2657            }
2658            // Conventional loss probe
2659            _ => 2,
2660        };
2661        let pns = self.spaces[space].for_path(path_id);
2662        pns.loss_probes = pns.loss_probes.saturating_add(count);
2663        let path_data = self.path_data_mut(path_id);
2664        path_data.pto_count = path_data.pto_count.saturating_add(1);
2665        self.set_loss_detection_timer(now, path_id);
2666    }
2667
2668    /// Detect any lost packets
2669    ///
2670    /// There are two cases in which we detect lost packets:
2671    ///
2672    /// - We received an ACK packet.
2673    /// - The [`PathTimer::LossDetection`] timer expired: there is an un-acknowledged packet
2674    ///   that was followed by an acknowledged packet. The loss timer for this
2675    ///   un-acknowledged packet has expired and we need to declare that packet lost.
2676    ///
2677    /// Packets are lost if they are both (see RFC 9002 §6.1):
2678    ///
2679    /// - Unacknowledged, in flight and sent prior to an acknowledged packet.
2680    /// - Old enough by either:
2681    ///   - Having a packet number [`TransportConfig::packet_threshold`] lower than the last
2682    ///     acknowledged packet.
2683    ///   - Being sent [`TransportConfig::time_threshold`] * RTT in the past.
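    ///
    /// A minimal, self-contained sketch (not this module's actual types) of how the
    /// two rules combine; `packet_threshold` and `loss_delay` below stand in for the
    /// configured packet threshold and `time_threshold * RTT`:
    ///
    /// ```
    /// use std::time::{Duration, Instant};
    ///
    /// fn is_lost(
    ///     pn: u64,
    ///     time_sent: Instant,
    ///     largest_acked: u64,
    ///     now: Instant,
    ///     packet_threshold: u64,
    ///     loss_delay: Duration,
    /// ) -> bool {
    ///     // Only packets sent prior to an acknowledged packet are eligible.
    ///     if pn >= largest_acked {
    ///         return false;
    ///     }
    ///     // Rule 1: the packet number trails the largest acknowledged packet
    ///     // by at least the packet threshold.
    ///     let by_number = largest_acked >= pn + packet_threshold;
    ///     // Rule 2: the packet was sent at least `loss_delay` ago.
    ///     let by_time = now.saturating_duration_since(time_sent) >= loss_delay;
    ///     by_number || by_time
    /// }
    /// ```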
2684    fn detect_lost_packets(
2685        &mut self,
2686        now: Instant,
2687        pn_space: SpaceId,
2688        path_id: PathId,
2689        due_to_ack: bool,
2690    ) {
2691        let mut lost_packets = Vec::<u64>::new();
2692        let mut lost_mtu_probe = None;
2693        let mut in_persistent_congestion = false;
2694        let mut size_of_lost_packets = 0u64;
2695        self.spaces[pn_space].for_path(path_id).loss_time = None;
2696
2697        // Find all the lost packets, populating all variables initialised above.
2698
2699        let path = self.path_data(path_id);
2700        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2701        let loss_delay = path
2702            .rtt
2703            .conservative()
2704            .mul_f32(self.config.time_threshold)
2705            .max(TIMER_GRANULARITY);
2706        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
2707
2708        let largest_acked_packet = self.spaces[pn_space]
2709            .for_path(path_id)
2710            .largest_acked_packet
2711            .expect("detect_lost_packets only to be called if path received at least one ACK");
2712        let packet_threshold = self.config.packet_threshold as u64;
2713
2714        // InPersistentCongestion: Determine if all packets in the time period before the newest
2715        // lost packet, including the edges, are marked lost. PTO computation must always
2716        // include max ACK delay, i.e. operate as if in Data space (see RFC9001 §7.6.1).
2717        let congestion_period = self
2718            .pto(SpaceId::Data, path_id)
2719            .saturating_mul(self.config.persistent_congestion_threshold);
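        // Illustrative numbers: with a PTO of 200ms and a persistent congestion
        // threshold of 3, two ACK-eliciting packets declared lost more than 600ms
        // apart, with no acknowledged packet in between, signal persistent congestion.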
2720        let mut persistent_congestion_start: Option<Instant> = None;
2721        let mut prev_packet = None;
2722        let space = self.spaces[pn_space].for_path(path_id);
2723
2724        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
2725            if prev_packet != Some(packet.wrapping_sub(1)) {
2726                // An intervening packet was acknowledged
2727                persistent_congestion_start = None;
2728            }
2729
2730            // Packets sent before now - loss_delay are deemed lost.
2731            // However, we avoid subtraction as it can panic and there's no
2732            // saturating equivalent of this subtraction operation with a Duration.
2733            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
2734            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
2735                // The packet should be declared lost.
2736                if Some(packet) == in_flight_mtu_probe {
2737                    // Lost MTU probes are not included in `lost_packets`, because they
2738                    // should not trigger a congestion control response
2739                    lost_mtu_probe = in_flight_mtu_probe;
2740                } else {
2741                    lost_packets.push(packet);
2742                    size_of_lost_packets += info.size as u64;
2743                    if info.ack_eliciting && due_to_ack {
2744                        match persistent_congestion_start {
2745                            // Two ACK-eliciting packets lost more than
2746                            // congestion_period apart, with no ACKed packets in between
2747                            Some(start) if info.time_sent - start > congestion_period => {
2748                                in_persistent_congestion = true;
2749                            }
2750                            // Persistent congestion must start after the first RTT sample
2751                            None if first_packet_after_rtt_sample
2752                                .is_some_and(|x| x < (pn_space, packet)) =>
2753                            {
2754                                persistent_congestion_start = Some(info.time_sent);
2755                            }
2756                            _ => {}
2757                        }
2758                    }
2759                }
2760            } else {
2761                // The packet should not yet be declared lost.
2762                if space.loss_time.is_none() {
2763                    // Since we iterate in order the lowest packet number's loss time will
2764                    // always be the earliest.
2765                    space.loss_time = Some(info.time_sent + loss_delay);
2766                }
2767                persistent_congestion_start = None;
2768            }
2769
2770            prev_packet = Some(packet);
2771        }
2772
2773        self.handle_lost_packets(
2774            pn_space,
2775            path_id,
2776            now,
2777            lost_packets,
2778            lost_mtu_probe,
2779            loss_delay,
2780            in_persistent_congestion,
2781            size_of_lost_packets,
2782        );
2783    }
2784
2785    /// Drops the path state, declaring any remaining in-flight packets as lost
2786    fn discard_path(&mut self, path_id: PathId, now: Instant) {
2787        trace!(%path_id, "dropping path state");
2788        let path = self.path_data(path_id);
2789        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2790
2791        let mut size_of_lost_packets = 0u64; // add to path_stats.lost_bytes;
2792        let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2793            .for_path(path_id)
2794            .sent_packets
2795            .iter()
2796            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2797            .map(|(pn, info)| {
2798                size_of_lost_packets += info.size as u64;
2799                pn
2800            })
2801            .collect();
2802
2803        if !lost_pns.is_empty() {
2804            trace!(
2805                %path_id,
2806                count = lost_pns.len(),
2807                lost_bytes = size_of_lost_packets,
2808                "packets lost on path abandon"
2809            );
2810            self.handle_lost_packets(
2811                SpaceId::Data,
2812                path_id,
2813                now,
2814                lost_pns,
2815                in_flight_mtu_probe,
2816                Duration::ZERO,
2817                false,
2818                size_of_lost_packets,
2819            );
2820        }
2821        self.paths.remove(&path_id);
2822        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2823
2824        let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2825        self.events.push_back(
2826            PathEvent::Abandoned {
2827                id: path_id,
2828                path_stats,
2829            }
2830            .into(),
2831        );
2832    }
2833
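    /// Applies the consequences of packets being declared lost
    ///
    /// Updates loss statistics and the congestion controller, queues the lost frames
    /// for retransmission, records the packets for later spurious-loss detection, and
    /// handles a lost MTU probe separately so it does not trigger a congestion
    /// response.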
2834    fn handle_lost_packets(
2835        &mut self,
2836        pn_space: SpaceId,
2837        path_id: PathId,
2838        now: Instant,
2839        lost_packets: Vec<u64>,
2840        lost_mtu_probe: Option<u64>,
2841        loss_delay: Duration,
2842        in_persistent_congestion: bool,
2843        size_of_lost_packets: u64,
2844    ) {
2845        debug_assert!(
2846            {
2847                let mut sorted = lost_packets.clone();
2848                sorted.sort();
2849                sorted == lost_packets
2850            },
2851            "lost_packets must be sorted"
2852        );
2853
2854        self.drain_lost_packets(now, pn_space, path_id);
2855
2856        // OnPacketsLost
2857        if let Some(largest_lost) = lost_packets.last().cloned() {
2858            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
2859            let largest_lost_sent = self.spaces[pn_space]
2860                .for_path(path_id)
2861                .sent_packets
2862                .get(largest_lost)
2863                .unwrap()
2864                .time_sent;
2865            let path_stats = self.path_stats.entry(path_id).or_default();
2866            path_stats.lost_packets += lost_packets.len() as u64;
2867            path_stats.lost_bytes += size_of_lost_packets;
2868            trace!(
2869                %path_id,
2870                count = lost_packets.len(),
2871                lost_bytes = size_of_lost_packets,
2872                "packets lost",
2873            );
2874
2875            for &packet in &lost_packets {
2876                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
2877                    continue;
2878                };
2879                self.qlog
2880                    .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
2881                self.paths
2882                    .get_mut(&path_id)
2883                    .unwrap()
2884                    .remove_in_flight(&info);
2885
2886                for frame in info.stream_frames {
2887                    self.streams.retransmit(frame);
2888                }
2889                self.spaces[pn_space].pending |= info.retransmits;
2890                self.path_data_mut(path_id)
2891                    .mtud
2892                    .on_non_probe_lost(packet, info.size);
2893
2894                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
2895                    packet,
2896                    LostPacket {
2897                        time_sent: info.time_sent,
2898                    },
2899                );
2900            }
2901
2902            let path = self.path_data_mut(path_id);
2903            if path.mtud.black_hole_detected(now) {
2904                path.congestion.on_mtu_update(path.mtud.current_mtu());
2905                if let Some(max_datagram_size) = self.datagrams().max_size() {
2906                    self.datagrams.drop_oversized(max_datagram_size);
2907                }
2908                self.path_stats
2909                    .entry(path_id)
2910                    .or_default()
2911                    .black_holes_detected += 1;
2912            }
2913
2914            // Don't apply congestion penalty for lost ack-only packets
2915            let lost_ack_eliciting =
2916                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
2917
2918            if lost_ack_eliciting {
2919                self.path_stats
2920                    .entry(path_id)
2921                    .or_default()
2922                    .congestion_events += 1;
2923                self.path_data_mut(path_id).congestion.on_congestion_event(
2924                    now,
2925                    largest_lost_sent,
2926                    in_persistent_congestion,
2927                    false,
2928                    size_of_lost_packets,
2929                );
2930            }
2931        }
2932
2933        // Handle a lost MTU probe
2934        if let Some(packet) = lost_mtu_probe {
2935            let info = self.spaces[SpaceId::Data]
2936                .for_path(path_id)
2937                .take(packet)
2938                .unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and
2939            // therefore must not have been removed yet
2940            self.paths
2941                .get_mut(&path_id)
2942                .unwrap()
2943                .remove_in_flight(&info);
2944            self.path_data_mut(path_id).mtud.on_probe_lost();
2945            self.path_stats
2946                .entry(path_id)
2947                .or_default()
2948                .lost_plpmtud_probes += 1;
2949        }
2950    }
2951
2952    /// Returns the earliest time packets should be declared lost for all spaces on a path.
2953    ///
2954    /// If a path has an acknowledged packet with any prior un-acknowledged packets, the
2955    /// earliest un-acknowledged packet can be declared lost after a timeout has elapsed.
2956    /// The time returned is when this packet should be declared lost.
2957    fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2958        SpaceId::iter()
2959            .filter_map(|id| {
2960                self.spaces[id]
2961                    .number_spaces
2962                    .get(&path_id)
2963                    .and_then(|pns| pns.loss_time)
2964                    .map(|time| (time, id))
2965            })
2966            .min_by_key(|&(time, _)| time)
2967    }
2968
2969    /// Returns the earliest time the next PTO should fire across all spaces on a path.
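    ///
    /// The PTO backs off exponentially in the number of unanswered probes. A minimal
    /// sketch of just the backoff computation, with an illustrative cap standing in
    /// for this module's `MAX_BACKOFF_EXPONENT`:
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// // Illustrative cap; the real constant lives elsewhere in this module.
    /// const MAX_BACKOFF_EXPONENT: u32 = 16;
    ///
    /// fn backed_off_pto(pto_base: Duration, pto_count: u32) -> Duration {
    ///     let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
    ///     pto_base * backoff
    /// }
    ///
    /// assert_eq!(
    ///     backed_off_pto(Duration::from_millis(100), 2),
    ///     Duration::from_millis(400),
    /// );
    /// ```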
2970    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2971        let path = self.path(path_id)?;
2972        let pto_count = path.pto_count;
2973        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2974        let mut duration = path.rtt.pto_base() * backoff;
2975
2976        if path_id == PathId::ZERO
2977            && path.in_flight.ack_eliciting == 0
2978            && !self.peer_completed_address_validation(PathId::ZERO)
2979        {
2980            // Address Validation during Connection Establishment:
2981            // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
2982            // deadlock if an Initial or Handshake packet from the server is lost and the
2983            // server can not send more due to its anti-amplification limit the client must
2984            // send another packet on PTO.
2985            let space = match self.highest_space {
2986                SpaceId::Handshake => SpaceId::Handshake,
2987                _ => SpaceId::Initial,
2988            };
2989
2990            return Some((now + duration, space));
2991        }
2992
2993        let mut result = None;
2994        for space in SpaceId::iter() {
2995            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2996                continue;
2997            };
2998
2999            if !pns.has_in_flight() {
3000                continue;
3001            }
3002            if space == SpaceId::Data {
3003                // Skip ApplicationData until handshake completes.
3004                if self.is_handshaking() {
3005                    return result;
3006                }
3007                // Include max_ack_delay and backoff for ApplicationData.
3008                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
3009            }
3010            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3011                continue;
3012            };
3013            let pto = last_ack_eliciting + duration;
3014            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3015                if path.anti_amplification_blocked(1) {
3016                    // Nothing would be able to be sent.
3017                    continue;
3018                }
3019                if path.in_flight.ack_eliciting == 0 {
3020                    // Nothing ack-eliciting, no PTO to arm/fire.
3021                    continue;
3022                }
3023                result = Some((pto, space));
3024            }
3025        }
3026        result
3027    }
3028
3029    fn peer_completed_address_validation(&self, path: PathId) -> bool {
3030        // TODO(flub): This logic needs updating for multipath
3031        if self.side.is_server() || self.state.is_closed() {
3032            return true;
3033        }
3034        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
3035        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
3036        self.spaces[SpaceId::Handshake]
3037            .path_space(PathId::ZERO)
3038            .and_then(|pns| pns.largest_acked_packet)
3039            .is_some()
3040            || self.spaces[SpaceId::Data]
3041                .path_space(path)
3042                .and_then(|pns| pns.largest_acked_packet)
3043                .is_some()
3044            || (self.spaces[SpaceId::Data].crypto.is_some()
3045                && self.spaces[SpaceId::Handshake].crypto.is_none())
3046    }
3047
3048    /// Resets the [`PathTimer::LossDetection`] timer to the next instant it may be needed
3049    ///
3050    /// The timer must fire if either:
3051    /// - An ack-eliciting packet we sent needs to be declared lost.
3052    /// - A tail-loss probe needs to be sent.
3053    ///
3054    /// See [`Connection::on_loss_detection_timeout`] for details.
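    ///
    /// The arming precedence mirrors the body below: a pending time-threshold loss
    /// deadline wins over a PTO, and with neither the timer is stopped. A minimal
    /// sketch with stand-in types:
    ///
    /// ```
    /// use std::time::Instant;
    ///
    /// enum TimerAction {
    ///     Set(Instant),
    ///     Stop,
    /// }
    ///
    /// fn next_loss_detection(
    ///     loss_time: Option<Instant>,
    ///     pto_time: Option<Instant>,
    /// ) -> TimerAction {
    ///     // Prefer the loss deadline; fall back to the PTO.
    ///     match loss_time.or(pto_time) {
    ///         Some(t) => TimerAction::Set(t),
    ///         None => TimerAction::Stop,
    ///     }
    /// }
    /// ```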
3055    fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3056        if self.state.is_closed() {
3057            // No loss detection takes place on closed connections, and `close_common` already
3058            // stopped the timer. Ensure we don't restart it inadvertently, e.g. in response to a
3059            // reordered packet being handled by state-insensitive code.
3060            return;
3061        }
3062
3063        if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3064            // Time threshold loss detection.
3065            self.timers.set(
3066                Timer::PerPath(path_id, PathTimer::LossDetection),
3067                loss_time,
3068                self.qlog.with_time(now),
3069            );
3070            return;
3071        }
3072
3073        // Determine which PN space to arm the PTO for and calculate the PTO
3074        // duration.
3075        if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3076            self.timers.set(
3077                Timer::PerPath(path_id, PathTimer::LossDetection),
3078                timeout,
3079                self.qlog.with_time(now),
3080            );
3081        } else {
3082            self.timers.stop(
3083                Timer::PerPath(path_id, PathTimer::LossDetection),
3084                self.qlog.with_time(now),
3085            );
3086        }
3087    }
3088
3089    /// The maximum probe timeout across all paths
3090    ///
3091    /// See [`Connection::pto`]
3092    fn pto_max_path(&self, space: SpaceId) -> Duration {
3093        match space {
3094            SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3095            SpaceId::Data => self
3096                .paths
3097                .keys()
3098                .map(|path_id| self.pto(space, *path_id))
3099                .max()
3100                .expect("there should be at least one path"),
3101        }
3102    }
3103
3104    /// Probe Timeout
3105    ///
3106    /// The PTO is logically the time in which you'd expect to receive an acknowledgement
3107    /// for a packet. So approximately RTT + max_ack_delay.
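    ///
    /// A minimal sketch of the composition, assuming a hypothetical `pto_base` that
    /// already folds smoothed RTT and RTT variance together per RFC 9002 §6.2.1:
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// fn pto(pto_base: Duration, space_is_data: bool, max_ack_delay: Duration) -> Duration {
    ///     // Initial and Handshake packets are acknowledged immediately,
    ///     // so max_ack_delay only applies in the Data space.
    ///     let ack_delay = if space_is_data { max_ack_delay } else { Duration::ZERO };
    ///     pto_base + ack_delay
    /// }
    /// ```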
3108    fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3109        let max_ack_delay = match space {
3110            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3111            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3112        };
3113        self.path_data(path_id).rtt.pto_base() + max_ack_delay
3114    }
3115
3116    fn on_packet_authenticated(
3117        &mut self,
3118        now: Instant,
3119        space_id: SpaceId,
3120        path_id: PathId,
3121        ecn: Option<EcnCodepoint>,
3122        packet: Option<u64>,
3123        spin: bool,
3124        is_1rtt: bool,
3125    ) {
3126        self.total_authed_packets += 1;
3127        if let Some(last_allowed_receive) = self
3128            .paths
3129            .get(&path_id)
3130            .and_then(|path| path.data.last_allowed_receive)
3131        {
3132            if now > last_allowed_receive {
3133                warn!("received data on path which we abandoned more than 3 * PTO ago");
3134                // The peer failed to respond with a PATH_ABANDON in time.
3135                if !self.state.is_closed() {
3136                    // TODO(flub): What should the error code be?
3137                    self.state.move_to_closed(TransportError::NO_ERROR(
3138                        "peer failed to respond with PATH_ABANDON in time",
3139                    ));
3140                    self.close_common();
3141                    self.set_close_timer(now);
3142                    self.close = true;
3143                }
3144                return;
3145            }
3146        }
3147
3148        self.reset_keep_alive(path_id, now);
3149        self.reset_idle_timeout(now, space_id, path_id);
3150        self.permit_idle_reset = true;
3151        self.receiving_ecn |= ecn.is_some();
3152        if let Some(x) = ecn {
3153            let space = &mut self.spaces[space_id];
3154            space.for_path(path_id).ecn_counters += x;
3155
3156            if x.is_ce() {
3157                space
3158                    .for_path(path_id)
3159                    .pending_acks
3160                    .set_immediate_ack_required();
3161            }
3162        }
3163
3164        let packet = match packet {
3165            Some(x) => x,
3166            None => return,
3167        };
3168        match &self.side {
3169            ConnectionSide::Client { .. } => {
3170                // If we received a handshake packet that authenticated, then we're talking to
3171                // the real server. From now on we should no longer allow the server to migrate
3172                // its address.
3173                if space_id == SpaceId::Handshake {
3174                    if let Some(hs) = self.state.as_handshake_mut() {
3175                        hs.allow_server_migration = false;
3176                    }
3177                }
3178            }
3179            ConnectionSide::Server { .. } => {
3180                if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3181                {
3182                    // A server stops sending and processing Initial packets when it receives its first Handshake packet.
3183                    self.discard_space(now, SpaceId::Initial);
3184                }
3185                if self.zero_rtt_crypto.is_some() && is_1rtt {
3186                    // Discard 0-RTT keys soon after receiving a 1-RTT packet
3187                    self.set_key_discard_timer(now, space_id)
3188                }
3189            }
3190        }
3191        let space = self.spaces[space_id].for_path(path_id);
3192        space.pending_acks.insert_one(packet, now);
3193        if packet >= space.rx_packet.unwrap_or_default() {
3194            space.rx_packet = Some(packet);
3195            // Update outgoing spin bit, inverting iff we're the client
3196            self.spin = self.side.is_client() ^ spin;
3197        }
3198    }
3199
3200    /// Resets the idle timeout timers
3201    ///
3202    /// Without multipath there is only the connection-wide idle timeout. When multipath is
3203    /// enabled there is an additional per-path idle timeout.
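    ///
    /// In both cases the effective deadline is the configured timeout, but never less
    /// than three probe timeouts, so that idle expiry cannot race ordinary loss
    /// recovery. A minimal sketch of that computation:
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// fn idle_deadline(configured: Duration, pto: Duration) -> Duration {
    ///     configured.max(3 * pto)
    /// }
    ///
    /// assert_eq!(
    ///     idle_deadline(Duration::from_millis(100), Duration::from_millis(200)),
    ///     Duration::from_millis(600),
    /// );
    /// ```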
3204    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3205        // First reset the global idle timeout.
3206        if let Some(timeout) = self.idle_timeout {
3207            if self.state.is_closed() {
3208                self.timers
3209                    .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3210            } else {
3211                let dt = cmp::max(timeout, 3 * self.pto_max_path(space));
3212                self.timers.set(
3213                    Timer::Conn(ConnTimer::Idle),
3214                    now + dt,
3215                    self.qlog.with_time(now),
3216                );
3217            }
3218        }
3219
3220        // Now handle the per-path state
3221        if let Some(timeout) = self.path_data(path_id).idle_timeout {
3222            if self.state.is_closed() {
3223                self.timers.stop(
3224                    Timer::PerPath(path_id, PathTimer::PathIdle),
3225                    self.qlog.with_time(now),
3226                );
3227            } else {
3228                let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3229                self.timers.set(
3230                    Timer::PerPath(path_id, PathTimer::PathIdle),
3231                    now + dt,
3232                    self.qlog.with_time(now),
3233                );
3234            }
3235        }
3236    }
3237
3238    /// Resets both the [`ConnTimer::KeepAlive`] and [`PathTimer::PathKeepAlive`] timers
3239    fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3240        if !self.state.is_established() {
3241            return;
3242        }
3243
3244        if let Some(interval) = self.config.keep_alive_interval {
3245            self.timers.set(
3246                Timer::Conn(ConnTimer::KeepAlive),
3247                now + interval,
3248                self.qlog.with_time(now),
3249            );
3250        }
3251
3252        if let Some(interval) = self.path_data(path_id).keep_alive {
3253            self.timers.set(
3254                Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3255                now + interval,
3256                self.qlog.with_time(now),
3257            );
3258        }
3259    }
3260
3261    /// Sets the timer for when a previously issued CID should be retired next
3262    fn reset_cid_retirement(&mut self, now: Instant) {
3263        if let Some((_path, t)) = self.next_cid_retirement() {
3264            self.timers.set(
3265                Timer::Conn(ConnTimer::PushNewCid),
3266                t,
3267                self.qlog.with_time(now),
3268            );
3269        }
3270    }
3271
3272    /// The next time when a previously issued CID should be retired
3273    fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3274        self.local_cid_state
3275            .iter()
3276            .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3277            .min_by_key(|(_path_id, timeout)| *timeout)
3278    }
3279
3280    /// Handle the already-decrypted first packet from the client
3281    ///
3282    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
3283    /// efficient.
3284    pub(crate) fn handle_first_packet(
3285        &mut self,
3286        now: Instant,
3287        remote: SocketAddr,
3288        ecn: Option<EcnCodepoint>,
3289        packet_number: u64,
3290        packet: InitialPacket,
3291        remaining: Option<BytesMut>,
3292    ) -> Result<(), ConnectionError> {
3293        let span = trace_span!("first recv");
3294        let _guard = span.enter();
3295        debug_assert!(self.side.is_server());
3296        let len = packet.header_data.len() + packet.payload.len();
3297        let path_id = PathId::ZERO;
3298        self.path_data_mut(path_id).total_recvd = len as u64;
3299
3300        if let Some(hs) = self.state.as_handshake_mut() {
3301            hs.expected_token = packet.header.token.clone();
3302        } else {
3303            unreachable!("first packet must be delivered in Handshake state");
3304        }
3305
3306        // The first packet is always on PathId::ZERO
3307        self.on_packet_authenticated(
3308            now,
3309            SpaceId::Initial,
3310            path_id,
3311            ecn,
3312            Some(packet_number),
3313            false,
3314            false,
3315        );
3316
3317        let packet: Packet = packet.into();
3318
3319        let mut qlog = QlogRecvPacket::new(len);
3320        qlog.header(&packet.header, Some(packet_number), path_id);
3321
3322        self.process_decrypted_packet(
3323            now,
3324            remote,
3325            path_id,
3326            Some(packet_number),
3327            packet,
3328            &mut qlog,
3329        )?;
3330        self.qlog.emit_packet_received(qlog, now);
3331        if let Some(data) = remaining {
3332            self.handle_coalesced(now, remote, path_id, ecn, data);
3333        }
3334
3335        self.qlog.emit_recovery_metrics(
3336            path_id,
3337            &mut self.paths.get_mut(&path_id).unwrap().data,
3338            now,
3339        );
3340
3341        Ok(())
3342    }
3343
3344    fn init_0rtt(&mut self, now: Instant) {
3345        let (header, packet) = match self.crypto.early_crypto() {
3346            Some(x) => x,
3347            None => return,
3348        };
3349        if self.side.is_client() {
3350            match self.crypto.transport_parameters() {
3351                Ok(params) => {
3352                    let params = params
3353                        .expect("crypto layer didn't supply transport parameters with ticket");
3354                    // Certain values must not be cached
3355                    let params = TransportParameters {
3356                        initial_src_cid: None,
3357                        original_dst_cid: None,
3358                        preferred_address: None,
3359                        retry_src_cid: None,
3360                        stateless_reset_token: None,
3361                        min_ack_delay: None,
3362                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3363                        max_ack_delay: TransportParameters::default().max_ack_delay,
3364                        initial_max_path_id: None,
3365                        ..params
3366                    };
3367                    self.set_peer_params(params);
3368                    self.qlog.emit_peer_transport_params_restored(self, now);
3369                }
3370                Err(e) => {
3371                    error!("session ticket has malformed transport parameters: {}", e);
3372                    return;
3373                }
3374            }
3375        }
3376        trace!("0-RTT enabled");
3377        self.zero_rtt_enabled = true;
3378        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3379    }
3380
3381    fn read_crypto(
3382        &mut self,
3383        space: SpaceId,
3384        crypto: &frame::Crypto,
3385        payload_len: usize,
3386    ) -> Result<(), TransportError> {
3387        let expected = if !self.state.is_handshake() {
3388            SpaceId::Data
3389        } else if self.highest_space == SpaceId::Initial {
3390            SpaceId::Initial
3391        } else {
3392            // On the server, self.highest_space can be Data after receiving the client's first
3393            // flight, but we expect Handshake CRYPTO until the handshake is complete.
3394            SpaceId::Handshake
3395        };
3396        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
3397        // packets are illegal, and we don't process 1-RTT packets until the handshake is
3398        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
3399        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3400
3401        let end = crypto.offset + crypto.data.len() as u64;
3402        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3403            warn!(
3404                "received new {:?} CRYPTO data when expecting {:?}",
3405                space, expected
3406            );
3407            return Err(TransportError::PROTOCOL_VIOLATION(
3408                "new data at unexpected encryption level",
3409            ));
3410        }
3411
3412        let space = &mut self.spaces[space];
3413        let max = end.saturating_sub(space.crypto_stream.bytes_read());
3414        if max > self.config.crypto_buffer_size as u64 {
3415            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3416        }
3417
3418        space
3419            .crypto_stream
3420            .insert(crypto.offset, crypto.data.clone(), payload_len);
3421        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3422            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3423            if self.crypto.read_handshake(&chunk.bytes)? {
3424                self.events.push_back(Event::HandshakeDataReady);
3425            }
3426        }
3427
3428        Ok(())
3429    }
3430
3431    fn write_crypto(&mut self) {
3432        loop {
3433            let space = self.highest_space;
3434            let mut outgoing = Vec::new();
3435            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3436                match space {
3437                    SpaceId::Initial => {
3438                        self.upgrade_crypto(SpaceId::Handshake, crypto);
3439                    }
3440                    SpaceId::Handshake => {
3441                        self.upgrade_crypto(SpaceId::Data, crypto);
3442                    }
3443                    _ => unreachable!("got updated secrets during 1-RTT"),
3444                }
3445            }
3446            if outgoing.is_empty() {
3447                if space == self.highest_space {
3448                    break;
3449                } else {
3450                    // Keys updated, check for more data to send
3451                    continue;
3452                }
3453            }
3454            let offset = self.spaces[space].crypto_offset;
3455            let outgoing = Bytes::from(outgoing);
3456            if let Some(hs) = self.state.as_handshake_mut() {
3457                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3458                    hs.client_hello = Some(outgoing.clone());
3459                }
3460            }
3461            self.spaces[space].crypto_offset += outgoing.len() as u64;
3462            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3463            self.spaces[space].pending.crypto.push_back(frame::Crypto {
3464                offset,
3465                data: outgoing,
3466            });
3467        }
3468    }
3469
3470    /// Switch to stronger cryptography during handshake
3471    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3472        debug_assert!(
3473            self.spaces[space].crypto.is_none(),
3474            "already reached packet space {space:?}"
3475        );
3476        trace!("{:?} keys ready", space);
3477        if space == SpaceId::Data {
3478            // Precompute the first key update
3479            self.next_crypto = Some(
3480                self.crypto
3481                    .next_1rtt_keys()
3482                    .expect("handshake should be complete"),
3483            );
3484        }
3485
3486        self.spaces[space].crypto = Some(crypto);
3487        debug_assert!(space as usize > self.highest_space as usize);
3488        self.highest_space = space;
3489        if space == SpaceId::Data && self.side.is_client() {
3490            // Discard 0-RTT keys because 1-RTT keys are available.
3491            self.zero_rtt_crypto = None;
3492        }
3493    }
3494
3495    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3496        debug_assert!(space_id != SpaceId::Data);
3497        trace!("discarding {:?} keys", space_id);
3498        if space_id == SpaceId::Initial {
3499            // No longer needed
3500            if let ConnectionSide::Client { token, .. } = &mut self.side {
3501                *token = Bytes::new();
3502            }
3503        }
3504        let space = &mut self.spaces[space_id];
3505        space.crypto = None;
3506        let pns = space.for_path(PathId::ZERO);
3507        pns.time_of_last_ack_eliciting_packet = None;
3508        pns.loss_time = None;
3509        pns.loss_probes = 0;
3510        let sent_packets = mem::take(&mut pns.sent_packets);
3511        let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3512        for (_, packet) in sent_packets.into_iter() {
3513            path.data.remove_in_flight(&packet);
3514        }
3515
3516        self.set_loss_detection_timer(now, PathId::ZERO)
3517    }
3518
3519    fn handle_coalesced(
3520        &mut self,
3521        now: Instant,
3522        remote: SocketAddr,
3523        path_id: PathId,
3524        ecn: Option<EcnCodepoint>,
3525        data: BytesMut,
3526    ) {
3527        self.path_data_mut(path_id)
3528            .inc_total_recvd(data.len() as u64);
3529        let mut remaining = Some(data);
3530        let cid_len = self
3531            .local_cid_state
3532            .values()
3533            .map(|cid_state| cid_state.cid_len())
3534            .next()
3535            .expect("one cid_state must exist");
3536        while let Some(data) = remaining {
3537            match PartialDecode::new(
3538                data,
3539                &FixedLengthConnectionIdParser::new(cid_len),
3540                &[self.version],
3541                self.endpoint_config.grease_quic_bit,
3542            ) {
3543                Ok((partial_decode, rest)) => {
3544                    remaining = rest;
3545                    self.handle_decode(now, remote, path_id, ecn, partial_decode);
3546                }
3547                Err(e) => {
3548                    trace!("malformed header: {}", e);
3549                    return;
3550                }
3551            }
3552        }
3553    }
3554
3555    fn handle_decode(
3556        &mut self,
3557        now: Instant,
3558        remote: SocketAddr,
3559        path_id: PathId,
3560        ecn: Option<EcnCodepoint>,
3561        partial_decode: PartialDecode,
3562    ) {
3563        let qlog = QlogRecvPacket::new(partial_decode.len());
3564        if let Some(decoded) = packet_crypto::unprotect_header(
3565            partial_decode,
3566            &self.spaces,
3567            self.zero_rtt_crypto.as_ref(),
3568            self.peer_params.stateless_reset_token,
3569        ) {
3570            self.handle_packet(
3571                now,
3572                remote,
3573                path_id,
3574                ecn,
3575                decoded.packet,
3576                decoded.stateless_reset,
3577                qlog,
3578            );
3579        }
3580    }
3581
3582    fn handle_packet(
3583        &mut self,
3584        now: Instant,
3585        remote: SocketAddr,
3586        path_id: PathId,
3587        ecn: Option<EcnCodepoint>,
3588        packet: Option<Packet>,
3589        stateless_reset: bool,
3590        mut qlog: QlogRecvPacket,
3591    ) {
3592        self.stats.udp_rx.ios += 1;
3593        if let Some(ref packet) = packet {
3594            trace!(
3595                "got {:?} packet ({} bytes) from {} using id {}",
3596                packet.header.space(),
3597                packet.payload.len() + packet.header_data.len(),
3598                remote,
3599                packet.header.dst_cid(),
3600            );
3601        }
3602
3603        if self.is_handshaking() {
3604            if path_id != PathId::ZERO {
3605                debug!(%remote, %path_id, "discarding multipath packet during handshake");
3606                return;
3607            }
3608            if remote != self.path_data_mut(path_id).remote {
3609                if let Some(hs) = self.state.as_handshake() {
3610                    if hs.allow_server_migration {
3611                        trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
3612                        self.path_data_mut(path_id).remote = remote;
3613                        self.qlog.emit_tuple_assigned(path_id, remote, now);
3614                    } else {
3615                        debug!("discarding packet with unexpected remote during handshake");
3616                        return;
3617                    }
3618                } else {
3619                    debug!("discarding packet with unexpected remote during handshake");
3620                    return;
3621                }
3622            }
3623        }
3624
3625        let was_closed = self.state.is_closed();
3626        let was_drained = self.state.is_drained();
3627
3628        let decrypted = match packet {
3629            None => Err(None),
3630            Some(mut packet) => self
3631                .decrypt_packet(now, path_id, &mut packet)
3632                .map(move |number| (packet, number)),
3633        };
3634        let result = match decrypted {
3635            _ if stateless_reset => {
3636                debug!("got stateless reset");
3637                Err(ConnectionError::Reset)
3638            }
3639            Err(Some(e)) => {
3640                warn!("illegal packet: {}", e);
3641                Err(e.into())
3642            }
3643            Err(None) => {
3644                debug!("failed to authenticate packet");
3645                self.authentication_failures += 1;
3646                let integrity_limit = self.spaces[self.highest_space]
3647                    .crypto
3648                    .as_ref()
3649                    .unwrap()
3650                    .packet
3651                    .local
3652                    .integrity_limit();
3653                if self.authentication_failures > integrity_limit {
3654                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3655                } else {
3656                    return;
3657                }
3658            }
3659            Ok((packet, number)) => {
3660                qlog.header(&packet.header, number, path_id);
3661                let span = match number {
3662                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3663                    None => trace_span!("recv", space = ?packet.header.space()),
3664                };
3665                let _guard = span.enter();
3666
3667                let dedup = self.spaces[packet.header.space()]
3668                    .path_space_mut(path_id)
3669                    .map(|pns| &mut pns.dedup);
3670                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
3671                    debug!("discarding possible duplicate packet");
3672                    self.qlog.emit_packet_received(qlog, now);
3673                    return;
3674                } else if self.state.is_handshake() && packet.header.is_short() {
3675                    // TODO: SHOULD buffer these to improve reordering tolerance.
3676                    trace!("dropping short packet during handshake");
3677                    self.qlog.emit_packet_received(qlog, now);
3678                    return;
3679                } else {
3680                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
3681                        if let Some(hs) = self.state.as_handshake() {
3682                            if self.side.is_server() && token != &hs.expected_token {
3683                                // Clients must send the same retry token in every Initial. Initial
3684                                // packets can be spoofed, so we discard rather than killing the
3685                                // connection.
3686                                warn!("discarding Initial with invalid retry token");
3687                                self.qlog.emit_packet_received(qlog, now);
3688                                return;
3689                            }
3690                        }
3691                    }
3692
3693                    if !self.state.is_closed() {
3694                        let spin = match packet.header {
3695                            Header::Short { spin, .. } => spin,
3696                            _ => false,
3697                        };
3698
3699                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
3700                            // Only the client is allowed to open paths
3701                            self.ensure_path(path_id, remote, now, number);
3702                        }
3703                        if self.paths.contains_key(&path_id) {
3704                            self.on_packet_authenticated(
3705                                now,
3706                                packet.header.space(),
3707                                path_id,
3708                                ecn,
3709                                number,
3710                                spin,
3711                                packet.header.is_1rtt(),
3712                            );
3713                        }
3714                    }
3715
3716                    let res = self
3717                        .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);
3718
3719                    self.qlog.emit_packet_received(qlog, now);
3720                    res
3721                }
3722            }
3723        };
3724
3725        // State transitions for error cases
3726        if let Err(conn_err) = result {
3727            match conn_err {
3728                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
3729                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
3730                ConnectionError::Reset
3731                | ConnectionError::TransportError(TransportError {
3732                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
3733                    ..
3734                }) => {
3735                    self.state.move_to_drained(Some(conn_err));
3736                }
3737                ConnectionError::TimedOut => {
3738                    unreachable!("timeouts aren't generated by packet processing");
3739                }
3740                ConnectionError::TransportError(err) => {
3741                    debug!("closing connection due to transport error: {}", err);
3742                    self.state.move_to_closed(err);
3743                }
3744                ConnectionError::VersionMismatch => {
3745                    self.state.move_to_draining(Some(conn_err));
3746                }
3747                ConnectionError::LocallyClosed => {
3748                    unreachable!("LocallyClosed isn't generated by packet processing");
3749                }
3750                ConnectionError::CidsExhausted => {
3751                    unreachable!("CidsExhausted isn't generated by packet processing");
3752                }
3753            };
3754        }
3755
3756        if !was_closed && self.state.is_closed() {
3757            self.close_common();
3758            if !self.state.is_drained() {
3759                self.set_close_timer(now);
3760            }
3761        }
3762        if !was_drained && self.state.is_drained() {
3763            self.endpoint_events.push_back(EndpointEventInner::Drained);
3764            // Close timer may have been started previously, e.g. if we sent a close and got a
3765            // stateless reset in response
3766            self.timers
3767                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
3768        }
3769
3770        // Transmit CONNECTION_CLOSE if necessary
3771        if matches!(self.state.as_type(), StateType::Closed) {
3772            // If there is no PathData for this PathId the packet was for a brand new
3773            // path. It was a valid packet however, so the remote is valid and we want to
3774            // send CONNECTION_CLOSE.
3775            let path_remote = self
3776                .paths
3777                .get(&path_id)
3778                .map(|p| p.data.remote)
3779                .unwrap_or(remote);
3780            self.close = remote == path_remote;
3781        }
3782    }
3783
3784    fn process_decrypted_packet(
3785        &mut self,
3786        now: Instant,
3787        remote: SocketAddr,
3788        path_id: PathId,
3789        number: Option<u64>,
3790        packet: Packet,
3791        qlog: &mut QlogRecvPacket,
3792    ) -> Result<(), ConnectionError> {
3793        if !self.paths.contains_key(&path_id) {
3794            // There is a chance this is the server's first packet for this path, which would
3795            // be a protocol violation. It's more likely, however, that this is a packet for a
3796            // pruned path.
3797            trace!(%path_id, ?number, "discarding packet for unknown path");
3798            return Ok(());
3799        }
3800        let state = match self.state.as_type() {
3801            StateType::Established => {
3802                match packet.header.space() {
3803                    SpaceId::Data => {
3804                        self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
3805                    }
3806                    _ if packet.header.has_frames() => {
3807                        self.process_early_payload(now, path_id, packet, qlog)?
3808                    }
3809                    _ => {
3810                        trace!("discarding unexpected pre-handshake packet");
3811                    }
3812                }
3813                return Ok(());
3814            }
3815            StateType::Closed => {
3816                for result in frame::Iter::new(packet.payload.freeze())? {
3817                    let frame = match result {
3818                        Ok(frame) => frame,
3819                        Err(err) => {
3820                            debug!("frame decoding error: {err:?}");
3821                            continue;
3822                        }
3823                    };
3824                    qlog.frame(&frame);
3825
3826                    if let Frame::Padding = frame {
3827                        continue;
3828                    };
3829
3830                    self.stats.frame_rx.record(&frame);
3831
3832                    if let Frame::Close(_error) = frame {
3833                        trace!("draining");
3834                        self.state.move_to_draining(None);
3835                        break;
3836                    }
3837                }
3838                return Ok(());
3839            }
3840            StateType::Draining | StateType::Drained => return Ok(()),
3841            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
3842        };
3843
3844        match packet.header {
3845            Header::Retry {
3846                src_cid: rem_cid, ..
3847            } => {
3848                debug_assert_eq!(path_id, PathId::ZERO);
3849                if self.side.is_server() {
3850                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3851                }
3852
3853                let is_valid_retry = self
3854                    .rem_cids
3855                    .get(&path_id)
3856                    .map(|cids| cids.active())
3857                    .map(|orig_dst_cid| {
3858                        self.crypto.is_valid_retry(
3859                            orig_dst_cid,
3860                            &packet.header_data,
3861                            &packet.payload,
3862                        )
3863                    })
3864                    .unwrap_or_default();
3865                if self.total_authed_packets > 1
3866                    || packet.payload.len() <= 16 // token + 16 byte tag
3867                    || !is_valid_retry
3868                {
3869                    trace!("discarding invalid Retry");
3870                    // - After the client has received and processed an Initial or Retry
3871                    //   packet from the server, it MUST discard any subsequent Retry
3872                    //   packets that it receives.
3873                    // - A client MUST discard a Retry packet with a zero-length Retry Token
3874                    //   field.
3875                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
3876                    //   that cannot be validated
3877                    return Ok(());
3878                }
3879
3880                trace!("retrying with CID {}", rem_cid);
3881                let client_hello = state.client_hello.take().unwrap();
3882                self.retry_src_cid = Some(rem_cid);
3883                self.rem_cids
3884                    .get_mut(&path_id)
3885                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
3886                    .update_initial_cid(rem_cid);
3887                self.rem_handshake_cid = rem_cid;
3888
3889                let space = &mut self.spaces[SpaceId::Initial];
3890                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
3891                    self.on_packet_acked(now, PathId::ZERO, info);
3892                };
3893
                // Make sure we clean up after any retransmitted Initials
                self.discard_space(now, SpaceId::Initial);
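                // Rebuild the Initial space with keys derived from the new CID, carrying
                // over the next packet number (packet numbers must not restart after a
                // Retry, per RFC 9000 §17.2.5) and queueing the ClientHello again.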
3896                self.spaces[SpaceId::Initial] = {
3897                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
3898                    space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
3899                    space.crypto_offset = client_hello.len() as u64;
3900                    space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
3901                        .for_path(path_id)
3902                        .next_packet_number;
3903                    space.pending.crypto.push_back(frame::Crypto {
3904                        offset: 0,
3905                        data: client_hello,
3906                    });
3907                    space
3908                };
3909
3910                // Retransmit all 0-RTT data
3911                let zero_rtt = mem::take(
3912                    &mut self.spaces[SpaceId::Data]
3913                        .for_path(PathId::ZERO)
3914                        .sent_packets,
3915                );
3916                for (_, info) in zero_rtt.into_iter() {
3917                    self.paths
3918                        .get_mut(&PathId::ZERO)
3919                        .unwrap()
3920                        .remove_in_flight(&info);
3921                    self.spaces[SpaceId::Data].pending |= info.retransmits;
3922                }
3923                self.streams.retransmit_all_for_0rtt();
3924
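                // Everything before the trailing 16-byte integrity tag is the retry
                // token; it must be echoed in the token field of subsequent Initials.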
3925                let token_len = packet.payload.len() - 16;
3926                let ConnectionSide::Client { ref mut token, .. } = self.side else {
3927                    unreachable!("we already short-circuited if we're server");
3928                };
3929                *token = packet.payload.freeze().split_to(token_len);
3930
3931                self.state = State::handshake(state::Handshake {
3932                    expected_token: Bytes::new(),
3933                    rem_cid_set: false,
3934                    client_hello: None,
3935                    allow_server_migration: true,
3936                });
3937                Ok(())
3938            }
3939            Header::Long {
3940                ty: LongType::Handshake,
3941                src_cid: rem_cid,
3942                dst_cid: loc_cid,
3943                ..
3944            } => {
3945                debug_assert_eq!(path_id, PathId::ZERO);
3946                if rem_cid != self.rem_handshake_cid {
3947                    debug!(
3948                        "discarding packet with mismatched remote CID: {} != {}",
3949                        self.rem_handshake_cid, rem_cid
3950                    );
3951                    return Ok(());
3952                }
3953                self.on_path_validated(path_id);
3954
3955                self.process_early_payload(now, path_id, packet, qlog)?;
3956                if self.state.is_closed() {
3957                    return Ok(());
3958                }
3959
3960                if self.crypto.is_handshaking() {
3961                    trace!("handshake ongoing");
3962                    return Ok(());
3963                }
3964
3965                if self.side.is_client() {
3966                    // Client-only because server params were set from the client's Initial
3967                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
3968                        TransportError::new(
3969                            TransportErrorCode::crypto(0x6d),
3970                            "transport parameters missing".to_owned(),
3971                        )
3972                    })?;
3973
3974                    if self.has_0rtt() {
3975                        if !self.crypto.early_data_accepted().unwrap() {
3976                            debug_assert!(self.side.is_client());
3977                            debug!("0-RTT rejected");
3978                            self.accepted_0rtt = false;
3979                            self.streams.zero_rtt_rejected();
3980
3981                            // Discard already-queued frames
3982                            self.spaces[SpaceId::Data].pending = Retransmits::default();
3983
3984                            // Discard 0-RTT packets
3985                            let sent_packets = mem::take(
3986                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
3987                            );
3988                            for (_, packet) in sent_packets.into_iter() {
3989                                self.paths
3990                                    .get_mut(&path_id)
3991                                    .unwrap()
3992                                    .remove_in_flight(&packet);
3993                            }
3994                        } else {
3995                            self.accepted_0rtt = true;
3996                            params.validate_resumption_from(&self.peer_params)?;
3997                        }
3998                    }
3999                    if let Some(token) = params.stateless_reset_token {
4000                        let remote = self.path_data(path_id).remote;
4001                        self.endpoint_events
4002                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4003                    }
4004                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4005                    self.issue_first_cids(now);
4006                } else {
4007                    // Server-only
4008                    self.spaces[SpaceId::Data].pending.handshake_done = true;
4009                    self.discard_space(now, SpaceId::Handshake);
4010                    self.events.push_back(Event::HandshakeConfirmed);
4011                    trace!("handshake confirmed");
4012                }
4013
4014                self.events.push_back(Event::Connected);
4015                self.state.move_to_established();
4016                trace!("established");
4017
                // Multipath can only be enabled after the state has reached Established,
                // so this cannot happen any earlier.
4020                self.issue_first_path_cids(now);
4021                Ok(())
4022            }
4023            Header::Initial(InitialHeader {
4024                src_cid: rem_cid,
4025                dst_cid: loc_cid,
4026                ..
4027            }) => {
4028                debug_assert_eq!(path_id, PathId::ZERO);
4029                if !state.rem_cid_set {
4030                    trace!("switching remote CID to {}", rem_cid);
4031                    let mut state = state.clone();
4032                    self.rem_cids
4033                        .get_mut(&path_id)
4034                        .expect("PathId::ZERO not yet abandoned")
4035                        .update_initial_cid(rem_cid);
4036                    self.rem_handshake_cid = rem_cid;
4037                    self.orig_rem_cid = rem_cid;
4038                    state.rem_cid_set = true;
4039                    self.state.move_to_handshake(state);
4040                } else if rem_cid != self.rem_handshake_cid {
4041                    debug!(
4042                        "discarding packet with mismatched remote CID: {} != {}",
4043                        self.rem_handshake_cid, rem_cid
4044                    );
4045                    return Ok(());
4046                }
4047
4048                let starting_space = self.highest_space;
4049                self.process_early_payload(now, path_id, packet, qlog)?;
4050
4051                if self.side.is_server()
4052                    && starting_space == SpaceId::Initial
4053                    && self.highest_space != SpaceId::Initial
4054                {
4055                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4056                        TransportError::new(
4057                            TransportErrorCode::crypto(0x6d),
4058                            "transport parameters missing".to_owned(),
4059                        )
4060                    })?;
4061                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4062                    self.issue_first_cids(now);
4063                    self.init_0rtt(now);
4064                }
4065                Ok(())
4066            }
4067            Header::Long {
4068                ty: LongType::ZeroRtt,
4069                ..
4070            } => {
4071                self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
4072                Ok(())
4073            }
4074            Header::VersionNegotiate { .. } => {
4075                if self.total_authed_packets > 1 {
4076                    return Ok(());
4077                }
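                // The payload of a Version Negotiation packet is a list of 32-bit
                // big-endian versions. If it includes the version we chose, the packet
                // is spurious and is ignored (RFC 9000 §6.2).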
4078                let supported = packet
4079                    .payload
4080                    .chunks(4)
4081                    .any(|x| match <[u8; 4]>::try_from(x) {
4082                        Ok(version) => self.version == u32::from_be_bytes(version),
4083                        Err(_) => false,
4084                    });
4085                if supported {
4086                    return Ok(());
4087                }
4088                debug!("remote doesn't support our version");
4089                Err(ConnectionError::VersionMismatch)
4090            }
4091            Header::Short { .. } => unreachable!(
4092                "short packets received during handshake are discarded in handle_packet"
4093            ),
4094        }
4095    }
4096
4097    /// Process an Initial or Handshake packet payload
4098    fn process_early_payload(
4099        &mut self,
4100        now: Instant,
4101        path_id: PathId,
4102        packet: Packet,
4103        #[allow(unused)] qlog: &mut QlogRecvPacket,
4104    ) -> Result<(), TransportError> {
4105        debug_assert_ne!(packet.header.space(), SpaceId::Data);
4106        debug_assert_eq!(path_id, PathId::ZERO);
4107        let payload_len = packet.payload.len();
4108        let mut ack_eliciting = false;
4109        for result in frame::Iter::new(packet.payload.freeze())? {
4110            let frame = result?;
4111            qlog.frame(&frame);
4112            let span = match frame {
4113                Frame::Padding => continue,
4114                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4115            };
4116
4117            self.stats.frame_rx.record(&frame);
4118
4119            let _guard = span.as_ref().map(|x| x.enter());
4120            ack_eliciting |= frame.is_ack_eliciting();
4121
4122            // Process frames
4123            if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4124                return Err(TransportError::PROTOCOL_VIOLATION(
4125                    "illegal frame type in handshake",
4126                ));
4127            }
4128
4129            match frame {
4130                Frame::Padding | Frame::Ping => {}
4131                Frame::Crypto(frame) => {
4132                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
4133                }
4134                Frame::Ack(ack) => {
4135                    self.on_ack_received(now, packet.header.space(), ack)?;
4136                }
4137                Frame::PathAck(ack) => {
4138                    span.as_ref()
4139                        .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4140                    self.on_path_ack_received(now, packet.header.space(), ack)?;
4141                }
4142                Frame::Close(reason) => {
4143                    self.state.move_to_draining(Some(reason.into()));
4144                    return Ok(());
4145                }
4146                _ => {
4147                    let mut err =
4148                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4149                    err.frame = Some(frame.ty());
4150                    return Err(err);
4151                }
4152            }
4153        }
4154
4155        if ack_eliciting {
4156            // In the initial and handshake spaces, ACKs must be sent immediately
4157            self.spaces[packet.header.space()]
4158                .for_path(path_id)
4159                .pending_acks
4160                .set_immediate_ack_required();
4161        }
4162
4163        self.write_crypto();
4164        Ok(())
4165    }
4166
4167    /// Processes the packet payload, always in the data space.
4168    fn process_payload(
4169        &mut self,
4170        now: Instant,
4171        remote: SocketAddr,
4172        path_id: PathId,
4173        number: u64,
4174        packet: Packet,
4175        #[allow(unused)] qlog: &mut QlogRecvPacket,
4176    ) -> Result<(), TransportError> {
4177        let payload = packet.payload.freeze();
4178        let mut is_probing_packet = true;
4179        let mut close = None;
4180        let payload_len = payload.len();
4181        let mut ack_eliciting = false;
        // If this packet triggers a path migration and includes an observed address
        // frame, the frame is stored here.
4184        let mut migration_observed_addr = None;
4185        for result in frame::Iter::new(payload)? {
4186            let frame = result?;
4187            qlog.frame(&frame);
4188            let span = match frame {
4189                Frame::Padding => continue,
4190                _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4191            };
4192
4193            self.stats.frame_rx.record(&frame);
            // Crypto, Stream and Datagram frames are special-cased in order not to
            // pollute the log with payload data
4196            match &frame {
4197                Frame::Crypto(f) => {
4198                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4199                }
4200                Frame::Stream(f) => {
4201                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4202                }
4203                Frame::Datagram(f) => {
4204                    trace!(len = f.data.len(), "got datagram frame");
4205                }
4206                f => {
4207                    trace!("got frame {:?}", f);
4208                }
4209            }
4210
4211            let _guard = span.enter();
4212            if packet.header.is_0rtt() {
4213                match frame {
4214                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4215                        return Err(TransportError::PROTOCOL_VIOLATION(
4216                            "illegal frame type in 0-RTT",
4217                        ));
4218                    }
4219                    _ => {
4220                        if frame.is_1rtt() {
4221                            return Err(TransportError::PROTOCOL_VIOLATION(
4222                                "illegal frame type in 0-RTT",
4223                            ));
4224                        }
4225                    }
4226                }
4227            }
4228            ack_eliciting |= frame.is_ack_eliciting();
4229
4230            // Check whether this could be a probing packet
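            // Per RFC 9000 §9.1, only PADDING, PATH_CHALLENGE, PATH_RESPONSE and
            // NEW_CONNECTION_ID are probing frames; OBSERVED_ADDR is treated the same
            // way here as an address-discovery extension.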
4231            match frame {
4232                Frame::Padding
4233                | Frame::PathChallenge(_)
4234                | Frame::PathResponse(_)
4235                | Frame::NewConnectionId(_)
4236                | Frame::ObservedAddr(_) => {}
4237                _ => {
4238                    is_probing_packet = false;
4239                }
4240            }
4241
4242            match frame {
4243                Frame::Crypto(frame) => {
4244                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4245                }
4246                Frame::Stream(frame) => {
4247                    if self.streams.received(frame, payload_len)?.should_transmit() {
4248                        self.spaces[SpaceId::Data].pending.max_data = true;
4249                    }
4250                }
4251                Frame::Ack(ack) => {
4252                    self.on_ack_received(now, SpaceId::Data, ack)?;
4253                }
4254                Frame::PathAck(ack) => {
4255                    span.record("path", tracing::field::debug(&ack.path_id));
4256                    self.on_path_ack_received(now, SpaceId::Data, ack)?;
4257                }
4258                Frame::Padding | Frame::Ping => {}
4259                Frame::Close(reason) => {
4260                    close = Some(reason);
4261                }
4262                Frame::PathChallenge(challenge) => {
4263                    let path = &mut self
4264                        .path_mut(path_id)
4265                        .expect("payload is processed only after the path becomes known");
4266                    path.path_responses.push(number, challenge.0, remote);
4267                    if remote == path.remote {
4268                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
4269                        // attack. Send a non-probing packet to recover the active path.
                        // TODO(flub): No longer true! We now also send PATH_CHALLENGE to
                        //    validate a new path, without an RFC 9000-style migration
                        //    involved. This means we add an extra IMMEDIATE_ACK on some
                        //    challenges. It isn't really wrong to do so, but it is still
                        //    untidy. We should instead suppress this when we know the
                        //    remote is still validating the path.
4277                        match self.peer_supports_ack_frequency() {
4278                            true => self.immediate_ack(path_id),
4279                            false => {
4280                                self.ping_path(path_id).ok();
4281                            }
4282                        }
4283                    }
4284                }
4285                Frame::PathResponse(response) => {
4286                    let path = self
4287                        .paths
4288                        .get_mut(&path_id)
4289                        .expect("payload is processed only after the path becomes known");
4290
4291                    match path.data.challenges_sent.get(&response.0) {
4292                        // Response to an on-path PathChallenge
4293                        Some(info) if info.remote == remote && path.data.remote == remote => {
4294                            let sent_instant = info.sent_instant;
4295                            // TODO(@divma): reset timers using the remaining off-path challenges
4296                            self.timers.stop(
4297                                Timer::PerPath(path_id, PathTimer::PathValidation),
4298                                self.qlog.with_time(now),
4299                            );
4300                            self.timers.stop(
4301                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
4302                                self.qlog.with_time(now),
4303                            );
4304                            if !path.data.validated {
4305                                trace!("new path validated");
4306                            }
4307                            self.timers.stop(
4308                                Timer::PerPath(path_id, PathTimer::PathOpen),
4309                                self.qlog.with_time(now),
4310                            );
4311                            // Clear any other on-path sent challenge.
4312                            path.data
4313                                .challenges_sent
4314                                .retain(|_token, info| info.remote != remote);
4315                            path.data.send_new_challenge = false;
4316                            path.data.validated = true;
4317
4318                            // This RTT can only be used for the initial RTT, not as a normal
4319                            // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
4320                            let rtt = now.saturating_duration_since(sent_instant);
4321                            path.data.rtt.reset_initial_rtt(rtt);
4322
4323                            self.events
4324                                .push_back(Event::Path(PathEvent::Opened { id: path_id }));
                            // Mark the path as open from the application perspective now
                            // that the Opened event has been queued.
4327                            if !std::mem::replace(&mut path.data.open, true) {
4328                                trace!("path opened");
4329                                if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4330                                {
4331                                    self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4332                                        id: path_id,
4333                                        addr: observed.socket_addr(),
4334                                    }));
4335                                }
4336                            }
4337                            if let Some((_, ref mut prev)) = path.prev {
4338                                prev.challenges_sent.clear();
4339                                prev.send_new_challenge = false;
4340                            }
4341                        }
4342                        // Response to an off-path PathChallenge
4343                        Some(info) if info.remote == remote => {
4344                            debug!("Response to off-path PathChallenge!");
4345                            path.data
4346                                .challenges_sent
4347                                .retain(|_token, info| info.remote != remote);
4348                        }
4349                        // Response to a PathChallenge we recognize, but from an invalid remote
4350                        Some(info) => {
4351                            debug!(%response, from=%remote, expected=%info.remote, "ignoring invalid PATH_RESPONSE")
4352                        }
4353                        // Response to an unknown PathChallenge
4354                        None => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4355                    }
4356                }
4357                Frame::MaxData(bytes) => {
4358                    self.streams.received_max_data(bytes);
4359                }
4360                Frame::MaxStreamData { id, offset } => {
4361                    self.streams.received_max_stream_data(id, offset)?;
4362                }
4363                Frame::MaxStreams { dir, count } => {
4364                    self.streams.received_max_streams(dir, count)?;
4365                }
4366                Frame::ResetStream(frame) => {
4367                    if self.streams.received_reset(frame)?.should_transmit() {
4368                        self.spaces[SpaceId::Data].pending.max_data = true;
4369                    }
4370                }
4371                Frame::DataBlocked { offset } => {
4372                    debug!(offset, "peer claims to be blocked at connection level");
4373                }
4374                Frame::StreamDataBlocked { id, offset } => {
4375                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4376                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4377                        return Err(TransportError::STREAM_STATE_ERROR(
4378                            "STREAM_DATA_BLOCKED on send-only stream",
4379                        ));
4380                    }
4381                    debug!(
4382                        stream = %id,
4383                        offset, "peer claims to be blocked at stream level"
4384                    );
4385                }
4386                Frame::StreamsBlocked { dir, limit } => {
4387                    if limit > MAX_STREAM_COUNT {
4388                        return Err(TransportError::FRAME_ENCODING_ERROR(
4389                            "unrepresentable stream limit",
4390                        ));
4391                    }
4392                    debug!(
4393                        "peer claims to be blocked opening more than {} {} streams",
4394                        limit, dir
4395                    );
4396                }
4397                Frame::StopSending(frame::StopSending { id, error_code }) => {
4398                    if id.initiator() != self.side.side() {
4399                        if id.dir() == Dir::Uni {
4400                            debug!("got STOP_SENDING on recv-only {}", id);
4401                            return Err(TransportError::STREAM_STATE_ERROR(
4402                                "STOP_SENDING on recv-only stream",
4403                            ));
4404                        }
4405                    } else if self.streams.is_local_unopened(id) {
4406                        return Err(TransportError::STREAM_STATE_ERROR(
4407                            "STOP_SENDING on unopened stream",
4408                        ));
4409                    }
4410                    self.streams.received_stop_sending(id, error_code);
4411                }
4412                Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4413                    if let Some(ref path_id) = path_id {
4414                        span.record("path", tracing::field::debug(&path_id));
4415                    }
4416                    let path_id = path_id.unwrap_or_default();
4417                    match self.local_cid_state.get_mut(&path_id) {
4418                        None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4419                        Some(cid_state) => {
4420                            let allow_more_cids = cid_state
4421                                .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4422
                            // If the path has closed, we do not issue more CIDs for it.
                            // For details see https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.2.2
                            // > an endpoint SHOULD provide new connection IDs for that path, if still open, using PATH_NEW_CONNECTION_ID frames.
4426                            let has_path = !self.abandoned_paths.contains(&path_id);
4427                            let allow_more_cids = allow_more_cids && has_path;
4428
4429                            self.endpoint_events
4430                                .push_back(EndpointEventInner::RetireConnectionId(
4431                                    now,
4432                                    path_id,
4433                                    sequence,
4434                                    allow_more_cids,
4435                                ));
4436                        }
4437                    }
4438                }
4439                Frame::NewConnectionId(frame) => {
4440                    let path_id = if let Some(path_id) = frame.path_id {
4441                        if !self.is_multipath_negotiated() {
4442                            return Err(TransportError::PROTOCOL_VIOLATION(
4443                                "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4444                            ));
4445                        }
4446                        if path_id > self.local_max_path_id {
4447                            return Err(TransportError::PROTOCOL_VIOLATION(
4448                                "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4449                            ));
4450                        }
4451                        path_id
4452                    } else {
4453                        PathId::ZERO
4454                    };
4455
4456                    if self.abandoned_paths.contains(&path_id) {
4457                        trace!("ignoring issued CID for abandoned path");
4458                        continue;
4459                    }
4460                    if let Some(ref path_id) = frame.path_id {
4461                        span.record("path", tracing::field::debug(&path_id));
4462                    }
4463                    let rem_cids = self
4464                        .rem_cids
4465                        .entry(path_id)
4466                        .or_insert_with(|| CidQueue::new(frame.id));
4467                    if rem_cids.active().is_empty() {
4468                        // TODO(@divma): is the entry removed later? (rem_cids.entry)
4469                        return Err(TransportError::PROTOCOL_VIOLATION(
4470                            "NEW_CONNECTION_ID when CIDs aren't in use",
4471                        ));
4472                    }
4473                    if frame.retire_prior_to > frame.sequence {
4474                        return Err(TransportError::PROTOCOL_VIOLATION(
4475                            "NEW_CONNECTION_ID retiring unissued CIDs",
4476                        ));
4477                    }
4478
4479                    use crate::cid_queue::InsertError;
4480                    match rem_cids.insert(frame) {
4481                        Ok(None) => {}
4482                        Ok(Some((retired, reset_token))) => {
4483                            let pending_retired =
4484                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
4485                            /// Ensure `pending_retired` cannot grow without bound. Limit is
4486                            /// somewhat arbitrary but very permissive.
4487                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4488                            // We don't bother counting in-flight frames because those are bounded
4489                            // by congestion control.
4490                            if (pending_retired.len() as u64)
4491                                .saturating_add(retired.end.saturating_sub(retired.start))
4492                                > MAX_PENDING_RETIRED_CIDS
4493                            {
4494                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4495                                    "queued too many retired CIDs",
4496                                ));
4497                            }
4498                            pending_retired.extend(retired.map(|seq| (path_id, seq)));
4499                            self.set_reset_token(path_id, remote, reset_token);
4500                        }
4501                        Err(InsertError::ExceedsLimit) => {
4502                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4503                        }
4504                        Err(InsertError::Retired) => {
4505                            trace!("discarding already-retired");
4506                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
4507                            // range of connection IDs larger than the active connection ID limit
4508                            // was retired all at once via retire_prior_to.
4509                            self.spaces[SpaceId::Data]
4510                                .pending
4511                                .retire_cids
4512                                .push((path_id, frame.sequence));
4513                            continue;
4514                        }
4515                    };
4516
4517                    if self.side.is_server()
4518                        && path_id == PathId::ZERO
4519                        && self
4520                            .rem_cids
4521                            .get(&PathId::ZERO)
4522                            .map(|cids| cids.active_seq() == 0)
4523                            .unwrap_or_default()
4524                    {
4525                        // We're a server still using the initial remote CID for the client, so
4526                        // let's switch immediately to enable clientside stateless resets.
4527                        self.update_rem_cid(PathId::ZERO);
4528                    }
4529                }
4530                Frame::NewToken(NewToken { token }) => {
4531                    let ConnectionSide::Client {
4532                        token_store,
4533                        server_name,
4534                        ..
4535                    } = &self.side
4536                    else {
4537                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4538                    };
4539                    if token.is_empty() {
4540                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4541                    }
4542                    trace!("got new token");
4543                    token_store.insert(server_name, token);
4544                }
4545                Frame::Datagram(datagram) => {
4546                    if self
4547                        .datagrams
4548                        .received(datagram, &self.config.datagram_receive_buffer_size)?
4549                    {
4550                        self.events.push_back(Event::DatagramReceived);
4551                    }
4552                }
4553                Frame::AckFrequency(ack_frequency) => {
4554                    // This frame can only be sent in the Data space
4555
4556                    if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4557                        // The AckFrequency frame is stale (we have already received a more
4558                        // recent one)
4559                        continue;
4560                    }
4561
4562                    // Update the params for all of our paths
4563                    for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4564                        space.pending_acks.set_ack_frequency_params(&ack_frequency);
4565
4566                        // Our `max_ack_delay` has been updated, so we may need to adjust
4567                        // its associated timeout
4568                        if let Some(timeout) = space
4569                            .pending_acks
4570                            .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4571                        {
4572                            self.timers.set(
4573                                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4574                                timeout,
4575                                self.qlog.with_time(now),
4576                            );
4577                        }
4578                    }
4579                }
4580                Frame::ImmediateAck => {
4581                    // This frame can only be sent in the Data space
4582                    for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4583                        pns.pending_acks.set_immediate_ack_required();
4584                    }
4585                }
4586                Frame::HandshakeDone => {
4587                    if self.side.is_server() {
4588                        return Err(TransportError::PROTOCOL_VIOLATION(
4589                            "client sent HANDSHAKE_DONE",
4590                        ));
4591                    }
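                    // For a client, receipt of HANDSHAKE_DONE confirms the handshake
                    // (RFC 9001 §4.1.2), after which the Handshake keys can be discarded.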
4592                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
4593                        self.discard_space(now, SpaceId::Handshake);
4594                    }
4595                    self.events.push_back(Event::HandshakeConfirmed);
4596                    trace!("handshake confirmed");
4597                }
4598                Frame::ObservedAddr(observed) => {
                    // Check whether the params allow the peer to send reports and this
                    // node to receive them.
4600                    trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4601                    if !self
4602                        .peer_params
4603                        .address_discovery_role
4604                        .should_report(&self.config.address_discovery_role)
4605                    {
4606                        return Err(TransportError::PROTOCOL_VIOLATION(
4607                            "received OBSERVED_ADDRESS frame when not negotiated",
4608                        ));
4609                    }
4610                    // must only be sent in data space
4611                    if packet.header.space() != SpaceId::Data {
4612                        return Err(TransportError::PROTOCOL_VIOLATION(
4613                            "OBSERVED_ADDRESS frame outside data space",
4614                        ));
4615                    }
4616
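                    // A report from the current remote updates this path directly; one
                    // from a different remote is stashed and applied during migration.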
4617                    let path = self.path_data_mut(path_id);
4618                    if remote == path.remote {
4619                        if let Some(updated) = path.update_observed_addr_report(observed) {
4620                            if path.open {
4621                                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4622                                    id: path_id,
4623                                    addr: updated,
4624                                }));
4625                            }
4626                            // otherwise the event is reported when the path is deemed open
4627                        }
4628                    } else {
4629                        // include in migration
4630                        migration_observed_addr = Some(observed)
4631                    }
4632                }
4633                Frame::PathAbandon(frame::PathAbandon {
4634                    path_id,
4635                    error_code,
4636                }) => {
4637                    span.record("path", tracing::field::debug(&path_id));
4638                    // TODO(flub): don't really know which error code to use here.
4639                    let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4640                        Ok(()) => {
4641                            trace!("peer abandoned path");
4642                            false
4643                        }
4644                        Err(ClosePathError::LastOpenPath) => {
4645                            trace!("peer abandoned last path, closing connection");
4646                            // TODO(flub): which error code?
4647                            return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4648                        }
4649                        Err(ClosePathError::ClosedPath) => {
4650                            trace!("peer abandoned already closed path");
4651                            true
4652                        }
4653                    };
4654                    // If we receive a retransmit of PATH_ABANDON then we may already have
4655                    // abandoned this path locally.  In that case the DiscardPath timer
4656                    // may already have fired and we no longer have any state for this path.
4657                    // Only set this timer if we still have path state.
4658                    if self.path(path_id).is_some() && !already_abandoned {
4659                        // TODO(flub): Checking is_some() here followed by a number of calls
4660                        //    that would panic if it was None is really ugly.  If only we
4661                        //    could do something like PathData::pto().  One day we'll have
4662                        //    unified SpaceId and PathId and this will be possible.
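                        // Keep the path state for three PTOs, mirroring the connection
                        // draining period (RFC 9000 §10.2), so late packets on the
                        // abandoned path can still be handled.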
4663                        let delay = self.pto(SpaceId::Data, path_id) * 3;
4664                        self.timers.set(
4665                            Timer::PerPath(path_id, PathTimer::DiscardPath),
4666                            now + delay,
4667                            self.qlog.with_time(now),
4668                        );
4669                    }
4670                }
4671                Frame::PathStatusAvailable(info) => {
4672                    span.record("path", tracing::field::debug(&info.path_id));
4673                    if self.is_multipath_negotiated() {
4674                        self.on_path_status(
4675                            info.path_id,
4676                            PathStatus::Available,
4677                            info.status_seq_no,
4678                        );
4679                    } else {
4680                        return Err(TransportError::PROTOCOL_VIOLATION(
4681                            "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4682                        ));
4683                    }
4684                }
4685                Frame::PathStatusBackup(info) => {
4686                    span.record("path", tracing::field::debug(&info.path_id));
4687                    if self.is_multipath_negotiated() {
4688                        self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4689                    } else {
4690                        return Err(TransportError::PROTOCOL_VIOLATION(
4691                            "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4692                        ));
4693                    }
4694                }
4695                Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4696                    span.record("path", tracing::field::debug(&path_id));
4697                    if !self.is_multipath_negotiated() {
4698                        return Err(TransportError::PROTOCOL_VIOLATION(
4699                            "received MAX_PATH_ID frame when multipath was not negotiated",
4700                        ));
4701                    }
4702                    // frames that do not increase the path id are ignored
4703                    if path_id > self.remote_max_path_id {
4704                        self.remote_max_path_id = path_id;
4705                        self.issue_first_path_cids(now);
4706                    }
4707                }
4708                Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4709                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4710                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4711                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4712                    if self.is_multipath_negotiated() {
4713                        if self.local_max_path_id > max_path_id {
4714                            return Err(TransportError::PROTOCOL_VIOLATION(
4715                                "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4716                            ));
4717                        }
4718                        debug!("received PATHS_BLOCKED({:?})", max_path_id);
4719                        // TODO(@divma): ensure max concurrent paths
4720                    } else {
4721                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "received PATHS_BLOCKED frame when multipath was not negotiated",
4723                        ));
4724                    }
4725                }
4726                Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4727                    // Nothing to do.  This is recorded in the frame stats, but otherwise we
4728                    // always issue all CIDs we're allowed to issue, so either this is an
4729                    // impatient peer or a bug on our side.
4730
4731                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4732                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4733                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4734                    if self.is_multipath_negotiated() {
4735                        if path_id > self.local_max_path_id {
4736                            return Err(TransportError::PROTOCOL_VIOLATION(
4737                                "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4738                            ));
4739                        }
4740                        if next_seq.0
4741                            > self
4742                                .local_cid_state
4743                                .get(&path_id)
4744                                .map(|cid_state| cid_state.active_seq().1 + 1)
4745                                .unwrap_or_default()
4746                        {
4747                            return Err(TransportError::PROTOCOL_VIOLATION(
4748                                "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4749                            ));
4750                        }
4751                        debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4752                    } else {
4753                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "received PATH_CIDS_BLOCKED frame when multipath was not negotiated",
4755                        ));
4756                    }
4757                }
4758                Frame::AddAddress(addr) => {
4759                    let client_state = match self.iroh_hp.client_side_mut() {
4760                        Ok(state) => state,
4761                        Err(err) => {
4762                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4763                                "Nat traversal(ADD_ADDRESS): {err}"
4764                            )));
4765                        }
4766                    };
4767
4768                    if !client_state.check_remote_address(&addr) {
                        // If the address is not valid we log a warning, but update anyway.
4770                        warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4771                    }
4772
4773                    match client_state.add_remote_address(addr) {
4774                        Ok(maybe_added) => {
4775                            if let Some(added) = maybe_added {
4776                                self.events.push_back(Event::NatTraversal(
4777                                    iroh_hp::Event::AddressAdded(added),
4778                                ));
4779                            }
4780                        }
4781                        Err(e) => {
4782                            warn!(%e, "failed to add remote address")
4783                        }
4784                    }
4785                }
4786                Frame::RemoveAddress(addr) => {
4787                    let client_state = match self.iroh_hp.client_side_mut() {
4788                        Ok(state) => state,
4789                        Err(err) => {
4790                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4791                                "Nat traversal(REMOVE_ADDRESS): {err}"
4792                            )));
4793                        }
4794                    };
4795                    if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4796                        self.events
4797                            .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4798                                removed_addr,
4799                            )));
4800                    }
4801                }
4802                Frame::ReachOut(reach_out) => {
4803                    let server_state = match self.iroh_hp.server_side_mut() {
4804                        Ok(state) => state,
4805                        Err(err) => {
4806                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4807                                "Nat traversal(REACH_OUT): {err}"
4808                            )));
4809                        }
4810                    };
4811
4812                    if let Err(err) = server_state.handle_reach_out(reach_out) {
4813                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
4814                            "Nat traversal(REACH_OUT): {err}"
4815                        )));
4816                    }
4817                }
4818            }
4819        }
4820
4821        let space = self.spaces[SpaceId::Data].for_path(path_id);
4822        if space
4823            .pending_acks
4824            .packet_received(now, number, ack_eliciting, &space.dedup)
4825        {
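            // An acknowledgement now needs to be scheduled for this packet number space.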
4826            if self.abandoned_paths.contains(&path_id) {
4827                // § 3.4.3 QUIC-MULTIPATH: promptly send ACKs for packets received from
4828                // abandoned paths.
4829                space.pending_acks.set_immediate_ack_required();
4830            } else {
4831                self.timers.set(
4832                    Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4833                    now + self.ack_frequency.max_ack_delay,
4834                    self.qlog.with_time(now),
4835                );
4836            }
4837        }
4838
4839        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
4840        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
4841        // are only freed, and hence only issue credit, once the application has been notified
4842        // during a read on the stream.
4843        let pending = &mut self.spaces[SpaceId::Data].pending;
4844        self.streams.queue_max_stream_id(pending);
4845
4846        if let Some(reason) = close {
4847            self.state.move_to_draining(Some(reason.into()));
4848            self.close = true;
4849        }
4850
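        // A non-probing packet from a new remote that is the highest-numbered packet
        // received on this path triggers migration (RFC 9000 §9.3). Only servers accept
        // this, and only when migration is enabled in the server config.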
4851        if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4852            && !is_probing_packet
4853            && remote != self.path_data(path_id).remote
4854        {
4855            let ConnectionSide::Server { ref server_config } = self.side else {
4856                panic!("packets from unknown remote should be dropped by clients");
4857            };
4858            debug_assert!(
4859                server_config.migration,
4860                "migration-initiating packets should have been dropped immediately"
4861            );
4862            self.migrate(path_id, now, remote, migration_observed_addr);
4863            // Break linkability, if possible
4864            self.update_rem_cid(path_id);
4865            self.spin = false;
4866        }
4867
4868        Ok(())
4869    }
4870
4871    fn migrate(
4872        &mut self,
4873        path_id: PathId,
4874        now: Instant,
4875        remote: SocketAddr,
4876        observed_addr: Option<ObservedAddr>,
4877    ) {
4878        trace!(%remote, %path_id, "migration initiated");
4879        self.path_counter = self.path_counter.wrapping_add(1);
4880        // TODO(@divma): conditions for path migration in multipath are very specific, check them
4881        // again to prevent path migrations that should actually create a new path
4882
        // Reset RTT/congestion state for the new path unless it looks like a NAT rebinding.
        // Note that the congestion window will not grow until validation terminates; this
        // helps mitigate amplification attacks performed by spoofing source addresses.
4886        let prev_pto = self.pto(SpaceId::Data, path_id);
4887        let known_path = self.paths.get_mut(&path_id).expect("known path");
4888        let path = &mut known_path.data;
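        // A new port on the same IPv4 address most likely indicates a NAT rebinding,
        // so the previous path's RTT and congestion state are carried over.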
4889        let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4890            PathData::from_previous(remote, path, self.path_counter, now)
4891        } else {
4892            let peer_max_udp_payload_size =
4893                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4894                    .unwrap_or(u16::MAX);
4895            PathData::new(
4896                remote,
4897                self.allow_mtud,
4898                Some(peer_max_udp_payload_size),
4899                self.path_counter,
4900                now,
4901                &self.config,
4902            )
4903        };
4904        new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4905        if let Some(report) = observed_addr {
4906            if let Some(updated) = new_path.update_observed_addr_report(report) {
4907                tracing::info!("adding observed addr event from migration");
4908                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4909                    id: path_id,
4910                    addr: updated,
4911                }));
4912            }
4913        }
4914        new_path.send_new_challenge = true;
4915
4916        let mut prev = mem::replace(path, new_path);
4917        // Don't clobber the original path if the previous one hasn't been validated yet
4918        if !prev.is_validating_path() {
4919            prev.send_new_challenge = true;
            // We haven't updated the remote CID yet; this captures the remote CID we
            // were using on the previous path.
4922
4923            known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4924        }
4925
4926        // We need to re-assign the correct remote to this path in qlog
4927        self.qlog.emit_tuple_assigned(path_id, remote, now);
4928
4929        self.timers.set(
4930            Timer::PerPath(path_id, PathTimer::PathValidation),
4931            now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4932            self.qlog.with_time(now),
4933        );
4934    }
4935
4936    /// Handle a change in the local address, i.e. an active migration
4937    pub fn local_address_changed(&mut self) {
4938        // TODO(flub): if multipath is enabled this needs to create a new path entirely.
4939        self.update_rem_cid(PathId::ZERO);
4940        self.ping();
4941    }
4942
4943    /// Switch to a previously unused remote connection ID, if possible
4944    fn update_rem_cid(&mut self, path_id: PathId) {
4945        let Some((reset_token, retired)) =
4946            self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4947        else {
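            // The peer has not issued any unused CIDs for this path, so we keep
            // using the current one.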
4948            return;
4949        };
4950
4951        // Retire the current remote CID and any CIDs we had to skip.
4952        self.spaces[SpaceId::Data]
4953            .pending
4954            .retire_cids
4955            .extend(retired.map(|seq| (path_id, seq)));
4956        let remote = self.path_data(path_id).remote;
4957        self.set_reset_token(path_id, remote, reset_token);
4958    }
4959
4960    /// Sends this reset token to the endpoint
4961    ///
4962    /// The endpoint needs to know the reset tokens issued by the peer, so that if the peer
    /// sends a stateless reset it knows to route it to this connection. See RFC 9000
    /// Section 10.3, "Stateless Reset".
    ///
    /// Reset tokens are different for each path. Note, however, that the endpoint
    /// identifies paths by the peer's socket address, not by path ID.
4968    fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4969        self.endpoint_events
4970            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4971
4972        // During the handshake the server sends a reset token in the transport
4973        // parameters. When we are the client and we receive the reset token during the
4974        // handshake we want this to affect our peer transport parameters.
        // TODO(flub): Pretty sure this is pointless: the entire params struct is
        //    overwritten shortly after this is called, and then it no longer carries
        //    this token.
4977        if path_id == PathId::ZERO {
4978            self.peer_params.stateless_reset_token = Some(reset_token);
4979        }
4980    }
4981
4982    /// Issue an initial set of connection IDs to the peer upon connection
4983    fn issue_first_cids(&mut self, now: Instant) {
4984        if self
4985            .local_cid_state
4986            .get(&PathId::ZERO)
4987            .expect("PathId::ZERO exists when the connection is created")
4988            .cid_len()
4989            == 0
4990        {
4991            return;
4992        }
4993
4994        // Subtract 1 to account for the CID we supplied while handshaking
4995        let mut n = self.peer_params.issue_cids_limit() - 1;
4996        if let ConnectionSide::Server { server_config } = &self.side {
4997            if server_config.has_preferred_address() {
4998                // We also sent a CID in the transport parameters
4999                n -= 1;
5000            }
5001        }
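        // Worked example: with an issue limit of 8, a server that also advertised a
        // preferred-address CID issues 8 - 1 - 1 = 6 additional CIDs here.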
5002        self.endpoint_events
5003            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5004    }
5005
5006    /// Issues an initial set of CIDs for paths that have not yet had any CIDs issued
5007    ///
5008    /// Later CIDs are issued when CIDs expire or are retired by the peer.
5009    fn issue_first_path_cids(&mut self, now: Instant) {
5010        if let Some(max_path_id) = self.max_path_id() {
5011            let mut path_id = self.max_path_id_with_cids.next();
5012            while path_id <= max_path_id {
5013                self.endpoint_events
5014                    .push_back(EndpointEventInner::NeedIdentifiers(
5015                        path_id,
5016                        now,
5017                        self.peer_params.issue_cids_limit(),
5018                    ));
5019                path_id = path_id.next();
5020            }
5021            self.max_path_id_with_cids = max_path_id;
5022        }
5023    }
5024
5025    /// Populates a packet with frames
5026    ///
5027    /// This tries to fit as many frames as possible into the packet.
5028    ///
    /// *path_exclusive_only* means to only build frames which can only be sent on this
    /// path.  This is used in multipath for backup paths while there is still an active
    /// path.
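    ///
    /// Frames are written in a fixed priority order: connection-level signalling first
    /// (HANDSHAKE_DONE, REACH_OUT, OBSERVED_ADDR, PING, IMMEDIATE_ACK, ACKs,
    /// ACK_FREQUENCY), then path management (PATH_CHALLENGE, PATH_RESPONSE, CRYPTO,
    /// PATH_ABANDON, path status and path ID limits), then stream control and CID
    /// management, and finally data-bearing frames (DATAGRAM, NEW_TOKEN, STREAM,
    /// address advertisements).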
5032    fn populate_packet(
5033        &mut self,
5034        now: Instant,
5035        space_id: SpaceId,
5036        path_id: PathId,
5037        path_exclusive_only: bool,
5038        buf: &mut impl BufMut,
5039        pn: u64,
5040        #[allow(unused)] qlog: &mut QlogSentPacket,
5041    ) -> SentFrames {
5042        let mut sent = SentFrames::default();
5043        let is_multipath_negotiated = self.is_multipath_negotiated();
5044        let space = &mut self.spaces[space_id];
5045        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5046        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5047        space
5048            .for_path(path_id)
5049            .pending_acks
5050            .maybe_ack_non_eliciting();
5051
5052        // HANDSHAKE_DONE
5053        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5054            trace!("HANDSHAKE_DONE");
5055            buf.write(frame::FrameType::HANDSHAKE_DONE);
5056            qlog.frame(&Frame::HandshakeDone);
5057            sent.retransmits.get_or_create().handshake_done = true;
5058            // This is just a u8 counter and the frame is typically just sent once
5059            self.stats.frame_tx.handshake_done =
5060                self.stats.frame_tx.handshake_done.saturating_add(1);
5061        }
5062
5063        // REACH_OUT
        // TODO(@divma): path exclusive considerations
5065        if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5066            while let Some(local_addr) = addresses.pop() {
5067                let reach_out = frame::ReachOut::new(*round, local_addr);
5068                if buf.remaining_mut() > reach_out.size() {
5069                    trace!(%round, ?local_addr, "REACH_OUT");
5070                    reach_out.write(buf);
5071                    let sent_reachouts = sent
5072                        .retransmits
5073                        .get_or_create()
5074                        .reach_out
5075                        .get_or_insert_with(|| (*round, Default::default()));
5076                    sent_reachouts.1.push(local_addr);
5077                    self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5078                    qlog.frame(&Frame::ReachOut(reach_out));
5079                } else {
5080                    addresses.push(local_addr);
5081                    break;
5082                }
5083            }
5084            if addresses.is_empty() {
5085                space.pending.reach_out = None;
5086            }
5087        }
5088
5089        // OBSERVED_ADDR
5090        if !path_exclusive_only
5091            && space_id == SpaceId::Data
5092            && self
5093                .config
5094                .address_discovery_role
5095                .should_report(&self.peer_params.address_discovery_role)
5096            && (!path.observed_addr_sent || space.pending.observed_addr)
5097        {
5098            let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5099            if buf.remaining_mut() > frame.size() {
5100                trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5101                frame.write(buf);
5102
5103                self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5104                path.observed_addr_sent = true;
5105
5106                self.stats.frame_tx.observed_addr += 1;
5107                sent.retransmits.get_or_create().observed_addr = true;
5108                space.pending.observed_addr = false;
5109                qlog.frame(&Frame::ObservedAddr(frame));
5110            }
5111        }
5112
5113        // PING
5114        if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5115            trace!("PING");
5116            buf.write(frame::FrameType::PING);
5117            sent.non_retransmits = true;
5118            self.stats.frame_tx.ping += 1;
5119            qlog.frame(&Frame::Ping);
5120        }
5121
5122        // IMMEDIATE_ACK
5123        if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5124            debug_assert_eq!(
5125                space_id,
5126                SpaceId::Data,
5127                "immediate acks must be sent in the data space"
5128            );
5129            trace!("IMMEDIATE_ACK");
5130            buf.write(frame::FrameType::IMMEDIATE_ACK);
5131            sent.non_retransmits = true;
5132            self.stats.frame_tx.immediate_ack += 1;
5133            qlog.frame(&Frame::ImmediateAck);
5134        }
5135
5136        // ACK
        // TODO(flub): Should this send ACKs for this path anyway?
5139        if !path_exclusive_only {
5140            for path_id in space
5141                .number_spaces
5142                .iter_mut()
5143                .filter(|(_, pns)| pns.pending_acks.can_send())
5144                .map(|(&path_id, _)| path_id)
5145                .collect::<Vec<_>>()
5146            {
5147                Self::populate_acks(
5148                    now,
5149                    self.receiving_ecn,
5150                    &mut sent,
5151                    path_id,
5152                    space_id,
5153                    space,
5154                    is_multipath_negotiated,
5155                    buf,
5156                    &mut self.stats,
5157                    qlog,
5158                );
5159            }
5160        }
5161
5162        // ACK_FREQUENCY
5163        if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5164            let sequence_number = self.ack_frequency.next_sequence_number();
5165
5166            // Safe to unwrap because this is always provided when ACK frequency is enabled
5167            let config = self.config.ack_frequency_config.as_ref().unwrap();
5168
5169            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
5170            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5171                path.rtt.get(),
5172                config,
5173                &self.peer_params,
5174            );
5175
5176            trace!(?max_ack_delay, "ACK_FREQUENCY");
5177
5178            let frame = frame::AckFrequency {
5179                sequence: sequence_number,
5180                ack_eliciting_threshold: config.ack_eliciting_threshold,
5181                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5182                reordering_threshold: config.reordering_threshold,
5183            };
5184            frame.encode(buf);
5185            qlog.frame(&Frame::AckFrequency(frame));
5186
5187            sent.retransmits.get_or_create().ack_frequency = true;
5188
5189            self.ack_frequency
5190                .ack_frequency_sent(path_id, pn, max_ack_delay);
5191            self.stats.frame_tx.ack_frequency += 1;
5192        }
5193
5194        // PATH_CHALLENGE
5195        if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5196            && space_id == SpaceId::Data
5197            && path.send_new_challenge
5198        {
5199            path.send_new_challenge = false;
5200
5201            // Generate a new challenge every time we send a new PATH_CHALLENGE
5202            let token = self.rng.random();
5203            let info = paths::SentChallengeInfo {
5204                sent_instant: now,
5205                remote: path.remote,
5206            };
5207            path.challenges_sent.insert(token, info);
5208            sent.non_retransmits = true;
5209            sent.requires_padding = true;
5210            let challenge = frame::PathChallenge(token);
5211            trace!(%challenge, "sending new challenge");
5212            buf.write(challenge);
5213            qlog.frame(&Frame::PathChallenge(challenge));
5214            self.stats.frame_tx.path_challenge += 1;
5215            let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5216            self.timers.set(
5217                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5218                now + pto,
5219                self.qlog.with_time(now),
5220            );
5221
            // Note: `send_new_challenge` was cleared above, so checking it again here
            // would always be false; a challenge is being sent in this packet.
            if is_multipath_negotiated && !path.validated {
5223                // queue informing the path status along with the challenge
5224                space.pending.path_status.insert(path_id);
5225            }
5226
5227            // Always include an OBSERVED_ADDR frame with a PATH_CHALLENGE, regardless
5228            // of whether one has already been sent on this path.
5229            if space_id == SpaceId::Data
5230                && self
5231                    .config
5232                    .address_discovery_role
5233                    .should_report(&self.peer_params.address_discovery_role)
5234            {
5235                let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5236                if buf.remaining_mut() > frame.size() {
5237                    frame.write(buf);
5238                    qlog.frame(&Frame::ObservedAddr(frame));
5239
5240                    self.next_observed_addr_seq_no =
5241                        self.next_observed_addr_seq_no.saturating_add(1u8);
5242                    path.observed_addr_sent = true;
5243
5244                    self.stats.frame_tx.observed_addr += 1;
5245                    sent.retransmits.get_or_create().observed_addr = true;
5246                    space.pending.observed_addr = false;
5247                }
5248            }
5249        }
5250
5251        // PATH_RESPONSE
5252        if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5253            if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5254                sent.non_retransmits = true;
5255                sent.requires_padding = true;
5256                let response = frame::PathResponse(token);
5257                trace!(%response, "sending response");
5258                buf.write(response);
5259                qlog.frame(&Frame::PathResponse(response));
5260                self.stats.frame_tx.path_response += 1;
5261
                // NOTE: this is technically not required, but it might be useful to ride
                // the request/response nature of path challenges to refresh an observation.
                // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
5265                if space_id == SpaceId::Data
5266                    && self
5267                        .config
5268                        .address_discovery_role
5269                        .should_report(&self.peer_params.address_discovery_role)
5270                {
5271                    let frame =
5272                        frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5273                    if buf.remaining_mut() > frame.size() {
5274                        frame.write(buf);
5275                        qlog.frame(&Frame::ObservedAddr(frame));
5276
5277                        self.next_observed_addr_seq_no =
5278                            self.next_observed_addr_seq_no.saturating_add(1u8);
5279                        path.observed_addr_sent = true;
5280
5281                        self.stats.frame_tx.observed_addr += 1;
5282                        sent.retransmits.get_or_create().observed_addr = true;
5283                        space.pending.observed_addr = false;
5284                    }
5285                }
5286            }
5287        }
5288
5289        // CRYPTO
5290        while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5291            let mut frame = match space.pending.crypto.pop_front() {
5292                Some(x) => x,
5293                None => break,
5294            };
5295
            // Calculate the maximum amount of crypto data we can store in the buffer.
            // Since the offset is known, we can reserve the exact size required to encode it.
            // For the length we reserve 2 bytes, which allows encoding values up to
            // 2^14 - 1, more than fits into a normally sized QUIC packet.
5300            let max_crypto_data_size = buf.remaining_mut()
5301                - 1 // Frame Type
5302                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5303                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
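            // e.g. with 1200 bytes remaining and a 2-byte offset encoding, up to
            // 1200 - 1 - 2 - 2 = 1195 bytes of CRYPTO data fit in this packet.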
5304
5305            let len = frame
5306                .data
5307                .len()
5308                .min(2usize.pow(14) - 1)
5309                .min(max_crypto_data_size);
5310
5311            let data = frame.data.split_to(len);
5312            let truncated = frame::Crypto {
5313                offset: frame.offset,
5314                data,
5315            };
5316            trace!(
5317                "CRYPTO: off {} len {}",
5318                truncated.offset,
5319                truncated.data.len()
5320            );
5321            truncated.encode(buf);
5322            self.stats.frame_tx.crypto += 1;
5323
5324            // The clone is cheap but we still cfg it out if qlog is disabled.
5325            #[cfg(feature = "qlog")]
5326            qlog.frame(&Frame::Crypto(truncated.clone()));
5327            sent.retransmits.get_or_create().crypto.push_back(truncated);
5328            if !frame.data.is_empty() {
5329                frame.offset += len as u64;
5330                space.pending.crypto.push_front(frame);
5331            }
5332        }
5333
5334        // TODO(flub): maybe this is much higher priority?
5335        // PATH_ABANDON
5336        while !path_exclusive_only
5337            && space_id == SpaceId::Data
5338            && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5339        {
5340            let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5341                break;
5342            };
5343            let frame = frame::PathAbandon {
5344                path_id,
5345                error_code,
5346            };
5347            frame.encode(buf);
5348            qlog.frame(&Frame::PathAbandon(frame));
5349            self.stats.frame_tx.path_abandon += 1;
5350            trace!(%path_id, "PATH_ABANDON");
5351            sent.retransmits
5352                .get_or_create()
5353                .path_abandon
5354                .entry(path_id)
5355                .or_insert(error_code);
5356        }
5357
5358        // PATH_STATUS_AVAILABLE & PATH_STATUS_BACKUP
5359        while !path_exclusive_only
5360            && space_id == SpaceId::Data
5361            && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5362        {
5363            let Some(path_id) = space.pending.path_status.pop_first() else {
5364                break;
5365            };
5366            let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5367                trace!(%path_id, "discarding queued path status for unknown path");
5368                continue;
5369            };
5370
5371            let seq = path.status.seq();
5372            sent.retransmits.get_or_create().path_status.insert(path_id);
5373            match path.local_status() {
5374                PathStatus::Available => {
5375                    let frame = frame::PathStatusAvailable {
5376                        path_id,
5377                        status_seq_no: seq,
5378                    };
5379                    frame.encode(buf);
5380                    qlog.frame(&Frame::PathStatusAvailable(frame));
5381                    self.stats.frame_tx.path_status_available += 1;
5382                    trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5383                }
5384                PathStatus::Backup => {
5385                    let frame = frame::PathStatusBackup {
5386                        path_id,
5387                        status_seq_no: seq,
5388                    };
5389                    frame.encode(buf);
5390                    qlog.frame(&Frame::PathStatusBackup(frame));
5391                    self.stats.frame_tx.path_status_backup += 1;
5392                    trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5393                }
5394            }
5395        }
5396
5397        // MAX_PATH_ID
5398        if space_id == SpaceId::Data
5399            && space.pending.max_path_id
5400            && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5401        {
5402            let frame = frame::MaxPathId(self.local_max_path_id);
5403            frame.encode(buf);
5404            qlog.frame(&Frame::MaxPathId(frame));
5405            space.pending.max_path_id = false;
5406            sent.retransmits.get_or_create().max_path_id = true;
5407            trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5408            self.stats.frame_tx.max_path_id += 1;
5409        }
5410
5411        // PATHS_BLOCKED
5412        if space_id == SpaceId::Data
5413            && space.pending.paths_blocked
5414            && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5415        {
5416            let frame = frame::PathsBlocked(self.remote_max_path_id);
5417            frame.encode(buf);
5418            qlog.frame(&Frame::PathsBlocked(frame));
5419            space.pending.paths_blocked = false;
5420            sent.retransmits.get_or_create().paths_blocked = true;
5421            trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5422            self.stats.frame_tx.paths_blocked += 1;
5423        }
5424
5425        // PATH_CIDS_BLOCKED
5426        while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5427        {
5428            let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5429                break;
5430            };
5431            let next_seq = match self.rem_cids.get(&path_id) {
5432                Some(cid_queue) => cid_queue.active_seq() + 1,
5433                None => 0,
5434            };
5435            let frame = frame::PathCidsBlocked {
5436                path_id,
5437                next_seq: VarInt(next_seq),
5438            };
5439            frame.encode(buf);
5440            qlog.frame(&Frame::PathCidsBlocked(frame));
5441            sent.retransmits
5442                .get_or_create()
5443                .path_cids_blocked
5444                .push(path_id);
5445            trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5446            self.stats.frame_tx.path_cids_blocked += 1;
5447        }
5448
5449        // RESET_STREAM, STOP_SENDING, MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS
5450        if space_id == SpaceId::Data {
5451            self.streams.write_control_frames(
5452                buf,
5453                &mut space.pending,
5454                &mut sent.retransmits,
5455                &mut self.stats.frame_tx,
5456                qlog,
5457            );
5458        }
5459
5460        // NEW_CONNECTION_ID
5461        let cid_len = self
5462            .local_cid_state
5463            .values()
5464            .map(|cid_state| cid_state.cid_len())
5465            .max()
5466            .expect("some local CID state must exist");
5467        let new_cid_size_bound =
5468            frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5469        while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5470            let issued = match space.pending.new_cids.pop() {
5471                Some(x) => x,
5472                None => break,
5473            };
5474            let retire_prior_to = self
5475                .local_cid_state
5476                .get(&issued.path_id)
5477                .map(|cid_state| cid_state.retire_prior_to())
5478                .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5479
5480            let cid_path_id = match is_multipath_negotiated {
5481                true => {
5482                    trace!(
5483                        path_id = ?issued.path_id,
5484                        sequence = issued.sequence,
5485                        id = %issued.id,
5486                        "PATH_NEW_CONNECTION_ID",
5487                    );
5488                    self.stats.frame_tx.path_new_connection_id += 1;
5489                    Some(issued.path_id)
5490                }
5491                false => {
5492                    trace!(
5493                        sequence = issued.sequence,
5494                        id = %issued.id,
5495                        "NEW_CONNECTION_ID"
5496                    );
5497                    debug_assert_eq!(issued.path_id, PathId::ZERO);
5498                    self.stats.frame_tx.new_connection_id += 1;
5499                    None
5500                }
5501            };
5502            let frame = frame::NewConnectionId {
5503                path_id: cid_path_id,
5504                sequence: issued.sequence,
5505                retire_prior_to,
5506                id: issued.id,
5507                reset_token: issued.reset_token,
5508            };
5509            frame.encode(buf);
5510            sent.retransmits.get_or_create().new_cids.push(issued);
5511            qlog.frame(&Frame::NewConnectionId(frame));
5512        }
5513
5514        // RETIRE_CONNECTION_ID
5515        let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5516        while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5517            let (path_id, sequence) = match space.pending.retire_cids.pop() {
5518                Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5519                    trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5520                    self.stats.frame_tx.retire_connection_id += 1;
5521                    (None, seq)
5522                }
5523                Some((path_id, seq)) => {
5524                    trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5525                    self.stats.frame_tx.path_retire_connection_id += 1;
5526                    (Some(path_id), seq)
5527                }
5528                None => break,
5529            };
5530            let frame = frame::RetireConnectionId { path_id, sequence };
5531            frame.encode(buf);
5532            qlog.frame(&Frame::RetireConnectionId(frame));
5533            sent.retransmits
5534                .get_or_create()
5535                .retire_cids
5536                .push((path_id.unwrap_or_default(), sequence));
5537        }
5538
5539        // DATAGRAM
5540        let mut sent_datagrams = false;
5541        while !path_exclusive_only
5542            && buf.remaining_mut() > Datagram::SIZE_BOUND
5543            && space_id == SpaceId::Data
5544        {
5545            let prev_remaining = buf.remaining_mut();
5546            match self.datagrams.write(buf) {
5547                true => {
5548                    sent_datagrams = true;
5549                    sent.non_retransmits = true;
5550                    self.stats.frame_tx.datagram += 1;
5551                    qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5552                }
5553                false => break,
5554            }
5555        }
5556        if self.datagrams.send_blocked && sent_datagrams {
5557            self.events.push_back(Event::DatagramsUnblocked);
5558            self.datagrams.send_blocked = false;
5559        }
5560
5561        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5562
5563        // NEW_TOKEN
5564        while let Some(remote_addr) = space.pending.new_tokens.pop() {
5565            if path_exclusive_only {
5566                break;
5567            }
5568            debug_assert_eq!(space_id, SpaceId::Data);
5569            let ConnectionSide::Server { server_config } = &self.side else {
5570                panic!("NEW_TOKEN frames should not be enqueued by clients");
5571            };
5572
5573            if remote_addr != path.remote {
                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are
                // only useful if used from the same IP address.  Thus, we abandon enqueued
                // NEW_TOKEN frames upon a path change. When the new path becomes validated,
                // fresh NEW_TOKEN frames may be enqueued for it instead.
5578                continue;
5579            }
5580
5581            let token = Token::new(
5582                TokenPayload::Validation {
5583                    ip: remote_addr.ip(),
5584                    issued: server_config.time_source.now(),
5585                },
5586                &mut self.rng,
5587            );
5588            let new_token = NewToken {
5589                token: token.encode(&*server_config.token_key).into(),
5590            };
5591
5592            if buf.remaining_mut() < new_token.size() {
5593                space.pending.new_tokens.push(remote_addr);
5594                break;
5595            }
5596
5597            trace!("NEW_TOKEN");
5598            new_token.encode(buf);
5599            qlog.frame(&Frame::NewToken(new_token));
5600            sent.retransmits
5601                .get_or_create()
5602                .new_tokens
5603                .push(remote_addr);
5604            self.stats.frame_tx.new_token += 1;
5605        }
5606
5607        // STREAM
5608        if !path_exclusive_only && space_id == SpaceId::Data {
5609            sent.stream_frames =
5610                self.streams
5611                    .write_stream_frames(buf, self.config.send_fairness, qlog);
5612            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5613        }
5614
5615        // ADD_ADDRESS
5616        // TODO(@divma): check if we need to do path exclusive filters
5617        while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5618            if let Some(added_address) = space.pending.add_address.pop_last() {
5619                trace!(
5620                    seq = %added_address.seq_no,
5621                    ip = ?added_address.ip,
5622                    port = added_address.port,
5623                    "ADD_ADDRESS",
5624                );
5625                added_address.write(buf);
5626                sent.retransmits
5627                    .get_or_create()
5628                    .add_address
5629                    .insert(added_address);
5630                self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5631                qlog.frame(&Frame::AddAddress(added_address));
5632            } else {
5633                break;
5634            }
5635        }
5636
5637        // REMOVE_ADDRESS
5638        while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5639            if let Some(removed_address) = space.pending.remove_address.pop_last() {
5640                trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5641                removed_address.write(buf);
5642                sent.retransmits
5643                    .get_or_create()
5644                    .remove_address
5645                    .insert(removed_address);
5646                self.stats.frame_tx.remove_address =
5647                    self.stats.frame_tx.remove_address.saturating_add(1);
5648                qlog.frame(&Frame::RemoveAddress(removed_address));
5649            } else {
5650                break;
5651            }
5652        }
5653
5654        sent
5655    }
5656
5657    /// Write pending ACKs into a buffer
5658    fn populate_acks(
5659        now: Instant,
5660        receiving_ecn: bool,
5661        sent: &mut SentFrames,
5662        path_id: PathId,
5663        space_id: SpaceId,
5664        space: &mut PacketSpace,
5665        is_multipath_negotiated: bool,
5666        buf: &mut impl BufMut,
5667        stats: &mut ConnectionStats,
5668        #[allow(unused)] qlog: &mut QlogSentPacket,
5669    ) {
5670        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
5671        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5672
5673        debug_assert!(
5674            is_multipath_negotiated || path_id == PathId::ZERO,
5675            "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5676        );
5677        if is_multipath_negotiated {
5678            debug_assert!(
5679                space_id == SpaceId::Data || path_id == PathId::ZERO,
5680                "path acks must be sent in 1RTT space (have {space_id:?})"
5681            );
5682        }
5683
5684        let pns = space.for_path(path_id);
5685        let ranges = pns.pending_acks.ranges();
        debug_assert!(!ranges.is_empty(), "cannot send an empty ACK range");
5687        let ecn = if receiving_ecn {
5688            Some(&pns.ecn_counters)
5689        } else {
5690            None
5691        };
5692        if let Some(max) = ranges.max() {
5693            sent.largest_acked.insert(path_id, max);
5694        }
5695
5696        let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5697        // TODO: This should come from `TransportConfig` if that gets configurable.
5698        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
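        // The wire format scales the delay down by 2^ack_delay_exponent: with the
        // default exponent of 3, a measured delay of 8_000us is encoded as 1_000.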
5699        let delay = delay_micros >> ack_delay_exp.into_inner();
5700
5701        if is_multipath_negotiated && space_id == SpaceId::Data {
5702            if !ranges.is_empty() {
5703                trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5704                frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
5705                qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5706                stats.frame_tx.path_acks += 1;
5707            }
5708        } else {
5709            trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5710            frame::Ack::encode(delay as _, ranges, ecn, buf);
5711            stats.frame_tx.acks += 1;
5712            qlog.frame_ack(delay, ranges, ecn);
5713        }
5714    }
5715
5716    fn close_common(&mut self) {
5717        trace!("connection closed");
5718        self.timers.reset();
5719    }
5720
5721    fn set_close_timer(&mut self, now: Instant) {
5722        // QUIC-MULTIPATH § 2.6 Connection Closure: draining for 3*PTO with PTO the max of
5723        // the PTO for all paths.
5724        self.timers.set(
5725            Timer::Conn(ConnTimer::Close),
5726            now + 3 * self.pto_max_path(self.highest_space),
5727            self.qlog.with_time(now),
5728        );
5729    }
5730
5731    /// Handle transport parameters received from the peer
5732    ///
5733    /// *rem_cid* and *loc_cid* are the source and destination CIDs respectively of the
    /// packet in which the transport parameters arrived.
5735    fn handle_peer_params(
5736        &mut self,
5737        params: TransportParameters,
5738        loc_cid: ConnectionId,
5739        rem_cid: ConnectionId,
5740        now: Instant,
5741    ) -> Result<(), TransportError> {
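        // Authenticate the CIDs used during the handshake: the peer echoes them in its
        // transport parameters (RFC 9000 §7.3), which defeats CID substitution by an
        // on-path attacker.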
5742        if Some(self.orig_rem_cid) != params.initial_src_cid
5743            || (self.side.is_client()
5744                && (Some(self.initial_dst_cid) != params.original_dst_cid
5745                    || self.retry_src_cid != params.retry_src_cid))
5746        {
5747            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5748                "CID authentication failure",
5749            ));
5750        }
5751        if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5752            return Err(TransportError::PROTOCOL_VIOLATION(
5753                "multipath must not use zero-length CIDs",
5754            ));
5755        }
5756
5757        self.set_peer_params(params);
5758        self.qlog.emit_peer_transport_params_received(self, now);
5759
5760        Ok(())
5761    }
5762
5763    fn set_peer_params(&mut self, params: TransportParameters) {
5764        self.streams.set_params(&params);
5765        self.idle_timeout =
5766            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5767        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5768
5769        if let Some(ref info) = params.preferred_address {
            // PathId::ZERO always exists during the handshake.
            self.rem_cids
                .get_mut(&PathId::ZERO)
                .expect("not yet abandoned")
                .insert(frame::NewConnectionId {
                    path_id: None,
                    sequence: 1,
                    id: info.connection_id,
                    reset_token: info.stateless_reset_token,
                    retire_prior_to: 0,
                })
                .expect(
                    "preferred address CID is the first received, and hence is guaranteed to be legal",
                );
5781            let remote = self.path_data(PathId::ZERO).remote;
5782            self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5783        }
5784        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
5785
5786        let mut multipath_enabled = None;
5787        if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5788            self.config.get_initial_max_path_id(),
5789            params.initial_max_path_id,
5790        ) {
5791            // multipath is enabled, register the local and remote maximums
5792            self.local_max_path_id = local_max_path_id;
5793            self.remote_max_path_id = remote_max_path_id;
5794            let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5795            debug!(%initial_max_path_id, "multipath negotiated");
5796            multipath_enabled = Some(initial_max_path_id);
5797        }
5798
5799        if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5800            self.config
5801                .max_remote_nat_traversal_addresses
5802                .zip(params.max_remote_nat_traversal_addresses)
5803        {
5804            if let Some(max_initial_paths) =
5805                multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5806            {
5807                let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5808                let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5809                self.iroh_hp =
5810                    iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5811                debug!(
5812                    %max_remote_addresses, %max_local_addresses,
5813                    "iroh hole punching negotiated"
5814                );
5815
5816                match self.side() {
5817                    Side::Client => {
5818                        if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5819                            // in this case the client might try to open `max_remote_addresses` new
5820                            // paths, but the current multipath configuration will not allow it
5821                            warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5822                        } else if max_local_addresses as u64
5823                            > params.active_connection_id_limit.into_inner()
5824                        {
5825                            // the server allows us to send at most `params.active_connection_id_limit`
5826                            // but they might need at least `max_local_addresses` to effectively send
5827                            // `PATH_CHALLENGE` frames to each advertised local address
5828                            warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5829                        }
5830                    }
5831                    Side::Server => {
5832                        if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5833                            warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5834                        }
5835                    }
5836                }
5837            } else {
5838                debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5839            }
5840        }
5841
5842        self.peer_params = params;
5843        let peer_max_udp_payload_size =
5844            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5845        self.path_data_mut(PathId::ZERO)
5846            .mtud
5847            .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5848    }
5849
5850    /// Decrypts a packet, returning the packet number on success
5851    fn decrypt_packet(
5852        &mut self,
5853        now: Instant,
5854        path_id: PathId,
5855        packet: &mut Packet,
5856    ) -> Result<Option<u64>, Option<TransportError>> {
5857        let result = packet_crypto::decrypt_packet_body(
5858            packet,
5859            path_id,
5860            &self.spaces,
5861            self.zero_rtt_crypto.as_ref(),
5862            self.key_phase,
5863            self.prev_crypto.as_ref(),
5864            self.next_crypto.as_ref(),
5865        )?;
5866
5867        let result = match result {
5868            Some(r) => r,
5869            None => return Ok(None),
5870        };
5871
5872        if result.outgoing_key_update_acked {
5873            if let Some(prev) = self.prev_crypto.as_mut() {
5874                prev.end_packet = Some((result.number, now));
5875                self.set_key_discard_timer(now, packet.header.space());
5876            }
5877        }
5878
5879        if result.incoming_key_update {
5880            trace!("key update authenticated");
5881            self.update_keys(Some((result.number, now)), true);
5882            self.set_key_discard_timer(now, packet.header.space());
5883        }
5884
5885        Ok(Some(result.number))
5886    }
5887
5888    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5889        trace!("executing key update");
5890        // Generate keys for the key phase after the one we're switching to, store them in
5891        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
5892        // `prev_crypto`.
5893        let new = self
5894            .crypto
5895            .next_1rtt_keys()
5896            .expect("only called for `Data` packets");
5897        self.key_phase_size = new
5898            .local
5899            .confidentiality_limit()
5900            .saturating_sub(KEY_UPDATE_MARGIN);
5901        let old = mem::replace(
5902            &mut self.spaces[SpaceId::Data]
5903                .crypto
5904                .as_mut()
5905                .unwrap() // safe because update_keys() can only be triggered by short packets
5906                .packet,
5907            mem::replace(self.next_crypto.as_mut().unwrap(), new),
5908        );
5909        self.spaces[SpaceId::Data]
5910            .iter_paths_mut()
5911            .for_each(|s| s.sent_with_keys = 0);
5912        self.prev_crypto = Some(PrevCrypto {
5913            crypto: old,
5914            end_packet,
5915            update_unacked: remote,
5916        });
5917        self.key_phase = !self.key_phase;
5918    }
5919
5920    fn peer_supports_ack_frequency(&self) -> bool {
5921        self.peer_params.min_ack_delay.is_some()
5922    }
5923
5924    /// Send an IMMEDIATE_ACK frame to the remote endpoint
5925    ///
5926    /// According to the spec, this will result in an error if the remote endpoint does not support
5927    /// the Acknowledgement Frequency extension
5928    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5929        debug_assert_eq!(
5930            self.highest_space,
5931            SpaceId::Data,
5932            "immediate ack must be written in the data space"
5933        );
5934        self.spaces[self.highest_space]
5935            .for_path(path_id)
5936            .immediate_ack_pending = true;
5937    }
5938
    /// Decodes a packet, returning its decrypted payload so it can be inspected in tests
5940    #[cfg(test)]
5941    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5942        let (path_id, first_decode, remaining) = match &event.0 {
5943            ConnectionEventInner::Datagram(DatagramConnectionEvent {
5944                path_id,
5945                first_decode,
5946                remaining,
5947                ..
5948            }) => (path_id, first_decode, remaining),
5949            _ => return None,
5950        };
5951
5952        if remaining.is_some() {
5953            panic!("Packets should never be coalesced in tests");
5954        }
5955
5956        let decrypted_header = packet_crypto::unprotect_header(
5957            first_decode.clone(),
5958            &self.spaces,
5959            self.zero_rtt_crypto.as_ref(),
5960            self.peer_params.stateless_reset_token,
5961        )?;
5962
5963        let mut packet = decrypted_header.packet?;
5964        packet_crypto::decrypt_packet_body(
5965            &mut packet,
5966            *path_id,
5967            &self.spaces,
5968            self.zero_rtt_crypto.as_ref(),
5969            self.key_phase,
5970            self.prev_crypto.as_ref(),
5971            self.next_crypto.as_ref(),
5972        )
5973        .ok()?;
5974
5975        Some(packet.payload.to_vec())
5976    }
5977
5978    /// The number of bytes of packets containing retransmittable frames that have not been
5979    /// acknowledged or declared lost.
5980    #[cfg(test)]
5981    pub(crate) fn bytes_in_flight(&self) -> u64 {
5982        // TODO(@divma): consider including for multipath?
5983        self.path_data(PathId::ZERO).in_flight.bytes
5984    }
5985
5986    /// Number of bytes worth of non-ack-only packets that may be sent
5987    #[cfg(test)]
5988    pub(crate) fn congestion_window(&self) -> u64 {
5989        let path = self.path_data(PathId::ZERO);
5990        path.congestion
5991            .window()
5992            .saturating_sub(path.in_flight.bytes)
5993    }
5994
    /// Whether no timers other than keepalive, idle, rtt, pushnewcid, and key discard are running
5996    #[cfg(test)]
5997    pub(crate) fn is_idle(&self) -> bool {
5998        let current_timers = self.timers.values();
5999        current_timers
6000            .into_iter()
6001            .filter(|(timer, _)| {
6002                !matches!(
6003                    timer,
6004                    Timer::Conn(ConnTimer::KeepAlive)
6005                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
6006                        | Timer::Conn(ConnTimer::PushNewCid)
6007                        | Timer::Conn(ConnTimer::KeyDiscard)
6008                )
6009            })
6010            .min_by_key(|(_, time)| *time)
6011            .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6012    }
6013
6014    /// Whether explicit congestion notification is in use on outgoing packets.
6015    #[cfg(test)]
6016    pub(crate) fn using_ecn(&self) -> bool {
6017        self.path_data(PathId::ZERO).sending_ecn
6018    }
6019
    /// The number of bytes received on the current path
6021    #[cfg(test)]
6022    pub(crate) fn total_recvd(&self) -> u64 {
6023        self.path_data(PathId::ZERO).total_recvd
6024    }
6025
6026    #[cfg(test)]
6027    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6028        self.local_cid_state
6029            .get(&PathId::ZERO)
6030            .unwrap()
6031            .active_seq()
6032    }
6033
6034    #[cfg(test)]
6035    #[track_caller]
6036    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6037        self.local_cid_state
6038            .get(&PathId(path_id))
6039            .unwrap()
6040            .active_seq()
6041    }
6042
    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
    /// with its `retire_prior_to` field set to `v`
6045    #[cfg(test)]
6046    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6047        let n = self
6048            .local_cid_state
6049            .get_mut(&PathId::ZERO)
6050            .unwrap()
6051            .assign_retire_seq(v);
6052        self.endpoint_events
6053            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6054    }
6055
6056    /// Check the current active remote CID sequence for `PathId::ZERO`
6057    #[cfg(test)]
6058    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
6059        self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
6060    }
6061
    /// Returns the detected maximum UDP payload size for the given path
6063    #[cfg(test)]
6064    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6065        self.path_data(path_id).current_mtu()
6066    }
6067
6068    /// Triggers path validation on all paths
6069    #[cfg(test)]
6070    pub(crate) fn trigger_path_validation(&mut self) {
6071        for path in self.paths.values_mut() {
6072            path.data.send_new_challenge = true;
6073        }
6074    }
6075
6076    /// Whether we have 1-RTT data to send
6077    ///
6078    /// This checks for frames that can only be sent in the data space (1-RTT):
6079    /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated.
6080    /// - Pending PATH_RESPONSE frames.
6081    /// - Pending data to send in STREAM frames.
6082    /// - Pending DATAGRAM frames to send.
6083    ///
6084    /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that
6085    /// may need to be sent.
6086    fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6087        let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6088            path.data.send_new_challenge
6089                || path
6090                    .prev
6091                    .as_ref()
6092                    .is_some_and(|(_, path)| path.send_new_challenge)
6093                || !path.data.path_responses.is_empty()
6094        });
6095        let other = self.streams.can_send_stream_data()
6096            || self
6097                .datagrams
6098                .outgoing
6099                .front()
6100                .is_some_and(|x| x.size(true) <= max_size);
6101        SendableFrames {
6102            acks: false,
6103            other,
6104            close: false,
6105            path_exclusive,
6106        }
6107    }
6108
6109    /// Terminate the connection instantly, without sending a close packet
6110    fn kill(&mut self, reason: ConnectionError) {
6111        self.close_common();
6112        self.state.move_to_drained(Some(reason));
6113        self.endpoint_events.push_back(EndpointEventInner::Drained);
6114    }
6115
6116    /// Storage size required for the largest packet that can be transmitted on all currently
6117    /// available paths
6118    ///
6119    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
6120    ///
6121    /// When multipath is enabled, this value is the minimum MTU across all available paths.
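    ///
    /// A minimal usage sketch (`conn` is assumed to be an established `Connection`):
    ///
    /// ```ignore
    /// let mut buf = Vec::with_capacity(conn.current_mtu() as usize);
    /// ```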
6122    pub fn current_mtu(&self) -> u16 {
6123        self.paths
6124            .iter()
6125            .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6126            .map(|(_path_id, path_state)| path_state.data.current_mtu())
6127            .min()
6128            .expect("There is always at least one available path")
6129    }
6130
6131    /// Size of non-frame data for a 1-RTT packet
6132    ///
6133    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
6134    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
6135    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
6136    /// latency and packet loss.
6137    fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6138        let pn_len = PacketNumber::new(
6139            pn,
6140            self.spaces[SpaceId::Data]
6141                .for_path(path)
6142                .largest_acked_packet
6143                .unwrap_or(0),
6144        )
6145        .len();
6146
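        // Worked example: an 8-byte remote CID, a 2-byte packet number and a 16-byte
        // AEAD tag give 1 + 8 + 2 + 16 = 27 bytes of non-frame overhead.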
6147        // 1 byte for flags
6148        1 + self
6149            .rem_cids
6150            .get(&path)
6151            .map(|cids| cids.active().len())
6152            .unwrap_or(20)      // Max CID len in QUIC v1
6153            + pn_len
6154            + self.tag_len_1rtt()
6155    }
6156
6157    fn predict_1rtt_overhead_no_pn(&self) -> usize {
6158        let pn_len = 4;
6159
6160        let cid_len = self
6161            .rem_cids
6162            .values()
6163            .map(|cids| cids.active().len())
6164            .max()
6165            .unwrap_or(20); // Max CID len in QUIC v1
6166
6167        // 1 byte for flags
6168        1 + cid_len + pn_len + self.tag_len_1rtt()
6169    }
6170
6171    fn tag_len_1rtt(&self) -> usize {
6172        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6173            Some(crypto) => Some(&*crypto.packet.local),
6174            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6175        };
6176        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
6177        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
6178        // but that would needlessly prevent sending datagrams during 0-RTT.
6179        key.map_or(16, |x| x.tag_len())
6180    }
6181
6182    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
6183    fn on_path_validated(&mut self, path_id: PathId) {
6184        self.path_data_mut(path_id).validated = true;
6185        let ConnectionSide::Server { server_config } = &self.side else {
6186            return;
6187        };
6188        let remote_addr = self.path_data(path_id).remote;
6189        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6190        new_tokens.clear();
6191        for _ in 0..server_config.validation_token.sent {
6192            new_tokens.push(remote_addr);
6193        }
6194    }
6195
6196    /// Handle new path status information: PATH_STATUS_AVAILABLE, PATH_STATUS_BACKUP
6197    fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6198        if let Some(path) = self.paths.get_mut(&path_id) {
6199            path.data.status.remote_update(status, status_seq_no);
6200        } else {
            debug!("path status received for unknown path {:?}", path_id);
6202        }
6203        self.events.push_back(
6204            PathEvent::RemoteStatus {
6205                id: path_id,
6206                status,
6207            }
6208            .into(),
6209        );
6210    }
6211
6212    /// Returns the maximum [`PathId`] to be used for sending in this connection.
6213    ///
    /// This is calculated as the minimum of the local and remote maximums when multipath is
    /// enabled, or `None` when disabled.
6216    ///
6217    /// For data that's received, we should use [`Self::local_max_path_id`] instead.
6218    /// The reasoning is that the remote might already have updated to its own newer
6219    /// [`Self::max_path_id`] after sending out a `MAX_PATH_ID` frame, but it got re-ordered.
6220    fn max_path_id(&self) -> Option<PathId> {
6221        if self.is_multipath_negotiated() {
6222            Some(self.remote_max_path_id.min(self.local_max_path_id))
6223        } else {
6224            None
6225        }
6226    }
6227
    /// Adds an address the local endpoint considers reachable for NAT traversal
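    ///
    /// A hedged sketch (`conn` is assumed to be an established connection with NAT
    /// traversal negotiated; the address is illustrative):
    ///
    /// ```ignore
    /// conn.add_nat_traversal_address("192.0.2.1:4433".parse().unwrap())?;
    /// ```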
6229    pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6230        if let Some(added) = self.iroh_hp.add_local_address(address)? {
6231            self.spaces[SpaceId::Data].pending.add_address.insert(added);
6232        };
6233        Ok(())
6234    }
6235
    /// Removes an address the endpoint no longer considers reachable for NAT traversal
6237    ///
6238    /// Addresses not present in the set will be silently ignored.
6239    pub fn remove_nat_traversal_address(
6240        &mut self,
6241        address: SocketAddr,
6242    ) -> Result<(), iroh_hp::Error> {
6243        if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6244            self.spaces[SpaceId::Data]
6245                .pending
6246                .remove_address
6247                .insert(removed);
6248        }
6249        Ok(())
6250    }
6251
6252    /// Get the current local nat traversal addresses
6253    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6254        self.iroh_hp.get_local_nat_traversal_addresses()
6255    }
6256
    /// Get the NAT traversal addresses currently advertised by the server
6258    pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6259        Ok(self
6260            .iroh_hp
6261            .client_side()?
6262            .get_remote_nat_traversal_addresses())
6263    }
6264
6265    /// Initiates a new nat traversal round
6266    ///
6267    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
6268    /// frames, and initiating probing of the known remote addresses. When a new round is
6269    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
6270    ///
6271    /// Returns the server addresses that are now being probed.
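    ///
    /// A hedged usage sketch (`conn` is assumed to be a client-side connection with
    /// both NAT traversal and multipath negotiated, and `now` the current `Instant`):
    ///
    /// ```ignore
    /// let probing = conn.initiate_nat_traversal_round(now)?;
    /// for addr in probing {
    ///     // A backup path towards `addr` is now being validated.
    /// }
    /// ```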
    pub fn initiate_nat_traversal_round(
        &mut self,
        now: Instant,
    ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        let client_state = self.iroh_hp.client_side_mut()?;
        let iroh_hp::NatTraversalRound {
            new_round,
            reach_out_at,
            addresses_to_probe,
            prev_round_path_ids,
        } = client_state.initiate_nat_traversal_round()?;

        self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));

        for path_id in prev_round_path_ids {
            // TODO(@divma): this sounds reasonable, but we need to verify that it actually
            // works for the purposes of the protocol
            let validated = self
                .path(path_id)
                .map(|path| path.validated)
                .unwrap_or(false);

            if !validated {
                let _ = self.close_path(
                    now,
                    path_id,
                    TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
                );
            }
        }

        let mut err = None;

        let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
        let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
        let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());

        for (ip, port) in addresses_to_probe {
            // If this endpoint is an IPv6 endpoint we use IPv6 addresses for all remotes.
            let remote = match ip {
                IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
                IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
                IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
                IpAddr::V6(_) => {
                    trace!("not using IPv6 nat candidate for IPv4 socket");
                    continue;
                }
            };
            match self.open_path_ensure(remote, PathStatus::Backup, now) {
                Ok((path_id, path_was_known)) if !path_was_known => {
                    path_ids.push(path_id);
                    probed_addresses.push(remote);
                }
                Ok((path_id, _)) => {
                    trace!(%path_id, %remote, "nat traversal: path existed for remote")
                }
                Err(e) => {
                    debug!(%remote, %e, "nat traversal: failed to probe remote");
                    err.get_or_insert(e);
                }
            }
        }

        if let Some(err) = err {
            // Bail out only if not a single address could be probed
            if probed_addresses.is_empty() {
                return Err(iroh_hp::Error::Multipath(err));
            }
        }

        self.iroh_hp
            .client_side_mut()
            .expect("connection side validated")
            .set_round_path_ids(path_ids);

        Ok(probed_addresses)
    }
}
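
// A minimal sketch (test-only, std types only) of the address normalization performed in
// `initiate_nat_traversal_round` above: when any existing path uses IPv6, IPv4 candidates
// are rewritten to their IPv6-mapped form so a single socket family can reach all remotes.
// The concrete address and port are illustrative.
#[cfg(test)]
mod nat_candidate_mapping_tests {
    use std::net::{Ipv4Addr, SocketAddr};

    #[test]
    fn ipv4_candidates_map_into_ipv6() {
        let (ip, port) = (Ipv4Addr::new(192, 0, 2, 1), 4433);
        // Same conversion as the `IpAddr::V4(addr) if ipv6` arm above
        let remote = SocketAddr::new(ip.to_ipv6_mapped().into(), port);
        assert!(remote.is_ipv6());
        assert_eq!(remote.to_string(), "[::ffff:192.0.2.1]:4433");
    }
}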

impl fmt::Debug for Connection {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Connection")
            .field("handshake_cid", &self.handshake_cid)
            .finish()
    }
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    No,
    AntiAmplification,
    Congestion,
    Pacing,
}

/// Fields of `Connection` specific to it being client-side or server-side
enum ConnectionSide {
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
        token: Bytes,
        token_store: Arc<dyn TokenStore>,
        server_name: String,
    },
    Server {
        server_config: Arc<ServerConfig>,
    },
}

impl ConnectionSide {
    fn remote_may_migrate(&self, state: &State) -> bool {
        match self {
            Self::Server { server_config } => server_config.migration,
            Self::Client { .. } => state
                .as_handshake()
                .is_some_and(|hs| hs.allow_server_migration),
        }
    }

    fn is_client(&self) -> bool {
        self.side().is_client()
    }

    fn is_server(&self) -> bool {
        self.side().is_server()
    }

    fn side(&self) -> Side {
        match *self {
            Self::Client { .. } => Side::Client,
            Self::Server { .. } => Side::Server,
        }
    }
}

impl From<SideArgs> for ConnectionSide {
    fn from(side: SideArgs) -> Self {
        match side {
            SideArgs::Client {
                token_store,
                server_name,
            } => Self::Client {
                token: token_store.take(&server_name).unwrap_or_default(),
                token_store,
                server_name,
            },
            SideArgs::Server {
                server_config,
                pref_addr_cid: _,
                path_validated: _,
            } => Self::Server { server_config },
        }
    }
}

/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    Client {
        token_store: Arc<dyn TokenStore>,
        server_name: String,
    },
    Server {
        server_config: Arc<ServerConfig>,
        pref_addr_cid: Option<ConnectionId>,
        path_validated: bool,
    },
}

impl SideArgs {
    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
        match *self {
            Self::Client { .. } => None,
            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
        }
    }

    pub(crate) fn path_validated(&self) -> bool {
        match *self {
            Self::Client { .. } => true,
            Self::Server { path_validated, .. } => path_validated,
        }
    }

    pub(crate) fn side(&self) -> Side {
        match *self {
            Self::Client { .. } => Side::Client,
            Self::Server { .. } => Side::Server,
        }
    }
}

/// Reasons why a connection might be lost
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}

impl From<Close> for ConnectionError {
    fn from(x: Close) -> Self {
        match x {
            Close::Connection(reason) => Self::ConnectionClosed(reason),
            Close::Application(reason) => Self::ApplicationClosed(reason),
        }
    }
}

// For compatibility with API consumers
impl From<ConnectionError> for io::Error {
    fn from(x: ConnectionError) -> Self {
        use ConnectionError::*;
        let kind = match x {
            TimedOut => io::ErrorKind::TimedOut,
            Reset => io::ErrorKind::ConnectionReset,
            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
                io::ErrorKind::Other
            }
        };
        Self::new(kind, x)
    }
}
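
// A minimal sketch (test-only) of the io::ErrorKind mapping above: timeouts and resets keep
// their dedicated kinds, peer-initiated closes surface as `ConnectionAborted`, and the
// remaining variants collapse into `Other`.
#[cfg(test)]
mod connection_error_io_tests {
    use super::*;

    #[test]
    fn io_error_kind_mapping() {
        assert_eq!(
            io::Error::from(ConnectionError::TimedOut).kind(),
            io::ErrorKind::TimedOut
        );
        assert_eq!(
            io::Error::from(ConnectionError::Reset).kind(),
            io::ErrorKind::ConnectionReset
        );
        assert_eq!(
            io::Error::from(ConnectionError::LocallyClosed).kind(),
            io::ErrorKind::Other
        );
    }
}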

/// Errors that might trigger a path being closed
// TODO(@divma): maybe needs to be reworked based on what we want to do with the public API
#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
pub enum PathError {
    /// The extension was not negotiated with the peer
    #[error("multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// Paths can only be opened client-side
    #[error("the server side may not open a path")]
    ServerSideNotAllowed,
    /// Current limits do not allow us to open more paths
    #[error("maximum number of concurrent paths reached")]
    MaxPathIdReached,
    /// No remote CIDs available to open a new path
    #[error("remote CIDs exhausted")]
    RemoteCidsExhausted,
    /// Path could not be validated and will be abandoned
    #[error("path validation failed")]
    ValidationFailed,
    /// The remote address for the path is not supported by the endpoint
    #[error("invalid remote address")]
    InvalidRemoteAddress(SocketAddr),
}

/// Errors triggered when abandoning a path
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The path is already closed or was never opened
    #[error("closed path")]
    ClosedPath,
    /// This is the last path, which cannot be abandoned
    #[error("last open path")]
    LastOpenPath,
}

/// Error returned when an operation requires multipath support, but the multipath extension
/// was not negotiated with the peer
#[derive(Debug, Error, Clone, Copy)]
#[error("multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    _private: (),
}

/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The TLS handshake was confirmed
    HandshakeConfirmed,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
    /// (Multi)Path events
    Path(PathEvent),
    /// Iroh's nat traversal events
    NatTraversal(iroh_hp::Event),
}

impl From<PathEvent> for Event {
    fn from(source: PathEvent) -> Self {
        Self::Path(source)
    }
}

fn get_max_ack_delay(params: &TransportParameters) -> Duration {
    Duration::from_micros(params.max_ack_delay.0 * 1000)
}
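
// A minimal sketch (test-only) of the unit handling above: `max_ack_delay` is carried in
// milliseconds on the wire, so multiplying by 1000 before `Duration::from_micros` is
// equivalent to `Duration::from_millis`. The value 25 is just an illustrative sample.
#[cfg(test)]
mod max_ack_delay_tests {
    use super::*;

    #[test]
    fn millis_to_micros_conversion() {
        assert_eq!(Duration::from_micros(25 * 1000), Duration::from_millis(25));
    }
}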

// Prevents overflow and improves behavior in extreme circumstances
const MAX_BACKOFF_EXPONENT: u32 = 16;
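
// A minimal sketch (test-only, illustrative arithmetic) of why the cap matters: with base-2
// backoff, an uncapped exponent overflows integer arithmetic quickly, while capping at
// `MAX_BACKOFF_EXPONENT` keeps the multiplier at 2^16 = 65536. The timeout count of 40 is a
// hypothetical extreme, not a value taken from this module.
#[cfg(test)]
mod backoff_cap_tests {
    use super::*;

    #[test]
    fn capped_backoff_stays_finite() {
        let consecutive_timeouts: u32 = 40; // 2^40 would overflow a u32 multiplier
        let multiplier = 2u64.pow(consecutive_timeouts.min(MAX_BACKOFF_EXPONENT));
        assert_eq!(multiplier, 65_536);
    }
}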

/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Excludes packet-type-specific fields such as packet number or Initial token
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
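
// A minimal sketch (test-only) spelling out the byte budget above, assuming the QUIC maximum
// CID length of 20 bytes (`MAX_CID_SIZE`) and a 4-byte varint for the Length field (the
// largest encoding needed for a u16-sized value):
// 1 (flags) + 4 (version) + 1 + 20 (dcid) + 1 + 20 (scid) + 4 (length) + 4 (pn) = 55.
#[cfg(test)]
mod header_size_tests {
    use super::*;

    #[test]
    fn handshake_header_budget_adds_up() {
        assert_eq!(VarInt::from_u32(u16::MAX as u32).size(), 4);
        assert_eq!(MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE, 55);
    }
}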

/// Perform key updates this many packets before the AEAD confidentiality limit.
///
/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
const KEY_UPDATE_MARGIN: u64 = 10_000;

#[derive(Default)]
struct SentFrames {
    retransmits: ThinRetransmits,
    /// The packet number of the largest acknowledged packet for each path
    largest_acked: FxHashMap<PathId, u64>,
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    /// Whether the datagram containing these frames should be padded to the min MTU
    requires_padding: bool,
}

impl SentFrames {
    /// Returns whether the packet contains only ACKs
    fn is_ack_only(&self, streams: &StreamsState) -> bool {
        !self.largest_acked.is_empty()
            && !self.non_retransmits
            && self.stream_frames.is_empty()
            && self.retransmits.is_empty(streams)
    }
}

/// Compute the negotiated idle timeout based on local and remote `max_idle_timeout` transport parameters.
///
/// According to the definition of `max_idle_timeout`, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1>.
///
/// According to the negotiation procedure, the negotiated value is the minimum of the two timeouts, or whichever one was specified if the other is absent or disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2>.
///
/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
    match (x, y) {
        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let test_params = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (
                Some(VarInt(2)),
                Some(VarInt(0)),
                Some(Duration::from_millis(2)),
            ),
            (
                Some(VarInt(1)),
                Some(VarInt(4)),
                Some(Duration::from_millis(1)),
            ),
        ];

        for (left, right, result) in test_params {
            assert_eq!(negotiate_max_idle_timeout(left, right), result);
            assert_eq!(negotiate_max_idle_timeout(right, left), result);
        }
    }
}