1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8 PathError, PathStats,
9 mtud::MtuDiscovery,
10 pacing::Pacer,
11 spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14 ConnectionId, Duration, Instant, TIMER_GRANULARITY, TransportConfig, VarInt, coding,
15 congestion, frame::ObservedAddr, packet::SpaceId,
16};
17
18#[cfg(feature = "qlog")]
19use qlog::events::quic::RecoveryMetricsUpdated;
20
/// Identifier of a path, as used when multiple paths exist on one connection.
///
/// A thin wrapper around a `u32`. Equality, ordering and the default value (`0`) are
/// those of the wrapped integer.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
pub struct PathId(pub(crate) u32);
24
25impl std::hash::Hash for PathId {
26 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
27 state.write_u32(self.0);
28 }
29}
30
// Opts `PathId` into the `identity_hash` marker trait.
// NOTE(review): presumably this relies on the `Hash` impl above performing exactly one
// integer write — confirm against the `identity_hash` crate's requirements.
impl identity_hash::IdentityHashable for PathId {}
32
33impl coding::Codec for PathId {
34 fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
35 let v = VarInt::decode(r)?;
36 let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
37 Ok(Self(v))
38 }
39
40 fn encode<B: bytes::BufMut>(&self, w: &mut B) {
41 VarInt(self.0.into()).encode(w)
42 }
43}
44
45impl PathId {
46 pub const MAX: Self = Self(u32::MAX);
48
49 pub const ZERO: Self = Self(0);
51
52 pub(crate) const fn size(&self) -> usize {
54 VarInt(self.0 as u64).size()
55 }
56
57 pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
60 let rhs = rhs.into();
61 let inner = self.0.saturating_add(rhs.0);
62 Self(inner)
63 }
64
65 pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
68 let rhs = rhs.into();
69 let inner = self.0.saturating_sub(rhs.0);
70 Self(inner)
71 }
72
73 pub(crate) fn next(&self) -> Self {
75 self.saturating_add(Self(1))
76 }
77
78 pub(crate) fn as_u32(&self) -> u32 {
80 self.0
81 }
82}
83
84impl std::fmt::Display for PathId {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 self.0.fmt(f)
87 }
88}
89
90impl<T: Into<u32>> From<T> for PathId {
91 fn from(source: T) -> Self {
92 Self(source.into())
93 }
94}
95
/// The current [`PathData`] of a path plus, optionally, an earlier one.
#[derive(Debug)]
pub(super) struct PathState {
    /// The path data currently in use.
    pub(super) data: PathData,
    /// An earlier path data paired with a connection id, if one is retained.
    ///
    /// NOTE(review): presumably the state from before the remote address changed — confirm
    /// against the code that populates this field.
    pub(super) prev: Option<(ConnectionId, PathData)>,
}
107
108impl PathState {
109 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
111 for path_data in [&mut self.data]
113 .into_iter()
114 .chain(self.prev.as_mut().map(|(_, data)| data))
115 {
116 if path_data.remove_in_flight(packet) {
117 return;
118 }
119 }
120 }
121}
122
/// Bookkeeping for a path challenge that has been sent and not yet answered.
#[derive(Debug)]
pub(super) struct SentChallengeInfo {
    /// When the challenge was sent.
    pub(super) sent_instant: Instant,
    /// The remote address the challenge was sent to.
    pub(super) remote: SocketAddr,
}
130
/// Per-path state: addressing, RTT and congestion state, validation progress and
/// in-flight accounting.
#[derive(Debug)]
pub(super) struct PathData {
    /// Remote address of this path.
    pub(super) remote: SocketAddr,
    /// Round-trip-time estimator for this path.
    pub(super) rtt: RttEstimator,
    /// Initialised to `true` by both constructors.
    /// NOTE(review): presumably whether outgoing packets are ECN-marked — confirm.
    pub(super) sending_ecn: bool,
    /// Congestion controller for this path.
    pub(super) congestion: Box<dyn congestion::Controller>,
    /// Pacer used by [`Self::pacing_delay`].
    pub(super) pacing: Pacer,
    /// Outstanding path challenges, keyed by their token.
    pub(super) challenges_sent: IntMap<u64, SentChallengeInfo>,
    /// Counts as "validation in progress" in [`Self::is_validating_path`]; cleared when an
    /// on-path response arrives.
    pub(super) send_new_challenge: bool,
    /// Path responses queued for sending.
    pub(super) path_responses: PathResponses,
    /// Set once an on-path response to one of our challenges was received; while `false`
    /// the anti-amplification limit applies.
    pub(super) validated: bool,
    /// Running total maintained by [`Self::inc_total_sent`].
    pub(super) total_sent: u64,
    /// Instant passed to the most recent [`Self::sent`] call, if any.
    pub(super) last_sent: Option<Instant>,
    /// Running total maintained by [`Self::inc_total_recvd`].
    pub(super) total_recvd: u64,
    /// MTU discovery state; source of [`Self::current_mtu`].
    pub(super) mtud: MtuDiscovery,
    /// NOTE(review): presumably the packet number space and packet number of the first
    /// packet sent after the latest RTT sample — confirm with the code that writes it.
    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
    /// Byte and ack-eliciting packet counts currently in flight on this path.
    pub(super) in_flight: InFlight,
    /// Initialised to `false`.
    /// NOTE(review): presumably whether an observed-address report was sent — confirm.
    pub(super) observed_addr_sent: bool,
    /// The latest report accepted by [`Self::update_observed_addr_report`].
    pub(super) last_observed_addr_report: Option<ObservedAddr>,
    /// Local and remote path status.
    pub(super) status: PathStatusState,
    /// Packet number passed to the first [`Self::sent`] call on this path.
    first_packet: Option<u64>,
    /// Initialised to `0`; exported in the qlog recovery metrics.
    pub(super) pto_count: u32,

    /// NOTE(review): presumably a per-path idle timeout; `None` on a new path and copied
    /// over by [`Self::from_previous`] — confirm.
    pub(super) idle_timeout: Option<Duration>,
    /// NOTE(review): presumably a per-path keep-alive interval; `None` on a new path and
    /// copied over by [`Self::from_previous`] — confirm.
    pub(super) keep_alive: Option<Duration>,

    /// Set to `true` by the first on-path response; the previous value is reported back
    /// through [`OnPathResponseReceived::OnPath`].
    pub(super) open: bool,

    /// Initialised to `None`.
    /// NOTE(review): semantics are not visible in this file — confirm with the users.
    pub(super) last_allowed_receive: Option<Instant>,

    /// Snapshot of the metrics last handed to qlog, used to report only changed values.
    #[cfg(feature = "qlog")]
    recovery_metrics: RecoveryMetrics,

    /// Compared against `SentPacket::path_generation` to decide whether a packet belongs
    /// to this path data.
    generation: u64,
}
225
226impl PathData {
227 pub(super) fn new(
228 remote: SocketAddr,
229 allow_mtud: bool,
230 peer_max_udp_payload_size: Option<u16>,
231 generation: u64,
232 now: Instant,
233 config: &TransportConfig,
234 ) -> Self {
235 let congestion = config
236 .congestion_controller_factory
237 .clone()
238 .build(now, config.get_initial_mtu());
239 Self {
240 remote,
241 rtt: RttEstimator::new(config.initial_rtt),
242 sending_ecn: true,
243 pacing: Pacer::new(
244 config.initial_rtt,
245 congestion.initial_window(),
246 config.get_initial_mtu(),
247 now,
248 ),
249 congestion,
250 challenges_sent: Default::default(),
251 send_new_challenge: false,
252 path_responses: PathResponses::default(),
253 validated: false,
254 total_sent: 0,
255 last_sent: None,
256 total_recvd: 0,
257 mtud: config
258 .mtu_discovery_config
259 .as_ref()
260 .filter(|_| allow_mtud)
261 .map_or(
262 MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
263 |mtud_config| {
264 MtuDiscovery::new(
265 config.get_initial_mtu(),
266 config.min_mtu,
267 peer_max_udp_payload_size,
268 mtud_config.clone(),
269 )
270 },
271 ),
272 first_packet_after_rtt_sample: None,
273 in_flight: InFlight::new(),
274 observed_addr_sent: false,
275 last_observed_addr_report: None,
276 status: Default::default(),
277 first_packet: None,
278 pto_count: 0,
279 idle_timeout: None,
280 keep_alive: None,
281 open: false,
282 last_allowed_receive: None,
283 #[cfg(feature = "qlog")]
284 recovery_metrics: RecoveryMetrics::default(),
285 generation,
286 }
287 }
288
289 pub(super) fn from_previous(
293 remote: SocketAddr,
294 prev: &Self,
295 generation: u64,
296 now: Instant,
297 ) -> Self {
298 let congestion = prev.congestion.clone_box();
299 let smoothed_rtt = prev.rtt.get();
300 Self {
301 remote,
302 rtt: prev.rtt,
303 pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
304 sending_ecn: true,
305 congestion,
306 challenges_sent: Default::default(),
307 send_new_challenge: false,
308 path_responses: PathResponses::default(),
309 validated: false,
310 total_sent: 0,
311 last_sent: None,
312 total_recvd: 0,
313 mtud: prev.mtud.clone(),
314 first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
315 in_flight: InFlight::new(),
316 observed_addr_sent: false,
317 last_observed_addr_report: None,
318 status: prev.status.clone(),
319 first_packet: None,
320 pto_count: 0,
321 idle_timeout: prev.idle_timeout,
322 keep_alive: prev.keep_alive,
323 open: false,
324 last_allowed_receive: None,
325 #[cfg(feature = "qlog")]
326 recovery_metrics: prev.recovery_metrics.clone(),
327 generation,
328 }
329 }
330
331 pub(super) fn is_validating_path(&self) -> bool {
333 !self.challenges_sent.is_empty() || self.send_new_challenge
334 }
335
336 pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
339 !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
340 }
341
    /// Returns the current MTU as reported by this path's MTU discovery state.
    pub(super) fn current_mtu(&self) -> u16 {
        self.mtud.current_mtu()
    }
346
347 pub(super) fn sent(
349 &mut self,
350 now: Instant,
351 pn: u64,
352 packet: SentPacket,
353 space: &mut PacketNumberSpace,
354 ) {
355 self.last_sent.replace(now);
356 self.in_flight.insert(&packet);
357 if self.first_packet.is_none() {
358 self.first_packet = Some(pn);
359 }
360 if let Some(forgotten) = space.sent(pn, packet) {
361 self.remove_in_flight(&forgotten);
362 }
363 }
364
365 pub(crate) fn timer_offset(&self, now: Instant, duration: Duration) -> Instant {
366 let start = self.last_sent.unwrap_or(now);
367 let end = start + duration;
368 if end > start {
369 return now;
370 }
371 end
372 }
373
374 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
377 if packet.path_generation != self.generation {
378 return false;
379 }
380 self.in_flight.remove(packet);
381 true
382 }
383
384 pub(super) fn inc_total_sent(&mut self, inc: u64) {
386 self.total_sent = self.total_sent.saturating_add(inc);
387 if !self.validated {
388 trace!(
389 remote = %self.remote,
390 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
391 "anti amplification budget decreased"
392 );
393 }
394 }
395
396 pub(super) fn inc_total_recvd(&mut self, inc: u64) {
398 self.total_recvd = self.total_recvd.saturating_add(inc);
399 if !self.validated {
400 trace!(
401 remote = %self.remote,
402 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
403 "anti amplification budget increased"
404 );
405 }
406 }
407
408 pub(super) fn earliest_expiring_challenge(&self) -> Option<Instant> {
410 if self.challenges_sent.is_empty() {
411 return None;
412 }
413 let pto = self.rtt.pto_base();
414 self.challenges_sent
415 .values()
416 .map(|info| info.sent_instant)
417 .min()
418 .map(|sent_instant| sent_instant + pto)
419 }
420
421 pub(super) fn on_path_response_received(
423 &mut self,
424 now: Instant,
425 token: u64,
426 remote: SocketAddr,
427 ) -> OnPathResponseReceived {
428 match self.challenges_sent.get(&token) {
429 Some(info) if info.remote == remote && self.remote == remote => {
431 let sent_instant = info.sent_instant;
432 if !std::mem::replace(&mut self.validated, true) {
433 trace!("new path validated");
434 }
435 self.challenges_sent
437 .retain(|_token, info| info.remote != remote);
438
439 self.send_new_challenge = false;
440
441 let rtt = now.saturating_duration_since(sent_instant);
444 self.rtt.reset_initial_rtt(rtt);
445
446 let was_open = std::mem::replace(&mut self.open, true);
447 OnPathResponseReceived::OnPath { was_open }
448 }
449 Some(info) if info.remote == remote => {
451 self.challenges_sent
452 .retain(|_token, info| info.remote != remote);
453 OnPathResponseReceived::OffPath
454 }
455 Some(info) => OnPathResponseReceived::Invalid {
457 expected: info.remote,
458 },
459 None => OnPathResponseReceived::Unknown,
461 }
462 }
463
464 #[cfg(feature = "qlog")]
465 pub(super) fn qlog_recovery_metrics(
466 &mut self,
467 path_id: PathId,
468 ) -> Option<RecoveryMetricsUpdated> {
469 let controller_metrics = self.congestion.metrics();
470
471 let metrics = RecoveryMetrics {
472 min_rtt: Some(self.rtt.min),
473 smoothed_rtt: Some(self.rtt.get()),
474 latest_rtt: Some(self.rtt.latest),
475 rtt_variance: Some(self.rtt.var),
476 pto_count: Some(self.pto_count),
477 bytes_in_flight: Some(self.in_flight.bytes),
478 packets_in_flight: Some(self.in_flight.ack_eliciting),
479
480 congestion_window: Some(controller_metrics.congestion_window),
481 ssthresh: controller_metrics.ssthresh,
482 pacing_rate: controller_metrics.pacing_rate,
483 };
484
485 let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
486 self.recovery_metrics = metrics;
487 event
488 }
489
490 pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Instant> {
494 let smoothed_rtt = self.rtt.get();
495 self.pacing.delay(
496 smoothed_rtt,
497 bytes_to_send,
498 self.current_mtu(),
499 self.congestion.window(),
500 now,
501 )
502 }
503
504 #[must_use = "updated observed address must be reported to the application"]
508 pub(super) fn update_observed_addr_report(
509 &mut self,
510 observed: ObservedAddr,
511 ) -> Option<SocketAddr> {
512 match self.last_observed_addr_report.as_mut() {
513 Some(prev) => {
514 if prev.seq_no >= observed.seq_no {
515 None
517 } else if prev.ip == observed.ip && prev.port == observed.port {
518 prev.seq_no = observed.seq_no;
520 None
521 } else {
522 let addr = observed.socket_addr();
523 self.last_observed_addr_report = Some(observed);
524 Some(addr)
525 }
526 }
527 None => {
528 let addr = observed.socket_addr();
529 self.last_observed_addr_report = Some(observed);
530 Some(addr)
531 }
532 }
533 }
534
    /// Returns the status last recorded for the remote side, if any, without its sequence
    /// number.
    pub(crate) fn remote_status(&self) -> Option<PathStatus> {
        self.status.remote_status.map(|(_seq, status)| status)
    }
538
    /// Returns the locally set status of this path.
    pub(crate) fn local_status(&self) -> PathStatus {
        self.status.local_status
    }
542
    /// Returns the generation this path data was created with.
    pub(super) fn generation(&self) -> u64 {
        self.generation
    }
546}
547
/// Outcome of [`PathData::on_path_response_received`].
pub(super) enum OnPathResponseReceived {
    /// The response matched a challenge sent to this path's remote. `was_open` is the value
    /// of the path's `open` flag before this response set it.
    OnPath { was_open: bool },
    /// The response matched a challenge and came from the challenged address, but that
    /// address is not this path's remote.
    OffPath,
    /// No outstanding challenge carries the received token.
    Unknown,
    /// The token is known, but the response came from a different address than the one the
    /// challenge was sent to.
    Invalid {
        /// The address the challenge was sent to.
        expected: SocketAddr,
    },
}
561
/// Snapshot of the recovery metrics reported to qlog.
///
/// Every field is optional so that the same type can also describe a delta in which `None`
/// means "unchanged" (see `retain_updated`).
#[cfg(feature = "qlog")]
#[derive(Default, Clone, PartialEq, Debug)]
#[non_exhaustive]
struct RecoveryMetrics {
    /// Minimum RTT of the estimator.
    pub min_rtt: Option<Duration>,
    /// Current RTT estimate.
    pub smoothed_rtt: Option<Duration>,
    /// Most recent RTT sample.
    pub latest_rtt: Option<Duration>,
    /// RTT variance of the estimator.
    pub rtt_variance: Option<Duration>,
    /// The path's `pto_count`.
    pub pto_count: Option<u32>,
    /// Bytes currently in flight.
    pub bytes_in_flight: Option<u64>,
    /// Filled from the in-flight count of ack-eliciting packets.
    pub packets_in_flight: Option<u64>,
    /// Congestion window reported by the congestion controller.
    pub congestion_window: Option<u64>,
    /// Slow-start threshold reported by the congestion controller, if any.
    pub ssthresh: Option<u64>,
    /// Pacing rate reported by the congestion controller, if any.
    pub pacing_rate: Option<u64>,
}
580
#[cfg(feature = "qlog")]
impl RecoveryMetrics {
    /// Returns a copy of `self` in which every field equal to its counterpart in `previous`
    /// is replaced by `None`.
    fn retain_updated(&self, previous: &Self) -> Self {
        // Keeps the field's value only when it differs from the previous snapshot.
        macro_rules! changed {
            ($field:ident) => {
                self.$field.filter(|_| self.$field != previous.$field)
            };
        }

        Self {
            min_rtt: changed!(min_rtt),
            smoothed_rtt: changed!(smoothed_rtt),
            latest_rtt: changed!(latest_rtt),
            rtt_variance: changed!(rtt_variance),
            pto_count: changed!(pto_count),
            bytes_in_flight: changed!(bytes_in_flight),
            packets_in_flight: changed!(packets_in_flight),
            congestion_window: changed!(congestion_window),
            ssthresh: changed!(ssthresh),
            pacing_rate: changed!(pacing_rate),
        }
    }

    /// Builds the qlog event describing what changed between `previous` and `self`.
    ///
    /// Returns `None` when no field changed. Durations are exported as seconds (`f32`) and
    /// the PTO count is clamped to `u16::MAX`.
    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
        let delta = self.retain_updated(previous);
        if delta == Self::default() {
            return None;
        }

        let as_secs = |rtt: Duration| rtt.as_secs_f32();
        let event = RecoveryMetricsUpdated {
            min_rtt: delta.min_rtt.map(as_secs),
            smoothed_rtt: delta.smoothed_rtt.map(as_secs),
            latest_rtt: delta.latest_rtt.map(as_secs),
            rtt_variance: delta.rtt_variance.map(as_secs),
            pto_count: delta
                .pto_count
                .map(|count| u16::try_from(count).unwrap_or(u16::MAX)),
            bytes_in_flight: delta.bytes_in_flight,
            packets_in_flight: delta.packets_in_flight,
            congestion_window: delta.congestion_window,
            ssthresh: delta.ssthresh,
            pacing_rate: delta.pacing_rate,
            path_id: Some(u64::from(path_id.as_u32())),
        };
        Some(event)
    }
}
634
/// Round-trip-time estimator.
#[derive(Copy, Clone, Debug)]
pub struct RttEstimator {
    /// The most recent RTT sample, or the initial RTT before any sample was taken.
    latest: Duration,
    /// The smoothed RTT; `None` until the first sample was taken.
    smoothed: Option<Duration>,
    /// The RTT variance.
    var: Duration,
    /// The smallest RTT seen so far.
    min: Duration,
}
647
648impl RttEstimator {
649 pub(super) fn new(initial_rtt: Duration) -> Self {
650 Self {
651 latest: initial_rtt,
652 smoothed: None,
653 var: initial_rtt / 2,
654 min: initial_rtt,
655 }
656 }
657
658 pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
671 if self.smoothed.is_none() {
672 self.latest = initial_rtt;
673 self.var = initial_rtt / 2;
674 self.min = initial_rtt;
675 }
676 }
677
678 pub fn get(&self) -> Duration {
680 self.smoothed.unwrap_or(self.latest)
681 }
682
683 pub fn conservative(&self) -> Duration {
688 self.get().max(self.latest)
689 }
690
691 pub fn min(&self) -> Duration {
693 self.min
694 }
695
696 pub(crate) fn pto_base(&self) -> Duration {
698 self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
699 }
700
701 pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
703 self.latest = rtt;
704 self.min = cmp::min(self.min, self.latest);
707 if let Some(smoothed) = self.smoothed {
709 let adjusted_rtt = if self.min + ack_delay <= self.latest {
710 self.latest - ack_delay
711 } else {
712 self.latest
713 };
714 let var_sample = smoothed.abs_diff(adjusted_rtt);
715 self.var = (3 * self.var + var_sample) / 4;
716 self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
717 } else {
718 self.smoothed = Some(self.latest);
719 self.var = self.latest / 2;
720 self.min = self.latest;
721 }
722 }
723}
724
/// Queue of path responses waiting to be sent.
#[derive(Default, Debug)]
pub(crate) struct PathResponses {
    /// Queued responses; `push` keeps at most one per remote address.
    pending: Vec<PathResponse>,
}
729
730impl PathResponses {
731 pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
732 const MAX_PATH_RESPONSES: usize = 16;
734 let response = PathResponse {
735 packet,
736 token,
737 remote,
738 };
739 let existing = self.pending.iter_mut().find(|x| x.remote == remote);
740 if let Some(existing) = existing {
741 if existing.packet <= packet {
743 *existing = response;
744 }
745 return;
746 }
747 if self.pending.len() < MAX_PATH_RESPONSES {
748 self.pending.push(response);
749 } else {
750 trace!("ignoring excessive PATH_CHALLENGE");
753 }
754 }
755
756 pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
757 let response = *self.pending.last()?;
758 if response.remote == remote {
759 return None;
762 }
763 self.pending.pop();
764 Some((response.token, response.remote))
765 }
766
767 pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
768 let response = *self.pending.last()?;
769 if response.remote != remote {
770 return None;
773 }
774 self.pending.pop();
775 Some(response.token)
776 }
777
778 pub(crate) fn is_empty(&self) -> bool {
779 self.pending.is_empty()
780 }
781}
782
/// A single queued path response.
#[derive(Copy, Clone, Debug)]
struct PathResponse {
    /// Packet number of the packet that carried the challenge being answered.
    packet: u64,
    /// Token to send back.
    token: u64,
    /// Address the response is to be sent to.
    remote: SocketAddr,
}
792
/// Running totals of what is currently in flight on a path.
#[derive(Debug)]
pub(super) struct InFlight {
    /// Sum of the sizes of all packets inserted and not yet removed.
    pub(super) bytes: u64,
    /// Sum of the `ack_eliciting` values of all packets inserted and not yet removed.
    pub(super) ack_eliciting: u64,
}
809
810impl InFlight {
811 fn new() -> Self {
812 Self {
813 bytes: 0,
814 ack_eliciting: 0,
815 }
816 }
817
818 fn insert(&mut self, packet: &SentPacket) {
819 self.bytes += u64::from(packet.size);
820 self.ack_eliciting += u64::from(packet.ack_eliciting);
821 }
822
823 fn remove(&mut self, packet: &SentPacket) {
825 self.bytes -= u64::from(packet.size);
826 self.ack_eliciting -= u64::from(packet.ack_eliciting);
827 }
828}
829
/// Path status as set locally and as last reported by the remote.
#[derive(Debug, Clone, Default)]
pub(super) struct PathStatusState {
    /// The status set locally.
    local_status: PathStatus,
    /// Sequence number of the local status; incremented on every local change.
    local_seq: VarInt,
    /// The latest status received from the remote with its sequence number, if any.
    remote_status: Option<(VarInt, PathStatus)>,
}
842
843impl PathStatusState {
844 pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
846 if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
847 return trace!(%seq, "ignoring path status update");
848 }
849
850 let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
851 if prev != Some(status) {
852 debug!(?status, ?seq, "remote changed path status");
853 }
854 }
855
856 pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
860 if self.local_status == status {
861 return None;
862 }
863
864 self.local_seq = self.local_seq.saturating_add(1u8);
865 Some(std::mem::replace(&mut self.local_status, status))
866 }
867
868 pub(crate) fn seq(&self) -> VarInt {
869 self.local_seq
870 }
871}
872
/// Status a path can be given, locally or by the remote.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum PathStatus {
    /// The default status.
    /// NOTE(review): presumably "usable for sending without restriction" — confirm against
    /// the multipath specification in use.
    #[default]
    Available,
    /// NOTE(review): presumably "use only when no available path works" — confirm against
    /// the multipath specification in use.
    Backup,
}
891
/// Path-related events.
///
/// NOTE(review): the variant descriptions below are derived from names and field types
/// only; confirm the exact emit conditions with the code that produces them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PathEvent {
    /// A path was opened.
    Opened {
        /// The path that was opened.
        id: PathId,
    },
    /// A path was closed.
    Closed {
        /// The path that was closed.
        id: PathId,
        /// The error code associated with the close.
        error_code: VarInt,
    },
    /// A path was abandoned.
    Abandoned {
        /// The path that was abandoned.
        id: PathId,
        /// Statistics of the abandoned path.
        path_stats: PathStats,
    },
    /// A path was closed locally.
    LocallyClosed {
        /// The path that was closed.
        id: PathId,
        /// The error that led to the close.
        error: PathError,
    },
    /// The remote reported a status for a path.
    RemoteStatus {
        /// The path the status refers to.
        id: PathId,
        /// The reported status.
        status: PathStatus,
    },
    /// An observed address was reported on a path.
    ObservedAddr {
        /// The path the report arrived on.
        id: PathId,
        /// The reported address.
        addr: SocketAddr,
    },
}
947
/// Reasons why setting the status of a path can fail.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum SetPathStatusError {
    /// The path is closed.
    #[error("closed path")]
    ClosedPath,
    /// Multipath was not negotiated.
    #[error("multipath not negotiated")]
    MultipathNotNegotiated,
}
958
/// Error indicating that a path is closed.
#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
#[error("closed path")]
pub struct ClosedPath {
    /// Zero-sized field that is not visible outside the parent module, so the struct
    /// cannot be built with a literal elsewhere.
    pub(super) _private: (),
}
965
#[cfg(test)]
mod tests {
    use super::*;

    /// `saturating_add` crosses the `u16` boundary normally and clamps at `PathId::MAX`.
    #[test]
    fn test_path_id_saturating_add() {
        // One past `u16::MAX` still fits comfortably into the underlying `u32`.
        let start = PathId::from(u16::MAX);
        let expected = PathId::from(u32::from(u16::MAX) + 1);
        assert_eq!(start.saturating_add(1u8), expected);

        // At the very top the addition saturates instead of wrapping.
        assert_eq!(PathId::MAX.saturating_add(1u8), PathId::MAX);
    }
}