1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8 PathError, PathStats,
9 mtud::MtuDiscovery,
10 pacing::Pacer,
11 spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14 ConnectionId, Duration, Instant, TIMER_GRANULARITY, TransportConfig, VarInt, coding,
15 congestion, frame::ObservedAddr, packet::SpaceId,
16};
17
18#[cfg(feature = "qlog")]
19use qlog::events::quic::RecoveryMetricsUpdated;
20
21#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
23pub struct PathId(pub(crate) u32);
24
25impl std::hash::Hash for PathId {
26 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
27 state.write_u32(self.0);
28 }
29}
30
31impl identity_hash::IdentityHashable for PathId {}
32
33impl coding::Codec for PathId {
34 fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
35 let v = VarInt::decode(r)?;
36 let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
37 Ok(Self(v))
38 }
39
40 fn encode<B: bytes::BufMut>(&self, w: &mut B) {
41 VarInt(self.0.into()).encode(w)
42 }
43}
44
45impl PathId {
46 pub const MAX: Self = Self(u32::MAX);
48
49 pub const ZERO: Self = Self(0);
51
52 pub(crate) const fn size(&self) -> usize {
54 VarInt(self.0 as u64).size()
55 }
56
57 pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
60 let rhs = rhs.into();
61 let inner = self.0.saturating_add(rhs.0);
62 Self(inner)
63 }
64
65 pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
68 let rhs = rhs.into();
69 let inner = self.0.saturating_sub(rhs.0);
70 Self(inner)
71 }
72
73 pub(crate) fn next(&self) -> Self {
75 self.saturating_add(Self(1))
76 }
77
78 pub(crate) fn as_u32(&self) -> u32 {
80 self.0
81 }
82}
83
84impl std::fmt::Display for PathId {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 self.0.fmt(f)
87 }
88}
89
90impl<T: Into<u32>> From<T> for PathId {
91 fn from(source: T) -> Self {
92 Self(source.into())
93 }
94}
95
96#[derive(Debug)]
103pub(super) struct PathState {
104 pub(super) data: PathData,
105 pub(super) prev: Option<(ConnectionId, PathData)>,
106}
107
108impl PathState {
109 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
111 for path_data in [&mut self.data]
113 .into_iter()
114 .chain(self.prev.as_mut().map(|(_, data)| data))
115 {
116 if path_data.remove_in_flight(packet) {
117 return;
118 }
119 }
120 }
121}
122
123#[derive(Debug)]
124pub(super) struct SentChallengeInfo {
125 pub(super) sent_instant: Instant,
127 pub(super) remote: SocketAddr,
129}
130
131#[derive(Debug)]
133pub(super) struct PathData {
134 pub(super) remote: SocketAddr,
135 pub(super) rtt: RttEstimator,
136 pub(super) sending_ecn: bool,
138 pub(super) congestion: Box<dyn congestion::Controller>,
140 pub(super) pacing: Pacer,
142 pub(super) challenges_sent: IntMap<u64, SentChallengeInfo>,
144 pub(super) send_new_challenge: bool,
148 pub(super) path_responses: PathResponses,
150 pub(super) validated: bool,
155 pub(super) total_sent: u64,
157 pub(super) total_recvd: u64,
159 pub(super) mtud: MtuDiscovery,
161 pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
165 pub(super) in_flight: InFlight,
169 pub(super) observed_addr_sent: bool,
172 pub(super) last_observed_addr_report: Option<ObservedAddr>,
174 pub(super) status: PathStatusState,
176 first_packet: Option<u64>,
183 pub(super) pto_count: u32,
185
186 pub(super) idle_timeout: Option<Duration>,
194 pub(super) keep_alive: Option<Duration>,
202
203 pub(super) open: bool,
209
210 pub(super) last_allowed_receive: Option<Instant>,
217
218 #[cfg(feature = "qlog")]
220 recovery_metrics: RecoveryMetrics,
221
222 generation: u64,
224}
225
226impl PathData {
227 pub(super) fn new(
228 remote: SocketAddr,
229 allow_mtud: bool,
230 peer_max_udp_payload_size: Option<u16>,
231 generation: u64,
232 now: Instant,
233 config: &TransportConfig,
234 ) -> Self {
235 let congestion = config
236 .congestion_controller_factory
237 .clone()
238 .build(now, config.get_initial_mtu());
239 Self {
240 remote,
241 rtt: RttEstimator::new(config.initial_rtt),
242 sending_ecn: true,
243 pacing: Pacer::new(
244 config.initial_rtt,
245 congestion.initial_window(),
246 config.get_initial_mtu(),
247 now,
248 ),
249 congestion,
250 challenges_sent: Default::default(),
251 send_new_challenge: false,
252 path_responses: PathResponses::default(),
253 validated: false,
254 total_sent: 0,
255 total_recvd: 0,
256 mtud: config
257 .mtu_discovery_config
258 .as_ref()
259 .filter(|_| allow_mtud)
260 .map_or(
261 MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
262 |mtud_config| {
263 MtuDiscovery::new(
264 config.get_initial_mtu(),
265 config.min_mtu,
266 peer_max_udp_payload_size,
267 mtud_config.clone(),
268 )
269 },
270 ),
271 first_packet_after_rtt_sample: None,
272 in_flight: InFlight::new(),
273 observed_addr_sent: false,
274 last_observed_addr_report: None,
275 status: Default::default(),
276 first_packet: None,
277 pto_count: 0,
278 idle_timeout: config.default_path_max_idle_timeout,
279 keep_alive: config.default_path_keep_alive_interval,
280 open: false,
281 last_allowed_receive: None,
282 #[cfg(feature = "qlog")]
283 recovery_metrics: RecoveryMetrics::default(),
284 generation,
285 }
286 }
287
288 pub(super) fn from_previous(
292 remote: SocketAddr,
293 prev: &Self,
294 generation: u64,
295 now: Instant,
296 ) -> Self {
297 let congestion = prev.congestion.clone_box();
298 let smoothed_rtt = prev.rtt.get();
299 Self {
300 remote,
301 rtt: prev.rtt,
302 pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
303 sending_ecn: true,
304 congestion,
305 challenges_sent: Default::default(),
306 send_new_challenge: false,
307 path_responses: PathResponses::default(),
308 validated: false,
309 total_sent: 0,
310 total_recvd: 0,
311 mtud: prev.mtud.clone(),
312 first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
313 in_flight: InFlight::new(),
314 observed_addr_sent: false,
315 last_observed_addr_report: None,
316 status: prev.status.clone(),
317 first_packet: None,
318 pto_count: 0,
319 idle_timeout: prev.idle_timeout,
320 keep_alive: prev.keep_alive,
321 open: false,
322 last_allowed_receive: None,
323 #[cfg(feature = "qlog")]
324 recovery_metrics: prev.recovery_metrics.clone(),
325 generation,
326 }
327 }
328
329 pub(super) fn is_validating_path(&self) -> bool {
331 !self.challenges_sent.is_empty() || self.send_new_challenge
332 }
333
334 pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
337 !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
338 }
339
340 pub(super) fn current_mtu(&self) -> u16 {
342 self.mtud.current_mtu()
343 }
344
345 pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
347 self.in_flight.insert(&packet);
348 if self.first_packet.is_none() {
349 self.first_packet = Some(pn);
350 }
351 if let Some(forgotten) = space.sent(pn, packet) {
352 self.remove_in_flight(&forgotten);
353 }
354 }
355
356 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
359 if packet.path_generation != self.generation {
360 return false;
361 }
362 self.in_flight.remove(packet);
363 true
364 }
365
366 pub(super) fn inc_total_sent(&mut self, inc: u64) {
368 self.total_sent = self.total_sent.saturating_add(inc);
369 if !self.validated {
370 trace!(
371 remote = %self.remote,
372 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
373 "anti amplification budget decreased"
374 );
375 }
376 }
377
378 pub(super) fn inc_total_recvd(&mut self, inc: u64) {
380 self.total_recvd = self.total_recvd.saturating_add(inc);
381 if !self.validated {
382 trace!(
383 remote = %self.remote,
384 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
385 "anti amplification budget increased"
386 );
387 }
388 }
389
390 pub(super) fn earliest_expiring_challenge(&self) -> Option<Instant> {
392 if self.challenges_sent.is_empty() {
393 return None;
394 }
395 let pto = self.rtt.pto_base();
396 self.challenges_sent
397 .values()
398 .map(|info| info.sent_instant)
399 .min()
400 .map(|sent_instant| sent_instant + pto)
401 }
402
403 pub(super) fn on_path_response_received(
405 &mut self,
406 now: Instant,
407 token: u64,
408 remote: SocketAddr,
409 ) -> OnPathResponseReceived {
410 match self.challenges_sent.get(&token) {
411 Some(info) if info.remote == remote && self.remote == remote => {
413 let sent_instant = info.sent_instant;
414 if !std::mem::replace(&mut self.validated, true) {
415 trace!("new path validated");
416 }
417 self.challenges_sent
419 .retain(|_token, info| info.remote != remote);
420
421 self.send_new_challenge = false;
422
423 let rtt = now.saturating_duration_since(sent_instant);
426 self.rtt.reset_initial_rtt(rtt);
427
428 let was_open = std::mem::replace(&mut self.open, true);
429 OnPathResponseReceived::OnPath { was_open }
430 }
431 Some(info) if info.remote == remote => {
433 self.challenges_sent
434 .retain(|_token, info| info.remote != remote);
435 OnPathResponseReceived::OffPath
436 }
437 Some(info) => OnPathResponseReceived::Invalid {
439 expected: info.remote,
440 },
441 None => OnPathResponseReceived::Unknown,
443 }
444 }
445
446 #[cfg(feature = "qlog")]
447 pub(super) fn qlog_recovery_metrics(
448 &mut self,
449 path_id: PathId,
450 ) -> Option<RecoveryMetricsUpdated> {
451 let controller_metrics = self.congestion.metrics();
452
453 let metrics = RecoveryMetrics {
454 min_rtt: Some(self.rtt.min),
455 smoothed_rtt: Some(self.rtt.get()),
456 latest_rtt: Some(self.rtt.latest),
457 rtt_variance: Some(self.rtt.var),
458 pto_count: Some(self.pto_count),
459 bytes_in_flight: Some(self.in_flight.bytes),
460 packets_in_flight: Some(self.in_flight.ack_eliciting),
461
462 congestion_window: Some(controller_metrics.congestion_window),
463 ssthresh: controller_metrics.ssthresh,
464 pacing_rate: controller_metrics.pacing_rate,
465 };
466
467 let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
468 self.recovery_metrics = metrics;
469 event
470 }
471
472 pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Instant> {
476 let smoothed_rtt = self.rtt.get();
477 self.pacing.delay(
478 smoothed_rtt,
479 bytes_to_send,
480 self.current_mtu(),
481 self.congestion.window(),
482 now,
483 )
484 }
485
486 #[must_use = "updated observed address must be reported to the application"]
490 pub(super) fn update_observed_addr_report(
491 &mut self,
492 observed: ObservedAddr,
493 ) -> Option<SocketAddr> {
494 match self.last_observed_addr_report.as_mut() {
495 Some(prev) => {
496 if prev.seq_no >= observed.seq_no {
497 None
499 } else if prev.ip == observed.ip && prev.port == observed.port {
500 prev.seq_no = observed.seq_no;
502 None
503 } else {
504 let addr = observed.socket_addr();
505 self.last_observed_addr_report = Some(observed);
506 Some(addr)
507 }
508 }
509 None => {
510 let addr = observed.socket_addr();
511 self.last_observed_addr_report = Some(observed);
512 Some(addr)
513 }
514 }
515 }
516
517 pub(crate) fn remote_status(&self) -> Option<PathStatus> {
518 self.status.remote_status.map(|(_seq, status)| status)
519 }
520
521 pub(crate) fn local_status(&self) -> PathStatus {
522 self.status.local_status
523 }
524
525 pub(super) fn generation(&self) -> u64 {
526 self.generation
527 }
528}
529
530pub(super) enum OnPathResponseReceived {
531 OnPath { was_open: bool },
533 OffPath,
535 Unknown,
537 Invalid {
539 expected: SocketAddr,
541 },
542}
543
544#[cfg(feature = "qlog")]
548#[derive(Default, Clone, PartialEq, Debug)]
549#[non_exhaustive]
550struct RecoveryMetrics {
551 pub min_rtt: Option<Duration>,
552 pub smoothed_rtt: Option<Duration>,
553 pub latest_rtt: Option<Duration>,
554 pub rtt_variance: Option<Duration>,
555 pub pto_count: Option<u32>,
556 pub bytes_in_flight: Option<u64>,
557 pub packets_in_flight: Option<u64>,
558 pub congestion_window: Option<u64>,
559 pub ssthresh: Option<u64>,
560 pub pacing_rate: Option<u64>,
561}
562
563#[cfg(feature = "qlog")]
564impl RecoveryMetrics {
565 fn retain_updated(&self, previous: &Self) -> Self {
567 macro_rules! keep_if_changed {
568 ($name:ident) => {
569 if previous.$name == self.$name {
570 None
571 } else {
572 self.$name
573 }
574 };
575 }
576
577 Self {
578 min_rtt: keep_if_changed!(min_rtt),
579 smoothed_rtt: keep_if_changed!(smoothed_rtt),
580 latest_rtt: keep_if_changed!(latest_rtt),
581 rtt_variance: keep_if_changed!(rtt_variance),
582 pto_count: keep_if_changed!(pto_count),
583 bytes_in_flight: keep_if_changed!(bytes_in_flight),
584 packets_in_flight: keep_if_changed!(packets_in_flight),
585 congestion_window: keep_if_changed!(congestion_window),
586 ssthresh: keep_if_changed!(ssthresh),
587 pacing_rate: keep_if_changed!(pacing_rate),
588 }
589 }
590
591 fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
593 let updated = self.retain_updated(previous);
594
595 if updated == Self::default() {
596 return None;
597 }
598
599 Some(RecoveryMetricsUpdated {
600 min_rtt: updated.min_rtt.map(|rtt| rtt.as_secs_f32()),
601 smoothed_rtt: updated.smoothed_rtt.map(|rtt| rtt.as_secs_f32()),
602 latest_rtt: updated.latest_rtt.map(|rtt| rtt.as_secs_f32()),
603 rtt_variance: updated.rtt_variance.map(|rtt| rtt.as_secs_f32()),
604 pto_count: updated
605 .pto_count
606 .map(|count| count.try_into().unwrap_or(u16::MAX)),
607 bytes_in_flight: updated.bytes_in_flight,
608 packets_in_flight: updated.packets_in_flight,
609 congestion_window: updated.congestion_window,
610 ssthresh: updated.ssthresh,
611 pacing_rate: updated.pacing_rate,
612 path_id: Some(path_id.as_u32() as u64),
613 })
614 }
615}
616
617#[derive(Copy, Clone, Debug)]
619pub struct RttEstimator {
620 latest: Duration,
622 smoothed: Option<Duration>,
624 var: Duration,
626 min: Duration,
628}
629
630impl RttEstimator {
631 pub(super) fn new(initial_rtt: Duration) -> Self {
632 Self {
633 latest: initial_rtt,
634 smoothed: None,
635 var: initial_rtt / 2,
636 min: initial_rtt,
637 }
638 }
639
640 pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
653 if self.smoothed.is_none() {
654 self.latest = initial_rtt;
655 self.var = initial_rtt / 2;
656 self.min = initial_rtt;
657 }
658 }
659
660 pub fn get(&self) -> Duration {
662 self.smoothed.unwrap_or(self.latest)
663 }
664
665 pub fn conservative(&self) -> Duration {
670 self.get().max(self.latest)
671 }
672
673 pub fn min(&self) -> Duration {
675 self.min
676 }
677
678 pub(crate) fn pto_base(&self) -> Duration {
680 self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
681 }
682
683 pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
685 self.latest = rtt;
686 self.min = cmp::min(self.min, self.latest);
689 if let Some(smoothed) = self.smoothed {
691 let adjusted_rtt = if self.min + ack_delay <= self.latest {
692 self.latest - ack_delay
693 } else {
694 self.latest
695 };
696 let var_sample = smoothed.abs_diff(adjusted_rtt);
697 self.var = (3 * self.var + var_sample) / 4;
698 self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
699 } else {
700 self.smoothed = Some(self.latest);
701 self.var = self.latest / 2;
702 self.min = self.latest;
703 }
704 }
705}
706
707#[derive(Default, Debug)]
708pub(crate) struct PathResponses {
709 pending: Vec<PathResponse>,
710}
711
712impl PathResponses {
713 pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
714 const MAX_PATH_RESPONSES: usize = 16;
716 let response = PathResponse {
717 packet,
718 token,
719 remote,
720 };
721 let existing = self.pending.iter_mut().find(|x| x.remote == remote);
722 if let Some(existing) = existing {
723 if existing.packet <= packet {
725 *existing = response;
726 }
727 return;
728 }
729 if self.pending.len() < MAX_PATH_RESPONSES {
730 self.pending.push(response);
731 } else {
732 trace!("ignoring excessive PATH_CHALLENGE");
735 }
736 }
737
738 pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
739 let response = *self.pending.last()?;
740 if response.remote == remote {
741 return None;
744 }
745 self.pending.pop();
746 Some((response.token, response.remote))
747 }
748
749 pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
750 let response = *self.pending.last()?;
751 if response.remote != remote {
752 return None;
755 }
756 self.pending.pop();
757 Some(response.token)
758 }
759
760 pub(crate) fn is_empty(&self) -> bool {
761 self.pending.is_empty()
762 }
763}
764
765#[derive(Copy, Clone, Debug)]
766struct PathResponse {
767 packet: u64,
769 token: u64,
771 remote: SocketAddr,
773}
774
775#[derive(Debug)]
778pub(super) struct InFlight {
779 pub(super) bytes: u64,
784 pub(super) ack_eliciting: u64,
790}
791
792impl InFlight {
793 fn new() -> Self {
794 Self {
795 bytes: 0,
796 ack_eliciting: 0,
797 }
798 }
799
800 fn insert(&mut self, packet: &SentPacket) {
801 self.bytes += u64::from(packet.size);
802 self.ack_eliciting += u64::from(packet.ack_eliciting);
803 }
804
805 fn remove(&mut self, packet: &SentPacket) {
807 self.bytes -= u64::from(packet.size);
808 self.ack_eliciting -= u64::from(packet.ack_eliciting);
809 }
810}
811
812#[derive(Debug, Clone, Default)]
814pub(super) struct PathStatusState {
815 local_status: PathStatus,
817 local_seq: VarInt,
821 remote_status: Option<(VarInt, PathStatus)>,
823}
824
825impl PathStatusState {
826 pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
828 if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
829 return trace!(%seq, "ignoring path status update");
830 }
831
832 let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
833 if prev != Some(status) {
834 debug!(?status, ?seq, "remote changed path status");
835 }
836 }
837
838 pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
842 if self.local_status == status {
843 return None;
844 }
845
846 self.local_seq = self.local_seq.saturating_add(1u8);
847 Some(std::mem::replace(&mut self.local_status, status))
848 }
849
850 pub(crate) fn seq(&self) -> VarInt {
851 self.local_seq
852 }
853}
854
855#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
860pub enum PathStatus {
861 #[default]
866 Available,
867 Backup,
872}
873
874#[derive(Debug, Clone, PartialEq, Eq)]
876pub enum PathEvent {
877 Opened {
879 id: PathId,
881 },
882 Closed {
884 id: PathId,
886 error_code: VarInt,
890 },
891 Abandoned {
895 id: PathId,
897 path_stats: PathStats,
901 },
902 LocallyClosed {
904 id: PathId,
906 error: PathError,
908 },
909 RemoteStatus {
915 id: PathId,
917 status: PathStatus,
919 },
920 ObservedAddr {
922 id: PathId,
925 addr: SocketAddr,
927 },
928}
929
930#[derive(Debug, Error, Clone, PartialEq, Eq)]
932pub enum SetPathStatusError {
933 #[error("closed path")]
935 ClosedPath,
936 #[error("multipath not negotiated")]
938 MultipathNotNegotiated,
939}
940
941#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
943#[error("closed path")]
944pub struct ClosedPath {
945 pub(super) _private: (),
946}
947
948#[cfg(test)]
949mod tests {
950 use super::*;
951
952 #[test]
953 fn test_path_id_saturating_add() {
954 let large: PathId = u16::MAX.into();
956 let next = u32::from(u16::MAX) + 1;
957 assert_eq!(large.saturating_add(1u8), PathId::from(next));
958
959 assert_eq!(PathId::MAX.saturating_add(1u8), PathId::MAX)
961 }
962}