noq_proto/connection/
paths.rs

1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8    PathStats, SpaceKind,
9    mtud::MtuDiscovery,
10    pacing::Pacer,
11    spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14    ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig,
15    TransportErrorCode, VarInt,
16    coding::{self, Decodable, Encodable},
17    congestion,
18    connection::{MAX_BACKOFF_EXPONENT, MAX_PTO_INTERVAL},
19    frame::ObservedAddr,
20};
21
22#[cfg(feature = "qlog")]
23use qlog::events::quic::RecoveryMetricsUpdated;
24
25/// Id representing different paths when using multipath extension
26#[cfg_attr(test, derive(test_strategy::Arbitrary))]
27#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
28pub struct PathId(pub(crate) u32);
29
30impl std::hash::Hash for PathId {
31    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
32        state.write_u32(self.0);
33    }
34}
35
36impl Decodable for PathId {
37    fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
38        let v = VarInt::decode(r)?;
39        let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
40        Ok(Self(v))
41    }
42}
43
44impl Encodable for PathId {
45    fn encode<B: bytes::BufMut>(&self, w: &mut B) {
46        VarInt(self.0.into()).encode(w)
47    }
48}
49
50impl PathId {
51    /// The maximum path ID allowed.
52    pub const MAX: Self = Self(u32::MAX);
53
54    /// The 0 path id.
55    pub const ZERO: Self = Self(0);
56
57    /// The number of bytes this [`PathId`] uses when encoded as a [`VarInt`]
58    pub(crate) const fn size(&self) -> usize {
59        VarInt(self.0 as u64).size()
60    }
61
62    /// Saturating integer addition. Computes self + rhs, saturating at the numeric bounds instead
63    /// of overflowing.
64    pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
65        let rhs = rhs.into();
66        let inner = self.0.saturating_add(rhs.0);
67        Self(inner)
68    }
69
70    /// Saturating integer subtraction. Computes self - rhs, saturating at the numeric bounds
71    /// instead of overflowing.
72    pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
73        let rhs = rhs.into();
74        let inner = self.0.saturating_sub(rhs.0);
75        Self(inner)
76    }
77
78    /// Get the next [`PathId`]
79    pub(crate) fn next(&self) -> Self {
80        self.saturating_add(Self(1))
81    }
82
83    /// Get the underlying u32
84    pub(crate) fn as_u32(&self) -> u32 {
85        self.0
86    }
87}
88
89impl std::fmt::Display for PathId {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        self.0.fmt(f)
92    }
93}
94
95impl<T: Into<u32>> From<T> for PathId {
96    fn from(source: T) -> Self {
97        Self(source.into())
98    }
99}
100
101/// State needed for a single path ID.
102///
103/// A single path ID can migrate according to the rules in RFC9000 §9, either voluntary or
104/// involuntary. We need to keep the [`PathData`] of the previously used such path available
105/// in order to defend against migration attacks (see RFC9000 §9.3.1, §9.3.2 and §9.3.3) as
106/// well as to support path probing (RFC9000 §9.1).
107#[derive(Debug)]
108pub(super) struct PathState {
109    pub(super) data: PathData,
110    pub(super) prev: Option<(ConnectionId, PathData)>,
111}
112
113impl PathState {
114    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
115    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
116        // Visit known paths from newest to oldest to find the one `pn` was sent on
117        for path_data in [&mut self.data]
118            .into_iter()
119            .chain(self.prev.as_mut().map(|(_, data)| data))
120        {
121            if path_data.remove_in_flight(packet) {
122                return;
123            }
124        }
125    }
126}
127
128#[derive(Debug)]
129pub(super) struct SentChallengeInfo {
130    /// When was the challenge sent on the wire.
131    pub(super) sent_instant: Instant,
132    /// The 4-tuple on which this path challenge was sent.
133    pub(super) network_path: FourTuple,
134}
135
136/// State of particular network path 4-tuple within a [`PacketNumberSpace`].
137///
138/// With QUIC-Multipath a path is identified by a [`PathId`] and it is possible to have
139/// multiple paths on the same 4-tuple. Furthermore a single QUIC-Multipath path can migrate
140/// to a different 4-tuple, in a similar manner as an RFC9000 connection can use "path
141/// migration" to move to a different 4-tuple. There are thus two states we keep for paths:
142///
143/// - [`PacketNumberSpace`]: The state for a single packet number space, i.e. [`PathId`],
144///   which remains in place across path migrations to different 4-tuples.
145///
146///   This is stored in [`PacketSpace::number_spaces`] indexed on [`PathId`].
147///
148/// - [`PathData`]: The state we keep for each unique 4-tuple within a space. Of note is
149///   that a single [`PathData`] can never belong to a different [`PacketNumberSpace`].
150///
151///   This is stored in [`Connection::paths`] indexed by the current [`PathId`] for which
152///   space it exists. Either as the primary 4-tuple or as the previous 4-tuple just after a
153///   migration.
154///
155/// It follows that there might be several [`PathData`] structs for the same 4-tuple if
156/// several spaces are sharing the same 4-tuple. Note that during the handshake, the
157/// Initial, Handshake and Data spaces for [`PathId::ZERO`] all share the same [`PathData`].
158///
159/// [`PacketSpace::number_spaces`]: super::spaces::PacketSpace::number_spaces
160/// [`Connection::paths`]: super::Connection::paths
161#[derive(Debug)]
162pub(super) struct PathData {
163    pub(super) network_path: FourTuple,
164    pub(super) rtt: RttEstimator,
165    /// Whether we're enabling ECN on outgoing packets
166    pub(super) sending_ecn: bool,
167    /// Congestion controller state
168    pub(super) congestion: Box<dyn congestion::Controller>,
169    /// Pacing state
170    pub(super) pacing: Pacer,
171    /// Whether the last `poll_transmit_on_path` call yielded no data because there was
172    /// no outgoing application data.
173    ///
174    /// The RFC writes:
175    /// > When bytes in flight is smaller than the congestion window and sending is not pacing limited,
176    /// > the congestion window is underutilized. This can happen due to insufficient application data
177    /// > or flow control limits. When this occurs, the congestion window SHOULD NOT be increased in
178    /// > either slow start or congestion avoidance.
179    ///
180    /// (RFC9002, section 7.8)
181    ///
182    /// I.e. when app_limited is true, the congestion controller doesn't increase the congestion window.
183    pub(super) app_limited: bool,
184
185    /// Path challenges sent (on the wire, on-path) that we didn't receive a path response for yet
186    on_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
187    /// Whether to trigger sending another PATH_CHALLENGE in the next poll_transmit.
188    ///
189    /// This is picked up by [`super::Connection::space_can_send`].
190    ///
191    /// Only used for RFC9000-style path migration and multipath path validation (for opening).
192    ///
193    /// This is **not used** for n0 nat traversal challenge sending.
194    pub(super) pending_on_path_challenge: bool,
195    /// How often we've deemed a path challenge to be lost.
196    ///
197    /// Similar to [`Self::pto_count`], but for on-path path challenges.
198    /// Used to calculate exponential backoff for retrying path challenges.
199    pub(super) on_path_challenges_lost: u32,
200    /// Pending responses to PATH_CHALLENGE frames
201    pub(super) path_responses: PathResponses,
202    /// Whether we're certain the peer can both send and receive on this address
203    ///
204    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
205    /// migration. Always true for clients.
206    pub(super) validated: bool,
207    /// Total size of all UDP datagrams sent on this path
208    pub(super) total_sent: u64,
209    /// Total size of all UDP datagrams received on this path
210    pub(super) total_recvd: u64,
211    /// The state of the MTU discovery process
212    pub(super) mtud: MtuDiscovery,
213    /// Packet number of the first packet sent after an RTT sample was collected on this path
214    ///
215    /// Used in persistent congestion determination.
216    pub(super) first_packet_after_rtt_sample: Option<(SpaceKind, u64)>,
217    /// The in-flight packets and bytes
218    ///
219    /// Note that this is across all spaces on this path
220    pub(super) in_flight: InFlight,
221    /// Queue of data that must be sent over this specific [`PathData::generation`] path.
222    pub(super) pending: PathRetransmits,
223    /// Observed address frame with the largest sequence number received from the peer on this path.
224    pub(super) last_observed_addr_report: Option<ObservedAddr>,
225    /// The QUIC-MULTIPATH path status
226    pub(super) status: PathStatusState,
227    /// Number of the first packet sent on this path
228    ///
229    /// With RFC9000 §9 style migration (i.e. not multipath) the PathId does not change and
230    /// hence packet numbers continue. This is used to determine whether a packet was sent
231    /// on such an earlier path. Insufficient to determine if a packet was sent on a later
232    /// path.
233    first_packet: Option<u64>,
234    /// The number of times a tail-loss probe has been sent without receiving an ack.
235    ///
236    /// This is incremented by one every time the [`LossDetection`] timer fires because a
237    /// tail-loss probe needs to be sent. Once an acknowledgement for a packet is received
238    /// again it is reset to 0. Used to compute the PTO duration.
239    ///
240    /// [`LossDetection`]: super::timer::PathTimer::LossDetection
241    pub(super) pto_count: u32,
242
243    //
244    // Per-path idle & keep alive
245    //
246    /// Idle timeout for the path
247    ///
248    /// If expired, the path will be abandoned.  This is different from the connection-wide
249    /// idle timeout which closes the connection if expired.
250    pub(super) idle_timeout: Option<Duration>,
251    /// Keep alives to send on this path
252    ///
253    /// There is also a connection-level keep alive configured in the
254    /// [`TransportParameters`].  This triggers activity on any path which can keep the
255    /// connection alive.
256    ///
257    /// [`TransportParameters`]: crate::transport_parameters::TransportParameters
258    pub(super) keep_alive: Option<Duration>,
259    /// Whether to reset the idle timer when the next ack-eliciting packet is sent.
260    ///
261    /// Whenever we receive an authenticated packet the connection and path idle timers are
262    /// reset if a maximum idle timeout was negotiated. However on the first ack-eliciting
263    /// packet *sent* after this the idle timer also needs to be reset to avoid the idle
264    /// timer firing while the sent packet is in-fight. See
265    /// <https://www.rfc-editor.org/rfc/rfc9000.html#section-10.1>.
266    pub(super) permit_idle_reset: bool,
267
268    /// Whether the path has already been considered opened from an application perspective.
269    ///
270    /// This means, for paths other than the original [`PathId::ZERO`], a first path challenge has
271    /// been responded to, regardless of the initial validation status of the path. This state is
272    /// irreversible, since it's not affected by the path being closed.
273    ///
274    /// Sending a PATH_CHALLENGE and receiving a valid response before the application is informed
275    /// of the path, is a way to ensure the path is usable before it is reported. This is not
276    /// required by the spec, and in the future might be changed for simply requiring a first ack'd
277    /// packet.
278    pub(super) open_status: OpenStatus,
279
280    /// Whether we're currently draining the path after having abandoned it.
281    ///
282    /// This should only be true when a path discard timer is armed, and after the path was
283    /// abandoned (and added to the abandoned_paths set).
284    ///
285    /// This will only ever be set from false to true.
286    pub(super) draining: bool,
287
288    /// Snapshot of the qlog recovery metrics
289    #[cfg(feature = "qlog")]
290    recovery_metrics: RecoveryMetrics,
291
292    /// Tag uniquely identifying a path in a connection.
293    ///
294    /// When a migration happens on the same [`PathId`] we still detect a change in the
295    /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
296    /// value to keep track of which 4-tuple a packet belonged to.
297    generation: u64,
298}
299
300impl PathData {
301    pub(super) fn new(
302        network_path: FourTuple,
303        allow_mtud: bool,
304        peer_max_udp_payload_size: Option<u16>,
305        generation: u64,
306        now: Instant,
307        config: &TransportConfig,
308    ) -> Self {
309        let congestion = config
310            .congestion_controller_factory
311            .clone()
312            .build(now, config.get_initial_mtu());
313        Self {
314            network_path,
315            rtt: RttEstimator::new(config.initial_rtt),
316            sending_ecn: true,
317            pacing: Pacer::new(
318                config.initial_rtt,
319                congestion.initial_window(),
320                config.get_initial_mtu(),
321                config.max_outgoing_bytes_per_second,
322                now,
323            ),
324            congestion,
325            app_limited: false,
326            on_path_challenges_unconfirmed: Default::default(),
327            on_path_challenges_lost: 0,
328            pending_on_path_challenge: false,
329            path_responses: PathResponses::default(),
330            validated: false,
331            total_sent: 0,
332            total_recvd: 0,
333            mtud: config
334                .mtu_discovery_config
335                .as_ref()
336                .filter(|_| allow_mtud)
337                .map_or_else(
338                    || MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
339                    |mtud_config| {
340                        MtuDiscovery::new(
341                            config.get_initial_mtu(),
342                            config.min_mtu,
343                            peer_max_udp_payload_size,
344                            mtud_config.clone(),
345                        )
346                    },
347                ),
348            first_packet_after_rtt_sample: None,
349            in_flight: InFlight::new(),
350            pending: PathRetransmits::default(),
351            last_observed_addr_report: None,
352            status: Default::default(),
353            first_packet: None,
354            pto_count: 0,
355            idle_timeout: config.default_path_max_idle_timeout,
356            keep_alive: config.default_path_keep_alive_interval,
357            permit_idle_reset: true,
358            open_status: OpenStatus::default(),
359            draining: false,
360            #[cfg(feature = "qlog")]
361            recovery_metrics: RecoveryMetrics::default(),
362            generation,
363        }
364    }
365
366    /// Create a new path from a previous one.
367    ///
368    /// This should only be called when migrating paths.
369    pub(super) fn from_previous(
370        network_path: FourTuple,
371        prev: &Self,
372        generation: u64,
373        now: Instant,
374    ) -> Self {
375        let congestion = prev.congestion.clone_box();
376        let smoothed_rtt = prev.rtt.get();
377        Self {
378            network_path,
379            rtt: prev.rtt,
380            pacing: Pacer::new(
381                smoothed_rtt,
382                congestion.window(),
383                prev.current_mtu(),
384                prev.pacing.max_bytes_per_second(),
385                now,
386            ),
387            sending_ecn: true,
388            congestion,
389            app_limited: false,
390            on_path_challenges_unconfirmed: Default::default(),
391            on_path_challenges_lost: 0,
392            pending_on_path_challenge: false,
393            path_responses: PathResponses::default(),
394            validated: false,
395            total_sent: 0,
396            total_recvd: 0,
397            mtud: prev.mtud.clone(),
398            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
399            in_flight: InFlight::new(),
400            pending: PathRetransmits::default(),
401            last_observed_addr_report: None,
402            status: prev.status.clone(),
403            first_packet: None,
404            pto_count: 0,
405            idle_timeout: prev.idle_timeout,
406            keep_alive: prev.keep_alive,
407            permit_idle_reset: true,
408            open_status: OpenStatus::default(),
409            draining: false,
410            #[cfg(feature = "qlog")]
411            recovery_metrics: prev.recovery_metrics.clone(),
412            generation,
413        }
414    }
415
416    /// Whether we're in the process of validating this path with PATH_CHALLENGEs
417    pub(super) fn is_validating_path(&self) -> bool {
418        !self.on_path_challenges_unconfirmed.is_empty() || self.pending_on_path_challenge
419    }
420
421    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
422    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
423    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
424        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
425    }
426
427    /// Returns the path's current MTU
428    pub(super) fn current_mtu(&self) -> u16 {
429        self.mtud.current_mtu()
430    }
431
432    /// Account for transmission of `packet` with number `pn` in `space`
433    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
434        self.in_flight.insert(&packet);
435        if self.first_packet.is_none() {
436            self.first_packet = Some(pn);
437        }
438        if let Some(forgotten) = space.sent(pn, packet) {
439            self.remove_in_flight(&forgotten);
440        }
441    }
442
443    pub(super) fn record_path_challenge_sent(
444        &mut self,
445        now: Instant,
446        token: u64,
447        network_path: FourTuple,
448    ) {
449        let info = SentChallengeInfo {
450            sent_instant: now,
451            network_path,
452        };
453        debug_assert_eq!(network_path, self.network_path);
454        self.on_path_challenges_unconfirmed.insert(token, info);
455    }
456
457    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
458    /// `false` if `pn` was sent before this path was established.
459    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
460        if packet.path_generation != self.generation {
461            return false;
462        }
463        self.in_flight.remove(packet);
464        true
465    }
466
467    /// Increment the total size of sent UDP datagrams
468    pub(super) fn inc_total_sent(&mut self, inc: u64) {
469        self.total_sent = self.total_sent.saturating_add(inc);
470        if !self.validated {
471            trace!(
472                network_path = %self.network_path,
473                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
474                "anti amplification budget decreased"
475            );
476        }
477    }
478
479    /// Increment the total size of received UDP datagrams
480    pub(super) fn inc_total_recvd(&mut self, inc: u64) {
481        self.total_recvd = self.total_recvd.saturating_add(inc);
482        if !self.validated {
483            trace!(
484                network_path = %self.network_path,
485                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
486                "anti amplification budget increased"
487            );
488        }
489    }
490
491    /// The earliest time at which an on-path challenge we sent is considered lost.
492    pub(super) fn earliest_on_path_expiring_challenge(&self) -> Option<Instant> {
493        if self.on_path_challenges_unconfirmed.is_empty() {
494            return None;
495        }
496        let duration = self.on_path_challenge_expiry();
497        self.on_path_challenges_unconfirmed
498            .values()
499            .map(|info| info.sent_instant + duration)
500            .min()
501    }
502
503    pub(super) fn on_path_challenge_expiry(&self) -> Duration {
504        let backoff = 2u32.pow(self.on_path_challenges_lost.min(MAX_BACKOFF_EXPONENT));
505        let duration = self.rtt.pto_base() * backoff;
506        duration.min(MAX_PTO_INTERVAL)
507    }
508
509    /// Handle receiving a PATH_RESPONSE.
510    pub(super) fn on_path_response_received(
511        &mut self,
512        now: Instant,
513        token: u64,
514        network_path: FourTuple,
515    ) -> OnPathResponseReceived {
516        // > § 8.2.3
517        // > Path validation succeeds when a PATH_RESPONSE frame is received that contains the
518        // > data that was sent in a previous PATH_CHALLENGE frame. A PATH_RESPONSE frame
519        // > received on any network path validates the path on which the PATH_CHALLENGE was
520        // > sent.
521        //
522        // At this point we have three potentially different network paths:
523        // - current network path (`Self::network_path`)
524        // - network path used to send the path challenge (`SentChallengeInfo::network_path`)
525        // - network path over which the response arrived (`network_path`)
526        //
527        // As per the spec, this only validates the network path on which this was *sent*.
528        match self.on_path_challenges_unconfirmed.remove(&token) {
529            // Response to an on-path PathChallenge that validates this path.
530            // The sent path should match the current path. However, it's possible that the
531            // challenge was sent when no local_ip was known. This case is allowed as well.
532            Some(info) if info.network_path.is_probably_same_path(&self.network_path) => {
533                self.network_path.update_local_if_same_remote(&network_path);
534                let sent_instant = info.sent_instant;
535                if !std::mem::replace(&mut self.validated, true) {
536                    trace!("new path validated");
537                }
538                // Clear any other on-path sent challenges and stop sending new ones.
539                self.reset_on_path_challenges();
540
541                // This RTT can only be used for the initial RTT, not as a normal
542                // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
543                let rtt = now.saturating_duration_since(sent_instant);
544                self.rtt.reset_initial_rtt(rtt);
545
546                let prev_status = std::mem::replace(&mut self.open_status, OpenStatus::Informed);
547                OnPathResponseReceived::OnPath {
548                    was_open: matches!(prev_status, OpenStatus::Informed),
549                }
550            }
551            // Response to an on-path PathChallenge that does not validate this path.
552            Some(info) => {
553                // This is a valid path response, but this validates a 4-tuple we no longer
554                // have in use. Keep only sent challenges for the current path.
555                self.on_path_challenges_unconfirmed
556                    .retain(|_token, i| i.network_path == self.network_path);
557
558                // If there are no challenges for the current path, schedule one
559                if !self.on_path_challenges_unconfirmed.is_empty() {
560                    self.pending_on_path_challenge = true;
561                }
562                OnPathResponseReceived::Ignored {
563                    sent_on: info.network_path,
564                    current_path: self.network_path,
565                }
566            }
567            None => {
568                // Response to an unknown PathChallenge. Does not indicate failure.
569                OnPathResponseReceived::Unknown
570            }
571        }
572    }
573
574    /// Removes all on-path challenges we remember and cancels sending new on-path challenges.
575    pub(super) fn reset_on_path_challenges(&mut self) {
576        self.on_path_challenges_unconfirmed.clear();
577        self.pending_on_path_challenge = false;
578        self.on_path_challenges_lost = 0;
579    }
580
581    #[cfg(feature = "qlog")]
582    pub(super) fn qlog_recovery_metrics(
583        &mut self,
584        path_id: PathId,
585    ) -> Option<RecoveryMetricsUpdated> {
586        let controller_metrics = self.congestion.metrics();
587
588        let metrics = RecoveryMetrics {
589            min_rtt: Some(self.rtt.min),
590            smoothed_rtt: Some(self.rtt.get()),
591            latest_rtt: Some(self.rtt.latest),
592            rtt_variance: Some(self.rtt.var),
593            pto_count: Some(self.pto_count),
594            bytes_in_flight: Some(self.in_flight.bytes),
595            packets_in_flight: Some(self.in_flight.ack_eliciting),
596
597            congestion_window: Some(controller_metrics.congestion_window),
598            ssthresh: controller_metrics.ssthresh,
599            pacing_rate: controller_metrics.pacing_rate,
600        };
601
602        let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
603        self.recovery_metrics = metrics;
604        event
605    }
606
607    /// Return how long we need to wait before sending `bytes_to_send`
608    ///
609    /// See [`Pacer::delay`].
610    pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Duration> {
611        let smoothed_rtt = self.rtt.get();
612        let metrics = self.congestion.metrics();
613        self.pacing.delay(
614            smoothed_rtt,
615            bytes_to_send,
616            self.current_mtu(),
617            metrics.congestion_window,
618            now,
619            metrics.send_quantum,
620            metrics.pacing_rate,
621        )
622    }
623
624    /// Updates the last observed address report received on this path.
625    ///
626    /// If the address was updated, it's returned to be informed to the application.
627    #[must_use = "updated observed address must be reported to the application"]
628    pub(super) fn update_observed_addr_report(
629        &mut self,
630        observed: ObservedAddr,
631    ) -> Option<SocketAddr> {
632        match self.last_observed_addr_report.as_mut() {
633            Some(prev) => {
634                if prev.seq_no >= observed.seq_no {
635                    // frames that do not increase the sequence number on this path are ignored
636                    None
637                } else if prev.ip == observed.ip && prev.port == observed.port {
638                    // keep track of the last seq_no but do not report the address as updated
639                    prev.seq_no = observed.seq_no;
640                    None
641                } else {
642                    let addr = observed.socket_addr();
643                    self.last_observed_addr_report = Some(observed);
644                    Some(addr)
645                }
646            }
647            None => {
648                let addr = observed.socket_addr();
649                self.last_observed_addr_report = Some(observed);
650                Some(addr)
651            }
652        }
653    }
654
655    pub(crate) fn remote_status(&self) -> Option<PathStatus> {
656        self.status.remote_status.map(|(_seq, status)| status)
657    }
658
659    pub(crate) fn local_status(&self) -> PathStatus {
660        self.status.local_status
661    }
662
663    /// Tag uniquely identifying a path in a connection.
664    ///
665    /// When a migration happens on the same [`PathId`] we still detect a change in the
666    /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
667    /// value to keep track of which 4-tuple a packet belonged to.
668    pub(super) fn generation(&self) -> u64 {
669        self.generation
670    }
671}
672
673pub(super) enum OnPathResponseReceived {
674    /// This response validates the path on its current remote address.
675    OnPath { was_open: bool },
676    /// The received token is unknown.
677    Unknown,
678    /// The response is valid but it's not usable for path validation.
679    Ignored {
680        sent_on: FourTuple,
681        current_path: FourTuple,
682    },
683}
684
685#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
686pub(super) enum OpenStatus {
687    /// A first packet has not been sent using this [`PathId`].
688    #[default]
689    Pending,
690    /// The first packet has been sent using this [`PathId`]. However, it is not yet deemed good
691    /// enough to be reported to the application.
692    Sent,
693    /// The application has been informed of this path.
694    Informed,
695}
696
697/// Congestion metrics as described in [`recovery_metrics_updated`].
698///
699/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
700#[cfg(feature = "qlog")]
701#[derive(Default, Clone, PartialEq, Debug)]
702#[non_exhaustive]
703struct RecoveryMetrics {
704    pub min_rtt: Option<Duration>,
705    pub smoothed_rtt: Option<Duration>,
706    pub latest_rtt: Option<Duration>,
707    pub rtt_variance: Option<Duration>,
708    pub pto_count: Option<u32>,
709    pub bytes_in_flight: Option<u64>,
710    pub packets_in_flight: Option<u64>,
711    pub congestion_window: Option<u64>,
712    pub ssthresh: Option<u64>,
713    pub pacing_rate: Option<u64>,
714}
715
716#[cfg(feature = "qlog")]
717impl RecoveryMetrics {
718    /// Retain only values that have been updated since the last snapshot.
719    fn retain_updated(&self, previous: &Self) -> Self {
720        macro_rules! keep_if_changed {
721            ($name:ident) => {
722                if previous.$name == self.$name {
723                    None
724                } else {
725                    self.$name
726                }
727            };
728        }
729
730        Self {
731            min_rtt: keep_if_changed!(min_rtt),
732            smoothed_rtt: keep_if_changed!(smoothed_rtt),
733            latest_rtt: keep_if_changed!(latest_rtt),
734            rtt_variance: keep_if_changed!(rtt_variance),
735            pto_count: keep_if_changed!(pto_count),
736            bytes_in_flight: keep_if_changed!(bytes_in_flight),
737            packets_in_flight: keep_if_changed!(packets_in_flight),
738            congestion_window: keep_if_changed!(congestion_window),
739            ssthresh: keep_if_changed!(ssthresh),
740            pacing_rate: keep_if_changed!(pacing_rate),
741        }
742    }
743
744    /// Emit a `MetricsUpdated` event containing only updated values
745    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
746        let updated = self.retain_updated(previous);
747
748        if updated == Self::default() {
749            return None;
750        }
751
752        Some(RecoveryMetricsUpdated {
753            min_rtt: updated.min_rtt.map(|rtt| rtt.as_micros() as f32 / 1000.0),
754            smoothed_rtt: updated
755                .smoothed_rtt
756                .map(|rtt| rtt.as_micros() as f32 / 1000.0),
757            latest_rtt: updated
758                .latest_rtt
759                .map(|rtt| rtt.as_micros() as f32 / 1000.0),
760            rtt_variance: updated
761                .rtt_variance
762                .map(|rtt| rtt.as_micros() as f32 / 1000.0),
763            pto_count: updated
764                .pto_count
765                .map(|count| count.try_into().unwrap_or(u16::MAX)),
766            bytes_in_flight: updated.bytes_in_flight,
767            packets_in_flight: updated.packets_in_flight,
768            congestion_window: updated.congestion_window,
769            ssthresh: updated.ssthresh,
770            pacing_rate: updated.pacing_rate,
771            path_id: Some(path_id.as_u32() as u64),
772            ex_data: Default::default(),
773        })
774    }
775}
776
777/// RTT estimation for a particular network path
778#[derive(Copy, Clone, Debug)]
779pub struct RttEstimator {
780    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
781    latest: Duration,
782    /// The smoothed RTT of the connection, computed as described in RFC6298
783    smoothed: Option<Duration>,
784    /// The RTT variance, computed as described in RFC6298
785    var: Duration,
786    /// The minimum RTT seen in the connection, ignoring ack delay.
787    min: Duration,
788}
789
790impl RttEstimator {
791    pub(super) fn new(initial_rtt: Duration) -> Self {
792        Self {
793            latest: initial_rtt,
794            smoothed: None,
795            var: initial_rtt / 2,
796            min: initial_rtt,
797        }
798    }
799
800    /// Resets the estimator using a new initial_rtt value.
801    ///
802    /// This only resets the initial_rtt **if** no samples have been recorded yet. If there
803    /// are any recorded samples the initial estimate can not be adjusted after the fact.
804    ///
805    /// This is useful when you receive a PATH_RESPONSE in the first packet received on a
806    /// new path. In this case you can use the delay of the PATH_CHALLENGE-PATH_RESPONSE as
807    /// the initial RTT to get a better expected estimation.
808    ///
809    /// A PATH_CHALLENGE-PATH_RESPONSE pair later in the connection should not be used
810    /// explicitly as an estimation since PATH_CHALLENGE is an ACK-eliciting packet itself
811    /// already.
812    pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
813        if self.smoothed.is_none() {
814            self.latest = initial_rtt;
815            self.var = initial_rtt / 2;
816            self.min = initial_rtt;
817        }
818    }
819
820    /// The current best RTT estimation.
821    pub fn get(&self) -> Duration {
822        self.smoothed.unwrap_or(self.latest)
823    }
824
825    /// Conservative estimate of RTT
826    ///
827    /// Takes the maximum of smoothed and latest RTT, as recommended
828    /// in 6.1.2 of the recovery spec (draft 29).
829    pub fn conservative(&self) -> Duration {
830        self.get().max(self.latest)
831    }
832
833    /// Minimum RTT registered so far for this estimator.
834    pub fn min(&self) -> Duration {
835        self.min
836    }
837
838    /// PTO computed as described in RFC9002#6.2.1.
839    pub(crate) fn pto_base(&self) -> Duration {
840        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
841    }
842
843    /// Records an RTT sample.
844    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
845        self.latest = rtt;
846        // https://www.rfc-editor.org/rfc/rfc9002.html#section-5.2-3:
847        // min_rtt does not adjust for ack_delay to avoid underestimating.
848        self.min = cmp::min(self.min, self.latest);
849        // Based on RFC6298.
850        if let Some(smoothed) = self.smoothed {
851            let adjusted_rtt = if self.min + ack_delay <= self.latest {
852                self.latest - ack_delay
853            } else {
854                self.latest
855            };
856            let var_sample = smoothed.abs_diff(adjusted_rtt);
857            self.var = (3 * self.var + var_sample) / 4;
858            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
859        } else {
860            self.smoothed = Some(self.latest);
861            self.var = self.latest / 2;
862            self.min = self.latest;
863        }
864    }
865}
866
867#[derive(Default, Debug)]
868pub(crate) struct PathResponses {
869    pending: Vec<PathResponse>,
870}
871
872impl PathResponses {
873    pub(crate) fn push(&mut self, packet: u64, token: u64, network_path: FourTuple) {
874        /// An arbitrary permissive limit to prevent abuse.
875        ///
876        /// If we've negotiated the n0 NAT Traversal extension, and one user might have a lot
877        /// of addresses, e.g. because of having lots of interfaces (we've seen >25 interfaces
878        /// on Macs with docker and other things), then we need to be able to process at least
879        /// as many PATH_CHALLENGE frames as there are interfaces.
880        /// On top of that, there are retries, which make it possible that we need to process
881        /// even more.
882        ///
883        /// Considering that there can be up to 2 `PathData`s per active `PathId`, and
884        /// reasonable default values for maximum concurrent multipath paths are ~8 and each
885        /// `PathResponse` struct takes up 72 bytes at the moment this, means an attacker can
886        /// cause us to keep `32 * 2 * 8 * 72 = ~37KB` of data around.
887        const MAX_PATH_RESPONSES: usize = 32;
888        let response = PathResponse {
889            packet,
890            token,
891            network_path,
892        };
893        let existing = self
894            .pending
895            .iter_mut()
896            .find(|x| x.network_path.remote == network_path.remote);
897        if let Some(existing) = existing {
898            // Update a queued response
899            if existing.packet <= packet {
900                *existing = response;
901            }
902            return;
903        }
904        if self.pending.len() < MAX_PATH_RESPONSES {
905            self.pending.push(response);
906        } else {
907            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
908            // older challenges.
909            trace!("ignoring excessive PATH_CHALLENGE");
910        }
911    }
912
913    pub(crate) fn pop_off_path(&mut self, network_path: FourTuple) -> Option<(u64, FourTuple)> {
914        let response = *self.pending.last()?;
915        // We use an exact comparison here, because once we've received for the first time,
916        // we really should either already have a local_ip, or we will never get one
917        // (because our OS doesn't support it).
918        if response.network_path == network_path {
919            // We don't bother searching further because we expect that the on-path response will
920            // get drained in the immediate future by a call to `pop_on_path`
921            return None;
922        }
923        self.pending.pop();
924        Some((response.token, response.network_path))
925    }
926
927    pub(crate) fn pop_on_path(&mut self, network_path: FourTuple) -> Option<u64> {
928        let response = *self.pending.last()?;
929        // Using an exact comparison. See explanation in `pop_off_path`.
930        if response.network_path != network_path {
931            // We don't bother searching further because we expect that the off-path response will
932            // get drained in the immediate future by a call to `pop_off_path`
933            return None;
934        }
935        self.pending.pop();
936        Some(response.token)
937    }
938
939    pub(crate) fn is_empty(&self) -> bool {
940        self.pending.is_empty()
941    }
942}
943
944#[derive(Copy, Clone, Debug)]
945struct PathResponse {
946    /// The packet number the corresponding PATH_CHALLENGE was received in
947    packet: u64,
948    /// The token of the PATH_CHALLENGE
949    token: u64,
950    /// The path the corresponding PATH_CHALLENGE was received from
951    network_path: FourTuple,
952}
953
954/// Summary statistics of packets that have been sent on a particular path, but which have not yet
955/// been acked or deemed lost
956#[derive(Debug)]
957pub(super) struct InFlight {
958    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
959    ///
960    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
961    /// count towards this to ensure congestion control does not impede congestion feedback.
962    pub(super) bytes: u64,
963    /// Number of packets in flight containing frames other than ACK and PADDING
964    ///
965    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
966    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
967    /// also be nonzero.
968    pub(super) ack_eliciting: u64,
969}
970
971impl InFlight {
972    fn new() -> Self {
973        Self {
974            bytes: 0,
975            ack_eliciting: 0,
976        }
977    }
978
979    fn insert(&mut self, packet: &SentPacket) {
980        self.bytes += u64::from(packet.size);
981        self.ack_eliciting += u64::from(packet.ack_eliciting);
982    }
983
984    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
985    fn remove(&mut self, packet: &SentPacket) {
986        self.bytes -= u64::from(packet.size);
987        self.ack_eliciting -= u64::from(packet.ack_eliciting);
988    }
989}
990
991/// State for QUIC-MULTIPATH PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP frames
992#[derive(Debug, Clone, Default)]
993pub(super) struct PathStatusState {
994    /// The local status
995    local_status: PathStatus,
996    /// Local sequence number, for both PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP
997    ///
998    /// This is the number of the *next* path status frame to be sent.
999    local_seq: VarInt,
1000    /// The status set by the remote
1001    remote_status: Option<(VarInt, PathStatus)>,
1002}
1003
1004impl PathStatusState {
1005    /// To be called on received PATH_STATUS_AVAILABLE/PATH_STATUS_BACKUP frames
1006    pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
1007        if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
1008            return trace!(%seq, "ignoring path status update");
1009        }
1010
1011        let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
1012        if prev != Some(status) {
1013            debug!(?status, ?seq, "remote changed path status");
1014        }
1015    }
1016
1017    /// Updates the local status
1018    ///
1019    /// If the local status changed, the previous value is returned
1020    pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
1021        if self.local_status == status {
1022            return None;
1023        }
1024
1025        self.local_seq = self.local_seq.saturating_add(1u8);
1026        Some(std::mem::replace(&mut self.local_status, status))
1027    }
1028
1029    pub(crate) fn seq(&self) -> VarInt {
1030        self.local_seq
1031    }
1032}
1033
1034/// The QUIC-MULTIPATH path status
1035///
1036/// See section "3.3 Path Status Management":
1037/// <https://quicwg.org/multipath/draft-ietf-quic-multipath.html#name-path-status-management>
1038#[cfg_attr(test, derive(test_strategy::Arbitrary))]
1039#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
1040pub enum PathStatus {
1041    /// Paths marked with as available will be used when scheduling packets
1042    ///
1043    /// If multiple paths are available, packets will be scheduled on whichever has
1044    /// capacity.
1045    #[default]
1046    Available,
1047    /// Paths marked as backup will only be used if there are no available paths
1048    ///
1049    /// If the max_idle_timeout is specified the path will be kept alive so that it does not
1050    /// expire.
1051    Backup,
1052}
1053
1054/// Application events about paths
1055#[derive(Debug, Clone, PartialEq, Eq)]
1056#[non_exhaustive]
1057pub enum PathEvent {
1058    /// A new path has established connection with the peer.
1059    #[non_exhaustive]
1060    Established {
1061        /// The path which can now be used for application data.
1062        id: PathId,
1063    },
1064    /// A path was abandoned and is no longer usable.
1065    ///
1066    /// Note that this may be the first event for a path: If a path is abandoned
1067    /// before having been established, no [`Self::Established`] event is emitted.
1068    ///
1069    /// This event will always be followed by [`Self::Discarded`] after some time.
1070    #[non_exhaustive]
1071    Abandoned {
1072        /// The path that was abandoned.
1073        id: PathId,
1074        /// Reason why this path was abandoned.
1075        reason: PathAbandonReason,
1076    },
1077    /// A path was discarded and all remaining state for it has been removed.
1078    ///
1079    /// This event is the last event for a path, and is always emitted after [`Self::Abandoned`].
1080    #[non_exhaustive]
1081    Discarded {
1082        /// Which path had its state dropped
1083        id: PathId,
1084        /// The final path stats, they are no longer available via [`Connection::stats`]
1085        ///
1086        /// [`Connection::stats`]: super::Connection::stats
1087        path_stats: Box<PathStats>,
1088    },
1089    /// The remote changed the status of the path
1090    ///
1091    /// The local status is not changed because of this event. It is up to the application
1092    /// to update the local status, which is used for packet scheduling, when the remote
1093    /// changes the status.
1094    #[non_exhaustive]
1095    RemoteStatus {
1096        /// Path which has changed status
1097        id: PathId,
1098        /// The new status set by the remote
1099        status: PathStatus,
1100    },
1101    /// Received an observation of our external address from the peer.
1102    #[non_exhaustive]
1103    ObservedAddr {
1104        /// Path over which the observed address was reported, [`PathId::ZERO`] when multipath is
1105        /// not negotiated
1106        id: PathId,
1107        /// The address observed by the remote over this path
1108        addr: SocketAddr,
1109    },
1110}
1111
1112/// Reason for why a path was abandoned.
1113#[derive(Debug, Clone, Eq, PartialEq)]
1114pub enum PathAbandonReason {
1115    /// The path was closed locally by the application.
1116    ApplicationClosed {
1117        /// The error code to be sent with the abandon frame.
1118        error_code: VarInt,
1119    },
1120    /// We didn't receive a path response in time after opening this path.
1121    ValidationFailed,
1122    /// We didn't receive any data from the remote within the path's idle timeout.
1123    TimedOut,
1124    /// The path became unusable after a local network change.
1125    UnusableAfterNetworkChange,
1126    /// The remote closed the path.
1127    RemoteAbandoned {
1128        /// The error that was sent with the abandon frame.
1129        error_code: VarInt,
1130    },
1131}
1132
1133impl PathAbandonReason {
1134    /// Whether this abandon was initiated by the remote peer.
1135    pub(crate) fn is_remote(&self) -> bool {
1136        matches!(self, Self::RemoteAbandoned { .. })
1137    }
1138
1139    /// Returns the error code to send with a PATH_ABANDON frame.
1140    pub(crate) fn error_code(&self) -> TransportErrorCode {
1141        match self {
1142            Self::ApplicationClosed { error_code } => (*error_code).into(),
1143            Self::ValidationFailed | Self::TimedOut | Self::UnusableAfterNetworkChange => {
1144                TransportErrorCode::PATH_UNSTABLE_OR_POOR
1145            }
1146            Self::RemoteAbandoned { error_code } => (*error_code).into(),
1147        }
1148    }
1149}
1150
1151/// Error from setting path status
1152#[derive(Debug, Error, Clone, PartialEq, Eq)]
1153pub enum SetPathStatusError {
1154    /// Error indicating that a path has not been opened or has already been abandoned
1155    #[error("closed path")]
1156    ClosedPath,
1157    /// Error indicating that this operation requires multipath to be negotiated whereas it hasn't been
1158    #[error("multipath not negotiated")]
1159    MultipathNotNegotiated,
1160}
1161
1162/// Error indicating that a path has not been opened or has already been abandoned
1163#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
1164#[error("closed path")]
1165pub struct ClosedPath {
1166    pub(super) _private: (),
1167}
1168
1169/// Path-specific retransmittable data lost in a packet.
1170#[derive(Debug, Default, Clone)]
1171pub(super) struct PathRetransmits {
1172    /// Whether this path needs to report its remote address back to the peer.
1173    ///
1174    /// This only happens if both peers agree to do so based on their transport parameters.
1175    pub(super) observed_address: bool,
1176}
1177
1178impl PathRetransmits {
1179    pub(super) fn is_empty(&self) -> bool {
1180        let PathRetransmits { observed_address } = self;
1181        !observed_address
1182    }
1183}
1184
1185impl std::ops::BitOrAssign for PathRetransmits {
1186    fn bitor_assign(&mut self, rhs: Self) {
1187        let Self { observed_address } = rhs;
1188        self.observed_address |= observed_address;
1189    }
1190}
1191
1192#[cfg(test)]
1193mod tests {
1194    use super::*;
1195
1196    #[test]
1197    fn test_path_id_saturating_add() {
1198        // add within range behaves normally
1199        let large: PathId = u16::MAX.into();
1200        let next = u32::from(u16::MAX) + 1;
1201        assert_eq!(large.saturating_add(1u8), PathId::from(next));
1202
1203        // outside range saturates
1204        assert_eq!(PathId::MAX.saturating_add(1u8), PathId::MAX)
1205    }
1206}