noq_proto/connection/
paths.rs

1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8    PathStats, SpaceKind,
9    mtud::MtuDiscovery,
10    pacing::Pacer,
11    spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14    ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig,
15    TransportErrorCode, VarInt,
16    coding::{self, Decodable, Encodable},
17    congestion,
18    frame::ObservedAddr,
19};
20
21#[cfg(feature = "qlog")]
22use qlog::events::quic::RecoveryMetricsUpdated;
23
24/// Id representing different paths when using multipath extension
25#[cfg_attr(test, derive(test_strategy::Arbitrary))]
26#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
27pub struct PathId(pub(crate) u32);
28
29impl std::hash::Hash for PathId {
30    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
31        state.write_u32(self.0);
32    }
33}
34
35impl Decodable for PathId {
36    fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
37        let v = VarInt::decode(r)?;
38        let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
39        Ok(Self(v))
40    }
41}
42
43impl Encodable for PathId {
44    fn encode<B: bytes::BufMut>(&self, w: &mut B) {
45        VarInt(self.0.into()).encode(w)
46    }
47}
48
49impl PathId {
50    /// The maximum path ID allowed.
51    pub const MAX: Self = Self(u32::MAX);
52
53    /// The 0 path id.
54    pub const ZERO: Self = Self(0);
55
56    /// The number of bytes this [`PathId`] uses when encoded as a [`VarInt`]
57    pub(crate) const fn size(&self) -> usize {
58        VarInt(self.0 as u64).size()
59    }
60
61    /// Saturating integer addition. Computes self + rhs, saturating at the numeric bounds instead
62    /// of overflowing.
63    pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
64        let rhs = rhs.into();
65        let inner = self.0.saturating_add(rhs.0);
66        Self(inner)
67    }
68
69    /// Saturating integer subtraction. Computes self - rhs, saturating at the numeric bounds
70    /// instead of overflowing.
71    pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
72        let rhs = rhs.into();
73        let inner = self.0.saturating_sub(rhs.0);
74        Self(inner)
75    }
76
77    /// Get the next [`PathId`]
78    pub(crate) fn next(&self) -> Self {
79        self.saturating_add(Self(1))
80    }
81
82    /// Get the underlying u32
83    pub(crate) fn as_u32(&self) -> u32 {
84        self.0
85    }
86}
87
88impl std::fmt::Display for PathId {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        self.0.fmt(f)
91    }
92}
93
94impl<T: Into<u32>> From<T> for PathId {
95    fn from(source: T) -> Self {
96        Self(source.into())
97    }
98}
99
100/// State needed for a single path ID.
101///
102/// A single path ID can migrate according to the rules in RFC9000 §9, either voluntary or
103/// involuntary. We need to keep the [`PathData`] of the previously used such path available
104/// in order to defend against migration attacks (see RFC9000 §9.3.1, §9.3.2 and §9.3.3) as
105/// well as to support path probing (RFC9000 §9.1).
106#[derive(Debug)]
107pub(super) struct PathState {
108    pub(super) data: PathData,
109    pub(super) prev: Option<(ConnectionId, PathData)>,
110}
111
112impl PathState {
113    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
114    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
115        // Visit known paths from newest to oldest to find the one `pn` was sent on
116        for path_data in [&mut self.data]
117            .into_iter()
118            .chain(self.prev.as_mut().map(|(_, data)| data))
119        {
120            if path_data.remove_in_flight(packet) {
121                return;
122            }
123        }
124    }
125}
126
127#[derive(Debug)]
128pub(super) struct SentChallengeInfo {
129    /// When was the challenge sent on the wire.
130    pub(super) sent_instant: Instant,
131    /// The 4-tuple on which this path challenge was sent.
132    pub(super) network_path: FourTuple,
133}
134
135/// State of particular network path 4-tuple within a [`PacketNumberSpace`].
136///
137/// With QUIC-Multipath a path is identified by a [`PathId`] and it is possible to have
138/// multiple paths on the same 4-tuple. Furthermore a single QUIC-Multipath path can migrate
139/// to a different 4-tuple, in a similar manner as an RFC9000 connection can use "path
140/// migration" to move to a different 4-tuple. There are thus two states we keep for paths:
141///
142/// - [`PacketNumberSpace`]: The state for a single packet number space, i.e. [`PathId`],
143///   which remains in place across path migrations to different 4-tuples.
144///
145///   This is stored in [`PacketSpace::number_spaces`] indexed on [`PathId`].
146///
147/// - [`PathData`]: The state we keep for each unique 4-tuple within a space. Of note is
148///   that a single [`PathData`] can never belong to a different [`PacketNumberSpace`].
149///
150///   This is stored in [`Connection::paths`] indexed by the current [`PathId`] for which
151///   space it exists. Either as the primary 4-tuple or as the previous 4-tuple just after a
152///   migration.
153///
154/// It follows that there might be several [`PathData`] structs for the same 4-tuple if
155/// several spaces are sharing the same 4-tuple. Note that during the handshake, the
156/// Initial, Handshake and Data spaces for [`PathId::ZERO`] all share the same [`PathData`].
157///
158/// [`PacketSpace::number_spaces`]: super::spaces::PacketSpace::number_spaces
159/// [`Connection::paths`]: super::Connection::paths
160#[derive(Debug)]
161pub(super) struct PathData {
162    pub(super) network_path: FourTuple,
163    pub(super) rtt: RttEstimator,
164    /// Whether we're enabling ECN on outgoing packets
165    pub(super) sending_ecn: bool,
166    /// Congestion controller state
167    pub(super) congestion: Box<dyn congestion::Controller>,
168    /// Pacing state
169    pub(super) pacing: Pacer,
170    /// Whether the last `poll_transmit_on_path` call yielded no data because there was
171    /// no outgoing application data.
172    ///
173    /// The RFC writes:
174    /// > When bytes in flight is smaller than the congestion window and sending is not pacing limited,
175    /// > the congestion window is underutilized. This can happen due to insufficient application data
176    /// > or flow control limits. When this occurs, the congestion window SHOULD NOT be increased in
177    /// > either slow start or congestion avoidance.
178    ///
179    /// (RFC9002, section 7.8)
180    ///
181    /// I.e. when app_limited is true, the congestion controller doesn't increase the congestion window.
182    pub(super) app_limited: bool,
183
184    /// Path challenges sent (on the wire, on-path) that we didn't receive a path response for yet
185    on_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
186    /// Whether to trigger sending another PATH_CHALLENGE in the next poll_transmit.
187    ///
188    /// This is picked up by [`super::Connection::space_can_send`].
189    ///
190    /// Only used for RFC9000-style path migration and multipath path validation (for opening).
191    ///
192    /// This is **not used** for n0 nat traversal challenge sending.
193    pub(super) pending_on_path_challenge: bool,
194    /// Pending responses to PATH_CHALLENGE frames
195    pub(super) path_responses: PathResponses,
196    /// Whether we're certain the peer can both send and receive on this address
197    ///
198    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
199    /// migration. Always true for clients.
200    pub(super) validated: bool,
201    /// Total size of all UDP datagrams sent on this path
202    pub(super) total_sent: u64,
203    /// Total size of all UDP datagrams received on this path
204    pub(super) total_recvd: u64,
205    /// The state of the MTU discovery process
206    pub(super) mtud: MtuDiscovery,
207    /// Packet number of the first packet sent after an RTT sample was collected on this path
208    ///
209    /// Used in persistent congestion determination.
210    pub(super) first_packet_after_rtt_sample: Option<(SpaceKind, u64)>,
211    /// The in-flight packets and bytes
212    ///
213    /// Note that this is across all spaces on this path
214    pub(super) in_flight: InFlight,
215    /// Whether this path has had it's remote address reported back to the peer. This only happens
216    /// if both peers agree to so based on their transport parameters.
217    pub(super) observed_addr_sent: bool,
218    /// Observed address frame with the largest sequence number received from the peer on this path.
219    pub(super) last_observed_addr_report: Option<ObservedAddr>,
220    /// The QUIC-MULTIPATH path status
221    pub(super) status: PathStatusState,
222    /// Number of the first packet sent on this path
223    ///
224    /// With RFC9000 §9 style migration (i.e. not multipath) the PathId does not change and
225    /// hence packet numbers continue. This is used to determine whether a packet was sent
226    /// on such an earlier path. Insufficient to determine if a packet was sent on a later
227    /// path.
228    first_packet: Option<u64>,
229    /// The number of times a tail-loss probe has been sent without receiving an ack.
230    ///
231    /// This is incremented by one every time the [`LossDetection`] timer fires because a
232    /// tail-loss probe needs to be sent. Once an acknowledgement for a packet is received
233    /// again it is reset to 0. Used to compute the PTO duration.
234    ///
235    /// [`LossDetection`]: super::timer::PathTimer::LossDetection
236    pub(super) pto_count: u32,
237
238    //
239    // Per-path idle & keep alive
240    //
241    /// Idle timeout for the path
242    ///
243    /// If expired, the path will be abandoned.  This is different from the connection-wide
244    /// idle timeout which closes the connection if expired.
245    pub(super) idle_timeout: Option<Duration>,
246    /// Keep alives to send on this path
247    ///
248    /// There is also a connection-level keep alive configured in the
249    /// [`TransportParameters`].  This triggers activity on any path which can keep the
250    /// connection alive.
251    ///
252    /// [`TransportParameters`]: crate::transport_parameters::TransportParameters
253    pub(super) keep_alive: Option<Duration>,
254    /// Whether to reset the idle timer when the next ack-eliciting packet is sent.
255    ///
256    /// Whenever we receive an authenticated packet the connection and path idle timers are
257    /// reset if a maximum idle timeout was negotiated. However on the first ack-eliciting
258    /// packet *sent* after this the idle timer also needs to be reset to avoid the idle
259    /// timer firing while the sent packet is in-fight. See
260    /// <https://www.rfc-editor.org/rfc/rfc9000.html#section-10.1>.
261    pub(super) permit_idle_reset: bool,
262
263    /// Whether the path has already been considered opened from an application perspective.
264    ///
265    /// This means, for paths other than the original [`PathId::ZERO`], a first path challenge has
266    /// been responded to, regardless of the initial validation status of the path. This state is
267    /// irreversible, since it's not affected by the path being closed.
268    ///
269    /// Sending a PATH_CHALLENGE and receiving a valid response before the application is informed
270    /// of the path, is a way to ensure the path is usable before it is reported. This is not
271    /// required by the spec, and in the future might be changed for simply requiring a first ack'd
272    /// packet.
273    pub(super) open_status: OpenStatus,
274
275    /// Whether we're currently draining the path after having abandoned it.
276    ///
277    /// This should only be true when a path discard timer is armed, and after the path was
278    /// abandoned (and added to the abandoned_paths set).
279    ///
280    /// This will only ever be set from false to true.
281    pub(super) draining: bool,
282
283    /// Snapshot of the qlog recovery metrics
284    #[cfg(feature = "qlog")]
285    recovery_metrics: RecoveryMetrics,
286
287    /// Tag uniquely identifying a path in a connection.
288    ///
289    /// When a migration happens on the same [`PathId`] we still detect a change in the
290    /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
291    /// value to keep track of which 4-tuple a packet belonged to.
292    generation: u64,
293}
294
295impl PathData {
296    pub(super) fn new(
297        network_path: FourTuple,
298        allow_mtud: bool,
299        peer_max_udp_payload_size: Option<u16>,
300        generation: u64,
301        now: Instant,
302        config: &TransportConfig,
303    ) -> Self {
304        let congestion = config
305            .congestion_controller_factory
306            .clone()
307            .build(now, config.get_initial_mtu());
308        Self {
309            network_path,
310            rtt: RttEstimator::new(config.initial_rtt),
311            sending_ecn: true,
312            pacing: Pacer::new(
313                config.initial_rtt,
314                congestion.initial_window(),
315                config.get_initial_mtu(),
316                config.max_outgoing_bytes_per_second,
317                now,
318            ),
319            congestion,
320            app_limited: false,
321            on_path_challenges_unconfirmed: Default::default(),
322            pending_on_path_challenge: false,
323            path_responses: PathResponses::default(),
324            validated: false,
325            total_sent: 0,
326            total_recvd: 0,
327            mtud: config
328                .mtu_discovery_config
329                .as_ref()
330                .filter(|_| allow_mtud)
331                .map_or_else(
332                    || MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
333                    |mtud_config| {
334                        MtuDiscovery::new(
335                            config.get_initial_mtu(),
336                            config.min_mtu,
337                            peer_max_udp_payload_size,
338                            mtud_config.clone(),
339                        )
340                    },
341                ),
342            first_packet_after_rtt_sample: None,
343            in_flight: InFlight::new(),
344            observed_addr_sent: false,
345            last_observed_addr_report: None,
346            status: Default::default(),
347            first_packet: None,
348            pto_count: 0,
349            idle_timeout: config.default_path_max_idle_timeout,
350            keep_alive: config.default_path_keep_alive_interval,
351            permit_idle_reset: true,
352            open_status: OpenStatus::default(),
353            draining: false,
354            #[cfg(feature = "qlog")]
355            recovery_metrics: RecoveryMetrics::default(),
356            generation,
357        }
358    }
359
360    /// Create a new path from a previous one.
361    ///
362    /// This should only be called when migrating paths.
363    pub(super) fn from_previous(
364        network_path: FourTuple,
365        prev: &Self,
366        generation: u64,
367        now: Instant,
368    ) -> Self {
369        let congestion = prev.congestion.clone_box();
370        let smoothed_rtt = prev.rtt.get();
371        Self {
372            network_path,
373            rtt: prev.rtt,
374            pacing: Pacer::new(
375                smoothed_rtt,
376                congestion.window(),
377                prev.current_mtu(),
378                prev.pacing.max_bytes_per_second(),
379                now,
380            ),
381            sending_ecn: true,
382            congestion,
383            app_limited: false,
384            on_path_challenges_unconfirmed: Default::default(),
385            pending_on_path_challenge: false,
386            path_responses: PathResponses::default(),
387            validated: false,
388            total_sent: 0,
389            total_recvd: 0,
390            mtud: prev.mtud.clone(),
391            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
392            in_flight: InFlight::new(),
393            observed_addr_sent: false,
394            last_observed_addr_report: None,
395            status: prev.status.clone(),
396            first_packet: None,
397            pto_count: 0,
398            idle_timeout: prev.idle_timeout,
399            keep_alive: prev.keep_alive,
400            permit_idle_reset: true,
401            open_status: OpenStatus::default(),
402            draining: false,
403            #[cfg(feature = "qlog")]
404            recovery_metrics: prev.recovery_metrics.clone(),
405            generation,
406        }
407    }
408
409    /// Whether we're in the process of validating this path with PATH_CHALLENGEs
410    pub(super) fn is_validating_path(&self) -> bool {
411        !self.on_path_challenges_unconfirmed.is_empty() || self.pending_on_path_challenge
412    }
413
414    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
415    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
416    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
417        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
418    }
419
420    /// Returns the path's current MTU
421    pub(super) fn current_mtu(&self) -> u16 {
422        self.mtud.current_mtu()
423    }
424
425    /// Account for transmission of `packet` with number `pn` in `space`
426    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
427        self.in_flight.insert(&packet);
428        if self.first_packet.is_none() {
429            self.first_packet = Some(pn);
430        }
431        if let Some(forgotten) = space.sent(pn, packet) {
432            self.remove_in_flight(&forgotten);
433        }
434    }
435
436    pub(super) fn record_path_challenge_sent(
437        &mut self,
438        now: Instant,
439        token: u64,
440        network_path: FourTuple,
441    ) {
442        let info = SentChallengeInfo {
443            sent_instant: now,
444            network_path,
445        };
446        debug_assert_eq!(network_path, self.network_path);
447        self.on_path_challenges_unconfirmed.insert(token, info);
448    }
449
450    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
451    /// `false` if `pn` was sent before this path was established.
452    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
453        if packet.path_generation != self.generation {
454            return false;
455        }
456        self.in_flight.remove(packet);
457        true
458    }
459
460    /// Increment the total size of sent UDP datagrams
461    pub(super) fn inc_total_sent(&mut self, inc: u64) {
462        self.total_sent = self.total_sent.saturating_add(inc);
463        if !self.validated {
464            trace!(
465                network_path = %self.network_path,
466                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
467                "anti amplification budget decreased"
468            );
469        }
470    }
471
472    /// Increment the total size of received UDP datagrams
473    pub(super) fn inc_total_recvd(&mut self, inc: u64) {
474        self.total_recvd = self.total_recvd.saturating_add(inc);
475        if !self.validated {
476            trace!(
477                network_path = %self.network_path,
478                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
479                "anti amplification budget increased"
480            );
481        }
482    }
483
484    /// The earliest time at which an on-path challenge we sent is considered lost.
485    pub(super) fn earliest_on_path_expiring_challenge(&self) -> Option<Instant> {
486        if self.on_path_challenges_unconfirmed.is_empty() {
487            return None;
488        }
489        let pto = self.rtt.pto_base();
490        self.on_path_challenges_unconfirmed
491            .values()
492            .map(|info| info.sent_instant + pto)
493            .min()
494    }
495
496    /// Handle receiving a PATH_RESPONSE.
497    pub(super) fn on_path_response_received(
498        &mut self,
499        now: Instant,
500        token: u64,
501        network_path: FourTuple,
502    ) -> OnPathResponseReceived {
503        // > § 8.2.3
504        // > Path validation succeeds when a PATH_RESPONSE frame is received that contains the
505        // > data that was sent in a previous PATH_CHALLENGE frame. A PATH_RESPONSE frame
506        // > received on any network path validates the path on which the PATH_CHALLENGE was
507        // > sent.
508        //
509        // At this point we have three potentially different network paths:
510        // - current network path (`Self::network_path`)
511        // - network path used to send the path challenge (`SentChallengeInfo::network_path`)
512        // - network path over which the response arrived (`network_path`)
513        //
514        // As per the spec, this only validates the network path on which this was *sent*.
515        match self.on_path_challenges_unconfirmed.remove(&token) {
516            // Response to an on-path PathChallenge that validates this path.
517            // The sent path should match the current path. However, it's possible that the
518            // challenge was sent when no local_ip was known. This case is allowed as well.
519            Some(info) if info.network_path.is_probably_same_path(&self.network_path) => {
520                self.network_path.update_local_if_same_remote(&network_path);
521                let sent_instant = info.sent_instant;
522                if !std::mem::replace(&mut self.validated, true) {
523                    trace!("new path validated");
524                }
525                // Clear any other on-path sent challenges and stop sending new ones.
526                self.on_path_challenges_unconfirmed.clear();
527                self.pending_on_path_challenge = false;
528
529                // This RTT can only be used for the initial RTT, not as a normal
530                // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
531                let rtt = now.saturating_duration_since(sent_instant);
532                self.rtt.reset_initial_rtt(rtt);
533
534                let prev_status = std::mem::replace(&mut self.open_status, OpenStatus::Informed);
535                OnPathResponseReceived::OnPath {
536                    was_open: matches!(prev_status, OpenStatus::Informed),
537                }
538            }
539            // Response to an on-path PathChallenge that does not validate this path.
540            Some(info) => {
541                // This is a valid path response, but this validates a 4-tuple we no longer
542                // have in use. Keep only sent challenges for the current path.
543                self.on_path_challenges_unconfirmed
544                    .retain(|_token, i| i.network_path == self.network_path);
545
546                // If there are no challenges for the current path, schedule one
547                if !self.on_path_challenges_unconfirmed.is_empty() {
548                    self.pending_on_path_challenge = true;
549                }
550                OnPathResponseReceived::Ignored {
551                    sent_on: info.network_path,
552                    current_path: self.network_path,
553                }
554            }
555            None => {
556                // Response to an unknown PathChallenge. Does not indicate failure.
557                OnPathResponseReceived::Unknown
558            }
559        }
560    }
561
562    /// Removes all on-path challenges we remember and cancels sending new on-path challenges.
563    pub(super) fn reset_on_path_challenges(&mut self) {
564        self.on_path_challenges_unconfirmed.clear();
565        self.pending_on_path_challenge = false;
566    }
567
568    #[cfg(feature = "qlog")]
569    pub(super) fn qlog_recovery_metrics(
570        &mut self,
571        path_id: PathId,
572    ) -> Option<RecoveryMetricsUpdated> {
573        let controller_metrics = self.congestion.metrics();
574
575        let metrics = RecoveryMetrics {
576            min_rtt: Some(self.rtt.min),
577            smoothed_rtt: Some(self.rtt.get()),
578            latest_rtt: Some(self.rtt.latest),
579            rtt_variance: Some(self.rtt.var),
580            pto_count: Some(self.pto_count),
581            bytes_in_flight: Some(self.in_flight.bytes),
582            packets_in_flight: Some(self.in_flight.ack_eliciting),
583
584            congestion_window: Some(controller_metrics.congestion_window),
585            ssthresh: controller_metrics.ssthresh,
586            pacing_rate: controller_metrics.pacing_rate,
587        };
588
589        let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
590        self.recovery_metrics = metrics;
591        event
592    }
593
594    /// Return how long we need to wait before sending `bytes_to_send`
595    ///
596    /// See [`Pacer::delay`].
597    pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Duration> {
598        let smoothed_rtt = self.rtt.get();
599        let metrics = self.congestion.metrics();
600        self.pacing.delay(
601            smoothed_rtt,
602            bytes_to_send,
603            self.current_mtu(),
604            metrics.congestion_window,
605            now,
606            metrics.send_quantum,
607            metrics.pacing_rate,
608        )
609    }
610
611    /// Updates the last observed address report received on this path.
612    ///
613    /// If the address was updated, it's returned to be informed to the application.
614    #[must_use = "updated observed address must be reported to the application"]
615    pub(super) fn update_observed_addr_report(
616        &mut self,
617        observed: ObservedAddr,
618    ) -> Option<SocketAddr> {
619        match self.last_observed_addr_report.as_mut() {
620            Some(prev) => {
621                if prev.seq_no >= observed.seq_no {
622                    // frames that do not increase the sequence number on this path are ignored
623                    None
624                } else if prev.ip == observed.ip && prev.port == observed.port {
625                    // keep track of the last seq_no but do not report the address as updated
626                    prev.seq_no = observed.seq_no;
627                    None
628                } else {
629                    let addr = observed.socket_addr();
630                    self.last_observed_addr_report = Some(observed);
631                    Some(addr)
632                }
633            }
634            None => {
635                let addr = observed.socket_addr();
636                self.last_observed_addr_report = Some(observed);
637                Some(addr)
638            }
639        }
640    }
641
642    pub(crate) fn remote_status(&self) -> Option<PathStatus> {
643        self.status.remote_status.map(|(_seq, status)| status)
644    }
645
646    pub(crate) fn local_status(&self) -> PathStatus {
647        self.status.local_status
648    }
649
650    /// Tag uniquely identifying a path in a connection.
651    ///
652    /// When a migration happens on the same [`PathId`] we still detect a change in the
653    /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
654    /// value to keep track of which 4-tuple a packet belonged to.
655    pub(super) fn generation(&self) -> u64 {
656        self.generation
657    }
658}
659
660pub(super) enum OnPathResponseReceived {
661    /// This response validates the path on its current remote address.
662    OnPath { was_open: bool },
663    /// The received token is unknown.
664    Unknown,
665    /// The response is valid but it's not usable for path validation.
666    Ignored {
667        sent_on: FourTuple,
668        current_path: FourTuple,
669    },
670}
671
672#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
673pub(super) enum OpenStatus {
674    /// A first packet has not been sent using this [`PathId`].
675    #[default]
676    Pending,
677    /// The first packet has been sent using this [`PathId`]. However, it is not yet deemed good
678    /// enough to be reported to the application.
679    Sent,
680    /// The application has been informed of this path.
681    Informed,
682}
683
684/// Congestion metrics as described in [`recovery_metrics_updated`].
685///
686/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
687#[cfg(feature = "qlog")]
688#[derive(Default, Clone, PartialEq, Debug)]
689#[non_exhaustive]
690struct RecoveryMetrics {
691    pub min_rtt: Option<Duration>,
692    pub smoothed_rtt: Option<Duration>,
693    pub latest_rtt: Option<Duration>,
694    pub rtt_variance: Option<Duration>,
695    pub pto_count: Option<u32>,
696    pub bytes_in_flight: Option<u64>,
697    pub packets_in_flight: Option<u64>,
698    pub congestion_window: Option<u64>,
699    pub ssthresh: Option<u64>,
700    pub pacing_rate: Option<u64>,
701}
702
703#[cfg(feature = "qlog")]
704impl RecoveryMetrics {
705    /// Retain only values that have been updated since the last snapshot.
706    fn retain_updated(&self, previous: &Self) -> Self {
707        macro_rules! keep_if_changed {
708            ($name:ident) => {
709                if previous.$name == self.$name {
710                    None
711                } else {
712                    self.$name
713                }
714            };
715        }
716
717        Self {
718            min_rtt: keep_if_changed!(min_rtt),
719            smoothed_rtt: keep_if_changed!(smoothed_rtt),
720            latest_rtt: keep_if_changed!(latest_rtt),
721            rtt_variance: keep_if_changed!(rtt_variance),
722            pto_count: keep_if_changed!(pto_count),
723            bytes_in_flight: keep_if_changed!(bytes_in_flight),
724            packets_in_flight: keep_if_changed!(packets_in_flight),
725            congestion_window: keep_if_changed!(congestion_window),
726            ssthresh: keep_if_changed!(ssthresh),
727            pacing_rate: keep_if_changed!(pacing_rate),
728        }
729    }
730
731    /// Emit a `MetricsUpdated` event containing only updated values
732    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
733        let updated = self.retain_updated(previous);
734
735        if updated == Self::default() {
736            return None;
737        }
738
739        Some(RecoveryMetricsUpdated {
740            min_rtt: updated.min_rtt.map(|rtt| rtt.as_micros() as f32 / 1000.0),
741            smoothed_rtt: updated
742                .smoothed_rtt
743                .map(|rtt| rtt.as_micros() as f32 / 1000.0),
744            latest_rtt: updated
745                .latest_rtt
746                .map(|rtt| rtt.as_micros() as f32 / 1000.0),
747            rtt_variance: updated
748                .rtt_variance
749                .map(|rtt| rtt.as_micros() as f32 / 1000.0),
750            pto_count: updated
751                .pto_count
752                .map(|count| count.try_into().unwrap_or(u16::MAX)),
753            bytes_in_flight: updated.bytes_in_flight,
754            packets_in_flight: updated.packets_in_flight,
755            congestion_window: updated.congestion_window,
756            ssthresh: updated.ssthresh,
757            pacing_rate: updated.pacing_rate,
758            path_id: Some(path_id.as_u32() as u64),
759        })
760    }
761}
762
763/// RTT estimation for a particular network path
764#[derive(Copy, Clone, Debug)]
765pub struct RttEstimator {
766    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
767    latest: Duration,
768    /// The smoothed RTT of the connection, computed as described in RFC6298
769    smoothed: Option<Duration>,
770    /// The RTT variance, computed as described in RFC6298
771    var: Duration,
772    /// The minimum RTT seen in the connection, ignoring ack delay.
773    min: Duration,
774}
775
776impl RttEstimator {
777    pub(super) fn new(initial_rtt: Duration) -> Self {
778        Self {
779            latest: initial_rtt,
780            smoothed: None,
781            var: initial_rtt / 2,
782            min: initial_rtt,
783        }
784    }
785
786    /// Resets the estimator using a new initial_rtt value.
787    ///
788    /// This only resets the initial_rtt **if** no samples have been recorded yet. If there
789    /// are any recorded samples the initial estimate can not be adjusted after the fact.
790    ///
791    /// This is useful when you receive a PATH_RESPONSE in the first packet received on a
792    /// new path. In this case you can use the delay of the PATH_CHALLENGE-PATH_RESPONSE as
793    /// the initial RTT to get a better expected estimation.
794    ///
795    /// A PATH_CHALLENGE-PATH_RESPONSE pair later in the connection should not be used
796    /// explicitly as an estimation since PATH_CHALLENGE is an ACK-eliciting packet itself
797    /// already.
798    pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
799        if self.smoothed.is_none() {
800            self.latest = initial_rtt;
801            self.var = initial_rtt / 2;
802            self.min = initial_rtt;
803        }
804    }
805
806    /// The current best RTT estimation.
807    pub fn get(&self) -> Duration {
808        self.smoothed.unwrap_or(self.latest)
809    }
810
811    /// Conservative estimate of RTT
812    ///
813    /// Takes the maximum of smoothed and latest RTT, as recommended
814    /// in 6.1.2 of the recovery spec (draft 29).
815    pub fn conservative(&self) -> Duration {
816        self.get().max(self.latest)
817    }
818
819    /// Minimum RTT registered so far for this estimator.
820    pub fn min(&self) -> Duration {
821        self.min
822    }
823
824    /// PTO computed as described in RFC9002#6.2.1.
825    pub(crate) fn pto_base(&self) -> Duration {
826        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
827    }
828
829    /// Records an RTT sample.
830    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
831        self.latest = rtt;
832        // https://www.rfc-editor.org/rfc/rfc9002.html#section-5.2-3:
833        // min_rtt does not adjust for ack_delay to avoid underestimating.
834        self.min = cmp::min(self.min, self.latest);
835        // Based on RFC6298.
836        if let Some(smoothed) = self.smoothed {
837            let adjusted_rtt = if self.min + ack_delay <= self.latest {
838                self.latest - ack_delay
839            } else {
840                self.latest
841            };
842            let var_sample = smoothed.abs_diff(adjusted_rtt);
843            self.var = (3 * self.var + var_sample) / 4;
844            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
845        } else {
846            self.smoothed = Some(self.latest);
847            self.var = self.latest / 2;
848            self.min = self.latest;
849        }
850    }
851}
852
853#[derive(Default, Debug)]
854pub(crate) struct PathResponses {
855    pending: Vec<PathResponse>,
856}
857
858impl PathResponses {
859    pub(crate) fn push(&mut self, packet: u64, token: u64, network_path: FourTuple) {
860        /// An arbitrary permissive limit to prevent abuse.
861        ///
862        /// If we've negotiated the n0 NAT Traversal extension, and one user might have a lot
863        /// of addresses, e.g. because of having lots of interfaces (we've seen >25 interfaces
864        /// on Macs with docker and other things), then we need to be able to process at least
865        /// as many PATH_CHALLENGE frames as there are interfaces.
866        /// On top of that, there are retries, which make it possible that we need to process
867        /// even more.
868        ///
869        /// Considering that there can be up to 2 `PathData`s per active `PathId`, and
870        /// reasonable default values for maximum concurrent multipath paths are ~8 and each
871        /// `PathResponse` struct takes up 72 bytes at the moment this, means an attacker can
872        /// cause us to keep `32 * 2 * 8 * 72 = ~37KB` of data around.
873        const MAX_PATH_RESPONSES: usize = 32;
874        let response = PathResponse {
875            packet,
876            token,
877            network_path,
878        };
879        let existing = self
880            .pending
881            .iter_mut()
882            .find(|x| x.network_path.remote == network_path.remote);
883        if let Some(existing) = existing {
884            // Update a queued response
885            if existing.packet <= packet {
886                *existing = response;
887            }
888            return;
889        }
890        if self.pending.len() < MAX_PATH_RESPONSES {
891            self.pending.push(response);
892        } else {
893            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
894            // older challenges.
895            trace!("ignoring excessive PATH_CHALLENGE");
896        }
897    }
898
899    pub(crate) fn pop_off_path(&mut self, network_path: FourTuple) -> Option<(u64, FourTuple)> {
900        let response = *self.pending.last()?;
901        // We use an exact comparison here, because once we've received for the first time,
902        // we really should either already have a local_ip, or we will never get one
903        // (because our OS doesn't support it).
904        if response.network_path == network_path {
905            // We don't bother searching further because we expect that the on-path response will
906            // get drained in the immediate future by a call to `pop_on_path`
907            return None;
908        }
909        self.pending.pop();
910        Some((response.token, response.network_path))
911    }
912
913    pub(crate) fn pop_on_path(&mut self, network_path: FourTuple) -> Option<u64> {
914        let response = *self.pending.last()?;
915        // Using an exact comparison. See explanation in `pop_off_path`.
916        if response.network_path != network_path {
917            // We don't bother searching further because we expect that the off-path response will
918            // get drained in the immediate future by a call to `pop_off_path`
919            return None;
920        }
921        self.pending.pop();
922        Some(response.token)
923    }
924
925    pub(crate) fn is_empty(&self) -> bool {
926        self.pending.is_empty()
927    }
928}
929
930#[derive(Copy, Clone, Debug)]
931struct PathResponse {
932    /// The packet number the corresponding PATH_CHALLENGE was received in
933    packet: u64,
934    /// The token of the PATH_CHALLENGE
935    token: u64,
936    /// The path the corresponding PATH_CHALLENGE was received from
937    network_path: FourTuple,
938}
939
940/// Summary statistics of packets that have been sent on a particular path, but which have not yet
941/// been acked or deemed lost
942#[derive(Debug)]
943pub(super) struct InFlight {
944    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
945    ///
946    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
947    /// count towards this to ensure congestion control does not impede congestion feedback.
948    pub(super) bytes: u64,
949    /// Number of packets in flight containing frames other than ACK and PADDING
950    ///
951    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
952    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
953    /// also be nonzero.
954    pub(super) ack_eliciting: u64,
955}
956
957impl InFlight {
958    fn new() -> Self {
959        Self {
960            bytes: 0,
961            ack_eliciting: 0,
962        }
963    }
964
965    fn insert(&mut self, packet: &SentPacket) {
966        self.bytes += u64::from(packet.size);
967        self.ack_eliciting += u64::from(packet.ack_eliciting);
968    }
969
970    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
971    fn remove(&mut self, packet: &SentPacket) {
972        self.bytes -= u64::from(packet.size);
973        self.ack_eliciting -= u64::from(packet.ack_eliciting);
974    }
975}
976
977/// State for QUIC-MULTIPATH PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP frames
978#[derive(Debug, Clone, Default)]
979pub(super) struct PathStatusState {
980    /// The local status
981    local_status: PathStatus,
982    /// Local sequence number, for both PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP
983    ///
984    /// This is the number of the *next* path status frame to be sent.
985    local_seq: VarInt,
986    /// The status set by the remote
987    remote_status: Option<(VarInt, PathStatus)>,
988}
989
990impl PathStatusState {
991    /// To be called on received PATH_STATUS_AVAILABLE/PATH_STATUS_BACKUP frames
992    pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
993        if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
994            return trace!(%seq, "ignoring path status update");
995        }
996
997        let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
998        if prev != Some(status) {
999            debug!(?status, ?seq, "remote changed path status");
1000        }
1001    }
1002
1003    /// Updates the local status
1004    ///
1005    /// If the local status changed, the previous value is returned
1006    pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
1007        if self.local_status == status {
1008            return None;
1009        }
1010
1011        self.local_seq = self.local_seq.saturating_add(1u8);
1012        Some(std::mem::replace(&mut self.local_status, status))
1013    }
1014
1015    pub(crate) fn seq(&self) -> VarInt {
1016        self.local_seq
1017    }
1018}
1019
1020/// The QUIC-MULTIPATH path status
1021///
1022/// See section "3.3 Path Status Management":
1023/// <https://quicwg.org/multipath/draft-ietf-quic-multipath.html#name-path-status-management>
1024#[cfg_attr(test, derive(test_strategy::Arbitrary))]
1025#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
1026pub enum PathStatus {
1027    /// Paths marked with as available will be used when scheduling packets
1028    ///
1029    /// If multiple paths are available, packets will be scheduled on whichever has
1030    /// capacity.
1031    #[default]
1032    Available,
1033    /// Paths marked as backup will only be used if there are no available paths
1034    ///
1035    /// If the max_idle_timeout is specified the path will be kept alive so that it does not
1036    /// expire.
1037    Backup,
1038}
1039
1040/// Application events about paths
1041#[derive(Debug, Clone, PartialEq, Eq)]
1042#[non_exhaustive]
1043pub enum PathEvent {
1044    /// A new path has established connection with the peer.
1045    #[non_exhaustive]
1046    Established {
1047        /// The path which can now be used for application data.
1048        id: PathId,
1049    },
1050    /// A path was abandoned and is no longer usable.
1051    ///
1052    /// This event will always be followed by [`Self::Discarded`] after some time.
1053    #[non_exhaustive]
1054    Abandoned {
1055        /// With path was abandoned.
1056        id: PathId,
1057        /// Reason why this path was abandoned.
1058        reason: PathAbandonReason,
1059    },
1060    /// A path was discarded and all remaining state for it has been removed.
1061    ///
1062    /// This event is the last event for a path, and is always emitted after [`Self::Abandoned`].
1063    #[non_exhaustive]
1064    Discarded {
1065        /// Which path had its state dropped
1066        id: PathId,
1067        /// The final path stats, they are no longer available via [`Connection::stats`]
1068        ///
1069        /// [`Connection::stats`]: super::Connection::stats
1070        path_stats: Box<PathStats>,
1071    },
1072    /// The remote changed the status of the path
1073    ///
1074    /// The local status is not changed because of this event. It is up to the application
1075    /// to update the local status, which is used for packet scheduling, when the remote
1076    /// changes the status.
1077    #[non_exhaustive]
1078    RemoteStatus {
1079        /// Path which has changed status
1080        id: PathId,
1081        /// The new status set by the remote
1082        status: PathStatus,
1083    },
1084    /// Received an observation of our external address from the peer.
1085    #[non_exhaustive]
1086    ObservedAddr {
1087        /// Path over which the observed address was reported, [`PathId::ZERO`] when multipath is
1088        /// not negotiated
1089        id: PathId,
1090        /// The address observed by the remote over this path
1091        addr: SocketAddr,
1092    },
1093}
1094
1095/// Reason for why a path was abandoned.
1096#[derive(Debug, Clone, Eq, PartialEq)]
1097pub enum PathAbandonReason {
1098    /// The path was closed locally by the application.
1099    ApplicationClosed {
1100        /// The error code to be sent with the abandon frame.
1101        error_code: VarInt,
1102    },
1103    /// We didn't receive a path response in time after opening this path.
1104    ValidationFailed,
1105    /// We didn't receive any data from the remote within the path's idle timeout.
1106    TimedOut,
1107    /// The path became unusable after a local network change.
1108    UnusableAfterNetworkChange,
1109    /// The remote closed the path.
1110    RemoteAbandoned {
1111        /// The error that was sent with the abandon frame.
1112        error_code: VarInt,
1113    },
1114}
1115
1116impl PathAbandonReason {
1117    /// Whether this abandon was initiated by the remote peer.
1118    pub(crate) fn is_remote(&self) -> bool {
1119        matches!(self, Self::RemoteAbandoned { .. })
1120    }
1121
1122    /// Returns the error code to send with a PATH_ABANDON frame.
1123    pub(crate) fn error_code(&self) -> TransportErrorCode {
1124        match self {
1125            Self::ApplicationClosed { error_code } => (*error_code).into(),
1126            Self::ValidationFailed | Self::TimedOut | Self::UnusableAfterNetworkChange => {
1127                TransportErrorCode::PATH_UNSTABLE_OR_POOR
1128            }
1129            Self::RemoteAbandoned { error_code } => (*error_code).into(),
1130        }
1131    }
1132}
1133
1134/// Error from setting path status
1135#[derive(Debug, Error, Clone, PartialEq, Eq)]
1136pub enum SetPathStatusError {
1137    /// Error indicating that a path has not been opened or has already been abandoned
1138    #[error("closed path")]
1139    ClosedPath,
1140    /// Error indicating that this operation requires multipath to be negotiated whereas it hasn't been
1141    #[error("multipath not negotiated")]
1142    MultipathNotNegotiated,
1143}
1144
1145/// Error indicating that a path has not been opened or has already been abandoned
1146#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
1147#[error("closed path")]
1148pub struct ClosedPath {
1149    pub(super) _private: (),
1150}
1151
1152#[cfg(test)]
1153mod tests {
1154    use super::*;
1155
1156    #[test]
1157    fn test_path_id_saturating_add() {
1158        // add within range behaves normally
1159        let large: PathId = u16::MAX.into();
1160        let next = u32::from(u16::MAX) + 1;
1161        assert_eq!(large.saturating_add(1u8), PathId::from(next));
1162
1163        // outside range saturates
1164        assert_eq!(PathId::MAX.saturating_add(1u8), PathId::MAX)
1165    }
1166}