iroh_quinn_proto/connection/
paths.rs

1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8    PathError, PathStats,
9    mtud::MtuDiscovery,
10    pacing::Pacer,
11    spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14    ConnectionId, Duration, Instant, TIMER_GRANULARITY, TransportConfig, VarInt, coding,
15    congestion, frame::ObservedAddr, packet::SpaceId,
16};
17
18#[cfg(feature = "qlog")]
19use qlog::events::quic::RecoveryMetricsUpdated;
20
/// Id representing different paths when using multipath extension
///
/// This is a newtype around a `u32`. Equality, ordering and the default value are all derived
/// and therefore follow the wrapped integer.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
pub struct PathId(pub(crate) u32);
24
25impl std::hash::Hash for PathId {
26    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
27        state.write_u32(self.0);
28    }
29}
30
// Marker impl: the body is empty, so nothing from the trait is overridden here.
// NOTE(review): presumably this is what permits `PathId` keys in identity-hashed maps such as
// `IntMap` — confirm against the `identity_hash` crate documentation.
impl identity_hash::IdentityHashable for PathId {}
32
33impl coding::Codec for PathId {
34    fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
35        let v = VarInt::decode(r)?;
36        let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
37        Ok(Self(v))
38    }
39
40    fn encode<B: bytes::BufMut>(&self, w: &mut B) {
41        VarInt(self.0.into()).encode(w)
42    }
43}
44
45impl PathId {
46    /// The maximum path ID allowed.
47    pub const MAX: Self = Self(u32::MAX);
48
49    /// The 0 path id.
50    pub const ZERO: Self = Self(0);
51
52    /// The number of bytes this [`PathId`] uses when encoded as a [`VarInt`]
53    pub(crate) const fn size(&self) -> usize {
54        VarInt(self.0 as u64).size()
55    }
56
57    /// Saturating integer addition. Computes self + rhs, saturating at the numeric bounds instead
58    /// of overflowing.
59    pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
60        let rhs = rhs.into();
61        let inner = self.0.saturating_add(rhs.0);
62        Self(inner)
63    }
64
65    /// Saturating integer subtraction. Computes self - rhs, saturating at the numeric bounds
66    /// instead of overflowing.
67    pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
68        let rhs = rhs.into();
69        let inner = self.0.saturating_sub(rhs.0);
70        Self(inner)
71    }
72
73    /// Get the next [`PathId`]
74    pub(crate) fn next(&self) -> Self {
75        self.saturating_add(Self(1))
76    }
77
78    /// Get the underlying u32
79    pub(crate) fn as_u32(&self) -> u32 {
80        self.0
81    }
82}
83
84impl std::fmt::Display for PathId {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        self.0.fmt(f)
87    }
88}
89
90impl<T: Into<u32>> From<T> for PathId {
91    fn from(source: T) -> Self {
92        Self(source.into())
93    }
94}
95
/// State needed for a single path ID.
///
/// A single path ID can migrate according to the rules in RFC9000 §9, either voluntary or
/// involuntary. We need to keep the [`PathData`] of the previously used such path available
/// in order to defend against migration attacks (see RFC9000 §9.3.1, §9.3.2 and §9.3.3) as
/// well as to support path probing (RFC9000 §9.1).
#[derive(Debug)]
pub(super) struct PathState {
    /// The path data currently in use for this path ID.
    pub(super) data: PathData,
    /// The previously used path data, if any, kept around after a migration.
    ///
    /// NOTE(review): the [`ConnectionId`] is presumably the CID that was in use on that
    /// previous path — confirm against the migration code.
    pub(super) prev: Option<(ConnectionId, PathData)>,
}
107
108impl PathState {
109    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
110    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
111        // Visit known paths from newest to oldest to find the one `pn` was sent on
112        for path_data in [&mut self.data]
113            .into_iter()
114            .chain(self.prev.as_mut().map(|(_, data)| data))
115        {
116            if path_data.remove_in_flight(packet) {
117                return;
118            }
119        }
120    }
121}
122
/// Bookkeeping for a single PATH_CHALLENGE that was sent on the wire.
#[derive(Debug)]
pub(super) struct SentChallengeInfo {
    /// When was the challenge sent on the wire.
    pub(super) sent_instant: Instant,
    /// The remote to which this path challenge was sent.
    pub(super) remote: SocketAddr,
}
130
/// Description of a particular network path
#[derive(Debug)]
pub(super) struct PathData {
    /// Socket address of the remote for this path
    pub(super) remote: SocketAddr,
    /// RTT estimation state for this path
    pub(super) rtt: RttEstimator,
    /// Whether we're enabling ECN on outgoing packets
    pub(super) sending_ecn: bool,
    /// Congestion controller state
    pub(super) congestion: Box<dyn congestion::Controller>,
    /// Pacing state
    pub(super) pacing: Pacer,
    /// Actually sent challenges (on the wire).
    pub(super) challenges_sent: IntMap<u64, SentChallengeInfo>,
    /// Whether to *immediately* trigger another PATH_CHALLENGE (via [`super::Connection::can_send`])
    pub(super) send_new_challenge: bool,
    /// Pending responses to PATH_CHALLENGE frames
    pub(super) path_responses: PathResponses,
    /// Whether we're certain the peer can both send and receive on this address
    ///
    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
    /// migration. Always true for clients.
    pub(super) validated: bool,
    /// Total size of all UDP datagrams sent on this path
    pub(super) total_sent: u64,
    /// Total size of all UDP datagrams received on this path
    pub(super) total_recvd: u64,
    /// The state of the MTU discovery process
    pub(super) mtud: MtuDiscovery,
    /// Packet number of the first packet sent after an RTT sample was collected on this path
    ///
    /// Used in persistent congestion determination.
    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
    /// The in-flight packets and bytes
    ///
    /// Note that this is across all spaces on this path
    pub(super) in_flight: InFlight,
    /// Whether this path has had its remote address reported back to the peer. This only happens
    /// if both peers agree to do so based on their transport parameters.
    pub(super) observed_addr_sent: bool,
    /// Observed address frame with the largest sequence number received from the peer on this path.
    pub(super) last_observed_addr_report: Option<ObservedAddr>,
    /// The QUIC-MULTIPATH path status
    pub(super) status: PathStatusState,
    /// Number of the first packet sent on this path
    ///
    /// With RFC9000 §9 style migration (i.e. not multipath) the PathId does not change and
    /// hence packet numbers continue. This is used to determine whether a packet was sent
    /// on such an earlier path. Insufficient to determine if a packet was sent on a later
    /// path.
    first_packet: Option<u64>,
    /// The number of times a PTO has been sent without receiving an ack.
    pub(super) pto_count: u32,

    //
    // Per-path idle & keep alive
    //
    /// Idle timeout for the path
    ///
    /// If expired, the path will be abandoned.  This is different from the connection-wide
    /// idle timeout which closes the connection if expired.
    pub(super) idle_timeout: Option<Duration>,
    /// Keep alives to send on this path
    ///
    /// There is also a connection-level keep alive configured in the
    /// [`TransportParameters`].  This triggers activity on any path which can keep the
    /// connection alive.
    ///
    /// [`TransportParameters`]: crate::transport_parameters::TransportParameters
    pub(super) keep_alive: Option<Duration>,

    /// Whether the path has already been considered opened from an application perspective
    ///
    /// This means, for paths other than the original [`PathId::ZERO`], a first path challenge has
    /// been responded to, regardless of the initial validation status of the path. This state is
    /// irreversible, since it's not affected by the path being closed.
    pub(super) open: bool,

    /// The time at which this path state should've received a PATH_ABANDON already.
    ///
    /// Receiving data on this path generates a transport error after that point in time.
    /// This is checked in [`crate::Connection::on_packet_authenticated`].
    ///
    /// If set to `None`, then this path isn't abandoned yet and is allowed to receive data.
    pub(super) last_allowed_receive: Option<Instant>,

    /// Snapshot of the qlog recovery metrics
    #[cfg(feature = "qlog")]
    recovery_metrics: RecoveryMetrics,

    /// Tag uniquely identifying a path in a connection
    ///
    /// Compared against a sent packet's `path_generation` to decide whether the packet was
    /// sent on this path.
    generation: u64,
}
223
224impl PathData {
225    pub(super) fn new(
226        remote: SocketAddr,
227        allow_mtud: bool,
228        peer_max_udp_payload_size: Option<u16>,
229        generation: u64,
230        now: Instant,
231        config: &TransportConfig,
232    ) -> Self {
233        let congestion = config
234            .congestion_controller_factory
235            .clone()
236            .build(now, config.get_initial_mtu());
237        Self {
238            remote,
239            rtt: RttEstimator::new(config.initial_rtt),
240            sending_ecn: true,
241            pacing: Pacer::new(
242                config.initial_rtt,
243                congestion.initial_window(),
244                config.get_initial_mtu(),
245                now,
246            ),
247            congestion,
248            challenges_sent: Default::default(),
249            send_new_challenge: false,
250            path_responses: PathResponses::default(),
251            validated: false,
252            total_sent: 0,
253            total_recvd: 0,
254            mtud: config
255                .mtu_discovery_config
256                .as_ref()
257                .filter(|_| allow_mtud)
258                .map_or(
259                    MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
260                    |mtud_config| {
261                        MtuDiscovery::new(
262                            config.get_initial_mtu(),
263                            config.min_mtu,
264                            peer_max_udp_payload_size,
265                            mtud_config.clone(),
266                        )
267                    },
268                ),
269            first_packet_after_rtt_sample: None,
270            in_flight: InFlight::new(),
271            observed_addr_sent: false,
272            last_observed_addr_report: None,
273            status: Default::default(),
274            first_packet: None,
275            pto_count: 0,
276            idle_timeout: None,
277            keep_alive: None,
278            open: false,
279            last_allowed_receive: None,
280            #[cfg(feature = "qlog")]
281            recovery_metrics: RecoveryMetrics::default(),
282            generation,
283        }
284    }
285
286    /// Create a new path from a previous one.
287    ///
288    /// This should only be called when migrating paths.
289    pub(super) fn from_previous(
290        remote: SocketAddr,
291        prev: &Self,
292        generation: u64,
293        now: Instant,
294    ) -> Self {
295        let congestion = prev.congestion.clone_box();
296        let smoothed_rtt = prev.rtt.get();
297        Self {
298            remote,
299            rtt: prev.rtt,
300            pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
301            sending_ecn: true,
302            congestion,
303            challenges_sent: Default::default(),
304            send_new_challenge: false,
305            path_responses: PathResponses::default(),
306            validated: false,
307            total_sent: 0,
308            total_recvd: 0,
309            mtud: prev.mtud.clone(),
310            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
311            in_flight: InFlight::new(),
312            observed_addr_sent: false,
313            last_observed_addr_report: None,
314            status: prev.status.clone(),
315            first_packet: None,
316            pto_count: 0,
317            idle_timeout: prev.idle_timeout,
318            keep_alive: prev.keep_alive,
319            open: false,
320            last_allowed_receive: None,
321            #[cfg(feature = "qlog")]
322            recovery_metrics: prev.recovery_metrics.clone(),
323            generation,
324        }
325    }
326
327    /// Whether we're in the process of validating this path with PATH_CHALLENGEs
328    pub(super) fn is_validating_path(&self) -> bool {
329        !self.challenges_sent.is_empty() || self.send_new_challenge
330    }
331
332    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
333    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
334    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
335        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
336    }
337
338    /// Returns the path's current MTU
339    pub(super) fn current_mtu(&self) -> u16 {
340        self.mtud.current_mtu()
341    }
342
343    /// Account for transmission of `packet` with number `pn` in `space`
344    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
345        self.in_flight.insert(&packet);
346        if self.first_packet.is_none() {
347            self.first_packet = Some(pn);
348        }
349        if let Some(forgotten) = space.sent(pn, packet) {
350            self.remove_in_flight(&forgotten);
351        }
352    }
353
354    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
355    /// `false` if `pn` was sent before this path was established.
356    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
357        if packet.path_generation != self.generation {
358            return false;
359        }
360        self.in_flight.remove(packet);
361        true
362    }
363
364    /// Increment the total size of sent UDP datagrams
365    pub(super) fn inc_total_sent(&mut self, inc: u64) {
366        self.total_sent = self.total_sent.saturating_add(inc);
367        if !self.validated {
368            trace!(
369                remote = %self.remote,
370                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
371                "anti amplification budget decreased"
372            );
373        }
374    }
375
376    /// Increment the total size of received UDP datagrams
377    pub(super) fn inc_total_recvd(&mut self, inc: u64) {
378        self.total_recvd = self.total_recvd.saturating_add(inc);
379        if !self.validated {
380            trace!(
381                remote = %self.remote,
382                anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
383                "anti amplification budget increased"
384            );
385        }
386    }
387
388    #[cfg(feature = "qlog")]
389    pub(super) fn qlog_recovery_metrics(
390        &mut self,
391        path_id: PathId,
392    ) -> Option<RecoveryMetricsUpdated> {
393        let controller_metrics = self.congestion.metrics();
394
395        let metrics = RecoveryMetrics {
396            min_rtt: Some(self.rtt.min),
397            smoothed_rtt: Some(self.rtt.get()),
398            latest_rtt: Some(self.rtt.latest),
399            rtt_variance: Some(self.rtt.var),
400            pto_count: Some(self.pto_count),
401            bytes_in_flight: Some(self.in_flight.bytes),
402            packets_in_flight: Some(self.in_flight.ack_eliciting),
403
404            congestion_window: Some(controller_metrics.congestion_window),
405            ssthresh: controller_metrics.ssthresh,
406            pacing_rate: controller_metrics.pacing_rate,
407        };
408
409        let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
410        self.recovery_metrics = metrics;
411        event
412    }
413
414    /// Return how long we need to wait before sending `bytes_to_send`
415    ///
416    /// See [`Pacer::delay`].
417    pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Instant> {
418        let smoothed_rtt = self.rtt.get();
419        self.pacing.delay(
420            smoothed_rtt,
421            bytes_to_send,
422            self.current_mtu(),
423            self.congestion.window(),
424            now,
425        )
426    }
427
428    /// Updates the last observed address report received on this path.
429    ///
430    /// If the address was updated, it's returned to be informed to the application.
431    #[must_use = "updated observed address must be reported to the application"]
432    pub(super) fn update_observed_addr_report(
433        &mut self,
434        observed: ObservedAddr,
435    ) -> Option<SocketAddr> {
436        match self.last_observed_addr_report.as_mut() {
437            Some(prev) => {
438                if prev.seq_no >= observed.seq_no {
439                    // frames that do not increase the sequence number on this path are ignored
440                    None
441                } else if prev.ip == observed.ip && prev.port == observed.port {
442                    // keep track of the last seq_no but do not report the address as updated
443                    prev.seq_no = observed.seq_no;
444                    None
445                } else {
446                    let addr = observed.socket_addr();
447                    self.last_observed_addr_report = Some(observed);
448                    Some(addr)
449                }
450            }
451            None => {
452                let addr = observed.socket_addr();
453                self.last_observed_addr_report = Some(observed);
454                Some(addr)
455            }
456        }
457    }
458
459    pub(crate) fn remote_status(&self) -> Option<PathStatus> {
460        self.status.remote_status.map(|(_seq, status)| status)
461    }
462
463    pub(crate) fn local_status(&self) -> PathStatus {
464        self.status.local_status
465    }
466
467    pub(super) fn generation(&self) -> u64 {
468        self.generation
469    }
470}
471
/// Congestion metrics as described in [`recovery_metrics_updated`].
///
/// Every field is optional: a snapshot used for diffing carries `None` for values that did
/// not change (see `retain_updated`).
///
/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
#[cfg(feature = "qlog")]
#[derive(Default, Clone, PartialEq, Debug)]
#[non_exhaustive]
struct RecoveryMetrics {
    pub min_rtt: Option<Duration>,
    pub smoothed_rtt: Option<Duration>,
    pub latest_rtt: Option<Duration>,
    pub rtt_variance: Option<Duration>,
    pub pto_count: Option<u32>,
    pub bytes_in_flight: Option<u64>,
    pub packets_in_flight: Option<u64>,
    pub congestion_window: Option<u64>,
    pub ssthresh: Option<u64>,
    pub pacing_rate: Option<u64>,
}
490
#[cfg(feature = "qlog")]
impl RecoveryMetrics {
    /// Builds a copy of `self` in which every field equal to its counterpart in `previous` is
    /// blanked out to `None`, leaving only the values that changed since that snapshot.
    fn retain_updated(&self, previous: &Self) -> Self {
        /// Yields `current` unless it is identical to `before`.
        fn changed<T: Copy + PartialEq>(current: Option<T>, before: Option<T>) -> Option<T> {
            if current == before { None } else { current }
        }

        Self {
            min_rtt: changed(self.min_rtt, previous.min_rtt),
            smoothed_rtt: changed(self.smoothed_rtt, previous.smoothed_rtt),
            latest_rtt: changed(self.latest_rtt, previous.latest_rtt),
            rtt_variance: changed(self.rtt_variance, previous.rtt_variance),
            pto_count: changed(self.pto_count, previous.pto_count),
            bytes_in_flight: changed(self.bytes_in_flight, previous.bytes_in_flight),
            packets_in_flight: changed(self.packets_in_flight, previous.packets_in_flight),
            congestion_window: changed(self.congestion_window, previous.congestion_window),
            ssthresh: changed(self.ssthresh, previous.ssthresh),
            pacing_rate: changed(self.pacing_rate, previous.pacing_rate),
        }
    }

    /// Produces a `MetricsUpdated` event with just the values that differ from `previous`.
    ///
    /// Returns `None` when no value changed at all.
    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
        let delta = self.retain_updated(previous);
        if delta == Self::default() {
            // Nothing changed, so there is nothing worth logging.
            return None;
        }

        // qlog carries RTT values as fractional seconds.
        let secs = |rtt: Duration| rtt.as_secs_f32();
        // qlog's PTO counter is narrower than ours; clamp instead of failing.
        let clamp_pto = |count: u32| u16::try_from(count).unwrap_or(u16::MAX);

        let event = RecoveryMetricsUpdated {
            min_rtt: delta.min_rtt.map(secs),
            smoothed_rtt: delta.smoothed_rtt.map(secs),
            latest_rtt: delta.latest_rtt.map(secs),
            rtt_variance: delta.rtt_variance.map(secs),
            pto_count: delta.pto_count.map(clamp_pto),
            bytes_in_flight: delta.bytes_in_flight,
            packets_in_flight: delta.packets_in_flight,
            congestion_window: delta.congestion_window,
            ssthresh: delta.ssthresh,
            pacing_rate: delta.pacing_rate,
            path_id: Some(u64::from(path_id.as_u32())),
        };
        Some(event)
    }
}
544
/// RTT estimation for a particular network path
///
/// Until the first sample is recorded, `smoothed` is `None` and the other fields hold values
/// derived from the configured initial RTT.
#[derive(Copy, Clone, Debug)]
pub struct RttEstimator {
    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
    latest: Duration,
    /// The smoothed RTT of the connection, computed as described in RFC6298
    smoothed: Option<Duration>,
    /// The RTT variance, computed as described in RFC6298
    var: Duration,
    /// The minimum RTT seen in the connection, ignoring ack delay.
    min: Duration,
}
557
558impl RttEstimator {
559    pub(super) fn new(initial_rtt: Duration) -> Self {
560        Self {
561            latest: initial_rtt,
562            smoothed: None,
563            var: initial_rtt / 2,
564            min: initial_rtt,
565        }
566    }
567
568    /// Resets the estimator using a new initial_rtt value.
569    ///
570    /// This only resets the initial_rtt **if** no samples have been recorded yet. If there
571    /// are any recorded samples the initial estimate can not be adjusted after the fact.
572    ///
573    /// This is useful when you receive a PATH_RESPONSE in the first packet received on a
574    /// new path. In this case you can use the delay of the PATH_CHALLENGE-PATH_RESPONSE as
575    /// the initial RTT to get a better expected estimation.
576    ///
577    /// A PATH_CHALLENGE-PATH_RESPONSE pair later in the connection should not be used
578    /// explicitly as an estimation since PATH_CHALLENGE is an ACK-eliciting packet itself
579    /// already.
580    pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
581        if self.smoothed.is_none() {
582            self.latest = initial_rtt;
583            self.var = initial_rtt / 2;
584            self.min = initial_rtt;
585        }
586    }
587
588    /// The current best RTT estimation.
589    pub fn get(&self) -> Duration {
590        self.smoothed.unwrap_or(self.latest)
591    }
592
593    /// Conservative estimate of RTT
594    ///
595    /// Takes the maximum of smoothed and latest RTT, as recommended
596    /// in 6.1.2 of the recovery spec (draft 29).
597    pub fn conservative(&self) -> Duration {
598        self.get().max(self.latest)
599    }
600
601    /// Minimum RTT registered so far for this estimator.
602    pub fn min(&self) -> Duration {
603        self.min
604    }
605
606    /// PTO computed as described in RFC9002#6.2.1.
607    pub(crate) fn pto_base(&self) -> Duration {
608        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
609    }
610
611    /// Records an RTT sample.
612    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
613        self.latest = rtt;
614        // https://www.rfc-editor.org/rfc/rfc9002.html#section-5.2-3:
615        // min_rtt does not adjust for ack_delay to avoid underestimating.
616        self.min = cmp::min(self.min, self.latest);
617        // Based on RFC6298.
618        if let Some(smoothed) = self.smoothed {
619            let adjusted_rtt = if self.min + ack_delay <= self.latest {
620                self.latest - ack_delay
621            } else {
622                self.latest
623            };
624            let var_sample = smoothed.abs_diff(adjusted_rtt);
625            self.var = (3 * self.var + var_sample) / 4;
626            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
627        } else {
628            self.smoothed = Some(self.latest);
629            self.var = self.latest / 2;
630            self.min = self.latest;
631        }
632    }
633}
634
/// Queue of PATH_RESPONSE frames that still have to be sent.
#[derive(Default, Debug)]
pub(crate) struct PathResponses {
    /// Responses waiting to be sent; `push` keeps at most one entry per remote address.
    pending: Vec<PathResponse>,
}
639
640impl PathResponses {
641    pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
642        /// Arbitrary permissive limit to prevent abuse
643        const MAX_PATH_RESPONSES: usize = 16;
644        let response = PathResponse {
645            packet,
646            token,
647            remote,
648        };
649        let existing = self.pending.iter_mut().find(|x| x.remote == remote);
650        if let Some(existing) = existing {
651            // Update a queued response
652            if existing.packet <= packet {
653                *existing = response;
654            }
655            return;
656        }
657        if self.pending.len() < MAX_PATH_RESPONSES {
658            self.pending.push(response);
659        } else {
660            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
661            // older challenges.
662            trace!("ignoring excessive PATH_CHALLENGE");
663        }
664    }
665
666    pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
667        let response = *self.pending.last()?;
668        if response.remote == remote {
669            // We don't bother searching further because we expect that the on-path response will
670            // get drained in the immediate future by a call to `pop_on_path`
671            return None;
672        }
673        self.pending.pop();
674        Some((response.token, response.remote))
675    }
676
677    pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
678        let response = *self.pending.last()?;
679        if response.remote != remote {
680            // We don't bother searching further because we expect that the off-path response will
681            // get drained in the immediate future by a call to `pop_off_path`
682            return None;
683        }
684        self.pending.pop();
685        Some(response.token)
686    }
687
688    pub(crate) fn is_empty(&self) -> bool {
689        self.pending.is_empty()
690    }
691}
692
/// A single queued PATH_RESPONSE, as stored in [`PathResponses`].
#[derive(Copy, Clone, Debug)]
struct PathResponse {
    /// The packet number the corresponding PATH_CHALLENGE was received in
    packet: u64,
    /// The token of the PATH_CHALLENGE
    token: u64,
    /// The address the corresponding PATH_CHALLENGE was received from
    remote: SocketAddr,
}
702
/// Summary statistics of packets that have been sent on a particular path, but which have not yet
/// been acked or deemed lost
///
/// Both counters are plain running totals, adjusted by `insert` and `remove`.
#[derive(Debug)]
pub(super) struct InFlight {
    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
    ///
    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
    /// count towards this to ensure congestion control does not impede congestion feedback.
    pub(super) bytes: u64,
    /// Number of packets in flight containing frames other than ACK and PADDING
    ///
    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
    /// also be nonzero.
    pub(super) ack_eliciting: u64,
}
719
720impl InFlight {
721    fn new() -> Self {
722        Self {
723            bytes: 0,
724            ack_eliciting: 0,
725        }
726    }
727
728    fn insert(&mut self, packet: &SentPacket) {
729        self.bytes += u64::from(packet.size);
730        self.ack_eliciting += u64::from(packet.ack_eliciting);
731    }
732
733    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
734    fn remove(&mut self, packet: &SentPacket) {
735        self.bytes -= u64::from(packet.size);
736        self.ack_eliciting -= u64::from(packet.ack_eliciting);
737    }
738}
739
/// State for QUIC-MULTIPATH PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP frames
#[derive(Debug, Clone, Default)]
pub(super) struct PathStatusState {
    /// The local status
    local_status: PathStatus,
    /// Local sequence number, for both PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP
    ///
    /// This is the number of the *next* path status frame to be sent.
    local_seq: VarInt,
    /// The status set by the remote
    ///
    /// Stored together with the sequence number it was received with; `None` until the
    /// remote has set a status.
    remote_status: Option<(VarInt, PathStatus)>,
}
752
753impl PathStatusState {
754    /// To be called on received PATH_STATUS_AVAILABLE/PATH_STATUS_BACKUP frames
755    pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
756        if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
757            return trace!(%seq, "ignoring path status update");
758        }
759
760        let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
761        if prev != Some(status) {
762            debug!(?status, ?seq, "remote changed path status");
763        }
764    }
765
766    /// Updates the local status
767    ///
768    /// If the local status changed, the previous value is returned
769    pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
770        if self.local_status == status {
771            return None;
772        }
773
774        self.local_seq = self.local_seq.saturating_add(1u8);
775        Some(std::mem::replace(&mut self.local_status, status))
776    }
777
778    pub(crate) fn seq(&self) -> VarInt {
779        self.local_seq
780    }
781}
782
/// The QUIC-MULTIPATH path status
///
/// See section "3.3 Path Status Management":
/// <https://quicwg.org/multipath/draft-ietf-quic-multipath.html#name-path-status-management>
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum PathStatus {
    /// Paths marked as available will be used when scheduling packets
    ///
    /// If multiple paths are available, packets will be scheduled on whichever has
    /// capacity.
    ///
    /// This is the default status.
    #[default]
    Available,
    /// Paths marked as backup will only be used if there are no available paths
    ///
    /// If the max_idle_timeout is specified the path will be kept alive so that it does not
    /// expire.
    Backup,
}
801
/// Application events about paths
///
/// Every variant carries the [`PathId`] of the path it refers to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PathEvent {
    /// A new path has been opened
    Opened {
        /// Which path is now open
        id: PathId,
    },
    /// A path has been closed
    Closed {
        /// Which path has been closed
        id: PathId,
        /// Error code supplied by the peer
        /// See <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-14.html#name-error-codes>
        /// for a list of known errors.
        error_code: VarInt,
    },
    /// All remaining state for a path has been removed
    ///
    /// The [`PathEvent::Closed`] would have been emitted for this path earlier.
    Abandoned {
        /// Which path had its state dropped
        id: PathId,
        /// The final path stats, they are no longer available via [`Connection::stats`]
        ///
        /// [`Connection::stats`]: super::Connection::stats
        path_stats: PathStats,
    },
    /// Path was closed locally
    LocallyClosed {
        /// Path for which the error occurred
        id: PathId,
        /// The error that occurred
        error: PathError,
    },
    /// The remote changed the status of the path
    ///
    /// The local status is not changed because of this event. It is up to the application
    /// to update the local status, which is used for packet scheduling, when the remote
    /// changes the status.
    RemoteStatus {
        /// Path which has changed status
        id: PathId,
        /// The new status set by the remote
        status: PathStatus,
    },
    /// Received an observation of our external address from the peer.
    ObservedAddr {
        /// Path over which the observed address was reported, [`PathId::ZERO`] when multipath is
        /// not negotiated
        id: PathId,
        /// The address observed by the remote over this path
        addr: SocketAddr,
    },
}
857
/// Error from setting path status
///
/// The `Display` text of each variant comes from its `#[error(..)]` attribute.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum SetPathStatusError {
    /// Error indicating that a path has not been opened or has already been abandoned
    #[error("closed path")]
    ClosedPath,
    /// Error indicating that this operation requires multipath to be negotiated whereas it hasn't been
    #[error("multipath not negotiated")]
    MultipathNotNegotiated,
}
868
/// Error indicating that a path has not been opened or has already been abandoned
#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
#[error("closed path")]
pub struct ClosedPath {
    // Unit placeholder field that is not public, so code outside the parent module cannot
    // build this type with a struct literal (it can still use `Default`).
    pub(super) _private: (),
}
875
#[cfg(test)]
mod tests {
    use super::*;

    /// `PathId::saturating_add` adds normally inside the `u32` range and clamps at `PathId::MAX`.
    #[test]
    fn test_path_id_saturating_add() {
        // In range: u16::MAX + 1 still fits in a u32, so the sum is exact.
        let start = PathId::from(u16::MAX);
        let expected = PathId::from(u32::from(u16::MAX) + 1);
        assert_eq!(start.saturating_add(1u8), expected);

        // At the upper bound the result stays pinned to the maximum.
        let clamped = PathId::MAX.saturating_add(1u8);
        assert_eq!(clamped, PathId::MAX);
    }
}
890}