iroh_quinn_proto/connection/
paths.rs

1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8    PathError, PathStats,
9    mtud::MtuDiscovery,
10    pacing::Pacer,
11    spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14    ConnectionId, Duration, Instant, TIMER_GRANULARITY, TransportConfig, VarInt, coding,
15    congestion, frame::ObservedAddr, packet::SpaceId,
16};
17
18#[cfg(feature = "qlog")]
19use qlog::events::quic::RecoveryMetricsUpdated;
20
21/// Id representing different paths when using multipath extension
22#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
23pub struct PathId(pub(crate) u32);
24
25impl std::hash::Hash for PathId {
26    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
27        state.write_u32(self.0);
28    }
29}
30
31impl identity_hash::IdentityHashable for PathId {}
32
33impl coding::Codec for PathId {
34    fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
35        let v = VarInt::decode(r)?;
36        let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
37        Ok(Self(v))
38    }
39
40    fn encode<B: bytes::BufMut>(&self, w: &mut B) {
41        VarInt(self.0.into()).encode(w)
42    }
43}
44
45impl PathId {
46    /// The maximum path ID allowed.
47    pub const MAX: Self = Self(u32::MAX);
48
49    /// The 0 path id.
50    pub const ZERO: Self = Self(0);
51
52    /// The number of bytes this [`PathId`] uses when encoded as a [`VarInt`]
53    pub(crate) const fn size(&self) -> usize {
54        VarInt(self.0 as u64).size()
55    }
56
57    /// Saturating integer addition. Computes self + rhs, saturating at the numeric bounds instead
58    /// of overflowing.
59    pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
60        let rhs = rhs.into();
61        let inner = self.0.saturating_add(rhs.0);
62        Self(inner)
63    }
64
65    /// Saturating integer subtraction. Computes self - rhs, saturating at the numeric bounds
66    /// instead of overflowing.
67    pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
68        let rhs = rhs.into();
69        let inner = self.0.saturating_sub(rhs.0);
70        Self(inner)
71    }
72
73    /// Get the next [`PathId`]
74    pub(crate) fn next(&self) -> Self {
75        self.saturating_add(Self(1))
76    }
77
78    /// Get the underlying u32
79    pub(crate) fn as_u32(&self) -> u32 {
80        self.0
81    }
82}
83
84impl std::fmt::Display for PathId {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        self.0.fmt(f)
87    }
88}
89
90impl<T: Into<u32>> From<T> for PathId {
91    fn from(source: T) -> Self {
92        Self(source.into())
93    }
94}
95
96/// State needed for a single path ID.
97///
98/// A single path ID can migrate according to the rules in RFC9000 §9, either voluntary or
99/// involuntary. We need to keep the [`PathData`] of the previously used such path available
100/// in order to defend against migration attacks (see RFC9000 §9.3.1, §9.3.2 and §9.3.3) as
101/// well as to support path probing (RFC9000 §9.1).
102#[derive(Debug)]
103pub(super) struct PathState {
104    pub(super) data: PathData,
105    pub(super) prev: Option<(ConnectionId, PathData)>,
106}
107
108impl PathState {
109    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
110    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
111        // Visit known paths from newest to oldest to find the one `pn` was sent on
112        for path_data in [&mut self.data]
113            .into_iter()
114            .chain(self.prev.as_mut().map(|(_, data)| data))
115        {
116            if path_data.remove_in_flight(packet) {
117                return;
118            }
119        }
120    }
121}
122
123#[derive(Debug)]
124pub(super) struct SentChallengeInfo {
125    /// When was the challenge sent on the wire.
126    pub(super) sent_instant: Instant,
127    /// The remote to which this path challenge was sent.
128    pub(super) remote: SocketAddr,
129}
130
131/// Description of a particular network path
132#[derive(Debug)]
133pub(super) struct PathData {
134    pub(super) remote: SocketAddr,
135    pub(super) rtt: RttEstimator,
136    /// Whether we're enabling ECN on outgoing packets
137    pub(super) sending_ecn: bool,
138    /// Congestion controller state
139    pub(super) congestion: Box<dyn congestion::Controller>,
140    /// Pacing state
141    pub(super) pacing: Pacer,
142    /// Actually sent challenges (on the wire).
143    pub(super) challenges_sent: IntMap<u64, SentChallengeInfo>,
144    /// Whether to *immediately* trigger another PATH_CHALLENGE.
145    ///
146    /// This is picked up by [`super::Connection::space_can_send`].
147    pub(super) send_new_challenge: bool,
148    /// Pending responses to PATH_CHALLENGE frames
149    pub(super) path_responses: PathResponses,
150    /// Whether we're certain the peer can both send and receive on this address
151    ///
152    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
153    /// migration. Always true for clients.
154    pub(super) validated: bool,
155    /// Total size of all UDP datagrams sent on this path
156    pub(super) total_sent: u64,
157    /// Total size of all UDP datagrams received on this path
158    pub(super) total_recvd: u64,
159    /// The state of the MTU discovery process
160    pub(super) mtud: MtuDiscovery,
161    /// Packet number of the first packet sent after an RTT sample was collected on this path
162    ///
163    /// Used in persistent congestion determination.
164    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
165    /// The in-flight packets and bytes
166    ///
167    /// Note that this is across all spaces on this path
168    pub(super) in_flight: InFlight,
169    /// Whether this path has had it's remote address reported back to the peer. This only happens
170    /// if both peers agree to so based on their transport parameters.
171    pub(super) observed_addr_sent: bool,
172    /// Observed address frame with the largest sequence number received from the peer on this path.
173    pub(super) last_observed_addr_report: Option<ObservedAddr>,
174    /// The QUIC-MULTIPATH path status
175    pub(super) status: PathStatusState,
176    /// Number of the first packet sent on this path
177    ///
178    /// With RFC9000 §9 style migration (i.e. not multipath) the PathId does not change and
179    /// hence packet numbers continue. This is used to determine whether a packet was sent
180    /// on such an earlier path. Insufficient to determine if a packet was sent on a later
181    /// path.
182    first_packet: Option<u64>,
183    /// The number of times a PTO has been sent without receiving an ack.
184    pub(super) pto_count: u32,
185
186    //
187    // Per-path idle & keep alive
188    //
189    /// Idle timeout for the path
190    ///
191    /// If expired, the path will be abandoned.  This is different from the connection-wide
192    /// idle timeout which closes the connection if expired.
193    pub(super) idle_timeout: Option<Duration>,
194    /// Keep alives to send on this path
195    ///
196    /// There is also a connection-level keep alive configured in the
197    /// [`TransportParameters`].  This triggers activity on any path which can keep the
198    /// connection alive.
199    ///
200    /// [`TransportParameters`]: crate::transport_parameters::TransportParameters
201    pub(super) keep_alive: Option<Duration>,
202
203    /// Whether the path has already been considered opened from an application perspective
204    ///
205    /// This means, for paths other than the original [`PathId::ZERO`], a first path challenge has
206    /// been responded to, regardless of the initial validation status of the path. This state is
207    /// irreversible, since it's not affected by the path being closed.
208    pub(super) open: bool,
209
210    /// State relevant to sending and receiving PATH_ABANDON frames.
211    pub(super) abandon_state: AbandonState,
212
213    /// Snapshot of the qlog recovery metrics
214    #[cfg(feature = "qlog")]
215    recovery_metrics: RecoveryMetrics,
216
217    /// Tag uniquely identifying a path in a connection
218    generation: u64,
219}
220
221/// The abandon-relevant state a path can be in.
222#[derive(Debug)]
223pub(super) enum AbandonState {
224    /// This path wasn't abandoned yet.
225    NotAbandoned,
226    /// A PATH_ABANDON frame was sent for this path, and we're expecting a response
227    /// by the given deadline.
228    ///
229    /// If we don't receive PATH_ABANDON by that time *and* we receive a packet on this
230    /// path, then we assume our peer has ignored our PATH_ABANDON and error out.
231    ///
232    /// This is checked in [`crate::Connection::on_packet_authenticated`].
233    ExpectingPathAbandon { deadline: Instant },
234    /// We received a PATH_ABANDON and are just waiting for the path's packets to drain
235    /// and eventually will discard the path.
236    ReceivedPathAbandon,
237}
238
239impl PathData {
240    pub(super) fn new(
241        remote: SocketAddr,
242        allow_mtud: bool,
243        peer_max_udp_payload_size: Option<u16>,
244        generation: u64,
245        now: Instant,
246        config: &TransportConfig,
247    ) -> Self {
248        let congestion = config
249            .congestion_controller_factory
250            .clone()
251            .build(now, config.get_initial_mtu());
252        Self {
253            remote,
254            rtt: RttEstimator::new(config.initial_rtt),
255            sending_ecn: true,
256            pacing: Pacer::new(
257                config.initial_rtt,
258                congestion.initial_window(),
259                config.get_initial_mtu(),
260                now,
261            ),
262            congestion,
263            challenges_sent: Default::default(),
264            send_new_challenge: false,
265            path_responses: PathResponses::default(),
266            validated: false,
267            total_sent: 0,
268            total_recvd: 0,
269            mtud: config
270                .mtu_discovery_config
271                .as_ref()
272                .filter(|_| allow_mtud)
273                .map_or(
274                    MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
275                    |mtud_config| {
276                        MtuDiscovery::new(
277                            config.get_initial_mtu(),
278                            config.min_mtu,
279                            peer_max_udp_payload_size,
280                            mtud_config.clone(),
281                        )
282                    },
283                ),
284            first_packet_after_rtt_sample: None,
285            in_flight: InFlight::new(),
286            observed_addr_sent: false,
287            last_observed_addr_report: None,
288            status: Default::default(),
289            first_packet: None,
290            pto_count: 0,
291            idle_timeout: config.default_path_max_idle_timeout,
292            keep_alive: config.default_path_keep_alive_interval,
293            open: false,
294            abandon_state: AbandonState::NotAbandoned,
295            #[cfg(feature = "qlog")]
296            recovery_metrics: RecoveryMetrics::default(),
297            generation,
298        }
299    }
300
301    /// Create a new path from a previous one.
302    ///
303    /// This should only be called when migrating paths.
304    pub(super) fn from_previous(
305        remote: SocketAddr,
306        prev: &Self,
307        generation: u64,
308        now: Instant,
309    ) -> Self {
310        let congestion = prev.congestion.clone_box();
311        let smoothed_rtt = prev.rtt.get();
312        Self {
313            remote,
314            rtt: prev.rtt,
315            pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
316            sending_ecn: true,
317            congestion,
318            challenges_sent: Default::default(),
319            send_new_challenge: false,
320            path_responses: PathResponses::default(),
321            validated: false,
322            total_sent: 0,
323            total_recvd: 0,
324            mtud: prev.mtud.clone(),
325            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
326            in_flight: InFlight::new(),
327            observed_addr_sent: false,
328            last_observed_addr_report: None,
329            status: prev.status.clone(),
330            first_packet: None,
331            pto_count: 0,
332            idle_timeout: prev.idle_timeout,
333            keep_alive: prev.keep_alive,
334            open: false,
335            abandon_state: AbandonState::NotAbandoned,
336            #[cfg(feature = "qlog")]
337            recovery_metrics: prev.recovery_metrics.clone(),
338            generation,
339        }
340    }
341
342    /// Whether we're in the process of validating this path with PATH_CHALLENGEs
343    pub(super) fn is_validating_path(&self) -> bool {
344        !self.challenges_sent.is_empty() || self.send_new_challenge
345    }
346
347    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
348    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
349    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
350        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
351    }
352
353    /// Returns the path's current MTU
354    pub(super) fn current_mtu(&self) -> u16 {
355        self.mtud.current_mtu()
356    }
357
358    /// Account for transmission of `packet` with number `pn` in `space`
359    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
360        self.in_flight.insert(&packet);
361        if self.first_packet.is_none() {
362            self.first_packet = Some(pn);
363        }
364        if let Some(forgotten) = space.sent(pn, packet) {
365            self.remove_in_flight(&forgotten);
366        }
367    }
368
369    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
370    /// `false` if `pn` was sent before this path was established.
371    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
372        if packet.path_generation != self.generation {
373            return false;
374        }
375        self.in_flight.remove(packet);
376        true
377    }
378
379    /// Increment the total size of sent UDP datagrams
380    pub(super) fn inc_total_sent(&mut self, inc: u64) {
381        self.total_sent = self.total_sent.saturating_add(inc);
382        if !self.validated {
383            trace!(
384                remote = %self.remote,
385                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
386                "anti amplification budget decreased"
387            );
388        }
389    }
390
391    /// Increment the total size of received UDP datagrams
392    pub(super) fn inc_total_recvd(&mut self, inc: u64) {
393        self.total_recvd = self.total_recvd.saturating_add(inc);
394        if !self.validated {
395            trace!(
396                remote = %self.remote,
397                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
398                "anti amplification budget increased"
399            );
400        }
401    }
402
403    /// The earliest time at which a sent challenge is considered lost.
404    pub(super) fn earliest_expiring_challenge(&self) -> Option<Instant> {
405        if self.challenges_sent.is_empty() {
406            return None;
407        }
408        let pto = self.rtt.pto_base();
409        self.challenges_sent
410            .values()
411            .map(|info| info.sent_instant)
412            .min()
413            .map(|sent_instant| sent_instant + pto)
414    }
415
416    /// Handle receiving a PATH_RESPONSE.
417    pub(super) fn on_path_response_received(
418        &mut self,
419        now: Instant,
420        token: u64,
421        remote: SocketAddr,
422    ) -> OnPathResponseReceived {
423        match self.challenges_sent.get(&token) {
424            // Response to an on-path PathChallenge
425            Some(info) if info.remote == remote && self.remote == remote => {
426                let sent_instant = info.sent_instant;
427                if !std::mem::replace(&mut self.validated, true) {
428                    trace!("new path validated");
429                }
430                // Clear any other on-path sent challenge.
431                self.challenges_sent
432                    .retain(|_token, info| info.remote != remote);
433
434                self.send_new_challenge = false;
435
436                // This RTT can only be used for the initial RTT, not as a normal
437                // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
438                let rtt = now.saturating_duration_since(sent_instant);
439                self.rtt.reset_initial_rtt(rtt);
440
441                let was_open = std::mem::replace(&mut self.open, true);
442                OnPathResponseReceived::OnPath { was_open }
443            }
444            // Response to an off-path PathChallenge
445            Some(info) if info.remote == remote => {
446                self.challenges_sent
447                    .retain(|_token, info| info.remote != remote);
448                OnPathResponseReceived::OffPath
449            }
450            // Response to a PathChallenge we recognize, but from an invalid remote
451            Some(info) => OnPathResponseReceived::Invalid {
452                expected: info.remote,
453            },
454            // Response to an unknown PathChallenge
455            None => OnPathResponseReceived::Unknown,
456        }
457    }
458
459    #[cfg(feature = "qlog")]
460    pub(super) fn qlog_recovery_metrics(
461        &mut self,
462        path_id: PathId,
463    ) -> Option<RecoveryMetricsUpdated> {
464        let controller_metrics = self.congestion.metrics();
465
466        let metrics = RecoveryMetrics {
467            min_rtt: Some(self.rtt.min),
468            smoothed_rtt: Some(self.rtt.get()),
469            latest_rtt: Some(self.rtt.latest),
470            rtt_variance: Some(self.rtt.var),
471            pto_count: Some(self.pto_count),
472            bytes_in_flight: Some(self.in_flight.bytes),
473            packets_in_flight: Some(self.in_flight.ack_eliciting),
474
475            congestion_window: Some(controller_metrics.congestion_window),
476            ssthresh: controller_metrics.ssthresh,
477            pacing_rate: controller_metrics.pacing_rate,
478        };
479
480        let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
481        self.recovery_metrics = metrics;
482        event
483    }
484
485    /// Return how long we need to wait before sending `bytes_to_send`
486    ///
487    /// See [`Pacer::delay`].
488    pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Instant> {
489        let smoothed_rtt = self.rtt.get();
490        self.pacing.delay(
491            smoothed_rtt,
492            bytes_to_send,
493            self.current_mtu(),
494            self.congestion.window(),
495            now,
496        )
497    }
498
499    /// Updates the last observed address report received on this path.
500    ///
501    /// If the address was updated, it's returned to be informed to the application.
502    #[must_use = "updated observed address must be reported to the application"]
503    pub(super) fn update_observed_addr_report(
504        &mut self,
505        observed: ObservedAddr,
506    ) -> Option<SocketAddr> {
507        match self.last_observed_addr_report.as_mut() {
508            Some(prev) => {
509                if prev.seq_no >= observed.seq_no {
510                    // frames that do not increase the sequence number on this path are ignored
511                    None
512                } else if prev.ip == observed.ip && prev.port == observed.port {
513                    // keep track of the last seq_no but do not report the address as updated
514                    prev.seq_no = observed.seq_no;
515                    None
516                } else {
517                    let addr = observed.socket_addr();
518                    self.last_observed_addr_report = Some(observed);
519                    Some(addr)
520                }
521            }
522            None => {
523                let addr = observed.socket_addr();
524                self.last_observed_addr_report = Some(observed);
525                Some(addr)
526            }
527        }
528    }
529
530    pub(crate) fn remote_status(&self) -> Option<PathStatus> {
531        self.status.remote_status.map(|(_seq, status)| status)
532    }
533
534    pub(crate) fn local_status(&self) -> PathStatus {
535        self.status.local_status
536    }
537
538    pub(super) fn generation(&self) -> u64 {
539        self.generation
540    }
541}
542
543pub(super) enum OnPathResponseReceived {
544    /// This response validates the path on its current remote address.
545    OnPath { was_open: bool },
546    /// This response is valid, but it's for a remote other than the path's current remote address.
547    OffPath,
548    /// The received token is unknown.
549    Unknown,
550    /// The response is invalid.
551    Invalid {
552        /// The remote that was expected for this token.
553        expected: SocketAddr,
554    },
555}
556
557/// Congestion metrics as described in [`recovery_metrics_updated`].
558///
559/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
560#[cfg(feature = "qlog")]
561#[derive(Default, Clone, PartialEq, Debug)]
562#[non_exhaustive]
563struct RecoveryMetrics {
564    pub min_rtt: Option<Duration>,
565    pub smoothed_rtt: Option<Duration>,
566    pub latest_rtt: Option<Duration>,
567    pub rtt_variance: Option<Duration>,
568    pub pto_count: Option<u32>,
569    pub bytes_in_flight: Option<u64>,
570    pub packets_in_flight: Option<u64>,
571    pub congestion_window: Option<u64>,
572    pub ssthresh: Option<u64>,
573    pub pacing_rate: Option<u64>,
574}
575
576#[cfg(feature = "qlog")]
577impl RecoveryMetrics {
578    /// Retain only values that have been updated since the last snapshot.
579    fn retain_updated(&self, previous: &Self) -> Self {
580        macro_rules! keep_if_changed {
581            ($name:ident) => {
582                if previous.$name == self.$name {
583                    None
584                } else {
585                    self.$name
586                }
587            };
588        }
589
590        Self {
591            min_rtt: keep_if_changed!(min_rtt),
592            smoothed_rtt: keep_if_changed!(smoothed_rtt),
593            latest_rtt: keep_if_changed!(latest_rtt),
594            rtt_variance: keep_if_changed!(rtt_variance),
595            pto_count: keep_if_changed!(pto_count),
596            bytes_in_flight: keep_if_changed!(bytes_in_flight),
597            packets_in_flight: keep_if_changed!(packets_in_flight),
598            congestion_window: keep_if_changed!(congestion_window),
599            ssthresh: keep_if_changed!(ssthresh),
600            pacing_rate: keep_if_changed!(pacing_rate),
601        }
602    }
603
604    /// Emit a `MetricsUpdated` event containing only updated values
605    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
606        let updated = self.retain_updated(previous);
607
608        if updated == Self::default() {
609            return None;
610        }
611
612        Some(RecoveryMetricsUpdated {
613            min_rtt: updated.min_rtt.map(|rtt| rtt.as_secs_f32()),
614            smoothed_rtt: updated.smoothed_rtt.map(|rtt| rtt.as_secs_f32()),
615            latest_rtt: updated.latest_rtt.map(|rtt| rtt.as_secs_f32()),
616            rtt_variance: updated.rtt_variance.map(|rtt| rtt.as_secs_f32()),
617            pto_count: updated
618                .pto_count
619                .map(|count| count.try_into().unwrap_or(u16::MAX)),
620            bytes_in_flight: updated.bytes_in_flight,
621            packets_in_flight: updated.packets_in_flight,
622            congestion_window: updated.congestion_window,
623            ssthresh: updated.ssthresh,
624            pacing_rate: updated.pacing_rate,
625            path_id: Some(path_id.as_u32() as u64),
626        })
627    }
628}
629
630/// RTT estimation for a particular network path
631#[derive(Copy, Clone, Debug)]
632pub struct RttEstimator {
633    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
634    latest: Duration,
635    /// The smoothed RTT of the connection, computed as described in RFC6298
636    smoothed: Option<Duration>,
637    /// The RTT variance, computed as described in RFC6298
638    var: Duration,
639    /// The minimum RTT seen in the connection, ignoring ack delay.
640    min: Duration,
641}
642
643impl RttEstimator {
644    pub(super) fn new(initial_rtt: Duration) -> Self {
645        Self {
646            latest: initial_rtt,
647            smoothed: None,
648            var: initial_rtt / 2,
649            min: initial_rtt,
650        }
651    }
652
653    /// Resets the estimator using a new initial_rtt value.
654    ///
655    /// This only resets the initial_rtt **if** no samples have been recorded yet. If there
656    /// are any recorded samples the initial estimate can not be adjusted after the fact.
657    ///
658    /// This is useful when you receive a PATH_RESPONSE in the first packet received on a
659    /// new path. In this case you can use the delay of the PATH_CHALLENGE-PATH_RESPONSE as
660    /// the initial RTT to get a better expected estimation.
661    ///
662    /// A PATH_CHALLENGE-PATH_RESPONSE pair later in the connection should not be used
663    /// explicitly as an estimation since PATH_CHALLENGE is an ACK-eliciting packet itself
664    /// already.
665    pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
666        if self.smoothed.is_none() {
667            self.latest = initial_rtt;
668            self.var = initial_rtt / 2;
669            self.min = initial_rtt;
670        }
671    }
672
673    /// The current best RTT estimation.
674    pub fn get(&self) -> Duration {
675        self.smoothed.unwrap_or(self.latest)
676    }
677
678    /// Conservative estimate of RTT
679    ///
680    /// Takes the maximum of smoothed and latest RTT, as recommended
681    /// in 6.1.2 of the recovery spec (draft 29).
682    pub fn conservative(&self) -> Duration {
683        self.get().max(self.latest)
684    }
685
686    /// Minimum RTT registered so far for this estimator.
687    pub fn min(&self) -> Duration {
688        self.min
689    }
690
691    /// PTO computed as described in RFC9002#6.2.1.
692    pub(crate) fn pto_base(&self) -> Duration {
693        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
694    }
695
696    /// Records an RTT sample.
697    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
698        self.latest = rtt;
699        // https://www.rfc-editor.org/rfc/rfc9002.html#section-5.2-3:
700        // min_rtt does not adjust for ack_delay to avoid underestimating.
701        self.min = cmp::min(self.min, self.latest);
702        // Based on RFC6298.
703        if let Some(smoothed) = self.smoothed {
704            let adjusted_rtt = if self.min + ack_delay <= self.latest {
705                self.latest - ack_delay
706            } else {
707                self.latest
708            };
709            let var_sample = smoothed.abs_diff(adjusted_rtt);
710            self.var = (3 * self.var + var_sample) / 4;
711            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
712        } else {
713            self.smoothed = Some(self.latest);
714            self.var = self.latest / 2;
715            self.min = self.latest;
716        }
717    }
718}
719
720#[derive(Default, Debug)]
721pub(crate) struct PathResponses {
722    pending: Vec<PathResponse>,
723}
724
725impl PathResponses {
726    pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
727        /// Arbitrary permissive limit to prevent abuse
728        const MAX_PATH_RESPONSES: usize = 16;
729        let response = PathResponse {
730            packet,
731            token,
732            remote,
733        };
734        let existing = self.pending.iter_mut().find(|x| x.remote == remote);
735        if let Some(existing) = existing {
736            // Update a queued response
737            if existing.packet <= packet {
738                *existing = response;
739            }
740            return;
741        }
742        if self.pending.len() < MAX_PATH_RESPONSES {
743            self.pending.push(response);
744        } else {
745            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
746            // older challenges.
747            trace!("ignoring excessive PATH_CHALLENGE");
748        }
749    }
750
751    pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
752        let response = *self.pending.last()?;
753        if response.remote == remote {
754            // We don't bother searching further because we expect that the on-path response will
755            // get drained in the immediate future by a call to `pop_on_path`
756            return None;
757        }
758        self.pending.pop();
759        Some((response.token, response.remote))
760    }
761
762    pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
763        let response = *self.pending.last()?;
764        if response.remote != remote {
765            // We don't bother searching further because we expect that the off-path response will
766            // get drained in the immediate future by a call to `pop_off_path`
767            return None;
768        }
769        self.pending.pop();
770        Some(response.token)
771    }
772
773    pub(crate) fn is_empty(&self) -> bool {
774        self.pending.is_empty()
775    }
776}
777
778#[derive(Copy, Clone, Debug)]
779struct PathResponse {
780    /// The packet number the corresponding PATH_CHALLENGE was received in
781    packet: u64,
782    /// The token of the PATH_CHALLENGE
783    token: u64,
784    /// The address the corresponding PATH_CHALLENGE was received from
785    remote: SocketAddr,
786}
787
788/// Summary statistics of packets that have been sent on a particular path, but which have not yet
789/// been acked or deemed lost
790#[derive(Debug)]
791pub(super) struct InFlight {
792    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
793    ///
794    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
795    /// count towards this to ensure congestion control does not impede congestion feedback.
796    pub(super) bytes: u64,
797    /// Number of packets in flight containing frames other than ACK and PADDING
798    ///
799    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
800    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
801    /// also be nonzero.
802    pub(super) ack_eliciting: u64,
803}
804
805impl InFlight {
806    fn new() -> Self {
807        Self {
808            bytes: 0,
809            ack_eliciting: 0,
810        }
811    }
812
813    fn insert(&mut self, packet: &SentPacket) {
814        self.bytes += u64::from(packet.size);
815        self.ack_eliciting += u64::from(packet.ack_eliciting);
816    }
817
818    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
819    fn remove(&mut self, packet: &SentPacket) {
820        self.bytes -= u64::from(packet.size);
821        self.ack_eliciting -= u64::from(packet.ack_eliciting);
822    }
823}
824
825/// State for QUIC-MULTIPATH PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP frames
826#[derive(Debug, Clone, Default)]
827pub(super) struct PathStatusState {
828    /// The local status
829    local_status: PathStatus,
830    /// Local sequence number, for both PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP
831    ///
832    /// This is the number of the *next* path status frame to be sent.
833    local_seq: VarInt,
834    /// The status set by the remote
835    remote_status: Option<(VarInt, PathStatus)>,
836}
837
838impl PathStatusState {
839    /// To be called on received PATH_STATUS_AVAILABLE/PATH_STATUS_BACKUP frames
840    pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
841        if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
842            return trace!(%seq, "ignoring path status update");
843        }
844
845        let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
846        if prev != Some(status) {
847            debug!(?status, ?seq, "remote changed path status");
848        }
849    }
850
851    /// Updates the local status
852    ///
853    /// If the local status changed, the previous value is returned
854    pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
855        if self.local_status == status {
856            return None;
857        }
858
859        self.local_seq = self.local_seq.saturating_add(1u8);
860        Some(std::mem::replace(&mut self.local_status, status))
861    }
862
863    pub(crate) fn seq(&self) -> VarInt {
864        self.local_seq
865    }
866}
867
868/// The QUIC-MULTIPATH path status
869///
870/// See section "3.3 Path Status Management":
871/// <https://quicwg.org/multipath/draft-ietf-quic-multipath.html#name-path-status-management>
872#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
873pub enum PathStatus {
874    /// Paths marked with as available will be used when scheduling packets
875    ///
876    /// If multiple paths are available, packets will be scheduled on whichever has
877    /// capacity.
878    #[default]
879    Available,
880    /// Paths marked as backup will only be used if there are no available paths
881    ///
882    /// If the max_idle_timeout is specified the path will be kept alive so that it does not
883    /// expire.
884    Backup,
885}
886
887/// Application events about paths
888#[derive(Debug, Clone, PartialEq, Eq)]
889pub enum PathEvent {
890    /// A new path has been opened
891    Opened {
892        /// Which path is now open
893        id: PathId,
894    },
895    /// A path has been closed
896    Closed {
897        /// Which path has been closed
898        id: PathId,
899        /// Error code supplied by the peer
900        /// See <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-error-codes>
901        /// for a list of known errors.
902        error_code: VarInt,
903    },
904    /// All remaining state for a path has been removed
905    ///
906    /// The [`PathEvent::Closed`] would have been emitted for this path earlier.
907    Abandoned {
908        /// Which path had its state dropped
909        id: PathId,
910        /// The final path stats, they are no longer available via [`Connection::stats`]
911        ///
912        /// [`Connection::stats`]: super::Connection::stats
913        path_stats: PathStats,
914    },
915    /// Path was closed locally
916    LocallyClosed {
917        /// Path for which the error occurred
918        id: PathId,
919        /// The error that occurred
920        error: PathError,
921    },
922    /// The remote changed the status of the path
923    ///
924    /// The local status is not changed because of this event. It is up to the application
925    /// to update the local status, which is used for packet scheduling, when the remote
926    /// changes the status.
927    RemoteStatus {
928        /// Path which has changed status
929        id: PathId,
930        /// The new status set by the remote
931        status: PathStatus,
932    },
933    /// Received an observation of our external address from the peer.
934    ObservedAddr {
935        /// Path over which the observed address was reported, [`PathId::ZERO`] when multipath is
936        /// not negotiated
937        id: PathId,
938        /// The address observed by the remote over this path
939        addr: SocketAddr,
940    },
941}
942
943/// Error from setting path status
944#[derive(Debug, Error, Clone, PartialEq, Eq)]
945pub enum SetPathStatusError {
946    /// Error indicating that a path has not been opened or has already been abandoned
947    #[error("closed path")]
948    ClosedPath,
949    /// Error indicating that this operation requires multipath to be negotiated whereas it hasn't been
950    #[error("multipath not negotiated")]
951    MultipathNotNegotiated,
952}
953
954/// Error indicating that a path has not been opened or has already been abandoned
955#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
956#[error("closed path")]
957pub struct ClosedPath {
958    pub(super) _private: (),
959}
960
961#[cfg(test)]
962mod tests {
963    use super::*;
964
965    #[test]
966    fn test_path_id_saturating_add() {
967        // add within range behaves normally
968        let large: PathId = u16::MAX.into();
969        let next = u32::from(u16::MAX) + 1;
970        assert_eq!(large.saturating_add(1u8), PathId::from(next));
971
972        // outside range saturates
973        assert_eq!(PathId::MAX.saturating_add(1u8), PathId::MAX)
974    }
975}