iroh_quinn_proto/connection/
paths.rs

1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8    PathError, PathStats,
9    mtud::MtuDiscovery,
10    pacing::Pacer,
11    spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14    ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig, VarInt,
15    coding::{self, Decodable, Encodable},
16    congestion,
17    frame::ObservedAddr,
18    packet::SpaceId,
19};
20
21#[cfg(feature = "qlog")]
22use qlog::events::quic::RecoveryMetricsUpdated;
23
/// Id representing different paths when using multipath extension
///
/// This is a thin wrapper around a `u32`. On the wire it is read and written as a
/// [`VarInt`] (see the [`Decodable`] and [`Encodable`] implementations); decoding fails
/// for values that do not fit into a `u32`. The derived [`Default`] value is the zero id,
/// the same value as [`PathId::ZERO`].
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
pub struct PathId(pub(crate) u32);
27
28impl std::hash::Hash for PathId {
29    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
30        state.write_u32(self.0);
31    }
32}
33
// Marker impl opting `PathId` into the `identity_hash` machinery.
// NOTE(review): presumably this is valid because the `Hash` impl for `PathId` writes
// exactly one `u32` to the hasher — confirm against the `identity_hash` crate's contract.
impl identity_hash::IdentityHashable for PathId {}
35
36impl Decodable for PathId {
37    fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
38        let v = VarInt::decode(r)?;
39        let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
40        Ok(Self(v))
41    }
42}
43
44impl Encodable for PathId {
45    fn encode<B: bytes::BufMut>(&self, w: &mut B) {
46        VarInt(self.0.into()).encode(w)
47    }
48}
49
50impl PathId {
51    /// The maximum path ID allowed.
52    pub const MAX: Self = Self(u32::MAX);
53
54    /// The 0 path id.
55    pub const ZERO: Self = Self(0);
56
57    /// The number of bytes this [`PathId`] uses when encoded as a [`VarInt`]
58    pub(crate) const fn size(&self) -> usize {
59        VarInt(self.0 as u64).size()
60    }
61
62    /// Saturating integer addition. Computes self + rhs, saturating at the numeric bounds instead
63    /// of overflowing.
64    pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
65        let rhs = rhs.into();
66        let inner = self.0.saturating_add(rhs.0);
67        Self(inner)
68    }
69
70    /// Saturating integer subtraction. Computes self - rhs, saturating at the numeric bounds
71    /// instead of overflowing.
72    pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
73        let rhs = rhs.into();
74        let inner = self.0.saturating_sub(rhs.0);
75        Self(inner)
76    }
77
78    /// Get the next [`PathId`]
79    pub(crate) fn next(&self) -> Self {
80        self.saturating_add(Self(1))
81    }
82
83    /// Get the underlying u32
84    pub(crate) fn as_u32(&self) -> u32 {
85        self.0
86    }
87}
88
89impl std::fmt::Display for PathId {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        self.0.fmt(f)
92    }
93}
94
95impl<T: Into<u32>> From<T> for PathId {
96    fn from(source: T) -> Self {
97        Self(source.into())
98    }
99}
100
/// State needed for a single path ID.
///
/// A single path ID can migrate according to the rules in RFC9000 §9, either voluntary or
/// involuntary. We need to keep the [`PathData`] of the previously used such path available
/// in order to defend against migration attacks (see RFC9000 §9.3.1, §9.3.2 and §9.3.3) as
/// well as to support path probing (RFC9000 §9.1).
#[derive(Debug)]
pub(super) struct PathState {
    /// The [`PathData`] currently in use for this path ID.
    pub(super) data: PathData,
    /// The [`PathData`] that was in use before the most recent migration, if any.
    ///
    /// NOTE(review): the accompanying [`ConnectionId`] is presumably the CID that was used
    /// on that previous path — confirm against the code that populates this field.
    pub(super) prev: Option<(ConnectionId, PathData)>,
}
112
113impl PathState {
114    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
115    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
116        // Visit known paths from newest to oldest to find the one `pn` was sent on
117        for path_data in [&mut self.data]
118            .into_iter()
119            .chain(self.prev.as_mut().map(|(_, data)| data))
120        {
121            if path_data.remove_in_flight(packet) {
122                return;
123            }
124        }
125    }
126}
127
/// Bookkeeping for a single PATH_CHALLENGE that was put on the wire.
///
/// Entries are stored in [`PathData::challenges_sent`], keyed by the challenge token.
#[derive(Debug)]
pub(super) struct SentChallengeInfo {
    /// When was the challenge sent on the wire.
    pub(super) sent_instant: Instant,
    /// The 4-tuple on which this path challenge was sent.
    pub(super) network_path: FourTuple,
}
135
/// Description of a particular network path
#[derive(Debug)]
pub(super) struct PathData {
    /// The 4-tuple this path state currently refers to
    pub(super) network_path: FourTuple,
    /// RTT estimation state for this path
    pub(super) rtt: RttEstimator,
    /// Whether we're enabling ECN on outgoing packets
    pub(super) sending_ecn: bool,
    /// Congestion controller state
    pub(super) congestion: Box<dyn congestion::Controller>,
    /// Pacing state
    pub(super) pacing: Pacer,
    /// Actually sent challenges (on the wire).
    ///
    /// Keyed by the token carried in the PATH_CHALLENGE.
    pub(super) challenges_sent: IntMap<u64, SentChallengeInfo>,
    /// Whether to *immediately* trigger another PATH_CHALLENGE.
    ///
    /// This is picked up by [`super::Connection::space_can_send`].
    pub(super) send_new_challenge: bool,
    /// Pending responses to PATH_CHALLENGE frames
    pub(super) path_responses: PathResponses,
    /// Whether we're certain the peer can both send and receive on this address
    ///
    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
    /// migration. Always true for clients.
    pub(super) validated: bool,
    /// Total size of all UDP datagrams sent on this path
    pub(super) total_sent: u64,
    /// Total size of all UDP datagrams received on this path
    pub(super) total_recvd: u64,
    /// The state of the MTU discovery process
    pub(super) mtud: MtuDiscovery,
    /// Packet number of the first packet sent after an RTT sample was collected on this path
    ///
    /// Used in persistent congestion determination.
    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
    /// The in-flight packets and bytes
    ///
    /// Note that this is across all spaces on this path
    pub(super) in_flight: InFlight,
    /// Whether this path has had its remote address reported back to the peer. This only happens
    /// if both peers agree to do so based on their transport parameters.
    pub(super) observed_addr_sent: bool,
    /// Observed address frame with the largest sequence number received from the peer on this path.
    pub(super) last_observed_addr_report: Option<ObservedAddr>,
    /// The QUIC-MULTIPATH path status
    pub(super) status: PathStatusState,
    /// Number of the first packet sent on this path
    ///
    /// With RFC9000 §9 style migration (i.e. not multipath) the PathId does not change and
    /// hence packet numbers continue. This is used to determine whether a packet was sent
    /// on such an earlier path. Insufficient to determine if a packet was sent on a later
    /// path.
    first_packet: Option<u64>,
    /// The number of times a PTO has been sent without receiving an ack.
    pub(super) pto_count: u32,

    //
    // Per-path idle & keep alive
    //
    /// Idle timeout for the path
    ///
    /// If expired, the path will be abandoned.  This is different from the connection-wide
    /// idle timeout which closes the connection if expired.
    pub(super) idle_timeout: Option<Duration>,
    /// Keep alives to send on this path
    ///
    /// There is also a connection-level keep alive configured in the
    /// [`TransportParameters`].  This triggers activity on any path which can keep the
    /// connection alive.
    ///
    /// [`TransportParameters`]: crate::transport_parameters::TransportParameters
    pub(super) keep_alive: Option<Duration>,

    /// Whether the path has already been considered opened from an application perspective
    ///
    /// This means, for paths other than the original [`PathId::ZERO`], a first path challenge has
    /// been responded to, regardless of the initial validation status of the path. This state is
    /// irreversible, since it's not affected by the path being closed.
    pub(super) open: bool,

    /// The time at which this path state should've received a PATH_ABANDON already.
    ///
    /// Receiving data on this path generates a transport error after that point in time.
    /// This is checked in [`crate::Connection::on_packet_authenticated`].
    ///
    /// If set to `None`, then this path isn't abandoned yet and is allowed to receive data.
    pub(super) last_allowed_receive: Option<Instant>,

    /// Snapshot of the qlog recovery metrics
    #[cfg(feature = "qlog")]
    recovery_metrics: RecoveryMetrics,

    /// Tag uniquely identifying a path in a connection
    ///
    /// Compared against a sent packet's `path_generation` to decide whether that packet was
    /// sent on this particular path state.
    generation: u64,
}
230
231impl PathData {
232    pub(super) fn new(
233        network_path: FourTuple,
234        allow_mtud: bool,
235        peer_max_udp_payload_size: Option<u16>,
236        generation: u64,
237        now: Instant,
238        config: &TransportConfig,
239    ) -> Self {
240        let congestion = config
241            .congestion_controller_factory
242            .clone()
243            .build(now, config.get_initial_mtu());
244        Self {
245            network_path,
246            rtt: RttEstimator::new(config.initial_rtt),
247            sending_ecn: true,
248            pacing: Pacer::new(
249                config.initial_rtt,
250                congestion.initial_window(),
251                config.get_initial_mtu(),
252                now,
253            ),
254            congestion,
255            challenges_sent: Default::default(),
256            send_new_challenge: false,
257            path_responses: PathResponses::default(),
258            validated: false,
259            total_sent: 0,
260            total_recvd: 0,
261            mtud: config
262                .mtu_discovery_config
263                .as_ref()
264                .filter(|_| allow_mtud)
265                .map_or(
266                    MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
267                    |mtud_config| {
268                        MtuDiscovery::new(
269                            config.get_initial_mtu(),
270                            config.min_mtu,
271                            peer_max_udp_payload_size,
272                            mtud_config.clone(),
273                        )
274                    },
275                ),
276            first_packet_after_rtt_sample: None,
277            in_flight: InFlight::new(),
278            observed_addr_sent: false,
279            last_observed_addr_report: None,
280            status: Default::default(),
281            first_packet: None,
282            pto_count: 0,
283            idle_timeout: config.default_path_max_idle_timeout,
284            keep_alive: config.default_path_keep_alive_interval,
285            open: false,
286            last_allowed_receive: None,
287            #[cfg(feature = "qlog")]
288            recovery_metrics: RecoveryMetrics::default(),
289            generation,
290        }
291    }
292
293    /// Create a new path from a previous one.
294    ///
295    /// This should only be called when migrating paths.
296    pub(super) fn from_previous(
297        network_path: FourTuple,
298        prev: &Self,
299        generation: u64,
300        now: Instant,
301    ) -> Self {
302        let congestion = prev.congestion.clone_box();
303        let smoothed_rtt = prev.rtt.get();
304        Self {
305            network_path,
306            rtt: prev.rtt,
307            pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
308            sending_ecn: true,
309            congestion,
310            challenges_sent: Default::default(),
311            send_new_challenge: false,
312            path_responses: PathResponses::default(),
313            validated: false,
314            total_sent: 0,
315            total_recvd: 0,
316            mtud: prev.mtud.clone(),
317            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
318            in_flight: InFlight::new(),
319            observed_addr_sent: false,
320            last_observed_addr_report: None,
321            status: prev.status.clone(),
322            first_packet: None,
323            pto_count: 0,
324            idle_timeout: prev.idle_timeout,
325            keep_alive: prev.keep_alive,
326            open: false,
327            last_allowed_receive: None,
328            #[cfg(feature = "qlog")]
329            recovery_metrics: prev.recovery_metrics.clone(),
330            generation,
331        }
332    }
333
334    /// Whether we're in the process of validating this path with PATH_CHALLENGEs
335    pub(super) fn is_validating_path(&self) -> bool {
336        !self.challenges_sent.is_empty() || self.send_new_challenge
337    }
338
339    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
340    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
341    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
342        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
343    }
344
    /// Returns the path's current MTU
    ///
    /// This is the value reported by the path's [`MtuDiscovery`] state.
    pub(super) fn current_mtu(&self) -> u16 {
        self.mtud.current_mtu()
    }
349
350    /// Account for transmission of `packet` with number `pn` in `space`
351    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
352        self.in_flight.insert(&packet);
353        if self.first_packet.is_none() {
354            self.first_packet = Some(pn);
355        }
356        if let Some(forgotten) = space.sent(pn, packet) {
357            self.remove_in_flight(&forgotten);
358        }
359    }
360
361    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
362    /// `false` if `pn` was sent before this path was established.
363    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
364        if packet.path_generation != self.generation {
365            return false;
366        }
367        self.in_flight.remove(packet);
368        true
369    }
370
371    /// Increment the total size of sent UDP datagrams
372    pub(super) fn inc_total_sent(&mut self, inc: u64) {
373        self.total_sent = self.total_sent.saturating_add(inc);
374        if !self.validated {
375            trace!(
376                network_path = %self.network_path,
377                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
378                "anti amplification budget decreased"
379            );
380        }
381    }
382
383    /// Increment the total size of received UDP datagrams
384    pub(super) fn inc_total_recvd(&mut self, inc: u64) {
385        self.total_recvd = self.total_recvd.saturating_add(inc);
386        if !self.validated {
387            trace!(
388                network_path = %self.network_path,
389                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
390                "anti amplification budget increased"
391            );
392        }
393    }
394
395    /// The earliest time at which a sent challenge is considered lost.
396    pub(super) fn earliest_expiring_challenge(&self) -> Option<Instant> {
397        if self.challenges_sent.is_empty() {
398            return None;
399        }
400        let pto = self.rtt.pto_base();
401        self.challenges_sent
402            .values()
403            .map(|info| info.sent_instant)
404            .min()
405            .map(|sent_instant| sent_instant + pto)
406    }
407
    /// Handle receiving a PATH_RESPONSE.
    ///
    /// `token` is looked up among the challenges sent on this path and `network_path` is the
    /// 4-tuple the response arrived on. The outcome is one of:
    ///
    /// - [`OnPathResponseReceived::OnPath`]: the token is known and the response arrived on
    ///   both the 4-tuple the challenge was sent on and this path's current 4-tuple. The path
    ///   becomes validated and open, challenges for that 4-tuple are dropped, no further
    ///   challenge is requested, and the challenge round trip is offered to the RTT estimator
    ///   as an initial RTT. `was_open` reports whether the path was already open before.
    /// - [`OnPathResponseReceived::OffPath`]: the token is known and the response matches the
    ///   4-tuple the challenge was sent on, but not this path's current 4-tuple. Challenges for
    ///   that 4-tuple are dropped.
    /// - [`OnPathResponseReceived::Invalid`]: the token is known but the response arrived on a
    ///   different 4-tuple than the challenge was sent on.
    /// - [`OnPathResponseReceived::Unknown`]: the token matches no sent challenge.
    pub(super) fn on_path_response_received(
        &mut self,
        now: Instant,
        token: u64,
        network_path: FourTuple,
    ) -> OnPathResponseReceived {
        match self.challenges_sent.get(&token) {
            // Response to an on-path PathChallenge
            Some(info)
                if info.network_path.is_probably_same_path(&network_path)
                    && self.network_path.is_probably_same_path(&network_path) =>
            {
                self.network_path.update_local_if_same_remote(&network_path);
                // Copy the send time out before `challenges_sent` is mutated below.
                let sent_instant = info.sent_instant;
                if !std::mem::replace(&mut self.validated, true) {
                    trace!("new path validated");
                }
                // Clear any other on-path sent challenge.
                self.challenges_sent
                    .retain(|_token, info| !info.network_path.is_probably_same_path(&network_path));

                self.send_new_challenge = false;

                // This RTT can only be used for the initial RTT, not as a normal
                // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
                let rtt = now.saturating_duration_since(sent_instant);
                self.rtt.reset_initial_rtt(rtt);

                // Mark the path open and report whether it already was.
                let was_open = std::mem::replace(&mut self.open, true);
                OnPathResponseReceived::OnPath { was_open }
            }
            // Response to an off-path PathChallenge
            Some(info) if info.network_path.is_probably_same_path(&network_path) => {
                self.challenges_sent
                    .retain(|_token, info| !info.network_path.is_probably_same_path(&network_path));
                OnPathResponseReceived::OffPath
            }
            // Response to a PathChallenge we recognize, but from an invalid remote
            Some(info) => OnPathResponseReceived::Invalid {
                expected: info.network_path,
            },
            // Response to an unknown PathChallenge
            None => OnPathResponseReceived::Unknown,
        }
    }
454
455    #[cfg(feature = "qlog")]
456    pub(super) fn qlog_recovery_metrics(
457        &mut self,
458        path_id: PathId,
459    ) -> Option<RecoveryMetricsUpdated> {
460        let controller_metrics = self.congestion.metrics();
461
462        let metrics = RecoveryMetrics {
463            min_rtt: Some(self.rtt.min),
464            smoothed_rtt: Some(self.rtt.get()),
465            latest_rtt: Some(self.rtt.latest),
466            rtt_variance: Some(self.rtt.var),
467            pto_count: Some(self.pto_count),
468            bytes_in_flight: Some(self.in_flight.bytes),
469            packets_in_flight: Some(self.in_flight.ack_eliciting),
470
471            congestion_window: Some(controller_metrics.congestion_window),
472            ssthresh: controller_metrics.ssthresh,
473            pacing_rate: controller_metrics.pacing_rate,
474        };
475
476        let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
477        self.recovery_metrics = metrics;
478        event
479    }
480
481    /// Return how long we need to wait before sending `bytes_to_send`
482    ///
483    /// See [`Pacer::delay`].
484    pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Instant> {
485        let smoothed_rtt = self.rtt.get();
486        self.pacing.delay(
487            smoothed_rtt,
488            bytes_to_send,
489            self.current_mtu(),
490            self.congestion.window(),
491            now,
492        )
493    }
494
495    /// Updates the last observed address report received on this path.
496    ///
497    /// If the address was updated, it's returned to be informed to the application.
498    #[must_use = "updated observed address must be reported to the application"]
499    pub(super) fn update_observed_addr_report(
500        &mut self,
501        observed: ObservedAddr,
502    ) -> Option<SocketAddr> {
503        match self.last_observed_addr_report.as_mut() {
504            Some(prev) => {
505                if prev.seq_no >= observed.seq_no {
506                    // frames that do not increase the sequence number on this path are ignored
507                    None
508                } else if prev.ip == observed.ip && prev.port == observed.port {
509                    // keep track of the last seq_no but do not report the address as updated
510                    prev.seq_no = observed.seq_no;
511                    None
512                } else {
513                    let addr = observed.socket_addr();
514                    self.last_observed_addr_report = Some(observed);
515                    Some(addr)
516                }
517            }
518            None => {
519                let addr = observed.socket_addr();
520                self.last_observed_addr_report = Some(observed);
521                Some(addr)
522            }
523        }
524    }
525
526    pub(crate) fn remote_status(&self) -> Option<PathStatus> {
527        self.status.remote_status.map(|(_seq, status)| status)
528    }
529
    /// The path status currently set locally for this path.
    pub(crate) fn local_status(&self) -> PathStatus {
        self.status.local_status
    }
533
    /// The generation tag this path state was created with.
    pub(super) fn generation(&self) -> u64 {
        self.generation
    }
537}
538
/// Outcome of processing a received PATH_RESPONSE, as produced by
/// [`PathData::on_path_response_received`].
pub(super) enum OnPathResponseReceived {
    /// This response validates the path on its current remote address.
    ///
    /// `was_open` tells whether the path had already been marked open before this response.
    OnPath { was_open: bool },
    /// This response is valid, but it's for a remote other than the path's current remote address.
    OffPath,
    /// The received token is unknown.
    Unknown,
    /// The response is invalid.
    ///
    /// The token is known, but the response arrived on a different 4-tuple than the one the
    /// challenge was sent on.
    Invalid {
        /// The 4-tuple that was expected for this token.
        expected: FourTuple,
    },
}
552
/// Congestion metrics as described in [`recovery_metrics_updated`].
///
/// Every field is optional so that the same type can hold both a full snapshot and a diff
/// in which only the changed values are `Some`.
///
/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
#[cfg(feature = "qlog")]
#[derive(Default, Clone, PartialEq, Debug)]
#[non_exhaustive]
struct RecoveryMetrics {
    /// Minimum RTT of the path's RTT estimator
    pub min_rtt: Option<Duration>,
    /// Current best RTT estimate of the path
    pub smoothed_rtt: Option<Duration>,
    /// Most recent RTT measurement
    pub latest_rtt: Option<Duration>,
    /// RTT variance
    pub rtt_variance: Option<Duration>,
    /// The path's PTO count
    pub pto_count: Option<u32>,
    /// Bytes currently in flight on the path
    pub bytes_in_flight: Option<u64>,
    /// Number of ack-eliciting packets currently in flight on the path
    pub packets_in_flight: Option<u64>,
    /// Congestion window reported by the congestion controller
    pub congestion_window: Option<u64>,
    /// Slow start threshold reported by the congestion controller, if any
    pub ssthresh: Option<u64>,
    /// Pacing rate reported by the congestion controller, if any
    pub pacing_rate: Option<u64>,
}
571
#[cfg(feature = "qlog")]
impl RecoveryMetrics {
    /// Builds a diff against `previous`: a field is kept only if its value differs from the
    /// one in `previous`, otherwise it is `None`.
    fn retain_updated(&self, previous: &Self) -> Self {
        /// Returns `current` if it differs from `before`, `None` otherwise.
        fn changed<T: PartialEq + Copy>(current: Option<T>, before: Option<T>) -> Option<T> {
            if before == current { None } else { current }
        }

        Self {
            min_rtt: changed(self.min_rtt, previous.min_rtt),
            smoothed_rtt: changed(self.smoothed_rtt, previous.smoothed_rtt),
            latest_rtt: changed(self.latest_rtt, previous.latest_rtt),
            rtt_variance: changed(self.rtt_variance, previous.rtt_variance),
            pto_count: changed(self.pto_count, previous.pto_count),
            bytes_in_flight: changed(self.bytes_in_flight, previous.bytes_in_flight),
            packets_in_flight: changed(self.packets_in_flight, previous.packets_in_flight),
            congestion_window: changed(self.congestion_window, previous.congestion_window),
            ssthresh: changed(self.ssthresh, previous.ssthresh),
            pacing_rate: changed(self.pacing_rate, previous.pacing_rate),
        }
    }

    /// Emit a `MetricsUpdated` event containing only updated values
    ///
    /// Returns `None` when nothing differs from `previous`. Durations are reported as
    /// fractional seconds and the PTO count is clamped to `u16::MAX`.
    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
        let diff = self.retain_updated(previous);
        let nothing_changed = diff == Self::default();
        if nothing_changed {
            return None;
        }

        let as_secs = |rtt: Duration| rtt.as_secs_f32();
        let event = RecoveryMetricsUpdated {
            min_rtt: diff.min_rtt.map(as_secs),
            smoothed_rtt: diff.smoothed_rtt.map(as_secs),
            latest_rtt: diff.latest_rtt.map(as_secs),
            rtt_variance: diff.rtt_variance.map(as_secs),
            pto_count: diff
                .pto_count
                .map(|count| u16::try_from(count).unwrap_or(u16::MAX)),
            bytes_in_flight: diff.bytes_in_flight,
            packets_in_flight: diff.packets_in_flight,
            congestion_window: diff.congestion_window,
            ssthresh: diff.ssthresh,
            pacing_rate: diff.pacing_rate,
            path_id: Some(u64::from(path_id.as_u32())),
        };
        Some(event)
    }
}
625
/// RTT estimation for a particular network path
///
/// Until the first sample is recorded, `latest`, `var` and `min` hold values derived from
/// the configured initial RTT and `smoothed` is `None`.
#[derive(Copy, Clone, Debug)]
pub struct RttEstimator {
    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
    latest: Duration,
    /// The smoothed RTT of the connection, computed as described in RFC6298
    ///
    /// `None` as long as no RTT sample has been recorded.
    smoothed: Option<Duration>,
    /// The RTT variance, computed as described in RFC6298
    var: Duration,
    /// The minimum RTT seen in the connection, ignoring ack delay.
    min: Duration,
}
638
639impl RttEstimator {
640    pub(super) fn new(initial_rtt: Duration) -> Self {
641        Self {
642            latest: initial_rtt,
643            smoothed: None,
644            var: initial_rtt / 2,
645            min: initial_rtt,
646        }
647    }
648
649    /// Resets the estimator using a new initial_rtt value.
650    ///
651    /// This only resets the initial_rtt **if** no samples have been recorded yet. If there
652    /// are any recorded samples the initial estimate can not be adjusted after the fact.
653    ///
654    /// This is useful when you receive a PATH_RESPONSE in the first packet received on a
655    /// new path. In this case you can use the delay of the PATH_CHALLENGE-PATH_RESPONSE as
656    /// the initial RTT to get a better expected estimation.
657    ///
658    /// A PATH_CHALLENGE-PATH_RESPONSE pair later in the connection should not be used
659    /// explicitly as an estimation since PATH_CHALLENGE is an ACK-eliciting packet itself
660    /// already.
661    pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
662        if self.smoothed.is_none() {
663            self.latest = initial_rtt;
664            self.var = initial_rtt / 2;
665            self.min = initial_rtt;
666        }
667    }
668
669    /// The current best RTT estimation.
670    pub fn get(&self) -> Duration {
671        self.smoothed.unwrap_or(self.latest)
672    }
673
674    /// Conservative estimate of RTT
675    ///
676    /// Takes the maximum of smoothed and latest RTT, as recommended
677    /// in 6.1.2 of the recovery spec (draft 29).
678    pub fn conservative(&self) -> Duration {
679        self.get().max(self.latest)
680    }
681
682    /// Minimum RTT registered so far for this estimator.
683    pub fn min(&self) -> Duration {
684        self.min
685    }
686
687    /// PTO computed as described in RFC9002#6.2.1.
688    pub(crate) fn pto_base(&self) -> Duration {
689        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
690    }
691
692    /// Records an RTT sample.
693    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
694        self.latest = rtt;
695        // https://www.rfc-editor.org/rfc/rfc9002.html#section-5.2-3:
696        // min_rtt does not adjust for ack_delay to avoid underestimating.
697        self.min = cmp::min(self.min, self.latest);
698        // Based on RFC6298.
699        if let Some(smoothed) = self.smoothed {
700            let adjusted_rtt = if self.min + ack_delay <= self.latest {
701                self.latest - ack_delay
702            } else {
703                self.latest
704            };
705            let var_sample = smoothed.abs_diff(adjusted_rtt);
706            self.var = (3 * self.var + var_sample) / 4;
707            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
708        } else {
709            self.smoothed = Some(self.latest);
710            self.var = self.latest / 2;
711            self.min = self.latest;
712        }
713    }
714}
715
/// Queue of PATH_RESPONSE frames that still have to be sent in reply to received
/// PATH_CHALLENGE frames.
#[derive(Default, Debug)]
pub(crate) struct PathResponses {
    /// Responses waiting to be sent; `push` keeps at most one entry per remote and bounds
    /// the total number of entries.
    pending: Vec<PathResponse>,
}
720
721impl PathResponses {
722    pub(crate) fn push(&mut self, packet: u64, token: u64, network_path: FourTuple) {
723        /// Arbitrary permissive limit to prevent abuse
724        const MAX_PATH_RESPONSES: usize = 16;
725        let response = PathResponse {
726            packet,
727            token,
728            network_path,
729        };
730        let existing = self
731            .pending
732            .iter_mut()
733            .find(|x| x.network_path.remote == network_path.remote);
734        if let Some(existing) = existing {
735            // Update a queued response
736            if existing.packet <= packet {
737                *existing = response;
738            }
739            return;
740        }
741        if self.pending.len() < MAX_PATH_RESPONSES {
742            self.pending.push(response);
743        } else {
744            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
745            // older challenges.
746            trace!("ignoring excessive PATH_CHALLENGE");
747        }
748    }
749
750    pub(crate) fn pop_off_path(&mut self, network_path: FourTuple) -> Option<(u64, FourTuple)> {
751        let response = *self.pending.last()?;
752        // We use an exact comparison here, because once we've received for the first time,
753        // we really should either already have a local_ip, or we will never get one
754        // (because our OS doesn't support it).
755        if response.network_path == network_path {
756            // We don't bother searching further because we expect that the on-path response will
757            // get drained in the immediate future by a call to `pop_on_path`
758            return None;
759        }
760        self.pending.pop();
761        Some((response.token, response.network_path))
762    }
763
764    pub(crate) fn pop_on_path(&mut self, network_path: FourTuple) -> Option<u64> {
765        let response = *self.pending.last()?;
766        // Using an exact comparison. See explanation in `pop_off_path`.
767        if response.network_path != network_path {
768            // We don't bother searching further because we expect that the off-path response will
769            // get drained in the immediate future by a call to `pop_off_path`
770            return None;
771        }
772        self.pending.pop();
773        Some(response.token)
774    }
775
776    pub(crate) fn is_empty(&self) -> bool {
777        self.pending.is_empty()
778    }
779}
780
/// A single queued reply to a received PATH_CHALLENGE.
#[derive(Copy, Clone, Debug)]
struct PathResponse {
    /// The packet number the corresponding PATH_CHALLENGE was received in
    packet: u64,
    /// The token of the PATH_CHALLENGE
    token: u64,
    /// The path the corresponding PATH_CHALLENGE was received from
    network_path: FourTuple,
}
790
/// Summary statistics of packets that have been sent on a particular path, but which have not yet
/// been acked or deemed lost
///
/// Both counters are increased when a packet is inserted and decreased by the same amounts
/// when it is removed.
#[derive(Debug)]
pub(super) struct InFlight {
    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
    ///
    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
    /// count towards this to ensure congestion control does not impede congestion feedback.
    pub(super) bytes: u64,
    /// Number of packets in flight containing frames other than ACK and PADDING
    ///
    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
    /// also be nonzero.
    pub(super) ack_eliciting: u64,
}
807
808impl InFlight {
809    fn new() -> Self {
810        Self {
811            bytes: 0,
812            ack_eliciting: 0,
813        }
814    }
815
816    fn insert(&mut self, packet: &SentPacket) {
817        self.bytes += u64::from(packet.size);
818        self.ack_eliciting += u64::from(packet.ack_eliciting);
819    }
820
821    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
822    fn remove(&mut self, packet: &SentPacket) {
823        self.bytes -= u64::from(packet.size);
824        self.ack_eliciting -= u64::from(packet.ack_eliciting);
825    }
826}
827
828/// State for QUIC-MULTIPATH PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP frames
829#[derive(Debug, Clone, Default)]
830pub(super) struct PathStatusState {
831    /// The local status
832    local_status: PathStatus,
833    /// Local sequence number, for both PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP
834    ///
835    /// This is the number of the *next* path status frame to be sent.
836    local_seq: VarInt,
837    /// The status set by the remote
838    remote_status: Option<(VarInt, PathStatus)>,
839}
840
841impl PathStatusState {
842    /// To be called on received PATH_STATUS_AVAILABLE/PATH_STATUS_BACKUP frames
843    pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
844        if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
845            return trace!(%seq, "ignoring path status update");
846        }
847
848        let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
849        if prev != Some(status) {
850            debug!(?status, ?seq, "remote changed path status");
851        }
852    }
853
854    /// Updates the local status
855    ///
856    /// If the local status changed, the previous value is returned
857    pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
858        if self.local_status == status {
859            return None;
860        }
861
862        self.local_seq = self.local_seq.saturating_add(1u8);
863        Some(std::mem::replace(&mut self.local_status, status))
864    }
865
866    pub(crate) fn seq(&self) -> VarInt {
867        self.local_seq
868    }
869}
870
/// Status of a path as defined by QUIC-MULTIPATH
///
/// See section "3.3 Path Status Management":
/// <https://quicwg.org/multipath/draft-ietf-quic-multipath.html#name-path-status-management>
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PathStatus {
    /// Paths marked as available are used when scheduling packets
    ///
    /// When several paths are available, packets are scheduled on whichever one has
    /// capacity.
    #[default]
    Available,
    /// Paths marked as backup are only used when no available path exists
    ///
    /// If the max_idle_timeout is specified the path will be kept alive so that it does not
    /// expire.
    Backup,
}
889
890/// Application events about paths
891#[derive(Debug, Clone, PartialEq, Eq)]
892pub enum PathEvent {
893    /// A new path has been opened
894    Opened {
895        /// Which path is now open
896        id: PathId,
897    },
898    /// A path has been closed
899    Closed {
900        /// Which path has been closed
901        id: PathId,
902        /// Error code supplied by the peer
903        /// See <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-error-codes>
904        /// for a list of known errors.
905        error_code: VarInt,
906    },
907    /// All remaining state for a path has been removed
908    ///
909    /// The [`PathEvent::Closed`] would have been emitted for this path earlier.
910    Abandoned {
911        /// Which path had its state dropped
912        id: PathId,
913        /// The final path stats, they are no longer available via [`Connection::stats`]
914        ///
915        /// [`Connection::stats`]: super::Connection::stats
916        path_stats: PathStats,
917    },
918    /// Path was closed locally
919    LocallyClosed {
920        /// Path for which the error occurred
921        id: PathId,
922        /// The error that occurred
923        error: PathError,
924    },
925    /// The remote changed the status of the path
926    ///
927    /// The local status is not changed because of this event. It is up to the application
928    /// to update the local status, which is used for packet scheduling, when the remote
929    /// changes the status.
930    RemoteStatus {
931        /// Path which has changed status
932        id: PathId,
933        /// The new status set by the remote
934        status: PathStatus,
935    },
936    /// Received an observation of our external address from the peer.
937    ObservedAddr {
938        /// Path over which the observed address was reported, [`PathId::ZERO`] when multipath is
939        /// not negotiated
940        id: PathId,
941        /// The address observed by the remote over this path
942        addr: SocketAddr,
943    },
944}
945
946/// Error from setting path status
947#[derive(Debug, Error, Clone, PartialEq, Eq)]
948pub enum SetPathStatusError {
949    /// Error indicating that a path has not been opened or has already been abandoned
950    #[error("closed path")]
951    ClosedPath,
952    /// Error indicating that this operation requires multipath to be negotiated whereas it hasn't been
953    #[error("multipath not negotiated")]
954    MultipathNotNegotiated,
955}
956
957/// Error indicating that a path has not been opened or has already been abandoned
958#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
959#[error("closed path")]
960pub struct ClosedPath {
961    pub(super) _private: (),
962}
963
#[cfg(test)]
mod tests {
    use super::*;

    /// `PathId::saturating_add` adds normally in range and clamps at `PathId::MAX`
    #[test]
    fn test_path_id_saturating_add() {
        // An addition that stays in range yields the plain sum
        let start: PathId = u16::MAX.into();
        let expected = PathId::from(u32::from(u16::MAX) + 1);
        assert_eq!(start.saturating_add(1u8), expected);

        // An addition that would overflow clamps to the maximum
        let clamped = PathId::MAX.saturating_add(1u8);
        assert_eq!(clamped, PathId::MAX)
    }
}