noq_proto/connection/paths.rs
1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8 PathStats, SpaceKind,
9 mtud::MtuDiscovery,
10 pacing::Pacer,
11 spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14 ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig,
15 TransportErrorCode, VarInt,
16 coding::{self, Decodable, Encodable},
17 congestion,
18 frame::ObservedAddr,
19};
20
21#[cfg(feature = "qlog")]
22use qlog::events::quic::RecoveryMetricsUpdated;
23
24/// Id representing different paths when using multipath extension
25#[cfg_attr(test, derive(test_strategy::Arbitrary))]
26#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
27pub struct PathId(pub(crate) u32);
28
29impl std::hash::Hash for PathId {
30 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
31 state.write_u32(self.0);
32 }
33}
34
35impl identity_hash::IdentityHashable for PathId {}
36
37impl Decodable for PathId {
38 fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
39 let v = VarInt::decode(r)?;
40 let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
41 Ok(Self(v))
42 }
43}
44
45impl Encodable for PathId {
46 fn encode<B: bytes::BufMut>(&self, w: &mut B) {
47 VarInt(self.0.into()).encode(w)
48 }
49}
50
51impl PathId {
52 /// The maximum path ID allowed.
53 pub const MAX: Self = Self(u32::MAX);
54
55 /// The 0 path id.
56 pub const ZERO: Self = Self(0);
57
58 /// The number of bytes this [`PathId`] uses when encoded as a [`VarInt`]
59 pub(crate) const fn size(&self) -> usize {
60 VarInt(self.0 as u64).size()
61 }
62
63 /// Saturating integer addition. Computes self + rhs, saturating at the numeric bounds instead
64 /// of overflowing.
65 pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
66 let rhs = rhs.into();
67 let inner = self.0.saturating_add(rhs.0);
68 Self(inner)
69 }
70
71 /// Saturating integer subtraction. Computes self - rhs, saturating at the numeric bounds
72 /// instead of overflowing.
73 pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
74 let rhs = rhs.into();
75 let inner = self.0.saturating_sub(rhs.0);
76 Self(inner)
77 }
78
79 /// Get the next [`PathId`]
80 pub(crate) fn next(&self) -> Self {
81 self.saturating_add(Self(1))
82 }
83
84 /// Get the underlying u32
85 pub(crate) fn as_u32(&self) -> u32 {
86 self.0
87 }
88}
89
90impl std::fmt::Display for PathId {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 self.0.fmt(f)
93 }
94}
95
96impl<T: Into<u32>> From<T> for PathId {
97 fn from(source: T) -> Self {
98 Self(source.into())
99 }
100}
101
102/// State needed for a single path ID.
103///
104/// A single path ID can migrate according to the rules in RFC9000 §9, either voluntary or
105/// involuntary. We need to keep the [`PathData`] of the previously used such path available
106/// in order to defend against migration attacks (see RFC9000 §9.3.1, §9.3.2 and §9.3.3) as
107/// well as to support path probing (RFC9000 §9.1).
108#[derive(Debug)]
109pub(super) struct PathState {
110 pub(super) data: PathData,
111 pub(super) prev: Option<(ConnectionId, PathData)>,
112}
113
114impl PathState {
115 /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
116 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
117 // Visit known paths from newest to oldest to find the one `pn` was sent on
118 for path_data in [&mut self.data]
119 .into_iter()
120 .chain(self.prev.as_mut().map(|(_, data)| data))
121 {
122 if path_data.remove_in_flight(packet) {
123 return;
124 }
125 }
126 }
127}
128
129#[derive(Debug)]
130pub(super) struct SentChallengeInfo {
131 /// When was the challenge sent on the wire.
132 pub(super) sent_instant: Instant,
133 /// The 4-tuple on which this path challenge was sent.
134 pub(super) network_path: FourTuple,
135}
136
137/// Description of a particular network path
138#[derive(Debug)]
139pub(super) struct PathData {
140 pub(super) network_path: FourTuple,
141 pub(super) rtt: RttEstimator,
142 /// Whether we're enabling ECN on outgoing packets
143 pub(super) sending_ecn: bool,
144 /// Congestion controller state
145 pub(super) congestion: Box<dyn congestion::Controller>,
146 /// Pacing state
147 pub(super) pacing: Pacer,
148 /// Path challenges sent (on the wire, on-path) that we didn't receive a path response for yet
149 on_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
150 /// Path challenges sent (on the wire, off-path) that we didn't receive a path response for yet
151 off_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
152 /// Whether to trigger sending another PATH_CHALLENGE in the next poll_transmit.
153 ///
154 /// This is picked up by [`super::Connection::space_can_send`].
155 ///
156 /// Only used for RFC9000-style path migration and multipath path validation (for opening).
157 ///
158 /// This is **not used** for n0 nat traversal challenge sending.
159 pub(super) pending_on_path_challenge: bool,
160 /// Pending responses to PATH_CHALLENGE frames
161 pub(super) path_responses: PathResponses,
162 /// Whether we're certain the peer can both send and receive on this address
163 ///
164 /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
165 /// migration. Always true for clients.
166 pub(super) validated: bool,
167 /// Total size of all UDP datagrams sent on this path
168 pub(super) total_sent: u64,
169 /// Total size of all UDP datagrams received on this path
170 pub(super) total_recvd: u64,
171 /// The state of the MTU discovery process
172 pub(super) mtud: MtuDiscovery,
173 /// Packet number of the first packet sent after an RTT sample was collected on this path
174 ///
175 /// Used in persistent congestion determination.
176 pub(super) first_packet_after_rtt_sample: Option<(SpaceKind, u64)>,
177 /// The in-flight packets and bytes
178 ///
179 /// Note that this is across all spaces on this path
180 pub(super) in_flight: InFlight,
181 /// Whether this path has had it's remote address reported back to the peer. This only happens
182 /// if both peers agree to so based on their transport parameters.
183 pub(super) observed_addr_sent: bool,
184 /// Observed address frame with the largest sequence number received from the peer on this path.
185 pub(super) last_observed_addr_report: Option<ObservedAddr>,
186 /// The QUIC-MULTIPATH path status
187 pub(super) status: PathStatusState,
188 /// Number of the first packet sent on this path
189 ///
190 /// With RFC9000 §9 style migration (i.e. not multipath) the PathId does not change and
191 /// hence packet numbers continue. This is used to determine whether a packet was sent
192 /// on such an earlier path. Insufficient to determine if a packet was sent on a later
193 /// path.
194 first_packet: Option<u64>,
195 /// The number of times a PTO has been sent without receiving an ack.
196 pub(super) pto_count: u32,
197
198 //
199 // Per-path idle & keep alive
200 //
201 /// Idle timeout for the path
202 ///
203 /// If expired, the path will be abandoned. This is different from the connection-wide
204 /// idle timeout which closes the connection if expired.
205 pub(super) idle_timeout: Option<Duration>,
206 /// Keep alives to send on this path
207 ///
208 /// There is also a connection-level keep alive configured in the
209 /// [`TransportParameters`]. This triggers activity on any path which can keep the
210 /// connection alive.
211 ///
212 /// [`TransportParameters`]: crate::transport_parameters::TransportParameters
213 pub(super) keep_alive: Option<Duration>,
214
215 /// Whether the path has already been considered opened from an application perspective.
216 ///
217 /// This means, for paths other than the original [`PathId::ZERO`], a first path challenge has
218 /// been responded to, regardless of the initial validation status of the path. This state is
219 /// irreversible, since it's not affected by the path being closed.
220 ///
221 /// Sending a PATH_CHALLENGE and receiving a valid response before the application is informed
222 /// of the path, is a way to ensure the path is usable before it is reported. This is not
223 /// required by the spec, and in the future might be changed for simply requiring a first ack'd
224 /// packet.
225 pub(super) open_status: OpenStatus,
226
227 /// Whether we're currently draining the path after having abandoned it.
228 ///
229 /// This should only be true when a path discard timer is armed, and after the path was
230 /// abandoned (and added to the abandoned_paths set).
231 ///
232 /// This will only ever be set from false to true.
233 pub(super) draining: bool,
234
235 /// Snapshot of the qlog recovery metrics
236 #[cfg(feature = "qlog")]
237 recovery_metrics: RecoveryMetrics,
238
239 /// Tag uniquely identifying a path in a connection.
240 ///
241 /// When a migration happens on the same [`PathId`] we still detect a change in the
242 /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
243 /// value to keep track of which 4-tuple a packet belonged to.
244 generation: u64,
245}
246
247impl PathData {
248 pub(super) fn new(
249 network_path: FourTuple,
250 allow_mtud: bool,
251 peer_max_udp_payload_size: Option<u16>,
252 generation: u64,
253 now: Instant,
254 config: &TransportConfig,
255 ) -> Self {
256 let congestion = config
257 .congestion_controller_factory
258 .clone()
259 .build(now, config.get_initial_mtu());
260 Self {
261 network_path,
262 rtt: RttEstimator::new(config.initial_rtt),
263 sending_ecn: true,
264 pacing: Pacer::new(
265 config.initial_rtt,
266 congestion.initial_window(),
267 config.get_initial_mtu(),
268 now,
269 ),
270 congestion,
271 on_path_challenges_unconfirmed: Default::default(),
272 off_path_challenges_unconfirmed: Default::default(),
273 pending_on_path_challenge: false,
274 path_responses: PathResponses::default(),
275 validated: false,
276 total_sent: 0,
277 total_recvd: 0,
278 mtud: config
279 .mtu_discovery_config
280 .as_ref()
281 .filter(|_| allow_mtud)
282 .map_or_else(
283 || MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
284 |mtud_config| {
285 MtuDiscovery::new(
286 config.get_initial_mtu(),
287 config.min_mtu,
288 peer_max_udp_payload_size,
289 mtud_config.clone(),
290 )
291 },
292 ),
293 first_packet_after_rtt_sample: None,
294 in_flight: InFlight::new(),
295 observed_addr_sent: false,
296 last_observed_addr_report: None,
297 status: Default::default(),
298 first_packet: None,
299 pto_count: 0,
300 idle_timeout: config.default_path_max_idle_timeout,
301 keep_alive: config.default_path_keep_alive_interval,
302 open_status: OpenStatus::default(),
303 draining: false,
304 #[cfg(feature = "qlog")]
305 recovery_metrics: RecoveryMetrics::default(),
306 generation,
307 }
308 }
309
310 /// Create a new path from a previous one.
311 ///
312 /// This should only be called when migrating paths.
313 pub(super) fn from_previous(
314 network_path: FourTuple,
315 prev: &Self,
316 generation: u64,
317 now: Instant,
318 ) -> Self {
319 let congestion = prev.congestion.clone_box();
320 let smoothed_rtt = prev.rtt.get();
321 Self {
322 network_path,
323 rtt: prev.rtt,
324 pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
325 sending_ecn: true,
326 congestion,
327 on_path_challenges_unconfirmed: Default::default(),
328 off_path_challenges_unconfirmed: Default::default(),
329 pending_on_path_challenge: false,
330 path_responses: PathResponses::default(),
331 validated: false,
332 total_sent: 0,
333 total_recvd: 0,
334 mtud: prev.mtud.clone(),
335 first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
336 in_flight: InFlight::new(),
337 observed_addr_sent: false,
338 last_observed_addr_report: None,
339 status: prev.status.clone(),
340 first_packet: None,
341 pto_count: 0,
342 idle_timeout: prev.idle_timeout,
343 keep_alive: prev.keep_alive,
344 open_status: OpenStatus::default(),
345 draining: false,
346 #[cfg(feature = "qlog")]
347 recovery_metrics: prev.recovery_metrics.clone(),
348 generation,
349 }
350 }
351
352 /// Whether we're in the process of validating this path with PATH_CHALLENGEs
353 pub(super) fn is_validating_path(&self) -> bool {
354 !self.on_path_challenges_unconfirmed.is_empty() || self.pending_on_path_challenge
355 }
356
357 /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
358 /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
359 pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
360 !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
361 }
362
363 /// Returns the path's current MTU
364 pub(super) fn current_mtu(&self) -> u16 {
365 self.mtud.current_mtu()
366 }
367
368 /// Account for transmission of `packet` with number `pn` in `space`
369 pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
370 self.in_flight.insert(&packet);
371 if self.first_packet.is_none() {
372 self.first_packet = Some(pn);
373 }
374 if let Some(forgotten) = space.sent(pn, packet) {
375 self.remove_in_flight(&forgotten);
376 }
377 }
378
379 pub(super) fn record_path_challenge_sent(
380 &mut self,
381 now: Instant,
382 token: u64,
383 network_path: FourTuple,
384 ) {
385 let info = SentChallengeInfo {
386 sent_instant: now,
387 network_path,
388 };
389 if network_path == self.network_path {
390 self.on_path_challenges_unconfirmed.insert(token, info);
391 } else {
392 self.off_path_challenges_unconfirmed.insert(token, info);
393 }
394 }
395
396 /// Remove `packet` with number `pn` from this path's congestion control counters, or return
397 /// `false` if `pn` was sent before this path was established.
398 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
399 if packet.path_generation != self.generation {
400 return false;
401 }
402 self.in_flight.remove(packet);
403 true
404 }
405
406 /// Increment the total size of sent UDP datagrams
407 pub(super) fn inc_total_sent(&mut self, inc: u64) {
408 self.total_sent = self.total_sent.saturating_add(inc);
409 if !self.validated {
410 trace!(
411 network_path = %self.network_path,
412 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
413 "anti amplification budget decreased"
414 );
415 }
416 }
417
418 /// Increment the total size of received UDP datagrams
419 pub(super) fn inc_total_recvd(&mut self, inc: u64) {
420 self.total_recvd = self.total_recvd.saturating_add(inc);
421 if !self.validated {
422 trace!(
423 network_path = %self.network_path,
424 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
425 "anti amplification budget increased"
426 );
427 }
428 }
429
430 /// The earliest time at which an on-path challenge we sent is considered lost.
431 pub(super) fn earliest_on_path_expiring_challenge(&self) -> Option<Instant> {
432 if self.on_path_challenges_unconfirmed.is_empty() {
433 return None;
434 }
435 let pto = self.rtt.pto_base();
436 self.on_path_challenges_unconfirmed
437 .values()
438 .map(|info| info.sent_instant + pto)
439 .min()
440 }
441
442 /// Handle receiving a PATH_RESPONSE.
443 pub(super) fn on_path_response_received(
444 &mut self,
445 now: Instant,
446 token: u64,
447 network_path: FourTuple,
448 ) -> OnPathResponseReceived {
449 // > § 8.2.3
450 // > Path validation succeeds when a PATH_RESPONSE frame is received that contains the
451 // > data that was sent in a previous PATH_CHALLENGE frame. A PATH_RESPONSE frame
452 // > received on any network path validates the path on which the PATH_CHALLENGE was
453 // > sent.
454 //
455 // At this point we have three potentially different network paths:
456 // - current network path (`Self::network_path`)
457 // - network path used to send the path challenge (`SentChallengeInfo::network_path`)
458 // - network path over which the response arrived (`network_path`)
459 //
460 // As per the spec, this only validates the network path on which this was *sent*.
461 match self.on_path_challenges_unconfirmed.remove(&token) {
462 // Response to an on-path PathChallenge that validates this path.
463 // The sent path should match the current path. However, it's possible that the
464 // challenge was sent when no local_ip was known. This case is allowed as well
465 Some(info) if info.network_path.is_probably_same_path(&self.network_path) => {
466 self.network_path.update_local_if_same_remote(&network_path);
467 let sent_instant = info.sent_instant;
468 if !std::mem::replace(&mut self.validated, true) {
469 trace!("new path validated");
470 }
471 // Clear any other on-path sent challenge
472 self.on_path_challenges_unconfirmed.clear();
473
474 self.pending_on_path_challenge = false;
475
476 // This RTT can only be used for the initial RTT, not as a normal
477 // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
478 let rtt = now.saturating_duration_since(sent_instant);
479 self.rtt.reset_initial_rtt(rtt);
480
481 let prev_status = std::mem::replace(&mut self.open_status, OpenStatus::Informed);
482 OnPathResponseReceived::OnPath {
483 was_open: prev_status == OpenStatus::Informed,
484 }
485 }
486 // Response to an on-path PathChallenge that does not validate this path
487 Some(info) => {
488 // This is a valid path response, but this validates a path we no longer have in
489 // use. Keep only sent challenges for the current path.
490
491 self.on_path_challenges_unconfirmed
492 .retain(|_token, i| i.network_path == self.network_path);
493
494 // if there are no challenges for the current path, schedule one
495 if !self.on_path_challenges_unconfirmed.is_empty() {
496 self.pending_on_path_challenge = true;
497 }
498 OnPathResponseReceived::Ignored {
499 sent_on: info.network_path,
500 current_path: self.network_path,
501 }
502 }
503 None => match self.off_path_challenges_unconfirmed.remove(&token) {
504 // Response to an off-path PathChallenge
505 Some(info) => {
506 // Since we do not store validation state for these paths, we only really care
507 // about reaching the same remote
508 self.off_path_challenges_unconfirmed
509 .retain(|_token, i| i.network_path.remote != info.network_path.remote);
510 OnPathResponseReceived::OffPath
511 }
512 // Response to an unknown PathChallenge. Does not indicate failure
513 None => OnPathResponseReceived::Unknown,
514 },
515 }
516 }
517
518 /// Removes all on-path challenges we remember and cancels sending new on-path challenges.
519 pub(super) fn reset_on_path_challenges(&mut self) {
520 self.on_path_challenges_unconfirmed.clear();
521 self.pending_on_path_challenge = false;
522 }
523
524 #[cfg(feature = "qlog")]
525 pub(super) fn qlog_recovery_metrics(
526 &mut self,
527 path_id: PathId,
528 ) -> Option<RecoveryMetricsUpdated> {
529 let controller_metrics = self.congestion.metrics();
530
531 let metrics = RecoveryMetrics {
532 min_rtt: Some(self.rtt.min),
533 smoothed_rtt: Some(self.rtt.get()),
534 latest_rtt: Some(self.rtt.latest),
535 rtt_variance: Some(self.rtt.var),
536 pto_count: Some(self.pto_count),
537 bytes_in_flight: Some(self.in_flight.bytes),
538 packets_in_flight: Some(self.in_flight.ack_eliciting),
539
540 congestion_window: Some(controller_metrics.congestion_window),
541 ssthresh: controller_metrics.ssthresh,
542 pacing_rate: controller_metrics.pacing_rate,
543 };
544
545 let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
546 self.recovery_metrics = metrics;
547 event
548 }
549
550 /// Return how long we need to wait before sending `bytes_to_send`
551 ///
552 /// See [`Pacer::delay`].
553 pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Instant> {
554 let smoothed_rtt = self.rtt.get();
555 self.pacing.delay(
556 smoothed_rtt,
557 bytes_to_send,
558 self.current_mtu(),
559 self.congestion.window(),
560 now,
561 )
562 }
563
564 /// Updates the last observed address report received on this path.
565 ///
566 /// If the address was updated, it's returned to be informed to the application.
567 #[must_use = "updated observed address must be reported to the application"]
568 pub(super) fn update_observed_addr_report(
569 &mut self,
570 observed: ObservedAddr,
571 ) -> Option<SocketAddr> {
572 match self.last_observed_addr_report.as_mut() {
573 Some(prev) => {
574 if prev.seq_no >= observed.seq_no {
575 // frames that do not increase the sequence number on this path are ignored
576 None
577 } else if prev.ip == observed.ip && prev.port == observed.port {
578 // keep track of the last seq_no but do not report the address as updated
579 prev.seq_no = observed.seq_no;
580 None
581 } else {
582 let addr = observed.socket_addr();
583 self.last_observed_addr_report = Some(observed);
584 Some(addr)
585 }
586 }
587 None => {
588 let addr = observed.socket_addr();
589 self.last_observed_addr_report = Some(observed);
590 Some(addr)
591 }
592 }
593 }
594
595 pub(crate) fn remote_status(&self) -> Option<PathStatus> {
596 self.status.remote_status.map(|(_seq, status)| status)
597 }
598
599 pub(crate) fn local_status(&self) -> PathStatus {
600 self.status.local_status
601 }
602
603 /// Tag uniquely identifying a path in a connection.
604 ///
605 /// When a migration happens on the same [`PathId`] we still detect a change in the
606 /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
607 /// value to keep track of which 4-tuple a packet belonged to.
608 pub(super) fn generation(&self) -> u64 {
609 self.generation
610 }
611}
612
613pub(super) enum OnPathResponseReceived {
614 /// This response validates the path on its current remote address.
615 OnPath { was_open: bool },
616 /// This response is valid, but it's for a remote other than the path's current remote address.
617 OffPath,
618 /// The received token is unknown.
619 Unknown,
620 /// The response is valid but it's not usable for path validation.
621 Ignored {
622 sent_on: FourTuple,
623 current_path: FourTuple,
624 },
625}
626
627#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
628pub(super) enum OpenStatus {
629 /// A first packet has not been sent using this [`PathId`].
630 #[default]
631 Pending,
632 /// The first packet has been sent using this [`PathId`]. However, it is not yet deemed good
633 /// enough to be reported to the application.
634 Sent,
635 /// The application has been informed of this path.
636 Informed,
637}
638
639/// Congestion metrics as described in [`recovery_metrics_updated`].
640///
641/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
642#[cfg(feature = "qlog")]
643#[derive(Default, Clone, PartialEq, Debug)]
644#[non_exhaustive]
645struct RecoveryMetrics {
646 pub min_rtt: Option<Duration>,
647 pub smoothed_rtt: Option<Duration>,
648 pub latest_rtt: Option<Duration>,
649 pub rtt_variance: Option<Duration>,
650 pub pto_count: Option<u32>,
651 pub bytes_in_flight: Option<u64>,
652 pub packets_in_flight: Option<u64>,
653 pub congestion_window: Option<u64>,
654 pub ssthresh: Option<u64>,
655 pub pacing_rate: Option<u64>,
656}
657
658#[cfg(feature = "qlog")]
659impl RecoveryMetrics {
660 /// Retain only values that have been updated since the last snapshot.
661 fn retain_updated(&self, previous: &Self) -> Self {
662 macro_rules! keep_if_changed {
663 ($name:ident) => {
664 if previous.$name == self.$name {
665 None
666 } else {
667 self.$name
668 }
669 };
670 }
671
672 Self {
673 min_rtt: keep_if_changed!(min_rtt),
674 smoothed_rtt: keep_if_changed!(smoothed_rtt),
675 latest_rtt: keep_if_changed!(latest_rtt),
676 rtt_variance: keep_if_changed!(rtt_variance),
677 pto_count: keep_if_changed!(pto_count),
678 bytes_in_flight: keep_if_changed!(bytes_in_flight),
679 packets_in_flight: keep_if_changed!(packets_in_flight),
680 congestion_window: keep_if_changed!(congestion_window),
681 ssthresh: keep_if_changed!(ssthresh),
682 pacing_rate: keep_if_changed!(pacing_rate),
683 }
684 }
685
686 /// Emit a `MetricsUpdated` event containing only updated values
687 fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
688 let updated = self.retain_updated(previous);
689
690 if updated == Self::default() {
691 return None;
692 }
693
694 Some(RecoveryMetricsUpdated {
695 min_rtt: updated.min_rtt.map(|rtt| rtt.as_secs_f32()),
696 smoothed_rtt: updated.smoothed_rtt.map(|rtt| rtt.as_secs_f32()),
697 latest_rtt: updated.latest_rtt.map(|rtt| rtt.as_secs_f32()),
698 rtt_variance: updated.rtt_variance.map(|rtt| rtt.as_secs_f32()),
699 pto_count: updated
700 .pto_count
701 .map(|count| count.try_into().unwrap_or(u16::MAX)),
702 bytes_in_flight: updated.bytes_in_flight,
703 packets_in_flight: updated.packets_in_flight,
704 congestion_window: updated.congestion_window,
705 ssthresh: updated.ssthresh,
706 pacing_rate: updated.pacing_rate,
707 path_id: Some(path_id.as_u32() as u64),
708 })
709 }
710}
711
712/// RTT estimation for a particular network path
713#[derive(Copy, Clone, Debug)]
714pub struct RttEstimator {
715 /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
716 latest: Duration,
717 /// The smoothed RTT of the connection, computed as described in RFC6298
718 smoothed: Option<Duration>,
719 /// The RTT variance, computed as described in RFC6298
720 var: Duration,
721 /// The minimum RTT seen in the connection, ignoring ack delay.
722 min: Duration,
723}
724
725impl RttEstimator {
726 pub(super) fn new(initial_rtt: Duration) -> Self {
727 Self {
728 latest: initial_rtt,
729 smoothed: None,
730 var: initial_rtt / 2,
731 min: initial_rtt,
732 }
733 }
734
735 /// Resets the estimator using a new initial_rtt value.
736 ///
737 /// This only resets the initial_rtt **if** no samples have been recorded yet. If there
738 /// are any recorded samples the initial estimate can not be adjusted after the fact.
739 ///
740 /// This is useful when you receive a PATH_RESPONSE in the first packet received on a
741 /// new path. In this case you can use the delay of the PATH_CHALLENGE-PATH_RESPONSE as
742 /// the initial RTT to get a better expected estimation.
743 ///
744 /// A PATH_CHALLENGE-PATH_RESPONSE pair later in the connection should not be used
745 /// explicitly as an estimation since PATH_CHALLENGE is an ACK-eliciting packet itself
746 /// already.
747 pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
748 if self.smoothed.is_none() {
749 self.latest = initial_rtt;
750 self.var = initial_rtt / 2;
751 self.min = initial_rtt;
752 }
753 }
754
755 /// The current best RTT estimation.
756 pub fn get(&self) -> Duration {
757 self.smoothed.unwrap_or(self.latest)
758 }
759
760 /// Conservative estimate of RTT
761 ///
762 /// Takes the maximum of smoothed and latest RTT, as recommended
763 /// in 6.1.2 of the recovery spec (draft 29).
764 pub fn conservative(&self) -> Duration {
765 self.get().max(self.latest)
766 }
767
768 /// Minimum RTT registered so far for this estimator.
769 pub fn min(&self) -> Duration {
770 self.min
771 }
772
773 /// PTO computed as described in RFC9002#6.2.1.
774 pub(crate) fn pto_base(&self) -> Duration {
775 self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
776 }
777
778 /// Records an RTT sample.
779 pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
780 self.latest = rtt;
781 // https://www.rfc-editor.org/rfc/rfc9002.html#section-5.2-3:
782 // min_rtt does not adjust for ack_delay to avoid underestimating.
783 self.min = cmp::min(self.min, self.latest);
784 // Based on RFC6298.
785 if let Some(smoothed) = self.smoothed {
786 let adjusted_rtt = if self.min + ack_delay <= self.latest {
787 self.latest - ack_delay
788 } else {
789 self.latest
790 };
791 let var_sample = smoothed.abs_diff(adjusted_rtt);
792 self.var = (3 * self.var + var_sample) / 4;
793 self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
794 } else {
795 self.smoothed = Some(self.latest);
796 self.var = self.latest / 2;
797 self.min = self.latest;
798 }
799 }
800}
801
802#[derive(Default, Debug)]
803pub(crate) struct PathResponses {
804 pending: Vec<PathResponse>,
805}
806
807impl PathResponses {
808 pub(crate) fn push(&mut self, packet: u64, token: u64, network_path: FourTuple) {
809 /// Arbitrary permissive limit to prevent abuse
810 const MAX_PATH_RESPONSES: usize = 16;
811 let response = PathResponse {
812 packet,
813 token,
814 network_path,
815 };
816 let existing = self
817 .pending
818 .iter_mut()
819 .find(|x| x.network_path.remote == network_path.remote);
820 if let Some(existing) = existing {
821 // Update a queued response
822 if existing.packet <= packet {
823 *existing = response;
824 }
825 return;
826 }
827 if self.pending.len() < MAX_PATH_RESPONSES {
828 self.pending.push(response);
829 } else {
830 // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
831 // older challenges.
832 trace!("ignoring excessive PATH_CHALLENGE");
833 }
834 }
835
836 pub(crate) fn pop_off_path(&mut self, network_path: FourTuple) -> Option<(u64, FourTuple)> {
837 let response = *self.pending.last()?;
838 // We use an exact comparison here, because once we've received for the first time,
839 // we really should either already have a local_ip, or we will never get one
840 // (because our OS doesn't support it).
841 if response.network_path == network_path {
842 // We don't bother searching further because we expect that the on-path response will
843 // get drained in the immediate future by a call to `pop_on_path`
844 return None;
845 }
846 self.pending.pop();
847 Some((response.token, response.network_path))
848 }
849
850 pub(crate) fn pop_on_path(&mut self, network_path: FourTuple) -> Option<u64> {
851 let response = *self.pending.last()?;
852 // Using an exact comparison. See explanation in `pop_off_path`.
853 if response.network_path != network_path {
854 // We don't bother searching further because we expect that the off-path response will
855 // get drained in the immediate future by a call to `pop_off_path`
856 return None;
857 }
858 self.pending.pop();
859 Some(response.token)
860 }
861
862 pub(crate) fn is_empty(&self) -> bool {
863 self.pending.is_empty()
864 }
865}
866
867#[derive(Copy, Clone, Debug)]
868struct PathResponse {
869 /// The packet number the corresponding PATH_CHALLENGE was received in
870 packet: u64,
871 /// The token of the PATH_CHALLENGE
872 token: u64,
873 /// The path the corresponding PATH_CHALLENGE was received from
874 network_path: FourTuple,
875}
876
877/// Summary statistics of packets that have been sent on a particular path, but which have not yet
878/// been acked or deemed lost
879#[derive(Debug)]
880pub(super) struct InFlight {
881 /// Sum of the sizes of all sent packets considered "in flight" by congestion control
882 ///
883 /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
884 /// count towards this to ensure congestion control does not impede congestion feedback.
885 pub(super) bytes: u64,
886 /// Number of packets in flight containing frames other than ACK and PADDING
887 ///
888 /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
889 /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
890 /// also be nonzero.
891 pub(super) ack_eliciting: u64,
892}
893
894impl InFlight {
895 fn new() -> Self {
896 Self {
897 bytes: 0,
898 ack_eliciting: 0,
899 }
900 }
901
902 fn insert(&mut self, packet: &SentPacket) {
903 self.bytes += u64::from(packet.size);
904 self.ack_eliciting += u64::from(packet.ack_eliciting);
905 }
906
907 /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
908 fn remove(&mut self, packet: &SentPacket) {
909 self.bytes -= u64::from(packet.size);
910 self.ack_eliciting -= u64::from(packet.ack_eliciting);
911 }
912}
913
914/// State for QUIC-MULTIPATH PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP frames
915#[derive(Debug, Clone, Default)]
916pub(super) struct PathStatusState {
917 /// The local status
918 local_status: PathStatus,
919 /// Local sequence number, for both PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP
920 ///
921 /// This is the number of the *next* path status frame to be sent.
922 local_seq: VarInt,
923 /// The status set by the remote
924 remote_status: Option<(VarInt, PathStatus)>,
925}
926
927impl PathStatusState {
928 /// To be called on received PATH_STATUS_AVAILABLE/PATH_STATUS_BACKUP frames
929 pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
930 if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
931 return trace!(%seq, "ignoring path status update");
932 }
933
934 let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
935 if prev != Some(status) {
936 debug!(?status, ?seq, "remote changed path status");
937 }
938 }
939
940 /// Updates the local status
941 ///
942 /// If the local status changed, the previous value is returned
943 pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
944 if self.local_status == status {
945 return None;
946 }
947
948 self.local_seq = self.local_seq.saturating_add(1u8);
949 Some(std::mem::replace(&mut self.local_status, status))
950 }
951
952 pub(crate) fn seq(&self) -> VarInt {
953 self.local_seq
954 }
955}
956
957/// The QUIC-MULTIPATH path status
958///
959/// See section "3.3 Path Status Management":
960/// <https://quicwg.org/multipath/draft-ietf-quic-multipath.html#name-path-status-management>
961#[cfg_attr(test, derive(test_strategy::Arbitrary))]
962#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
963pub enum PathStatus {
964 /// Paths marked with as available will be used when scheduling packets
965 ///
966 /// If multiple paths are available, packets will be scheduled on whichever has
967 /// capacity.
968 #[default]
969 Available,
970 /// Paths marked as backup will only be used if there are no available paths
971 ///
972 /// If the max_idle_timeout is specified the path will be kept alive so that it does not
973 /// expire.
974 Backup,
975}
976
977/// Application events about paths
978#[derive(Debug, Clone, PartialEq, Eq)]
979pub enum PathEvent {
980 /// A new path has been opened
981 Opened {
982 /// Which path is now open
983 id: PathId,
984 },
985 /// A path was abandoned and is no longer usable.
986 ///
987 /// This event will always be followed by [`Self::Discarded`] after some time.
988 Abandoned {
989 /// With path was abandoned.
990 id: PathId,
991 /// Reason why this path was abandoned.
992 reason: PathAbandonReason,
993 },
994 /// A path was discarded and all remaining state for it has been removed.
995 ///
996 /// This event is the last event for a path, and is always emitted after [`Self::Abandoned`].
997 Discarded {
998 /// Which path had its state dropped
999 id: PathId,
1000 /// The final path stats, they are no longer available via [`Connection::stats`]
1001 ///
1002 /// [`Connection::stats`]: super::Connection::stats
1003 path_stats: PathStats,
1004 },
1005 /// The remote changed the status of the path
1006 ///
1007 /// The local status is not changed because of this event. It is up to the application
1008 /// to update the local status, which is used for packet scheduling, when the remote
1009 /// changes the status.
1010 RemoteStatus {
1011 /// Path which has changed status
1012 id: PathId,
1013 /// The new status set by the remote
1014 status: PathStatus,
1015 },
1016 /// Received an observation of our external address from the peer.
1017 ObservedAddr {
1018 /// Path over which the observed address was reported, [`PathId::ZERO`] when multipath is
1019 /// not negotiated
1020 id: PathId,
1021 /// The address observed by the remote over this path
1022 addr: SocketAddr,
1023 },
1024}
1025
1026/// Reason for why a path was abandoned.
1027#[derive(Debug, Clone, Eq, PartialEq)]
1028pub enum PathAbandonReason {
1029 /// The path was closed locally by the application.
1030 ApplicationClosed {
1031 /// The error code to be sent with the abandon frame.
1032 error_code: VarInt,
1033 },
1034 /// We didn't receive a path response in time after opening this path.
1035 ValidationFailed,
1036 /// We didn't receive any data from the remote within the path's idle timeout.
1037 TimedOut,
1038 /// The path became unusable after a local network change.
1039 UnusableAfterNetworkChange,
1040 /// The path was opened in a NAT traversal round which was terminated.
1041 NatTraversalRoundEnded,
1042 /// The remote closed the path.
1043 RemoteAbandoned {
1044 /// The error that was sent with the abandon frame.
1045 error_code: VarInt,
1046 },
1047}
1048
1049impl PathAbandonReason {
1050 /// Returns `true` if the closing of this path was initiated locally.
1051 pub(crate) fn is_locally_initiated(&self) -> bool {
1052 !matches!(self, Self::RemoteAbandoned { .. })
1053 }
1054
1055 /// Returns the error code to send with a PATH_ABANDON frame.
1056 pub(crate) fn error_code(&self) -> TransportErrorCode {
1057 match self {
1058 Self::ApplicationClosed { error_code } => (*error_code).into(),
1059 Self::NatTraversalRoundEnded => TransportErrorCode::APPLICATION_ABANDON_PATH,
1060 Self::ValidationFailed | Self::TimedOut | Self::UnusableAfterNetworkChange => {
1061 TransportErrorCode::PATH_UNSTABLE_OR_POOR
1062 }
1063 Self::RemoteAbandoned { error_code } => (*error_code).into(),
1064 }
1065 }
1066}
1067
1068/// Error from setting path status
1069#[derive(Debug, Error, Clone, PartialEq, Eq)]
1070pub enum SetPathStatusError {
1071 /// Error indicating that a path has not been opened or has already been abandoned
1072 #[error("closed path")]
1073 ClosedPath,
1074 /// Error indicating that this operation requires multipath to be negotiated whereas it hasn't been
1075 #[error("multipath not negotiated")]
1076 MultipathNotNegotiated,
1077}
1078
1079/// Error indicating that a path has not been opened or has already been abandoned
1080#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
1081#[error("closed path")]
1082pub struct ClosedPath {
1083 pub(super) _private: (),
1084}
1085
1086#[cfg(test)]
1087mod tests {
1088 use super::*;
1089
1090 #[test]
1091 fn test_path_id_saturating_add() {
1092 // add within range behaves normally
1093 let large: PathId = u16::MAX.into();
1094 let next = u32::from(u16::MAX) + 1;
1095 assert_eq!(large.saturating_add(1u8), PathId::from(next));
1096
1097 // outside range saturates
1098 assert_eq!(PathId::MAX.saturating_add(1u8), PathId::MAX)
1099 }
1100}