1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8 PathError, PathStats,
9 mtud::MtuDiscovery,
10 pacing::Pacer,
11 spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14 ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig, VarInt,
15 coding::{self, Decodable, Encodable},
16 congestion,
17 frame::ObservedAddr,
18 packet::SpaceId,
19};
20
21#[cfg(feature = "qlog")]
22use qlog::events::quic::RecoveryMetricsUpdated;
23
/// Identifier of a path, stored as a plain `u32`.
///
/// On the wire it travels as a variable-length integer (see the `Decodable` /
/// `Encodable` impls); values that do not fit in a `u32` are rejected when decoding.
/// Ordering and equality are those of the wrapped integer, and the default is `0`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct PathId(pub(crate) u32);
27
28impl std::hash::Hash for PathId {
29 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
30 state.write_u32(self.0);
31 }
32}
33
// Marker impl with no methods: opts `PathId` into the `identity_hash` crate's collections.
// NOTE(review): presumably this relies on the `Hash` impl above issuing a single
// `write_u32` — confirm against the `identity_hash` documentation.
impl identity_hash::IdentityHashable for PathId {}
35
36impl Decodable for PathId {
37 fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
38 let v = VarInt::decode(r)?;
39 let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
40 Ok(Self(v))
41 }
42}
43
44impl Encodable for PathId {
45 fn encode<B: bytes::BufMut>(&self, w: &mut B) {
46 VarInt(self.0.into()).encode(w)
47 }
48}
49
50impl PathId {
51 pub const MAX: Self = Self(u32::MAX);
53
54 pub const ZERO: Self = Self(0);
56
57 pub(crate) const fn size(&self) -> usize {
59 VarInt(self.0 as u64).size()
60 }
61
62 pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
65 let rhs = rhs.into();
66 let inner = self.0.saturating_add(rhs.0);
67 Self(inner)
68 }
69
70 pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
73 let rhs = rhs.into();
74 let inner = self.0.saturating_sub(rhs.0);
75 Self(inner)
76 }
77
78 pub(crate) fn next(&self) -> Self {
80 self.saturating_add(Self(1))
81 }
82
83 pub(crate) fn as_u32(&self) -> u32 {
85 self.0
86 }
87}
88
89impl std::fmt::Display for PathId {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 self.0.fmt(f)
92 }
93}
94
95impl<T: Into<u32>> From<T> for PathId {
96 fn from(source: T) -> Self {
97 Self(source.into())
98 }
99}
100
/// State kept for one path: the data for its current network path and, optionally, the
/// data of an earlier one.
#[derive(Debug)]
pub(super) struct PathState {
    /// Data for the path as currently used. Consulted first by `remove_in_flight`.
    pub(super) data: PathData,
    /// Data retained from before the current `data` took over, if any.
    ///
    /// NOTE(review): the `ConnectionId` is presumably the CID that was in use together
    /// with that earlier data — confirm against the code that populates this field.
    pub(super) prev: Option<(ConnectionId, PathData)>,
}
112
113impl PathState {
114 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
116 for path_data in [&mut self.data]
118 .into_iter()
119 .chain(self.prev.as_mut().map(|(_, data)| data))
120 {
121 if path_data.remove_in_flight(packet) {
122 return;
123 }
124 }
125 }
126}
127
/// Bookkeeping for one path challenge that has been sent and not yet answered.
#[derive(Debug)]
pub(super) struct SentChallengeInfo {
    /// When the challenge was sent. Used to compute the challenge's expiry and to take an
    /// RTT sample once the matching response arrives.
    pub(super) sent_instant: Instant,
    /// The network path the challenge was sent on. A response is only accepted if it
    /// arrives on what is probably the same path.
    pub(super) network_path: FourTuple,
}
135
/// Per-network-path state: RTT estimation, congestion control, pacing, path validation,
/// anti-amplification accounting and MTU discovery.
#[derive(Debug)]
pub(super) struct PathData {
    /// The network path this state describes.
    pub(super) network_path: FourTuple,
    /// Round-trip-time estimator for this path.
    pub(super) rtt: RttEstimator,
    /// Starts out `true` for every new path.
    /// NOTE(review): presumably whether outgoing packets carry ECN markings — confirm.
    pub(super) sending_ecn: bool,
    /// Congestion controller, built from the transport config's factory (or cloned from a
    /// previous path).
    pub(super) congestion: Box<dyn congestion::Controller>,
    /// Pacer used by `pacing_delay`.
    pub(super) pacing: Pacer,
    /// Outstanding path challenges, keyed by challenge token.
    pub(super) challenges_sent: IntMap<u64, SentChallengeInfo>,
    /// Counts as "validating" in `is_validating_path`; cleared when an on-path response
    /// arrives.
    /// NOTE(review): presumably requests that a fresh PATH_CHALLENGE be sent — confirm.
    pub(super) send_new_challenge: bool,
    /// Path responses queued for sending.
    pub(super) path_responses: PathResponses,
    /// Set once a path response matching one of our challenges arrived on this path.
    /// While `false`, the anti-amplification limit applies.
    pub(super) validated: bool,
    /// Bytes sent on this path; used for the anti-amplification limit.
    pub(super) total_sent: u64,
    /// Bytes received on this path; used for the anti-amplification limit.
    pub(super) total_recvd: u64,
    /// MTU discovery state; the source of `current_mtu`.
    pub(super) mtud: MtuDiscovery,
    /// Carried over unchanged by `from_previous`.
    /// NOTE(review): presumably the space and packet number of the first packet sent after
    /// the latest RTT sample — confirm.
    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
    /// Counters for packets currently in flight on this path.
    pub(super) in_flight: InFlight,
    /// NOTE(review): presumably whether an observed-address report has been sent on this
    /// path — confirm.
    pub(super) observed_addr_sent: bool,
    /// The most recent observed-address report accepted by `update_observed_addr_report`.
    pub(super) last_observed_addr_report: Option<ObservedAddr>,
    /// Local and remote path status.
    pub(super) status: PathStatusState,
    /// Packet number of the first packet recorded through `sent`, if any.
    first_packet: Option<u64>,
    /// Probe-timeout counter for this path; reported in the qlog recovery metrics.
    pub(super) pto_count: u32,

    /// Idle timeout for this path; initialised from
    /// `TransportConfig::default_path_max_idle_timeout`.
    pub(super) idle_timeout: Option<Duration>,
    /// Keep-alive interval for this path; initialised from
    /// `TransportConfig::default_path_keep_alive_interval`.
    pub(super) keep_alive: Option<Duration>,

    /// Set to `true` when an on-path response is received; `false` for a new path.
    pub(super) open: bool,

    /// Initialised to `None`.
    /// NOTE(review): semantics are not visible in this part of the file — confirm at the
    /// sites that write it.
    pub(super) last_allowed_receive: Option<Instant>,

    /// Snapshot of the metrics last handed to qlog, used to emit only what changed.
    #[cfg(feature = "qlog")]
    recovery_metrics: RecoveryMetrics,

    /// Generation of this path data; compared against `SentPacket::path_generation` so
    /// that only packets sent by this instance adjust its in-flight counters.
    generation: u64,
}
230
231impl PathData {
232 pub(super) fn new(
233 network_path: FourTuple,
234 allow_mtud: bool,
235 peer_max_udp_payload_size: Option<u16>,
236 generation: u64,
237 now: Instant,
238 config: &TransportConfig,
239 ) -> Self {
240 let congestion = config
241 .congestion_controller_factory
242 .clone()
243 .build(now, config.get_initial_mtu());
244 Self {
245 network_path,
246 rtt: RttEstimator::new(config.initial_rtt),
247 sending_ecn: true,
248 pacing: Pacer::new(
249 config.initial_rtt,
250 congestion.initial_window(),
251 config.get_initial_mtu(),
252 now,
253 ),
254 congestion,
255 challenges_sent: Default::default(),
256 send_new_challenge: false,
257 path_responses: PathResponses::default(),
258 validated: false,
259 total_sent: 0,
260 total_recvd: 0,
261 mtud: config
262 .mtu_discovery_config
263 .as_ref()
264 .filter(|_| allow_mtud)
265 .map_or(
266 MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
267 |mtud_config| {
268 MtuDiscovery::new(
269 config.get_initial_mtu(),
270 config.min_mtu,
271 peer_max_udp_payload_size,
272 mtud_config.clone(),
273 )
274 },
275 ),
276 first_packet_after_rtt_sample: None,
277 in_flight: InFlight::new(),
278 observed_addr_sent: false,
279 last_observed_addr_report: None,
280 status: Default::default(),
281 first_packet: None,
282 pto_count: 0,
283 idle_timeout: config.default_path_max_idle_timeout,
284 keep_alive: config.default_path_keep_alive_interval,
285 open: false,
286 last_allowed_receive: None,
287 #[cfg(feature = "qlog")]
288 recovery_metrics: RecoveryMetrics::default(),
289 generation,
290 }
291 }
292
293 pub(super) fn from_previous(
297 network_path: FourTuple,
298 prev: &Self,
299 generation: u64,
300 now: Instant,
301 ) -> Self {
302 let congestion = prev.congestion.clone_box();
303 let smoothed_rtt = prev.rtt.get();
304 Self {
305 network_path,
306 rtt: prev.rtt,
307 pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
308 sending_ecn: true,
309 congestion,
310 challenges_sent: Default::default(),
311 send_new_challenge: false,
312 path_responses: PathResponses::default(),
313 validated: false,
314 total_sent: 0,
315 total_recvd: 0,
316 mtud: prev.mtud.clone(),
317 first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
318 in_flight: InFlight::new(),
319 observed_addr_sent: false,
320 last_observed_addr_report: None,
321 status: prev.status.clone(),
322 first_packet: None,
323 pto_count: 0,
324 idle_timeout: prev.idle_timeout,
325 keep_alive: prev.keep_alive,
326 open: false,
327 last_allowed_receive: None,
328 #[cfg(feature = "qlog")]
329 recovery_metrics: prev.recovery_metrics.clone(),
330 generation,
331 }
332 }
333
334 pub(super) fn is_validating_path(&self) -> bool {
336 !self.challenges_sent.is_empty() || self.send_new_challenge
337 }
338
339 pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
342 !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
343 }
344
345 pub(super) fn current_mtu(&self) -> u16 {
347 self.mtud.current_mtu()
348 }
349
350 pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
352 self.in_flight.insert(&packet);
353 if self.first_packet.is_none() {
354 self.first_packet = Some(pn);
355 }
356 if let Some(forgotten) = space.sent(pn, packet) {
357 self.remove_in_flight(&forgotten);
358 }
359 }
360
361 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
364 if packet.path_generation != self.generation {
365 return false;
366 }
367 self.in_flight.remove(packet);
368 true
369 }
370
371 pub(super) fn inc_total_sent(&mut self, inc: u64) {
373 self.total_sent = self.total_sent.saturating_add(inc);
374 if !self.validated {
375 trace!(
376 network_path = %self.network_path,
377 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
378 "anti amplification budget decreased"
379 );
380 }
381 }
382
383 pub(super) fn inc_total_recvd(&mut self, inc: u64) {
385 self.total_recvd = self.total_recvd.saturating_add(inc);
386 if !self.validated {
387 trace!(
388 network_path = %self.network_path,
389 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
390 "anti amplification budget increased"
391 );
392 }
393 }
394
395 pub(super) fn earliest_expiring_challenge(&self) -> Option<Instant> {
397 if self.challenges_sent.is_empty() {
398 return None;
399 }
400 let pto = self.rtt.pto_base();
401 self.challenges_sent
402 .values()
403 .map(|info| info.sent_instant)
404 .min()
405 .map(|sent_instant| sent_instant + pto)
406 }
407
408 pub(super) fn on_path_response_received(
410 &mut self,
411 now: Instant,
412 token: u64,
413 network_path: FourTuple,
414 ) -> OnPathResponseReceived {
415 match self.challenges_sent.get(&token) {
416 Some(info)
418 if info.network_path.is_probably_same_path(&network_path)
419 && self.network_path.is_probably_same_path(&network_path) =>
420 {
421 self.network_path.update_local_if_same_remote(&network_path);
422 let sent_instant = info.sent_instant;
423 if !std::mem::replace(&mut self.validated, true) {
424 trace!("new path validated");
425 }
426 self.challenges_sent
428 .retain(|_token, info| !info.network_path.is_probably_same_path(&network_path));
429
430 self.send_new_challenge = false;
431
432 let rtt = now.saturating_duration_since(sent_instant);
435 self.rtt.reset_initial_rtt(rtt);
436
437 let was_open = std::mem::replace(&mut self.open, true);
438 OnPathResponseReceived::OnPath { was_open }
439 }
440 Some(info) if info.network_path.is_probably_same_path(&network_path) => {
442 self.challenges_sent
443 .retain(|_token, info| !info.network_path.is_probably_same_path(&network_path));
444 OnPathResponseReceived::OffPath
445 }
446 Some(info) => OnPathResponseReceived::Invalid {
448 expected: info.network_path,
449 },
450 None => OnPathResponseReceived::Unknown,
452 }
453 }
454
455 #[cfg(feature = "qlog")]
456 pub(super) fn qlog_recovery_metrics(
457 &mut self,
458 path_id: PathId,
459 ) -> Option<RecoveryMetricsUpdated> {
460 let controller_metrics = self.congestion.metrics();
461
462 let metrics = RecoveryMetrics {
463 min_rtt: Some(self.rtt.min),
464 smoothed_rtt: Some(self.rtt.get()),
465 latest_rtt: Some(self.rtt.latest),
466 rtt_variance: Some(self.rtt.var),
467 pto_count: Some(self.pto_count),
468 bytes_in_flight: Some(self.in_flight.bytes),
469 packets_in_flight: Some(self.in_flight.ack_eliciting),
470
471 congestion_window: Some(controller_metrics.congestion_window),
472 ssthresh: controller_metrics.ssthresh,
473 pacing_rate: controller_metrics.pacing_rate,
474 };
475
476 let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
477 self.recovery_metrics = metrics;
478 event
479 }
480
481 pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Instant> {
485 let smoothed_rtt = self.rtt.get();
486 self.pacing.delay(
487 smoothed_rtt,
488 bytes_to_send,
489 self.current_mtu(),
490 self.congestion.window(),
491 now,
492 )
493 }
494
495 #[must_use = "updated observed address must be reported to the application"]
499 pub(super) fn update_observed_addr_report(
500 &mut self,
501 observed: ObservedAddr,
502 ) -> Option<SocketAddr> {
503 match self.last_observed_addr_report.as_mut() {
504 Some(prev) => {
505 if prev.seq_no >= observed.seq_no {
506 None
508 } else if prev.ip == observed.ip && prev.port == observed.port {
509 prev.seq_no = observed.seq_no;
511 None
512 } else {
513 let addr = observed.socket_addr();
514 self.last_observed_addr_report = Some(observed);
515 Some(addr)
516 }
517 }
518 None => {
519 let addr = observed.socket_addr();
520 self.last_observed_addr_report = Some(observed);
521 Some(addr)
522 }
523 }
524 }
525
526 pub(crate) fn remote_status(&self) -> Option<PathStatus> {
527 self.status.remote_status.map(|(_seq, status)| status)
528 }
529
530 pub(crate) fn local_status(&self) -> PathStatus {
531 self.status.local_status
532 }
533
534 pub(super) fn generation(&self) -> u64 {
535 self.generation
536 }
537}
538
/// Outcome of [`PathData::on_path_response_received`].
pub(super) enum OnPathResponseReceived {
    /// The response answered a challenge sent on this path; the path is now validated.
    /// `was_open` is the value of the path's `open` flag before this response.
    OnPath { was_open: bool },
    /// The response answered a challenge that was sent on a network path other than the
    /// one this path data currently describes.
    OffPath,
    /// No outstanding challenge carries the received token.
    Unknown,
    /// The token is known, but the response arrived over a different network path than
    /// the challenge was sent on.
    Invalid {
        /// The network path the challenge was sent on.
        expected: FourTuple,
    },
}
552
/// Snapshot of the recovery metrics reported to qlog.
///
/// Every field is optional so that the same type can express both a full snapshot and a
/// delta in which unchanged values are `None`.
#[cfg(feature = "qlog")]
#[derive(Clone, Debug, Default, PartialEq)]
#[non_exhaustive]
struct RecoveryMetrics {
    /// Minimum RTT held by the estimator.
    pub min_rtt: Option<Duration>,
    /// Current RTT estimate.
    pub smoothed_rtt: Option<Duration>,
    /// Most recent RTT sample.
    pub latest_rtt: Option<Duration>,
    /// RTT variance held by the estimator.
    pub rtt_variance: Option<Duration>,
    /// Probe-timeout counter.
    pub pto_count: Option<u32>,
    /// Bytes currently in flight.
    pub bytes_in_flight: Option<u64>,
    /// Filled from the in-flight `ack_eliciting` counter.
    pub packets_in_flight: Option<u64>,
    /// Congestion window reported by the controller.
    pub congestion_window: Option<u64>,
    /// Slow-start threshold reported by the controller, if it has one.
    pub ssthresh: Option<u64>,
    /// Pacing rate reported by the controller, if it has one.
    pub pacing_rate: Option<u64>,
}
571
#[cfg(feature = "qlog")]
impl RecoveryMetrics {
    /// Returns a copy of `self` in which every field equal to the one in `previous` is
    /// blanked out to `None`.
    fn retain_updated(&self, previous: &Self) -> Self {
        /// Keeps `current` only when it differs from `earlier`.
        fn changed<T: Copy + PartialEq>(current: Option<T>, earlier: Option<T>) -> Option<T> {
            if current == earlier { None } else { current }
        }

        Self {
            min_rtt: changed(self.min_rtt, previous.min_rtt),
            smoothed_rtt: changed(self.smoothed_rtt, previous.smoothed_rtt),
            latest_rtt: changed(self.latest_rtt, previous.latest_rtt),
            rtt_variance: changed(self.rtt_variance, previous.rtt_variance),
            pto_count: changed(self.pto_count, previous.pto_count),
            bytes_in_flight: changed(self.bytes_in_flight, previous.bytes_in_flight),
            packets_in_flight: changed(self.packets_in_flight, previous.packets_in_flight),
            congestion_window: changed(self.congestion_window, previous.congestion_window),
            ssthresh: changed(self.ssthresh, previous.ssthresh),
            pacing_rate: changed(self.pacing_rate, previous.pacing_rate),
        }
    }

    /// Builds the qlog event describing what changed between `previous` and `self`.
    ///
    /// Returns `None` when nothing changed. Durations are reported via `as_secs_f32`,
    /// and a PTO count beyond `u16::MAX` is reported as `u16::MAX`.
    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
        let delta = self.retain_updated(previous);

        // An all-`None` delta means there is nothing to report.
        if delta == Self::default() {
            return None;
        }

        let as_secs = |rtt: Duration| rtt.as_secs_f32();

        Some(RecoveryMetricsUpdated {
            min_rtt: delta.min_rtt.map(as_secs),
            smoothed_rtt: delta.smoothed_rtt.map(as_secs),
            latest_rtt: delta.latest_rtt.map(as_secs),
            rtt_variance: delta.rtt_variance.map(as_secs),
            pto_count: delta
                .pto_count
                .map(|count| u16::try_from(count).unwrap_or(u16::MAX)),
            bytes_in_flight: delta.bytes_in_flight,
            packets_in_flight: delta.packets_in_flight,
            congestion_window: delta.congestion_window,
            ssthresh: delta.ssthresh,
            pacing_rate: delta.pacing_rate,
            path_id: Some(u64::from(path_id.as_u32())),
        })
    }
}
625
/// Round-trip-time estimator.
#[derive(Clone, Copy, Debug)]
pub struct RttEstimator {
    /// The most recent RTT sample (or the initial RTT before any sample).
    latest: Duration,
    /// The smoothed RTT; `None` until the first sample has been taken.
    smoothed: Option<Duration>,
    /// The RTT variance.
    var: Duration,
    /// The smallest RTT sample seen (or the initial RTT before any sample).
    min: Duration,
}
638
639impl RttEstimator {
640 pub(super) fn new(initial_rtt: Duration) -> Self {
641 Self {
642 latest: initial_rtt,
643 smoothed: None,
644 var: initial_rtt / 2,
645 min: initial_rtt,
646 }
647 }
648
649 pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
662 if self.smoothed.is_none() {
663 self.latest = initial_rtt;
664 self.var = initial_rtt / 2;
665 self.min = initial_rtt;
666 }
667 }
668
669 pub fn get(&self) -> Duration {
671 self.smoothed.unwrap_or(self.latest)
672 }
673
674 pub fn conservative(&self) -> Duration {
679 self.get().max(self.latest)
680 }
681
682 pub fn min(&self) -> Duration {
684 self.min
685 }
686
687 pub(crate) fn pto_base(&self) -> Duration {
689 self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
690 }
691
692 pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
694 self.latest = rtt;
695 self.min = cmp::min(self.min, self.latest);
698 if let Some(smoothed) = self.smoothed {
700 let adjusted_rtt = if self.min + ack_delay <= self.latest {
701 self.latest - ack_delay
702 } else {
703 self.latest
704 };
705 let var_sample = smoothed.abs_diff(adjusted_rtt);
706 self.var = (3 * self.var + var_sample) / 4;
707 self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
708 } else {
709 self.smoothed = Some(self.latest);
710 self.var = self.latest / 2;
711 self.min = self.latest;
712 }
713 }
714}
715
716#[derive(Default, Debug)]
717pub(crate) struct PathResponses {
718 pending: Vec<PathResponse>,
719}
720
721impl PathResponses {
722 pub(crate) fn push(&mut self, packet: u64, token: u64, network_path: FourTuple) {
723 const MAX_PATH_RESPONSES: usize = 16;
725 let response = PathResponse {
726 packet,
727 token,
728 network_path,
729 };
730 let existing = self
731 .pending
732 .iter_mut()
733 .find(|x| x.network_path.remote == network_path.remote);
734 if let Some(existing) = existing {
735 if existing.packet <= packet {
737 *existing = response;
738 }
739 return;
740 }
741 if self.pending.len() < MAX_PATH_RESPONSES {
742 self.pending.push(response);
743 } else {
744 trace!("ignoring excessive PATH_CHALLENGE");
747 }
748 }
749
750 pub(crate) fn pop_off_path(&mut self, network_path: FourTuple) -> Option<(u64, FourTuple)> {
751 let response = *self.pending.last()?;
752 if response.network_path == network_path {
756 return None;
759 }
760 self.pending.pop();
761 Some((response.token, response.network_path))
762 }
763
764 pub(crate) fn pop_on_path(&mut self, network_path: FourTuple) -> Option<u64> {
765 let response = *self.pending.last()?;
766 if response.network_path != network_path {
768 return None;
771 }
772 self.pending.pop();
773 Some(response.token)
774 }
775
776 pub(crate) fn is_empty(&self) -> bool {
777 self.pending.is_empty()
778 }
779}
780
781#[derive(Copy, Clone, Debug)]
782struct PathResponse {
783 packet: u64,
785 token: u64,
787 network_path: FourTuple,
789}
790
/// Running totals for the packets currently in flight on a path.
#[derive(Debug)]
pub(super) struct InFlight {
    /// Sum of the `size` of all tracked packets.
    pub(super) bytes: u64,
    /// Sum of the `ack_eliciting` value (converted to `u64`) of all tracked packets.
    /// NOTE(review): presumably the number of ack-eliciting packets in flight — confirm
    /// against the type of `SentPacket::ack_eliciting`.
    pub(super) ack_eliciting: u64,
}
807
808impl InFlight {
809 fn new() -> Self {
810 Self {
811 bytes: 0,
812 ack_eliciting: 0,
813 }
814 }
815
816 fn insert(&mut self, packet: &SentPacket) {
817 self.bytes += u64::from(packet.size);
818 self.ack_eliciting += u64::from(packet.ack_eliciting);
819 }
820
821 fn remove(&mut self, packet: &SentPacket) {
823 self.bytes -= u64::from(packet.size);
824 self.ack_eliciting -= u64::from(packet.ack_eliciting);
825 }
826}
827
828#[derive(Debug, Clone, Default)]
830pub(super) struct PathStatusState {
831 local_status: PathStatus,
833 local_seq: VarInt,
837 remote_status: Option<(VarInt, PathStatus)>,
839}
840
841impl PathStatusState {
842 pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
844 if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
845 return trace!(%seq, "ignoring path status update");
846 }
847
848 let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
849 if prev != Some(status) {
850 debug!(?status, ?seq, "remote changed path status");
851 }
852 }
853
854 pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
858 if self.local_status == status {
859 return None;
860 }
861
862 self.local_seq = self.local_seq.saturating_add(1u8);
863 Some(std::mem::replace(&mut self.local_status, status))
864 }
865
866 pub(crate) fn seq(&self) -> VarInt {
867 self.local_seq
868 }
869}
870
/// Status of a path as announced by an endpoint.
///
/// NOTE(review): presumably mirrors the "available" / "backup" path states of the QUIC
/// multipath extension — confirm.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PathStatus {
    /// The path is available. This is the default.
    #[default]
    Available,
    /// The path is a backup.
    Backup,
}
889
890#[derive(Debug, Clone, PartialEq, Eq)]
892pub enum PathEvent {
893 Opened {
895 id: PathId,
897 },
898 Closed {
900 id: PathId,
902 error_code: VarInt,
906 },
907 Abandoned {
911 id: PathId,
913 path_stats: PathStats,
917 },
918 LocallyClosed {
920 id: PathId,
922 error: PathError,
924 },
925 RemoteStatus {
931 id: PathId,
933 status: PathStatus,
935 },
936 ObservedAddr {
938 id: PathId,
941 addr: SocketAddr,
943 },
944}
945
946#[derive(Debug, Error, Clone, PartialEq, Eq)]
948pub enum SetPathStatusError {
949 #[error("closed path")]
951 ClosedPath,
952 #[error("multipath not negotiated")]
954 MultipathNotNegotiated,
955}
956
957#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
959#[error("closed path")]
960pub struct ClosedPath {
961 pub(super) _private: (),
962}
963
#[cfg(test)]
mod tests {
    use super::*;

    /// `saturating_add` behaves like ordinary addition below the limit and clamps at
    /// `PathId::MAX`.
    #[test]
    fn test_path_id_saturating_add() {
        // One past `u16::MAX` is still comfortably inside the `u32` range.
        let start = PathId::from(u16::MAX);
        let expected = PathId::from(u32::from(u16::MAX) + 1);
        assert_eq!(start.saturating_add(1u8), expected);

        // At the very top the ID stays pinned to the maximum.
        let pinned = PathId::MAX.saturating_add(1u8);
        assert_eq!(pinned, PathId::MAX)
    }
}