iroh_quinn_proto/connection/
mod.rs

use std::{
    cmp,
    collections::{BTreeMap, VecDeque, btree_map},
    convert::TryFrom,
    fmt, io, mem,
    net::{IpAddr, SocketAddr},
    num::{NonZeroU32, NonZeroUsize},
    ops::Not,
    sync::Arc,
};

use bytes::{BufMut, Bytes, BytesMut};
use frame::StreamMetaVec;

use rand::{Rng, SeedableRng, rngs::StdRng};
use rustc_hash::{FxHashMap, FxHashSet};
use thiserror::Error;
use tracing::{debug, error, trace, trace_span, warn};

use crate::{
    Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
    MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
    TransportError, TransportErrorCode, VarInt,
    cid_generator::ConnectionIdGenerator,
    cid_queue::CidQueue,
    config::{ServerConfig, TransportConfig},
    congestion::Controller,
    connection::{
        qlog::{QlogRecvPacket, QlogSink},
        spaces::LostPacket,
        timer::{ConnTimer, PathTimer},
    },
    crypto::{self, KeyPair, Keys, PacketKey},
    frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
    iroh_hp,
    packet::{
        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
        PacketNumber, PartialDecode, SpaceId,
    },
    range_set::ArrayRangeSet,
    shared::{
        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
        EndpointEvent, EndpointEventInner,
    },
    token::{ResetToken, Token, TokenPayload},
    transport_parameters::TransportParameters,
};

mod ack_frequency;
use ack_frequency::AckFrequencyState;

mod assembler;
pub use assembler::Chunk;

mod cid_state;
use cid_state::CidState;

mod datagrams;
use datagrams::DatagramState;
pub use datagrams::{Datagrams, SendDatagramError};

mod mtud;
mod pacing;

mod packet_builder;
use packet_builder::{PacketBuilder, PadDatagram};

mod packet_crypto;
use packet_crypto::{PrevCrypto, ZeroRttCrypto};

mod paths;
pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
use paths::{PathData, PathState};

pub(crate) mod qlog;
pub(crate) mod send_buffer;

mod spaces;
#[cfg(fuzzing)]
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
use spaces::Retransmits;
use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};

mod stats;
pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};

mod streams;
#[cfg(fuzzing)]
pub use streams::StreamsState;
#[cfg(not(fuzzing))]
use streams::StreamsState;
pub use streams::{
    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
};

mod timer;
use timer::{Timer, TimerTable};

mod transmit_buf;
use transmit_buf::TransmitBuf;

mod state;

#[cfg(not(fuzzing))]
use state::State;
#[cfg(fuzzing)]
pub use state::State;
use state::StateType;

/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`.
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below, however as per the design of this library none of
///   the functions actually perform system-level I/O. Examples are [`read`](RecvStream::read)
///   and [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible, call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing them to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
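///
/// A minimal sketch of such a driver loop (hypothetical caller code; the I/O,
/// timer and endpoint helpers are stand-ins, not part of this crate):
///
/// ```ignore
/// let mut buf = Vec::new();
/// loop {
///     // (1) Flush all pending outgoing datagrams.
///     while let Some(transmit) = conn.poll_transmit(Instant::now(), max_gso_segments, &mut buf) {
///         send_udp(&buf[..transmit.size], &transmit);
///         buf.clear();
///     }
///     // (2) Re-arm the connection timer.
///     if let Some(deadline) = conn.poll_timeout() {
///         arm_timer(deadline);
///     }
///     // (3) Forward endpoint-facing events to the endpoint driver.
///     while let Some(event) = conn.poll_endpoint_events() {
///         forward_to_endpoint(event);
///     }
///     // (4) Surface application-facing events.
///     while let Some(event) = conn.poll() {
///         handle_app_event(event);
///     }
///     wait_for_io_or_timer(); // then call handle_event / handle_timeout as appropriate
/// }
/// ```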
pub struct Connection {
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    rng: StdRng,
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The [`PathData`] for each path
    ///
    /// This needs to be ordered because [`Connection::poll_transmit`] needs to
    /// deterministically select the next PathId to send on.
    // TODO(flub): well does it really? But deterministic is nice for now.
    paths: BTreeMap<PathId, PathState>,
    /// Counter to uniquely identify every [`PathData`] created in this connection.
    ///
    /// Each [`PathData`] gets a [`PathData::generation`] that is unique among all
    /// [`PathData`]s created in the lifetime of this connection. This helps identify the
    /// correct path when RFC9000-style migrations happen, even when they are
    /// aborted.
    ///
    /// Multipath does not change this: each path can also undergo RFC9000-style
    /// migrations. So a single multipath path ID could see several [`PathData`]s, each with
    /// their unique [`PathData::generation`].
    path_generation_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    state: State,
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Events returned by [`Connection::poll`]
    events: VecDeque<Event>,
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable [`SpaceId`]
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// If the CONNECTION_CLOSE frame needs to be sent
    close: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    ///
    /// These are given out before multiple paths exist, including for paths that will
    /// never exist. So if multipath is supported the number of paths here will be higher
    /// than the actual number of paths in use.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Attributes of CIDs generated by the local endpoint
    ///
    /// Any path that is allowed to be opened is present in this map, as well as the already
    /// opened paths. However, since CIDs are issued asynchronously by the endpoint driver via
    /// connection events, it cannot be used to know whether CIDs have been issued for a path
    /// or not. See [`Connection::max_path_id_with_cids`] for this.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// Path level statistics
    path_stats: FxHashMap<PathId, PathStats>,
    /// QUIC version used for the connection.
    version: u32,

    //
    // Multipath
    //
    /// Maximum number of concurrent paths
    ///
    /// Initially set from the [`TransportConfig::max_concurrent_multipath_paths`]. Even
    /// when multipath is disabled this will be set to 1, though it is not used in that
    /// case.
    max_concurrent_paths: NonZeroU32,
    /// Local maximum [`PathId`] to be used
    ///
    /// This is initially set to [`TransportConfig::get_initial_max_path_id`] when multipath
    /// is negotiated, or to [`PathId::ZERO`] otherwise. This is essentially the value of
    /// the highest MAX_PATH_ID frame sent.
    ///
    /// Any path with an ID equal to or below this [`PathId`] is either:
    ///
    /// - Abandoned, if it is also in [`Connection::abandoned_paths`].
    /// - Open, in which case it is present in [`Connection::paths`].
    /// - Not yet opened, if it is in neither of these two places.
    ///
    /// Note that for not-yet-opened paths there may or may not be any CIDs issued. See
    /// [`Connection::max_path_id_with_cids`].
    local_max_path_id: PathId,
    /// Remote's maximum [`PathId`] to be used
    ///
    /// This is initially set to the peer's [`TransportParameters::initial_max_path_id`] when
    /// multipath is negotiated, or to [`PathId::ZERO`] otherwise. A peer may increase this limit
    /// by sending [`Frame::MaxPathId`] frames.
    remote_max_path_id: PathId,
    /// The greatest [`PathId`] we have issued CIDs for
    ///
    /// CIDs are only issued for `min(local_max_path_id, remote_max_path_id)`. It is not
    /// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued,
    /// since they are issued asynchronously by the endpoint driver.
    max_path_id_with_cids: PathId,
    /// The paths already abandoned
    ///
    /// They may still have some state left in [`Connection::paths`] or
    /// [`Connection::local_cid_state`] since some of this has to be kept around for some
    /// time after a path is abandoned.
    // TODO(flub): Make this a more efficient data structure.  Like ranges of abandoned
    //    paths.  Or a set together with a minimum.  Or something.
    abandoned_paths: FxHashSet<PathId>,

    iroh_hp: iroh_hp::State,
    qlog: QlogSink,
}

impl Connection {
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        network_path: FourTuple,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = {
            let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
            space.crypto = Some(crypto.initial_keys(init_cid, side));
            space
        };
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
        // TODO(@divma): consider if we want to delay this until the path is validated
        path.open = true;
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_generation_counter: 0,
            allow_mtud,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            close: false,

            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            // peer params are not yet known, so multipath is not enabled
            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            // iroh's nat traversal
            iroh_hp: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog
            .emit_tuple_assigned(PathId::ZERO, network_path, now);
        this
    }

    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.peek()
    }

    /// Returns application-facing events
    ///
    /// Connections should be polled for events after:
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll(&mut self) -> Option<Event> {
        if let Some(x) = self.events.pop_front() {
            return Some(x);
        }

        if let Some(event) = self.streams.poll() {
            return Some(Event::Stream(event));
        }

        if let Some(reason) = self.state.take_error() {
            return Some(Event::ConnectionLost { reason });
        }

        None
    }

    /// Return endpoint-facing events
    #[must_use]
    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
        self.endpoint_events.pop_front().map(EndpointEvent)
    }

    /// Provide control over streams
    #[must_use]
    pub fn streams(&mut self) -> Streams<'_> {
        Streams {
            state: &mut self.streams,
            conn_state: &self.state,
        }
    }

    /// Provide control over the receiving side of a stream
    #[must_use]
    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
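        // Receiving is only possible on bidirectional streams or on streams initiated
        // by the peer.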
        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
        RecvStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
        }
    }

    /// Provide control over the sending side of a stream
    #[must_use]
    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
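        // Sending is only possible on bidirectional streams or on locally-initiated
        // streams.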
        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
        SendStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
            conn_state: &self.state,
        }
    }

    /// Opens a new path only if no path on the same network path currently exists.
    ///
    /// This comparison uses [`FourTuple::is_probably_same_path`] on the given `network_path`,
    /// passing it the network paths of the existing paths.
    ///
    /// This means that you can pass `local_ip: None` to make the comparison only compare
    /// remote addresses.
    ///
    /// This avoids having to guess which local interface will be used to communicate with the
    /// remote, should it not be known yet. We assume that if we already have a path to the remote,
    /// the OS is likely to use the same interface to talk to said remote.
    ///
    /// See also [`open_path`]. Returns `(path_id, true)` if the path already existed, or
    /// `(path_id, false)` if it was newly opened.
    ///
    /// [`open_path`]: Connection::open_path
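    ///
    /// A hypothetical usage sketch, assuming a `FourTuple` built from its `remote` and
    /// `local_ip` parts (the address is a placeholder):
    ///
    /// ```ignore
    /// let network_path = FourTuple {
    ///     remote: "203.0.113.7:4433".parse().unwrap(),
    ///     local_ip: None, // compare remote addresses only
    /// };
    /// let (path_id, existed) = conn.open_path_ensure(network_path, PathStatus::Backup, now)?;
    /// if !existed {
    ///     // A new path was opened; it becomes usable once PathEvent::Opened is reported.
    /// }
    /// ```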
    pub fn open_path_ensure(
        &mut self,
        network_path: FourTuple,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<(PathId, bool), PathError> {
        Ok(
            match self
                .paths
                .iter()
                .find(|(_id, path)| network_path.is_probably_same_path(&path.data.network_path))
            {
                Some((path_id, _state)) => (*path_id, true),
                None => (self.open_path(network_path, initial_status, now)?, false),
            },
        )
    }

    /// Opens a new path
    ///
    /// Further errors might occur and they will be emitted in [`PathEvent::LocallyClosed`] events.
    /// When the path is opened it will be reported as a [`PathEvent::Opened`].
    pub fn open_path(
        &mut self,
        network_path: FourTuple,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<PathId, PathError> {
        if !self.is_multipath_negotiated() {
            return Err(PathError::MultipathNotNegotiated);
        }
        if self.side().is_server() {
            return Err(PathError::ServerSideNotAllowed);
        }

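        // The next path ID is one past the largest path ID ever used, whether that path
        // is still open or already abandoned.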
        let max_abandoned = self.abandoned_paths.iter().max().copied();
        let max_used = self.paths.keys().last().copied();
        let path_id = max_abandoned
            .max(max_used)
            .unwrap_or(PathId::ZERO)
            .saturating_add(1u8);

        if Some(path_id) > self.max_path_id() {
            return Err(PathError::MaxPathIdReached);
        }
        if path_id > self.remote_max_path_id {
            self.spaces[SpaceId::Data].pending.paths_blocked = true;
            return Err(PathError::MaxPathIdReached);
        }
        if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .insert(path_id);
            return Err(PathError::RemoteCidsExhausted);
        }

        let path = self.ensure_path(path_id, network_path, now, None);
        path.status.local_update(initial_status);

        Ok(path_id)
    }

    /// Closes a path by sending a PATH_ABANDON frame
    ///
    /// This will not allow closing the last path. It does allow closing paths which have
    /// not yet been opened, as e.g. is the case when receiving a PATH_ABANDON from the peer
    /// for a path that was never opened locally.
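    ///
    /// A hypothetical usage sketch (the error code `0` stands in for an
    /// application-chosen value):
    ///
    /// ```ignore
    /// // Queue a PATH_ABANDON for `path_id`; the path is fully discarded after 6 * PTO.
    /// if let Err(err) = conn.close_path(now, path_id, VarInt::from_u32(0)) {
    ///     // `ClosedPath`: the path is already abandoned or was never known.
    ///     // `LastOpenPath`: refused, as this would leave no validated path.
    /// }
    /// ```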
    pub fn close_path(
        &mut self,
        now: Instant,
        path_id: PathId,
        error_code: VarInt,
    ) -> Result<(), ClosePathError> {
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }
        if self
            .paths
            .iter()
            // Would there be any remaining, non-abandoned, validated paths?
            .any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            })
            .not()
        {
            return Err(ClosePathError::LastOpenPath);
        }

        // Send PATH_ABANDON
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, error_code.into());

        // Remove pending NEW CIDs for this path
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Cleanup retransmits across ALL paths (CIDs for path_id may have been transmitted on other paths)
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Consider remotely issued CIDs as retired.
        // Technically we don't have to do this just yet.  We only need to do this *after*
        // the PATH_ABANDON frame is sent, allowing us to still send it on the
        // to-be-abandoned path.  However it is recommended to send it on another path, and
        // we do not allow abandoning the last path anyway.
        self.rem_cids.remove(&path_id);
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        let pto = self.pto_max_path(SpaceId::Data, false);

        let path = self.paths.get_mut(&path_id).expect("checked above");

        // We record the time after which receiving data on this path generates a transport error.
        path.data.last_allowed_receive = Some(now + 3 * pto);
        self.abandoned_paths.insert(path_id);

        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));

        // The peer MUST respond with a corresponding PATH_ABANDON frame.
        // If we receive packets on the abandoned path after 3 * PTO we trigger a transport error.
        // In any case, we completely discard the path after 6 * PTO, whether we receive a
        // PATH_ABANDON or not.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::DiscardPath),
            now + 6 * pto,
            self.qlog.with_time(now),
        );
        Ok(())
    }

    /// Gets the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the path_id does not reference any known path.
    #[track_caller]
    fn path_data(&self, path_id: PathId) -> &PathData {
        if let Some(data) = self.paths.get(&path_id) {
            &data.data
        } else {
            panic!(
                "unknown path: {path_id}, currently known paths: {:?}",
                self.paths.keys().collect::<Vec<_>>()
            );
        }
    }

    /// Gets a reference to the [`PathData`] for a [`PathId`]
    fn path(&self, path_id: PathId) -> Option<&PathData> {
        self.paths.get(&path_id).map(|path_state| &path_state.data)
    }

    /// Gets a mutable reference to the [`PathData`] for a [`PathId`]
    fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
        self.paths
            .get_mut(&path_id)
            .map(|path_state| &mut path_state.data)
    }

    /// Returns all known paths.
    ///
    /// There is no guarantee any of these paths are open or usable.
    pub fn paths(&self) -> Vec<PathId> {
        self.paths.keys().copied().collect()
    }

    /// Gets the local [`PathStatus`] for a known [`PathId`]
    pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
        self.path(path_id)
            .map(PathData::local_status)
            .ok_or(ClosedPath { _private: () })
    }

    /// Returns the path's network path represented as a 4-tuple.
    pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
        self.path(path_id)
            .map(|path| path.network_path)
            .ok_or(ClosedPath { _private: () })
    }

    /// Sets the [`PathStatus`] for a known [`PathId`]
    ///
    /// Returns the previous path status on success.
    pub fn set_path_status(
        &mut self,
        path_id: PathId,
        status: PathStatus,
    ) -> Result<PathStatus, SetPathStatusError> {
        if !self.is_multipath_negotiated() {
            return Err(SetPathStatusError::MultipathNotNegotiated);
        }
        let path = self
            .path_mut(path_id)
            .ok_or(SetPathStatusError::ClosedPath)?;
        let prev = match path.status.local_update(status) {
            Some(prev) => {
                self.spaces[SpaceId::Data]
                    .pending
                    .path_status
                    .insert(path_id);
                prev
            }
            None => path.local_status(),
        };
        Ok(prev)
    }

    /// Returns the remote path status
    // TODO(flub): Probably should also be some kind of path event?  Not even sure if I like
    //    this as an API, but for now it allows me to write a test easily.
    // TODO(flub): Technically this should be a Result<Option<PathStatus>>?
    pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
        self.path(path_id).and_then(|path| path.remote_status())
    }

    /// Sets the max_idle_timeout for a specific path
    ///
    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_max_idle_timeout(
        &mut self,
        path_id: PathId,
        timeout: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
    }

    /// Sets the keep_alive_interval for a specific path
    ///
    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_keep_alive_interval(
        &mut self,
        path_id: PathId,
        interval: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.keep_alive, interval))
    }

    /// Gets the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the path_id does not reference any known path.
    #[track_caller]
    fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
        &mut self.paths.get_mut(&path_id).expect("known path").data
    }

    /// Find an open, validated path that's on the same network path as the given network path.
    ///
    /// Returns the first matching path, even if there are multiple.
    fn find_validated_path_on_network_path(
        &self,
        network_path: FourTuple,
    ) -> Option<(&PathId, &PathState)> {
        self.paths.iter().find(|(path_id, path_state)| {
            path_state.data.validated
                // Would this use the same network path, if network_path were used to send right now?
                && network_path.is_probably_same_path(&path_state.data.network_path)
                && !self.abandoned_paths.contains(path_id)
        })
        // TODO(@divma): we might want to ensure the path has been recently active to consider the
        // address validated
        // matheus23: Perhaps looking at !self.abandoned_paths.contains(path_id) is enough, given keep-alives?
    }

    fn ensure_path(
        &mut self,
        path_id: PathId,
        network_path: FourTuple,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        let valid_path = self.find_validated_path_on_network_path(network_path);
        let validated = valid_path.is_some();
        let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        debug!(%validated, %path_id, %network_path, "path added");
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
        let mut data = PathData::new(
            network_path,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_generation_counter,
            now,
            &self.config,
        );

        data.validated = validated;
        if let Some(initial_rtt) = initial_rtt {
            data.rtt.reset_initial_rtt(initial_rtt);
        }

        let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathOpen),
            now + 3 * pto,
            self.qlog.with_time(now),
        );

        // For the path to be opened we need to send a packet on the path. Sending a
        // challenge guarantees this.
        data.send_new_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, network_path, now);
        &mut path.data
    }

    /// Returns packets to transmit
    ///
    /// Connections should be polled for transmit after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    ///
    /// `max_datagrams` specifies how many datagrams can be returned inside a
    /// single Transmit using GSO. This must be at least 1.
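    ///
    /// A hypothetical caller sketch (`max_gso_segments` and `udp_send` are stand-ins
    /// for whatever the platform's UDP socket supports):
    ///
    /// ```ignore
    /// let mut buf = Vec::new();
    /// while let Some(t) = conn.poll_transmit(Instant::now(), max_gso_segments, &mut buf) {
    ///     // buf now holds t.size bytes: a single datagram, or several segments of
    ///     // t.segment_size bytes each when GSO is in use.
    ///     udp_send(t.destination, t.ecn, t.segment_size, &buf[..t.size]);
    ///     buf.clear();
    /// }
    /// ```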
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: NonZeroUsize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
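        // iroh NAT traversal: holepunching probes are not QUIC packets at all, but raw
        // 8-byte random payloads sent directly to the candidate remote address. They are
        // flushed before any regular datagram is built.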
929        if let Some(probing) = self
930            .iroh_hp
931            .server_side_mut()
932            .ok()
933            .and_then(iroh_hp::ServerState::next_probe)
934        {
935            let destination = probing.remote();
936            trace!(%destination, "RAND_DATA packet");
937            let token: u64 = self.rng.random();
938            buf.put_u64(token);
939            probing.finish(token);
940            return Some(Transmit {
941                destination,
942                ecn: None,
943                size: 8,
944                segment_size: None,
945                src_ip: None,
946            });
947        }
948
949        let max_datagrams = match self.config.enable_segmentation_offload {
950            false => NonZeroUsize::MIN,
951            true => max_datagrams,
952        };
953
954        // Each call to poll_transmit can only send datagrams to one destination, because
955        // all datagrams in a GSO batch are for the same destination.  Therefore only
956        // datagrams for one Path ID are produced for each poll_transmit call.
957
958        // TODO(flub): this is wishful thinking and not actually implemented, but perhaps it
959        //   should be:
960
961        // First, if we have to send a close, select a path for that.
962        // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending.
963
964        // For all, open, validated and AVAILABLE paths:
965        // - Is the path congestion blocked or pacing blocked?
966        // - call maybe_queue_ to ensure a tail-loss probe would be sent?
967        // - do we need to send a close message?
968        // - call can_send
969        // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths
970
971        // Check whether we need to send a close message
972        let close = match self.state.as_type() {
973            StateType::Drained => {
974                self.app_limited = true;
975                return None;
976            }
977            StateType::Draining | StateType::Closed => {
978                // self.close is only reset once the associated packet had been
979                // encoded successfully
980                if !self.close {
981                    self.app_limited = true;
982                    return None;
983                }
984                true
985            }
986            _ => false,
987        };
988
989        // Check whether we need to send an ACK_FREQUENCY frame
990        if let Some(config) = &self.config.ack_frequency_config {
991            let rtt = self
992                .paths
993                .values()
994                .map(|p| p.data.rtt.get())
995                .min()
996                .expect("one path exists");
997            self.spaces[SpaceId::Data].pending.ack_frequency = self
998                .ack_frequency
999                .should_send_ack_frequency(rtt, config, &self.peer_params)
1000                && self.highest_space == SpaceId::Data
1001                && self.peer_supports_ack_frequency();
1002        }
1003
1004        // Whether this packet can be coalesced with another one in the same datagram.
1005        let mut coalesce = true;
1006
1007        // Whether the last packet in the datagram must be padded so the datagram takes up
1008        // to at least MIN_INITIAL_SIZE, or to the maximum segment size if this is smaller.
1009        let mut pad_datagram = PadDatagram::No;
1010
1011        // Whether congestion control stopped the next packet from being sent. Further
1012        // packets could still be built, as e.g. tail-loss probes are not congestion
1013        // limited.
1014        let mut congestion_blocked = false;
1015
1016        // The packet number of the last built packet.
1017        let mut last_packet_number = None;
1018
1019        let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
1020
1021        // If there is any open, validated and available path we only want to send frames to
1022        // any backup path that must be sent on that backup path exclusively.
1023        let have_available_path = self.paths.iter().any(|(id, path)| {
1024            path.data.validated
1025                && path.data.local_status() == PathStatus::Available
1026                && self.rem_cids.contains_key(id)
1027        });
1028
1029        // Setup for the first path_id
1030        let mut transmit = TransmitBuf::new(
1031            buf,
1032            max_datagrams,
1033            self.path_data(path_id).current_mtu().into(),
1034        );
1035        if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
1036            return Some(challenge);
1037        }
1038        let mut space_id = match path_id {
1039            PathId::ZERO => SpaceId::Initial,
1040            _ => SpaceId::Data,
1041        };
1042
1043        loop {
1044            // check if there is at least one active CID to use for sending
1045            let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1046                let err = PathError::RemoteCidsExhausted;
1047                if !self.abandoned_paths.contains(&path_id) {
1048                    debug!(?err, %path_id, "no active CID for path");
1049                    self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1050                        id: path_id,
1051                        error: err,
1052                    }));
1053                    // Locally we should have refused to open this path, the remote should
1054                    // have given us CIDs for this path before opening it.  So we can always
1055                    // abandon this here.
1056                    self.close_path(
1057                        now,
1058                        path_id,
1059                        TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1060                    )
1061                    .ok();
1062                    self.spaces[SpaceId::Data]
1063                        .pending
1064                        .path_cids_blocked
1065                        .insert(path_id);
1066                } else {
1067                    trace!(%path_id, "remote CIDs retired for abandoned path");
1068                }
1069
1070                match self.paths.keys().find(|&&next| next > path_id) {
1071                    Some(next_path_id) => {
1072                        // See if this next path can send anything.
1073                        path_id = *next_path_id;
1074                        space_id = SpaceId::Data;
1075
1076                        // update per path state
1077                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1078                        if let Some(challenge) =
1079                            self.send_prev_path_challenge(now, &mut transmit, path_id)
1080                        {
1081                            return Some(challenge);
1082                        }
1083
1084                        continue;
1085                    }
1086                    None => {
1087                        // Nothing more to send.
1088                        trace!(
1089                            ?space_id,
1090                            %path_id,
1091                            "no CIDs to send on path, no more paths"
1092                        );
1093                        break;
1094                    }
1095                }
1096            };
1097
1098            // Determine if anything can be sent in this packet number space (SpaceId +
1099            // PathId).
1100            let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1101                // We are trying to coalesce another packet into this datagram.
1102                transmit.datagram_remaining_mut()
1103            } else {
1104                // A new datagram needs to be started.
1105                transmit.segment_size()
1106            };
1107            let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1108            let path_should_send = {
1109                let path_exclusive_only = space_id == SpaceId::Data
1110                    && have_available_path
1111                    && self.path_data(path_id).local_status() == PathStatus::Backup;
1112                let path_should_send = if path_exclusive_only {
1113                    can_send.path_exclusive
1114                } else {
1115                    !can_send.is_empty()
1116                };
1117                let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1118                path_should_send || needs_loss_probe
1119            };
1120
1121            if !path_should_send && space_id < SpaceId::Data {
1122                if self.spaces[space_id].crypto.is_some() {
1123                    trace!(?space_id, %path_id, "nothing to send in space");
1124                }
1125                space_id = space_id.next();
1126                continue;
1127            }
1128
1129            let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1130                // Only check congestion control if a new datagram is needed.
1131                self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1132            } else {
1133                PathBlocked::No
1134            };
1135            if send_blocked != PathBlocked::No {
1136                trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1137                congestion_blocked = true;
1138            }
1139            if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1140                // Higher spaces might still have tail-loss probes to send, which are not
1141                // congestion blocked.
1142                space_id = space_id.next();
1143                continue;
1144            }
1145            if !path_should_send || send_blocked != PathBlocked::No {
1146                // Nothing more to send on this path, check the next path if possible.
1147
1148                // If there are any datagrams in the transmit, packets for another path can
1149                // not be built.
1150                if transmit.num_datagrams() > 0 {
1151                    break;
1152                }
1153
1154                match self.paths.keys().find(|&&next| next > path_id) {
1155                    Some(next_path_id) => {
1156                        // See if this next path can send anything.
1157                        trace!(
1158                            ?space_id,
1159                            %path_id,
1160                            %next_path_id,
1161                            "nothing to send on path"
1162                        );
1163                        path_id = *next_path_id;
1164                        space_id = SpaceId::Data;
1165
1166                        // update per path state
1167                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1168                        if let Some(challenge) =
1169                            self.send_prev_path_challenge(now, &mut transmit, path_id)
1170                        {
1171                            return Some(challenge);
1172                        }
1173
1174                        continue;
1175                    }
1176                    None => {
1177                        // Nothing more to send.
1178                        trace!(
1179                            ?space_id,
1180                            %path_id,
1181                            next_path_id=?None::<PathId>,
1182                            "nothing to send on path"
1183                        );
1184                        break;
1185                    }
1186                }
1187            }
1188
1189            // If the datagram is full, we need to start a new one.
1190            if transmit.datagram_remaining_mut() == 0 {
1191                if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1192                    // No more datagrams allowed
1193                    break;
1194                }
1195
1196                match self.spaces[space_id].for_path(path_id).loss_probes {
1197                    0 => transmit.start_new_datagram(),
1198                    _ => {
1199                        // We need something to send for a tail-loss probe.
1200                        let request_immediate_ack =
1201                            space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1202                        self.spaces[space_id].maybe_queue_probe(
1203                            path_id,
1204                            request_immediate_ack,
1205                            &self.streams,
1206                        );
1207
1208                        self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1209
1210                        // Clamp the datagram to at most the minimum MTU to ensure that loss
1211                        // probes can get through and enable recovery even if the path MTU
1212                        // has shrank unexpectedly.
1213                        transmit.start_new_datagram_with_size(std::cmp::min(
1214                            usize::from(INITIAL_MTU),
1215                            transmit.segment_size(),
1216                        ));
1217                    }
1218                }
1219                trace!(count = transmit.num_datagrams(), "new datagram started");
1220                coalesce = true;
1221                pad_datagram = PadDatagram::No;
1222            }
1223
1224            // If coalescing another packet into the existing datagram, there should
1225            // still be enough space for a whole packet.
1226            if transmit.datagram_start_offset() < transmit.len() {
1227                debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1228            }
1229
1230            //
1231            // From here on, we've determined that a packet will definitely be sent.
1232            //
1233
1234            if self.spaces[SpaceId::Initial].crypto.is_some()
1235                && space_id == SpaceId::Handshake
1236                && self.side.is_client()
1237            {
1238                // A client stops both sending and processing Initial packets when it
1239                // sends its first Handshake packet.
1240                self.discard_space(now, SpaceId::Initial);
1241            }
1242            if let Some(ref mut prev) = self.prev_crypto {
1243                prev.update_unacked = false;
1244            }
1245
1246            let mut builder = PacketBuilder::new(
1247                now,
1248                space_id,
1249                path_id,
1250                remote_cid,
1251                &mut transmit,
1252                can_send.other,
1253                self,
1254            )?;
1255            last_packet_number = Some(builder.exact_number);
1256            coalesce = coalesce && !builder.short_header;
1257
1258            if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1259                // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1
1260                pad_datagram |= PadDatagram::ToMinMtu;
1261            }
1262            if space_id == SpaceId::Data && self.config.pad_to_mtu {
1263                pad_datagram |= PadDatagram::ToSegmentSize;
1264            }
1265
1266            if can_send.close {
1267                trace!("sending CONNECTION_CLOSE");
1268                // Encode ACKs before the ConnectionClose message, to give the receiver
1269                // a better approximate on what data has been processed. This is
1270                // especially important with ack delay, since the peer might not
1271                // have gotten any other ACK for the data earlier on.
1272                let is_multipath_negotiated = self.is_multipath_negotiated();
1273                for path_id in self.spaces[space_id]
1274                    .number_spaces
1275                    .iter()
1276                    .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1277                    .map(|(&path_id, _)| path_id)
1278                    .collect::<Vec<_>>()
1279                {
1280                    Self::populate_acks(
1281                        now,
1282                        self.receiving_ecn,
1283                        path_id,
1284                        space_id,
1285                        &mut self.spaces[space_id],
1286                        is_multipath_negotiated,
1287                        &mut builder,
1288                        &mut self.stats.frame_tx,
1289                    );
1290                }
1291
1292                // Since there only 64 ACK frames there will always be enough space
1293                // to encode the ConnectionClose frame too. However we still have the
1294                // check here to prevent crashes if something changes.
1295                debug_assert!(
1296                    builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1297                    "ACKs should leave space for ConnectionClose"
1298                );
1299                let stats = &mut self.stats.frame_tx;
1300                if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1301                    let max_frame_size = builder.frame_space_remaining();
1302                    let close: Close = match self.state.as_type() {
1303                        StateType::Closed => {
1304                            let reason: Close =
1305                                self.state.as_closed().expect("checked").clone().into();
1306                            if space_id == SpaceId::Data || reason.is_transport_layer() {
1307                                reason
1308                            } else {
1309                                TransportError::APPLICATION_ERROR("").into()
1310                            }
1311                        }
1312                        StateType::Draining => TransportError::NO_ERROR("").into(),
1313                        _ => unreachable!(
1314                            "tried to make a close packet when the connection wasn't closed"
1315                        ),
1316                    };
1317                    builder.write_frame(close.encoder(max_frame_size), stats);
1318                }
1319                builder.finish_and_track(now, self, path_id, pad_datagram);
1320                if space_id == self.highest_space {
1321                    // Don't send another close packet. Even with multipath we only send
1322                    // CONNECTION_CLOSE on a single path since we expect our paths to work.
1323                    self.close = false;
1324                    // `CONNECTION_CLOSE` is the final packet
1325                    break;
1326                } else {
1327                    // Send a close frame in every possible space for robustness, per
1328                    // RFC9000 "Immediate Close during the Handshake". Don't bother trying
1329                    // to send anything else.
1330                    space_id = space_id.next();
1331                    continue;
1332                }
1333            }
1334
1335            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that
1336            // path validation can occur while the link is saturated.
1337            if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1338                let path = self.path_data_mut(path_id);
1339                if let Some((token, network_path)) =
1340                    path.path_responses.pop_off_path(path.network_path)
1341                {
1342                    // TODO(flub): We need to use the right CID!  We shouldn't use the same
1343                    //    CID as the current active one for the path.  Though see also
1344                    //    https://github.com/quinn-rs/quinn/issues/2184
1345                    //
1346                    let stats = &mut self.stats.frame_tx;
1347                    let frame = frame::PathResponse(token);
1348                    builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
1349                    builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
1350                    self.stats.udp_tx.on_sent(1, transmit.len());
1351                    return Some(Transmit {
1352                        destination: network_path.remote,
1353                        size: transmit.len(),
1354                        ecn: None,
1355                        segment_size: None,
1356                        src_ip: network_path.local_ip,
1357                    });
1358                }
1359            }
1360
1361            let path_exclusive_only =
1362                have_available_path && self.path_data(path_id).local_status() == PathStatus::Backup;
1363            self.populate_packet(now, space_id, path_id, path_exclusive_only, &mut builder);
1364
1365            // ACK-only packets should only be sent when explicitly allowed. If we write them for
1366            // any other reason, there is a bug that leads to one component announcing write
1367            // readiness while not writing any data. This degrades performance. The condition is
1368            // only checked if the full MTU is available and when potentially large fixed-size
1369            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
1370            // writing ACKs.
1371            debug_assert!(
1372                !(builder.sent_frames().is_ack_only(&self.streams)
1373                    && !can_send.acks
1374                    && can_send.other
1375                    && builder.buf.segment_size()
1376                        == self.path_data(path_id).current_mtu() as usize
1377                    && self.datagrams.outgoing.is_empty()),
1378                "SendableFrames was {can_send:?}, but only ACKs have been written"
1379            );
1380            if builder.sent_frames().requires_padding {
1381                pad_datagram |= PadDatagram::ToMinMtu;
1382            }
1383
1384            for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1385                self.spaces[space_id]
1386                    .for_path(*path_id)
1387                    .pending_acks
1388                    .acks_sent();
1389                self.timers.stop(
1390                    Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1391                    self.qlog.with_time(now),
1392                );
1393            }
1394
1395            // Now we need to finish the packet. Before we do so, we need to know whether we
1396            // will coalesce the next packet into this one or end the datagram here, because
1397            // if this is the last packet in the datagram, more padding might be needed due
1398            // to the packet type, or to fill the GSO segment size.
1399
1400            // Are we allowed to coalesce AND is there enough space for another *packet* in
1401            // this datagram AND is there another packet to send in this or the next space?
1402            if coalesce
1403                && builder
1404                    .buf
1405                    .datagram_remaining_mut()
1406                    .saturating_sub(builder.predict_packet_end())
1407                    > MIN_PACKET_SPACE
1408                && self
1409                    .next_send_space(space_id, path_id, builder.buf, close)
1410                    .is_some()
1411            {
1412                // We can append/coalesce the next packet into the current
1413                // datagram. Finish the current packet without adding extra padding.
1414                builder.finish_and_track(now, self, path_id, PadDatagram::No);
1415            } else {
1416                // We need a new datagram for the next packet.  Finish the current
1417                // packet with padding.
1418                if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1419                    // If too many padding bytes would be required to continue the
1420                    // GSO batch after this packet, end the GSO batch here. Ensures
1421                    // that fixed-size frames with heterogeneous sizes
1422                    // (e.g. application datagrams) won't inadvertently waste large
1423                    // amounts of bandwidth. The exact threshold is a bit arbitrary
1424                    // and might benefit from further tuning, though there's no
1425                    // universally optimal value.
1426                    const MAX_PADDING: usize = 32;
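                    // For example, if 1200 bytes remain in the datagram but the packet
                    // is predicted to end after 1100 of them, 100 padding bytes would be
                    // required; that exceeds MAX_PADDING, so the GSO batch ends instead.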
1427                    if builder.buf.datagram_remaining_mut()
1428                        > builder.predict_packet_end() + MAX_PADDING
1429                    {
1430                        trace!(
1431                            "GSO truncated by demand for {} padding bytes",
1432                            builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1433                        );
1434                        builder.finish_and_track(now, self, path_id, PadDatagram::No);
1435                        break;
1436                    }
1437
1438                    // Pad the current datagram to GSO segment size so it can be
1439                    // included in the GSO batch.
1440                    builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1441                } else {
1442                    builder.finish_and_track(now, self, path_id, pad_datagram);
1443                }
1444                if transmit.num_datagrams() == 1 {
1445                    transmit.clip_datagram_size();
1446                }
1447            }
1448        }
1449
1450        if let Some(last_packet_number) = last_packet_number {
1451            // Note that when sending in multiple packet spaces the last packet number will
1452            // be the one from the highest packet space.
1453            self.path_data_mut(path_id).congestion.on_sent(
1454                now,
1455                transmit.len() as u64,
1456                last_packet_number,
1457            );
1458        }
1459
1460        self.qlog.emit_recovery_metrics(
1461            path_id,
1462            &mut self.paths.get_mut(&path_id).unwrap().data,
1463            now,
1464        );
1465
1466        self.app_limited = transmit.is_empty() && !congestion_blocked;
1467
1468        // Send MTU probe if necessary
1469        if transmit.is_empty() && self.state.is_established() {
1470            // MTU probing happens only in Data space.
1471            let space_id = SpaceId::Data;
1472            path_id = *self.paths.first_key_value().expect("one path must exist").0;
1473            let probe_data = loop {
1474                // We MTU probe all paths for which all of the following are true:
1475                // - We have an active destination CID for the path.
1476                // - The remote address *and* path are validated.
1477                // - The path is not abandoned.
1478                // - The MTU Discovery subsystem wants to probe the path.
1479                let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1480                let eligible = self.path_data(path_id).validated
1481                    && !self.path_data(path_id).is_validating_path()
1482                    && !self.abandoned_paths.contains(&path_id);
1483                let probe_size = eligible
1484                    .then(|| {
1485                        let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1486                        self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1487                    })
1488                    .flatten();
1489                match (active_cid, probe_size) {
1490                    (Some(active_cid), Some(probe_size)) => {
1491                        // Let's send an MTUD probe!
1492                        break Some((active_cid, probe_size));
1493                    }
1494                    _ => {
1495                        // Find the next path to check if it needs an MTUD probe.
1496                        match self.paths.keys().find(|&&next| next > path_id) {
1497                            Some(next) => {
1498                                path_id = *next;
1499                                continue;
1500                            }
1501                            None => break None,
1502                        }
1503                    }
1504                }
1505            };
1506            if let Some((active_cid, probe_size)) = probe_data {
1507                // We are definitely sending a DPLPMTUD probe.
1508                debug_assert_eq!(transmit.num_datagrams(), 0);
1509                transmit.start_new_datagram_with_size(probe_size as usize);
1510
1511                let mut builder = PacketBuilder::new(
1512                    now,
1513                    space_id,
1514                    path_id,
1515                    active_cid,
1516                    &mut transmit,
1517                    true,
1518                    self,
1519                )?;
1520
1521                // We implement MTU probes as ping packets padded up to the probe size
1522                trace!(?probe_size, "writing MTUD probe");
1523                builder.write_frame(frame::Ping, &mut self.stats.frame_tx);
1524
1525                // If supported by the peer, we want no delay on the probe's ACK
1526                if self.peer_supports_ack_frequency() {
1527                    builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx);
1528                }
1529
1530                builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1531
1532                self.path_stats
1533                    .entry(path_id)
1534                    .or_default()
1535                    .sent_plpmtud_probes += 1;
1536            }
1537        }
1538
1539        if transmit.is_empty() {
1540            return None;
1541        }
1542
1543        let network_path = self.path_data(path_id).network_path;
1544        trace!(
1545            segment_size = transmit.segment_size(),
1546            last_datagram_len = transmit.len() % transmit.segment_size(),
1547            %network_path,
1548            "sending {} bytes in {} datagrams",
1549            transmit.len(),
1550            transmit.num_datagrams()
1551        );
1552        self.path_data_mut(path_id)
1553            .inc_total_sent(transmit.len() as u64);
1554
1555        self.stats
1556            .udp_tx
1557            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1558
1559        Some(Transmit {
1560            destination: network_path.remote,
1561            size: transmit.len(),
1562            ecn: if self.path_data(path_id).sending_ecn {
1563                Some(EcnCodepoint::Ect0)
1564            } else {
1565                None
1566            },
1567            segment_size: match transmit.num_datagrams() {
1568                1 => None,
1569                _ => Some(transmit.segment_size()),
1570            },
1571            src_ip: network_path.local_ip,
1572        })
1573    }
1574
1575    /// Returns the [`SpaceId`] of the next packet space which has data to send
1576    ///
1577    /// This takes into account the space available to frames in the next datagram.
1578    // TODO(flub): This duplication is not nice.
1579    fn next_send_space(
1580        &mut self,
1581        current_space_id: SpaceId,
1582        path_id: PathId,
1583        buf: &TransmitBuf<'_>,
1584        close: bool,
1585    ) -> Option<SpaceId> {
1586        // `space_can_send` below accounts for the bytes available to frames if the next
1587        // packet is a 1-RTT packet; we're guaranteed to be able to send an individual
1588        // frame at least that large in the next 1-RTT packet. This could be generalized
1589        // to every space, but it's only needed for large fixed-size frames, which only
1590        // exist in 1-RTT (application datagrams). We don't account for coalesced packets
1591        // potentially occupying space because frames can always spill into the next datagram.
1592        let mut space_id = current_space_id;
1593        loop {
1594            let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1595            if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1596                return Some(space_id);
1597            }
1598            space_id = match space_id {
1599                SpaceId::Initial => SpaceId::Handshake,
1600                SpaceId::Handshake => SpaceId::Data,
1601                SpaceId::Data => break,
1602            }
1603        }
1604        None
1605    }
1606
1607    /// Checks if creating a new datagram would be blocked by anti-amplification, congestion control, or pacing
1608    fn path_congestion_check(
1609        &mut self,
1610        space_id: SpaceId,
1611        path_id: PathId,
1612        transmit: &TransmitBuf<'_>,
1613        can_send: &SendableFrames,
1614        now: Instant,
1615    ) -> PathBlocked {
1616        // Anti-amplification is only based on `total_sent`, which gets updated after
1617        // the transmit is sent. Therefore we pass the number of bytes for datagrams
1618        // that are already created, as well as 1 byte for starting another datagram. If
1619        // there is any anti-amplification budget left, we always allow a full MTU to be
1620        // sent (see https://github.com/quinn-rs/quinn/issues/1082).
1621        if self.side().is_server()
1622            && self
1623                .path_data(path_id)
1624                .anti_amplification_blocked(transmit.len() as u64 + 1)
1625        {
1626            trace!(?space_id, %path_id, "blocked by anti-amplification");
1627            return PathBlocked::AntiAmplification;
1628        }
1629
1630        // Congestion control check.
1631        // Tail loss probes must not be blocked by congestion, or a deadlock could arise.
1632        let bytes_to_send = transmit.segment_size() as u64;
1633        let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1634
1635        if can_send.other && !need_loss_probe && !can_send.close {
1636            let path = self.path_data(path_id);
1637            if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1638                trace!(?space_id, %path_id, "blocked by congestion control");
1639                return PathBlocked::Congestion;
1640            }
1641        }
1642
1643        // Pacing check.
1644        if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1645            self.timers.set(
1646                Timer::PerPath(path_id, PathTimer::Pacing),
1647                delay,
1648                self.qlog.with_time(now),
1649            );
1650            // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though
1651            // they are not congestion controlled.
1652            trace!(?space_id, %path_id, "blocked by pacing");
1653            return PathBlocked::Pacing;
1654        }
1655
1656        PathBlocked::No
1657    }
1658
1659    /// Send PATH_CHALLENGE for a previous path if necessary
1660    ///
1661    /// QUIC-TRANSPORT section 9.3.3
1662    /// <https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding>
1663    fn send_prev_path_challenge(
1664        &mut self,
1665        now: Instant,
1666        buf: &mut TransmitBuf<'_>,
1667        path_id: PathId,
1668    ) -> Option<Transmit> {
1669        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1670        // TODO (matheus23): We could use !prev_path.is_validating() here instead to
1671        // (possibly) also re-send challenges when they get lost.
1672        if !prev_path.send_new_challenge {
1673            return None;
1674        };
1675        prev_path.send_new_challenge = false;
1676        let network_path = prev_path.network_path;
1677        let token = self.rng.random();
1678        let info = paths::SentChallengeInfo {
1679            sent_instant: now,
1680            network_path,
1681        };
1682        prev_path.challenges_sent.insert(token, info);
1683        debug_assert_eq!(
1684            self.highest_space,
1685            SpaceId::Data,
1686            "PATH_CHALLENGE queued without 1-RTT keys"
1687        );
1688        buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1689
1690        // Use the previous CID to avoid linking the new path with the previous path. We
1691        // don't bother accounting for possible retirement of that prev_cid because this is
1692        // sent once, immediately after migration, when the CID is known to be valid. Even
1693        // if a post-migration packet caused the CID to be retired, it's fair to pretend
1694        // this is sent first.
1695        debug_assert_eq!(buf.datagram_start_offset(), 0);
1696        let mut builder =
1697            PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, false, self)?;
1698        let challenge = frame::PathChallenge(token);
1699        let stats = &mut self.stats.frame_tx;
1700        builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));
1701
1702        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
1703        // to at least the smallest allowed maximum datagram size of 1200 bytes,
1704        // unless the anti-amplification limit for the path does not permit
1705        // sending a datagram of this size
1706        builder.pad_to(MIN_INITIAL_SIZE);
1707
1708        builder.finish(self, now);
1709        self.stats.udp_tx.on_sent(1, buf.len());
1710
1711        Some(Transmit {
1712            destination: network_path.remote,
1713            size: buf.len(),
1714            ecn: None,
1715            segment_size: None,
1716            src_ip: network_path.local_ip,
1717        })
1718    }
1719
1720    /// Indicate what types of frames are ready to send for the given space
1721    ///
1722    /// *packet_size* is the number of bytes available to build the next packet.  *close*
1723    /// indicates whether a CONNECTION_CLOSE frame needs to be sent.
1724    fn space_can_send(
1725        &mut self,
1726        space_id: SpaceId,
1727        path_id: PathId,
1728        packet_size: usize,
1729        close: bool,
1730    ) -> SendableFrames {
1731        let pn = self.spaces[SpaceId::Data]
1732            .for_path(path_id)
1733            .peek_tx_number();
1734        let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1735        if self.spaces[space_id].crypto.is_none()
1736            && (space_id != SpaceId::Data
1737                || self.zero_rtt_crypto.is_none()
1738                || self.side.is_server())
1739        {
1740            // No keys available for this space
1741            return SendableFrames::empty();
1742        }
1743        let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1744        if space_id == SpaceId::Data {
1745            can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1746        }
1747
1748        can_send.close = close && self.spaces[space_id].crypto.is_some();
1749
1750        can_send
1751    }
1752
1753    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
1754    ///
1755    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
1756    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
1757    /// extracted through the relevant methods.
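    ///
    /// # Example
    ///
    /// A minimal sketch of an event pump; `conn` and the event queue feeding it from
    /// the `Endpoint` are assumptions of the example:
    ///
    /// ```ignore
    /// while let Some(event) = events_from_endpoint.pop_front() {
    ///     conn.handle_event(event);
    /// }
    /// ```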
1758    pub fn handle_event(&mut self, event: ConnectionEvent) {
1759        use ConnectionEventInner::*;
1760        match event.0 {
1761            Datagram(DatagramConnectionEvent {
1762                now,
1763                network_path,
1764                path_id,
1765                ecn,
1766                first_decode,
1767                remaining,
1768            }) => {
1769                let span = trace_span!("pkt", %path_id);
1770                let _guard = span.enter();
1771
1772                if self.update_network_path_or_discard(network_path, path_id) {
1773                    // A return value of true indicates we should discard this packet.
1774                    return;
1775                }
1776
1777                let was_anti_amplification_blocked = self
1778                    .path(path_id)
1779                    .map(|path| path.anti_amplification_blocked(1))
1780                    .unwrap_or(true); // if we don't know about this path, it is conservatively treated as unvalidated
1781                // TODO(@divma): revisit this
1782
1783                self.stats.udp_rx.datagrams += 1;
1784                self.stats.udp_rx.bytes += first_decode.len() as u64;
1785                let data_len = first_decode.len();
1786
1787                self.handle_decode(now, network_path, path_id, ecn, first_decode);
1788                // The current `path` might have changed inside `handle_decode` since the packet
1789                // could have triggered a migration. The packet might also belong to an unknown
1790                // path and have been rejected. Make sure the data received is accounted for the
1791                // most recent path by accessing `path` after `handle_decode`.
1792                if let Some(path) = self.path_mut(path_id) {
1793                    path.inc_total_recvd(data_len as u64);
1794                }
1795
1796                if let Some(data) = remaining {
1797                    self.stats.udp_rx.bytes += data.len() as u64;
1798                    self.handle_coalesced(now, network_path, path_id, ecn, data);
1799                }
1800
1801                if let Some(path) = self.paths.get_mut(&path_id) {
1802                    self.qlog
1803                        .emit_recovery_metrics(path_id, &mut path.data, now);
1804                }
1805
1806                if was_anti_amplification_blocked {
1807                    // A prior attempt to set the loss detection timer may have failed due to
1808                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
1809                    // the server's first flight is lost.
1810                    self.set_loss_detection_timer(now, path_id);
1811                }
1812            }
1813            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1814                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1815                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1816                let cid_state = self
1817                    .local_cid_state
1818                    .entry(path_id)
1819                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1820                cid_state.new_cids(&ids, now);
1821
1822                ids.into_iter().rev().for_each(|frame| {
1823                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1824                });
1825                // Always update Timer::PushNewCid
1826                self.reset_cid_retirement(now);
1827            }
1828        }
1829    }
1830
1831    /// Updates the network path for `path_id`.
1832    ///
1833    /// Returns true if a packet coming in for this `path_id` over the given `network_path` should be discarded.
1834    /// Returns false if the path was updated and the packet doesn't need to be discarded.
1835    fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
1836        let remote_may_migrate = self.side.remote_may_migrate(&self.state);
1837        let local_ip_may_migrate = self.side.is_client();
1838        // If this packet could initiate a migration and we're a client or a server that
1839        // forbids migration, drop the datagram. This could be relaxed to heuristically
1840        // permit NAT-rebinding-like migration.
1841        if let Some(known_path) = self.path_mut(path_id) {
1842            if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
1843                trace!(
1844                    %path_id,
1845                    %network_path,
1846                    %known_path.network_path,
1847                    "discarding packet from unrecognized peer"
1848                );
1849                return true;
1850            }
1851
1852            if known_path.network_path.local_ip.is_some()
1853                && network_path.local_ip.is_some()
1854                && known_path.network_path.local_ip != network_path.local_ip
1855                && !local_ip_may_migrate
1856            {
1857                trace!(
1858                    %path_id,
1859                    %network_path,
1860                    %known_path.network_path,
1861                    "discarding packet sent to incorrect interface"
1862                );
1863                return true;
1864            }
1865            // If the datagram indicates that we've changed our local IP, we update it.
1866            // This is alluded to in Section 5.2 of the Multipath RFC draft 18:
1867            // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#name-using-multiple-paths-on-the
1868            // > Client receives the packet, recognizes a path migration, updates the source address of path 2 to 192.0.2.1.
1869            if let Some(local_ip) = network_path.local_ip {
1870                if known_path
1871                    .network_path
1872                    .local_ip
1873                    .is_some_and(|ip| ip != local_ip)
1874                {
1875                    debug!(
1876                        %path_id,
1877                        %network_path,
1878                        %known_path.network_path,
1879                        "path's local address seemingly migrated"
1880                    );
1881                }
1882                // We update the address without path validation on the client side.
1883                // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#section-5.1
1884                // > Servers observing a 4-tuple change will perform path validation (see Section 9 of [QUIC-TRANSPORT]).
1885                // This sounds like it's *only* the server endpoints that do this.
1886                // TODO(matheus23): We should still consider doing a proper migration on the client side in the future.
1887                // For now, this preserves the behavior of this code pre 4-tuple tracking.
1888                known_path.network_path.local_ip = Some(local_ip);
1889            }
1890        }
1891        false
1892    }
1893
1894    /// Process timer expirations
1895    ///
1896    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
1897    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
1898    /// methods.
1899    ///
1900    /// It is most efficient to call this immediately after the system clock reaches the latest
1901    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
1902    /// no-op and therefore are safe.
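    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `conn` and a `now: Instant` tracked by the caller:
    ///
    /// ```ignore
    /// if let Some(deadline) = conn.poll_timeout() {
    ///     if now >= deadline {
    ///         conn.handle_timeout(now);
    ///     }
    /// }
    /// ```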
1903    pub fn handle_timeout(&mut self, now: Instant) {
1904        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
1905            // TODO(@divma): remove `at` when the unicorn is born
1906            trace!(?timer, at=?now, "timeout");
1907            match timer {
1908                Timer::Conn(timer) => match timer {
1909                    ConnTimer::Close => {
1910                        self.state.move_to_drained(None);
1911                        self.endpoint_events.push_back(EndpointEventInner::Drained);
1912                    }
1913                    ConnTimer::Idle => {
1914                        self.kill(ConnectionError::TimedOut);
1915                    }
1916                    ConnTimer::KeepAlive => {
1917                        trace!("sending keep-alive");
1918                        self.ping();
1919                    }
1920                    ConnTimer::KeyDiscard => {
1921                        self.zero_rtt_crypto = None;
1922                        self.prev_crypto = None;
1923                    }
1924                    ConnTimer::PushNewCid => {
1925                        while let Some((path_id, when)) = self.next_cid_retirement() {
1926                            if when > now {
1927                                break;
1928                            }
1929                            match self.local_cid_state.get_mut(&path_id) {
1930                                None => error!(%path_id, "No local CID state for path"),
1931                                Some(cid_state) => {
1932                                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
1933                                    let num_new_cid = cid_state.on_cid_timeout().into();
1934                                    if !self.state.is_closed() {
1935                                        trace!(
1936                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
1937                                            cid_state.retire_prior_to()
1938                                        );
1939                                        self.endpoint_events.push_back(
1940                                            EndpointEventInner::NeedIdentifiers(
1941                                                path_id,
1942                                                now,
1943                                                num_new_cid,
1944                                            ),
1945                                        );
1946                                    }
1947                                }
1948                            }
1949                        }
1950                    }
1951                },
1952                // TODO: add path_id as span somehow
1953                Timer::PerPath(path_id, timer) => {
1954                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
1955                    let _guard = span.enter();
1956                    match timer {
1957                        PathTimer::PathIdle => {
1958                            self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
1959                                .ok();
1960                        }
1961
1962                        PathTimer::PathKeepAlive => {
1963                            trace!("sending keep-alive on path");
1964                            self.ping_path(path_id).ok();
1965                        }
1966                        PathTimer::LossDetection => {
1967                            self.on_loss_detection_timeout(now, path_id);
1968                            self.qlog.emit_recovery_metrics(
1969                                path_id,
1970                                &mut self.paths.get_mut(&path_id).unwrap().data,
1971                                now,
1972                            );
1973                        }
1974                        PathTimer::PathValidation => {
1975                            let Some(path) = self.paths.get_mut(&path_id) else {
1976                                continue;
1977                            };
1978                            self.timers.stop(
1979                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
1980                                self.qlog.with_time(now),
1981                            );
1982                            debug!("path validation failed");
1983                            if let Some((_, prev)) = path.prev.take() {
1984                                path.data = prev;
1985                            }
1986                            path.data.challenges_sent.clear();
1987                            path.data.send_new_challenge = false;
1988                        }
1989                        PathTimer::PathChallengeLost => {
1990                            let Some(path) = self.paths.get_mut(&path_id) else {
1991                                continue;
1992                            };
1993                            trace!("path challenge deemed lost");
1994                            path.data.send_new_challenge = true;
1995                        }
1996                        PathTimer::PathOpen => {
1997                            let Some(path) = self.paths.get_mut(&path_id) else {
1998                                continue;
1999                            };
2000                            path.data.challenges_sent.clear();
2001                            path.data.send_new_challenge = false;
2002                            self.timers.stop(
2003                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
2004                                self.qlog.with_time(now),
2005                            );
2006                            debug!("new path validation failed");
2007                            if let Err(err) = self.close_path(
2008                                now,
2009                                path_id,
2010                                TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
2011                            ) {
2012                                warn!(?err, "failed closing path");
2013                            }
2014
2015                            self.events.push_back(Event::Path(PathEvent::LocallyClosed {
2016                                id: path_id,
2017                                error: PathError::ValidationFailed,
2018                            }));
2019                        }
2020                        PathTimer::Pacing => trace!("pacing timer expired"),
2021                        PathTimer::MaxAckDelay => {
2022                            trace!("max ack delay reached");
2023                            // This timer is only armed in the Data space
2024                            self.spaces[SpaceId::Data]
2025                                .for_path(path_id)
2026                                .pending_acks
2027                                .on_max_ack_delay_timeout()
2028                        }
2029                        PathTimer::DiscardPath => {
2030                            // The path was abandoned and 3*PTO has expired since.  Clean up all
2031                            // remaining state and install the stateless reset token.
2032                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2033                            if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
2034                                let (min_seq, max_seq) = loc_cid_state.active_seq();
2035                                for seq in min_seq..=max_seq {
2036                                    self.endpoint_events.push_back(
2037                                        EndpointEventInner::RetireConnectionId(
2038                                            now, path_id, seq, false,
2039                                        ),
2040                                    );
2041                                }
2042                            }
2043                            self.discard_path(path_id, now);
2044                        }
2045                    }
2046                }
2047            }
2048        }
2049    }
2050
2051    /// Close a connection immediately
2052    ///
2053    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
2054    /// call this only when all important communications have been completed, e.g. by calling
2055    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
2056    /// [`StreamEvent::Finished`] event.
2057    ///
2058    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
2059    /// delivered. There may still be data from the peer that has not been received.
2060    ///
2061    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
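    ///
    /// # Example
    ///
    /// A sketch of an orderly shutdown, assuming all streams were finished beforehand:
    ///
    /// ```ignore
    /// // 0x0 here is an application-defined error code; the reason is free-form bytes.
    /// conn.close(now, VarInt::from_u32(0), Bytes::from_static(b"goodbye"));
    /// ```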
2062    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2063        self.close_inner(
2064            now,
2065            Close::Application(frame::ApplicationClose { error_code, reason }),
2066        )
2067    }
2068
2069    fn close_inner(&mut self, now: Instant, reason: Close) {
2070        let was_closed = self.state.is_closed();
2071        if !was_closed {
2072            self.close_common();
2073            self.set_close_timer(now);
2074            self.close = true;
2075            self.state.move_to_closed_local(reason);
2076        }
2077    }
2078
2079    /// Control datagrams
2080    pub fn datagrams(&mut self) -> Datagrams<'_> {
2081        Datagrams { conn: self }
2082    }
2083
2084    /// Returns connection statistics
2085    pub fn stats(&mut self) -> ConnectionStats {
2086        self.stats.clone()
2087    }
2088
2089    /// Returns path statistics
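    ///
    /// # Example
    ///
    /// A sketch, assuming the connection still has path 0 open:
    ///
    /// ```ignore
    /// if let Some(stats) = conn.path_stats(PathId::ZERO) {
    ///     println!("rtt={:?} cwnd={} mtu={}", stats.rtt, stats.cwnd, stats.current_mtu);
    /// }
    /// ```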
2090    pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2091        let path = self.paths.get(&path_id)?;
2092        let stats = self.path_stats.entry(path_id).or_default();
2093        stats.rtt = path.data.rtt.get();
2094        stats.cwnd = path.data.congestion.window();
2095        stats.current_mtu = path.data.mtud.current_mtu();
2096        Some(*stats)
2097    }
2098
2099    /// Ping the remote endpoint
2100    ///
2101    /// Causes an ACK-eliciting packet to be transmitted on the connection.
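    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// // Elicit ACKs on all paths, e.g. as an application-level liveness check.
    /// conn.ping();
    /// ```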
2102    pub fn ping(&mut self) {
2103        // TODO(flub): This is very brute-force: it pings *all* the paths.  Instead it would
2104        //    be nice if we could only send a single packet for this.
2105        for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2106            path_data.ping_pending = true;
2107        }
2108    }
2109
2110    /// Ping the remote endpoint over a specific path
2111    ///
2112    /// Causes an ACK-eliciting packet to be transmitted on the path.
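    ///
    /// # Example
    ///
    /// A sketch, assuming `path_id` was obtained from an earlier [`PathEvent`]:
    ///
    /// ```ignore
    /// if conn.ping_path(path_id).is_err() {
    ///     // The path was already closed; nothing to probe.
    /// }
    /// ```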
2113    pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2114        let path_data = self.spaces[self.highest_space]
2115            .number_spaces
2116            .get_mut(&path)
2117            .ok_or(ClosedPath { _private: () })?;
2118        path_data.ping_pending = true;
2119        Ok(())
2120    }
2121
2122    /// Update traffic keys spontaneously
2123    ///
2124    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
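    ///
    /// # Example
    ///
    /// A test-only sketch; the update completes as further packets are exchanged:
    ///
    /// ```ignore
    /// conn.force_key_update();
    /// ```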
2125    pub fn force_key_update(&mut self) {
2126        if !self.state.is_established() {
2127            debug!("ignoring forced key update in illegal state");
2128            return;
2129        }
2130        if self.prev_crypto.is_some() {
2131            // We already just updated, or are currently updating, the keys. Concurrent key updates
2132            // are illegal.
2133            debug!("ignoring redundant forced key update");
2134            return;
2135        }
2136        self.update_keys(None, false);
2137    }
2138
2139    /// Get a session reference
2140    pub fn crypto_session(&self) -> &dyn crypto::Session {
2141        &*self.crypto
2142    }
2143
2144    /// Whether the connection is in the process of being established
2145    ///
2146    /// If this returns `false`, the connection may be either established or closed, signaled by the
2147    /// emission of a `Connected` or `ConnectionLost` message respectively.
2148    pub fn is_handshaking(&self) -> bool {
2149        self.state.is_handshake()
2150    }
2151
2152    /// Whether the connection is closed
2153    ///
2154    /// Closed connections cannot transport any further data. A connection becomes closed when
2155    /// either peer application intentionally closes it, or when either transport layer detects an
2156    /// error such as a time-out or certificate validation failure.
2157    ///
2158    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
2159    pub fn is_closed(&self) -> bool {
2160        self.state.is_closed()
2161    }
2162
2163    /// Whether there is no longer any need to keep the connection around
2164    ///
2165    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
2166    /// packets from the peer. All drained connections have been closed.
2167    pub fn is_drained(&self) -> bool {
2168        self.state.is_drained()
2169    }
2170
2171    /// For clients, if the peer accepted the 0-RTT data packets
2172    ///
2173    /// The value is meaningless until after the handshake completes.
2174    pub fn accepted_0rtt(&self) -> bool {
2175        self.accepted_0rtt
2176    }
2177
2178    /// Whether 0-RTT is/was possible during the handshake
2179    pub fn has_0rtt(&self) -> bool {
2180        self.zero_rtt_enabled
2181    }
2182
2183    /// Whether there are any pending retransmits
2184    pub fn has_pending_retransmits(&self) -> bool {
2185        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2186    }
2187
2188    /// Look up whether we're the client or server of this Connection
2189    pub fn side(&self) -> Side {
2190        self.side.side()
2191    }
2192
2193    /// Get the address observed by the remote over the given path
2194    pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2195        self.path(path_id)
2196            .map(|path_data| {
2197                path_data
2198                    .last_observed_addr_report
2199                    .as_ref()
2200                    .map(|observed| observed.socket_addr())
2201            })
2202            .ok_or(ClosedPath { _private: () })
2203    }
2204
2205    /// Current best estimate of this connection's latency (round-trip-time)
2206    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2207        self.path(path_id).map(|d| d.rtt.get())
2208    }
2209
2210    /// Current state of this connection's congestion controller, for debugging purposes
2211    pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2212        self.path(path_id).map(|d| d.congestion.as_ref())
2213    }
2214
2215    /// Modify the number of remotely initiated streams that may be concurrently open
2216    ///
2217    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
2218    /// `count`s increase both minimum and worst-case memory consumption.
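    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// // Allow the peer up to 64 concurrently open bidirectional streams.
    /// conn.set_max_concurrent_streams(Dir::Bi, VarInt::from_u32(64));
    /// ```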
2219    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2220        self.streams.set_max_concurrent(dir, count);
2221        // If the limit was reduced, then a flow control update previously deemed insignificant may
2222        // now be significant.
2223        let pending = &mut self.spaces[SpaceId::Data].pending;
2224        self.streams.queue_max_stream_id(pending);
2225    }
2226
2227    /// Modify the number of open paths allowed when multipath is enabled
2228    ///
2229    /// When reducing the number of concurrent paths, this only delays sending new
2230    /// MAX_PATH_ID frames until fewer than this number of paths are in use.  To
2231    /// actively reduce paths they must be closed using [`Connection::close_path`], which
2232    /// can also be used to close not-yet-opened paths.
2233    ///
2234    /// If multipath is not negotiated (see the [`TransportConfig`]) this cannot enable
2235    /// multipath and will fail.
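    ///
    /// # Example
    ///
    /// A sketch, assuming multipath was negotiated during the handshake:
    ///
    /// ```ignore
    /// conn.set_max_concurrent_paths(now, NonZeroU32::new(4).unwrap())?;
    /// ```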
2236    pub fn set_max_concurrent_paths(
2237        &mut self,
2238        now: Instant,
2239        count: NonZeroU32,
2240    ) -> Result<(), MultipathNotNegotiated> {
2241        if !self.is_multipath_negotiated() {
2242            return Err(MultipathNotNegotiated { _private: () });
2243        }
2244        self.max_concurrent_paths = count;
2245
2246        let in_use_count = self
2247            .local_max_path_id
2248            .next()
2249            .saturating_sub(self.abandoned_paths.len() as u32)
2250            .as_u32();
2251        let extra_needed = count.get().saturating_sub(in_use_count);
2252        let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2253
2254        self.set_max_path_id(now, new_max_path_id);
2255
2256        Ok(())
2257    }
2258
2259    /// If needed, issues a new MAX_PATH_ID frame and new CIDs for any newly allowed paths
2260    fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2261        if max_path_id <= self.local_max_path_id {
2262            return;
2263        }
2264
2265        self.local_max_path_id = max_path_id;
2266        self.spaces[SpaceId::Data].pending.max_path_id = true;
2267
2268        self.issue_first_path_cids(now);
2269    }
2270
2271    /// Current number of remotely initiated streams that may be concurrently open
2272    ///
2273    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
2274    /// it will not change immediately, even if fewer streams are open. Instead, it will
2275    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
2276    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2277        self.streams.max_concurrent(dir)
2278    }
2279
2280    /// See [`TransportConfig::send_window()`]
2281    pub fn set_send_window(&mut self, send_window: u64) {
2282        self.streams.set_send_window(send_window);
2283    }
2284
2285    /// See [`TransportConfig::receive_window()`]
2286    pub fn set_receive_window(&mut self, receive_window: VarInt) {
2287        if self.streams.set_receive_window(receive_window) {
2288            self.spaces[SpaceId::Data].pending.max_data = true;
2289        }
2290    }
2291
2292    /// Whether the Multipath for QUIC extension is enabled.
2293    ///
2294    /// Multipath is only enabled after the handshake is completed and if it was enabled by both
2295    /// peers.
2296    pub fn is_multipath_negotiated(&self) -> bool {
2297        !self.is_handshaking()
2298            && self.config.max_concurrent_multipath_paths.is_some()
2299            && self.peer_params.initial_max_path_id.is_some()
2300    }
2301
2302    fn on_ack_received(
2303        &mut self,
2304        now: Instant,
2305        space: SpaceId,
2306        ack: frame::Ack,
2307    ) -> Result<(), TransportError> {
2308        // Plain ACK frames (as opposed to PATH_ACK) always reference path 0
2309        let path = PathId::ZERO;
2310        self.inner_on_ack_received(now, space, path, ack)
2311    }
2312
2313    fn on_path_ack_received(
2314        &mut self,
2315        now: Instant,
2316        space: SpaceId,
2317        path_ack: frame::PathAck,
2318    ) -> Result<(), TransportError> {
2319        let (ack, path) = path_ack.into_ack();
2320        self.inner_on_ack_received(now, space, path, ack)
2321    }
2322
2323    /// Handles an ACK frame acknowledging packets sent on *path*.
2324    fn inner_on_ack_received(
2325        &mut self,
2326        now: Instant,
2327        space: SpaceId,
2328        path: PathId,
2329        ack: frame::Ack,
2330    ) -> Result<(), TransportError> {
2331        if self.abandoned_paths.contains(&path) {
2332            // See also https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.4.3-3
2333            // > PATH_ACK frames received with an abandoned path ID are silently ignored, as specified in Section 4.
2334            trace!("silently ignoring PATH_ACK on abandoned path");
2335            return Ok(());
2336        }
2337        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2338            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2339        }
2340        let new_largest = {
2341            let space = &mut self.spaces[space].for_path(path);
2342            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
2343                space.largest_acked_packet = Some(ack.largest);
2344                if let Some(info) = space.sent_packets.get(ack.largest) {
2345                    // This should always succeed, but a misbehaving peer might ACK a packet we
2346                    // haven't sent. At worst, that will result in us spuriously reducing the
2347                    // congestion window.
2348                    space.largest_acked_packet_sent = info.time_sent;
2349                }
2350                true
2351            } else {
2352                false
2353            }
2354        };
2355
2356        if self.detect_spurious_loss(&ack, space, path) {
2357            self.path_data_mut(path)
2358                .congestion
2359                .on_spurious_congestion_event();
2360        }
2361
2362        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
2363        let mut newly_acked = ArrayRangeSet::new();
2364        for range in ack.iter() {
2365            self.spaces[space].for_path(path).check_ack(range.clone())?;
2366            for (pn, _) in self.spaces[space]
2367                .for_path(path)
2368                .sent_packets
2369                .iter_range(range)
2370            {
2371                newly_acked.insert_one(pn);
2372            }
2373        }
2374
2375        if newly_acked.is_empty() {
2376            return Ok(());
2377        }
2378
2379        let mut ack_eliciting_acked = false;
2380        for packet in newly_acked.elts() {
2381            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2382                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2383                    // Assume ACKs for all packets below the largest acknowledged in
2384                    // `packet` have been received. This can cause the peer to spuriously
2385                    // retransmit if some of our earlier ACKs were lost, but allows for
2386                    // simpler state tracking. See discussion at
2387                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
2388                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2389                        pns.pending_acks.subtract_below(*acked_pn);
2390                    }
2391                }
2392                ack_eliciting_acked |= info.ack_eliciting;
2393
2394                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
2395                let path_data = self.path_data_mut(path);
2396                let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
2397                if mtu_updated {
2398                    path_data
2399                        .congestion
2400                        .on_mtu_update(path_data.mtud.current_mtu());
2401                }
2402
2403                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
2404                self.ack_frequency.on_acked(path, packet);
2405
2406                self.on_packet_acked(now, path, info);
2407            }
2408        }
2409
2410        let largest_acked = self.spaces[space].for_path(path).largest_acked_packet;
2411        let app_limited = self.app_limited;
2412        let path_data = self.path_data_mut(path);
2413        let in_flight = path_data.in_flight.bytes;
2414
2415        path_data
2416            .congestion
2417            .on_end_acks(now, in_flight, app_limited, largest_acked);
2418
2419        if new_largest && ack_eliciting_acked {
2420            let ack_delay = if space != SpaceId::Data {
2421                Duration::from_micros(0)
2422            } else {
2423                cmp::min(
2424                    self.ack_frequency.peer_max_ack_delay,
2425                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2426                )
2427            };
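            // For example, with RFC 9000's default ack_delay_exponent of 3, a wire
            // value of ack.delay == 100 decodes to 100 << 3 = 800 microseconds.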
2428            let rtt = now.saturating_duration_since(
2429                self.spaces[space].for_path(path).largest_acked_packet_sent,
2430            );
2431
2432            let next_pn = self.spaces[space].for_path(path).next_packet_number;
2433            let path_data = self.path_data_mut(path);
2434            // TODO(@divma): should be a method of path, should be contained in a single place
2435            path_data.rtt.update(ack_delay, rtt);
2436            if path_data.first_packet_after_rtt_sample.is_none() {
2437                path_data.first_packet_after_rtt_sample = Some((space, next_pn));
2438            }
2439        }
2440
2441        // Must be called before crypto/pto_count are clobbered
2442        self.detect_lost_packets(now, space, path, true);
2443
2444        if self.peer_completed_address_validation(path) {
2445            self.path_data_mut(path).pto_count = 0;
2446        }
2447
2448        // Explicit congestion notification
2449        // TODO(@divma): this code is a good example of logic that should be contained in a single
2450        // place but it's split between the path data and the packet number space data, we should
2451        // find a way to make this work without two lookups
2452        if self.path_data(path).sending_ecn {
2453            if let Some(ecn) = ack.ecn {
2454                // We only examine ECN counters from ACKs that we are certain arrived in transmit
2455                // order, so that the increase in ECN counts we compute against the number of
2456                // newly acked packets remains well-defined in the presence of arbitrary packet
2457                // reordering.
2458                if new_largest {
2459                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
2460                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
2461                }
2462            } else {
2463                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
2464                debug!("ECN not acknowledged by peer");
2465                self.path_data_mut(path).sending_ecn = false;
2466            }
2467        }
2468
2469        self.set_loss_detection_timer(now, path);
2470        Ok(())
2471    }
2472
2473    fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2474        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2475
2476        if lost_packets.is_empty() {
2477            return false;
2478        }
2479
2480        for range in ack.iter() {
2481            let spurious_losses: Vec<u64> = lost_packets
2482                .iter_range(range.clone())
2483                .map(|(pn, _info)| pn)
2484                .collect();
2485
2486            for pn in spurious_losses {
2487                lost_packets.remove(pn);
2488            }
2489        }
2490
2491        // If this ACK frame acknowledged all packets previously deemed lost,
2492        // then we raised a spurious congestion event in the past.
2493        // We cannot conclude anything while some of those packets remain,
2494        // but future ACK frames might still reveal a spurious loss detection.
2495        lost_packets.is_empty()
2496    }
2497
2498    /// Drain lost packets that we reasonably think will never arrive
2499    ///
2500    /// The current criterion is copied from `msquic`:
2501    /// discard packets that were sent earlier than 2 probe timeouts ago.
2502    fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2503        let two_pto = 2 * self.path_data(path).rtt.pto_base();
2504
2505        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2506        lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2507    }
2508
2509    /// Process a new ECN block from an in-order ACK
2510    fn process_ecn(
2511        &mut self,
2512        now: Instant,
2513        space: SpaceId,
2514        path: PathId,
2515        newly_acked: u64,
2516        ecn: frame::EcnCounts,
2517        largest_sent_time: Instant,
2518    ) {
2519        match self.spaces[space]
2520            .for_path(path)
2521            .detect_ecn(newly_acked, ecn)
2522        {
2523            Err(e) => {
2524                debug!("halting ECN due to verification failure: {}", e);
2525
2526                self.path_data_mut(path).sending_ecn = false;
2527                // Wipe out the existing value because it might be garbage and could interfere with
2528                // future attempts to use ECN on new paths.
2529                self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2530            }
2531            Ok(false) => {}
2532            Ok(true) => {
2533                self.path_stats.entry(path).or_default().congestion_events += 1;
2534                self.path_data_mut(path).congestion.on_congestion_event(
2535                    now,
2536                    largest_sent_time,
2537                    false,
2538                    true,
2539                    0,
2540                );
2541            }
2542        }
2543    }
2544
2545    // Not timing-aware, so it's safe to call this for inferred acks, such as those arising
2546    // from high-latency handshakes
2547    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2548        self.paths
2549            .get_mut(&path_id)
2550            .expect("known path")
2551            .remove_in_flight(&info);
2552        let app_limited = self.app_limited;
2553        let path = self.path_data_mut(path_id);
2554        if info.ack_eliciting && !path.is_validating_path() {
2555            // Only pass ACKs to the congestion controller if we are not validating the current
2556            // path, so as to ignore any ACKs from older paths still coming in.
2557            let rtt = path.rtt;
2558            path.congestion
2559                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2560        }
2561
2562        // Update state for confirmed delivery of frames
2563        if let Some(retransmits) = info.retransmits.get() {
2564            for (id, _) in retransmits.reset_stream.iter() {
2565                self.streams.reset_acked(*id);
2566            }
2567        }
2568
2569        for frame in info.stream_frames {
2570            self.streams.received_ack_of(frame);
2571        }
2572    }
2573
2574    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2575        let start = if self.zero_rtt_crypto.is_some() {
2576            now
2577        } else {
2578            self.prev_crypto
2579                .as_ref()
2580                .expect("no previous keys")
2581                .end_packet
2582                .as_ref()
2583                .expect("update not acknowledged yet")
2584                .1
2585        };
2586
2587        // QUIC-MULTIPATH § 2.5 Key Phase Update Process: use the largest PTO of all paths.
2588        self.timers.set(
2589            Timer::Conn(ConnTimer::KeyDiscard),
2590            start + self.pto_max_path(space, false) * 3,
2591            self.qlog.with_time(now),
2592        );
2593    }
2594
2595    /// Handle a [`PathTimer::LossDetection`] timeout.
2596    ///
2597    /// This timer expires for two reasons:
2598    /// - An ACK-eliciting packet we sent should be considered lost.
2599    /// - The PTO may have expired and a tail-loss probe needs to be scheduled.
2600    ///
2601    /// The former needs us to schedule re-transmission of the lost data.
2602    ///
2603    /// The latter means we have not received an ACK for an ack-eliciting packet we sent
2604    /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
2605    /// packet, to try to elicit new acknowledgements. These new acknowledgements will
2606    /// indicate whether the previously sent packets were lost or not.
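    ///
    /// In outline, a sketch of the body below:
    /// ```ignore
    /// if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
    ///     // A loss timer expired: declare the overdue packets lost and re-arm.
    ///     self.detect_lost_packets(now, pn_space, path_id, false);
    /// } else if let Some((_, space)) = self.pto_time_and_space(now, path_id) {
    ///     // PTO expired: queue one probe (anti-amplification deadlock prevention)
    ///     // or two (conventional tail-loss probe) and back off the PTO.
    ///     self.spaces[space].for_path(path_id).loss_probes += count;
    /// }
    /// self.set_loss_detection_timer(now, path_id);
    /// ```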
2607    fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2608        if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2609            // Time threshold loss detection
2610            self.detect_lost_packets(now, pn_space, path_id, false);
2611            self.set_loss_detection_timer(now, path_id);
2612            return;
2613        }
2614
2615        let (_, space) = match self.pto_time_and_space(now, path_id) {
2616            Some(x) => x,
2617            None => {
2618                error!(%path_id, "PTO expired while unset");
2619                return;
2620            }
2621        };
2622        trace!(
2623            in_flight = self.path_data(path_id).in_flight.bytes,
2624            count = self.path_data(path_id).pto_count,
2625            ?space,
2626            %path_id,
2627            "PTO fired"
2628        );
2629
2630        let count = match self.path_data(path_id).in_flight.ack_eliciting {
2631            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
2632            // deadlock prevention
2633            0 => {
2634                debug_assert!(!self.peer_completed_address_validation(path_id));
2635                1
2636            }
2637            // Conventional loss probe
2638            _ => 2,
2639        };
2640        let pns = self.spaces[space].for_path(path_id);
2641        pns.loss_probes = pns.loss_probes.saturating_add(count);
2642        let path_data = self.path_data_mut(path_id);
2643        path_data.pto_count = path_data.pto_count.saturating_add(1);
2644        self.set_loss_detection_timer(now, path_id);
2645    }
2646
2647    /// Detect any lost packets
2648    ///
2649    /// There are two cases in which we detect lost packets:
2650    ///
2651    /// - We received an ACK packet.
2652    /// - The [`PathTimer::LossDetection`] timer expired. So there is an un-acknowledged packet
2653    ///   that was followed by an acknowledged packet. The loss timer for this
2654    ///   un-acknowledged packet expired and we need to declare that packet lost.
2655    ///
2656    /// Packets are lost if they are both (see RFC 9002 §6.1):
2657    ///
2658    /// - Unacknowledged, in flight and sent prior to an acknowledged packet.
2659    /// - Old enough by either:
2660    ///   - Having a packet number [`TransportConfig::packet_threshold`] lower than the last
2661    ///     acknowledged packet.
2662    ///   - Being sent [`TransportConfig::time_threshold`] * RTT in the past.
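    ///
    /// A sketch of that predicate, using the same names as the body below:
    /// ```ignore
    /// let loss_delay = rtt.conservative().mul_f32(time_threshold).max(TIMER_GRANULARITY);
    /// let lost = now.saturating_duration_since(info.time_sent) >= loss_delay
    ///     || largest_acked_packet >= packet + packet_threshold;
    /// ```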
2663    fn detect_lost_packets(
2664        &mut self,
2665        now: Instant,
2666        pn_space: SpaceId,
2667        path_id: PathId,
2668        due_to_ack: bool,
2669    ) {
2670        let mut lost_packets = Vec::<u64>::new();
2671        let mut lost_mtu_probe = None;
2672        let mut in_persistent_congestion = false;
2673        let mut size_of_lost_packets = 0u64;
2674        self.spaces[pn_space].for_path(path_id).loss_time = None;
2675
2676        // Find all the lost packets, populating all variables initialised above.
2677
2678        let path = self.path_data(path_id);
2679        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2680        let loss_delay = path
2681            .rtt
2682            .conservative()
2683            .mul_f32(self.config.time_threshold)
2684            .max(TIMER_GRANULARITY);
2685        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
2686
2687        let largest_acked_packet = self.spaces[pn_space]
2688            .for_path(path_id)
2689            .largest_acked_packet
2690            .expect("detect_lost_packets only to be called if path received at least one ACK");
2691        let packet_threshold = self.config.packet_threshold as u64;
2692
2693        // InPersistentCongestion: Determine if all packets in the time period before the newest
2694        // lost packet, including the edges, are marked lost. PTO computation must always
2695        // include max ACK delay, i.e. operate as if in Data space (see RFC 9002 §7.6.1).
2696        let congestion_period = self
2697            .pto(SpaceId::Data, path_id)
2698            .saturating_mul(self.config.persistent_congestion_threshold);
2699        let mut persistent_congestion_start: Option<Instant> = None;
2700        let mut prev_packet = None;
2701        let space = self.spaces[pn_space].for_path(path_id);
2702
2703        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
2704            if prev_packet != Some(packet.wrapping_sub(1)) {
2705                // An intervening packet was acknowledged
2706                persistent_congestion_start = None;
2707            }
2708
2709            // Packets sent before now - loss_delay are deemed lost.
2710            // However, we avoid subtraction as it can panic and there's no
2711            // saturating equivalent of this subtraction operation with a Duration.
2712            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
2713            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
2714                // The packet should be declared lost.
2715                if Some(packet) == in_flight_mtu_probe {
2716                    // Lost MTU probes are not included in `lost_packets`, because they
2717                    // should not trigger a congestion control response
2718                    lost_mtu_probe = in_flight_mtu_probe;
2719                } else {
2720                    lost_packets.push(packet);
2721                    size_of_lost_packets += info.size as u64;
2722                    if info.ack_eliciting && due_to_ack {
2723                        match persistent_congestion_start {
2724                            // Two ACK-eliciting packets lost more than
2725                            // congestion_period apart, with no ACKed packets in between
2726                            Some(start) if info.time_sent - start > congestion_period => {
2727                                in_persistent_congestion = true;
2728                            }
2729                            // Persistent congestion must start after the first RTT sample
2730                            None if first_packet_after_rtt_sample
2731                                .is_some_and(|x| x < (pn_space, packet)) =>
2732                            {
2733                                persistent_congestion_start = Some(info.time_sent);
2734                            }
2735                            _ => {}
2736                        }
2737                    }
2738                }
2739            } else {
2740                // The packet should not yet be declared lost.
2741                if space.loss_time.is_none() {
2742                    // Since we iterate in order, the lowest packet number's loss time will
2743                    // always be the earliest.
2744                    space.loss_time = Some(info.time_sent + loss_delay);
2745                }
2746                persistent_congestion_start = None;
2747            }
2748
2749            prev_packet = Some(packet);
2750        }
2751
2752        self.handle_lost_packets(
2753            pn_space,
2754            path_id,
2755            now,
2756            lost_packets,
2757            lost_mtu_probe,
2758            loss_delay,
2759            in_persistent_congestion,
2760            size_of_lost_packets,
2761        );
2762    }
2763
2764    /// Drops the path state, declaring any remaining in-flight packets as lost
2765    fn discard_path(&mut self, path_id: PathId, now: Instant) {
2766        trace!(%path_id, "dropping path state");
2767        let path = self.path_data(path_id);
2768        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2769
2770        let mut size_of_lost_packets = 0u64; // add to path_stats.lost_bytes;
2771        let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2772            .for_path(path_id)
2773            .sent_packets
2774            .iter()
2775            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2776            .map(|(pn, info)| {
2777                size_of_lost_packets += info.size as u64;
2778                pn
2779            })
2780            .collect();
2781
2782        if !lost_pns.is_empty() {
2783            trace!(
2784                %path_id,
2785                count = lost_pns.len(),
2786                lost_bytes = size_of_lost_packets,
2787                "packets lost on path abandon"
2788            );
2789            self.handle_lost_packets(
2790                SpaceId::Data,
2791                path_id,
2792                now,
2793                lost_pns,
2794                in_flight_mtu_probe,
2795                Duration::ZERO,
2796                false,
2797                size_of_lost_packets,
2798            );
2799        }
2800        self.paths.remove(&path_id);
2801        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2802
2803        let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2804        self.events.push_back(
2805            PathEvent::Abandoned {
2806                id: path_id,
2807                path_stats,
2808            }
2809            .into(),
2810        );
2811    }
2812
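    /// Responds to packets having been declared lost
    ///
    /// Updates loss statistics and the congestion controller, queues the frames carried
    /// by the lost packets for retransmission, records the packets for later
    /// spurious-loss detection, and handles a lost MTU probe separately since MTU probes
    /// must not trigger a congestion response.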
2813    fn handle_lost_packets(
2814        &mut self,
2815        pn_space: SpaceId,
2816        path_id: PathId,
2817        now: Instant,
2818        lost_packets: Vec<u64>,
2819        lost_mtu_probe: Option<u64>,
2820        loss_delay: Duration,
2821        in_persistent_congestion: bool,
2822        size_of_lost_packets: u64,
2823    ) {
2824        debug_assert!(
2825            {
2826                let mut sorted = lost_packets.clone();
2827                sorted.sort();
2828                sorted == lost_packets
2829            },
2830            "lost_packets must be sorted"
2831        );
2832
2833        self.drain_lost_packets(now, pn_space, path_id);
2834
2835        // OnPacketsLost
2836        if let Some(largest_lost) = lost_packets.last().cloned() {
2837            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
2838            let largest_lost_sent = self.spaces[pn_space]
2839                .for_path(path_id)
2840                .sent_packets
2841                .get(largest_lost)
2842                .unwrap()
2843                .time_sent;
2844            let path_stats = self.path_stats.entry(path_id).or_default();
2845            path_stats.lost_packets += lost_packets.len() as u64;
2846            path_stats.lost_bytes += size_of_lost_packets;
2847            trace!(
2848                %path_id,
2849                count = lost_packets.len(),
2850                lost_bytes = size_of_lost_packets,
2851                "packets lost",
2852            );
2853
2854            for &packet in &lost_packets {
2855                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
2856                    continue;
2857                };
2858                self.qlog
2859                    .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
2860                self.paths
2861                    .get_mut(&path_id)
2862                    .unwrap()
2863                    .remove_in_flight(&info);
2864
2865                for frame in info.stream_frames {
2866                    self.streams.retransmit(frame);
2867                }
2868                self.spaces[pn_space].pending |= info.retransmits;
2869                self.path_data_mut(path_id)
2870                    .mtud
2871                    .on_non_probe_lost(packet, info.size);
2872
2873                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
2874                    packet,
2875                    LostPacket {
2876                        time_sent: info.time_sent,
2877                    },
2878                );
2879            }
2880
2881            let path = self.path_data_mut(path_id);
2882            if path.mtud.black_hole_detected(now) {
2883                path.congestion.on_mtu_update(path.mtud.current_mtu());
2884                if let Some(max_datagram_size) = self.datagrams().max_size() {
2885                    if self.datagrams.drop_oversized(max_datagram_size)
2886                        && self.datagrams.send_blocked
2887                    {
2888                        self.datagrams.send_blocked = false;
2889                        self.events.push_back(Event::DatagramsUnblocked);
2890                    }
2891                }
2892                self.path_stats
2893                    .entry(path_id)
2894                    .or_default()
2895                    .black_holes_detected += 1;
2896            }
2897
2898            // Don't apply congestion penalty for lost ack-only packets
2899            let lost_ack_eliciting =
2900                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
2901
2902            if lost_ack_eliciting {
2903                self.path_stats
2904                    .entry(path_id)
2905                    .or_default()
2906                    .congestion_events += 1;
2907                self.path_data_mut(path_id).congestion.on_congestion_event(
2908                    now,
2909                    largest_lost_sent,
2910                    in_persistent_congestion,
2911                    false,
2912                    size_of_lost_packets,
2913                );
2914            }
2915        }
2916
2917        // Handle a lost MTU probe
2918        if let Some(packet) = lost_mtu_probe {
2919            let info = self.spaces[SpaceId::Data]
2920                .for_path(path_id)
2921                .take(packet)
2922                .unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and
2923            // therefore must not have been removed yet
2924            self.paths
2925                .get_mut(&path_id)
2926                .unwrap()
2927                .remove_in_flight(&info);
2928            self.path_data_mut(path_id).mtud.on_probe_lost();
2929            self.path_stats
2930                .entry(path_id)
2931                .or_default()
2932                .lost_plpmtud_probes += 1;
2933        }
2934    }
2935
2936    /// Returns the earliest time packets should be declared lost for all spaces on a path.
2937    ///
2938    /// If a path has an acknowledged packet with any prior un-acknowledged packets, the
2939    /// earliest un-acknowledged packet can be declared lost after a timeout has elapsed.
2940    /// The time returned is when this packet should be declared lost.
2941    fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2942        SpaceId::iter()
2943            .filter_map(|id| {
2944                self.spaces[id]
2945                    .number_spaces
2946                    .get(&path_id)
2947                    .and_then(|pns| pns.loss_time)
2948                    .map(|time| (time, id))
2949            })
2950            .min_by_key(|&(time, _)| time)
2951    }
2952
2953    /// Returns the earliest time the next PTO should fire across all spaces on a path.
2954    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2955        let path = self.path(path_id)?;
2956        let pto_count = path.pto_count;
2957        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2958        let mut duration = path.rtt.pto_base() * backoff;
2959
2960        if path_id == PathId::ZERO
2961            && path.in_flight.ack_eliciting == 0
2962            && !self.peer_completed_address_validation(PathId::ZERO)
2963        {
2964            // Address Validation during Connection Establishment:
2965            // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
2966            // deadlock if an Initial or Handshake packet from the server is lost and the
2967            // server cannot send more due to its anti-amplification limit, the client must
2968            // send another packet on PTO.
2969            let space = match self.highest_space {
2970                SpaceId::Handshake => SpaceId::Handshake,
2971                _ => SpaceId::Initial,
2972            };
2973
2974            return Some((now + duration, space));
2975        }
2976
2977        let mut result = None;
2978        for space in SpaceId::iter() {
2979            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2980                continue;
2981            };
2982
2983            if !pns.has_in_flight() {
2984                continue;
2985            }
2986            if space == SpaceId::Data {
2987                // Skip ApplicationData until handshake completes.
2988                if self.is_handshaking() {
2989                    return result;
2990                }
2991                // Include max_ack_delay and backoff for ApplicationData.
2992                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2993            }
2994            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
2995                continue;
2996            };
2997            let pto = last_ack_eliciting + duration;
2998            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2999                if path.anti_amplification_blocked(1) {
3000                    // Nothing would be able to be sent.
3001                    continue;
3002                }
3003                if path.in_flight.ack_eliciting == 0 {
3004                    // Nothing ack-eliciting, no PTO to arm/fire.
3005                    continue;
3006                }
3007                result = Some((pto, space));
3008            }
3009        }
3010        result
3011    }
3012
3013    fn peer_completed_address_validation(&self, path: PathId) -> bool {
3014        // TODO(flub): This logic needs updating for multipath
3015        if self.side.is_server() || self.state.is_closed() {
3016            return true;
3017        }
3018        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
3019        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
3020        self.spaces[SpaceId::Handshake]
3021            .path_space(PathId::ZERO)
3022            .and_then(|pns| pns.largest_acked_packet)
3023            .is_some()
3024            || self.spaces[SpaceId::Data]
3025                .path_space(path)
3026                .and_then(|pns| pns.largest_acked_packet)
3027                .is_some()
3028            || (self.spaces[SpaceId::Data].crypto.is_some()
3029                && self.spaces[SpaceId::Handshake].crypto.is_none())
3030    }
3031
3032    /// Resets the [`PathTimer::LossDetection`] timer to the next instant it may be needed
3033    ///
3034    /// The timer must fire if either:
3035    /// - An ack-eliciting packet we sent needs to be declared lost.
3036    /// - A tail-loss probe needs to be sent.
3037    ///
3038    /// See [`Connection::on_loss_detection_timeout`] for details.
3039    fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3040        if self.state.is_closed() {
3041            // No loss detection takes place on closed connections, and `close_common` already
3042            // stopped the timer. Ensure we don't restart it inadvertently, e.g. in response to a
3043            // reordered packet being handled by state-insensitive code.
3044            return;
3045        }
3046
3047        if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3048            // Time threshold loss detection.
3049            self.timers.set(
3050                Timer::PerPath(path_id, PathTimer::LossDetection),
3051                loss_time,
3052                self.qlog.with_time(now),
3053            );
3054            return;
3055        }
3056
3057        // Determine which PN space to arm PTO for.
3058        // Calculate PTO duration
3059        if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3060            self.timers.set(
3061                Timer::PerPath(path_id, PathTimer::LossDetection),
3062                timeout,
3063                self.qlog.with_time(now),
3064            );
3065        } else {
3066            self.timers.stop(
3067                Timer::PerPath(path_id, PathTimer::LossDetection),
3068                self.qlog.with_time(now),
3069            );
3070        }
3071    }
3072
3073    /// The maximum probe timeout across all paths
3074    ///
3075    /// If `is_closing` is set to `true` it will filter out paths that have not yet been used.
3076    ///
3077    /// See [`Connection::pto`]
3078    fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
3079        match space {
3080            SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3081            SpaceId::Data => self
3082                .paths
3083                .iter()
3084                .filter_map(|(path_id, state)| {
3085                    if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
3086                        // If we are closing and this path has seen no traffic yet, do not include it
3087                        None
3088                    } else {
3089                        let pto = self.pto(space, *path_id);
3090                        Some(pto)
3091                    }
3092                })
3093                .max()
3094                .expect("there should be at least one path"),
3095        }
3096    }
3097
3098    /// Probe Timeout
3099    ///
3100    /// The PTO is logically the time within which you'd expect to receive an acknowledgement
3101    /// for a packet, so approximately RTT + max_ack_delay.
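    ///
    /// A sketch with hypothetical numbers, assuming `pto_base` follows RFC 9002
    /// (`smoothed_rtt + max(4 * rttvar, TIMER_GRANULARITY)`):
    /// ```ignore
    /// use std::time::Duration;
    /// let smoothed_rtt = Duration::from_millis(50);
    /// let rttvar = Duration::from_millis(10);
    /// let max_ack_delay = Duration::from_millis(25); // Duration::ZERO outside Data space
    /// let pto = smoothed_rtt + 4 * rttvar + max_ack_delay; // 115ms
    /// ```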
3102    fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3103        let max_ack_delay = match space {
3104            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3105            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3106        };
3107        self.path_data(path_id).rtt.pto_base() + max_ack_delay
3108    }
3109
3110    fn on_packet_authenticated(
3111        &mut self,
3112        now: Instant,
3113        space_id: SpaceId,
3114        path_id: PathId,
3115        ecn: Option<EcnCodepoint>,
3116        packet: Option<u64>,
3117        spin: bool,
3118        is_1rtt: bool,
3119    ) {
3120        self.total_authed_packets += 1;
3121        if let Some(last_allowed_receive) = self
3122            .paths
3123            .get(&path_id)
3124            .and_then(|path| path.data.last_allowed_receive)
3125        {
3126            if now > last_allowed_receive {
3127                warn!("received data on a path which we abandoned more than 3 * PTO ago");
3128                // The peer failed to respond with a PATH_ABANDON in time.
3129                if !self.state.is_closed() {
3130                    // TODO(flub): What should the error code be?
3131                    self.state.move_to_closed(TransportError::NO_ERROR(
3132                        "peer failed to respond with PATH_ABANDON in time",
3133                    ));
3134                    self.close_common();
3135                    self.set_close_timer(now);
3136                    self.close = true;
3137                }
3138                return;
3139            }
3140        }
3141
3142        self.reset_keep_alive(path_id, now);
3143        self.reset_idle_timeout(now, space_id, path_id);
3144        self.permit_idle_reset = true;
3145        self.receiving_ecn |= ecn.is_some();
3146        if let Some(x) = ecn {
3147            let space = &mut self.spaces[space_id];
3148            space.for_path(path_id).ecn_counters += x;
3149
3150            if x.is_ce() {
3151                space
3152                    .for_path(path_id)
3153                    .pending_acks
3154                    .set_immediate_ack_required();
3155            }
3156        }
3157
3158        let packet = match packet {
3159            Some(x) => x,
3160            None => return,
3161        };
3162        match &self.side {
3163            ConnectionSide::Client { .. } => {
3164                // If we received a handshake packet that authenticated, then we're talking to
3165                // the real server.  From now on we should no longer allow the server to migrate
3166                // its address.
3167                if space_id == SpaceId::Handshake {
3168                    if let Some(hs) = self.state.as_handshake_mut() {
3169                        hs.allow_server_migration = false;
3170                    }
3171                }
3172            }
3173            ConnectionSide::Server { .. } => {
3174                if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3175                {
3176                    // A server stops sending and processing Initial packets when it receives its first Handshake packet.
3177                    self.discard_space(now, SpaceId::Initial);
3178                }
3179                if self.zero_rtt_crypto.is_some() && is_1rtt {
3180                    // Discard 0-RTT keys soon after receiving a 1-RTT packet
3181                    self.set_key_discard_timer(now, space_id)
3182                }
3183            }
3184        }
3185        let space = self.spaces[space_id].for_path(path_id);
3186        space.pending_acks.insert_one(packet, now);
3187        if packet >= space.rx_packet.unwrap_or_default() {
3188            space.rx_packet = Some(packet);
3189            // Update outgoing spin bit, inverting iff we're the client
3190            self.spin = self.side.is_client() ^ spin;
3191        }
3192    }
3193
3194    /// Resets the idle timeout timers
3195    ///
3196    /// Without multipath there is only the connection-wide idle timeout. When multipath is
3197    /// enabled there is an additional per-path idle timeout.
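    ///
    /// In both cases the timer is re-armed for `max(timeout, 3 * PTO)` from now, so that a
    /// connection is never idled out faster than loss recovery can act. A sketch:
    /// ```ignore
    /// let dt = cmp::max(timeout, 3 * pto);
    /// self.timers.set(timer, now + dt, self.qlog.with_time(now));
    /// ```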
3198    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3199        // First reset the global idle timeout.
3200        if let Some(timeout) = self.idle_timeout {
3201            if self.state.is_closed() {
3202                self.timers
3203                    .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3204            } else {
3205                let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
3206                self.timers.set(
3207                    Timer::Conn(ConnTimer::Idle),
3208                    now + dt,
3209                    self.qlog.with_time(now),
3210                );
3211            }
3212        }
3213
3214        // Now handle the per-path state
3215        if let Some(timeout) = self.path_data(path_id).idle_timeout {
3216            if self.state.is_closed() {
3217                self.timers.stop(
3218                    Timer::PerPath(path_id, PathTimer::PathIdle),
3219                    self.qlog.with_time(now),
3220                );
3221            } else {
3222                let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3223                self.timers.set(
3224                    Timer::PerPath(path_id, PathTimer::PathIdle),
3225                    now + dt,
3226                    self.qlog.with_time(now),
3227                );
3228            }
3229        }
3230    }
3231
3232    /// Resets both the [`ConnTimer::KeepAlive`] and [`PathTimer::PathKeepAlive`] timers
3233    fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3234        if !self.state.is_established() {
3235            return;
3236        }
3237
3238        if let Some(interval) = self.config.keep_alive_interval {
3239            self.timers.set(
3240                Timer::Conn(ConnTimer::KeepAlive),
3241                now + interval,
3242                self.qlog.with_time(now),
3243            );
3244        }
3245
3246        if let Some(interval) = self.path_data(path_id).keep_alive {
3247            self.timers.set(
3248                Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3249                now + interval,
3250                self.qlog.with_time(now),
3251            );
3252        }
3253    }
3254
3255    /// Sets the timer for when a previously issued CID should be retired next
3256    fn reset_cid_retirement(&mut self, now: Instant) {
3257        if let Some((_path, t)) = self.next_cid_retirement() {
3258            self.timers.set(
3259                Timer::Conn(ConnTimer::PushNewCid),
3260                t,
3261                self.qlog.with_time(now),
3262            );
3263        }
3264    }
3265
3266    /// The next time when a previously issued CID should be retired
3267    fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3268        self.local_cid_state
3269            .iter()
3270            .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3271            .min_by_key(|(_path_id, timeout)| *timeout)
3272    }
3273
3274    /// Handle the already-decrypted first packet from the client
3275    ///
3276    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
3277    /// efficient.
3278    pub(crate) fn handle_first_packet(
3279        &mut self,
3280        now: Instant,
3281        network_path: FourTuple,
3282        ecn: Option<EcnCodepoint>,
3283        packet_number: u64,
3284        packet: InitialPacket,
3285        remaining: Option<BytesMut>,
3286    ) -> Result<(), ConnectionError> {
3287        let span = trace_span!("first recv");
3288        let _guard = span.enter();
3289        debug_assert!(self.side.is_server());
3290        let len = packet.header_data.len() + packet.payload.len();
3291        let path_id = PathId::ZERO;
3292        self.path_data_mut(path_id).total_recvd = len as u64;
3293
3294        if let Some(hs) = self.state.as_handshake_mut() {
3295            hs.expected_token = packet.header.token.clone();
3296        } else {
3297            unreachable!("first packet must be delivered in Handshake state");
3298        }
3299
3300        // The first packet is always on PathId::ZERO
3301        self.on_packet_authenticated(
3302            now,
3303            SpaceId::Initial,
3304            path_id,
3305            ecn,
3306            Some(packet_number),
3307            false,
3308            false,
3309        );
3310
3311        let packet: Packet = packet.into();
3312
3313        let mut qlog = QlogRecvPacket::new(len);
3314        qlog.header(&packet.header, Some(packet_number), path_id);
3315
3316        self.process_decrypted_packet(
3317            now,
3318            network_path,
3319            path_id,
3320            Some(packet_number),
3321            packet,
3322            &mut qlog,
3323        )?;
3324        self.qlog.emit_packet_received(qlog, now);
3325        if let Some(data) = remaining {
3326            self.handle_coalesced(now, network_path, path_id, ecn, data);
3327        }
3328
3329        self.qlog.emit_recovery_metrics(
3330            path_id,
3331            &mut self.paths.get_mut(&path_id).unwrap().data,
3332            now,
3333        );
3334
3335        Ok(())
3336    }
3337
3338    fn init_0rtt(&mut self, now: Instant) {
3339        let (header, packet) = match self.crypto.early_crypto() {
3340            Some(x) => x,
3341            None => return,
3342        };
3343        if self.side.is_client() {
3344            match self.crypto.transport_parameters() {
3345                Ok(params) => {
3346                    let params = params
3347                        .expect("crypto layer didn't supply transport parameters with ticket");
3348                    // Certain values must not be cached
3349                    let params = TransportParameters {
3350                        initial_src_cid: None,
3351                        original_dst_cid: None,
3352                        preferred_address: None,
3353                        retry_src_cid: None,
3354                        stateless_reset_token: None,
3355                        min_ack_delay: None,
3356                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3357                        max_ack_delay: TransportParameters::default().max_ack_delay,
3358                        initial_max_path_id: None,
3359                        ..params
3360                    };
3361                    self.set_peer_params(params);
3362                    self.qlog.emit_peer_transport_params_restored(self, now);
3363                }
3364                Err(e) => {
3365                    error!("session ticket has malformed transport parameters: {}", e);
3366                    return;
3367                }
3368            }
3369        }
3370        trace!("0-RTT enabled");
3371        self.zero_rtt_enabled = true;
3372        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3373    }
3374
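    /// Buffers and consumes CRYPTO frame data received at encryption level `space`
    ///
    /// Rejects new data arriving at an unexpectedly old encryption level, as well as data
    /// that would overflow the configured crypto buffer, then feeds the ordered bytes to
    /// the TLS layer, surfacing [`Event::HandshakeDataReady`] when appropriate.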
3375    fn read_crypto(
3376        &mut self,
3377        space: SpaceId,
3378        crypto: &frame::Crypto,
3379        payload_len: usize,
3380    ) -> Result<(), TransportError> {
3381        let expected = if !self.state.is_handshake() {
3382            SpaceId::Data
3383        } else if self.highest_space == SpaceId::Initial {
3384            SpaceId::Initial
3385        } else {
3386            // On the server, self.highest_space can be Data after receiving the client's first
3387            // flight, but we expect Handshake CRYPTO until the handshake is complete.
3388            SpaceId::Handshake
3389        };
3390        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
3391        // packets are illegal, and we don't process 1-RTT packets until the handshake is
3392        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
3393        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3394
3395        let end = crypto.offset + crypto.data.len() as u64;
3396        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3397            warn!(
3398                "received new {:?} CRYPTO data when expecting {:?}",
3399                space, expected
3400            );
3401            return Err(TransportError::PROTOCOL_VIOLATION(
3402                "new data at unexpected encryption level",
3403            ));
3404        }
3405
3406        let space = &mut self.spaces[space];
3407        let max = end.saturating_sub(space.crypto_stream.bytes_read());
3408        if max > self.config.crypto_buffer_size as u64 {
3409            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3410        }
3411
3412        space
3413            .crypto_stream
3414            .insert(crypto.offset, crypto.data.clone(), payload_len);
3415        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3416            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3417            if self.crypto.read_handshake(&chunk.bytes)? {
3418                self.events.push_back(Event::HandshakeDataReady);
3419            }
3420        }
3421
3422        Ok(())
3423    }
3424
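    /// Drains outgoing handshake bytes from the TLS layer into pending CRYPTO frames
    ///
    /// Whenever the TLS layer also yields new keys, the connection is upgraded to the
    /// next encryption level and the loop continues in that space.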
3425    fn write_crypto(&mut self) {
3426        loop {
3427            let space = self.highest_space;
3428            let mut outgoing = Vec::new();
3429            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3430                match space {
3431                    SpaceId::Initial => {
3432                        self.upgrade_crypto(SpaceId::Handshake, crypto);
3433                    }
3434                    SpaceId::Handshake => {
3435                        self.upgrade_crypto(SpaceId::Data, crypto);
3436                    }
3437                    _ => unreachable!("got updated secrets during 1-RTT"),
3438                }
3439            }
3440            if outgoing.is_empty() {
3441                if space == self.highest_space {
3442                    break;
3443                } else {
3444                    // Keys updated, check for more data to send
3445                    continue;
3446                }
3447            }
3448            let offset = self.spaces[space].crypto_offset;
3449            let outgoing = Bytes::from(outgoing);
3450            if let Some(hs) = self.state.as_handshake_mut() {
3451                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3452                    hs.client_hello = Some(outgoing.clone());
3453                }
3454            }
3455            self.spaces[space].crypto_offset += outgoing.len() as u64;
3456            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3457            self.spaces[space].pending.crypto.push_back(frame::Crypto {
3458                offset,
3459                data: outgoing,
3460            });
3461        }
3462    }
3463
3464    /// Switch to stronger cryptography during handshake
3465    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3466        debug_assert!(
3467            self.spaces[space].crypto.is_none(),
3468            "already reached packet space {space:?}"
3469        );
3470        trace!("{:?} keys ready", space);
3471        if space == SpaceId::Data {
3472            // Precompute the first key update
3473            self.next_crypto = Some(
3474                self.crypto
3475                    .next_1rtt_keys()
3476                    .expect("handshake should be complete"),
3477            );
3478        }
3479
3480        self.spaces[space].crypto = Some(crypto);
3481        debug_assert!(space as usize > self.highest_space as usize);
3482        self.highest_space = space;
3483        if space == SpaceId::Data && self.side.is_client() {
3484            // Discard 0-RTT keys because 1-RTT keys are available.
3485            self.zero_rtt_crypto = None;
3486        }
3487    }
3488
3489    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3490        debug_assert!(space_id != SpaceId::Data);
3491        trace!("discarding {:?} keys", space_id);
3492        if space_id == SpaceId::Initial {
3493            // No longer needed
3494            if let ConnectionSide::Client { token, .. } = &mut self.side {
3495                *token = Bytes::new();
3496            }
3497        }
3498        let space = &mut self.spaces[space_id];
3499        space.crypto = None;
3500        let pns = space.for_path(PathId::ZERO);
3501        pns.time_of_last_ack_eliciting_packet = None;
3502        pns.loss_time = None;
3503        pns.loss_probes = 0;
3504        let sent_packets = mem::take(&mut pns.sent_packets);
3505        let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3506        for (_, packet) in sent_packets.into_iter() {
3507            path.data.remove_in_flight(&packet);
3508        }
3509
3510        self.set_loss_detection_timer(now, PathId::ZERO)
3511    }
3512
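    /// Splits a datagram into its coalesced packets and processes each in turn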
3513    fn handle_coalesced(
3514        &mut self,
3515        now: Instant,
3516        network_path: FourTuple,
3517        path_id: PathId,
3518        ecn: Option<EcnCodepoint>,
3519        data: BytesMut,
3520    ) {
3521        self.path_data_mut(path_id)
3522            .inc_total_recvd(data.len() as u64);
3523        let mut remaining = Some(data);
3524        let cid_len = self
3525            .local_cid_state
3526            .values()
3527            .map(|cid_state| cid_state.cid_len())
3528            .next()
3529            .expect("one cid_state must exist");
3530        while let Some(data) = remaining {
3531            match PartialDecode::new(
3532                data,
3533                &FixedLengthConnectionIdParser::new(cid_len),
3534                &[self.version],
3535                self.endpoint_config.grease_quic_bit,
3536            ) {
3537                Ok((partial_decode, rest)) => {
3538                    remaining = rest;
3539                    self.handle_decode(now, network_path, path_id, ecn, partial_decode);
3540                }
3541                Err(e) => {
3542                    trace!("malformed header: {}", e);
3543                    return;
3544                }
3545            }
3546        }
3547    }
3548
3549    fn handle_decode(
3550        &mut self,
3551        now: Instant,
3552        network_path: FourTuple,
3553        path_id: PathId,
3554        ecn: Option<EcnCodepoint>,
3555        partial_decode: PartialDecode,
3556    ) {
3557        let qlog = QlogRecvPacket::new(partial_decode.len());
3558        if let Some(decoded) = packet_crypto::unprotect_header(
3559            partial_decode,
3560            &self.spaces,
3561            self.zero_rtt_crypto.as_ref(),
3562            self.peer_params.stateless_reset_token,
3563        ) {
3564            self.handle_packet(
3565                now,
3566                network_path,
3567                path_id,
3568                ecn,
3569                decoded.packet,
3570                decoded.stateless_reset,
3571                qlog,
3572            );
3573        }
3574    }
3575
3576    fn handle_packet(
3577        &mut self,
3578        now: Instant,
3579        network_path: FourTuple,
3580        path_id: PathId,
3581        ecn: Option<EcnCodepoint>,
3582        packet: Option<Packet>,
3583        stateless_reset: bool,
3584        mut qlog: QlogRecvPacket,
3585    ) {
3586        self.stats.udp_rx.ios += 1;
3587        if let Some(ref packet) = packet {
3588            trace!(
3589                "got {:?} packet ({} bytes) from {} using id {}",
3590                packet.header.space(),
3591                packet.payload.len() + packet.header_data.len(),
3592                network_path,
3593                packet.header.dst_cid(),
3594            );
3595        }
3596
3597        if self.is_handshaking() {
3598            if path_id != PathId::ZERO {
3599                debug!(%network_path, %path_id, "discarding multipath packet during handshake");
3600                return;
3601            }
3602            if network_path != self.path_data_mut(path_id).network_path {
3603                if let Some(hs) = self.state.as_handshake() {
3604                    if hs.allow_server_migration {
3605                        trace!(%network_path, prev = %self.path_data(path_id).network_path, "server migrated to new remote");
3606                        self.path_data_mut(path_id).network_path = network_path;
3607                        self.qlog.emit_tuple_assigned(path_id, network_path, now);
3608                    } else {
3609                        debug!("discarding packet with unexpected remote during handshake");
3610                        return;
3611                    }
3612                } else {
3613                    debug!("discarding packet with unexpected remote during handshake");
3614                    return;
3615                }
3616            }
3617        }
3618
3619        let was_closed = self.state.is_closed();
3620        let was_drained = self.state.is_drained();
3621
3622        let decrypted = match packet {
3623            None => Err(None),
3624            Some(mut packet) => self
3625                .decrypt_packet(now, path_id, &mut packet)
3626                .map(move |number| (packet, number)),
3627        };
3628        let result = match decrypted {
3629            _ if stateless_reset => {
3630                debug!("got stateless reset");
3631                Err(ConnectionError::Reset)
3632            }
3633            Err(Some(e)) => {
3634                warn!("illegal packet: {}", e);
3635                Err(e.into())
3636            }
3637            Err(None) => {
3638                debug!("failed to authenticate packet");
3639                self.authentication_failures += 1;
3640                let integrity_limit = self.spaces[self.highest_space]
3641                    .crypto
3642                    .as_ref()
3643                    .unwrap()
3644                    .packet
3645                    .local
3646                    .integrity_limit();
3647                if self.authentication_failures > integrity_limit {
3648                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3649                } else {
3650                    return;
3651                }
3652            }
3653            Ok((packet, number)) => {
3654                qlog.header(&packet.header, number, path_id);
3655                let span = match number {
3656                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3657                    None => trace_span!("recv", space = ?packet.header.space()),
3658                };
3659                let _guard = span.enter();
3660
3661                let dedup = self.spaces[packet.header.space()]
3662                    .path_space_mut(path_id)
3663                    .map(|pns| &mut pns.dedup);
3664                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
3665                    debug!("discarding possible duplicate packet");
3666                    self.qlog.emit_packet_received(qlog, now);
3667                    return;
3668                } else if self.state.is_handshake() && packet.header.is_short() {
3669                    // TODO: SHOULD buffer these to improve reordering tolerance.
3670                    trace!("dropping short packet during handshake");
3671                    self.qlog.emit_packet_received(qlog, now);
3672                    return;
3673                } else {
3674                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
3675                        if let Some(hs) = self.state.as_handshake() {
3676                            if self.side.is_server() && token != &hs.expected_token {
3677                                // Clients must send the same retry token in every Initial. Initial
3678                                // packets can be spoofed, so we discard rather than killing the
3679                                // connection.
3680                                warn!("discarding Initial with invalid retry token");
3681                                self.qlog.emit_packet_received(qlog, now);
3682                                return;
3683                            }
3684                        }
3685                    }
3686
3687                    if !self.state.is_closed() {
3688                        let spin = match packet.header {
3689                            Header::Short { spin, .. } => spin,
3690                            _ => false,
3691                        };
3692
3693                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
3694                            // Only the client is allowed to open paths
3695                            self.ensure_path(path_id, network_path, now, number);
3696                        }
3697                        if self.paths.contains_key(&path_id) {
3698                            self.on_packet_authenticated(
3699                                now,
3700                                packet.header.space(),
3701                                path_id,
3702                                ecn,
3703                                number,
3704                                spin,
3705                                packet.header.is_1rtt(),
3706                            );
3707                        }
3708                    }
3709
3710                    let res = self.process_decrypted_packet(
3711                        now,
3712                        network_path,
3713                        path_id,
3714                        number,
3715                        packet,
3716                        &mut qlog,
3717                    );
3718
3719                    self.qlog.emit_packet_received(qlog, now);
3720                    res
3721                }
3722            }
3723        };
3724
3725        // State transitions for error cases
3726        if let Err(conn_err) = result {
3727            match conn_err {
3728                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
3729                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
3730                ConnectionError::Reset
3731                | ConnectionError::TransportError(TransportError {
3732                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
3733                    ..
3734                }) => {
3735                    self.state.move_to_drained(Some(conn_err));
3736                }
3737                ConnectionError::TimedOut => {
3738                    unreachable!("timeouts aren't generated by packet processing");
3739                }
3740                ConnectionError::TransportError(err) => {
3741                    debug!("closing connection due to transport error: {}", err);
3742                    self.state.move_to_closed(err);
3743                }
3744                ConnectionError::VersionMismatch => {
3745                    self.state.move_to_draining(Some(conn_err));
3746                }
3747                ConnectionError::LocallyClosed => {
3748                    unreachable!("LocallyClosed isn't generated by packet processing");
3749                }
3750                ConnectionError::CidsExhausted => {
3751                    unreachable!("CidsExhausted isn't generated by packet processing");
3752                }
3753            };
3754        }
3755
3756        if !was_closed && self.state.is_closed() {
3757            self.close_common();
3758            if !self.state.is_drained() {
3759                self.set_close_timer(now);
3760            }
3761        }
3762        if !was_drained && self.state.is_drained() {
3763            self.endpoint_events.push_back(EndpointEventInner::Drained);
3764            // Close timer may have been started previously, e.g. if we sent a close and got a
3765            // stateless reset in response
3766            self.timers
3767                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
3768        }
3769
3770        // Transmit CONNECTION_CLOSE if necessary
3771        if matches!(self.state.as_type(), StateType::Closed) {
3772            // If there is no PathData for this PathId the packet was for a brand new
3773            // path. It was a valid packet however, so the remote is valid and we want to
3774            // send CONNECTION_CLOSE.
3775            let path_remote = self
3776                .paths
3777                .get(&path_id)
3778                .map(|p| p.data.network_path)
3779                .unwrap_or(network_path);
3780            self.close = network_path == path_remote;
3781        }
3782    }
3783
3784    fn process_decrypted_packet(
3785        &mut self,
3786        now: Instant,
3787        network_path: FourTuple,
3788        path_id: PathId,
3789        number: Option<u64>,
3790        packet: Packet,
3791        qlog: &mut QlogRecvPacket,
3792    ) -> Result<(), ConnectionError> {
3793        if !self.paths.contains_key(&path_id) {
3794            // There is a chance this is the server side's first packet for this path, which would
3795            // be a protocol violation. It's more likely, however, that this is a packet for a
3796            // pruned path.
3797            trace!(%path_id, ?number, "discarding packet for unknown path");
3798            return Ok(());
3799        }
3800        let state = match self.state.as_type() {
3801            StateType::Established => {
3802                match packet.header.space() {
3803                    SpaceId::Data => self.process_payload(
3804                        now,
3805                        network_path,
3806                        path_id,
3807                        number.unwrap(),
3808                        packet,
3809                        qlog,
3810                    )?,
3811                    _ if packet.header.has_frames() => {
3812                        self.process_early_payload(now, path_id, packet, qlog)?
3813                    }
3814                    _ => {
3815                        trace!("discarding unexpected pre-handshake packet");
3816                    }
3817                }
3818                return Ok(());
3819            }
3820            StateType::Closed => {
3821                for result in frame::Iter::new(packet.payload.freeze())? {
3822                    let frame = match result {
3823                        Ok(frame) => frame,
3824                        Err(err) => {
3825                            debug!("frame decoding error: {err:?}");
3826                            continue;
3827                        }
3828                    };
3829                    qlog.frame(&frame);
3830
3831                    if let Frame::Padding = frame {
3832                        continue;
3833                    };
3834
3835                    self.stats.frame_rx.record(frame.ty());
3836
3837                    if let Frame::Close(_error) = frame {
3838                        self.state.move_to_draining(None);
3839                        break;
3840                    }
3841                }
3842                return Ok(());
3843            }
3844            StateType::Draining | StateType::Drained => return Ok(()),
3845            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
3846        };
3847
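        // Still handshaking: dispatch on the header type.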
        match packet.header {
            Header::Retry {
                src_cid: rem_cid, ..
            } => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if self.side.is_server() {
                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
                }

                let is_valid_retry = self
                    .rem_cids
                    .get(&path_id)
                    .map(|cids| cids.active())
                    .map(|orig_dst_cid| {
                        self.crypto.is_valid_retry(
                            orig_dst_cid,
                            &packet.header_data,
                            &packet.payload,
                        )
                    })
                    .unwrap_or_default();
                if self.total_authed_packets > 1
                    || packet.payload.len() <= 16 // token + 16 byte tag
                    || !is_valid_retry
                {
                    trace!("discarding invalid Retry");
                    // - After the client has received and processed an Initial or Retry
                    //   packet from the server, it MUST discard any subsequent Retry
                    //   packets that it receives.
                    // - A client MUST discard a Retry packet with a zero-length Retry Token
                    //   field.
                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
                    //   that cannot be validated
                    return Ok(());
                }

                trace!("retrying with CID {}", rem_cid);
                let client_hello = state.client_hello.take().unwrap();
                self.retry_src_cid = Some(rem_cid);
                self.rem_cids
                    .get_mut(&path_id)
                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
                    .update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;

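                // A valid Retry proves the server received our first Initial, so treat
                // packet number 0 in the Initial space as delivered.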
                let space = &mut self.spaces[SpaceId::Initial];
                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
                    self.on_packet_acked(now, PathId::ZERO, info);
                };

                // Make sure we clean up after any retransmitted Initials
                self.discard_space(now, SpaceId::Initial);
                self.spaces[SpaceId::Initial] = {
                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
                    space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
                    space.crypto_offset = client_hello.len() as u64;
                    space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
                        .for_path(path_id)
                        .next_packet_number;
                    space.pending.crypto.push_back(frame::Crypto {
                        offset: 0,
                        data: client_hello,
                    });
                    space
                };

                // Retransmit all 0-RTT data
                let zero_rtt = mem::take(
                    &mut self.spaces[SpaceId::Data]
                        .for_path(PathId::ZERO)
                        .sent_packets,
                );
                for (_, info) in zero_rtt.into_iter() {
                    self.paths
                        .get_mut(&PathId::ZERO)
                        .unwrap()
                        .remove_in_flight(&info);
                    self.spaces[SpaceId::Data].pending |= info.retransmits;
                }
                self.streams.retransmit_all_for_0rtt();

                let token_len = packet.payload.len() - 16;
                let ConnectionSide::Client { ref mut token, .. } = self.side else {
                    unreachable!("we already short-circuited if we're server");
                };
                *token = packet.payload.freeze().split_to(token_len);

                self.state = State::handshake(state::Handshake {
                    expected_token: Bytes::new(),
                    rem_cid_set: false,
                    client_hello: None,
                    allow_server_migration: true,
                });
                Ok(())
            }
            Header::Long {
                ty: LongType::Handshake,
                src_cid: rem_cid,
                dst_cid: loc_cid,
                ..
            } => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }
                self.on_path_validated(path_id);

                self.process_early_payload(now, path_id, packet, qlog)?;
                if self.state.is_closed() {
                    return Ok(());
                }

                if self.crypto.is_handshaking() {
                    trace!("handshake ongoing");
                    return Ok(());
                }

                if self.side.is_client() {
                    // Client-only, because the server's params were set from the client's Initial
                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                        TransportError::new(
                            TransportErrorCode::crypto(0x6d),
                            "transport parameters missing".to_owned(),
                        )
                    })?;

                    if self.has_0rtt() {
                        if !self.crypto.early_data_accepted().unwrap() {
                            debug_assert!(self.side.is_client());
                            debug!("0-RTT rejected");
                            self.accepted_0rtt = false;
                            self.streams.zero_rtt_rejected();

                            // Discard already-queued frames
                            self.spaces[SpaceId::Data].pending = Retransmits::default();

                            // Discard 0-RTT packets
                            let sent_packets = mem::take(
                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
                            );
                            for (_, packet) in sent_packets.into_iter() {
                                self.paths
                                    .get_mut(&path_id)
                                    .unwrap()
                                    .remove_in_flight(&packet);
                            }
                        } else {
                            self.accepted_0rtt = true;
                            params.validate_resumption_from(&self.peer_params)?;
                        }
                    }
                    if let Some(token) = params.stateless_reset_token {
                        // TODO(matheus23): Reset token for a remote, or for a 4-tuple?
                        let remote = self.path_data(path_id).network_path.remote;
                        self.endpoint_events
                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
                    }
                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                    self.issue_first_cids(now);
                } else {
                    // Server-only
                    self.spaces[SpaceId::Data].pending.handshake_done = true;
                    self.discard_space(now, SpaceId::Handshake);
                    self.events.push_back(Event::HandshakeConfirmed);
                    trace!("handshake confirmed");
                }

                self.events.push_back(Event::Connected);
                self.state.move_to_established();
                trace!("established");

                // Multipath can only be enabled once the state has reached Established,
                // so this cannot happen any earlier.
                self.issue_first_path_cids(now);
                Ok(())
            }
            Header::Initial(InitialHeader {
                src_cid: rem_cid,
                dst_cid: loc_cid,
                ..
            }) => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if !state.rem_cid_set {
                    trace!("switching remote CID to {}", rem_cid);
                    let mut state = state.clone();
                    self.rem_cids
                        .get_mut(&path_id)
                        .expect("PathId::ZERO not yet abandoned")
                        .update_initial_cid(rem_cid);
                    self.rem_handshake_cid = rem_cid;
                    self.orig_rem_cid = rem_cid;
                    state.rem_cid_set = true;
                    self.state.move_to_handshake(state);
                } else if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }

                let starting_space = self.highest_space;
                self.process_early_payload(now, path_id, packet, qlog)?;

                if self.side.is_server()
                    && starting_space == SpaceId::Initial
                    && self.highest_space != SpaceId::Initial
                {
                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                        TransportError::new(
                            TransportErrorCode::crypto(0x6d),
                            "transport parameters missing".to_owned(),
                        )
                    })?;
                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                    self.issue_first_cids(now);
                    self.init_0rtt(now);
                }
                Ok(())
            }
            Header::Long {
                ty: LongType::ZeroRtt,
                ..
            } => {
                self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
                Ok(())
            }
            Header::VersionNegotiate { .. } => {
                if self.total_authed_packets > 1 {
                    return Ok(());
                }
                let supported = packet
                    .payload
                    .chunks(4)
                    .any(|x| match <[u8; 4]>::try_from(x) {
                        Ok(version) => self.version == u32::from_be_bytes(version),
                        Err(_) => false,
                    });
                if supported {
                    return Ok(());
                }
                debug!("remote doesn't support our version");
                Err(ConnectionError::VersionMismatch)
            }
            Header::Short { .. } => unreachable!(
                "short packets received during handshake are discarded in handle_packet"
            ),
        }
    }

    /// Process an Initial or Handshake packet payload
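    ///
    /// Only frame types legal in Initial and Handshake packets are accepted: CRYPTO,
    /// ACK/PATH_ACK, CONNECTION_CLOSE, PADDING and PING; anything else is rejected as a
    /// PROTOCOL_VIOLATION.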
    fn process_early_payload(
        &mut self,
        now: Instant,
        path_id: PathId,
        packet: Packet,
        #[allow(unused)] qlog: &mut QlogRecvPacket,
    ) -> Result<(), TransportError> {
        debug_assert_ne!(packet.header.space(), SpaceId::Data);
        debug_assert_eq!(path_id, PathId::ZERO);
        let payload_len = packet.payload.len();
        let mut ack_eliciting = false;
        for result in frame::Iter::new(packet.payload.freeze())? {
            let frame = result?;
            qlog.frame(&frame);
            let span = match frame {
                Frame::Padding => continue,
                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
            };

            self.stats.frame_rx.record(frame.ty());

            let _guard = span.as_ref().map(|x| x.enter());
            ack_eliciting |= frame.is_ack_eliciting();

            // Process frames
            if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
                return Err(TransportError::PROTOCOL_VIOLATION(
                    "illegal frame type in handshake",
                ));
            }

            match frame {
                Frame::Padding | Frame::Ping => {}
                Frame::Crypto(frame) => {
                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
                }
                Frame::Ack(ack) => {
                    self.on_ack_received(now, packet.header.space(), ack)?;
                }
                Frame::PathAck(ack) => {
                    span.as_ref()
                        .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
                    self.on_path_ack_received(now, packet.header.space(), ack)?;
                }
                Frame::Close(reason) => {
                    self.state.move_to_draining(Some(reason.into()));
                    return Ok(());
                }
                _ => {
                    let mut err =
                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
                    err.frame = frame::MaybeFrame::Known(frame.ty());
                    return Err(err);
                }
            }
        }

        if ack_eliciting {
            // In the Initial and Handshake spaces, ACKs must be sent immediately
            self.spaces[packet.header.space()]
                .for_path(path_id)
                .pending_acks
                .set_immediate_ack_required();
        }

        self.write_crypto();
        Ok(())
    }

    /// Processes the packet payload, always in the data space.
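    ///
    /// Iterates over every frame in the packet, tracking whether the packet is
    /// ack-eliciting and whether it consists solely of probing frames, then schedules any
    /// required ACKs and finally checks whether the packet initiates a path migration.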
    fn process_payload(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        path_id: PathId,
        number: u64,
        packet: Packet,
        #[allow(unused)] qlog: &mut QlogRecvPacket,
    ) -> Result<(), TransportError> {
        let payload = packet.payload.freeze();
        let mut is_probing_packet = true;
        let mut close = None;
        let payload_len = payload.len();
        let mut ack_eliciting = false;
        // If this packet triggers a path migration and includes an observed address
        // frame, the report is stored here
        let mut migration_observed_addr = None;
        for result in frame::Iter::new(payload)? {
            let frame = result?;
            qlog.frame(&frame);
            let span = match frame {
                Frame::Padding => continue,
                _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
            };

            self.stats.frame_rx.record(frame.ty());
            // Crypto, Stream and Datagram frames are special-cased in order not to
            // pollute the log with payload data
            match &frame {
                Frame::Crypto(f) => {
                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
                }
                Frame::Stream(f) => {
                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
                }
                Frame::Datagram(f) => {
                    trace!(len = f.data.len(), "got datagram frame");
                }
                f => {
                    trace!("got frame {f}");
                }
            }

            let _guard = span.enter();
            if packet.header.is_0rtt() {
                match frame {
                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "illegal frame type in 0-RTT",
                        ));
                    }
                    _ => {
                        if frame.is_1rtt() {
                            return Err(TransportError::PROTOCOL_VIOLATION(
                                "illegal frame type in 0-RTT",
                            ));
                        }
                    }
                }
            }
            ack_eliciting |= frame.is_ack_eliciting();

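            // RFC 9000 §9.1 defines PADDING, PATH_CHALLENGE, PATH_RESPONSE and
            // NEW_CONNECTION_ID as "probing frames"; OBSERVED_ADDRESS from the address
            // discovery extension is treated the same way here.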
            // Check whether this could be a probing packet
            match frame {
                Frame::Padding
                | Frame::PathChallenge(_)
                | Frame::PathResponse(_)
                | Frame::NewConnectionId(_)
                | Frame::ObservedAddr(_) => {}
                _ => {
                    is_probing_packet = false;
                }
            }

            match frame {
                Frame::Crypto(frame) => {
                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
                }
                Frame::Stream(frame) => {
                    if self.streams.received(frame, payload_len)?.should_transmit() {
                        self.spaces[SpaceId::Data].pending.max_data = true;
                    }
                }
                Frame::Ack(ack) => {
                    self.on_ack_received(now, SpaceId::Data, ack)?;
                }
                Frame::PathAck(ack) => {
                    span.record("path", tracing::field::debug(&ack.path_id));
                    self.on_path_ack_received(now, SpaceId::Data, ack)?;
                }
                Frame::Padding | Frame::Ping => {}
                Frame::Close(reason) => {
                    close = Some(reason);
                }
                Frame::PathChallenge(challenge) => {
                    let path = &mut self
                        .path_mut(path_id)
                        .expect("payload is processed only after the path becomes known");
                    path.path_responses.push(number, challenge.0, network_path);
                    // At this point, update_network_path_or_discard was already called, so
                    // we don't need to be lenient about `local_ip` possibly mismatching.
                    if network_path == path.network_path {
                        // PATH_CHALLENGE on the active path, possible off-path packet
                        // forwarding attack. Send a non-probing packet to recover the
                        // active path.
                        // TODO(flub): No longer true! We now also send PATH_CHALLENGE to
                        //    validate a new path, without an RFC9000-style migration
                        //    involved. This means we add in an extra IMMEDIATE_ACK on
                        //    some challenges. It isn't really wrong to do so, but it
                        //    still is something untidy. We should instead suppress this
                        //    when we know the remote is still validating the path.
                        match self.peer_supports_ack_frequency() {
                            true => self.immediate_ack(path_id),
                            false => {
                                self.ping_path(path_id).ok();
                            }
                        }
                    }
                }
                Frame::PathResponse(response) => {
                    let path = self
                        .paths
                        .get_mut(&path_id)
                        .expect("payload is processed only after the path becomes known");

                    use PathTimer::*;
                    use paths::OnPathResponseReceived::*;
                    match path
                        .data
                        .on_path_response_received(now, response.0, network_path)
                    {
                        OnPath { was_open } => {
                            let qlog = self.qlog.with_time(now);

                            self.timers
                                .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
                            self.timers
                                .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());

                            let next_challenge = path
                                .data
                                .earliest_expiring_challenge()
                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
                            self.timers.set_or_stop(
                                Timer::PerPath(path_id, PathChallengeLost),
                                next_challenge,
                                qlog,
                            );

                            if !was_open {
                                self.events
                                    .push_back(Event::Path(PathEvent::Opened { id: path_id }));
                                if let Some(observed) = path.data.last_observed_addr_report.as_ref()
                                {
                                    self.events.push_back(Event::Path(PathEvent::ObservedAddr {
                                        id: path_id,
                                        addr: observed.socket_addr(),
                                    }));
                                }
                            }
                            if let Some((_, ref mut prev)) = path.prev {
                                prev.challenges_sent.clear();
                                prev.send_new_challenge = false;
                            }
                        }
                        OffPath => {
                            debug!("Response to off-path PathChallenge!");
                            let next_challenge = path
                                .data
                                .earliest_expiring_challenge()
                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
                            self.timers.set_or_stop(
                                Timer::PerPath(path_id, PathChallengeLost),
                                next_challenge,
                                self.qlog.with_time(now),
                            );
                        }
                        Invalid { expected } => {
                            debug!(%response, %network_path, %expected, "ignoring invalid PATH_RESPONSE")
                        }
                        Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
                    }
                }
                Frame::MaxData(frame::MaxData(bytes)) => {
                    self.streams.received_max_data(bytes);
                }
                Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
                    self.streams.received_max_stream_data(id, offset)?;
                }
                Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
                    self.streams.received_max_streams(dir, count)?;
                }
                Frame::ResetStream(frame) => {
                    if self.streams.received_reset(frame)?.should_transmit() {
                        self.spaces[SpaceId::Data].pending.max_data = true;
                    }
                }
                Frame::DataBlocked { offset } => {
                    debug!(offset, "peer claims to be blocked at connection level");
                }
                Frame::StreamDataBlocked { id, offset } => {
                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
                        return Err(TransportError::STREAM_STATE_ERROR(
                            "STREAM_DATA_BLOCKED on send-only stream",
                        ));
                    }
                    debug!(
                        stream = %id,
                        offset, "peer claims to be blocked at stream level"
                    );
                }
                Frame::StreamsBlocked { dir, limit } => {
                    if limit > MAX_STREAM_COUNT {
                        return Err(TransportError::FRAME_ENCODING_ERROR(
                            "unrepresentable stream limit",
                        ));
                    }
                    debug!(
                        "peer claims to be blocked opening more than {} {} streams",
                        limit, dir
                    );
                }
                Frame::StopSending(frame::StopSending { id, error_code }) => {
                    if id.initiator() != self.side.side() {
                        if id.dir() == Dir::Uni {
                            debug!("got STOP_SENDING on recv-only {}", id);
                            return Err(TransportError::STREAM_STATE_ERROR(
                                "STOP_SENDING on recv-only stream",
                            ));
                        }
                    } else if self.streams.is_local_unopened(id) {
                        return Err(TransportError::STREAM_STATE_ERROR(
                            "STOP_SENDING on unopened stream",
                        ));
                    }
                    self.streams.received_stop_sending(id, error_code);
                }
                Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
                    if let Some(ref path_id) = path_id {
                        span.record("path", tracing::field::debug(&path_id));
                    }
                    let path_id = path_id.unwrap_or_default();
                    match self.local_cid_state.get_mut(&path_id) {
                        None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
                        Some(cid_state) => {
                            let allow_more_cids = cid_state
                                .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;

                            // If the path has been abandoned we do not issue more CIDs for it.
                            // See https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.2.2
                            // > an endpoint SHOULD provide new connection IDs for that path, if still open, using PATH_NEW_CONNECTION_ID frames.
                            let has_path = !self.abandoned_paths.contains(&path_id);
                            let allow_more_cids = allow_more_cids && has_path;

                            self.endpoint_events
                                .push_back(EndpointEventInner::RetireConnectionId(
                                    now,
                                    path_id,
                                    sequence,
                                    allow_more_cids,
                                ));
                        }
                    }
                }
                Frame::NewConnectionId(frame) => {
                    let path_id = if let Some(path_id) = frame.path_id {
                        if !self.is_multipath_negotiated() {
                            return Err(TransportError::PROTOCOL_VIOLATION(
                                "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
                            ));
                        }
                        if path_id > self.local_max_path_id {
                            return Err(TransportError::PROTOCOL_VIOLATION(
                                "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
                            ));
                        }
                        path_id
                    } else {
                        PathId::ZERO
                    };

                    if self.abandoned_paths.contains(&path_id) {
                        trace!("ignoring issued CID for abandoned path");
                        continue;
                    }
                    if let Some(ref path_id) = frame.path_id {
                        span.record("path", tracing::field::debug(&path_id));
                    }
                    let rem_cids = self
                        .rem_cids
                        .entry(path_id)
                        .or_insert_with(|| CidQueue::new(frame.id));
                    if rem_cids.active().is_empty() {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "NEW_CONNECTION_ID when CIDs aren't in use",
                        ));
                    }
                    if frame.retire_prior_to > frame.sequence {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "NEW_CONNECTION_ID retiring unissued CIDs",
                        ));
                    }

                    use crate::cid_queue::InsertError;
                    match rem_cids.insert(frame) {
                        Ok(None) if self.path(path_id).is_none() => {
                            // If this gives us CIDs to open a new path and a nat traversal
                            // attempt is underway we could try to probe a pending remote
                            self.continue_nat_traversal_round(now);
                        }
                        Ok(None) => {}
                        Ok(Some((retired, reset_token))) => {
                            let pending_retired =
                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
                            /// Ensure `pending_retired` cannot grow without bound. Limit is
                            /// somewhat arbitrary but very permissive.
                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
                            // We don't bother counting in-flight frames because those are bounded
                            // by congestion control.
                            if (pending_retired.len() as u64)
                                .saturating_add(retired.end.saturating_sub(retired.start))
                                > MAX_PENDING_RETIRED_CIDS
                            {
                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
                                    "queued too many retired CIDs",
                                ));
                            }
                            pending_retired.extend(retired.map(|seq| (path_id, seq)));
                            // TODO(matheus23): Reset token for a remote or a full 4-tuple?
                            self.set_reset_token(path_id, network_path.remote, reset_token);
                        }
                        Err(InsertError::ExceedsLimit) => {
                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
                        }
                        Err(InsertError::Retired) => {
                            trace!("discarding already-retired");
                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
                            // range of connection IDs larger than the active connection ID limit
                            // was retired all at once via retire_prior_to.
                            self.spaces[SpaceId::Data]
                                .pending
                                .retire_cids
                                .push((path_id, frame.sequence));
                            continue;
                        }
                    };

                    if self.side.is_server()
                        && path_id == PathId::ZERO
                        && self
                            .rem_cids
                            .get(&PathId::ZERO)
                            .map(|cids| cids.active_seq() == 0)
                            .unwrap_or_default()
                    {
                        // We're a server still using the initial remote CID for the client, so
                        // let's switch immediately to enable clientside stateless resets.
                        self.update_rem_cid(PathId::ZERO);
                    }
                }
                Frame::NewToken(NewToken { token }) => {
                    let ConnectionSide::Client {
                        token_store,
                        server_name,
                        ..
                    } = &self.side
                    else {
                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
                    };
                    if token.is_empty() {
                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
                    }
                    trace!("got new token");
                    token_store.insert(server_name, token);
                }
                Frame::Datagram(datagram) => {
                    if self
                        .datagrams
                        .received(datagram, &self.config.datagram_receive_buffer_size)?
                    {
                        self.events.push_back(Event::DatagramReceived);
                    }
                }
                Frame::AckFrequency(ack_frequency) => {
                    // This frame can only be sent in the Data space
                    if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
                        // The ACK_FREQUENCY frame is stale (we have already received a more
                        // recent one)
                        continue;
                    }

                    // Update the params for all of our paths
                    for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
                        space.pending_acks.set_ack_frequency_params(&ack_frequency);

                        // Our `max_ack_delay` has been updated, so we may need to adjust
                        // its associated timeout
                        if let Some(timeout) = space
                            .pending_acks
                            .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
                        {
                            self.timers.set(
                                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
                                timeout,
                                self.qlog.with_time(now),
                            );
                        }
                    }
                }
                Frame::ImmediateAck => {
                    // This frame can only be sent in the Data space
                    for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
                        pns.pending_acks.set_immediate_ack_required();
                    }
                }
                Frame::HandshakeDone => {
                    if self.side.is_server() {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "client sent HANDSHAKE_DONE",
                        ));
                    }
                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
                        self.discard_space(now, SpaceId::Handshake);
                    }
                    self.events.push_back(Event::HandshakeConfirmed);
                    trace!("handshake confirmed");
                }
                Frame::ObservedAddr(observed) => {
                    // Check whether the transport params allow the peer to send reports
                    // and this node to receive them
                    trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
                    if !self
                        .peer_params
                        .address_discovery_role
                        .should_report(&self.config.address_discovery_role)
                    {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "received OBSERVED_ADDRESS frame when not negotiated",
                        ));
                    }
                    // Must only be sent in the data space
                    if packet.header.space() != SpaceId::Data {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "OBSERVED_ADDRESS frame outside data space",
                        ));
                    }

                    let path = self.path_data_mut(path_id);
                    if network_path == path.network_path {
                        if let Some(updated) = path.update_observed_addr_report(observed) {
                            if path.open {
                                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
                                    id: path_id,
                                    addr: updated,
                                }));
                            }
                            // Otherwise the event is reported when the path is deemed open
                        }
                    } else {
                        // Include the report in the migration
                        migration_observed_addr = Some(observed)
                    }
                }
                Frame::PathAbandon(frame::PathAbandon {
                    path_id,
                    error_code,
                }) => {
                    span.record("path", tracing::field::debug(&path_id));
                    // TODO(flub): don't really know which error code to use here.
                    let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
                        Ok(()) => {
                            trace!("peer abandoned path");
                            false
                        }
                        Err(ClosePathError::LastOpenPath) => {
                            trace!("peer abandoned last path, closing connection");
                            // TODO(flub): which error code?
                            return Err(TransportError::NO_ERROR("last path abandoned by peer"));
                        }
                        Err(ClosePathError::ClosedPath) => {
                            trace!("peer abandoned already closed path");
                            true
                        }
                    };
                    // If we receive a retransmit of PATH_ABANDON then we may already have
                    // abandoned this path locally.  In that case the DiscardPath timer
                    // may already have fired and we no longer have any state for this path.
                    // Only set this timer if we still have path state.
                    if self.path(path_id).is_some() && !already_abandoned {
                        // TODO(flub): Checking is_some() here followed by a number of calls
                        //    that would panic if it was None is really ugly.  If only we
                        //    could do something like PathData::pto().  One day we'll have
                        //    unified SpaceId and PathId and this will be possible.
                        let delay = self.pto(SpaceId::Data, path_id) * 3;
                        self.timers.set(
                            Timer::PerPath(path_id, PathTimer::DiscardPath),
                            now + delay,
                            self.qlog.with_time(now),
                        );
                    }
                }
                Frame::PathStatusAvailable(info) => {
                    span.record("path", tracing::field::debug(&info.path_id));
                    if self.is_multipath_negotiated() {
                        self.on_path_status(
                            info.path_id,
                            PathStatus::Available,
                            info.status_seq_no,
                        );
                    } else {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
                        ));
                    }
                }
                Frame::PathStatusBackup(info) => {
                    span.record("path", tracing::field::debug(&info.path_id));
                    if self.is_multipath_negotiated() {
                        self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
                    } else {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
                        ));
                    }
                }
                Frame::MaxPathId(frame::MaxPathId(path_id)) => {
                    span.record("path", tracing::field::debug(&path_id));
                    if !self.is_multipath_negotiated() {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "received MAX_PATH_ID frame when multipath was not negotiated",
                        ));
                    }
                    // Frames that do not increase the path id are ignored
                    if path_id > self.remote_max_path_id {
                        self.remote_max_path_id = path_id;
                        self.issue_first_path_cids(now);
                        while let Some(true) = self.continue_nat_traversal_round(now) {}
                    }
                }
                Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
                    // Receipt of a Maximum Path Identifier or Path Identifier value higher
                    // than the local maximum MUST be treated as a connection error of type
                    // PROTOCOL_VIOLATION.
                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
                    if self.is_multipath_negotiated() {
                        if max_path_id > self.local_max_path_id {
                            return Err(TransportError::PROTOCOL_VIOLATION(
                                "PATHS_BLOCKED maximum path identifier was larger than local maximum",
                            ));
                        }
                        debug!("received PATHS_BLOCKED({:?})", max_path_id);
                        // TODO(@divma): ensure max concurrent paths
                    } else {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "received PATHS_BLOCKED frame when multipath was not negotiated",
                        ));
                    }
                }
                Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
                    // Nothing to do.  This is recorded in the frame stats, but otherwise we
                    // always issue all CIDs we're allowed to issue, so either this is an
                    // impatient peer or a bug on our side.

                    // Receipt of a Maximum Path Identifier or Path Identifier value higher
                    // than the local maximum MUST be treated as a connection error of type
                    // PROTOCOL_VIOLATION.
                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
                    if self.is_multipath_negotiated() {
                        if path_id > self.local_max_path_id {
                            return Err(TransportError::PROTOCOL_VIOLATION(
                                "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
                            ));
                        }
                        if next_seq.0
                            > self
                                .local_cid_state
                                .get(&path_id)
                                .map(|cid_state| cid_state.active_seq().1 + 1)
                                .unwrap_or_default()
                        {
                            return Err(TransportError::PROTOCOL_VIOLATION(
                                "PATH_CIDS_BLOCKED next sequence number larger than in local state",
                            ));
                        }
                        debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
                    } else {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "received PATH_CIDS_BLOCKED frame when multipath was not negotiated",
                        ));
                    }
                }
                Frame::AddAddress(addr) => {
                    let client_state = match self.iroh_hp.client_side_mut() {
                        Ok(state) => state,
                        Err(err) => {
                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
                                "Nat traversal(ADD_ADDRESS): {err}"
                            )));
                        }
                    };

                    if !client_state.check_remote_address(&addr) {
                        // If the address is not valid we flag it, but update anyway
                        warn!(?addr, "server sent illegal ADD_ADDRESS frame");
                    }

                    match client_state.add_remote_address(addr) {
                        Ok(maybe_added) => {
                            if let Some(added) = maybe_added {
                                self.events.push_back(Event::NatTraversal(
                                    iroh_hp::Event::AddressAdded(added),
                                ));
                            }
                        }
                        Err(e) => {
                            warn!(%e, "failed to add remote address")
                        }
                    }
                }
                Frame::RemoveAddress(addr) => {
                    let client_state = match self.iroh_hp.client_side_mut() {
                        Ok(state) => state,
                        Err(err) => {
                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
                                "Nat traversal(REMOVE_ADDRESS): {err}"
                            )));
                        }
                    };
                    if let Some(removed_addr) = client_state.remove_remote_address(addr) {
                        self.events
                            .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
                                removed_addr,
                            )));
                    }
                }
                Frame::ReachOut(reach_out) => {
                    let server_state = match self.iroh_hp.server_side_mut() {
                        Ok(state) => state,
                        Err(err) => {
                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
                                "Nat traversal(REACH_OUT): {err}"
                            )));
                        }
                    };

                    if let Err(err) = server_state.handle_reach_out(reach_out) {
                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
                            "Nat traversal(REACH_OUT): {err}"
                        )));
                    }
                }
            }
        }

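        // All frames are processed; decide whether, and how urgently, this packet needs
        // to be acknowledged.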
        let space = self.spaces[SpaceId::Data].for_path(path_id);
        if space
            .pending_acks
            .packet_received(now, number, ack_eliciting, &space.dedup)
        {
            if self.abandoned_paths.contains(&path_id) {
                // § 3.4.3 QUIC-MULTIPATH: promptly send ACKs for packets received from
                // abandoned paths.
                space.pending_acks.set_immediate_ack_required();
            } else {
                self.timers.set(
                    Timer::PerPath(path_id, PathTimer::MaxAckDelay),
                    now + self.ack_frequency.max_ack_delay,
                    self.qlog.with_time(now),
                );
            }
        }

        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
        // are only freed, and hence only issue credit, once the application has been notified
        // during a read on the stream.
        let pending = &mut self.spaces[SpaceId::Data].pending;
        self.streams.queue_max_stream_id(pending);

        if let Some(reason) = close {
            self.state.move_to_draining(Some(reason.into()));
            self.close = true;
        }

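        // A non-probing packet for the latest packet number arriving from a new remote
        // initiates path migration (RFC 9000 §9.2).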
        if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
            && !is_probing_packet
            && network_path != self.path_data(path_id).network_path
        {
            let ConnectionSide::Server { ref server_config } = self.side else {
                panic!("packets from unknown remote should be dropped by clients");
            };
            debug_assert!(
                server_config.migration,
                "migration-initiating packets should have been dropped immediately"
            );
            self.migrate(path_id, now, network_path, migration_observed_addr);
            // Break linkability, if possible
            self.update_rem_cid(path_id);
            self.spin = false;
        }

        Ok(())
    }

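    /// Migrates a path to a new remote after receiving a non-probing packet from an
    /// unknown network path
    ///
    /// RTT and congestion state are reset unless the change looks like a NAT rebinding;
    /// validation of the new path is started in either case.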
    fn migrate(
        &mut self,
        path_id: PathId,
        now: Instant,
        network_path: FourTuple,
        observed_addr: Option<ObservedAddr>,
    ) {
        trace!(%network_path, %path_id, "migration initiated");
        self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
        // TODO(@divma): conditions for path migration in multipath are very specific, check them
        // again to prevent path migrations that should actually create a new path

        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
        // Note that the congestion window will not grow until validation terminates. Helps mitigate
        // amplification attacks performed by spoofing source addresses.
        let prev_pto = self.pto(SpaceId::Data, path_id);
        let known_path = self.paths.get_mut(&path_id).expect("known path");
        let path = &mut known_path.data;
        let mut new_path = if network_path.remote.is_ipv4()
            && network_path.remote.ip() == path.network_path.remote.ip()
        {
            PathData::from_previous(network_path, path, self.path_generation_counter, now)
        } else {
            let peer_max_udp_payload_size =
                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
                    .unwrap_or(u16::MAX);
            PathData::new(
                network_path,
                self.allow_mtud,
                Some(peer_max_udp_payload_size),
                self.path_generation_counter,
                now,
                &self.config,
            )
        };
        new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
        if let Some(report) = observed_addr {
            if let Some(updated) = new_path.update_observed_addr_report(report) {
                tracing::info!("adding observed addr event from migration");
                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
                    id: path_id,
                    addr: updated,
                }));
            }
        }
        new_path.send_new_challenge = true;

        let mut prev = mem::replace(path, new_path);
        // Don't clobber the original path if the previous one hasn't been validated yet
        if !prev.is_validating_path() {
            prev.send_new_challenge = true;
            // We haven't updated the remote CID yet, so this captures the remote CID we
            // were using on the previous path.
            known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
        }

        // We need to re-assign the correct remote to this path in qlog
        self.qlog.emit_tuple_assigned(path_id, network_path, now);

        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathValidation),
            now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
            self.qlog.with_time(now),
        );
    }

    /// Handle a change in the local address, i.e. an active migration
4944    pub fn local_address_changed(&mut self) {
4945        // TODO(flub): if multipath is enabled this needs to create a new path entirely.
4946        self.update_rem_cid(PathId::ZERO);
4947        self.ping();
4948    }
4949
4950    /// Switch to a previously unused remote connection ID, if possible
4951    fn update_rem_cid(&mut self, path_id: PathId) {
4952        let Some((reset_token, retired)) =
4953            self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4954        else {
4955            return;
4956        };
4957
4958        // Retire the current remote CID and any CIDs we had to skip.
4959        self.spaces[SpaceId::Data]
4960            .pending
4961            .retire_cids
4962            .extend(retired.map(|seq| (path_id, seq)));
4963        let remote = self.path_data(path_id).network_path.remote;
4964        self.set_reset_token(path_id, remote, reset_token);
4965    }
4966
4967    /// Sends this reset token to the endpoint
4968    ///
4969    /// The endpoint needs to know the reset tokens issued by the peer, so that if the peer
4970    /// sends a reset token it knows to route it to this connection. See RFC 9000 section
4971    /// 10.3. Stateless Reset.
4972    ///
    /// Reset tokens are different for each path. Note that the endpoint identifies paths
    /// by peer socket address, not by path ID.
4975    fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4976        self.endpoint_events
4977            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4978
4979        // During the handshake the server sends a reset token in the transport
4980        // parameters. When we are the client and we receive the reset token during the
4981        // handshake we want this to affect our peer transport parameters.
        // TODO(flub): Pretty sure this is pointless: the entire params struct is
        //    overwritten shortly after this is called, and then it no longer carries
        //    this token.
4984        if path_id == PathId::ZERO {
4985            self.peer_params.stateless_reset_token = Some(reset_token);
4986        }
4987    }
4988
4989    /// Issue an initial set of connection IDs to the peer upon connection
4990    fn issue_first_cids(&mut self, now: Instant) {
4991        if self
4992            .local_cid_state
4993            .get(&PathId::ZERO)
4994            .expect("PathId::ZERO exists when the connection is created")
4995            .cid_len()
4996            == 0
4997        {
4998            return;
4999        }
5000
5001        // Subtract 1 to account for the CID we supplied while handshaking
5002        let mut n = self.peer_params.issue_cids_limit() - 1;
5003        if let ConnectionSide::Server { server_config } = &self.side {
5004            if server_config.has_preferred_address() {
5005                // We also sent a CID in the transport parameters
5006                n -= 1;
5007            }
5008        }
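        // e.g. with an issue_cids_limit() of 8 this requests 7 new CIDs from the
        // endpoint, or 6 if a preferred-address CID was also consumed above.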
5009        self.endpoint_events
5010            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5011    }
5012
5013    /// Issues an initial set of CIDs for paths that have not yet had any CIDs issued
5014    ///
5015    /// Later CIDs are issued when CIDs expire or are retired by the peer.
5016    fn issue_first_path_cids(&mut self, now: Instant) {
5017        if let Some(max_path_id) = self.max_path_id() {
5018            let mut path_id = self.max_path_id_with_cids.next();
5019            while path_id <= max_path_id {
5020                self.endpoint_events
5021                    .push_back(EndpointEventInner::NeedIdentifiers(
5022                        path_id,
5023                        now,
5024                        self.peer_params.issue_cids_limit(),
5025                    ));
5026                path_id = path_id.next();
5027            }
5028            self.max_path_id_with_cids = max_path_id;
5029        }
5030    }
5031
5032    /// Populates a packet with frames
5033    ///
5034    /// This tries to fit as many frames as possible into the packet.
5035    ///
    /// *path_exclusive_only* means to only build frames which may only be sent on this
    /// path. This is used in multipath for backup paths while there is still an active
    /// path.
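    ///
    /// Note (a summary of the code below, not a stable ordering guarantee): frames are
    /// written roughly in priority order, starting with connection- and path-level
    /// control frames (HANDSHAKE_DONE, ACKs, PATH_CHALLENGE/PATH_RESPONSE, CIDs), with
    /// DATAGRAM and STREAM data towards the end.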
5039    fn populate_packet<'a, 'b>(
5040        &mut self,
5041        now: Instant,
5042        space_id: SpaceId,
5043        path_id: PathId,
5044        path_exclusive_only: bool,
5045        builder: &mut PacketBuilder<'a, 'b>,
5046    ) {
5047        let pn = builder.exact_number;
5048        let is_multipath_negotiated = self.is_multipath_negotiated();
5049        let stats = &mut self.stats.frame_tx;
5050        let space = &mut self.spaces[space_id];
5051        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5052        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5053        space
5054            .for_path(path_id)
5055            .pending_acks
5056            .maybe_ack_non_eliciting();
5057
5058        // HANDSHAKE_DONE
5059        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5060            builder.write_frame(frame::HandshakeDone, stats);
5061        }
5062
5063        // REACH_OUT
        // TODO(@divma): path exclusive considerations
5065        if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5066            while let Some(local_addr) = addresses.pop() {
5067                let reach_out = frame::ReachOut::new(*round, local_addr);
5068                if builder.frame_space_remaining() > reach_out.size() {
5069                    builder.write_frame(reach_out, stats);
5070                } else {
5071                    addresses.push(local_addr);
5072                    break;
5073                }
5074            }
5075            if addresses.is_empty() {
5076                space.pending.reach_out = None;
5077            }
5078        }
5079
5080        // OBSERVED_ADDR
5081        if !path_exclusive_only
5082            && space_id == SpaceId::Data
5083            && self
5084                .config
5085                .address_discovery_role
5086                .should_report(&self.peer_params.address_discovery_role)
5087            && (!path.observed_addr_sent || space.pending.observed_addr)
5088        {
5089            let frame =
5090                frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
5091            if builder.frame_space_remaining() > frame.size() {
5092                builder.write_frame(frame, stats);
5093
5094                self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5095                path.observed_addr_sent = true;
5096
5097                space.pending.observed_addr = false;
5098            }
5099        }
5100
5101        // PING
5102        if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5103            builder.write_frame(frame::Ping, stats);
5104        }
5105
5106        // IMMEDIATE_ACK
5107        if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5108            debug_assert_eq!(
5109                space_id,
5110                SpaceId::Data,
5111                "immediate acks must be sent in the data space"
5112            );
5113            builder.write_frame(frame::ImmediateAck, stats);
5114        }
5115
5116        // ACK
5117        // TODO(flub): Should this send acks for this path anyway?
5118
5119        if !path_exclusive_only {
5120            for path_id in space
5121                .number_spaces
5122                .iter_mut()
5123                .filter(|(_, pns)| pns.pending_acks.can_send())
5124                .map(|(&path_id, _)| path_id)
5125                .collect::<Vec<_>>()
5126            {
5127                Self::populate_acks(
5128                    now,
5129                    self.receiving_ecn,
5130                    path_id,
5131                    space_id,
5132                    space,
5133                    is_multipath_negotiated,
5134                    builder,
5135                    stats,
5136                );
5137            }
5138        }
5139
5140        // ACK_FREQUENCY
5141        if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5142            let sequence_number = self.ack_frequency.next_sequence_number();
5143
5144            // Safe to unwrap because this is always provided when ACK frequency is enabled
5145            let config = self.config.ack_frequency_config.as_ref().unwrap();
5146
5147            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
5148            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5149                path.rtt.get(),
5150                config,
5151                &self.peer_params,
5152            );
5153
5154            let frame = frame::AckFrequency {
5155                sequence: sequence_number,
5156                ack_eliciting_threshold: config.ack_eliciting_threshold,
5157                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5158                reordering_threshold: config.reordering_threshold,
5159            };
5160            builder.write_frame(frame, stats);
5161
5162            self.ack_frequency
5163                .ack_frequency_sent(path_id, pn, max_ack_delay);
5164        }
5165
5166        // PATH_CHALLENGE
5167        if builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
5168            && space_id == SpaceId::Data
5169            && path.send_new_challenge
5170            && !self.state.is_closed()
5171        // we don't want to send new challenges if we are already closing
5172        {
5173            path.send_new_challenge = false;
5174
5175            // Generate a new challenge every time we send a new PATH_CHALLENGE
5176            let token = self.rng.random();
5177            let info = paths::SentChallengeInfo {
5178                sent_instant: now,
5179                network_path: path.network_path,
5180            };
5181            path.challenges_sent.insert(token, info);
5182            let challenge = frame::PathChallenge(token);
5183            trace!(frame = %challenge);
5184            builder.write_frame(challenge, stats);
5185            builder.require_padding();
5186            let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5187            self.timers.set(
5188                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5189                now + pto,
5190                self.qlog.with_time(now),
5191            );
5192
            if is_multipath_negotiated && !path.validated {
5194                // queue informing the path status along with the challenge
5195                space.pending.path_status.insert(path_id);
5196            }
5197
5198            // Always include an OBSERVED_ADDR frame with a PATH_CHALLENGE, regardless
5199            // of whether one has already been sent on this path.
5200            if space_id == SpaceId::Data
5201                && self
5202                    .config
5203                    .address_discovery_role
5204                    .should_report(&self.peer_params.address_discovery_role)
5205            {
5206                let frame = frame::ObservedAddr::new(
5207                    path.network_path.remote,
5208                    self.next_observed_addr_seq_no,
5209                );
5210                if builder.frame_space_remaining() > frame.size() {
5211                    builder.write_frame(frame, stats);
5212
5213                    self.next_observed_addr_seq_no =
5214                        self.next_observed_addr_seq_no.saturating_add(1u8);
5215                    path.observed_addr_sent = true;
5216
5217                    space.pending.observed_addr = false;
5218                }
5219            }
5220        }
5221
5222        // PATH_RESPONSE
5223        if builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
5224            && space_id == SpaceId::Data
5225        {
5226            if let Some(token) = path.path_responses.pop_on_path(path.network_path) {
5227                let response = frame::PathResponse(token);
5228                trace!(frame = %response);
5229                builder.write_frame(response, stats);
5230                builder.require_padding();
5231
                // NOTE: this is technically not required, but it might be useful to ride the
                // request/response nature of path challenges to refresh an observation.
                // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
5235                if space_id == SpaceId::Data
5236                    && self
5237                        .config
5238                        .address_discovery_role
5239                        .should_report(&self.peer_params.address_discovery_role)
5240                {
5241                    let frame = frame::ObservedAddr::new(
5242                        path.network_path.remote,
5243                        self.next_observed_addr_seq_no,
5244                    );
5245                    if builder.frame_space_remaining() > frame.size() {
5246                        builder.write_frame(frame, stats);
5247
5248                        self.next_observed_addr_seq_no =
5249                            self.next_observed_addr_seq_no.saturating_add(1u8);
5250                        path.observed_addr_sent = true;
5251
5252                        space.pending.observed_addr = false;
5253                    }
5254                }
5255            }
5256        }
5257
5258        // CRYPTO
5259        while !path_exclusive_only
5260            && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
5261            && !is_0rtt
5262        {
5263            let mut frame = match space.pending.crypto.pop_front() {
5264                Some(x) => x,
5265                None => break,
5266            };
5267
            // Calculate the maximum amount of crypto data we can store in the buffer.
            // Since the offset is known, we can reserve the exact size required to encode it.
            // For the length we reserve 2 bytes, which allows encoding lengths up to
            // 2^14 - 1, more than fits into normally sized QUIC frames.
5272            let max_crypto_data_size = builder.frame_space_remaining()
5273                - 1 // Frame Type
5274                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5275                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
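            // For example, with 1200 bytes of frame space remaining and an offset below
            // 2^6 (a 1-byte varint), this leaves 1200 - 1 - 1 - 2 = 1196 bytes for data.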
5276
5277            let len = frame
5278                .data
5279                .len()
5280                .min(2usize.pow(14) - 1)
5281                .min(max_crypto_data_size);
5282
5283            let data = frame.data.split_to(len);
5284            let offset = frame.offset;
5285            let truncated = frame::Crypto { offset, data };
5286            builder.write_frame(truncated, stats);
5287
5288            if !frame.data.is_empty() {
5289                frame.offset += len as u64;
5290                space.pending.crypto.push_front(frame);
5291            }
5292        }
5293
        // TODO(flub): maybe this should have much higher priority?
5295        // PATH_ABANDON
5296        while !path_exclusive_only
5297            && space_id == SpaceId::Data
5298            && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
5299        {
5300            let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5301                break;
5302            };
5303            let frame = frame::PathAbandon {
5304                path_id,
5305                error_code,
5306            };
5307            builder.write_frame(frame, stats);
5308        }
5309
5310        // PATH_STATUS_AVAILABLE & PATH_STATUS_BACKUP
5311        while !path_exclusive_only
5312            && space_id == SpaceId::Data
5313            && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
5314        {
5315            let Some(path_id) = space.pending.path_status.pop_first() else {
5316                break;
5317            };
5318            let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5319                trace!(%path_id, "discarding queued path status for unknown path");
5320                continue;
5321            };
5322
5323            let seq = path.status.seq();
5324            match path.local_status() {
5325                PathStatus::Available => {
5326                    let frame = frame::PathStatusAvailable {
5327                        path_id,
5328                        status_seq_no: seq,
5329                    };
5330                    builder.write_frame(frame, stats);
5331                }
5332                PathStatus::Backup => {
5333                    let frame = frame::PathStatusBackup {
5334                        path_id,
5335                        status_seq_no: seq,
5336                    };
5337                    builder.write_frame(frame, stats);
5338                }
5339            }
5340        }
5341
5342        // MAX_PATH_ID
5343        if space_id == SpaceId::Data
5344            && space.pending.max_path_id
5345            && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
5346        {
5347            let frame = frame::MaxPathId(self.local_max_path_id);
5348            builder.write_frame(frame, stats);
5349            space.pending.max_path_id = false;
5350        }
5351
5352        // PATHS_BLOCKED
5353        if space_id == SpaceId::Data
5354            && space.pending.paths_blocked
5355            && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5356        {
5357            let frame = frame::PathsBlocked(self.remote_max_path_id);
5358            builder.write_frame(frame, stats);
5359            space.pending.paths_blocked = false;
5360        }
5361
5362        // PATH_CIDS_BLOCKED
5363        while space_id == SpaceId::Data
5364            && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5365        {
5366            let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
5367                break;
5368            };
5369            let next_seq = match self.rem_cids.get(&path_id) {
5370                Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
5371                None => VarInt(0),
5372            };
5373            let frame = frame::PathCidsBlocked { path_id, next_seq };
5374            builder.write_frame(frame, stats);
5375        }
5376
5377        // RESET_STREAM, STOP_SENDING, MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS
5378        if space_id == SpaceId::Data {
5379            self.streams
5380                .write_control_frames(builder, &mut space.pending, stats);
5381        }
5382
5383        // NEW_CONNECTION_ID
5384        let cid_len = self
5385            .local_cid_state
5386            .values()
5387            .map(|cid_state| cid_state.cid_len())
5388            .max()
5389            .expect("some local CID state must exist");
5390        let new_cid_size_bound =
5391            frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5392        while !path_exclusive_only && builder.frame_space_remaining() > new_cid_size_bound {
5393            let issued = match space.pending.new_cids.pop() {
5394                Some(x) => x,
5395                None => break,
5396            };
5397            let retire_prior_to = self
5398                .local_cid_state
5399                .get(&issued.path_id)
5400                .map(|cid_state| cid_state.retire_prior_to())
5401                .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5402
5403            let cid_path_id = match is_multipath_negotiated {
5404                true => Some(issued.path_id),
5405                false => {
5406                    debug_assert_eq!(issued.path_id, PathId::ZERO);
5407                    None
5408                }
5409            };
5410            let frame = frame::NewConnectionId {
5411                path_id: cid_path_id,
5412                sequence: issued.sequence,
5413                retire_prior_to,
5414                id: issued.id,
5415                reset_token: issued.reset_token,
5416            };
5417            builder.write_frame(frame, stats);
5418        }
5419
5420        // RETIRE_CONNECTION_ID
5421        let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5422        while !path_exclusive_only && builder.frame_space_remaining() > retire_cid_bound {
5423            let (path_id, sequence) = match space.pending.retire_cids.pop() {
5424                Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
5425                Some((path_id, seq)) => (Some(path_id), seq),
5426                None => break,
5427            };
5428            let frame = frame::RetireConnectionId { path_id, sequence };
5429            builder.write_frame(frame, stats);
5430        }
5431
5432        // DATAGRAM
5433        let mut sent_datagrams = false;
5434        while !path_exclusive_only
5435            && builder.frame_space_remaining() > Datagram::SIZE_BOUND
5436            && space_id == SpaceId::Data
5437        {
5438            match self.datagrams.write(builder, stats) {
5439                true => {
5440                    sent_datagrams = true;
5441                }
5442                false => break,
5443            }
5444        }
5445        if self.datagrams.send_blocked && sent_datagrams {
5446            self.events.push_back(Event::DatagramsUnblocked);
5447            self.datagrams.send_blocked = false;
5448        }
5449
5450        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5451
5452        // NEW_TOKEN
5453        while let Some(network_path) = space.pending.new_tokens.pop() {
5454            if path_exclusive_only {
5455                break;
5456            }
5457            debug_assert_eq!(space_id, SpaceId::Data);
5458            let ConnectionSide::Server { server_config } = &self.side else {
5459                panic!("NEW_TOKEN frames should not be enqueued by clients");
5460            };
5461
5462            if !network_path.is_probably_same_path(&path.network_path) {
                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
                // useful if used from the same IP address. Thus, we abandon enqueued NEW_TOKEN
                // frames upon a path change; once the new path becomes validated, fresh
                // NEW_TOKEN frames may be enqueued for it.
5467                continue;
5468            }
5469
5470            let token = Token::new(
5471                TokenPayload::Validation {
5472                    ip: network_path.remote.ip(),
5473                    issued: server_config.time_source.now(),
5474                },
5475                &mut self.rng,
5476            );
5477            let new_token = NewToken {
5478                token: token.encode(&*server_config.token_key).into(),
5479            };
5480
5481            if builder.frame_space_remaining() < new_token.size() {
5482                space.pending.new_tokens.push(network_path);
5483                break;
5484            }
5485
5486            builder.write_frame(new_token, stats);
5487            builder.retransmits_mut().new_tokens.push(network_path);
5488        }
5489
5490        // STREAM
5491        if !path_exclusive_only && space_id == SpaceId::Data {
5492            self.streams
5493                .write_stream_frames(builder, self.config.send_fairness, stats);
5494        }
5495
5496        // ADD_ADDRESS
5497        // TODO(@divma): check if we need to do path exclusive filters
5498        while space_id == SpaceId::Data
5499            && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
5500        {
5501            if let Some(added_address) = space.pending.add_address.pop_last() {
5502                builder.write_frame(added_address, stats);
5503            } else {
5504                break;
5505            }
5506        }
5507
5508        // REMOVE_ADDRESS
5509        while space_id == SpaceId::Data
5510            && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
5511        {
5512            if let Some(removed_address) = space.pending.remove_address.pop_last() {
5513                builder.write_frame(removed_address, stats);
5514            } else {
5515                break;
5516            }
5517        }
5518    }
5519
5520    /// Write pending ACKs into a buffer
5521    fn populate_acks<'a, 'b>(
5522        now: Instant,
5523        receiving_ecn: bool,
5524        path_id: PathId,
5525        space_id: SpaceId,
5526        space: &mut PacketSpace,
5527        is_multipath_negotiated: bool,
5528        builder: &mut PacketBuilder<'a, 'b>,
5529        stats: &mut FrameStats,
5530    ) {
5531        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
5532        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5533
5534        debug_assert!(
5535            is_multipath_negotiated || path_id == PathId::ZERO,
5536            "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5537        );
5538        if is_multipath_negotiated {
5539            debug_assert!(
5540                space_id == SpaceId::Data || path_id == PathId::ZERO,
5541                "path acks must be sent in 1RTT space (have {space_id:?})"
5542            );
5543        }
5544
5545        let pns = space.for_path(path_id);
5546        let ranges = pns.pending_acks.ranges();
5547        debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5548        let ecn = if receiving_ecn {
5549            Some(&pns.ecn_counters)
5550        } else {
5551            None
5552        };
5553
5554        let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
        // TODO: This should come from `TransportConfig` if it ever becomes configurable.
5556        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5557        let delay = delay_micros >> ack_delay_exp.into_inner();
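        // e.g. with the default exponent of 3 (RFC 9000 §18.2), a measured delay of
        // 4000µs is encoded as 4000 >> 3 = 500.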
5558
5559        if is_multipath_negotiated && space_id == SpaceId::Data {
5560            if !ranges.is_empty() {
5561                let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
5562                builder.write_frame(frame, stats);
5563            }
5564        } else {
5565            builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
5566        }
5567    }
5568
5569    fn close_common(&mut self) {
5570        trace!("connection closed");
5571        self.timers.reset();
5572    }
5573
5574    fn set_close_timer(&mut self, now: Instant) {
5575        // QUIC-MULTIPATH § 2.6 Connection Closure: draining for 3*PTO with PTO the max of
5576        // the PTO for all paths.
5577        let pto_max = self.pto_max_path(self.highest_space, true);
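        // e.g. with a maximum PTO of 200ms across all paths, the connection lingers for
        // 600ms to absorb late or retransmitted packets before it is discarded.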
5578        self.timers.set(
5579            Timer::Conn(ConnTimer::Close),
5580            now + 3 * pto_max,
5581            self.qlog.with_time(now),
5582        );
5583    }
5584
5585    /// Handle transport parameters received from the peer
5586    ///
    /// *rem_cid* and *loc_cid* are the source and destination CIDs respectively of the
    /// packet in which the transport parameters arrived.
5589    fn handle_peer_params(
5590        &mut self,
5591        params: TransportParameters,
5592        loc_cid: ConnectionId,
5593        rem_cid: ConnectionId,
5594        now: Instant,
5595    ) -> Result<(), TransportError> {
5596        if Some(self.orig_rem_cid) != params.initial_src_cid
5597            || (self.side.is_client()
5598                && (Some(self.initial_dst_cid) != params.original_dst_cid
5599                    || self.retry_src_cid != params.retry_src_cid))
5600        {
5601            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5602                "CID authentication failure",
5603            ));
5604        }
5605        if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5606            return Err(TransportError::PROTOCOL_VIOLATION(
5607                "multipath must not use zero-length CIDs",
5608            ));
5609        }
5610
5611        self.set_peer_params(params);
5612        self.qlog.emit_peer_transport_params_received(self, now);
5613
5614        Ok(())
5615    }
5616
5617    fn set_peer_params(&mut self, params: TransportParameters) {
5618        self.streams.set_params(&params);
5619        self.idle_timeout =
5620            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5621        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5622
5623        if let Some(ref info) = params.preferred_address {
5624            // During the handshake PathId::ZERO exists.
            self.rem_cids
                .get_mut(&PathId::ZERO)
                .expect("not yet abandoned")
                .insert(frame::NewConnectionId {
                    path_id: None,
                    sequence: 1,
                    id: info.connection_id,
                    reset_token: info.stateless_reset_token,
                    retire_prior_to: 0,
                })
                .expect(
                    "preferred address CID is the first received, and hence is guaranteed to be legal",
                );
5635            let remote = self.path_data(PathId::ZERO).network_path.remote;
5636            self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5637        }
5638        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
5639
5640        let mut multipath_enabled = None;
5641        if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5642            self.config.get_initial_max_path_id(),
5643            params.initial_max_path_id,
5644        ) {
5645            // multipath is enabled, register the local and remote maximums
5646            self.local_max_path_id = local_max_path_id;
5647            self.remote_max_path_id = remote_max_path_id;
5648            let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5649            debug!(%initial_max_path_id, "multipath negotiated");
5650            multipath_enabled = Some(initial_max_path_id);
5651        }
5652
5653        if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5654            self.config
5655                .max_remote_nat_traversal_addresses
5656                .zip(params.max_remote_nat_traversal_addresses)
5657        {
5658            if let Some(max_initial_paths) =
5659                multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5660            {
5661                let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5662                let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5663                self.iroh_hp =
5664                    iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5665                debug!(
5666                    %max_remote_addresses, %max_local_addresses,
5667                    "iroh hole punching negotiated"
5668                );
5669
5670                match self.side() {
5671                    Side::Client => {
5672                        if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5673                            // in this case the client might try to open `max_remote_addresses` new
5674                            // paths, but the current multipath configuration will not allow it
5675                            warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5676                        } else if max_local_addresses as u64
5677                            > params.active_connection_id_limit.into_inner()
5678                        {
5679                            // the server allows us to send at most `params.active_connection_id_limit`
5680                            // but they might need at least `max_local_addresses` to effectively send
5681                            // `PATH_CHALLENGE` frames to each advertised local address
5682                            warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5683                        }
5684                    }
5685                    Side::Server => {
5686                        if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5687                            warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5688                        }
5689                    }
5690                }
5691            } else {
5692                debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5693            }
5694        }
5695
5696        self.peer_params = params;
5697        let peer_max_udp_payload_size =
5698            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5699        self.path_data_mut(PathId::ZERO)
5700            .mtud
5701            .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5702    }
5703
5704    /// Decrypts a packet, returning the packet number on success
5705    fn decrypt_packet(
5706        &mut self,
5707        now: Instant,
5708        path_id: PathId,
5709        packet: &mut Packet,
5710    ) -> Result<Option<u64>, Option<TransportError>> {
5711        let result = packet_crypto::decrypt_packet_body(
5712            packet,
5713            path_id,
5714            &self.spaces,
5715            self.zero_rtt_crypto.as_ref(),
5716            self.key_phase,
5717            self.prev_crypto.as_ref(),
5718            self.next_crypto.as_ref(),
5719        )?;
5720
5721        let result = match result {
5722            Some(r) => r,
5723            None => return Ok(None),
5724        };
5725
5726        if result.outgoing_key_update_acked {
5727            if let Some(prev) = self.prev_crypto.as_mut() {
5728                prev.end_packet = Some((result.number, now));
5729                self.set_key_discard_timer(now, packet.header.space());
5730            }
5731        }
5732
5733        if result.incoming_key_update {
5734            trace!("key update authenticated");
5735            self.update_keys(Some((result.number, now)), true);
5736            self.set_key_discard_timer(now, packet.header.space());
5737        }
5738
5739        Ok(Some(result.number))
5740    }
5741
5742    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5743        trace!("executing key update");
5744        // Generate keys for the key phase after the one we're switching to, store them in
5745        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
5746        // `prev_crypto`.
5747        let new = self
5748            .crypto
5749            .next_1rtt_keys()
5750            .expect("only called for `Data` packets");
5751        self.key_phase_size = new
5752            .local
5753            .confidentiality_limit()
5754            .saturating_sub(KEY_UPDATE_MARGIN);
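        // e.g. for the AES-GCM suites the confidentiality limit is 2^23 packets
        // (RFC 9001 §6.6), so a key update is triggered a safety margin before that many
        // packets have been protected under the current keys.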
5755        let old = mem::replace(
5756            &mut self.spaces[SpaceId::Data]
5757                .crypto
5758                .as_mut()
5759                .unwrap() // safe because update_keys() can only be triggered by short packets
5760                .packet,
5761            mem::replace(self.next_crypto.as_mut().unwrap(), new),
5762        );
5763        self.spaces[SpaceId::Data]
5764            .iter_paths_mut()
5765            .for_each(|s| s.sent_with_keys = 0);
5766        self.prev_crypto = Some(PrevCrypto {
5767            crypto: old,
5768            end_packet,
5769            update_unacked: remote,
5770        });
5771        self.key_phase = !self.key_phase;
5772    }
5773
5774    fn peer_supports_ack_frequency(&self) -> bool {
5775        self.peer_params.min_ack_delay.is_some()
5776    }
5777
5778    /// Send an IMMEDIATE_ACK frame to the remote endpoint
5779    ///
5780    /// According to the spec, this will result in an error if the remote endpoint does not support
5781    /// the Acknowledgement Frequency extension
5782    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5783        debug_assert_eq!(
5784            self.highest_space,
5785            SpaceId::Data,
5786            "immediate ack must be written in the data space"
5787        );
5788        self.spaces[self.highest_space]
5789            .for_path(path_id)
5790            .immediate_ack_pending = true;
5791    }
5792
5793    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
5794    #[cfg(test)]
5795    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5796        let (path_id, first_decode, remaining) = match &event.0 {
5797            ConnectionEventInner::Datagram(DatagramConnectionEvent {
5798                path_id,
5799                first_decode,
5800                remaining,
5801                ..
5802            }) => (path_id, first_decode, remaining),
5803            _ => return None,
5804        };
5805
5806        if remaining.is_some() {
5807            panic!("Packets should never be coalesced in tests");
5808        }
5809
5810        let decrypted_header = packet_crypto::unprotect_header(
5811            first_decode.clone(),
5812            &self.spaces,
5813            self.zero_rtt_crypto.as_ref(),
5814            self.peer_params.stateless_reset_token,
5815        )?;
5816
5817        let mut packet = decrypted_header.packet?;
5818        packet_crypto::decrypt_packet_body(
5819            &mut packet,
5820            *path_id,
5821            &self.spaces,
5822            self.zero_rtt_crypto.as_ref(),
5823            self.key_phase,
5824            self.prev_crypto.as_ref(),
5825            self.next_crypto.as_ref(),
5826        )
5827        .ok()?;
5828
5829        Some(packet.payload.to_vec())
5830    }
5831
5832    /// The number of bytes of packets containing retransmittable frames that have not been
5833    /// acknowledged or declared lost.
5834    #[cfg(test)]
5835    pub(crate) fn bytes_in_flight(&self) -> u64 {
5836        // TODO(@divma): consider including for multipath?
5837        self.path_data(PathId::ZERO).in_flight.bytes
5838    }
5839
5840    /// Number of bytes worth of non-ack-only packets that may be sent
5841    #[cfg(test)]
5842    pub(crate) fn congestion_window(&self) -> u64 {
5843        let path = self.path_data(PathId::ZERO);
5844        path.congestion
5845            .window()
5846            .saturating_sub(path.in_flight.bytes)
5847    }
5848
    /// Whether no timers other than keepalive, idle, rtt, pushnewcid, and key discard are running
5850    #[cfg(test)]
5851    pub(crate) fn is_idle(&self) -> bool {
5852        let current_timers = self.timers.values();
5853        current_timers
5854            .into_iter()
5855            .filter(|(timer, _)| {
5856                !matches!(
5857                    timer,
5858                    Timer::Conn(ConnTimer::KeepAlive)
5859                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
5860                        | Timer::Conn(ConnTimer::PushNewCid)
5861                        | Timer::Conn(ConnTimer::KeyDiscard)
5862                )
5863            })
5864            .min_by_key(|(_, time)| *time)
5865            .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
5866    }
5867
5868    /// Whether explicit congestion notification is in use on outgoing packets.
5869    #[cfg(test)]
5870    pub(crate) fn using_ecn(&self) -> bool {
5871        self.path_data(PathId::ZERO).sending_ecn
5872    }
5873
5874    /// The number of received bytes in the current path
5875    #[cfg(test)]
5876    pub(crate) fn total_recvd(&self) -> u64 {
5877        self.path_data(PathId::ZERO).total_recvd
5878    }
5879
5880    #[cfg(test)]
5881    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
5882        self.local_cid_state
5883            .get(&PathId::ZERO)
5884            .unwrap()
5885            .active_seq()
5886    }
5887
5888    #[cfg(test)]
5889    #[track_caller]
5890    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
5891        self.local_cid_state
5892            .get(&PathId(path_id))
5893            .unwrap()
5894            .active_seq()
5895    }
5896
5897    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
5898    /// with updated `retire_prior_to` field set to `v`
5899    #[cfg(test)]
5900    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
5901        let n = self
5902            .local_cid_state
5903            .get_mut(&PathId::ZERO)
5904            .unwrap()
5905            .assign_retire_seq(v);
5906        self.endpoint_events
5907            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5908    }
5909
5910    /// Check the current active remote CID sequence for `PathId::ZERO`
5911    #[cfg(test)]
5912    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
5913        self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
5914    }
5915
5916    /// Returns the detected maximum udp payload size for the current path
5917    #[cfg(test)]
5918    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
5919        self.path_data(path_id).current_mtu()
5920    }
5921
5922    /// Triggers path validation on all paths
5923    #[cfg(test)]
5924    pub(crate) fn trigger_path_validation(&mut self) {
5925        for path in self.paths.values_mut() {
5926            path.data.send_new_challenge = true;
5927        }
5928    }
5929
5930    /// Whether we have 1-RTT data to send
5931    ///
5932    /// This checks for frames that can only be sent in the data space (1-RTT):
    /// - Pending PATH_CHALLENGE frames on the active path, and on the previous path if we
    ///   just migrated.
5934    /// - Pending PATH_RESPONSE frames.
5935    /// - Pending data to send in STREAM frames.
5936    /// - Pending DATAGRAM frames to send.
5937    ///
5938    /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that
5939    /// may need to be sent.
5940    fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
5941        let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
5942            path.data.send_new_challenge
5943                || path
5944                    .prev
5945                    .as_ref()
5946                    .is_some_and(|(_, path)| path.send_new_challenge)
5947                || !path.data.path_responses.is_empty()
5948        });
5949        let other = self.streams.can_send_stream_data()
5950            || self
5951                .datagrams
5952                .outgoing
5953                .front()
5954                .is_some_and(|x| x.size(true) <= max_size);
5955        SendableFrames {
5956            acks: false,
5957            other,
5958            close: false,
5959            path_exclusive,
5960        }
5961    }
5962
5963    /// Terminate the connection instantly, without sending a close packet
5964    fn kill(&mut self, reason: ConnectionError) {
5965        self.close_common();
5966        self.state.move_to_drained(Some(reason));
5967        self.endpoint_events.push_back(EndpointEventInner::Drained);
5968    }
5969
5970    /// Storage size required for the largest packet that can be transmitted on all currently
5971    /// available paths
5972    ///
5973    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
5974    ///
5975    /// When multipath is enabled, this value is the minimum MTU across all available paths.
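    ///
    /// A minimal buffer-sizing sketch (`conn` is a `Connection`; doc-test ignored since
    /// the surrounding setup is omitted):
    ///
    /// ```ignore
    /// let mut buf = Vec::with_capacity(conn.current_mtu() as usize);
    /// let transmit = conn.poll_transmit(now, max_datagrams, &mut buf);
    /// ```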
5976    pub fn current_mtu(&self) -> u16 {
5977        self.paths
5978            .iter()
5979            .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
5980            .map(|(_path_id, path_state)| path_state.data.current_mtu())
5981            .min()
5982            .expect("There is always at least one available path")
5983    }
5984
5985    /// Size of non-frame data for a 1-RTT packet
5986    ///
5987    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
5988    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
5989    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
5990    /// latency and packet loss.
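    ///
    /// For example, with an 8-byte remote CID, a 2-byte packet number, and a 16-byte AEAD
    /// tag, the overhead is 1 + 8 + 2 + 16 = 27 bytes.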
5991    fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
5992        let pn_len = PacketNumber::new(
5993            pn,
5994            self.spaces[SpaceId::Data]
5995                .for_path(path)
5996                .largest_acked_packet
5997                .unwrap_or(0),
5998        )
5999        .len();
6000
6001        // 1 byte for flags
6002        1 + self
6003            .rem_cids
6004            .get(&path)
6005            .map(|cids| cids.active().len())
6006            .unwrap_or(20)      // Max CID len in QUIC v1
6007            + pn_len
6008            + self.tag_len_1rtt()
6009    }
6010
6011    fn predict_1rtt_overhead_no_pn(&self) -> usize {
6012        let pn_len = 4;
6013
6014        let cid_len = self
6015            .rem_cids
6016            .values()
6017            .map(|cids| cids.active().len())
6018            .max()
6019            .unwrap_or(20); // Max CID len in QUIC v1
6020
6021        // 1 byte for flags
6022        1 + cid_len + pn_len + self.tag_len_1rtt()
6023    }
6024
6025    fn tag_len_1rtt(&self) -> usize {
6026        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6027            Some(crypto) => Some(&*crypto.packet.local),
6028            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6029        };
6030        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
6031        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
6032        // but that would needlessly prevent sending datagrams during 0-RTT.
6033        key.map_or(16, |x| x.tag_len())
6034    }
6035
6036    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
6037    fn on_path_validated(&mut self, path_id: PathId) {
6038        self.path_data_mut(path_id).validated = true;
6039        let ConnectionSide::Server { server_config } = &self.side else {
6040            return;
6041        };
6042        let network_path = self.path_data(path_id).network_path;
        let new_tokens = &mut self.spaces[SpaceId::Data].pending.new_tokens;
6044        new_tokens.clear();
6045        for _ in 0..server_config.validation_token.sent {
6046            new_tokens.push(network_path);
6047        }
6048    }
6049
6050    /// Handle new path status information: PATH_STATUS_AVAILABLE, PATH_STATUS_BACKUP
6051    fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6052        if let Some(path) = self.paths.get_mut(&path_id) {
6053            path.data.status.remote_update(status, status_seq_no);
6054        } else {
            debug!("path status frame received for unknown path {:?}", path_id);
6056        }
6057        self.events.push_back(
6058            PathEvent::RemoteStatus {
6059                id: path_id,
6060                status,
6061            }
6062            .into(),
6063        );
6064    }
6065
6066    /// Returns the maximum [`PathId`] to be used for sending in this connection.
6067    ///
    /// This is calculated as the minimum of the local and remote maximums when multipath
    /// is enabled, or `None` when it is disabled.
6070    ///
6071    /// For data that's received, we should use [`Self::local_max_path_id`] instead.
6072    /// The reasoning is that the remote might already have updated to its own newer
    /// [`Self::max_path_id`] after sending out a `MAX_PATH_ID` frame that got re-ordered.
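    ///
    /// For example, with `local_max_path_id == 4` and `remote_max_path_id == 2`, path IDs
    /// `0..=2` may be used for sending.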
6074    fn max_path_id(&self) -> Option<PathId> {
6075        if self.is_multipath_negotiated() {
6076            Some(self.remote_max_path_id.min(self.local_max_path_id))
6077        } else {
6078            None
6079        }
6080    }
6081
    /// Adds an address the local endpoint considers reachable for nat traversal
6083    pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6084        if let Some(added) = self.iroh_hp.add_local_address(address)? {
6085            self.spaces[SpaceId::Data].pending.add_address.insert(added);
6086        };
6087        Ok(())
6088    }
6089
    /// Removes an address the endpoint no longer considers reachable for nat traversal
6091    ///
6092    /// Addresses not present in the set will be silently ignored.
6093    pub fn remove_nat_traversal_address(
6094        &mut self,
6095        address: SocketAddr,
6096    ) -> Result<(), iroh_hp::Error> {
6097        if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6098            self.spaces[SpaceId::Data]
6099                .pending
6100                .remove_address
6101                .insert(removed);
6102        }
6103        Ok(())
6104    }
6105
6106    /// Get the current local nat traversal addresses
6107    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6108        self.iroh_hp.get_local_nat_traversal_addresses()
6109    }
6110
    /// Get the nat traversal addresses currently advertised by the server
6112    pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6113        Ok(self
6114            .iroh_hp
6115            .client_side()?
6116            .get_remote_nat_traversal_addresses())
6117    }
6118
6119    /// Attempts to open a path for nat traversal.
6120    ///
    /// `ipv6` indicates whether the path should be opened using an IPv6 remote. Returns
    /// `None` if the address is ignored (an IPv6 candidate for an IPv4-only socket).
6123    ///
6124    /// On success returns the [`PathId`] and remote address of the path, as well as whether the path
6125    /// existed for the adjusted remote.
6126    fn open_nat_traversal_path(
6127        &mut self,
6128        now: Instant,
6129        (ip, port): (IpAddr, u16),
6130        ipv6: bool,
6131    ) -> Result<Option<(PathId, SocketAddr, bool)>, PathError> {
6132        // If this endpoint is an IPv6 endpoint we use IPv6 addresses for all remotes.
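        // e.g. 192.0.2.1:443 is probed as [::ffff:192.0.2.1]:443 when this endpoint's
        // socket is IPv6.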
6133        let remote = match ip {
6134            IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6135            IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6136            IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6137            IpAddr::V6(_) => {
6138                trace!("not using IPv6 nat candidate for IPv4 socket");
6139                return Ok(None);
6140            }
6141        };
6142        // TODO(matheus23): Probe the correct 4-tuple, instead of only a remote address?
6143        // By specifying None, we do two things: 1. open_path_ensure won't generate two
6144        // paths to the same remote and 2. we let the OS choose which interface to use for
6145        // sending on that path.
6146        let network_path = FourTuple {
6147            remote,
6148            local_ip: None,
6149        };
6150        match self.open_path_ensure(network_path, PathStatus::Backup, now) {
6151            Ok((path_id, path_was_known)) => {
6152                if path_was_known {
6153                    trace!(%path_id, %remote, "nat traversal: path existed for remote");
6154                }
6155                Ok(Some((path_id, remote, path_was_known)))
6156            }
6157            Err(e) => {
6158                debug!(%remote, %e, "nat traversal: failed to probe remote");
6159                Err(e)
6160            }
6161        }
6162    }
6163
6164    /// Initiates a new nat traversal round
6165    ///
6166    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
6167    /// frames, and initiating probing of the known remote addresses. When a new round is
6168    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
6169    ///
6170    /// Returns the server addresses that are now being probed.
    /// Addresses that fail due to spurious errors may still succeed later, even though
    /// they are not returned in this set.
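    ///
    /// A minimal client-side usage sketch (doc-test ignored; assumes a `Connection` named
    /// `conn` with multipath and iroh NAT traversal negotiated, a current `Instant` named
    /// `now`, and a hypothetical local address):
    ///
    /// ```ignore
    /// conn.add_nat_traversal_address("192.0.2.1:443".parse().unwrap())?;
    /// let probing = conn.initiate_nat_traversal_round(now)?;
    /// for addr in &probing {
    ///     // Each returned address is now being probed on its own path.
    ///     println!("probing {addr}");
    /// }
    /// ```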
6173    pub fn initiate_nat_traversal_round(
6174        &mut self,
6175        now: Instant,
6176    ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6177        if self.state.is_closed() {
6178            return Err(iroh_hp::Error::Closed);
6179        }
6180
6181        let client_state = self.iroh_hp.client_side_mut()?;
6182        let iroh_hp::NatTraversalRound {
6183            new_round,
6184            reach_out_at,
6185            addresses_to_probe,
6186            prev_round_path_ids,
6187        } = client_state.initiate_nat_traversal_round()?;
6188
6189        self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6190
6191        for path_id in prev_round_path_ids {
            // TODO(@divma): this sounds reasonable but we need to check whether this
            // actually works for the purposes of the protocol
6194            let validated = self
6195                .path(path_id)
6196                .map(|path| path.validated)
6197                .unwrap_or(false);
6198
6199            if !validated {
6200                let _ = self.close_path(
6201                    now,
6202                    path_id,
6203                    TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6204                );
6205            }
6206        }
6207
6208        let mut err = None;
6209
6210        let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6211        let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6212        let ipv6 = self
6213            .paths
6214            .values()
6215            .any(|p| p.data.network_path.remote.is_ipv6());
6216
6217        for (id, address) in addresses_to_probe {
6218            match self.open_nat_traversal_path(now, address, ipv6) {
6219                Ok(None) => {}
6220                Ok(Some((path_id, remote, path_was_known))) => {
6221                    if !path_was_known {
6222                        path_ids.push(path_id);
6223                        probed_addresses.push(remote);
6224                    }
6225                }
6226                Err(e) => {
6227                    self.iroh_hp
6228                        .client_side_mut()
6229                        .expect("validated")
6230                        .report_in_continuation(id, e);
6231                    err.get_or_insert(e);
6232                }
6233            }
6234        }
6235
6236        if let Some(err) = err {
6237            // We failed to probe any addresses, bail out
6238            if probed_addresses.is_empty() {
6239                return Err(iroh_hp::Error::Multipath(err));
6240            }
6241        }
6242
6243        self.iroh_hp
6244            .client_side_mut()
6245            .expect("connection side validated")
6246            .set_round_path_ids(path_ids);
6247
6248        Ok(probed_addresses)
6249    }
6250
6251    /// Attempts to continue a nat traversal round by trying to open paths for pending client probes.
6252    ///
    /// If there was nothing to do, this returns `None`. Otherwise it returns whether the
    /// path was successfully opened.
    fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
        let client_state = self.iroh_hp.client_side_mut().ok()?;
        let (id, address) = client_state.continue_nat_traversal_round()?;
        let ipv6 = self
            .paths
            .values()
            .any(|p| p.data.network_path.remote.is_ipv6());
        let open_result = self.open_nat_traversal_path(now, address, ipv6);
        let client_state = self.iroh_hp.client_side_mut().expect("validated");
        match open_result {
            Ok(None) => Some(true),
            Ok(Some((path_id, _remote, path_was_known))) => {
                if !path_was_known {
                    client_state.add_round_path_id(path_id);
                }
                Some(true)
            }
            Err(e) => {
                client_state.report_in_continuation(id, e);
                Some(false)
            }
        }
    }
}

impl fmt::Debug for Connection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Connection")
            .field("handshake_cid", &self.handshake_cid)
            .finish()
    }
}

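/// Whether, and why, sending on a path is currently blocked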
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    No,
    AntiAmplification,
    Congestion,
    Pacing,
}

/// Fields of `Connection` specific to it being client-side or server-side
enum ConnectionSide {
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
        token: Bytes,
        token_store: Arc<dyn TokenStore>,
        server_name: String,
    },
    Server {
        server_config: Arc<ServerConfig>,
    },
}

impl ConnectionSide {
    fn remote_may_migrate(&self, state: &State) -> bool {
        match self {
            Self::Server { server_config } => server_config.migration,
            Self::Client { .. } => {
                if let Some(hs) = state.as_handshake() {
                    hs.allow_server_migration
                } else {
                    false
                }
            }
        }
    }

    fn is_client(&self) -> bool {
        self.side().is_client()
    }

    fn is_server(&self) -> bool {
        self.side().is_server()
    }

    fn side(&self) -> Side {
        match *self {
            Self::Client { .. } => Side::Client,
            Self::Server { .. } => Side::Server,
        }
    }
}

impl From<SideArgs> for ConnectionSide {
    fn from(side: SideArgs) -> Self {
        match side {
            SideArgs::Client {
                token_store,
                server_name,
            } => Self::Client {
                token: token_store.take(&server_name).unwrap_or_default(),
                token_store,
                server_name,
            },
            SideArgs::Server {
                server_config,
                pref_addr_cid: _,
                path_validated: _,
            } => Self::Server { server_config },
        }
    }
}

/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    Client {
        token_store: Arc<dyn TokenStore>,
        server_name: String,
    },
    Server {
        server_config: Arc<ServerConfig>,
        pref_addr_cid: Option<ConnectionId>,
        path_validated: bool,
    },
}

impl SideArgs {
    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
        match *self {
            Self::Client { .. } => None,
            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
        }
    }

    pub(crate) fn path_validated(&self) -> bool {
        match *self {
            Self::Client { .. } => true,
            Self::Server { path_validated, .. } => path_validated,
        }
    }

    pub(crate) fn side(&self) -> Side {
        match *self {
            Self::Client { .. } => Side::Client,
            Self::Server { .. } => Side::Server,
        }
    }
}

/// Reasons why a connection might be lost
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}

impl From<Close> for ConnectionError {
    fn from(x: Close) -> Self {
        match x {
            Close::Connection(reason) => Self::ConnectionClosed(reason),
            Close::Application(reason) => Self::ApplicationClosed(reason),
        }
    }
}

// For compatibility with API consumers
impl From<ConnectionError> for io::Error {
    fn from(x: ConnectionError) -> Self {
        use ConnectionError::*;
        let kind = match x {
            TimedOut => io::ErrorKind::TimedOut,
            Reset => io::ErrorKind::ConnectionReset,
            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
                io::ErrorKind::Other
            }
        };
        Self::new(kind, x)
    }
}

/// Errors that might trigger a path being closed
// TODO(@divma): maybe needs to be reworked based on what we want to do with the public API
#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
pub enum PathError {
    /// The extension was not negotiated with the peer
    #[error("multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// Paths can only be opened client-side
    #[error("the server side may not open a path")]
    ServerSideNotAllowed,
    /// Current limits do not allow us to open more paths
    #[error("maximum number of concurrent paths reached")]
    MaxPathIdReached,
    /// No remote CIDs available to open a new path
    #[error("remote CIDs exhausted")]
    RemoteCidsExhausted,
    /// Path could not be validated and will be abandoned
    #[error("path validation failed")]
    ValidationFailed,
    /// The remote address for the path is not supported by the endpoint
    #[error("invalid remote address")]
    InvalidRemoteAddress(SocketAddr),
}

/// Errors triggered when abandoning a path
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The path is already closed or was never opened
    #[error("closed path")]
    ClosedPath,
    /// This is the last open path, which cannot be abandoned
    #[error("last open path")]
    LastOpenPath,
}

/// Error returned when the multipath extension is used without having been negotiated
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    _private: (),
}

/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The TLS handshake was confirmed
    HandshakeConfirmed,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
    /// (Multi)Path events
    Path(PathEvent),
    /// Iroh's NAT traversal events
    NatTraversal(iroh_hp::Event),
}

impl From<PathEvent> for Event {
    fn from(source: PathEvent) -> Self {
        Self::Path(source)
    }
}

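/// Computes the peer's maximum ACK delay as a [`Duration`]
///
/// The `max_ack_delay` transport parameter is expressed in milliseconds (RFC 9000 §18.2),
/// hence the conversion via microseconds.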
fn get_max_ack_delay(params: &TransportParameters) -> Duration {
    Duration::from_micros(params.max_ack_delay.0 * 1000)
}

// Prevents overflow and improves behavior in extreme circumstances
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
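///
/// With `MAX_CID_SIZE` = 20, `MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE` below works out to 55 bytes,
/// putting this threshold at 87 bytes.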
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Excludes packet-type-specific fields such as packet number or Initial token
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
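// Worked out with MAX_CID_SIZE = 20: 1 (flags) + 4 (version) + 1 + 20 (DCID) + 1 + 20 (SCID)
// + 4 (Length varint, since u16::MAX takes 4 bytes) + 4 (max packet number) = 55 bytes.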
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Perform key updates this many packets before the AEAD confidentiality limit.
///
/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
const KEY_UPDATE_MARGIN: u64 = 10_000;

#[derive(Default)]
struct SentFrames {
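    /// Retransmittable frames from the packet, re-queued for retransmission if it is lost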
    retransmits: ThinRetransmits,
    /// The packet number of the largest acknowledged packet for each path
    largest_acked: FxHashMap<PathId, u64>,
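    /// Metadata for the STREAM frames sent in the packet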
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    /// Whether the datagram containing these frames should be padded to the min MTU
    requires_padding: bool,
}

impl SentFrames {
    /// Returns whether the packet contains only ACKs
    fn is_ack_only(&self, streams: &StreamsState) -> bool {
        !self.largest_acked.is_empty()
            && !self.non_retransmits
            && self.stream_frames.is_empty()
            && self.retransmits.is_empty(streams)
    }

    fn retransmits_mut(&mut self) -> &mut Retransmits {
        self.retransmits.get_or_create()
    }

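    /// Records a frame that was just encoded into an outgoing packet
    ///
    /// Retransmittable frames are staged so they can be re-queued if the packet is lost;
    /// non-retransmittable frames are merely flagged so that ACK-only packets can be detected.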
    fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
        use frame::EncodableFrame::*;
        match frame {
            PathAck(path_ack_encoder) => {
                if let Some(max) = path_ack_encoder.ranges.max() {
                    self.largest_acked.insert(path_ack_encoder.path_id, max);
                }
            }
            Ack(ack_encoder) => {
                if let Some(max) = ack_encoder.ranges.max() {
                    self.largest_acked.insert(PathId::ZERO, max);
                }
            }
            Close(_) => { /* non-retransmittable, but after this we don't really care */ }
            PathResponse(_) => self.non_retransmits = true,
            HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
            ReachOut(frame::ReachOut { round, ip, port }) => self
                .retransmits_mut()
                .reach_out
                .get_or_insert_with(|| (round, Vec::new()))
                .1
                .push((ip, port)),
            ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
            Ping(_) => self.non_retransmits = true,
            ImmediateAck(_) => self.non_retransmits = true,
            AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
            PathChallenge(_) => self.non_retransmits = true,
            Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
            PathAbandon(path_abandon) => {
                self.retransmits_mut()
                    .path_abandon
                    .entry(path_abandon.path_id)
                    .or_insert(path_abandon.error_code);
            }
            PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
            | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
                self.retransmits_mut().path_status.insert(path_id);
            }
            MaxPathId(_) => self.retransmits_mut().max_path_id = true,
            PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
            PathCidsBlocked(path_cids_blocked) => {
                self.retransmits_mut()
                    .path_cids_blocked
                    .insert(path_cids_blocked.path_id);
            }
            ResetStream(reset) => self
                .retransmits_mut()
                .reset_stream
                .push((reset.id, reset.error_code)),
            StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
            NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
            RetireConnectionId(retire_cid) => self
                .retransmits_mut()
                .retire_cids
                .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
            Datagram(_) => self.non_retransmits = true,
            NewToken(_) => {}
            AddAddress(add_address) => {
                self.retransmits_mut().add_address.insert(add_address);
            }
            RemoveAddress(remove_address) => {
                self.retransmits_mut().remove_address.insert(remove_address);
            }
            StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
            MaxData(_) => self.retransmits_mut().max_data = true,
            MaxStreamData(max) => {
                self.retransmits_mut().max_stream_data.insert(max.id);
            }
            MaxStreams(max_streams) => {
                self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
            }
        }
    }
}

/// Computes the negotiated idle timeout based on the local and remote `max_idle_timeout`
/// transport parameters
///
/// By definition, a `max_idle_timeout` of `0` (or an absent parameter) means the timeout is
/// disabled on that side; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1>.
///
/// Per the negotiation procedure, the effective value is the minimum of the two advertised
/// timeouts, or the only one advertised if the peer disabled its own; see
/// <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2>.
///
/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have
/// opted out of an idle timeout.
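///
/// For example, negotiating `Some(10_000)` against `Some(0)` yields ten seconds, since `0`
/// disables the timeout on that side rather than setting it to zero.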
fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
    match (x, y) {
        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let test_params = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (
                Some(VarInt(2)),
                Some(VarInt(0)),
                Some(Duration::from_millis(2)),
            ),
            (
                Some(VarInt(1)),
                Some(VarInt(4)),
                Some(Duration::from_millis(1)),
            ),
        ];

        for (left, right, result) in test_params {
            assert_eq!(negotiate_max_idle_timeout(left, right), result);
            assert_eq!(negotiate_max_idle_timeout(right, left), result);
        }
    }
}