1use std::{cmp, net::SocketAddr};
2
3use identity_hash::IntMap;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::{
8 PathError, PathStats,
9 mtud::MtuDiscovery,
10 pacing::Pacer,
11 spaces::{PacketNumberSpace, SentPacket},
12};
13use crate::{
14 ConnectionId, Duration, Instant, TIMER_GRANULARITY, TransportConfig, VarInt, coding,
15 congestion, frame::ObservedAddr, packet::SpaceId,
16};
17
18#[cfg(feature = "qlog")]
19use qlog::events::quic::RecoveryMetricsUpdated;
20
/// Identifier newtype around a `u32`.
///
/// Equality, ordering and the default value (`0`) are those of the wrapped integer.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct PathId(pub(crate) u32);
24
25impl std::hash::Hash for PathId {
26 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
27 state.write_u32(self.0);
28 }
29}
30
31impl identity_hash::IdentityHashable for PathId {}
32
33impl coding::Codec for PathId {
34 fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
35 let v = VarInt::decode(r)?;
36 let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
37 Ok(Self(v))
38 }
39
40 fn encode<B: bytes::BufMut>(&self, w: &mut B) {
41 VarInt(self.0.into()).encode(w)
42 }
43}
44
45impl PathId {
46 pub const MAX: Self = Self(u32::MAX);
48
49 pub const ZERO: Self = Self(0);
51
52 pub(crate) const fn size(&self) -> usize {
54 VarInt(self.0 as u64).size()
55 }
56
57 pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
60 let rhs = rhs.into();
61 let inner = self.0.saturating_add(rhs.0);
62 Self(inner)
63 }
64
65 pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
68 let rhs = rhs.into();
69 let inner = self.0.saturating_sub(rhs.0);
70 Self(inner)
71 }
72
73 pub(crate) fn next(&self) -> Self {
75 self.saturating_add(Self(1))
76 }
77
78 pub(crate) fn as_u32(&self) -> u32 {
80 self.0
81 }
82}
83
84impl std::fmt::Display for PathId {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 self.0.fmt(f)
87 }
88}
89
90impl<T: Into<u32>> From<T> for PathId {
91 fn from(source: T) -> Self {
92 Self(source.into())
93 }
94}
95
96#[derive(Debug)]
103pub(super) struct PathState {
104 pub(super) data: PathData,
105 pub(super) prev: Option<(ConnectionId, PathData)>,
106}
107
108impl PathState {
109 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
111 for path_data in [&mut self.data]
113 .into_iter()
114 .chain(self.prev.as_mut().map(|(_, data)| data))
115 {
116 if path_data.remove_in_flight(packet) {
117 return;
118 }
119 }
120 }
121}
122
123#[derive(Debug)]
124pub(super) struct SentChallengeInfo {
125 pub(super) sent_instant: Instant,
127 pub(super) remote: SocketAddr,
129}
130
131#[derive(Debug)]
133pub(super) struct PathData {
134 pub(super) remote: SocketAddr,
135 pub(super) rtt: RttEstimator,
136 pub(super) sending_ecn: bool,
138 pub(super) congestion: Box<dyn congestion::Controller>,
140 pub(super) pacing: Pacer,
142 pub(super) challenges_sent: IntMap<u64, SentChallengeInfo>,
144 pub(super) send_new_challenge: bool,
146 pub(super) path_responses: PathResponses,
148 pub(super) validated: bool,
153 pub(super) total_sent: u64,
155 pub(super) total_recvd: u64,
157 pub(super) mtud: MtuDiscovery,
159 pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
163 pub(super) in_flight: InFlight,
167 pub(super) observed_addr_sent: bool,
170 pub(super) last_observed_addr_report: Option<ObservedAddr>,
172 pub(super) status: PathStatusState,
174 first_packet: Option<u64>,
181 pub(super) pto_count: u32,
183
184 pub(super) idle_timeout: Option<Duration>,
192 pub(super) keep_alive: Option<Duration>,
200
201 pub(super) open: bool,
207
208 pub(super) last_allowed_receive: Option<Instant>,
215
216 #[cfg(feature = "qlog")]
218 recovery_metrics: RecoveryMetrics,
219
220 generation: u64,
222}
223
224impl PathData {
225 pub(super) fn new(
226 remote: SocketAddr,
227 allow_mtud: bool,
228 peer_max_udp_payload_size: Option<u16>,
229 generation: u64,
230 now: Instant,
231 config: &TransportConfig,
232 ) -> Self {
233 let congestion = config
234 .congestion_controller_factory
235 .clone()
236 .build(now, config.get_initial_mtu());
237 Self {
238 remote,
239 rtt: RttEstimator::new(config.initial_rtt),
240 sending_ecn: true,
241 pacing: Pacer::new(
242 config.initial_rtt,
243 congestion.initial_window(),
244 config.get_initial_mtu(),
245 now,
246 ),
247 congestion,
248 challenges_sent: Default::default(),
249 send_new_challenge: false,
250 path_responses: PathResponses::default(),
251 validated: false,
252 total_sent: 0,
253 total_recvd: 0,
254 mtud: config
255 .mtu_discovery_config
256 .as_ref()
257 .filter(|_| allow_mtud)
258 .map_or(
259 MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
260 |mtud_config| {
261 MtuDiscovery::new(
262 config.get_initial_mtu(),
263 config.min_mtu,
264 peer_max_udp_payload_size,
265 mtud_config.clone(),
266 )
267 },
268 ),
269 first_packet_after_rtt_sample: None,
270 in_flight: InFlight::new(),
271 observed_addr_sent: false,
272 last_observed_addr_report: None,
273 status: Default::default(),
274 first_packet: None,
275 pto_count: 0,
276 idle_timeout: None,
277 keep_alive: None,
278 open: false,
279 last_allowed_receive: None,
280 #[cfg(feature = "qlog")]
281 recovery_metrics: RecoveryMetrics::default(),
282 generation,
283 }
284 }
285
286 pub(super) fn from_previous(
290 remote: SocketAddr,
291 prev: &Self,
292 generation: u64,
293 now: Instant,
294 ) -> Self {
295 let congestion = prev.congestion.clone_box();
296 let smoothed_rtt = prev.rtt.get();
297 Self {
298 remote,
299 rtt: prev.rtt,
300 pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
301 sending_ecn: true,
302 congestion,
303 challenges_sent: Default::default(),
304 send_new_challenge: false,
305 path_responses: PathResponses::default(),
306 validated: false,
307 total_sent: 0,
308 total_recvd: 0,
309 mtud: prev.mtud.clone(),
310 first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
311 in_flight: InFlight::new(),
312 observed_addr_sent: false,
313 last_observed_addr_report: None,
314 status: prev.status.clone(),
315 first_packet: None,
316 pto_count: 0,
317 idle_timeout: prev.idle_timeout,
318 keep_alive: prev.keep_alive,
319 open: false,
320 last_allowed_receive: None,
321 #[cfg(feature = "qlog")]
322 recovery_metrics: prev.recovery_metrics.clone(),
323 generation,
324 }
325 }
326
327 pub(super) fn is_validating_path(&self) -> bool {
329 !self.challenges_sent.is_empty() || self.send_new_challenge
330 }
331
332 pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
335 !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
336 }
337
338 pub(super) fn current_mtu(&self) -> u16 {
340 self.mtud.current_mtu()
341 }
342
343 pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
345 self.in_flight.insert(&packet);
346 if self.first_packet.is_none() {
347 self.first_packet = Some(pn);
348 }
349 if let Some(forgotten) = space.sent(pn, packet) {
350 self.remove_in_flight(&forgotten);
351 }
352 }
353
354 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
357 if packet.path_generation != self.generation {
358 return false;
359 }
360 self.in_flight.remove(packet);
361 true
362 }
363
364 pub(super) fn inc_total_sent(&mut self, inc: u64) {
366 self.total_sent = self.total_sent.saturating_add(inc);
367 if !self.validated {
368 trace!(
369 remote = %self.remote,
370 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
371 "anti amplification budget decreased"
372 );
373 }
374 }
375
376 pub(super) fn inc_total_recvd(&mut self, inc: u64) {
378 self.total_recvd = self.total_recvd.saturating_add(inc);
379 if !self.validated {
380 trace!(
381 remote = %self.remote,
382 anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
383 "anti amplification budget increased"
384 );
385 }
386 }
387
388 pub(super) fn earliest_expiring_challenge(&self) -> Option<Instant> {
390 if self.challenges_sent.is_empty() {
391 return None;
392 }
393 let pto = self.rtt.pto_base();
394 self.challenges_sent
395 .values()
396 .map(|info| info.sent_instant)
397 .min()
398 .map(|sent_instant| sent_instant + pto)
399 }
400
401 pub(super) fn on_path_response_received(
403 &mut self,
404 now: Instant,
405 token: u64,
406 remote: SocketAddr,
407 ) -> OnPathResponseReceived {
408 match self.challenges_sent.get(&token) {
409 Some(info) if info.remote == remote && self.remote == remote => {
411 let sent_instant = info.sent_instant;
412 if !std::mem::replace(&mut self.validated, true) {
413 trace!("new path validated");
414 }
415 self.challenges_sent
417 .retain(|_token, info| info.remote != remote);
418
419 self.send_new_challenge = false;
420
421 let rtt = now.saturating_duration_since(sent_instant);
424 self.rtt.reset_initial_rtt(rtt);
425
426 let was_open = std::mem::replace(&mut self.open, true);
427 OnPathResponseReceived::OnPath { was_open }
428 }
429 Some(info) if info.remote == remote => {
431 self.challenges_sent
432 .retain(|_token, info| info.remote != remote);
433 OnPathResponseReceived::OffPath
434 }
435 Some(info) => OnPathResponseReceived::Invalid {
437 expected: info.remote,
438 },
439 None => OnPathResponseReceived::Unknown,
441 }
442 }
443
444 #[cfg(feature = "qlog")]
445 pub(super) fn qlog_recovery_metrics(
446 &mut self,
447 path_id: PathId,
448 ) -> Option<RecoveryMetricsUpdated> {
449 let controller_metrics = self.congestion.metrics();
450
451 let metrics = RecoveryMetrics {
452 min_rtt: Some(self.rtt.min),
453 smoothed_rtt: Some(self.rtt.get()),
454 latest_rtt: Some(self.rtt.latest),
455 rtt_variance: Some(self.rtt.var),
456 pto_count: Some(self.pto_count),
457 bytes_in_flight: Some(self.in_flight.bytes),
458 packets_in_flight: Some(self.in_flight.ack_eliciting),
459
460 congestion_window: Some(controller_metrics.congestion_window),
461 ssthresh: controller_metrics.ssthresh,
462 pacing_rate: controller_metrics.pacing_rate,
463 };
464
465 let event = metrics.to_qlog_event(path_id, &self.recovery_metrics);
466 self.recovery_metrics = metrics;
467 event
468 }
469
470 pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Instant> {
474 let smoothed_rtt = self.rtt.get();
475 self.pacing.delay(
476 smoothed_rtt,
477 bytes_to_send,
478 self.current_mtu(),
479 self.congestion.window(),
480 now,
481 )
482 }
483
484 #[must_use = "updated observed address must be reported to the application"]
488 pub(super) fn update_observed_addr_report(
489 &mut self,
490 observed: ObservedAddr,
491 ) -> Option<SocketAddr> {
492 match self.last_observed_addr_report.as_mut() {
493 Some(prev) => {
494 if prev.seq_no >= observed.seq_no {
495 None
497 } else if prev.ip == observed.ip && prev.port == observed.port {
498 prev.seq_no = observed.seq_no;
500 None
501 } else {
502 let addr = observed.socket_addr();
503 self.last_observed_addr_report = Some(observed);
504 Some(addr)
505 }
506 }
507 None => {
508 let addr = observed.socket_addr();
509 self.last_observed_addr_report = Some(observed);
510 Some(addr)
511 }
512 }
513 }
514
515 pub(crate) fn remote_status(&self) -> Option<PathStatus> {
516 self.status.remote_status.map(|(_seq, status)| status)
517 }
518
519 pub(crate) fn local_status(&self) -> PathStatus {
520 self.status.local_status
521 }
522
523 pub(super) fn generation(&self) -> u64 {
524 self.generation
525 }
526}
527
528pub(super) enum OnPathResponseReceived {
529 OnPath { was_open: bool },
531 OffPath,
533 Unknown,
535 Invalid {
537 expected: SocketAddr,
539 },
540}
541
/// Snapshot of recovery-related metrics for qlog reporting.
///
/// Every field is optional so that the same type can also represent a "changes only" view,
/// where `None` means "nothing to report for this field".
#[cfg(feature = "qlog")]
#[derive(Clone, Debug, Default, PartialEq)]
#[non_exhaustive]
struct RecoveryMetrics {
    /// Minimum RTT.
    pub min_rtt: Option<Duration>,
    /// Smoothed RTT.
    pub smoothed_rtt: Option<Duration>,
    /// Latest RTT.
    pub latest_rtt: Option<Duration>,
    /// RTT variance.
    pub rtt_variance: Option<Duration>,
    /// PTO counter.
    pub pto_count: Option<u32>,
    /// Bytes currently in flight.
    pub bytes_in_flight: Option<u64>,
    /// Packets currently in flight.
    pub packets_in_flight: Option<u64>,
    /// Congestion window as reported by the congestion controller.
    pub congestion_window: Option<u64>,
    /// Slow-start threshold as reported by the congestion controller.
    pub ssthresh: Option<u64>,
    /// Pacing rate as reported by the congestion controller.
    pub pacing_rate: Option<u64>,
}
560
#[cfg(feature = "qlog")]
impl RecoveryMetrics {
    /// Returns a copy of `self` in which every field equal to its counterpart in `previous`
    /// is blanked to `None`; fields that differ keep `self`'s value.
    fn retain_updated(&self, previous: &Self) -> Self {
        /// `current`, unless it equals `before`, in which case `None`.
        fn changed<T: Copy + PartialEq>(current: Option<T>, before: Option<T>) -> Option<T> {
            if current == before { None } else { current }
        }

        Self {
            min_rtt: changed(self.min_rtt, previous.min_rtt),
            smoothed_rtt: changed(self.smoothed_rtt, previous.smoothed_rtt),
            latest_rtt: changed(self.latest_rtt, previous.latest_rtt),
            rtt_variance: changed(self.rtt_variance, previous.rtt_variance),
            pto_count: changed(self.pto_count, previous.pto_count),
            bytes_in_flight: changed(self.bytes_in_flight, previous.bytes_in_flight),
            packets_in_flight: changed(self.packets_in_flight, previous.packets_in_flight),
            congestion_window: changed(self.congestion_window, previous.congestion_window),
            ssthresh: changed(self.ssthresh, previous.ssthresh),
            pacing_rate: changed(self.pacing_rate, previous.pacing_rate),
        }
    }

    /// Builds the qlog event describing what changed between `previous` and `self`.
    ///
    /// Returns `None` when no field changed. Durations are reported as seconds (`f32`), and a
    /// PTO count that does not fit in a `u16` is reported as `u16::MAX`.
    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
        let delta = self.retain_updated(previous);
        if delta == Self::default() {
            // Every field was blanked: nothing to report.
            return None;
        }

        let seconds = |d: Duration| d.as_secs_f32();
        let event = RecoveryMetricsUpdated {
            min_rtt: delta.min_rtt.map(seconds),
            smoothed_rtt: delta.smoothed_rtt.map(seconds),
            latest_rtt: delta.latest_rtt.map(seconds),
            rtt_variance: delta.rtt_variance.map(seconds),
            pto_count: delta
                .pto_count
                .map(|count| u16::try_from(count).unwrap_or(u16::MAX)),
            bytes_in_flight: delta.bytes_in_flight,
            packets_in_flight: delta.packets_in_flight,
            congestion_window: delta.congestion_window,
            ssthresh: delta.ssthresh,
            pacing_rate: delta.pacing_rate,
            path_id: Some(u64::from(path_id.as_u32())),
        };
        Some(event)
    }
}
614
/// Round-trip-time estimator state.
#[derive(Clone, Copy, Debug)]
pub struct RttEstimator {
    /// The most recent RTT sample (or the initial RTT before any sample).
    latest: Duration,
    /// The smoothed RTT; `None` until the first sample has been recorded.
    smoothed: Option<Duration>,
    /// The RTT variance estimate.
    var: Duration,
    /// The smallest RTT recorded so far (or the initial RTT before any sample).
    min: Duration,
}
627
628impl RttEstimator {
629 pub(super) fn new(initial_rtt: Duration) -> Self {
630 Self {
631 latest: initial_rtt,
632 smoothed: None,
633 var: initial_rtt / 2,
634 min: initial_rtt,
635 }
636 }
637
638 pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
651 if self.smoothed.is_none() {
652 self.latest = initial_rtt;
653 self.var = initial_rtt / 2;
654 self.min = initial_rtt;
655 }
656 }
657
658 pub fn get(&self) -> Duration {
660 self.smoothed.unwrap_or(self.latest)
661 }
662
663 pub fn conservative(&self) -> Duration {
668 self.get().max(self.latest)
669 }
670
671 pub fn min(&self) -> Duration {
673 self.min
674 }
675
676 pub(crate) fn pto_base(&self) -> Duration {
678 self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
679 }
680
681 pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
683 self.latest = rtt;
684 self.min = cmp::min(self.min, self.latest);
687 if let Some(smoothed) = self.smoothed {
689 let adjusted_rtt = if self.min + ack_delay <= self.latest {
690 self.latest - ack_delay
691 } else {
692 self.latest
693 };
694 let var_sample = smoothed.abs_diff(adjusted_rtt);
695 self.var = (3 * self.var + var_sample) / 4;
696 self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
697 } else {
698 self.smoothed = Some(self.latest);
699 self.var = self.latest / 2;
700 self.min = self.latest;
701 }
702 }
703}
704
705#[derive(Default, Debug)]
706pub(crate) struct PathResponses {
707 pending: Vec<PathResponse>,
708}
709
710impl PathResponses {
711 pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
712 const MAX_PATH_RESPONSES: usize = 16;
714 let response = PathResponse {
715 packet,
716 token,
717 remote,
718 };
719 let existing = self.pending.iter_mut().find(|x| x.remote == remote);
720 if let Some(existing) = existing {
721 if existing.packet <= packet {
723 *existing = response;
724 }
725 return;
726 }
727 if self.pending.len() < MAX_PATH_RESPONSES {
728 self.pending.push(response);
729 } else {
730 trace!("ignoring excessive PATH_CHALLENGE");
733 }
734 }
735
736 pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
737 let response = *self.pending.last()?;
738 if response.remote == remote {
739 return None;
742 }
743 self.pending.pop();
744 Some((response.token, response.remote))
745 }
746
747 pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
748 let response = *self.pending.last()?;
749 if response.remote != remote {
750 return None;
753 }
754 self.pending.pop();
755 Some(response.token)
756 }
757
758 pub(crate) fn is_empty(&self) -> bool {
759 self.pending.is_empty()
760 }
761}
762
/// A single queued path response.
#[derive(Clone, Copy, Debug)]
struct PathResponse {
    /// Packet number associated with the request; used to decide whether a newer request
    /// replaces a queued one.
    packet: u64,
    /// Token to send back.
    token: u64,
    /// Address the response is queued for.
    remote: SocketAddr,
}
772
773#[derive(Debug)]
776pub(super) struct InFlight {
777 pub(super) bytes: u64,
782 pub(super) ack_eliciting: u64,
788}
789
790impl InFlight {
791 fn new() -> Self {
792 Self {
793 bytes: 0,
794 ack_eliciting: 0,
795 }
796 }
797
798 fn insert(&mut self, packet: &SentPacket) {
799 self.bytes += u64::from(packet.size);
800 self.ack_eliciting += u64::from(packet.ack_eliciting);
801 }
802
803 fn remove(&mut self, packet: &SentPacket) {
805 self.bytes -= u64::from(packet.size);
806 self.ack_eliciting -= u64::from(packet.ack_eliciting);
807 }
808}
809
810#[derive(Debug, Clone, Default)]
812pub(super) struct PathStatusState {
813 local_status: PathStatus,
815 local_seq: VarInt,
819 remote_status: Option<(VarInt, PathStatus)>,
821}
822
823impl PathStatusState {
824 pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
826 if self.remote_status.is_some_and(|(curr, _)| curr >= seq) {
827 return trace!(%seq, "ignoring path status update");
828 }
829
830 let prev = self.remote_status.replace((seq, status)).map(|(_, s)| s);
831 if prev != Some(status) {
832 debug!(?status, ?seq, "remote changed path status");
833 }
834 }
835
836 pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
840 if self.local_status == status {
841 return None;
842 }
843
844 self.local_seq = self.local_seq.saturating_add(1u8);
845 Some(std::mem::replace(&mut self.local_status, status))
846 }
847
848 pub(crate) fn seq(&self) -> VarInt {
849 self.local_seq
850 }
851}
852
/// Status of a path; defaults to [`PathStatus::Available`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PathStatus {
    /// The default status.
    #[default]
    Available,
    /// The non-default status.
    /// NOTE(review): presumably marks a path to be used only as a fallback — confirm against
    /// the multipath specification this implements.
    Backup,
}
871
872#[derive(Debug, Clone, PartialEq, Eq)]
874pub enum PathEvent {
875 Opened {
877 id: PathId,
879 },
880 Closed {
882 id: PathId,
884 error_code: VarInt,
888 },
889 Abandoned {
893 id: PathId,
895 path_stats: PathStats,
899 },
900 LocallyClosed {
902 id: PathId,
904 error: PathError,
906 },
907 RemoteStatus {
913 id: PathId,
915 status: PathStatus,
917 },
918 ObservedAddr {
920 id: PathId,
923 addr: SocketAddr,
925 },
926}
927
928#[derive(Debug, Error, Clone, PartialEq, Eq)]
930pub enum SetPathStatusError {
931 #[error("closed path")]
933 ClosedPath,
934 #[error("multipath not negotiated")]
936 MultipathNotNegotiated,
937}
938
939#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
941#[error("closed path")]
942pub struct ClosedPath {
943 pub(super) _private: (),
944}
945
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_path_id_saturating_add() {
        // Stepping past the `u16` range must be a plain addition, not a saturation.
        let start = PathId::from(u16::MAX);
        let expected = PathId::from(u32::from(u16::MAX) + 1);
        assert_eq!(start.saturating_add(1u8), expected);

        // At the very top of the range the addition saturates.
        assert_eq!(PathId::MAX.saturating_add(1u8), PathId::MAX);
    }
}