iroh_quinn_proto/connection/mod.rs

use std::{
    cmp,
    collections::{BTreeMap, VecDeque, btree_map},
    convert::TryFrom,
    fmt, io, mem,
    net::{IpAddr, SocketAddr},
    num::{NonZeroU32, NonZeroUsize},
    ops::Not,
    sync::Arc,
};

use bytes::{BufMut, Bytes, BytesMut};
use frame::StreamMetaVec;

use rand::{Rng, SeedableRng, rngs::StdRng};
use rustc_hash::{FxHashMap, FxHashSet};
use thiserror::Error;
use tracing::{debug, error, trace, trace_span, warn};

use crate::{
    Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
    MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
    TransportError, TransportErrorCode, VarInt,
    cid_generator::ConnectionIdGenerator,
    cid_queue::CidQueue,
    config::{ServerConfig, TransportConfig},
    congestion::Controller,
    connection::{
        qlog::{QlogRecvPacket, QlogSink},
        spaces::LostPacket,
        timer::{ConnTimer, PathTimer},
    },
    crypto::{self, KeyPair, Keys, PacketKey},
    frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
    iroh_hp,
    packet::{
        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
        PacketNumber, PartialDecode, SpaceId,
    },
    range_set::ArrayRangeSet,
    shared::{
        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
        EndpointEvent, EndpointEventInner,
    },
    token::{ResetToken, Token, TokenPayload},
    transport_parameters::TransportParameters,
};

mod ack_frequency;
use ack_frequency::AckFrequencyState;

mod assembler;
pub use assembler::Chunk;

mod cid_state;
use cid_state::CidState;

mod datagrams;
use datagrams::DatagramState;
pub use datagrams::{Datagrams, SendDatagramError};

mod mtud;
mod pacing;

mod packet_builder;
use packet_builder::{PacketBuilder, PadDatagram};

mod packet_crypto;
use packet_crypto::{PrevCrypto, ZeroRttCrypto};

mod paths;
pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
use paths::{PathData, PathState};

pub(crate) mod qlog;
pub(crate) mod send_buffer;

mod spaces;
#[cfg(fuzzing)]
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
use spaces::Retransmits;
use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};

mod stats;
pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};

mod streams;
#[cfg(fuzzing)]
pub use streams::StreamsState;
#[cfg(not(fuzzing))]
use streams::StreamsState;
pub use streams::{
    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
};

mod timer;
use timer::{Timer, TimerTable};

mod transmit_buf;
use transmit_buf::TransmitBuf;

mod state;

#[cfg(not(fuzzing))]
use state::State;
#[cfg(fuzzing)]
pub use state::State;
use state::StateType;

/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application, such as
///   [`read`](RecvStream::read), [`write`](SendStream::write) and [`reset`](SendStream::reset).
///   For convenience we refer to this as "performing I/O" below; however, as per the design
///   of this library, none of these functions actually perform system-level I/O.
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible, call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing them to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
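///
/// # Example
///
/// A minimal sketch of such a driving loop. The socket, timer, `now()`,
/// `max_gso_segments`, `connection_handle`, and `handle_application_event` are
/// placeholders supplied by the caller, not part of this crate:
///
/// ```ignore
/// let mut buf = Vec::new();
/// loop {
///     // ... (B) handle_event / handle_timeout and (C) application I/O happen here ...
///
///     // 1. Send any pending datagrams.
///     while let Some(transmit) = conn.poll_transmit(now(), max_gso_segments, &mut buf) {
///         socket.send(&buf[..transmit.size], &transmit);
///         buf.clear();
///     }
///     // 2. (Re-)arm the timer for the next timeout.
///     if let Some(deadline) = conn.poll_timeout() {
///         timer.reset(deadline);
///     }
///     // 3. Forward endpoint-facing events to the endpoint, feeding back its replies.
///     while let Some(event) = conn.poll_endpoint_events() {
///         if let Some(reply) = endpoint.handle_event(connection_handle, event) {
///             conn.handle_event(reply);
///         }
///     }
///     // 4. Surface application-facing events.
///     while let Some(event) = conn.poll() {
///         handle_application_event(event);
///     }
/// }
/// ```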
pub struct Connection {
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    rng: StdRng,
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The [`PathData`] for each path
    ///
    /// This needs to be ordered because [`Connection::poll_transmit`] needs to
    /// deterministically select the next PathId to send on.
    // TODO(flub): well does it really? But deterministic is nice for now.
    paths: BTreeMap<PathId, PathState>,
    /// Incremented every time we see a new path
    ///
    /// Stored separately from `path.generation` to account for aborted migrations
    path_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    state: State,
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Events returned by [`Connection::poll`]
    events: VecDeque<Event>,
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable [`SpaceId`]
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// Whether a CONNECTION_CLOSE frame needs to be sent
    close: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    ///
    /// These are issued before multiple paths exist, including for paths that will never
    /// exist.  So if multipath is supported, the number of entries here will be higher than
    /// the actual number of paths in use.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Attributes of CIDs generated by local endpoint
    ///
    /// Any path that is allowed to be opened is present in this map, as well as the already
    /// opened paths. However, since CIDs are issued asynchronously by the endpoint driver via
    /// connection events, this map can not be used to know whether CIDs have been issued for
    /// a path or not. See [`Connection::max_path_id_with_cids`] for this.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// Path level statistics
    path_stats: FxHashMap<PathId, PathStats>,
    /// QUIC version used for the connection.
    version: u32,

    //
    // Multipath
    //
    /// Maximum number of concurrent paths
    ///
    /// Initially set from [`TransportConfig::max_concurrent_multipath_paths`]. Even when
    /// multipath is disabled this is set to 1, though it is not used in that case.
    max_concurrent_paths: NonZeroU32,
    /// Local maximum [`PathId`] to be used
    ///
    /// This is initially set to [`TransportConfig::get_initial_max_path_id`] when multipath
    /// is negotiated, or to [`PathId::ZERO`] otherwise. This is essentially the value of
    /// the highest MAX_PATH_ID frame sent.
    ///
    /// Any path with an ID equal to or below this [`PathId`] is either:
    ///
    /// - Abandoned, if it is also in [`Connection::abandoned_paths`].
    /// - Open, if it is present in [`Connection::paths`].
    /// - Not yet opened, if it is in neither of these two places.
    ///
    /// Note that for not-yet-opened paths there may or may not be any CIDs issued. See
    /// [`Connection::max_path_id_with_cids`].
    local_max_path_id: PathId,
    /// Remote's maximum [`PathId`] to be used
    ///
    /// This is initially set to the peer's [`TransportParameters::initial_max_path_id`] when
    /// multipath is negotiated, or to [`PathId::ZERO`] otherwise. A peer may increase this limit
    /// by sending [`Frame::MaxPathId`] frames.
    remote_max_path_id: PathId,
    /// The greatest [`PathId`] we have issued CIDs for
    ///
    /// CIDs are only issued up to `min(local_max_path_id, remote_max_path_id)`. It is not
    /// possible to use [`Connection::local_cid_state`] to know whether CIDs have been issued,
    /// since they are issued asynchronously by the endpoint driver.
    max_path_id_with_cids: PathId,
    /// The paths already abandoned
    ///
    /// They may still have some state left in [`Connection::paths`] or
    /// [`Connection::local_cid_state`], since some of this has to be kept around for some
    /// time after a path is abandoned.
    // TODO(flub): Make this a more efficient data structure.  Like ranges of abandoned
    //    paths.  Or a set together with a minimum.  Or something.
    abandoned_paths: FxHashSet<PathId>,

    iroh_hp: iroh_hp::State,
    qlog: QlogSink,
}

impl Connection {
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        network_path: FourTuple,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = {
            let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
            space.crypto = Some(crypto.initial_keys(init_cid, side));
            space
        };
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
        // TODO(@divma): consider if we want to delay this until the path is validated
        path.open = true;
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_counter: 0,
            allow_mtud,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            close: false,

            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            // peer params are not yet known, so multipath is not enabled
            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            // iroh's nat traversal
            iroh_hp: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog
            .emit_tuple_assigned(PathId::ZERO, network_path, now);
        this
    }

    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.peek()
    }

    /// Returns application-facing events
    ///
    /// Connections should be polled for events after:
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll(&mut self) -> Option<Event> {
        if let Some(x) = self.events.pop_front() {
            return Some(x);
        }

        if let Some(event) = self.streams.poll() {
            return Some(Event::Stream(event));
        }

        if let Some(reason) = self.state.take_error() {
            return Some(Event::ConnectionLost { reason });
        }

        None
    }

    /// Return endpoint-facing events
    #[must_use]
    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
        self.endpoint_events.pop_front().map(EndpointEvent)
    }

    /// Provide control over streams
    #[must_use]
    pub fn streams(&mut self) -> Streams<'_> {
        Streams {
            state: &mut self.streams,
            conn_state: &self.state,
        }
    }

    /// Provide control over the receive side of a stream
    #[must_use]
    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
        RecvStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
        }
    }

    /// Provide control over the send side of a stream
    #[must_use]
    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
        SendStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
            conn_state: &self.state,
        }
    }

    /// Opens a new path only if no path on the same network path currently exists.
    ///
    /// This comparison will use [`FourTuple::is_probably_same_path`] on the given `network_path`
    /// and the existing paths' network paths.
    ///
    /// This means that you can pass `local_ip: None` to make the comparison only compare
    /// remote addresses.
    ///
    /// This avoids having to guess which local interface will be used to communicate with the
    /// remote, should it not be known yet. We assume that if we already have a path to the remote,
    /// the OS is likely to use the same interface to talk to said remote.
    ///
    /// See also [`open_path`]. Returns `(path_id, true)` if the path already existed, or
    /// `(path_id, false)` if it was opened.
    ///
    /// [`open_path`]: Connection::open_path
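    ///
    /// # Example
    ///
    /// A sketch, assuming `network_path` is a [`FourTuple`] describing the candidate
    /// path (possibly with an unknown local IP):
    ///
    /// ```ignore
    /// let (path_id, existed) = conn.open_path_ensure(network_path, PathStatus::Available, now)?;
    /// if existed {
    ///     // A path on the same network path was already open; nothing new was created.
    /// }
    /// ```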
    pub fn open_path_ensure(
        &mut self,
        network_path: FourTuple,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<(PathId, bool), PathError> {
        Ok(
            match self
                .paths
                .iter()
                .find(|(_id, path)| network_path.is_probably_same_path(&path.data.network_path))
            {
                Some((path_id, _state)) => (*path_id, true),
                None => (self.open_path(network_path, initial_status, now)?, false),
            },
        )
    }

    /// Opens a new path
    ///
    /// Further errors might occur and they will be emitted in [`PathEvent::LocallyClosed`] events.
    /// When the path is opened it will be reported as a [`PathEvent::Opened`].
    pub fn open_path(
        &mut self,
        network_path: FourTuple,
        initial_status: PathStatus,
        now: Instant,
    ) -> Result<PathId, PathError> {
        if !self.is_multipath_negotiated() {
            return Err(PathError::MultipathNotNegotiated);
        }
        if self.side().is_server() {
            return Err(PathError::ServerSideNotAllowed);
        }

        let max_abandoned = self.abandoned_paths.iter().max().copied();
        let max_used = self.paths.keys().last().copied();
        let path_id = max_abandoned
            .max(max_used)
            .unwrap_or(PathId::ZERO)
            .saturating_add(1u8);

        if Some(path_id) > self.max_path_id() {
            return Err(PathError::MaxPathIdReached);
        }
        if path_id > self.remote_max_path_id {
            self.spaces[SpaceId::Data].pending.paths_blocked = true;
            return Err(PathError::MaxPathIdReached);
        }
        if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .push(path_id);
            return Err(PathError::RemoteCidsExhausted);
        }

        let path = self.ensure_path(path_id, network_path, now, None);
        path.status.local_update(initial_status);

        Ok(path_id)
    }

    /// Closes a path by sending a PATH_ABANDON frame
    ///
    /// This will not allow closing the last path. It does allow closing paths which have
    /// not yet been opened, as e.g. is the case when receiving a PATH_ABANDON from the peer
    /// for a path that was never opened locally.
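    ///
    /// # Example
    ///
    /// A sketch; the error code is application-chosen:
    ///
    /// ```ignore
    /// // Abandon `path_id`, signalling error code 0 to the peer. This fails with
    /// // `ClosePathError::LastOpenPath` if no other validated path would remain.
    /// conn.close_path(now, path_id, VarInt::from_u32(0))?;
    /// ```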
    pub fn close_path(
        &mut self,
        now: Instant,
        path_id: PathId,
        error_code: VarInt,
    ) -> Result<(), ClosePathError> {
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }
        if self
            .paths
            .iter()
            // Would there be any remaining, non-abandoned, validated paths?
            .any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            })
            .not()
        {
            return Err(ClosePathError::LastOpenPath);
        }

        // Send PATH_ABANDON
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, error_code.into());

        // Remove pending NEW CIDs for this path
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Cleanup retransmits across ALL paths (CIDs for path_id may have been transmitted on other paths)
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Consider remotely issued CIDs as retired.
        // Technically we don't have to do this just yet.  We only need to do this *after*
        // the PATH_ABANDON frame is sent, allowing us to still send it on the
        // to-be-abandoned path.  However it is recommended to send it on another path, and
        // we do not allow abandoning the last path anyway.
        self.rem_cids.remove(&path_id);
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        let pto = self.pto_max_path(SpaceId::Data, false);

        let path = self.paths.get_mut(&path_id).expect("checked above");

        // We record the time after which receiving data on this path generates a transport error.
        path.data.last_allowed_receive = Some(now + 3 * pto);
        self.abandoned_paths.insert(path_id);

        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));

        // The peer MUST respond with a corresponding PATH_ABANDON frame.
        // If we receive packets on the abandoned path after 3 * PTO we trigger a transport error.
        // In any case, we completely discard the path after 6 * PTO whether we receive a
        // PATH_ABANDON or not.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::DiscardPath),
            now + 6 * pto,
            self.qlog.with_time(now),
        );
        Ok(())
    }

    /// Gets the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the path_id does not reference any known path.
    #[track_caller]
    fn path_data(&self, path_id: PathId) -> &PathData {
        if let Some(data) = self.paths.get(&path_id) {
            &data.data
        } else {
            panic!(
                "unknown path: {path_id}, currently known paths: {:?}",
                self.paths.keys().collect::<Vec<_>>()
            );
        }
    }

    /// Gets a reference to the [`PathData`] for a [`PathId`]
    fn path(&self, path_id: PathId) -> Option<&PathData> {
        self.paths.get(&path_id).map(|path_state| &path_state.data)
    }

    /// Gets a mutable reference to the [`PathData`] for a [`PathId`]
    fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
        self.paths
            .get_mut(&path_id)
            .map(|path_state| &mut path_state.data)
    }

    /// Returns all known paths.
    ///
    /// There is no guarantee any of these paths are open or usable.
    pub fn paths(&self) -> Vec<PathId> {
        self.paths.keys().copied().collect()
    }

    /// Gets the local [`PathStatus`] for a known [`PathId`]
    pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
        self.path(path_id)
            .map(PathData::local_status)
            .ok_or(ClosedPath { _private: () })
    }

    /// Returns the path's network path represented as a 4-tuple.
    pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
        self.path(path_id)
            .map(|path| path.network_path)
            .ok_or(ClosedPath { _private: () })
    }

    /// Sets the [`PathStatus`] for a known [`PathId`]
    ///
    /// Returns the previous path status on success.
    pub fn set_path_status(
        &mut self,
        path_id: PathId,
        status: PathStatus,
    ) -> Result<PathStatus, SetPathStatusError> {
        if !self.is_multipath_negotiated() {
            return Err(SetPathStatusError::MultipathNotNegotiated);
        }
        let path = self
            .path_mut(path_id)
            .ok_or(SetPathStatusError::ClosedPath)?;
        let prev = match path.status.local_update(status) {
            Some(prev) => {
                self.spaces[SpaceId::Data]
                    .pending
                    .path_status
                    .insert(path_id);
                prev
            }
            None => path.local_status(),
        };
        Ok(prev)
    }

    /// Returns the remote path status
    // TODO(flub): Probably should also be some kind of path event?  Not even sure if I like
    //    this as an API, but for now it allows me to write a test easily.
    // TODO(flub): Technically this should be a Result<Option<PathStatus>>?
    pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
        self.path(path_id).and_then(|path| path.remote_status())
    }

    /// Sets the max_idle_timeout for a specific path
    ///
    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_max_idle_timeout(
        &mut self,
        path_id: PathId,
        timeout: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
    }

    /// Sets the keep_alive_interval for a specific path
    ///
    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
    ///
    /// Returns the previous value of the setting.
    pub fn set_path_keep_alive_interval(
        &mut self,
        path_id: PathId,
        interval: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let path = self
            .paths
            .get_mut(&path_id)
            .ok_or(ClosedPath { _private: () })?;
        Ok(std::mem::replace(&mut path.data.keep_alive, interval))
    }

    /// Gets a mutable reference to the [`PathData`] for a known [`PathId`].
    ///
    /// Will panic if the path_id does not reference any known path.
    #[track_caller]
    fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
        &mut self.paths.get_mut(&path_id).expect("known path").data
    }

    /// Find an open, validated path that's on the same network path as the given network path.
    ///
    /// Returns the first matching path, even if there are multiple.
    fn find_validated_path_on_network_path(
        &self,
        network_path: FourTuple,
    ) -> Option<(&PathId, &PathState)> {
        self.paths.iter().find(|(path_id, path_state)| {
            path_state.data.validated
                // Would this use the same network path, if network_path were used to send right now?
                && network_path.is_probably_same_path(&path_state.data.network_path)
                && !self.abandoned_paths.contains(path_id)
        })
        // TODO(@divma): we might want to ensure the path has been recently active to consider the
        // address validated
        // matheus23: Perhaps looking at !self.abandoned_paths.contains(path_id) is enough, given keep-alives?
    }

    fn ensure_path(
        &mut self,
        path_id: PathId,
        network_path: FourTuple,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        let valid_path = self.find_validated_path_on_network_path(network_path);
        let validated = valid_path.is_some();
        let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        debug!(%validated, %path_id, %network_path, "path added");
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_counter = self.path_counter.wrapping_add(1);
        let mut data = PathData::new(
            network_path,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_counter,
            now,
            &self.config,
        );

        data.validated = validated;
        if let Some(initial_rtt) = initial_rtt {
            data.rtt.reset_initial_rtt(initial_rtt);
        }

        let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathOpen),
            now + 3 * pto,
            self.qlog.with_time(now),
        );

        // For the path to be opened we need to send a packet on it; sending a challenge
        // guarantees this.
        data.send_new_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, network_path, now);
        &mut path.data
    }

    /// Returns packets to transmit
    ///
    /// Connections should be polled for transmit after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    ///
    /// `max_datagrams` specifies how many datagrams can be returned inside a
    /// single [`Transmit`] using GSO; the `NonZeroUsize` type guarantees it is at least 1.
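    ///
    /// # Example
    ///
    /// A sketch of draining transmits into a reusable buffer (`socket` and `now` are
    /// placeholders):
    ///
    /// ```ignore
    /// let mut buf = Vec::new();
    /// while let Some(transmit) = conn.poll_transmit(now, NonZeroUsize::new(1).unwrap(), &mut buf) {
    ///     // With `segment_size: None` the buffer holds one datagram of `transmit.size`
    ///     // bytes; with GSO it may hold several equally sized segments.
    ///     socket.send_to(&buf[..transmit.size], transmit.destination);
    ///     buf.clear();
    /// }
    /// ```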
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: NonZeroUsize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        if let Some(probing) = self
            .iroh_hp
            .server_side_mut()
            .ok()
            .and_then(iroh_hp::ServerState::next_probe)
        {
            let destination = probing.remote();
            trace!(%destination, "RAND_DATA packet");
            let token: u64 = self.rng.random();
            buf.put_u64(token);
            probing.finish(token);
            return Some(Transmit {
                destination,
                ecn: None,
                size: 8,
                segment_size: None,
                src_ip: None,
            });
        }

        let max_datagrams = match self.config.enable_segmentation_offload {
            false => NonZeroUsize::MIN,
            true => max_datagrams,
        };

        // Each call to poll_transmit can only send datagrams to one destination, because
        // all datagrams in a GSO batch are for the same destination.  Therefore only
        // datagrams for one Path ID are produced for each poll_transmit call.

        // TODO(flub): this is wishful thinking and not actually implemented, but perhaps it
        //   should be:

        // First, if we have to send a close, select a path for that.
        // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending.

        // For all open, validated and AVAILABLE paths:
        // - Is the path congestion blocked or pacing blocked?
        // - call maybe_queue_probe to ensure a tail-loss probe would be sent?
        // - do we need to send a close message?
        // - call can_send
        // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths

        // Check whether we need to send a close message
        let close = match self.state.as_type() {
            StateType::Drained => {
                self.app_limited = true;
                return None;
            }
            StateType::Draining | StateType::Closed => {
                // self.close is only reset once the associated packet has been
                // encoded successfully
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Check whether we need to send an ACK_FREQUENCY frame
        if let Some(config) = &self.config.ack_frequency_config {
            let rtt = self
                .paths
                .values()
                .map(|p| p.data.rtt.get())
                .min()
                .expect("one path exists");
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(rtt, config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Whether this packet can be coalesced with another one in the same datagram.
        let mut coalesce = true;

        // Whether the last packet in the datagram must be padded so that the datagram
        // reaches at least MIN_INITIAL_SIZE, or the maximum segment size if that is smaller.
        let mut pad_datagram = PadDatagram::No;

        // Whether congestion control stopped the next packet from being sent. Further
        // packets could still be built, as e.g. tail-loss probes are not congestion
        // limited.
        let mut congestion_blocked = false;

        // The packet number of the last built packet.
        let mut last_packet_number = None;

        let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;

        // If there is any open, validated and available path, we only send frames on a
        // backup path when those frames must be sent on that backup path exclusively.
        let have_available_path = self.paths.iter().any(|(id, path)| {
            path.data.validated
                && path.data.local_status() == PathStatus::Available
                && self.rem_cids.contains_key(id)
        });

        // Setup for the first path_id
        let mut transmit = TransmitBuf::new(
            buf,
            max_datagrams,
            self.path_data(path_id).current_mtu().into(),
        );
        if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
            return Some(challenge);
        }
        let mut space_id = match path_id {
            PathId::ZERO => SpaceId::Initial,
            _ => SpaceId::Data,
        };

        loop {
            // check if there is at least one active CID to use for sending
            let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
                let err = PathError::RemoteCidsExhausted;
                if !self.abandoned_paths.contains(&path_id) {
                    debug!(?err, %path_id, "no active CID for path");
                    self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                        id: path_id,
                        error: err,
                    }));
                    // Locally we should have refused to open this path; the remote should
                    // have given us CIDs for this path before opening it.  So we can always
                    // abandon this here.
                    self.close_path(
                        now,
                        path_id,
                        TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
                    )
                    .ok();
                    self.spaces[SpaceId::Data]
                        .pending
                        .path_cids_blocked
                        .push(path_id);
                } else {
                    trace!(%path_id, "remote CIDs retired for abandoned path");
                }

                match self.paths.keys().find(|&&next| next > path_id) {
                    Some(next_path_id) => {
                        // See if this next path can send anything.
                        path_id = *next_path_id;
                        space_id = SpaceId::Data;

                        // update per path state
                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
                        if let Some(challenge) =
                            self.send_prev_path_challenge(now, &mut transmit, path_id)
                        {
                            return Some(challenge);
                        }

                        continue;
                    }
                    None => {
                        // Nothing more to send.
                        trace!(
                            ?space_id,
                            %path_id,
                            "no CIDs to send on path, no more paths"
                        );
                        break;
                    }
                }
            };

            // Determine if anything can be sent in this packet number space (SpaceId +
            // PathId).
            let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
                // We are trying to coalesce another packet into this datagram.
                transmit.datagram_remaining_mut()
            } else {
                // A new datagram needs to be started.
                transmit.segment_size()
            };
            let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
            let path_should_send = {
                let path_exclusive_only = space_id == SpaceId::Data
                    && have_available_path
                    && self.path_data(path_id).local_status() == PathStatus::Backup;
                let path_should_send = if path_exclusive_only {
                    can_send.path_exclusive
                } else {
                    !can_send.is_empty()
                };
                let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
                path_should_send || needs_loss_probe
            };

            if !path_should_send && space_id < SpaceId::Data {
                if self.spaces[space_id].crypto.is_some() {
                    trace!(?space_id, %path_id, "nothing to send in space");
                }
                space_id = space_id.next();
                continue;
            }

            let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
                // Only check congestion control if a new datagram is needed.
                self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
            } else {
                PathBlocked::No
            };
            if send_blocked != PathBlocked::No {
                trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
                congestion_blocked = true;
            }
            if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
                // Higher spaces might still have tail-loss probes to send, which are not
                // congestion blocked.
                space_id = space_id.next();
                continue;
            }
            if !path_should_send || send_blocked != PathBlocked::No {
                // Nothing more to send on this path, check the next path if possible.

                // If there are any datagrams in the transmit, packets for another path can
                // not be built.
                if transmit.num_datagrams() > 0 {
                    break;
                }

                match self.paths.keys().find(|&&next| next > path_id) {
                    Some(next_path_id) => {
                        // See if this next path can send anything.
                        trace!(
                            ?space_id,
                            %path_id,
                            %next_path_id,
                            "nothing to send on path"
                        );
                        path_id = *next_path_id;
                        space_id = SpaceId::Data;

                        // update per path state
                        transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
                        if let Some(challenge) =
                            self.send_prev_path_challenge(now, &mut transmit, path_id)
                        {
                            return Some(challenge);
                        }

                        continue;
                    }
                    None => {
                        // Nothing more to send.
                        trace!(
                            ?space_id,
                            %path_id,
                            next_path_id=?None::<PathId>,
                            "nothing to send on path"
                        );
                        break;
                    }
                }
            }

            // If the datagram is full, we need to start a new one.
            if transmit.datagram_remaining_mut() == 0 {
                if transmit.num_datagrams() >= transmit.max_datagrams().get() {
                    // No more datagrams allowed
                    break;
                }

                match self.spaces[space_id].for_path(path_id).loss_probes {
                    0 => transmit.start_new_datagram(),
                    _ => {
                        // We need something to send for a tail-loss probe.
                        let request_immediate_ack =
                            space_id == SpaceId::Data && self.peer_supports_ack_frequency();
                        self.spaces[space_id].maybe_queue_probe(
                            path_id,
                            request_immediate_ack,
                            &self.streams,
                        );

                        self.spaces[space_id].for_path(path_id).loss_probes -= 1;

                        // Clamp the datagram to at most the minimum MTU to ensure that loss
                        // probes can get through and enable recovery even if the path MTU
                        // has shrunk unexpectedly.
                        transmit.start_new_datagram_with_size(std::cmp::min(
                            usize::from(INITIAL_MTU),
                            transmit.segment_size(),
                        ));
                    }
                }
                trace!(count = transmit.num_datagrams(), "new datagram started");
                coalesce = true;
                pad_datagram = PadDatagram::No;
            }

            // If coalescing another packet into the existing datagram, there should
            // still be enough space for a whole packet.
            if transmit.datagram_start_offset() < transmit.len() {
                debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
            }

            //
            // From here on, we've determined that a packet will definitely be sent.
            //

            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                // A client stops both sending and processing Initial packets when it
                // sends its first Handshake packet.
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            let mut builder = PacketBuilder::new(
                now,
                space_id,
                path_id,
                remote_cid,
                &mut transmit,
                can_send.other,
                self,
            )?;
            last_packet_number = Some(builder.exact_number);
            coalesce = coalesce && !builder.short_header;

            if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
                // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1
                pad_datagram |= PadDatagram::ToMinMtu;
            }
            if space_id == SpaceId::Data && self.config.pad_to_mtu {
                pad_datagram |= PadDatagram::ToSegmentSize;
            }

            if can_send.close {
                trace!("sending CONNECTION_CLOSE");
                // Encode ACKs before the ConnectionClose message, to give the receiver
                // a better approximation of what data has been processed. This is
                // especially important with ack delay, since the peer might not
                // have gotten any other ACK for the data earlier on.
                let is_multipath_negotiated = self.is_multipath_negotiated();
                for path_id in self.spaces[space_id]
                    .number_spaces
                    .iter()
                    .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
                    .map(|(&path_id, _)| path_id)
                    .collect::<Vec<_>>()
                {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        path_id,
                        space_id,
                        &mut self.spaces[space_id],
                        is_multipath_negotiated,
                        &mut builder,
                        &mut self.stats.frame_tx,
                    );
                }

                // Since there are only 64 ACK frames, there will always be enough space
                // to encode the ConnectionClose frame too. However we still have the
                // check here to prevent crashes if something changes.
                debug_assert!(
                    builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
                    "ACKs should leave space for ConnectionClose"
                );
                let stats = &mut self.stats.frame_tx;
                if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
                    let max_frame_size = builder.frame_space_remaining();
                    let close: Close = match self.state.as_type() {
                        StateType::Closed => {
                            let reason: Close =
                                self.state.as_closed().expect("checked").clone().into();
                            if space_id == SpaceId::Data || reason.is_transport_layer() {
                                reason
                            } else {
                                TransportError::APPLICATION_ERROR("").into()
                            }
                        }
                        StateType::Draining => TransportError::NO_ERROR("").into(),
                        _ => unreachable!(
                            "tried to make a close packet when the connection wasn't closed"
                        ),
                    };
                    builder.encode(close.encoder(max_frame_size), stats);
                }
                builder.finish_and_track(now, self, path_id, pad_datagram);
                if space_id == self.highest_space {
                    // Don't send another close packet. Even with multipath we only send
                    // CONNECTION_CLOSE on a single path since we expect our paths to work.
                    self.close = false;
                    // `CONNECTION_CLOSE` is the final packet
                    break;
                } else {
                    // Send a close frame in every possible space for robustness, per
                    // RFC9000 "Immediate Close during the Handshake". Don't bother trying
                    // to send anything else.
                    space_id = space_id.next();
                    continue;
                }
            }

1328            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that
1329            // path validation can occur while the link is saturated.
1330            if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1331                let path = self.path_data_mut(path_id);
1332                if let Some((token, network_path)) =
1333                    path.path_responses.pop_off_path(path.network_path)
1334                {
1335                    // TODO(flub): We need to use the right CID!  We shouldn't use the same
1336                    //    CID as the current active one for the path.  Though see also
1337                    //    https://github.com/quinn-rs/quinn/issues/2184
1338                    let response = frame::PathResponse(token);
1339                    trace!(%response, "(off-path)");
1340                    builder.encode(response, &mut self.stats.frame_tx);
1341                    builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
1342                    self.stats.udp_tx.on_sent(1, transmit.len());
1343                    return Some(Transmit {
1344                        destination: network_path.remote,
1345                        size: transmit.len(),
1346                        ecn: None,
1347                        segment_size: None,
1348                        src_ip: network_path.local_ip,
1349                    });
1350                }
1351            }
1352
1353            let path_exclusive_only =
1354                have_available_path && self.path_data(path_id).local_status() == PathStatus::Backup;
1355            self.populate_packet(now, space_id, path_id, path_exclusive_only, &mut builder);
1356
1357            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
1358            // any other reason, there is a bug which leads to one component announcing write
1359            // readiness while not writing any data. This degrades performance. The condition is
1360            // only checked if the full MTU is available and when potentially large fixed-size
1361            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
1362            // writing ACKs.
1363            debug_assert!(
1364                !(builder.sent_frames().is_ack_only(&self.streams)
1365                    && !can_send.acks
1366                    && can_send.other
1367                    && builder.buf.segment_size()
1368                        == self.path_data(path_id).current_mtu() as usize
1369                    && self.datagrams.outgoing.is_empty()),
1370                "SendableFrames was {can_send:?}, but only ACKs have been written"
1371            );
1372            if builder.sent_frames().requires_padding {
1373                pad_datagram |= PadDatagram::ToMinMtu;
1374            }
1375
1376            for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1377                self.spaces[space_id]
1378                    .for_path(*path_id)
1379                    .pending_acks
1380                    .acks_sent();
1381                self.timers.stop(
1382                    Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1383                    self.qlog.with_time(now),
1384                );
1385            }
1386
1387                // Now we need to finish the packet.  Before we do so we need to know
1388                // whether the next packet will be coalesced into this datagram or
1389                // whether the datagram ends here, since the last packet in a datagram
1390                // may need extra padding for the packet type or to fill the GSO segment size.
1391
1392            // Are we allowed to coalesce AND is there enough space for another *packet* in
1393            // this datagram AND is there another packet to send in this or the next space?
1394            if coalesce
1395                && builder
1396                    .buf
1397                    .datagram_remaining_mut()
1398                    .saturating_sub(builder.predict_packet_end())
1399                    > MIN_PACKET_SPACE
1400                && self
1401                    .next_send_space(space_id, path_id, builder.buf, close)
1402                    .is_some()
1403            {
1404                // We can append/coalesce the next packet into the current
1405                // datagram. Finish the current packet without adding extra padding.
1406                builder.finish_and_track(now, self, path_id, PadDatagram::No);
1407            } else {
1408                // We need a new datagram for the next packet.  Finish the current
1409                // packet with padding.
1410                if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1411                    // If too many padding bytes would be required to continue the
1412                    // GSO batch after this packet, end the GSO batch here. Ensures
1413                    // that fixed-size frames with heterogeneous sizes
1414                    // (e.g. application datagrams) won't inadvertently waste large
1415                    // amounts of bandwidth. The exact threshold is a bit arbitrary
1416                    // and might benefit from further tuning, though there's no
1417                    // universally optimal value.
1418                    const MAX_PADDING: usize = 32;
1419                    if builder.buf.datagram_remaining_mut()
1420                        > builder.predict_packet_end() + MAX_PADDING
1421                    {
1422                        trace!(
1423                            "GSO truncated by demand for {} padding bytes",
1424                            builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1425                        );
1426                        builder.finish_and_track(now, self, path_id, PadDatagram::No);
1427                        break;
1428                    }
1429
1430                    // Pad the current datagram to GSO segment size so it can be
1431                    // included in the GSO batch.
1432                    builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1433                } else {
1434                    builder.finish_and_track(now, self, path_id, pad_datagram);
1435                }
1436                if transmit.num_datagrams() == 1 {
1437                    transmit.clip_datagram_size();
1438                }
1439            }
1440        }
1441
1442        if let Some(last_packet_number) = last_packet_number {
1443            // Note that when sending in multiple packet spaces the last packet number will
1444            // be the one from the highest packet space.
1445            self.path_data_mut(path_id).congestion.on_sent(
1446                now,
1447                transmit.len() as u64,
1448                last_packet_number,
1449            );
1450        }
1451
1452        self.qlog.emit_recovery_metrics(
1453            path_id,
1454            &mut self.paths.get_mut(&path_id).unwrap().data,
1455            now,
1456        );
1457
1458        self.app_limited = transmit.is_empty() && !congestion_blocked;
1459
1460        // Send MTU probe if necessary
1461        if transmit.is_empty() && self.state.is_established() {
1462            // MTU probing happens only in Data space.
1463            let space_id = SpaceId::Data;
1464            path_id = *self.paths.first_key_value().expect("one path must exist").0;
1465            let probe_data = loop {
1466                // We MTU probe all paths for which all of the following are true:
1467                // - We have an active destination CID for the path.
1468                // - The remote address *and* path are validated.
1469                // - The path is not abandoned.
1470                // - The MTU Discovery subsystem wants to probe the path.
1471                let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1472                let eligible = self.path_data(path_id).validated
1473                    && !self.path_data(path_id).is_validating_path()
1474                    && !self.abandoned_paths.contains(&path_id);
1475                let probe_size = eligible
1476                    .then(|| {
1477                        let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1478                        self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1479                    })
1480                    .flatten();
1481                match (active_cid, probe_size) {
1482                    (Some(active_cid), Some(probe_size)) => {
1483                        // Let's send an MTUD probe!
1484                        break Some((active_cid, probe_size));
1485                    }
1486                    _ => {
1487                        // Find the next path to check if it needs an MTUD probe.
1488                        match self.paths.keys().find(|&&next| next > path_id) {
1489                            Some(next) => {
1490                                path_id = *next;
1491                                continue;
1492                            }
1493                            None => break None,
1494                        }
1495                    }
1496                }
1497            };
1498            if let Some((active_cid, probe_size)) = probe_data {
1499                // We are definitely sending a DPLPMTUD probe.
1500                debug_assert_eq!(transmit.num_datagrams(), 0);
1501                transmit.start_new_datagram_with_size(probe_size as usize);
1502
1503                let mut builder = PacketBuilder::new(
1504                    now,
1505                    space_id,
1506                    path_id,
1507                    active_cid,
1508                    &mut transmit,
1509                    true,
1510                    self,
1511                )?;
1512
1513                // We implement MTU probes as ping packets padded up to the probe size
1514                trace!(?probe_size, "writing MTUD probe");
1515                trace!("PING");
1516                builder.encode(frame::Ping, &mut self.stats.frame_tx);
1517
1518                // If supported by the peer, request that the probe's ACK not be delayed
1519                if self.peer_supports_ack_frequency() {
1520                    trace!("IMMEDIATE_ACK");
1521                    builder.encode(frame::ImmediateAck, &mut self.stats.frame_tx);
1522                }
1523
1524                builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1525
1526                self.path_stats
1527                    .entry(path_id)
1528                    .or_default()
1529                    .sent_plpmtud_probes += 1;
1530            }
1531        }
1532
1533        if transmit.is_empty() {
1534            return None;
1535        }
1536
1537        let network_path = self.path_data(path_id).network_path;
1538        trace!(
1539            segment_size = transmit.segment_size(),
1540            last_datagram_len = transmit.len() % transmit.segment_size(),
1541            %network_path,
1542            "sending {} bytes in {} datagrams",
1543            transmit.len(),
1544            transmit.num_datagrams()
1545        );
1546        self.path_data_mut(path_id)
1547            .inc_total_sent(transmit.len() as u64);
1548
1549        self.stats
1550            .udp_tx
1551            .on_sent(transmit.num_datagrams() as u64, transmit.len());
1552
1553        Some(Transmit {
1554            destination: network_path.remote,
1555            size: transmit.len(),
1556            ecn: if self.path_data(path_id).sending_ecn {
1557                Some(EcnCodepoint::Ect0)
1558            } else {
1559                None
1560            },
1561            segment_size: match transmit.num_datagrams() {
1562                1 => None,
1563                _ => Some(transmit.segment_size()),
1564            },
1565            src_ip: network_path.local_ip,
1566        })
1567    }
1568
1569    /// Returns the [`SpaceId`] of the next packet space which has data to send
1570    ///
1571    /// This takes into account the space available to frames in the next datagram.
1572    // TODO(flub): This duplication is not nice.
1573    fn next_send_space(
1574        &mut self,
1575        current_space_id: SpaceId,
1576        path_id: PathId,
1577        buf: &TransmitBuf<'_>,
1578        close: bool,
1579    ) -> Option<SpaceId> {
1580        // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed
1581        // to be able to send an individual frame at least this large in the next 1-RTT
1582        // packet. This could be generalized to support every space, but it's only needed to
1583        // handle large fixed-size frames, which only exist in 1-RTT (application
1584        // datagrams). We don't account for coalesced packets potentially occupying space
1585        // because frames can always spill into the next datagram.
1586        let mut space_id = current_space_id;
1587        loop {
1588            let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1589            if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1590                return Some(space_id);
1591            }
1592            space_id = match space_id {
1593                SpaceId::Initial => SpaceId::Handshake,
1594                SpaceId::Handshake => SpaceId::Data,
1595                SpaceId::Data => break,
1596            }
1597        }
1598        None
1599    }
1600
1601    /// Checks if creating a new datagram would be blocked by anti-amplification, congestion control, or pacing
1602    fn path_congestion_check(
1603        &mut self,
1604        space_id: SpaceId,
1605        path_id: PathId,
1606        transmit: &TransmitBuf<'_>,
1607        can_send: &SendableFrames,
1608        now: Instant,
1609    ) -> PathBlocked {
1610        // Anti-amplification is only based on `total_sent`, which gets updated after
1611        // the transmit is sent. Therefore we pass the number of bytes for datagrams
1612        // that are already created, as well as 1 byte for starting another datagram. If
1613        // there is any anti-amplification budget left, we always allow a full MTU to be
1614        // sent (see https://github.com/quinn-rs/quinn/issues/1082).
1615        if self.side().is_server()
1616            && self
1617                .path_data(path_id)
1618                .anti_amplification_blocked(transmit.len() as u64 + 1)
1619        {
1620            trace!(?space_id, %path_id, "blocked by anti-amplification");
1621            return PathBlocked::AntiAmplification;
1622        }
1623
1624        // Congestion control check.
1625        // Tail loss probes must not be blocked by congestion, or a deadlock could arise.
1626        let bytes_to_send = transmit.segment_size() as u64;
1627        let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1628
1629        if can_send.other && !need_loss_probe && !can_send.close {
1630            let path = self.path_data(path_id);
1631            if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1632                trace!(?space_id, %path_id, "blocked by congestion control");
1633                return PathBlocked::Congestion;
1634            }
1635        }
1636
1637        // Pacing check.
1638        if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1639            self.timers.set(
1640                Timer::PerPath(path_id, PathTimer::Pacing),
1641                delay,
1642                self.qlog.with_time(now),
1643            );
1644            // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though
1645            // they are not congestion controlled.
1646            trace!(?space_id, %path_id, "blocked by pacing");
1647            return PathBlocked::Pacing;
1648        }
1649
1650        PathBlocked::No
1651    }
1652
1653    /// Send PATH_CHALLENGE for a previous path if necessary
1654    ///
1655    /// QUIC-TRANSPORT section 9.3.3
1656    /// <https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding>
1657    fn send_prev_path_challenge(
1658        &mut self,
1659        now: Instant,
1660        buf: &mut TransmitBuf<'_>,
1661        path_id: PathId,
1662    ) -> Option<Transmit> {
1663        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1664        // TODO (matheus23): We could use !prev_path.is_validating() here instead to
1665        // (possibly) also re-send challenges when they get lost.
1666        if !prev_path.send_new_challenge {
1667            return None;
1668        };
1669        prev_path.send_new_challenge = false;
1670        let network_path = prev_path.network_path;
1671        let token = self.rng.random();
1672        let info = paths::SentChallengeInfo {
1673            sent_instant: now,
1674            network_path,
1675        };
1676        prev_path.challenges_sent.insert(token, info);
1677        debug_assert_eq!(
1678            self.highest_space,
1679            SpaceId::Data,
1680            "PATH_CHALLENGE queued without 1-RTT keys"
1681        );
1682        buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1683
1684        // Use the previous CID to avoid linking the new path with the previous path. We
1685        // don't bother accounting for possible retirement of that prev_cid because this is
1686        // sent once, immediately after migration, when the CID is known to be valid. Even
1687        // if a post-migration packet caused the CID to be retired, it's fair to pretend
1688        // this is sent first.
1689        debug_assert_eq!(buf.datagram_start_offset(), 0);
1690        let mut builder =
1691            PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, false, self)?;
1692        let challenge = frame::PathChallenge(token);
1693        trace!(%challenge, "validating previous path");
1694        builder.encode(challenge, &mut self.stats.frame_tx);
1695
1696        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
1697        // to at least the smallest allowed maximum datagram size of 1200 bytes,
1698        // unless the anti-amplification limit for the path does not permit
1699        // sending a datagram of this size
1700        builder.pad_to(MIN_INITIAL_SIZE);
1701
1702        builder.finish(self, now);
1703        self.stats.udp_tx.on_sent(1, buf.len());
1704
1705        Some(Transmit {
1706            destination: network_path.remote,
1707            size: buf.len(),
1708            ecn: None,
1709            segment_size: None,
1710            src_ip: network_path.local_ip,
1711        })
1712    }
1713
1714    /// Indicate what types of frames are ready to send for the given space
1715    ///
1716    /// *packet_size* is the number of bytes available to build the next packet.  *close*
1717    /// indicates whether a CONNECTION_CLOSE frame needs to be sent.
1718    fn space_can_send(
1719        &mut self,
1720        space_id: SpaceId,
1721        path_id: PathId,
1722        packet_size: usize,
1723        close: bool,
1724    ) -> SendableFrames {
1725        let pn = self.spaces[SpaceId::Data]
1726            .for_path(path_id)
1727            .peek_tx_number();
1728        let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1729        if self.spaces[space_id].crypto.is_none()
1730            && (space_id != SpaceId::Data
1731                || self.zero_rtt_crypto.is_none()
1732                || self.side.is_server())
1733        {
1734            // No keys available for this space
1735            return SendableFrames::empty();
1736        }
1737        let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1738        if space_id == SpaceId::Data {
1739            can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1740        }
1741
1742        can_send.close = close && self.spaces[space_id].crypto.is_some();
1743
1744        can_send
1745    }
1746
1747    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
1748    ///
1749    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
1750    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
1751    /// extracted through the relevant methods.
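    ///
    /// A minimal driving-loop sketch; `incoming_events`, `endpoint_events_out` and
    /// `handle_app_event` are illustrative names, not part of this API:
    ///
    /// ```ignore
    /// while let Some(event) = incoming_events.pop_front() {
    ///     connection.handle_event(event);
    /// }
    /// // Drain what the connection produced in response.
    /// while let Some(event) = connection.poll_endpoint_events() {
    ///     endpoint_events_out.push_back(event); // hand back to the endpoint
    /// }
    /// while let Some(event) = connection.poll() {
    ///     handle_app_event(event); // e.g. stream readable, connected, lost
    /// }
    /// ```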
1752    pub fn handle_event(&mut self, event: ConnectionEvent) {
1753        use ConnectionEventInner::*;
1754        match event.0 {
1755            Datagram(DatagramConnectionEvent {
1756                now,
1757                network_path,
1758                path_id,
1759                ecn,
1760                first_decode,
1761                remaining,
1762            }) => {
1763                let span = trace_span!("pkt", %path_id);
1764                let _guard = span.enter();
1765
1766                if self.update_network_path_or_discard(network_path, path_id) {
1767                    // A return value of true indicates we should discard this packet.
1768                    return;
1769                }
1770
1771                let was_anti_amplification_blocked = self
1772                    .path(path_id)
1773                    .map(|path| path.anti_amplification_blocked(1))
1774                    .unwrap_or(true); // an unknown path is conservatively treated as unvalidated
1775                // TODO(@divma): revisit this
1776
1777                self.stats.udp_rx.datagrams += 1;
1778                self.stats.udp_rx.bytes += first_decode.len() as u64;
1779                let data_len = first_decode.len();
1780
1781                self.handle_decode(now, network_path, path_id, ecn, first_decode);
1782                // The current `path` might have changed inside `handle_decode` since the packet
1783                // could have triggered a migration. The packet might also belong to an unknown
1784                // path and have been rejected. Make sure the data received is accounted for the
1785                // most recent path by accessing `path` after `handle_decode`.
1786                if let Some(path) = self.path_mut(path_id) {
1787                    path.inc_total_recvd(data_len as u64);
1788                }
1789
1790                if let Some(data) = remaining {
1791                    self.stats.udp_rx.bytes += data.len() as u64;
1792                    self.handle_coalesced(now, network_path, path_id, ecn, data);
1793                }
1794
1795                if let Some(path) = self.paths.get_mut(&path_id) {
1796                    self.qlog
1797                        .emit_recovery_metrics(path_id, &mut path.data, now);
1798                }
1799
1800                if was_anti_amplification_blocked {
1801                    // A prior attempt to set the loss detection timer may have failed due to
1802                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
1803                    // the server's first flight is lost.
1804                    self.set_loss_detection_timer(now, path_id);
1805                }
1806            }
1807            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1808                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1809                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1810                let cid_state = self
1811                    .local_cid_state
1812                    .entry(path_id)
1813                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1814                cid_state.new_cids(&ids, now);
1815
1816                ids.into_iter().rev().for_each(|frame| {
1817                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1818                });
1819                // Always update Timer::PushNewCid
1820                self.reset_cid_retirement(now);
1821            }
1822        }
1823    }
1824
1825    /// Updates the network path for `path_id`.
1826    ///
1827    /// Returns true if a packet coming in for this `path_id` over the given `network_path` should be discarded.
1828    /// Returns false if the path was updated and the packet doesn't need to be discarded.
1829    fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
1830        let remote_may_migrate = self.side.remote_may_migrate(&self.state);
1831        let local_ip_may_migrate = self.side.is_client();
1832        // If this packet could initiate a migration and we're a client or a server that
1833        // forbids migration, drop the datagram. This could be relaxed to heuristically
1834        // permit NAT-rebinding-like migration.
1835        if let Some(known_path) = self.path_mut(path_id) {
1836            if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
1837                trace!(
1838                    %path_id,
1839                    %network_path,
1840                    %known_path.network_path,
1841                    "discarding packet from unrecognized peer"
1842                );
1843                return true;
1844            }
1845
1846            if known_path.network_path.local_ip.is_some()
1847                && network_path.local_ip.is_some()
1848                && known_path.network_path.local_ip != network_path.local_ip
1849                && !local_ip_may_migrate
1850            {
1851                trace!(
1852                    %path_id,
1853                    %network_path,
1854                    %known_path.network_path,
1855                    "discarding packet sent to incorrect interface"
1856                );
1857                return true;
1858            }
1859            // If the datagram indicates that we've changed our local IP, we update it.
1860            // This is alluded to in Section 5.2 of the Multipath RFC draft 18:
1861            // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#name-using-multiple-paths-on-the
1862            // > Client receives the packet, recognizes a path migration, updates the source address of path 2 to 192.0.2.1.
1863            if let Some(local_ip) = network_path.local_ip {
1864                if known_path
1865                    .network_path
1866                    .local_ip
1867                    .is_some_and(|ip| ip != local_ip)
1868                {
1869                    debug!(
1870                        %path_id,
1871                        %network_path,
1872                        %known_path.network_path,
1873                        "path's local address seemingly migrated"
1874                    );
1875                }
1876                // We update the address without path validation on the client side.
1877                // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#section-5.1
1878                // > Servers observing a 4-tuple change will perform path validation (see Section 9 of [QUIC-TRANSPORT]).
1879                // This sounds like it's *only* the server endpoints that do this.
1880                // TODO(matheus23): We should still consider doing a proper migration on the client side in the future.
1881                // For now, this preserves the behavior of this code pre 4-tuple tracking.
1882                known_path.network_path.local_ip = Some(local_ip);
1883            }
1884        }
1885        false
1886    }
1887
1888    /// Process timer expirations
1889    ///
1890    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
1891    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
1892    /// methods.
1893    ///
1894    /// It is most efficient to call this immediately after the system clock reaches the latest
1895    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
1896    /// no-op and therefore are safe.
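    ///
    /// A sketch of the expected call pattern, assuming the caller tracks the
    /// deadline returned by `poll_timeout` (the surrounding driver is illustrative):
    ///
    /// ```ignore
    /// if let Some(deadline) = connection.poll_timeout() {
    ///     if now >= deadline {
    ///         // Expires every timer at or before `now`; extra calls are harmless.
    ///         connection.handle_timeout(now);
    ///     }
    /// }
    /// ```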
1897    pub fn handle_timeout(&mut self, now: Instant) {
1898        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
1899            // TODO(@divma): remove `at` when the unicorn is born
1900            trace!(?timer, at=?now, "timeout");
1901            match timer {
1902                Timer::Conn(timer) => match timer {
1903                    ConnTimer::Close => {
1904                        self.state.move_to_drained(None);
1905                        self.endpoint_events.push_back(EndpointEventInner::Drained);
1906                    }
1907                    ConnTimer::Idle => {
1908                        self.kill(ConnectionError::TimedOut);
1909                    }
1910                    ConnTimer::KeepAlive => {
1911                        trace!("sending keep-alive");
1912                        self.ping();
1913                    }
1914                    ConnTimer::KeyDiscard => {
1915                        self.zero_rtt_crypto = None;
1916                        self.prev_crypto = None;
1917                    }
1918                    ConnTimer::PushNewCid => {
1919                        while let Some((path_id, when)) = self.next_cid_retirement() {
1920                            if when > now {
1921                                break;
1922                            }
1923                            match self.local_cid_state.get_mut(&path_id) {
1924                                None => error!(%path_id, "No local CID state for path"),
1925                                Some(cid_state) => {
1926                                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
1927                                    let num_new_cid = cid_state.on_cid_timeout().into();
1928                                    if !self.state.is_closed() {
1929                                        trace!(
1930                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
1931                                            cid_state.retire_prior_to()
1932                                        );
1933                                        self.endpoint_events.push_back(
1934                                            EndpointEventInner::NeedIdentifiers(
1935                                                path_id,
1936                                                now,
1937                                                num_new_cid,
1938                                            ),
1939                                        );
1940                                    }
1941                                }
1942                            }
1943                        }
1944                    }
1945                },
1946                // TODO: add path_id as span somehow
1947                Timer::PerPath(path_id, timer) => {
1948                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
1949                    let _guard = span.enter();
1950                    match timer {
1951                        PathTimer::PathIdle => {
1952                            self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
1953                                .ok();
1954                        }
1955
1956                        PathTimer::PathKeepAlive => {
1957                            trace!("sending keep-alive on path");
1958                            self.ping_path(path_id).ok();
1959                        }
1960                        PathTimer::LossDetection => {
1961                            self.on_loss_detection_timeout(now, path_id);
1962                            self.qlog.emit_recovery_metrics(
1963                                path_id,
1964                                &mut self.paths.get_mut(&path_id).unwrap().data,
1965                                now,
1966                            );
1967                        }
1968                        PathTimer::PathValidation => {
1969                            let Some(path) = self.paths.get_mut(&path_id) else {
1970                                continue;
1971                            };
1972                            self.timers.stop(
1973                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
1974                                self.qlog.with_time(now),
1975                            );
1976                            debug!("path validation failed");
1977                            if let Some((_, prev)) = path.prev.take() {
1978                                path.data = prev;
1979                            }
1980                            path.data.challenges_sent.clear();
1981                            path.data.send_new_challenge = false;
1982                        }
1983                        PathTimer::PathChallengeLost => {
1984                            let Some(path) = self.paths.get_mut(&path_id) else {
1985                                continue;
1986                            };
1987                            trace!("path challenge deemed lost");
1988                            path.data.send_new_challenge = true;
1989                        }
1990                        PathTimer::PathOpen => {
1991                            let Some(path) = self.paths.get_mut(&path_id) else {
1992                                continue;
1993                            };
1994                            path.data.challenges_sent.clear();
1995                            path.data.send_new_challenge = false;
1996                            self.timers.stop(
1997                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
1998                                self.qlog.with_time(now),
1999                            );
2000                            debug!("new path validation failed");
2001                            if let Err(err) = self.close_path(
2002                                now,
2003                                path_id,
2004                                TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
2005                            ) {
2006                                warn!(?err, "failed closing path");
2007                            }
2008
2009                            self.events.push_back(Event::Path(PathEvent::LocallyClosed {
2010                                id: path_id,
2011                                error: PathError::ValidationFailed,
2012                            }));
2013                        }
2014                        PathTimer::Pacing => trace!("pacing timer expired"),
2015                        PathTimer::MaxAckDelay => {
2016                            trace!("max ack delay reached");
2017                            // This timer is only armed in the Data space
2018                            self.spaces[SpaceId::Data]
2019                                .for_path(path_id)
2020                                .pending_acks
2021                                .on_max_ack_delay_timeout()
2022                        }
2023                        PathTimer::DiscardPath => {
2024                            // The path was abandoned and 3*PTO has expired since.  Clean up all
2025                            // remaining state and install stateless reset token.
2026                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2027                            if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
2028                                let (min_seq, max_seq) = loc_cid_state.active_seq();
2029                                for seq in min_seq..=max_seq {
2030                                    self.endpoint_events.push_back(
2031                                        EndpointEventInner::RetireConnectionId(
2032                                            now, path_id, seq, false,
2033                                        ),
2034                                    );
2035                                }
2036                            }
2037                            self.discard_path(path_id, now);
2038                        }
2039                    }
2040                }
2041            }
2042        }
2043    }
2044
2045    /// Close a connection immediately
2046    ///
2047    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
2048    /// call this only when all important communications have been completed, e.g. by calling
2049    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
2050    /// [`StreamEvent::Finished`] event.
2051    ///
2052    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
2053    /// delivered. There may still be data from the peer that has not been received.
2054    ///
2055    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
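    ///
    /// A sketch of an orderly shutdown under these rules; `stream_id` and the loop
    /// driving the connection are illustrative:
    ///
    /// ```ignore
    /// connection.send_stream(stream_id).finish()?;
    /// // ... drive the connection until StreamEvent::Finished for `stream_id` ...
    /// connection.close(Instant::now(), VarInt::from_u32(0), Bytes::from_static(b"bye"));
    /// ```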
2056    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2057        self.close_inner(
2058            now,
2059            Close::Application(frame::ApplicationClose { error_code, reason }),
2060        )
2061    }
2062
2063    fn close_inner(&mut self, now: Instant, reason: Close) {
2064        let was_closed = self.state.is_closed();
2065        if !was_closed {
2066            self.close_common();
2067            self.set_close_timer(now);
2068            self.close = true;
2069            self.state.move_to_closed_local(reason);
2070        }
2071    }
2072
2073    /// Control datagrams
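    ///
    /// A usage sketch, assuming the returned [`Datagrams`] handle exposes
    /// quinn-style `send`/`recv`:
    ///
    /// ```ignore
    /// // Queue an unreliable datagram; `true` allows dropping it when buffers are full.
    /// connection.datagrams().send(Bytes::from_static(b"hi"), true)?;
    /// // Pop a datagram received from the peer, if any.
    /// if let Some(data) = connection.datagrams().recv() { /* ... */ }
    /// ```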
2074    pub fn datagrams(&mut self) -> Datagrams<'_> {
2075        Datagrams { conn: self }
2076    }
2077
2078    /// Returns connection statistics
2079    pub fn stats(&mut self) -> ConnectionStats {
2080        self.stats.clone()
2081    }
2082
2083    /// Returns path statistics
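    ///
    /// A sketch of reading per-path statistics, assuming `PathId::ZERO` names the
    /// initial path:
    ///
    /// ```ignore
    /// if let Some(stats) = connection.path_stats(PathId::ZERO) {
    ///     println!("rtt={:?} cwnd={} mtu={}", stats.rtt, stats.cwnd, stats.current_mtu);
    /// }
    /// ```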
2084    pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2085        let path = self.paths.get(&path_id)?;
2086        let stats = self.path_stats.entry(path_id).or_default();
2087        stats.rtt = path.data.rtt.get();
2088        stats.cwnd = path.data.congestion.window();
2089        stats.current_mtu = path.data.mtud.current_mtu();
2090        Some(*stats)
2091    }
2092
2093    /// Ping the remote endpoint
2094    ///
2095    /// Causes an ACK-eliciting packet to be transmitted on the connection.
2096    pub fn ping(&mut self) {
2097        // TODO(flub): This is very brute-force: it pings *all* the paths.  Instead it would
2098        //    be nice if we could only send a single packet for this.
2099        for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2100            path_data.ping_pending = true;
2101        }
2102    }
2103
2104    /// Ping the remote endpoint over a specific path
2105    ///
2106    /// Causes an ACK-eliciting packet to be transmitted on the path.
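    ///
    /// A brief sketch; returns [`ClosedPath`] if the path is gone:
    ///
    /// ```ignore
    /// // Keep a specific path warm by eliciting an ACK on it.
    /// connection.ping_path(path_id)?;
    /// ```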
2107    pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2108        let path_data = self.spaces[self.highest_space]
2109            .number_spaces
2110            .get_mut(&path)
2111            .ok_or(ClosedPath { _private: () })?;
2112        path_data.ping_pending = true;
2113        Ok(())
2114    }
2115
2116    /// Update traffic keys spontaneously
2117    ///
2118    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
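    ///
    /// A test-oriented sketch; a second call before the first update is
    /// acknowledged is logged and ignored:
    ///
    /// ```ignore
    /// connection.force_key_update();
    /// connection.force_key_update(); // redundant: update still in progress
    /// ```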
2119    pub fn force_key_update(&mut self) {
2120        if !self.state.is_established() {
2121            debug!("ignoring forced key update in illegal state");
2122            return;
2123        }
2124        if self.prev_crypto.is_some() {
2125            // We already just updated, or are currently updating, the keys. Concurrent key updates
2126            // are illegal.
2127            debug!("ignoring redundant forced key update");
2128            return;
2129        }
2130        self.update_keys(None, false);
2131    }
2132
2133    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
2134    #[doc(hidden)]
2135    #[deprecated]
2136    pub fn initiate_key_update(&mut self) {
2137        self.force_key_update();
2138    }
2139
2140    /// Get a session reference
2141    pub fn crypto_session(&self) -> &dyn crypto::Session {
2142        &*self.crypto
2143    }
2144
2145    /// Whether the connection is in the process of being established
2146    ///
2147    /// If this returns `false`, the connection may be either established or closed, signaled by the
2148    /// emission of a `Connected` or `ConnectionLost` message respectively.
2149    pub fn is_handshaking(&self) -> bool {
2150        self.state.is_handshake()
2151    }
2152
2153    /// Whether the connection is closed
2154    ///
2155    /// Closed connections cannot transport any further data. A connection becomes closed when
2156    /// either peer application intentionally closes it, or when either transport layer detects an
2157    /// error such as a time-out or certificate validation failure.
2158    ///
2159    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
2160    pub fn is_closed(&self) -> bool {
2161        self.state.is_closed()
2162    }
2163
2164    /// Whether there is no longer any need to keep the connection around
2165    ///
2166    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
2167    /// packets from the peer. All drained connections have been closed.
2168    pub fn is_drained(&self) -> bool {
2169        self.state.is_drained()
2170    }
2171
2172    /// For clients, if the peer accepted the 0-RTT data packets
2173    ///
2174    /// The value is meaningless until after the handshake completes.
2175    pub fn accepted_0rtt(&self) -> bool {
2176        self.accepted_0rtt
2177    }
2178
2179    /// Whether 0-RTT is/was possible during the handshake
2180    pub fn has_0rtt(&self) -> bool {
2181        self.zero_rtt_enabled
2182    }
2183
2184    /// Whether there are any pending retransmits
2185    pub fn has_pending_retransmits(&self) -> bool {
2186        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2187    }
2188
2189    /// Look up whether we're the client or server of this Connection
2190    pub fn side(&self) -> Side {
2191        self.side.side()
2192    }
2193
2194    /// Get the address observed by the remote over the given path
2195    pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2196        self.path(path_id)
2197            .map(|path_data| {
2198                path_data
2199                    .last_observed_addr_report
2200                    .as_ref()
2201                    .map(|observed| observed.socket_addr())
2202            })
2203            .ok_or(ClosedPath { _private: () })
2204    }
2205
2206    /// Current best estimate of this connection's latency (round-trip-time)
2207    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2208        self.path(path_id).map(|d| d.rtt.get())
2209    }
2210
2211    /// Current state of this connection's congestion controller, for debugging purposes
2212    pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2213        self.path(path_id).map(|d| d.congestion.as_ref())
2214    }
2215
2216    /// Modify the number of remotely initiated streams that may be concurrently open
2217    ///
2218    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
2219    /// `count`s increase both minimum and worst-case memory consumption.
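    ///
    /// For example, a sketch limiting the peer to 32 concurrently open
    /// unidirectional streams:
    ///
    /// ```ignore
    /// connection.set_max_concurrent_streams(Dir::Uni, VarInt::from_u32(32));
    /// ```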
2220    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2221        self.streams.set_max_concurrent(dir, count);
2222        // If the limit was reduced, then a flow control update previously deemed insignificant may
2223        // now be significant.
2224        let pending = &mut self.spaces[SpaceId::Data].pending;
2225        self.streams.queue_max_stream_id(pending);
2226    }
2227
2228    /// Modify the number of open paths allowed when multipath is enabled
2229    ///
2230    /// When reducing the number of concurrent paths this only delays sending new
2231    /// MAX_PATH_ID frames until fewer than this number of paths are in use.  To
2232    /// actively reduce paths they must be closed using [`Connection::close_path`], which
2233    /// can also be used to close not-yet-opened paths.
2234    ///
2235    /// If multipath is not negotiated (see the [`TransportConfig`]) this cannot enable
2236    /// multipath and will fail.
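    ///
    /// A sketch; the call fails when multipath was not negotiated:
    ///
    /// ```ignore
    /// use std::num::NonZeroU32;
    ///
    /// connection.set_max_concurrent_paths(now, NonZeroU32::new(4).unwrap())?;
    /// ```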
2237    pub fn set_max_concurrent_paths(
2238        &mut self,
2239        now: Instant,
2240        count: NonZeroU32,
2241    ) -> Result<(), MultipathNotNegotiated> {
2242        if !self.is_multipath_negotiated() {
2243            return Err(MultipathNotNegotiated { _private: () });
2244        }
2245        self.max_concurrent_paths = count;
2246
2247        let in_use_count = self
2248            .local_max_path_id
2249            .next()
2250            .saturating_sub(self.abandoned_paths.len() as u32)
2251            .as_u32();
2252        let extra_needed = count.get().saturating_sub(in_use_count);
2253        let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2254
2255        self.set_max_path_id(now, new_max_path_id);
2256
2257        Ok(())
2258    }
2259
2260    /// If needed, issues a new MAX_PATH_ID frame and new CIDs for any newly allowed paths
2261    fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2262        if max_path_id <= self.local_max_path_id {
2263            return;
2264        }
2265
2266        self.local_max_path_id = max_path_id;
2267        self.spaces[SpaceId::Data].pending.max_path_id = true;
2268
2269        self.issue_first_path_cids(now);
2270    }
2271
2272    /// Current number of remotely initiated streams that may be concurrently open
2273    ///
2274    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
2275    /// it will not change immediately, even if fewer streams are open. Instead, it will
2276    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
2277    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2278        self.streams.max_concurrent(dir)
2279    }
2280
2281    /// See [`TransportConfig::send_window()`]
2282    pub fn set_send_window(&mut self, send_window: u64) {
2283        self.streams.set_send_window(send_window);
2284    }
2285
2286    /// See [`TransportConfig::receive_window()`]
2287    pub fn set_receive_window(&mut self, receive_window: VarInt) {
2288        if self.streams.set_receive_window(receive_window) {
2289            self.spaces[SpaceId::Data].pending.max_data = true;
2290        }
2291    }
2292
2293    /// Whether the Multipath for QUIC extension was negotiated.
2294    ///
2295    /// Multipath is only enabled after the handshake is completed and if it was enabled by both
2296    /// peers.
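    ///
    /// A sketch of gating path management on the negotiation result:
    ///
    /// ```ignore
    /// if connection.is_multipath_negotiated() {
    ///     // Additional paths may now be opened and managed.
    /// }
    /// ```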
2297    pub fn is_multipath_negotiated(&self) -> bool {
2298        !self.is_handshaking()
2299            && self.config.max_concurrent_multipath_paths.is_some()
2300            && self.peer_params.initial_max_path_id.is_some()
2301    }
2302
2303    fn on_ack_received(
2304        &mut self,
2305        now: Instant,
2306        space: SpaceId,
2307        ack: frame::Ack,
2308    ) -> Result<(), TransportError> {
2309        // All ACK frames reference path 0
2310        let path = PathId::ZERO;
2311        self.inner_on_ack_received(now, space, path, ack)
2312    }
2313
2314    fn on_path_ack_received(
2315        &mut self,
2316        now: Instant,
2317        space: SpaceId,
2318        path_ack: frame::PathAck,
2319    ) -> Result<(), TransportError> {
2320        let (ack, path) = path_ack.into_ack();
2321        self.inner_on_ack_received(now, space, path, ack)
2322    }
2323
2324    /// Handles an ACK frame acknowledging packets sent on *path*.
2325    fn inner_on_ack_received(
2326        &mut self,
2327        now: Instant,
2328        space: SpaceId,
2329        path: PathId,
2330        ack: frame::Ack,
2331    ) -> Result<(), TransportError> {
2332        if self.abandoned_paths.contains(&path) {
2333            // See also https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.4.3-3
2334            // > PATH_ACK frames received with an abandoned path ID are silently ignored, as specified in Section 4.
2335            trace!("silently ignoring PATH_ACK on abandoned path");
2336            return Ok(());
2337        }
2338        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2339            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2340        }
2341        let new_largest = {
2342            let space = &mut self.spaces[space].for_path(path);
2343            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
2344                space.largest_acked_packet = Some(ack.largest);
2345                if let Some(info) = space.sent_packets.get(ack.largest) {
2346                    // This should always succeed, but a misbehaving peer might ACK a packet we
2347                    // haven't sent. At worst, that will result in us spuriously reducing the
2348                    // congestion window.
2349                    space.largest_acked_packet_sent = info.time_sent;
2350                }
2351                true
2352            } else {
2353                false
2354            }
2355        };
2356
2357        if self.detect_spurious_loss(&ack, space, path) {
2358            self.path_data_mut(path)
2359                .congestion
2360                .on_spurious_congestion_event();
2361        }
2362
2363        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
2364        let mut newly_acked = ArrayRangeSet::new();
2365        for range in ack.iter() {
2366            self.spaces[space].for_path(path).check_ack(range.clone())?;
2367            for (pn, _) in self.spaces[space]
2368                .for_path(path)
2369                .sent_packets
2370                .iter_range(range)
2371            {
2372                newly_acked.insert_one(pn);
2373            }
2374        }
2375
2376        if newly_acked.is_empty() {
2377            return Ok(());
2378        }
2379
2380        let mut ack_eliciting_acked = false;
2381        for packet in newly_acked.elts() {
2382            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2383                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2384                    // Assume ACKs for all packets below the largest acknowledged in
2385                    // `packet` have been received. This can cause the peer to spuriously
2386                    // retransmit if some of our earlier ACKs were lost, but allows for
2387                    // simpler state tracking. See discussion at
2388                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
2389                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2390                        pns.pending_acks.subtract_below(*acked_pn);
2391                    }
2392                }
2393                ack_eliciting_acked |= info.ack_eliciting;
2394
2395                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
2396                let path_data = self.path_data_mut(path);
2397                let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
2398                if mtu_updated {
2399                    path_data
2400                        .congestion
2401                        .on_mtu_update(path_data.mtud.current_mtu());
2402                }
2403
2404                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
2405                self.ack_frequency.on_acked(path, packet);
2406
2407                self.on_packet_acked(now, path, info);
2408            }
2409        }
2410
2411        let largest_acked = self.spaces[space].for_path(path).largest_acked_packet;
2412        let app_limited = self.app_limited;
2413        let path_data = self.path_data_mut(path);
2414        let in_flight = path_data.in_flight.bytes;
2415
2416        path_data
2417            .congestion
2418            .on_end_acks(now, in_flight, app_limited, largest_acked);
2419
2420        if new_largest && ack_eliciting_acked {
2421            let ack_delay = if space != SpaceId::Data {
2422                Duration::from_micros(0)
2423            } else {
2424                cmp::min(
2425                    self.ack_frequency.peer_max_ack_delay,
2426                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2427                )
2428            };
2429            let rtt = now.saturating_duration_since(
2430                self.spaces[space].for_path(path).largest_acked_packet_sent,
2431            );
2432
2433            let next_pn = self.spaces[space].for_path(path).next_packet_number;
2434            let path_data = self.path_data_mut(path);
2435            // TODO(@divma): should be a method of path, should be contained in a single place
2436            path_data.rtt.update(ack_delay, rtt);
2437            if path_data.first_packet_after_rtt_sample.is_none() {
2438                path_data.first_packet_after_rtt_sample = Some((space, next_pn));
2439            }
2440        }
2441
2442        // Must be called before crypto/pto_count are clobbered
2443        self.detect_lost_packets(now, space, path, true);
2444
2445        if self.peer_completed_address_validation(path) {
2446            self.path_data_mut(path).pto_count = 0;
2447        }
2448
2449        // Explicit congestion notification
2450        // TODO(@divma): this code is a good example of logic that should be contained in a single
2451        // place but it's split between the path data and the packet number space data, we should
2452        // find a way to make this work without two lookups
2453        if self.path_data(path).sending_ecn {
2454            if let Some(ecn) = ack.ecn {
2455                // We only examine ECN counters from ACKs that we are certain we received in transmit
2456                // order, allowing us to compute an increase in ECN counts to compare against the number
2457                // of newly acked packets that remains well-defined in the presence of arbitrary packet
2458                // reordering.
2459                if new_largest {
2460                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
2461                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
2462                }
2463            } else {
2464                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
2465                debug!("ECN not acknowledged by peer");
2466                self.path_data_mut(path).sending_ecn = false;
2467            }
2468        }
2469
2470        self.set_loss_detection_timer(now, path);
2471        Ok(())
2472    }
2473
2474    fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2475        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2476
2477        if lost_packets.is_empty() {
2478            return false;
2479        }
2480
2481        for range in ack.iter() {
2482            let spurious_losses: Vec<u64> = lost_packets
2483                .iter_range(range.clone())
2484                .map(|(pn, _info)| pn)
2485                .collect();
2486
2487            for pn in spurious_losses {
2488                lost_packets.remove(pn);
2489            }
2490        }
2491
2492        // If this ACK frame acknowledged all packets deemed lost,
2493        // then we have raised a spurious congestion event in the past.
2494        // We cannot conclude anything while deemed-lost packets remain,
2495        // but future ACK frames might indicate a spurious loss detection.
2496        lost_packets.is_empty()
2497    }
2498
2499    /// Drain lost packets that we reasonably think will never arrive
2500    ///
2501    /// The current criterion is copied from `msquic`:
2502    /// discard packets that were sent earlier than 2 probe timeouts ago.
2503    fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2504        let two_pto = 2 * self.path_data(path).rtt.pto_base();
2505
2506        let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2507        lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2508    }

    /// Process a new ECN block from an in-order ACK
    fn process_ecn(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        newly_acked: u64,
        ecn: frame::EcnCounts,
        largest_sent_time: Instant,
    ) {
        match self.spaces[space]
            .for_path(path)
            .detect_ecn(newly_acked, ecn)
        {
            Err(e) => {
                debug!("halting ECN due to verification failure: {}", e);

                self.path_data_mut(path).sending_ecn = false;
                // Wipe out the existing value because it might be garbage and could interfere with
                // future attempts to use ECN on new paths.
                self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
            }
            Ok(false) => {}
            Ok(true) => {
                self.path_stats.entry(path).or_default().congestion_events += 1;
                self.path_data_mut(path).congestion.on_congestion_event(
                    now,
                    largest_sent_time,
                    false,
                    true,
                    0,
                );
            }
        }
    }
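
    // Hedged illustration of one invariant behind the `detect_ecn` verification
    // above (a simplified model, not the full RFC 9000 §13.4.2 rule set): the ECN
    // counters echoed by the peer may never decrease between ACKs. Hypothetical
    // helper over `[ect0, ect1, ce]` triples.
    #[allow(dead_code)]
    fn ecn_counters_monotonic(prev: [u64; 3], new: [u64; 3]) -> bool {
        prev.iter().zip(new.iter()).all(|(p, n)| n >= p)
    }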

    // Not timing-aware, so it's safe to call this for inferred acks, such as those arising from
    // high-latency handshakes
    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
        self.paths
            .get_mut(&path_id)
            .expect("known path")
            .remove_in_flight(&info);
        let app_limited = self.app_limited;
        let path = self.path_data_mut(path_id);
        if info.ack_eliciting && !path.is_validating_path() {
            // Only pass ACKs to the congestion controller if we are not validating the current
            // path, so as to ignore any ACKs from older paths still coming in.
            let rtt = path.rtt;
            path.congestion
                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
        }

        // Update state for confirmed delivery of frames
        if let Some(retransmits) = info.retransmits.get() {
            for (id, _) in retransmits.reset_stream.iter() {
                self.streams.reset_acked(*id);
            }
        }

        for frame in info.stream_frames {
            self.streams.received_ack_of(frame);
        }
    }

    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
        let start = if self.zero_rtt_crypto.is_some() {
            now
        } else {
            self.prev_crypto
                .as_ref()
                .expect("no previous keys")
                .end_packet
                .as_ref()
                .expect("update not acknowledged yet")
                .1
        };

        // QUIC-MULTIPATH § 2.5 Key Phase Update Process: use the largest PTO of all paths.
        self.timers.set(
            Timer::Conn(ConnTimer::KeyDiscard),
            start + self.pto_max_path(space, false) * 3,
            self.qlog.with_time(now),
        );
    }
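
    // The key-discard deadline computed above, in miniature: old keys are kept for
    // three probe timeouts past `start` so that delayed packets protected with them
    // can still be decrypted. Illustrative only.
    #[allow(dead_code)]
    fn key_discard_deadline(start: Instant, max_path_pto: Duration) -> Instant {
        start + max_path_pto * 3
    }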

    /// Handle a [`PathTimer::LossDetection`] timeout.
    ///
    /// This timer expires for two reasons:
    /// - An ACK-eliciting packet we sent should be considered lost.
    /// - The PTO may have expired and a tail-loss probe needs to be scheduled.
    ///
    /// The former needs us to schedule re-transmission of the lost data.
    ///
    /// The latter means we have not received an ACK for an ack-eliciting packet we sent
    /// within the PTO time-window. We need to schedule a tail-loss probe, an ack-eliciting
    /// packet, to try and elicit new acknowledgements. These new acknowledgements will
    /// indicate whether the previously sent packets were lost or not.
    fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
        if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
            // Time threshold loss detection
            self.detect_lost_packets(now, pn_space, path_id, false);
            self.set_loss_detection_timer(now, path_id);
            return;
        }

        let (_, space) = match self.pto_time_and_space(now, path_id) {
            Some(x) => x,
            None => {
                error!(%path_id, "PTO expired while unset");
                return;
            }
        };
        trace!(
            in_flight = self.path_data(path_id).in_flight.bytes,
            count = self.path_data(path_id).pto_count,
            ?space,
            %path_id,
            "PTO fired"
        );

        let count = match self.path_data(path_id).in_flight.ack_eliciting {
            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
            // deadlock prevention
            0 => {
                debug_assert!(!self.peer_completed_address_validation(path_id));
                1
            }
            // Conventional loss probe
            _ => 2,
        };
        let pns = self.spaces[space].for_path(path_id);
        pns.loss_probes = pns.loss_probes.saturating_add(count);
        let path_data = self.path_data_mut(path_id);
        path_data.pto_count = path_data.pto_count.saturating_add(1);
        self.set_loss_detection_timer(now, path_id);
    }
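
    // Sketch of the exponential PTO backoff applied when re-arming the timer (see
    // `pto_time_and_space`): each consecutive unanswered PTO doubles the wait, with
    // the exponent capped at `MAX_BACKOFF_EXPONENT`. Illustrative only.
    #[allow(dead_code)]
    fn backed_off_pto(pto_base: Duration, pto_count: u32) -> Duration {
        pto_base * 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT))
    }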

    /// Detect any lost packets
    ///
    /// There are two cases in which we detect lost packets:
    ///
    /// - We received an ACK packet.
    /// - The [`PathTimer::LossDetection`] timer expired, meaning there is an un-acknowledged
    ///   packet that was followed by an acknowledged packet. The loss timer for this
    ///   un-acknowledged packet expired and we need to declare that packet as lost.
    ///
    /// Packets are lost if they are both (see RFC 9002 §6.1):
    ///
    /// - Unacknowledged, in flight and sent prior to an acknowledged packet.
    /// - Old enough by either:
    ///   - Having a packet number [`TransportConfig::packet_threshold`] lower than the last
    ///     acknowledged packet.
    ///   - Being sent [`TransportConfig::time_threshold`] * RTT in the past.
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        // Find all the lost packets, populating all variables initialised above.

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // InPersistentCongestion: Determine if all packets in the time period before the newest
        // lost packet, including the edges, are marked lost. PTO computation must always
        // include max ACK delay, i.e. operate as if in Data space (see RFC9001 §7.6.1).
        let congestion_period = self
            .pto(SpaceId::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
            if prev_packet != Some(packet.wrapping_sub(1)) {
                // An intervening packet was acknowledged
                persistent_congestion_start = None;
            }

            // Packets sent before now - loss_delay are deemed lost. However, we avoid
            // computing now - loss_delay, which can panic: `Instant` has no saturating
            // subtraction of a `Duration`, so we compare the elapsed time instead.
            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
                // The packet should be declared lost.
                if Some(packet) == in_flight_mtu_probe {
                    // Lost MTU probes are not included in `lost_packets`, because they
                    // should not trigger a congestion control response
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // Two ACK-eliciting packets lost more than
                            // congestion_period apart, with no ACKed packets in between
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // Persistent congestion must start after the first RTT sample
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space, packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // The packet should not yet be declared lost.
                if space.loss_time.is_none() {
                    // Since we iterate in order, the lowest packet number's loss time will
                    // always be the earliest.
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }
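
    // The two loss conditions applied above, condensed (RFC 9002 §6.1): a packet is
    // declared lost once it is either `packet_threshold` packets older than the
    // largest acknowledged packet, or older than the time-threshold loss delay.
    // Hypothetical helper for illustration; the real loop also tracks MTU probes
    // and persistent congestion.
    #[allow(dead_code)]
    fn is_deemed_lost(
        pn: u64,
        time_sent: Instant,
        largest_acked: u64,
        now: Instant,
        loss_delay: Duration,
        packet_threshold: u64,
    ) -> bool {
        largest_acked >= pn + packet_threshold
            || now.saturating_duration_since(time_sent) >= loss_delay
    }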

    /// Drops the path state, declaring any remaining in-flight packets as lost
    fn discard_path(&mut self, path_id: PathId, now: Instant) {
        trace!(%path_id, "dropping path state");
        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();

        let mut size_of_lost_packets = 0u64; // add to path_stats.lost_bytes;
        let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
            .for_path(path_id)
            .sent_packets
            .iter()
            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
            .map(|(pn, info)| {
                size_of_lost_packets += info.size as u64;
                pn
            })
            .collect();

        if !lost_pns.is_empty() {
            trace!(
                %path_id,
                count = lost_pns.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost on path abandon"
            );
            self.handle_lost_packets(
                SpaceId::Data,
                path_id,
                now,
                lost_pns,
                in_flight_mtu_probe,
                Duration::ZERO,
                false,
                size_of_lost_packets,
            );
        }
        self.paths.remove(&path_id);
        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);

        let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
        self.events.push_back(
            PathEvent::Abandoned {
                id: path_id,
                path_stats,
            }
            .into(),
        );
    }

    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(
            {
                let mut sorted = lost_packets.clone();
                sorted.sort();
                sorted == lost_packets
            },
            "lost_packets must be sorted"
        );

        self.drain_lost_packets(now, pn_space, path_id);

        // OnPacketsLost
        if let Some(largest_lost) = lost_packets.last().cloned() {
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.entry(path_id).or_default();
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                if let Some(max_datagram_size) = self.datagrams().max_size() {
                    self.datagrams.drop_oversized(max_datagram_size);
                }
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .black_holes_detected += 1;
            }

            // Don't apply congestion penalty for lost ack-only packets
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        // Handle a lost MTU probe
        if let Some(packet) = lost_mtu_probe {
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and
            // therefore must not have been removed yet
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats
                .entry(path_id)
                .or_default()
                .lost_plpmtud_probes += 1;
        }
    }

    /// Returns the earliest time packets should be declared lost for all spaces on a path.
    ///
    /// If a path has an acknowledged packet with any prior un-acknowledged packets, the
    /// earliest un-acknowledged packet can be declared lost after a timeout has elapsed.
    /// The time returned is when this packet should be declared lost.
    fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
        SpaceId::iter()
            .filter_map(|id| {
                self.spaces[id]
                    .number_spaces
                    .get(&path_id)
                    .and_then(|pns| pns.loss_time)
                    .map(|time| (time, id))
            })
            .min_by_key(|&(time, _)| time)
    }

    /// Returns the earliest time the next PTO should fire across all spaces on a path.
    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
        let path = self.path(path_id)?;
        let pto_count = path.pto_count;
        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
        let mut duration = path.rtt.pto_base() * backoff;

        if path_id == PathId::ZERO
            && path.in_flight.ack_eliciting == 0
            && !self.peer_completed_address_validation(PathId::ZERO)
        {
            // Address Validation during Connection Establishment:
            // https://www.rfc-editor.org/rfc/rfc9000.html#section-8.1. To prevent a
            // deadlock when an Initial or Handshake packet from the server is lost and the
            // server cannot send more due to its anti-amplification limit, the client must
            // send another packet on PTO.
            let space = match self.highest_space {
                SpaceId::Handshake => SpaceId::Handshake,
                _ => SpaceId::Initial,
            };

            return Some((now + duration, space));
        }

        let mut result = None;
        for space in SpaceId::iter() {
            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
                continue;
            };

            if !pns.has_in_flight() {
                continue;
            }
            if space == SpaceId::Data {
                // Skip ApplicationData until handshake completes.
                if self.is_handshaking() {
                    return result;
                }
                // Include max_ack_delay and backoff for ApplicationData.
                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
            }
            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
                continue;
            };
            let pto = last_ack_eliciting + duration;
            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
                if path.anti_amplification_blocked(1) {
                    // Nothing would be able to be sent.
                    continue;
                }
                if path.in_flight.ack_eliciting == 0 {
                    // Nothing ack-eliciting, no PTO to arm/fire.
                    continue;
                }
                result = Some((pto, space));
            }
        }
        result
    }
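
    // Illustrative composition of the PTO deadline armed above: a (backed-off)
    // probe timeout after the last ack-eliciting packet, with max_ack_delay added
    // only for the ApplicationData space. A sketch mirroring the logic above, not a
    // substitute for it.
    #[allow(dead_code)]
    fn pto_deadline(
        last_ack_eliciting: Instant,
        pto_base: Duration,
        max_ack_delay: Option<Duration>,
        backoff: u32,
    ) -> Instant {
        let mut duration = pto_base * backoff;
        if let Some(delay) = max_ack_delay {
            // Only ApplicationData includes the peer's max_ack_delay.
            duration += delay * backoff;
        }
        last_ack_eliciting + duration
    }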

    fn peer_completed_address_validation(&self, path: PathId) -> bool {
        // TODO(flub): This logic needs updating for multipath
        if self.side.is_server() || self.state.is_closed() {
            return true;
        }
        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
        self.spaces[SpaceId::Handshake]
            .path_space(PathId::ZERO)
            .and_then(|pns| pns.largest_acked_packet)
            .is_some()
            || self.spaces[SpaceId::Data]
                .path_space(path)
                .and_then(|pns| pns.largest_acked_packet)
                .is_some()
            || (self.spaces[SpaceId::Data].crypto.is_some()
                && self.spaces[SpaceId::Handshake].crypto.is_none())
    }

    /// Resets the [`PathTimer::LossDetection`] timer to the next instant it may be needed
    ///
    /// The timer must fire if either:
    /// - An ack-eliciting packet we sent needs to be declared lost.
    /// - A tail-loss probe needs to be sent.
    ///
    /// See [`Connection::on_loss_detection_timeout`] for details.
    fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
        if self.state.is_closed() {
            // No loss detection takes place on closed connections, and `close_common` already
            // stopped the timer. Ensure we don't restart it inadvertently, e.g. in response to a
            // reordered packet being handled by state-insensitive code.
            return;
        }

        if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
            // Time threshold loss detection.
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::LossDetection),
                loss_time,
                self.qlog.with_time(now),
            );
            return;
        }

        // Determine which PN space to arm the PTO for and calculate its duration.
        if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::LossDetection),
                timeout,
                self.qlog.with_time(now),
            );
        } else {
            self.timers.stop(
                Timer::PerPath(path_id, PathTimer::LossDetection),
                self.qlog.with_time(now),
            );
        }
    }
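
    // The arming policy above in miniature: a pending time-threshold loss deadline
    // takes precedence over a PTO deadline; with neither, the timer is stopped
    // (modelled here as `None`). Illustrative only.
    #[allow(dead_code)]
    fn next_loss_detection_deadline(
        loss_time: Option<Instant>,
        pto_time: Option<Instant>,
    ) -> Option<Instant> {
        loss_time.or(pto_time)
    }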

    /// The maximum probe timeout across all paths
    ///
    /// If `is_closing` is set to `true`, paths that have not yet been used are filtered out.
    ///
    /// See [`Connection::pto`]
    fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
        match space {
            SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
            SpaceId::Data => self
                .paths
                .iter()
                .filter_map(|(path_id, state)| {
                    if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
                        // If we are closing and haven't sent anything yet, do not include
                        None
                    } else {
                        let pto = self.pto(space, *path_id);
                        Some(pto)
                    }
                })
                .max()
                .expect("there should be at least one path"),
        }
    }

    /// Probe Timeout
    ///
    /// The PTO is logically the time within which you'd expect to receive an acknowledgement
    /// for a packet: approximately RTT + max_ack_delay.
    fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
        let max_ack_delay = match space {
            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
        };
        self.path_data(path_id).rtt.pto_base() + max_ack_delay
    }
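
    // The PTO formula above with hypothetical numbers: given an RTT-derived base of
    // 100ms and a peer max_ack_delay of 25ms, a Data-space PTO is 125ms, while
    // Initial and Handshake PTOs ignore ack delay entirely. Illustrative only.
    #[allow(dead_code)]
    fn example_data_space_pto() -> Duration {
        let pto_base = Duration::from_millis(100);
        let max_ack_delay = Duration::from_millis(25);
        pto_base + max_ack_delay
    }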

    fn on_packet_authenticated(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<u64>,
        spin: bool,
        is_1rtt: bool,
    ) {
        self.total_authed_packets += 1;
        if let Some(last_allowed_receive) = self
            .paths
            .get(&path_id)
            .and_then(|path| path.data.last_allowed_receive)
        {
            if now > last_allowed_receive {
                warn!("received data on path which we abandoned more than 3 * PTO ago");
                // The peer failed to respond with a PATH_ABANDON in time.
                if !self.state.is_closed() {
                    // TODO(flub): What should the error code be?
                    self.state.move_to_closed(TransportError::NO_ERROR(
                        "peer failed to respond with PATH_ABANDON in time",
                    ));
                    self.close_common();
                    self.set_close_timer(now);
                    self.close = true;
                }
                return;
            }
        }

        self.reset_keep_alive(path_id, now);
        self.reset_idle_timeout(now, space_id, path_id);
        self.permit_idle_reset = true;
        self.receiving_ecn |= ecn.is_some();
        if let Some(x) = ecn {
            let space = &mut self.spaces[space_id];
            space.for_path(path_id).ecn_counters += x;

            if x.is_ce() {
                space
                    .for_path(path_id)
                    .pending_acks
                    .set_immediate_ack_required();
            }
        }

        let packet = match packet {
            Some(x) => x,
            None => return,
        };
        match &self.side {
            ConnectionSide::Client { .. } => {
                // If we received a handshake packet that authenticated, then we're talking to
                // the real server. From now on we should no longer allow the server to migrate
                // its address.
                if space_id == SpaceId::Handshake {
                    if let Some(hs) = self.state.as_handshake_mut() {
                        hs.allow_server_migration = false;
                    }
                }
            }
            ConnectionSide::Server { .. } => {
                if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
                {
                    // A server stops sending and processing Initial packets when it receives its first Handshake packet.
                    self.discard_space(now, SpaceId::Initial);
                }
                if self.zero_rtt_crypto.is_some() && is_1rtt {
                    // Discard 0-RTT keys soon after receiving a 1-RTT packet
                    self.set_key_discard_timer(now, space_id)
                }
            }
        }
        let space = self.spaces[space_id].for_path(path_id);
        space.pending_acks.insert_one(packet, now);
        if packet >= space.rx_packet.unwrap_or_default() {
            space.rx_packet = Some(packet);
            // Update outgoing spin bit, inverting iff we're the client
            self.spin = self.side.is_client() ^ spin;
        }
    }

    /// Resets the idle timeout timers
    ///
    /// Without multipath there is only the connection-wide idle timeout. When multipath is
    /// enabled there is an additional per-path idle timeout.
    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
        // First reset the global idle timeout.
        if let Some(timeout) = self.idle_timeout {
            if self.state.is_closed() {
                self.timers
                    .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
            } else {
                let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
                self.timers.set(
                    Timer::Conn(ConnTimer::Idle),
                    now + dt,
                    self.qlog.with_time(now),
                );
            }
        }

        // Now handle the per-path state
        if let Some(timeout) = self.path_data(path_id).idle_timeout {
            if self.state.is_closed() {
                self.timers.stop(
                    Timer::PerPath(path_id, PathTimer::PathIdle),
                    self.qlog.with_time(now),
                );
            } else {
                let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
                self.timers.set(
                    Timer::PerPath(path_id, PathTimer::PathIdle),
                    now + dt,
                    self.qlog.with_time(now),
                );
            }
        }
    }
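
    // Sketch of the idle deadline chosen above: the effective timeout is never
    // shorter than three PTOs, so a connection is not declared idle while loss
    // recovery could still be in progress. Hypothetical inputs.
    #[allow(dead_code)]
    fn idle_deadline(now: Instant, configured: Duration, pto: Duration) -> Instant {
        now + cmp::max(configured, 3 * pto)
    }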

    /// Resets both the [`ConnTimer::KeepAlive`] and [`PathTimer::PathKeepAlive`] timers
    fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
        if !self.state.is_established() {
            return;
        }

        if let Some(interval) = self.config.keep_alive_interval {
            self.timers.set(
                Timer::Conn(ConnTimer::KeepAlive),
                now + interval,
                self.qlog.with_time(now),
            );
        }

        if let Some(interval) = self.path_data(path_id).keep_alive {
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::PathKeepAlive),
                now + interval,
                self.qlog.with_time(now),
            );
        }
    }

    /// Sets the timer for when a previously issued CID should be retired next
    fn reset_cid_retirement(&mut self, now: Instant) {
        if let Some((_path, t)) = self.next_cid_retirement() {
            self.timers.set(
                Timer::Conn(ConnTimer::PushNewCid),
                t,
                self.qlog.with_time(now),
            );
        }
    }

    /// The next time when a previously issued CID should be retired
    fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
        self.local_cid_state
            .iter()
            .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
            .min_by_key(|(_path_id, timeout)| *timeout)
    }

    /// Handle the already-decrypted first packet from the client
    ///
    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
    /// efficient.
    pub(crate) fn handle_first_packet(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        ecn: Option<EcnCodepoint>,
        packet_number: u64,
        packet: InitialPacket,
        remaining: Option<BytesMut>,
    ) -> Result<(), ConnectionError> {
        let span = trace_span!("first recv");
        let _guard = span.enter();
        debug_assert!(self.side.is_server());
        let len = packet.header_data.len() + packet.payload.len();
        let path_id = PathId::ZERO;
        self.path_data_mut(path_id).total_recvd = len as u64;

        if let Some(hs) = self.state.as_handshake_mut() {
            hs.expected_token = packet.header.token.clone();
        } else {
            unreachable!("first packet must be delivered in Handshake state");
        }

        // The first packet is always on PathId::ZERO
        self.on_packet_authenticated(
            now,
            SpaceId::Initial,
            path_id,
            ecn,
            Some(packet_number),
            false,
            false,
        );

        let packet: Packet = packet.into();

        let mut qlog = QlogRecvPacket::new(len);
        qlog.header(&packet.header, Some(packet_number), path_id);

        self.process_decrypted_packet(
            now,
            network_path,
            path_id,
            Some(packet_number),
            packet,
            &mut qlog,
        )?;
        self.qlog.emit_packet_received(qlog, now);
        if let Some(data) = remaining {
            self.handle_coalesced(now, network_path, path_id, ecn, data);
        }

        self.qlog.emit_recovery_metrics(
            path_id,
            &mut self.paths.get_mut(&path_id).unwrap().data,
            now,
        );

        Ok(())
    }

    fn init_0rtt(&mut self, now: Instant) {
        let (header, packet) = match self.crypto.early_crypto() {
            Some(x) => x,
            None => return,
        };
        if self.side.is_client() {
            match self.crypto.transport_parameters() {
                Ok(params) => {
                    let params = params
                        .expect("crypto layer didn't supply transport parameters with ticket");
                    // Certain values must not be cached
                    let params = TransportParameters {
                        initial_src_cid: None,
                        original_dst_cid: None,
                        preferred_address: None,
                        retry_src_cid: None,
                        stateless_reset_token: None,
                        min_ack_delay: None,
                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
                        max_ack_delay: TransportParameters::default().max_ack_delay,
                        initial_max_path_id: None,
                        ..params
                    };
                    self.set_peer_params(params);
                    self.qlog.emit_peer_transport_params_restored(self, now);
                }
                Err(e) => {
                    error!("session ticket has malformed transport parameters: {}", e);
                    return;
                }
            }
        }
        trace!("0-RTT enabled");
        self.zero_rtt_enabled = true;
        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
    }

    fn read_crypto(
        &mut self,
        space: SpaceId,
        crypto: &frame::Crypto,
        payload_len: usize,
    ) -> Result<(), TransportError> {
        let expected = if !self.state.is_handshake() {
            SpaceId::Data
        } else if self.highest_space == SpaceId::Initial {
            SpaceId::Initial
        } else {
            // On the server, self.highest_space can be Data after receiving the client's first
            // flight, but we expect Handshake CRYPTO until the handshake is complete.
            SpaceId::Handshake
        };
        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
        // packets are illegal, and we don't process 1-RTT packets until the handshake is
        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
        debug_assert!(space <= expected, "received out-of-order CRYPTO data");

        let end = crypto.offset + crypto.data.len() as u64;
        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
            warn!(
                "received new {:?} CRYPTO data when expecting {:?}",
                space, expected
            );
            return Err(TransportError::PROTOCOL_VIOLATION(
                "new data at unexpected encryption level",
            ));
        }

        let space = &mut self.spaces[space];
        let max = end.saturating_sub(space.crypto_stream.bytes_read());
        if max > self.config.crypto_buffer_size as u64 {
            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
        }

        space
            .crypto_stream
            .insert(crypto.offset, crypto.data.clone(), payload_len);
        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
            if self.crypto.read_handshake(&chunk.bytes)? {
                self.events.push_back(Event::HandshakeDataReady);
            }
        }

        Ok(())
    }

    fn write_crypto(&mut self) {
        loop {
            let space = self.highest_space;
            let mut outgoing = Vec::new();
            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
                match space {
                    SpaceId::Initial => {
                        self.upgrade_crypto(SpaceId::Handshake, crypto);
                    }
                    SpaceId::Handshake => {
                        self.upgrade_crypto(SpaceId::Data, crypto);
                    }
                    _ => unreachable!("got updated secrets during 1-RTT"),
                }
            }
            if outgoing.is_empty() {
                if space == self.highest_space {
                    break;
                } else {
                    // Keys updated, check for more data to send
                    continue;
                }
            }
            let offset = self.spaces[space].crypto_offset;
            let outgoing = Bytes::from(outgoing);
            if let Some(hs) = self.state.as_handshake_mut() {
                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
                    hs.client_hello = Some(outgoing.clone());
                }
            }
            self.spaces[space].crypto_offset += outgoing.len() as u64;
            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
            self.spaces[space].pending.crypto.push_back(frame::Crypto {
                offset,
                data: outgoing,
            });
        }
    }

    /// Switch to stronger cryptography during handshake
    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
        debug_assert!(
            self.spaces[space].crypto.is_none(),
            "already reached packet space {space:?}"
        );
        trace!("{:?} keys ready", space);
        if space == SpaceId::Data {
            // Precompute the first key update
            self.next_crypto = Some(
                self.crypto
                    .next_1rtt_keys()
                    .expect("handshake should be complete"),
            );
        }

        self.spaces[space].crypto = Some(crypto);
        debug_assert!(space as usize > self.highest_space as usize);
        self.highest_space = space;
        if space == SpaceId::Data && self.side.is_client() {
            // Discard 0-RTT keys because 1-RTT keys are available.
            self.zero_rtt_crypto = None;
        }
    }

    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
        debug_assert!(space_id != SpaceId::Data);
        trace!("discarding {:?} keys", space_id);
        if space_id == SpaceId::Initial {
            // No longer needed
            if let ConnectionSide::Client { token, .. } = &mut self.side {
                *token = Bytes::new();
            }
        }
        let space = &mut self.spaces[space_id];
        space.crypto = None;
        let pns = space.for_path(PathId::ZERO);
        pns.time_of_last_ack_eliciting_packet = None;
        pns.loss_time = None;
        pns.loss_probes = 0;
        let sent_packets = mem::take(&mut pns.sent_packets);
        let path = self.paths.get_mut(&PathId::ZERO).unwrap();
        for (_, packet) in sent_packets.into_iter() {
            path.data.remove_in_flight(&packet);
        }

        self.set_loss_detection_timer(now, PathId::ZERO)
    }

    fn handle_coalesced(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        data: BytesMut,
    ) {
        self.path_data_mut(path_id)
            .inc_total_recvd(data.len() as u64);
        let mut remaining = Some(data);
        let cid_len = self
            .local_cid_state
            .values()
            .map(|cid_state| cid_state.cid_len())
            .next()
            .expect("one cid_state must exist");
        while let Some(data) = remaining {
            match PartialDecode::new(
                data,
                &FixedLengthConnectionIdParser::new(cid_len),
                &[self.version],
                self.endpoint_config.grease_quic_bit,
            ) {
                Ok((partial_decode, rest)) => {
                    remaining = rest;
                    self.handle_decode(now, network_path, path_id, ecn, partial_decode);
                }
                Err(e) => {
                    trace!("malformed header: {}", e);
                    return;
                }
            }
        }
    }

    fn handle_decode(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        partial_decode: PartialDecode,
    ) {
        let qlog = QlogRecvPacket::new(partial_decode.len());
        if let Some(decoded) = packet_crypto::unprotect_header(
            partial_decode,
            &self.spaces,
            self.zero_rtt_crypto.as_ref(),
            self.peer_params.stateless_reset_token,
        ) {
            self.handle_packet(
                now,
                network_path,
                path_id,
                ecn,
                decoded.packet,
                decoded.stateless_reset,
                qlog,
            );
        }
    }

    fn handle_packet(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<Packet>,
        stateless_reset: bool,
        mut qlog: QlogRecvPacket,
    ) {
        self.stats.udp_rx.ios += 1;
        if let Some(ref packet) = packet {
            trace!(
                "got {:?} packet ({} bytes) from {} using id {}",
                packet.header.space(),
                packet.payload.len() + packet.header_data.len(),
                network_path,
                packet.header.dst_cid(),
            );
        }

        if self.is_handshaking() {
            if path_id != PathId::ZERO {
                debug!(%network_path, %path_id, "discarding multipath packet during handshake");
                return;
            }
            if network_path != self.path_data_mut(path_id).network_path {
                if let Some(hs) = self.state.as_handshake() {
                    if hs.allow_server_migration {
                        trace!(%network_path, prev = %self.path_data(path_id).network_path, "server migrated to new remote");
                        self.path_data_mut(path_id).network_path = network_path;
                        self.qlog.emit_tuple_assigned(path_id, network_path, now);
                    } else {
                        debug!("discarding packet with unexpected remote during handshake");
                        return;
                    }
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            }
        }

        let was_closed = self.state.is_closed();
        let was_drained = self.state.is_drained();

        let decrypted = match packet {
            None => Err(None),
            Some(mut packet) => self
                .decrypt_packet(now, path_id, &mut packet)
                .map(move |number| (packet, number)),
        };
        let result = match decrypted {
            _ if stateless_reset => {
                debug!("got stateless reset");
                Err(ConnectionError::Reset)
            }
            Err(Some(e)) => {
                warn!("illegal packet: {}", e);
                Err(e.into())
            }
            Err(None) => {
                debug!("failed to authenticate packet");
                self.authentication_failures += 1;
                let integrity_limit = self.spaces[self.highest_space]
                    .crypto
                    .as_ref()
                    .unwrap()
                    .packet
                    .local
                    .integrity_limit();
                if self.authentication_failures > integrity_limit {
                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
                } else {
                    return;
                }
            }
            Ok((packet, number)) => {
                qlog.header(&packet.header, number, path_id);
                let span = match number {
                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                    None => trace_span!("recv", space = ?packet.header.space()),
                };
                let _guard = span.enter();

                let dedup = self.spaces[packet.header.space()]
                    .path_space_mut(path_id)
                    .map(|pns| &mut pns.dedup);
                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                    debug!("discarding possible duplicate packet");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else if self.state.is_handshake() && packet.header.is_short() {
                    // TODO: SHOULD buffer these to improve reordering tolerance.
                    trace!("dropping short packet during handshake");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else {
                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                        if let Some(hs) = self.state.as_handshake() {
                            if self.side.is_server() && token != &hs.expected_token {
                                // Clients must send the same retry token in every Initial. Initial
                                // packets can be spoofed, so we discard rather than killing the
                                // connection.
                                warn!("discarding Initial with invalid retry token");
                                self.qlog.emit_packet_received(qlog, now);
                                return;
                            }
                        }
                    }

                    if !self.state.is_closed() {
                        let spin = match packet.header {
                            Header::Short { spin, .. } => spin,
                            _ => false,
                        };

                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                            // Only the client is allowed to open paths
                            self.ensure_path(path_id, network_path, now, number);
                        }
                        if self.paths.contains_key(&path_id) {
                            self.on_packet_authenticated(
                                now,
                                packet.header.space(),
                                path_id,
                                ecn,
                                number,
                                spin,
                                packet.header.is_1rtt(),
                            );
                        }
                    }

                    let res = self.process_decrypted_packet(
                        now,
                        network_path,
                        path_id,
                        number,
                        packet,
                        &mut qlog,
                    );

                    self.qlog.emit_packet_received(qlog, now);
                    res
                }
            }
        };

        // State transitions for error cases
        if let Err(conn_err) = result {
            match conn_err {
                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::Reset
                | ConnectionError::TransportError(TransportError {
                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
                    ..
                }) => {
                    self.state.move_to_drained(Some(conn_err));
                }
                ConnectionError::TimedOut => {
                    unreachable!("timeouts aren't generated by packet processing");
                }
                ConnectionError::TransportError(err) => {
                    debug!("closing connection due to transport error: {}", err);
                    self.state.move_to_closed(err);
                }
                ConnectionError::VersionMismatch => {
                    self.state.move_to_draining(Some(conn_err));
                }
                ConnectionError::LocallyClosed => {
                    unreachable!("LocallyClosed isn't generated by packet processing");
                }
                ConnectionError::CidsExhausted => {
                    unreachable!("CidsExhausted isn't generated by packet processing");
                }
            };
        }

        if !was_closed && self.state.is_closed() {
            self.close_common();
            if !self.state.is_drained() {
                self.set_close_timer(now);
            }
        }
        if !was_drained && self.state.is_drained() {
            self.endpoint_events.push_back(EndpointEventInner::Drained);
            // Close timer may have been started previously, e.g. if we sent a close and got a
            // stateless reset in response
            self.timers
                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
        }

        // Transmit CONNECTION_CLOSE if necessary
        if matches!(self.state.as_type(), StateType::Closed) {
            // If there is no PathData for this PathId the packet was for a brand new
            // path. It was a valid packet however, so the remote is valid and we want to
            // send CONNECTION_CLOSE.
            let path_remote = self
                .paths
                .get(&path_id)
                .map(|p| p.data.network_path)
                .unwrap_or(network_path);
            self.close = network_path == path_remote;
        }
    }

    fn process_decrypted_packet(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        path_id: PathId,
        number: Option<u64>,
        packet: Packet,
        qlog: &mut QlogRecvPacket,
    ) -> Result<(), ConnectionError> {
        if !self.paths.contains_key(&path_id) {
            // On the server side this could be the first packet for this path, which would
            // be a protocol violation. It is more likely, however, that this is a packet
            // for an already-pruned path.
            trace!(%path_id, ?number, "discarding packet for unknown path");
            return Ok(());
        }
        let state = match self.state.as_type() {
            StateType::Established => {
                match packet.header.space() {
                    SpaceId::Data => self.process_payload(
                        now,
                        network_path,
                        path_id,
                        number.unwrap(),
                        packet,
                        qlog,
                    )?,
                    _ if packet.header.has_frames() => {
                        self.process_early_payload(now, path_id, packet, qlog)?
                    }
                    _ => {
                        trace!("discarding unexpected pre-handshake packet");
                    }
                }
                return Ok(());
            }
            StateType::Closed => {
                for result in frame::Iter::new(packet.payload.freeze())? {
                    let frame = match result {
                        Ok(frame) => frame,
                        Err(err) => {
                            debug!("frame decoding error: {err:?}");
                            continue;
                        }
                    };
                    qlog.frame(&frame);

                    if let Frame::Padding = frame {
                        continue;
                    };

                    self.stats.frame_rx.record(frame.ty());

                    if let Frame::Close(_error) = frame {
                        self.state.move_to_draining(None);
                        break;
                    }
                }
                return Ok(());
            }
            StateType::Draining | StateType::Drained => return Ok(()),
            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
        };
3843
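        // Still handshaking: the remaining (long-header) packet types are handled here.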
3844        match packet.header {
3845            Header::Retry {
3846                src_cid: rem_cid, ..
3847            } => {
3848                debug_assert_eq!(path_id, PathId::ZERO);
3849                if self.side.is_server() {
3850                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3851                }
3852
3853                let is_valid_retry = self
3854                    .rem_cids
3855                    .get(&path_id)
3856                    .map(|cids| cids.active())
3857                    .map(|orig_dst_cid| {
3858                        self.crypto.is_valid_retry(
3859                            orig_dst_cid,
3860                            &packet.header_data,
3861                            &packet.payload,
3862                        )
3863                    })
3864                    .unwrap_or_default();
3865                if self.total_authed_packets > 1
3866                    || packet.payload.len() <= 16 // token + 16 byte tag
3867                    || !is_valid_retry
3868                {
3869                    trace!("discarding invalid Retry");
3870                    // - After the client has received and processed an Initial or Retry
3871                    //   packet from the server, it MUST discard any subsequent Retry
3872                    //   packets that it receives.
3873                    // - A client MUST discard a Retry packet with a zero-length Retry Token
3874                    //   field.
3875                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
3876                    //   that cannot be validated
3877                    return Ok(());
3878                }
3879
3880                trace!("retrying with CID {}", rem_cid);
3881                let client_hello = state.client_hello.take().unwrap();
3882                self.retry_src_cid = Some(rem_cid);
3883                self.rem_cids
3884                    .get_mut(&path_id)
3885                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
3886                    .update_initial_cid(rem_cid);
3887                self.rem_handshake_cid = rem_cid;
3888
3889                let space = &mut self.spaces[SpaceId::Initial];
3890                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
3891                    self.on_packet_acked(now, PathId::ZERO, info);
3892                };
3893
3894                // Make sure we clean up after any retransmitted Initials
3895                self.discard_space(now, SpaceId::Initial);
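                // Rebuild the Initial space with fresh keys derived from the Retry's
                // source CID, re-queueing the ClientHello from offset 0 while keeping
                // the next packet number.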
3896                self.spaces[SpaceId::Initial] = {
3897                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
3898                    space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
3899                    space.crypto_offset = client_hello.len() as u64;
3900                    space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
3901                        .for_path(path_id)
3902                        .next_packet_number;
3903                    space.pending.crypto.push_back(frame::Crypto {
3904                        offset: 0,
3905                        data: client_hello,
3906                    });
3907                    space
3908                };
3909
3910                // Retransmit all 0-RTT data
3911                let zero_rtt = mem::take(
3912                    &mut self.spaces[SpaceId::Data]
3913                        .for_path(PathId::ZERO)
3914                        .sent_packets,
3915                );
3916                for (_, info) in zero_rtt.into_iter() {
3917                    self.paths
3918                        .get_mut(&PathId::ZERO)
3919                        .unwrap()
3920                        .remove_in_flight(&info);
3921                    self.spaces[SpaceId::Data].pending |= info.retransmits;
3922                }
3923                self.streams.retransmit_all_for_0rtt();
3924
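                // The Retry payload is the token followed by the 16-byte integrity tag.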
3925                let token_len = packet.payload.len() - 16;
3926                let ConnectionSide::Client { ref mut token, .. } = self.side else {
3927                    unreachable!("we already short-circuited if we're server");
3928                };
3929                *token = packet.payload.freeze().split_to(token_len);
3930
3931                self.state = State::handshake(state::Handshake {
3932                    expected_token: Bytes::new(),
3933                    rem_cid_set: false,
3934                    client_hello: None,
3935                    allow_server_migration: true,
3936                });
3937                Ok(())
3938            }
3939            Header::Long {
3940                ty: LongType::Handshake,
3941                src_cid: rem_cid,
3942                dst_cid: loc_cid,
3943                ..
3944            } => {
3945                debug_assert_eq!(path_id, PathId::ZERO);
3946                if rem_cid != self.rem_handshake_cid {
3947                    debug!(
3948                        "discarding packet with mismatched remote CID: {} != {}",
3949                        self.rem_handshake_cid, rem_cid
3950                    );
3951                    return Ok(());
3952                }
3953                self.on_path_validated(path_id);
3954
3955                self.process_early_payload(now, path_id, packet, qlog)?;
3956                if self.state.is_closed() {
3957                    return Ok(());
3958                }
3959
3960                if self.crypto.is_handshaking() {
3961                    trace!("handshake ongoing");
3962                    return Ok(());
3963                }
3964
3965                if self.side.is_client() {
3966                    // Client-only because server params were set from the client's Initial
3967                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
3968                        TransportError::new(
3969                            TransportErrorCode::crypto(0x6d),
3970                            "transport parameters missing".to_owned(),
3971                        )
3972                    })?;
3973
3974                    if self.has_0rtt() {
3975                        if !self.crypto.early_data_accepted().unwrap() {
3976                            debug_assert!(self.side.is_client());
3977                            debug!("0-RTT rejected");
3978                            self.accepted_0rtt = false;
3979                            self.streams.zero_rtt_rejected();
3980
3981                            // Discard already-queued frames
3982                            self.spaces[SpaceId::Data].pending = Retransmits::default();
3983
3984                            // Discard 0-RTT packets
3985                            let sent_packets = mem::take(
3986                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
3987                            );
3988                            for (_, packet) in sent_packets.into_iter() {
3989                                self.paths
3990                                    .get_mut(&path_id)
3991                                    .unwrap()
3992                                    .remove_in_flight(&packet);
3993                            }
3994                        } else {
3995                            self.accepted_0rtt = true;
3996                            params.validate_resumption_from(&self.peer_params)?;
3997                        }
3998                    }
3999                    if let Some(token) = params.stateless_reset_token {
4000                        // TODO(matheus23): Reset token for a remote, or for a 4-tuple?
4001                        let remote = self.path_data(path_id).network_path.remote;
4002                        self.endpoint_events
4003                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4004                    }
4005                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4006                    self.issue_first_cids(now);
4007                } else {
4008                    // Server-only
4009                    self.spaces[SpaceId::Data].pending.handshake_done = true;
4010                    self.discard_space(now, SpaceId::Handshake);
4011                    self.events.push_back(Event::HandshakeConfirmed);
4012                    trace!("handshake confirmed");
4013                }
4014
4015                self.events.push_back(Event::Connected);
4016                self.state.move_to_established();
4017                trace!("established");
4018
4019                // Multipath can only be enabled after the state has reached Established,
4020                // so this cannot happen any earlier.
4021                self.issue_first_path_cids(now);
4022                Ok(())
4023            }
4024            Header::Initial(InitialHeader {
4025                src_cid: rem_cid,
4026                dst_cid: loc_cid,
4027                ..
4028            }) => {
4029                debug_assert_eq!(path_id, PathId::ZERO);
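                // The server may choose a new CID in its first Initial response; adopt
                // it exactly once.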
4030                if !state.rem_cid_set {
4031                    trace!("switching remote CID to {}", rem_cid);
4032                    let mut state = state.clone();
4033                    self.rem_cids
4034                        .get_mut(&path_id)
4035                        .expect("PathId::ZERO not yet abandoned")
4036                        .update_initial_cid(rem_cid);
4037                    self.rem_handshake_cid = rem_cid;
4038                    self.orig_rem_cid = rem_cid;
4039                    state.rem_cid_set = true;
4040                    self.state.move_to_handshake(state);
4041                } else if rem_cid != self.rem_handshake_cid {
4042                    debug!(
4043                        "discarding packet with mismatched remote CID: {} != {}",
4044                        self.rem_handshake_cid, rem_cid
4045                    );
4046                    return Ok(());
4047                }
4048
4049                let starting_space = self.highest_space;
4050                self.process_early_payload(now, path_id, packet, qlog)?;
4051
4052                if self.side.is_server()
4053                    && starting_space == SpaceId::Initial
4054                    && self.highest_space != SpaceId::Initial
4055                {
4056                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4057                        TransportError::new(
4058                            TransportErrorCode::crypto(0x6d),
4059                            "transport parameters missing".to_owned(),
4060                        )
4061                    })?;
4062                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4063                    self.issue_first_cids(now);
4064                    self.init_0rtt(now);
4065                }
4066                Ok(())
4067            }
4068            Header::Long {
4069                ty: LongType::ZeroRtt,
4070                ..
4071            } => {
4072                self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4073                Ok(())
4074            }
4075            Header::VersionNegotiate { .. } => {
4076                if self.total_authed_packets > 1 {
4077                    return Ok(());
4078                }
4079                let supported = packet
4080                    .payload
4081                    .chunks(4)
4082                    .any(|x| match <[u8; 4]>::try_from(x) {
4083                        Ok(version) => self.version == u32::from_be_bytes(version),
4084                        Err(_) => false,
4085                    });
4086                if supported {
4087                    return Ok(());
4088                }
4089                debug!("remote doesn't support our version");
4090                Err(ConnectionError::VersionMismatch)
4091            }
4092            Header::Short { .. } => unreachable!(
4093                "short packets received during handshake are discarded in handle_packet"
4094            ),
4095        }
4096    }
4097
4098    /// Process an Initial or Handshake packet payload
4099    fn process_early_payload(
4100        &mut self,
4101        now: Instant,
4102        path_id: PathId,
4103        packet: Packet,
4104        #[allow(unused)] qlog: &mut QlogRecvPacket,
4105    ) -> Result<(), TransportError> {
4106        debug_assert_ne!(packet.header.space(), SpaceId::Data);
4107        debug_assert_eq!(path_id, PathId::ZERO);
4108        let payload_len = packet.payload.len();
4109        let mut ack_eliciting = false;
4110        for result in frame::Iter::new(packet.payload.freeze())? {
4111            let frame = result?;
4112            qlog.frame(&frame);
4113            let span = match frame {
4114                Frame::Padding => continue,
4115                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4116            };
4117
4118            self.stats.frame_rx.record(frame.ty());
4119
4120            let _guard = span.as_ref().map(|x| x.enter());
4121            ack_eliciting |= frame.is_ack_eliciting();
4122
4123            // Process frames
4124            if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4125                return Err(TransportError::PROTOCOL_VIOLATION(
4126                    "illegal frame type in handshake",
4127                ));
4128            }
4129
4130            match frame {
4131                Frame::Padding | Frame::Ping => {}
4132                Frame::Crypto(frame) => {
4133                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
4134                }
4135                Frame::Ack(ack) => {
4136                    self.on_ack_received(now, packet.header.space(), ack)?;
4137                }
4138                Frame::PathAck(ack) => {
4139                    span.as_ref()
4140                        .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4141                    self.on_path_ack_received(now, packet.header.space(), ack)?;
4142                }
4143                Frame::Close(reason) => {
4144                    self.state.move_to_draining(Some(reason.into()));
4145                    return Ok(());
4146                }
4147                _ => {
4148                    let mut err =
4149                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4150                    err.frame = frame::MaybeFrame::Known(frame.ty());
4151                    return Err(err);
4152                }
4153            }
4154        }
4155
4156        if ack_eliciting {
4157            // In the initial and handshake spaces, ACKs must be sent immediately
4158            self.spaces[packet.header.space()]
4159                .for_path(path_id)
4160                .pending_acks
4161                .set_immediate_ack_required();
4162        }
4163
4164        self.write_crypto();
4165        Ok(())
4166    }
4167
4168    /// Processes the packet payload, always in the data space.
4169    fn process_payload(
4170        &mut self,
4171        now: Instant,
4172        network_path: FourTuple,
4173        path_id: PathId,
4174        number: u64,
4175        packet: Packet,
4176        #[allow(unused)] qlog: &mut QlogRecvPacket,
4177    ) -> Result<(), TransportError> {
4178        let payload = packet.payload.freeze();
4179        let mut is_probing_packet = true;
4180        let mut close = None;
4181        let payload_len = payload.len();
4182        let mut ack_eliciting = false;
4183        // If this packet triggers a path migration and includes an OBSERVED_ADDRESS
4184        // frame, the frame is stored here.
4185        let mut migration_observed_addr = None;
4186        for result in frame::Iter::new(payload)? {
4187            let frame = result?;
4188            qlog.frame(&frame);
4189            let span = match frame {
4190                Frame::Padding => continue,
4191                _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4192            };
4193
4194            self.stats.frame_rx.record(frame.ty());
4195            // Crypto, Stream and Datagram frames are special-cased in order not to
4196            // pollute the log with payload data.
4197            match &frame {
4198                Frame::Crypto(f) => {
4199                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4200                }
4201                Frame::Stream(f) => {
4202                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4203                }
4204                Frame::Datagram(f) => {
4205                    trace!(len = f.data.len(), "got datagram frame");
4206                }
4207                f => {
4208                    trace!("got frame {f}");
4209                }
4210            }
4211
4212            let _guard = span.enter();
4213            if packet.header.is_0rtt() {
4214                match frame {
4215                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4216                        return Err(TransportError::PROTOCOL_VIOLATION(
4217                            "illegal frame type in 0-RTT",
4218                        ));
4219                    }
4220                    _ => {
4221                        if frame.is_1rtt() {
4222                            return Err(TransportError::PROTOCOL_VIOLATION(
4223                                "illegal frame type in 0-RTT",
4224                            ));
4225                        }
4226                    }
4227                }
4228            }
4229            ack_eliciting |= frame.is_ack_eliciting();
4230
4231            // Check whether this could be a probing packet
4232            match frame {
4233                Frame::Padding
4234                | Frame::PathChallenge(_)
4235                | Frame::PathResponse(_)
4236                | Frame::NewConnectionId(_)
4237                | Frame::ObservedAddr(_) => {}
4238                _ => {
4239                    is_probing_packet = false;
4240                }
4241            }
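            // Per RFC 9000 §9.1, only PADDING, PATH_CHALLENGE, PATH_RESPONSE and
            // NEW_CONNECTION_ID are probing frames; OBSERVED_ADDR is additionally
            // treated as probing here.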
4242
4243            match frame {
4244                Frame::Crypto(frame) => {
4245                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4246                }
4247                Frame::Stream(frame) => {
4248                    if self.streams.received(frame, payload_len)?.should_transmit() {
4249                        self.spaces[SpaceId::Data].pending.max_data = true;
4250                    }
4251                }
4252                Frame::Ack(ack) => {
4253                    self.on_ack_received(now, SpaceId::Data, ack)?;
4254                }
4255                Frame::PathAck(ack) => {
4256                    span.record("path", tracing::field::debug(&ack.path_id));
4257                    self.on_path_ack_received(now, SpaceId::Data, ack)?;
4258                }
4259                Frame::Padding | Frame::Ping => {}
4260                Frame::Close(reason) => {
4261                    close = Some(reason);
4262                }
4263                Frame::PathChallenge(challenge) => {
4264                    let path = &mut self
4265                        .path_mut(path_id)
4266                        .expect("payload is processed only after the path becomes known");
4267                    path.path_responses.push(number, challenge.0, network_path);
4268                    // At this point, update_network_path_or_discard has already been called,
4269                    // so we don't need to be lenient about `local_ip` possibly mismatching.
4270                    if network_path == path.network_path {
4271                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
4272                        // attack. Send a non-probing packet to recover the active path.
4273                        // TODO(flub): No longer true! We now also send PATH_CHALLENGE to
4274                        //    validate the path when it is new, without an RFC 9000-style
4275                        //    migration involved. This means we add an extra
4276                        //    IMMEDIATE_ACK on some challenges. It isn't really wrong to
4277                        //    do so, but it is still untidy. We should instead suppress
4278                        //    this when we know the remote is still validating the
4279                        //    path.
4280                        match self.peer_supports_ack_frequency() {
4281                            true => self.immediate_ack(path_id),
4282                            false => {
4283                                self.ping_path(path_id).ok();
4284                            }
4285                        }
4286                    }
4287                }
4288                Frame::PathResponse(response) => {
4289                    let path = self
4290                        .paths
4291                        .get_mut(&path_id)
4292                        .expect("payload is processed only after the path becomes known");
4293
4294                    use PathTimer::*;
4295                    use paths::OnPathResponseReceived::*;
4296                    match path
4297                        .data
4298                        .on_path_response_received(now, response.0, network_path)
4299                    {
4300                        OnPath { was_open } => {
4301                            let qlog = self.qlog.with_time(now);
4302
4303                            self.timers
4304                                .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4305                            self.timers
4306                                .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4307
4308                            let next_challenge = path
4309                                .data
4310                                .earliest_expiring_challenge()
4311                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4312                            self.timers.set_or_stop(
4313                                Timer::PerPath(path_id, PathChallengeLost),
4314                                next_challenge,
4315                                qlog,
4316                            );
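                            // Keep the challenge-lost timer armed only while challenges
                            // remain outstanding on this path; otherwise stop it.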
4317
4318                            if !was_open {
4319                                self.events
4320                                    .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4321                                if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4322                                {
4323                                    self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4324                                        id: path_id,
4325                                        addr: observed.socket_addr(),
4326                                    }));
4327                                }
4328                            }
4329                            if let Some((_, ref mut prev)) = path.prev {
4330                                prev.challenges_sent.clear();
4331                                prev.send_new_challenge = false;
4332                            }
4333                        }
4334                        OffPath => {
4335                            debug!("Response to off-path PathChallenge!");
4336                            let next_challenge = path
4337                                .data
4338                                .earliest_expiring_challenge()
4339                                .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4340                            self.timers.set_or_stop(
4341                                Timer::PerPath(path_id, PathChallengeLost),
4342                                next_challenge,
4343                                self.qlog.with_time(now),
4344                            );
4345                        }
4346                        Invalid { expected } => {
4347                            debug!(%response, %network_path, %expected, "ignoring invalid PATH_RESPONSE")
4348                        }
4349                        Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4350                    }
4351                }
4352                Frame::MaxData(frame::MaxData(bytes)) => {
4353                    self.streams.received_max_data(bytes);
4354                }
4355                Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
4356                    self.streams.received_max_stream_data(id, offset)?;
4357                }
4358                Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
4359                    self.streams.received_max_streams(dir, count)?;
4360                }
4361                Frame::ResetStream(frame) => {
4362                    if self.streams.received_reset(frame)?.should_transmit() {
4363                        self.spaces[SpaceId::Data].pending.max_data = true;
4364                    }
4365                }
4366                Frame::DataBlocked { offset } => {
4367                    debug!(offset, "peer claims to be blocked at connection level");
4368                }
4369                Frame::StreamDataBlocked { id, offset } => {
4370                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4371                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4372                        return Err(TransportError::STREAM_STATE_ERROR(
4373                            "STREAM_DATA_BLOCKED on send-only stream",
4374                        ));
4375                    }
4376                    debug!(
4377                        stream = %id,
4378                        offset, "peer claims to be blocked at stream level"
4379                    );
4380                }
4381                Frame::StreamsBlocked { dir, limit } => {
4382                    if limit > MAX_STREAM_COUNT {
4383                        return Err(TransportError::FRAME_ENCODING_ERROR(
4384                            "unrepresentable stream limit",
4385                        ));
4386                    }
4387                    debug!(
4388                        "peer claims to be blocked opening more than {} {} streams",
4389                        limit, dir
4390                    );
4391                }
4392                Frame::StopSending(frame::StopSending { id, error_code }) => {
4393                    if id.initiator() != self.side.side() {
4394                        if id.dir() == Dir::Uni {
4395                            debug!("got STOP_SENDING on recv-only {}", id);
4396                            return Err(TransportError::STREAM_STATE_ERROR(
4397                                "STOP_SENDING on recv-only stream",
4398                            ));
4399                        }
4400                    } else if self.streams.is_local_unopened(id) {
4401                        return Err(TransportError::STREAM_STATE_ERROR(
4402                            "STOP_SENDING on unopened stream",
4403                        ));
4404                    }
4405                    self.streams.received_stop_sending(id, error_code);
4406                }
4407                Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4408                    if let Some(ref path_id) = path_id {
4409                        span.record("path", tracing::field::debug(&path_id));
4410                    }
4411                    let path_id = path_id.unwrap_or_default();
4412                    match self.local_cid_state.get_mut(&path_id) {
4413                        None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4414                        Some(cid_state) => {
4415                            let allow_more_cids = cid_state
4416                                .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4417
4418                            // If the path has been closed, we do not issue more CIDs for it.
4419                            // For details see https://www.ietf.org/archive/id/draft-ietf-quic-multipath-17.html#section-3.2.2
4420                            // > an endpoint SHOULD provide new connection IDs for that path, if still open, using PATH_NEW_CONNECTION_ID frames.
4421                            let has_path = !self.abandoned_paths.contains(&path_id);
4422                            let allow_more_cids = allow_more_cids && has_path;
4423
4424                            self.endpoint_events
4425                                .push_back(EndpointEventInner::RetireConnectionId(
4426                                    now,
4427                                    path_id,
4428                                    sequence,
4429                                    allow_more_cids,
4430                                ));
4431                        }
4432                    }
4433                }
4434                Frame::NewConnectionId(frame) => {
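                    // A PATH_NEW_CONNECTION_ID frame carries an explicit path_id; a
                    // plain NEW_CONNECTION_ID always applies to path 0.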
4435                    let path_id = if let Some(path_id) = frame.path_id {
4436                        if !self.is_multipath_negotiated() {
4437                            return Err(TransportError::PROTOCOL_VIOLATION(
4438                                "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4439                            ));
4440                        }
4441                        if path_id > self.local_max_path_id {
4442                            return Err(TransportError::PROTOCOL_VIOLATION(
4443                                "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4444                            ));
4445                        }
4446                        path_id
4447                    } else {
4448                        PathId::ZERO
4449                    };
4450
4451                    if self.abandoned_paths.contains(&path_id) {
4452                        trace!("ignoring issued CID for abandoned path");
4453                        continue;
4454                    }
4455                    if let Some(ref path_id) = frame.path_id {
4456                        span.record("path", tracing::field::debug(&path_id));
4457                    }
4458                    let rem_cids = self
4459                        .rem_cids
4460                        .entry(path_id)
4461                        .or_insert_with(|| CidQueue::new(frame.id));
4462                    if rem_cids.active().is_empty() {
4463                        return Err(TransportError::PROTOCOL_VIOLATION(
4464                            "NEW_CONNECTION_ID when CIDs aren't in use",
4465                        ));
4466                    }
4467                    if frame.retire_prior_to > frame.sequence {
4468                        return Err(TransportError::PROTOCOL_VIOLATION(
4469                            "NEW_CONNECTION_ID retiring unissued CIDs",
4470                        ));
4471                    }
4472
4473                    use crate::cid_queue::InsertError;
4474                    match rem_cids.insert(frame) {
4475                        Ok(None) if self.path(path_id).is_none() => {
4476                            // if this gives us CIDs to open a new path and a nat traversal attempt
4477                            // is underway we could try to probe a pending remote
4478                            self.continue_nat_traversal_round(now);
4479                        }
4480                        Ok(None) => {}
4481                        Ok(Some((retired, reset_token))) => {
4482                            let pending_retired =
4483                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
4484                            /// Ensure `pending_retired` cannot grow without bound. Limit is
4485                            /// somewhat arbitrary but very permissive.
4486                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4487                            // We don't bother counting in-flight frames because those are bounded
4488                            // by congestion control.
4489                            if (pending_retired.len() as u64)
4490                                .saturating_add(retired.end.saturating_sub(retired.start))
4491                                > MAX_PENDING_RETIRED_CIDS
4492                            {
4493                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4494                                    "queued too many retired CIDs",
4495                                ));
4496                            }
4497                            pending_retired.extend(retired.map(|seq| (path_id, seq)));
4498                            // TODO(matheus23): Reset token for a remote or a full 4-tuple?
4499                            self.set_reset_token(path_id, network_path.remote, reset_token);
4500                        }
4501                        Err(InsertError::ExceedsLimit) => {
4502                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4503                        }
4504                        Err(InsertError::Retired) => {
4505                            trace!("discarding already-retired CID");
4506                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
4507                            // range of connection IDs larger than the active connection ID limit
4508                            // was retired all at once via retire_prior_to.
4509                            self.spaces[SpaceId::Data]
4510                                .pending
4511                                .retire_cids
4512                                .push((path_id, frame.sequence));
4513                            continue;
4514                        }
4515                    };
4516
4517                    if self.side.is_server()
4518                        && path_id == PathId::ZERO
4519                        && self
4520                            .rem_cids
4521                            .get(&PathId::ZERO)
4522                            .map(|cids| cids.active_seq() == 0)
4523                            .unwrap_or_default()
4524                    {
4525                        // We're a server still using the initial remote CID for the client, so
4526                        // let's switch immediately to enable clientside stateless resets.
4527                        self.update_rem_cid(PathId::ZERO);
4528                    }
4529                }
4530                Frame::NewToken(NewToken { token }) => {
4531                    let ConnectionSide::Client {
4532                        token_store,
4533                        server_name,
4534                        ..
4535                    } = &self.side
4536                    else {
4537                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4538                    };
4539                    if token.is_empty() {
4540                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4541                    }
4542                    trace!("got new token");
4543                    token_store.insert(server_name, token);
4544                }
4545                Frame::Datagram(datagram) => {
4546                    if self
4547                        .datagrams
4548                        .received(datagram, &self.config.datagram_receive_buffer_size)?
4549                    {
4550                        self.events.push_back(Event::DatagramReceived);
4551                    }
4552                }
4553                Frame::AckFrequency(ack_frequency) => {
4554                    // This frame can only be sent in the Data space
4555
4556                    if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4557                        // The AckFrequency frame is stale (we have already received a more
4558                        // recent one)
4559                        continue;
4560                    }
4561
4562                    // Update the params for all of our paths
4563                    for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4564                        space.pending_acks.set_ack_frequency_params(&ack_frequency);
4565
4566                        // Our `max_ack_delay` has been updated, so we may need to adjust
4567                        // its associated timeout
4568                        if let Some(timeout) = space
4569                            .pending_acks
4570                            .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4571                        {
4572                            self.timers.set(
4573                                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4574                                timeout,
4575                                self.qlog.with_time(now),
4576                            );
4577                        }
4578                    }
4579                }
4580                Frame::ImmediateAck => {
4581                    // This frame can only be sent in the Data space
4582                    for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4583                        pns.pending_acks.set_immediate_ack_required();
4584                    }
4585                }
4586                Frame::HandshakeDone => {
4587                    if self.side.is_server() {
4588                        return Err(TransportError::PROTOCOL_VIOLATION(
4589                            "client sent HANDSHAKE_DONE",
4590                        ));
4591                    }
4592                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
4593                        self.discard_space(now, SpaceId::Handshake);
4594                    }
4595                    self.events.push_back(Event::HandshakeConfirmed);
4596                    trace!("handshake confirmed");
4597                }
4598                Frame::ObservedAddr(observed) => {
4599                    // Check whether the params allow the peer to send reports and this node to receive them
4600                    trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4601                    if !self
4602                        .peer_params
4603                        .address_discovery_role
4604                        .should_report(&self.config.address_discovery_role)
4605                    {
4606                        return Err(TransportError::PROTOCOL_VIOLATION(
4607                            "received OBSERVED_ADDRESS frame when not negotiated",
4608                        ));
4609                    }
4610                    // must only be sent in data space
4611                    if packet.header.space() != SpaceId::Data {
4612                        return Err(TransportError::PROTOCOL_VIOLATION(
4613                            "OBSERVED_ADDRESS frame outside data space",
4614                        ));
4615                    }
4616
4617                    let path = self.path_data_mut(path_id);
4618                    if network_path == path.network_path {
4619                        if let Some(updated) = path.update_observed_addr_report(observed) {
4620                            if path.open {
4621                                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4622                                    id: path_id,
4623                                    addr: updated,
4624                                }));
4625                            }
4626                            // otherwise the event is reported when the path is deemed open
4627                        }
4628                    } else {
4629                        // include in migration
4630                        migration_observed_addr = Some(observed)
4631                    }
4632                }
4633                Frame::PathAbandon(frame::PathAbandon {
4634                    path_id,
4635                    error_code,
4636                }) => {
4637                    span.record("path", tracing::field::debug(&path_id));
4638                    // TODO(flub): don't really know which error code to use here.
4639                    let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4640                        Ok(()) => {
4641                            trace!("peer abandoned path");
4642                            false
4643                        }
4644                        Err(ClosePathError::LastOpenPath) => {
4645                            trace!("peer abandoned last path, closing connection");
4646                            // TODO(flub): which error code?
4647                            return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4648                        }
4649                        Err(ClosePathError::ClosedPath) => {
4650                            trace!("peer abandoned already closed path");
4651                            true
4652                        }
4653                    };
4654                    // If we receive a retransmit of PATH_ABANDON then we may already have
4655                    // abandoned this path locally.  In that case the DiscardPath timer
4656                    // may already have fired and we no longer have any state for this path.
4657                    // Only set this timer if we still have path state.
4658                    if self.path(path_id).is_some() && !already_abandoned {
4659                        // TODO(flub): Checking is_some() here followed by a number of calls
4660                        //    that would panic if it was None is really ugly.  If only we
4661                        //    could do something like PathData::pto().  One day we'll have
4662                        //    unified SpaceId and PathId and this will be possible.
4663                        let delay = self.pto(SpaceId::Data, path_id) * 3;
4664                        self.timers.set(
4665                            Timer::PerPath(path_id, PathTimer::DiscardPath),
4666                            now + delay,
4667                            self.qlog.with_time(now),
4668                        );
4669                    }
4670                }
4671                Frame::PathStatusAvailable(info) => {
4672                    span.record("path", tracing::field::debug(&info.path_id));
4673                    if self.is_multipath_negotiated() {
4674                        self.on_path_status(
4675                            info.path_id,
4676                            PathStatus::Available,
4677                            info.status_seq_no,
4678                        );
4679                    } else {
4680                        return Err(TransportError::PROTOCOL_VIOLATION(
4681                            "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4682                        ));
4683                    }
4684                }
4685                Frame::PathStatusBackup(info) => {
4686                    span.record("path", tracing::field::debug(&info.path_id));
4687                    if self.is_multipath_negotiated() {
4688                        self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4689                    } else {
4690                        return Err(TransportError::PROTOCOL_VIOLATION(
4691                            "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4692                        ));
4693                    }
4694                }
4695                Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4696                    span.record("path", tracing::field::debug(&path_id));
4697                    if !self.is_multipath_negotiated() {
4698                        return Err(TransportError::PROTOCOL_VIOLATION(
4699                            "received MAX_PATH_ID frame when multipath was not negotiated",
4700                        ));
4701                    }
4702                    // frames that do not increase the path id are ignored
4703                    if path_id > self.remote_max_path_id {
4704                        self.remote_max_path_id = path_id;
4705                        self.issue_first_path_cids(now);
4706                        while let Some(true) = self.continue_nat_traversal_round(now) {}
4707                    }
4708                }
4709                Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4710                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4711                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4712                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4713                    if self.is_multipath_negotiated() {
4714                        if self.local_max_path_id > max_path_id {
4715                            return Err(TransportError::PROTOCOL_VIOLATION(
4716                                "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4717                            ));
4718                        }
4719                        debug!("received PATHS_BLOCKED({:?})", max_path_id);
4720                        // TODO(@divma): ensure max concurrent paths
4721                    } else {
4722                        return Err(TransportError::PROTOCOL_VIOLATION(
4723                            "received PATHS_BLOCKED frame when multipath was not negotiated",
4724                        ));
4725                    }
4726                }
4727                Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4728                    // Nothing to do.  This is recorded in the frame stats, but otherwise we
4729                    // always issue all CIDs we're allowed to issue, so either this is an
4730                    // impatient peer or a bug on our side.
4731
4732                    // Receipt of a value of Maximum Path Identifier or Path Identifier that is higher than the local maximum value MUST
4733                    // be treated as a connection error of type PROTOCOL_VIOLATION.
4734                    // Ref <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-paths_blocked-and-path_cids>
4735                    if self.is_multipath_negotiated() {
4736                        if path_id > self.local_max_path_id {
4737                            return Err(TransportError::PROTOCOL_VIOLATION(
4738                                "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4739                            ));
4740                        }
4741                        if next_seq.0
4742                            > self
4743                                .local_cid_state
4744                                .get(&path_id)
4745                                .map(|cid_state| cid_state.active_seq().1 + 1)
4746                                .unwrap_or_default()
4747                        {
4748                            return Err(TransportError::PROTOCOL_VIOLATION(
4749                                "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4750                            ));
4751                        }
4752                        debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4753                    } else {
4754                        return Err(TransportError::PROTOCOL_VIOLATION(
4755                            "received PATH_CIDS_BLOCKED frame when multipath was not negotiated",
4756                        ));
4757                    }
4758                }
4759                Frame::AddAddress(addr) => {
4760                    let client_state = match self.iroh_hp.client_side_mut() {
4761                        Ok(state) => state,
4762                        Err(err) => {
4763                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4764                                "Nat traversal(ADD_ADDRESS): {err}"
4765                            )));
4766                        }
4767                    };
4768
4769                    if !client_state.check_remote_address(&addr) {
4770                        // if the address is not valid we flag it, but update anyway
4771                        warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4772                    }
4773
4774                    match client_state.add_remote_address(addr) {
4775                        Ok(maybe_added) => {
4776                            if let Some(added) = maybe_added {
4777                                self.events.push_back(Event::NatTraversal(
4778                                    iroh_hp::Event::AddressAdded(added),
4779                                ));
4780                            }
4781                        }
4782                        Err(e) => {
4783                            warn!(%e, "failed to add remote address")
4784                        }
4785                    }
4786                }
4787                Frame::RemoveAddress(addr) => {
4788                    let client_state = match self.iroh_hp.client_side_mut() {
4789                        Ok(state) => state,
4790                        Err(err) => {
4791                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4792                                "Nat traversal(REMOVE_ADDRESS): {err}"
4793                            )));
4794                        }
4795                    };
4796                    if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4797                        self.events
4798                            .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4799                                removed_addr,
4800                            )));
4801                    }
4802                }
4803                Frame::ReachOut(reach_out) => {
4804                    let server_state = match self.iroh_hp.server_side_mut() {
4805                        Ok(state) => state,
4806                        Err(err) => {
4807                            return Err(TransportError::PROTOCOL_VIOLATION(format!(
4808                                "Nat traversal(REACH_OUT): {err}"
4809                            )));
4810                        }
4811                    };
4812
4813                    if let Err(err) = server_state.handle_reach_out(reach_out) {
4814                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
4815                            "Nat traversal(REACH_OUT): {err}"
4816                        )));
4817                    }
4818                }
4819            }
4820        }
4821
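        // Record this packet for ACK generation. When the bookkeeping says an ACK is
        // due, force it immediately on abandoned paths, otherwise arm the
        // max-ACK-delay timer.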
4822        let space = self.spaces[SpaceId::Data].for_path(path_id);
4823        if space
4824            .pending_acks
4825            .packet_received(now, number, ack_eliciting, &space.dedup)
4826        {
4827            if self.abandoned_paths.contains(&path_id) {
4828                // § 3.4.3 QUIC-MULTIPATH: promptly send ACKs for packets received from
4829                // abandoned paths.
4830                space.pending_acks.set_immediate_ack_required();
4831            } else {
4832                self.timers.set(
4833                    Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4834                    now + self.ack_frequency.max_ack_delay,
4835                    self.qlog.with_time(now),
4836                );
4837            }
4838        }
4839
4840        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
4841        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
4842        // are only freed, and hence only issue credit, once the application has been notified
4843        // during a read on the stream.
4844        let pending = &mut self.spaces[SpaceId::Data].pending;
4845        self.streams.queue_max_stream_id(pending);
4846
4847        if let Some(reason) = close {
4848            self.state.move_to_draining(Some(reason.into()));
4849            self.close = true;
4850        }
4851
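        // Only the highest-numbered non-probing packet may initiate migration
        // (RFC 9000 §9.3); a reordered packet must not move the active path.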
4852        if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4853            && !is_probing_packet
4854            && network_path != self.path_data(path_id).network_path
4855        {
4856            let ConnectionSide::Server { ref server_config } = self.side else {
4857                panic!("packets from unknown remote should be dropped by clients");
4858            };
4859            debug_assert!(
4860                server_config.migration,
4861                "migration-initiating packets should have been dropped immediately"
4862            );
4863            self.migrate(path_id, now, network_path, migration_observed_addr);
4864            // Break linkability, if possible
4865            self.update_rem_cid(path_id);
4866            self.spin = false;
4867        }
4868
4869        Ok(())
4870    }
4871
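    /// Migrates `path_id` to a new network path, per RFC 9000 §9.
    ///
    /// RTT and congestion state are reset unless the change looks like a NAT
    /// rebinding, and validation of the new path is scheduled.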
4872    fn migrate(
4873        &mut self,
4874        path_id: PathId,
4875        now: Instant,
4876        network_path: FourTuple,
4877        observed_addr: Option<ObservedAddr>,
4878    ) {
4879        trace!(%network_path, %path_id, "migration initiated");
4880        self.path_counter = self.path_counter.wrapping_add(1);
4881        // TODO(@divma): conditions for path migration in multipath are very specific, check them
4882        // again to prevent path migrations that should actually create a new path
4883
4884        // Reset RTT/congestion state for the new path unless it looks like a NAT rebinding.
4885        // Note that the congestion window will not grow until validation completes. This helps
4886        // mitigate amplification attacks performed by spoofing source addresses.
4887        let prev_pto = self.pto(SpaceId::Data, path_id);
4888        let known_path = self.paths.get_mut(&path_id).expect("known path");
4889        let path = &mut known_path.data;
4890        let mut new_path = if network_path.remote.is_ipv4()
4891            && network_path.remote.ip() == path.network_path.remote.ip()
4892        {
4893            PathData::from_previous(network_path, path, self.path_counter, now)
4894        } else {
4895            let peer_max_udp_payload_size =
4896                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4897                    .unwrap_or(u16::MAX);
4898            PathData::new(
4899                network_path,
4900                self.allow_mtud,
4901                Some(peer_max_udp_payload_size),
4902                self.path_counter,
4903                now,
4904                &self.config,
4905            )
4906        };
4907        new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4908        if let Some(report) = observed_addr {
4909            if let Some(updated) = new_path.update_observed_addr_report(report) {
4910                tracing::info!("adding observed addr event from migration");
4911                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4912                    id: path_id,
4913                    addr: updated,
4914                }));
4915            }
4916        }
4917        new_path.send_new_challenge = true;
4918
4919        let mut prev = mem::replace(path, new_path);
4920        // Don't clobber the original path if the previous one hasn't been validated yet
4921        if !prev.is_validating_path() {
4922            prev.send_new_challenge = true;
4923            // We haven't updated the remote CID yet, so this captures the remote CID we
4924            // were using on the previous path.
4926            known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4927        }
4928
4929        // We need to re-assign the correct remote to this path in qlog
4930        self.qlog.emit_tuple_assigned(path_id, network_path, now);
4931
4932        self.timers.set(
4933            Timer::PerPath(path_id, PathTimer::PathValidation),
4934            now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4935            self.qlog.with_time(now),
4936        );
4937    }
4938
4939    /// Handle a change in the local address, i.e. an active migration
4940    pub fn local_address_changed(&mut self) {
4941        // TODO(flub): if multipath is enabled this needs to create a new path entirely.
4942        self.update_rem_cid(PathId::ZERO);
4943        self.ping();
4944    }
4945
4946    /// Switch to a previously unused remote connection ID, if possible
4947    fn update_rem_cid(&mut self, path_id: PathId) {
4948        let Some((reset_token, retired)) =
4949            self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4950        else {
4951            return;
4952        };
4953
4954        // Retire the current remote CID and any CIDs we had to skip.
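        // E.g. if sequence 3 was active and sequence 6 is the next unused CID,
        // sequences 3, 4, and 5 are queued for retirement here.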
4955        self.spaces[SpaceId::Data]
4956            .pending
4957            .retire_cids
4958            .extend(retired.map(|seq| (path_id, seq)));
4959        let remote = self.path_data(path_id).network_path.remote;
4960        self.set_reset_token(path_id, remote, reset_token);
4961    }
4962
4963    /// Sends this reset token to the endpoint
4964    ///
    /// The endpoint needs to know the reset tokens issued by the peer so that, if the peer
    /// sends a stateless reset, it knows to route it to this connection. See RFC 9000
    /// section 10.3, Stateless Reset.
    ///
    /// Reset tokens differ for each path; note that the endpoint identifies paths by the
    /// peer's socket address, not by path ID.
4971    fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4972        self.endpoint_events
4973            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4974
        // During the handshake the server sends a reset token in the transport
        // parameters. When we are the client and receive the reset token during the
        // handshake, we want it reflected in our copy of the peer's transport parameters.
        // TODO(flub): Pretty sure this is pointless: the entire params struct is
        //    overwritten shortly after this is called, and afterwards it no longer
        //    carries this token.
4980        if path_id == PathId::ZERO {
4981            self.peer_params.stateless_reset_token = Some(reset_token);
4982        }
4983    }
4984
4985    /// Issue an initial set of connection IDs to the peer upon connection
4986    fn issue_first_cids(&mut self, now: Instant) {
4987        if self
4988            .local_cid_state
4989            .get(&PathId::ZERO)
4990            .expect("PathId::ZERO exists when the connection is created")
4991            .cid_len()
4992            == 0
4993        {
4994            return;
4995        }
4996
4997        // Subtract 1 to account for the CID we supplied while handshaking
4998        let mut n = self.peer_params.issue_cids_limit() - 1;
4999        if let ConnectionSide::Server { server_config } = &self.side {
5000            if server_config.has_preferred_address() {
5001                // We also sent a CID in the transport parameters
5002                n -= 1;
5003            }
5004        }
5005        self.endpoint_events
5006            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5007    }
5008
5009    /// Issues an initial set of CIDs for paths that have not yet had any CIDs issued
5010    ///
5011    /// Later CIDs are issued when CIDs expire or are retired by the peer.
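    ///
    /// For example, if CIDs have been issued up to `PathId(2)` and the negotiated
    /// maximum is `PathId(5)`, this requests identifier sets for paths 3, 4, and 5.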
5012    fn issue_first_path_cids(&mut self, now: Instant) {
5013        if let Some(max_path_id) = self.max_path_id() {
5014            let mut path_id = self.max_path_id_with_cids.next();
5015            while path_id <= max_path_id {
5016                self.endpoint_events
5017                    .push_back(EndpointEventInner::NeedIdentifiers(
5018                        path_id,
5019                        now,
5020                        self.peer_params.issue_cids_limit(),
5021                    ));
5022                path_id = path_id.next();
5023            }
5024            self.max_path_id_with_cids = max_path_id;
5025        }
5026    }
5027
5028    /// Populates a packet with frames
5029    ///
5030    /// This tries to fit as many frames as possible into the packet.
5031    ///
    /// *path_exclusive_only* means to only build frames which can only be sent on this
    /// path.  This is used in multipath for backup paths while there is still an active
    /// path.
    fn populate_packet(
5036        &mut self,
5037        now: Instant,
5038        space_id: SpaceId,
5039        path_id: PathId,
5040        path_exclusive_only: bool,
5041        builder: &mut PacketBuilder,
5042    ) {
5043        let pn = builder.exact_number;
5044        let is_multipath_negotiated = self.is_multipath_negotiated();
5045        let stats = &mut self.stats.frame_tx;
5046        let space = &mut self.spaces[space_id];
5047        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5048        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5049        space
5050            .for_path(path_id)
5051            .pending_acks
5052            .maybe_ack_non_eliciting();
5053
5054        // HANDSHAKE_DONE
5055        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5056            trace!("HANDSHAKE_DONE");
5057            builder.encode(frame::HandshakeDone, stats);
5058        }
5059
5060        // REACH_OUT
        // TODO(@divma): path-exclusive considerations
5062        if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5063            while let Some(local_addr) = addresses.pop() {
5064                let reach_out = frame::ReachOut::new(*round, local_addr);
5065                if builder.frame_space_remaining() > reach_out.size() {
5066                    trace!(%round, ?local_addr, "REACH_OUT");
5067                    builder.encode(reach_out, stats);
5068                } else {
5069                    addresses.push(local_addr);
5070                    break;
5071                }
5072            }
5073            if addresses.is_empty() {
5074                space.pending.reach_out = None;
5075            }
5076        }
5077
5078        // OBSERVED_ADDR
5079        if !path_exclusive_only
5080            && space_id == SpaceId::Data
5081            && self
5082                .config
5083                .address_discovery_role
5084                .should_report(&self.peer_params.address_discovery_role)
5085            && (!path.observed_addr_sent || space.pending.observed_addr)
5086        {
5087            let frame =
5088                frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
5089            if builder.frame_space_remaining() > frame.size() {
5090                trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5091                builder.encode(frame, stats);
5092
5093                self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5094                path.observed_addr_sent = true;
5095
5096                space.pending.observed_addr = false;
5097            }
5098        }
5099
5100        // PING
5101        if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5102            trace!("PING");
5103            builder.encode(frame::Ping, stats);
5104        }
5105
5106        // IMMEDIATE_ACK
5107        if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5108            debug_assert_eq!(
5109                space_id,
5110                SpaceId::Data,
5111                "immediate acks must be sent in the data space"
5112            );
5113            trace!("IMMEDIATE_ACK");
5114            builder.encode(frame::ImmediateAck, stats);
5115        }
5116
5117        // ACK
5118        // TODO(flub): Should this send acks for this path anyway?
5119
5120        if !path_exclusive_only {
5121            for path_id in space
5122                .number_spaces
5123                .iter_mut()
5124                .filter(|(_, pns)| pns.pending_acks.can_send())
5125                .map(|(&path_id, _)| path_id)
5126                .collect::<Vec<_>>()
5127            {
5128                Self::populate_acks(
5129                    now,
5130                    self.receiving_ecn,
5131                    path_id,
5132                    space_id,
5133                    space,
5134                    is_multipath_negotiated,
5135                    builder,
5136                    stats,
5137                );
5138            }
5139        }
5140
5141        // ACK_FREQUENCY
5142        if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5143            let sequence_number = self.ack_frequency.next_sequence_number();
5144
5145            // Safe to unwrap because this is always provided when ACK frequency is enabled
5146            let config = self.config.ack_frequency_config.as_ref().unwrap();
5147
5148            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
5149            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5150                path.rtt.get(),
5151                config,
5152                &self.peer_params,
5153            );
5154
5155            trace!(?max_ack_delay, "ACK_FREQUENCY");
5156
5157            let frame = frame::AckFrequency {
5158                sequence: sequence_number,
5159                ack_eliciting_threshold: config.ack_eliciting_threshold,
5160                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5161                reordering_threshold: config.reordering_threshold,
5162            };
5163            builder.encode(frame, stats);
5164
5165            self.ack_frequency
5166                .ack_frequency_sent(path_id, pn, max_ack_delay);
5167        }
5168
5169        // PATH_CHALLENGE
5170        if builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
5171            && space_id == SpaceId::Data
5172            && path.send_new_challenge
5173            && !self.state.is_closed()
5174        // we don't want to send new challenges if we are already closing
5175        {
5176            path.send_new_challenge = false;
5177
5178            // Generate a new challenge every time we send a new PATH_CHALLENGE
5179            let token = self.rng.random();
5180            let info = paths::SentChallengeInfo {
5181                sent_instant: now,
5182                network_path: path.network_path,
5183            };
5184            path.challenges_sent.insert(token, info);
5185            let challenge = frame::PathChallenge(token);
5186            trace!(frame = %challenge);
5187            builder.encode(challenge, stats);
5188            builder.require_padding();
5189            let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5190            self.timers.set(
5191                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5192                now + pto,
5193                self.qlog.with_time(now),
5194            );
5195
            // Note: `send_new_challenge` was cleared above, so only the validation state
            // gates this.
            if is_multipath_negotiated && !path.validated {
                // queue informing the path status along with the challenge
                space.pending.path_status.insert(path_id);
            }
5200
5201            // Always include an OBSERVED_ADDR frame with a PATH_CHALLENGE, regardless
5202            // of whether one has already been sent on this path.
5203            if space_id == SpaceId::Data
5204                && self
5205                    .config
5206                    .address_discovery_role
5207                    .should_report(&self.peer_params.address_discovery_role)
5208            {
5209                let frame = frame::ObservedAddr::new(
5210                    path.network_path.remote,
5211                    self.next_observed_addr_seq_no,
5212                );
5213                if builder.frame_space_remaining() > frame.size() {
5214                    builder.encode(frame, stats);
5215
5216                    self.next_observed_addr_seq_no =
5217                        self.next_observed_addr_seq_no.saturating_add(1u8);
5218                    path.observed_addr_sent = true;
5219
5220                    space.pending.observed_addr = false;
5221                }
5222            }
5223        }
5224
5225        // PATH_RESPONSE
5226        if builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
5227            && space_id == SpaceId::Data
5228        {
5229            if let Some(token) = path.path_responses.pop_on_path(path.network_path) {
5230                let response = frame::PathResponse(token);
5231                trace!(frame = %response);
5232                builder.encode(response, stats);
5233                builder.require_padding();
5234
5235                // NOTE: this is technically not required but might be useful to ride the
5236                // request/response nature of path challenges to refresh an observation
5237                // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
5238                if space_id == SpaceId::Data
5239                    && self
5240                        .config
5241                        .address_discovery_role
5242                        .should_report(&self.peer_params.address_discovery_role)
5243                {
5244                    let frame = frame::ObservedAddr::new(
5245                        path.network_path.remote,
5246                        self.next_observed_addr_seq_no,
5247                    );
5248                    if builder.frame_space_remaining() > frame.size() {
5249                        builder.encode(frame, stats);
5250
5251                        self.next_observed_addr_seq_no =
5252                            self.next_observed_addr_seq_no.saturating_add(1u8);
5253                        path.observed_addr_sent = true;
5254
5255                        space.pending.observed_addr = false;
5256                    }
5257                }
5258            }
5259        }
5260
5261        // CRYPTO
5262        while !path_exclusive_only
5263            && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
5264            && !is_0rtt
5265        {
5266            let mut frame = match space.pending.crypto.pop_front() {
5267                Some(x) => x,
5268                None => break,
5269            };
5270
            // Calculate the maximum amount of crypto data we can store in the buffer.
            // Since the offset is known, we can reserve the exact size required to encode it.
            // For the length we reserve 2 bytes, which allows encoding lengths up to
            // 2^14 - 1, more than fits into a normally sized QUIC packet.
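            // For example, with 1200 bytes of frame space remaining and an offset of
            // 70000 (a 4-byte varint), up to 1200 - 1 - 4 - 2 = 1193 bytes of crypto
            // data fit into this frame.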
5275            let max_crypto_data_size = builder.frame_space_remaining()
5276                - 1 // Frame Type
5277                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5278                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
5279
5280            let len = frame
5281                .data
5282                .len()
5283                .min(2usize.pow(14) - 1)
5284                .min(max_crypto_data_size);
5285
5286            let data = frame.data.split_to(len);
5287            let offset = frame.offset;
5288            let truncated = frame::Crypto { offset, data };
5289            trace!(off = offset, len = truncated.data.len(), "CRYPTO");
5290            builder.encode(truncated, stats);
5291
5292            if !frame.data.is_empty() {
5293                frame.offset += len as u64;
5294                space.pending.crypto.push_front(frame);
5295            }
5296        }
5297
5298        // TODO(flub): maybe this is much higher priority?
5299        // PATH_ABANDON
5300        while !path_exclusive_only
5301            && space_id == SpaceId::Data
5302            && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
5303        {
5304            let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5305                break;
5306            };
5307            let frame = frame::PathAbandon {
5308                path_id,
5309                error_code,
5310            };
5311            builder.encode(frame, stats);
5312            trace!(%path_id, "PATH_ABANDON");
5313        }
5314
5315        // PATH_STATUS_AVAILABLE & PATH_STATUS_BACKUP
5316        while !path_exclusive_only
5317            && space_id == SpaceId::Data
5318            && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
5319        {
5320            let Some(path_id) = space.pending.path_status.pop_first() else {
5321                break;
5322            };
5323            let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5324                trace!(%path_id, "discarding queued path status for unknown path");
5325                continue;
5326            };
5327
5328            let seq = path.status.seq();
5329            match path.local_status() {
5330                PathStatus::Available => {
5331                    let frame = frame::PathStatusAvailable {
5332                        path_id,
5333                        status_seq_no: seq,
5334                    };
5335                    builder.encode(frame, stats);
5336                    trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5337                }
5338                PathStatus::Backup => {
5339                    let frame = frame::PathStatusBackup {
5340                        path_id,
5341                        status_seq_no: seq,
5342                    };
5343                    builder.encode(frame, stats);
5344                    trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5345                }
5346            }
5347        }
5348
5349        // MAX_PATH_ID
5350        if space_id == SpaceId::Data
5351            && space.pending.max_path_id
5352            && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
5353        {
5354            let frame = frame::MaxPathId(self.local_max_path_id);
5355            builder.encode(frame, stats);
5356            space.pending.max_path_id = false;
5357            trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5358        }
5359
5360        // PATHS_BLOCKED
5361        if space_id == SpaceId::Data
5362            && space.pending.paths_blocked
5363            && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5364        {
5365            let frame = frame::PathsBlocked(self.remote_max_path_id);
5366            builder.encode(frame, stats);
5367            space.pending.paths_blocked = false;
5368            trace!(max_path_id = %self.remote_max_path_id, "PATHS_BLOCKED");
5369        }
5370
5371        // PATH_CIDS_BLOCKED
5372        while space_id == SpaceId::Data
5373            && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5374        {
5375            let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5376                break;
5377            };
5378            let next_seq = match self.rem_cids.get(&path_id) {
5379                Some(cid_queue) => cid_queue.active_seq() + 1,
5380                None => 0,
5381            };
5382            let frame = frame::PathCidsBlocked {
5383                path_id,
5384                next_seq: VarInt(next_seq),
5385            };
5386            builder.encode(frame, stats);
5387            trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5388        }
5389
5390        // RESET_STREAM, STOP_SENDING, MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS
5391        if space_id == SpaceId::Data {
5392            self.streams
5393                .write_control_frames(builder, &mut space.pending, stats);
5394        }
5395
5396        // NEW_CONNECTION_ID
5397        let cid_len = self
5398            .local_cid_state
5399            .values()
5400            .map(|cid_state| cid_state.cid_len())
5401            .max()
5402            .expect("some local CID state must exist");
5403        let new_cid_size_bound =
5404            frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5405        while !path_exclusive_only && builder.frame_space_remaining() > new_cid_size_bound {
5406            let issued = match space.pending.new_cids.pop() {
5407                Some(x) => x,
5408                None => break,
5409            };
5410            let retire_prior_to = self
5411                .local_cid_state
5412                .get(&issued.path_id)
5413                .map(|cid_state| cid_state.retire_prior_to())
5414                .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5415
5416            let cid_path_id = match is_multipath_negotiated {
5417                true => {
5418                    trace!(
5419                        path_id = ?issued.path_id,
5420                        sequence = issued.sequence,
5421                        id = %issued.id,
5422                        "PATH_NEW_CONNECTION_ID",
5423                    );
5424                    Some(issued.path_id)
5425                }
5426                false => {
5427                    trace!(
5428                        sequence = issued.sequence,
5429                        id = %issued.id,
5430                        "NEW_CONNECTION_ID"
5431                    );
5432                    debug_assert_eq!(issued.path_id, PathId::ZERO);
5433                    None
5434                }
5435            };
5436            let frame = frame::NewConnectionId {
5437                path_id: cid_path_id,
5438                sequence: issued.sequence,
5439                retire_prior_to,
5440                id: issued.id,
5441                reset_token: issued.reset_token,
5442            };
5443            builder.encode(frame, stats);
5444        }
5445
5446        // RETIRE_CONNECTION_ID
5447        let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5448        while !path_exclusive_only && builder.frame_space_remaining() > retire_cid_bound {
5449            let (path_id, sequence) = match space.pending.retire_cids.pop() {
5450                Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5451                    trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5452                    (None, seq)
5453                }
5454                Some((path_id, seq)) => {
5455                    trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5456                    (Some(path_id), seq)
5457                }
5458                None => break,
5459            };
5460            let frame = frame::RetireConnectionId { path_id, sequence };
5461            builder.encode(frame, stats);
5462        }
5463
5464        // DATAGRAM
5465        let mut sent_datagrams = false;
5466        while !path_exclusive_only
5467            && builder.frame_space_remaining() > Datagram::SIZE_BOUND
5468            && space_id == SpaceId::Data
5469        {
5470            match self.datagrams.write(builder, stats) {
5471                true => {
5472                    sent_datagrams = true;
5473                }
5474                false => break,
5475            }
5476        }
5477        if self.datagrams.send_blocked && sent_datagrams {
5478            self.events.push_back(Event::DatagramsUnblocked);
5479            self.datagrams.send_blocked = false;
5480        }
5481
5482        let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5483
5484        // NEW_TOKEN
        while let Some(network_path) = space.pending.new_tokens.pop() {
            if path_exclusive_only {
                // Re-queue the popped entry rather than dropping it; NEW_TOKEN frames
                // are not path-exclusive and are skipped entirely in this mode.
                space.pending.new_tokens.push(network_path);
                break;
            }
5489            debug_assert_eq!(space_id, SpaceId::Data);
5490            let ConnectionSide::Server { server_config } = &self.side else {
5491                panic!("NEW_TOKEN frames should not be enqueued by clients");
5492            };
5493
5494            if !network_path.is_probably_same_path(&path.network_path) {
                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
                // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
                // frames upon a path change; once the new path becomes validated, fresh
                // NEW_TOKEN frames may be enqueued for it.
5499                continue;
5500            }
5501
5502            let token = Token::new(
5503                TokenPayload::Validation {
5504                    ip: network_path.remote.ip(),
5505                    issued: server_config.time_source.now(),
5506                },
5507                &mut self.rng,
5508            );
5509            let new_token = NewToken {
5510                token: token.encode(&*server_config.token_key).into(),
5511            };
5512
5513            if builder.frame_space_remaining() < new_token.size() {
5514                space.pending.new_tokens.push(network_path);
5515                break;
5516            }
5517
5518            trace!("NEW_TOKEN");
5519            builder.encode(new_token, stats);
5520            builder.retransmits_mut().new_tokens.push(network_path);
5521        }
5522
5523        // STREAM
5524        if !path_exclusive_only && space_id == SpaceId::Data {
5525            self.streams
5526                .write_stream_frames(builder, self.config.send_fairness, stats);
5527        }
5528
5529        // ADD_ADDRESS
5530        // TODO(@divma): check if we need to do path exclusive filters
5531        while space_id == SpaceId::Data
5532            && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
5533        {
5534            if let Some(added_address) = space.pending.add_address.pop_last() {
5535                trace!(
5536                    seq = %added_address.seq_no,
5537                    ip = ?added_address.ip,
5538                    port = added_address.port,
5539                    "ADD_ADDRESS",
5540                );
5541                builder.encode(added_address, stats);
5542            } else {
5543                break;
5544            }
5545        }
5546
5547        // REMOVE_ADDRESS
5548        while space_id == SpaceId::Data
5549            && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
5550        {
5551            if let Some(removed_address) = space.pending.remove_address.pop_last() {
5552                trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5553                builder.encode(removed_address, stats);
5554            } else {
5555                break;
5556            }
5557        }
5558    }
5559
5560    /// Write pending ACKs into a buffer
5561    fn populate_acks<'a, 'b>(
5562        now: Instant,
5563        receiving_ecn: bool,
5564        path_id: PathId,
5565        space_id: SpaceId,
5566        space: &mut PacketSpace,
5567        is_multipath_negotiated: bool,
5568        builder: &mut PacketBuilder<'a, 'b>,
5569        stats: &mut FrameStats,
5570    ) {
5571        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
5572        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5573
5574        debug_assert!(
5575            is_multipath_negotiated || path_id == PathId::ZERO,
5576            "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5577        );
5578        if is_multipath_negotiated {
5579            debug_assert!(
5580                space_id == SpaceId::Data || path_id == PathId::ZERO,
5581                "path acks must be sent in 1RTT space (have {space_id:?})"
5582            );
5583        }
5584
5585        let pns = space.for_path(path_id);
5586        let ranges = pns.pending_acks.ranges();
        debug_assert!(!ranges.is_empty(), "cannot send empty ACK range");
5588        let ecn = if receiving_ecn {
5589            Some(&pns.ecn_counters)
5590        } else {
5591            None
5592        };
5593
5594        let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5595        // TODO: This should come from `TransportConfig` if that gets configurable.
5596        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5597        let delay = delay_micros >> ack_delay_exp.into_inner();
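        // E.g. with the default exponent of 3, a measured delay of 800us is encoded as
        // 800 >> 3 = 100, which the peer decodes back as 100 << 3 = 800us.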
5598
5599        if is_multipath_negotiated && space_id == SpaceId::Data {
5600            if !ranges.is_empty() {
5601                trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5602                let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
5603                builder.encode(frame, stats);
5604            }
5605        } else {
5606            trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5607            builder.encode(frame::Ack::encoder(delay, ranges, ecn), stats);
5608        }
5609    }
5610
5611    fn close_common(&mut self) {
5612        trace!("connection closed");
5613        self.timers.reset();
5614    }
5615
5616    fn set_close_timer(&mut self, now: Instant) {
5617        // QUIC-MULTIPATH § 2.6 Connection Closure: draining for 3*PTO with PTO the max of
5618        // the PTO for all paths.
5619        let pto_max = self.pto_max_path(self.highest_space, true);
5620        self.timers.set(
5621            Timer::Conn(ConnTimer::Close),
5622            now + 3 * pto_max,
5623            self.qlog.with_time(now),
5624        );
5625    }
5626
5627    /// Handle transport parameters received from the peer
5628    ///
    /// *rem_cid* and *loc_cid* are the source and destination CIDs respectively of the
    /// packet in which the transport parameters arrived.
5631    fn handle_peer_params(
5632        &mut self,
5633        params: TransportParameters,
5634        loc_cid: ConnectionId,
5635        rem_cid: ConnectionId,
5636        now: Instant,
5637    ) -> Result<(), TransportError> {
5638        if Some(self.orig_rem_cid) != params.initial_src_cid
5639            || (self.side.is_client()
5640                && (Some(self.initial_dst_cid) != params.original_dst_cid
5641                    || self.retry_src_cid != params.retry_src_cid))
5642        {
5643            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5644                "CID authentication failure",
5645            ));
5646        }
5647        if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5648            return Err(TransportError::PROTOCOL_VIOLATION(
5649                "multipath must not use zero-length CIDs",
5650            ));
5651        }
5652
5653        self.set_peer_params(params);
5654        self.qlog.emit_peer_transport_params_received(self, now);
5655
5656        Ok(())
5657    }
5658
5659    fn set_peer_params(&mut self, params: TransportParameters) {
5660        self.streams.set_params(&params);
5661        self.idle_timeout =
5662            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5663        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5664
5665        if let Some(ref info) = params.preferred_address {
5666            // During the handshake PathId::ZERO exists.
            self.rem_cids
                .get_mut(&PathId::ZERO)
                .expect("not yet abandoned")
                .insert(frame::NewConnectionId {
                    path_id: None,
                    sequence: 1,
                    id: info.connection_id,
                    reset_token: info.stateless_reset_token,
                    retire_prior_to: 0,
                })
                .expect(
                    "preferred address CID is the first received, and hence is guaranteed to be legal",
                );
5677            let remote = self.path_data(PathId::ZERO).network_path.remote;
5678            self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5679        }
5680        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
5681
5682        let mut multipath_enabled = None;
5683        if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5684            self.config.get_initial_max_path_id(),
5685            params.initial_max_path_id,
5686        ) {
5687            // multipath is enabled, register the local and remote maximums
5688            self.local_max_path_id = local_max_path_id;
5689            self.remote_max_path_id = remote_max_path_id;
5690            let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5691            debug!(%initial_max_path_id, "multipath negotiated");
5692            multipath_enabled = Some(initial_max_path_id);
5693        }
5694
5695        if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5696            self.config
5697                .max_remote_nat_traversal_addresses
5698                .zip(params.max_remote_nat_traversal_addresses)
5699        {
5700            if let Some(max_initial_paths) =
5701                multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5702            {
5703                let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5704                let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5705                self.iroh_hp =
5706                    iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5707                debug!(
5708                    %max_remote_addresses, %max_local_addresses,
5709                    "iroh hole punching negotiated"
5710                );
5711
5712                match self.side() {
5713                    Side::Client => {
5714                        if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5715                            // in this case the client might try to open `max_remote_addresses` new
5716                            // paths, but the current multipath configuration will not allow it
5717                            warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5718                        } else if max_local_addresses as u64
5719                            > params.active_connection_id_limit.into_inner()
5720                        {
5721                            // the server allows us to send at most `params.active_connection_id_limit`
5722                            // but they might need at least `max_local_addresses` to effectively send
5723                            // `PATH_CHALLENGE` frames to each advertised local address
5724                            warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5725                        }
5726                    }
5727                    Side::Server => {
5728                        if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5729                            warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5730                        }
5731                    }
5732                }
5733            } else {
5734                debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5735            }
5736        }
5737
5738        self.peer_params = params;
5739        let peer_max_udp_payload_size =
5740            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5741        self.path_data_mut(PathId::ZERO)
5742            .mtud
5743            .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5744    }
5745
5746    /// Decrypts a packet, returning the packet number on success
5747    fn decrypt_packet(
5748        &mut self,
5749        now: Instant,
5750        path_id: PathId,
5751        packet: &mut Packet,
5752    ) -> Result<Option<u64>, Option<TransportError>> {
5753        let result = packet_crypto::decrypt_packet_body(
5754            packet,
5755            path_id,
5756            &self.spaces,
5757            self.zero_rtt_crypto.as_ref(),
5758            self.key_phase,
5759            self.prev_crypto.as_ref(),
5760            self.next_crypto.as_ref(),
5761        )?;
5762
5763        let result = match result {
5764            Some(r) => r,
5765            None => return Ok(None),
5766        };
5767
5768        if result.outgoing_key_update_acked {
5769            if let Some(prev) = self.prev_crypto.as_mut() {
5770                prev.end_packet = Some((result.number, now));
5771                self.set_key_discard_timer(now, packet.header.space());
5772            }
5773        }
5774
5775        if result.incoming_key_update {
5776            trace!("key update authenticated");
5777            self.update_keys(Some((result.number, now)), true);
5778            self.set_key_discard_timer(now, packet.header.space());
5779        }
5780
5781        Ok(Some(result.number))
5782    }
5783
5784    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5785        trace!("executing key update");
5786        // Generate keys for the key phase after the one we're switching to, store them in
5787        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
5788        // `prev_crypto`.
5789        let new = self
5790            .crypto
5791            .next_1rtt_keys()
5792            .expect("only called for `Data` packets");
5793        self.key_phase_size = new
5794            .local
5795            .confidentiality_limit()
5796            .saturating_sub(KEY_UPDATE_MARGIN);
5797        let old = mem::replace(
5798            &mut self.spaces[SpaceId::Data]
5799                .crypto
5800                .as_mut()
5801                .unwrap() // safe because update_keys() can only be triggered by short packets
5802                .packet,
5803            mem::replace(self.next_crypto.as_mut().unwrap(), new),
5804        );
5805        self.spaces[SpaceId::Data]
5806            .iter_paths_mut()
5807            .for_each(|s| s.sent_with_keys = 0);
5808        self.prev_crypto = Some(PrevCrypto {
5809            crypto: old,
5810            end_packet,
5811            update_unacked: remote,
5812        });
5813        self.key_phase = !self.key_phase;
5814    }
5815
5816    fn peer_supports_ack_frequency(&self) -> bool {
5817        self.peer_params.min_ack_delay.is_some()
5818    }
5819
5820    /// Send an IMMEDIATE_ACK frame to the remote endpoint
5821    ///
5822    /// According to the spec, this will result in an error if the remote endpoint does not support
5823    /// the Acknowledgement Frequency extension
5824    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5825        debug_assert_eq!(
5826            self.highest_space,
5827            SpaceId::Data,
5828            "immediate ack must be written in the data space"
5829        );
5830        self.spaces[self.highest_space]
5831            .for_path(path_id)
5832            .immediate_ack_pending = true;
5833    }
5834
5835    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
5836    #[cfg(test)]
5837    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5838        let (path_id, first_decode, remaining) = match &event.0 {
5839            ConnectionEventInner::Datagram(DatagramConnectionEvent {
5840                path_id,
5841                first_decode,
5842                remaining,
5843                ..
5844            }) => (path_id, first_decode, remaining),
5845            _ => return None,
5846        };
5847
5848        if remaining.is_some() {
5849            panic!("Packets should never be coalesced in tests");
5850        }
5851
5852        let decrypted_header = packet_crypto::unprotect_header(
5853            first_decode.clone(),
5854            &self.spaces,
5855            self.zero_rtt_crypto.as_ref(),
5856            self.peer_params.stateless_reset_token,
5857        )?;
5858
5859        let mut packet = decrypted_header.packet?;
5860        packet_crypto::decrypt_packet_body(
5861            &mut packet,
5862            *path_id,
5863            &self.spaces,
5864            self.zero_rtt_crypto.as_ref(),
5865            self.key_phase,
5866            self.prev_crypto.as_ref(),
5867            self.next_crypto.as_ref(),
5868        )
5869        .ok()?;
5870
5871        Some(packet.payload.to_vec())
5872    }
5873
5874    /// The number of bytes of packets containing retransmittable frames that have not been
5875    /// acknowledged or declared lost.
5876    #[cfg(test)]
5877    pub(crate) fn bytes_in_flight(&self) -> u64 {
5878        // TODO(@divma): consider including for multipath?
5879        self.path_data(PathId::ZERO).in_flight.bytes
5880    }
5881
5882    /// Number of bytes worth of non-ack-only packets that may be sent
5883    #[cfg(test)]
5884    pub(crate) fn congestion_window(&self) -> u64 {
5885        let path = self.path_data(PathId::ZERO);
5886        path.congestion
5887            .window()
5888            .saturating_sub(path.in_flight.bytes)
5889    }
5890
    /// Whether the earliest running timer, ignoring keep-alive, push-new-CID, and key
    /// discard timers, is the idle timeout (or no such timer is running at all)
5892    #[cfg(test)]
5893    pub(crate) fn is_idle(&self) -> bool {
5894        let current_timers = self.timers.values();
5895        current_timers
5896            .into_iter()
5897            .filter(|(timer, _)| {
5898                !matches!(
5899                    timer,
5900                    Timer::Conn(ConnTimer::KeepAlive)
5901                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
5902                        | Timer::Conn(ConnTimer::PushNewCid)
5903                        | Timer::Conn(ConnTimer::KeyDiscard)
5904                )
5905            })
5906            .min_by_key(|(_, time)| *time)
5907            .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
5908    }
5909
5910    /// Whether explicit congestion notification is in use on outgoing packets.
5911    #[cfg(test)]
5912    pub(crate) fn using_ecn(&self) -> bool {
5913        self.path_data(PathId::ZERO).sending_ecn
5914    }
5915
    /// The number of bytes received on the path with `PathId::ZERO`
5917    #[cfg(test)]
5918    pub(crate) fn total_recvd(&self) -> u64 {
5919        self.path_data(PathId::ZERO).total_recvd
5920    }
5921
5922    #[cfg(test)]
5923    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
5924        self.local_cid_state
5925            .get(&PathId::ZERO)
5926            .unwrap()
5927            .active_seq()
5928    }
5929
5930    #[cfg(test)]
5931    #[track_caller]
5932    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
5933        self.local_cid_state
5934            .get(&PathId(path_id))
5935            .unwrap()
5936            .active_seq()
5937    }
5938
5939    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
5940    /// with updated `retire_prior_to` field set to `v`
5941    #[cfg(test)]
5942    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
5943        let n = self
5944            .local_cid_state
5945            .get_mut(&PathId::ZERO)
5946            .unwrap()
5947            .assign_retire_seq(v);
5948        self.endpoint_events
5949            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5950    }
5951
5952    /// Check the current active remote CID sequence for `PathId::ZERO`
5953    #[cfg(test)]
5954    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
5955        self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
5956    }
5957
    /// Returns the detected maximum UDP payload size for the given path
5959    #[cfg(test)]
5960    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
5961        self.path_data(path_id).current_mtu()
5962    }
5963
5964    /// Triggers path validation on all paths
5965    #[cfg(test)]
5966    pub(crate) fn trigger_path_validation(&mut self) {
5967        for path in self.paths.values_mut() {
5968            path.data.send_new_challenge = true;
5969        }
5970    }
5971
5972    /// Whether we have 1-RTT data to send
5973    ///
5974    /// This checks for frames that can only be sent in the data space (1-RTT):
5975    /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated.
5976    /// - Pending PATH_RESPONSE frames.
5977    /// - Pending data to send in STREAM frames.
5978    /// - Pending DATAGRAM frames to send.
5979    ///
5980    /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that
5981    /// may need to be sent.
5982    fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
5983        let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
5984            path.data.send_new_challenge
5985                || path
5986                    .prev
5987                    .as_ref()
5988                    .is_some_and(|(_, path)| path.send_new_challenge)
5989                || !path.data.path_responses.is_empty()
5990        });
5991        let other = self.streams.can_send_stream_data()
5992            || self
5993                .datagrams
5994                .outgoing
5995                .front()
5996                .is_some_and(|x| x.size(true) <= max_size);
5997        SendableFrames {
5998            acks: false,
5999            other,
6000            close: false,
6001            path_exclusive,
6002        }
6003    }
6004
6005    /// Terminate the connection instantly, without sending a close packet
6006    fn kill(&mut self, reason: ConnectionError) {
6007        self.close_common();
6008        self.state.move_to_drained(Some(reason));
6009        self.endpoint_events.push_back(EndpointEventInner::Drained);
6010    }
6011
6012    /// Storage size required for the largest packet that can be transmitted on all currently
6013    /// available paths
6014    ///
6015    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
6016    ///
6017    /// When multipath is enabled, this value is the minimum MTU across all available paths.
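    ///
    /// A minimal sketch of sizing a send buffer from this value (the `conn` binding is
    /// hypothetical, not part of this API):
    ///
    /// ```ignore
    /// let mut buf = Vec::with_capacity(conn.current_mtu() as usize);
    /// ```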
6018    pub fn current_mtu(&self) -> u16 {
6019        self.paths
6020            .iter()
6021            .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6022            .map(|(_path_id, path_state)| path_state.data.current_mtu())
6023            .min()
6024            .expect("There is always at least one available path")
6025    }
6026
6027    /// Size of non-frame data for a 1-RTT packet
6028    ///
6029    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
6030    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
    /// rare. Because the encoded length of `pn` depends on the largest acknowledged packet,
    /// the result may additionally change unpredictably with variations in latency and
    /// packet loss.
6033    fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6034        let pn_len = PacketNumber::new(
6035            pn,
6036            self.spaces[SpaceId::Data]
6037                .for_path(path)
6038                .largest_acked_packet
6039                .unwrap_or(0),
6040        )
6041        .len();
6042
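        // For instance, an 8-byte remote CID, a 2-byte packet number, and a 16-byte
        // AEAD tag yield 1 + 8 + 2 + 16 = 27 bytes of per-packet overhead.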
6043        // 1 byte for flags
6044        1 + self
6045            .rem_cids
6046            .get(&path)
6047            .map(|cids| cids.active().len())
6048            .unwrap_or(20)      // Max CID len in QUIC v1
6049            + pn_len
6050            + self.tag_len_1rtt()
6051    }
6052
6053    fn predict_1rtt_overhead_no_pn(&self) -> usize {
6054        let pn_len = 4;
6055
6056        let cid_len = self
6057            .rem_cids
6058            .values()
6059            .map(|cids| cids.active().len())
6060            .max()
6061            .unwrap_or(20); // Max CID len in QUIC v1
6062
6063        // 1 byte for flags
6064        1 + cid_len + pn_len + self.tag_len_1rtt()
6065    }
6066
6067    fn tag_len_1rtt(&self) -> usize {
6068        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6069            Some(crypto) => Some(&*crypto.packet.local),
6070            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6071        };
6072        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
6073        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
6074        // but that would needlessly prevent sending datagrams during 0-RTT.
6075        key.map_or(16, |x| x.tag_len())
6076    }
6077
6078    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
6079    fn on_path_validated(&mut self, path_id: PathId) {
6080        self.path_data_mut(path_id).validated = true;
6081        let ConnectionSide::Server { server_config } = &self.side else {
6082            return;
6083        };
6084        let network_path = self.path_data(path_id).network_path;
        let new_tokens = &mut self.spaces[SpaceId::Data].pending.new_tokens;
6086        new_tokens.clear();
6087        for _ in 0..server_config.validation_token.sent {
6088            new_tokens.push(network_path);
6089        }
6090    }
6091
6092    /// Handle new path status information: PATH_STATUS_AVAILABLE, PATH_STATUS_BACKUP
6093    fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6094        if let Some(path) = self.paths.get_mut(&path_id) {
6095            path.data.status.remote_update(status, status_seq_no);
6096        } else {
6097            debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6098        }
6099        self.events.push_back(
6100            PathEvent::RemoteStatus {
6101                id: path_id,
6102                status,
6103            }
6104            .into(),
6105        );
6106    }
6107
6108    /// Returns the maximum [`PathId`] to be used for sending in this connection.
6109    ///
6110    /// This is calculated as minimum between the local and remote's maximums when multipath is
6111    /// enabled, or `None` when disabled.
6112    ///
    /// For received data, [`Self::local_max_path_id`] should be used instead: the remote
    /// might already have updated to its own newer [`Self::max_path_id`] after sending a
    /// `MAX_PATH_ID` frame that was re-ordered in transit.
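    ///
    /// For example, with a local maximum of `PathId(4)` and a remote maximum of
    /// `PathId(2)`, this returns `Some(PathId(2))` and sending is limited to path IDs
    /// 0 through 2.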
6116    fn max_path_id(&self) -> Option<PathId> {
6117        if self.is_multipath_negotiated() {
6118            Some(self.remote_max_path_id.min(self.local_max_path_id))
6119        } else {
6120            None
6121        }
6122    }
6123
    /// Adds an address the local endpoint considers reachable for nat traversal
6125    pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6126        if let Some(added) = self.iroh_hp.add_local_address(address)? {
6127            self.spaces[SpaceId::Data].pending.add_address.insert(added);
6128        };
6129        Ok(())
6130    }
6131
    /// Removes an address the endpoint no longer considers reachable for nat traversal
6133    ///
6134    /// Addresses not present in the set will be silently ignored.
6135    pub fn remove_nat_traversal_address(
6136        &mut self,
6137        address: SocketAddr,
6138    ) -> Result<(), iroh_hp::Error> {
6139        if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6140            self.spaces[SpaceId::Data]
6141                .pending
6142                .remove_address
6143                .insert(removed);
6144        }
6145        Ok(())
6146    }
6147
6148    /// Get the current local nat traversal addresses
6149    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6150        self.iroh_hp.get_local_nat_traversal_addresses()
6151    }
6152
6153    /// Get the currently advertised nat traversal addresses by the server
6154    pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6155        Ok(self
6156            .iroh_hp
6157            .client_side()?
6158            .get_remote_nat_traversal_addresses())
6159    }
6160
6161    /// Attempts to open a path for nat traversal.
6162    ///
    /// `ipv6` indicates whether the path should be opened using an IPv6 remote. Returns
    /// `None` if the address is ignored (an IPv6 candidate on an IPv4-only socket).
6165    ///
6166    /// On success returns the [`PathId`] and remote address of the path, as well as whether the path
6167    /// existed for the adjusted remote.
6168    fn open_nat_traversal_path(
6169        &mut self,
6170        now: Instant,
6171        (ip, port): (IpAddr, u16),
6172        ipv6: bool,
6173    ) -> Result<Option<(PathId, SocketAddr, bool)>, PathError> {
6174        // If this endpoint is an IPv6 endpoint we use IPv6 addresses for all remotes.
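        // E.g. an IPv4 candidate 192.0.2.7:443 is probed as [::ffff:192.0.2.7]:443 when
        // the local socket is IPv6.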
6175        let remote = match ip {
6176            IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6177            IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6178            IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6179            IpAddr::V6(_) => {
6180                trace!("not using IPv6 nat candidate for IPv4 socket");
6181                return Ok(None);
6182            }
6183        };
6184        // TODO(matheus23): Probe the correct 4-tuple, instead of only a remote address?
6185        // By specifying None, we do two things: 1. open_path_ensure won't generate two
6186        // paths to the same remote and 2. we let the OS choose which interface to use for
6187        // sending on that path.
6188        let network_path = FourTuple {
6189            remote,
6190            local_ip: None,
6191        };
6192        match self.open_path_ensure(network_path, PathStatus::Backup, now) {
6193            Ok((path_id, path_was_known)) => {
6194                if path_was_known {
6195                    trace!(%path_id, %remote, "nat traversal: path existed for remote");
6196                }
6197                Ok(Some((path_id, remote, path_was_known)))
6198            }
6199            Err(e) => {
6200                debug!(%remote, %e, "nat traversal: failed to probe remote");
6201                Err(e)
6202            }
6203        }
6204    }
6205
6206    /// Initiates a new nat traversal round
6207    ///
6208    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
6209    /// frames, and initiating probing of the known remote addresses. When a new round is
6210    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
6211    ///
6212    /// Returns the server addresses that are now being probed.
6213    /// If addresses fail due to spurious errors, these might succeed later and not be returned in
6214    /// this set.
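    ///
    /// A usage sketch (the `conn` and `now` bindings are hypothetical, not part of this
    /// API):
    ///
    /// ```ignore
    /// let probing = conn.initiate_nat_traversal_round(now)?;
    /// for addr in &probing {
    ///     println!("probing server address {addr}");
    /// }
    /// ```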
6215    pub fn initiate_nat_traversal_round(
6216        &mut self,
6217        now: Instant,
6218    ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6219        if self.state.is_closed() {
6220            return Err(iroh_hp::Error::Closed);
6221        }
6222
6223        let client_state = self.iroh_hp.client_side_mut()?;
6224        let iroh_hp::NatTraversalRound {
6225            new_round,
6226            reach_out_at,
6227            addresses_to_probe,
6228            prev_round_path_ids,
6229        } = client_state.initiate_nat_traversal_round()?;
6230
6231        self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6232
6233        for path_id in prev_round_path_ids {
            // TODO(@divma): this sounds reasonable but we need to check if this actually
            // works for the purposes of the protocol
6236            let validated = self
6237                .path(path_id)
6238                .map(|path| path.validated)
6239                .unwrap_or(false);
6240
6241            if !validated {
6242                let _ = self.close_path(
6243                    now,
6244                    path_id,
6245                    TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6246                );
6247            }
6248        }
6249
6250        let mut err = None;
6251
6252        let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6253        let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6254        let ipv6 = self
6255            .paths
6256            .values()
6257            .any(|p| p.data.network_path.remote.is_ipv6());
6258
6259        for (id, address) in addresses_to_probe {
6260            match self.open_nat_traversal_path(now, address, ipv6) {
6261                Ok(None) => {}
6262                Ok(Some((path_id, remote, path_was_known))) => {
6263                    if !path_was_known {
6264                        path_ids.push(path_id);
6265                        probed_addresses.push(remote);
6266                    }
6267                }
6268                Err(e) => {
6269                    self.iroh_hp
6270                        .client_side_mut()
6271                        .expect("validated")
6272                        .report_in_continuation(id, e);
6273                    err.get_or_insert(e);
6274                }
6275            }
6276        }
6277
6278        if let Some(err) = err {
6279            // If every probe failed, bail out entirely
6280            if probed_addresses.is_empty() {
6281                return Err(iroh_hp::Error::Multipath(err));
6282            }
6283        }
6284
6285        self.iroh_hp
6286            .client_side_mut()
6287            .expect("connection side validated")
6288            .set_round_path_ids(path_ids);
6289
6290        Ok(probed_addresses)
6291    }
6292
6293    /// Attempts to continue a nat traversal round by trying to open paths for pending client probes.
6294    ///
6295    /// If there was nothing to do, it returns `None`. Otherwise it returns whether the path was
6296    /// successfully opened.
6297    fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6298        let client_state = self.iroh_hp.client_side_mut().ok()?;
6299        let (id, address) = client_state.continue_nat_traversal_round()?;
6300        let ipv6 = self
6301            .paths
6302            .values()
6303            .any(|p| p.data.network_path.remote.is_ipv6());
6304        let open_result = self.open_nat_traversal_path(now, address, ipv6);
6305        let client_state = self.iroh_hp.client_side_mut().expect("validated");
6306        match open_result {
6307            Ok(None) => Some(true),
6308            Ok(Some((path_id, _remote, path_was_known))) => {
6309                if !path_was_known {
6310                    client_state.add_round_path_id(path_id);
6311                }
6312                Some(true)
6313            }
6314            Err(e) => {
6315                client_state.report_in_continuation(id, e);
6316                Some(false)
6317            }
6318        }
6319    }
6320}
6321
6322impl fmt::Debug for Connection {
6323    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6324        f.debug_struct("Connection")
6325            .field("handshake_cid", &self.handshake_cid)
6326            .finish()
6327    }
6328}
6329
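/// Whether sending on a path is currently blocked, and if so why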
6330#[derive(Debug, Copy, Clone, PartialEq, Eq)]
6331enum PathBlocked {
6332    No,
6333    AntiAmplification,
6334    Congestion,
6335    Pacing,
6336}
6337
6338/// Fields of `Connection` specific to it being client-side or server-side
6339enum ConnectionSide {
6340    Client {
6341        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
6342        token: Bytes,
6343        token_store: Arc<dyn TokenStore>,
6344        server_name: String,
6345    },
6346    Server {
6347        server_config: Arc<ServerConfig>,
6348    },
6349}
6350
6351impl ConnectionSide {
6352    fn remote_may_migrate(&self, state: &State) -> bool {
6353        match self {
6354            Self::Server { server_config } => server_config.migration,
6355            Self::Client { .. } => {
6356                if let Some(hs) = state.as_handshake() {
6357                    hs.allow_server_migration
6358                } else {
6359                    false
6360                }
6361            }
6362        }
6363    }
6364
6365    fn is_client(&self) -> bool {
6366        self.side().is_client()
6367    }
6368
6369    fn is_server(&self) -> bool {
6370        self.side().is_server()
6371    }
6372
6373    fn side(&self) -> Side {
6374        match *self {
6375            Self::Client { .. } => Side::Client,
6376            Self::Server { .. } => Side::Server,
6377        }
6378    }
6379}
6380
6381impl From<SideArgs> for ConnectionSide {
6382    fn from(side: SideArgs) -> Self {
6383        match side {
6384            SideArgs::Client {
6385                token_store,
6386                server_name,
6387            } => Self::Client {
6388                token: token_store.take(&server_name).unwrap_or_default(),
6389                token_store,
6390                server_name,
6391            },
6392            SideArgs::Server {
6393                server_config,
6394                pref_addr_cid: _,
6395                path_validated: _,
6396            } => Self::Server { server_config },
6397        }
6398    }
6399}
6400
6401/// Parameters to `Connection::new` specific to it being client-side or server-side
6402pub(crate) enum SideArgs {
6403    Client {
6404        token_store: Arc<dyn TokenStore>,
6405        server_name: String,
6406    },
6407    Server {
6408        server_config: Arc<ServerConfig>,
6409        pref_addr_cid: Option<ConnectionId>,
6410        path_validated: bool,
6411    },
6412}
6413
6414impl SideArgs {
6415    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6416        match *self {
6417            Self::Client { .. } => None,
6418            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6419        }
6420    }
6421
6422    pub(crate) fn path_validated(&self) -> bool {
6423        match *self {
6424            Self::Client { .. } => true,
6425            Self::Server { path_validated, .. } => path_validated,
6426        }
6427    }
6428
6429    pub(crate) fn side(&self) -> Side {
6430        match *self {
6431            Self::Client { .. } => Side::Client,
6432            Self::Server { .. } => Side::Server,
6433        }
6434    }
6435}
6436
6437/// Reasons why a connection might be lost
6438#[derive(Debug, Error, Clone, PartialEq, Eq)]
6439pub enum ConnectionError {
6440    /// The peer doesn't implement any supported version
6441    #[error("peer doesn't implement any supported version")]
6442    VersionMismatch,
6443    /// The peer violated the QUIC specification as understood by this implementation
6444    #[error(transparent)]
6445    TransportError(#[from] TransportError),
6446    /// The peer's QUIC stack aborted the connection automatically
6447    #[error("aborted by peer: {0}")]
6448    ConnectionClosed(frame::ConnectionClose),
6449    /// The peer closed the connection
6450    #[error("closed by peer: {0}")]
6451    ApplicationClosed(frame::ApplicationClose),
6452    /// The peer is unable to continue processing this connection, usually due to having restarted
6453    #[error("reset by peer")]
6454    Reset,
6455    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
6456    ///
6457    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
6458    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
6459    /// and [`TransportConfig::keep_alive_interval()`].
6460    #[error("timed out")]
6461    TimedOut,
6462    /// The local application closed the connection
6463    #[error("closed")]
6464    LocallyClosed,
6465    /// The connection could not be created because not enough of the CID space is available
6466    ///
6467    /// Try using longer connection IDs.
6468    #[error("CIDs exhausted")]
6469    CidsExhausted,
6470}
6471
6472impl From<Close> for ConnectionError {
6473    fn from(x: Close) -> Self {
6474        match x {
6475            Close::Connection(reason) => Self::ConnectionClosed(reason),
6476            Close::Application(reason) => Self::ApplicationClosed(reason),
6477        }
6478    }
6479}
6480
6481// For compatibility with API consumers
6482impl From<ConnectionError> for io::Error {
6483    fn from(x: ConnectionError) -> Self {
6484        use ConnectionError::*;
6485        let kind = match x {
6486            TimedOut => io::ErrorKind::TimedOut,
6487            Reset => io::ErrorKind::ConnectionReset,
6488            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6489            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6490                io::ErrorKind::Other
6491            }
6492        };
6493        Self::new(kind, x)
6494    }
6495}
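
// A usage sketch (illustrative only): the conversion preserves a meaningful
// `ErrorKind` where one exists, e.g.:
//
//     let err: io::Error = ConnectionError::TimedOut.into();
//     assert_eq!(err.kind(), io::ErrorKind::TimedOut);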
6496
6497/// Errors that might trigger a path being closed
6498// TODO(@divma): maybe needs to be reworked based on what we want to do with the public API
6499#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6500pub enum PathError {
6501    /// The extension was not negotiated with the peer
6502    #[error("multipath extension not negotiated")]
6503    MultipathNotNegotiated,
6504    /// Paths can only be opened client-side
6505    #[error("the server side may not open a path")]
6506    ServerSideNotAllowed,
6507    /// Current limits do not allow us to open more paths
6508    #[error("maximum number of concurrent paths reached")]
6509    MaxPathIdReached,
6510    /// No remote CIDs available to open a new path
6511    #[error("remoted CIDs exhausted")]
6512    RemoteCidsExhausted,
6513    /// Path could not be validated and will be abandoned
6514    #[error("path validation failed")]
6515    ValidationFailed,
6516    /// The remote address for the path is not supported by the endpoint
6517    #[error("invalid remote address")]
6518    InvalidRemoteAddress(SocketAddr),
6519}
6520
6521/// Errors triggered when abandoning a path
6522#[derive(Debug, Error, Clone, Eq, PartialEq)]
6523pub enum ClosePathError {
6524    /// The path is already closed or was never opened
6525    #[error("closed path")]
6526    ClosedPath,
6527    /// This is the last path, which cannot be abandoned
6528    #[error("last open path")]
6529    LastOpenPath,
6530}
6531
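/// Error returned when an operation requires the multipath extension, but it was not negotiated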
6532#[derive(Debug, Error, Clone, Copy)]
6533#[error("Multipath extension not negotiated")]
6534pub struct MultipathNotNegotiated {
6535    _private: (),
6536}
6537
6538/// Events of interest to the application
6539#[derive(Debug)]
6540pub enum Event {
6541    /// The connection's handshake data is ready
6542    HandshakeDataReady,
6543    /// The connection was successfully established
6544    Connected,
6545    /// The TLS handshake was confirmed
6546    HandshakeConfirmed,
6547    /// The connection was lost
6548    ///
6549    /// Emitted if the peer closes the connection or an error is encountered.
6550    ConnectionLost {
6551        /// Reason that the connection was closed
6552        reason: ConnectionError,
6553    },
6554    /// Stream events
6555    Stream(StreamEvent),
6556    /// One or more application datagrams have been received
6557    DatagramReceived,
6558    /// One or more application datagrams have been sent after blocking
6559    DatagramsUnblocked,
6560    /// (Multi)Path events
6561    Path(PathEvent),
6562    /// Iroh's nat traversal events
6563    NatTraversal(iroh_hp::Event),
6564}
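
// A sketch of how an application might consume these events (assumes the
// quinn-proto-style driver in which `Connection::poll` yields queued events;
// `handle_stream` is illustrative):
//
//     while let Some(event) = conn.poll() {
//         match event {
//             Event::Connected => trace!("connection established"),
//             Event::ConnectionLost { reason } => return Err(reason),
//             Event::Stream(e) => handle_stream(e),
//             _ => {}
//         }
//     }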
6565
6566impl From<PathEvent> for Event {
6567    fn from(source: PathEvent) -> Self {
6568        Self::Path(source)
6569    }
6570}
6571
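/// Converts the peer's `max_ack_delay` transport parameter, which is specified in
/// milliseconds (RFC 9000 §18.2), into a `Duration`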
6572fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6573    Duration::from_micros(params.max_ack_delay.0 * 1000)
6574}
6575
6576// Prevents overflow and improves behavior in extreme circumstances
6577const MAX_BACKOFF_EXPONENT: u32 = 16;
6578
6579/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
6580///
6581/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
6582/// plus some space for frames. We only care about handshake headers because short header packets
6583/// necessarily have smaller headers, and initial packets are only ever the first packet in a
6584/// datagram (because we coalesce in ascending packet space order and the only reason to split a
6585/// packet is when packet space changes).
6586const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6587
6588/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
6589///
6590/// Excludes packet-type-specific fields such as packet number or Initial token
6591// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
6592// scid len + scid + length + pn
6593const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6594    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
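// Assuming MAX_CID_SIZE is 20 (the QUIC maximum) and a 4-byte varint for the length
// field, this evaluates to 1 + 4 + 1 + 20 + 1 + 20 + 4 + 4 = 55 bytes.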
6595
6596/// Perform key updates this many packets before the AEAD confidentiality limit.
6597///
6598/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
6599const KEY_UPDATE_MARGIN: u64 = 10_000;
6600
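/// Summary of the frames written into a single outgoing packet
///
/// Used to construct the per-packet bookkeeping needed to handle later
/// acknowledgement or loss of that packet.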
6601#[derive(Default)]
6602struct SentFrames {
6603    retransmits: ThinRetransmits,
6604    /// The largest packet number acknowledged by an ACK frame in this packet, keyed by path
6605    largest_acked: FxHashMap<PathId, u64>,
6606    stream_frames: StreamMetaVec,
6607    /// Whether the packet contains non-retransmittable frames (like datagrams)
6608    non_retransmits: bool,
6609    /// If the datagram containing these frames should be padded to the min MTU
6610    requires_padding: bool,
6611}
6612
6613impl SentFrames {
6614    /// Returns whether the packet contains only ACKs
6615    fn is_ack_only(&self, streams: &StreamsState) -> bool {
6616        !self.largest_acked.is_empty()
6617            && !self.non_retransmits
6618            && self.stream_frames.is_empty()
6619            && self.retransmits.is_empty(streams)
6620    }
6621
6622    fn retransmits_mut(&mut self) -> &mut Retransmits {
6623        self.retransmits.get_or_create()
6624    }
6625
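    /// Records bookkeeping for a frame that was just encoded into the packet, so
    /// that retransmittable frames can be queued again if the packet is lost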
6626    fn sent(&mut self, frame: frame::EncodableFrame<'_>) {
6627        use frame::EncodableFrame::*;
6628        match frame {
6629            PathAck(path_ack_encoder) => {
6630                if let Some(max) = path_ack_encoder.ranges.max() {
6631                    self.largest_acked.insert(path_ack_encoder.path_id, max);
6632                }
6633            }
6634            Ack(ack_encoder) => {
6635                if let Some(max) = ack_encoder.ranges.max() {
6636                    self.largest_acked.insert(PathId::ZERO, max);
6637                }
6638            }
6639            Close(_) => { /* non-retransmittable; after sending CLOSE we no longer track frames */ }
6640            PathResponse(_) => self.non_retransmits = true,
6641            HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
6642            ReachOut(frame::ReachOut { round, ip, port }) => self
6643                .retransmits_mut()
6644                .reach_out
6645                .get_or_insert_with(|| (round, Vec::new()))
6646                .1
6647                .push((ip, port)),
6648            ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
6649            Ping(_) => self.non_retransmits = true,
6650            ImmediateAck(_) => self.non_retransmits = true,
6651            AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
6652            PathChallenge(_) => self.non_retransmits = true,
6653            Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
6654            PathAbandon(path_abandon) => {
6655                self.retransmits_mut()
6656                    .path_abandon
6657                    .entry(path_abandon.path_id)
6658                    .or_insert(path_abandon.error_code);
6659            }
6660            PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
6661            | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
6662                self.retransmits_mut().path_status.insert(path_id);
6663            }
6664            MaxPathId(_) => self.retransmits_mut().max_path_id = true,
6665            PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
6666            PathCidsBlocked(path_cids_blocked) => self
6667                .retransmits_mut()
6668                .path_cids_blocked
6669                .push(path_cids_blocked.path_id),
6670            ResetStream(reset) => self
6671                .retransmits_mut()
6672                .reset_stream
6673                .push((reset.id, reset.error_code)),
6674            StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
6675            NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
6676            RetireConnectionId(retire_cid) => self
6677                .retransmits_mut()
6678                .retire_cids
6679                .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
6680            Datagram(_) => self.non_retransmits = true,
6681            NewToken(_) => {}
6682            AddAddress(add_address) => {
6683                self.retransmits_mut().add_address.insert(add_address);
6684            }
6685            RemoveAddress(remove_address) => {
6686                self.retransmits_mut().remove_address.insert(remove_address);
6687            }
6688            StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
6689            MaxData(_) => self.retransmits_mut().max_data = true,
6690            MaxStreamData(max) => {
6691                self.retransmits_mut().max_stream_data.insert(max.id);
6692            }
6693            MaxStreams(max_streams) => {
6694                self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
6695            }
6696        }
6697    }
6698}
6699
6700/// Compute the negotiated idle timeout based on the local and remote `max_idle_timeout` transport parameters.
6701///
6702/// According to the definition of `max_idle_timeout`, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1>.
6703///
6704/// According to the negotiation procedure, the negotiated value is the minimum of the two timeouts, with an absent or zero value treated as no limit; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2>.
6705///
6706/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
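///
/// For example, `Some(VarInt(0))` (disabled) negotiated with `Some(VarInt(2))` yields
/// `Some(Duration::from_millis(2))`, while `Some(VarInt(1))` with `Some(VarInt(4))`
/// yields the minimum, `Some(Duration::from_millis(1))`.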
6707fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6708    match (x, y) {
6709        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6710        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6711        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6712        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6713    }
6714}
6715
6716#[cfg(test)]
6717mod tests {
6718    use super::*;
6719
6720    #[test]
6721    fn negotiate_max_idle_timeout_commutative() {
6722        let test_params = [
6723            (None, None, None),
6724            (None, Some(VarInt(0)), None),
6725            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
6726            (Some(VarInt(0)), Some(VarInt(0)), None),
6727            (
6728                Some(VarInt(2)),
6729                Some(VarInt(0)),
6730                Some(Duration::from_millis(2)),
6731            ),
6732            (
6733                Some(VarInt(1)),
6734                Some(VarInt(4)),
6735                Some(Duration::from_millis(1)),
6736            ),
6737        ];
6738
6739        for (left, right, result) in test_params {
6740            assert_eq!(negotiate_max_idle_timeout(left, right), result);
6741            assert_eq!(negotiate_max_idle_timeout(right, left), result);
6742        }
6743    }
6744}