noq_proto/connection/paths.rs
1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8 PathStats, SpaceKind,
9 mtud::MtuDiscovery,
10 pacing::Pacer,
11 spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14 ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig,
15 TransportErrorCode, VarInt,
16 coding::{self, Decodable, Encodable},
17 congestion,
18 frame::ObservedAddr,
19};
20
21#[cfg(feature = "qlog")]
22use qlog::events::quic::RecoveryMetricsUpdated;
23
24/// Id representing different paths when using multipath extension
25#[cfg_attr(test, derive(test_strategy::Arbitrary))]
26#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
27pub struct PathId(pub(crate) u32);
28
29impl std::hash::Hash for PathId {
30 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
31 state.write_u32(self.0);
32 }
33}
34
35impl identity_hash::IdentityHashable for PathId {}
36
37impl Decodable for PathId {
38 fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
39 let v = VarInt::decode(r)?;
40 let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
41 Ok(Self(v))
42 }
43}
44
45impl Encodable for PathId {
46 fn encode<B: bytes::BufMut>(&self, w: &mut B) {
47 VarInt(self.0.into()).encode(w)
48 }
49}
50
51impl PathId {
52 /// The maximum path ID allowed.
53 pub const MAX: Self = Self(u32::MAX);
54
55 /// The 0 path id.
56 pub const ZERO: Self = Self(0);
57
58 /// The number of bytes this [`PathId`] uses when encoded as a [`VarInt`]
59 pub(crate) const fn size(&self) -> usize {
60 VarInt(self.0 as u64).size()
61 }
62
63 /// Saturating integer addition. Computes self + rhs, saturating at the numeric bounds instead
64 /// of overflowing.
65 pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
66 let rhs = rhs.into();
67 let inner = self.0.saturating_add(rhs.0);
68 Self(inner)
69 }
70
71 /// Saturating integer subtraction. Computes self - rhs, saturating at the numeric bounds
72 /// instead of overflowing.
73 pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
74 let rhs = rhs.into();
75 let inner = self.0.saturating_sub(rhs.0);
76 Self(inner)
77 }
78
79 /// Get the next [`PathId`]
80 pub(crate) fn next(&self) -> Self {
81 self.saturating_add(Self(1))
82 }
83
84 /// Get the underlying u32
85 pub(crate) fn as_u32(&self) -> u32 {
86 self.0
87 }
88}
89
90impl std::fmt::Display for PathId {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 self.0.fmt(f)
93 }
94}
95
96impl<T: Into<u32>> From<T> for PathId {
97 fn from(source: T) -> Self {
98 Self(source.into())
99 }
100}
101
102/// State needed for a single path ID.
103///
104/// A single path ID can migrate according to the rules in RFC9000 §9, either voluntary or
105/// involuntary. We need to keep the [`PathData`] of the previously used such path available
106/// in order to defend against migration attacks (see RFC9000 §9.3.1, §9.3.2 and §9.3.3) as
107/// well as to support path probing (RFC9000 §9.1).
108#[derive(Debug)]
109pub(super) struct PathState {
110 pub(super) data: PathData,
111 pub(super) prev: Option<(ConnectionId, PathData)>,
112}
113
114impl PathState {
115 /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
116 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
117 // Visit known paths from newest to oldest to find the one `pn` was sent on
118 for path_data in [&mut self.data]
119 .into_iter()
120 .chain(self.prev.as_mut().map(|(_, data)| data))
121 {
122 if path_data.remove_in_flight(packet) {
123 return;
124 }
125 }
126 }
127}
128
129#[derive(Debug)]
130pub(super) struct SentChallengeInfo {
131 /// When was the challenge sent on the wire.
132 pub(super) sent_instant: Instant,
133 /// The 4-tuple on which this path challenge was sent.
134 pub(super) network_path: FourTuple,
135}
136
137/// State of particular network path 4-tuple within a [`PacketNumberSpace`].
138///
139/// With QUIC-Multipath a path is identified by a [`PathId`] and it is possible to have
140/// multiple paths on the same 4-tuple. Furthermore a single QUIC-Multipath path can migrate
141/// to a different 4-tuple, in a similar manner as an RFC9000 connection can use "path
142/// migration" to move to a different 4-tuple. There are thus two states we keep for paths:
143///
144/// - [`PacketNumberSpace`]: The state for a single packet number space, i.e. [`PathId`],
145/// which remains in place across path migrations to different 4-tuples.
146///
147/// This is stored in [`PacketSpace::number_spaces`] indexed on [`PathId`].
148///
149/// - [`PathData`]: The state we keep for each unique 4-tuple within a space. Of note is
150/// that a single [`PathData`] can never belong to a different [`PacketNumberSpace`].
151///
152/// This is stored in [`Connection::paths`] indexed by the current [`PathId`] for which
153/// space it exists. Either as the primary 4-tuple or as the previous 4-tuple just after a
154/// migration.
155///
156/// It follows that there might be several [`PathData`] structs for the same 4-tuple if
157/// several spaces are sharing the same 4-tuple. Note that during the handshake, the
158/// Initial, Handshake and Data spaces for [`PathId::ZERO`] all share the same [`PathData`].
159///
160/// [`PacketSpace::number_spaces`]: super::spaces::PacketSpace::number_spaces
161/// [`Connection::paths`]: super::Connection::paths
162#[derive(Debug)]
163pub(super) struct PathData {
164 pub(super) network_path: FourTuple,
165 pub(super) rtt: RttEstimator,
166 /// Whether we're enabling ECN on outgoing packets
167 pub(super) sending_ecn: bool,
168 /// Congestion controller state
169 pub(super) congestion: Box<dyn congestion::Controller>,
170 /// Pacing state
171 pub(super) pacing: Pacer,
172 /// Whether the last `poll_transmit_on_path` call yielded no data because there was
173 /// no outgoing application data.
174 ///
175 /// The RFC writes:
176 /// > When bytes in flight is smaller than the congestion window and sending is not pacing limited,
177 /// > the congestion window is underutilized. This can happen due to insufficient application data
178 /// > or flow control limits. When this occurs, the congestion window SHOULD NOT be increased in
179 /// > either slow start or congestion avoidance.
180 ///
181 /// (RFC9002, section 7.8)
182 ///
183 /// I.e. when app_limited is true, the congestion controller doesn't increase the congestion window.
184 pub(super) app_limited: bool,
185
186 /// Path challenges sent (on the wire, on-path) that we didn't receive a path response for yet
187 on_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
188 /// Path challenges sent (on the wire, off-path) that we didn't receive a path response for yet
189 off_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
190 /// Whether to trigger sending another PATH_CHALLENGE in the next poll_transmit.
191 ///
192 /// This is picked up by [`super::Connection::space_can_send`].
193 ///
194 /// Only used for RFC9000-style path migration and multipath path validation (for opening).
195 ///
196 /// This is **not used** for n0 nat traversal challenge sending.
197 pub(super) pending_on_path_challenge: bool,
198 /// Pending responses to PATH_CHALLENGE frames
199 pub(super) path_responses: PathResponses,
200 /// Whether we're certain the peer can both send and receive on this address
201 ///
202 /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
203 /// migration. Always true for clients.
204 pub(super) validated: bool,
205 /// Total size of all UDP datagrams sent on this path
206 pub(super) total_sent: u64,
207 /// Total size of all UDP datagrams received on this path
208 pub(super) total_recvd: u64,
209 /// The state of the MTU discovery process
210 pub(super) mtud: MtuDiscovery,
211 /// Packet number of the first packet sent after an RTT sample was collected on this path
212 ///
213 /// Used in persistent congestion determination.
214 pub(super) first_packet_after_rtt_sample: Option<(SpaceKind, u64)>,
215 /// The in-flight packets and bytes
216 ///
217 /// Note that this is across all spaces on this path
218 pub(super) in_flight: InFlight,
219 /// Whether this path has had it's remote address reported back to the peer. This only happens
220 /// if both peers agree to so based on their transport parameters.
221 pub(super) observed_addr_sent: bool,
222 /// Observed address frame with the largest sequence number received from the peer on this path.
223 pub(super) last_observed_addr_report: Option<ObservedAddr>,
224 /// The QUIC-MULTIPATH path status
225 pub(super) status: PathStatusState,
226 /// Number of the first packet sent on this path
227 ///
228 /// With RFC9000 §9 style migration (i.e. not multipath) the PathId does not change and
229 /// hence packet numbers continue. This is used to determine whether a packet was sent
230 /// on such an earlier path. Insufficient to determine if a packet was sent on a later
231 /// path.
232 first_packet: Option<u64>,
233 /// The number of times a tail-loss probe has been sent without receiving an ack.
234 ///
235 /// This is incremented by one every time the [`LossDetection`] timer fires because a
236 /// tail-loss probe needs to be sent. Once an acknowledgement for a packet is received
237 /// again it is reset to 0. Used to compute the PTO duration.
238 ///
239 /// [`LossDetection`]: super::timer::PathTimer::LossDetection
240 pub(super) pto_count: u32,
241
242 //
243 // Per-path idle & keep alive
244 //
245 /// Idle timeout for the path
246 ///
247 /// If expired, the path will be abandoned. This is different from the connection-wide
248 /// idle timeout which closes the connection if expired.
249 pub(super) idle_timeout: Option<Duration>,
250 /// Keep alives to send on this path
251 ///
252 /// There is also a connection-level keep alive configured in the
253 /// [`TransportParameters`]. This triggers activity on any path which can keep the
254 /// connection alive.
255 ///
256 /// [`TransportParameters`]: crate::transport_parameters::TransportParameters
257 pub(super) keep_alive: Option<Duration>,
258 /// Whether to reset the idle timer when the next ack-eliciting packet is sent.
259 ///
260 /// Whenever we receive an authenticated packet the connection and path idle timers are
261 /// reset if a maximum idle timeout was negotiated. However on the first ack-eliciting
262 /// packet *sent* after this the idle timer also needs to be reset to avoid the idle
263 /// timer firing while the sent packet is in-fight. See
264 /// <https://www.rfc-editor.org/rfc/rfc9000.html#section-10.1>.
265 pub(super) permit_idle_reset: bool,
266
267 /// Whether the path has already been considered opened from an application perspective.
268 ///
269 /// This means, for paths other than the original [`PathId::ZERO`], a first path challenge has
270 /// been responded to, regardless of the initial validation status of the path. This state is
271 /// irreversible, since it's not affected by the path being closed.
272 ///
273 /// Sending a PATH_CHALLENGE and receiving a valid response before the application is informed
274 /// of the path, is a way to ensure the path is usable before it is reported. This is not
275 /// required by the spec, and in the future might be changed for simply requiring a first ack'd
276 /// packet.
277 pub(super) open_status: OpenStatus,
278
279 /// Whether we're currently draining the path after having abandoned it.
280 ///
281 /// This should only be true when a path discard timer is armed, and after the path was
282 /// abandoned (and added to the abandoned_paths set).
283 ///
284 /// This will only ever be set from false to true.
285 pub(super) draining: bool,
286
287 /// Snapshot of the qlog recovery metrics
288 #[cfg(feature = "qlog")]
289 recovery_metrics: RecoveryMetrics,
290
291 /// Tag uniquely identifying a path in a connection.
292 ///
293 /// When a migration happens on the same [`PathId`] we still detect a change in the
294 /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
295 /// value to keep track of which 4-tuple a packet belonged to.
296 generation: u64,
297}
298
299impl PathData {
300 pub(super) fn new(
301 network_path: FourTuple,
302 allow_mtud: bool,
303 peer_max_udp_payload_size: Option<u16>,
304 generation: u64,
305 now: Instant,
306 config: &TransportConfig,
307 ) -> Self {
308 let congestion = config
309 .congestion_controller_factory
310 .clone()
311 .build(now, config.get_initial_mtu());
312 Self {
313 network_path,
314 rtt: RttEstimator::new(config.initial_rtt),
315 sending_ecn: true,
316 pacing: Pacer::new(
317 config.initial_rtt,
318 congestion.initial_window(),
319 config.get_initial_mtu(),
320 config.max_outgoing_bytes_per_second,
321 now,
322 ),
323 congestion,
324 app_limited: false,
325 on_path_challenges_unconfirmed: Default::default(),
326 off_path_challenges_unconfirmed: Default::default(),
327 pending_on_path_challenge: false,
328 path_responses: PathResponses::default(),
329 validated: false,
330 total_sent: 0,
331 total_recvd: 0,
332 mtud: config
333 .mtu_discovery_config
334 .as_ref()
335 .filter(|_| allow_mtud)
336 .map_or_else(
337 || MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
338 |mtud_config| {
339 MtuDiscovery::new(
340 config.get_initial_mtu(),
341 config.min_mtu,
342 peer_max_udp_payload_size,
343 mtud_config.clone(),
344 )
345 },
346 ),
347 first_packet_after_rtt_sample: None,
348 in_flight: InFlight::new(),
349 observed_addr_sent: false,
350 last_observed_addr_report: None,
351 status: Default::default(),
352 first_packet: None,
353 pto_count: 0,
354 idle_timeout: config.default_path_max_idle_timeout,
355 keep_alive: config.default_path_keep_alive_interval,
356 permit_idle_reset: true,
357 open_status: OpenStatus::default(),
358 draining: false,
359 #[cfg(feature = "qlog")]
360 recovery_metrics: RecoveryMetrics::default(),
361 generation,
362 }
363 }
364
365 /// Create a new path from a previous one.
366 ///
367 /// This should only be called when migrating paths.
368 pub(super) fn from_previous(
369 network_path: FourTuple,
370 prev: &Self,
371 generation: u64,
372 now: Instant,
373 ) -> Self {
374 let congestion = prev.congestion.clone_box();
375 let smoothed_rtt = prev.rtt.get();
376 Self {
377 network_path,
378 rtt: prev.rtt,
379 pacing: Pacer::new(
380 smoothed_rtt,
381 congestion.window(),
382 prev.current_mtu(),
383 prev.pacing.max_bytes_per_second(),
384 now,
385 ),
386 sending_ecn: true,
387 congestion,
388 app_limited: false,
389 on_path_challenges_unconfirmed: Default::default(),
390 off_path_challenges_unconfirmed: Default::default(),
391 pending_on_path_challenge: false,
392 path_responses: PathResponses::default(),
393 validated: false,
394 total_sent: 0,
395 total_recvd: 0,
396 mtud: prev.mtud.clone(),
397 first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
398 in_flight: InFlight::new(),
399 observed_addr_sent: false,
400 last_observed_addr_report: None,
401 status: prev.status.clone(),
402 first_packet: None,
403 pto_count: 0,
404 idle_timeout: prev.idle_timeout,
405 keep_alive: prev.keep_alive,
406 permit_idle_reset: true,
407 open_status: OpenStatus::default(),
408 draining: false,
409 #[cfg(feature = "qlog")]
410 recovery_metrics: prev.recovery_metrics.clone(),
411 generation,
412 }
413 }
414
415 /// Whether we're in the process of validating this path with PATH_CHALLENGEs
416 pub(super) fn is_validating_path(&self) -> bool {
417 !self.on_path_challenges_unconfirmed.is_empty() || self.pending_on_path_challenge
418 }
419
420 /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
421 /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
422 pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
423 !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
424 }
425
426 /// Returns the path's current MTU
427 pub(super) fn current_mtu(&self) -> u16 {
428 self.mtud.current_mtu()
429 }
430
431 /// Account for transmission of `packet` with number `pn` in `space`
432 pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
433 self.in_flight.insert(&packet);
434 if self.first_packet.is_none() {
435 self.first_packet = Some(pn);
436 }
437 if let Some(forgotten) = space.sent(pn, packet) {
438 self.remove_in_flight(&forgotten);
439 }
440 }
441
442 pub(super) fn record_path_challenge_sent(
443 &mut self,
444 now: Instant,
445 token: u64,
446 network_path: FourTuple,
447 ) {
448 let info = SentChallengeInfo {
449 sent_instant: now,
450 network_path,
451 };
452 if network_path == self.network_path {
453 self.on_path_challenges_unconfirmed.insert(token, info);
454 } else {
455 self.off_path_challenges_unconfirmed.insert(token, info);
456 }
457 }
458
459 /// Remove `packet` with number `pn` from this path's congestion control counters, or return
460 /// `false` if `pn` was sent before this path was established.
461 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
462 if packet.path_generation != self.generation {
463 return false;
464 }
465 self.in_flight.remove(packet);
466 true
467 }
468
469 /// Increment the total size of sent UDP datagrams
470 pub(super) fn inc_total_sent(&mut self, inc: u64) {
471 self.total_sent = self.total_sent.saturating_add(inc);
472 if !self.validated {
473 trace!(
474 network_path = %self.network_path,
475 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
476 "anti amplification budget decreased"
477 );
478 }
479 }
480
481 /// Increment the total size of received UDP datagrams
482 pub(super) fn inc_total_recvd(&mut self, inc: u64) {
483 self.total_recvd = self.total_recvd.saturating_add(inc);
484 if !self.validated {
485 trace!(
486 network_path = %self.network_path,
487 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
488 "anti amplification budget increased"
489 );
490 }
491 }
492
493 /// The earliest time at which an on-path challenge we sent is considered lost.
494 pub(super) fn earliest_on_path_expiring_challenge(&self) -> Option<Instant> {
495 if self.on_path_challenges_unconfirmed.is_empty() {
496 return None;
497 }
498 let pto = self.rtt.pto_base();
499 self.on_path_challenges_unconfirmed
500 .values()
501 .map(|info| info.sent_instant + pto)
502 .min()
503 }
504
505 /// Handle receiving a PATH_RESPONSE.
506 pub(super) fn on_path_response_received(
507 &mut self,
508 now: Instant,
509 token: u64,
510 network_path: FourTuple,
511 ) -> OnPathResponseReceived {
512 // > § 8.2.3
513 // > Path validation succeeds when a PATH_RESPONSE frame is received that contains the
514 // > data that was sent in a previous PATH_CHALLENGE frame. A PATH_RESPONSE frame
515 // > received on any network path validates the path on which the PATH_CHALLENGE was
516 // > sent.
517 //
518 // At this point we have three potentially different network paths:
519 // - current network path (`Self::network_path`)
520 // - network path used to send the path challenge (`SentChallengeInfo::network_path`)
521 // - network path over which the response arrived (`network_path`)
522 //
523 // As per the spec, this only validates the network path on which this was *sent*.
524 match self.on_path_challenges_unconfirmed.remove(&token) {
525 // Response to an on-path PathChallenge that validates this path.
526 // The sent path should match the current path. However, it's possible that the
527 // challenge was sent when no local_ip was known. This case is allowed as well
528 Some(info) if info.network_path.is_probably_same_path(&self.network_path) => {
529 self.network_path.update_local_if_same_remote(&network_path);
530 let sent_instant = info.sent_instant;
531 if !std::mem::replace(&mut self.validated, true) {
532 trace!("new path validated");
533 }
534 // Clear any other on-path sent challenge
535 self.on_path_challenges_unconfirmed.clear();
536
537 self.pending_on_path_challenge = false;
538
539 // This RTT can only be used for the initial RTT, not as a normal
540 // sample: https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-2.
541 let rtt = now.saturating_duration_since(sent_instant);
542 self.rtt.reset_initial_rtt(rtt);
543
544 let prev_status = std::mem::replace(&mut self.open_status, OpenStatus::Informed);
545 OnPathResponseReceived::OnPath {
546 was_open: matches!(
547 prev_status,
548 OpenStatus::Informed | OpenStatus::Revalidating
549 ),
550 }
551 }
552 // Response to an on-path PathChallenge that does not validate this path
553 Some(info) => {
554 // This is a valid path response, but this validates a path we no longer have in
555 // use. Keep only sent challenges for the current path.
556
557 self.on_path_challenges_unconfirmed
558 .retain(|_token, i| i.network_path == self.network_path);
559
560 // if there are no challenges for the current path, schedule one
561 if !self.on_path_challenges_unconfirmed.is_empty() {
562 self.pending_on_path_challenge = true;
563 }
564 OnPathResponseReceived::Ignored {
565 sent_on: info.network_path,
566 current_path: self.network_path,
567 }
568 }
569 None => match self.off_path_challenges_unconfirmed.remove(&token) {
570 // Response to an off-path PathChallenge
571 Some(info) => {
572 // Since we do not store validation state for these paths, we only really care
573 // about reaching the same remote
574 self.off_path_challenges_unconfirmed
575 .retain(|_token, i| i.network_path.remote != info.network_path.remote);
576 OnPathResponseReceived::OffPath
577 }
578 // Response to an unknown PathChallenge. Does not indicate failure
579 None => OnPathResponseReceived::Unknown,
580 },
581 }
582 }
583
584 /// Removes all on-path challenges we remember and cancels sending new on-path challenges.
585 pub(super) fn reset_on_path_challenges(&mut self) {
586 self.on_path_challenges_unconfirmed.clear();
587 self.pending_on_path_challenge = false;
588 }
589
590 /// Returns whether there are any pending off-path challenges.
591 pub(super) fn has_off_path_challenges(&self) -> bool {
592 !self.off_path_challenges_unconfirmed.is_empty()
593 }
594
595 /// Clears all off-path challenges.
596 ///
597 /// Used when a new NAT traversal round starts and old probes are obsolete.
598 pub(super) fn clear_off_path_challenges(&mut self) {
599 self.off_path_challenges_unconfirmed.clear();
600 }
601
602 #[cfg(feature = "qlog")]
603 pub(super) fn qlog_recovery_metrics(
604 &mut self,
605 path_id: PathId,
606 ) -> Option<RecoveryMetricsUpdated> {
607 let controller_metrics = self.congestion.metrics();
608
609 let metrics = RecoveryMetrics {
610 min_rtt: Some(self.rtt.min),
611 smoothed_rtt: Some(self.rtt.get()),
612 latest_rtt: Some(self.rtt.latest),
613 rtt_variance: Some(self.rtt.var),
614 pto_count: Some(self.pto_count),
615 bytes_in_flight: Some(self.in_flight.bytes),
616 packets_in_flight: Some(self.in_flight.ack_eliciting),
617
618 congestion_window: Some(controller_metrics.congestion_window),
619 ssthresh: controller_metrics.ssthresh,
620 pacing_rate: controller_metrics.pacing_rate,
621 };
622
623 let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
624 self.recovery_metrics = metrics;
625 event
626 }
627
628 /// Return how long we need to wait before sending `bytes_to_send`
629 ///
630 /// See [`Pacer::delay`].
631 pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Duration> {
632 let smoothed_rtt = self.rtt.get();
633 self.pacing.delay(
634 smoothed_rtt,
635 bytes_to_send,
636 self.current_mtu(),
637 self.congestion.window(),
638 now,
639 )
640 }
641
642 /// Updates the last observed address report received on this path.
643 ///
644 /// If the address was updated, it's returned to be informed to the application.
645 #[must_use = "updated observed address must be reported to the application"]
646 pub(super) fn update_observed_addr_report(
647 &mut self,
648 observed: ObservedAddr,
649 ) -> Option<SocketAddr> {
650 match self.last_observed_addr_report.as_mut() {
651 Some(prev) => {
652 if prev.seq_no >= observed.seq_no {
653 // frames that do not increase the sequence number on this path are ignored
654 None
655 } else if prev.ip == observed.ip && prev.port == observed.port {
656 // keep track of the last seq_no but do not report the address as updated
657 prev.seq_no = observed.seq_no;
658 None
659 } else {
660 let addr = observed.socket_addr();
661 self.last_observed_addr_report = Some(observed);
662 Some(addr)
663 }
664 }
665 None => {
666 let addr = observed.socket_addr();
667 self.last_observed_addr_report = Some(observed);
668 Some(addr)
669 }
670 }
671 }
672
673 pub(crate) fn remote_status(&self) -> Option<PathStatus> {
674 self.status.remote_status.map(|(_seq, status)| status)
675 }
676
677 pub(crate) fn local_status(&self) -> PathStatus {
678 self.status.local_status
679 }
680
681 /// Tag uniquely identifying a path in a connection.
682 ///
683 /// When a migration happens on the same [`PathId`] we still detect a change in the
684 /// 4-tuple and generate a new [`PathData`] for it. Each such generation has a unique
685 /// value to keep track of which 4-tuple a packet belonged to.
686 pub(super) fn generation(&self) -> u64 {
687 self.generation
688 }
689}
690
691pub(super) enum OnPathResponseReceived {
692 /// This response validates the path on its current remote address.
693 OnPath { was_open: bool },
694 /// This response is valid, but it's for a remote other than the path's current remote address.
695 OffPath,
696 /// The received token is unknown.
697 Unknown,
698 /// The response is valid but it's not usable for path validation.
699 Ignored {
700 sent_on: FourTuple,
701 current_path: FourTuple,
702 },
703}
704
705#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
706pub(super) enum OpenStatus {
707 /// A first packet has not been sent using this [`PathId`].
708 #[default]
709 Pending,
710 /// The first packet has been sent using this [`PathId`]. However, it is not yet deemed good
711 /// enough to be reported to the application.
712 Sent,
713 /// The application has been informed of this path.
714 Informed,
715 /// The path was [`Self::Informed`] before, but we want to trigger path validation again.
716 ///
717 /// This is used to ensure we properly stop trying to re-send path challenges eventually, without
718 /// having to switch to [`Self::Pending`] when re-validating, as that would trigger another
719 /// application-level event about the path opening once validation succeeds.
720 Revalidating,
721}
722
723/// Congestion metrics as described in [`recovery_metrics_updated`].
724///
725/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
726#[cfg(feature = "qlog")]
727#[derive(Default, Clone, PartialEq, Debug)]
728#[non_exhaustive]
729struct RecoveryMetrics {
730 pub min_rtt: Option<Duration>,
731 pub smoothed_rtt: Option<Duration>,
732 pub latest_rtt: Option<Duration>,
733 pub rtt_variance: Option<Duration>,
734 pub pto_count: Option<u32>,
735 pub bytes_in_flight: Option<u64>,
736 pub packets_in_flight: Option<u64>,
737 pub congestion_window: Option<u64>,
738 pub ssthresh: Option<u64>,
739 pub pacing_rate: Option<u64>,
740}
741
742#[cfg(feature = "qlog")]
743impl RecoveryMetrics {
744 /// Retain only values that have been updated since the last snapshot.
745 fn retain_updated(&self, previous: &Self) -> Self {
746 macro_rules! keep_if_changed {
747 ($name:ident) => {
748 if previous.$name == self.$name {
749 None
750 } else {
751 self.$name
752 }
753 };
754 }
755
756 Self {
757 min_rtt: keep_if_changed!(min_rtt),
758 smoothed_rtt: keep_if_changed!(smoothed_rtt),
759 latest_rtt: keep_if_changed!(latest_rtt),
760 rtt_variance: keep_if_changed!(rtt_variance),
761 pto_count: keep_if_changed!(pto_count),
762 bytes_in_flight: keep_if_changed!(bytes_in_flight),
763 packets_in_flight: keep_if_changed!(packets_in_flight),
764 congestion_window: keep_if_changed!(congestion_window),
765 ssthresh: keep_if_changed!(ssthresh),
766 pacing_rate: keep_if_changed!(pacing_rate),
767 }
768 }
769
770 /// Emit a `MetricsUpdated` event containing only updated values
771 fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
772 let updated = self.retain_updated(previous);
773
774 if updated == Self::default() {
775 return None;
776 }
777
778 Some(RecoveryMetricsUpdated {
779 min_rtt: updated.min_rtt.map(|rtt| rtt.as_secs_f32()),
780 smoothed_rtt: updated.smoothed_rtt.map(|rtt| rtt.as_secs_f32()),
781 latest_rtt: updated.latest_rtt.map(|rtt| rtt.as_secs_f32()),
782 rtt_variance: updated.rtt_variance.map(|rtt| rtt.as_secs_f32()),
783 pto_count: updated
784 .pto_count
785 .map(|count| count.try_into().unwrap_or(u16::MAX)),
786 bytes_in_flight: updated.bytes_in_flight,
787 packets_in_flight: updated.packets_in_flight,
788 congestion_window: updated.congestion_window,
789 ssthresh: updated.ssthresh,
790 pacing_rate: updated.pacing_rate,
791 path_id: Some(path_id.as_u32() as u64),
792 })
793 }
794}
795
796/// RTT estimation for a particular network path
797#[derive(Copy, Clone, Debug)]
798pub struct RttEstimator {
799 /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
800 latest: Duration,
801 /// The smoothed RTT of the connection, computed as described in RFC6298
802 smoothed: Option<Duration>,
803 /// The RTT variance, computed as described in RFC6298
804 var: Duration,
805 /// The minimum RTT seen in the connection, ignoring ack delay.
806 min: Duration,
807}
808
809impl RttEstimator {
810 pub(super) fn new(initial_rtt: Duration) -> Self {
811 Self {
812 latest: initial_rtt,
813 smoothed: None,
814 var: initial_rtt / 2,
815 min: initial_rtt,
816 }
817 }
818
819 /// Resets the estimator using a new initial_rtt value.
820 ///
821 /// This only resets the initial_rtt **if** no samples have been recorded yet. If there
822 /// are any recorded samples the initial estimate can not be adjusted after the fact.
823 ///
824 /// This is useful when you receive a PATH_RESPONSE in the first packet received on a
825 /// new path. In this case you can use the delay of the PATH_CHALLENGE-PATH_RESPONSE as
826 /// the initial RTT to get a better expected estimation.
827 ///
828 /// A PATH_CHALLENGE-PATH_RESPONSE pair later in the connection should not be used
829 /// explicitly as an estimation since PATH_CHALLENGE is an ACK-eliciting packet itself
830 /// already.
831 pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
832 if self.smoothed.is_none() {
833 self.latest = initial_rtt;
834 self.var = initial_rtt / 2;
835 self.min = initial_rtt;
836 }
837 }
838
839 /// The current best RTT estimation.
840 pub fn get(&self) -> Duration {
841 self.smoothed.unwrap_or(self.latest)
842 }
843
844 /// Conservative estimate of RTT
845 ///
846 /// Takes the maximum of smoothed and latest RTT, as recommended
847 /// in 6.1.2 of the recovery spec (draft 29).
848 pub fn conservative(&self) -> Duration {
849 self.get().max(self.latest)
850 }
851
852 /// Minimum RTT registered so far for this estimator.
853 pub fn min(&self) -> Duration {
854 self.min
855 }
856
857 /// PTO computed as described in RFC9002#6.2.1.
858 pub(crate) fn pto_base(&self) -> Duration {
859 self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
860 }
861
862 /// Records an RTT sample.
863 pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
864 self.latest = rtt;
865 // https://www.rfc-editor.org/rfc/rfc9002.html#section-5.2-3:
866 // min_rtt does not adjust for ack_delay to avoid underestimating.
867 self.min = cmp::min(self.min, self.latest);
868 // Based on RFC6298.
869 if let Some(smoothed) = self.smoothed {
870 let adjusted_rtt = if self.min + ack_delay <= self.latest {
871 self.latest - ack_delay
872 } else {
873 self.latest
874 };
875 let var_sample = smoothed.abs_diff(adjusted_rtt);
876 self.var = (3 * self.var + var_sample) / 4;
877 self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
878 } else {
879 self.smoothed = Some(self.latest);
880 self.var = self.latest / 2;
881 self.min = self.latest;
882 }
883 }
884}
885
886#[derive(Default, Debug)]
887pub(crate) struct PathResponses {
888 pending: Vec<PathResponse>,
889}
890
891impl PathResponses {
892 pub(crate) fn push(&mut self, packet: u64, token: u64, network_path: FourTuple) {
893 /// Arbitrary permissive limit to prevent abuse
894 const MAX_PATH_RESPONSES: usize = 16;
895 let response = PathResponse {
896 packet,
897 token,
898 network_path,
899 };
900 let existing = self
901 .pending
902 .iter_mut()
903 .find(|x| x.network_path.remote == network_path.remote);
904 if let Some(existing) = existing {
905 // Update a queued response
906 if existing.packet <= packet {
907 *existing = response;
908 }
909 return;
910 }
911 if self.pending.len() < MAX_PATH_RESPONSES {
912 self.pending.push(response);
913 } else {
914 // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
915 // older challenges.
916 trace!("ignoring excessive PATH_CHALLENGE");
917 }
918 }
919
920 pub(crate) fn pop_off_path(&mut self, network_path: FourTuple) -> Option<(u64, FourTuple)> {
921 let response = *self.pending.last()?;
922 // We use an exact comparison here, because once we've received for the first time,
923 // we really should either already have a local_ip, or we will never get one
924 // (because our OS doesn't support it).
925 if response.network_path == network_path {
926 // We don't bother searching further because we expect that the on-path response will
927 // get drained in the immediate future by a call to `pop_on_path`
928 return None;
929 }
930 self.pending.pop();
931 Some((response.token, response.network_path))
932 }
933
934 pub(crate) fn pop_on_path(&mut self, network_path: FourTuple) -> Option<u64> {
935 let response = *self.pending.last()?;
936 // Using an exact comparison. See explanation in `pop_off_path`.
937 if response.network_path != network_path {
938 // We don't bother searching further because we expect that the off-path response will
939 // get drained in the immediate future by a call to `pop_off_path`
940 return None;
941 }
942 self.pending.pop();
943 Some(response.token)
944 }
945
946 pub(crate) fn is_empty(&self) -> bool {
947 self.pending.is_empty()
948 }
949}
950
951#[derive(Copy, Clone, Debug)]
952struct PathResponse {
953 /// The packet number the corresponding PATH_CHALLENGE was received in
954 packet: u64,
955 /// The token of the PATH_CHALLENGE
956 token: u64,
957 /// The path the corresponding PATH_CHALLENGE was received from
958 network_path: FourTuple,
959}
960
961/// Summary statistics of packets that have been sent on a particular path, but which have not yet
962/// been acked or deemed lost
963#[derive(Debug)]
964pub(super) struct InFlight {
965 /// Sum of the sizes of all sent packets considered "in flight" by congestion control
966 ///
967 /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
968 /// count towards this to ensure congestion control does not impede congestion feedback.
969 pub(super) bytes: u64,
970 /// Number of packets in flight containing frames other than ACK and PADDING
971 ///
972 /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
973 /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
974 /// also be nonzero.
975 pub(super) ack_eliciting: u64,
976}
977
978impl InFlight {
979 fn new() -> Self {
980 Self {
981 bytes: 0,
982 ack_eliciting: 0,
983 }
984 }
985
986 fn insert(&mut self, packet: &SentPacket) {
987 self.bytes += u64::from(packet.size);
988 self.ack_eliciting += u64::from(packet.ack_eliciting);
989 }
990
991 /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
992 fn remove(&mut self, packet: &SentPacket) {
993 self.bytes -= u64::from(packet.size);
994 self.ack_eliciting -= u64::from(packet.ack_eliciting);
995 }
996}
997
998/// State for QUIC-MULTIPATH PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP frames
999#[derive(Debug, Clone, Default)]
1000pub(super) struct PathStatusState {
1001 /// The local status
1002 local_status: PathStatus,
1003 /// Local sequence number, for both PATH_STATUS_AVAILABLE and PATH_STATUS_BACKUP
1004 ///
1005 /// This is the number of the *next* path status frame to be sent.
1006 local_seq: VarInt,
1007 /// The status set by the remote
1008 remote_status: Option<(VarInt, PathStatus)>,
1009}
1010
1011impl PathStatusState {
1012 /// To be called on received PATH_STATUS_AVAILABLE/PATH_STATUS_BACKUP frames
1013 pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
1014 if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
1015 return trace!(%seq, "ignoring path status update");
1016 }
1017
1018 let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
1019 if prev != Some(status) {
1020 debug!(?status, ?seq, "remote changed path status");
1021 }
1022 }
1023
1024 /// Updates the local status
1025 ///
1026 /// If the local status changed, the previous value is returned
1027 pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
1028 if self.local_status == status {
1029 return None;
1030 }
1031
1032 self.local_seq = self.local_seq.saturating_add(1u8);
1033 Some(std::mem::replace(&mut self.local_status, status))
1034 }
1035
1036 pub(crate) fn seq(&self) -> VarInt {
1037 self.local_seq
1038 }
1039}
1040
1041/// The QUIC-MULTIPATH path status
1042///
1043/// See section "3.3 Path Status Management":
1044/// <https://quicwg.org/multipath/draft-ietf-quic-multipath.html#name-path-status-management>
1045#[cfg_attr(test, derive(test_strategy::Arbitrary))]
1046#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
1047pub enum PathStatus {
1048 /// Paths marked with as available will be used when scheduling packets
1049 ///
1050 /// If multiple paths are available, packets will be scheduled on whichever has
1051 /// capacity.
1052 #[default]
1053 Available,
1054 /// Paths marked as backup will only be used if there are no available paths
1055 ///
1056 /// If the max_idle_timeout is specified the path will be kept alive so that it does not
1057 /// expire.
1058 Backup,
1059}
1060
1061/// Application events about paths
1062#[derive(Debug, Clone, PartialEq, Eq)]
1063pub enum PathEvent {
1064 /// A new path has been opened
1065 Opened {
1066 /// Which path is now open
1067 id: PathId,
1068 },
1069 /// A path was abandoned and is no longer usable.
1070 ///
1071 /// This event will always be followed by [`Self::Discarded`] after some time.
1072 Abandoned {
1073 /// With path was abandoned.
1074 id: PathId,
1075 /// Reason why this path was abandoned.
1076 reason: PathAbandonReason,
1077 },
1078 /// A path was discarded and all remaining state for it has been removed.
1079 ///
1080 /// This event is the last event for a path, and is always emitted after [`Self::Abandoned`].
1081 Discarded {
1082 /// Which path had its state dropped
1083 id: PathId,
1084 /// The final path stats, they are no longer available via [`Connection::stats`]
1085 ///
1086 /// [`Connection::stats`]: super::Connection::stats
1087 path_stats: Box<PathStats>,
1088 },
1089 /// The remote changed the status of the path
1090 ///
1091 /// The local status is not changed because of this event. It is up to the application
1092 /// to update the local status, which is used for packet scheduling, when the remote
1093 /// changes the status.
1094 RemoteStatus {
1095 /// Path which has changed status
1096 id: PathId,
1097 /// The new status set by the remote
1098 status: PathStatus,
1099 },
1100 /// Received an observation of our external address from the peer.
1101 ObservedAddr {
1102 /// Path over which the observed address was reported, [`PathId::ZERO`] when multipath is
1103 /// not negotiated
1104 id: PathId,
1105 /// The address observed by the remote over this path
1106 addr: SocketAddr,
1107 },
1108}
1109
1110/// Reason for why a path was abandoned.
1111#[derive(Debug, Clone, Eq, PartialEq)]
1112pub enum PathAbandonReason {
1113 /// The path was closed locally by the application.
1114 ApplicationClosed {
1115 /// The error code to be sent with the abandon frame.
1116 error_code: VarInt,
1117 },
1118 /// We didn't receive a path response in time after opening this path.
1119 ValidationFailed,
1120 /// We didn't receive any data from the remote within the path's idle timeout.
1121 TimedOut,
1122 /// The path became unusable after a local network change.
1123 UnusableAfterNetworkChange,
1124 /// The path was opened in a NAT traversal round which was terminated.
1125 NatTraversalRoundEnded,
1126 /// The remote closed the path.
1127 RemoteAbandoned {
1128 /// The error that was sent with the abandon frame.
1129 error_code: VarInt,
1130 },
1131}
1132
1133impl PathAbandonReason {
1134 /// Whether this abandon was initiated by the remote peer.
1135 pub(crate) fn is_remote(&self) -> bool {
1136 matches!(self, Self::RemoteAbandoned { .. })
1137 }
1138
1139 /// Returns the error code to send with a PATH_ABANDON frame.
1140 pub(crate) fn error_code(&self) -> TransportErrorCode {
1141 match self {
1142 Self::ApplicationClosed { error_code } => (*error_code).into(),
1143 Self::NatTraversalRoundEnded => TransportErrorCode::APPLICATION_ABANDON_PATH,
1144 Self::ValidationFailed | Self::TimedOut | Self::UnusableAfterNetworkChange => {
1145 TransportErrorCode::PATH_UNSTABLE_OR_POOR
1146 }
1147 Self::RemoteAbandoned { error_code } => (*error_code).into(),
1148 }
1149 }
1150}
1151
1152/// Error from setting path status
1153#[derive(Debug, Error, Clone, PartialEq, Eq)]
1154pub enum SetPathStatusError {
1155 /// Error indicating that a path has not been opened or has already been abandoned
1156 #[error("closed path")]
1157 ClosedPath,
1158 /// Error indicating that this operation requires multipath to be negotiated whereas it hasn't been
1159 #[error("multipath not negotiated")]
1160 MultipathNotNegotiated,
1161}
1162
1163/// Error indicating that a path has not been opened or has already been abandoned
1164#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
1165#[error("closed path")]
1166pub struct ClosedPath {
1167 pub(super) _private: (),
1168}
1169
1170#[cfg(test)]
1171mod tests {
1172 use super::*;
1173
1174 #[test]
1175 fn test_path_id_saturating_add() {
1176 // add within range behaves normally
1177 let large: PathId = u16::MAX.into();
1178 let next = u32::from(u16::MAX) + 1;
1179 assert_eq!(large.saturating_add(1u8), PathId::from(next));
1180
1181 // outside range saturates
1182 assert_eq!(PathId::MAX.saturating_add(1u8), PathId::MAX)
1183 }
1184}