1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::SocketAddr,
7 num::{NonZeroU32, NonZeroUsize},
8 sync::Arc,
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13
14use rand::{RngExt, SeedableRng, rngs::StdRng};
15use rustc_hash::{FxHashMap, FxHashSet};
16use thiserror::Error;
17use tracing::{debug, error, trace, trace_span, warn};
18
19use crate::{
20 Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
21 MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
22 TransportError, TransportErrorCode, VarInt,
23 cid_generator::ConnectionIdGenerator,
24 cid_queue::CidQueue,
25 config::{ServerConfig, TransportConfig},
26 congestion::Controller,
27 connection::{
28 qlog::{QlogRecvPacket, QlogSink},
29 spaces::LostPacket,
30 stats::PathStatsMap,
31 timer::{ConnTimer, PathTimer},
32 },
33 crypto::{self, Keys},
34 frame::{
35 self, Close, DataBlocked, Datagram, FrameStruct, NewToken, ObservedAddr, StreamDataBlocked,
36 StreamsBlocked,
37 },
38 n0_nat_traversal,
39 packet::{
40 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
41 PacketNumber, PartialDecode, SpaceId,
42 },
43 range_set::ArrayRangeSet,
44 shared::{
45 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
46 EndpointEvent, EndpointEventInner,
47 },
48 token::{ResetToken, Token, TokenPayload},
49 transport_parameters::TransportParameters,
50};
51
52mod ack_frequency;
53use ack_frequency::AckFrequencyState;
54
55mod assembler;
56pub use assembler::Chunk;
57
58mod cid_state;
59use cid_state::CidState;
60
61mod datagrams;
62use datagrams::DatagramState;
63pub use datagrams::{Datagrams, SendDatagramError};
64
65mod mtud;
66mod pacing;
67
68mod packet_builder;
69use packet_builder::{PacketBuilder, PadDatagram};
70
71mod packet_crypto;
72use packet_crypto::CryptoState;
73pub(crate) use packet_crypto::EncryptionLevel;
74
75mod paths;
76pub use paths::{
77 ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError,
78};
79use paths::{PathData, PathState};
80
81pub(crate) mod qlog;
82pub(crate) mod send_buffer;
83
84pub(crate) mod spaces;
85#[cfg(fuzzing)]
86pub use spaces::Retransmits;
87#[cfg(not(fuzzing))]
88use spaces::Retransmits;
89pub(crate) use spaces::SpaceKind;
90use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
91
92mod stats;
93pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
94
95mod streams;
96#[cfg(fuzzing)]
97pub use streams::StreamsState;
98#[cfg(not(fuzzing))]
99use streams::StreamsState;
100pub use streams::{
101 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
102 ShouldTransmit, StreamEvent, Streams, WriteError,
103};
104
105mod timer;
106use timer::{Timer, TimerTable};
107
108mod transmit_buf;
109use transmit_buf::TransmitBuf;
110
111mod state;
112
113#[cfg(not(fuzzing))]
114use state::State;
115#[cfg(fuzzing)]
116pub use state::State;
117use state::StateType;
118
119pub struct Connection {
159 endpoint_config: Arc<EndpointConfig>,
160 config: Arc<TransportConfig>,
161 rng: StdRng,
162 crypto_state: CryptoState,
164 handshake_cid: ConnectionId,
166 remote_handshake_cid: ConnectionId,
168 paths: BTreeMap<PathId, PathState>,
174 path_generation_counter: u64,
185 allow_mtud: bool,
187 state: State,
188 side: ConnectionSide,
189 peer_params: TransportParameters,
191 original_remote_cid: ConnectionId,
193 initial_dst_cid: ConnectionId,
195 retry_src_cid: Option<ConnectionId>,
198 events: VecDeque<Event>,
200 endpoint_events: VecDeque<EndpointEventInner>,
201 spin_enabled: bool,
203 spin: bool,
205 spaces: [PacketSpace; 3],
207 highest_space: SpaceKind,
209 idle_timeout: Option<Duration>,
211 timers: TimerTable,
212 authentication_failures: u64,
214
215 connection_close_pending: bool,
220
221 ack_frequency: AckFrequencyState,
225
226 receiving_ecn: bool,
231 total_authed_packets: u64,
233
234 next_observed_addr_seq_no: VarInt,
239
240 streams: StreamsState,
241 remote_cids: FxHashMap<PathId, CidQueue>,
247 local_cid_state: FxHashMap<PathId, CidState>,
254 datagrams: DatagramState,
256 path_stats: PathStatsMap,
258 partial_stats: ConnectionStats,
264 version: u32,
266
267 max_concurrent_paths: NonZeroU32,
276 local_max_path_id: PathId,
291 remote_max_path_id: PathId,
297 max_path_id_with_cids: PathId,
303 abandoned_paths: FxHashSet<PathId>,
311
312 n0_nat_traversal: n0_nat_traversal::State,
314 qlog: QlogSink,
315}
316
317impl Connection {
318 pub(crate) fn new(
319 endpoint_config: Arc<EndpointConfig>,
320 config: Arc<TransportConfig>,
321 init_cid: ConnectionId,
322 local_cid: ConnectionId,
323 remote_cid: ConnectionId,
324 network_path: FourTuple,
325 crypto: Box<dyn crypto::Session>,
326 cid_gen: &dyn ConnectionIdGenerator,
327 now: Instant,
328 version: u32,
329 allow_mtud: bool,
330 rng_seed: [u8; 32],
331 side_args: SideArgs,
332 qlog: QlogSink,
333 ) -> Self {
334 let pref_addr_cid = side_args.pref_addr_cid();
335 let path_validated = side_args.path_validated();
336 let connection_side = ConnectionSide::from(side_args);
337 let side = connection_side.side();
338 let mut rng = StdRng::from_seed(rng_seed);
339 let initial_space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
340 let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
341 #[cfg(test)]
342 let data_space = match config.deterministic_packet_numbers {
343 true => PacketSpace::new_deterministic(now, SpaceId::Data),
344 false => PacketSpace::new(now, SpaceId::Data, &mut rng),
345 };
346 #[cfg(not(test))]
347 let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
348 let state = State::handshake(state::Handshake {
349 remote_cid_set: side.is_server(),
350 expected_token: Bytes::new(),
351 client_hello: None,
352 allow_server_migration: side.is_client() && config.server_handshake_migration,
353 });
354 let local_cid_state = FxHashMap::from_iter([(
355 PathId::ZERO,
356 CidState::new(
357 cid_gen.cid_len(),
358 cid_gen.cid_lifetime(),
359 now,
360 if pref_addr_cid.is_some() { 2 } else { 1 },
361 ),
362 )]);
363
364 let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
365 path.open_status = paths::OpenStatus::Informed;
366 let mut this = Self {
367 endpoint_config,
368 crypto_state: CryptoState::new(crypto, init_cid, side, &mut rng),
369 handshake_cid: local_cid,
370 remote_handshake_cid: remote_cid,
371 local_cid_state,
372 paths: BTreeMap::from_iter([(
373 PathId::ZERO,
374 PathState {
375 data: path,
376 prev: None,
377 },
378 )]),
379 path_generation_counter: 0,
380 allow_mtud,
381 state,
382 side: connection_side,
383 peer_params: TransportParameters::default(),
384 original_remote_cid: remote_cid,
385 initial_dst_cid: init_cid,
386 retry_src_cid: None,
387 events: VecDeque::new(),
388 endpoint_events: VecDeque::new(),
389 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
390 spin: false,
391 spaces: [initial_space, handshake_space, data_space],
392 highest_space: SpaceKind::Initial,
393 idle_timeout: match config.max_idle_timeout {
394 None | Some(VarInt(0)) => None,
395 Some(dur) => Some(Duration::from_millis(dur.0)),
396 },
397 timers: TimerTable::default(),
398 authentication_failures: 0,
399 connection_close_pending: false,
400
401 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
402 &TransportParameters::default(),
403 )),
404
405 receiving_ecn: false,
406 total_authed_packets: 0,
407
408 next_observed_addr_seq_no: 0u32.into(),
409
410 streams: StreamsState::new(
411 side,
412 config.max_concurrent_uni_streams,
413 config.max_concurrent_bidi_streams,
414 config.send_window,
415 config.receive_window,
416 config.stream_receive_window,
417 ),
418 datagrams: DatagramState::default(),
419 config,
420 remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]),
421 rng,
422 path_stats: Default::default(),
423 partial_stats: ConnectionStats::default(),
424 version,
425
426 max_concurrent_paths: NonZeroU32::MIN,
428 local_max_path_id: PathId::ZERO,
429 remote_max_path_id: PathId::ZERO,
430 max_path_id_with_cids: PathId::ZERO,
431 abandoned_paths: Default::default(),
432
433 n0_nat_traversal: Default::default(),
434 qlog,
435 };
436 if path_validated {
437 this.on_path_validated(PathId::ZERO);
438 }
439 if side.is_client() {
440 this.write_crypto();
442 this.init_0rtt(now);
443 }
444 this.qlog
445 .emit_tuple_assigned(PathId::ZERO, network_path, now);
446 this
447 }
448
449 #[must_use]
457 pub fn poll_timeout(&mut self) -> Option<Instant> {
458 self.timers.peek()
459 }
460
461 #[must_use]
467 pub fn poll(&mut self) -> Option<Event> {
468 if let Some(x) = self.events.pop_front() {
469 return Some(x);
470 }
471
472 if let Some(event) = self.streams.poll() {
473 return Some(Event::Stream(event));
474 }
475
476 if let Some(reason) = self.state.take_error() {
477 return Some(Event::ConnectionLost { reason });
478 }
479
480 None
481 }
482
483 #[must_use]
485 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
486 self.endpoint_events.pop_front().map(EndpointEvent)
487 }
488
489 #[must_use]
491 pub fn streams(&mut self) -> Streams<'_> {
492 Streams {
493 state: &mut self.streams,
494 conn_state: &self.state,
495 }
496 }
497
498 #[must_use]
500 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
501 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
502 RecvStream {
503 id,
504 state: &mut self.streams,
505 pending: &mut self.spaces[SpaceId::Data].pending,
506 }
507 }
508
509 #[must_use]
511 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
512 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
513 SendStream {
514 id,
515 state: &mut self.streams,
516 pending: &mut self.spaces[SpaceId::Data].pending,
517 conn_state: &self.state,
518 }
519 }
520
521 pub fn open_path_ensure(
538 &mut self,
539 network_path: FourTuple,
540 initial_status: PathStatus,
541 now: Instant,
542 ) -> Result<(PathId, bool), PathError> {
543 let existing_open_path = self.paths.iter().find(|(id, path)| {
544 network_path.is_probably_same_path(&path.data.network_path)
545 && !self.abandoned_paths.contains(*id)
546 });
547 match existing_open_path {
548 Some((path_id, _state)) => Ok((*path_id, true)),
549 None => Ok((self.open_path(network_path, initial_status, now)?, false)),
550 }
551 }
552
553 pub fn open_path(
559 &mut self,
560 network_path: FourTuple,
561 initial_status: PathStatus,
562 now: Instant,
563 ) -> Result<PathId, PathError> {
564 if !self.is_multipath_negotiated() {
565 return Err(PathError::MultipathNotNegotiated);
566 }
567 if self.side().is_server() {
568 return Err(PathError::ServerSideNotAllowed);
569 }
570
571 let max_abandoned = self.abandoned_paths.iter().max().copied();
572 let max_used = self.paths.keys().last().copied();
573 let path_id = max_abandoned
574 .max(max_used)
575 .unwrap_or(PathId::ZERO)
576 .saturating_add(1u8);
577
578 if Some(path_id) > self.max_path_id() {
579 return Err(PathError::MaxPathIdReached);
580 }
581 if path_id > self.remote_max_path_id {
582 self.spaces[SpaceId::Data].pending.paths_blocked = true;
583 return Err(PathError::MaxPathIdReached);
584 }
585 if self
586 .remote_cids
587 .get(&path_id)
588 .map(CidQueue::active)
589 .is_none()
590 {
591 self.spaces[SpaceId::Data]
592 .pending
593 .path_cids_blocked
594 .insert(path_id);
595 return Err(PathError::RemoteCidsExhausted);
596 }
597
598 let path = self.ensure_path(path_id, network_path, now, None);
599 path.status.local_update(initial_status);
600
601 Ok(path_id)
602 }
603
604 pub fn close_path(
610 &mut self,
611 now: Instant,
612 path_id: PathId,
613 error_code: VarInt,
614 ) -> Result<(), ClosePathError> {
615 self.close_path_inner(
616 now,
617 path_id,
618 PathAbandonReason::ApplicationClosed { error_code },
619 )
620 }
621
622 pub(crate) fn close_path_inner(
627 &mut self,
628 now: Instant,
629 path_id: PathId,
630 reason: PathAbandonReason,
631 ) -> Result<(), ClosePathError> {
632 if self.state.is_drained() {
633 return Ok(());
634 }
635
636 if !self.is_multipath_negotiated() {
637 return Err(ClosePathError::MultipathNotNegotiated);
638 }
639 if self.abandoned_paths.contains(&path_id)
640 || Some(path_id) > self.max_path_id()
641 || !self.paths.contains_key(&path_id)
642 {
643 return Err(ClosePathError::ClosedPath);
644 }
645
646 let is_last_path = !self
647 .paths
648 .keys()
649 .any(|id| *id != path_id && !self.abandoned_paths.contains(id));
650
651 if is_last_path && !reason.is_remote() {
652 return Err(ClosePathError::LastOpenPath);
653 }
654
655 self.abandon_path(now, path_id, reason);
656
657 if is_last_path {
661 let rtt = RttEstimator::new(self.config.initial_rtt);
665 let pto = rtt.pto_base() + self.ack_frequency.max_ack_delay_for_pto();
666 let grace = pto * 3;
667 self.timers.set(
668 Timer::Conn(ConnTimer::NoAvailablePath),
669 now + grace,
670 self.qlog.with_time(now),
671 );
672 }
673
674 Ok(())
675 }
676
677 fn abandon_path(&mut self, now: Instant, path_id: PathId, reason: PathAbandonReason) {
682 trace!(%path_id, ?reason, "abandoning path");
683
684 let pending_space = &mut self.spaces[SpaceId::Data].pending;
685 pending_space
687 .path_abandon
688 .insert(path_id, reason.error_code());
689
690 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
692 pending_space.path_cids_blocked.retain(|&id| id != path_id);
693 pending_space.path_status.retain(|&id| id != path_id);
694
695 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
697 for sent_packet in space.sent_packets.values_mut() {
698 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
699 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
700 retransmits.path_cids_blocked.retain(|&id| id != path_id);
701 retransmits.path_status.retain(|&id| id != path_id);
702 }
703 }
704 }
705
706 self.spaces[SpaceId::Data].for_path(path_id).loss_probes = 0;
711
712 debug_assert!(!self.state.is_drained()); self.endpoint_events
717 .push_back(EndpointEventInner::RetireResetToken(path_id));
718
719 self.abandoned_paths.insert(path_id);
720
721 for timer in timer::PathTimer::VALUES {
722 let keep_timer = match timer {
724 PathTimer::PathValidationFailed
728 | PathTimer::PathChallengeLost
729 | PathTimer::AbandonFromValidation => false,
730 PathTimer::PathKeepAlive | PathTimer::PathIdle => false,
733 PathTimer::MaxAckDelay => false,
736 PathTimer::PathDrained => false,
739 PathTimer::LossDetection => true,
742 PathTimer::Pacing => true,
746 };
747
748 if !keep_timer {
749 let qlog = self.qlog.with_time(now);
750 self.timers.stop(Timer::PerPath(path_id, timer), qlog);
751 }
752 }
753
754 self.set_loss_detection_timer(now, path_id);
759
760 self.events.push_back(Event::Path(PathEvent::Abandoned {
762 id: path_id,
763 reason,
764 }));
765 }
766
767 #[track_caller]
771 fn path_data(&self, path_id: PathId) -> &PathData {
772 if let Some(data) = self.paths.get(&path_id) {
773 &data.data
774 } else {
775 panic!(
776 "unknown path: {path_id}, currently known paths: {:?}",
777 self.paths.keys().collect::<Vec<_>>()
778 );
779 }
780 }
781
782 #[track_caller]
786 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
787 &mut self.paths.get_mut(&path_id).expect("known path").data
788 }
789
790 fn path(&self, path_id: PathId) -> Option<&PathData> {
792 self.paths.get(&path_id).map(|path_state| &path_state.data)
793 }
794
795 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
797 self.paths
798 .get_mut(&path_id)
799 .map(|path_state| &mut path_state.data)
800 }
801
802 pub fn paths(&self) -> Vec<PathId> {
806 self.paths.keys().copied().collect()
807 }
808
809 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
811 self.path(path_id)
812 .map(PathData::local_status)
813 .ok_or(ClosedPath { _private: () })
814 }
815
816 pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
818 self.path(path_id)
819 .map(|path| path.network_path)
820 .ok_or(ClosedPath { _private: () })
821 }
822
823 pub fn set_path_status(
827 &mut self,
828 path_id: PathId,
829 status: PathStatus,
830 ) -> Result<PathStatus, SetPathStatusError> {
831 if !self.is_multipath_negotiated() {
832 return Err(SetPathStatusError::MultipathNotNegotiated);
833 }
834 let path = self
835 .path_mut(path_id)
836 .ok_or(SetPathStatusError::ClosedPath)?;
837 let prev = match path.status.local_update(status) {
838 Some(prev) => {
839 self.spaces[SpaceId::Data]
840 .pending
841 .path_status
842 .insert(path_id);
843 prev
844 }
845 None => path.local_status(),
846 };
847 Ok(prev)
848 }
849
850 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
855 self.path(path_id).and_then(|path| path.remote_status())
856 }
857
858 pub fn set_path_max_idle_timeout(
867 &mut self,
868 now: Instant,
869 path_id: PathId,
870 timeout: Option<Duration>,
871 ) -> Result<Option<Duration>, ClosedPath> {
872 let path = self
873 .paths
874 .get_mut(&path_id)
875 .ok_or(ClosedPath { _private: () })?;
876 let prev = std::mem::replace(&mut path.data.idle_timeout, timeout);
877
878 if !self.state.is_closed() {
880 if let Some(new_timeout) = timeout {
881 let timer = Timer::PerPath(path_id, PathTimer::PathIdle);
882 let deadline = match (prev, self.timers.get(timer)) {
883 (Some(old_timeout), Some(old_deadline)) => {
884 let last_activity = old_deadline.checked_sub(old_timeout).unwrap_or(now);
885 last_activity + new_timeout
886 }
887 _ => now + new_timeout,
888 };
889 self.timers.set(timer, deadline, self.qlog.with_time(now));
890 } else {
891 self.timers.stop(
892 Timer::PerPath(path_id, PathTimer::PathIdle),
893 self.qlog.with_time(now),
894 );
895 }
896 }
897
898 Ok(prev)
899 }
900
901 pub fn set_path_keep_alive_interval(
907 &mut self,
908 path_id: PathId,
909 interval: Option<Duration>,
910 ) -> Result<Option<Duration>, ClosedPath> {
911 let path = self
912 .paths
913 .get_mut(&path_id)
914 .ok_or(ClosedPath { _private: () })?;
915 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
916 }
917
918 fn find_validated_path_on_network_path(
922 &self,
923 network_path: FourTuple,
924 ) -> Option<(&PathId, &PathState)> {
925 self.paths.iter().find(|(path_id, path_state)| {
926 path_state.data.validated
927 && network_path.is_probably_same_path(&path_state.data.network_path)
929 && !self.abandoned_paths.contains(path_id)
930 })
931 }
935
936 fn ensure_path(
940 &mut self,
941 path_id: PathId,
942 network_path: FourTuple,
943 now: Instant,
944 pn: Option<u64>,
945 ) -> &mut PathData {
946 let valid_path = self.find_validated_path_on_network_path(network_path);
947 let validated = valid_path.is_some();
948 let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
949 let vacant_entry = match self.paths.entry(path_id) {
950 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
951 btree_map::Entry::Occupied(occupied_entry) => {
952 return &mut occupied_entry.into_mut().data;
953 }
954 };
955
956 debug!(%validated, %path_id, %network_path, "path added");
957
958 self.timers.stop(
960 Timer::Conn(ConnTimer::NoAvailablePath),
961 self.qlog.with_time(now),
962 );
963 let peer_max_udp_payload_size =
964 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
965 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
966 let mut data = PathData::new(
967 network_path,
968 self.allow_mtud,
969 Some(peer_max_udp_payload_size),
970 self.path_generation_counter,
971 now,
972 &self.config,
973 );
974
975 data.validated = validated;
976 if let Some(initial_rtt) = initial_rtt {
977 data.rtt.reset_initial_rtt(initial_rtt);
978 }
979
980 data.pending_on_path_challenge = true;
983
984 let path = vacant_entry.insert(PathState { data, prev: None });
985
986 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
987 if let Some(pn) = pn {
988 pn_space.dedup.insert(pn);
989 }
990 self.spaces[SpaceId::Data]
991 .number_spaces
992 .insert(path_id, pn_space);
993 self.qlog.emit_tuple_assigned(path_id, network_path, now);
994
995 if !self.remote_cids.contains_key(&path_id) {
999 debug!(%path_id, "Remote opened path without issuing CIDs");
1000 self.spaces[SpaceId::Data]
1001 .pending
1002 .path_cids_blocked
1003 .insert(path_id);
1004 }
1007
1008 &mut path.data
1009 }
1010
1011 #[must_use]
1021 pub fn poll_transmit(
1022 &mut self,
1023 now: Instant,
1024 max_datagrams: NonZeroUsize,
1025 buf: &mut Vec<u8>,
1026 ) -> Option<Transmit> {
1027 let max_datagrams = match self.config.enable_segmentation_offload {
1028 false => NonZeroUsize::MIN,
1029 true => max_datagrams,
1030 };
1031
1032 let connection_close_pending = match self.state.as_type() {
1038 StateType::Drained => {
1039 for path in self.paths.values_mut() {
1040 path.data.app_limited = true;
1041 }
1042 return None;
1043 }
1044 StateType::Draining | StateType::Closed => {
1045 if !self.connection_close_pending {
1048 for path in self.paths.values_mut() {
1049 path.data.app_limited = true;
1050 }
1051 return None;
1052 }
1053 true
1054 }
1055 _ => false,
1056 };
1057
1058 if let Some(config) = &self.config.ack_frequency_config {
1060 let rtt = self
1061 .paths
1062 .values()
1063 .map(|p| p.data.rtt.get())
1064 .min()
1065 .expect("one path exists");
1066 self.spaces[SpaceId::Data].pending.ack_frequency = self
1067 .ack_frequency
1068 .should_send_ack_frequency(rtt, config, &self.peer_params)
1069 && self.highest_space == SpaceKind::Data
1070 && self.peer_supports_ack_frequency();
1071 }
1072
1073 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1074 while let Some(path_id) = next_path_id {
1075 if !connection_close_pending
1076 && let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id)
1077 {
1078 return Some(transmit);
1079 }
1080
1081 let info = self.scheduling_info(path_id);
1082 if let Some(transmit) = self.poll_transmit_on_path(
1083 now,
1084 buf,
1085 path_id,
1086 max_datagrams,
1087 &info,
1088 connection_close_pending,
1089 ) {
1090 return Some(transmit);
1091 }
1092
1093 debug_assert!(
1096 buf.is_empty(),
1097 "nothing to send on path but buffer not empty"
1098 );
1099
1100 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1101 }
1102
1103 debug_assert!(
1105 buf.is_empty(),
1106 "there was data in the buffer, but it was not sent"
1107 );
1108
1109 if self.state.is_established() {
1110 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1112 while let Some(path_id) = next_path_id {
1113 if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) {
1114 return Some(transmit);
1115 }
1116 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1117 }
1118 }
1119
1120 None
1121 }
1122
1123 fn scheduling_info(&self, path_id: PathId) -> PathSchedulingInfo {
1141 let have_validated_status_available_space = self.paths.iter().any(|(path_id, path)| {
1143 self.remote_cids.contains_key(path_id)
1144 && !self.abandoned_paths.contains(path_id)
1145 && path.data.validated
1146 && path.data.local_status() == PathStatus::Available
1147 });
1148
1149 let have_validated_space = self.paths.iter().any(|(path_id, path)| {
1151 self.remote_cids.contains_key(path_id)
1152 && !self.abandoned_paths.contains(path_id)
1153 && path.data.validated
1154 });
1155
1156 let is_handshaking = self.is_handshaking();
1157 let has_cids = self.remote_cids.contains_key(&path_id);
1158 let is_abandoned = self.abandoned_paths.contains(&path_id);
1159 let path_data = self.path_data(path_id);
1160 let validated = path_data.validated;
1161 let status = path_data.local_status();
1162
1163 let may_send_data = has_cids
1166 && !is_abandoned
1167 && if is_handshaking {
1168 true
1172 } else if !validated {
1173 false
1180 } else {
1181 match status {
1182 PathStatus::Available => {
1183 true
1185 }
1186 PathStatus::Backup => {
1187 !have_validated_status_available_space
1189 }
1190 }
1191 };
1192
1193 let may_send_close = has_cids
1198 && !is_abandoned
1199 && if !validated && have_validated_status_available_space {
1200 false
1202 } else {
1203 true
1205 };
1206
1207 let may_self_abandon = has_cids && validated && !have_validated_space;
1211
1212 PathSchedulingInfo {
1213 is_abandoned,
1214 may_send_data,
1215 may_send_close,
1216 may_self_abandon,
1217 }
1218 }
1219
1220 fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit {
1221 debug_assert!(
1222 !transmit.is_empty(),
1223 "must not be called with an empty transmit buffer"
1224 );
1225
1226 let network_path = self.path_data(path_id).network_path;
1227 trace!(
1228 segment_size = transmit.segment_size(),
1229 last_datagram_len = transmit.len() % transmit.segment_size(),
1230 %network_path,
1231 "sending {} bytes in {} datagrams",
1232 transmit.len(),
1233 transmit.num_datagrams()
1234 );
1235 self.path_data_mut(path_id)
1236 .inc_total_sent(transmit.len() as u64);
1237
1238 self.path_stats
1239 .for_path(path_id)
1240 .udp_tx
1241 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1242
1243 Transmit {
1244 destination: network_path.remote,
1245 size: transmit.len(),
1246 ecn: if self.path_data(path_id).sending_ecn {
1247 Some(EcnCodepoint::Ect0)
1248 } else {
1249 None
1250 },
1251 segment_size: match transmit.num_datagrams() {
1252 1 => None,
1253 _ => Some(transmit.segment_size()),
1254 },
1255 src_ip: network_path.local_ip,
1256 }
1257 }
1258
1259 fn poll_transmit_off_path(
1261 &mut self,
1262 now: Instant,
1263 buf: &mut Vec<u8>,
1264 path_id: PathId,
1265 ) -> Option<Transmit> {
1266 if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) {
1267 return Some(challenge);
1268 }
1269 if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
1270 return Some(response);
1271 }
1272 if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
1273 return Some(challenge);
1274 }
1275 None
1276 }
1277
1278 #[must_use]
1285 fn poll_transmit_on_path(
1286 &mut self,
1287 now: Instant,
1288 buf: &mut Vec<u8>,
1289 path_id: PathId,
1290 max_datagrams: NonZeroUsize,
1291 scheduling_info: &PathSchedulingInfo,
1292 connection_close_pending: bool,
1293 ) -> Option<Transmit> {
1294 let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else {
1296 if !self.abandoned_paths.contains(&path_id) {
1297 debug!(%path_id, "no remote CIDs for path");
1298 }
1299 return None;
1300 };
1301
1302 let mut pad_datagram = PadDatagram::No;
1308
1309 let mut last_packet_number = None;
1313
1314 let mut congestion_blocked = false;
1317
1318 let pmtu = self.path_data(path_id).current_mtu().into();
1320 let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu);
1321
1322 for space_id in SpaceId::iter() {
1324 if path_id != PathId::ZERO && space_id != SpaceId::Data {
1326 continue;
1327 }
1328 match self.poll_transmit_path_space(
1329 now,
1330 &mut transmit,
1331 path_id,
1332 space_id,
1333 remote_cid,
1334 scheduling_info,
1335 connection_close_pending,
1336 pad_datagram,
1337 ) {
1338 PollPathSpaceStatus::NothingToSend {
1339 congestion_blocked: cb,
1340 } => {
1341 congestion_blocked |= cb;
1342 }
1345 PollPathSpaceStatus::WrotePacket {
1346 last_packet_number: pn,
1347 pad_datagram: pad,
1348 } => {
1349 debug_assert!(!transmit.is_empty(), "transmit must contain packets");
1350 last_packet_number = Some(pn);
1351 pad_datagram = pad;
1352 continue;
1357 }
1358 PollPathSpaceStatus::Send {
1359 last_packet_number: pn,
1360 } => {
1361 debug_assert!(!transmit.is_empty(), "transmit must contain packets");
1362 last_packet_number = Some(pn);
1363 break;
1364 }
1365 }
1366 }
1367
1368 if last_packet_number.is_some() || congestion_blocked {
1369 self.qlog.emit_recovery_metrics(
1370 path_id,
1371 &mut self
1372 .paths
1373 .get_mut(&path_id)
1374 .expect("path_id was iterated from self.paths above")
1375 .data,
1376 now,
1377 );
1378 }
1379
1380 self.path_data_mut(path_id).app_limited =
1381 last_packet_number.is_none() && !congestion_blocked;
1382
1383 match last_packet_number {
1384 Some(last_packet_number) => {
1385 self.path_data_mut(path_id).congestion.on_sent(
1388 now,
1389 transmit.len() as u64,
1390 last_packet_number,
1391 );
1392 Some(self.build_transmit(path_id, transmit))
1393 }
1394 None => None,
1395 }
1396 }
1397
1398 #[must_use]
1400 fn poll_transmit_path_space(
1401 &mut self,
1402 now: Instant,
1403 transmit: &mut TransmitBuf<'_>,
1404 path_id: PathId,
1405 space_id: SpaceId,
1406 remote_cid: ConnectionId,
1407 scheduling_info: &PathSchedulingInfo,
1408 connection_close_pending: bool,
1410 mut pad_datagram: PadDatagram,
1412 ) -> PollPathSpaceStatus {
1413 let mut last_packet_number = None;
1416
1417 loop {
1433 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1435 transmit.datagram_remaining_mut()
1437 } else {
1438 transmit.segment_size()
1440 };
1441 let can_send =
1442 self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending);
1443 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1444 let space_will_send = {
1445 if scheduling_info.is_abandoned {
1446 scheduling_info.may_self_abandon
1451 && self.spaces[space_id]
1452 .pending
1453 .path_abandon
1454 .contains_key(&path_id)
1455 } else if can_send.close && scheduling_info.may_send_close {
1456 true
1458 } else if needs_loss_probe || can_send.space_specific {
1459 true
1462 } else {
1463 !can_send.is_empty() && scheduling_info.may_send_data
1466 }
1467 };
1468
1469 if !space_will_send {
1470 return match last_packet_number {
1473 Some(pn) => PollPathSpaceStatus::WrotePacket {
1474 last_packet_number: pn,
1475 pad_datagram,
1476 },
1477 None => {
1478 if self.crypto_state.has_keys(space_id.encryption_level())
1480 || (space_id == SpaceId::Data
1481 && self.crypto_state.has_keys(EncryptionLevel::ZeroRtt))
1482 {
1483 trace!(?space_id, %path_id, "nothing to send in space");
1484 }
1485 PollPathSpaceStatus::NothingToSend {
1486 congestion_blocked: false,
1487 }
1488 }
1489 };
1490 }
1491
1492 if transmit.datagram_remaining_mut() == 0 {
1496 let congestion_blocked =
1497 self.path_congestion_check(space_id, path_id, transmit, &can_send, now);
1498 if congestion_blocked != PathBlocked::No {
1499 return match last_packet_number {
1501 Some(pn) => PollPathSpaceStatus::WrotePacket {
1502 last_packet_number: pn,
1503 pad_datagram,
1504 },
1505 None => {
1506 return PollPathSpaceStatus::NothingToSend {
1507 congestion_blocked: true,
1508 };
1509 }
1510 };
1511 }
1512
1513 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1516 return match last_packet_number {
1519 Some(pn) => PollPathSpaceStatus::WrotePacket {
1520 last_packet_number: pn,
1521 pad_datagram,
1522 },
1523 None => {
1524 return PollPathSpaceStatus::NothingToSend {
1525 congestion_blocked: false,
1526 };
1527 }
1528 };
1529 }
1530
1531 if needs_loss_probe {
1532 let request_immediate_ack =
1534 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1535 self.spaces[space_id].queue_tail_loss_probe(
1536 path_id,
1537 request_immediate_ack,
1538 &self.streams,
1539 );
1540
1541 self.spaces[space_id].for_path(path_id).loss_probes -= 1; transmit.start_new_datagram_with_size(std::cmp::min(
1547 usize::from(INITIAL_MTU),
1548 transmit.segment_size(),
1549 ));
1550 } else {
1551 transmit.start_new_datagram();
1552 }
1553 trace!(count = transmit.num_datagrams(), "new datagram started");
1554
1555 pad_datagram = PadDatagram::No;
1557 }
1558
1559 if transmit.datagram_start_offset() < transmit.len() {
1562 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1563 }
1564
1565 if self.crypto_state.has_keys(EncryptionLevel::Initial)
1570 && space_id == SpaceId::Handshake
1571 && self.side.is_client()
1572 {
1573 self.discard_space(now, SpaceKind::Initial);
1576 }
1577 if let Some(ref mut prev) = self.crypto_state.prev_crypto {
1578 prev.update_unacked = false;
1579 }
1580
1581 let Some(mut builder) =
1582 PacketBuilder::new(now, space_id, path_id, remote_cid, transmit, self)
1583 else {
1584 return PollPathSpaceStatus::NothingToSend {
1591 congestion_blocked: false,
1592 };
1593 };
1594 last_packet_number = Some(builder.packet_number);
1595
1596 if space_id == SpaceId::Initial
1597 && (self.side.is_client() || can_send.is_ack_eliciting())
1598 {
1599 pad_datagram |= PadDatagram::ToMinMtu;
1601 }
1602 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1603 pad_datagram |= PadDatagram::ToSegmentSize;
1604 }
1605
1606 if scheduling_info.may_send_close && can_send.close {
1607 trace!("sending CONNECTION_CLOSE");
1608 let is_multipath_negotiated = self.is_multipath_negotiated();
1613 for path_id in self.spaces[space_id]
1614 .number_spaces
1615 .iter()
1616 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1617 .map(|(&path_id, _)| path_id)
1618 .collect::<Vec<_>>()
1619 {
1620 Self::populate_acks(
1621 now,
1622 self.receiving_ecn,
1623 path_id,
1624 space_id,
1625 &mut self.spaces[space_id],
1626 is_multipath_negotiated,
1627 &mut builder,
1628 &mut self.path_stats.for_path(path_id).frame_tx,
1629 self.crypto_state.has_keys(space_id.encryption_level()),
1630 );
1631 }
1632
1633 debug_assert!(
1641 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1642 "ACKs should leave space for ConnectionClose"
1643 );
1644 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
1645 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1646 let max_frame_size = builder.frame_space_remaining();
1647 let close: Close = match self.state.as_type() {
1648 StateType::Closed => {
1649 let reason: Close =
1650 self.state.as_closed().expect("checked").clone().into();
1651 if space_id == SpaceId::Data || reason.is_transport_layer() {
1652 reason
1653 } else {
1654 TransportError::APPLICATION_ERROR("").into()
1655 }
1656 }
1657 StateType::Draining => TransportError::NO_ERROR("").into(),
1658 _ => unreachable!(
1659 "tried to make a close packet when the connection wasn't closed"
1660 ),
1661 };
1662 builder.write_frame(close.encoder(max_frame_size), stats);
1663 }
1664 let last_pn = builder.packet_number;
1665 builder.finish_and_track(now, self, path_id, pad_datagram);
1666 if space_id.kind() == self.highest_space {
1667 self.connection_close_pending = false;
1670 }
1671 return PollPathSpaceStatus::WrotePacket {
1684 last_packet_number: last_pn,
1685 pad_datagram,
1686 };
1687 }
1688
1689 self.populate_packet(now, space_id, path_id, scheduling_info, &mut builder);
1690
1691 debug_assert!(
1698 !(builder.sent_frames().is_ack_only(&self.streams)
1699 && !can_send.acks
1700 && (can_send.other || can_send.space_specific)
1701 && builder.buf.segment_size()
1702 == self.path_data(path_id).current_mtu() as usize
1703 && self.datagrams.outgoing.is_empty()),
1704 "SendableFrames was {can_send:?}, but only ACKs have been written"
1705 );
1706 if builder.sent_frames().requires_padding {
1707 pad_datagram |= PadDatagram::ToMinMtu;
1708 }
1709
1710 for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1711 self.spaces[space_id]
1712 .for_path(*path_id)
1713 .pending_acks
1714 .acks_sent();
1715 self.timers.stop(
1716 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1717 self.qlog.with_time(now),
1718 );
1719 }
1720
1721 if builder.can_coalesce && path_id == PathId::ZERO && {
1729 let max_packet_size = builder
1730 .buf
1731 .datagram_remaining_mut()
1732 .saturating_sub(builder.predict_packet_end());
1733 max_packet_size > MIN_PACKET_SPACE
1734 && self.has_pending_packet(space_id, max_packet_size, connection_close_pending)
1735 } {
1736 trace!("will coalesce with next packet");
1739 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1740 } else {
1741 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1747 const MAX_PADDING: usize = 32;
1755 if builder.buf.datagram_remaining_mut()
1756 > builder.predict_packet_end() + MAX_PADDING
1757 {
1758 trace!(
1759 "GSO truncated by demand for {} padding bytes",
1760 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1761 );
1762 let last_pn = builder.packet_number;
1763 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1764 return PollPathSpaceStatus::Send {
1765 last_packet_number: last_pn,
1766 };
1767 }
1768
1769 builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1772 } else {
1773 builder.finish_and_track(now, self, path_id, pad_datagram);
1774 }
1775
1776 if transmit.num_datagrams() == 1 {
1779 transmit.clip_segment_size();
1780 }
1781 }
1782 }
1783 }
1784
1785 fn poll_transmit_mtu_probe(
1786 &mut self,
1787 now: Instant,
1788 buf: &mut Vec<u8>,
1789 path_id: PathId,
1790 ) -> Option<Transmit> {
1791 let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?;
1792
1793 let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize);
1795 transmit.start_new_datagram_with_size(probe_size as usize);
1796
1797 let mut builder =
1798 PacketBuilder::new(now, SpaceId::Data, path_id, active_cid, &mut transmit, self)?;
1799
1800 trace!(?probe_size, "writing MTUD probe");
1802 builder.write_frame(frame::Ping, &mut self.path_stats.for_path(path_id).frame_tx);
1803
1804 if self.peer_supports_ack_frequency() {
1806 builder.write_frame(
1807 frame::ImmediateAck,
1808 &mut self.path_stats.for_path(path_id).frame_tx,
1809 );
1810 }
1811
1812 builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1813
1814 self.path_stats.for_path(path_id).sent_plpmtud_probes += 1;
1815
1816 Some(self.build_transmit(path_id, transmit))
1817 }
1818
1819 fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> {
1827 let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?;
1828 let is_eligible = self.path_data(path_id).validated
1829 && !self.path_data(path_id).is_validating_path()
1830 && !self.abandoned_paths.contains(&path_id);
1831
1832 if !is_eligible {
1833 return None;
1834 }
1835 let next_pn = self.spaces[SpaceId::Data]
1836 .for_path(path_id)
1837 .peek_tx_number();
1838 let probe_size = self
1839 .path_data_mut(path_id)
1840 .mtud
1841 .poll_transmit(now, next_pn)?;
1842
1843 Some((active_cid, probe_size))
1844 }
1845
1846 fn has_pending_packet(
1863 &mut self,
1864 current_space_id: SpaceId,
1865 max_packet_size: usize,
1866 connection_close_pending: bool,
1867 ) -> bool {
1868 let mut space_id = current_space_id;
1869 loop {
1870 let can_send = self.space_can_send(
1871 space_id,
1872 PathId::ZERO,
1873 max_packet_size,
1874 connection_close_pending,
1875 );
1876 if !can_send.is_empty() {
1877 return true;
1878 }
1879 match space_id.next() {
1880 Some(next_space_id) => space_id = next_space_id,
1881 None => break,
1882 }
1883 }
1884 false
1885 }
1886
1887 fn path_congestion_check(
1889 &mut self,
1890 space_id: SpaceId,
1891 path_id: PathId,
1892 transmit: &TransmitBuf<'_>,
1893 can_send: &SendableFrames,
1894 now: Instant,
1895 ) -> PathBlocked {
1896 if self.side().is_server()
1902 && self
1903 .path_data(path_id)
1904 .anti_amplification_blocked(transmit.len() as u64 + 1)
1905 {
1906 trace!(?space_id, %path_id, "blocked by anti-amplification");
1907 return PathBlocked::AntiAmplification;
1908 }
1909
1910 let bytes_to_send = transmit.segment_size() as u64;
1913 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1914
1915 if can_send.other && !need_loss_probe && !can_send.close {
1916 let path = self.path_data(path_id);
1917 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1918 trace!(
1919 ?space_id,
1920 %path_id,
1921 in_flight=%path.in_flight.bytes,
1922 congestion_window=%path.congestion.window(),
1923 "blocked by congestion control",
1924 );
1925 return PathBlocked::Congestion;
1926 }
1927 }
1928
1929 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1931 let resume_time = now + delay;
1932 self.timers.set(
1933 Timer::PerPath(path_id, PathTimer::Pacing),
1934 resume_time,
1935 self.qlog.with_time(now),
1936 );
1937 trace!(?space_id, %path_id, ?delay, "blocked by pacing");
1940 return PathBlocked::Pacing;
1941 }
1942
1943 PathBlocked::No
1944 }
1945
1946 fn send_prev_path_challenge(
1951 &mut self,
1952 now: Instant,
1953 buf: &mut Vec<u8>,
1954 path_id: PathId,
1955 ) -> Option<Transmit> {
1956 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1957 if !prev_path.pending_on_path_challenge {
1958 return None;
1959 };
1960 prev_path.pending_on_path_challenge = false;
1961 let token = self.rng.random();
1962 let network_path = prev_path.network_path;
1963 prev_path.record_path_challenge_sent(now, token, network_path);
1964
1965 debug_assert_eq!(
1966 self.highest_space,
1967 SpaceKind::Data,
1968 "PATH_CHALLENGE queued without 1-RTT keys"
1969 );
1970 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
1971 buf.start_new_datagram();
1972
1973 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, self)?;
1979 let challenge = frame::PathChallenge(token);
1980 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
1981 builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));
1982
1983 builder.pad_to(MIN_INITIAL_SIZE);
1988
1989 builder.finish(self, now);
1990 self.path_stats
1991 .for_path(path_id)
1992 .udp_tx
1993 .on_sent(1, buf.len());
1994
1995 trace!(
1996 dst = ?network_path.remote,
1997 src = ?network_path.local_ip,
1998 len = buf.len(),
1999 "sending prev_path off-path challenge",
2000 );
2001 Some(Transmit {
2002 destination: network_path.remote,
2003 size: buf.len(),
2004 ecn: None,
2005 segment_size: None,
2006 src_ip: network_path.local_ip,
2007 })
2008 }
2009
2010 fn send_off_path_path_response(
2011 &mut self,
2012 now: Instant,
2013 buf: &mut Vec<u8>,
2014 path_id: PathId,
2015 ) -> Option<Transmit> {
2016 let path = self.paths.get_mut(&path_id).map(|state| &mut state.data)?;
2017 let cid_queue = self.remote_cids.get_mut(&path_id)?;
2018 let (token, network_path) = path.path_responses.pop_off_path(path.network_path)?;
2019
2020 let cid = cid_queue.active();
2022
2023 let frame = frame::PathResponse(token);
2024
2025 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2026 buf.start_new_datagram();
2027
2028 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, self)?;
2029 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2030 builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
2031
2032 if self
2037 .find_validated_path_on_network_path(network_path)
2038 .is_none()
2039 && self.n0_nat_traversal.client_side().is_ok()
2040 {
2041 let token = self.rng.random();
2042 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2043 builder.write_frame(frame::PathChallenge(token), stats);
2044 let ip_port = (network_path.remote.ip(), network_path.remote.port());
2045 self.n0_nat_traversal.mark_probe_sent(ip_port, token);
2046 }
2047
2048 builder.pad_to(MIN_INITIAL_SIZE);
2051 builder.finish(self, now);
2052
2053 let size = buf.len();
2054 self.path_stats.for_path(path_id).udp_tx.on_sent(1, size);
2055
2056 trace!(
2057 dst = ?network_path.remote,
2058 src = ?network_path.local_ip,
2059 len = buf.len(),
2060 "sending off-path PATH_RESPONSE",
2061 );
2062 Some(Transmit {
2063 destination: network_path.remote,
2064 size,
2065 ecn: None,
2066 segment_size: None,
2067 src_ip: network_path.local_ip,
2068 })
2069 }
2070
2071 fn send_nat_traversal_path_challenge(
2073 &mut self,
2074 now: Instant,
2075 buf: &mut Vec<u8>,
2076 path_id: PathId,
2077 ) -> Option<Transmit> {
2078 let remote = self.n0_nat_traversal.next_probe_addr()?;
2079
2080 if !self.paths.get(&path_id)?.data.validated {
2081 return None;
2083 }
2084
2085 let Some(cid) = self
2090 .remote_cids
2091 .get(&path_id)
2092 .map(|cid_queue| cid_queue.active())
2093 else {
2094 trace!(%path_id, "Not sending NAT traversal probe for path with no CIDs");
2095 return None;
2096 };
2097 let token = self.rng.random();
2098
2099 let frame = frame::PathChallenge(token);
2100
2101 let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2102 buf.start_new_datagram();
2103
2104 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, self)?;
2105 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2106 builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
2107 builder.finish(self, now);
2110
2111 self.n0_nat_traversal.mark_probe_sent(remote, token);
2113
2114 let size = buf.len();
2115 self.path_stats.for_path(path_id).udp_tx.on_sent(1, size);
2116
2117 trace!(dst = ?remote, len = buf.len(), "sending off-path NAT probe");
2118 Some(Transmit {
2119 destination: remote.into(),
2120 size,
2121 ecn: None,
2122 segment_size: None,
2123 src_ip: None,
2124 })
2125 }
2126
2127 fn space_can_send(
2135 &mut self,
2136 space_id: SpaceId,
2137 path_id: PathId,
2138 packet_size: usize,
2139 connection_close_pending: bool,
2140 ) -> SendableFrames {
2141 let space = &mut self.spaces[space_id];
2142 let space_has_crypto = self.crypto_state.has_keys(space_id.encryption_level());
2143
2144 if !space_has_crypto
2145 && (space_id != SpaceId::Data
2146 || !self.crypto_state.has_keys(EncryptionLevel::ZeroRtt)
2147 || self.side.is_server())
2148 {
2149 return SendableFrames::empty();
2151 }
2152
2153 let mut can_send = space.can_send(path_id, &self.streams);
2154
2155 if space_id == SpaceId::Data {
2157 let pn = space.for_path(path_id).peek_tx_number();
2158 let frame_space_1rtt =
2164 packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
2165 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
2166 }
2167
2168 can_send.close = connection_close_pending && space_has_crypto;
2169
2170 can_send
2171 }
2172
2173 pub fn handle_event(&mut self, event: ConnectionEvent) {
2179 use ConnectionEventInner::*;
2180 match event.0 {
2181 Datagram(DatagramConnectionEvent {
2182 now,
2183 network_path,
2184 path_id,
2185 ecn,
2186 first_decode,
2187 remaining,
2188 }) => {
2189 let span = trace_span!("pkt", %path_id);
2190 let _guard = span.enter();
2191
2192 if self.early_discard_packet(network_path, path_id) {
2193 return;
2195 }
2196
2197 let was_anti_amplification_blocked = self
2198 .path(path_id)
2199 .map(|path| path.anti_amplification_blocked(1))
2200 .unwrap_or(false);
2203
2204 let rx = &mut self.path_stats.for_path(path_id).udp_rx;
2205 rx.datagrams += 1;
2206 rx.bytes += first_decode.len() as u64;
2207 let data_len = first_decode.len();
2208
2209 self.handle_decode(now, network_path, path_id, ecn, first_decode);
2210 if let Some(path) = self.path_mut(path_id) {
2215 path.inc_total_recvd(data_len as u64);
2216 }
2217
2218 if let Some(data) = remaining {
2219 self.path_stats.for_path(path_id).udp_rx.bytes += data.len() as u64;
2220 self.handle_coalesced(now, network_path, path_id, ecn, data);
2221 }
2222
2223 if let Some(path) = self.paths.get_mut(&path_id) {
2224 self.qlog
2225 .emit_recovery_metrics(path_id, &mut path.data, now);
2226 }
2227
2228 if was_anti_amplification_blocked {
2229 self.set_loss_detection_timer(now, path_id);
2233 }
2234 }
2235 NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
2236 let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
2237 debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
2238
2239 if self.abandoned_paths.contains(&path_id) {
2242 if !self.state.is_drained() {
2243 for issued in &ids {
2244 self.endpoint_events
2245 .push_back(EndpointEventInner::RetireConnectionId(
2246 now,
2247 path_id,
2248 issued.sequence,
2249 false,
2250 ));
2251 }
2252 }
2253 return;
2254 }
2255
2256 let cid_state = self
2257 .local_cid_state
2258 .entry(path_id)
2259 .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
2260 cid_state.new_cids(&ids, now);
2261
2262 ids.into_iter().rev().for_each(|frame| {
2263 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
2264 });
2265 self.reset_cid_retirement(now);
2267 }
2268 }
2269 }
2270
2271 fn early_discard_packet(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
2279 if self.is_handshaking() && path_id != PathId::ZERO {
2280 debug!(%network_path, %path_id, "discarding multipath packet during handshake");
2281 return true;
2282 }
2283
2284 let peer_may_probe = self.peer_may_probe();
2285 let local_ip_may_migrate = self.local_ip_may_migrate();
2286
2287 if let Some(known_path) = self.path_mut(path_id) {
2291 if network_path.remote != known_path.network_path.remote && !peer_may_probe {
2292 trace!(
2293 %path_id,
2294 %network_path,
2295 %known_path.network_path,
2296 "discarding packet from unrecognized peer"
2297 );
2298 return true;
2299 }
2300
2301 if known_path.network_path.local_ip.is_some()
2302 && network_path.local_ip.is_some()
2303 && known_path.network_path.local_ip != network_path.local_ip
2304 && !local_ip_may_migrate
2305 {
2306 trace!(
2307 %path_id,
2308 %network_path,
2309 %known_path.network_path,
2310 "discarding packet sent to incorrect interface"
2311 );
2312 return true;
2313 }
2314 }
2315 false
2316 }
2317
2318 fn peer_may_probe(&self) -> bool {
2329 match &self.side {
2330 ConnectionSide::Client { .. } => {
2331 if let Some(hs) = self.state.as_handshake() {
2332 hs.allow_server_migration
2333 } else {
2334 self.n0_nat_traversal.is_negotiated() && self.is_handshake_confirmed()
2335 }
2336 }
2337 ConnectionSide::Server { server_config } => {
2338 self.is_handshake_confirmed()
2339 && (server_config.migration || self.n0_nat_traversal.is_negotiated())
2340 }
2341 }
2342 }
2343
2344 fn peer_may_migrate(&self) -> bool {
2356 match &self.side {
2357 ConnectionSide::Server { server_config } => {
2358 server_config.migration && self.is_handshake_confirmed()
2359 }
2360 ConnectionSide::Client { .. } => false,
2361 }
2362 }
2363
2364 fn local_ip_may_migrate(&self) -> bool {
2377 (self.side.is_client() || self.n0_nat_traversal.is_negotiated())
2378 && self.is_handshake_confirmed()
2379 }
2380 pub fn handle_timeout(&mut self, now: Instant) {
2390 while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
2391 let span = match timer {
2392 Timer::Conn(timer) => trace_span!("timeout", scope = "conn", ?timer),
2393 Timer::PerPath(path_id, timer) => {
2394 trace_span!("timer_fired", scope="path", %path_id, ?timer)
2395 }
2396 };
2397 let _guard = span.enter();
2398 trace!("timeout");
2399 match timer {
2400 Timer::Conn(timer) => match timer {
2401 ConnTimer::Close => {
2402 let was_draining = self.state.move_to_drained(None);
2403 if !was_draining {
2404 self.endpoint_events.push_back(EndpointEventInner::Draining);
2405 }
2406 self.endpoint_events.push_back(EndpointEventInner::Drained);
2409 }
2410 ConnTimer::Idle => {
2411 self.kill(ConnectionError::TimedOut);
2412 }
2413 ConnTimer::KeepAlive => {
2414 self.ping();
2415 }
2416 ConnTimer::KeyDiscard => {
2417 self.crypto_state.discard_temporary_keys();
2418 }
2419 ConnTimer::PushNewCid => {
2420 while let Some((path_id, when)) = self.next_cid_retirement() {
2421 if when > now {
2422 break;
2423 }
2424 match self.local_cid_state.get_mut(&path_id) {
2425 None => error!(%path_id, "No local CID state for path"),
2426 Some(cid_state) => {
2427 let num_new_cid = cid_state.on_cid_timeout().into();
2429 if !self.state.is_closed() {
2430 trace!(
2431 "push a new CID to peer RETIRE_PRIOR_TO field {}",
2432 cid_state.retire_prior_to()
2433 );
2434 self.endpoint_events.push_back(
2435 EndpointEventInner::NeedIdentifiers(
2436 path_id,
2437 now,
2438 num_new_cid,
2439 ),
2440 );
2441 }
2442 }
2443 }
2444 }
2445 }
2446 ConnTimer::NoAvailablePath => {
2447 if self.state.is_closed() || self.state.is_drained() {
2452 error!("no viable path timer fired, but connection already closing");
2455 } else {
2456 trace!("no viable path grace period expired, closing connection");
2457 let err = TransportError::NO_VIABLE_PATH(
2458 "last path abandoned, no new path opened",
2459 );
2460 self.close_common();
2461 self.set_close_timer(now);
2462 self.connection_close_pending = true;
2463 self.state.move_to_closed(err);
2464 }
2465 }
2466 ConnTimer::NatTraversalProbeRetry => {
2467 self.n0_nat_traversal.queue_retries(self.is_ipv6());
2468 if let Some(delay) =
2469 self.n0_nat_traversal.retry_delay(self.config.initial_rtt)
2470 {
2471 self.timers.set(
2472 Timer::Conn(ConnTimer::NatTraversalProbeRetry),
2473 now + delay,
2474 self.qlog.with_time(now),
2475 );
2476 trace!("re-queued NAT probes");
2477 } else {
2478 trace!("no more NAT probes remaining");
2479 }
2480 }
2481 },
2482 Timer::PerPath(path_id, timer) => {
2483 match timer {
2484 PathTimer::PathIdle => {
2485 if let Err(err) =
2486 self.close_path_inner(now, path_id, PathAbandonReason::TimedOut)
2487 {
2488 warn!(?err, "failed closing path");
2489 }
2490 }
2491
2492 PathTimer::PathKeepAlive => {
2493 self.ping_path(path_id).ok();
2494 }
2495 PathTimer::LossDetection => {
2496 self.on_loss_detection_timeout(now, path_id);
2497 self.qlog.emit_recovery_metrics(
2498 path_id,
2499 &mut self
2500 .paths
2501 .get_mut(&path_id)
2502 .expect("loss-detection timer fires only on live paths")
2503 .data,
2504 now,
2505 );
2506 }
2507 PathTimer::PathValidationFailed => {
2508 let Some(path) = self.paths.get_mut(&path_id) else {
2509 continue;
2510 };
2511 self.timers.stop(
2512 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
2513 self.qlog.with_time(now),
2514 );
2515 debug!("path migration validation failed");
2516 if let Some((_, prev)) = path.prev.take() {
2517 path.data = prev;
2518 }
2519 path.data.reset_on_path_challenges();
2520 }
2521 PathTimer::PathChallengeLost => {
2522 let Some(path) = self.paths.get_mut(&path_id) else {
2523 continue;
2524 };
2525 trace!(?path.data.on_path_challenges_lost, "path challenge deemed lost");
2526 path.data.pending_on_path_challenge = true;
2527 path.data.on_path_challenges_lost += 1;
2528 }
2529 PathTimer::AbandonFromValidation => {
2530 let Some(path) = self.paths.get_mut(&path_id) else {
2531 continue;
2532 };
2533 path.data.reset_on_path_challenges();
2534 self.timers.stop(
2535 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
2536 self.qlog.with_time(now),
2537 );
2538 debug!("new path validation failed");
2539 if let Err(err) = self.close_path_inner(
2540 now,
2541 path_id,
2542 PathAbandonReason::ValidationFailed,
2543 ) {
2544 warn!(?err, "failed closing path");
2545 }
2546 }
2547 PathTimer::Pacing => {}
2548 PathTimer::MaxAckDelay => {
2549 self.spaces[SpaceId::Data]
2551 .for_path(path_id)
2552 .pending_acks
2553 .on_max_ack_delay_timeout()
2554 }
2555 PathTimer::PathDrained => {
2556 self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2559 if let Some(local_cid_state) = self.local_cid_state.remove(&path_id) {
2560 debug_assert!(!self.state.is_drained()); let (min_seq, max_seq) = local_cid_state.active_seq();
2562 for seq in min_seq..=max_seq {
2563 self.endpoint_events.push_back(
2564 EndpointEventInner::RetireConnectionId(
2565 now, path_id, seq, false,
2566 ),
2567 );
2568 }
2569 }
2570 self.discard_path(path_id, now);
2571 }
2572 }
2573 }
2574 }
2575 }
2576 }
2577
2578 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2590 self.close_inner(
2591 now,
2592 Close::Application(frame::ApplicationClose { error_code, reason }),
2593 )
2594 }
2595
2596 fn close_inner(&mut self, now: Instant, reason: Close) {
2612 let was_closed = self.state.is_closed();
2613 if !was_closed {
2614 self.close_common();
2615 self.set_close_timer(now);
2616 self.connection_close_pending = true;
2617 self.state.move_to_closed_local(reason);
2618 }
2619 }
2620
2621 pub fn datagrams(&mut self) -> Datagrams<'_> {
2623 Datagrams { conn: self }
2624 }
2625
2626 pub fn stats(&mut self) -> ConnectionStats {
2628 let mut stats = self.partial_stats.clone();
2629
2630 for path_stats in self.path_stats.iter_stats() {
2631 stats += *path_stats;
2636 }
2637
2638 stats
2639 }
2640
2641 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2643 let path = self.paths.get(&path_id)?;
2644 let stats = self.path_stats.for_path(path_id);
2645 stats.rtt = path.data.rtt.get();
2646 stats.cwnd = path.data.congestion.window();
2647 stats.current_mtu = path.data.mtud.current_mtu();
2648 Some(*stats)
2649 }
2650
2651 pub fn ping(&mut self) {
2655 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2658 path_data.ping_pending = true;
2659 }
2660 }
2661
2662 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2666 let path_data = self.spaces[self.highest_space]
2667 .number_spaces
2668 .get_mut(&path)
2669 .ok_or(ClosedPath { _private: () })?;
2670 path_data.ping_pending = true;
2671 Ok(())
2672 }
2673
2674 pub fn force_key_update(&mut self) {
2678 if !self.state.is_established() {
2679 debug!("ignoring forced key update in illegal state");
2680 return;
2681 }
2682 if self.crypto_state.prev_crypto.is_some() {
2683 debug!("ignoring redundant forced key update");
2686 return;
2687 }
2688 self.crypto_state.update_keys(None, false);
2689 }
2690
2691 pub fn crypto_session(&self) -> &dyn crypto::Session {
2693 self.crypto_state.session.as_ref()
2694 }
2695
2696 pub fn is_handshaking(&self) -> bool {
2706 self.state.is_handshake()
2707 }
2708
2709 pub fn is_closed(&self) -> bool {
2720 self.state.is_closed()
2721 }
2722
2723 pub fn is_drained(&self) -> bool {
2728 self.state.is_drained()
2729 }
2730
2731 pub fn accepted_0rtt(&self) -> bool {
2735 self.crypto_state.accepted_0rtt
2736 }
2737
2738 pub fn has_0rtt(&self) -> bool {
2740 self.crypto_state.zero_rtt_enabled
2741 }
2742
2743 pub fn has_pending_retransmits(&self) -> bool {
2745 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2746 }
2747
2748 pub fn side(&self) -> Side {
2750 self.side.side()
2751 }
2752
2753 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2755 self.path(path_id)
2756 .map(|path_data| {
2757 path_data
2758 .last_observed_addr_report
2759 .as_ref()
2760 .map(|observed| observed.socket_addr())
2761 })
2762 .ok_or(ClosedPath { _private: () })
2763 }
2764
2765 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2767 self.path(path_id).map(|d| d.rtt.get())
2768 }
2769
2770 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2772 self.path(path_id).map(|d| d.congestion.as_ref())
2773 }
2774
2775 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2780 self.streams.set_max_concurrent(dir, count);
2781 let pending = &mut self.spaces[SpaceId::Data].pending;
2784 self.streams.queue_max_stream_id(pending);
2785 }
2786
2787 pub fn set_max_concurrent_paths(
2797 &mut self,
2798 now: Instant,
2799 count: NonZeroU32,
2800 ) -> Result<(), MultipathNotNegotiated> {
2801 if !self.is_multipath_negotiated() {
2802 return Err(MultipathNotNegotiated { _private: () });
2803 }
2804 self.max_concurrent_paths = count;
2805
2806 let in_use_count = self
2807 .local_max_path_id
2808 .next()
2809 .saturating_sub(self.abandoned_paths.len() as u32)
2810 .as_u32();
2811 let extra_needed = count.get().saturating_sub(in_use_count);
2812 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2813
2814 self.set_max_path_id(now, new_max_path_id);
2815
2816 Ok(())
2817 }
2818
2819 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2821 if max_path_id <= self.local_max_path_id {
2822 return;
2823 }
2824
2825 self.local_max_path_id = max_path_id;
2826 self.spaces[SpaceId::Data].pending.max_path_id = true;
2827
2828 self.issue_first_path_cids(now);
2829 }
2830
2831 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2837 self.streams.max_concurrent(dir)
2838 }
2839
2840 pub fn set_send_window(&mut self, send_window: u64) {
2842 self.streams.set_send_window(send_window);
2843 }
2844
2845 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2847 if self.streams.set_receive_window(receive_window) {
2848 self.spaces[SpaceId::Data].pending.max_data = true;
2849 }
2850 }
2851
2852 pub fn is_multipath_negotiated(&self) -> bool {
2857 !self.is_handshaking()
2858 && self.config.max_concurrent_multipath_paths.is_some()
2859 && self.peer_params.initial_max_path_id.is_some()
2860 }
2861
2862 fn on_ack_received(
2863 &mut self,
2864 now: Instant,
2865 space: SpaceId,
2866 ack: frame::Ack,
2867 ) -> Result<(), TransportError> {
2868 let path = PathId::ZERO;
2870 self.inner_on_ack_received(now, space, path, ack)
2871 }
2872
2873 fn on_path_ack_received(
2874 &mut self,
2875 now: Instant,
2876 space: SpaceId,
2877 path_ack: frame::PathAck,
2878 ) -> Result<(), TransportError> {
2879 let (ack, path) = path_ack.into_ack();
2880 self.inner_on_ack_received(now, space, path, ack)
2881 }
2882
2883 fn inner_on_ack_received(
2885 &mut self,
2886 now: Instant,
2887 space: SpaceId,
2888 path: PathId,
2889 ack: frame::Ack,
2890 ) -> Result<(), TransportError> {
2891 if !self.spaces[space].number_spaces.contains_key(&path) {
2892 if self.abandoned_paths.contains(&path) {
2893 trace!("silently ignoring PATH_ACK on discarded path");
2899 return Ok(());
2900 } else {
2901 return Err(TransportError::PROTOCOL_VIOLATION(
2902 "received PATH_ACK with path ID never used",
2903 ));
2904 }
2905 }
2906 if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2907 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2908 }
2909 let new_largest_pn = {
2911 let space = &mut self.spaces[space].for_path(path);
2912 if space
2913 .largest_acked_packet_pn
2914 .is_none_or(|pn| ack.largest > pn)
2915 {
2916 space.largest_acked_packet_pn = Some(ack.largest);
2917 if let Some(info) = space.sent_packets.get(ack.largest) {
2918 space.largest_acked_packet_send_time = info.time_sent;
2922 }
2923 Some(ack.largest)
2924 } else {
2925 None
2926 }
2927 };
2928
2929 if self.detect_spurious_loss(&ack, space, path) {
2930 self.path_stats.for_path(path).spurious_congestion_events += 1;
2931 self.path_data_mut(path)
2932 .congestion
2933 .on_spurious_congestion_event();
2934 }
2935
2936 let mut newly_acked = ArrayRangeSet::new();
2938 for range in ack.iter() {
2939 self.spaces[space].for_path(path).check_ack(range.clone())?;
2940 for (pn, _) in self.spaces[space]
2941 .for_path(path)
2942 .sent_packets
2943 .iter_range(range)
2944 {
2945 newly_acked.insert_one(pn);
2946 }
2947 }
2948
2949 if newly_acked.is_empty() {
2950 return Ok(());
2951 }
2952
2953 let mut ack_eliciting_acked = false;
2954 for packet in newly_acked.elts() {
2955 if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2956 for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2957 if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2963 pns.pending_acks.subtract_below(*acked_pn);
2964 }
2965 }
2966 ack_eliciting_acked |= info.ack_eliciting;
2967
2968 let path_data = self.path_data_mut(path);
2970 let mtu_updated = path_data.mtud.on_acked(space.kind(), packet, info.size);
2971 if mtu_updated {
2972 path_data
2973 .congestion
2974 .on_mtu_update(path_data.mtud.current_mtu());
2975 }
2976
2977 self.ack_frequency.on_acked(path, packet);
2979
2980 self.on_packet_acked(now, path, packet, info);
2981 }
2982 }
2983
2984 let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet_pn;
2985 let path_data = self.path_data_mut(path);
2986 let app_limited = path_data.app_limited;
2987 let in_flight = path_data.in_flight.bytes;
2988
2989 path_data
2990 .congestion
2991 .on_end_acks(now, in_flight, app_limited, largest_ackd);
2992
2993 if new_largest_pn.is_some() && ack_eliciting_acked {
2994 let ack_delay = if space != SpaceId::Data {
2995 Duration::from_micros(0)
2996 } else {
2997 cmp::min(
2998 self.ack_frequency.peer_max_ack_delay,
2999 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
3000 )
3001 };
3002 let rtt = now.saturating_duration_since(
3003 self.spaces[space]
3004 .for_path(path)
3005 .largest_acked_packet_send_time,
3006 );
3007
3008 let next_pn = self.spaces[space].for_path(path).next_packet_number;
3009 let path_data = self.path_data_mut(path);
3010 path_data.rtt.update(ack_delay, rtt);
3012 if path_data.first_packet_after_rtt_sample.is_none() {
3013 path_data.first_packet_after_rtt_sample = Some((space.kind(), next_pn));
3014 }
3015 }
3016
3017 self.detect_lost_packets(now, space, path, true);
3019
3020 if self.peer_completed_handshake_address_validation() {
3025 self.path_data_mut(path).pto_count = 0;
3026 }
3027
3028 if self.path_data(path).sending_ecn {
3033 if let Some(ecn) = ack.ecn {
3034 if let Some(largest_sent_pn) = new_largest_pn {
3039 let sent = self.spaces[space]
3040 .for_path(path)
3041 .largest_acked_packet_send_time;
3042 self.process_ecn(
3043 now,
3044 space,
3045 path,
3046 newly_acked.len() as u64,
3047 ecn,
3048 sent,
3049 largest_sent_pn,
3050 );
3051 }
3052 } else {
3053 debug!("ECN not acknowledged by peer");
3055 self.path_data_mut(path).sending_ecn = false;
3056 }
3057 }
3058
3059 self.set_loss_detection_timer(now, path);
3060 Ok(())
3061 }
3062
3063 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
3064 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
3065
3066 if lost_packets.is_empty() {
3067 return false;
3068 }
3069
3070 for range in ack.iter() {
3071 let spurious_losses: Vec<u64> = lost_packets
3072 .iter_range(range.clone())
3073 .map(|(pn, _info)| pn)
3074 .collect();
3075
3076 for pn in spurious_losses {
3077 lost_packets.remove(pn);
3078 }
3079 }
3080
3081 lost_packets.is_empty()
3086 }
3087
3088 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
3093 let two_pto = 2 * self.path_data(path).rtt.pto_base();
3094
3095 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
3096 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
3097 }
3098
3099 fn process_ecn(
3101 &mut self,
3102 now: Instant,
3103 space: SpaceId,
3104 path: PathId,
3105 newly_acked_pn: u64,
3106 ecn: frame::EcnCounts,
3107 largest_sent_time: Instant,
3108 largest_sent_pn: u64,
3109 ) {
3110 match self.spaces[space]
3111 .for_path(path)
3112 .detect_ecn(newly_acked_pn, ecn)
3113 {
3114 Err(e) => {
3115 debug!("halting ECN due to verification failure: {}", e);
3116
3117 self.path_data_mut(path).sending_ecn = false;
3118 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
3121 }
3122 Ok(false) => {}
3123 Ok(true) => {
3124 self.path_stats.for_path(path).congestion_events += 1;
3125 self.path_data_mut(path).congestion.on_congestion_event(
3126 now,
3127 largest_sent_time,
3128 false,
3129 true,
3130 0,
3131 largest_sent_pn,
3132 );
3133 }
3134 }
3135 }
3136
3137 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, pn: u64, info: SentPacket) {
3140 let path = self.path_data_mut(path_id);
3141 let app_limited = path.app_limited;
3142 path.remove_in_flight(&info);
3143 if info.ack_eliciting && info.path_generation == path.generation() {
3144 let rtt = path.rtt;
3148 path.congestion
3149 .on_ack(now, info.time_sent, info.size.into(), pn, app_limited, &rtt);
3150 }
3151
3152 if let Some(retransmits) = info.retransmits.get() {
3154 for (id, _) in retransmits.reset_stream.iter() {
3155 self.streams.reset_acked(*id);
3156 }
3157 }
3158
3159 for frame in info.stream_frames {
3160 self.streams.received_ack_of(frame);
3161 }
3162 }
3163
3164 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceKind) {
3165 let start = if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) {
3166 now
3167 } else {
3168 self.crypto_state
3169 .prev_crypto
3170 .as_ref()
3171 .expect("no previous keys")
3172 .end_packet
3173 .as_ref()
3174 .expect("update not acknowledged yet")
3175 .1
3176 };
3177
3178 self.timers.set(
3180 Timer::Conn(ConnTimer::KeyDiscard),
3181 start + self.max_pto_for_space(space) * 3,
3182 self.qlog.with_time(now),
3183 );
3184 }
3185
3186 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
3199 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
3200 self.detect_lost_packets(now, pn_space, path_id, false);
3202 self.set_loss_detection_timer(now, path_id);
3203 return;
3204 }
3205
3206 let Some((_, space)) = self.pto_time_and_space(now, path_id) else {
3207 error!(%path_id, "PTO expired while unset");
3208 return;
3209 };
3210 trace!(
3211 in_flight = self.path_data(path_id).in_flight.bytes,
3212 count = self.path_data(path_id).pto_count,
3213 ?space,
3214 %path_id,
3215 "PTO fired"
3216 );
3217
3218 let count = match self.path_data(path_id).in_flight.ack_eliciting {
3219 0 => {
3222 debug_assert!(!self.peer_completed_handshake_address_validation());
3223 1
3224 }
3225 _ => 2,
3227 };
3228 let pns = self.spaces[space].for_path(path_id);
3229 pns.loss_probes = pns.loss_probes.saturating_add(count);
3230 let path_data = self.path_data_mut(path_id);
3231 path_data.pto_count = path_data.pto_count.saturating_add(1);
3232 self.set_loss_detection_timer(now, path_id);
3233 }
3234
3235 fn detect_lost_packets(
3252 &mut self,
3253 now: Instant,
3254 pn_space: SpaceId,
3255 path_id: PathId,
3256 due_to_ack: bool,
3257 ) {
3258 let mut lost_packets = Vec::<u64>::new();
3259 let mut lost_mtu_probe = None;
3260 let mut in_persistent_congestion = false;
3261 let mut size_of_lost_packets = 0u64;
3262 self.spaces[pn_space].for_path(path_id).loss_time = None;
3263
3264 let path = self.path_data(path_id);
3267 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3268 let loss_delay = path
3269 .rtt
3270 .conservative()
3271 .mul_f32(self.config.time_threshold)
3272 .max(TIMER_GRANULARITY);
3273 let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
3274
3275 let largest_acked_packet_pn = self.spaces[pn_space]
3276 .for_path(path_id)
3277 .largest_acked_packet_pn
3278 .expect("detect_lost_packets only to be called if path received at least one ACK");
3279 let packet_threshold = self.config.packet_threshold as u64;
3280
3281 let congestion_period = self
3285 .pto(SpaceKind::Data, path_id)
3286 .saturating_mul(self.config.persistent_congestion_threshold);
3287 let mut persistent_congestion_start: Option<Instant> = None;
3288 let mut prev_packet = None;
3289 let space = self.spaces[pn_space].for_path(path_id);
3290
3291 for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet_pn) {
3292 if prev_packet != Some(packet.wrapping_sub(1)) {
3293 persistent_congestion_start = None;
3295 }
3296
3297 let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
3301 if packet_too_old || largest_acked_packet_pn >= packet + packet_threshold {
3302 if Some(packet) == in_flight_mtu_probe {
3304 lost_mtu_probe = in_flight_mtu_probe;
3307 } else {
3308 lost_packets.push(packet);
3309 size_of_lost_packets += info.size as u64;
3310 if info.ack_eliciting && due_to_ack {
3311 match persistent_congestion_start {
3312 Some(start) if info.time_sent - start > congestion_period => {
3315 in_persistent_congestion = true;
3316 }
3317 None if first_packet_after_rtt_sample
3319 .is_some_and(|x| x < (pn_space.kind(), packet)) =>
3320 {
3321 persistent_congestion_start = Some(info.time_sent);
3322 }
3323 _ => {}
3324 }
3325 }
3326 }
3327 } else {
3328 if space.loss_time.is_none() {
3330 space.loss_time = Some(info.time_sent + loss_delay);
3333 }
3334 persistent_congestion_start = None;
3335 }
3336
3337 prev_packet = Some(packet);
3338 }
3339
3340 self.handle_lost_packets(
3341 pn_space,
3342 path_id,
3343 now,
3344 lost_packets,
3345 lost_mtu_probe,
3346 loss_delay,
3347 in_persistent_congestion,
3348 size_of_lost_packets,
3349 );
3350 }
3351
3352 fn discard_path(&mut self, path_id: PathId, now: Instant) {
3354 trace!(%path_id, "dropping path state");
3355 let path = self.path_data(path_id);
3356 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3357
3358 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
3360 .for_path(path_id)
3361 .sent_packets
3362 .iter()
3363 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
3364 .map(|(pn, info)| {
3365 size_of_lost_packets += info.size as u64;
3366 pn
3367 })
3368 .collect();
3369
3370 if !lost_pns.is_empty() {
3371 trace!(
3372 %path_id,
3373 count = lost_pns.len(),
3374 lost_bytes = size_of_lost_packets,
3375 "packets lost on path abandon"
3376 );
3377 self.handle_lost_packets(
3378 SpaceId::Data,
3379 path_id,
3380 now,
3381 lost_pns,
3382 in_flight_mtu_probe,
3383 Duration::ZERO,
3384 false,
3385 size_of_lost_packets,
3386 );
3387 }
3388 let path_stats = self.path_stats.discard(&path_id);
3391 self.partial_stats += path_stats;
3392 self.paths.remove(&path_id);
3393 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
3394
3395 self.events.push_back(
3396 PathEvent::Discarded {
3397 id: path_id,
3398 path_stats: Box::new(path_stats),
3399 }
3400 .into(),
3401 );
3402 }
3403
3404 fn handle_lost_packets(
3405 &mut self,
3406 pn_space: SpaceId,
3407 path_id: PathId,
3408 now: Instant,
3409 lost_packets: Vec<u64>,
3410 lost_mtu_probe: Option<u64>,
3411 loss_delay: Duration,
3412 in_persistent_congestion: bool,
3413 size_of_lost_packets: u64,
3414 ) {
3415 debug_assert!(lost_packets.is_sorted(), "lost_packets must be sorted");
3416
3417 self.drain_lost_packets(now, pn_space, path_id);
3418
3419 if let Some(largest_lost) = lost_packets.last().cloned() {
3421 let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
3422 let largest_lost_sent = self.spaces[pn_space]
3423 .for_path(path_id)
3424 .sent_packets
3425 .get(largest_lost)
3426 .unwrap()
3427 .time_sent;
3428 let path_stats = self.path_stats.for_path(path_id);
3429 path_stats.lost_packets += lost_packets.len() as u64;
3430 path_stats.lost_bytes += size_of_lost_packets;
3431 trace!(
3432 %path_id,
3433 count = lost_packets.len(),
3434 lost_bytes = size_of_lost_packets,
3435 "packets lost",
3436 );
3437
3438 for &packet in &lost_packets {
3439 let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
3440 continue;
3441 };
3442 self.qlog
3443 .emit_packet_lost(packet, &info, loss_delay, pn_space.kind(), now);
3444 self.paths
3445 .get_mut(&path_id)
3446 .unwrap()
3447 .remove_in_flight(&info);
3448
3449 for frame in info.stream_frames {
3450 self.streams.retransmit(frame);
3451 }
3452 self.spaces[pn_space].pending |= info.retransmits;
3453 let path = self.path_data_mut(path_id);
3454 path.mtud.on_non_probe_lost(packet, info.size);
3455 path.congestion.on_packet_lost(info.size, packet, now);
3456
3457 self.spaces[pn_space].for_path(path_id).lost_packets.insert(
3458 packet,
3459 LostPacket {
3460 time_sent: info.time_sent,
3461 },
3462 );
3463 }
3464
3465 let path = self.path_data_mut(path_id);
3466 if path.mtud.black_hole_detected(now) {
3467 path.congestion.on_mtu_update(path.mtud.current_mtu());
3468 if let Some(max_datagram_size) = self.datagrams().max_size()
3469 && self.datagrams.drop_oversized(max_datagram_size)
3470 && self.datagrams.send_blocked
3471 {
3472 self.datagrams.send_blocked = false;
3473 self.events.push_back(Event::DatagramsUnblocked);
3474 }
3475 self.path_stats.for_path(path_id).black_holes_detected += 1;
3476 }
3477
3478 let lost_ack_eliciting =
3480 old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
3481
3482 if lost_ack_eliciting {
3483 self.path_stats.for_path(path_id).congestion_events += 1;
3484 self.path_data_mut(path_id).congestion.on_congestion_event(
3485 now,
3486 largest_lost_sent,
3487 in_persistent_congestion,
3488 false,
3489 size_of_lost_packets,
3490 largest_lost,
3491 );
3492 }
3493 }
3494
3495 if let Some(packet) = lost_mtu_probe {
3497 let info = self.spaces[SpaceId::Data]
3498 .for_path(path_id)
3499 .take(packet)
3500 .unwrap(); self.paths
3503 .get_mut(&path_id)
3504 .unwrap()
3505 .remove_in_flight(&info);
3506 self.path_data_mut(path_id).mtud.on_probe_lost();
3507 self.path_stats.for_path(path_id).lost_plpmtud_probes += 1;
3508 }
3509 }
3510
3511 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
3517 SpaceId::iter()
3518 .filter_map(|id| {
3519 self.spaces[id]
3520 .number_spaces
3521 .get(&path_id)
3522 .and_then(|pns| pns.loss_time)
3523 .map(|time| (time, id))
3524 })
3525 .min_by_key(|&(time, _)| time)
3526 }
3527
3528 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
3536 let path = self.path(path_id)?;
3537 let pto_count = path.pto_count;
3538
3539 let max_interval = if path.rtt.get() > SLOW_RTT_THRESHOLD {
3541 (path.rtt.get() * 3) / 2
3543 } else if let Some(idle) = path.idle_timeout.or(self.idle_timeout)
3544 && idle <= MIN_IDLE_FOR_FAST_PTO
3545 {
3546 MAX_PTO_FAST_INTERVAL
3549 } else {
3550 MAX_PTO_INTERVAL
3552 };
3553
3554 if path_id == PathId::ZERO
3555 && path.in_flight.ack_eliciting == 0
3556 && !self.peer_completed_handshake_address_validation()
3557 {
3558 let space = match self.highest_space {
3564 SpaceKind::Handshake => SpaceId::Handshake,
3565 _ => SpaceId::Initial,
3566 };
3567
3568 let backoff = 2u32.pow(path.pto_count.min(MAX_BACKOFF_EXPONENT));
3569 let duration = path.rtt.pto_base() * backoff;
3570 let duration = duration.min(max_interval);
3571 return Some((now + duration, space));
3572 }
3573
3574 let mut result = None;
3575 for space in SpaceId::iter() {
3576 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
3577 continue;
3578 };
3579
3580 if space == SpaceId::Data && !self.is_handshake_confirmed() {
3581 continue;
3585 }
3586
3587 if !pns.has_in_flight() {
3588 continue;
3589 }
3590
3591 let duration = {
3596 let max_ack_delay = if space == SpaceId::Data {
3597 self.ack_frequency.max_ack_delay_for_pto()
3598 } else {
3599 Duration::ZERO
3600 };
3601 let pto_base = path.rtt.pto_base() + max_ack_delay;
3602 let mut duration = pto_base;
3603 for i in 1..=pto_count {
3604 let exponential_duration = pto_base * 2u32.pow(i.min(MAX_BACKOFF_EXPONENT));
3605 let max_duration = duration + max_interval;
3606 duration = exponential_duration.min(max_duration);
3607 }
3608 duration
3609 };
3610
3611 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3612 continue;
3613 };
3614 let pto = last_ack_eliciting + duration;
3617 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3618 if path.anti_amplification_blocked(1) {
3619 continue;
3621 }
3622 if path.in_flight.ack_eliciting == 0 {
3623 continue;
3625 }
3626 result = Some((pto, space));
3627 }
3628 }
3629 result
3630 }
3631
3632 fn peer_completed_handshake_address_validation(&self) -> bool {
3634 if self.side.is_server() || self.state.is_closed() {
3635 return true;
3636 }
3637 self.spaces[SpaceId::Handshake]
3641 .path_space(PathId::ZERO)
3642 .and_then(|pns| pns.largest_acked_packet_pn)
3643 .is_some()
3644 || self.spaces[SpaceId::Data]
3645 .path_space(PathId::ZERO)
3646 .and_then(|pns| pns.largest_acked_packet_pn)
3647 .is_some()
3648 || (self.crypto_state.has_keys(EncryptionLevel::OneRtt)
3649 && !self.crypto_state.has_keys(EncryptionLevel::Handshake))
3650 }
3651
3652 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3660 if self.state.is_closed() {
3661 return;
3665 }
3666
3667 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3668 self.timers.set(
3670 Timer::PerPath(path_id, PathTimer::LossDetection),
3671 loss_time,
3672 self.qlog.with_time(now),
3673 );
3674 return;
3675 }
3676
3677 if !self.abandoned_paths.contains(&path_id)
3680 && let Some((timeout, _)) = self.pto_time_and_space(now, path_id)
3681 {
3682 self.timers.set(
3683 Timer::PerPath(path_id, PathTimer::LossDetection),
3684 timeout,
3685 self.qlog.with_time(now),
3686 );
3687 } else {
3688 self.timers.stop(
3689 Timer::PerPath(path_id, PathTimer::LossDetection),
3690 self.qlog.with_time(now),
3691 );
3692 }
3693 }
3694
3695 fn max_pto_for_space(&self, space: SpaceKind) -> Duration {
3699 self.paths
3700 .keys()
3701 .map(|path_id| self.pto(space, *path_id))
3702 .max()
3703 .unwrap_or_else(|| {
3704 let rtt = self.config.initial_rtt;
3708 let max_ack_delay = match space {
3709 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3710 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3711 };
3712 rtt + cmp::max(4 * (rtt / 2), TIMER_GRANULARITY) + max_ack_delay
3713 })
3714 }
3715
3716 fn pto(&self, space: SpaceKind, path_id: PathId) -> Duration {
3721 let max_ack_delay = match space {
3722 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3723 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3724 };
3725 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3726 }
3727
3728 fn on_packet_authenticated(
3729 &mut self,
3730 now: Instant,
3731 space_id: SpaceKind,
3732 path_id: PathId,
3733 ecn: Option<EcnCodepoint>,
3734 packet_number: Option<u64>,
3735 spin: bool,
3736 is_1rtt: bool,
3737 remote: &FourTuple,
3738 ) {
3739 let is_on_path = self
3746 .path_data(path_id)
3747 .network_path
3748 .is_probably_same_path(remote);
3749
3750 self.total_authed_packets += 1;
3751 self.reset_keep_alive(path_id, now);
3752 self.reset_idle_timeout(now, space_id, path_id);
3753 self.path_data_mut(path_id).permit_idle_reset = true;
3754
3755 if is_on_path {
3758 self.receiving_ecn |= ecn.is_some();
3759 if let Some(x) = ecn {
3760 let space = &mut self.spaces[space_id];
3761 space.for_path(path_id).ecn_counters += x;
3762
3763 if x.is_ce() {
3764 space
3765 .for_path(path_id)
3766 .pending_acks
3767 .set_immediate_ack_required();
3768 }
3769 }
3770 }
3771
3772 let Some(packet_number) = packet_number else {
3773 return;
3774 };
3775 match &self.side {
3776 ConnectionSide::Client { .. } => {
3777 if space_id == SpaceKind::Handshake
3781 && let Some(hs) = self.state.as_handshake_mut()
3782 {
3783 hs.allow_server_migration = false;
3784 }
3785 }
3786 ConnectionSide::Server { .. } => {
3787 if self.crypto_state.has_keys(EncryptionLevel::Initial)
3788 && space_id == SpaceKind::Handshake
3789 {
3790 self.discard_space(now, SpaceKind::Initial);
3792 }
3793 if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) && is_1rtt {
3794 self.set_key_discard_timer(now, space_id)
3796 }
3797 }
3798 }
3799 let space = self.spaces[space_id].for_path(path_id);
3800
3801 space.pending_acks.insert_one(packet_number, now);
3802 if packet_number >= space.largest_received_packet_number.unwrap_or_default() {
3803 space.largest_received_packet_number = Some(packet_number);
3804
3805 if is_on_path {
3807 self.spin = self.side.is_client() ^ spin;
3808 }
3809 }
3810 }
3811
3812 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceKind, path_id: PathId) {
3817 if let Some(timeout) = self.idle_timeout {
3819 if self.state.is_closed() {
3820 self.timers
3821 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3822 } else {
3823 let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space));
3824 self.timers.set(
3825 Timer::Conn(ConnTimer::Idle),
3826 now + dt,
3827 self.qlog.with_time(now),
3828 );
3829 }
3830 }
3831
3832 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3834 if self.state.is_closed() {
3835 self.timers.stop(
3836 Timer::PerPath(path_id, PathTimer::PathIdle),
3837 self.qlog.with_time(now),
3838 );
3839 } else {
3840 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3841 self.timers.set(
3842 Timer::PerPath(path_id, PathTimer::PathIdle),
3843 now + dt,
3844 self.qlog.with_time(now),
3845 );
3846 }
3847 }
3848 }
3849
3850 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3852 if !self.state.is_established() {
3853 return;
3854 }
3855
3856 if let Some(interval) = self.config.keep_alive_interval {
3857 self.timers.set(
3858 Timer::Conn(ConnTimer::KeepAlive),
3859 now + interval,
3860 self.qlog.with_time(now),
3861 );
3862 }
3863
3864 if let Some(interval) = self.path_data(path_id).keep_alive {
3865 self.timers.set(
3866 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3867 now + interval,
3868 self.qlog.with_time(now),
3869 );
3870 }
3871 }
3872
3873 fn reset_cid_retirement(&mut self, now: Instant) {
3875 if let Some((_path, t)) = self.next_cid_retirement() {
3876 self.timers.set(
3877 Timer::Conn(ConnTimer::PushNewCid),
3878 t,
3879 self.qlog.with_time(now),
3880 );
3881 }
3882 }
3883
3884 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3886 self.local_cid_state
3887 .iter()
3888 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3889 .min_by_key(|(_path_id, timeout)| *timeout)
3890 }
3891
3892 pub(crate) fn handle_first_packet(
3897 &mut self,
3898 now: Instant,
3899 network_path: FourTuple,
3900 ecn: Option<EcnCodepoint>,
3901 packet_number: u64,
3902 packet: InitialPacket,
3903 remaining: Option<BytesMut>,
3904 ) -> Result<(), ConnectionError> {
3905 let span = trace_span!("first recv");
3906 let _guard = span.enter();
3907 debug_assert!(self.side.is_server());
3908 let len = packet.header_data.len() + packet.payload.len();
3909 let path_id = PathId::ZERO;
3910 self.path_data_mut(path_id).total_recvd = len as u64;
3911
3912 if let Some(hs) = self.state.as_handshake_mut() {
3913 hs.expected_token = packet.header.token.clone();
3914 } else {
3915 unreachable!("first packet must be delivered in Handshake state");
3916 }
3917
3918 self.on_packet_authenticated(
3920 now,
3921 SpaceKind::Initial,
3922 path_id,
3923 ecn,
3924 Some(packet_number),
3925 false,
3926 false,
3927 &network_path,
3928 );
3929
3930 let packet: Packet = packet.into();
3931
3932 let mut qlog = QlogRecvPacket::new(len);
3933 qlog.header(&packet.header, Some(packet_number), path_id);
3934
3935 self.process_decrypted_packet(
3936 now,
3937 network_path,
3938 path_id,
3939 Some(packet_number),
3940 packet,
3941 &mut qlog,
3942 )?;
3943 self.qlog.emit_packet_received(qlog, now);
3944 if let Some(data) = remaining {
3945 self.handle_coalesced(now, network_path, path_id, ecn, data);
3946 }
3947
3948 self.qlog.emit_recovery_metrics(
3949 path_id,
3950 &mut self
3951 .paths
3952 .get_mut(&path_id)
3953 .expect("path_id was supplied by the caller for an active path")
3954 .data,
3955 now,
3956 );
3957
3958 Ok(())
3959 }
3960
3961 fn init_0rtt(&mut self, now: Instant) {
3962 let Some((header, packet)) = self.crypto_state.session.early_crypto() else {
3963 return;
3964 };
3965 if self.side.is_client() {
3966 match self.crypto_state.session.transport_parameters() {
3967 Ok(params) => {
3968 let params = params
3969 .expect("crypto layer didn't supply transport parameters with ticket");
3970 let params = TransportParameters {
3972 initial_src_cid: None,
3973 original_dst_cid: None,
3974 preferred_address: None,
3975 retry_src_cid: None,
3976 stateless_reset_token: None,
3977 min_ack_delay: None,
3978 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3979 max_ack_delay: TransportParameters::default().max_ack_delay,
3980 initial_max_path_id: None,
3981 ..params
3982 };
3983 self.set_peer_params(params);
3984 self.qlog.emit_peer_transport_params_restored(self, now);
3985 }
3986 Err(e) => {
3987 error!("session ticket has malformed transport parameters: {}", e);
3988 return;
3989 }
3990 }
3991 }
3992 trace!("0-RTT enabled");
3993 self.crypto_state.enable_zero_rtt(header, packet);
3994 }
3995
3996 fn read_crypto(
3997 &mut self,
3998 space: SpaceId,
3999 crypto: &frame::Crypto,
4000 payload_len: usize,
4001 ) -> Result<(), TransportError> {
4002 let expected = if !self.state.is_handshake() {
4003 SpaceId::Data
4004 } else if self.highest_space == SpaceKind::Initial {
4005 SpaceId::Initial
4006 } else {
4007 SpaceId::Handshake
4010 };
4011 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
4015
4016 let end = crypto.offset + crypto.data.len() as u64;
4017 if space < expected
4018 && end
4019 > self.crypto_state.spaces[space.kind()]
4020 .crypto_stream
4021 .bytes_read()
4022 {
4023 warn!(
4024 "received new {:?} CRYPTO data when expecting {:?}",
4025 space, expected
4026 );
4027 return Err(TransportError::PROTOCOL_VIOLATION(
4028 "new data at unexpected encryption level",
4029 ));
4030 }
4031
4032 let crypto_space = &mut self.crypto_state.spaces[space.kind()];
4033 let max = end.saturating_sub(crypto_space.crypto_stream.bytes_read());
4034 if max > self.config.crypto_buffer_size as u64 {
4035 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
4036 }
4037
4038 crypto_space
4039 .crypto_stream
4040 .insert(crypto.offset, crypto.data.clone(), payload_len);
4041 while let Some(chunk) = crypto_space.crypto_stream.read(usize::MAX, true) {
4042 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
4043 if self.crypto_state.session.read_handshake(&chunk.bytes)? {
4044 self.events.push_back(Event::HandshakeDataReady);
4045 }
4046 }
4047
4048 Ok(())
4049 }
4050
4051 fn write_crypto(&mut self) {
4052 loop {
4053 let space = self.highest_space;
4054 let mut outgoing = Vec::new();
4055 if let Some(crypto) = self.crypto_state.session.write_handshake(&mut outgoing) {
4056 match space {
4057 SpaceKind::Initial => {
4058 self.upgrade_crypto(SpaceKind::Handshake, crypto);
4059 }
4060 SpaceKind::Handshake => {
4061 self.upgrade_crypto(SpaceKind::Data, crypto);
4062 }
4063 SpaceKind::Data => unreachable!("got updated secrets during 1-RTT"),
4064 }
4065 }
4066 if outgoing.is_empty() {
4067 if space == self.highest_space {
4068 break;
4069 } else {
4070 continue;
4072 }
4073 }
4074 let offset = self.crypto_state.spaces[space].crypto_offset;
4075 let outgoing = Bytes::from(outgoing);
4076 if let Some(hs) = self.state.as_handshake_mut()
4077 && space == SpaceKind::Initial
4078 && offset == 0
4079 && self.side.is_client()
4080 {
4081 hs.client_hello = Some(outgoing.clone());
4082 }
4083 self.crypto_state.spaces[space].crypto_offset += outgoing.len() as u64;
4084 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
4085 self.spaces[space].pending.crypto.push_back(frame::Crypto {
4086 offset,
4087 data: outgoing,
4088 });
4089 }
4090 }
4091
4092 fn upgrade_crypto(&mut self, space: SpaceKind, crypto: Keys) {
4094 debug_assert!(
4095 !self.crypto_state.has_keys(space.encryption_level()),
4096 "already reached packet space {space:?}"
4097 );
4098 trace!("{:?} keys ready", space);
4099 if space == SpaceKind::Data {
4100 self.crypto_state.next_crypto = Some(
4102 self.crypto_state
4103 .session
4104 .next_1rtt_keys()
4105 .expect("handshake should be complete"),
4106 );
4107 }
4108
4109 self.crypto_state.spaces[space].keys = Some(crypto);
4110 debug_assert!(space > self.highest_space);
4111 self.highest_space = space;
4112 if space == SpaceKind::Data && self.side.is_client() {
4113 self.crypto_state.discard_zero_rtt();
4115 }
4116 }
4117
4118 fn discard_space(&mut self, now: Instant, space: SpaceKind) {
4119 debug_assert!(space != SpaceKind::Data);
4120 trace!("discarding {:?} keys", space);
4121 if space == SpaceKind::Initial {
4122 if let ConnectionSide::Client { token, .. } = &mut self.side {
4124 *token = Bytes::new();
4125 }
4126 }
4127 self.crypto_state.spaces[space].keys = None;
4128 let space = &mut self.spaces[space];
4129 let pns = space.for_path(PathId::ZERO);
4130 pns.time_of_last_ack_eliciting_packet = None;
4131 pns.loss_time = None;
4132 pns.loss_probes = 0;
4133 let sent_packets = mem::take(&mut pns.sent_packets);
4134 let path = self
4135 .paths
4136 .get_mut(&PathId::ZERO)
4137 .expect("PathId::ZERO is alive while Initial/Handshake spaces exist");
4138 for (_, packet) in sent_packets.into_iter() {
4139 path.data.remove_in_flight(&packet);
4140 }
4141
4142 self.set_loss_detection_timer(now, PathId::ZERO)
4143 }
4144
4145 fn handle_coalesced(
4146 &mut self,
4147 now: Instant,
4148 network_path: FourTuple,
4149 path_id: PathId,
4150 ecn: Option<EcnCodepoint>,
4151 data: BytesMut,
4152 ) {
4153 self.path_data_mut(path_id)
4154 .inc_total_recvd(data.len() as u64);
4155 let mut remaining = Some(data);
4156 let cid_len = self
4157 .local_cid_state
4158 .values()
4159 .map(|cid_state| cid_state.cid_len())
4160 .next()
4161 .expect("one cid_state must exist");
4162 while let Some(data) = remaining {
4163 match PartialDecode::new(
4164 data,
4165 &FixedLengthConnectionIdParser::new(cid_len),
4166 &[self.version],
4167 self.endpoint_config.grease_quic_bit,
4168 ) {
4169 Ok((partial_decode, rest)) => {
4170 remaining = rest;
4171 self.handle_decode(now, network_path, path_id, ecn, partial_decode);
4172 }
4173 Err(e) => {
4174 trace!("malformed header: {}", e);
4175 return;
4176 }
4177 }
4178 }
4179 }
4180
4181 fn handle_decode(
4187 &mut self,
4188 now: Instant,
4189 network_path: FourTuple,
4190 path_id: PathId,
4191 ecn: Option<EcnCodepoint>,
4192 partial_decode: PartialDecode,
4193 ) {
4194 let qlog = QlogRecvPacket::new(partial_decode.len());
4195 if let Some(decoded) = self
4196 .crypto_state
4197 .unprotect_header(partial_decode, self.peer_params.stateless_reset_token)
4198 {
4199 self.handle_packet(
4200 now,
4201 network_path,
4202 path_id,
4203 ecn,
4204 decoded.packet,
4205 decoded.stateless_reset,
4206 qlog,
4207 );
4208 }
4209 }
4210
4211 fn handle_packet(
4218 &mut self,
4219 now: Instant,
4220 network_path: FourTuple,
4221 path_id: PathId,
4222 ecn: Option<EcnCodepoint>,
4223 packet: Option<Packet>,
4224 stateless_reset: bool,
4225 mut qlog: QlogRecvPacket,
4226 ) {
4227 self.path_stats.for_path(path_id).udp_rx.ios += 1;
4228
4229 if let Some(ref packet) = packet {
4230 trace!(
4231 "got {:?} packet ({} bytes) from {} using id {}",
4232 packet.header.space(),
4233 packet.payload.len() + packet.header_data.len(),
4234 network_path,
4235 packet.header.dst_cid(),
4236 );
4237 }
4238
4239 let was_closed = self.state.is_closed();
4240 let was_drained = self.state.is_drained();
4241
4242 let decrypted = match packet {
4244 None => Err(None),
4245 Some(mut packet) => self
4246 .decrypt_packet(now, path_id, &mut packet)
4247 .map(move |number| (packet, number)),
4248 };
4249 let result = match decrypted {
4250 _ if stateless_reset => {
4251 debug!("got stateless reset");
4252 Err(ConnectionError::Reset)
4253 }
4254 Err(Some(e)) => {
4255 warn!("illegal packet: {}", e);
4256 Err(e.into())
4257 }
4258 Err(None) => {
4259 debug!("failed to authenticate packet");
4260 self.authentication_failures += 1;
4261 let integrity_limit = self
4262 .crypto_state
4263 .integrity_limit(self.highest_space)
4264 .unwrap();
4265 if self.authentication_failures > integrity_limit {
4266 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
4267 } else {
4268 return;
4269 }
4270 }
4271 Ok((packet, pn)) => {
4272 qlog.header(&packet.header, pn, path_id);
4274 let span = match pn {
4275 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
4276 None => trace_span!("recv", space = ?packet.header.space()),
4277 };
4278 let _guard = span.enter();
4279
4280 if self.is_handshaking()
4288 && self
4289 .path(path_id)
4290 .map(|path_data| {
4291 !path_data.network_path.is_probably_same_path(&network_path)
4292 })
4293 .unwrap_or(false)
4294 {
4295 if let Some(hs) = self.state.as_handshake()
4296 && hs.allow_server_migration
4297 {
4298 trace!(
4299 %network_path,
4300 prev = %self.path_data(path_id).network_path,
4301 "server migrated to new remote",
4302 );
4303 self.path_data_mut(path_id).network_path = network_path;
4304 self.qlog.emit_tuple_assigned(path_id, network_path, now);
4305 } else {
4306 debug!(
4307 recv_path = %network_path,
4308 expected_path = %self.path_data_mut(path_id).network_path,
4309 "discarding packet with unexpected remote during handshake",
4310 );
4311 return;
4312 }
4313 }
4314
4315 let dedup = self.spaces[packet.header.space()]
4316 .path_space_mut(path_id)
4317 .map(|pns| &mut pns.dedup);
4318 if pn.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
4319 debug!("discarding possible duplicate packet");
4320 self.qlog.emit_packet_received(qlog, now);
4321 return;
4322 } else if self.state.is_handshake() && packet.header.is_short() {
4323 trace!("dropping short packet during handshake");
4325 self.qlog.emit_packet_received(qlog, now);
4326 return;
4327 } else {
4328 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header
4329 && let Some(hs) = self.state.as_handshake()
4330 && self.side.is_server()
4331 && token != &hs.expected_token
4332 {
4333 warn!("discarding Initial with invalid retry token");
4337 self.qlog.emit_packet_received(qlog, now);
4338 return;
4339 }
4340
4341 if !self.state.is_closed() {
4342 let spin = match packet.header {
4343 Header::Short { spin, .. } => spin,
4344 _ => false,
4345 };
4346
4347 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
4348 self.ensure_path(path_id, network_path, now, pn);
4350 }
4351 if self.paths.contains_key(&path_id) {
4352 self.on_packet_authenticated(
4353 now,
4354 packet.header.space(),
4355 path_id,
4356 ecn,
4357 pn,
4358 spin,
4359 packet.header.is_1rtt(),
4360 &network_path,
4361 );
4362 }
4363 }
4364
4365 let res = self.process_decrypted_packet(
4366 now,
4367 network_path,
4368 path_id,
4369 pn,
4370 packet,
4371 &mut qlog,
4372 );
4373
4374 self.qlog.emit_packet_received(qlog, now);
4375 res
4376 }
4377 }
4378 };
4379
4380 if let Err(conn_err) = result {
4382 match conn_err {
4383 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
4384 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
4385 ConnectionError::Reset
4386 | ConnectionError::TransportError(TransportError {
4387 code: TransportErrorCode::AEAD_LIMIT_REACHED,
4388 ..
4389 }) => {
4390 let was_draining = self.state.move_to_drained(Some(conn_err));
4391 if !was_draining {
4392 self.endpoint_events.push_back(EndpointEventInner::Draining);
4393 }
4394 }
4395 ConnectionError::TimedOut => {
4396 unreachable!("timeouts aren't generated by packet processing");
4397 }
4398 ConnectionError::TransportError(err) => {
4399 debug!("closing connection due to transport error: {}", err);
4400 self.state.move_to_closed(err);
4401 }
4402 ConnectionError::VersionMismatch => {
4403 self.state.move_to_draining(Some(conn_err));
4404 self.endpoint_events.push_back(EndpointEventInner::Draining);
4405 }
4406 ConnectionError::LocallyClosed => {
4407 unreachable!("LocallyClosed isn't generated by packet processing");
4408 }
4409 ConnectionError::CidsExhausted => {
4410 unreachable!("CidsExhausted isn't generated by packet processing");
4411 }
4412 };
4413 }
4414
4415 if !was_closed && self.state.is_closed() {
4416 self.close_common();
4417 if !self.state.is_drained() {
4418 self.set_close_timer(now);
4419 }
4420 }
4421 if !was_drained && self.state.is_drained() {
4422 self.endpoint_events.push_back(EndpointEventInner::Drained);
4423 self.timers
4426 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
4427 }
4428
4429 if matches!(self.state.as_type(), StateType::Closed) {
4436 if self
4454 .paths
4455 .get(&path_id)
4456 .map(|p| p.data.validated && p.data.network_path == network_path)
4457 .unwrap_or(false)
4458 {
4459 self.connection_close_pending = true;
4460 }
4461 }
4462 }
4463
4464 fn process_decrypted_packet(
4465 &mut self,
4466 now: Instant,
4467 network_path: FourTuple,
4468 path_id: PathId,
4469 number: Option<u64>,
4470 packet: Packet,
4471 qlog: &mut QlogRecvPacket,
4472 ) -> Result<(), ConnectionError> {
4473 if !self.paths.contains_key(&path_id) {
4474 trace!(%path_id, ?number, "discarding packet for unknown path");
4478 return Ok(());
4479 }
4480 let state = match self.state.as_type() {
4481 StateType::Established => {
4482 match packet.header.space() {
4483 SpaceKind::Data => self.process_payload(
4484 now,
4485 network_path,
4486 path_id,
4487 number.unwrap(),
4488 packet,
4489 qlog,
4490 )?,
4491 _ if packet.header.has_frames() => {
4492 self.process_early_payload(now, path_id, packet, qlog)?
4493 }
4494 _ => {
4495 trace!("discarding unexpected pre-handshake packet");
4496 }
4497 }
4498 return Ok(());
4499 }
4500 StateType::Closed => {
4501 for result in frame::Iter::new(packet.payload.freeze())? {
4502 let frame = match result {
4503 Ok(frame) => frame,
4504 Err(err) => {
4505 debug!("frame decoding error: {err:?}");
4506 continue;
4507 }
4508 };
4509 qlog.frame(&frame);
4510
4511 if let Frame::Padding = frame {
4512 continue;
4513 };
4514
4515 trace!(?frame, "processing frame in closed state");
4516
4517 self.path_stats
4518 .for_path(path_id)
4519 .frame_rx
4520 .record(frame.ty());
4521
4522 if let Frame::Close(_error) = frame {
4523 self.state.move_to_draining(None);
4524 self.endpoint_events.push_back(EndpointEventInner::Draining);
4525 break;
4526 }
4527 }
4528 return Ok(());
4529 }
4530 StateType::Draining | StateType::Drained => return Ok(()),
4531 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
4532 };
4533
4534 match packet.header {
4535 Header::Retry {
4536 src_cid: remote_cid,
4537 ..
4538 } => {
4539 debug_assert_eq!(path_id, PathId::ZERO);
4540 if self.side.is_server() {
4541 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
4542 }
4543
4544 let is_valid_retry = self
4545 .remote_cids
4546 .get(&path_id)
4547 .map(|cids| cids.active())
4548 .map(|orig_dst_cid| {
4549 self.crypto_state.session.is_valid_retry(
4550 orig_dst_cid,
4551 &packet.header_data,
4552 &packet.payload,
4553 )
4554 })
4555 .unwrap_or_default();
4556 if self.total_authed_packets > 1
4557 || packet.payload.len() <= 16 || !is_valid_retry
4559 {
4560 trace!("discarding invalid Retry");
4561 return Ok(());
4569 }
4570
4571 trace!("retrying with CID {}", remote_cid);
4572 let client_hello = state.client_hello.take().unwrap();
4573 self.retry_src_cid = Some(remote_cid);
4574 self.remote_cids
4575 .get_mut(&path_id)
4576 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
4577 .update_initial_cid(remote_cid);
4578 self.remote_handshake_cid = remote_cid;
4579
4580 let space = &mut self.spaces[SpaceId::Initial];
4581 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
4582 self.on_packet_acked(now, PathId::ZERO, 0, info);
4583 };
4584
4585 self.discard_space(now, SpaceKind::Initial); let crypto_space = &mut self.crypto_state.spaces[SpaceKind::Initial];
4588 crypto_space.keys = Some(
4589 self.crypto_state
4590 .session
4591 .initial_keys(remote_cid, self.side.side()),
4592 );
4593 crypto_space.crypto_offset = client_hello.len() as u64;
4594
4595 let next_pn = self.spaces[SpaceId::Initial]
4596 .for_path(path_id)
4597 .next_packet_number;
4598 self.spaces[SpaceId::Initial] = {
4599 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
4600 space.for_path(path_id).next_packet_number = next_pn;
4601 space.pending.crypto.push_back(frame::Crypto {
4602 offset: 0,
4603 data: client_hello,
4604 });
4605 space
4606 };
4607
4608 let zero_rtt = mem::take(
4610 &mut self.spaces[SpaceId::Data]
4611 .for_path(PathId::ZERO)
4612 .sent_packets,
4613 );
4614 for (_, info) in zero_rtt.into_iter() {
4615 self.paths
4616 .get_mut(&PathId::ZERO)
4617 .unwrap()
4618 .remove_in_flight(&info);
4619 self.spaces[SpaceId::Data].pending |= info.retransmits;
4620 }
4621 self.streams.retransmit_all_for_0rtt();
4622
4623 let token_len = packet.payload.len() - 16;
4624 let ConnectionSide::Client { ref mut token, .. } = self.side else {
4625 unreachable!("we already short-circuited if we're server");
4626 };
4627 *token = packet.payload.freeze().split_to(token_len);
4628
4629 self.state = State::handshake(state::Handshake {
4630 expected_token: Bytes::new(),
4631 remote_cid_set: false,
4632 client_hello: None,
4633 allow_server_migration: self.config.server_handshake_migration,
4634 });
4635 Ok(())
4636 }
4637 Header::Long {
4638 ty: LongType::Handshake,
4639 src_cid: remote_cid,
4640 dst_cid: local_cid,
4641 ..
4642 } => {
4643 debug_assert_eq!(path_id, PathId::ZERO);
4644 if remote_cid != self.remote_handshake_cid {
4645 debug!(
4646 "discarding packet with mismatched remote CID: {} != {}",
4647 self.remote_handshake_cid, remote_cid
4648 );
4649 return Ok(());
4650 }
4651 self.on_path_validated(path_id);
4652
4653 self.process_early_payload(now, path_id, packet, qlog)?;
4654 if self.state.is_closed() {
4655 return Ok(());
4656 }
4657
4658 if self.crypto_state.session.is_handshaking() {
4659 trace!("handshake ongoing");
4660 return Ok(());
4661 }
4662
4663 if self.side.is_client() {
4664 let params = self
4666 .crypto_state
4667 .session
4668 .transport_parameters()?
4669 .ok_or_else(|| {
4670 TransportError::new(
4671 TransportErrorCode::crypto(0x6d),
4672 "transport parameters missing".to_owned(),
4673 )
4674 })?;
4675
4676 if self.has_0rtt() {
4677 if !self.crypto_state.session.early_data_accepted().unwrap() {
4678 debug_assert!(self.side.is_client());
4679 debug!("0-RTT rejected");
4680 self.crypto_state.accepted_0rtt = false;
4681 self.streams.zero_rtt_rejected();
4682
4683 self.spaces[SpaceId::Data].pending = Retransmits::default();
4685
4686 let sent_packets = mem::take(
4688 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
4689 );
4690 for (_, packet) in sent_packets.into_iter() {
4691 self.paths
4692 .get_mut(&path_id)
4693 .unwrap()
4694 .remove_in_flight(&packet);
4695 }
4696 } else {
4697 self.crypto_state.accepted_0rtt = true;
4698 params.validate_resumption_from(&self.peer_params)?;
4699 }
4700 }
4701 if let Some(token) = params.stateless_reset_token {
4702 let remote = self.path_data(path_id).network_path.remote;
4703 debug_assert!(!self.state.is_drained()); self.endpoint_events
4705 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4706 }
4707 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4708 self.issue_first_cids(now);
4709 } else {
4710 self.spaces[SpaceId::Data].pending.handshake_done = true;
4712 self.discard_space(now, SpaceKind::Handshake);
4713 self.events.push_back(Event::HandshakeConfirmed);
4714 trace!("handshake confirmed");
4715 }
4716
4717 self.events.push_back(Event::Connected);
4718 self.state.move_to_established();
4719 trace!("established");
4720
4721 self.issue_first_path_cids(now);
4724 Ok(())
4725 }
4726 Header::Initial(InitialHeader {
4727 src_cid: remote_cid,
4728 dst_cid: local_cid,
4729 ..
4730 }) => {
4731 debug_assert_eq!(path_id, PathId::ZERO);
4732 if !state.remote_cid_set {
4733 trace!("switching remote CID to {}", remote_cid);
4734 let mut state = state.clone();
4735 self.remote_cids
4736 .get_mut(&path_id)
4737 .expect("PathId::ZERO not yet abandoned")
4738 .update_initial_cid(remote_cid);
4739 self.remote_handshake_cid = remote_cid;
4740 self.original_remote_cid = remote_cid;
4741 state.remote_cid_set = true;
4742 self.state.move_to_handshake(state);
4743 } else if remote_cid != self.remote_handshake_cid {
4744 debug!(
4745 "discarding packet with mismatched remote CID: {} != {}",
4746 self.remote_handshake_cid, remote_cid
4747 );
4748 return Ok(());
4749 }
4750
4751 let starting_space = self.highest_space;
4752 self.process_early_payload(now, path_id, packet, qlog)?;
4753
4754 if self.side.is_server()
4755 && starting_space == SpaceKind::Initial
4756 && self.highest_space != SpaceKind::Initial
4757 {
4758 let params = self
4759 .crypto_state
4760 .session
4761 .transport_parameters()?
4762 .ok_or_else(|| {
4763 TransportError::new(
4764 TransportErrorCode::crypto(0x6d),
4765 "transport parameters missing".to_owned(),
4766 )
4767 })?;
4768 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4769 self.issue_first_cids(now);
4770 self.init_0rtt(now);
4771 }
4772 Ok(())
4773 }
4774 Header::Long {
4775 ty: LongType::ZeroRtt,
4776 ..
4777 } => {
4778 self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4779 Ok(())
4780 }
4781 Header::VersionNegotiate { .. } => {
4782 if self.total_authed_packets > 1 {
4783 return Ok(());
4784 }
4785 let supported = packet
4786 .payload
4787 .chunks(4)
4788 .any(|x| match <[u8; 4]>::try_from(x) {
4789 Ok(version) => self.version == u32::from_be_bytes(version),
4790 Err(_) => false,
4791 });
4792 if supported {
4793 return Ok(());
4794 }
4795 debug!("remote doesn't support our version");
4796 Err(ConnectionError::VersionMismatch)
4797 }
4798 Header::Short { .. } => unreachable!(
4799 "short packets received during handshake are discarded in handle_packet"
4800 ),
4801 }
4802 }
4803
4804 fn process_early_payload(
4806 &mut self,
4807 now: Instant,
4808 path_id: PathId,
4809 packet: Packet,
4810 #[allow(unused)] qlog: &mut QlogRecvPacket,
4811 ) -> Result<(), TransportError> {
4812 debug_assert_ne!(packet.header.space(), SpaceKind::Data);
4813 debug_assert_eq!(path_id, PathId::ZERO);
4814 let payload_len = packet.payload.len();
4815 let mut ack_eliciting = false;
4816 for result in frame::Iter::new(packet.payload.freeze())? {
4817 let frame = result?;
4818 qlog.frame(&frame);
4819 let span = match frame {
4820 Frame::Padding => continue,
4821 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4822 };
4823
4824 self.path_stats
4825 .for_path(path_id)
4826 .frame_rx
4827 .record(frame.ty());
4828
4829 let _guard = span.as_ref().map(|x| x.enter());
4830 ack_eliciting |= frame.is_ack_eliciting();
4831
4832 if frame.is_1rtt() && packet.header.space() != SpaceKind::Data {
4834 return Err(TransportError::PROTOCOL_VIOLATION(
4835 "illegal frame type in handshake",
4836 ));
4837 }
4838
4839 match frame {
4840 Frame::Padding | Frame::Ping => {}
4841 Frame::Crypto(frame) => {
4842 self.read_crypto(packet.header.space().into(), &frame, payload_len)?;
4843 }
4844 Frame::Ack(ack) => {
4845 self.on_ack_received(now, packet.header.space().into(), ack)?;
4846 }
4847 Frame::PathAck(ack) => {
4848 span.as_ref()
4849 .map(|span| span.record("path", tracing::field::display(&ack.path_id)));
4850 self.on_path_ack_received(now, packet.header.space().into(), ack)?;
4851 }
4852 Frame::Close(reason) => {
4853 self.state.move_to_draining(Some(reason.into()));
4854 self.endpoint_events.push_back(EndpointEventInner::Draining);
4855 return Ok(());
4856 }
4857 _ => {
4858 let mut err =
4859 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4860 err.frame = frame::MaybeFrame::Known(frame.ty());
4861 return Err(err);
4862 }
4863 }
4864 }
4865
4866 if ack_eliciting {
4867 self.spaces[packet.header.space()]
4869 .for_path(path_id)
4870 .pending_acks
4871 .set_immediate_ack_required();
4872 }
4873
4874 self.write_crypto();
4875 Ok(())
4876 }
4877
4878 fn process_payload(
4880 &mut self,
4881 now: Instant,
4882 network_path: FourTuple,
4883 path_id: PathId,
4884 number: u64,
4885 packet: Packet,
4886 #[allow(unused)] qlog: &mut QlogRecvPacket,
4887 ) -> Result<(), TransportError> {
4888 let is_multipath_negotiated = self.is_multipath_negotiated();
4889 let payload = packet.payload.freeze();
4890 let mut is_probing_packet = true;
4891 let mut close = None;
4892 let payload_len = payload.len();
4893 let mut ack_eliciting = false;
4894 let mut migration_observed_addr = None;
4897 for result in frame::Iter::new(payload)? {
4898 let frame = result?;
4899 qlog.frame(&frame);
4900 let span = match frame {
4901 Frame::Padding => continue,
4902 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4903 };
4904
4905 self.path_stats
4906 .for_path(path_id)
4907 .frame_rx
4908 .record(frame.ty());
4909 match &frame {
4912 Frame::Crypto(f) => {
4913 trace!(offset = f.offset, len = f.data.len(), "got frame CRYPTO");
4914 }
4915 Frame::Stream(f) => {
4916 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got frame STREAM");
4917 }
4918 Frame::Datagram(f) => {
4919 trace!(len = f.data.len(), "got frame DATAGRAM");
4920 }
4921 f => {
4922 trace!("got frame {f}");
4923 }
4924 }
4925
4926 let _guard = span.enter();
4927 if packet.header.is_0rtt() {
4928 match frame {
4929 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4930 return Err(TransportError::PROTOCOL_VIOLATION(
4931 "illegal frame type in 0-RTT",
4932 ));
4933 }
4934 _ => {
4935 if frame.is_1rtt() {
4936 return Err(TransportError::PROTOCOL_VIOLATION(
4937 "illegal frame type in 0-RTT",
4938 ));
4939 }
4940 }
4941 }
4942 }
4943 ack_eliciting |= frame.is_ack_eliciting();
4944
4945 match frame {
4947 Frame::Padding
4948 | Frame::PathChallenge(_)
4949 | Frame::PathResponse(_)
4950 | Frame::NewConnectionId(_)
4951 | Frame::ObservedAddr(_) => {}
4952 _ => {
4953 is_probing_packet = false;
4954 }
4955 }
4956
4957 match frame {
4958 Frame::Crypto(frame) => {
4959 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4960 }
4961 Frame::Stream(frame) => {
4962 if self.streams.received(frame, payload_len)?.should_transmit() {
4963 self.spaces[SpaceId::Data].pending.max_data = true;
4964 }
4965 }
4966 Frame::Ack(ack) => {
4967 self.on_ack_received(now, SpaceId::Data, ack)?;
4968 }
4969 Frame::PathAck(ack) => {
4970 if !self.is_multipath_negotiated() {
4971 return Err(TransportError::PROTOCOL_VIOLATION(
4972 "received PATH_ACK frame when multipath was not negotiated",
4973 ));
4974 }
4975 span.record("path", tracing::field::display(&ack.path_id));
4976 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4977 }
4978 Frame::Padding | Frame::Ping => {}
4979 Frame::Close(reason) => {
4980 close = Some(reason);
4981 }
4982 Frame::PathChallenge(challenge) => {
4983 let path = &mut self
4984 .path_mut(path_id)
4985 .expect("payload is processed only after the path becomes known");
4986 path.path_responses.push(number, challenge.0, network_path);
4987 if network_path.remote == path.network_path.remote {
4991 match self.peer_supports_ack_frequency() {
4999 true => self.immediate_ack(path_id),
5000 false => {
5001 self.ping_path(path_id).ok();
5002 }
5003 }
5004 }
5005 }
5006 Frame::PathResponse(response) => {
5007 if self
5009 .n0_nat_traversal
5010 .handle_path_response(network_path, response.0)
5011 {
5012 self.open_nat_traversed_paths(now);
5013 } else {
5014 let path = self
5017 .paths
5018 .get_mut(&path_id)
5019 .expect("payload is processed only after the path becomes known");
5020
5021 use PathTimer::*;
5022 use paths::OnPathResponseReceived::*;
5023 match path
5024 .data
5025 .on_path_response_received(now, response.0, network_path)
5026 {
5027 OnPath { was_open } => {
5028 let qlog = self.qlog.with_time(now);
5029
5030 self.timers.stop(
5031 Timer::PerPath(path_id, PathValidationFailed),
5032 qlog.clone(),
5033 );
5034 self.timers.stop(
5035 Timer::PerPath(path_id, AbandonFromValidation),
5036 qlog.clone(),
5037 );
5038
5039 let next_challenge = path
5040 .data
5041 .earliest_on_path_expiring_challenge()
5042 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
5043 self.timers.set_or_stop(
5044 Timer::PerPath(path_id, PathChallengeLost),
5045 next_challenge,
5046 qlog,
5047 );
5048
5049 if !was_open {
5050 if is_multipath_negotiated {
5051 self.events.push_back(Event::Path(
5052 PathEvent::Established { id: path_id },
5053 ));
5054 }
5055 if let Some(observed) =
5056 path.data.last_observed_addr_report.as_ref()
5057 {
5058 self.events.push_back(Event::Path(
5059 PathEvent::ObservedAddr {
5060 id: path_id,
5061 addr: observed.socket_addr(),
5062 },
5063 ));
5064 }
5065 }
5066 if let Some((_, ref mut prev)) = path.prev {
5067 prev.reset_on_path_challenges();
5072 }
5073 }
5074 Ignored {
5075 sent_on,
5076 current_path,
5077 } => {
5078 debug!(%sent_on, %current_path, %response, "ignoring valid PATH_RESPONSE")
5079 }
5080 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
5081 }
5082 }
5083 }
5084 Frame::MaxData(frame::MaxData(bytes)) => {
5085 self.streams.received_max_data(bytes);
5086 }
5087 Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
5088 self.streams.received_max_stream_data(id, offset)?;
5089 }
5090 Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
5091 self.streams.received_max_streams(dir, count)?;
5092 }
5093 Frame::ResetStream(frame) => {
5094 if self.streams.received_reset(frame)?.should_transmit() {
5095 self.spaces[SpaceId::Data].pending.max_data = true;
5096 }
5097 }
5098 Frame::DataBlocked(DataBlocked(offset)) => {
5099 debug!(offset, "peer claims to be blocked at connection level");
5100 }
5101 Frame::StreamDataBlocked(StreamDataBlocked { id, offset }) => {
5102 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
5103 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
5104 return Err(TransportError::STREAM_STATE_ERROR(
5105 "STREAM_DATA_BLOCKED on send-only stream",
5106 ));
5107 }
5108 debug!(
5109 stream = %id,
5110 offset, "peer claims to be blocked at stream level"
5111 );
5112 }
5113 Frame::StreamsBlocked(StreamsBlocked { dir, limit }) => {
5114 if limit > MAX_STREAM_COUNT {
5115 return Err(TransportError::FRAME_ENCODING_ERROR(
5116 "unrepresentable stream limit",
5117 ));
5118 }
5119 debug!(
5120 "peer claims to be blocked opening more than {} {} streams",
5121 limit, dir
5122 );
5123 }
5124 Frame::StopSending(frame::StopSending { id, error_code }) => {
5125 if id.initiator() != self.side.side() {
5126 if id.dir() == Dir::Uni {
5127 debug!("got STOP_SENDING on recv-only {}", id);
5128 return Err(TransportError::STREAM_STATE_ERROR(
5129 "STOP_SENDING on recv-only stream",
5130 ));
5131 }
5132 } else if self.streams.is_local_unopened(id) {
5133 return Err(TransportError::STREAM_STATE_ERROR(
5134 "STOP_SENDING on unopened stream",
5135 ));
5136 }
5137 self.streams.received_stop_sending(id, error_code);
5138 }
5139 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
5140 if let Some(ref path_id) = path_id {
5141 span.record("path", tracing::field::display(&path_id));
5142 }
5143 let path_id = path_id.unwrap_or_default();
5144 match self.local_cid_state.get_mut(&path_id) {
5145 None => debug!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
5146 Some(cid_state) => {
5147 let allow_more_cids = cid_state
5148 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
5149
5150 let has_path = !self.abandoned_paths.contains(&path_id);
5154 let allow_more_cids = allow_more_cids && has_path;
5155
5156 debug_assert!(!self.state.is_drained()); self.endpoint_events
5158 .push_back(EndpointEventInner::RetireConnectionId(
5159 now,
5160 path_id,
5161 sequence,
5162 allow_more_cids,
5163 ));
5164 }
5165 }
5166 }
5167 Frame::NewConnectionId(frame) => {
5168 let path_id = if let Some(path_id) = frame.path_id {
5169 if !self.is_multipath_negotiated() {
5170 return Err(TransportError::PROTOCOL_VIOLATION(
5171 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
5172 ));
5173 }
5174 if path_id > self.local_max_path_id {
5175 return Err(TransportError::PROTOCOL_VIOLATION(
5176 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
5177 ));
5178 }
5179 path_id
5180 } else {
5181 PathId::ZERO
5182 };
5183
5184 if let Some(ref path_id) = frame.path_id {
5185 span.record("path", tracing::field::display(&path_id));
5186 }
5187
5188 if self.abandoned_paths.contains(&path_id) {
5189 trace!("ignoring issued CID for abandoned path");
5190 continue;
5191 }
5192 let remote_cids = self
5193 .remote_cids
5194 .entry(path_id)
5195 .or_insert_with(|| CidQueue::new(frame.id));
5196 if remote_cids.active().is_empty() {
5197 return Err(TransportError::PROTOCOL_VIOLATION(
5198 "NEW_CONNECTION_ID when CIDs aren't in use",
5199 ));
5200 }
5201 if frame.retire_prior_to > frame.sequence {
5202 return Err(TransportError::PROTOCOL_VIOLATION(
5203 "NEW_CONNECTION_ID retiring unissued CIDs",
5204 ));
5205 }
5206
5207 use crate::cid_queue::InsertError;
5208 match remote_cids.insert(frame) {
5209 Ok(None) => {
5210 self.open_nat_traversed_paths(now);
5211 }
5212 Ok(Some((retired, reset_token))) => {
5213 let pending_retired =
5214 &mut self.spaces[SpaceId::Data].pending.retire_cids;
5215 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
5218 if (pending_retired.len() as u64)
5221 .saturating_add(retired.end.saturating_sub(retired.start))
5222 > MAX_PENDING_RETIRED_CIDS
5223 {
5224 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
5225 "queued too many retired CIDs",
5226 ));
5227 }
5228 pending_retired.extend(retired.map(|seq| (path_id, seq)));
5229 self.set_reset_token(path_id, network_path.remote, reset_token);
5230 self.open_nat_traversed_paths(now);
5231 }
5232 Err(InsertError::ExceedsLimit) => {
5233 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
5234 }
5235 Err(InsertError::Retired) => {
5236 trace!("discarding already-retired");
5237 self.spaces[SpaceId::Data]
5241 .pending
5242 .retire_cids
5243 .push((path_id, frame.sequence));
5244 continue;
5245 }
5246 };
5247
5248 if self.side.is_server()
5249 && path_id == PathId::ZERO
5250 && self
5251 .remote_cids
5252 .get(&PathId::ZERO)
5253 .map(|cids| cids.active_seq() == 0)
5254 .unwrap_or_default()
5255 {
5256 self.update_remote_cid(PathId::ZERO);
5259 }
5260 }
5261 Frame::NewToken(NewToken { token }) => {
5262 let ConnectionSide::Client {
5263 token_store,
5264 server_name,
5265 ..
5266 } = &self.side
5267 else {
5268 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
5269 };
5270 if token.is_empty() {
5271 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
5272 }
5273 trace!("got new token");
5274 token_store.insert(server_name, token);
5275 }
5276 Frame::Datagram(datagram) => {
5277 if self
5278 .datagrams
5279 .received(datagram, &self.config.datagram_receive_buffer_size)?
5280 {
5281 self.events.push_back(Event::DatagramReceived);
5282 }
5283 }
5284 Frame::AckFrequency(ack_frequency) => {
5285 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
5288 continue;
5291 }
5292
5293 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
5295 space.pending_acks.set_ack_frequency_params(&ack_frequency);
5296
5297 if !self.abandoned_paths.contains(path_id)
5301 && let Some(timeout) = space
5302 .pending_acks
5303 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
5304 {
5305 self.timers.set(
5306 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
5307 timeout,
5308 self.qlog.with_time(now),
5309 );
5310 }
5311 }
5312 }
5313 Frame::ImmediateAck => {
5314 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
5316 pns.pending_acks.set_immediate_ack_required();
5317 }
5318 }
5319 Frame::HandshakeDone => {
5320 if self.side.is_server() {
5321 return Err(TransportError::PROTOCOL_VIOLATION(
5322 "client sent HANDSHAKE_DONE",
5323 ));
5324 }
5325 if self.crypto_state.has_keys(EncryptionLevel::Handshake) {
5326 self.discard_space(now, SpaceKind::Handshake);
5327 self.events.push_back(Event::HandshakeConfirmed);
5328 trace!("handshake confirmed");
5329 }
5330 }
5331 Frame::ObservedAddr(observed) => {
5332 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
5334 if !self
5335 .peer_params
5336 .address_discovery_role
5337 .should_report(&self.config.address_discovery_role)
5338 {
5339 return Err(TransportError::PROTOCOL_VIOLATION(
5340 "received OBSERVED_ADDRESS frame when not negotiated",
5341 ));
5342 }
5343 if packet.header.space() != SpaceKind::Data {
5345 return Err(TransportError::PROTOCOL_VIOLATION(
5346 "OBSERVED_ADDRESS frame outside data space",
5347 ));
5348 }
5349
5350 let path = self.path_data_mut(path_id);
5351 if path.network_path.remote == network_path.remote {
5352 if let Some(updated) = path.update_observed_addr_report(observed)
5353 && path.open_status == paths::OpenStatus::Informed
5354 {
5355 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5356 id: path_id,
5357 addr: updated,
5358 }));
5359 }
5361 } else {
5362 migration_observed_addr = Some(observed)
5364 }
5365 }
5366 Frame::PathAbandon(frame::PathAbandon {
5367 path_id,
5368 error_code,
5369 }) => {
5370 span.record("path", tracing::field::display(&path_id));
5371 match self.close_path_inner(
5372 now,
5373 path_id,
5374 PathAbandonReason::RemoteAbandoned {
5375 error_code: error_code.into(),
5376 },
5377 ) {
5378 Ok(()) => {
5379 trace!("peer abandoned path");
5380 }
5381 Err(ClosePathError::ClosedPath) => {
5382 trace!("peer abandoned already closed path");
5383 }
5384 Err(ClosePathError::MultipathNotNegotiated) => {
5385 return Err(TransportError::PROTOCOL_VIOLATION(
5386 "received PATH_ABANDON frame when multipath was not negotiated",
5387 ));
5388 }
5389 Err(ClosePathError::LastOpenPath) => {
5390 error!(
5393 "peer abandoned last path but close_path_inner returned LastOpenPath"
5394 );
5395 }
5396 };
5397
5398 if let Some(path) = self.paths.get_mut(&path_id)
5400 && !mem::replace(&mut path.data.draining, true)
5401 {
5402 let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5403 let pto = path.data.rtt.pto_base() + ack_delay;
5404 self.timers.set(
5405 Timer::PerPath(path_id, PathTimer::PathDrained),
5406 now + 3 * pto,
5407 self.qlog.with_time(now),
5408 );
5409
5410 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
5411 }
5412 }
5413 Frame::PathStatusAvailable(info) => {
5414 span.record("path", tracing::field::display(&info.path_id));
5415 if self.is_multipath_negotiated() {
5416 self.on_path_status(
5417 info.path_id,
5418 PathStatus::Available,
5419 info.status_seq_no,
5420 );
5421 } else {
5422 return Err(TransportError::PROTOCOL_VIOLATION(
5423 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
5424 ));
5425 }
5426 }
5427 Frame::PathStatusBackup(info) => {
5428 span.record("path", tracing::field::display(&info.path_id));
5429 if self.is_multipath_negotiated() {
5430 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
5431 } else {
5432 return Err(TransportError::PROTOCOL_VIOLATION(
5433 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
5434 ));
5435 }
5436 }
5437 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
5438 span.record("path", tracing::field::display(&path_id));
5439 if !self.is_multipath_negotiated() {
5440 return Err(TransportError::PROTOCOL_VIOLATION(
5441 "received MAX_PATH_ID frame when multipath was not negotiated",
5442 ));
5443 }
5444 if path_id > self.remote_max_path_id {
5446 self.remote_max_path_id = path_id;
5447 self.issue_first_path_cids(now);
5448 self.open_nat_traversed_paths(now);
5449 }
5450 }
5451 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
5452 if self.is_multipath_negotiated() {
5456 if max_path_id > self.local_max_path_id {
5457 return Err(TransportError::PROTOCOL_VIOLATION(
5458 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
5459 ));
5460 }
5461 debug!("received PATHS_BLOCKED({:?})", max_path_id);
5462 } else {
5464 return Err(TransportError::PROTOCOL_VIOLATION(
5465 "received PATHS_BLOCKED frame when not multipath was not negotiated",
5466 ));
5467 }
5468 }
5469 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
5470 if self.is_multipath_negotiated() {
5478 if path_id > self.local_max_path_id {
5479 return Err(TransportError::PROTOCOL_VIOLATION(
5480 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
5481 ));
5482 }
5483 if next_seq.0
5484 > self
5485 .local_cid_state
5486 .get(&path_id)
5487 .map(|cid_state| cid_state.active_seq().1 + 1)
5488 .unwrap_or_default()
5489 {
5490 return Err(TransportError::PROTOCOL_VIOLATION(
5491 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
5492 ));
5493 }
5494 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
5495 } else {
5496 return Err(TransportError::PROTOCOL_VIOLATION(
5497 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
5498 ));
5499 }
5500 }
5501 Frame::AddAddress(addr) => {
5502 let client_state = match self.n0_nat_traversal.client_side_mut() {
5503 Ok(state) => state,
5504 Err(err) => {
5505 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5506 "Nat traversal(ADD_ADDRESS): {err}"
5507 )));
5508 }
5509 };
5510
5511 if !client_state.check_remote_address(&addr) {
5512 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
5514 }
5515
5516 match client_state.add_remote_address(addr) {
5517 Ok(maybe_added) => {
5518 if let Some(added) = maybe_added {
5519 self.events.push_back(Event::NatTraversal(
5520 n0_nat_traversal::Event::AddressAdded(added),
5521 ));
5522 }
5523 }
5524 Err(e) => {
5525 warn!(%e, "failed to add remote address")
5526 }
5527 }
5528 }
5529 Frame::RemoveAddress(addr) => {
5530 let client_state = match self.n0_nat_traversal.client_side_mut() {
5531 Ok(state) => state,
5532 Err(err) => {
5533 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5534 "Nat traversal(REMOVE_ADDRESS): {err}"
5535 )));
5536 }
5537 };
5538 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
5539 self.events.push_back(Event::NatTraversal(
5540 n0_nat_traversal::Event::AddressRemoved(removed_addr),
5541 ));
5542 }
5543 }
5544 Frame::ReachOut(reach_out) => {
5545 let ipv6 = self.is_ipv6();
5546 let server_state = match self.n0_nat_traversal.server_side_mut() {
5547 Ok(state) => state,
5548 Err(err) => {
5549 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5550 "Nat traversal(REACH_OUT): {err}"
5551 )));
5552 }
5553 };
5554
5555 let round_before = server_state.current_round();
5556
5557 if let Err(err) = server_state.handle_reach_out(reach_out, ipv6) {
5558 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5559 "Nat traversal(REACH_OUT): {err}"
5560 )));
5561 }
5562
5563 if server_state.current_round() > round_before {
5564 if let Some(delay) =
5566 self.n0_nat_traversal.retry_delay(self.config.initial_rtt)
5567 {
5568 self.timers.set(
5569 Timer::Conn(ConnTimer::NatTraversalProbeRetry),
5570 now + delay,
5571 self.qlog.with_time(now),
5572 );
5573 }
5574 }
5575 }
5576 }
5577 }
5578
5579 let space = self.spaces[SpaceId::Data].for_path(path_id);
5580 if space
5581 .pending_acks
5582 .packet_received(now, number, ack_eliciting, &space.dedup)
5583 {
5584 if self.abandoned_paths.contains(&path_id) {
5585 space.pending_acks.set_immediate_ack_required();
5588 } else {
5589 self.timers.set(
5590 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
5591 now + self.ack_frequency.max_ack_delay,
5592 self.qlog.with_time(now),
5593 );
5594 }
5595 }
5596
5597 let pending = &mut self.spaces[SpaceId::Data].pending;
5602 self.streams.queue_max_stream_id(pending);
5603
5604 if let Some(reason) = close {
5605 self.state.move_to_draining(Some(reason.into()));
5606 self.endpoint_events.push_back(EndpointEventInner::Draining);
5607 self.connection_close_pending = true;
5608 }
5609
5610 let migrate_on_any_packet =
5613 self.is_multipath_negotiated() && !self.n0_nat_traversal.is_negotiated();
5614
5615 let is_largest_received_pn = Some(number)
5617 == self.spaces[SpaceId::Data]
5618 .for_path(path_id)
5619 .largest_received_packet_number;
5620
5621 if (migrate_on_any_packet || !is_probing_packet)
5626 && is_largest_received_pn
5627 && self.local_ip_may_migrate()
5628 && let Some(new_local_ip) = network_path.local_ip
5629 {
5630 let path_data = self.path_data_mut(path_id);
5631 if path_data
5632 .network_path
5633 .local_ip
5634 .is_some_and(|ip| ip != new_local_ip)
5635 {
5636 debug!(
5637 %path_id,
5638 new_4tuple = %network_path,
5639 prev_4tuple = %path_data.network_path,
5640 "local address passive migration"
5641 );
5642 }
5643 path_data.network_path.local_ip = Some(new_local_ip)
5644 }
5645
5646 if self.peer_may_migrate()
5648 && (migrate_on_any_packet || !is_probing_packet)
5649 && is_largest_received_pn
5650 && network_path.remote != self.path_data(path_id).network_path.remote
5651 {
5652 self.migrate(path_id, now, network_path, migration_observed_addr);
5653 self.update_remote_cid(path_id);
5655 self.spin = false;
5656 }
5657
5658 Ok(())
5659 }
5660
5661 fn open_nat_traversed_paths(&mut self, now: Instant) {
5663 while let Some(network_path) = self
5664 .n0_nat_traversal
5665 .client_side_mut()
5666 .ok()
5667 .and_then(|s| s.pop_pending_path_open())
5668 {
5669 match self.open_path_ensure(network_path, PathStatus::Backup, now) {
5670 Ok((path_id, already_existed)) => {
5671 debug!(
5672 %path_id,
5673 ?network_path,
5674 new_path = !already_existed,
5675 "Opened NAT traversal path",
5676 );
5677 }
5678 Err(err) => match err {
5679 PathError::MultipathNotNegotiated
5680 | PathError::ServerSideNotAllowed
5681 | PathError::ValidationFailed
5682 | PathError::InvalidRemoteAddress(_) => {
5683 error!(
5684 ?err,
5685 ?network_path,
5686 "Failed to open path for successful NAT traversal"
5687 );
5688 }
5689 PathError::MaxPathIdReached | PathError::RemoteCidsExhausted => {
5690 self.n0_nat_traversal
5692 .client_side_mut()
5693 .map(|s| s.push_pending_path_open(network_path))
5694 .ok();
5695 debug!(
5696 ?err,
5697 ?network_path,
5698 "Blocked opening NAT traversal path, enqueued"
5699 );
5700 return;
5701 }
5702 },
5703 }
5704 }
5705 }
5706
5707 fn migrate(
5712 &mut self,
5713 path_id: PathId,
5714 now: Instant,
5715 network_path: FourTuple,
5716 observed_addr: Option<ObservedAddr>,
5717 ) {
5718 trace!(
5719 new_4tuple = %network_path,
5720 prev_4tuple = %self.path_data(path_id).network_path,
5721 %path_id,
5722 "migration initiated",
5723 );
5724 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
5725 let prev_pto = self.pto(SpaceKind::Data, path_id);
5732 let path = self.paths.get_mut(&path_id).expect("known path");
5733 let mut new_path_data = if network_path.remote.is_ipv4()
5734 && network_path.remote.ip() == path.data.network_path.remote.ip()
5735 {
5736 PathData::from_previous(network_path, &path.data, self.path_generation_counter, now)
5737 } else {
5738 let peer_max_udp_payload_size =
5739 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
5740 .unwrap_or(u16::MAX);
5741 PathData::new(
5742 network_path,
5743 self.allow_mtud,
5744 Some(peer_max_udp_payload_size),
5745 self.path_generation_counter,
5746 now,
5747 &self.config,
5748 )
5749 };
5750 new_path_data.last_observed_addr_report = path.data.last_observed_addr_report.clone();
5751 if let Some(report) = observed_addr
5752 && let Some(updated) = new_path_data.update_observed_addr_report(report)
5753 {
5754 tracing::info!("adding observed addr event from migration");
5755 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5756 id: path_id,
5757 addr: updated,
5758 }));
5759 }
5760 new_path_data.pending_on_path_challenge = true;
5761
5762 let mut prev_path_data = mem::replace(&mut path.data, new_path_data);
5763
5764 if !prev_path_data.validated
5773 && let Some(cid) = self.remote_cids.get(&path_id).map(CidQueue::active)
5774 {
5775 prev_path_data.pending_on_path_challenge = true;
5776 path.prev = Some((cid, prev_path_data));
5779 }
5780
5781 self.qlog.emit_tuple_assigned(path_id, network_path, now);
5783
5784 self.timers.set(
5785 Timer::PerPath(path_id, PathTimer::PathValidationFailed),
5786 now + 3 * cmp::max(self.pto(SpaceKind::Data, path_id), prev_pto),
5787 self.qlog.with_time(now),
5788 );
5789 }
5790
5791 pub fn handle_network_change(&mut self, hint: Option<&dyn NetworkChangeHint>, now: Instant) {
5808 debug!("network changed");
5809 if self.state.is_drained() {
5810 return;
5811 }
5812 if self.highest_space < SpaceKind::Data {
5813 for path in self.paths.values_mut() {
5814 path.data.network_path.local_ip = None;
5816 }
5817
5818 self.update_remote_cid(PathId::ZERO);
5819 self.ping();
5820
5821 return;
5822 }
5823
5824 let mut non_recoverable_paths = Vec::default();
5827 let mut recoverable_paths = Vec::default();
5828 let mut open_paths = 0;
5829
5830 let is_multipath_negotiated = self.is_multipath_negotiated();
5831 let is_client = self.side().is_client();
5832 let immediate_ack_allowed = self.peer_supports_ack_frequency();
5833
5834 for (path_id, path) in self.paths.iter_mut() {
5835 if self.abandoned_paths.contains(path_id) {
5836 continue;
5837 }
5838 open_paths += 1;
5839
5840 let network_path = path.data.network_path;
5843
5844 path.data.network_path.local_ip = None;
5847 let remote = network_path.remote;
5848
5849 let attempt_to_recover = if is_multipath_negotiated {
5853 hint.map(|h| h.is_path_recoverable(*path_id, network_path))
5857 .unwrap_or(!is_client)
5858 } else {
5859 true
5861 };
5862
5863 if attempt_to_recover {
5864 recoverable_paths.push((*path_id, remote));
5865 } else {
5866 non_recoverable_paths.push((*path_id, remote, path.data.local_status()))
5867 }
5868 }
5869
5870 let open_first = open_paths == non_recoverable_paths.len();
5879
5880 for (path_id, remote, status) in non_recoverable_paths.into_iter() {
5881 let network_path = FourTuple {
5882 remote,
5883 local_ip: None, };
5885
5886 if open_first && let Err(e) = self.open_path(network_path, status, now) {
5887 if self.side().is_client() {
5888 debug!(%e, "Failed to open new path for network change");
5889 }
5890 recoverable_paths.push((path_id, remote));
5892 continue;
5893 }
5894
5895 if let Err(e) =
5896 self.close_path_inner(now, path_id, PathAbandonReason::UnusableAfterNetworkChange)
5897 {
5898 debug!(%e,"Failed to close unrecoverable path after network change");
5899 recoverable_paths.push((path_id, remote));
5900 continue;
5901 }
5902
5903 if !open_first && let Err(e) = self.open_path(network_path, status, now) {
5904 debug!(%e,"Failed to open new path for network change");
5908 }
5909 }
5910
5911 for (path_id, remote) in recoverable_paths.into_iter() {
5914 if let Some(path_space) = self.spaces[SpaceId::Data].number_spaces.get_mut(&path_id) {
5916 path_space.ping_pending = true;
5917
5918 if immediate_ack_allowed {
5919 path_space.immediate_ack_pending = true;
5920 }
5921 }
5922
5923 if let Some(path) = self.paths.get_mut(&path_id) {
5928 path.data.pto_count = 0;
5929 }
5930 self.set_loss_detection_timer(now, path_id);
5931
5932 let Some((reset_token, retired)) =
5933 self.remote_cids.get_mut(&path_id).and_then(CidQueue::next)
5934 else {
5935 continue;
5936 };
5937
5938 self.spaces[SpaceId::Data]
5940 .pending
5941 .retire_cids
5942 .extend(retired.map(|seq| (path_id, seq)));
5943
5944 debug_assert!(!self.state.is_drained()); self.endpoint_events
5946 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5947 }
5948 }
5949
5950 fn update_remote_cid(&mut self, path_id: PathId) {
5952 let Some((reset_token, retired)) = self
5953 .remote_cids
5954 .get_mut(&path_id)
5955 .and_then(|cids| cids.next())
5956 else {
5957 return;
5958 };
5959
5960 self.spaces[SpaceId::Data]
5962 .pending
5963 .retire_cids
5964 .extend(retired.map(|seq| (path_id, seq)));
5965 let remote = self.path_data(path_id).network_path.remote;
5966 self.set_reset_token(path_id, remote, reset_token);
5967 }
5968
5969 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5978 debug_assert!(!self.state.is_drained()); self.endpoint_events
5980 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5981
5982 if path_id == PathId::ZERO {
5988 self.peer_params.stateless_reset_token = Some(reset_token);
5989 }
5990 }
5991
5992 fn issue_first_cids(&mut self, now: Instant) {
5994 if self
5995 .local_cid_state
5996 .get(&PathId::ZERO)
5997 .expect("PathId::ZERO exists when the connection is created")
5998 .cid_len()
5999 == 0
6000 {
6001 return;
6002 }
6003
6004 let mut n = self.peer_params.issue_cids_limit() - 1;
6006 if let ConnectionSide::Server { server_config } = &self.side
6007 && server_config.has_preferred_address()
6008 {
6009 n -= 1;
6011 }
6012 debug_assert!(!self.state.is_drained()); self.endpoint_events
6014 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6015 }
6016
6017 fn issue_first_path_cids(&mut self, now: Instant) {
6021 if let Some(max_path_id) = self.max_path_id() {
6022 let mut path_id = self.max_path_id_with_cids.next();
6023 while path_id <= max_path_id {
6024 self.endpoint_events
6025 .push_back(EndpointEventInner::NeedIdentifiers(
6026 path_id,
6027 now,
6028 self.peer_params.issue_cids_limit(),
6029 ));
6030 path_id = path_id.next();
6031 }
6032 self.max_path_id_with_cids = max_path_id;
6033 }
6034 }
6035
6036 fn populate_packet<'a, 'b>(
6044 &mut self,
6045 now: Instant,
6046 space_id: SpaceId,
6047 path_id: PathId,
6048 scheduling_info: &PathSchedulingInfo,
6049 builder: &mut PacketBuilder<'a, 'b>,
6050 ) {
6051 let is_multipath_negotiated = self.is_multipath_negotiated();
6052 let space_has_keys = self.crypto_state.has_keys(space_id.encryption_level());
6053 let is_0rtt = space_id == SpaceId::Data && !space_has_keys;
6054 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
6055 let space = &mut self.spaces[space_id];
6056 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
6057 space
6058 .for_path(path_id)
6059 .pending_acks
6060 .maybe_ack_non_eliciting();
6061
6062 if !is_0rtt
6064 && !scheduling_info.is_abandoned
6065 && scheduling_info.may_send_data
6066 && mem::replace(&mut space.pending.handshake_done, false)
6067 {
6068 builder.write_frame(frame::HandshakeDone, stats);
6069 }
6070
6071 if !scheduling_info.is_abandoned
6073 && mem::replace(&mut space.for_path(path_id).ping_pending, false)
6074 {
6075 builder.write_frame(frame::Ping, stats);
6076 }
6077
6078 if !scheduling_info.is_abandoned
6080 && mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false)
6081 {
6082 debug_assert_eq!(
6083 space_id,
6084 SpaceId::Data,
6085 "immediate acks must be sent in the data space"
6086 );
6087 builder.write_frame(frame::ImmediateAck, stats);
6088 }
6089
6090 if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
6092 for path_id in space
6093 .number_spaces
6094 .iter_mut()
6095 .filter(|(_, pns)| pns.pending_acks.can_send())
6096 .map(|(&path_id, _)| path_id)
6097 .collect::<Vec<_>>()
6098 {
6099 Self::populate_acks(
6100 now,
6101 self.receiving_ecn,
6102 path_id,
6103 space_id,
6104 space,
6105 is_multipath_negotiated,
6106 builder,
6107 stats,
6108 space_has_keys,
6109 );
6110 }
6111 }
6112
6113 if !scheduling_info.is_abandoned
6115 && scheduling_info.may_send_data
6116 && mem::replace(&mut space.pending.ack_frequency, false)
6117 {
6118 let sequence_number = self.ack_frequency.next_sequence_number();
6119
6120 let config = self.config.ack_frequency_config.as_ref().unwrap();
6122
6123 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
6125 path.rtt.get(),
6126 config,
6127 &self.peer_params,
6128 );
6129
6130 let frame = frame::AckFrequency {
6131 sequence: sequence_number,
6132 ack_eliciting_threshold: config.ack_eliciting_threshold,
6133 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
6134 reordering_threshold: config.reordering_threshold,
6135 };
6136 builder.write_frame(frame, stats);
6137
6138 self.ack_frequency
6139 .ack_frequency_sent(path_id, builder.packet_number, max_ack_delay);
6140 }
6141
6142 if !scheduling_info.is_abandoned
6144 && space_id == SpaceId::Data
6145 && path.pending_on_path_challenge
6146 && !self.state.is_closed()
6147 && builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
6148 {
6150 path.pending_on_path_challenge = false;
6151
6152 let token = self.rng.random();
6153 path.record_path_challenge_sent(now, token, path.network_path);
6154 let challenge = frame::PathChallenge(token);
6156 builder.write_frame(challenge, stats);
6157 builder.require_padding();
6158 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
6159 match path.open_status {
6160 paths::OpenStatus::Sent | paths::OpenStatus::Informed => {}
6161 paths::OpenStatus::Pending => {
6162 path.open_status = paths::OpenStatus::Sent;
6163 self.timers.set(
6164 Timer::PerPath(path_id, PathTimer::AbandonFromValidation),
6165 now + 3 * pto,
6166 self.qlog.with_time(now),
6167 );
6168 }
6169 }
6170
6171 self.timers.set(
6172 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
6173 now + path.on_path_challenge_expiry(),
6174 self.qlog.with_time(now),
6175 );
6176
6177 if is_multipath_negotiated && !path.validated && path.pending_on_path_challenge {
6178 space.pending.path_status.insert(path_id);
6180 }
6181
6182 if space_id == SpaceId::Data
6185 && self
6186 .config
6187 .address_discovery_role
6188 .should_report(&self.peer_params.address_discovery_role)
6189 {
6190 let frame = frame::ObservedAddr::new(
6191 path.network_path.remote,
6192 self.next_observed_addr_seq_no,
6193 );
6194 if builder.frame_space_remaining() > frame.size() {
6195 builder.write_frame(frame, stats);
6196
6197 self.next_observed_addr_seq_no =
6198 self.next_observed_addr_seq_no.saturating_add(1u8);
6199 path.observed_addr_sent = true;
6200
6201 space.pending.observed_addr = false;
6202 }
6203 }
6204 }
6205
6206 if !scheduling_info.is_abandoned
6208 && space_id == SpaceId::Data
6209 && builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
6210 && let Some(token) = path.path_responses.pop_on_path(path.network_path)
6211 {
6212 let response = frame::PathResponse(token);
6213 builder.write_frame(response, stats);
6214 builder.require_padding();
6215
6216 if space_id == SpaceId::Data
6220 && self
6221 .config
6222 .address_discovery_role
6223 .should_report(&self.peer_params.address_discovery_role)
6224 {
6225 let frame = frame::ObservedAddr::new(
6226 path.network_path.remote,
6227 self.next_observed_addr_seq_no,
6228 );
6229 if builder.frame_space_remaining() > frame.size() {
6230 builder.write_frame(frame, stats);
6231
6232 self.next_observed_addr_seq_no =
6233 self.next_observed_addr_seq_no.saturating_add(1u8);
6234 path.observed_addr_sent = true;
6235
6236 space.pending.observed_addr = false;
6237 }
6238 }
6239 }
6240
6241 while !scheduling_info.is_abandoned
6243 && scheduling_info.may_send_data
6244 && let Some(reach_out) = space
6245 .pending
6246 .reach_out
6247 .pop_if(|frame| builder.frame_space_remaining() >= frame.size())
6248 {
6249 builder.write_frame(reach_out, stats);
6250 }
6251
6252 if space_id == SpaceId::Data
6254 && scheduling_info.is_abandoned
6255 && scheduling_info.may_self_abandon
6256 && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
6257 && let Some(error_code) = space.pending.path_abandon.remove(&path_id)
6258 {
6259 let frame = frame::PathAbandon {
6260 path_id,
6261 error_code,
6262 };
6263 builder.write_frame(frame, stats);
6264
6265 self.remote_cids.remove(&path_id);
6268 }
6269 while space_id == SpaceId::Data
6270 && scheduling_info.may_send_data
6271 && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
6272 && let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
6273 {
6274 let frame = frame::PathAbandon {
6275 path_id: abandoned_path_id,
6276 error_code,
6277 };
6278 builder.write_frame(frame, stats);
6279
6280 self.remote_cids.remove(&abandoned_path_id);
6283 }
6284
6285 if !scheduling_info.is_abandoned
6287 && scheduling_info.may_send_data
6288 && space_id == SpaceId::Data
6289 && self
6290 .config
6291 .address_discovery_role
6292 .should_report(&self.peer_params.address_discovery_role)
6293 && (!path.observed_addr_sent || space.pending.observed_addr)
6294 {
6295 let frame =
6296 frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
6297 if builder.frame_space_remaining() > frame.size() {
6298 builder.write_frame(frame, stats);
6299
6300 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
6301 path.observed_addr_sent = true;
6302
6303 space.pending.observed_addr = false;
6304 }
6305 }
6306
6307 while !is_0rtt
6309 && !scheduling_info.is_abandoned
6310 && scheduling_info.may_send_data
6311 && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
6312 {
6313 let Some(mut frame) = space.pending.crypto.pop_front() else {
6314 break;
6315 };
6316
6317 let max_crypto_data_size = builder.frame_space_remaining()
6322 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
6324 - 2; let len = frame
6327 .data
6328 .len()
6329 .min(2usize.pow(14) - 1)
6330 .min(max_crypto_data_size);
6331
6332 let data = frame.data.split_to(len);
6333 let offset = frame.offset;
6334 let truncated = frame::Crypto { offset, data };
6335 builder.write_frame(truncated, stats);
6336
6337 if !frame.data.is_empty() {
6338 frame.offset += len as u64;
6339 space.pending.crypto.push_front(frame);
6340 }
6341 }
6342
6343 while space_id == SpaceId::Data
6345 && !scheduling_info.is_abandoned
6346 && scheduling_info.may_send_data
6347 && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
6348 {
6349 let Some(path_id) = space.pending.path_status.pop_first() else {
6350 break;
6351 };
6352 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
6353 trace!(%path_id, "discarding queued path status for unknown path");
6354 continue;
6355 };
6356
6357 let seq = path.status.seq();
6358 match path.local_status() {
6359 PathStatus::Available => {
6360 let frame = frame::PathStatusAvailable {
6361 path_id,
6362 status_seq_no: seq,
6363 };
6364 builder.write_frame(frame, stats);
6365 }
6366 PathStatus::Backup => {
6367 let frame = frame::PathStatusBackup {
6368 path_id,
6369 status_seq_no: seq,
6370 };
6371 builder.write_frame(frame, stats);
6372 }
6373 }
6374 }
6375
6376 if space_id == SpaceId::Data
6378 && !scheduling_info.is_abandoned
6379 && scheduling_info.may_send_data
6380 && space.pending.max_path_id
6381 && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
6382 {
6383 let frame = frame::MaxPathId(self.local_max_path_id);
6384 builder.write_frame(frame, stats);
6385 space.pending.max_path_id = false;
6386 }
6387
6388 if space_id == SpaceId::Data
6390 && !scheduling_info.is_abandoned
6391 && scheduling_info.may_send_data
6392 && space.pending.paths_blocked
6393 && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
6394 {
6395 let frame = frame::PathsBlocked(self.remote_max_path_id);
6396 builder.write_frame(frame, stats);
6397 space.pending.paths_blocked = false;
6398 }
6399
6400 while space_id == SpaceId::Data
6402 && !scheduling_info.is_abandoned
6403 && scheduling_info.may_send_data
6404 && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
6405 {
6406 let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
6407 break;
6408 };
6409 let next_seq = match self.remote_cids.get(&path_id) {
6410 Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
6411 None => VarInt(0),
6412 };
6413 let frame = frame::PathCidsBlocked { path_id, next_seq };
6414 builder.write_frame(frame, stats);
6415 }
6416
6417 if space_id == SpaceId::Data
6419 && !scheduling_info.is_abandoned
6420 && scheduling_info.may_send_data
6421 {
6422 self.streams
6423 .write_control_frames(builder, &mut space.pending, stats);
6424 }
6425
6426 let cid_len = self
6428 .local_cid_state
6429 .values()
6430 .map(|cid_state| cid_state.cid_len())
6431 .max()
6432 .expect("some local CID state must exist");
6433 let new_cid_size_bound =
6434 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
6435 while !scheduling_info.is_abandoned
6436 && scheduling_info.may_send_data
6437 && builder.frame_space_remaining() > new_cid_size_bound
6438 {
6439 let Some(issued) = space.pending.new_cids.pop() else {
6440 break;
6441 };
6442 let Some(cid_state) = self.local_cid_state.get(&issued.path_id) else {
6444 debug!(
6445 path = %issued.path_id, seq = issued.sequence,
6446 "dropping queued NEW_CONNECTION_ID for discarded path",
6447 );
6448 continue;
6449 };
6450 let retire_prior_to = cid_state.retire_prior_to();
6451
6452 let cid_path_id = match is_multipath_negotiated {
6453 true => Some(issued.path_id),
6454 false => {
6455 debug_assert_eq!(issued.path_id, PathId::ZERO);
6456 None
6457 }
6458 };
6459 let frame = frame::NewConnectionId {
6460 path_id: cid_path_id,
6461 sequence: issued.sequence,
6462 retire_prior_to,
6463 id: issued.id,
6464 reset_token: issued.reset_token,
6465 };
6466 builder.write_frame(frame, stats);
6467 }
6468
6469 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
6471 while !scheduling_info.is_abandoned
6472 && scheduling_info.may_send_data
6473 && builder.frame_space_remaining() > retire_cid_bound
6474 {
6475 let (path_id, sequence) = match space.pending.retire_cids.pop() {
6476 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
6477 Some((path_id, seq)) => (Some(path_id), seq),
6478 None => break,
6479 };
6480 let frame = frame::RetireConnectionId { path_id, sequence };
6481 builder.write_frame(frame, stats);
6482 }
6483
6484 let mut sent_datagrams = false;
6486 while !scheduling_info.is_abandoned
6487 && scheduling_info.may_send_data
6488 && builder.frame_space_remaining() > Datagram::SIZE_BOUND
6489 && space_id == SpaceId::Data
6490 {
6491 match self.datagrams.write(builder, stats) {
6492 true => {
6493 sent_datagrams = true;
6494 }
6495 false => break,
6496 }
6497 }
6498 if self.datagrams.send_blocked && sent_datagrams {
6499 self.events.push_back(Event::DatagramsUnblocked);
6500 self.datagrams.send_blocked = false;
6501 }
6502
6503 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
6504
6505 if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
6507 while let Some(network_path) = space.pending.new_tokens.pop() {
6508 debug_assert_eq!(space_id, SpaceId::Data);
6509 let ConnectionSide::Server { server_config } = &self.side else {
6510 panic!("NEW_TOKEN frames should not be enqueued by clients");
6511 };
6512
6513 if !network_path.is_probably_same_path(&path.network_path) {
6514 continue;
6519 }
6520
6521 let token = Token::new(
6522 TokenPayload::Validation {
6523 ip: network_path.remote.ip(),
6524 issued: server_config.time_source.now(),
6525 },
6526 &mut self.rng,
6527 );
6528 let new_token = NewToken {
6529 token: token.encode(&*server_config.token_key).into(),
6530 };
6531
6532 if builder.frame_space_remaining() < new_token.size() {
6533 space.pending.new_tokens.push(network_path);
6534 break;
6535 }
6536
6537 builder.write_frame(new_token, stats);
6538 builder.retransmits_mut().new_tokens.push(network_path);
6539 }
6540 }
6541
6542 while space_id == SpaceId::Data
6544 && !scheduling_info.is_abandoned
6545 && scheduling_info.may_send_data
6546 && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
6547 {
6548 if let Some(added_address) = space.pending.add_address.pop_last() {
6549 builder.write_frame(added_address, stats);
6550 } else {
6551 break;
6552 }
6553 }
6554
6555 while space_id == SpaceId::Data
6557 && !scheduling_info.is_abandoned
6558 && scheduling_info.may_send_data
6559 && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
6560 {
6561 if let Some(removed_address) = space.pending.remove_address.pop_last() {
6562 builder.write_frame(removed_address, stats);
6563 } else {
6564 break;
6565 }
6566 }
6567
6568 if !scheduling_info.is_abandoned
6570 && scheduling_info.may_send_data
6571 && space_id == SpaceId::Data
6572 {
6573 self.streams
6574 .write_stream_frames(builder, self.config.send_fairness, stats);
6575 }
6576 }
6577
6578 fn populate_acks<'a, 'b>(
6580 now: Instant,
6581 receiving_ecn: bool,
6582 path_id: PathId,
6583 space_id: SpaceId,
6584 space: &mut PacketSpace,
6585 is_multipath_negotiated: bool,
6586 builder: &mut PacketBuilder<'a, 'b>,
6587 stats: &mut FrameStats,
6588 space_has_keys: bool,
6589 ) {
6590 debug_assert!(space_has_keys, "tried to send ACK in 0-RTT");
6592
6593 debug_assert!(
6594 is_multipath_negotiated || path_id == PathId::ZERO,
6595 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
6596 );
6597 if is_multipath_negotiated {
6598 debug_assert!(
6599 space_id == SpaceId::Data || path_id == PathId::ZERO,
6600 "path acks must be sent in 1RTT space (have {space_id:?})"
6601 );
6602 }
6603
6604 let pns = space.for_path(path_id);
6605 let ranges = pns.pending_acks.ranges();
6606 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
6607 let ecn = if receiving_ecn {
6608 Some(&pns.ecn_counters)
6609 } else {
6610 None
6611 };
6612
6613 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
6614 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
6616 let delay = delay_micros >> ack_delay_exp.into_inner();
6617
6618 if is_multipath_negotiated && space_id == SpaceId::Data {
6619 if !ranges.is_empty() {
6620 let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
6621 builder.write_frame(frame, stats);
6622 }
6623 } else {
6624 builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
6625 }
6626 }
6627
6628 fn close_common(&mut self) {
6629 trace!("connection closed");
6630 self.timers.reset();
6631 }
6632
6633 fn set_close_timer(&mut self, now: Instant) {
6634 let pto_max = self.max_pto_for_space(self.highest_space);
6637 self.timers.set(
6638 Timer::Conn(ConnTimer::Close),
6639 now + 3 * pto_max,
6640 self.qlog.with_time(now),
6641 );
6642 }
6643
6644 fn handle_peer_params(
6649 &mut self,
6650 params: TransportParameters,
6651 local_cid: ConnectionId,
6652 remote_cid: ConnectionId,
6653 now: Instant,
6654 ) -> Result<(), TransportError> {
6655 if Some(self.original_remote_cid) != params.initial_src_cid
6656 || (self.side.is_client()
6657 && (Some(self.initial_dst_cid) != params.original_dst_cid
6658 || self.retry_src_cid != params.retry_src_cid))
6659 {
6660 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
6661 "CID authentication failure",
6662 ));
6663 }
6664 if params.initial_max_path_id.is_some() && (local_cid.is_empty() || remote_cid.is_empty()) {
6665 return Err(TransportError::PROTOCOL_VIOLATION(
6666 "multipath must not use zero-length CIDs",
6667 ));
6668 }
6669
6670 self.set_peer_params(params);
6671 self.qlog.emit_peer_transport_params_received(self, now);
6672
6673 Ok(())
6674 }
6675
6676 fn set_peer_params(&mut self, params: TransportParameters) {
6677 self.streams.set_params(¶ms);
6678 self.idle_timeout =
6679 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
6680 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
6681
6682 if let Some(ref info) = params.preferred_address {
6683 self.remote_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
6685 path_id: None,
6686 sequence: 1,
6687 id: info.connection_id,
6688 reset_token: info.stateless_reset_token,
6689 retire_prior_to: 0,
6690 })
6691 .expect(
6692 "preferred address CID is the first received, and hence is guaranteed to be legal",
6693 );
6694 let remote = self.path_data(PathId::ZERO).network_path.remote;
6695 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
6696 }
6697 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
6698
6699 let mut multipath_enabled = false;
6700 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
6701 self.config.get_initial_max_path_id(),
6702 params.initial_max_path_id,
6703 ) {
6704 self.local_max_path_id = local_max_path_id;
6706 self.remote_max_path_id = remote_max_path_id;
6707 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
6708 debug!(%initial_max_path_id, "multipath negotiated");
6709 multipath_enabled = true;
6710 }
6711
6712 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
6713 self.config
6714 .max_remote_nat_traversal_addresses
6715 .zip(params.max_remote_nat_traversal_addresses)
6716 {
6717 if multipath_enabled {
6718 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
6719 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
6720 self.n0_nat_traversal = n0_nat_traversal::State::new(
6721 max_remote_addresses,
6722 max_local_addresses,
6723 self.side(),
6724 );
6725 debug!(
6726 %max_remote_addresses, %max_local_addresses,
6727 "n0's nat traversal negotiated"
6728 );
6729 } else {
6730 debug!("n0 nat traversal enabled for both endpoints, but multipath is missing")
6731 }
6732 }
6733
6734 self.peer_params = params;
6735 let peer_max_udp_payload_size =
6736 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
6737 self.path_data_mut(PathId::ZERO)
6738 .mtud
6739 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
6740 }
6741
6742 fn decrypt_packet(
6744 &mut self,
6745 now: Instant,
6746 path_id: PathId,
6747 packet: &mut Packet,
6748 ) -> Result<Option<u64>, Option<TransportError>> {
6749 let result = self
6750 .crypto_state
6751 .decrypt_packet_body(packet, path_id, &self.spaces)?;
6752
6753 let Some(result) = result else {
6754 return Ok(None);
6755 };
6756
6757 if result.outgoing_key_update_acked
6758 && let Some(prev) = self.crypto_state.prev_crypto.as_mut()
6759 {
6760 prev.end_packet = Some((result.packet_number, now));
6761 self.set_key_discard_timer(now, packet.header.space());
6762 }
6763
6764 if result.incoming_key_update {
6765 trace!("key update authenticated");
6766 self.crypto_state
6767 .update_keys(Some((result.packet_number, now)), true);
6768 self.set_key_discard_timer(now, packet.header.space());
6769 }
6770
6771 Ok(Some(result.packet_number))
6772 }
6773
6774 fn peer_supports_ack_frequency(&self) -> bool {
6775 self.peer_params.min_ack_delay.is_some()
6776 }
6777
6778 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
6783 debug_assert_eq!(
6784 self.highest_space,
6785 SpaceKind::Data,
6786 "immediate ack must be written in the data space"
6787 );
6788 self.spaces[SpaceId::Data]
6789 .for_path(path_id)
6790 .immediate_ack_pending = true;
6791 }
6792
6793 #[cfg(test)]
6795 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
6796 let ConnectionEventInner::Datagram(DatagramConnectionEvent {
6797 path_id,
6798 first_decode,
6799 remaining,
6800 ..
6801 }) = &event.0
6802 else {
6803 return None;
6804 };
6805
6806 if remaining.is_some() {
6807 panic!("Packets should never be coalesced in tests");
6808 }
6809
6810 let decrypted_header = self
6811 .crypto_state
6812 .unprotect_header(first_decode.clone(), self.peer_params.stateless_reset_token)?;
6813
6814 let mut packet = decrypted_header.packet?;
6815 self.crypto_state
6816 .decrypt_packet_body(&mut packet, *path_id, &self.spaces)
6817 .ok()?;
6818
6819 Some(packet.payload.to_vec())
6820 }
6821
6822 #[cfg(test)]
6825 pub(crate) fn bytes_in_flight(&self) -> u64 {
6826 self.path_data(PathId::ZERO).in_flight.bytes
6828 }
6829
6830 #[cfg(test)]
6832 pub(crate) fn congestion_window(&self) -> u64 {
6833 let path = self.path_data(PathId::ZERO);
6834 path.congestion
6835 .window()
6836 .saturating_sub(path.in_flight.bytes)
6837 }
6838
6839 #[cfg(test)]
6841 pub(crate) fn is_idle(&self) -> bool {
6842 let current_timers = self.timers.values();
6843 current_timers
6844 .into_iter()
6845 .filter(|(timer, _)| {
6846 !matches!(
6847 timer,
6848 Timer::Conn(ConnTimer::KeepAlive)
6849 | Timer::PerPath(_, PathTimer::PathKeepAlive)
6850 | Timer::Conn(ConnTimer::PushNewCid)
6851 | Timer::Conn(ConnTimer::KeyDiscard)
6852 )
6853 })
6854 .min_by_key(|(_, time)| *time)
6855 .is_none_or(|(timer, _)| {
6856 matches!(
6857 timer,
6858 Timer::Conn(ConnTimer::Idle) | Timer::PerPath(_, PathTimer::PathIdle)
6859 )
6860 })
6861 }
6862
6863 #[cfg(test)]
6865 pub(crate) fn using_ecn(&self) -> bool {
6866 self.path_data(PathId::ZERO).sending_ecn
6867 }
6868
6869 #[cfg(test)]
6871 pub(crate) fn total_recvd(&self) -> u64 {
6872 self.path_data(PathId::ZERO).total_recvd
6873 }
6874
6875 #[cfg(test)]
6876 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6877 self.local_cid_state
6878 .get(&PathId::ZERO)
6879 .unwrap()
6880 .active_seq()
6881 }
6882
6883 #[cfg(test)]
6884 #[track_caller]
6885 pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6886 self.local_cid_state
6887 .get(&PathId(path_id))
6888 .unwrap()
6889 .active_seq()
6890 }
6891
6892 #[cfg(test)]
6895 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6896 let n = self
6897 .local_cid_state
6898 .get_mut(&PathId::ZERO)
6899 .unwrap()
6900 .assign_retire_seq(v);
6901 debug_assert!(!self.state.is_drained()); self.endpoint_events
6903 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6904 }
6905
6906 #[cfg(test)]
6908 pub(crate) fn active_remote_cid_seq(&self) -> u64 {
6909 self.remote_cids.get(&PathId::ZERO).unwrap().active_seq()
6910 }
6911
6912 #[cfg(test)]
6914 pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6915 self.path_data(path_id).current_mtu()
6916 }
6917
6918 #[cfg(test)]
6920 pub(crate) fn trigger_path_validation(&mut self) {
6921 for path in self.paths.values_mut() {
6922 path.data.pending_on_path_challenge = true;
6923 }
6924 }
6925
6926 #[cfg(test)]
6928 pub fn simulate_protocol_violation(&mut self, now: Instant) {
6929 if !self.state.is_closed() {
6930 self.state
6931 .move_to_closed(TransportError::PROTOCOL_VIOLATION("simulated violation"));
6932 self.close_common();
6933 if !self.state.is_drained() {
6934 self.set_close_timer(now);
6935 }
6936 self.connection_close_pending = true;
6937 }
6938 }
6939
6940 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6951 let space_specific = self.paths.get(&path_id).is_some_and(|path| {
6952 path.data.pending_on_path_challenge || !path.data.path_responses.is_empty()
6953 });
6954
6955 let other = self.streams.can_send_stream_data()
6957 || self
6958 .datagrams
6959 .outgoing
6960 .front()
6961 .is_some_and(|x| x.size(true) <= max_size);
6962
6963 SendableFrames {
6965 acks: false,
6966 close: false,
6967 space_specific,
6968 other,
6969 }
6970 }
6971
6972 fn kill(&mut self, reason: ConnectionError) {
6974 self.close_common();
6975 let was_draining = self.state.move_to_drained(Some(reason));
6976 if !was_draining {
6977 self.endpoint_events.push_back(EndpointEventInner::Draining);
6978 }
6979 self.endpoint_events.push_back(EndpointEventInner::Drained);
6982 }
6983
6984 pub fn current_mtu(&self) -> u16 {
6991 self.paths
6992 .iter()
6993 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6994 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6995 .min()
6996 .unwrap_or(INITIAL_MTU)
6997 }
6998
6999 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
7006 let pn_len = PacketNumber::new(
7007 pn,
7008 self.spaces[SpaceId::Data]
7009 .for_path(path)
7010 .largest_acked_packet_pn
7011 .unwrap_or(0),
7012 )
7013 .len();
7014
7015 1 + self
7017 .remote_cids
7018 .get(&path)
7019 .map(|cids| cids.active().len())
7020 .unwrap_or(20) + pn_len
7022 + self.tag_len_1rtt()
7023 }
7024
7025 fn predict_1rtt_overhead_no_pn(&self) -> usize {
7026 let pn_len = 4;
7027
7028 let cid_len = self
7029 .remote_cids
7030 .values()
7031 .map(|cids| cids.active().len())
7032 .max()
7033 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
7037 }
7038
7039 fn tag_len_1rtt(&self) -> usize {
7040 let packet_crypto = self
7042 .crypto_state
7043 .encryption_keys(SpaceKind::Data, self.side.side())
7044 .map(|(_header, packet, _level)| packet);
7045 packet_crypto.map_or(16, |x| x.tag_len())
7049 }
7050
7051 fn on_path_validated(&mut self, path_id: PathId) {
7053 self.path_data_mut(path_id).validated = true;
7054 let ConnectionSide::Server { server_config } = &self.side else {
7055 return;
7056 };
7057 let network_path = self.path_data(path_id).network_path;
7058 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
7059 new_tokens.clear();
7060 for _ in 0..server_config.validation_token.sent {
7061 new_tokens.push(network_path);
7062 }
7063 }
7064
7065 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
7067 if let Some(path) = self.paths.get_mut(&path_id) {
7068 path.data.status.remote_update(status, status_seq_no);
7069 } else {
7070 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
7071 }
7072 self.events.push_back(
7073 PathEvent::RemoteStatus {
7074 id: path_id,
7075 status,
7076 }
7077 .into(),
7078 );
7079 }
7080
7081 fn max_path_id(&self) -> Option<PathId> {
7090 if self.is_multipath_negotiated() {
7091 Some(self.remote_max_path_id.min(self.local_max_path_id))
7092 } else {
7093 None
7094 }
7095 }
7096
7097 pub(crate) fn is_ipv6(&self) -> bool {
7102 self.paths
7103 .values()
7104 .any(|p| p.data.network_path.remote.is_ipv6())
7105 }
7106
7107 pub fn add_nat_traversal_address(
7109 &mut self,
7110 address: SocketAddr,
7111 ) -> Result<(), n0_nat_traversal::Error> {
7112 if let Some(added) = self.n0_nat_traversal.add_local_address(address)? {
7113 self.spaces[SpaceId::Data].pending.add_address.insert(added);
7114 };
7115 Ok(())
7116 }
7117
7118 pub fn remove_nat_traversal_address(
7122 &mut self,
7123 address: SocketAddr,
7124 ) -> Result<(), n0_nat_traversal::Error> {
7125 if let Some(removed) = self.n0_nat_traversal.remove_local_address(address)? {
7126 self.spaces[SpaceId::Data]
7127 .pending
7128 .remove_address
7129 .insert(removed);
7130 }
7131 Ok(())
7132 }
7133
7134 pub fn get_local_nat_traversal_addresses(
7136 &self,
7137 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
7138 self.n0_nat_traversal.get_local_nat_traversal_addresses()
7139 }
7140
7141 pub fn get_remote_nat_traversal_addresses(
7143 &self,
7144 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
7145 Ok(self
7146 .n0_nat_traversal
7147 .client_side()?
7148 .get_remote_nat_traversal_addresses())
7149 }
7150
7151 pub fn initiate_nat_traversal_round(
7163 &mut self,
7164 now: Instant,
7165 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
7166 if self.state.is_closed() {
7167 return Err(n0_nat_traversal::Error::Closed);
7168 }
7169
7170 let ipv6 = self.is_ipv6();
7171 let client_state = self.n0_nat_traversal.client_side_mut()?;
7172 let (mut reach_out_frames, probed_addrs) =
7173 client_state.initiate_nat_traversal_round(ipv6)?;
7174 if let Some(delay) = self.n0_nat_traversal.retry_delay(self.config.initial_rtt) {
7175 self.timers.set(
7176 Timer::Conn(ConnTimer::NatTraversalProbeRetry),
7177 now + delay,
7178 self.qlog.with_time(now),
7179 );
7180 }
7181
7182 self.spaces[SpaceId::Data]
7183 .pending
7184 .reach_out
7185 .append(&mut reach_out_frames);
7186
7187 Ok(probed_addrs)
7188 }
7189
7190 fn is_handshake_confirmed(&self) -> bool {
7199 !self.is_handshaking() && !self.crypto_state.has_keys(EncryptionLevel::Handshake)
7200 }
7201}
7202
7203impl fmt::Debug for Connection {
7204 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
7205 f.debug_struct("Connection")
7206 .field("handshake_cid", &self.handshake_cid)
7207 .finish()
7208 }
7209}
7210
7211pub trait NetworkChangeHint: std::fmt::Debug + 'static {
7213 fn is_path_recoverable(&self, path_id: PathId, network_path: FourTuple) -> bool;
7222}
7223
7224#[derive(Debug)]
7226enum PollPathSpaceStatus {
7227 NothingToSend {
7229 congestion_blocked: bool,
7231 },
7232 WrotePacket {
7234 last_packet_number: u64,
7236 pad_datagram: PadDatagram,
7250 },
7251 Send {
7258 last_packet_number: u64,
7260 },
7261}
7262
7263#[derive(Debug, Copy, Clone)]
7269struct PathSchedulingInfo {
7270 is_abandoned: bool,
7276 may_send_data: bool,
7294 may_send_close: bool,
7300 may_self_abandon: bool,
7301}
7302
7303#[derive(Debug, Copy, Clone, PartialEq, Eq)]
7304enum PathBlocked {
7305 No,
7306 AntiAmplification,
7307 Congestion,
7308 Pacing,
7309}
7310
7311enum ConnectionSide {
7313 Client {
7314 token: Bytes,
7316 token_store: Arc<dyn TokenStore>,
7317 server_name: String,
7318 },
7319 Server {
7320 server_config: Arc<ServerConfig>,
7321 },
7322}
7323
7324impl ConnectionSide {
7325 fn is_client(&self) -> bool {
7326 self.side().is_client()
7327 }
7328
7329 fn is_server(&self) -> bool {
7330 self.side().is_server()
7331 }
7332
7333 fn side(&self) -> Side {
7334 match *self {
7335 Self::Client { .. } => Side::Client,
7336 Self::Server { .. } => Side::Server,
7337 }
7338 }
7339}
7340
7341impl From<SideArgs> for ConnectionSide {
7342 fn from(side: SideArgs) -> Self {
7343 match side {
7344 SideArgs::Client {
7345 token_store,
7346 server_name,
7347 } => Self::Client {
7348 token: token_store.take(&server_name).unwrap_or_default(),
7349 token_store,
7350 server_name,
7351 },
7352 SideArgs::Server {
7353 server_config,
7354 pref_addr_cid: _,
7355 path_validated: _,
7356 } => Self::Server { server_config },
7357 }
7358 }
7359}
7360
7361pub(crate) enum SideArgs {
7363 Client {
7364 token_store: Arc<dyn TokenStore>,
7365 server_name: String,
7366 },
7367 Server {
7368 server_config: Arc<ServerConfig>,
7369 pref_addr_cid: Option<ConnectionId>,
7370 path_validated: bool,
7371 },
7372}
7373
7374impl SideArgs {
7375 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
7376 match *self {
7377 Self::Client { .. } => None,
7378 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
7379 }
7380 }
7381
7382 pub(crate) fn path_validated(&self) -> bool {
7383 match *self {
7384 Self::Client { .. } => true,
7385 Self::Server { path_validated, .. } => path_validated,
7386 }
7387 }
7388
7389 pub(crate) fn side(&self) -> Side {
7390 match *self {
7391 Self::Client { .. } => Side::Client,
7392 Self::Server { .. } => Side::Server,
7393 }
7394 }
7395}
7396
7397#[derive(Debug, Error, Clone, PartialEq, Eq)]
7399pub enum ConnectionError {
7400 #[error("peer doesn't implement any supported version")]
7402 VersionMismatch,
7403 #[error(transparent)]
7405 TransportError(#[from] TransportError),
7406 #[error("aborted by peer: {0}")]
7408 ConnectionClosed(frame::ConnectionClose),
7409 #[error("closed by peer: {0}")]
7411 ApplicationClosed(frame::ApplicationClose),
7412 #[error("reset by peer")]
7414 Reset,
7415 #[error("timed out")]
7421 TimedOut,
7422 #[error("closed")]
7424 LocallyClosed,
7425 #[error("CIDs exhausted")]
7429 CidsExhausted,
7430}
7431
7432impl From<Close> for ConnectionError {
7433 fn from(x: Close) -> Self {
7434 match x {
7435 Close::Connection(reason) => Self::ConnectionClosed(reason),
7436 Close::Application(reason) => Self::ApplicationClosed(reason),
7437 }
7438 }
7439}
7440
7441impl From<ConnectionError> for io::Error {
7443 fn from(x: ConnectionError) -> Self {
7444 use ConnectionError::*;
7445 let kind = match x {
7446 TimedOut => io::ErrorKind::TimedOut,
7447 Reset => io::ErrorKind::ConnectionReset,
7448 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
7449 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
7450 io::ErrorKind::Other
7451 }
7452 };
7453 Self::new(kind, x)
7454 }
7455}
7456
7457#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
7460pub enum PathError {
7461 #[error("multipath extension not negotiated")]
7463 MultipathNotNegotiated,
7464 #[error("the server side may not open a path")]
7466 ServerSideNotAllowed,
7467 #[error("maximum number of concurrent paths reached")]
7469 MaxPathIdReached,
7470 #[error("remoted CIDs exhausted")]
7472 RemoteCidsExhausted,
7473 #[error("path validation failed")]
7475 ValidationFailed,
7476 #[error("invalid remote address")]
7478 InvalidRemoteAddress(SocketAddr),
7479}
7480
7481#[derive(Debug, Error, Clone, Eq, PartialEq)]
7483pub enum ClosePathError {
7484 #[error("Multipath extension not negotiated")]
7486 MultipathNotNegotiated,
7487 #[error("closed path")]
7489 ClosedPath,
7490 #[error("last open path")]
7494 LastOpenPath,
7495}
7496
7497#[derive(Debug, Error, Clone, Copy)]
7499#[error("Multipath extension not negotiated")]
7500pub struct MultipathNotNegotiated {
7501 _private: (),
7502}
7503
7504#[derive(Debug)]
7506pub enum Event {
7507 HandshakeDataReady,
7509 Connected,
7511 HandshakeConfirmed,
7513 ConnectionLost {
7520 reason: ConnectionError,
7522 },
7523 Stream(StreamEvent),
7525 DatagramReceived,
7527 DatagramsUnblocked,
7529 Path(PathEvent),
7531 NatTraversal(n0_nat_traversal::Event),
7533}
7534
7535impl From<PathEvent> for Event {
7536 fn from(source: PathEvent) -> Self {
7537 Self::Path(source)
7538 }
7539}
7540
7541fn get_max_ack_delay(params: &TransportParameters) -> Duration {
7542 Duration::from_micros(params.max_ack_delay.0 * 1000)
7543}
7544
7545const MAX_BACKOFF_EXPONENT: u32 = 16;
7547
7548const MAX_PTO_INTERVAL: Duration = Duration::from_secs(2);
7552
7553const MIN_IDLE_FOR_FAST_PTO: Duration = Duration::from_secs(25);
7555
7556const MAX_PTO_FAST_INTERVAL: Duration = Duration::from_secs(1);
7561
7562const SLOW_RTT_THRESHOLD: Duration =
7567 Duration::from_millis((MAX_PTO_INTERVAL.as_millis() as u64 * 2) / 3);
7568
7569const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
7577
7578const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
7584 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
7585
7586#[derive(Default)]
7587struct SentFrames {
7588 retransmits: ThinRetransmits,
7589 largest_acked: FxHashMap<PathId, u64>,
7591 stream_frames: StreamMetaVec,
7592 non_retransmits: bool,
7594 requires_padding: bool,
7596}
7597
7598impl SentFrames {
7599 fn is_ack_only(&self, streams: &StreamsState) -> bool {
7601 !self.largest_acked.is_empty()
7602 && !self.non_retransmits
7603 && self.stream_frames.is_empty()
7604 && self.retransmits.is_empty(streams)
7605 }
7606
7607 fn retransmits_mut(&mut self) -> &mut Retransmits {
7608 self.retransmits.get_or_create()
7609 }
7610
7611 fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
7612 use frame::EncodableFrame::*;
7613 match frame {
7614 PathAck(path_ack_encoder) => {
7615 if let Some(max) = path_ack_encoder.ranges.max() {
7616 self.largest_acked.insert(path_ack_encoder.path_id, max);
7617 }
7618 }
7619 Ack(ack_encoder) => {
7620 if let Some(max) = ack_encoder.ranges.max() {
7621 self.largest_acked.insert(PathId::ZERO, max);
7622 }
7623 }
7624 Close(_) => { }
7625 PathResponse(_) => self.non_retransmits = true,
7626 HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
7627 ReachOut(frame) => self.retransmits_mut().reach_out.push(frame),
7628 ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
7629 Ping(_) => self.non_retransmits = true,
7630 ImmediateAck(_) => self.non_retransmits = true,
7631 AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
7632 PathChallenge(_) => self.non_retransmits = true,
7633 Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
7634 PathAbandon(path_abandon) => {
7635 self.retransmits_mut()
7636 .path_abandon
7637 .entry(path_abandon.path_id)
7638 .or_insert(path_abandon.error_code);
7639 }
7640 PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
7641 | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
7642 self.retransmits_mut().path_status.insert(path_id);
7643 }
7644 MaxPathId(_) => self.retransmits_mut().max_path_id = true,
7645 PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
7646 PathCidsBlocked(path_cids_blocked) => {
7647 self.retransmits_mut()
7648 .path_cids_blocked
7649 .insert(path_cids_blocked.path_id);
7650 }
7651 ResetStream(reset) => self
7652 .retransmits_mut()
7653 .reset_stream
7654 .push((reset.id, reset.error_code)),
7655 StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
7656 NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
7657 RetireConnectionId(retire_cid) => self
7658 .retransmits_mut()
7659 .retire_cids
7660 .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
7661 Datagram(_) => self.non_retransmits = true,
7662 NewToken(_) => {}
7663 AddAddress(add_address) => {
7664 self.retransmits_mut().add_address.insert(add_address);
7665 }
7666 RemoveAddress(remove_address) => {
7667 self.retransmits_mut().remove_address.insert(remove_address);
7668 }
7669 StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
7670 MaxData(_) => self.retransmits_mut().max_data = true,
7671 MaxStreamData(max) => {
7672 self.retransmits_mut().max_stream_data.insert(max.id);
7673 }
7674 MaxStreams(max_streams) => {
7675 self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
7676 }
7677 StreamsBlocked(streams_blocked) => {
7678 self.retransmits_mut().streams_blocked[streams_blocked.dir as usize] = true
7679 }
7680 }
7681 }
7682}
7683
7684fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
7692 match (x, y) {
7693 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
7694 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
7695 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
7696 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
7697 }
7698}
7699
7700#[cfg(test)]
7701mod tests {
7702 use super::*;
7703
7704 #[test]
7705 fn negotiate_max_idle_timeout_commutative() {
7706 let test_params = [
7707 (None, None, None),
7708 (None, Some(VarInt(0)), None),
7709 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
7710 (Some(VarInt(0)), Some(VarInt(0)), None),
7711 (
7712 Some(VarInt(2)),
7713 Some(VarInt(0)),
7714 Some(Duration::from_millis(2)),
7715 ),
7716 (
7717 Some(VarInt(1)),
7718 Some(VarInt(4)),
7719 Some(Duration::from_millis(1)),
7720 ),
7721 ];
7722
7723 for (left, right, result) in test_params {
7724 assert_eq!(negotiate_max_idle_timeout(left, right), result);
7725 assert_eq!(negotiate_max_idle_timeout(right, left), result);
7726 }
7727 }
7728}