noq_proto/connection/
paths.rs

1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8    PathStats, SpaceKind,
9    mtud::MtuDiscovery,
10    pacing::Pacer,
11    spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14    ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig,
15    TransportErrorCode, VarInt,
16    coding::{self, Decodable, Encodable},
17    congestion,
18    frame::ObservedAddr,
19};
20
21#[cfg(feature = "qlog")]
22use qlog::events::quic::RecoveryMetricsUpdated;
23
24/// Id representing different paths when using multipath extension
25#[cfg_attr(test, derive(test_strategy::Arbitrary))]
26#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
27pub struct PathId(pub(crate) u32);
28
29impl std::hash::Hash for PathId {
30    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
31        state.write_u32(self.0);
32    }
33}
34
35impl identity_hash::IdentityHashable for PathId {}
36
37impl Decodable for PathId {
38    fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
39        let v = VarInt::decode(r)?;
40        let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
41        Ok(Self(v))
42    }
43}
44
45impl Encodable for PathId {
46    fn encode<B: bytes::BufMut>(&self, w: &mut B) {
47        VarInt(self.0.into()).encode(w)
48    }
49}
50
51impl PathId {
52    /// The maximum path ID allowed.
53    pub const MAX: Self = Self(u32::MAX);
54
55    /// The 0 path id.
56    pub const ZERO: Self = Self(0);
57
58    /// The number of bytes this [`PathId`] uses when encoded as a [`VarInt`]
59    pub(crate) const fn size(&self) -> usize {
60        VarInt(self.0 as u64).size()
61    }
62
63    /// Saturating integer addition. Computes self + rhs, saturating at the numeric bounds instead
64    /// of overflowing.
65    pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
66        let rhs = rhs.into();
67        let inner = self.0.saturating_add(rhs.0);
68        Self(inner)
69    }
70
71    /// Saturating integer subtraction. Computes self - rhs, saturating at the numeric bounds
72    /// instead of overflowing.
73    pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
74        let rhs = rhs.into();
75        let inner = self.0.saturating_sub(rhs.0);
76        Self(inner)
77    }
78
79    /// Get the next [`PathId`]
80    pub(crate) fn next(&self) -> Self {
81        self.saturating_add(Self(1))
82    }
83
84    /// Get the underlying u32
85    pub(crate) fn as_u32(&self) -> u32 {
86        self.0
87    }
88}
89
90impl std::fmt::Display for PathId {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        self.0.fmt(f)
93    }
94}
95
96impl<T: Into<u32>> From<T> for PathId {
97    fn from(source: T) -> Self {
98        Self(source.into())
99    }
100}
101
102/// State needed for a single path ID.
103///
104/// A single path ID can migrate according to the rules in RFC9000 §9, either voluntary or
105/// involuntary. We need to keep the [`PathData`] of the previously used such path available
106/// in order to defend against migration attacks (see RFC9000 §9.3.1, §9.3.2 and §9.3.3) as
107/// well as to support path probing (RFC9000 §9.1).
108#[derive(Debug)]
109pub(super) struct PathState {
110    pub(super) data: PathData,
111    pub(super) prev: Option<(ConnectionId, PathData)>,
112}
113
114impl PathState {
115    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
116    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
117        // Visit known paths from newest to oldest to find the one `pn` was sent on
118        for path_data in [&mut self.data]
119            .into_iter()
120            .chain(self.prev.as_mut().map(|(_, data)| data))
121        {
122            if path_data.remove_in_flight(packet) {
123                return;
124            }
125        }
126    }
127}
128
129#[derive(Debug)]
130pub(super) struct SentChallengeInfo {
131    /// When was the challenge sent on the wire.
132    pub(super) sent_instant: Instant,
133    /// The 4-tuple on which this path challenge was sent.
134    pub(super) network_path: FourTuple,
135}
136
137/// State of particular network path 4-tuple within a [`PacketNumberSpace`].
138///
139/// With QUIC-Multipath a path is identified by a [`PathId`] and it is possible to have
140/// multiple paths on the same 4-tuple. Furthermore a single QUIC-Multipath path can migrate
141/// to a different 4-tuple, in a similar manner as an RFC9000 connection can use "path
142/// migration" to move to a different 4-tuple. There are thus two states we keep for paths:
143///
144/// - [`PacketNumberSpace`]: The state for a single packet number space, i.e. [`PathId`],
145///   which remains in place across path migrations to different 4-tuples.
146///
147///   This is stored in [`PacketSpace::number_spaces`] indexed on [`PathId`].
148///
149/// - [`PathData`]: The state we keep for each unique 4-tuple within a space. Of note is
150///   that a single [`PathData`] can never belong to a different [`PacketNumberSpace`].
151///
152///   This is stored in [`Connection::paths`] indexed by the current [`PathId`] for which
153///   space it exists. Either as the primary 4-tuple or as the previous 4-tuple just after a
154///   migration.
155///
156/// It follows that there might be several [`PathData`] structs for the same 4-tuple if
157/// several spaces are sharing the same 4-tuple. Note that during the handshake, the
158/// Initial, Handshake and Data spaces for [`PathId::ZERO`] all share the same [`PathData`].
159///
160/// [`PacketSpace::number_spaces`]: super::spaces::PacketSpace::number_spaces
161/// [`Connection::paths`]: super::Connection::paths
162#[derive(Debug)]
163pub(super) struct PathData {
164    pub(super) network_path: FourTuple,
165    pub(super) rtt: RttEstimator,
166    /// Whether we're enabling ECN on outgoing packets
167    pub(super) sending_ecn: bool,
168    /// Congestion controller state
169    pub(super) congestion: Box<dyn congestion::Controller>,
170    /// Pacing state
171    pub(super) pacing: Pacer,
172    /// Whether the last `poll_transmit_on_path` call yielded no data because there was
173    /// no outgoing application data.
174    ///
175    /// The RFC writes:
176    /// > When bytes in flight is smaller than the congestion window and sending is not pacing limited,
177    /// > the congestion window is underutilized. This can happen due to insufficient application data
178    /// > or flow control limits. When this occurs, the congestion window SHOULD NOT be increased in
179    /// > either slow start or congestion avoidance.
180    ///
181    /// (RFC9002, section 7.8)
182    ///
183    /// I.e. when app_limited is true, the congestion controller doesn't increase the congestion window.
184    pub(super) app_limited: bool,
185
186    /// Path challenges sent (on the wire, on-path) that we didn't receive a path response for yet
187    on_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
188    /// Whether to trigger sending another PATH_CHALLENGE in the next poll_transmit.
189    ///
190    /// This is picked up by [`super::Connection::space_can_send`].
191    ///
192    /// Only used for RFC9000-style path migration and multipath path validation (for opening).
193    ///
194    /// This is **not used** for n0 nat traversal challenge sending.
195    pub(super) pending_on_path_challenge: bool,
196    /// Pending responses to PATH_CHALLENGE frames
197    pub(super) path_responses: PathResponses,
198    /// Whether we're certain the peer can both send and receive on this address
199    ///
200    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
201    /// migration. Always true for clients.
202    pub(super) validated: bool,
203    /// Total size of all UDP datagrams sent on this path
204    pub(super) total_sent: u64,
205    /// Total size of all UDP datagrams received on this path
206    pub(super) total_recvd: u64,
207    /// The state of the MTU discovery process
208    pub(super) mtud: MtuDiscovery,
209    /// Packet number of the first packet sent after an RTT sample was collected on this path
210    ///
211    /// Used in persistent congestion determination.
212    pub(super) first_packet_after_rtt_sample: Option<(SpaceKind, u64)>,
213    /// The in-flight packets and bytes
214    ///
215    /// Note that this is across all spaces on this path
216    pub(super) in_flight: InFlight,
217    /// Whether this path has had it's remote address reported back to the peer. This only happens
218    /// if both peers agree to so based on their transport parameters.
219    pub(super) observed_addr_sent: bool,
220    /// Observed address frame with the largest sequence number received from the peer on this path.
221    pub(super) last_observed_addr_report: Option<ObservedAddr>,
222    /// The QUIC-MULTIPATH path status
223    pub(super) status: PathStatusState,
224    /// Number of the first packet sent on this path
225    ///
226    /// With RFC9000 §9 style migration (i.e. not multipath) the PathId does not change and
227    /// hence packet numbers continue. This is used to determine whether a packet was sent
228    /// on such an earlier path. Insufficient to determine if a packet was sent on a later
229    /// path.
230    first_packet: Option<u64>,
231    /// The number of times a tail-loss probe has been sent without receiving an ack.
232    ///
233    /// This is incremented by one every time the [`LossDetection`] timer fires because a
234    /// tail-loss probe needs to be sent. Once an acknowledgement for a packet is received
235    /// again it is reset to 0. Used to compute the PTO duration.
236    ///
237    /// [`LossDetection`]: super::timer::PathTimer::LossDetection
238    pub(super) pto_count: u32,
239
240    //
241    // Per-path idle & keep alive
242    //
243    /// Idle timeout for the path
244    ///
245    /// If expired, the path will be abandoned.  This is different from the connection-wide
246    /// idle timeout which closes the connection if expired.
247    pub(super) idle_timeout: Option<Duration>,
248    /// Keep alives to send on this path
249    ///
250    /// There is also a connection-level keep alive configured in the
251    /// [`TransportParameters`].  This triggers activity on any path which can keep the
252    /// connection alive.
253    ///
254    /// [`TransportParameters`]: crate::transport_parameters::TransportParameters
255    pub(super) keep_alive: Option<Duration>,
256    /// Whether to reset the idle timer when the next ack-eliciting packet is sent.
257    ///
258    /// Whenever we receive an authenticated packet the connection and path idle timers are
259    /// reset if a maximum idle timeout was negotiated. However on the first ack-eliciting
260    /// packet *sent* after this the idle timer also needs to be reset to avoid the idle
261    /// timer firing while the sent packet is in-fight. See
262    /// <https://www.rfc-editor.org/rfc/rfc9000.html#section-10.1>.
263    pub(super) permit_idle_reset: bool,
264
265    /// Whether the path has already been considered opened from an application perspective.
266    ///
267    /// This means, for paths other than the original [`PathId::ZERO`], a first path challenge has
268    /// been responded to, regardless of the initial validation status of the path. This state is
269    /// irreversible, since it's not affected by the path being closed.
270    ///
271    /// Sending a PATH_CHALLENGE and receiving a valid response before the application is informed
272    /// of the path, is a way to ensure the path is usable before it is reported. This is not
273    /// required by the spec, and in the future might be changed for simply requiring a first ack'd
274    /// packet.
275    pub(super) open_status: OpenStatus,
276
277    /// Whether we're currently draining the path after having abandoned it.
278    ///
279    /// This should only be true when a path discard timer is armed, and after the path was
280    /// abandoned (and added to the abandoned_paths set).
281    ///
282    /// This will only ever be set from false to true.
283    pub(super) draining: bool,
284
285    /// Snapshot of the qlog recovery metrics
286    #[cfg(feature = "qlog")]
287    recovery_metrics: RecoveryMetrics,
288
289    /// Tag uniquely identifying a path in a connection.
290    ///
291    /// When a migration happens on the same [`PathId`] we still detect a change in the
292    /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
293    /// value to keep track of which 4-tuple a packet belonged to.
294    generation: u64,
295}
296
297impl PathData {
298    pub(super) fn new(
299        network_path: FourTuple,
300        allow_mtud: bool,
301        peer_max_udp_payload_size: Option<u16>,
302        generation: u64,
303        now: Instant,
304        config: &TransportConfig,
305    ) -> Self {
306        let congestion = config
307            .congestion_controller_factory
308            .clone()
309            .build(now, config.get_initial_mtu());
310        Self {
311            network_path,
312            rtt: RttEstimator::new(config.initial_rtt),
313            sending_ecn: true,
314            pacing: Pacer::new(
315                config.initial_rtt,
316                congestion.initial_window(),
317                config.get_initial_mtu(),
318                config.max_outgoing_bytes_per_second,
319                now,
320            ),
321            congestion,
322            app_limited: false,
323            on_path_challenges_unconfirmed: Default::default(),
324            pending_on_path_challenge: false,
325            path_responses: PathResponses::default(),
326            validated: false,
327            total_sent: 0,
328            total_recvd: 0,
329            mtud: config
330                .mtu_discovery_config
331                .as_ref()
332                .filter(|_| allow_mtud)
333                .map_or_else(
334                    || MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
335                    |mtud_config| {
336                        MtuDiscovery::new(
337                            config.get_initial_mtu(),
338                            config.min_mtu,
339                            peer_max_udp_payload_size,
340                            mtud_config.clone(),
341                        )
342                    },
343                ),
344            first_packet_after_rtt_sample: None,
345            in_flight: InFlight::new(),
346            observed_addr_sent: false,
347            last_observed_addr_report: None,
348            status: Default::default(),
349            first_packet: None,
350            pto_count: 0,
351            idle_timeout: config.default_path_max_idle_timeout,
352            keep_alive: config.default_path_keep_alive_interval,
353            permit_idle_reset: true,
354            open_status: OpenStatus::default(),
355            draining: false,
356            #[cfg(feature = "qlog")]
357            recovery_metrics: RecoveryMetrics::default(),
358            generation,
359        }
360    }
361
362    /// Create a new path from a previous one.
363    ///
364    /// This should only be called when migrating paths.
365    pub(super) fn from_previous(
366        network_path: FourTuple,
367        prev: &Self,
368        generation: u64,
369        now: Instant,
370    ) -> Self {
371        let congestion = prev.congestion.clone_box();
372        let smoothed_rtt = prev.rtt.get();
373        Self {
374            network_path,
375            rtt: prev.rtt,
376            pacing: Pacer::new(
377                smoothed_rtt,
378                congestion.window(),
379                prev.current_mtu(),
380                prev.pacing.max_bytes_per_second(),
381                now,
382            ),
383            sending_ecn: true,
384            congestion,
385            app_limited: false,
386            on_path_challenges_unconfirmed: Default::default(),
387            pending_on_path_challenge: false,
388            path_responses: PathResponses::default(),
389            validated: false,
390            total_sent: 0,
391            total_recvd: 0,
392            mtud: prev.mtud.clone(),
393            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
394            in_flight: InFlight::new(),
395            observed_addr_sent: false,
396            last_observed_addr_report: None,
397            status: prev.status.clone(),
398            first_packet: None,
399            pto_count: 0,
400            idle_timeout: prev.idle_timeout,
401            keep_alive: prev.keep_alive,
402            permit_idle_reset: true,
403            open_status: OpenStatus::default(),
404            draining: false,
405            #[cfg(feature = "qlog")]
406            recovery_metrics: prev.recovery_metrics.clone(),
407            generation,
408        }
409    }
410
411    /// Whether we're in the process of validating this path with PATH_CHALLENGEs
412    pub(super) fn is_validating_path(&self) -> bool {
413        !self.on_path_challenges_unconfirmed.is_empty() || self.pending_on_path_challenge
414    }
415
416    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
417    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
418    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
419        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
420    }
421
422    /// Returns the path's current MTU
423    pub(super) fn current_mtu(&self) -> u16 {
424        self.mtud.current_mtu()
425    }
426
427    /// Account for transmission of `packet` with number `pn` in `space`
428    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
429        self.in_flight.insert(&packet);
430        if self.first_packet.is_none() {
431            self.first_packet = Some(pn);
432        }
433        if let Some(forgotten) = space.sent(pn, packet) {
434            self.remove_in_flight(&forgotten);
435        }
436    }
437
438    pub(super) fn record_path_challenge_sent(
439        &mut self,
440        now: Instant,
441        token: u64,
442        network_path: FourTuple,
443    ) {
444        let info = SentChallengeInfo {
445            sent_instant: now,
446            network_path,
447        };
448        debug_assert_eq!(network_path, self.network_path);
449        self.on_path_challenges_unconfirmed.insert(token, info);
450    }
451
452    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
453    /// `false` if `pn` was sent before this path was established.
454    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
455        if packet.path_generation != self.generation {
456            return false;
457        }
458        self.in_flight.remove(packet);
459        true
460    }
461
462    /// Increment the total size of sent UDP datagrams
463    pub(super) fn inc_total_sent(&mut self, inc: u64) {
464        self.total_sent = self.total_sent.saturating_add(inc);
465        if !self.validated {
466            trace!(
467                network_path = %self.network_path,
468                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
469                "anti amplification budget decreased"
470            );
471        }
472    }
473
474    /// Increment the total size of received UDP datagrams
475    pub(super) fn inc_total_recvd(&mut self, inc: u64) {
476        self.total_recvd = self.total_recvd.saturating_add(inc);
477        if !self.validated {
478            trace!(
479                network_path = %self.network_path,
480                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
481                "anti amplification budget increased"
482            );
483        }
484    }
485
486    /// The earliest time at which an on-path challenge we sent is considered lost.
487    pub(super) fn earliest_on_path_expiring_challenge(&self) -> Option<Instant> {
488        if self.on_path_challenges_unconfirmed.is_empty() {
489            return None;
490        }
491        let pto = self.rtt.pto_base();
492        self.on_path_challenges_unconfirmed
493            .values()
494            .map(|info| info.sent_instant + pto)
495            .min()
496    }
497
498    /// Handle receiving a PATH_RESPONSE.
499    pub(super) fn on_path_response_received(
500        &mut self,
501        now: Instant,
502        token: u64,
503        network_path: FourTuple,
504    ) -> OnPathResponseReceived {
505        // > § 8.2.3
506        // > Path validation succeeds when a PATH_RESPONSE frame is received that contains the
507        // > data that was sent in a previous PATH_CHALLENGE frame. A PATH_RESPONSE frame
508        // > received on any network path validates the path on which the PATH_CHALLENGE was
509        // > sent.
510        //
511        // At this point we have three potentially different network paths:
512        // - current network path (`Self::network_path`)
513        // - network path used to send the path challenge (`SentChallengeInfo::network_path`)
514        // - network path over which the response arrived (`network_path`)
515        //
516        // As per the spec, this only validates the network path on which this was *sent*.
517        match self.on_path_challenges_unconfirmed.remove(&token) {
518            // Response to an on-path PathChallenge that validates this path.
519            // The sent path should match the current path. However, it's possible that the
520            // challenge was sent when no local_ip was known. This case is allowed as well.
521            Some(info) if info.network_path.is_probably_same_path(&self.network_path) => {
522                self.network_path.update_local_if_same_remote(&network_path);
523                let sent_instant = info.sent_instant;
524                if !std::mem::replace(&mut self.validated, true) {
525                    trace!("new path validated");
526                }
527                // Clear any other on-path sent challenges and stop sending new ones.
528                self.on_path_challenges_unconfirmed.clear();
529                self.pending_on_path_challenge = false;
530
531                // This RTT can only be used for the initial RTT, not as a normal
532                // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
533                let rtt = now.saturating_duration_since(sent_instant);
534                self.rtt.reset_initial_rtt(rtt);
535
536                let prev_status = std::mem::replace(&mut self.open_status, OpenStatus::Informed);
537                OnPathResponseReceived::OnPath {
538                    was_open: matches!(prev_status, OpenStatus::Informed),
539                }
540            }
541            // Response to an on-path PathChallenge that does not validate this path.
542            Some(info) => {
543                // This is a valid path response, but this validates a 4-tuple we no longer
544                // have in use. Keep only sent challenges for the current path.
545                self.on_path_challenges_unconfirmed
546                    .retain(|_token, i| i.network_path == self.network_path);
547
548                // If there are no challenges for the current path, schedule one
549                if !self.on_path_challenges_unconfirmed.is_empty() {
550                    self.pending_on_path_challenge = true;
551                }
552                OnPathResponseReceived::Ignored {
553                    sent_on: info.network_path,
554                    current_path: self.network_path,
555                }
556            }
557            None => {
558                // Response to an unknown PathChallenge. Does not indicate failure.
559                OnPathResponseReceived::Unknown
560            }
561        }
562    }
563
564    /// Removes all on-path challenges we remember and cancels sending new on-path challenges.
565    pub(super) fn reset_on_path_challenges(&mut self) {
566        self.on_path_challenges_unconfirmed.clear();
567        self.pending_on_path_challenge = false;
568    }
569
570    #[cfg(feature = "qlog")]
571    pub(super) fn qlog_recovery_metrics(
572        &mut self,
573        path_id: PathId,
574    ) -> Option<RecoveryMetricsUpdated> {
575        let controller_metrics = self.congestion.metrics();
576
577        let metrics = RecoveryMetrics {
578            min_rtt: Some(self.rtt.min),
579            smoothed_rtt: Some(self.rtt.get()),
580            latest_rtt: Some(self.rtt.latest),
581            rtt_variance: Some(self.rtt.var),
582            pto_count: Some(self.pto_count),
583            bytes_in_flight: Some(self.in_flight.bytes),
584            packets_in_flight: Some(self.in_flight.ack_eliciting),
585
586            congestion_window: Some(controller_metrics.congestion_window),
587            ssthresh: controller_metrics.ssthresh,
588            pacing_rate: controller_metrics.pacing_rate,
589        };
590
591        let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
592        self.recovery_metrics = metrics;
593        event
594    }
595
596    /// Return how long we need to wait before sending `bytes_to_send`
597    ///
598    /// See [`Pacer::delay`].
599    pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Duration> {
600        let smoothed_rtt = self.rtt.get();
601        let metrics = self.congestion.metrics();
602        self.pacing.delay(
603            smoothed_rtt,
604            bytes_to_send,
605            self.current_mtu(),
606            metrics.congestion_window,
607            now,
608            metrics.send_quantum,
609            metrics.pacing_rate,
610        )
611    }
612
613    /// Updates the last observed address report received on this path.
614    ///
615    /// If the address was updated, it's returned to be informed to the application.
616    #[must_use = "updated observed address must be reported to the application"]
617    pub(super) fn update_observed_addr_report(
618        &mut self,
619        observed: ObservedAddr,
620    ) -> Option<SocketAddr> {
621        match self.last_observed_addr_report.as_mut() {
622            Some(prev) => {
623                if prev.seq_no >= observed.seq_no {
624                    // frames that do not increase the sequence number on this path are ignored
625                    None
626                } else if prev.ip == observed.ip && prev.port == observed.port {
627                    // keep track of the last seq_no but do not report the address as updated
628                    prev.seq_no = observed.seq_no;
629                    None
630                } else {
631                    let addr = observed.socket_addr();
632                    self.last_observed_addr_report = Some(observed);
633                    Some(addr)
634                }
635            }
636            None => {
637                let addr = observed.socket_addr();
638                self.last_observed_addr_report = Some(observed);
639                Some(addr)
640            }
641        }
642    }
643
644    pub(crate) fn remote_status(&self) -> Option<PathStatus> {
645        self.status.remote_status.map(|(_seq, status)| status)
646    }
647
648    pub(crate) fn local_status(&self) -> PathStatus {
649        self.status.local_status
650    }
651
652    /// Tag uniquely identifying a path in a connection.
653    ///
654    /// When a migration happens on the same [`PathId`] we still detect a change in the
655    /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
656    /// value to keep track of which 4-tuple a packet belonged to.
657    pub(super) fn generation(&self) -> u64 {
658        self.generation
659    }
660}
661
662pub(super) enum OnPathResponseReceived {
663    /// This response validates the path on its current remote address.
664    OnPath { was_open: bool },
665    /// The received token is unknown.
666    Unknown,
667    /// The response is valid but it's not usable for path validation.
668    Ignored {
669        sent_on: FourTuple,
670        current_path: FourTuple,
671    },
672}
673
674#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
675pub(super) enum OpenStatus {
676    /// A first packet has not been sent using this [`PathId`].
677    #[default]
678    Pending,
679    /// The first packet has been sent using this [`PathId`]. However, it is not yet deemed good
680    /// enough to be reported to the application.
681    Sent,
682    /// The application has been informed of this path.
683    Informed,
684}
685
686/// Congestion metrics as described in [`recovery_metrics_updated`].
687///
688/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
689#[cfg(feature = "qlog")]
690#[derive(Default, Clone, PartialEq, Debug)]
691#[non_exhaustive]
692struct RecoveryMetrics {
693    pub min_rtt: Option<Duration>,
694    pub smoothed_rtt: Option<Duration>,
695    pub latest_rtt: Option<Duration>,
696    pub rtt_variance: Option<Duration>,
697    pub pto_count: Option<u32>,
698    pub bytes_in_flight: Option<u64>,
699    pub packets_in_flight: Option<u64>,
700    pub congestion_window: Option<u64>,
701    pub ssthresh: Option<u64>,
702    pub pacing_rate: Option<u64>,
703}
704
705#[cfg(feature = "qlog")]
706impl RecoveryMetrics {
707    /// Retain only values that have been updated since the last snapshot.
708    fn retain_updated(&self, previous: &Self) -> Self {
709        macro_rules! keep_if_changed {
710            ($name:ident) => {
711                if previous.$name == self.$name {
712                    None
713                } else {
714                    self.$name
715                }
716            };
717        }
718
719        Self {
720            min_rtt: keep_if_changed!(min_rtt),
721            smoothed_rtt: keep_if_changed!(smoothed_rtt),
722            latest_rtt: keep_if_changed!(latest_rtt),
723            rtt_variance: keep_if_changed!(rtt_variance),
724            pto_count: keep_if_changed!(pto_count),
725            bytes_in_flight: keep_if_changed!(bytes_in_flight),
726            packets_in_flight: keep_if_changed!(packets_in_flight),
727            congestion_window: keep_if_changed!(congestion_window),
728            ssthresh: keep_if_changed!(ssthresh),
729            pacing_rate: keep_if_changed!(pacing_rate),
730        }
731    }
732
733    /// Emit a `MetricsUpdated` event containing only updated values
734    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
735        let updated = self.retain_updated(previous);
736
737        if updated == Self::default() {
738            return None;
739        }
740
741        Some(RecoveryMetricsUpdated {
742            min_rtt: updated.min_rtt.map(|rtt| rtt.as_micros() as f32 / 1000.0),
743            smoothed_rtt: updated
744                .smoothed_rtt
745                .map(|rtt| rtt.as_micros() as f32 / 1000.0),
746            latest_rtt: updated
747                .latest_rtt
748                .map(|rtt| rtt.as_micros() as f32 / 1000.0),
749            rtt_variance: updated
750                .rtt_variance
751                .map(|rtt| rtt.as_micros() as f32 / 1000.0),
752            pto_count: updated
753                .pto_count
754                .map(|count| count.try_into().unwrap_or(u16::MAX)),
755            bytes_in_flight: updated.bytes_in_flight,
756            packets_in_flight: updated.packets_in_flight,
757            congestion_window: updated.congestion_window,
758            ssthresh: updated.ssthresh,
759            pacing_rate: updated.pacing_rate,
760            path_id: Some(path_id.as_u32() as u64),
761        })
762    }
763}
764
765/// RTT estimation for a particular network path
766#[derive(Copy, Clone, Debug)]
767pub struct RttEstimator {
768    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
769    latest: Duration,
770    /// The smoothed RTT of the connection, computed as described in RFC6298
771    smoothed: Option<Duration>,
772    /// The RTT variance, computed as described in RFC6298
773    var: Duration,
774    /// The minimum RTT seen in the connection, ignoring ack delay.
775    min: Duration,
776}
777
778impl RttEstimator {
779    pub(super) fn new(initial_rtt: Duration) -> Self {
780        Self {
781            latest: initial_rtt,
782            smoothed: None,
783            var: initial_rtt / 2,
784            min: initial_rtt,
785        }
786    }
787
788    /// Resets the estimator using a new initial_rtt value.
789    ///
790    /// This only resets the initial_rtt **if** no samples have been recorded yet. If there
791    /// are any recorded samples the initial estimate can not be adjusted after the fact.
792    ///
793    /// This is useful when you receive a PATH_RESPONSE in the first packet received on a
794    /// new path. In this case you can use the delay of the PATH_CHALLENGE-PATH_RESPONSE as
795    /// the initial RTT to get a better expected estimation.
796    ///
797    /// A PATH_CHALLENGE-PATH_RESPONSE pair later in the connection should not be used
798    /// explicitly as an estimation since PATH_CHALLENGE is an ACK-eliciting packet itself
799    /// already.
800    pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
801        if self.smoothed.is_none() {
802            self.latest = initial_rtt;
803            self.var = initial_rtt / 2;
804            self.min = initial_rtt;
805        }
806    }
807
808    /// The current best RTT estimation.
809    pub fn get(&self) -> Duration {
810        self.smoothed.unwrap_or(self.latest)
811    }
812
813    /// Conservative estimate of RTT
814    ///
815    /// Takes the maximum of smoothed and latest RTT, as recommended
816    /// in 6.1.2 of the recovery spec (draft 29).
817    pub fn conservative(&self) -> Duration {
818        self.get().max(self.latest)
819    }
820
821    /// Minimum RTT registered so far for this estimator.
822    pub fn min(&self) -> Duration {
823        self.min
824    }
825
826    /// PTO computed as described in RFC9002#6.2.1.
827    pub(crate) fn pto_base(&self) -> Duration {
828        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
829    }
830
831    /// Records an RTT sample.
832    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
833        self.latest = rtt;
834        // https://www.rfc-editor.org/rfc/rfc9002.html#section-5.2-3:
835        // min_rtt does not adjust for ack_delay to avoid underestimating.
836        self.min = cmp::min(self.min, self.latest);
837        // Based on RFC6298.
838        if let Some(smoothed) = self.smoothed {
839            let adjusted_rtt = if self.min + ack_delay <= self.latest {
840                self.latest - ack_delay
841            } else {
842                self.latest
843            };
844            let var_sample = smoothed.abs_diff(adjusted_rtt);
845            self.var = (3 * self.var + var_sample) / 4;
846            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
847        } else {
848            self.smoothed = Some(self.latest);
849            self.var = self.latest / 2;
850            self.min = self.latest;
851        }
852    }
853}
854
855#[derive(Default, Debug)]
856pub(crate) struct PathResponses {
857    pending: Vec<PathResponse>,
858}
859
860impl PathResponses {
861    pub(crate) fn push(&mut self, packet: u64, token: u64, network_path: FourTuple) {
862        /// An arbitrary permissive limit to prevent abuse.
863        ///
864        /// If we've negotiated the n0 NAT Traversal extension, and one user might have a lot
865        /// of addresses, e.g. because of having lots of interfaces (we've seen >25 interfaces
866        /// on Macs with docker and other things), then we need to be able to process at least
867        /// as many PATH_CHALLENGE frames as there are interfaces.
868        /// On top of that, there are retries, which make it possible that we need to process
869        /// even more.
870        ///
871        /// Considering that there can be up to 2 `PathData`s per active `PathId`, and
872        /// reasonable default values for maximum concurrent multipath paths are ~8 and each
873        /// `PathResponse` struct takes up 72 bytes at the moment this, means an attacker can
874        /// cause us to keep `32 * 2 * 8 * 72 = ~37KB` of data around.
875        const MAX_PATH_RESPONSES: usize = 32;
876        let response = PathResponse {
877            packet,
878            token,
879            network_path,
880        };
881        let existing = self
882            .pending
883            .iter_mut()
884            .find(|x| x.network_path.remote == network_path.remote);
885        if let Some(existing) = existing {
886            // Update a queued response
887            if existing.packet <= packet {
888                *existing = response;
889            }
890            return;
891        }
892        if self.pending.len() < MAX_PATH_RESPONSES {
893            self.pending.push(response);
894        } else {
895            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
896            // older challenges.
897            trace!("ignoring excessive PATH_CHALLENGE");
898        }
899    }
900
901    pub(crate) fn pop_off_path(&mut self, network_path: FourTuple) -> Option<(u64, FourTuple)> {
902        let response = *self.pending.last()?;
903        // We use an exact comparison here, because once we've received for the first time,
904        // we really should either already have a local_ip, or we will never get one
905        // (because our OS doesn't support it).
906        if response.network_path == network_path {
907            // We don't bother searching further because we expect that the on-path response will
908            // get drained in the immediate future by a call to `pop_on_path`
909            return None;
910        }
911        self.pending.pop();
912        Some((response.token, response.network_path))
913    }
914
915    pub(crate) fn pop_on_path(&mut self, network_path: FourTuple) -> Option<u64> {
916        let response = *self.pending.last()?;
917        // Using an exact comparison. See explanation in `pop_off_path`.
918        if response.network_path != network_path {
919            // We don't bother searching further because we expect that the off-path response will
920            // get drained in the immediate future by a call to `pop_off_path`
921            return None;
922        }
923        self.pending.pop();
924        Some(response.token)
925    }
926
927    pub(crate) fn is_empty(&self) -> bool {
928        self.pending.is_empty()
929    }
930}
931
932#[derive(Copy, Clone, Debug)]
933struct PathResponse {
934    /// The packet number the corresponding PATH_CHALLENGE was received in
935    packet: u64,
936    /// The token of the PATH_CHALLENGE
937    token: u64,
938    /// The path the corresponding PATH_CHALLENGE was received from
939    network_path: FourTuple,
940}
941
942/// Summary statistics of packets that have been sent on a particular path, but which have not yet
943/// been acked or deemed lost
944#[derive(Debug)]
945pub(super) struct InFlight {
946    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
947    ///
948    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
949    /// count towards this to ensure congestion control does not impede congestion feedback.
950    pub(super) bytes: u64,
951    /// Number of packets in flight containing frames other than ACK and PADDING
952    ///
953    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
954    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
955    /// also be nonzero.
956    pub(super) ack_eliciting: u64,
957}
958
959impl InFlight {
960    fn new() -> Self {
961        Self {
962            bytes: 0,
963            ack_eliciting: 0,
964        }
965    }
966
967    fn insert(&mut self, packet: &SentPacket) {
968        self.bytes += u64::from(packet.size);
969        self.ack_eliciting += u64::from(packet.ack_eliciting);
970    }
971
972    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
973    fn remove(&mut self, packet: &SentPacket) {
974        self.bytes -= u64::from(packet.size);
975        self.ack_eliciting -= u64::from(packet.ack_eliciting);
976    }
977}
978
979/// State for QUIC-MULTIPATH PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP frames
980#[derive(Debug, Clone, Default)]
981pub(super) struct PathStatusState {
982    /// The local status
983    local_status: PathStatus,
984    /// Local sequence number, for both PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP
985    ///
986    /// This is the number of the *next* path status frame to be sent.
987    local_seq: VarInt,
988    /// The status set by the remote
989    remote_status: Option<(VarInt, PathStatus)>,
990}
991
992impl PathStatusState {
993    /// To be called on received PATH_STATUS_AVAILABLE/PATH_STATUS_BACKUP frames
994    pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
995        if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
996            return trace!(%seq, "ignoring path status update");
997        }
998
999        let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
1000        if prev != Some(status) {
1001            debug!(?status, ?seq, "remote changed path status");
1002        }
1003    }
1004
1005    /// Updates the local status
1006    ///
1007    /// If the local status changed, the previous value is returned
1008    pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
1009        if self.local_status == status {
1010            return None;
1011        }
1012
1013        self.local_seq = self.local_seq.saturating_add(1u8);
1014        Some(std::mem::replace(&mut self.local_status, status))
1015    }
1016
1017    pub(crate) fn seq(&self) -> VarInt {
1018        self.local_seq
1019    }
1020}
1021
1022/// The QUIC-MULTIPATH path status
1023///
1024/// See section "3.3 Path Status Management":
1025/// <https://quicwg.org/multipath/draft-ietf-quic-multipath.html#name-path-status-management>
1026#[cfg_attr(test, derive(test_strategy::Arbitrary))]
1027#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
1028pub enum PathStatus {
1029    /// Paths marked with as available will be used when scheduling packets
1030    ///
1031    /// If multiple paths are available, packets will be scheduled on whichever has
1032    /// capacity.
1033    #[default]
1034    Available,
1035    /// Paths marked as backup will only be used if there are no available paths
1036    ///
1037    /// If the max_idle_timeout is specified the path will be kept alive so that it does not
1038    /// expire.
1039    Backup,
1040}
1041
1042/// Application events about paths
1043#[derive(Debug, Clone, PartialEq, Eq)]
1044pub enum PathEvent {
1045    /// A new path has established connection with the peer.
1046    Established {
1047        /// The path which can now be used for application data.
1048        id: PathId,
1049    },
1050    /// A path was abandoned and is no longer usable.
1051    ///
1052    /// This event will always be followed by [`Self::Discarded`] after some time.
1053    Abandoned {
1054        /// With path was abandoned.
1055        id: PathId,
1056        /// Reason why this path was abandoned.
1057        reason: PathAbandonReason,
1058    },
1059    /// A path was discarded and all remaining state for it has been removed.
1060    ///
1061    /// This event is the last event for a path, and is always emitted after [`Self::Abandoned`].
1062    Discarded {
1063        /// Which path had its state dropped
1064        id: PathId,
1065        /// The final path stats, they are no longer available via [`Connection::stats`]
1066        ///
1067        /// [`Connection::stats`]: super::Connection::stats
1068        path_stats: Box<PathStats>,
1069    },
1070    /// The remote changed the status of the path
1071    ///
1072    /// The local status is not changed because of this event. It is up to the application
1073    /// to update the local status, which is used for packet scheduling, when the remote
1074    /// changes the status.
1075    RemoteStatus {
1076        /// Path which has changed status
1077        id: PathId,
1078        /// The new status set by the remote
1079        status: PathStatus,
1080    },
1081    /// Received an observation of our external address from the peer.
1082    ObservedAddr {
1083        /// Path over which the observed address was reported, [`PathId::ZERO`] when multipath is
1084        /// not negotiated
1085        id: PathId,
1086        /// The address observed by the remote over this path
1087        addr: SocketAddr,
1088    },
1089}
1090
1091/// Reason for why a path was abandoned.
1092#[derive(Debug, Clone, Eq, PartialEq)]
1093pub enum PathAbandonReason {
1094    /// The path was closed locally by the application.
1095    ApplicationClosed {
1096        /// The error code to be sent with the abandon frame.
1097        error_code: VarInt,
1098    },
1099    /// We didn't receive a path response in time after opening this path.
1100    ValidationFailed,
1101    /// We didn't receive any data from the remote within the path's idle timeout.
1102    TimedOut,
1103    /// The path became unusable after a local network change.
1104    UnusableAfterNetworkChange,
1105    /// The remote closed the path.
1106    RemoteAbandoned {
1107        /// The error that was sent with the abandon frame.
1108        error_code: VarInt,
1109    },
1110}
1111
1112impl PathAbandonReason {
1113    /// Whether this abandon was initiated by the remote peer.
1114    pub(crate) fn is_remote(&self) -> bool {
1115        matches!(self, Self::RemoteAbandoned { .. })
1116    }
1117
1118    /// Returns the error code to send with a PATH_ABANDON frame.
1119    pub(crate) fn error_code(&self) -> TransportErrorCode {
1120        match self {
1121            Self::ApplicationClosed { error_code } => (*error_code).into(),
1122            Self::ValidationFailed | Self::TimedOut | Self::UnusableAfterNetworkChange => {
1123                TransportErrorCode::PATH_UNSTABLE_OR_POOR
1124            }
1125            Self::RemoteAbandoned { error_code } => (*error_code).into(),
1126        }
1127    }
1128}
1129
1130/// Error from setting path status
1131#[derive(Debug, Error, Clone, PartialEq, Eq)]
1132pub enum SetPathStatusError {
1133    /// Error indicating that a path has not been opened or has already been abandoned
1134    #[error("closed path")]
1135    ClosedPath,
1136    /// Error indicating that this operation requires multipath to be negotiated whereas it hasn't been
1137    #[error("multipath not negotiated")]
1138    MultipathNotNegotiated,
1139}
1140
1141/// Error indicating that a path has not been opened or has already been abandoned
1142#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
1143#[error("closed path")]
1144pub struct ClosedPath {
1145    pub(super) _private: (),
1146}
1147
1148#[cfg(test)]
1149mod tests {
1150    use super::*;
1151
1152    #[test]
1153    fn test_path_id_saturating_add() {
1154        // add within range behaves normally
1155        let large: PathId = u16::MAX.into();
1156        let next = u32::from(u16::MAX) + 1;
1157        assert_eq!(large.saturating_add(1u8), PathId::from(next));
1158
1159        // outside range saturates
1160        assert_eq!(PathId::MAX.saturating_add(1u8), PathId::MAX)
1161    }
1162}