noq_proto/connection/paths.rs
1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8 PathStats, SpaceKind,
9 mtud::MtuDiscovery,
10 pacing::Pacer,
11 spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14 ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig,
15 TransportErrorCode, VarInt,
16 coding::{self, Decodable, Encodable},
17 congestion,
18 connection::{MAX_BACKOFF_EXPONENT, MAX_PTO_INTERVAL},
19 frame::ObservedAddr,
20};
21
22#[cfg(feature = "qlog")]
23use qlog::events::quic::RecoveryMetricsUpdated;
24
25/// Id representing different paths when using multipath extension
26#[cfg_attr(test, derive(test_strategy::Arbitrary))]
27#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
28pub struct PathId(pub(crate) u32);
29
30impl std::hash::Hash for PathId {
31 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
32 state.write_u32(self.0);
33 }
34}
35
36impl Decodable for PathId {
37 fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
38 let v = VarInt::decode(r)?;
39 let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
40 Ok(Self(v))
41 }
42}
43
44impl Encodable for PathId {
45 fn encode<B: bytes::BufMut>(&self, w: &mut B) {
46 VarInt(self.0.into()).encode(w)
47 }
48}
49
50impl PathId {
51 /// The maximum path ID allowed.
52 pub const MAX: Self = Self(u32::MAX);
53
54 /// The 0 path id.
55 pub const ZERO: Self = Self(0);
56
57 /// The number of bytes this [`PathId`] uses when encoded as a [`VarInt`]
58 pub(crate) const fn size(&self) -> usize {
59 VarInt(self.0 as u64).size()
60 }
61
62 /// Saturating integer addition. Computes self + rhs, saturating at the numeric bounds instead
63 /// of overflowing.
64 pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
65 let rhs = rhs.into();
66 let inner = self.0.saturating_add(rhs.0);
67 Self(inner)
68 }
69
70 /// Saturating integer subtraction. Computes self - rhs, saturating at the numeric bounds
71 /// instead of overflowing.
72 pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
73 let rhs = rhs.into();
74 let inner = self.0.saturating_sub(rhs.0);
75 Self(inner)
76 }
77
78 /// Get the next [`PathId`]
79 pub(crate) fn next(&self) -> Self {
80 self.saturating_add(Self(1))
81 }
82
83 /// Get the underlying u32
84 pub(crate) fn as_u32(&self) -> u32 {
85 self.0
86 }
87}
88
89impl std::fmt::Display for PathId {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 self.0.fmt(f)
92 }
93}
94
95impl<T: Into<u32>> From<T> for PathId {
96 fn from(source: T) -> Self {
97 Self(source.into())
98 }
99}
100
101/// State needed for a single path ID.
102///
103/// A single path ID can migrate according to the rules in RFC9000 §9, either voluntary or
104/// involuntary. We need to keep the [`PathData`] of the previously used such path available
105/// in order to defend against migration attacks (see RFC9000 §9.3.1, §9.3.2 and §9.3.3) as
106/// well as to support path probing (RFC9000 §9.1).
107#[derive(Debug)]
108pub(super) struct PathState {
109 pub(super) data: PathData,
110 pub(super) prev: Option<(ConnectionId, PathData)>,
111}
112
113impl PathState {
114 /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
115 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
116 // Visit known paths from newest to oldest to find the one `pn` was sent on
117 for path_data in [&mut self.data]
118 .into_iter()
119 .chain(self.prev.as_mut().map(|(_, data)| data))
120 {
121 if path_data.remove_in_flight(packet) {
122 return;
123 }
124 }
125 }
126}
127
128#[derive(Debug)]
129pub(super) struct SentChallengeInfo {
130 /// When was the challenge sent on the wire.
131 pub(super) sent_instant: Instant,
132 /// The 4-tuple on which this path challenge was sent.
133 pub(super) network_path: FourTuple,
134}
135
136/// State of particular network path 4-tuple within a [`PacketNumberSpace`].
137///
138/// With QUIC-Multipath a path is identified by a [`PathId`] and it is possible to have
139/// multiple paths on the same 4-tuple. Furthermore a single QUIC-Multipath path can migrate
140/// to a different 4-tuple, in a similar manner as an RFC9000 connection can use "path
141/// migration" to move to a different 4-tuple. There are thus two states we keep for paths:
142///
143/// - [`PacketNumberSpace`]: The state for a single packet number space, i.e. [`PathId`],
144/// which remains in place across path migrations to different 4-tuples.
145///
146/// This is stored in [`PacketSpace::number_spaces`] indexed on [`PathId`].
147///
148/// - [`PathData`]: The state we keep for each unique 4-tuple within a space. Of note is
149/// that a single [`PathData`] can never belong to a different [`PacketNumberSpace`].
150///
151/// This is stored in [`Connection::paths`] indexed by the current [`PathId`] for which
152/// space it exists. Either as the primary 4-tuple or as the previous 4-tuple just after a
153/// migration.
154///
155/// It follows that there might be several [`PathData`] structs for the same 4-tuple if
156/// several spaces are sharing the same 4-tuple. Note that during the handshake, the
157/// Initial, Handshake and Data spaces for [`PathId::ZERO`] all share the same [`PathData`].
158///
159/// [`PacketSpace::number_spaces`]: super::spaces::PacketSpace::number_spaces
160/// [`Connection::paths`]: super::Connection::paths
161#[derive(Debug)]
162pub(super) struct PathData {
163 pub(super) network_path: FourTuple,
164 pub(super) rtt: RttEstimator,
165 /// Whether we're enabling ECN on outgoing packets
166 pub(super) sending_ecn: bool,
167 /// Congestion controller state
168 pub(super) congestion: Box<dyn congestion::Controller>,
169 /// Pacing state
170 pub(super) pacing: Pacer,
171 /// Whether the last `poll_transmit_on_path` call yielded no data because there was
172 /// no outgoing application data.
173 ///
174 /// The RFC writes:
175 /// > When bytes in flight is smaller than the congestion window and sending is not pacing limited,
176 /// > the congestion window is underutilized. This can happen due to insufficient application data
177 /// > or flow control limits. When this occurs, the congestion window SHOULD NOT be increased in
178 /// > either slow start or congestion avoidance.
179 ///
180 /// (RFC9002, section 7.8)
181 ///
182 /// I.e. when app_limited is true, the congestion controller doesn't increase the congestion window.
183 pub(super) app_limited: bool,
184
185 /// Path challenges sent (on the wire, on-path) that we didn't receive a path response for yet
186 on_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
187 /// Whether to trigger sending another PATH_CHALLENGE in the next poll_transmit.
188 ///
189 /// This is picked up by [`super::Connection::space_can_send`].
190 ///
191 /// Only used for RFC9000-style path migration and multipath path validation (for opening).
192 ///
193 /// This is **not used** for n0 nat traversal challenge sending.
194 pub(super) pending_on_path_challenge: bool,
195 /// How often we've deemed a path challenge to be lost.
196 ///
197 /// Similar to [`Self::pto_count`], but for on-path path challenges.
198 /// Used to calculate exponential backoff for retrying path challenges.
199 pub(super) on_path_challenges_lost: u32,
200 /// Pending responses to PATH_CHALLENGE frames
201 pub(super) path_responses: PathResponses,
202 /// Whether we're certain the peer can both send and receive on this address
203 ///
204 /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
205 /// migration. Always true for clients.
206 pub(super) validated: bool,
207 /// Total size of all UDP datagrams sent on this path
208 pub(super) total_sent: u64,
209 /// Total size of all UDP datagrams received on this path
210 pub(super) total_recvd: u64,
211 /// The state of the MTU discovery process
212 pub(super) mtud: MtuDiscovery,
213 /// Packet number of the first packet sent after an RTT sample was collected on this path
214 ///
215 /// Used in persistent congestion determination.
216 pub(super) first_packet_after_rtt_sample: Option<(SpaceKind, u64)>,
217 /// The in-flight packets and bytes
218 ///
219 /// Note that this is across all spaces on this path
220 pub(super) in_flight: InFlight,
221 /// Whether this path has had it's remote address reported back to the peer. This only happens
222 /// if both peers agree to so based on their transport parameters.
223 pub(super) observed_addr_sent: bool,
224 /// Observed address frame with the largest sequence number received from the peer on this path.
225 pub(super) last_observed_addr_report: Option<ObservedAddr>,
226 /// The QUIC-MULTIPATH path status
227 pub(super) status: PathStatusState,
228 /// Number of the first packet sent on this path
229 ///
230 /// With RFC9000 §9 style migration (i.e. not multipath) the PathId does not change and
231 /// hence packet numbers continue. This is used to determine whether a packet was sent
232 /// on such an earlier path. Insufficient to determine if a packet was sent on a later
233 /// path.
234 first_packet: Option<u64>,
235 /// The number of times a tail-loss probe has been sent without receiving an ack.
236 ///
237 /// This is incremented by one every time the [`LossDetection`] timer fires because a
238 /// tail-loss probe needs to be sent. Once an acknowledgement for a packet is received
239 /// again it is reset to 0. Used to compute the PTO duration.
240 ///
241 /// [`LossDetection`]: super::timer::PathTimer::LossDetection
242 pub(super) pto_count: u32,
243
244 //
245 // Per-path idle & keep alive
246 //
247 /// Idle timeout for the path
248 ///
249 /// If expired, the path will be abandoned. This is different from the connection-wide
250 /// idle timeout which closes the connection if expired.
251 pub(super) idle_timeout: Option<Duration>,
252 /// Keep alives to send on this path
253 ///
254 /// There is also a connection-level keep alive configured in the
255 /// [`TransportParameters`]. This triggers activity on any path which can keep the
256 /// connection alive.
257 ///
258 /// [`TransportParameters`]: crate::transport_parameters::TransportParameters
259 pub(super) keep_alive: Option<Duration>,
260 /// Whether to reset the idle timer when the next ack-eliciting packet is sent.
261 ///
262 /// Whenever we receive an authenticated packet the connection and path idle timers are
263 /// reset if a maximum idle timeout was negotiated. However on the first ack-eliciting
264 /// packet *sent* after this the idle timer also needs to be reset to avoid the idle
265 /// timer firing while the sent packet is in-fight. See
266 /// <https://www.rfc-editor.org/rfc/rfc9000.html#section-10.1>.
267 pub(super) permit_idle_reset: bool,
268
269 /// Whether the path has already been considered opened from an application perspective.
270 ///
271 /// This means, for paths other than the original [`PathId::ZERO`], a first path challenge has
272 /// been responded to, regardless of the initial validation status of the path. This state is
273 /// irreversible, since it's not affected by the path being closed.
274 ///
275 /// Sending a PATH_CHALLENGE and receiving a valid response before the application is informed
276 /// of the path, is a way to ensure the path is usable before it is reported. This is not
277 /// required by the spec, and in the future might be changed for simply requiring a first ack'd
278 /// packet.
279 pub(super) open_status: OpenStatus,
280
281 /// Whether we're currently draining the path after having abandoned it.
282 ///
283 /// This should only be true when a path discard timer is armed, and after the path was
284 /// abandoned (and added to the abandoned_paths set).
285 ///
286 /// This will only ever be set from false to true.
287 pub(super) draining: bool,
288
289 /// Snapshot of the qlog recovery metrics
290 #[cfg(feature = "qlog")]
291 recovery_metrics: RecoveryMetrics,
292
293 /// Tag uniquely identifying a path in a connection.
294 ///
295 /// When a migration happens on the same [`PathId`] we still detect a change in the
296 /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
297 /// value to keep track of which 4-tuple a packet belonged to.
298 generation: u64,
299}
300
301impl PathData {
302 pub(super) fn new(
303 network_path: FourTuple,
304 allow_mtud: bool,
305 peer_max_udp_payload_size: Option<u16>,
306 generation: u64,
307 now: Instant,
308 config: &TransportConfig,
309 ) -> Self {
310 let congestion = config
311 .congestion_controller_factory
312 .clone()
313 .build(now, config.get_initial_mtu());
314 Self {
315 network_path,
316 rtt: RttEstimator::new(config.initial_rtt),
317 sending_ecn: true,
318 pacing: Pacer::new(
319 config.initial_rtt,
320 congestion.initial_window(),
321 config.get_initial_mtu(),
322 config.max_outgoing_bytes_per_second,
323 now,
324 ),
325 congestion,
326 app_limited: false,
327 on_path_challenges_unconfirmed: Default::default(),
328 on_path_challenges_lost: 0,
329 pending_on_path_challenge: false,
330 path_responses: PathResponses::default(),
331 validated: false,
332 total_sent: 0,
333 total_recvd: 0,
334 mtud: config
335 .mtu_discovery_config
336 .as_ref()
337 .filter(|_| allow_mtud)
338 .map_or_else(
339 || MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
340 |mtud_config| {
341 MtuDiscovery::new(
342 config.get_initial_mtu(),
343 config.min_mtu,
344 peer_max_udp_payload_size,
345 mtud_config.clone(),
346 )
347 },
348 ),
349 first_packet_after_rtt_sample: None,
350 in_flight: InFlight::new(),
351 observed_addr_sent: false,
352 last_observed_addr_report: None,
353 status: Default::default(),
354 first_packet: None,
355 pto_count: 0,
356 idle_timeout: config.default_path_max_idle_timeout,
357 keep_alive: config.default_path_keep_alive_interval,
358 permit_idle_reset: true,
359 open_status: OpenStatus::default(),
360 draining: false,
361 #[cfg(feature = "qlog")]
362 recovery_metrics: RecoveryMetrics::default(),
363 generation,
364 }
365 }
366
367 /// Create a new path from a previous one.
368 ///
369 /// This should only be called when migrating paths.
370 pub(super) fn from_previous(
371 network_path: FourTuple,
372 prev: &Self,
373 generation: u64,
374 now: Instant,
375 ) -> Self {
376 let congestion = prev.congestion.clone_box();
377 let smoothed_rtt = prev.rtt.get();
378 Self {
379 network_path,
380 rtt: prev.rtt,
381 pacing: Pacer::new(
382 smoothed_rtt,
383 congestion.window(),
384 prev.current_mtu(),
385 prev.pacing.max_bytes_per_second(),
386 now,
387 ),
388 sending_ecn: true,
389 congestion,
390 app_limited: false,
391 on_path_challenges_unconfirmed: Default::default(),
392 on_path_challenges_lost: 0,
393 pending_on_path_challenge: false,
394 path_responses: PathResponses::default(),
395 validated: false,
396 total_sent: 0,
397 total_recvd: 0,
398 mtud: prev.mtud.clone(),
399 first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
400 in_flight: InFlight::new(),
401 observed_addr_sent: false,
402 last_observed_addr_report: None,
403 status: prev.status.clone(),
404 first_packet: None,
405 pto_count: 0,
406 idle_timeout: prev.idle_timeout,
407 keep_alive: prev.keep_alive,
408 permit_idle_reset: true,
409 open_status: OpenStatus::default(),
410 draining: false,
411 #[cfg(feature = "qlog")]
412 recovery_metrics: prev.recovery_metrics.clone(),
413 generation,
414 }
415 }
416
417 /// Whether we're in the process of validating this path with PATH_CHALLENGEs
418 pub(super) fn is_validating_path(&self) -> bool {
419 !self.on_path_challenges_unconfirmed.is_empty() || self.pending_on_path_challenge
420 }
421
422 /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
423 /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
424 pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
425 !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
426 }
427
428 /// Returns the path's current MTU
429 pub(super) fn current_mtu(&self) -> u16 {
430 self.mtud.current_mtu()
431 }
432
433 /// Account for transmission of `packet` with number `pn` in `space`
434 pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
435 self.in_flight.insert(&packet);
436 if self.first_packet.is_none() {
437 self.first_packet = Some(pn);
438 }
439 if let Some(forgotten) = space.sent(pn, packet) {
440 self.remove_in_flight(&forgotten);
441 }
442 }
443
444 pub(super) fn record_path_challenge_sent(
445 &mut self,
446 now: Instant,
447 token: u64,
448 network_path: FourTuple,
449 ) {
450 let info = SentChallengeInfo {
451 sent_instant: now,
452 network_path,
453 };
454 debug_assert_eq!(network_path, self.network_path);
455 self.on_path_challenges_unconfirmed.insert(token, info);
456 }
457
458 /// Remove `packet` with number `pn` from this path's congestion control counters, or return
459 /// `false` if `pn` was sent before this path was established.
460 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
461 if packet.path_generation != self.generation {
462 return false;
463 }
464 self.in_flight.remove(packet);
465 true
466 }
467
468 /// Increment the total size of sent UDP datagrams
469 pub(super) fn inc_total_sent(&mut self, inc: u64) {
470 self.total_sent = self.total_sent.saturating_add(inc);
471 if !self.validated {
472 trace!(
473 network_path = %self.network_path,
474 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
475 "anti amplification budget decreased"
476 );
477 }
478 }
479
480 /// Increment the total size of received UDP datagrams
481 pub(super) fn inc_total_recvd(&mut self, inc: u64) {
482 self.total_recvd = self.total_recvd.saturating_add(inc);
483 if !self.validated {
484 trace!(
485 network_path = %self.network_path,
486 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
487 "anti amplification budget increased"
488 );
489 }
490 }
491
492 /// The earliest time at which an on-path challenge we sent is considered lost.
493 pub(super) fn earliest_on_path_expiring_challenge(&self) -> Option<Instant> {
494 if self.on_path_challenges_unconfirmed.is_empty() {
495 return None;
496 }
497 let duration = self.on_path_challenge_expiry();
498 self.on_path_challenges_unconfirmed
499 .values()
500 .map(|info| info.sent_instant + duration)
501 .min()
502 }
503
504 pub(super) fn on_path_challenge_expiry(&self) -> Duration {
505 let backoff = 2u32.pow(self.on_path_challenges_lost.min(MAX_BACKOFF_EXPONENT));
506 let duration = self.rtt.pto_base() * backoff;
507 duration.min(MAX_PTO_INTERVAL)
508 }
509
510 /// Handle receiving a PATH_RESPONSE.
511 pub(super) fn on_path_response_received(
512 &mut self,
513 now: Instant,
514 token: u64,
515 network_path: FourTuple,
516 ) -> OnPathResponseReceived {
517 // > § 8.2.3
518 // > Path validation succeeds when a PATH_RESPONSE frame is received that contains the
519 // > data that was sent in a previous PATH_CHALLENGE frame. A PATH_RESPONSE frame
520 // > received on any network path validates the path on which the PATH_CHALLENGE was
521 // > sent.
522 //
523 // At this point we have three potentially different network paths:
524 // - current network path (`Self::network_path`)
525 // - network path used to send the path challenge (`SentChallengeInfo::network_path`)
526 // - network path over which the response arrived (`network_path`)
527 //
528 // As per the spec, this only validates the network path on which this was *sent*.
529 match self.on_path_challenges_unconfirmed.remove(&token) {
530 // Response to an on-path PathChallenge that validates this path.
531 // The sent path should match the current path. However, it's possible that the
532 // challenge was sent when no local_ip was known. This case is allowed as well.
533 Some(info) if info.network_path.is_probably_same_path(&self.network_path) => {
534 self.network_path.update_local_if_same_remote(&network_path);
535 let sent_instant = info.sent_instant;
536 if !std::mem::replace(&mut self.validated, true) {
537 trace!("new path validated");
538 }
539 // Clear any other on-path sent challenges and stop sending new ones.
540 self.reset_on_path_challenges();
541
542 // This RTT can only be used for the initial RTT, not as a normal
543 // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
544 let rtt = now.saturating_duration_since(sent_instant);
545 self.rtt.reset_initial_rtt(rtt);
546
547 let prev_status = std::mem::replace(&mut self.open_status, OpenStatus::Informed);
548 OnPathResponseReceived::OnPath {
549 was_open: matches!(prev_status, OpenStatus::Informed),
550 }
551 }
552 // Response to an on-path PathChallenge that does not validate this path.
553 Some(info) => {
554 // This is a valid path response, but this validates a 4-tuple we no longer
555 // have in use. Keep only sent challenges for the current path.
556 self.on_path_challenges_unconfirmed
557 .retain(|_token, i| i.network_path == self.network_path);
558
559 // If there are no challenges for the current path, schedule one
560 if !self.on_path_challenges_unconfirmed.is_empty() {
561 self.pending_on_path_challenge = true;
562 }
563 OnPathResponseReceived::Ignored {
564 sent_on: info.network_path,
565 current_path: self.network_path,
566 }
567 }
568 None => {
569 // Response to an unknown PathChallenge. Does not indicate failure.
570 OnPathResponseReceived::Unknown
571 }
572 }
573 }
574
575 /// Removes all on-path challenges we remember and cancels sending new on-path challenges.
576 pub(super) fn reset_on_path_challenges(&mut self) {
577 self.on_path_challenges_unconfirmed.clear();
578 self.pending_on_path_challenge = false;
579 self.on_path_challenges_lost = 0;
580 }
581
582 #[cfg(feature = "qlog")]
583 pub(super) fn qlog_recovery_metrics(
584 &mut self,
585 path_id: PathId,
586 ) -> Option<RecoveryMetricsUpdated> {
587 let controller_metrics = self.congestion.metrics();
588
589 let metrics = RecoveryMetrics {
590 min_rtt: Some(self.rtt.min),
591 smoothed_rtt: Some(self.rtt.get()),
592 latest_rtt: Some(self.rtt.latest),
593 rtt_variance: Some(self.rtt.var),
594 pto_count: Some(self.pto_count),
595 bytes_in_flight: Some(self.in_flight.bytes),
596 packets_in_flight: Some(self.in_flight.ack_eliciting),
597
598 congestion_window: Some(controller_metrics.congestion_window),
599 ssthresh: controller_metrics.ssthresh,
600 pacing_rate: controller_metrics.pacing_rate,
601 };
602
603 let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
604 self.recovery_metrics = metrics;
605 event
606 }
607
608 /// Return how long we need to wait before sending `bytes_to_send`
609 ///
610 /// See [`Pacer::delay`].
611 pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Duration> {
612 let smoothed_rtt = self.rtt.get();
613 let metrics = self.congestion.metrics();
614 self.pacing.delay(
615 smoothed_rtt,
616 bytes_to_send,
617 self.current_mtu(),
618 metrics.congestion_window,
619 now,
620 metrics.send_quantum,
621 metrics.pacing_rate,
622 )
623 }
624
625 /// Updates the last observed address report received on this path.
626 ///
627 /// If the address was updated, it's returned to be informed to the application.
628 #[must_use = "updated observed address must be reported to the application"]
629 pub(super) fn update_observed_addr_report(
630 &mut self,
631 observed: ObservedAddr,
632 ) -> Option<SocketAddr> {
633 match self.last_observed_addr_report.as_mut() {
634 Some(prev) => {
635 if prev.seq_no >= observed.seq_no {
636 // frames that do not increase the sequence number on this path are ignored
637 None
638 } else if prev.ip == observed.ip && prev.port == observed.port {
639 // keep track of the last seq_no but do not report the address as updated
640 prev.seq_no = observed.seq_no;
641 None
642 } else {
643 let addr = observed.socket_addr();
644 self.last_observed_addr_report = Some(observed);
645 Some(addr)
646 }
647 }
648 None => {
649 let addr = observed.socket_addr();
650 self.last_observed_addr_report = Some(observed);
651 Some(addr)
652 }
653 }
654 }
655
656 pub(crate) fn remote_status(&self) -> Option<PathStatus> {
657 self.status.remote_status.map(|(_seq, status)| status)
658 }
659
660 pub(crate) fn local_status(&self) -> PathStatus {
661 self.status.local_status
662 }
663
664 /// Tag uniquely identifying a path in a connection.
665 ///
666 /// When a migration happens on the same [`PathId`] we still detect a change in the
667 /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
668 /// value to keep track of which 4-tuple a packet belonged to.
669 pub(super) fn generation(&self) -> u64 {
670 self.generation
671 }
672}
673
674pub(super) enum OnPathResponseReceived {
675 /// This response validates the path on its current remote address.
676 OnPath { was_open: bool },
677 /// The received token is unknown.
678 Unknown,
679 /// The response is valid but it's not usable for path validation.
680 Ignored {
681 sent_on: FourTuple,
682 current_path: FourTuple,
683 },
684}
685
686#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
687pub(super) enum OpenStatus {
688 /// A first packet has not been sent using this [`PathId`].
689 #[default]
690 Pending,
691 /// The first packet has been sent using this [`PathId`]. However, it is not yet deemed good
692 /// enough to be reported to the application.
693 Sent,
694 /// The application has been informed of this path.
695 Informed,
696}
697
698/// Congestion metrics as described in [`recovery_metrics_updated`].
699///
700/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
701#[cfg(feature = "qlog")]
702#[derive(Default, Clone, PartialEq, Debug)]
703#[non_exhaustive]
704struct RecoveryMetrics {
705 pub min_rtt: Option<Duration>,
706 pub smoothed_rtt: Option<Duration>,
707 pub latest_rtt: Option<Duration>,
708 pub rtt_variance: Option<Duration>,
709 pub pto_count: Option<u32>,
710 pub bytes_in_flight: Option<u64>,
711 pub packets_in_flight: Option<u64>,
712 pub congestion_window: Option<u64>,
713 pub ssthresh: Option<u64>,
714 pub pacing_rate: Option<u64>,
715}
716
717#[cfg(feature = "qlog")]
718impl RecoveryMetrics {
719 /// Retain only values that have been updated since the last snapshot.
720 fn retain_updated(&self, previous: &Self) -> Self {
721 macro_rules! keep_if_changed {
722 ($name:ident) => {
723 if previous.$name == self.$name {
724 None
725 } else {
726 self.$name
727 }
728 };
729 }
730
731 Self {
732 min_rtt: keep_if_changed!(min_rtt),
733 smoothed_rtt: keep_if_changed!(smoothed_rtt),
734 latest_rtt: keep_if_changed!(latest_rtt),
735 rtt_variance: keep_if_changed!(rtt_variance),
736 pto_count: keep_if_changed!(pto_count),
737 bytes_in_flight: keep_if_changed!(bytes_in_flight),
738 packets_in_flight: keep_if_changed!(packets_in_flight),
739 congestion_window: keep_if_changed!(congestion_window),
740 ssthresh: keep_if_changed!(ssthresh),
741 pacing_rate: keep_if_changed!(pacing_rate),
742 }
743 }
744
745 /// Emit a `MetricsUpdated` event containing only updated values
746 fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
747 let updated = self.retain_updated(previous);
748
749 if updated == Self::default() {
750 return None;
751 }
752
753 Some(RecoveryMetricsUpdated {
754 min_rtt: updated.min_rtt.map(|rtt| rtt.as_micros() as f32 / 1000.0),
755 smoothed_rtt: updated
756 .smoothed_rtt
757 .map(|rtt| rtt.as_micros() as f32 / 1000.0),
758 latest_rtt: updated
759 .latest_rtt
760 .map(|rtt| rtt.as_micros() as f32 / 1000.0),
761 rtt_variance: updated
762 .rtt_variance
763 .map(|rtt| rtt.as_micros() as f32 / 1000.0),
764 pto_count: updated
765 .pto_count
766 .map(|count| count.try_into().unwrap_or(u16::MAX)),
767 bytes_in_flight: updated.bytes_in_flight,
768 packets_in_flight: updated.packets_in_flight,
769 congestion_window: updated.congestion_window,
770 ssthresh: updated.ssthresh,
771 pacing_rate: updated.pacing_rate,
772 path_id: Some(path_id.as_u32() as u64),
773 })
774 }
775}
776
777/// RTT estimation for a particular network path
778#[derive(Copy, Clone, Debug)]
779pub struct RttEstimator {
780 /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
781 latest: Duration,
782 /// The smoothed RTT of the connection, computed as described in RFC6298
783 smoothed: Option<Duration>,
784 /// The RTT variance, computed as described in RFC6298
785 var: Duration,
786 /// The minimum RTT seen in the connection, ignoring ack delay.
787 min: Duration,
788}
789
790impl RttEstimator {
791 pub(super) fn new(initial_rtt: Duration) -> Self {
792 Self {
793 latest: initial_rtt,
794 smoothed: None,
795 var: initial_rtt / 2,
796 min: initial_rtt,
797 }
798 }
799
800 /// Resets the estimator using a new initial_rtt value.
801 ///
802 /// This only resets the initial_rtt **if** no samples have been recorded yet. If there
803 /// are any recorded samples the initial estimate can not be adjusted after the fact.
804 ///
805 /// This is useful when you receive a PATH_RESPONSE in the first packet received on a
806 /// new path. In this case you can use the delay of the PATH_CHALLENGE-PATH_RESPONSE as
807 /// the initial RTT to get a better expected estimation.
808 ///
809 /// A PATH_CHALLENGE-PATH_RESPONSE pair later in the connection should not be used
810 /// explicitly as an estimation since PATH_CHALLENGE is an ACK-eliciting packet itself
811 /// already.
812 pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
813 if self.smoothed.is_none() {
814 self.latest = initial_rtt;
815 self.var = initial_rtt / 2;
816 self.min = initial_rtt;
817 }
818 }
819
820 /// The current best RTT estimation.
821 pub fn get(&self) -> Duration {
822 self.smoothed.unwrap_or(self.latest)
823 }
824
825 /// Conservative estimate of RTT
826 ///
827 /// Takes the maximum of smoothed and latest RTT, as recommended
828 /// in 6.1.2 of the recovery spec (draft 29).
829 pub fn conservative(&self) -> Duration {
830 self.get().max(self.latest)
831 }
832
833 /// Minimum RTT registered so far for this estimator.
834 pub fn min(&self) -> Duration {
835 self.min
836 }
837
838 /// PTO computed as described in RFC9002#6.2.1.
839 pub(crate) fn pto_base(&self) -> Duration {
840 self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
841 }
842
843 /// Records an RTT sample.
844 pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
845 self.latest = rtt;
846 // https://www.rfc-editor.org/rfc/rfc9002.html#section-5.2-3:
847 // min_rtt does not adjust for ack_delay to avoid underestimating.
848 self.min = cmp::min(self.min, self.latest);
849 // Based on RFC6298.
850 if let Some(smoothed) = self.smoothed {
851 let adjusted_rtt = if self.min + ack_delay <= self.latest {
852 self.latest - ack_delay
853 } else {
854 self.latest
855 };
856 let var_sample = smoothed.abs_diff(adjusted_rtt);
857 self.var = (3 * self.var + var_sample) / 4;
858 self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
859 } else {
860 self.smoothed = Some(self.latest);
861 self.var = self.latest / 2;
862 self.min = self.latest;
863 }
864 }
865}
866
867#[derive(Default, Debug)]
868pub(crate) struct PathResponses {
869 pending: Vec<PathResponse>,
870}
871
872impl PathResponses {
873 pub(crate) fn push(&mut self, packet: u64, token: u64, network_path: FourTuple) {
874 /// An arbitrary permissive limit to prevent abuse.
875 ///
876 /// If we've negotiated the n0 NAT Traversal extension, and one user might have a lot
877 /// of addresses, e.g. because of having lots of interfaces (we've seen >25 interfaces
878 /// on Macs with docker and other things), then we need to be able to process at least
879 /// as many PATH_CHALLENGE frames as there are interfaces.
880 /// On top of that, there are retries, which make it possible that we need to process
881 /// even more.
882 ///
883 /// Considering that there can be up to 2 `PathData`s per active `PathId`, and
884 /// reasonable default values for maximum concurrent multipath paths are ~8 and each
885 /// `PathResponse` struct takes up 72 bytes at the moment this, means an attacker can
886 /// cause us to keep `32 * 2 * 8 * 72 = ~37KB` of data around.
887 const MAX_PATH_RESPONSES: usize = 32;
888 let response = PathResponse {
889 packet,
890 token,
891 network_path,
892 };
893 let existing = self
894 .pending
895 .iter_mut()
896 .find(|x| x.network_path.remote == network_path.remote);
897 if let Some(existing) = existing {
898 // Update a queued response
899 if existing.packet <= packet {
900 *existing = response;
901 }
902 return;
903 }
904 if self.pending.len() < MAX_PATH_RESPONSES {
905 self.pending.push(response);
906 } else {
907 // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
908 // older challenges.
909 trace!("ignoring excessive PATH_CHALLENGE");
910 }
911 }
912
913 pub(crate) fn pop_off_path(&mut self, network_path: FourTuple) -> Option<(u64, FourTuple)> {
914 let response = *self.pending.last()?;
915 // We use an exact comparison here, because once we've received for the first time,
916 // we really should either already have a local_ip, or we will never get one
917 // (because our OS doesn't support it).
918 if response.network_path == network_path {
919 // We don't bother searching further because we expect that the on-path response will
920 // get drained in the immediate future by a call to `pop_on_path`
921 return None;
922 }
923 self.pending.pop();
924 Some((response.token, response.network_path))
925 }
926
927 pub(crate) fn pop_on_path(&mut self, network_path: FourTuple) -> Option<u64> {
928 let response = *self.pending.last()?;
929 // Using an exact comparison. See explanation in `pop_off_path`.
930 if response.network_path != network_path {
931 // We don't bother searching further because we expect that the off-path response will
932 // get drained in the immediate future by a call to `pop_off_path`
933 return None;
934 }
935 self.pending.pop();
936 Some(response.token)
937 }
938
939 pub(crate) fn is_empty(&self) -> bool {
940 self.pending.is_empty()
941 }
942}
943
944#[derive(Copy, Clone, Debug)]
945struct PathResponse {
946 /// The packet number the corresponding PATH_CHALLENGE was received in
947 packet: u64,
948 /// The token of the PATH_CHALLENGE
949 token: u64,
950 /// The path the corresponding PATH_CHALLENGE was received from
951 network_path: FourTuple,
952}
953
954/// Summary statistics of packets that have been sent on a particular path, but which have not yet
955/// been acked or deemed lost
956#[derive(Debug)]
957pub(super) struct InFlight {
958 /// Sum of the sizes of all sent packets considered "in flight" by congestion control
959 ///
960 /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
961 /// count towards this to ensure congestion control does not impede congestion feedback.
962 pub(super) bytes: u64,
963 /// Number of packets in flight containing frames other than ACK and PADDING
964 ///
965 /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
966 /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
967 /// also be nonzero.
968 pub(super) ack_eliciting: u64,
969}
970
971impl InFlight {
972 fn new() -> Self {
973 Self {
974 bytes: 0,
975 ack_eliciting: 0,
976 }
977 }
978
979 fn insert(&mut self, packet: &SentPacket) {
980 self.bytes += u64::from(packet.size);
981 self.ack_eliciting += u64::from(packet.ack_eliciting);
982 }
983
984 /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
985 fn remove(&mut self, packet: &SentPacket) {
986 self.bytes -= u64::from(packet.size);
987 self.ack_eliciting -= u64::from(packet.ack_eliciting);
988 }
989}
990
991/// State for QUIC-MULTIPATH PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP frames
992#[derive(Debug, Clone, Default)]
993pub(super) struct PathStatusState {
994 /// The local status
995 local_status: PathStatus,
996 /// Local sequence number, for both PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP
997 ///
998 /// This is the number of the *next* path status frame to be sent.
999 local_seq: VarInt,
1000 /// The status set by the remote
1001 remote_status: Option<(VarInt, PathStatus)>,
1002}
1003
1004impl PathStatusState {
1005 /// To be called on received PATH_STATUS_AVAILABLE/PATH_STATUS_BACKUP frames
1006 pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
1007 if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
1008 return trace!(%seq, "ignoring path status update");
1009 }
1010
1011 let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
1012 if prev != Some(status) {
1013 debug!(?status, ?seq, "remote changed path status");
1014 }
1015 }
1016
1017 /// Updates the local status
1018 ///
1019 /// If the local status changed, the previous value is returned
1020 pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
1021 if self.local_status == status {
1022 return None;
1023 }
1024
1025 self.local_seq = self.local_seq.saturating_add(1u8);
1026 Some(std::mem::replace(&mut self.local_status, status))
1027 }
1028
1029 pub(crate) fn seq(&self) -> VarInt {
1030 self.local_seq
1031 }
1032}
1033
1034/// The QUIC-MULTIPATH path status
1035///
1036/// See section "3.3 Path Status Management":
1037/// <https://quicwg.org/multipath/draft-ietf-quic-multipath.html#name-path-status-management>
1038#[cfg_attr(test, derive(test_strategy::Arbitrary))]
1039#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
1040pub enum PathStatus {
1041 /// Paths marked with as available will be used when scheduling packets
1042 ///
1043 /// If multiple paths are available, packets will be scheduled on whichever has
1044 /// capacity.
1045 #[default]
1046 Available,
1047 /// Paths marked as backup will only be used if there are no available paths
1048 ///
1049 /// If the max_idle_timeout is specified the path will be kept alive so that it does not
1050 /// expire.
1051 Backup,
1052}
1053
1054/// Application events about paths
1055#[derive(Debug, Clone, PartialEq, Eq)]
1056#[non_exhaustive]
1057pub enum PathEvent {
1058 /// A new path has established connection with the peer.
1059 #[non_exhaustive]
1060 Established {
1061 /// The path which can now be used for application data.
1062 id: PathId,
1063 },
1064 /// A path was abandoned and is no longer usable.
1065 ///
1066 /// This event will always be followed by [`Self::Discarded`] after some time.
1067 #[non_exhaustive]
1068 Abandoned {
1069 /// With path was abandoned.
1070 id: PathId,
1071 /// Reason why this path was abandoned.
1072 reason: PathAbandonReason,
1073 },
1074 /// A path was discarded and all remaining state for it has been removed.
1075 ///
1076 /// This event is the last event for a path, and is always emitted after [`Self::Abandoned`].
1077 #[non_exhaustive]
1078 Discarded {
1079 /// Which path had its state dropped
1080 id: PathId,
1081 /// The final path stats, they are no longer available via [`Connection::stats`]
1082 ///
1083 /// [`Connection::stats`]: super::Connection::stats
1084 path_stats: Box<PathStats>,
1085 },
1086 /// The remote changed the status of the path
1087 ///
1088 /// The local status is not changed because of this event. It is up to the application
1089 /// to update the local status, which is used for packet scheduling, when the remote
1090 /// changes the status.
1091 #[non_exhaustive]
1092 RemoteStatus {
1093 /// Path which has changed status
1094 id: PathId,
1095 /// The new status set by the remote
1096 status: PathStatus,
1097 },
1098 /// Received an observation of our external address from the peer.
1099 #[non_exhaustive]
1100 ObservedAddr {
1101 /// Path over which the observed address was reported, [`PathId::ZERO`] when multipath is
1102 /// not negotiated
1103 id: PathId,
1104 /// The address observed by the remote over this path
1105 addr: SocketAddr,
1106 },
1107}
1108
1109/// Reason for why a path was abandoned.
1110#[derive(Debug, Clone, Eq, PartialEq)]
1111pub enum PathAbandonReason {
1112 /// The path was closed locally by the application.
1113 ApplicationClosed {
1114 /// The error code to be sent with the abandon frame.
1115 error_code: VarInt,
1116 },
1117 /// We didn't receive a path response in time after opening this path.
1118 ValidationFailed,
1119 /// We didn't receive any data from the remote within the path's idle timeout.
1120 TimedOut,
1121 /// The path became unusable after a local network change.
1122 UnusableAfterNetworkChange,
1123 /// The remote closed the path.
1124 RemoteAbandoned {
1125 /// The error that was sent with the abandon frame.
1126 error_code: VarInt,
1127 },
1128}
1129
1130impl PathAbandonReason {
1131 /// Whether this abandon was initiated by the remote peer.
1132 pub(crate) fn is_remote(&self) -> bool {
1133 matches!(self, Self::RemoteAbandoned { .. })
1134 }
1135
1136 /// Returns the error code to send with a PATH_ABANDON frame.
1137 pub(crate) fn error_code(&self) -> TransportErrorCode {
1138 match self {
1139 Self::ApplicationClosed { error_code } => (*error_code).into(),
1140 Self::ValidationFailed | Self::TimedOut | Self::UnusableAfterNetworkChange => {
1141 TransportErrorCode::PATH_UNSTABLE_OR_POOR
1142 }
1143 Self::RemoteAbandoned { error_code } => (*error_code).into(),
1144 }
1145 }
1146}
1147
1148/// Error from setting path status
1149#[derive(Debug, Error, Clone, PartialEq, Eq)]
1150pub enum SetPathStatusError {
1151 /// Error indicating that a path has not been opened or has already been abandoned
1152 #[error("closed path")]
1153 ClosedPath,
1154 /// Error indicating that this operation requires multipath to be negotiated whereas it hasn't been
1155 #[error("multipath not negotiated")]
1156 MultipathNotNegotiated,
1157}
1158
1159/// Error indicating that a path has not been opened or has already been abandoned
1160#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
1161#[error("closed path")]
1162pub struct ClosedPath {
1163 pub(super) _private: (),
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use super::*;
1169
1170 #[test]
1171 fn test_path_id_saturating_add() {
1172 // add within range behaves normally
1173 let large: PathId = u16::MAX.into();
1174 let next = u32::from(u16::MAX) + 1;
1175 assert_eq!(large.saturating_add(1u8), PathId::from(next));
1176
1177 // outside range saturates
1178 assert_eq!(PathId::MAX.saturating_add(1u8), PathId::MAX)
1179 }
1180}