noq_proto/connection/
paths.rs

1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8    PathStats, SpaceKind,
9    mtud::MtuDiscovery,
10    pacing::Pacer,
11    spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14    ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig,
15    TransportErrorCode, VarInt,
16    coding::{self, Decodable, Encodable},
17    congestion,
18    frame::ObservedAddr,
19};
20
21#[cfg(feature = "qlog")]
22use qlog::events::quic::RecoveryMetricsUpdated;
23
24/// Identifier distinguishing the individual paths of a connection when the multipath
/// extension is in use.
///
/// Wraps a `u32`. The derived [`Default`] value is path 0, the same as [`PathId::ZERO`].
25#[cfg_attr(test, derive(test_strategy::Arbitrary))]
26#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
27pub struct PathId(pub(crate) u32);
28
29impl std::hash::Hash for PathId {
30    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
31        state.write_u32(self.0);
32    }
33}
34
// Marker impl with no items. The manual `Hash` impl for `PathId` feeds the hasher exactly one
// `u32`. NOTE(review): presumably that single-integer write is the contract
// `IdentityHashable` requires — confirm against the `identity_hash` crate documentation.
35impl identity_hash::IdentityHashable for PathId {}
36
37impl Decodable for PathId {
38    fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
39        let v = VarInt::decode(r)?;
40        let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
41        Ok(Self(v))
42    }
43}
44
45impl Encodable for PathId {
46    fn encode<B: bytes::BufMut>(&self, w: &mut B) {
47        VarInt(self.0.into()).encode(w)
48    }
49}
50
51impl PathId {
52    /// The maximum path ID allowed.
53    pub const MAX: Self = Self(u32::MAX);
54
55    /// The 0 path id.
56    pub const ZERO: Self = Self(0);
57
58    /// The number of bytes this [`PathId`] uses when encoded as a [`VarInt`]
59    pub(crate) const fn size(&self) -> usize {
60        VarInt(self.0 as u64).size()
61    }
62
63    /// Saturating integer addition. Computes self + rhs, saturating at the numeric bounds instead
64    /// of overflowing.
65    pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
66        let rhs = rhs.into();
67        let inner = self.0.saturating_add(rhs.0);
68        Self(inner)
69    }
70
71    /// Saturating integer subtraction. Computes self - rhs, saturating at the numeric bounds
72    /// instead of overflowing.
73    pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
74        let rhs = rhs.into();
75        let inner = self.0.saturating_sub(rhs.0);
76        Self(inner)
77    }
78
79    /// Get the next [`PathId`]
80    pub(crate) fn next(&self) -> Self {
81        self.saturating_add(Self(1))
82    }
83
84    /// Get the underlying u32
85    pub(crate) fn as_u32(&self) -> u32 {
86        self.0
87    }
88}
89
90impl std::fmt::Display for PathId {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        self.0.fmt(f)
93    }
94}
95
96impl<T: Into<u32>> From<T> for PathId {
97    fn from(source: T) -> Self {
98        Self(source.into())
99    }
100}
101
102/// State needed for a single path ID.
103///
104/// A single path ID can migrate according to the rules in RFC9000 §9, either voluntarily or
105/// involuntarily. We need to keep the [`PathData`] of the previously used network path available
106/// in order to defend against migration attacks (see RFC9000 §9.3.1, §9.3.2 and §9.3.3) as
107/// well as to support path probing (RFC9000 §9.1).
108#[derive(Debug)]
109pub(super) struct PathState {
    /// Data for the network path currently in use by this path ID.
110    pub(super) data: PathData,
    /// Data for the previously used network path, if any, paired with a connection ID.
    ///
    /// NOTE(review): presumably the connection ID that was in use on that previous network
    /// path — confirm against the code that populates this field.
111    pub(super) prev: Option<(ConnectionId, PathData)>,
112}
113
114impl PathState {
115    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
116    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
117        // Visit known paths from newest to oldest to find the one `pn` was sent on
118        for path_data in [&mut self.data]
119            .into_iter()
120            .chain(self.prev.as_mut().map(|(_, data)| data))
121        {
122            if path_data.remove_in_flight(packet) {
123                return;
124            }
125        }
126    }
127}
128
/// Bookkeeping for a PATH_CHALLENGE that was sent and is stored until a matching response is
/// received or the entry is discarded.
129#[derive(Debug)]
130pub(super) struct SentChallengeInfo {
131    /// When the challenge was sent on the wire.
132    pub(super) sent_instant: Instant,
133    /// The 4-tuple on which this path challenge was sent.
134    pub(super) network_path: FourTuple,
135}
136
137/// Description of a particular network path
138#[derive(Debug)]
139pub(super) struct PathData {
    /// The 4-tuple this path currently uses
140    pub(super) network_path: FourTuple,
    /// Round-trip time estimate for this path
141    pub(super) rtt: RttEstimator,
142    /// Whether we're enabling ECN on outgoing packets
143    pub(super) sending_ecn: bool,
144    /// Congestion controller state
145    pub(super) congestion: Box<dyn congestion::Controller>,
146    /// Pacing state
147    pub(super) pacing: Pacer,
148    /// Path challenges sent (on the wire, on-path) that we didn't receive a path response for yet
    ///
    /// Keyed by the challenge token.
149    on_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
150    /// Path challenges sent (on the wire, off-path) that we didn't receive a path response for yet
    ///
    /// Keyed by the challenge token.
151    off_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
152    /// Whether to trigger sending another PATH_CHALLENGE in the next poll_transmit.
153    ///
154    /// This is picked up by [`super::Connection::space_can_send`].
155    ///
156    /// Only used for RFC9000-style path migration and multipath path validation (for opening).
157    ///
158    /// This is **not used** for n0 nat traversal challenge sending.
159    pub(super) pending_on_path_challenge: bool,
160    /// Pending responses to PATH_CHALLENGE frames
161    pub(super) path_responses: PathResponses,
162    /// Whether we're certain the peer can both send and receive on this address
163    ///
164    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
165    /// migration. Always true for clients.
166    pub(super) validated: bool,
167    /// Total size of all UDP datagrams sent on this path
168    pub(super) total_sent: u64,
169    /// Total size of all UDP datagrams received on this path
170    pub(super) total_recvd: u64,
171    /// The state of the MTU discovery process
172    pub(super) mtud: MtuDiscovery,
173    /// Packet number of the first packet sent after an RTT sample was collected on this path
174    ///
175    /// Used in persistent congestion determination.
176    pub(super) first_packet_after_rtt_sample: Option<(SpaceKind, u64)>,
177    /// The in-flight packets and bytes
178    ///
179    /// Note that this is across all spaces on this path
180    pub(super) in_flight: InFlight,
181    /// Whether this path has had its remote address reported back to the peer. This only happens
182    /// if both peers agree to do so based on their transport parameters.
183    pub(super) observed_addr_sent: bool,
184    /// Observed address frame with the largest sequence number received from the peer on this path.
185    pub(super) last_observed_addr_report: Option<ObservedAddr>,
186    /// The QUIC-MULTIPATH path status
187    pub(super) status: PathStatusState,
188    /// Number of the first packet sent on this path
189    ///
190    /// With RFC9000 §9 style migration (i.e. not multipath) the PathId does not change and
191    /// hence packet numbers continue. This is used to determine whether a packet was sent
192    /// on such an earlier path. Insufficient to determine if a packet was sent on a later
193    /// path.
194    first_packet: Option<u64>,
195    /// The number of times a PTO has been sent without receiving an ack.
196    pub(super) pto_count: u32,
197
198    //
199    // Per-path idle & keep alive
200    //
201    /// Idle timeout for the path
202    ///
203    /// If expired, the path will be abandoned.  This is different from the connection-wide
204    /// idle timeout which closes the connection if expired.
205    pub(super) idle_timeout: Option<Duration>,
206    /// Keep alives to send on this path
207    ///
208    /// There is also a connection-level keep alive configured in the
209    /// [`TransportParameters`].  This triggers activity on any path which can keep the
210    /// connection alive.
211    ///
212    /// [`TransportParameters`]: crate::transport_parameters::TransportParameters
213    pub(super) keep_alive: Option<Duration>,
214
215    /// Whether the path has already been considered opened from an application perspective.
216    ///
217    /// This means, for paths other than the original [`PathId::ZERO`], a first path challenge has
218    /// been responded to, regardless of the initial validation status of the path. This state is
219    /// irreversible, since it's not affected by the path being closed.
220    ///
221    /// Sending a PATH_CHALLENGE and receiving a valid response before the application is informed
222    /// of the path, is a way to ensure the path is usable before it is reported. This is not
223    /// required by the spec, and in the future might be changed for simply requiring a first ack'd
224    /// packet.
225    pub(super) open_status: OpenStatus,
226
227    /// Whether we're currently draining the path after having abandoned it.
228    ///
229    /// This should only be true when a path discard timer is armed, and after the path was
230    /// abandoned (and added to the abandoned_paths set).
231    ///
232    /// This will only ever be set from false to true.
233    pub(super) draining: bool,
234
235    /// Snapshot of the qlog recovery metrics
236    #[cfg(feature = "qlog")]
237    recovery_metrics: RecoveryMetrics,
238
239    /// Tag uniquely identifying a path in a connection
    ///
    /// Compared against `SentPacket::path_generation` to decide whether a packet was sent on
    /// this path.
240    generation: u64,
241}
242
243impl PathData {
244    pub(super) fn new(
245        network_path: FourTuple,
246        allow_mtud: bool,
247        peer_max_udp_payload_size: Option<u16>,
248        generation: u64,
249        now: Instant,
250        config: &TransportConfig,
251    ) -> Self {
252        let congestion = config
253            .congestion_controller_factory
254            .clone()
255            .build(now, config.get_initial_mtu());
256        Self {
257            network_path,
258            rtt: RttEstimator::new(config.initial_rtt),
259            sending_ecn: true,
260            pacing: Pacer::new(
261                config.initial_rtt,
262                congestion.initial_window(),
263                config.get_initial_mtu(),
264                now,
265            ),
266            congestion,
267            on_path_challenges_unconfirmed: Default::default(),
268            off_path_challenges_unconfirmed: Default::default(),
269            pending_on_path_challenge: false,
270            path_responses: PathResponses::default(),
271            validated: false,
272            total_sent: 0,
273            total_recvd: 0,
274            mtud: config
275                .mtu_discovery_config
276                .as_ref()
277                .filter(|_| allow_mtud)
278                .map_or_else(
279                    || MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
280                    |mtud_config| {
281                        MtuDiscovery::new(
282                            config.get_initial_mtu(),
283                            config.min_mtu,
284                            peer_max_udp_payload_size,
285                            mtud_config.clone(),
286                        )
287                    },
288                ),
289            first_packet_after_rtt_sample: None,
290            in_flight: InFlight::new(),
291            observed_addr_sent: false,
292            last_observed_addr_report: None,
293            status: Default::default(),
294            first_packet: None,
295            pto_count: 0,
296            idle_timeout: config.default_path_max_idle_timeout,
297            keep_alive: config.default_path_keep_alive_interval,
298            open_status: OpenStatus::default(),
299            draining: false,
300            #[cfg(feature = "qlog")]
301            recovery_metrics: RecoveryMetrics::default(),
302            generation,
303        }
304    }
305
306    /// Create a new path from a previous one.
307    ///
308    /// This should only be called when migrating paths.
309    pub(super) fn from_previous(
310        network_path: FourTuple,
311        prev: &Self,
312        generation: u64,
313        now: Instant,
314    ) -> Self {
315        let congestion = prev.congestion.clone_box();
316        let smoothed_rtt = prev.rtt.get();
317        Self {
318            network_path,
319            rtt: prev.rtt,
320            pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
321            sending_ecn: true,
322            congestion,
323            on_path_challenges_unconfirmed: Default::default(),
324            off_path_challenges_unconfirmed: Default::default(),
325            pending_on_path_challenge: false,
326            path_responses: PathResponses::default(),
327            validated: false,
328            total_sent: 0,
329            total_recvd: 0,
330            mtud: prev.mtud.clone(),
331            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
332            in_flight: InFlight::new(),
333            observed_addr_sent: false,
334            last_observed_addr_report: None,
335            status: prev.status.clone(),
336            first_packet: None,
337            pto_count: 0,
338            idle_timeout: prev.idle_timeout,
339            keep_alive: prev.keep_alive,
340            open_status: OpenStatus::default(),
341            draining: false,
342            #[cfg(feature = "qlog")]
343            recovery_metrics: prev.recovery_metrics.clone(),
344            generation,
345        }
346    }
347
348    /// Whether we're in the process of validating this path with PATH_CHALLENGEs
349    pub(super) fn is_validating_path(&self) -> bool {
350        !self.on_path_challenges_unconfirmed.is_empty() || self.pending_on_path_challenge
351    }
352
353    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
354    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
355    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
356        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
357    }
358
359    /// Returns the path's current MTU
360    pub(super) fn current_mtu(&self) -> u16 {
361        self.mtud.current_mtu()
362    }
363
364    /// Account for transmission of `packet` with number `pn` in `space`
365    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
366        self.in_flight.insert(&packet);
367        if self.first_packet.is_none() {
368            self.first_packet = Some(pn);
369        }
370        if let Some(forgotten) = space.sent(pn, packet) {
371            self.remove_in_flight(&forgotten);
372        }
373    }
374
375    pub(super) fn record_path_challenge_sent(
376        &mut self,
377        now: Instant,
378        token: u64,
379        network_path: FourTuple,
380    ) {
381        let info = SentChallengeInfo {
382            sent_instant: now,
383            network_path,
384        };
385        if network_path == self.network_path {
386            self.on_path_challenges_unconfirmed.insert(token, info);
387        } else {
388            self.off_path_challenges_unconfirmed.insert(token, info);
389        }
390    }
391
392    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
393    /// `false` if `pn` was sent before this path was established.
394    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
395        if packet.path_generation != self.generation {
396            return false;
397        }
398        self.in_flight.remove(packet);
399        true
400    }
401
402    /// Increment the total size of sent UDP datagrams
403    pub(super) fn inc_total_sent(&mut self, inc: u64) {
404        self.total_sent = self.total_sent.saturating_add(inc);
405        if !self.validated {
406            trace!(
407                network_path = %self.network_path,
408                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
409                "anti amplification budget decreased"
410            );
411        }
412    }
413
414    /// Increment the total size of received UDP datagrams
415    pub(super) fn inc_total_recvd(&mut self, inc: u64) {
416        self.total_recvd = self.total_recvd.saturating_add(inc);
417        if !self.validated {
418            trace!(
419                network_path = %self.network_path,
420                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
421                "anti amplification budget increased"
422            );
423        }
424    }
425
426    /// The earliest time at which an on-path challenge we sent is considered lost.
427    pub(super) fn earliest_on_path_expiring_challenge(&self) -> Option<Instant> {
428        if self.on_path_challenges_unconfirmed.is_empty() {
429            return None;
430        }
431        let pto = self.rtt.pto_base();
432        self.on_path_challenges_unconfirmed
433            .values()
434            .map(|info| info.sent_instant + pto)
435            .min()
436    }
437
438    /// Handle receiving a PATH_RESPONSE.
439    pub(super) fn on_path_response_received(
440        &mut self,
441        now: Instant,
442        token: u64,
443        network_path: FourTuple,
444    ) -> OnPathResponseReceived {
445        // > § 8.2.3
446        // > Path validation succeeds when a PATH_RESPONSE frame is received that contains the
447        // > data that was sent in a previous PATH_CHALLENGE frame. A PATH_RESPONSE frame
448        // > received on any network path validates the path on which the PATH_CHALLENGE was
449        // > sent.
450        //
451        // At this point we have three potentially different network paths:
452        // - current network path (`Self::network_path`)
453        // - network path used to send the path challenge (`SentChallengeInfo::network_path`)
454        // - network path over which the response arrived (`network_path`)
455        //
456        // As per the spec, this only validates the network path on which this was *sent*.
457        match self.on_path_challenges_unconfirmed.remove(&token) {
458            // Response to an on-path PathChallenge that validates this path.
459            // The sent path should match the current path. However, it's possible that the
460            // challenge was sent when no local_ip was known. This case is allowed as well
461            Some(info) if info.network_path.is_probably_same_path(&self.network_path) => {
462                self.network_path.update_local_if_same_remote(&network_path);
463                let sent_instant = info.sent_instant;
464                if !std::mem::replace(&mut self.validated, true) {
465                    trace!("new path validated");
466                }
467                // Clear any other on-path sent challenge
468                self.on_path_challenges_unconfirmed.clear();
469
470                self.pending_on_path_challenge = false;
471
472                // This RTT can only be used for the initial RTT, not as a normal
473                // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
474                let rtt = now.saturating_duration_since(sent_instant);
475                self.rtt.reset_initial_rtt(rtt);
476
477                let prev_status = std::mem::replace(&mut self.open_status, OpenStatus::Informed);
478                OnPathResponseReceived::OnPath {
479                    was_open: prev_status == OpenStatus::Informed,
480                }
481            }
482            // Response to an on-path PathChallenge that does not validate this path
483            Some(info) => {
484                // This is a valid path response, but this validates a path we no longer have in
485                // use. Keep only sent challenges for the current path.
486
487                self.on_path_challenges_unconfirmed
488                    .retain(|_token, i| i.network_path == self.network_path);
489
490                // if there are no challenges for the current path, schedule one
491                if !self.on_path_challenges_unconfirmed.is_empty() {
492                    self.pending_on_path_challenge = true;
493                }
494                OnPathResponseReceived::Ignored {
495                    sent_on: info.network_path,
496                    current_path: self.network_path,
497                }
498            }
499            None => match self.off_path_challenges_unconfirmed.remove(&token) {
500                // Response to an off-path PathChallenge
501                Some(info) => {
502                    // Since we do not store validation state for these paths, we only really care
503                    // about reaching the same remote
504                    self.off_path_challenges_unconfirmed
505                        .retain(|_token, i| i.network_path.remote != info.network_path.remote);
506                    OnPathResponseReceived::OffPath
507                }
508                // Response to an unknown PathChallenge. Does not indicate failure
509                None => OnPathResponseReceived::Unknown,
510            },
511        }
512    }
513
514    /// Removes all on-path challenges we remember and cancels sending new on-path challenges.
515    pub(super) fn reset_on_path_challenges(&mut self) {
516        self.on_path_challenges_unconfirmed.clear();
517        self.pending_on_path_challenge = false;
518    }
519
520    #[cfg(feature = "qlog")]
521    pub(super) fn qlog_recovery_metrics(
522        &mut self,
523        path_id: PathId,
524    ) -> Option<RecoveryMetricsUpdated> {
525        let controller_metrics = self.congestion.metrics();
526
527        let metrics = RecoveryMetrics {
528            min_rtt: Some(self.rtt.min),
529            smoothed_rtt: Some(self.rtt.get()),
530            latest_rtt: Some(self.rtt.latest),
531            rtt_variance: Some(self.rtt.var),
532            pto_count: Some(self.pto_count),
533            bytes_in_flight: Some(self.in_flight.bytes),
534            packets_in_flight: Some(self.in_flight.ack_eliciting),
535
536            congestion_window: Some(controller_metrics.congestion_window),
537            ssthresh: controller_metrics.ssthresh,
538            pacing_rate: controller_metrics.pacing_rate,
539        };
540
541        let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
542        self.recovery_metrics = metrics;
543        event
544    }
545
546    /// Return how long we need to wait before sending `bytes_to_send`
547    ///
548    /// See [`Pacer::delay`].
549    pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Instant> {
550        let smoothed_rtt = self.rtt.get();
551        self.pacing.delay(
552            smoothed_rtt,
553            bytes_to_send,
554            self.current_mtu(),
555            self.congestion.window(),
556            now,
557        )
558    }
559
560    /// Updates the last observed address report received on this path.
561    ///
562    /// If the address was updated, it's returned to be informed to the application.
563    #[must_use = "updated observed address must be reported to the application"]
564    pub(super) fn update_observed_addr_report(
565        &mut self,
566        observed: ObservedAddr,
567    ) -> Option<SocketAddr> {
568        match self.last_observed_addr_report.as_mut() {
569            Some(prev) => {
570                if prev.seq_no >= observed.seq_no {
571                    // frames that do not increase the sequence number on this path are ignored
572                    None
573                } else if prev.ip == observed.ip && prev.port == observed.port {
574                    // keep track of the last seq_no but do not report the address as updated
575                    prev.seq_no = observed.seq_no;
576                    None
577                } else {
578                    let addr = observed.socket_addr();
579                    self.last_observed_addr_report = Some(observed);
580                    Some(addr)
581                }
582            }
583            None => {
584                let addr = observed.socket_addr();
585                self.last_observed_addr_report = Some(observed);
586                Some(addr)
587            }
588        }
589    }
590
591    pub(crate) fn remote_status(&self) -> Option<PathStatus> {
592        self.status.remote_status.map(|(_seq, status)| status)
593    }
594
595    pub(crate) fn local_status(&self) -> PathStatus {
596        self.status.local_status
597    }
598
599    pub(super) fn generation(&self) -> u64 {
600        self.generation
601    }
602}
603
/// Outcome of handling a received PATH_RESPONSE, as returned by
/// `PathData::on_path_response_received`.
604pub(super) enum OnPathResponseReceived {
605    /// This response validates the path on its current remote address.
    ///
    /// `was_open` is `true` when the path's open status was already [`OpenStatus::Informed`]
    /// before this response was processed.
606    OnPath { was_open: bool },
607    /// This response is valid, but it's for a remote other than the path's current remote address.
608    OffPath,
609    /// The received token is unknown.
610    Unknown,
611    /// The response is valid but it's not usable for path validation.
    ///
    /// `sent_on` is the network path the answered challenge was sent on and `current_path` is
    /// the network path in use when the response was handled.
612    Ignored {
613        sent_on: FourTuple,
614        current_path: FourTuple,
615    },
616}
617
/// How far a path has progressed towards being reported to the application as open.
///
/// The default is [`OpenStatus::Pending`].
618#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
619pub(super) enum OpenStatus {
620    /// A first packet has not been sent using this [`PathId`].
621    #[default]
622    Pending,
623    /// The first packet has been sent using this [`PathId`]. However, it is not yet deemed good
624    /// enough to be reported to the application.
625    Sent,
626    /// The application has been informed of this path.
627    Informed,
628}
629
630/// Congestion metrics as described in [`recovery_metrics_updated`].
631///
632/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
///
/// Every field is optional so the same type can hold either a full snapshot or only the
/// values that changed since the previous snapshot (see `retain_updated`).
633#[cfg(feature = "qlog")]
634#[derive(Default, Clone, PartialEq, Debug)]
635#[non_exhaustive]
636struct RecoveryMetrics {
637    pub min_rtt: Option<Duration>,
638    pub smoothed_rtt: Option<Duration>,
639    pub latest_rtt: Option<Duration>,
640    pub rtt_variance: Option<Duration>,
641    pub pto_count: Option<u32>,
642    pub bytes_in_flight: Option<u64>,
643    pub packets_in_flight: Option<u64>,
644    pub congestion_window: Option<u64>,
645    pub ssthresh: Option<u64>,
646    pub pacing_rate: Option<u64>,
647}
648
#[cfg(feature = "qlog")]
impl RecoveryMetrics {
    /// Builds a copy of `self` in which every value equal to its counterpart in `previous` is
    /// blanked out, leaving only what changed since that snapshot.
    fn retain_updated(&self, previous: &Self) -> Self {
        /// Yields `current` unless it is identical to `before`.
        fn changed<T: Copy + PartialEq>(current: Option<T>, before: Option<T>) -> Option<T> {
            if current == before { None } else { current }
        }

        Self {
            min_rtt: changed(self.min_rtt, previous.min_rtt),
            smoothed_rtt: changed(self.smoothed_rtt, previous.smoothed_rtt),
            latest_rtt: changed(self.latest_rtt, previous.latest_rtt),
            rtt_variance: changed(self.rtt_variance, previous.rtt_variance),
            pto_count: changed(self.pto_count, previous.pto_count),
            bytes_in_flight: changed(self.bytes_in_flight, previous.bytes_in_flight),
            packets_in_flight: changed(self.packets_in_flight, previous.packets_in_flight),
            congestion_window: changed(self.congestion_window, previous.congestion_window),
            ssthresh: changed(self.ssthresh, previous.ssthresh),
            pacing_rate: changed(self.pacing_rate, previous.pacing_rate),
        }
    }

    /// Produces the `MetricsUpdated` qlog event for `path_id`, carrying only the values that
    /// differ from `previous`, or `None` when nothing differs.
    ///
    /// RTT values are reported as `f32` seconds and the PTO count is clamped to `u16::MAX`.
    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
        let delta = self.retain_updated(previous);
        if delta == Self::default() {
            return None;
        }

        let Self {
            min_rtt,
            smoothed_rtt,
            latest_rtt,
            rtt_variance,
            pto_count,
            bytes_in_flight,
            packets_in_flight,
            congestion_window,
            ssthresh,
            pacing_rate,
        } = delta;
        let secs = |rtt: Duration| rtt.as_secs_f32();

        Some(RecoveryMetricsUpdated {
            min_rtt: min_rtt.map(secs),
            smoothed_rtt: smoothed_rtt.map(secs),
            latest_rtt: latest_rtt.map(secs),
            rtt_variance: rtt_variance.map(secs),
            pto_count: pto_count.map(|count| u16::try_from(count).unwrap_or(u16::MAX)),
            bytes_in_flight,
            packets_in_flight,
            congestion_window,
            ssthresh,
            pacing_rate,
            path_id: Some(u64::from(path_id.as_u32())),
        })
    }
}
702
703/// RTT estimation for a particular network path
704#[derive(Copy, Clone, Debug)]
705pub struct RttEstimator {
706    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
    ///
    /// Holds an initial estimate until the first sample is recorded.
707    latest: Duration,
708    /// The smoothed RTT of the connection, computed as described in RFC6298
    ///
    /// `None` until the first RTT sample has been recorded.
709    smoothed: Option<Duration>,
710    /// The RTT variance, computed as described in RFC6298
711    var: Duration,
712    /// The minimum RTT seen in the connection, ignoring ack delay.
713    min: Duration,
714}
715
716impl RttEstimator {
717    pub(super) fn new(initial_rtt: Duration) -> Self {
718        Self {
719            latest: initial_rtt,
720            smoothed: None,
721            var: initial_rtt / 2,
722            min: initial_rtt,
723        }
724    }
725
726    /// Resets the estimator using a new initial_rtt value.
727    ///
728    /// This only resets the initial_rtt **if** no samples have been recorded yet. If there
729    /// are any recorded samples the initial estimate can not be adjusted after the fact.
730    ///
731    /// This is useful when you receive a PATH_RESPONSE in the first packet received on a
732    /// new path. In this case you can use the delay of the PATH_CHALLENGE-PATH_RESPONSE as
733    /// the initial RTT to get a better expected estimation.
734    ///
735    /// A PATH_CHALLENGE-PATH_RESPONSE pair later in the connection should not be used
736    /// explicitly as an estimation since PATH_CHALLENGE is an ACK-eliciting packet itself
737    /// already.
738    pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
739        if self.smoothed.is_none() {
740            self.latest = initial_rtt;
741            self.var = initial_rtt / 2;
742            self.min = initial_rtt;
743        }
744    }
745
746    /// The current best RTT estimation.
747    pub fn get(&self) -> Duration {
748        self.smoothed.unwrap_or(self.latest)
749    }
750
751    /// Conservative estimate of RTT
752    ///
753    /// Takes the maximum of smoothed and latest RTT, as recommended
754    /// in 6.1.2 of the recovery spec (draft 29).
755    pub fn conservative(&self) -> Duration {
756        self.get().max(self.latest)
757    }
758
759    /// Minimum RTT registered so far for this estimator.
760    pub fn min(&self) -> Duration {
761        self.min
762    }
763
764    /// PTO computed as described in RFC9002#6.2.1.
765    pub(crate) fn pto_base(&self) -> Duration {
766        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
767    }
768
769    /// Records an RTT sample.
770    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
771        self.latest = rtt;
772        // https://www.rfc-editor.org/rfc/rfc9002.html#section-5.2-3:
773        // min_rtt does not adjust for ack_delay to avoid underestimating.
774        self.min = cmp::min(self.min, self.latest);
775        // Based on RFC6298.
776        if let Some(smoothed) = self.smoothed {
777            let adjusted_rtt = if self.min + ack_delay <= self.latest {
778                self.latest - ack_delay
779            } else {
780                self.latest
781            };
782            let var_sample = smoothed.abs_diff(adjusted_rtt);
783            self.var = (3 * self.var + var_sample) / 4;
784            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
785        } else {
786            self.smoothed = Some(self.latest);
787            self.var = self.latest / 2;
788            self.min = self.latest;
789        }
790    }
791}
792
/// Responses to received PATH_CHALLENGE frames, queued until they are popped
#[derive(Default, Debug)]
pub(crate) struct PathResponses {
    /// Queued responses; `push` keeps at most one entry per `network_path.remote` and caps
    /// the total number of entries
    pending: Vec<PathResponse>,
}
797
798impl PathResponses {
799    pub(crate) fn push(&mut self, packet: u64, token: u64, network_path: FourTuple) {
800        /// Arbitrary permissive limit to prevent abuse
801        const MAX_PATH_RESPONSES: usize = 16;
802        let response = PathResponse {
803            packet,
804            token,
805            network_path,
806        };
807        let existing = self
808            .pending
809            .iter_mut()
810            .find(|x| x.network_path.remote == network_path.remote);
811        if let Some(existing) = existing {
812            // Update a queued response
813            if existing.packet <= packet {
814                *existing = response;
815            }
816            return;
817        }
818        if self.pending.len() < MAX_PATH_RESPONSES {
819            self.pending.push(response);
820        } else {
821            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
822            // older challenges.
823            trace!("ignoring excessive PATH_CHALLENGE");
824        }
825    }
826
827    pub(crate) fn pop_off_path(&mut self, network_path: FourTuple) -> Option<(u64, FourTuple)> {
828        let response = *self.pending.last()?;
829        // We use an exact comparison here, because once we've received for the first time,
830        // we really should either already have a local_ip, or we will never get one
831        // (because our OS doesn't support it).
832        if response.network_path == network_path {
833            // We don't bother searching further because we expect that the on-path response will
834            // get drained in the immediate future by a call to `pop_on_path`
835            return None;
836        }
837        self.pending.pop();
838        Some((response.token, response.network_path))
839    }
840
841    pub(crate) fn pop_on_path(&mut self, network_path: FourTuple) -> Option<u64> {
842        let response = *self.pending.last()?;
843        // Using an exact comparison. See explanation in `pop_off_path`.
844        if response.network_path != network_path {
845            // We don't bother searching further because we expect that the off-path response will
846            // get drained in the immediate future by a call to `pop_off_path`
847            return None;
848        }
849        self.pending.pop();
850        Some(response.token)
851    }
852
853    pub(crate) fn is_empty(&self) -> bool {
854        self.pending.is_empty()
855    }
856}
857
/// A single queued response to a received PATH_CHALLENGE
#[derive(Copy, Clone, Debug)]
struct PathResponse {
    /// The packet number the corresponding PATH_CHALLENGE was received in
    packet: u64,
    /// The token of the PATH_CHALLENGE
    token: u64,
    /// The path the corresponding PATH_CHALLENGE was received from
    network_path: FourTuple,
}
867
/// Summary statistics of packets that have been sent on a particular path, but which have not yet
/// been acked or deemed lost
///
/// Both counters are adjusted together, per packet, by `insert` and `remove`.
#[derive(Debug)]
pub(super) struct InFlight {
    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
    ///
    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
    /// count towards this to ensure congestion control does not impede congestion feedback.
    pub(super) bytes: u64,
    /// Number of packets in flight containing frames other than ACK and PADDING
    ///
    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
    /// also be nonzero.
    pub(super) ack_eliciting: u64,
}
884
885impl InFlight {
886    fn new() -> Self {
887        Self {
888            bytes: 0,
889            ack_eliciting: 0,
890        }
891    }
892
893    fn insert(&mut self, packet: &SentPacket) {
894        self.bytes += u64::from(packet.size);
895        self.ack_eliciting += u64::from(packet.ack_eliciting);
896    }
897
898    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
899    fn remove(&mut self, packet: &SentPacket) {
900        self.bytes -= u64::from(packet.size);
901        self.ack_eliciting -= u64::from(packet.ack_eliciting);
902    }
903}
904
/// State for QUIC-MULTIPATH PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP frames
#[derive(Debug, Clone, Default)]
pub(super) struct PathStatusState {
    /// The local status
    local_status: PathStatus,
    /// Local sequence number, for both PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP
    ///
    /// This is the number of the *next* path status frame to be sent.
    local_seq: VarInt,
    /// The status set by the remote
    ///
    /// Stored together with the sequence number it was recorded with. `None` until
    /// `remote_update` records the first status.
    remote_status: Option<(VarInt, PathStatus)>,
}
917
918impl PathStatusState {
919    /// To be called on received PATH_STATUS_AVAILABLE/PATH_STATUS_BACKUP frames
920    pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
921        if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
922            return trace!(%seq, "ignoring path status update");
923        }
924
925        let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
926        if prev != Some(status) {
927            debug!(?status, ?seq, "remote changed path status");
928        }
929    }
930
931    /// Updates the local status
932    ///
933    /// If the local status changed, the previous value is returned
934    pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
935        if self.local_status == status {
936            return None;
937        }
938
939        self.local_seq = self.local_seq.saturating_add(1u8);
940        Some(std::mem::replace(&mut self.local_status, status))
941    }
942
943    pub(crate) fn seq(&self) -> VarInt {
944        self.local_seq
945    }
946}
947
/// The QUIC-MULTIPATH path status
///
/// See section "3.3 Path Status Management":
/// <https://quicwg.org/multipath/draft-ietf-quic-multipath.html#name-path-status-management>
#[cfg_attr(test, derive(test_strategy::Arbitrary))]
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum PathStatus {
    /// Paths marked as available will be used when scheduling packets
    ///
    /// If multiple paths are available, packets will be scheduled on whichever has
    /// capacity.
    ///
    /// This is the default status.
    #[default]
    Available,
    /// Paths marked as backup will only be used if there are no available paths
    ///
    /// If the max_idle_timeout is specified the path will be kept alive so that it does not
    /// expire.
    Backup,
}
967
/// Application events about paths
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PathEvent {
    /// A new path has been opened
    Opened {
        /// Which path is now open
        id: PathId,
    },
    /// A path was abandoned and is no longer usable.
    ///
    /// This event will always be followed by [`Self::Discarded`] after some time.
    Abandoned {
        /// Which path was abandoned.
        id: PathId,
        /// Reason why this path was abandoned.
        reason: PathAbandonReason,
    },
    /// A path was discarded and all remaining state for it has been removed.
    ///
    /// This event is the last event for a path, and is always emitted after [`Self::Abandoned`].
    Discarded {
        /// Which path had its state dropped
        id: PathId,
        /// The final path stats, which are no longer available via [`Connection::stats`]
        ///
        /// [`Connection::stats`]: super::Connection::stats
        path_stats: PathStats,
    },
    /// The remote changed the status of the path
    ///
    /// The local status is not changed because of this event. It is up to the application
    /// to update the local status, which is used for packet scheduling, when the remote
    /// changes the status.
    RemoteStatus {
        /// Path which has changed status
        id: PathId,
        /// The new status set by the remote
        status: PathStatus,
    },
    /// Received an observation of our external address from the peer.
    ObservedAddr {
        /// Path over which the observed address was reported, [`PathId::ZERO`] when multipath is
        /// not negotiated
        id: PathId,
        /// The address observed by the remote over this path
        addr: SocketAddr,
    },
}
1016
/// Reason for why a path was abandoned.
///
/// Reported to the application through [`PathEvent::Abandoned`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum PathAbandonReason {
    /// The path was closed locally by the application.
    ApplicationClosed {
        /// The error code to be sent with the abandon frame.
        error_code: VarInt,
    },
    /// We didn't receive a path response in time after opening this path.
    ValidationFailed,
    /// We didn't receive any data from the remote within the path's idle timeout.
    TimedOut,
    /// The path became unusable after a local network change.
    UnusableAfterNetworkChange,
    /// The path was opened in a NAT traversal round which was terminated.
    NatTraversalRoundEnded,
    /// The remote closed the path.
    RemoteAbandoned {
        /// The error code that was sent with the abandon frame.
        error_code: VarInt,
    },
}
1039
1040impl PathAbandonReason {
1041    /// Returns `true` if the closing of this path was initiated locally.
1042    pub(crate) fn is_locally_initiated(&self) -> bool {
1043        !matches!(self, Self::RemoteAbandoned { .. })
1044    }
1045
1046    /// Returns the error code to send with a PATH_ABANDON frame.
1047    pub(crate) fn error_code(&self) -> TransportErrorCode {
1048        match self {
1049            Self::ApplicationClosed { error_code } => (*error_code).into(),
1050            Self::NatTraversalRoundEnded => TransportErrorCode::APPLICATION_ABANDON_PATH,
1051            Self::ValidationFailed | Self::TimedOut | Self::UnusableAfterNetworkChange => {
1052                TransportErrorCode::PATH_UNSTABLE_OR_POOR
1053            }
1054            Self::RemoteAbandoned { error_code } => (*error_code).into(),
1055        }
1056    }
1057}
1058
/// Error from setting path status
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum SetPathStatusError {
    /// The path has not been opened or has already been abandoned
    #[error("closed path")]
    ClosedPath,
    /// The operation requires multipath, which has not been negotiated
    #[error("multipath not negotiated")]
    MultipathNotNegotiated,
}
1069
/// Error indicating that a path has not been opened or has already been abandoned
#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
#[error("closed path")]
pub struct ClosedPath {
    /// Unit field that is only visible to the parent module, so struct-literal construction
    /// is limited to that module (a value can still be obtained through `Default`)
    pub(super) _private: (),
}
1076
#[cfg(test)]
mod tests {
    use super::*;

    /// `PathId::saturating_add` adds normally within range and clamps at `PathId::MAX`.
    #[test]
    fn test_path_id_saturating_add() {
        // add within range behaves normally
        let start = PathId::from(u16::MAX);
        let expected = PathId::from(u32::from(u16::MAX) + 1);
        assert_eq!(start.saturating_add(1u8), expected);

        // outside range saturates
        let clamped = PathId::MAX.saturating_add(1u8);
        assert_eq!(clamped, PathId::MAX);
    }
}