iroh_quinn_proto/connection/
mod.rs

1use std::{
2    cmp,
3    collections::{BTreeMap, VecDeque, btree_map},
4    convert::TryFrom,
5    fmt, io, mem,
6    net::{IpAddr, SocketAddr},
7    num::NonZeroU32,
8    ops::Not,
9    sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21    Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
22    MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
23    TransportErrorCode, VarInt,
24    cid_generator::ConnectionIdGenerator,
25    cid_queue::CidQueue,
26    coding::BufMutExt,
27    config::{ServerConfig, TransportConfig},
28    congestion::Controller,
29    connection::{
30        qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
31        spaces::LostPacket,
32        timer::{ConnTimer, PathTimer},
33    },
34    crypto::{self, KeyPair, Keys, PacketKey},
35    frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
36    iroh_hp,
37    packet::{
38        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
39        PacketNumber, PartialDecode, SpaceId,
40    },
41    range_set::ArrayRangeSet,
42    shared::{
43        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
44        EndpointEvent, EndpointEventInner,
45    },
46    token::{ResetToken, Token, TokenPayload},
47    transport_parameters::TransportParameters,
48};
49
50mod ack_frequency;
51use ack_frequency::AckFrequencyState;
52
53mod assembler;
54pub use assembler::Chunk;
55
56mod cid_state;
57use cid_state::CidState;
58
59mod datagrams;
60use datagrams::DatagramState;
61pub use datagrams::{Datagrams, SendDatagramError};
62
63mod mtud;
64mod pacing;
65
66mod packet_builder;
67use packet_builder::{PacketBuilder, PadDatagram};
68
69mod packet_crypto;
70use packet_crypto::{PrevCrypto, ZeroRttCrypto};
71
72mod paths;
73pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
74use paths::{PathData, PathState};
75
76pub(crate) mod qlog;
77
78mod send_buffer;
79
80mod spaces;
81#[cfg(fuzzing)]
82pub use spaces::Retransmits;
83#[cfg(not(fuzzing))]
84use spaces::Retransmits;
85use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
86
87mod stats;
88pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
89
90mod streams;
91#[cfg(fuzzing)]
92pub use streams::StreamsState;
93#[cfg(not(fuzzing))]
94use streams::StreamsState;
95pub use streams::{
96    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
97    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
98};
99
100mod timer;
101use timer::{Timer, TimerTable};
102
103mod transmit_buf;
104use transmit_buf::TransmitBuf;
105
106mod state;
107
108#[cfg(not(fuzzing))]
109use state::State;
110#[cfg(fuzzing)]
111pub use state::State;
112use state::StateType;
113
114/// Protocol state and logic for a single QUIC connection
115///
116/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
117/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
118/// expects timeouts through various methods. A number of simple getter methods are exposed
119/// to allow callers to inspect some of the connection state.
120///
121/// `Connection` has roughly 4 types of methods:
122///
123/// - A. Simple getters, taking `&self`
124/// - B. Handlers for incoming events from the network or system, named `handle_*`.
125/// - C. State machine mutators, for incoming commands from the application. For convenience we
126///   refer to this as "performing I/O" below, however as per the design of this library none of the
127///   functions actually perform system-level I/O. For example, [`read`](RecvStream::read) and
128///   [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
129/// - D. Polling functions for outgoing events or actions for the caller to
130///   take, named `poll_*`.
131///
132/// The simplest way to use this API correctly is to call (B) and (C) whenever
133/// appropriate, then after each of those calls, as soon as feasible call all
134/// polling methods (D) and deal with their outputs appropriately, e.g. by
135/// passing it to the application or by making a system-level I/O call. You
136/// should call the polling functions in this order:
137///
138/// 1. [`poll_transmit`](Self::poll_transmit)
139/// 2. [`poll_timeout`](Self::poll_timeout)
140/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
141/// 4. [`poll`](Self::poll)
142///
143/// Currently the only actual dependency is from (2) to (1), however additional
144/// dependencies may be added in future, so the above order is recommended.
145///
146/// (A) may be called whenever desired.
147///
148/// Care should be made to ensure that the input events represent monotonically
149/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
150/// with events of the same [`Instant`] may be interleaved in any order with a
151/// call to [`handle_event`](Self::handle_event) at that same instant; however
152/// events or timeouts with different instants must not be interleaved.
pub struct Connection {
    /// Endpoint-wide configuration shared by all connections of this endpoint
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration used by this connection
    config: Arc<TransportConfig>,
    /// Connection-local RNG, deterministically seeded by the endpoint (see `new`)
    rng: StdRng,
    /// Cryptographic session, providing handshake and packet-protection keys
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The "real" local IP address which was used to receive the initial packet.
    /// This is only populated for the server case, and if known
    local_ip: Option<IpAddr>,
    /// The [`PathData`] for each path
    ///
    /// This needs to be ordered because [`Connection::poll_transmit`] needs to
    /// deterministically select the next PathId to send on.
    // TODO(flub): well does it really? But deterministic is nice for now.
    paths: BTreeMap<PathId, PathState>,
    /// Incremented every time we see a new path
    ///
    /// Stored separately from `path.generation` to account for aborted migrations
    path_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    /// High-level connection state machine
    state: State,
    /// Which side of the connection we are, plus side-specific state
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Current key phase bit for 1-RTT packets; flips on key update
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Events returned by [`Connection::poll`]
    events: VecDeque<Event>,
    /// Events returned by [`Connection::poll_endpoint_events`]
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable [`SpaceId`]
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Whether 0-RTT was accepted, as opposed to merely enabled (see `zero_rtt_enabled`)
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    /// Active timers for this connection
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// If the CONNECTION_CLOSE frame needs to be sent
    close: bool,

    //
    // ACK frequency
    //
    /// State for the ACK frequency extension
    ack_frequency: AckFrequencyState,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    /// State of all streams on this connection
    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    ///
    /// These are given out before multiple paths exist, also for paths that will never
    /// exist.  So if multipath is supported the number of paths here will be higher than
    /// the actual number of paths in use.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Attributes of CIDs generated by local endpoint
    ///
    /// Any path that is allowed to be opened is present in this map, as well as the already
    /// opened paths. However since CIDs are issued async by the endpoint driver via
    /// connection events it can not be used to know if CIDs have been issued for a path or
    /// not. See [`Connection::max_path_id_with_cids`] for this.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// Path level statistics
    path_stats: FxHashMap<PathId, PathStats>,
    /// QUIC version used for the connection.
    version: u32,

    //
    // Multipath
    //
    /// Maximum number of concurrent paths
    ///
    /// Initially set from the [`TransportConfig::max_concurrent_multipath_paths`]. Even
    /// when multipath is disabled this will be set to 1, it is not used in that case
    /// though.
    max_concurrent_paths: NonZeroU32,
    /// Local maximum [`PathId`] to be used
    ///
    /// This is initially set to [`TransportConfig::get_initial_max_path_id`] when multipath
    /// is negotiated, or to [`PathId::ZERO`] otherwise. This is essentially the value of
    /// the highest MAX_PATH_ID frame sent.
    ///
    /// Any path with an ID equal or below this [`PathId`] is either:
    ///
    /// - Abandoned, if it is also in [`Connection::abandoned_paths`].
    /// - Open, in this case it is present in [`Connection::paths`]
    /// - Not yet opened, if it is in neither of these two places.
    ///
    /// Note that for not-yet-open there may or may not be any CIDs issued. See
    /// [`Connection::max_path_id_with_cids`].
    local_max_path_id: PathId,
    /// Remote's maximum [`PathId`] to be used
    ///
    /// This is initially set to the peer's [`TransportParameters::initial_max_path_id`] when
    /// multipath is negotiated, or to [`PathId::ZERO`] otherwise. A peer may increase this limit
    /// by sending [`Frame::MaxPathId`] frames.
    remote_max_path_id: PathId,
    /// The greatest [`PathId`] we have issued CIDs for
    ///
    /// CIDs are only issued for `min(local_max_path_id, remote_max_path_id)`. It is not
    /// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued
    /// since they are issued asynchronously by the endpoint driver.
    max_path_id_with_cids: PathId,
    /// The paths already abandoned
    ///
    /// They may still have some state left in [`Connection::paths`] or
    /// [`Connection::local_cid_state`] since some of this has to be kept around for some
    /// time after a path is abandoned.
    // TODO(flub): Make this a more efficient data structure.  Like ranges of abandoned
    //    paths.  Or a set together with a minimum.  Or something.
    abandoned_paths: FxHashSet<PathId>,

    /// State for iroh's NAT traversal extension
    iroh_hp: iroh_hp::State,
    /// Sink for qlog events
    qlog: QlogSink,
}
321
322impl Connection {
    /// Creates a new connection in its initial, handshaking state
    ///
    /// `side_args` determines whether this is the client or server side of the
    /// connection. The connection starts with a single path ([`PathId::ZERO`])
    /// towards `remote`; on the client side the handshake is kicked off
    /// immediately by writing the first CRYPTO data.
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        // Initial packet protection keys are derived from the first destination CID.
        let initial_space = {
            let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
            space.crypto = Some(crypto.initial_keys(init_cid, side));
            space
        };
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        // Tests may opt into deterministic packet numbers for reproducibility.
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        // CID-issuing state for the first path; reserve one extra CID when a
        // preferred-address CID was allocated.
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
        // TODO(@divma): consider if we want to delay this until the path is validated
        path.open = true;
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_counter: 0,
            allow_mtud,
            local_ip,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            close: false,

            // Peer's max_ack_delay is unknown until its transport parameters arrive.
            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            // peer params are not yet known, so multipath is not enabled
            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            // iroh's nat traversal
            iroh_hp: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
        this
    }
475
    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// Returns `None` when no timers are currently armed.
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.peek()
    }
487
488    /// Returns application-facing events
489    ///
490    /// Connections should be polled for events after:
491    /// - a call was made to `handle_event`
492    /// - a call was made to `handle_timeout`
493    #[must_use]
494    pub fn poll(&mut self) -> Option<Event> {
495        if let Some(x) = self.events.pop_front() {
496            return Some(x);
497        }
498
499        if let Some(event) = self.streams.poll() {
500            return Some(Event::Stream(event));
501        }
502
503        if let Some(reason) = self.state.take_error() {
504            return Some(Event::ConnectionLost { reason });
505        }
506
507        None
508    }
509
510    /// Return endpoint-facing events
511    #[must_use]
512    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
513        self.endpoint_events.pop_front().map(EndpointEvent)
514    }
515
516    /// Provide control over streams
517    #[must_use]
518    pub fn streams(&mut self) -> Streams<'_> {
519        Streams {
520            state: &mut self.streams,
521            conn_state: &self.state,
522        }
523    }
524
525    /// Provide control over streams
526    #[must_use]
527    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
528        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
529        RecvStream {
530            id,
531            state: &mut self.streams,
532            pending: &mut self.spaces[SpaceId::Data].pending,
533        }
534    }
535
536    /// Provide control over streams
537    #[must_use]
538    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
539        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
540        SendStream {
541            id,
542            state: &mut self.streams,
543            pending: &mut self.spaces[SpaceId::Data].pending,
544            conn_state: &self.state,
545        }
546    }
547
548    /// Opens a new path only if no path to the remote address exists so far
549    ///
550    /// See [`open_path`]. Returns `(path_id, true)` if the path already existed. `(path_id,
551    /// false)` if was opened.
552    ///
553    /// [`open_path`]: Connection::open_path
554    pub fn open_path_ensure(
555        &mut self,
556        remote: SocketAddr,
557        initial_status: PathStatus,
558        now: Instant,
559    ) -> Result<(PathId, bool), PathError> {
560        match self
561            .paths
562            .iter()
563            .find(|(_id, path)| path.data.remote == remote)
564        {
565            Some((path_id, _state)) => Ok((*path_id, true)),
566            None => self
567                .open_path(remote, initial_status, now)
568                .map(|id| (id, false)),
569        }
570    }
571
    /// Opens a new path
    ///
    /// Further errors might occur and they will be emitted in [`PathEvent::LocallyClosed`] events.
    /// When the path is opened it will be reported as an [`PathEvent::Opened`].
    pub fn open_path(
        &mut self,
        remote: SocketAddr,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<PathId, PathError> {
        if !self.is_multipath_negotiated() {
            return Err(PathError::MultipathNotNegotiated);
        }
        if self.side().is_server() {
            return Err(PathError::ServerSideNotAllowed);
        }

        // Path IDs are never reused: pick the ID one past anything ever used or
        // abandoned.
        let max_abandoned = self.abandoned_paths.iter().max().copied();
        let max_used = self.paths.keys().last().copied();
        let path_id = max_abandoned
            .max(max_used)
            .unwrap_or(PathId::ZERO)
            .saturating_add(1u8);

        // Option ordering: `Some(_) > None`, so an absent max path ID also rejects here.
        if Some(path_id) > self.max_path_id() {
            return Err(PathError::MaxPathIdReached);
        }
        if path_id > self.remote_max_path_id {
            // Signal the peer that we are blocked on its path ID limit.
            self.spaces[SpaceId::Data].pending.paths_blocked = true;
            return Err(PathError::MaxPathIdReached);
        }
        if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
            // No remote CID is available for this path yet; queue a signal that
            // we are blocked on CIDs for this specific path.
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .push(path_id);
            return Err(PathError::RemoteCidsExhausted);
        }

        let path = self.ensure_path(path_id, remote, now, None);
        path.status.local_update(initial_status);

        Ok(path_id)
    }
616
    /// Closes a path by sending a PATH_ABANDON frame
    ///
    /// This will not allow closing the last path. It does allow closing paths which have
    /// not yet been opened, as e.g. is the case when receiving a PATH_ABANDON from the peer
    /// for a path that was never opened locally.
    ///
    /// `error_code` is carried to the peer inside the PATH_ABANDON frame.
    ///
    /// # Errors
    ///
    /// - [`ClosePathError::ClosedPath`] if the path is already abandoned, beyond the
    ///   current maximum path ID, or unknown.
    /// - [`ClosePathError::LastOpenPath`] if no other validated, non-abandoned path
    ///   would remain.
    pub fn close_path(
        &mut self,
        now: Instant,
        path_id: PathId,
        error_code: VarInt,
    ) -> Result<(), ClosePathError> {
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }
        if self
            .paths
            .iter()
            // Would there be any remaining, non-abandoned, validated paths
            .any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            })
            .not()
        {
            return Err(ClosePathError::LastOpenPath);
        }

        // Send PATH_ABANDON
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, error_code.into());

        // Remove pending NEW CIDs for this path
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Cleanup retransmits across ALL paths (CIDs for path_id may have been transmitted on other paths)
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Consider remotely issued CIDs as retired.
        // Technically we don't have to do this just yet.  We only need to do this *after*
        // the ABANDON_PATH frame is sent, allowing us to still send it on the
        // to-be-abandoned path.  However it is recommended to send it on another path, and
        // we do not allow abandoning the last path anyway.
        self.rem_cids.remove(&path_id);
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        let pto = self.pto_max_path(SpaceId::Data);

        let path = self.paths.get_mut(&path_id).expect("checked above");

        // We record the time after which receiving data on this path generates a transport error.
        path.data.last_allowed_receive = Some(now + 3 * pto);
        self.abandoned_paths.insert(path_id);

        // Raise our advertised max path ID so the peer can open a replacement path.
        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));

        // The peer MUST respond with a corresponding PATH_ABANDON frame.
        // If we receive packets on the abandoned path after 3 * PTO we trigger a transport error.
        // In any case, we completely discard the path after 6 * PTO whether we receive a
        // PATH_ABANDON or not.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::DiscardPath),
            now + 6 * pto,
            self.qlog.with_time(now),
        );
        Ok(())
    }
699
700    /// Gets the [`PathData`] for a known [`PathId`].
701    ///
702    /// Will panic if the path_id does not reference any known path.
703    #[track_caller]
704    fn path_data(&self, path_id: PathId) -> &PathData {
705        if let Some(data) = self.paths.get(&path_id) {
706            &data.data
707        } else {
708            panic!(
709                "unknown path: {path_id}, currently known paths: {:?}",
710                self.paths.keys().collect::<Vec<_>>()
711            );
712        }
713    }
714
715    /// Gets a reference to the [`PathData`] for a [`PathId`]
716    fn path(&self, path_id: PathId) -> Option<&PathData> {
717        self.paths.get(&path_id).map(|path_state| &path_state.data)
718    }
719
720    /// Gets a mutable reference to the [`PathData`] for a [`PathId`]
721    fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
722        self.paths
723            .get_mut(&path_id)
724            .map(|path_state| &mut path_state.data)
725    }
726
727    /// Returns all known paths.
728    ///
729    /// There is no guarantee any of these paths are open or usable.
730    pub fn paths(&self) -> Vec<PathId> {
731        self.paths.keys().copied().collect()
732    }
733
734    /// Gets the local [`PathStatus`] for a known [`PathId`]
735    pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
736        self.path(path_id)
737            .map(PathData::local_status)
738            .ok_or(ClosedPath { _private: () })
739    }
740
741    /// Returns the path's remote socket address
742    pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
743        self.path(path_id)
744            .map(|path| path.remote)
745            .ok_or(ClosedPath { _private: () })
746    }
747
    /// Sets the [`PathStatus`] for a known [`PathId`]
    ///
    /// Returns the previous path status on success.
    pub fn set_path_status(
        &mut self,
        path_id: PathId,
        status: PathStatus,
    ) -> Result<PathStatus, SetPathStatusError> {
        if !self.is_multipath_negotiated() {
            return Err(SetPathStatusError::MultipathNotNegotiated);
        }
        let path = self
            .path_mut(path_id)
            .ok_or(SetPathStatusError::ClosedPath)?;
        let prev = match path.status.local_update(status) {
            // `local_update` yielded the prior status: queue the new status for
            // transmission to the peer.
            Some(prev) => {
                self.spaces[SpaceId::Data]
                    .pending
                    .path_status
                    .insert(path_id);
                prev
            }
            // Nothing to transmit; report the current local status as "previous".
            None => path.local_status(),
        };
        Ok(prev)
    }
774
775    /// Returns the remote path status
776    // TODO(flub): Probably should also be some kind of path event?  Not even sure if I like
777    //    this as an API, but for now it allows me to write a test easily.
778    // TODO(flub): Technically this should be a Result<Option<PathSTatus>>?
779    pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
780        self.path(path_id).and_then(|path| path.remote_status())
781    }
782
783    /// Sets the max_idle_timeout for a specific path
784    ///
785    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
786    ///
787    /// Returns the previous value of the setting.
788    pub fn set_path_max_idle_timeout(
789        &mut self,
790        path_id: PathId,
791        timeout: Option<Duration>,
792    ) -> Result<Option<Duration>, ClosedPath> {
793        let path = self
794            .paths
795            .get_mut(&path_id)
796            .ok_or(ClosedPath { _private: () })?;
797        Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
798    }
799
800    /// Sets the keep_alive_interval for a specific path
801    ///
802    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
803    ///
804    /// Returns the previous value of the setting.
805    pub fn set_path_keep_alive_interval(
806        &mut self,
807        path_id: PathId,
808        interval: Option<Duration>,
809    ) -> Result<Option<Duration>, ClosedPath> {
810        let path = self
811            .paths
812            .get_mut(&path_id)
813            .ok_or(ClosedPath { _private: () })?;
814        Ok(std::mem::replace(&mut path.data.keep_alive, interval))
815    }
816
817    /// Gets the [`PathData`] for a known [`PathId`].
818    ///
819    /// Will panic if the path_id does not reference any known path.
820    #[track_caller]
821    fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
822        &mut self.paths.get_mut(&path_id).expect("known path").data
823    }
824
825    /// Check if the remote has been validated in any active path
826    fn is_remote_validated(&self, remote: SocketAddr) -> bool {
827        self.paths
828            .values()
829            .any(|path_state| path_state.data.remote == remote && path_state.data.validated)
830        // TODO(@divma): we might want to ensure the path has been recently active to consider the
831        // address validated
832    }
833
    /// Returns the [`PathData`] for `path_id`, creating the path if it does not exist yet
    ///
    /// If the path already exists its data is returned unchanged.  Otherwise a new path
    /// is set up: it inherits address validation from any existing path to the same
    /// remote, a `PathOpen` timer is armed at 3 PTOs, a new path challenge is scheduled,
    /// and a fresh packet number space for the path is installed in the Data space.
    /// `pn`, if provided, is inserted into the new space's dedup state so the packet
    /// that triggered the path's creation is not processed as new again.
    fn ensure_path(
        &mut self,
        path_id: PathId,
        remote: SocketAddr,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        // Address validation for this remote carries over from other paths.
        let validated = self.is_remote_validated(remote);
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            // Fast path: the path already exists, nothing to set up.
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        debug!(%validated, %path_id, ?remote, "path added");
        // Clamp the peer's max_udp_payload_size transport parameter to u16 range.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_counter = self.path_counter.wrapping_add(1);
        let mut data = PathData::new(
            remote,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_counter,
            now,
            &self.config,
        );

        data.validated = validated;

        // Arm the PathOpen timer for this path at 3 PTOs from now.
        let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathOpen),
            now + 3 * pto,
            self.qlog.with_time(now),
        );

        // for the path to be opened we need to send a packet on the path. Sending a challenge
        // guarantees this
        data.send_new_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        // Each path gets its own packet number space within the Data space.
        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            // Remember the packet number that created this path so it is deduplicated.
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, remote, now);
        &mut path.data
    }
887
888    /// Returns packets to transmit
889    ///
890    /// Connections should be polled for transmit after:
891    /// - the application performed some I/O on the connection
892    /// - a call was made to `handle_event`
893    /// - a call was made to `handle_timeout`
894    ///
895    /// `max_datagrams` specifies how many datagrams can be returned inside a
896    /// single Transmit using GSO. This must be at least 1.
897    #[must_use]
898    pub fn poll_transmit(
899        &mut self,
900        now: Instant,
901        max_datagrams: usize,
902        buf: &mut Vec<u8>,
903    ) -> Option<Transmit> {
904        assert!(max_datagrams != 0);
905        let max_datagrams = match self.config.enable_segmentation_offload {
906            false => 1,
907            true => max_datagrams,
908        };
909
910        // Each call to poll_transmit can only send datagrams to one destination, because
911        // all datagrams in a GSO batch are for the same destination.  Therefore only
912        // datagrams for one Path ID are produced for each poll_transmit call.
913
914        // TODO(flub): this is wishful thinking and not actually implemented, but perhaps it
915        //   should be:
916
917        // First, if we have to send a close, select a path for that.
918        // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending.
919
920        // For all, open, validated and AVAILABLE paths:
921        // - Is the path congestion blocked or pacing blocked?
922        // - call maybe_queue_ to ensure a tail-loss probe would be sent?
923        // - do we need to send a close message?
924        // - call can_send
925        // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths
926
927        // Check whether we need to send a close message
928        let close = match self.state.as_type() {
929            StateType::Drained => {
930                self.app_limited = true;
931                return None;
932            }
933            StateType::Draining | StateType::Closed => {
934                // self.close is only reset once the associated packet had been
935                // encoded successfully
936                if !self.close {
937                    self.app_limited = true;
938                    return None;
939                }
940                true
941            }
942            _ => false,
943        };
944
945        // Check whether we need to send an ACK_FREQUENCY frame
946        if let Some(config) = &self.config.ack_frequency_config {
947            let rtt = self
948                .paths
949                .values()
950                .map(|p| p.data.rtt.get())
951                .min()
952                .expect("one path exists");
953            self.spaces[SpaceId::Data].pending.ack_frequency = self
954                .ack_frequency
955                .should_send_ack_frequency(rtt, config, &self.peer_params)
956                && self.highest_space == SpaceId::Data
957                && self.peer_supports_ack_frequency();
958        }
959
960        // Whether this packet can be coalesced with another one in the same datagram.
961        let mut coalesce = true;
962
963        // Whether the last packet in the datagram must be padded so the datagram takes up
964        // to at least MIN_INITIAL_SIZE, or to the maximum segment size if this is smaller.
965        let mut pad_datagram = PadDatagram::No;
966
967        // Whether congestion control stopped the next packet from being sent. Further
968        // packets could still be built, as e.g. tail-loss probes are not congestion
969        // limited.
970        let mut congestion_blocked = false;
971
972        // The packet number of the last built packet.
973        let mut last_packet_number = None;
974
975        let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
976
977        // If there is any open, validated and available path we only want to send frames to
978        // any backup path that must be sent on that backup path exclusively.
979        let have_available_path = self.paths.iter().any(|(id, path)| {
980            path.data.validated
981                && path.data.local_status() == PathStatus::Available
982                && self.rem_cids.contains_key(id)
983        });
984
985        // Setup for the first path_id
986        let mut transmit = TransmitBuf::new(
987            buf,
988            max_datagrams,
989            self.path_data(path_id).current_mtu().into(),
990        );
991        if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
992            return Some(challenge);
993        }
994        let mut space_id = match path_id {
995            PathId::ZERO => SpaceId::Initial,
996            _ => SpaceId::Data,
997        };
998
999        loop {
1000            // check if there is at least one active CID to use for sending
1001            let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1002                let err = PathError::RemoteCidsExhausted;
1003                if !self.abandoned_paths.contains(&path_id) {
1004                    debug!(?err, %path_id, "no active CID for path");
1005                    self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1006                        id: path_id,
1007                        error: err,
1008                    }));
1009                    // Locally we should have refused to open this path, the remote should
1010                    // have given us CIDs for this path before opening it.  So we can always
1011                    // abandon this here.
1012                    self.close_path(
1013                        now,
1014                        path_id,
1015                        TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1016                    )
1017                    .ok();
1018                    self.spaces[SpaceId::Data]
1019                        .pending
1020                        .path_cids_blocked
1021                        .push(path_id);
1022                } else {
1023                    trace!(%path_id, "remote CIDs retired for abandoned path");
1024                }
1025
1026                match self.paths.keys().find(|&&next| next > path_id) {
1027                    Some(next_path_id) => {
1028                        // See if this next path can send anything.
1029                        path_id = *next_path_id;
1030                        space_id = SpaceId::Data;
1031
1032                        // update per path state
1033                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1034                        if let Some(challenge) =
1035                            self.send_prev_path_challenge(now, &mut transmit, path_id)
1036                        {
1037                            return Some(challenge);
1038                        }
1039
1040                        continue;
1041                    }
1042                    None => {
1043                        // Nothing more to send.
1044                        trace!(
1045                            ?space_id,
1046                            %path_id,
1047                            "no CIDs to send on path, no more paths"
1048                        );
1049                        break;
1050                    }
1051                }
1052            };
1053
1054            // Determine if anything can be sent in this packet number space (SpaceId +
1055            // PathId).
1056            let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1057                // We are trying to coalesce another packet into this datagram.
1058                transmit.datagram_remaining_mut()
1059            } else {
1060                // A new datagram needs to be started.
1061                transmit.segment_size()
1062            };
1063            let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1064            let path_should_send = {
1065                let path_exclusive_only = space_id == SpaceId::Data
1066                    && have_available_path
1067                    && self.path_data(path_id).local_status() == PathStatus::Backup;
1068                let path_should_send = if path_exclusive_only {
1069                    can_send.path_exclusive
1070                } else {
1071                    !can_send.is_empty()
1072                };
1073                let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1074                path_should_send || needs_loss_probe || can_send.close
1075            };
1076
1077            if !path_should_send && space_id < SpaceId::Data {
1078                if self.spaces[space_id].crypto.is_some() {
1079                    trace!(?space_id, %path_id, "nothing to send in space");
1080                }
1081                space_id = space_id.next();
1082                continue;
1083            }
1084
1085            let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1086                // Only check congestion control if a new datagram is needed.
1087                self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1088            } else {
1089                PathBlocked::No
1090            };
1091            if send_blocked != PathBlocked::No {
1092                trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1093                congestion_blocked = true;
1094            }
1095            if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1096                // Higher spaces might still have tail-loss probes to send, which are not
1097                // congestion blocked.
1098                space_id = space_id.next();
1099                continue;
1100            }
1101            if !path_should_send || send_blocked != PathBlocked::No {
1102                // Nothing more to send on this path, check the next path if possible.
1103
1104                // If there are any datagrams in the transmit, packets for another path can
1105                // not be built.
1106                if transmit.num_datagrams() > 0 {
1107                    break;
1108                }
1109
1110                if transmit.is_empty() {
1111                    // Nothing to send on this path, and nothing yet built; try to send hole punching
1112                    // path challenges.
1113                    if let Some(probing) = self
1114                        .iroh_hp
1115                        .server_side_mut()
1116                        .ok()
1117                        .and_then(iroh_hp::ServerState::next_probe)
1118                    {
1119                        if let Some(new_cid) = self
1120                            .rem_cids
1121                            .get_mut(&path_id)
1122                            .and_then(CidQueue::next_reserved)
1123                        {
1124                            let destination = probing.remote();
1125                            let token: u64 = self.rng.random();
1126                            let challenge = frame::PathChallenge(token);
1127                            trace!(%destination, cid=%new_cid, %challenge, "Nat traversal: PATH_CHALLENGE packet");
1128                            buf.write(challenge);
1129                            QlogSentPacket::default().frame(&Frame::PathChallenge(challenge));
1130                            self.stats.frame_tx.path_challenge += 1;
1131
1132                            // TODO(@divma): padding
1133                            // Remember the token sent to this remote
1134                            probing.finish(token);
1135                            return Some(Transmit {
1136                                destination,
1137                                ecn: None,
1138                                size: 8,
1139                                segment_size: None,
1140                                src_ip: None,
1141                            });
1142                        }
1143                    }
1144                }
1145
1146                match self.paths.keys().find(|&&next| next > path_id) {
1147                    Some(next_path_id) => {
1148                        // See if this next path can send anything.
1149                        trace!(
1150                            ?space_id,
1151                            %path_id,
1152                            %next_path_id,
1153                            "nothing to send on path"
1154                        );
1155                        path_id = *next_path_id;
1156                        space_id = SpaceId::Data;
1157
1158                        // update per path state
1159                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1160                        if let Some(challenge) =
1161                            self.send_prev_path_challenge(now, &mut transmit, path_id)
1162                        {
1163                            return Some(challenge);
1164                        }
1165
1166                        continue;
1167                    }
1168                    None => {
1169                        // Nothing more to send.
1170                        trace!(
1171                            ?space_id,
1172                            %path_id,
1173                            next_path_id=?None::<PathId>,
1174                            "nothing to send on path"
1175                        );
1176                        break;
1177                    }
1178                }
1179            }
1180
1181            // If the datagram is full, we need to start a new one.
1182            if transmit.datagram_remaining_mut() == 0 {
1183                if transmit.num_datagrams() >= transmit.max_datagrams() {
1184                    // No more datagrams allowed
1185                    break;
1186                }
1187
1188                match self.spaces[space_id].for_path(path_id).loss_probes {
1189                    0 => transmit.start_new_datagram(),
1190                    _ => {
1191                        // We need something to send for a tail-loss probe.
1192                        let request_immediate_ack =
1193                            space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1194                        self.spaces[space_id].maybe_queue_probe(
1195                            path_id,
1196                            request_immediate_ack,
1197                            &self.streams,
1198                        );
1199
1200                        self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1201
1202                        // Clamp the datagram to at most the minimum MTU to ensure that loss
1203                        // probes can get through and enable recovery even if the path MTU
1204                        // has shrank unexpectedly.
1205                        transmit.start_new_datagram_with_size(std::cmp::min(
1206                            usize::from(INITIAL_MTU),
1207                            transmit.segment_size(),
1208                        ));
1209                    }
1210                }
1211                trace!(count = transmit.num_datagrams(), "new datagram started");
1212                coalesce = true;
1213                pad_datagram = PadDatagram::No;
1214            }
1215
1216            // If coalescing another packet into the existing datagram, there should
1217            // still be enough space for a whole packet.
1218            if transmit.datagram_start_offset() < transmit.len() {
1219                debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1220            }
1221
1222            //
1223            // From here on, we've determined that a packet will definitely be sent.
1224            //
1225
1226            if self.spaces[SpaceId::Initial].crypto.is_some()
1227                && space_id == SpaceId::Handshake
1228                && self.side.is_client()
1229            {
1230                // A client stops both sending and processing Initial packets when it
1231                // sends its first Handshake packet.
1232                self.discard_space(now, SpaceId::Initial);
1233            }
1234            if let Some(ref mut prev) = self.prev_crypto {
1235                prev.update_unacked = false;
1236            }
1237
1238            let mut qlog = QlogSentPacket::default();
1239            let mut builder = PacketBuilder::new(
1240                now,
1241                space_id,
1242                path_id,
1243                remote_cid,
1244                &mut transmit,
1245                can_send.other,
1246                self,
1247                &mut qlog,
1248            )?;
1249            last_packet_number = Some(builder.exact_number);
1250            coalesce = coalesce && !builder.short_header;
1251
1252            if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1253                // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1
1254                pad_datagram |= PadDatagram::ToMinMtu;
1255            }
1256            if space_id == SpaceId::Data && self.config.pad_to_mtu {
1257                pad_datagram |= PadDatagram::ToSegmentSize;
1258            }
1259
1260            if can_send.close {
1261                trace!("sending CONNECTION_CLOSE");
1262                // Encode ACKs before the ConnectionClose message, to give the receiver
1263                // a better approximate on what data has been processed. This is
1264                // especially important with ack delay, since the peer might not
1265                // have gotten any other ACK for the data earlier on.
1266                let mut sent_frames = SentFrames::default();
1267                let is_multipath_negotiated = self.is_multipath_negotiated();
1268                for path_id in self.spaces[space_id]
1269                    .number_spaces
1270                    .iter()
1271                    .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1272                    .map(|(&path_id, _)| path_id)
1273                    .collect::<Vec<_>>()
1274                {
1275                    Self::populate_acks(
1276                        now,
1277                        self.receiving_ecn,
1278                        &mut sent_frames,
1279                        path_id,
1280                        space_id,
1281                        &mut self.spaces[space_id],
1282                        is_multipath_negotiated,
1283                        &mut builder.frame_space_mut(),
1284                        &mut self.stats,
1285                        &mut qlog,
1286                    );
1287                }
1288
1289                // Since there only 64 ACK frames there will always be enough space
1290                // to encode the ConnectionClose frame too. However we still have the
1291                // check here to prevent crashes if something changes.
1292                debug_assert!(
1293                    builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1294                    "ACKs should leave space for ConnectionClose"
1295                );
1296                if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1297                    let max_frame_size = builder.frame_space_remaining();
1298                    match self.state.as_type() {
1299                        StateType::Closed => {
1300                            let reason: Close =
1301                                self.state.as_closed().expect("checked").clone().into();
1302                            if space_id == SpaceId::Data || reason.is_transport_layer() {
1303                                reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1304                                qlog.frame(&Frame::Close(reason));
1305                            } else {
1306                                let frame = frame::ConnectionClose {
1307                                    error_code: TransportErrorCode::APPLICATION_ERROR,
1308                                    frame_type: None,
1309                                    reason: Bytes::new(),
1310                                };
1311                                frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1312                                qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1313                            }
1314                        }
1315                        StateType::Draining => {
1316                            let frame = frame::ConnectionClose {
1317                                error_code: TransportErrorCode::NO_ERROR,
1318                                frame_type: None,
1319                                reason: Bytes::new(),
1320                            };
1321                            frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1322                            qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1323                        }
1324                        _ => unreachable!(
1325                            "tried to make a close packet when the connection wasn't closed"
1326                        ),
1327                    };
1328                }
1329                builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1330                if space_id == self.highest_space {
1331                    // Don't send another close packet. Even with multipath we only send
1332                    // CONNECTION_CLOSE on a single path since we expect our paths to work.
1333                    self.close = false;
1334                    // `CONNECTION_CLOSE` is the final packet
1335                    break;
1336                } else {
1337                    // Send a close frame in every possible space for robustness, per
1338                    // RFC9000 "Immediate Close during the Handshake". Don't bother trying
1339                    // to send anything else.
1340                    space_id = space_id.next();
1341                    continue;
1342                }
1343            }
1344
1345            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that
1346            // path validation can occur while the link is saturated.
1347            if space_id == SpaceId::Data && builder.buf.is_empty() {
1348                let path = self.path_data_mut(path_id);
1349                if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1350                    let mut builder = if let Some(fresh_cid) = self
1351                        .rem_cids
1352                        .get_mut(&path_id)
1353                        .and_then(CidQueue::next_reserved)
1354                    {
1355                        PacketBuilder::new(
1356                            now,
1357                            space_id,
1358                            path_id,
1359                            fresh_cid,
1360                            &mut transmit,
1361                            can_send.other,
1362                            self,
1363                            &mut qlog,
1364                        )?
1365                    } else {
1366                        // TODO(@divma): keep the old logic. We would need to push the challenge
1367                        // back, but we have lost the packet number.
1368                        builder
1369                    };
1370                    let response = frame::PathResponse(token);
1371                    trace!(%response, "(off-path)");
1372                    builder.frame_space_mut().write(response);
1373                    qlog.frame(&Frame::PathResponse(response));
1374                    self.stats.frame_tx.path_response += 1;
1375                    builder.finish_and_track(
1376                        now,
1377                        self,
1378                        path_id,
1379                        SentFrames {
1380                            non_retransmits: true,
1381                            ..SentFrames::default()
1382                        },
1383                        PadDatagram::ToMinMtu,
1384                        qlog,
1385                    );
1386                    self.stats.udp_tx.on_sent(1, transmit.len());
1387                    return Some(Transmit {
1388                        destination: remote,
1389                        size: transmit.len(),
1390                        ecn: None,
1391                        segment_size: None,
1392                        src_ip: self.local_ip,
1393                    });
1394                }
1395            }
1396
1397            let sent_frames = {
1398                let path_exclusive_only = have_available_path
1399                    && self.path_data(path_id).local_status() == PathStatus::Backup;
1400                let pn = builder.exact_number;
1401                self.populate_packet(
1402                    now,
1403                    space_id,
1404                    path_id,
1405                    path_exclusive_only,
1406                    &mut builder.frame_space_mut(),
1407                    pn,
1408                    &mut qlog,
1409                )
1410            };
1411
1412            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
1413            // any other reason, there is a bug which leads to one component announcing write
1414            // readiness while not writing any data. This degrades performance. The condition is
1415            // only checked if the full MTU is available and when potentially large fixed-size
1416            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
1417            // writing ACKs.
1418            debug_assert!(
1419                !(sent_frames.is_ack_only(&self.streams)
1420                    && !can_send.acks
1421                    && can_send.other
1422                    && builder.buf.segment_size()
1423                        == self.path_data(path_id).current_mtu() as usize
1424                    && self.datagrams.outgoing.is_empty()),
1425                "SendableFrames was {can_send:?}, but only ACKs have been written"
1426            );
1427            if sent_frames.requires_padding {
1428                pad_datagram |= PadDatagram::ToMinMtu;
1429            }
1430
1431            for (path_id, _pn) in sent_frames.largest_acked.iter() {
1432                self.spaces[space_id]
1433                    .for_path(*path_id)
1434                    .pending_acks
1435                    .acks_sent();
1436                self.timers.stop(
1437                    Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1438                    self.qlog.with_time(now),
1439                );
1440            }
1441
1442            // Now we need to finish the packet.  Before we do so we need to know if we will
1443            // be coalescing the next packet into this one, or will be ending the datagram
1444            // as well.  Because if this is the last packet in the datagram more padding
1445            // might be needed because of the packet type, or to fill the GSO segment size.
1446
1447            // Are we allowed to coalesce AND is there enough space for another *packet* in
1448            // this datagram AND is there another packet to send in this or the next space?
1449            if coalesce
1450                && builder
1451                    .buf
1452                    .datagram_remaining_mut()
1453                    .saturating_sub(builder.predict_packet_end())
1454                    > MIN_PACKET_SPACE
1455                && self
1456                    .next_send_space(space_id, path_id, builder.buf, close)
1457                    .is_some()
1458            {
1459                // We can append/coalesce the next packet into the current
1460                // datagram. Finish the current packet without adding extra padding.
1461                builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1462            } else {
1463                // We need a new datagram for the next packet.  Finish the current
1464                // packet with padding.
1465                if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1466                    // If too many padding bytes would be required to continue the
1467                    // GSO batch after this packet, end the GSO batch here. Ensures
1468                    // that fixed-size frames with heterogeneous sizes
1469                    // (e.g. application datagrams) won't inadvertently waste large
1470                    // amounts of bandwidth. The exact threshold is a bit arbitrary
1471                    // and might benefit from further tuning, though there's no
1472                    // universally optimal value.
1473                    const MAX_PADDING: usize = 32;
1474                    if builder.buf.datagram_remaining_mut()
1475                        > builder.predict_packet_end() + MAX_PADDING
1476                    {
1477                        trace!(
1478                            "GSO truncated by demand for {} padding bytes",
1479                            builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1480                        );
1481                        builder.finish_and_track(
1482                            now,
1483                            self,
1484                            path_id,
1485                            sent_frames,
1486                            PadDatagram::No,
1487                            qlog,
1488                        );
1489                        break;
1490                    }
1491
1492                    // Pad the current datagram to GSO segment size so it can be
1493                    // included in the GSO batch.
1494                    builder.finish_and_track(
1495                        now,
1496                        self,
1497                        path_id,
1498                        sent_frames,
1499                        PadDatagram::ToSegmentSize,
1500                        qlog,
1501                    );
1502                } else {
1503                    builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1504                }
1505                if transmit.num_datagrams() == 1 {
1506                    transmit.clip_datagram_size();
1507                }
1508            }
1509        }
1510
1511        if let Some(last_packet_number) = last_packet_number {
1512            // Note that when sending in multiple packet spaces the last packet number will
1513            // be the one from the highest packet space.
1514            self.path_data_mut(path_id).congestion.on_sent(
1515                now,
1516                transmit.len() as u64,
1517                last_packet_number,
1518            );
1519        }
1520
1521        self.qlog.emit_recovery_metrics(
1522            path_id,
1523            &mut self.paths.get_mut(&path_id).unwrap().data,
1524            now,
1525        );
1526
1527        self.app_limited = transmit.is_empty() && !congestion_blocked;
1528
1529        // Send MTU probe if necessary
1530        if transmit.is_empty() && self.state.is_established() {
1531            // MTU probing happens only in Data space.
1532            let space_id = SpaceId::Data;
1533            path_id = *self.paths.first_key_value().expect("one path must exist").0;
1534            let probe_data = loop {
1535                // We MTU probe all paths for which all of the following is true:
1536                // - We have an active destination CID for the path.
1537                // - The remote address *and* path are validated.
1538                // - The path is not abandoned.
1539                // - The MTU Discovery subsystem wants to probe the path.
1540                let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1541                let eligible = self.path_data(path_id).validated
1542                    && !self.path_data(path_id).is_validating_path()
1543                    && !self.abandoned_paths.contains(&path_id);
1544                let probe_size = eligible
1545                    .then(|| {
1546                        let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1547                        self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1548                    })
1549                    .flatten();
1550                match (active_cid, probe_size) {
1551                    (Some(active_cid), Some(probe_size)) => {
1552                        // Let's send an MTUD probe!
1553                        break Some((active_cid, probe_size));
1554                    }
1555                    _ => {
1556                        // Find the next path to check if it needs an MTUD probe.
1557                        match self.paths.keys().find(|&&next| next > path_id) {
1558                            Some(next) => {
1559                                path_id = *next;
1560                                continue;
1561                            }
1562                            None => break None,
1563                        }
1564                    }
1565                }
1566            };
1567            if let Some((active_cid, probe_size)) = probe_data {
1568                // We are definitely sending a DPLPMTUD probe.
1569                debug_assert_eq!(transmit.num_datagrams(), 0);
1570                transmit.start_new_datagram_with_size(probe_size as usize);
1571
1572                let mut qlog = QlogSentPacket::default();
1573                let mut builder = PacketBuilder::new(
1574                    now,
1575                    space_id,
1576                    path_id,
1577                    active_cid,
1578                    &mut transmit,
1579                    true,
1580                    self,
1581                    &mut qlog,
1582                )?;
1583
1584                // We implement MTU probes as ping packets padded up to the probe size
1585                trace!(?probe_size, "writing MTUD probe");
1586                trace!("PING");
1587                builder.frame_space_mut().write(frame::FrameType::PING);
1588                qlog.frame(&Frame::Ping);
1589                self.stats.frame_tx.ping += 1;
1590
1591                // If supported by the peer, we want no delays to the probe's ACK
1592                if self.peer_supports_ack_frequency() {
1593                    trace!("IMMEDIATE_ACK");
1594                    builder
1595                        .frame_space_mut()
1596                        .write(frame::FrameType::IMMEDIATE_ACK);
1597                    self.stats.frame_tx.immediate_ack += 1;
1598                    qlog.frame(&Frame::ImmediateAck);
1599                }
1600
1601                let sent_frames = SentFrames {
1602                    non_retransmits: true,
1603                    ..Default::default()
1604                };
1605                builder.finish_and_track(
1606                    now,
1607                    self,
1608                    path_id,
1609                    sent_frames,
1610                    PadDatagram::ToSize(probe_size),
1611                    qlog,
1612                );
1613
1614                self.path_stats
1615                    .entry(path_id)
1616                    .or_default()
1617                    .sent_plpmtud_probes += 1;
1618            }
1619        }
1620
1621        if transmit.is_empty() {
1622            return None;
1623        }
1624
1625        let destination = self.path_data(path_id).remote;
1626        trace!(
1627            segment_size = transmit.segment_size(),
1628            last_datagram_len = transmit.len() % transmit.segment_size(),
1629            ?destination,
1630            "sending {} bytes in {} datagrams",
1631            transmit.len(),
1632            transmit.num_datagrams()
1633        );
1634        self.path_data_mut(path_id)
1635            .inc_total_sent(transmit.len() as u64);
1636
1637        self.stats
1638            .udp_tx
1639            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1640
1641        Some(Transmit {
1642            destination,
1643            size: transmit.len(),
1644            ecn: if self.path_data(path_id).sending_ecn {
1645                Some(EcnCodepoint::Ect0)
1646            } else {
1647                None
1648            },
1649            segment_size: match transmit.num_datagrams() {
1650                1 => None,
1651                _ => Some(transmit.segment_size()),
1652            },
1653            src_ip: self.local_ip,
1654        })
1655    }
1656
1657    /// Returns the [`SpaceId`] of the next packet space which has data to send
1658    ///
1659    /// This takes into account the space available to frames in the next datagram.
1660    // TODO(flub): This duplication is not nice.
1661    fn next_send_space(
1662        &mut self,
1663        current_space_id: SpaceId,
1664        path_id: PathId,
1665        buf: &TransmitBuf<'_>,
1666        close: bool,
1667    ) -> Option<SpaceId> {
1668        // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed
1669        // to be able to send an individual frame at least this large in the next 1-RTT
1670        // packet. This could be generalized to support every space, but it's only needed to
1671        // handle large fixed-size frames, which only exist in 1-RTT (application
1672        // datagrams). We don't account for coalesced packets potentially occupying space
1673        // because frames can always spill into the next datagram.
1674        let mut space_id = current_space_id;
1675        loop {
1676            let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1677            if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1678                return Some(space_id);
1679            }
1680            space_id = match space_id {
1681                SpaceId::Initial => SpaceId::Handshake,
1682                SpaceId::Handshake => SpaceId::Data,
1683                SpaceId::Data => break,
1684            }
1685        }
1686        None
1687    }
1688
1689    /// Checks if creating a new datagram would be blocked by congestion control
1690    fn path_congestion_check(
1691        &mut self,
1692        space_id: SpaceId,
1693        path_id: PathId,
1694        transmit: &TransmitBuf<'_>,
1695        can_send: &SendableFrames,
1696        now: Instant,
1697    ) -> PathBlocked {
1698        // Anti-amplification is only based on `total_sent`, which gets updated after
1699        // the transmit is sent. Therefore we pass the amount of bytes for datagrams
1700        // that are already created, as well as 1 byte for starting another datagram. If
1701        // there is any anti-amplification budget left, we always allow a full MTU to be
1702        // sent (see https://github.com/quinn-rs/quinn/issues/1082).
1703        if self.side().is_server()
1704            && self
1705                .path_data(path_id)
1706                .anti_amplification_blocked(transmit.len() as u64 + 1)
1707        {
1708            trace!(?space_id, %path_id, "blocked by anti-amplification");
1709            return PathBlocked::AntiAmplification;
1710        }
1711
1712        // Congestion control check.
1713        // Tail loss probes must not be blocked by congestion, or a deadlock could arise.
1714        let bytes_to_send = transmit.segment_size() as u64;
1715        let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1716
1717        if can_send.other && !need_loss_probe && !can_send.close {
1718            let path = self.path_data(path_id);
1719            if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1720                trace!(?space_id, %path_id, "blocked by congestion control");
1721                return PathBlocked::Congestion;
1722            }
1723        }
1724
1725        // Pacing check.
1726        if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1727            self.timers.set(
1728                Timer::PerPath(path_id, PathTimer::Pacing),
1729                delay,
1730                self.qlog.with_time(now),
1731            );
1732            // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though
1733            // they are not congestion controlled.
1734            trace!(?space_id, %path_id, "blocked by pacing");
1735            return PathBlocked::Pacing;
1736        }
1737
1738        PathBlocked::No
1739    }
1740
    /// Send PATH_CHALLENGE for a previous path if necessary
    ///
    /// Returns a single-datagram [`Transmit`] carrying the challenge, or `None` when no
    /// challenge is currently queued for the previous path of `path_id`.
    ///
    /// QUIC-TRANSPORT section 9.3.3
    /// <https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding>
    fn send_prev_path_challenge(
        &mut self,
        now: Instant,
        buf: &mut TransmitBuf<'_>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
        // TODO (matheus23): We could use !prev_path.is_validating() here instead to
        // (possibly) also re-send challenges when they get lost.
        if !prev_path.send_new_challenge {
            return None;
        };
        // One-shot flag: clear it so each queued challenge is sent exactly once.
        prev_path.send_new_challenge = false;
        let destination = prev_path.remote;
        // Fresh random token; the peer must echo it back in a PATH_RESPONSE.
        let token = self.rng.random();
        let info = paths::SentChallengeInfo {
            sent_instant: now,
            remote: destination,
        };
        prev_path.challenges_sent.insert(token, info);
        debug_assert_eq!(
            self.highest_space,
            SpaceId::Data,
            "PATH_CHALLENGE queued without 1-RTT keys"
        );
        buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);

        // Use the previous CID to avoid linking the new path with the previous path. We
        // don't bother accounting for possible retirement of that prev_cid because this is
        // sent once, immediately after migration, when the CID is known to be valid. Even
        // if a post-migration packet caused the CID to be retired, it's fair to pretend
        // this is sent first.
        debug_assert_eq!(buf.datagram_start_offset(), 0);
        let mut qlog = QlogSentPacket::default();
        let mut builder = PacketBuilder::new(
            now,
            SpaceId::Data,
            path_id,
            *prev_cid,
            buf,
            false,
            self,
            &mut qlog,
        )?;
        let challenge = frame::PathChallenge(token);
        trace!(%challenge, "validating previous path");
        qlog.frame(&Frame::PathChallenge(challenge));
        builder.frame_space_mut().write(challenge);
        self.stats.frame_tx.path_challenge += 1;

        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
        // to at least the smallest allowed maximum datagram size of 1200 bytes,
        // unless the anti-amplification limit for the path does not permit
        // sending a datagram of this size
        builder.pad_to(MIN_INITIAL_SIZE);

        builder.finish(self, now, qlog);
        self.stats.udp_tx.on_sent(1, buf.len());

        Some(Transmit {
            destination,
            size: buf.len(),
            ecn: None,
            segment_size: None,
            src_ip: self.local_ip,
        })
    }
1812
1813    /// Indicate what types of frames are ready to send for the given space
1814    ///
1815    /// *packet_size* is the number of bytes available to build the next packet.  *close*
1816    /// *indicates whether a CONNECTION_CLOSE frame needs to be sent.
1817    fn space_can_send(
1818        &mut self,
1819        space_id: SpaceId,
1820        path_id: PathId,
1821        packet_size: usize,
1822        close: bool,
1823    ) -> SendableFrames {
1824        let pn = self.spaces[SpaceId::Data]
1825            .for_path(path_id)
1826            .peek_tx_number();
1827        let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1828        if self.spaces[space_id].crypto.is_none()
1829            && (space_id != SpaceId::Data
1830                || self.zero_rtt_crypto.is_none()
1831                || self.side.is_server())
1832        {
1833            // No keys available for this space
1834            return SendableFrames::empty();
1835        }
1836        let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1837        if space_id == SpaceId::Data {
1838            can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1839        }
1840
1841        can_send.close = close && self.spaces[space_id].crypto.is_some();
1842
1843        can_send
1844    }
1845
    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
    ///
    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
    /// extracted through the relevant methods.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            // An incoming UDP datagram: decode and process the first packet plus any
            // coalesced remainder, updating stats and per-path accounting as we go.
            Datagram(DatagramConnectionEvent {
                now,
                remote,
                path_id,
                ecn,
                first_decode,
                remaining,
            }) => {
                let span = trace_span!("pkt", %path_id);
                let _guard = span.enter();
                // If this packet could initiate a migration and we're a client or a server that
                // forbids migration, drop the datagram. This could be relaxed to heuristically
                // permit NAT-rebinding-like migration.
                if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
                    if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
                        trace!(
                            %path_id,
                            ?remote,
                            path_remote = ?self.path(path_id).map(|p| p.remote),
                            "discarding packet from unrecognized peer"
                        );
                        return;
                    }
                }

                // Sample the anti-amplification state *before* processing: receiving
                // this datagram raises the budget, and we may need to re-arm the loss
                // detection timer afterwards (see the check at the bottom).
                let was_anti_amplification_blocked = self
                    .path(path_id)
                    .map(|path| path.anti_amplification_blocked(1))
                    .unwrap_or(true); // if we don't know about this path it's eagerly considered as unvalidated
                // TODO(@divma): revisit this

                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                // Remember the length; `first_decode` is consumed by `handle_decode`.
                let data_len = first_decode.len();

                self.handle_decode(now, remote, path_id, ecn, first_decode);
                // The current `path` might have changed inside `handle_decode` since the packet
                // could have triggered a migration. The packet might also belong to an unknown
                // path and have been rejected. Make sure the data received is accounted for the
                // most recent path by accessing `path` after `handle_decode`.
                if let Some(path) = self.path_mut(path_id) {
                    path.inc_total_recvd(data_len as u64);
                }

                // Process any further QUIC packets coalesced into the same UDP datagram.
                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, remote, path_id, ecn, data);
                }

                if let Some(path) = self.paths.get_mut(&path_id) {
                    self.qlog
                        .emit_recovery_metrics(path_id, &mut path.data, now);
                }

                if was_anti_amplification_blocked {
                    // A prior attempt to set the loss detection timer may have failed due to
                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
                    // the server's first flight is lost.
                    self.set_loss_detection_timer(now, path_id);
                }
            }
            // The endpoint issued fresh local connection IDs for one path: record them
            // and queue NEW_CONNECTION_ID frames for the peer.
            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
                // All issued CIDs in a single event belong to the same path.
                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
                let cid_state = self
                    .local_cid_state
                    .entry(path_id)
                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
                cid_state.new_cids(&ids, now);

                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                // Always update Timer::PushNewCid
                self.reset_cid_retirement(now);
            }
        }
    }
1932
    /// Process timer expirations
    ///
    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
    /// methods.
    ///
    /// It is most efficient to call this immediately after the system clock reaches the latest
    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
    /// no-op and therefore are safe.
    pub fn handle_timeout(&mut self, now: Instant) {
        // Drain every timer that has expired by `now`; each loop iteration handles one.
        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
            // TODO(@divma): remove `at` when the unicorn is born
            trace!(?timer, at=?now, "timeout");
            match timer {
                // Connection-wide timers.
                Timer::Conn(timer) => match timer {
                    ConnTimer::Close => {
                        // The close period has elapsed; the connection is fully drained.
                        self.state.move_to_drained(None);
                        self.endpoint_events.push_back(EndpointEventInner::Drained);
                    }
                    ConnTimer::Idle => {
                        self.kill(ConnectionError::TimedOut);
                    }
                    ConnTimer::KeepAlive => {
                        trace!("sending keep-alive");
                        self.ping();
                    }
                    ConnTimer::KeyDiscard => {
                        // Old packet protection keys are no longer needed.
                        self.zero_rtt_crypto = None;
                        self.prev_crypto = None;
                    }
                    ConnTimer::PushNewCid => {
                        // Retire every local CID whose lifetime has elapsed, asking the
                        // endpoint for replacements.
                        while let Some((path_id, when)) = self.next_cid_retirement() {
                            if when > now {
                                break;
                            }
                            match self.local_cid_state.get_mut(&path_id) {
                                None => error!(%path_id, "No local CID state for path"),
                                Some(cid_state) => {
                                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
                                    let num_new_cid = cid_state.on_cid_timeout().into();
                                    if !self.state.is_closed() {
                                        trace!(
                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                            cid_state.retire_prior_to()
                                        );
                                        self.endpoint_events.push_back(
                                            EndpointEventInner::NeedIdentifiers(
                                                path_id,
                                                now,
                                                num_new_cid,
                                            ),
                                        );
                                    }
                                }
                            }
                        }
                    }
                },
                // TODO: add path_id as span somehow
                Timer::PerPath(path_id, timer) => {
                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
                    let _guard = span.enter();
                    match timer {
                        PathTimer::PathIdle => {
                            self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
                                .ok();
                        }

                        PathTimer::PathKeepAlive => {
                            trace!("sending keep-alive on path");
                            self.ping_path(path_id).ok();
                        }
                        PathTimer::LossDetection => {
                            self.on_loss_detection_timeout(now, path_id);
                            self.qlog.emit_recovery_metrics(
                                path_id,
                                &mut self.paths.get_mut(&path_id).unwrap().data,
                                now,
                            );
                        }
                        PathTimer::PathValidation => {
                            // The path may already be gone; `continue` moves on to the
                            // next expired timer.
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("path validation failed");
                            // Fall back to the previous path if we still have one.
                            if let Some((_, prev)) = path.prev.take() {
                                path.data = prev;
                            }
                            path.data.challenges_sent.clear();
                            path.data.send_new_challenge = false;
                        }
                        PathTimer::PathChallengeLost => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            trace!("path challenge deemed lost");
                            // Queue a fresh PATH_CHALLENGE on the next transmit.
                            path.data.send_new_challenge = true;
                        }
                        PathTimer::PathOpen => {
                            let Some(path) = self.path_mut(path_id) else {
                                continue;
                            };
                            // Opening a new path timed out: give up on validation and
                            // close the path.
                            path.challenges_sent.clear();
                            path.send_new_challenge = false;
                            debug!("new path validation failed");
                            if let Err(err) = self.close_path(
                                now,
                                path_id,
                                TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
                            ) {
                                warn!(?err, "failed closing path");
                            }

                            self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                                id: path_id,
                                error: PathError::ValidationFailed,
                            }));
                        }
                        PathTimer::Pacing => trace!("pacing timer expired"),
                        PathTimer::MaxAckDelay => {
                            trace!("max ack delay reached");
                            // This timer is only armed in the Data space
                            self.spaces[SpaceId::Data]
                                .for_path(path_id)
                                .pending_acks
                                .on_max_ack_delay_timeout()
                        }
                        PathTimer::DiscardPath => {
                            // The path was abandoned and 3*PTO has expired since.  Clean up all
                            // remaining state and install stateless reset token.
                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                            if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
                                // Ask the endpoint to retire every still-active local CID
                                // sequence number for this path.
                                let (min_seq, max_seq) = loc_cid_state.active_seq();
                                for seq in min_seq..=max_seq {
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::RetireConnectionId(
                                            now, path_id, seq, false,
                                        ),
                                    );
                                }
                            }
                            self.discard_path(path_id, now);
                        }
                    }
                }
            }
        }
    }
2085
2086    /// Close a connection immediately
2087    ///
2088    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
2089    /// call this only when all important communications have been completed, e.g. by calling
2090    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
2091    /// [`StreamEvent::Finished`] event.
2092    ///
2093    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
2094    /// delivered. There may still be data from the peer that has not been received.
2095    ///
2096    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
2097    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2098        self.close_inner(
2099            now,
2100            Close::Application(frame::ApplicationClose { error_code, reason }),
2101        )
2102    }
2103
2104    fn close_inner(&mut self, now: Instant, reason: Close) {
2105        let was_closed = self.state.is_closed();
2106        if !was_closed {
2107            self.close_common();
2108            self.set_close_timer(now);
2109            self.close = true;
2110            self.state.move_to_closed_local(reason);
2111        }
2112    }
2113
    /// Control datagrams
    ///
    /// Returns a handle for working with unreliable application datagrams on this
    /// connection.
    pub fn datagrams(&mut self) -> Datagrams<'_> {
        Datagrams { conn: self }
    }
2118
    /// Returns connection statistics
    ///
    /// Produces a snapshot (clone) of the counters accumulated so far.
    pub fn stats(&mut self) -> ConnectionStats {
        self.stats.clone()
    }
2123
2124    /// Returns path statistics
2125    pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2126        let path = self.paths.get(&path_id)?;
2127        let stats = self.path_stats.entry(path_id).or_default();
2128        stats.rtt = path.data.rtt.get();
2129        stats.cwnd = path.data.congestion.window();
2130        stats.current_mtu = path.data.mtud.current_mtu();
2131        Some(*stats)
2132    }
2133
2134    /// Ping the remote endpoint
2135    ///
2136    /// Causes an ACK-eliciting packet to be transmitted on the connection.
2137    pub fn ping(&mut self) {
2138        // TODO(flub): This is very brute-force: it pings *all* the paths.  Instead it would
2139        //    be nice if we could only send a single packet for this.
2140        for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2141            path_data.ping_pending = true;
2142        }
2143    }
2144
2145    /// Ping the remote endpoint over a specific path
2146    ///
2147    /// Causes an ACK-eliciting packet to be transmitted on the path.
2148    pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2149        let path_data = self.spaces[self.highest_space]
2150            .number_spaces
2151            .get_mut(&path)
2152            .ok_or(ClosedPath { _private: () })?;
2153        path_data.ping_pending = true;
2154        Ok(())
2155    }
2156
2157    /// Update traffic keys spontaneously
2158    ///
2159    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
2160    pub fn force_key_update(&mut self) {
2161        if !self.state.is_established() {
2162            debug!("ignoring forced key update in illegal state");
2163            return;
2164        }
2165        if self.prev_crypto.is_some() {
2166            // We already just updated, or are currently updating, the keys. Concurrent key updates
2167            // are illegal.
2168            debug!("ignoring redundant forced key update");
2169            return;
2170        }
2171        self.update_keys(None, false);
2172    }
2173
    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
    #[doc(hidden)]
    #[deprecated]
    pub fn initiate_key_update(&mut self) {
        // Delegates to the renamed equivalent, `force_key_update`.
        self.force_key_update();
    }
2180
    /// Get a session reference
    ///
    /// Borrows the cryptographic session backing this connection as a
    /// [`crypto::Session`] trait object.
    pub fn crypto_session(&self) -> &dyn crypto::Session {
        &*self.crypto
    }
2185
    /// Whether the connection is in the process of being established
    ///
    /// If this returns `false`, the connection may be either established or closed, signaled by the
    /// emission of a `Connected` or `ConnectionLost` message respectively.
    pub fn is_handshaking(&self) -> bool {
        // Delegates to the connection state machine.
        self.state.is_handshake()
    }
2193
    /// Whether the connection is closed
    ///
    /// Closed connections cannot transport any further data. A connection becomes closed when
    /// either peer application intentionally closes it, or when either transport layer detects an
    /// error such as a time-out or certificate validation failure.
    ///
    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
    pub fn is_closed(&self) -> bool {
        // Delegates to the connection state machine.
        self.state.is_closed()
    }
2204
    /// Whether there is no longer any need to keep the connection around
    ///
    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
    /// packets from the peer. All drained connections have been closed.
    pub fn is_drained(&self) -> bool {
        // Delegates to the connection state machine.
        self.state.is_drained()
    }
2212
    /// For clients, if the peer accepted the 0-RTT data packets
    ///
    /// The value is meaningless until after the handshake completes.
    pub fn accepted_0rtt(&self) -> bool {
        self.accepted_0rtt
    }
2219
    /// Whether 0-RTT is/was possible during the handshake
    ///
    /// Reflects whether 0-RTT keys were made available for this connection.
    pub fn has_0rtt(&self) -> bool {
        self.zero_rtt_enabled
    }
2224
2225    /// Whether there are any pending retransmits
2226    pub fn has_pending_retransmits(&self) -> bool {
2227        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2228    }
2229
    /// Look up whether we're the client or server of this Connection
    ///
    /// Returns the [`Side`] this endpoint plays on the connection.
    pub fn side(&self) -> Side {
        self.side.side()
    }
2234
2235    /// Get the address observed by the remote over the given path
2236    pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2237        self.path(path_id)
2238            .map(|path_data| {
2239                path_data
2240                    .last_observed_addr_report
2241                    .as_ref()
2242                    .map(|observed| observed.socket_addr())
2243            })
2244            .ok_or(ClosedPath { _private: () })
2245    }
2246
    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when no `local_ip` was passed to
    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
    /// connection.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.local_ip
    }
2259
2260    /// Current best estimate of this connection's latency (round-trip-time)
2261    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2262        self.path(path_id).map(|d| d.rtt.get())
2263    }
2264
2265    /// Current state of this connection's congestion controller, for debugging purposes
2266    pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2267        self.path(path_id).map(|d| d.congestion.as_ref())
2268    }
2269
2270    /// Modify the number of remotely initiated streams that may be concurrently open
2271    ///
2272    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
2273    /// `count`s increase both minimum and worst-case memory consumption.
2274    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2275        self.streams.set_max_concurrent(dir, count);
2276        // If the limit was reduced, then a flow control update previously deemed insignificant may
2277        // now be significant.
2278        let pending = &mut self.spaces[SpaceId::Data].pending;
2279        self.streams.queue_max_stream_id(pending);
2280    }
2281
2282    /// Modify the number of open paths allowed when multipath is enabled
2283    ///
2284    /// When reducing the number of concurrent paths this will only affect delaying sending
2285    /// new MAX_PATH_ID frames until fewer than this number of paths are possible.  To
2286    /// actively reduce paths they must be closed using [`Connection::close_path`], which
2287    /// can also be used to close not-yet-opened paths.
2288    ///
2289    /// If multipath is not negotiated (see the [`TransportConfig`]) this can not enable
2290    /// multipath and will fail.
2291    pub fn set_max_concurrent_paths(
2292        &mut self,
2293        now: Instant,
2294        count: NonZeroU32,
2295    ) -> Result<(), MultipathNotNegotiated> {
2296        if !self.is_multipath_negotiated() {
2297            return Err(MultipathNotNegotiated { _private: () });
2298        }
2299        self.max_concurrent_paths = count;
2300
2301        let in_use_count = self
2302            .local_max_path_id
2303            .next()
2304            .saturating_sub(self.abandoned_paths.len() as u32)
2305            .as_u32();
2306        let extra_needed = count.get().saturating_sub(in_use_count);
2307        let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2308
2309        self.set_max_path_id(now, new_max_path_id);
2310
2311        Ok(())
2312    }
2313
2314    /// If needed, issues a new MAX_PATH_ID frame and new CIDs for any newly allowed paths
2315    fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2316        if max_path_id <= self.local_max_path_id {
2317            return;
2318        }
2319
2320        self.local_max_path_id = max_path_id;
2321        self.spaces[SpaceId::Data].pending.max_path_id = true;
2322
2323        self.issue_first_path_cids(now);
2324    }
2325
    /// Current number of remotely initiated streams that may be concurrently open
    ///
    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
    /// it will not change immediately, even if fewer streams are open. Instead, it will
    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
        self.streams.max_concurrent(dir)
    }
2334
    /// See [`TransportConfig::send_window()`]
    ///
    /// Adjusts the connection-wide send window at runtime.
    pub fn set_send_window(&mut self, send_window: u64) {
        self.streams.set_send_window(send_window);
    }
2339
2340    /// See [`TransportConfig::receive_window()`]
2341    pub fn set_receive_window(&mut self, receive_window: VarInt) {
2342        if self.streams.set_receive_window(receive_window) {
2343            self.spaces[SpaceId::Data].pending.max_data = true;
2344        }
2345    }
2346
2347    /// Whether the Multipath for QUIC extension is enabled.
2348    ///
2349    /// Multipath is only enabled after the handshake is completed and if it was enabled by both
2350    /// peers.
2351    pub fn is_multipath_negotiated(&self) -> bool {
2352        !self.is_handshaking()
2353            && self.config.max_concurrent_multipath_paths.is_some()
2354            && self.peer_params.initial_max_path_id.is_some()
2355    }
2356
2357    fn on_ack_received(
2358        &mut self,
2359        now: Instant,
2360        space: SpaceId,
2361        ack: frame::Ack,
2362    ) -> Result<(), TransportError> {
2363        // All ACKs are referencing path 0
2364        let path = PathId::ZERO;
2365        self.inner_on_ack_received(now, space, path, ack)
2366    }
2367
2368    fn on_path_ack_received(
2369        &mut self,
2370        now: Instant,
2371        space: SpaceId,
2372        path_ack: frame::PathAck,
2373    ) -> Result<(), TransportError> {
2374        let (ack, path) = path_ack.into_ack();
2375        self.inner_on_ack_received(now, space, path, ack)
2376    }
2377
    /// Handles an ACK frame acknowledging packets sent on *path*.
    ///
    /// Updates the largest-acked bookkeeping, feeds congestion control, MTU
    /// discovery, ack-frequency and ECN state, takes an RTT sample where valid,
    /// and runs loss detection. Returns a transport error when the peer
    /// acknowledges a packet number we never sent or an invalid range.
    fn inner_on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        if self.abandoned_paths.contains(&path) {
            // See also https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.4.3-3
            // > PATH_ACK frames received with an abandoned path ID are silently ignored, as specified in Section 4.
            trace!("silently ignoring PATH_ACK on abandoned path");
            return Ok(());
        }
        // Acknowledging a packet number we never sent is a protocol violation.
        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        // Track whether this ACK advances the largest acknowledged packet number.
        let new_largest = {
            let space = &mut self.spaces[space].for_path(path);
            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
                space.largest_acked_packet = Some(ack.largest);
                if let Some(info) = space.sent_packets.get(ack.largest) {
                    // This should always succeed, but a misbehaving peer might ACK a packet we
                    // haven't sent. At worst, that will result in us spuriously reducing the
                    // congestion window.
                    space.largest_acked_packet_sent = info.time_sent;
                }
                true
            } else {
                false
            }
        };

        // Packets we declared lost but which are now acknowledged mean the
        // earlier congestion response was spurious.
        if self.detect_spurious_loss(&ack, space, path) {
            self.path_data_mut(path)
                .congestion
                .on_spurious_congestion_event();
        }

        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
        let mut newly_acked = ArrayRangeSet::new();
        for range in ack.iter() {
            self.spaces[space].for_path(path).check_ack(range.clone())?;
            for (pn, _) in self.spaces[space]
                .for_path(path)
                .sent_packets
                .iter_range(range)
            {
                newly_acked.insert_one(pn);
            }
        }

        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                    // Assume ACKs for all packets below the largest acknowledged in
                    // `packet` have been received. This can cause the peer to spuriously
                    // retransmit if some of our earlier ACKs were lost, but allows for
                    // simpler state tracking. See discussion at
                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                        pns.pending_acks.subtract_below(*acked_pn);
                    }
                }
                ack_eliciting_acked |= info.ack_eliciting;

                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
                let path_data = self.path_data_mut(path);
                let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
                if mtu_updated {
                    path_data
                        .congestion
                        .on_mtu_update(path_data.mtud.current_mtu());
                }

                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
                self.ack_frequency.on_acked(path, packet);

                self.on_packet_acked(now, path, info);
            }
        }

        // Let the congestion controller finish this batch of ACK processing.
        let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
        let app_limited = self.app_limited;
        let path_data = self.path_data_mut(path);
        let in_flight = path_data.in_flight.bytes;

        path_data
            .congestion
            .on_end_acks(now, in_flight, app_limited, largest_ackd);

        // Only take an RTT sample when the largest acknowledged packet is new
        // and at least one ack-eliciting packet was acknowledged.
        if new_largest && ack_eliciting_acked {
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = now.saturating_duration_since(
                self.spaces[space].for_path(path).largest_acked_packet_sent,
            );

            let next_pn = self.spaces[space].for_path(path).next_packet_number;
            let path_data = self.path_data_mut(path);
            // TODO(@divma): should be a method of path, should be contained in a single place
            path_data.rtt.update(ack_delay, rtt);
            if path_data.first_packet_after_rtt_sample.is_none() {
                path_data.first_packet_after_rtt_sample = Some((space, next_pn));
            }
        }

        // Must be called before crypto/pto_count are clobbered
        self.detect_lost_packets(now, space, path, true);

        if self.peer_completed_address_validation(path) {
            self.path_data_mut(path).pto_count = 0;
        }

        // Explicit congestion notification
        // TODO(@divma): this code is a good example of logic that should be contained in a single
        // place but it's split between the path data and the packet number space data, we should
        // find a way to make this work without two lookups
        if self.path_data(path).sending_ecn {
            if let Some(ecn) = ack.ecn {
                // We only examine ECN counters from ACKs that we are certain we received in transmit
                // order, allowing us to compute an increase in ECN counts to compare against the number
                // of newly acked packets that remains well-defined in the presence of arbitrary packet
                // reordering.
                if new_largest {
                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
                }
            } else {
                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
                debug!("ECN not acknowledged by peer");
                self.path_data_mut(path).sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now, path);
        Ok(())
    }
2527
2528    fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2529        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2530
2531        if lost_packets.is_empty() {
2532            return false;
2533        }
2534
2535        for range in ack.iter() {
2536            let spurious_losses: Vec<u64> = lost_packets
2537                .iter_range(range.clone())
2538                .map(|(pn, _info)| pn)
2539                .collect();
2540
2541            for pn in spurious_losses {
2542                lost_packets.remove(pn);
2543            }
2544        }
2545
2546        // If this ACK frame acknowledged all deemed lost packets,
2547        // then we have raised a spurious congestion event in the past.
2548        // We cannot conclude when there are remaining packets,
2549        // but future ACK frames might indicate a spurious loss detection.
2550        lost_packets.is_empty()
2551    }
2552
2553    /// Drain lost packets that we reasonably think will never arrive
2554    ///
2555    /// The current criterion is copied from `msquic`:
2556    /// discard packets that were sent earlier than 2 probe timeouts ago.
2557    fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2558        let two_pto = 2 * self.path_data(path).rtt.pto_base();
2559
2560        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2561        lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2562    }
2563
2564    /// Process a new ECN block from an in-order ACK
2565    fn process_ecn(
2566        &mut self,
2567        now: Instant,
2568        space: SpaceId,
2569        path: PathId,
2570        newly_acked: u64,
2571        ecn: frame::EcnCounts,
2572        largest_sent_time: Instant,
2573    ) {
2574        match self.spaces[space]
2575            .for_path(path)
2576            .detect_ecn(newly_acked, ecn)
2577        {
2578            Err(e) => {
2579                debug!("halting ECN due to verification failure: {}", e);
2580
2581                self.path_data_mut(path).sending_ecn = false;
2582                // Wipe out the existing value because it might be garbage and could interfere with
2583                // future attempts to use ECN on new paths.
2584                self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2585            }
2586            Ok(false) => {}
2587            Ok(true) => {
2588                self.path_stats.entry(path).or_default().congestion_events += 1;
2589                self.path_data_mut(path).congestion.on_congestion_event(
2590                    now,
2591                    largest_sent_time,
2592                    false,
2593                    true,
2594                    0,
2595                );
2596            }
2597        }
2598    }
2599
2600    // Not timing-aware, so it's safe to call this for inferred acks, such as arise from
2601    // high-latency handshakes
2602    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2603        self.paths
2604            .get_mut(&path_id)
2605            .expect("known path")
2606            .remove_in_flight(&info);
2607        let app_limited = self.app_limited;
2608        let path = self.path_data_mut(path_id);
2609        if info.ack_eliciting && !path.is_validating_path() {
2610            // Only pass ACKs to the congestion controller if we are not validating the current
2611            // path, so as to ignore any ACKs from older paths still coming in.
2612            let rtt = path.rtt;
2613            path.congestion
2614                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2615        }
2616
2617        // Update state for confirmed delivery of frames
2618        if let Some(retransmits) = info.retransmits.get() {
2619            for (id, _) in retransmits.reset_stream.iter() {
2620                self.streams.reset_acked(*id);
2621            }
2622        }
2623
2624        for frame in info.stream_frames {
2625            self.streams.received_ack_of(frame);
2626        }
2627    }
2628
2629    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2630        let start = if self.zero_rtt_crypto.is_some() {
2631            now
2632        } else {
2633            self.prev_crypto
2634                .as_ref()
2635                .expect("no previous keys")
2636                .end_packet
2637                .as_ref()
2638                .expect("update not acknowledged yet")
2639                .1
2640        };
2641
2642        // QUIC-MULTIPATH § 2.5 Key Phase Update Process: use largest PTO off all paths.
2643        self.timers.set(
2644            Timer::Conn(ConnTimer::KeyDiscard),
2645            start + self.pto_max_path(space) * 3,
2646            self.qlog.with_time(now),
2647        );
2648    }
2649
2650    /// Handle a [`PathTimer::LossDetection`] timeout.
2651    ///
2652    /// This timer expires for two reasons:
2653    /// - An ACK-eliciting packet we sent should be considered lost.
2654    /// - The PTO may have expired and a tail-loss probe needs to be scheduled.
2655    ///
2656    /// The former needs us to schedule re-transmission of the lost data.
2657    ///
2658    /// The latter means we have not received an ACK for an ack-eliciting packet we sent
2659    /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
2660    /// packet, to try and elicit new acknowledgements. These new acknowledgements will
2661    /// indicate whether the previously sent packets were lost or not.
2662    fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2663        if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2664            // Time threshold loss Detection
2665            self.detect_lost_packets(now, pn_space, path_id, false);
2666            self.set_loss_detection_timer(now, path_id);
2667            return;
2668        }
2669
2670        let (_, space) = match self.pto_time_and_space(now, path_id) {
2671            Some(x) => x,
2672            None => {
2673                error!(%path_id, "PTO expired while unset");
2674                return;
2675            }
2676        };
2677        trace!(
2678            in_flight = self.path_data(path_id).in_flight.bytes,
2679            count = self.path_data(path_id).pto_count,
2680            ?space,
2681            %path_id,
2682            "PTO fired"
2683        );
2684
2685        let count = match self.path_data(path_id).in_flight.ack_eliciting {
2686            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
2687            // deadlock preventions
2688            0 => {
2689                debug_assert!(!self.peer_completed_address_validation(path_id));
2690                1
2691            }
2692            // Conventional loss probe
2693            _ => 2,
2694        };
2695        let pns = self.spaces[space].for_path(path_id);
2696        pns.loss_probes = pns.loss_probes.saturating_add(count);
2697        let path_data = self.path_data_mut(path_id);
2698        path_data.pto_count = path_data.pto_count.saturating_add(1);
2699        self.set_loss_detection_timer(now, path_id);
2700    }
2701
    /// Detect any lost packets
    ///
    /// There are two cases in which we detects lost packets:
    ///
    /// - We received an ACK packet.
    /// - The [`PathTimer::LossDetection`] timer expired. So there is an un-acknowledged packet
    ///   that was followed by an acknowledged packet. The loss timer for this
    ///   un-acknowledged packet expired and we need to detect that packet as lost.
    ///
    /// Packets are lost if they are both (See RFC9002 §6.1):
    ///
    /// - Unacknowledged, in flight and sent prior to an acknowledged packet.
    /// - Old enough by either:
    ///   - Having a packet number [`TransportConfig::packet_threshold`] lower then the last
    ///     acknowledged packet.
    ///   - Being sent [`TransportConfig::time_threshold`] * RTT in the past.
    ///
    /// Panics if the path has never received an ACK in `pn_space`.
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        // Reset the pending loss time; it is re-armed below for packets that are
        // not yet old enough to declare lost.
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        // Find all the lost packets, populating all variables initialised above.

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        // Time threshold: a conservative RTT estimate scaled by the configured factor,
        // floored at the timer granularity.
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // InPersistentCongestion: Determine if all packets in the time period before the newest
        // lost packet, including the edges, are marked lost. PTO computation must always
        // include max ACK delay, i.e. operate as if in Data space (see RFC9001 §7.6.1).
        let congestion_period = self
            .pto(SpaceId::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        // Only packets sent before the largest acknowledged one can be lost.
        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
            if prev_packet != Some(packet.wrapping_sub(1)) {
                // An intervening packet was acknowledged
                persistent_congestion_start = None;
            }

            // Packets sent before now - loss_delay are deemed lost.
            // However, we avoid subtraction as it can panic and there's no
            // saturating equivalent of this subtraction operation with a Duration.
            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
                // The packet should be declared lost.
                if Some(packet) == in_flight_mtu_probe {
                    // Lost MTU probes are not included in `lost_packets`, because they
                    // should not trigger a congestion control response
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // Two ACK-eliciting packets lost more than
                            // congestion_period apart, with no ACKed packets in between
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // Persistent congestion must start after the first RTT sample
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space, packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // The packet should not yet be declared lost.
                if space.loss_time.is_none() {
                    // Since we iterate in order the lowest packet number's loss time will
                    // always be the earliest.
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }
2818
    /// Drops the path state, declaring any remaining in-flight packets as lost
    ///
    /// All packets still tracked in the path's application-data number space are declared
    /// lost (the in-flight MTU probe is excluded from the list and passed separately, as
    /// `handle_lost_packets` requires). Afterwards the path, its packet number space and
    /// its stats are removed and a [`PathEvent::Abandoned`] event is emitted.
    fn discard_path(&mut self, path_id: PathId, now: Instant) {
        trace!(%path_id, "dropping path state");
        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();

        let mut size_of_lost_packets = 0u64; // add to path_stats.lost_bytes;
        // Collected in the iteration order of `sent_packets`, which must be ascending by
        // packet number to satisfy the sort assertion in `handle_lost_packets`.
        let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
            .for_path(path_id)
            .sent_packets
            .iter()
            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
            .map(|(pn, info)| {
                size_of_lost_packets += info.size as u64;
                pn
            })
            .collect();

        if !lost_pns.is_empty() {
            trace!(
                %path_id,
                count = lost_pns.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost on path abandon"
            );
            self.handle_lost_packets(
                SpaceId::Data,
                path_id,
                now,
                lost_pns,
                in_flight_mtu_probe,
                Duration::ZERO,
                false,
                size_of_lost_packets,
            );
        }
        // Drop all remaining per-path state.
        self.paths.remove(&path_id);
        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);

        // Hand the final stats for this path to the application.
        let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
        self.events.push_back(
            PathEvent::Abandoned {
                id: path_id,
                path_stats,
            }
            .into(),
        );
    }
2867
    /// Processes packets that have been declared lost on a path
    ///
    /// `lost_packets` must be sorted ascending and must not contain the MTU probe, which
    /// is passed separately via `lost_mtu_probe`. Retransmittable data from the lost
    /// packets is queued again, MTU discovery and congestion control are notified, and
    /// per-path statistics are updated.
    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(
            {
                let mut sorted = lost_packets.clone();
                sorted.sort();
                sorted == lost_packets
            },
            "lost_packets must be sorted"
        );

        self.drain_lost_packets(now, pn_space, path_id);

        // OnPacketsLost
        if let Some(largest_lost) = lost_packets.last().cloned() {
            // Snapshot bytes in flight so we can tell below whether anything
            // ack-eliciting was actually removed.
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.entry(path_id).or_default();
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                // Queue the lost data for retransmission.
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                // Record the loss with its send time.
                // NOTE(review): presumably consumed later to detect spurious loss when a
                // late ACK arrives — confirm against `lost_packets` consumers.
                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                // The MTU shrank: inform congestion control and drop any queued
                // datagrams that no longer fit.
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                if let Some(max_datagram_size) = self.datagrams().max_size() {
                    self.datagrams.drop_oversized(max_datagram_size);
                }
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .black_holes_detected += 1;
            }

            // Don't apply congestion penalty for lost ack-only packets
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        // Handle a lost MTU probe
        if let Some(packet) = lost_mtu_probe {
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and
            // therefore must not have been removed yet
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats
                .entry(path_id)
                .or_default()
                .lost_plpmtud_probes += 1;
        }
    }
2985
2986    /// Returns the earliest time packets should be declared lost for all spaces on a path.
2987    ///
2988    /// If a path has an acknowledged packet with any prior un-acknowledged packets, the
2989    /// earliest un-acknowledged packet can be declared lost after a timeout has elapsed.
2990    /// The time returned is when this packet should be declared lost.
2991    fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2992        SpaceId::iter()
2993            .filter_map(|id| {
2994                self.spaces[id]
2995                    .number_spaces
2996                    .get(&path_id)
2997                    .and_then(|pns| pns.loss_time)
2998                    .map(|time| (time, id))
2999            })
3000            .min_by_key(|&(time, _)| time)
3001    }
3002
    /// Returns the earliest next PTO should fire for all spaces on a path.
    ///
    /// Returns `None` when the path does not exist or no PTO needs to be armed, e.g.
    /// because nothing ack-eliciting is in flight.
    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
        let path = self.path(path_id)?;
        // PTO backs off exponentially with consecutive unanswered probes; the exponent
        // is capped to avoid overflow.
        let pto_count = path.pto_count;
        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
        let mut duration = path.rtt.pto_base() * backoff;

        if path_id == PathId::ZERO
            && path.in_flight.ack_eliciting == 0
            && !self.peer_completed_address_validation(PathId::ZERO)
        {
            // Address Validation during Connection Establishment:
            // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
            // deadlock if an Initial or Handshake packet from the server is lost and the
            // server can not send more due to its anti-amplification limit the client must
            // send another packet on PTO.
            let space = match self.highest_space {
                SpaceId::Handshake => SpaceId::Handshake,
                _ => SpaceId::Initial,
            };

            return Some((now + duration, space));
        }

        // Find the earliest PTO across all packet number spaces on this path.
        let mut result = None;
        for space in SpaceId::iter() {
            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
                continue;
            };

            if !pns.has_in_flight() {
                continue;
            }
            if space == SpaceId::Data {
                // Skip ApplicationData until handshake completes.
                if self.is_handshaking() {
                    return result;
                }
                // Include max_ack_delay and backoff for ApplicationData.
                // (Data is the last space iterated, so this runs at most once.)
                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
            }
            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
                continue;
            };
            let pto = last_ack_eliciting + duration;
            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
                if path.anti_amplification_blocked(1) {
                    // Nothing would be able to be sent.
                    continue;
                }
                if path.in_flight.ack_eliciting == 0 {
                    // Nothing ack-eliciting, no PTO to arm/fire.
                    continue;
                }
                result = Some((pto, space));
            }
        }
        result
    }
3062
3063    fn peer_completed_address_validation(&self, path: PathId) -> bool {
3064        // TODO(flub): This logic needs updating for multipath
3065        if self.side.is_server() || self.state.is_closed() {
3066            return true;
3067        }
3068        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
3069        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
3070        self.spaces[SpaceId::Handshake]
3071            .path_space(PathId::ZERO)
3072            .and_then(|pns| pns.largest_acked_packet)
3073            .is_some()
3074            || self.spaces[SpaceId::Data]
3075                .path_space(path)
3076                .and_then(|pns| pns.largest_acked_packet)
3077                .is_some()
3078            || (self.spaces[SpaceId::Data].crypto.is_some()
3079                && self.spaces[SpaceId::Handshake].crypto.is_none())
3080    }
3081
3082    /// Resets the the [`PathTimer::LossDetection`] timer to the next instant it may be needed
3083    ///
3084    /// The timer must fire if either:
3085    /// - An ack-eliciting packet we sent needs to be declared lost.
3086    /// - A tail-loss probe needs to be sent.
3087    ///
3088    /// See [`Connection::on_loss_detection_timeout`] for details.
3089    fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3090        if self.state.is_closed() {
3091            // No loss detection takes place on closed connections, and `close_common` already
3092            // stopped time timer. Ensure we don't restart it inadvertently, e.g. in response to a
3093            // reordered packet being handled by state-insensitive code.
3094            return;
3095        }
3096
3097        if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3098            // Time threshold loss detection.
3099            self.timers.set(
3100                Timer::PerPath(path_id, PathTimer::LossDetection),
3101                loss_time,
3102                self.qlog.with_time(now),
3103            );
3104            return;
3105        }
3106
3107        // Determine which PN space to arm PTO for.
3108        // Calculate PTO duration
3109        if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3110            self.timers.set(
3111                Timer::PerPath(path_id, PathTimer::LossDetection),
3112                timeout,
3113                self.qlog.with_time(now),
3114            );
3115        } else {
3116            self.timers.stop(
3117                Timer::PerPath(path_id, PathTimer::LossDetection),
3118                self.qlog.with_time(now),
3119            );
3120        }
3121    }
3122
3123    /// The maximum probe timeout across all paths
3124    ///
3125    /// See [`Connection::pto`]
3126    fn pto_max_path(&self, space: SpaceId) -> Duration {
3127        match space {
3128            SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3129            SpaceId::Data => self
3130                .paths
3131                .keys()
3132                .map(|path_id| self.pto(space, *path_id))
3133                .max()
3134                .expect("there should be one at least path"),
3135        }
3136    }
3137
3138    /// Probe Timeout
3139    ///
3140    /// The PTO is logically the time in which you'd expect to receive an acknowledgement
3141    /// for a packet. So approximately RTT + max_ack_delay.
3142    fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3143        let max_ack_delay = match space {
3144            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3145            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3146        };
3147        self.path_data(path_id).rtt.pto_base() + max_ack_delay
3148    }
3149
    /// Registers a successfully authenticated incoming packet
    ///
    /// Resets keep-alive and idle timers, records ECN counters, queues an ACK for the
    /// packet (if it carries a packet number) and performs side-specific key
    /// management: the client pins the server's address after a Handshake packet, the
    /// server discards Initial keys and schedules 0-RTT key discard.
    fn on_packet_authenticated(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<u64>,
        spin: bool,
        is_1rtt: bool,
    ) {
        self.total_authed_packets += 1;
        // An abandoned path only accepts data for a grace period (3 * PTO); receiving
        // data after that means the peer never answered our PATH_ABANDON.
        if let Some(last_allowed_receive) = self
            .paths
            .get(&path_id)
            .and_then(|path| path.data.last_allowed_receive)
        {
            if now > last_allowed_receive {
                warn!("received data on path which we abandoned more than 3 * PTO ago");
                // The peer failed to respond with a PATH_ABANDON in time.
                if !self.state.is_closed() {
                    // TODO(flub): What should the error code be?
                    self.state.move_to_closed(TransportError::NO_ERROR(
                        "peer failed to respond with PATH_ABANDON in time",
                    ));
                    self.close_common();
                    self.set_close_timer(now);
                    self.close = true;
                }
                return;
            }
        }

        self.reset_keep_alive(path_id, now);
        self.reset_idle_timeout(now, space_id, path_id);
        self.permit_idle_reset = true;
        self.receiving_ecn |= ecn.is_some();
        if let Some(x) = ecn {
            let space = &mut self.spaces[space_id];
            space.for_path(path_id).ecn_counters += x;

            // Congestion Experienced: request an immediate ACK so the peer learns of
            // the congestion signal quickly.
            if x.is_ce() {
                space
                    .for_path(path_id)
                    .pending_acks
                    .set_immediate_ack_required();
            }
        }

        // Everything below only applies to packets carrying a packet number.
        let packet = match packet {
            Some(x) => x,
            None => return,
        };
        match &self.side {
            ConnectionSide::Client { .. } => {
                // If we received a handshake packet that authenticated, then we're talking to
                // the real server.  From now on we should no longer allow the server to migrate
                // its address.
                if space_id == SpaceId::Handshake {
                    if let Some(hs) = self.state.as_handshake_mut() {
                        hs.allow_server_migration = false;
                    }
                }
            }
            ConnectionSide::Server { .. } => {
                if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
                {
                    // A server stops sending and processing Initial packets when it receives its first Handshake packet.
                    self.discard_space(now, SpaceId::Initial);
                }
                if self.zero_rtt_crypto.is_some() && is_1rtt {
                    // Discard 0-RTT keys soon after receiving a 1-RTT packet
                    self.set_key_discard_timer(now, space_id)
                }
            }
        }
        // Queue an acknowledgement and track the highest received packet number.
        let space = self.spaces[space_id].for_path(path_id);
        space.pending_acks.insert_one(packet, now);
        if packet >= space.rx_packet.unwrap_or_default() {
            space.rx_packet = Some(packet);
            // Update outgoing spin bit, inverting iff we're the client
            self.spin = self.side.is_client() ^ spin;
        }
    }
3233
    /// Resets the idle timeout timers
    ///
    /// Without multipath there is only the connection-wide idle timeout. When multipath is
    /// enabled there is an additional per-path idle timeout.
    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
        // First reset the global idle timeout.
        if let Some(timeout) = self.idle_timeout {
            if self.state.is_closed() {
                // A closed connection must not be kept alive by the idle timer.
                self.timers
                    .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
            } else {
                // Keep a floor of 3 PTOs (cf. RFC 9000 §10.1) so a handful of tail
                // losses cannot expire an otherwise live connection.
                let dt = cmp::max(timeout, 3 * self.pto_max_path(space));
                self.timers.set(
                    Timer::Conn(ConnTimer::Idle),
                    now + dt,
                    self.qlog.with_time(now),
                );
            }
        }

        // Now handle the per-path state
        if let Some(timeout) = self.path_data(path_id).idle_timeout {
            if self.state.is_closed() {
                self.timers.stop(
                    Timer::PerPath(path_id, PathTimer::PathIdle),
                    self.qlog.with_time(now),
                );
            } else {
                // Same 3-PTO floor as above, but based on this path's PTO only.
                let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
                self.timers.set(
                    Timer::PerPath(path_id, PathTimer::PathIdle),
                    now + dt,
                    self.qlog.with_time(now),
                );
            }
        }
    }
3271
3272    /// Resets both the [`ConnTimer::KeepAlive`] and [`PathTimer::PathKeepAlive`] timers
3273    fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3274        if !self.state.is_established() {
3275            return;
3276        }
3277
3278        if let Some(interval) = self.config.keep_alive_interval {
3279            self.timers.set(
3280                Timer::Conn(ConnTimer::KeepAlive),
3281                now + interval,
3282                self.qlog.with_time(now),
3283            );
3284        }
3285
3286        if let Some(interval) = self.path_data(path_id).keep_alive {
3287            self.timers.set(
3288                Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3289                now + interval,
3290                self.qlog.with_time(now),
3291            );
3292        }
3293    }
3294
3295    /// Sets the timer for when a previously issued CID should be retired next
3296    fn reset_cid_retirement(&mut self, now: Instant) {
3297        if let Some((_path, t)) = self.next_cid_retirement() {
3298            self.timers.set(
3299                Timer::Conn(ConnTimer::PushNewCid),
3300                t,
3301                self.qlog.with_time(now),
3302            );
3303        }
3304    }
3305
3306    /// The next time when a previously issued CID should be retired
3307    fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3308        self.local_cid_state
3309            .iter()
3310            .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3311            .min_by_key(|(_path_id, timeout)| *timeout)
3312    }
3313
    /// Handle the already-decrypted first packet from the client
    ///
    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
    /// efficient.
    pub(crate) fn handle_first_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        ecn: Option<EcnCodepoint>,
        packet_number: u64,
        packet: InitialPacket,
        remaining: Option<BytesMut>,
    ) -> Result<(), ConnectionError> {
        let span = trace_span!("first recv");
        let _guard = span.enter();
        debug_assert!(self.side.is_server());
        // Record the received bytes on path 0.
        // NOTE(review): presumably this feeds the anti-amplification limit — confirm.
        let len = packet.header_data.len() + packet.payload.len();
        let path_id = PathId::ZERO;
        self.path_data_mut(path_id).total_recvd = len as u64;

        // Remember the client's Initial token.
        // NOTE(review): the consumer of `expected_token` is outside this view — confirm.
        if let Some(hs) = self.state.as_handshake_mut() {
            hs.expected_token = packet.header.token.clone();
        } else {
            unreachable!("first packet must be delivered in Handshake state");
        }

        // The first packet is always on PathId::ZERO
        self.on_packet_authenticated(
            now,
            SpaceId::Initial,
            path_id,
            ecn,
            Some(packet_number),
            false,
            false,
        );

        let packet: Packet = packet.into();

        let mut qlog = QlogRecvPacket::new(len);
        qlog.header(&packet.header, Some(packet_number), path_id);

        self.process_decrypted_packet(
            now,
            remote,
            path_id,
            Some(packet_number),
            packet,
            &mut qlog,
        )?;
        self.qlog.emit_packet_received(qlog, now);
        // Process any packets coalesced into the same datagram.
        if let Some(data) = remaining {
            self.handle_coalesced(now, remote, path_id, ecn, data);
        }

        self.qlog.emit_recovery_metrics(
            path_id,
            &mut self.paths.get_mut(&path_id).unwrap().data,
            now,
        );

        Ok(())
    }
3377
    /// Enables 0-RTT if the crypto layer can supply early keys
    ///
    /// On the client the transport parameters cached with the session ticket are
    /// restored, after resetting the fields that must not be remembered across
    /// connections to their defaults.
    fn init_0rtt(&mut self, now: Instant) {
        // Without early keys there is nothing to enable.
        let (header, packet) = match self.crypto.early_crypto() {
            Some(x) => x,
            None => return,
        };
        if self.side.is_client() {
            match self.crypto.transport_parameters() {
                Ok(params) => {
                    let params = params
                        .expect("crypto layer didn't supply transport parameters with ticket");
                    // Certain values must not be cached
                    let params = TransportParameters {
                        initial_src_cid: None,
                        original_dst_cid: None,
                        preferred_address: None,
                        retry_src_cid: None,
                        stateless_reset_token: None,
                        min_ack_delay: None,
                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
                        max_ack_delay: TransportParameters::default().max_ack_delay,
                        initial_max_path_id: None,
                        ..params
                    };
                    self.set_peer_params(params);
                    self.qlog.emit_peer_transport_params_restored(self, now);
                }
                Err(e) => {
                    // A malformed ticket disables 0-RTT but is not fatal to the connection.
                    error!("session ticket has malformed transport parameters: {}", e);
                    return;
                }
            }
        }
        trace!("0-RTT enabled");
        self.zero_rtt_enabled = true;
        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
    }
3414
    /// Buffers a received CRYPTO frame and feeds complete data to the crypto layer
    ///
    /// # Errors
    ///
    /// - `PROTOCOL_VIOLATION` if the frame contains new data for an encryption level we
    ///   have already moved past.
    /// - `CRYPTO_BUFFER_EXCEEDED` if buffering would exceed the configured limit.
    /// - Any error returned by the crypto layer while consuming handshake bytes.
    fn read_crypto(
        &mut self,
        space: SpaceId,
        crypto: &frame::Crypto,
        payload_len: usize,
    ) -> Result<(), TransportError> {
        let expected = if !self.state.is_handshake() {
            SpaceId::Data
        } else if self.highest_space == SpaceId::Initial {
            SpaceId::Initial
        } else {
            // On the server, self.highest_space can be Data after receiving the client's first
            // flight, but we expect Handshake CRYPTO until the handshake is complete.
            SpaceId::Handshake
        };
        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
        // packets are illegal, and we don't process 1-RTT packets until the handshake is
        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
        debug_assert!(space <= expected, "received out-of-order CRYPTO data");

        // Data entirely below our read offset is a harmless retransmission; only *new*
        // data at an old encryption level is a protocol violation.
        let end = crypto.offset + crypto.data.len() as u64;
        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
            warn!(
                "received new {:?} CRYPTO data when expecting {:?}",
                space, expected
            );
            return Err(TransportError::PROTOCOL_VIOLATION(
                "new data at unexpected encryption level",
            ));
        }

        // Bound the amount of CRYPTO data we are willing to buffer.
        let space = &mut self.spaces[space];
        let max = end.saturating_sub(space.crypto_stream.bytes_read());
        if max > self.config.crypto_buffer_size as u64 {
            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
        }

        space
            .crypto_stream
            .insert(crypto.offset, crypto.data.clone(), payload_len);
        // Drain every contiguous chunk into the TLS layer.
        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
            if self.crypto.read_handshake(&chunk.bytes)? {
                self.events.push_back(Event::HandshakeDataReady);
            }
        }

        Ok(())
    }
3464
    /// Drains outgoing handshake bytes from the crypto layer into pending CRYPTO frames
    ///
    /// Each round may also yield keys for the next packet space, in which case the space
    /// is upgraded and draining continues at the new encryption level.
    fn write_crypto(&mut self) {
        loop {
            let space = self.highest_space;
            let mut outgoing = Vec::new();
            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
                match space {
                    SpaceId::Initial => {
                        self.upgrade_crypto(SpaceId::Handshake, crypto);
                    }
                    SpaceId::Handshake => {
                        self.upgrade_crypto(SpaceId::Data, crypto);
                    }
                    _ => unreachable!("got updated secrets during 1-RTT"),
                }
            }
            if outgoing.is_empty() {
                if space == self.highest_space {
                    // No data and no key upgrade: nothing more to write.
                    break;
                } else {
                    // Keys updated, check for more data to send
                    continue;
                }
            }
            let offset = self.spaces[space].crypto_offset;
            let outgoing = Bytes::from(outgoing);
            // Keep a copy of the client's first flight (the ClientHello).
            // NOTE(review): the consumer of `client_hello` is outside this view — confirm.
            if let Some(hs) = self.state.as_handshake_mut() {
                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
                    hs.client_hello = Some(outgoing.clone());
                }
            }
            self.spaces[space].crypto_offset += outgoing.len() as u64;
            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
            self.spaces[space].pending.crypto.push_back(frame::Crypto {
                offset,
                data: outgoing,
            });
        }
    }
3503
3504    /// Switch to stronger cryptography during handshake
3505    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3506        debug_assert!(
3507            self.spaces[space].crypto.is_none(),
3508            "already reached packet space {space:?}"
3509        );
3510        trace!("{:?} keys ready", space);
3511        if space == SpaceId::Data {
3512            // Precompute the first key update
3513            self.next_crypto = Some(
3514                self.crypto
3515                    .next_1rtt_keys()
3516                    .expect("handshake should be complete"),
3517            );
3518        }
3519
3520        self.spaces[space].crypto = Some(crypto);
3521        debug_assert!(space as usize > self.highest_space as usize);
3522        self.highest_space = space;
3523        if space == SpaceId::Data && self.side.is_client() {
3524            // Discard 0-RTT keys because 1-RTT keys are available.
3525            self.zero_rtt_crypto = None;
3526        }
3527    }
3528
    /// Discards the keys and loss-recovery state of a handshake-time packet space
    ///
    /// Only the Initial and Handshake spaces may be discarded. Packets still tracked in
    /// the space are removed from in-flight accounting without being declared lost.
    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
        debug_assert!(space_id != SpaceId::Data);
        trace!("discarding {:?} keys", space_id);
        if space_id == SpaceId::Initial {
            // No longer needed
            if let ConnectionSide::Client { token, .. } = &mut self.side {
                *token = Bytes::new();
            }
        }
        let space = &mut self.spaces[space_id];
        space.crypto = None;
        // Reset loss-recovery state for the discarded space (handshake-time spaces only
        // ever exist on path 0).
        let pns = space.for_path(PathId::ZERO);
        pns.time_of_last_ack_eliciting_packet = None;
        pns.loss_time = None;
        pns.loss_probes = 0;
        // Remove the space's packets from path 0's in-flight accounting.
        let sent_packets = mem::take(&mut pns.sent_packets);
        let path = self.paths.get_mut(&PathId::ZERO).unwrap();
        for (_, packet) in sent_packets.into_iter() {
            path.data.remove_in_flight(&packet);
        }

        self.set_loss_detection_timer(now, PathId::ZERO)
    }
3552
    /// Splits a datagram into its coalesced packets and handles each in turn
    ///
    /// Parsing stops at the first malformed packet header; any remaining bytes of the
    /// datagram are discarded.
    fn handle_coalesced(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        data: BytesMut,
    ) {
        self.path_data_mut(path_id)
            .inc_total_recvd(data.len() as u64);
        let mut remaining = Some(data);
        // Assumes every cid_state uses the same CID length, so any entry will do.
        let cid_len = self
            .local_cid_state
            .values()
            .map(|cid_state| cid_state.cid_len())
            .next()
            .expect("one cid_state must exist");
        while let Some(data) = remaining {
            match PartialDecode::new(
                data,
                &FixedLengthConnectionIdParser::new(cid_len),
                &[self.version],
                self.endpoint_config.grease_quic_bit,
            ) {
                Ok((partial_decode, rest)) => {
                    remaining = rest;
                    self.handle_decode(now, remote, path_id, ecn, partial_decode);
                }
                Err(e) => {
                    trace!("malformed header: {}", e);
                    return;
                }
            }
        }
    }
3588
3589    fn handle_decode(
3590        &mut self,
3591        now: Instant,
3592        remote: SocketAddr,
3593        path_id: PathId,
3594        ecn: Option<EcnCodepoint>,
3595        partial_decode: PartialDecode,
3596    ) {
3597        let qlog = QlogRecvPacket::new(partial_decode.len());
3598        if let Some(decoded) = packet_crypto::unprotect_header(
3599            partial_decode,
3600            &self.spaces,
3601            self.zero_rtt_crypto.as_ref(),
3602            self.peer_params.stateless_reset_token,
3603        ) {
3604            self.handle_packet(
3605                now,
3606                remote,
3607                path_id,
3608                ecn,
3609                decoded.packet,
3610                decoded.stateless_reset,
3611                qlog,
3612            );
3613        }
3614    }
3615
    /// Processes a single received packet for this connection.
    ///
    /// Flow: pre-decryption gatekeeping (multipath / unexpected-remote checks
    /// while handshaking), decryption and deduplication, dispatch to
    /// `process_decrypted_packet`, and finally the connection-level state
    /// transition (closed / draining / drained) implied by any resulting error.
    ///
    /// `packet` is `None` when header decryption did not yield a packet;
    /// `stateless_reset` short-circuits all other handling.
    fn handle_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<Packet>,
        stateless_reset: bool,
        mut qlog: QlogRecvPacket,
    ) {
        self.stats.udp_rx.ios += 1;
        if let Some(ref packet) = packet {
            trace!(
                "got {:?} packet ({} bytes) from {} using id {}",
                packet.header.space(),
                packet.payload.len() + packet.header_data.len(),
                remote,
                packet.header.dst_cid(),
            );
        }

        // While handshaking only PathId::ZERO is acceptable, and the remote may
        // only change when the handshake state explicitly allows server migration.
        if self.is_handshaking() {
            if path_id != PathId::ZERO {
                debug!(%remote, %path_id, "discarding multipath packet during handshake");
                return;
            }
            if remote != self.path_data_mut(path_id).remote {
                if let Some(hs) = self.state.as_handshake() {
                    if hs.allow_server_migration {
                        trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
                        self.path_data_mut(path_id).remote = remote;
                        self.qlog.emit_tuple_assigned(path_id, remote, now);
                    } else {
                        debug!("discarding packet with unexpected remote during handshake");
                        return;
                    }
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            }
        }

        // Snapshot state so the transitions triggered below can be detected afterwards.
        let was_closed = self.state.is_closed();
        let was_drained = self.state.is_drained();

        // Err(None): authentication failure; Err(Some(_)): a protocol-level error.
        let decrypted = match packet {
            None => Err(None),
            Some(mut packet) => self
                .decrypt_packet(now, path_id, &mut packet)
                .map(move |number| (packet, number)),
        };
        let result = match decrypted {
            _ if stateless_reset => {
                debug!("got stateless reset");
                Err(ConnectionError::Reset)
            }
            Err(Some(e)) => {
                warn!("illegal packet: {}", e);
                Err(e.into())
            }
            Err(None) => {
                debug!("failed to authenticate packet");
                // Count failed authentications; crossing the AEAD integrity limit
                // forces the connection closed, otherwise the packet is dropped.
                self.authentication_failures += 1;
                let integrity_limit = self.spaces[self.highest_space]
                    .crypto
                    .as_ref()
                    .unwrap()
                    .packet
                    .local
                    .integrity_limit();
                if self.authentication_failures > integrity_limit {
                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
                } else {
                    return;
                }
            }
            Ok((packet, number)) => {
                qlog.header(&packet.header, number, path_id);
                let span = match number {
                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                    None => trace_span!("recv", space = ?packet.header.space()),
                };
                let _guard = span.enter();

                // Duplicate detection: `dedup.insert` reports whether the packet
                // number was already seen in this packet-number space for the path.
                let dedup = self.spaces[packet.header.space()]
                    .path_space_mut(path_id)
                    .map(|pns| &mut pns.dedup);
                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                    debug!("discarding possible duplicate packet");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else if self.state.is_handshake() && packet.header.is_short() {
                    // TODO: SHOULD buffer these to improve reordering tolerance.
                    trace!("dropping short packet during handshake");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else {
                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                        if let Some(hs) = self.state.as_handshake() {
                            if self.side.is_server() && token != &hs.expected_token {
                                // Clients must send the same retry token in every Initial. Initial
                                // packets can be spoofed, so we discard rather than killing the
                                // connection.
                                warn!("discarding Initial with invalid retry token");
                                self.qlog.emit_packet_received(qlog, now);
                                return;
                            }
                        }
                    }

                    if !self.state.is_closed() {
                        let spin = match packet.header {
                            Header::Short { spin, .. } => spin,
                            _ => false,
                        };

                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                            // Only the client is allowed to open paths
                            self.ensure_path(path_id, remote, now, number);
                        }
                        if self.paths.contains_key(&path_id) {
                            self.on_packet_authenticated(
                                now,
                                packet.header.space(),
                                path_id,
                                ecn,
                                number,
                                spin,
                                packet.header.is_1rtt(),
                            );
                        }
                    }

                    let res = self
                        .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);

                    self.qlog.emit_packet_received(qlog, now);
                    res
                }
            }
        };

        // State transitions for error cases
        if let Err(conn_err) = result {
            match conn_err {
                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::Reset
                | ConnectionError::TransportError(TransportError {
                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
                    ..
                }) => {
                    self.state.move_to_drained(Some(conn_err));
                }
                ConnectionError::TimedOut => {
                    unreachable!("timeouts aren't generated by packet processing");
                }
                ConnectionError::TransportError(err) => {
                    debug!("closing connection due to transport error: {}", err);
                    self.state.move_to_closed(err);
                }
                ConnectionError::VersionMismatch => {
                    self.state.move_to_draining(Some(conn_err));
                }
                ConnectionError::LocallyClosed => {
                    unreachable!("LocallyClosed isn't generated by packet processing");
                }
                ConnectionError::CidsExhausted => {
                    unreachable!("CidsExhausted isn't generated by packet processing");
                }
            };
        }

        // React to a Closed transition caused by this packet: tear down common
        // state and arm the close timer unless we are already fully drained.
        if !was_closed && self.state.is_closed() {
            self.close_common();
            if !self.state.is_drained() {
                self.set_close_timer(now);
            }
        }
        if !was_drained && self.state.is_drained() {
            self.endpoint_events.push_back(EndpointEventInner::Drained);
            // Close timer may have been started previously, e.g. if we sent a close and got a
            // stateless reset in response
            self.timers
                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
        }

        // Transmit CONNECTION_CLOSE if necessary
        if matches!(self.state.as_type(), StateType::Closed) {
            // If there is no PathData for this PathId the packet was for a brand new
            // path. It was a valid packet however, so the remote is valid and we want to
            // send CONNECTION_CLOSE.
            let path_remote = self
                .paths
                .get(&path_id)
                .map(|p| p.data.remote)
                .unwrap_or(remote);
            self.close = remote == path_remote;
        }
    }
3817
    /// Dispatches a decrypted packet according to the current connection state.
    ///
    /// Established connections route Data-space packets to `process_payload` and
    /// earlier spaces to `process_early_payload`; closed connections only scan
    /// for a CONNECTION_CLOSE frame to enter draining; draining/drained ignore
    /// everything. While handshaking, each long-header type (Retry, Handshake,
    /// Initial, 0-RTT, Version Negotiation) is handled inline below.
    fn process_decrypted_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        path_id: PathId,
        number: Option<u64>,
        packet: Packet,
        qlog: &mut QlogRecvPacket,
    ) -> Result<(), ConnectionError> {
        if !self.paths.contains_key(&path_id) {
            // There is a chance this is a server side, first (for this path) packet, which would
            // be a protocol violation. It's more likely, however, that this is a packet of a
            // pruned path
            trace!(%path_id, ?number, "discarding packet for unknown path");
            return Ok(());
        }
        // Every state except Handshake returns early; `state` below is the
        // mutable handshake state used by the header dispatch that follows.
        let state = match self.state.as_type() {
            StateType::Established => {
                match packet.header.space() {
                    SpaceId::Data => {
                        self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
                    }
                    _ if packet.header.has_frames() => {
                        self.process_early_payload(now, path_id, packet, qlog)?
                    }
                    _ => {
                        trace!("discarding unexpected pre-handshake packet");
                    }
                }
                return Ok(());
            }
            StateType::Closed => {
                // Only look for CONNECTION_CLOSE; malformed frames are skipped
                // rather than escalated since we are already closed.
                for result in frame::Iter::new(packet.payload.freeze())? {
                    let frame = match result {
                        Ok(frame) => frame,
                        Err(err) => {
                            debug!("frame decoding error: {err:?}");
                            continue;
                        }
                    };
                    qlog.frame(&frame);

                    if let Frame::Padding = frame {
                        continue;
                    };

                    self.stats.frame_rx.record(&frame);

                    if let Frame::Close(_error) = frame {
                        trace!("draining");
                        self.state.move_to_draining(None);
                        break;
                    }
                }
                return Ok(());
            }
            StateType::Draining | StateType::Drained => return Ok(()),
            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
        };

        match packet.header {
            Header::Retry {
                src_cid: rem_cid, ..
            } => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if self.side.is_server() {
                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
                }

                // Validate the Retry Integrity Tag against the original
                // destination CID currently active for this path.
                let is_valid_retry = self
                    .rem_cids
                    .get(&path_id)
                    .map(|cids| cids.active())
                    .map(|orig_dst_cid| {
                        self.crypto.is_valid_retry(
                            orig_dst_cid,
                            &packet.header_data,
                            &packet.payload,
                        )
                    })
                    .unwrap_or_default();
                if self.total_authed_packets > 1
                            || packet.payload.len() <= 16 // token + 16 byte tag
                            || !is_valid_retry
                {
                    trace!("discarding invalid Retry");
                    // - After the client has received and processed an Initial or Retry
                    //   packet from the server, it MUST discard any subsequent Retry
                    //   packets that it receives.
                    // - A client MUST discard a Retry packet with a zero-length Retry Token
                    //   field.
                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
                    //   that cannot be validated
                    return Ok(());
                }

                trace!("retrying with CID {}", rem_cid);
                let client_hello = state.client_hello.take().unwrap();
                self.retry_src_cid = Some(rem_cid);
                self.rem_cids
                    .get_mut(&path_id)
                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
                    .update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;

                // Treat the outstanding Initial (packet number 0) as acked so
                // its loss state doesn't linger across the reset below.
                let space = &mut self.spaces[SpaceId::Initial];
                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
                    self.on_packet_acked(now, PathId::ZERO, info);
                };

                self.discard_space(now, SpaceId::Initial); // Make sure we clean up after
                // any retransmitted Initials
                // Rebuild the Initial space with keys derived from the new CID
                // and requeue the ClientHello, preserving the packet number.
                self.spaces[SpaceId::Initial] = {
                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
                    space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
                    space.crypto_offset = client_hello.len() as u64;
                    space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
                        .for_path(path_id)
                        .next_packet_number;
                    space.pending.crypto.push_back(frame::Crypto {
                        offset: 0,
                        data: client_hello,
                    });
                    space
                };

                // Retransmit all 0-RTT data
                let zero_rtt = mem::take(
                    &mut self.spaces[SpaceId::Data]
                        .for_path(PathId::ZERO)
                        .sent_packets,
                );
                for (_, info) in zero_rtt.into_iter() {
                    self.paths
                        .get_mut(&PathId::ZERO)
                        .unwrap()
                        .remove_in_flight(&info);
                    self.spaces[SpaceId::Data].pending |= info.retransmits;
                }
                self.streams.retransmit_all_for_0rtt();

                // Everything before the 16-byte integrity tag is the retry token
                // to echo in subsequent Initials.
                let token_len = packet.payload.len() - 16;
                let ConnectionSide::Client { ref mut token, .. } = self.side else {
                    unreachable!("we already short-circuited if we're server");
                };
                *token = packet.payload.freeze().split_to(token_len);

                self.state = State::handshake(state::Handshake {
                    expected_token: Bytes::new(),
                    rem_cid_set: false,
                    client_hello: None,
                    allow_server_migration: true,
                });
                Ok(())
            }
            Header::Long {
                ty: LongType::Handshake,
                src_cid: rem_cid,
                dst_cid: loc_cid,
                ..
            } => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }
                self.on_path_validated(path_id);

                self.process_early_payload(now, path_id, packet, qlog)?;
                if self.state.is_closed() {
                    return Ok(());
                }

                if self.crypto.is_handshaking() {
                    trace!("handshake ongoing");
                    return Ok(());
                }

                // Handshake complete from here on.
                if self.side.is_client() {
                    // Client-only because server params were set from the client's Initial
                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                        TransportError::new(
                            TransportErrorCode::crypto(0x6d),
                            "transport parameters missing".to_owned(),
                        )
                    })?;

                    if self.has_0rtt() {
                        if !self.crypto.early_data_accepted().unwrap() {
                            debug_assert!(self.side.is_client());
                            debug!("0-RTT rejected");
                            self.accepted_0rtt = false;
                            self.streams.zero_rtt_rejected();

                            // Discard already-queued frames
                            self.spaces[SpaceId::Data].pending = Retransmits::default();

                            // Discard 0-RTT packets
                            let sent_packets = mem::take(
                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
                            );
                            for (_, packet) in sent_packets.into_iter() {
                                self.paths
                                    .get_mut(&path_id)
                                    .unwrap()
                                    .remove_in_flight(&packet);
                            }
                        } else {
                            self.accepted_0rtt = true;
                            params.validate_resumption_from(&self.peer_params)?;
                        }
                    }
                    if let Some(token) = params.stateless_reset_token {
                        let remote = self.path_data(path_id).remote;
                        self.endpoint_events
                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
                    }
                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                    self.issue_first_cids(now);
                } else {
                    // Server-only
                    self.spaces[SpaceId::Data].pending.handshake_done = true;
                    self.discard_space(now, SpaceId::Handshake);
                    self.events.push_back(Event::HandshakeConfirmed);
                    trace!("handshake confirmed");
                }

                self.events.push_back(Event::Connected);
                self.state.move_to_established();
                trace!("established");

                // Multipath can only be enabled after the state has reached Established.
                // So this can not happen any earlier.
                self.issue_first_path_cids(now);
                Ok(())
            }
            Header::Initial(InitialHeader {
                src_cid: rem_cid,
                dst_cid: loc_cid,
                ..
            }) => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if !state.rem_cid_set {
                    // First server Initial: adopt the server-chosen CID.
                    trace!("switching remote CID to {}", rem_cid);
                    let mut state = state.clone();
                    self.rem_cids
                        .get_mut(&path_id)
                        .expect("PathId::ZERO not yet abandoned")
                        .update_initial_cid(rem_cid);
                    self.rem_handshake_cid = rem_cid;
                    self.orig_rem_cid = rem_cid;
                    state.rem_cid_set = true;
                    self.state.move_to_handshake(state);
                } else if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }

                let starting_space = self.highest_space;
                self.process_early_payload(now, path_id, packet, qlog)?;

                // Server: leaving the Initial space for the first time means the
                // client's transport parameters are now available.
                if self.side.is_server()
                    && starting_space == SpaceId::Initial
                    && self.highest_space != SpaceId::Initial
                {
                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                        TransportError::new(
                            TransportErrorCode::crypto(0x6d),
                            "transport parameters missing".to_owned(),
                        )
                    })?;
                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                    self.issue_first_cids(now);
                    self.init_0rtt(now);
                }
                Ok(())
            }
            Header::Long {
                ty: LongType::ZeroRtt,
                ..
            } => {
                self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
                Ok(())
            }
            Header::VersionNegotiate { .. } => {
                if self.total_authed_packets > 1 {
                    return Ok(());
                }
                // The payload is a list of 4-byte versions; if ours appears the
                // negotiation packet is spurious and ignored.
                let supported = packet
                    .payload
                    .chunks(4)
                    .any(|x| match <[u8; 4]>::try_from(x) {
                        Ok(version) => self.version == u32::from_be_bytes(version),
                        Err(_) => false,
                    });
                if supported {
                    return Ok(());
                }
                debug!("remote doesn't support our version");
                Err(ConnectionError::VersionMismatch)
            }
            Header::Short { .. } => unreachable!(
                "short packets received during handshake are discarded in handle_packet"
            ),
        }
    }
4130
4131    /// Process an Initial or Handshake packet payload
4132    fn process_early_payload(
4133        &mut self,
4134        now: Instant,
4135        path_id: PathId,
4136        packet: Packet,
4137        #[allow(unused)] qlog: &mut QlogRecvPacket,
4138    ) -> Result<(), TransportError> {
4139        debug_assert_ne!(packet.header.space(), SpaceId::Data);
4140        debug_assert_eq!(path_id, PathId::ZERO);
4141        let payload_len = packet.payload.len();
4142        let mut ack_eliciting = false;
4143        for result in frame::Iter::new(packet.payload.freeze())? {
4144            let frame = result?;
4145            qlog.frame(&frame);
4146            let span = match frame {
4147                Frame::Padding => continue,
4148                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4149            };
4150
4151            self.stats.frame_rx.record(&frame);
4152
4153            let _guard = span.as_ref().map(|x| x.enter());
4154            ack_eliciting |= frame.is_ack_eliciting();
4155
4156            // Process frames
4157            if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4158                return Err(TransportError::PROTOCOL_VIOLATION(
4159                    "illegal frame type in handshake",
4160                ));
4161            }
4162
4163            match frame {
4164                Frame::Padding | Frame::Ping => {}
4165                Frame::Crypto(frame) => {
4166                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
4167                }
4168                Frame::Ack(ack) => {
4169                    self.on_ack_received(now, packet.header.space(), ack)?;
4170                }
4171                Frame::PathAck(ack) => {
4172                    span.as_ref()
4173                        .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4174                    self.on_path_ack_received(now, packet.header.space(), ack)?;
4175                }
4176                Frame::Close(reason) => {
4177                    self.state.move_to_draining(Some(reason.into()));
4178                    return Ok(());
4179                }
4180                _ => {
4181                    let mut err =
4182                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4183                    err.frame = Some(frame.ty());
4184                    return Err(err);
4185                }
4186            }
4187        }
4188
4189        if ack_eliciting {
4190            // In the initial and handshake spaces, ACKs must be sent immediately
4191            self.spaces[packet.header.space()]
4192                .for_path(path_id)
4193                .pending_acks
4194                .set_immediate_ack_required();
4195        }
4196
4197        self.write_crypto();
4198        Ok(())
4199    }
4200
4201    /// Processes the packet payload, always in the data space.
4202    fn process_payload(
4203        &mut self,
4204        now: Instant,
4205        remote: SocketAddr,
4206        path_id: PathId,
4207        number: u64,
4208        packet: Packet,
4209        #[allow(unused)] qlog: &mut QlogRecvPacket,
4210    ) -> Result<(), TransportError> {
4211        let payload = packet.payload.freeze();
4212        let mut is_probing_packet = true;
4213        let mut close = None;
4214        let payload_len = payload.len();
4215        let mut ack_eliciting = false;
4216        // if this packet triggers a path migration and includes a observed address frame, it's
4217        // stored here
4218        let mut migration_observed_addr = None;
4219        for result in frame::Iter::new(payload)? {
4220            let frame = result?;
4221            qlog.frame(&frame);
4222            let span = match frame {
4223                Frame::Padding => continue,
4224                _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4225            };
4226
4227            self.stats.frame_rx.record(&frame);
4228            // Crypto, Stream and Datagram frames are special cased in order no pollute
4229            // the log with payload data
4230            match &frame {
4231                Frame::Crypto(f) => {
4232                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4233                }
4234                Frame::Stream(f) => {
4235                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4236                }
4237                Frame::Datagram(f) => {
4238                    trace!(len = f.data.len(), "got datagram frame");
4239                }
4240                f => {
4241                    trace!("got frame {:?}", f);
4242                }
4243            }
4244
4245            let _guard = span.enter();
4246            if packet.header.is_0rtt() {
4247                match frame {
4248                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4249                        return Err(TransportError::PROTOCOL_VIOLATION(
4250                            "illegal frame type in 0-RTT",
4251                        ));
4252                    }
4253                    _ => {
4254                        if frame.is_1rtt() {
4255                            return Err(TransportError::PROTOCOL_VIOLATION(
4256                                "illegal frame type in 0-RTT",
4257                            ));
4258                        }
4259                    }
4260                }
4261            }
4262            ack_eliciting |= frame.is_ack_eliciting();
4263
4264            // Check whether this could be a probing packet
4265            match frame {
4266                Frame::Padding
4267                | Frame::PathChallenge(_)
4268                | Frame::PathResponse(_)
4269                | Frame::NewConnectionId(_)
4270                | Frame::ObservedAddr(_) => {}
4271                _ => {
4272                    is_probing_packet = false;
4273                }
4274            }
4275
4276            match frame {
4277                Frame::Crypto(frame) => {
4278                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4279                }
4280                Frame::Stream(frame) => {
4281                    if self.streams.received(frame, payload_len)?.should_transmit() {
4282                        self.spaces[SpaceId::Data].pending.max_data = true;
4283                    }
4284                }
4285                Frame::Ack(ack) => {
4286                    self.on_ack_received(now, SpaceId::Data, ack)?;
4287                }
4288                Frame::PathAck(ack) => {
4289                    span.record("path", tracing::field::debug(&ack.path_id));
4290                    self.on_path_ack_received(now, SpaceId::Data, ack)?;
4291                }
4292                Frame::Padding | Frame::Ping => {}
4293                Frame::Close(reason) => {
4294                    close = Some(reason);
4295                }
4296                Frame::PathChallenge(challenge) => {
4297                    let path = &mut self
4298                        .path_mut(path_id)
4299                        .expect("payload is processed only after the path becomes known");
4300                    path.path_responses.push(number, challenge.0, remote);
4301                    if remote == path.remote {
4302                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
4303                        // attack. Send a non-probing packet to recover the active path.
4304                        // TODO(flub): No longer true! We now path_challege also to validate
4305                        //    the path if the path is new, without an RFC9000-style
4306                        //    migration involved. This means we add in an extra
4307                        //    IMMEDIATE_ACK on some challenges. It isn't really wrong to do
4308                        //    so, but it still is something untidy. We should instead
4309                        //    suppress this when we know the remote is still validating the
4310                        //    path.
4311                        match self.peer_supports_ack_frequency() {
4312                            true => self.immediate_ack(path_id),
4313                            false => {
4314                                self.ping_path(path_id).ok();
4315                            }
4316                        }
4317                    } else if self.iroh_hp.client_side().is_ok() {
4318                        // TODO(@divma): temp log. This might be too much
4319                        debug!("Potential Nat traversal PATH_CHALLENGE received");
4320                    }
4321                }
4322                Frame::PathResponse(response) => {
4323                    let path = self
4324                        .paths
4325                        .get_mut(&path_id)
4326                        .expect("payload is processed only after the path becomes known");
4327
4328                    match path.data.challenges_sent.get(&response.0) {
4329                        // Response to an on-path PathChallenge
4330                        Some(info) if info.remote == remote && path.data.remote == remote => {
4331                            let sent_instant = info.sent_instant;
4332                            // TODO(@divma): reset timers using the remaining off-path challenges
4333                            self.timers.stop(
4334                                Timer::PerPath(path_id, PathTimer::PathValidation),
4335                                self.qlog.with_time(now),
4336                            );
4337                            self.timers.stop(
4338                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
4339                                self.qlog.with_time(now),
4340                            );
4341                            if !path.data.validated {
4342                                trace!("new path validated");
4343                            }
4344                            self.timers.stop(
4345                                Timer::PerPath(path_id, PathTimer::PathOpen),
4346                                self.qlog.with_time(now),
4347                            );
4348                            // Clear any other on-path sent challenge.
4349                            path.data
4350                                .challenges_sent
4351                                .retain(|_token, info| info.remote != remote);
4352                            path.data.send_new_challenge = false;
4353                            path.data.validated = true;
4354
4355                            // This RTT can only be used for the initial RTT, not as a normal
4356                            // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
4357                            let rtt = now.saturating_duration_since(sent_instant);
4358                            path.data.rtt.reset_initial_rtt(rtt);
4359
4360                            self.events
4361                                .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4362                            // mark the path as open from the application perspective now that Opened
4363                            // event has been queued
4364                            if !std::mem::replace(&mut path.data.open, true) {
4365                                trace!("path opened");
4366                                if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4367                                {
4368                                    self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4369                                        id: path_id,
4370                                        addr: observed.socket_addr(),
4371                                    }));
4372                                }
4373                            }
4374                            if let Some((_, ref mut prev)) = path.prev {
4375                                prev.challenges_sent.clear();
4376                                prev.send_new_challenge = false;
4377                            }
4378                        }
4379                        // Response to an off-path PathChallenge
4380                        Some(info) if info.remote == remote => {
4381                            debug!("Response to off-path PathChallenge!");
4382                            path.data
4383                                .challenges_sent
4384                                .retain(|_token, info| info.remote != remote);
4385                        }
4386                        // Response to a PathChallenge we recognize, but from an invalid remote
4387                        Some(info) => {
4388                            debug!(%response, from=%remote, expected=%info.remote, "ignoring invalid PATH_RESPONSE")
4389                        }
4390                        // Response to an unknown PathChallenge
4391                        None => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4392                    }
4393                }
4394                Frame::MaxData(bytes) => {
4395                    self.streams.received_max_data(bytes);
4396                }
4397                Frame::MaxStreamData { id, offset } => {
4398                    self.streams.received_max_stream_data(id, offset)?;
4399                }
4400                Frame::MaxStreams { dir, count } => {
4401                    self.streams.received_max_streams(dir, count)?;
4402                }
4403                Frame::ResetStream(frame) => {
4404                    if self.streams.received_reset(frame)?.should_transmit() {
4405                        self.spaces[SpaceId::Data].pending.max_data = true;
4406                    }
4407                }
4408                Frame::DataBlocked { offset } => {
4409                    debug!(offset, "peer claims to be blocked at connection level");
4410                }
4411                Frame::StreamDataBlocked { id, offset } => {
4412                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4413                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4414                        return Err(TransportError::STREAM_STATE_ERROR(
4415                            "STREAM_DATA_BLOCKED on send-only stream",
4416                        ));
4417                    }
4418                    debug!(
4419                        stream = %id,
4420                        offset, "peer claims to be blocked at stream level"
4421                    );
4422                }
4423                Frame::StreamsBlocked { dir, limit } => {
4424                    if limit > MAX_STREAM_COUNT {
4425                        return Err(TransportError::FRAME_ENCODING_ERROR(
4426                            "unrepresentable stream limit",
4427                        ));
4428                    }
4429                    debug!(
4430                        "peer claims to be blocked opening more than {} {} streams",
4431                        limit, dir
4432                    );
4433                }
4434                Frame::StopSending(frame::StopSending { id, error_code }) => {
4435                    if id.initiator() != self.side.side() {
4436                        if id.dir() == Dir::Uni {
4437                            debug!("got STOP_SENDING on recv-only {}", id);
4438                            return Err(TransportError::STREAM_STATE_ERROR(
4439                                "STOP_SENDING on recv-only stream",
4440                            ));
4441                        }
4442                    } else if self.streams.is_local_unopened(id) {
4443                        return Err(TransportError::STREAM_STATE_ERROR(
4444                            "STOP_SENDING on unopened stream",
4445                        ));
4446                    }
4447                    self.streams.received_stop_sending(id, error_code);
4448                }
4449                Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4450                    if let Some(ref path_id) = path_id {
4451                        span.record("path", tracing::field::debug(&path_id));
4452                    }
4453                    let path_id = path_id.unwrap_or_default();
4454                    match self.local_cid_state.get_mut(&path_id) {
4455                        None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4456                        Some(cid_state) => {
4457                            let allow_more_cids = cid_state
4458                                .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4459
4460                            // If the path has been closed, we do not issue more CIDs for this path.
4461                            // For details see https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.2.2
4462                            // > an endpoint SHOULD provide new connection IDs for that path, if still open, using PATH_NEW_CONNECTION_ID frames.
4463                            let has_path = !self.abandoned_paths.contains(&path_id);
4464                            let allow_more_cids = allow_more_cids && has_path;
4465
4466                            self.endpoint_events
4467                                .push_back(EndpointEventInner::RetireConnectionId(
4468                                    now,
4469                                    path_id,
4470                                    sequence,
4471                                    allow_more_cids,
4472                                ));
4473                        }
4474                    }
4475                }
4476                Frame::NewConnectionId(frame) => {
4477                    let path_id = if let Some(path_id) = frame.path_id {
4478                        if !self.is_multipath_negotiated() {
4479                            return Err(TransportError::PROTOCOL_VIOLATION(
4480                                "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4481                            ));
4482                        }
4483                        if path_id > self.local_max_path_id {
4484                            return Err(TransportError::PROTOCOL_VIOLATION(
4485                                "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4486                            ));
4487                        }
4488                        path_id
4489                    } else {
4490                        PathId::ZERO
4491                    };
4492
4493                    if self.abandoned_paths.contains(&path_id) {
4494                        trace!("ignoring issued CID for abandoned path");
4495                        continue;
4496                    }
4497                    if let Some(ref path_id) = frame.path_id {
4498                        span.record("path", tracing::field::debug(&path_id));
4499                    }
4500                    let rem_cids = self
4501                        .rem_cids
4502                        .entry(path_id)
4503                        .or_insert_with(|| CidQueue::new(frame.id));
4504                    if rem_cids.active().is_empty() {
4505                        // TODO(@divma): is the entry removed later? (rem_cids.entry)
4506                        return Err(TransportError::PROTOCOL_VIOLATION(
4507                            "NEW_CONNECTION_ID when CIDs aren't in use",
4508                        ));
4509                    }
4510                    if frame.retire_prior_to > frame.sequence {
4511                        return Err(TransportError::PROTOCOL_VIOLATION(
4512                            "NEW_CONNECTION_ID retiring unissued CIDs",
4513                        ));
4514                    }
4515
4516                    use crate::cid_queue::InsertError;
4517                    match rem_cids.insert(frame) {
4518                        Ok(None) => {}
4519                        Ok(Some((retired, reset_token))) => {
4520                            let pending_retired =
4521                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
4522                            /// Ensure `pending_retired` cannot grow without bound. Limit is
4523                            /// somewhat arbitrary but very permissive.
4524                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4525                            // We don't bother counting in-flight frames because those are bounded
4526                            // by congestion control.
4527                            if (pending_retired.len() as u64)
4528                                .saturating_add(retired.end.saturating_sub(retired.start))
4529                                > MAX_PENDING_RETIRED_CIDS
4530                            {
4531                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4532                                    "queued too many retired CIDs",
4533                                ));
4534                            }
4535                            pending_retired.extend(retired.map(|seq| (path_id, seq)));
4536                            self.set_reset_token(path_id, remote, reset_token);
4537                        }
4538                        Err(InsertError::ExceedsLimit) => {
4539                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4540                        }
4541                        Err(InsertError::Retired) => {
4542                            trace!("discarding already-retired");
4543                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
4544                            // range of connection IDs larger than the active connection ID limit
4545                            // was retired all at once via retire_prior_to.
4546                            self.spaces[SpaceId::Data]
4547                                .pending
4548                                .retire_cids
4549                                .push((path_id, frame.sequence));
4550                            continue;
4551                        }
4552                    };
4553
4554                    if self.side.is_server()
4555                        && path_id == PathId::ZERO
4556                        && self
4557                            .rem_cids
4558                            .get(&PathId::ZERO)
4559                            .map(|cids| cids.active_seq() == 0)
4560                            .unwrap_or_default()
4561                    {
4562                        // We're a server still using the initial remote CID for the client, so
4563                        // let's switch immediately to enable clientside stateless resets.
4564                        self.update_rem_cid(PathId::ZERO);
4565                    }
4566                }
4567                Frame::NewToken(NewToken { token }) => {
4568                    let ConnectionSide::Client {
4569                        token_store,
4570                        server_name,
4571                        ..
4572                    } = &self.side
4573                    else {
4574                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4575                    };
4576                    if token.is_empty() {
4577                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4578                    }
4579                    trace!("got new token");
4580                    token_store.insert(server_name, token);
4581                }
4582                Frame::Datagram(datagram) => {
4583                    if self
4584                        .datagrams
4585                        .received(datagram, &self.config.datagram_receive_buffer_size)?
4586                    {
4587                        self.events.push_back(Event::DatagramReceived);
4588                    }
4589                }
4590                Frame::AckFrequency(ack_frequency) => {
4591                    // This frame can only be sent in the Data space
4592
4593                    if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4594                        // The AckFrequency frame is stale (we have already received a more
4595                        // recent one)
4596                        continue;
4597                    }
4598
4599                    // Update the params for all of our paths
4600                    for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4601                        space.pending_acks.set_ack_frequency_params(&ack_frequency);
4602
4603                        // Our `max_ack_delay` has been updated, so we may need to adjust
4604                        // its associated timeout
4605                        if let Some(timeout) = space
4606                            .pending_acks
4607                            .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4608                        {
4609                            self.timers.set(
4610                                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4611                                timeout,
4612                                self.qlog.with_time(now),
4613                            );
4614                        }
4615                    }
4616                }
4617                Frame::ImmediateAck => {
4618                    // This frame can only be sent in the Data space
4619                    for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4620                        pns.pending_acks.set_immediate_ack_required();
4621                    }
4622                }
4623                Frame::HandshakeDone => {
4624                    if self.side.is_server() {
4625                        return Err(TransportError::PROTOCOL_VIOLATION(
4626                            "client sent HANDSHAKE_DONE",
4627                        ));
4628                    }
4629                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
4630                        self.discard_space(now, SpaceId::Handshake);
4631                    }
4632                    self.events.push_back(Event::HandshakeConfirmed);
4633                    trace!("handshake confirmed");
4634                }
4635                Frame::ObservedAddr(observed) => {
4636                    // check if params allows the peer to send report and this node to receive it
4637                    trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4638                    if !self
4639                        .peer_params
4640                        .address_discovery_role
4641                        .should_report(&self.config.address_discovery_role)
4642                    {
4643                        return Err(TransportError::PROTOCOL_VIOLATION(
4644                            "received OBSERVED_ADDRESS frame when not negotiated",
4645                        ));
4646                    }
4647                    // must only be sent in data space
4648                    if packet.header.space() != SpaceId::Data {
4649                        return Err(TransportError::PROTOCOL_VIOLATION(
4650                            "OBSERVED_ADDRESS frame outside data space",
4651                        ));
4652                    }
4653
4654                    let path = self.path_data_mut(path_id);
4655                    if remote == path.remote {
4656                        if let Some(updated) = path.update_observed_addr_report(observed) {
4657                            if path.open {
4658                                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4659                                    id: path_id,
4660                                    addr: updated,
4661                                }));
4662                            }
4663                            // otherwise the event is reported when the path is deemed open
4664                        }
4665                    } else {
4666                        // include in migration
4667                        migration_observed_addr = Some(observed)
4668                    }
4669                }
4670                Frame::PathAbandon(frame::PathAbandon {
4671                    path_id,
4672                    error_code,
4673                }) => {
4674                    span.record("path", tracing::field::debug(&path_id));
4675                    // TODO(flub): don't really know which error code to use here.
4676                    let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4677                        Ok(()) => {
4678                            trace!("peer abandoned path");
4679                            false
4680                        }
4681                        Err(ClosePathError::LastOpenPath) => {
4682                            trace!("peer abandoned last path, closing connection");
4683                            // TODO(flub): which error code?
4684                            return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4685                        }
4686                        Err(ClosePathError::ClosedPath) => {
4687                            trace!("peer abandoned already closed path");
4688                            true
4689                        }
4690                    };
4691                    // If we receive a retransmit of PATH_ABANDON then we may already have
4692                    // abandoned this path locally.  In that case the DiscardPath timer
4693                    // may already have fired and we no longer have any state for this path.
4694                    // Only set this timer if we still have path state.
4695                    if self.path(path_id).is_some() && !already_abandoned {
4696                        // TODO(flub): Checking is_some() here followed by a number of calls
4697                        //    that would panic if it was None is really ugly.  If only we
4698                        //    could do something like PathData::pto().  One day we'll have
4699                        //    unified SpaceId and PathId and this will be possible.
4700                        let delay = self.pto(SpaceId::Data, path_id) * 3;
4701                        self.timers.set(
4702                            Timer::PerPath(path_id, PathTimer::DiscardPath),
4703                            now + delay,
4704                            self.qlog.with_time(now),
4705                        );
4706                    }
4707                }
4708                Frame::PathStatusAvailable(info) => {
4709                    span.record("path", tracing::field::debug(&info.path_id));
4710                    if self.is_multipath_negotiated() {
4711                        self.on_path_status(
4712                            info.path_id,
4713                            PathStatus::Available,
4714                            info.status_seq_no,
4715                        );
4716                    } else {
4717                        return Err(TransportError::PROTOCOL_VIOLATION(
4718                            "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4719                        ));
4720                    }
4721                }
4722                Frame::PathStatusBackup(info) => {
4723                    span.record("path", tracing::field::debug(&info.path_id));
4724                    if self.is_multipath_negotiated() {
4725                        self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4726                    } else {
4727                        return Err(TransportError::PROTOCOL_VIOLATION(
4728                            "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4729                        ));
4730                    }
4731                }
4732                Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4733                    span.record("path", tracing::field::debug(&path_id));
4734                    if !self.is_multipath_negotiated() {
4735                        return Err(TransportError::PROTOCOL_VIOLATION(
4736                            "received MAX_PATH_ID frame when multipath was not negotiated",
4737                        ));
4738                    }
4739                    // frames that do not increase the path id are ignored
4740                    if path_id > self.remote_max_path_id {
4741                        self.remote_max_path_id = path_id;
4742                        self.issue_first_path_cids(now);
4743                    }
4744                }
4745                Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4746                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4747                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4748                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4749                    if self.is_multipath_negotiated() {
4750                        if self.local_max_path_id > max_path_id {
4751                            return Err(TransportError::PROTOCOL_VIOLATION(
4752                                "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4753                            ));
4754                        }
4755                        debug!("received PATHS_BLOCKED({:?})", max_path_id);
4756                        // TODO(@divma): ensure max concurrent paths
4757                    } else {
4758                        return Err(TransportError::PROTOCOL_VIOLATION(
4759                            "received PATHS_BLOCKED frame when not multipath was not negotiated",
4760                        ));
4761                    }
4762                }
4763                Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4764                    // Nothing to do.  This is recorded in the frame stats, but otherwise we
4765                    // always issue all CIDs we're allowed to issue, so either this is an
4766                    // impatient peer or a bug on our side.
4767
4768                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4769                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4770                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4771                    if self.is_multipath_negotiated() {
4772                        if path_id > self.local_max_path_id {
4773                            return Err(TransportError::PROTOCOL_VIOLATION(
4774                                "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4775                            ));
4776                        }
4777                        if next_seq.0
4778                            > self
4779                                .local_cid_state
4780                                .get(&path_id)
4781                                .map(|cid_state| cid_state.active_seq().1 + 1)
4782                                .unwrap_or_default()
4783                        {
4784                            return Err(TransportError::PROTOCOL_VIOLATION(
4785                                "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4786                            ));
4787                        }
4788                        debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4789                    } else {
4790                        return Err(TransportError::PROTOCOL_VIOLATION(
4791                            "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4792                        ));
4793                    }
4794                }
4795                Frame::AddAddress(addr) => {
4796                    let client_state = match self.iroh_hp.client_side_mut() {
4797                        Ok(state) => state,
4798                        Err(err) => {
4799                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4800                                "Nat traversal(ADD_ADDRESS): {err}"
4801                            )));
4802                        }
4803                    };
4804
4805                    if !client_state.check_remote_address(&addr) {
4806                        // if the address is not valid we flag it, but update anyway
4807                        warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4808                    }
4809
4810                    match client_state.add_remote_address(addr) {
4811                        Ok(maybe_added) => {
4812                            if let Some(added) = maybe_added {
4813                                self.events.push_back(Event::NatTraversal(
4814                                    iroh_hp::Event::AddressAdded(added),
4815                                ));
4816                            }
4817                        }
4818                        Err(e) => {
4819                            warn!(%e, "failed to add remote address")
4820                        }
4821                    }
4822                }
4823                Frame::RemoveAddress(addr) => {
4824                    let client_state = match self.iroh_hp.client_side_mut() {
4825                        Ok(state) => state,
4826                        Err(err) => {
4827                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4828                                "Nat traversal(REMOVE_ADDRESS): {err}"
4829                            )));
4830                        }
4831                    };
4832                    if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4833                        self.events
4834                            .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4835                                removed_addr,
4836                            )));
4837                    }
4838                }
4839                Frame::ReachOut(reach_out) => {
4840                    let server_state = match self.iroh_hp.server_side_mut() {
4841                        Ok(state) => state,
4842                        Err(err) => {
4843                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4844                                "Nat traversal(REACH_OUT): {err}"
4845                            )));
4846                        }
4847                    };
4848
4849                    if let Err(err) = server_state.handle_reach_out(reach_out) {
4850                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
4851                            "Nat traversal(REACH_OUT): {err}"
4852                        )));
4853                    }
4854                }
4855            }
4856        }
4857
4858        let space = self.spaces[SpaceId::Data].for_path(path_id);
4859        if space
4860            .pending_acks
4861            .packet_received(now, number, ack_eliciting, &space.dedup)
4862        {
4863            if self.abandoned_paths.contains(&path_id) {
4864                // § 3.4.3 QUIC-MULTIPATH: promptly send ACKs for packets received from
4865                // abandoned paths.
4866                space.pending_acks.set_immediate_ack_required();
4867            } else {
4868                self.timers.set(
4869                    Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4870                    now + self.ack_frequency.max_ack_delay,
4871                    self.qlog.with_time(now),
4872                );
4873            }
4874        }
4875
4876        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
4877        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
4878        // are only freed, and hence only issue credit, once the application has been notified
4879        // during a read on the stream.
4880        let pending = &mut self.spaces[SpaceId::Data].pending;
4881        self.streams.queue_max_stream_id(pending);
4882
4883        if let Some(reason) = close {
4884            self.state.move_to_draining(Some(reason.into()));
4885            self.close = true;
4886        }
4887
4888        if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4889            && !is_probing_packet
4890            && remote != self.path_data(path_id).remote
4891        {
4892            let ConnectionSide::Server { ref server_config } = self.side else {
4893                panic!("packets from unknown remote should be dropped by clients");
4894            };
4895            debug_assert!(
4896                server_config.migration,
4897                "migration-initiating packets should have been dropped immediately"
4898            );
4899            self.migrate(path_id, now, remote, migration_observed_addr);
4900            // Break linkability, if possible
4901            self.update_rem_cid(path_id);
4902            self.spin = false;
4903        }
4904
4905        Ok(())
4906    }
4907
    /// Migrates `path_id` to a new remote address after a packet arrived from `remote`
    ///
    /// Replaces the path's [`PathData`] with state for the new address, queues a new
    /// PATH_CHALLENGE for it, and arms the path-validation timer.  The previous path
    /// state may be stashed in `known_path.prev` together with the remote CID that was in
    /// use on it.
    fn migrate(
        &mut self,
        path_id: PathId,
        now: Instant,
        remote: SocketAddr,
        observed_addr: Option<ObservedAddr>,
    ) {
        trace!(%remote, %path_id, "migration initiated");
        // Each PathData instance is tagged with a unique (wrapping) counter value.
        self.path_counter = self.path_counter.wrapping_add(1);
        // TODO(@divma): conditions for path migration in multipath are very specific, check them
        // again to prevent path migrations that should actually create a new path

        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
        // Note that the congestion window will not grow until validation terminates. Helps mitigate
        // amplification attacks performed by spoofing source addresses.
        //
        // The old path's PTO is captured before its state is replaced; it feeds into the
        // validation timeout computed at the bottom of this function.
        let prev_pto = self.pto(SpaceId::Data, path_id);
        let known_path = self.paths.get_mut(&path_id).expect("known path");
        let path = &mut known_path.data;
        // An IPv4 remote whose IP is unchanged (i.e. only the port moved) is treated as a
        // likely NAT rebinding: rtt/congestion state is carried over from the old path.
        let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
            PathData::from_previous(remote, path, self.path_counter, now)
        } else {
            let peer_max_udp_payload_size =
                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
                    .unwrap_or(u16::MAX);
            PathData::new(
                remote,
                self.allow_mtud,
                Some(peer_max_udp_payload_size),
                self.path_counter,
                now,
                &self.config,
            )
        };
        // Carry the previous observed-address report over to the new path state, then
        // apply the report (if any) that came with the migrating packet, surfacing an
        // event when it actually changed the observed address.
        new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
        if let Some(report) = observed_addr {
            if let Some(updated) = new_path.update_observed_addr_report(report) {
                tracing::info!("adding observed addr event from migration");
                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
                    id: path_id,
                    addr: updated,
                }));
            }
        }
        // The new path must prove reachability: queue a fresh PATH_CHALLENGE for it.
        new_path.send_new_challenge = true;

        let mut prev = mem::replace(path, new_path);
        // Don't clobber the original path if the previous one hasn't been validated yet
        if !prev.is_validating_path() {
            prev.send_new_challenge = true;
            // We haven't updated the remote CID yet, this captures the remote CID we were using on
            // the previous path.

            known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
        }

        // We need to re-assign the correct remote to this path in qlog
        self.qlog.emit_tuple_assigned(path_id, remote, now);

        // Validation deadline: three times the larger of the old and new path's PTO.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathValidation),
            now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
            self.qlog.with_time(now),
        );
    }
4972
    /// Handle a change in the local address, i.e. an active migration
    pub fn local_address_changed(&mut self) {
        // TODO(flub): if multipath is enabled this needs to create a new path entirely.
        // Rotate to a previously unused remote CID (when one is available) so the old and
        // new local addresses cannot be linked by the CID, then queue a PING to elicit a
        // response from the peer via the new address.
        self.update_rem_cid(PathId::ZERO);
        self.ping();
    }
4979
4980    /// Switch to a previously unused remote connection ID, if possible
4981    fn update_rem_cid(&mut self, path_id: PathId) {
4982        let Some((reset_token, retired)) =
4983            self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4984        else {
4985            return;
4986        };
4987
4988        // Retire the current remote CID and any CIDs we had to skip.
4989        self.spaces[SpaceId::Data]
4990            .pending
4991            .retire_cids
4992            .extend(retired.map(|seq| (path_id, seq)));
4993        let remote = self.path_data(path_id).remote;
4994        self.set_reset_token(path_id, remote, reset_token);
4995    }
4996
4997    /// Sends this reset token to the endpoint
4998    ///
4999    /// The endpoint needs to know the reset tokens issued by the peer, so that if the peer
5000    /// sends a reset token it knows to route it to this connection. See RFC 9000 section
5001    /// 10.3. Stateless Reset.
5002    ///
5003    /// Reset tokens are different for each path, the endpoint identifies paths by peer
5004    /// socket address however, not by path ID.
5005    fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5006        self.endpoint_events
5007            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5008
5009        // During the handshake the server sends a reset token in the transport
5010        // parameters. When we are the client and we receive the reset token during the
5011        // handshake we want this to affect our peer transport parameters.
5012        // TODO(flub): Pretty sure this is pointless, the entire params is overwritten
5013        //    shortly after this was called.  And then the params don't have this anymore.
5014        if path_id == PathId::ZERO {
5015            self.peer_params.stateless_reset_token = Some(reset_token);
5016        }
5017    }
5018
5019    /// Issue an initial set of connection IDs to the peer upon connection
5020    fn issue_first_cids(&mut self, now: Instant) {
5021        if self
5022            .local_cid_state
5023            .get(&PathId::ZERO)
5024            .expect("PathId::ZERO exists when the connection is created")
5025            .cid_len()
5026            == 0
5027        {
5028            return;
5029        }
5030
5031        // Subtract 1 to account for the CID we supplied while handshaking
5032        let mut n = self.peer_params.issue_cids_limit() - 1;
5033        if let ConnectionSide::Server { server_config } = &self.side {
5034            if server_config.has_preferred_address() {
5035                // We also sent a CID in the transport parameters
5036                n -= 1;
5037            }
5038        }
5039        self.endpoint_events
5040            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5041    }
5042
5043    /// Issues an initial set of CIDs for paths that have not yet had any CIDs issued
5044    ///
5045    /// Later CIDs are issued when CIDs expire or are retired by the peer.
5046    fn issue_first_path_cids(&mut self, now: Instant) {
5047        if let Some(max_path_id) = self.max_path_id() {
5048            let mut path_id = self.max_path_id_with_cids.next();
5049            while path_id <= max_path_id {
5050                self.endpoint_events
5051                    .push_back(EndpointEventInner::NeedIdentifiers(
5052                        path_id,
5053                        now,
5054                        self.peer_params.issue_cids_limit(),
5055                    ));
5056                path_id = path_id.next();
5057            }
5058            self.max_path_id_with_cids = max_path_id;
5059        }
5060    }
5061
5062    /// Populates a packet with frames
5063    ///
5064    /// This tries to fit as many frames as possible into the packet.
5065    ///
    /// *path_exclusive_only* means to only build frames which can only be sent on this
    /// path.  This is used in multipath for backup paths while there is still an active
    /// path.
5069    fn populate_packet(
5070        &mut self,
5071        now: Instant,
5072        space_id: SpaceId,
5073        path_id: PathId,
5074        path_exclusive_only: bool,
5075        buf: &mut impl BufMut,
5076        pn: u64,
5077        #[allow(unused)] qlog: &mut QlogSentPacket,
5078    ) -> SentFrames {
5079        let mut sent = SentFrames::default();
5080        let is_multipath_negotiated = self.is_multipath_negotiated();
5081        let space = &mut self.spaces[space_id];
5082        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5083        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5084        space
5085            .for_path(path_id)
5086            .pending_acks
5087            .maybe_ack_non_eliciting();
5088
5089        // HANDSHAKE_DONE
5090        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5091            trace!("HANDSHAKE_DONE");
5092            buf.write(frame::FrameType::HANDSHAKE_DONE);
5093            qlog.frame(&Frame::HandshakeDone);
5094            sent.retransmits.get_or_create().handshake_done = true;
5095            // This is just a u8 counter and the frame is typically just sent once
5096            self.stats.frame_tx.handshake_done =
5097                self.stats.frame_tx.handshake_done.saturating_add(1);
5098        }
5099
5100        // REACH_OUT
5101        // TODO(@divma): path explusive considerations
5102        if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5103            while let Some(local_addr) = addresses.pop() {
5104                let reach_out = frame::ReachOut::new(*round, local_addr);
5105                if buf.remaining_mut() > reach_out.size() {
5106                    trace!(%round, ?local_addr, "REACH_OUT");
5107                    reach_out.write(buf);
5108                    let sent_reachouts = sent
5109                        .retransmits
5110                        .get_or_create()
5111                        .reach_out
5112                        .get_or_insert_with(|| (*round, Default::default()));
5113                    sent_reachouts.1.push(local_addr);
5114                    self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5115                    qlog.frame(&Frame::ReachOut(reach_out));
5116                } else {
5117                    addresses.push(local_addr);
5118                    break;
5119                }
5120            }
5121            if addresses.is_empty() {
5122                space.pending.reach_out = None;
5123            }
5124        }
5125
5126        // OBSERVED_ADDR
5127        if !path_exclusive_only
5128            && space_id == SpaceId::Data
5129            && self
5130                .config
5131                .address_discovery_role
5132                .should_report(&self.peer_params.address_discovery_role)
5133            && (!path.observed_addr_sent || space.pending.observed_addr)
5134        {
5135            let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5136            if buf.remaining_mut() > frame.size() {
5137                trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5138                frame.write(buf);
5139
5140                self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5141                path.observed_addr_sent = true;
5142
5143                self.stats.frame_tx.observed_addr += 1;
5144                sent.retransmits.get_or_create().observed_addr = true;
5145                space.pending.observed_addr = false;
5146                qlog.frame(&Frame::ObservedAddr(frame));
5147            }
5148        }
5149
5150        // PING
5151        if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5152            trace!("PING");
5153            buf.write(frame::FrameType::PING);
5154            sent.non_retransmits = true;
5155            self.stats.frame_tx.ping += 1;
5156            qlog.frame(&Frame::Ping);
5157        }
5158
5159        // IMMEDIATE_ACK
5160        if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5161            debug_assert_eq!(
5162                space_id,
5163                SpaceId::Data,
5164                "immediate acks must be sent in the data space"
5165            );
5166            trace!("IMMEDIATE_ACK");
5167            buf.write(frame::FrameType::IMMEDIATE_ACK);
5168            sent.non_retransmits = true;
5169            self.stats.frame_tx.immediate_ack += 1;
5170            qlog.frame(&Frame::ImmediateAck);
5171        }
5172
5173        // ACK
5174        // TODO(flub): Should this send acks for this path anyway?
5175
5176        if !path_exclusive_only {
5177            for path_id in space
5178                .number_spaces
5179                .iter_mut()
5180                .filter(|(_, pns)| pns.pending_acks.can_send())
5181                .map(|(&path_id, _)| path_id)
5182                .collect::<Vec<_>>()
5183            {
5184                Self::populate_acks(
5185                    now,
5186                    self.receiving_ecn,
5187                    &mut sent,
5188                    path_id,
5189                    space_id,
5190                    space,
5191                    is_multipath_negotiated,
5192                    buf,
5193                    &mut self.stats,
5194                    qlog,
5195                );
5196            }
5197        }
5198
5199        // ACK_FREQUENCY
5200        if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5201            let sequence_number = self.ack_frequency.next_sequence_number();
5202
5203            // Safe to unwrap because this is always provided when ACK frequency is enabled
5204            let config = self.config.ack_frequency_config.as_ref().unwrap();
5205
5206            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
5207            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5208                path.rtt.get(),
5209                config,
5210                &self.peer_params,
5211            );
5212
5213            trace!(?max_ack_delay, "ACK_FREQUENCY");
5214
5215            let frame = frame::AckFrequency {
5216                sequence: sequence_number,
5217                ack_eliciting_threshold: config.ack_eliciting_threshold,
5218                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5219                reordering_threshold: config.reordering_threshold,
5220            };
5221            frame.encode(buf);
5222            qlog.frame(&Frame::AckFrequency(frame));
5223
5224            sent.retransmits.get_or_create().ack_frequency = true;
5225
5226            self.ack_frequency
5227                .ack_frequency_sent(path_id, pn, max_ack_delay);
5228            self.stats.frame_tx.ack_frequency += 1;
5229        }
5230
5231        // PATH_CHALLENGE
5232        if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5233            && space_id == SpaceId::Data
5234            && path.send_new_challenge
5235        {
5236            path.send_new_challenge = false;
5237
5238            // Generate a new challenge every time we send a new PATH_CHALLENGE
5239            let token = self.rng.random();
5240            let info = paths::SentChallengeInfo {
5241                sent_instant: now,
5242                remote: path.remote,
5243            };
5244            path.challenges_sent.insert(token, info);
5245            sent.non_retransmits = true;
5246            sent.requires_padding = true;
5247            let challenge = frame::PathChallenge(token);
5248            trace!(%challenge, "sending new challenge");
5249            buf.write(challenge);
5250            qlog.frame(&Frame::PathChallenge(challenge));
5251            self.stats.frame_tx.path_challenge += 1;
5252            let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5253            self.timers.set(
5254                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5255                now + pto,
5256                self.qlog.with_time(now),
5257            );
5258
5259            if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5260                // queue informing the path status along with the challenge
5261                space.pending.path_status.insert(path_id);
5262            }
5263
5264            // Always include an OBSERVED_ADDR frame with a PATH_CHALLENGE, regardless
5265            // of whether one has already been sent on this path.
5266            if space_id == SpaceId::Data
5267                && self
5268                    .config
5269                    .address_discovery_role
5270                    .should_report(&self.peer_params.address_discovery_role)
5271            {
5272                let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5273                if buf.remaining_mut() > frame.size() {
5274                    frame.write(buf);
5275                    qlog.frame(&Frame::ObservedAddr(frame));
5276
5277                    self.next_observed_addr_seq_no =
5278                        self.next_observed_addr_seq_no.saturating_add(1u8);
5279                    path.observed_addr_sent = true;
5280
5281                    self.stats.frame_tx.observed_addr += 1;
5282                    sent.retransmits.get_or_create().observed_addr = true;
5283                    space.pending.observed_addr = false;
5284                }
5285            }
5286        }
5287
5288        // PATH_RESPONSE
5289        if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5290            if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5291                sent.non_retransmits = true;
5292                sent.requires_padding = true;
5293                let response = frame::PathResponse(token);
5294                trace!(%response, "sending response");
5295                buf.write(response);
5296                qlog.frame(&Frame::PathResponse(response));
5297                self.stats.frame_tx.path_response += 1;
5298
5299                // NOTE: this is technically not required but might be useful to ride the
5300                // request/response nature of path challenges to refresh an observation
5301                // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
5302                if space_id == SpaceId::Data
5303                    && self
5304                        .config
5305                        .address_discovery_role
5306                        .should_report(&self.peer_params.address_discovery_role)
5307                {
5308                    let frame =
5309                        frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5310                    if buf.remaining_mut() > frame.size() {
5311                        frame.write(buf);
5312                        qlog.frame(&Frame::ObservedAddr(frame));
5313
5314                        self.next_observed_addr_seq_no =
5315                            self.next_observed_addr_seq_no.saturating_add(1u8);
5316                        path.observed_addr_sent = true;
5317
5318                        self.stats.frame_tx.observed_addr += 1;
5319                        sent.retransmits.get_or_create().observed_addr = true;
5320                        space.pending.observed_addr = false;
5321                    }
5322                }
5323            }
5324        }
5325
5326        // CRYPTO
5327        while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5328            let mut frame = match space.pending.crypto.pop_front() {
5329                Some(x) => x,
5330                None => break,
5331            };
5332
5333            // Calculate the maximum amount of crypto data we can store in the buffer.
5334            // Since the offset is known, we can reserve the exact size required to encode it.
5335            // For length we reserve 2bytes which allows to encode up to 2^14,
5336            // which is more than what fits into normally sized QUIC frames.
5337            let max_crypto_data_size = buf.remaining_mut()
5338                - 1 // Frame Type
5339                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5340                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
5341
5342            let len = frame
5343                .data
5344                .len()
5345                .min(2usize.pow(14) - 1)
5346                .min(max_crypto_data_size);
5347
5348            let data = frame.data.split_to(len);
5349            let truncated = frame::Crypto {
5350                offset: frame.offset,
5351                data,
5352            };
5353            trace!(
5354                "CRYPTO: off {} len {}",
5355                truncated.offset,
5356                truncated.data.len()
5357            );
5358            truncated.encode(buf);
5359            self.stats.frame_tx.crypto += 1;
5360
5361            // The clone is cheap but we still cfg it out if qlog is disabled.
5362            #[cfg(feature = "qlog")]
5363            qlog.frame(&Frame::Crypto(truncated.clone()));
5364            sent.retransmits.get_or_create().crypto.push_back(truncated);
5365            if !frame.data.is_empty() {
5366                frame.offset += len as u64;
5367                space.pending.crypto.push_front(frame);
5368            }
5369        }
5370
5371        // TODO(flub): maybe this is much higher priority?
5372        // PATH_ABANDON
5373        while !path_exclusive_only
5374            && space_id == SpaceId::Data
5375            && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5376        {
5377            let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5378                break;
5379            };
5380            let frame = frame::PathAbandon {
5381                path_id,
5382                error_code,
5383            };
5384            frame.encode(buf);
5385            qlog.frame(&Frame::PathAbandon(frame));
5386            self.stats.frame_tx.path_abandon += 1;
5387            trace!(%path_id, "PATH_ABANDON");
5388            sent.retransmits
5389                .get_or_create()
5390                .path_abandon
5391                .entry(path_id)
5392                .or_insert(error_code);
5393        }
5394
5395        // PATH_STATUS_AVAILABLE & PATH_STATUS_BACKUP
5396        while !path_exclusive_only
5397            && space_id == SpaceId::Data
5398            && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5399        {
5400            let Some(path_id) = space.pending.path_status.pop_first() else {
5401                break;
5402            };
5403            let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5404                trace!(%path_id, "discarding queued path status for unknown path");
5405                continue;
5406            };
5407
5408            let seq = path.status.seq();
5409            sent.retransmits.get_or_create().path_status.insert(path_id);
5410            match path.local_status() {
5411                PathStatus::Available => {
5412                    let frame = frame::PathStatusAvailable {
5413                        path_id,
5414                        status_seq_no: seq,
5415                    };
5416                    frame.encode(buf);
5417                    qlog.frame(&Frame::PathStatusAvailable(frame));
5418                    self.stats.frame_tx.path_status_available += 1;
5419                    trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5420                }
5421                PathStatus::Backup => {
5422                    let frame = frame::PathStatusBackup {
5423                        path_id,
5424                        status_seq_no: seq,
5425                    };
5426                    frame.encode(buf);
5427                    qlog.frame(&Frame::PathStatusBackup(frame));
5428                    self.stats.frame_tx.path_status_backup += 1;
5429                    trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5430                }
5431            }
5432        }
5433
5434        // MAX_PATH_ID
5435        if space_id == SpaceId::Data
5436            && space.pending.max_path_id
5437            && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5438        {
5439            let frame = frame::MaxPathId(self.local_max_path_id);
5440            frame.encode(buf);
5441            qlog.frame(&Frame::MaxPathId(frame));
5442            space.pending.max_path_id = false;
5443            sent.retransmits.get_or_create().max_path_id = true;
5444            trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5445            self.stats.frame_tx.max_path_id += 1;
5446        }
5447
5448        // PATHS_BLOCKED
5449        if space_id == SpaceId::Data
5450            && space.pending.paths_blocked
5451            && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5452        {
5453            let frame = frame::PathsBlocked(self.remote_max_path_id);
5454            frame.encode(buf);
5455            qlog.frame(&Frame::PathsBlocked(frame));
5456            space.pending.paths_blocked = false;
5457            sent.retransmits.get_or_create().paths_blocked = true;
5458            trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5459            self.stats.frame_tx.paths_blocked += 1;
5460        }
5461
5462        // PATH_CIDS_BLOCKED
5463        while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5464        {
5465            let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5466                break;
5467            };
5468            let next_seq = match self.rem_cids.get(&path_id) {
5469                Some(cid_queue) => cid_queue.active_seq() + 1,
5470                None => 0,
5471            };
5472            let frame = frame::PathCidsBlocked {
5473                path_id,
5474                next_seq: VarInt(next_seq),
5475            };
5476            frame.encode(buf);
5477            qlog.frame(&Frame::PathCidsBlocked(frame));
5478            sent.retransmits
5479                .get_or_create()
5480                .path_cids_blocked
5481                .push(path_id);
5482            trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5483            self.stats.frame_tx.path_cids_blocked += 1;
5484        }
5485
5486        // RESET_STREAM, STOP_SENDING, MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS
5487        if space_id == SpaceId::Data {
5488            self.streams.write_control_frames(
5489                buf,
5490                &mut space.pending,
5491                &mut sent.retransmits,
5492                &mut self.stats.frame_tx,
5493                qlog,
5494            );
5495        }
5496
5497        // NEW_CONNECTION_ID
5498        let cid_len = self
5499            .local_cid_state
5500            .values()
5501            .map(|cid_state| cid_state.cid_len())
5502            .max()
5503            .expect("some local CID state must exist");
5504        let new_cid_size_bound =
5505            frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5506        while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5507            let issued = match space.pending.new_cids.pop() {
5508                Some(x) => x,
5509                None => break,
5510            };
5511            let retire_prior_to = self
5512                .local_cid_state
5513                .get(&issued.path_id)
5514                .map(|cid_state| cid_state.retire_prior_to())
5515                .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5516
5517            let cid_path_id = match is_multipath_negotiated {
5518                true => {
5519                    trace!(
5520                        path_id = ?issued.path_id,
5521                        sequence = issued.sequence,
5522                        id = %issued.id,
5523                        "PATH_NEW_CONNECTION_ID",
5524                    );
5525                    self.stats.frame_tx.path_new_connection_id += 1;
5526                    Some(issued.path_id)
5527                }
5528                false => {
5529                    trace!(
5530                        sequence = issued.sequence,
5531                        id = %issued.id,
5532                        "NEW_CONNECTION_ID"
5533                    );
5534                    debug_assert_eq!(issued.path_id, PathId::ZERO);
5535                    self.stats.frame_tx.new_connection_id += 1;
5536                    None
5537                }
5538            };
5539            let frame = frame::NewConnectionId {
5540                path_id: cid_path_id,
5541                sequence: issued.sequence,
5542                retire_prior_to,
5543                id: issued.id,
5544                reset_token: issued.reset_token,
5545            };
5546            frame.encode(buf);
5547            sent.retransmits.get_or_create().new_cids.push(issued);
5548            qlog.frame(&Frame::NewConnectionId(frame));
5549        }
5550
5551        // RETIRE_CONNECTION_ID
5552        let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5553        while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5554            let (path_id, sequence) = match space.pending.retire_cids.pop() {
5555                Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5556                    trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5557                    self.stats.frame_tx.retire_connection_id += 1;
5558                    (None, seq)
5559                }
5560                Some((path_id, seq)) => {
5561                    trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5562                    self.stats.frame_tx.path_retire_connection_id += 1;
5563                    (Some(path_id), seq)
5564                }
5565                None => break,
5566            };
5567            let frame = frame::RetireConnectionId { path_id, sequence };
5568            frame.encode(buf);
5569            qlog.frame(&Frame::RetireConnectionId(frame));
5570            sent.retransmits
5571                .get_or_create()
5572                .retire_cids
5573                .push((path_id.unwrap_or_default(), sequence));
5574        }
5575
5576        // DATAGRAM
5577        let mut sent_datagrams = false;
5578        while !path_exclusive_only
5579            && buf.remaining_mut() > Datagram::SIZE_BOUND
5580            && space_id == SpaceId::Data
5581        {
5582            let prev_remaining = buf.remaining_mut();
5583            match self.datagrams.write(buf) {
5584                true => {
5585                    sent_datagrams = true;
5586                    sent.non_retransmits = true;
5587                    self.stats.frame_tx.datagram += 1;
5588                    qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5589                }
5590                false => break,
5591            }
5592        }
5593        if self.datagrams.send_blocked && sent_datagrams {
5594            self.events.push_back(Event::DatagramsUnblocked);
5595            self.datagrams.send_blocked = false;
5596        }
5597
5598        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5599
5600        // NEW_TOKEN
5601        while let Some(remote_addr) = space.pending.new_tokens.pop() {
5602            if path_exclusive_only {
5603                break;
5604            }
5605            debug_assert_eq!(space_id, SpaceId::Data);
5606            let ConnectionSide::Server { server_config } = &self.side else {
5607                panic!("NEW_TOKEN frames should not be enqueued by clients");
5608            };
5609
5610            if remote_addr != path.remote {
5611                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
5612                // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
5613                // frames upon an path change. Instead, when the new path becomes validated,
5614                // NEW_TOKEN frames may be enqueued for the new path instead.
5615                continue;
5616            }
5617
5618            let token = Token::new(
5619                TokenPayload::Validation {
5620                    ip: remote_addr.ip(),
5621                    issued: server_config.time_source.now(),
5622                },
5623                &mut self.rng,
5624            );
5625            let new_token = NewToken {
5626                token: token.encode(&*server_config.token_key).into(),
5627            };
5628
5629            if buf.remaining_mut() < new_token.size() {
5630                space.pending.new_tokens.push(remote_addr);
5631                break;
5632            }
5633
5634            trace!("NEW_TOKEN");
5635            new_token.encode(buf);
5636            qlog.frame(&Frame::NewToken(new_token));
5637            sent.retransmits
5638                .get_or_create()
5639                .new_tokens
5640                .push(remote_addr);
5641            self.stats.frame_tx.new_token += 1;
5642        }
5643
5644        // STREAM
5645        if !path_exclusive_only && space_id == SpaceId::Data {
5646            sent.stream_frames =
5647                self.streams
5648                    .write_stream_frames(buf, self.config.send_fairness, qlog);
5649            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5650        }
5651
5652        // ADD_ADDRESS
5653        // TODO(@divma): check if we need to do path exclusive filters
5654        while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5655            if let Some(added_address) = space.pending.add_address.pop_last() {
5656                trace!(
5657                    seq = %added_address.seq_no,
5658                    ip = ?added_address.ip,
5659                    port = added_address.port,
5660                    "ADD_ADDRESS",
5661                );
5662                added_address.write(buf);
5663                sent.retransmits
5664                    .get_or_create()
5665                    .add_address
5666                    .insert(added_address);
5667                self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5668                qlog.frame(&Frame::AddAddress(added_address));
5669            } else {
5670                break;
5671            }
5672        }
5673
5674        // REMOVE_ADDRESS
5675        while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5676            if let Some(removed_address) = space.pending.remove_address.pop_last() {
5677                trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5678                removed_address.write(buf);
5679                sent.retransmits
5680                    .get_or_create()
5681                    .remove_address
5682                    .insert(removed_address);
5683                self.stats.frame_tx.remove_address =
5684                    self.stats.frame_tx.remove_address.saturating_add(1);
5685                qlog.frame(&Frame::RemoveAddress(removed_address));
5686            } else {
5687                break;
5688            }
5689        }
5690
5691        sent
5692    }
5693
    /// Write pending ACKs into a buffer
    ///
    /// Emits a PATH_ACK frame when multipath is negotiated and we are in the 1-RTT
    /// (Data) space, and a plain ACK frame otherwise. Records the largest packet
    /// number acknowledged per path in `sent` so the caller can associate this
    /// outgoing packet with the ACK state it carried.
    fn populate_acks(
        now: Instant,
        receiving_ecn: bool,
        sent: &mut SentFrames,
        path_id: PathId,
        space_id: SpaceId,
        space: &mut PacketSpace,
        is_multipath_negotiated: bool,
        buf: &mut impl BufMut,
        stats: &mut ConnectionStats,
        #[allow(unused)] qlog: &mut QlogSentPacket,
    ) {
        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");

        debug_assert!(
            is_multipath_negotiated || path_id == PathId::ZERO,
            "Only PathId::ZERO allowed without multipath (have {path_id:?})"
        );
        if is_multipath_negotiated {
            debug_assert!(
                space_id == SpaceId::Data || path_id == PathId::ZERO,
                "path acks must be sent in 1RTT space (have {space_id:?})"
            );
        }

        let pns = space.for_path(path_id);
        let ranges = pns.pending_acks.ranges();
        debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
        // Only report ECN counts if we are actually receiving ECN marks on this path.
        let ecn = if receiving_ecn {
            Some(&pns.ecn_counters)
        } else {
            None
        };
        if let Some(max) = ranges.max() {
            sent.largest_acked.insert(path_id, max);
        }

        let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
        // TODO: This should come from `TransportConfig` if that gets configurable.
        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
        // The on-wire ack delay is scaled down by 2^ack_delay_exponent.
        let delay = delay_micros >> ack_delay_exp.into_inner();

        if is_multipath_negotiated && space_id == SpaceId::Data {
            if !ranges.is_empty() {
                trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
                frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
                qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
                stats.frame_tx.path_acks += 1;
            }
        } else {
            trace!("ACK {ranges:?}, Delay = {delay_micros}us");
            frame::Ack::encode(delay as _, ranges, ecn, buf);
            stats.frame_tx.acks += 1;
            qlog.frame_ack(delay, ranges, ecn);
        }
    }
5752
    /// Teardown shared by every close/drain path: stop all running timers.
    fn close_common(&mut self) {
        trace!("connection closed");
        self.timers.reset();
    }
5757
5758    fn set_close_timer(&mut self, now: Instant) {
5759        // QUIC-MULTIPATH § 2.6 Connection Closure: draining for 3*PTO with PTO the max of
5760        // the PTO for all paths.
5761        self.timers.set(
5762            Timer::Conn(ConnTimer::Close),
5763            now + 3 * self.pto_max_path(self.highest_space),
5764            self.qlog.with_time(now),
5765        );
5766    }
5767
5768    /// Handle transport parameters received from the peer
5769    ///
5770    /// *rem_cid* and *loc_cid* are the source and destination CIDs respectively of the
5771    /// *packet into which the transport parameters arrived.
5772    fn handle_peer_params(
5773        &mut self,
5774        params: TransportParameters,
5775        loc_cid: ConnectionId,
5776        rem_cid: ConnectionId,
5777        now: Instant,
5778    ) -> Result<(), TransportError> {
5779        if Some(self.orig_rem_cid) != params.initial_src_cid
5780            || (self.side.is_client()
5781                && (Some(self.initial_dst_cid) != params.original_dst_cid
5782                    || self.retry_src_cid != params.retry_src_cid))
5783        {
5784            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5785                "CID authentication failure",
5786            ));
5787        }
5788        if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5789            return Err(TransportError::PROTOCOL_VIOLATION(
5790                "multipath must not use zero-length CIDs",
5791            ));
5792        }
5793
5794        self.set_peer_params(params);
5795        self.qlog.emit_peer_transport_params_received(self, now);
5796
5797        Ok(())
5798    }
5799
    /// Apply the peer's (already authenticated) transport parameters to local state
    ///
    /// Configures streams, idle timeout, preferred address, ack frequency, multipath
    /// negotiation, iroh NAT traversal negotiation, and MTU discovery bounds.
    fn set_peer_params(&mut self, params: TransportParameters) {
        self.streams.set_params(&params);
        self.idle_timeout =
            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
        trace!("negotiated max idle timeout {:?}", self.idle_timeout);

        if let Some(ref info) = params.preferred_address {
            // During the handshake PathId::ZERO exists.
            self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
                path_id: None,
                sequence: 1,
                id: info.connection_id,
                reset_token: info.stateless_reset_token,
                retire_prior_to: 0,
            })
            .expect(
                "preferred address CID is the first received, and hence is guaranteed to be legal",
            );
            let remote = self.path_data(PathId::ZERO).remote;
            self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
        }
        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);

        // Multipath is negotiated iff both endpoints advertised `initial_max_path_id`.
        let mut multipath_enabled = None;
        if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
            self.config.get_initial_max_path_id(),
            params.initial_max_path_id,
        ) {
            // multipath is enabled, register the local and remote maximums
            self.local_max_path_id = local_max_path_id;
            self.remote_max_path_id = remote_max_path_id;
            let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
            debug!(%initial_max_path_id, "multipath negotiated");
            multipath_enabled = Some(initial_max_path_id);
        }

        // iroh NAT traversal is negotiated iff both sides advertised
        // `max_remote_nat_traversal_addresses`; it additionally depends on multipath.
        if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
            self.config
                .max_remote_nat_traversal_addresses
                .zip(params.max_remote_nat_traversal_addresses)
        {
            if let Some(max_initial_paths) =
                multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
            {
                let max_local_addresses = max_remotely_allowed_remote_addresses.get();
                let max_remote_addresses = max_locally_allowed_remote_addresses.get();
                self.iroh_hp =
                    iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
                debug!(
                    %max_remote_addresses, %max_local_addresses,
                    "iroh hole punching negotiated"
                );

                // Warn about negotiated limits likely to starve NAT traversal of
                // paths or connection IDs.
                match self.side() {
                    Side::Client => {
                        if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
                            // in this case the client might try to open `max_remote_addresses` new
                            // paths, but the current multipath configuration will not allow it
                            warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
                        } else if max_local_addresses as u64
                            > params.active_connection_id_limit.into_inner()
                        {
                            // the server allows us to send at most `params.active_connection_id_limit`
                            // but they might need at least `max_local_addresses` to effectively send
                            // `PATH_CHALLENGE` frames to each advertised local address
                            warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
                        }
                    }
                    Side::Server => {
                        if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
                            warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
                        }
                    }
                }
            } else {
                debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
            }
        }

        self.peer_params = params;
        // Clamp the peer's limit into u16 range for the MTU discovery machinery.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_data_mut(PathId::ZERO)
            .mtud
            .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
    }
5886
    /// Decrypts a packet, returning the packet number on success
    ///
    /// Returns `Ok(None)` when the packet cannot be decrypted yet without this being
    /// a connection error. As a side effect, reacts to key-update signals observed in
    /// the packet and arms the key discard timer accordingly.
    fn decrypt_packet(
        &mut self,
        now: Instant,
        path_id: PathId,
        packet: &mut Packet,
    ) -> Result<Option<u64>, Option<TransportError>> {
        let result = packet_crypto::decrypt_packet_body(
            packet,
            path_id,
            &self.spaces,
            self.zero_rtt_crypto.as_ref(),
            self.key_phase,
            self.prev_crypto.as_ref(),
            self.next_crypto.as_ref(),
        )?;

        let result = match result {
            Some(r) => r,
            None => return Ok(None),
        };

        // The peer acknowledged a key update we initiated: record when the previous
        // keys became obsolete so they can be discarded after a grace period.
        if result.outgoing_key_update_acked {
            if let Some(prev) = self.prev_crypto.as_mut() {
                prev.end_packet = Some((result.number, now));
                self.set_key_discard_timer(now, packet.header.space());
            }
        }

        // The peer initiated a key update: rotate to the next key generation.
        if result.incoming_key_update {
            trace!("key update authenticated");
            self.update_keys(Some((result.number, now)), true);
            self.set_key_discard_timer(now, packet.header.space());
        }

        Ok(Some(result.number))
    }
5924
    /// Rotate to the next generation of 1-RTT packet keys
    ///
    /// `end_packet` records the packet number/time bounding use of the old keys;
    /// `remote` is true when the update was initiated by the peer.
    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
        trace!("executing key update");
        // Generate keys for the key phase after the one we're switching to, store them in
        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
        // `prev_crypto`.
        let new = self
            .crypto
            .next_1rtt_keys()
            .expect("only called for `Data` packets");
        // Trigger the next update before the new keys' confidentiality limit is reached.
        self.key_phase_size = new
            .local
            .confidentiality_limit()
            .saturating_sub(KEY_UPDATE_MARGIN);
        let old = mem::replace(
            &mut self.spaces[SpaceId::Data]
                .crypto
                .as_mut()
                .unwrap() // safe because update_keys() can only be triggered by short packets
                .packet,
            mem::replace(self.next_crypto.as_mut().unwrap(), new),
        );
        // Reset per-path counters of packets sent under the now-current keys.
        self.spaces[SpaceId::Data]
            .iter_paths_mut()
            .for_each(|s| s.sent_with_keys = 0);
        self.prev_crypto = Some(PrevCrypto {
            crypto: old,
            end_packet,
            update_unacked: remote,
        });
        self.key_phase = !self.key_phase;
    }
5956
5957    fn peer_supports_ack_frequency(&self) -> bool {
5958        self.peer_params.min_ack_delay.is_some()
5959    }
5960
5961    /// Send an IMMEDIATE_ACK frame to the remote endpoint
5962    ///
5963    /// According to the spec, this will result in an error if the remote endpoint does not support
5964    /// the Acknowledgement Frequency extension
5965    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5966        debug_assert_eq!(
5967            self.highest_space,
5968            SpaceId::Data,
5969            "immediate ack must be written in the data space"
5970        );
5971        self.spaces[self.highest_space]
5972            .for_path(path_id)
5973            .immediate_ack_pending = true;
5974    }
5975
    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
    #[cfg(test)]
    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
        // Only datagram events carry a packet to decode.
        let (path_id, first_decode, remaining) = match &event.0 {
            ConnectionEventInner::Datagram(DatagramConnectionEvent {
                path_id,
                first_decode,
                remaining,
                ..
            }) => (path_id, first_decode, remaining),
            _ => return None,
        };

        if remaining.is_some() {
            panic!("Packets should never be coalesced in tests");
        }

        // First remove header protection, then decrypt the packet body.
        let decrypted_header = packet_crypto::unprotect_header(
            first_decode.clone(),
            &self.spaces,
            self.zero_rtt_crypto.as_ref(),
            self.peer_params.stateless_reset_token,
        )?;

        let mut packet = decrypted_header.packet?;
        packet_crypto::decrypt_packet_body(
            &mut packet,
            *path_id,
            &self.spaces,
            self.zero_rtt_crypto.as_ref(),
            self.key_phase,
            self.prev_crypto.as_ref(),
            self.next_crypto.as_ref(),
        )
        .ok()?;

        Some(packet.payload.to_vec())
    }
6014
6015    /// The number of bytes of packets containing retransmittable frames that have not been
6016    /// acknowledged or declared lost.
6017    #[cfg(test)]
6018    pub(crate) fn bytes_in_flight(&self) -> u64 {
6019        // TODO(@divma): consider including for multipath?
6020        self.path_data(PathId::ZERO).in_flight.bytes
6021    }
6022
6023    /// Number of bytes worth of non-ack-only packets that may be sent
6024    #[cfg(test)]
6025    pub(crate) fn congestion_window(&self) -> u64 {
6026        let path = self.path_data(PathId::ZERO);
6027        path.congestion
6028            .window()
6029            .saturating_sub(path.in_flight.bytes)
6030    }
6031
6032    /// Whether no timers but keepalive, idle, rtt, pushnewcid, and key discard are running
6033    #[cfg(test)]
6034    pub(crate) fn is_idle(&self) -> bool {
6035        let current_timers = self.timers.values();
6036        current_timers
6037            .into_iter()
6038            .filter(|(timer, _)| {
6039                !matches!(
6040                    timer,
6041                    Timer::Conn(ConnTimer::KeepAlive)
6042                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
6043                        | Timer::Conn(ConnTimer::PushNewCid)
6044                        | Timer::Conn(ConnTimer::KeyDiscard)
6045                )
6046            })
6047            .min_by_key(|(_, time)| *time)
6048            .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6049    }
6050
6051    /// Whether explicit congestion notification is in use on outgoing packets.
6052    #[cfg(test)]
6053    pub(crate) fn using_ecn(&self) -> bool {
6054        self.path_data(PathId::ZERO).sending_ecn
6055    }
6056
6057    /// The number of received bytes in the current path
6058    #[cfg(test)]
6059    pub(crate) fn total_recvd(&self) -> u64 {
6060        self.path_data(PathId::ZERO).total_recvd
6061    }
6062
6063    #[cfg(test)]
6064    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6065        self.local_cid_state
6066            .get(&PathId::ZERO)
6067            .unwrap()
6068            .active_seq()
6069    }
6070
6071    #[cfg(test)]
6072    #[track_caller]
6073    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6074        self.local_cid_state
6075            .get(&PathId(path_id))
6076            .unwrap()
6077            .active_seq()
6078    }
6079
6080    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
6081    /// with updated `retire_prior_to` field set to `v`
6082    #[cfg(test)]
6083    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6084        let n = self
6085            .local_cid_state
6086            .get_mut(&PathId::ZERO)
6087            .unwrap()
6088            .assign_retire_seq(v);
6089        self.endpoint_events
6090            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6091    }
6092
6093    /// Check the current active remote CID sequence for `PathId::ZERO`
6094    #[cfg(test)]
6095    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
6096        self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
6097    }
6098
6099    /// Returns the detected maximum udp payload size for the current path
6100    #[cfg(test)]
6101    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6102        self.path_data(path_id).current_mtu()
6103    }
6104
6105    /// Triggers path validation on all paths
6106    #[cfg(test)]
6107    pub(crate) fn trigger_path_validation(&mut self) {
6108        for path in self.paths.values_mut() {
6109            path.data.send_new_challenge = true;
6110        }
6111    }
6112
6113    /// Whether we have 1-RTT data to send
6114    ///
6115    /// This checks for frames that can only be sent in the data space (1-RTT):
6116    /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated.
6117    /// - Pending PATH_RESPONSE frames.
6118    /// - Pending data to send in STREAM frames.
6119    /// - Pending DATAGRAM frames to send.
6120    ///
6121    /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that
6122    /// may need to be sent.
6123    fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6124        let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6125            path.data.send_new_challenge
6126                || path
6127                    .prev
6128                    .as_ref()
6129                    .is_some_and(|(_, path)| path.send_new_challenge)
6130                || !path.data.path_responses.is_empty()
6131        });
6132        let other = self.streams.can_send_stream_data()
6133            || self
6134                .datagrams
6135                .outgoing
6136                .front()
6137                .is_some_and(|x| x.size(true) <= max_size);
6138        SendableFrames {
6139            acks: false,
6140            other,
6141            close: false,
6142            path_exclusive,
6143        }
6144    }
6145
    /// Terminate the connection instantly, without sending a close packet
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.state.move_to_drained(Some(reason));
        // Tell the endpoint this connection can be forgotten.
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
6152
6153    /// Storage size required for the largest packet that can be transmitted on all currently
6154    /// available paths
6155    ///
6156    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
6157    ///
6158    /// When multipath is enabled, this value is the minimum MTU across all available paths.
6159    pub fn current_mtu(&self) -> u16 {
6160        self.paths
6161            .iter()
6162            .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6163            .map(|(_path_id, path_state)| path_state.data.current_mtu())
6164            .min()
6165            .expect("There is always at least one available path")
6166    }
6167
6168    /// Size of non-frame data for a 1-RTT packet
6169    ///
6170    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
6171    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
6172    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
6173    /// latency and packet loss.
6174    fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6175        let pn_len = PacketNumber::new(
6176            pn,
6177            self.spaces[SpaceId::Data]
6178                .for_path(path)
6179                .largest_acked_packet
6180                .unwrap_or(0),
6181        )
6182        .len();
6183
6184        // 1 byte for flags
6185        1 + self
6186            .rem_cids
6187            .get(&path)
6188            .map(|cids| cids.active().len())
6189            .unwrap_or(20)      // Max CID len in QUIC v1
6190            + pn_len
6191            + self.tag_len_1rtt()
6192    }
6193
6194    fn predict_1rtt_overhead_no_pn(&self) -> usize {
6195        let pn_len = 4;
6196
6197        let cid_len = self
6198            .rem_cids
6199            .values()
6200            .map(|cids| cids.active().len())
6201            .max()
6202            .unwrap_or(20); // Max CID len in QUIC v1
6203
6204        // 1 byte for flags
6205        1 + cid_len + pn_len + self.tag_len_1rtt()
6206    }
6207
6208    fn tag_len_1rtt(&self) -> usize {
6209        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6210            Some(crypto) => Some(&*crypto.packet.local),
6211            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6212        };
6213        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
6214        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
6215        // but that would needlessly prevent sending datagrams during 0-RTT.
6216        key.map_or(16, |x| x.tag_len())
6217    }
6218
6219    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
6220    fn on_path_validated(&mut self, path_id: PathId) {
6221        self.path_data_mut(path_id).validated = true;
6222        let ConnectionSide::Server { server_config } = &self.side else {
6223            return;
6224        };
6225        let remote_addr = self.path_data(path_id).remote;
6226        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6227        new_tokens.clear();
6228        for _ in 0..server_config.validation_token.sent {
6229            new_tokens.push(remote_addr);
6230        }
6231    }
6232
6233    /// Handle new path status information: PATH_STATUS_AVAILABLE, PATH_STATUS_BACKUP
6234    fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6235        if let Some(path) = self.paths.get_mut(&path_id) {
6236            path.data.status.remote_update(status, status_seq_no);
6237        } else {
6238            debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6239        }
6240        self.events.push_back(
6241            PathEvent::RemoteStatus {
6242                id: path_id,
6243                status,
6244            }
6245            .into(),
6246        );
6247    }
6248
6249    /// Returns the maximum [`PathId`] to be used for sending in this connection.
6250    ///
6251    /// This is calculated as minimum between the local and remote's maximums when multipath is
6252    /// enabled, or `None` when disabled.
6253    ///
6254    /// For data that's received, we should use [`Self::local_max_path_id`] instead.
6255    /// The reasoning is that the remote might already have updated to its own newer
6256    /// [`Self::max_path_id`] after sending out a `MAX_PATH_ID` frame, but it got re-ordered.
6257    fn max_path_id(&self) -> Option<PathId> {
6258        if self.is_multipath_negotiated() {
6259            Some(self.remote_max_path_id.min(self.local_max_path_id))
6260        } else {
6261            None
6262        }
6263    }
6264
6265    /// Add addresses the local endpoint considers are reachable for nat traversal
6266    pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6267        if let Some(added) = self.iroh_hp.add_local_address(address)? {
6268            self.spaces[SpaceId::Data].pending.add_address.insert(added);
6269        };
6270        Ok(())
6271    }
6272
6273    /// Removes an address the endpoing no longer considers reachable for nat traversal
6274    ///
6275    /// Addresses not present in the set will be silently ignored.
6276    pub fn remove_nat_traversal_address(
6277        &mut self,
6278        address: SocketAddr,
6279    ) -> Result<(), iroh_hp::Error> {
6280        if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6281            self.spaces[SpaceId::Data]
6282                .pending
6283                .remove_address
6284                .insert(removed);
6285        }
6286        Ok(())
6287    }
6288
    /// Get the current local nat traversal addresses
    ///
    /// NOTE(review): error conditions are determined by `iroh_hp::State`; presumably
    /// this fails when NAT traversal was not negotiated — confirm against `iroh_hp`.
    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        self.iroh_hp.get_local_nat_traversal_addresses()
    }
6293
6294    /// Get the currently advertised nat traversal addresses by the server
6295    pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6296        Ok(self
6297            .iroh_hp
6298            .client_side()?
6299            .get_remote_nat_traversal_addresses())
6300    }
6301
    /// Initiates a new nat traversal round
    ///
    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
    /// frames, and initiating probing of the known remote addresses. When a new round is
    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
    ///
    /// Returns the server addresses that are now being probed.
    pub fn initiate_nat_traversal_round(
        &mut self,
        now: Instant,
    ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        // Only the client side initiates rounds; this errors on a server.
        let client_state = self.iroh_hp.client_side_mut()?;
        let iroh_hp::NatTraversalRound {
            new_round,
            reach_out_at,
            addresses_to_probe,
            prev_round_path_ids,
        } = client_state.initiate_nat_traversal_round()?;

        // Queue the REACH_OUT advertisement for transmission in the 1-RTT space.
        self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));

        // Close paths opened by the previous round that never became validated.
        for path_id in prev_round_path_ids {
            // TODO(@divma): this sounds reasonable but we need if this actually works for the
            // purposes of the protocol
            let validated = self
                .path(path_id)
                .map(|path| path.validated)
                .unwrap_or(false);

            if !validated {
                let _ = self.close_path(
                    now,
                    path_id,
                    TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
                );
            }
        }

        // Remember only the first probe error; later successes can still make the
        // round viable.
        let mut err = None;

        let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
        let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
        let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());

        for (ip, port) in addresses_to_probe {
            // If this endpoint is an IPv6 endpoint we use IPv6 addresses for all remotes.
            let remote = match ip {
                IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
                IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
                IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
                IpAddr::V6(_) => {
                    trace!("not using IPv6 nat candidate for IPv4 socket");
                    continue;
                }
            };
            match self.open_path_ensure(remote, PathStatus::Backup, now) {
                Ok((path_id, path_was_known)) if !path_was_known => {
                    path_ids.push(path_id);
                    probed_addresses.push(remote);
                }
                Ok((path_id, _)) => {
                    trace!(%path_id, %remote,"nat traversal: path existed for remote")
                }
                Err(e) => {
                    debug!(%remote, %e,"nat traversal: failed to probe remote");
                    err.get_or_insert(e);
                }
            }
        }

        if let Some(err) = err {
            // We failed to probe any addresses, bail out
            if probed_addresses.is_empty() {
                return Err(iroh_hp::Error::Multipath(err));
            }
        }

        self.iroh_hp
            .client_side_mut()
            .expect("connection side validated")
            .set_round_path_ids(path_ids);

        Ok(probed_addresses)
    }
6386}
6387
6388impl fmt::Debug for Connection {
6389    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6390        f.debug_struct("Connection")
6391            .field("handshake_cid", &self.handshake_cid)
6392            .finish()
6393    }
6394}
6395
/// Reason why a path is (or is not) currently blocked from sending
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// Not blocked
    No,
    /// Blocked by the anti-amplification limit
    AntiAmplification,
    /// Blocked by congestion control
    Congestion,
    /// Blocked by pacing
    Pacing,
}
6403
/// Fields of `Connection` specific to it being client-side or server-side
enum ConnectionSide {
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
        token: Bytes,
        /// Store for address validation tokens, keyed by `server_name`
        token_store: Arc<dyn TokenStore>,
        /// Name of the server this connection targets; used to look up tokens in `token_store`
        server_name: String,
    },
    Server {
        /// Configuration this server-side connection was accepted with
        server_config: Arc<ServerConfig>,
    },
}
6416
6417impl ConnectionSide {
6418    fn remote_may_migrate(&self, state: &State) -> bool {
6419        match self {
6420            Self::Server { server_config } => server_config.migration,
6421            Self::Client { .. } => {
6422                if let Some(hs) = state.as_handshake() {
6423                    hs.allow_server_migration
6424                } else {
6425                    false
6426                }
6427            }
6428        }
6429    }
6430
6431    fn is_client(&self) -> bool {
6432        self.side().is_client()
6433    }
6434
6435    fn is_server(&self) -> bool {
6436        self.side().is_server()
6437    }
6438
6439    fn side(&self) -> Side {
6440        match *self {
6441            Self::Client { .. } => Side::Client,
6442            Self::Server { .. } => Side::Server,
6443        }
6444    }
6445}
6446
6447impl From<SideArgs> for ConnectionSide {
6448    fn from(side: SideArgs) -> Self {
6449        match side {
6450            SideArgs::Client {
6451                token_store,
6452                server_name,
6453            } => Self::Client {
6454                token: token_store.take(&server_name).unwrap_or_default(),
6455                token_store,
6456                server_name,
6457            },
6458            SideArgs::Server {
6459                server_config,
6460                pref_addr_cid: _,
6461                path_validated: _,
6462            } => Self::Server { server_config },
6463        }
6464    }
6465}
6466
/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    Client {
        /// Source of address validation tokens, keyed by server name
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to
        server_name: String,
    },
    Server {
        /// Configuration to accept the connection with
        server_config: Arc<ServerConfig>,
        /// CID associated with the server's preferred address, if any
        // NOTE(review): semantics inferred from the name — confirm against `Connection::new`
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the client's path counts as validated from the start
        path_validated: bool,
    },
}
6479
6480impl SideArgs {
6481    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6482        match *self {
6483            Self::Client { .. } => None,
6484            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6485        }
6486    }
6487
6488    pub(crate) fn path_validated(&self) -> bool {
6489        match *self {
6490            Self::Client { .. } => true,
6491            Self::Server { path_validated, .. } => path_validated,
6492        }
6493    }
6494
6495    pub(crate) fn side(&self) -> Side {
6496        match *self {
6497            Self::Client { .. } => Side::Client,
6498            Self::Server { .. } => Side::Server,
6499        }
6500    }
6501}
6502
/// Reasons why a connection might be lost
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    ///
    /// Carries the transport-layer close frame received from the peer.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    ///
    /// Carries the application-layer close frame received from the peer.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
6537
6538impl From<Close> for ConnectionError {
6539    fn from(x: Close) -> Self {
6540        match x {
6541            Close::Connection(reason) => Self::ConnectionClosed(reason),
6542            Close::Application(reason) => Self::ApplicationClosed(reason),
6543        }
6544    }
6545}
6546
6547// For compatibility with API consumers
6548impl From<ConnectionError> for io::Error {
6549    fn from(x: ConnectionError) -> Self {
6550        use ConnectionError::*;
6551        let kind = match x {
6552            TimedOut => io::ErrorKind::TimedOut,
6553            Reset => io::ErrorKind::ConnectionReset,
6554            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6555            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6556                io::ErrorKind::Other
6557            }
6558        };
6559        Self::new(kind, x)
6560    }
6561}
6562
6563/// Errors that might trigger a path being closed
6564// TODO(@divma): maybe needs to be reworked based on what we want to do with the public API
6565#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6566pub enum PathError {
6567    /// The extension was not negotiated with the peer
6568    #[error("multipath extension not negotiated")]
6569    MultipathNotNegotiated,
6570    /// Paths can only be opened client-side
6571    #[error("the server side may not open a path")]
6572    ServerSideNotAllowed,
6573    /// Current limits do not allow us to open more paths
6574    #[error("maximum number of concurrent paths reached")]
6575    MaxPathIdReached,
6576    /// No remote CIDs available to open a new path
6577    #[error("remoted CIDs exhausted")]
6578    RemoteCidsExhausted,
6579    /// Path could not be validated and will be abandoned
6580    #[error("path validation failed")]
6581    ValidationFailed,
6582    /// The remote address for the path is not supported by the endpoint
6583    #[error("invalid remote address")]
6584    InvalidRemoteAddress(SocketAddr),
6585}
6586
/// Errors triggered when abandoning a path
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The path is already closed or was never opened
    #[error("closed path")]
    ClosedPath,
    /// This is the last path, which cannot be abandoned
    #[error("last open path")]
    LastOpenPath,
}
6597
/// Error returned when an operation requires the multipath extension, but it was not negotiated
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Prevents construction outside this crate while keeping the type public
    _private: (),
}
6603
/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The TLS handshake was confirmed
    HandshakeConfirmed,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
    /// (Multi)Path events
    Path(PathEvent),
    /// Iroh's NAT traversal events
    NatTraversal(iroh_hp::Event),
}
6631
6632impl From<PathEvent> for Event {
6633    fn from(source: PathEvent) -> Self {
6634        Self::Path(source)
6635    }
6636}
6637
6638fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6639    Duration::from_micros(params.max_ack_delay.0 * 1000)
6640}
6641
// Prevents overflow and improves behavior in extreme circumstances
// NOTE(review): presumably caps the exponent of exponential retransmission backoff —
// confirm at the call sites.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Excludes packet-type-specific fields such as packet number or Initial token
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Perform key updates this many packets before the AEAD confidentiality limit.
///
/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
const KEY_UPDATE_MARGIN: u64 = 10_000;
6666
/// Summary of what was written into a single outgoing packet
// NOTE(review): purpose inferred from the name and field usage — confirm against the
// packet-assembly call sites.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable data carried by the packet, tracked for potential retransmission
    retransmits: ThinRetransmits,
    /// The packet number of the largest acknowledged packet for each path
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of the stream frames carried by the packet
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    /// If the datagram containing these frames should be padded to the min MTU
    requires_padding: bool,
}
6678
6679impl SentFrames {
6680    /// Returns whether the packet contains only ACKs
6681    fn is_ack_only(&self, streams: &StreamsState) -> bool {
6682        !self.largest_acked.is_empty()
6683            && !self.non_retransmits
6684            && self.stream_frames.is_empty()
6685            && self.retransmits.is_empty(streams)
6686    }
6687}
6688
6689/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
6690///
6691/// According to the definition of max_idle_timeout, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1.>
6692///
6693/// According to the negotiation procedure, either the minimum of the timeouts or one specified is used as the negotiated value; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2.>
6694///
6695/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
6696fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6697    match (x, y) {
6698        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6699        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6700        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6701        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6702    }
6703}
6704
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        // Each case is (a, b, expected); the negotiation must be symmetric in
        // its two arguments, so every case is checked in both orders.
        let cases = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (
                Some(VarInt(2)),
                Some(VarInt(0)),
                Some(Duration::from_millis(2)),
            ),
            (
                Some(VarInt(1)),
                Some(VarInt(4)),
                Some(Duration::from_millis(1)),
            ),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}