1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8 PathError, PathStats,
9 mtud::MtuDiscovery,
10 pacing::Pacer,
11 spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14 ConnectionId, Duration, Instant, TIMER_GRANULARITY, TransportConfig, VarInt, coding,
15 congestion, frame::ObservedAddr, packet::SpaceId,
16};
17
18#[cfg(feature = "qlog")]
19use qlog::events::quic::RecoveryMetricsUpdated;
20
/// Identifier of a network path, a thin wrapper around a `u32`.
///
/// Equality and ordering are those of the wrapped integer, and the default value is `0`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct PathId(pub(crate) u32);

// `Hash` is written by hand so that hashing a `PathId` issues exactly one `write_u32` call
// carrying the raw id.
impl std::hash::Hash for PathId {
    fn hash<H>(&self, state: &mut H)
    where
        H: std::hash::Hasher,
    {
        let Self(raw) = *self;
        state.write_u32(raw);
    }
}
30
31impl identity_hash::IdentityHashable for PathId {}
32
33impl coding::Codec for PathId {
34 fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
35 let v = VarInt::decode(r)?;
36 let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
37 Ok(Self(v))
38 }
39
40 fn encode<B: bytes::BufMut>(&self, w: &mut B) {
41 VarInt(self.0.into()).encode(w)
42 }
43}
44
45impl PathId {
46 pub const MAX: Self = Self(u32::MAX);
48
49 pub const ZERO: Self = Self(0);
51
52 pub(crate) const fn size(&self) -> usize {
54 VarInt(self.0 as u64).size()
55 }
56
57 pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
60 let rhs = rhs.into();
61 let inner = self.0.saturating_add(rhs.0);
62 Self(inner)
63 }
64
65 pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
68 let rhs = rhs.into();
69 let inner = self.0.saturating_sub(rhs.0);
70 Self(inner)
71 }
72
73 pub(crate) fn next(&self) -> Self {
75 self.saturating_add(Self(1))
76 }
77
78 pub(crate) fn as_u32(&self) -> u32 {
80 self.0
81 }
82}
83
84impl std::fmt::Display for PathId {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 self.0.fmt(f)
87 }
88}
89
90impl<T: Into<u32>> From<T> for PathId {
91 fn from(source: T) -> Self {
92 Self(source.into())
93 }
94}
95
96#[derive(Debug)]
103pub(super) struct PathState {
104 pub(super) data: PathData,
105 pub(super) prev: Option<(ConnectionId, PathData)>,
106}
107
108impl PathState {
109 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
111 for path_data in [&mut self.data]
113 .into_iter()
114 .chain(self.prev.as_mut().map(|(_, data)| data))
115 {
116 if path_data.remove_in_flight(packet) {
117 return;
118 }
119 }
120 }
121}
122
123#[derive(Debug)]
124pub(super) struct SentChallengeInfo {
125 pub(super) sent_instant: Instant,
127 pub(super) remote: SocketAddr,
129}
130
131#[derive(Debug)]
133pub(super) struct PathData {
134 pub(super) remote: SocketAddr,
135 pub(super) rtt: RttEstimator,
136 pub(super) sending_ecn: bool,
138 pub(super) congestion: Box<dyn congestion::Controller>,
140 pub(super) pacing: Pacer,
142 pub(super) challenges_sent: IntMap<u64, SentChallengeInfo>,
144 pub(super) send_new_challenge: bool,
148 pub(super) path_responses: PathResponses,
150 pub(super) validated: bool,
155 pub(super) total_sent: u64,
157 pub(super) total_recvd: u64,
159 pub(super) mtud: MtuDiscovery,
161 pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
165 pub(super) in_flight: InFlight,
169 pub(super) observed_addr_sent: bool,
172 pub(super) last_observed_addr_report: Option<ObservedAddr>,
174 pub(super) status: PathStatusState,
176 first_packet: Option<u64>,
183 pub(super) pto_count: u32,
185
186 pub(super) idle_timeout: Option<Duration>,
194 pub(super) keep_alive: Option<Duration>,
202
203 pub(super) open: bool,
209
210 pub(super) abandon_state: AbandonState,
212
213 #[cfg(feature = "qlog")]
215 recovery_metrics: RecoveryMetrics,
216
217 generation: u64,
219}
220
221#[derive(Debug)]
223pub(super) enum AbandonState {
224 NotAbandoned,
226 ExpectingPathAbandon { deadline: Instant },
234 ReceivedPathAbandon,
237}
238
239impl PathData {
240 pub(super) fn new(
241 remote: SocketAddr,
242 allow_mtud: bool,
243 peer_max_udp_payload_size: Option<u16>,
244 generation: u64,
245 now: Instant,
246 config: &TransportConfig,
247 ) -> Self {
248 let congestion = config
249 .congestion_controller_factory
250 .clone()
251 .build(now, config.get_initial_mtu());
252 Self {
253 remote,
254 rtt: RttEstimator::new(config.initial_rtt),
255 sending_ecn: true,
256 pacing: Pacer::new(
257 config.initial_rtt,
258 congestion.initial_window(),
259 config.get_initial_mtu(),
260 now,
261 ),
262 congestion,
263 challenges_sent: Default::default(),
264 send_new_challenge: false,
265 path_responses: PathResponses::default(),
266 validated: false,
267 total_sent: 0,
268 total_recvd: 0,
269 mtud: config
270 .mtu_discovery_config
271 .as_ref()
272 .filter(|_| allow_mtud)
273 .map_or(
274 MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
275 |mtud_config| {
276 MtuDiscovery::new(
277 config.get_initial_mtu(),
278 config.min_mtu,
279 peer_max_udp_payload_size,
280 mtud_config.clone(),
281 )
282 },
283 ),
284 first_packet_after_rtt_sample: None,
285 in_flight: InFlight::new(),
286 observed_addr_sent: false,
287 last_observed_addr_report: None,
288 status: Default::default(),
289 first_packet: None,
290 pto_count: 0,
291 idle_timeout: config.default_path_max_idle_timeout,
292 keep_alive: config.default_path_keep_alive_interval,
293 open: false,
294 abandon_state: AbandonState::NotAbandoned,
295 #[cfg(feature = "qlog")]
296 recovery_metrics: RecoveryMetrics::default(),
297 generation,
298 }
299 }
300
301 pub(super) fn from_previous(
305 remote: SocketAddr,
306 prev: &Self,
307 generation: u64,
308 now: Instant,
309 ) -> Self {
310 let congestion = prev.congestion.clone_box();
311 let smoothed_rtt = prev.rtt.get();
312 Self {
313 remote,
314 rtt: prev.rtt,
315 pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
316 sending_ecn: true,
317 congestion,
318 challenges_sent: Default::default(),
319 send_new_challenge: false,
320 path_responses: PathResponses::default(),
321 validated: false,
322 total_sent: 0,
323 total_recvd: 0,
324 mtud: prev.mtud.clone(),
325 first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
326 in_flight: InFlight::new(),
327 observed_addr_sent: false,
328 last_observed_addr_report: None,
329 status: prev.status.clone(),
330 first_packet: None,
331 pto_count: 0,
332 idle_timeout: prev.idle_timeout,
333 keep_alive: prev.keep_alive,
334 open: false,
335 abandon_state: AbandonState::NotAbandoned,
336 #[cfg(feature = "qlog")]
337 recovery_metrics: prev.recovery_metrics.clone(),
338 generation,
339 }
340 }
341
342 pub(super) fn is_validating_path(&self) -> bool {
344 !self.challenges_sent.is_empty() || self.send_new_challenge
345 }
346
347 pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
350 !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
351 }
352
353 pub(super) fn current_mtu(&self) -> u16 {
355 self.mtud.current_mtu()
356 }
357
358 pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
360 self.in_flight.insert(&packet);
361 if self.first_packet.is_none() {
362 self.first_packet = Some(pn);
363 }
364 if let Some(forgotten) = space.sent(pn, packet) {
365 self.remove_in_flight(&forgotten);
366 }
367 }
368
369 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
372 if packet.path_generation != self.generation {
373 return false;
374 }
375 self.in_flight.remove(packet);
376 true
377 }
378
379 pub(super) fn inc_total_sent(&mut self, inc: u64) {
381 self.total_sent = self.total_sent.saturating_add(inc);
382 if !self.validated {
383 trace!(
384 remote = %self.remote,
385 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
386 "anti amplification budget decreased"
387 );
388 }
389 }
390
391 pub(super) fn inc_total_recvd(&mut self, inc: u64) {
393 self.total_recvd = self.total_recvd.saturating_add(inc);
394 if !self.validated {
395 trace!(
396 remote = %self.remote,
397 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
398 "anti amplification budget increased"
399 );
400 }
401 }
402
403 pub(super) fn earliest_expiring_challenge(&self) -> Option<Instant> {
405 if self.challenges_sent.is_empty() {
406 return None;
407 }
408 let pto = self.rtt.pto_base();
409 self.challenges_sent
410 .values()
411 .map(|info| info.sent_instant)
412 .min()
413 .map(|sent_instant| sent_instant + pto)
414 }
415
416 pub(super) fn on_path_response_received(
418 &mut self,
419 now: Instant,
420 token: u64,
421 remote: SocketAddr,
422 ) -> OnPathResponseReceived {
423 match self.challenges_sent.get(&token) {
424 Some(info) if info.remote == remote && self.remote == remote => {
426 let sent_instant = info.sent_instant;
427 if !std::mem::replace(&mut self.validated, true) {
428 trace!("new path validated");
429 }
430 self.challenges_sent
432 .retain(|_token, info| info.remote != remote);
433
434 self.send_new_challenge = false;
435
436 let rtt = now.saturating_duration_since(sent_instant);
439 self.rtt.reset_initial_rtt(rtt);
440
441 let was_open = std::mem::replace(&mut self.open, true);
442 OnPathResponseReceived::OnPath { was_open }
443 }
444 Some(info) if info.remote == remote => {
446 self.challenges_sent
447 .retain(|_token, info| info.remote != remote);
448 OnPathResponseReceived::OffPath
449 }
450 Some(info) => OnPathResponseReceived::Invalid {
452 expected: info.remote,
453 },
454 None => OnPathResponseReceived::Unknown,
456 }
457 }
458
459 #[cfg(feature = "qlog")]
460 pub(super) fn qlog_recovery_metrics(
461 &mut self,
462 path_id: PathId,
463 ) -> Option<RecoveryMetricsUpdated> {
464 let controller_metrics = self.congestion.metrics();
465
466 let metrics = RecoveryMetrics {
467 min_rtt: Some(self.rtt.min),
468 smoothed_rtt: Some(self.rtt.get()),
469 latest_rtt: Some(self.rtt.latest),
470 rtt_variance: Some(self.rtt.var),
471 pto_count: Some(self.pto_count),
472 bytes_in_flight: Some(self.in_flight.bytes),
473 packets_in_flight: Some(self.in_flight.ack_eliciting),
474
475 congestion_window: Some(controller_metrics.congestion_window),
476 ssthresh: controller_metrics.ssthresh,
477 pacing_rate: controller_metrics.pacing_rate,
478 };
479
480 let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
481 self.recovery_metrics = metrics;
482 event
483 }
484
485 pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Instant> {
489 let smoothed_rtt = self.rtt.get();
490 self.pacing.delay(
491 smoothed_rtt,
492 bytes_to_send,
493 self.current_mtu(),
494 self.congestion.window(),
495 now,
496 )
497 }
498
499 #[must_use = "updated observed address must be reported to the application"]
503 pub(super) fn update_observed_addr_report(
504 &mut self,
505 observed: ObservedAddr,
506 ) -> Option<SocketAddr> {
507 match self.last_observed_addr_report.as_mut() {
508 Some(prev) => {
509 if prev.seq_no >= observed.seq_no {
510 None
512 } else if prev.ip == observed.ip && prev.port == observed.port {
513 prev.seq_no = observed.seq_no;
515 None
516 } else {
517 let addr = observed.socket_addr();
518 self.last_observed_addr_report = Some(observed);
519 Some(addr)
520 }
521 }
522 None => {
523 let addr = observed.socket_addr();
524 self.last_observed_addr_report = Some(observed);
525 Some(addr)
526 }
527 }
528 }
529
530 pub(crate) fn remote_status(&self) -> Option<PathStatus> {
531 self.status.remote_status.map(|(_seq, status)| status)
532 }
533
534 pub(crate) fn local_status(&self) -> PathStatus {
535 self.status.local_status
536 }
537
538 pub(super) fn generation(&self) -> u64 {
539 self.generation
540 }
541}
542
543pub(super) enum OnPathResponseReceived {
544 OnPath { was_open: bool },
546 OffPath,
548 Unknown,
550 Invalid {
552 expected: SocketAddr,
554 },
555}
556
/// Snapshot of the recovery-related values that are reported to qlog.
///
/// Every field is optional so that the same type can also describe "only what changed": a
/// field left at `None` is not reported.
#[cfg(feature = "qlog")]
#[derive(Clone, Debug, Default, PartialEq)]
#[non_exhaustive]
struct RecoveryMetrics {
    /// Minimum RTT of the estimator.
    pub min_rtt: Option<Duration>,
    /// Smoothed RTT (or the latest sample while there is no smoothed value).
    pub smoothed_rtt: Option<Duration>,
    /// Most recent RTT sample.
    pub latest_rtt: Option<Duration>,
    /// RTT variance of the estimator.
    pub rtt_variance: Option<Duration>,
    /// Current PTO count of the path.
    pub pto_count: Option<u32>,
    /// Bytes currently in flight.
    pub bytes_in_flight: Option<u64>,
    /// Ack-eliciting packets currently in flight.
    pub packets_in_flight: Option<u64>,
    /// Congestion window reported by the controller.
    pub congestion_window: Option<u64>,
    /// Slow-start threshold reported by the controller, if it has one.
    pub ssthresh: Option<u64>,
    /// Pacing rate reported by the controller, if it has one.
    pub pacing_rate: Option<u64>,
}
575
#[cfg(feature = "qlog")]
impl RecoveryMetrics {
    /// Returns a copy of `self` in which every field equal to its counterpart in `previous`
    /// is blanked out to `None`.
    fn retain_updated(&self, previous: &Self) -> Self {
        /// Keeps `current` only when it differs from `previous`.
        fn changed<T: Copy + PartialEq>(current: Option<T>, previous: Option<T>) -> Option<T> {
            if previous == current { None } else { current }
        }

        Self {
            min_rtt: changed(self.min_rtt, previous.min_rtt),
            smoothed_rtt: changed(self.smoothed_rtt, previous.smoothed_rtt),
            latest_rtt: changed(self.latest_rtt, previous.latest_rtt),
            rtt_variance: changed(self.rtt_variance, previous.rtt_variance),
            pto_count: changed(self.pto_count, previous.pto_count),
            bytes_in_flight: changed(self.bytes_in_flight, previous.bytes_in_flight),
            packets_in_flight: changed(self.packets_in_flight, previous.packets_in_flight),
            congestion_window: changed(self.congestion_window, previous.congestion_window),
            ssthresh: changed(self.ssthresh, previous.ssthresh),
            pacing_rate: changed(self.pacing_rate, previous.pacing_rate),
        }
    }

    /// Builds the qlog event describing how `self` differs from `previous`.
    ///
    /// Returns `None` when nothing changed. Durations are reported as `f32` seconds and the
    /// PTO count is clamped to `u16::MAX`.
    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
        let delta = self.retain_updated(previous);
        if delta == Self::default() {
            return None;
        }

        let seconds = |rtt: Duration| rtt.as_secs_f32();
        let event = RecoveryMetricsUpdated {
            min_rtt: delta.min_rtt.map(seconds),
            smoothed_rtt: delta.smoothed_rtt.map(seconds),
            latest_rtt: delta.latest_rtt.map(seconds),
            rtt_variance: delta.rtt_variance.map(seconds),
            pto_count: delta
                .pto_count
                .map(|count| u16::try_from(count).unwrap_or(u16::MAX)),
            bytes_in_flight: delta.bytes_in_flight,
            packets_in_flight: delta.packets_in_flight,
            congestion_window: delta.congestion_window,
            ssthresh: delta.ssthresh,
            pacing_rate: delta.pacing_rate,
            path_id: Some(u64::from(path_id.as_u32())),
        };
        Some(event)
    }
}
629
/// Round-trip-time estimator for one path.
#[derive(Clone, Copy, Debug)]
pub struct RttEstimator {
    /// The most recent RTT sample (or the initial RTT before any sample).
    latest: Duration,
    /// The smoothed RTT; `None` until the first sample has been recorded.
    smoothed: Option<Duration>,
    /// The RTT variance.
    var: Duration,
    /// The smallest RTT seen so far.
    min: Duration,
}
642
643impl RttEstimator {
644 pub(super) fn new(initial_rtt: Duration) -> Self {
645 Self {
646 latest: initial_rtt,
647 smoothed: None,
648 var: initial_rtt / 2,
649 min: initial_rtt,
650 }
651 }
652
653 pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
666 if self.smoothed.is_none() {
667 self.latest = initial_rtt;
668 self.var = initial_rtt / 2;
669 self.min = initial_rtt;
670 }
671 }
672
673 pub fn get(&self) -> Duration {
675 self.smoothed.unwrap_or(self.latest)
676 }
677
678 pub fn conservative(&self) -> Duration {
683 self.get().max(self.latest)
684 }
685
686 pub fn min(&self) -> Duration {
688 self.min
689 }
690
691 pub(crate) fn pto_base(&self) -> Duration {
693 self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
694 }
695
696 pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
698 self.latest = rtt;
699 self.min = cmp::min(self.min, self.latest);
702 if let Some(smoothed) = self.smoothed {
704 let adjusted_rtt = if self.min + ack_delay <= self.latest {
705 self.latest - ack_delay
706 } else {
707 self.latest
708 };
709 let var_sample = smoothed.abs_diff(adjusted_rtt);
710 self.var = (3 * self.var + var_sample) / 4;
711 self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
712 } else {
713 self.smoothed = Some(self.latest);
714 self.var = self.latest / 2;
715 self.min = self.latest;
716 }
717 }
718}
719
720#[derive(Default, Debug)]
721pub(crate) struct PathResponses {
722 pending: Vec<PathResponse>,
723}
724
725impl PathResponses {
726 pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
727 const MAX_PATH_RESPONSES: usize = 16;
729 let response = PathResponse {
730 packet,
731 token,
732 remote,
733 };
734 let existing = self.pending.iter_mut().find(|x| x.remote == remote);
735 if let Some(existing) = existing {
736 if existing.packet <= packet {
738 *existing = response;
739 }
740 return;
741 }
742 if self.pending.len() < MAX_PATH_RESPONSES {
743 self.pending.push(response);
744 } else {
745 trace!("ignoring excessive PATH_CHALLENGE");
748 }
749 }
750
751 pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
752 let response = *self.pending.last()?;
753 if response.remote == remote {
754 return None;
757 }
758 self.pending.pop();
759 Some((response.token, response.remote))
760 }
761
762 pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
763 let response = *self.pending.last()?;
764 if response.remote != remote {
765 return None;
768 }
769 self.pending.pop();
770 Some(response.token)
771 }
772
773 pub(crate) fn is_empty(&self) -> bool {
774 self.pending.is_empty()
775 }
776}
777
/// One queued path response.
#[derive(Clone, Copy, Debug)]
struct PathResponse {
    /// Number of the packet the challenge arrived in; decides which of two challenges from the
    /// same address wins.
    packet: u64,
    /// The token to echo back.
    token: u64,
    /// The address the challenge came from.
    remote: SocketAddr,
}
787
788#[derive(Debug)]
791pub(super) struct InFlight {
792 pub(super) bytes: u64,
797 pub(super) ack_eliciting: u64,
803}
804
805impl InFlight {
806 fn new() -> Self {
807 Self {
808 bytes: 0,
809 ack_eliciting: 0,
810 }
811 }
812
813 fn insert(&mut self, packet: &SentPacket) {
814 self.bytes += u64::from(packet.size);
815 self.ack_eliciting += u64::from(packet.ack_eliciting);
816 }
817
818 fn remove(&mut self, packet: &SentPacket) {
820 self.bytes -= u64::from(packet.size);
821 self.ack_eliciting -= u64::from(packet.ack_eliciting);
822 }
823}
824
825#[derive(Debug, Clone, Default)]
827pub(super) struct PathStatusState {
828 local_status: PathStatus,
830 local_seq: VarInt,
834 remote_status: Option<(VarInt, PathStatus)>,
836}
837
838impl PathStatusState {
839 pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
841 if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
842 return trace!(%seq, "ignoring path status update");
843 }
844
845 let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
846 if prev != Some(status) {
847 debug!(?status, ?seq, "remote changed path status");
848 }
849 }
850
851 pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
855 if self.local_status == status {
856 return None;
857 }
858
859 self.local_seq = self.local_seq.saturating_add(1u8);
860 Some(std::mem::replace(&mut self.local_status, status))
861 }
862
863 pub(crate) fn seq(&self) -> VarInt {
864 self.local_seq
865 }
866}
867
/// Status of a path.
///
/// NOTE(review): how each status affects path selection is decided outside this file section.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PathStatus {
    /// The default status.
    #[default]
    Available,
    /// The non-default status; by its name, a path held in reserve.
    Backup,
}
886
887#[derive(Debug, Clone, PartialEq, Eq)]
889pub enum PathEvent {
890 Opened {
892 id: PathId,
894 },
895 Closed {
897 id: PathId,
899 error_code: VarInt,
903 },
904 Abandoned {
908 id: PathId,
910 path_stats: PathStats,
914 },
915 LocallyClosed {
917 id: PathId,
919 error: PathError,
921 },
922 RemoteStatus {
928 id: PathId,
930 status: PathStatus,
932 },
933 ObservedAddr {
935 id: PathId,
938 addr: SocketAddr,
940 },
941}
942
943#[derive(Debug, Error, Clone, PartialEq, Eq)]
945pub enum SetPathStatusError {
946 #[error("closed path")]
948 ClosedPath,
949 #[error("multipath not negotiated")]
951 MultipathNotNegotiated,
952}
953
954#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
956#[error("closed path")]
957pub struct ClosedPath {
958 pub(super) _private: (),
959}
960
#[cfg(test)]
mod tests {
    use super::*;

    /// `saturating_add` behaves like ordinary addition below the maximum and clamps at
    /// `PathId::MAX`.
    #[test]
    fn test_path_id_saturating_add() {
        // Crossing the `u16` boundary must not wrap: the id is a `u32` underneath.
        let start = PathId::from(u16::MAX);
        let just_past_u16 = u32::from(u16::MAX) + 1;
        assert_eq!(start.saturating_add(1u8), PathId::from(just_past_u16));

        // At the top of the range the addition saturates.
        let clamped = PathId::MAX.saturating_add(1u8);
        assert_eq!(clamped, PathId::MAX);
    }
}