1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::SocketAddr,
7 num::{NonZeroU32, NonZeroUsize},
8 sync::Arc,
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13
14use rand::{RngExt, SeedableRng, rngs::StdRng};
15use rustc_hash::{FxHashMap, FxHashSet};
16use thiserror::Error;
17use tracing::{debug, error, trace, trace_span, warn};
18
19use crate::{
20 Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
21 MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
22 TransportError, TransportErrorCode, VarInt,
23 cid_generator::ConnectionIdGenerator,
24 cid_queue::CidQueue,
25 config::{ServerConfig, TransportConfig},
26 congestion::Controller,
27 connection::{
28 qlog::{QlogRecvPacket, QlogSink},
29 spaces::LostPacket,
30 stats::PathStatsMap,
31 timer::{ConnTimer, PathTimer},
32 },
33 crypto::{self, Keys},
34 frame::{
35 self, Close, DataBlocked, Datagram, FrameStruct, NewToken, ObservedAddr, StreamDataBlocked,
36 StreamsBlocked,
37 },
38 n0_nat_traversal,
39 packet::{
40 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
41 PacketNumber, PartialDecode, SpaceId,
42 },
43 range_set::ArrayRangeSet,
44 shared::{
45 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
46 EndpointEvent, EndpointEventInner,
47 },
48 token::{ResetToken, Token, TokenPayload},
49 transport_parameters::TransportParameters,
50};
51
52mod ack_frequency;
53use ack_frequency::AckFrequencyState;
54
55mod assembler;
56pub use assembler::Chunk;
57
58mod cid_state;
59use cid_state::CidState;
60
61mod datagrams;
62use datagrams::DatagramState;
63pub use datagrams::{Datagrams, SendDatagramError};
64
65mod mtud;
66mod pacing;
67
68mod packet_builder;
69use packet_builder::{PacketBuilder, PadDatagram};
70
71mod packet_crypto;
72use packet_crypto::CryptoState;
73pub(crate) use packet_crypto::EncryptionLevel;
74
75mod paths;
76pub use paths::{
77 ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError,
78};
79use paths::{PathData, PathState};
80
81pub(crate) mod qlog;
82pub(crate) mod send_buffer;
83
84pub(crate) mod spaces;
85#[cfg(fuzzing)]
86pub use spaces::Retransmits;
87#[cfg(not(fuzzing))]
88use spaces::Retransmits;
89pub(crate) use spaces::SpaceKind;
90use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
91
92mod stats;
93pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
94
95mod streams;
96#[cfg(fuzzing)]
97pub use streams::StreamsState;
98#[cfg(not(fuzzing))]
99use streams::StreamsState;
100pub use streams::{
101 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
102 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
103};
104
105mod timer;
106use timer::{Timer, TimerTable};
107
108mod transmit_buf;
109use transmit_buf::TransmitBuf;
110
111mod state;
112
113#[cfg(not(fuzzing))]
114use state::State;
115#[cfg(fuzzing)]
116pub use state::State;
117use state::StateType;
118
119pub struct Connection {
159 endpoint_config: Arc<EndpointConfig>,
160 config: Arc<TransportConfig>,
161 rng: StdRng,
162 crypto_state: CryptoState,
164 handshake_cid: ConnectionId,
166 remote_handshake_cid: ConnectionId,
168 paths: BTreeMap<PathId, PathState>,
174 path_generation_counter: u64,
185 allow_mtud: bool,
187 state: State,
188 side: ConnectionSide,
189 peer_params: TransportParameters,
191 original_remote_cid: ConnectionId,
193 initial_dst_cid: ConnectionId,
195 retry_src_cid: Option<ConnectionId>,
198 events: VecDeque<Event>,
200 endpoint_events: VecDeque<EndpointEventInner>,
201 spin_enabled: bool,
203 spin: bool,
205 spaces: [PacketSpace; 3],
207 highest_space: SpaceKind,
209 idle_timeout: Option<Duration>,
211 timers: TimerTable,
212 authentication_failures: u64,
214
215 connection_close_pending: bool,
220
221 ack_frequency: AckFrequencyState,
225
226 receiving_ecn: bool,
231 total_authed_packets: u64,
233
234 next_observed_addr_seq_no: VarInt,
239
240 streams: StreamsState,
241 remote_cids: FxHashMap<PathId, CidQueue>,
247 local_cid_state: FxHashMap<PathId, CidState>,
254 datagrams: DatagramState,
256 path_stats: PathStatsMap,
258 partial_stats: ConnectionStats,
264 version: u32,
266
267 max_concurrent_paths: NonZeroU32,
276 local_max_path_id: PathId,
291 remote_max_path_id: PathId,
297 max_path_id_with_cids: PathId,
303 abandoned_paths: FxHashSet<PathId>,
311
312 n0_nat_traversal: n0_nat_traversal::State,
314 qlog: QlogSink,
315}
316
317impl Connection {
    /// Builds a connection with a single path, `PathId::ZERO`, on `network_path`.
    ///
    /// The connection starts in the handshake state. On the client side the first
    /// crypto data is written and 0-RTT is initialised before returning. When
    /// `side_args` reports the path as validated, path zero is marked validated
    /// immediately.
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        local_cid: ConnectionId,
        remote_cid: ConnectionId,
        network_path: FourTuple,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        // Read everything needed from `side_args` before it is consumed below.
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        // NOTE: the order of the RNG draws below is part of the behavior for a given seed.
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        // Test builds may request deterministic packet numbers for the Data space.
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            remote_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                // NOTE(review): presumably the number of CIDs already issued, one extra
                // when a preferred-address CID exists; confirm against `CidState::new`.
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
        path.open_status = paths::OpenStatus::Informed;
        let mut this = Self {
            endpoint_config,
            crypto_state: CryptoState::new(crypto, init_cid, side, &mut rng),
            handshake_cid: local_cid,
            remote_handshake_cid: remote_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_generation_counter: 0,
            allow_mtud,
            state,
            side: connection_side,
            peer_params: TransportParameters::default(),
            original_remote_cid: remote_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            // The spin bit is enabled only if configured, and then only 7 times out of 8.
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceKind::Initial,
            // An absent or zero `max_idle_timeout` disables the idle timeout.
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            connection_close_pending: false,

            // The peer's parameters are unknown at this point, so defaults are used.
            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            // `config` and `rng` are moved in only after their last use above.
            config,
            remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]),
            rng,
            path_stats: Default::default(),
            partial_stats: ConnectionStats::default(),
            version,

            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            n0_nat_traversal: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // The client speaks first: queue the initial crypto data and set up 0-RTT.
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog
            .emit_tuple_assigned(PathId::ZERO, network_path, now);
        this
    }
448
    /// Returns the result of peeking the timer table, i.e. the next instant (if any)
    /// at which the connection wants its timers handled.
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.peek()
    }
460
461 #[must_use]
467 pub fn poll(&mut self) -> Option<Event> {
468 if let Some(x) = self.events.pop_front() {
469 return Some(x);
470 }
471
472 if let Some(event) = self.streams.poll() {
473 return Some(Event::Stream(event));
474 }
475
476 if let Some(reason) = self.state.take_error() {
477 return Some(Event::ConnectionLost { reason });
478 }
479
480 None
481 }
482
483 #[must_use]
485 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
486 self.endpoint_events.pop_front().map(EndpointEvent)
487 }
488
489 #[must_use]
491 pub fn streams(&mut self) -> Streams<'_> {
492 Streams {
493 state: &mut self.streams,
494 conn_state: &self.state,
495 }
496 }
497
498 #[must_use]
500 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
501 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
502 RecvStream {
503 id,
504 state: &mut self.streams,
505 pending: &mut self.spaces[SpaceId::Data].pending,
506 }
507 }
508
509 #[must_use]
511 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
512 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
513 SendStream {
514 id,
515 state: &mut self.streams,
516 pending: &mut self.spaces[SpaceId::Data].pending,
517 conn_state: &self.state,
518 }
519 }
520
521 pub fn open_path_ensure(
538 &mut self,
539 network_path: FourTuple,
540 initial_status: PathStatus,
541 now: Instant,
542 ) -> Result<(PathId, bool), PathError> {
543 let existing_open_path = self.paths.iter().find(|(id, path)| {
544 network_path.is_probably_same_path(&path.data.network_path)
545 && !self.abandoned_paths.contains(*id)
546 });
547 match existing_open_path {
548 Some((path_id, _state)) => Ok((*path_id, true)),
549 None => Ok((self.open_path(network_path, initial_status, now)?, false)),
550 }
551 }
552
553 pub fn open_path(
558 &mut self,
559 network_path: FourTuple,
560 initial_status: PathStatus,
561 now: Instant,
562 ) -> Result<PathId, PathError> {
563 if !self.is_multipath_negotiated() {
564 return Err(PathError::MultipathNotNegotiated);
565 }
566 if self.side().is_server() {
567 return Err(PathError::ServerSideNotAllowed);
568 }
569
570 let max_abandoned = self.abandoned_paths.iter().max().copied();
571 let max_used = self.paths.keys().last().copied();
572 let path_id = max_abandoned
573 .max(max_used)
574 .unwrap_or(PathId::ZERO)
575 .saturating_add(1u8);
576
577 if Some(path_id) > self.max_path_id() {
578 return Err(PathError::MaxPathIdReached);
579 }
580 if path_id > self.remote_max_path_id {
581 self.spaces[SpaceId::Data].pending.paths_blocked = true;
582 return Err(PathError::MaxPathIdReached);
583 }
584 if self
585 .remote_cids
586 .get(&path_id)
587 .map(CidQueue::active)
588 .is_none()
589 {
590 self.spaces[SpaceId::Data]
591 .pending
592 .path_cids_blocked
593 .insert(path_id);
594 return Err(PathError::RemoteCidsExhausted);
595 }
596
597 let path = self.ensure_path(path_id, network_path, now, None);
598 path.status.local_update(initial_status);
599
600 Ok(path_id)
601 }
602
603 pub fn close_path(
609 &mut self,
610 now: Instant,
611 path_id: PathId,
612 error_code: VarInt,
613 ) -> Result<(), ClosePathError> {
614 self.close_path_inner(
615 now,
616 path_id,
617 PathAbandonReason::ApplicationClosed { error_code },
618 )
619 }
620
621 pub(crate) fn close_path_inner(
626 &mut self,
627 now: Instant,
628 path_id: PathId,
629 reason: PathAbandonReason,
630 ) -> Result<(), ClosePathError> {
631 if self.state.is_drained() {
632 return Ok(());
633 }
634
635 if !self.is_multipath_negotiated() {
636 return Err(ClosePathError::MultipathNotNegotiated);
637 }
638 if self.abandoned_paths.contains(&path_id)
639 || Some(path_id) > self.max_path_id()
640 || !self.paths.contains_key(&path_id)
641 {
642 return Err(ClosePathError::ClosedPath);
643 }
644
645 let is_last_path = !self
646 .paths
647 .keys()
648 .any(|id| *id != path_id && !self.abandoned_paths.contains(id));
649
650 if is_last_path && !reason.is_remote() {
651 return Err(ClosePathError::LastOpenPath);
652 }
653
654 self.abandon_path(now, path_id, reason);
655
656 if is_last_path {
660 let rtt = RttEstimator::new(self.config.initial_rtt);
664 let pto = rtt.pto_base() + self.ack_frequency.max_ack_delay_for_pto();
665 let grace = pto * 3;
666 self.timers.set(
667 Timer::Conn(ConnTimer::NoAvailablePath),
668 now + grace,
669 self.qlog.with_time(now),
670 );
671 }
672
673 Ok(())
674 }
675
    /// Marks `path_id` as abandoned and cleans up state that refers to it.
    ///
    /// Queues the path-abandon notification, drops queued and in-flight
    /// retransmittable items that refer to the path, stops most of its timers and
    /// emits [`PathEvent::Abandoned`]. The entry in `self.paths` is left in place.
    fn abandon_path(&mut self, now: Instant, path_id: PathId, reason: PathAbandonReason) {
        trace!(%path_id, ?reason, "abandoning path");

        // Queue the abandon notification for the peer.
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space
            .path_abandon
            .insert(path_id, reason.error_code());

        // Drop not-yet-sent items that only concern the abandoned path.
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Do the same for retransmits attached to packets already sent, on any path.
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // No further loss probes are scheduled for this path.
        self.spaces[SpaceId::Data].for_path(path_id).loss_probes = 0;

        debug_assert!(!self.state.is_drained());
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        self.abandoned_paths.insert(path_id);

        // Stop every per-path timer except loss detection and pacing. The match is
        // exhaustive so that adding a timer variant forces a decision here.
        for timer in timer::PathTimer::VALUES {
            let keep_timer = match timer {
                PathTimer::PathValidationFailed
                | PathTimer::PathChallengeLost
                | PathTimer::AbandonFromValidation => false,
                PathTimer::PathKeepAlive | PathTimer::PathIdle => false,
                PathTimer::MaxAckDelay => false,
                PathTimer::PathDrained => false,
                PathTimer::LossDetection => true,
                PathTimer::Pacing => true,
            };

            if !keep_timer {
                let qlog = self.qlog.with_time(now);
                self.timers.stop(Timer::PerPath(path_id, timer), qlog);
            }
        }

        // Re-evaluate the loss detection timer now that the path state changed.
        self.set_loss_detection_timer(now, path_id);

        self.events.push_back(Event::Path(PathEvent::Abandoned {
            id: path_id,
            reason,
        }));
    }
765
766 #[track_caller]
770 fn path_data(&self, path_id: PathId) -> &PathData {
771 if let Some(data) = self.paths.get(&path_id) {
772 &data.data
773 } else {
774 panic!(
775 "unknown path: {path_id}, currently known paths: {:?}",
776 self.paths.keys().collect::<Vec<_>>()
777 );
778 }
779 }
780
781 #[track_caller]
785 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
786 &mut self.paths.get_mut(&path_id).expect("known path").data
787 }
788
789 fn path(&self, path_id: PathId) -> Option<&PathData> {
791 self.paths.get(&path_id).map(|path_state| &path_state.data)
792 }
793
794 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
796 self.paths
797 .get_mut(&path_id)
798 .map(|path_state| &mut path_state.data)
799 }
800
801 pub fn paths(&self) -> Vec<PathId> {
805 self.paths.keys().copied().collect()
806 }
807
808 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
810 self.path(path_id)
811 .map(PathData::local_status)
812 .ok_or(ClosedPath { _private: () })
813 }
814
815 pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
817 self.path(path_id)
818 .map(|path| path.network_path)
819 .ok_or(ClosedPath { _private: () })
820 }
821
822 pub fn set_path_status(
826 &mut self,
827 path_id: PathId,
828 status: PathStatus,
829 ) -> Result<PathStatus, SetPathStatusError> {
830 if !self.is_multipath_negotiated() {
831 return Err(SetPathStatusError::MultipathNotNegotiated);
832 }
833 let path = self
834 .path_mut(path_id)
835 .ok_or(SetPathStatusError::ClosedPath)?;
836 let prev = match path.status.local_update(status) {
837 Some(prev) => {
838 self.spaces[SpaceId::Data]
839 .pending
840 .path_status
841 .insert(path_id);
842 prev
843 }
844 None => path.local_status(),
845 };
846 Ok(prev)
847 }
848
849 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
854 self.path(path_id).and_then(|path| path.remote_status())
855 }
856
857 pub fn set_path_max_idle_timeout(
866 &mut self,
867 now: Instant,
868 path_id: PathId,
869 timeout: Option<Duration>,
870 ) -> Result<Option<Duration>, ClosedPath> {
871 let path = self
872 .paths
873 .get_mut(&path_id)
874 .ok_or(ClosedPath { _private: () })?;
875 let prev = std::mem::replace(&mut path.data.idle_timeout, timeout);
876
877 if !self.state.is_closed() {
879 if let Some(new_timeout) = timeout {
880 let timer = Timer::PerPath(path_id, PathTimer::PathIdle);
881 let deadline = match (prev, self.timers.get(timer)) {
882 (Some(old_timeout), Some(old_deadline)) => {
883 let last_activity = old_deadline.checked_sub(old_timeout).unwrap_or(now);
884 last_activity + new_timeout
885 }
886 _ => now + new_timeout,
887 };
888 self.timers.set(timer, deadline, self.qlog.with_time(now));
889 } else {
890 self.timers.stop(
891 Timer::PerPath(path_id, PathTimer::PathIdle),
892 self.qlog.with_time(now),
893 );
894 }
895 }
896
897 Ok(prev)
898 }
899
900 pub fn set_path_keep_alive_interval(
906 &mut self,
907 path_id: PathId,
908 interval: Option<Duration>,
909 ) -> Result<Option<Duration>, ClosedPath> {
910 let path = self
911 .paths
912 .get_mut(&path_id)
913 .ok_or(ClosedPath { _private: () })?;
914 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
915 }
916
917 fn find_validated_path_on_network_path(
921 &self,
922 network_path: FourTuple,
923 ) -> Option<(&PathId, &PathState)> {
924 self.paths.iter().find(|(path_id, path_state)| {
925 path_state.data.validated
926 && network_path.is_probably_same_path(&path_state.data.network_path)
928 && !self.abandoned_paths.contains(path_id)
929 })
930 }
934
    /// Returns the [`PathData`] for `path_id`, creating the path if it does not exist.
    ///
    /// An existing entry is returned untouched. A new path:
    /// - inherits validation and a conservative RTT from an already validated,
    ///   non-abandoned path that is probably on the same `network_path`;
    /// - gets its own Data packet number space, with `pn` (if given) pre-recorded
    ///   in that space's dedup set;
    /// - stops the `NoAvailablePath` timer and has a path challenge marked pending.
    fn ensure_path(
        &mut self,
        path_id: PathId,
        network_path: FourTuple,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        // Evaluate everything derived from other paths before taking the map entry.
        let valid_path = self.find_validated_path_on_network_path(network_path);
        let validated = valid_path.is_some();
        let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        debug!(%validated, %path_id, %network_path, "path added");

        // A path is about to exist again, so the "no path available" timer is stopped.
        self.timers.stop(
            Timer::Conn(ConnTimer::NoAvailablePath),
            self.qlog.with_time(now),
        );
        // Clamp the peer's advertised maximum UDP payload size to what fits in a u16.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
        let mut data = PathData::new(
            network_path,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_generation_counter,
            now,
            &self.config,
        );

        data.validated = validated;
        if let Some(initial_rtt) = initial_rtt {
            data.rtt.reset_initial_rtt(initial_rtt);
        }

        data.pending_on_path_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, network_path, now);

        // Without remote CIDs nothing can be sent on this path; queue a notification
        // telling the peer that CIDs for it are missing.
        if !self.remote_cids.contains_key(&path_id) {
            debug!(%path_id, "Remote opened path without issuing CIDs");
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .insert(path_id);
        }

        &mut path.data
    }
1009
1010 #[must_use]
1020 pub fn poll_transmit(
1021 &mut self,
1022 now: Instant,
1023 max_datagrams: NonZeroUsize,
1024 buf: &mut Vec<u8>,
1025 ) -> Option<Transmit> {
1026 let max_datagrams = match self.config.enable_segmentation_offload {
1027 false => NonZeroUsize::MIN,
1028 true => max_datagrams,
1029 };
1030
1031 let connection_close_pending = match self.state.as_type() {
1037 StateType::Drained => {
1038 for path in self.paths.values_mut() {
1039 path.data.app_limited = true;
1040 }
1041 return None;
1042 }
1043 StateType::Draining | StateType::Closed => {
1044 if !self.connection_close_pending {
1047 for path in self.paths.values_mut() {
1048 path.data.app_limited = true;
1049 }
1050 return None;
1051 }
1052 true
1053 }
1054 _ => false,
1055 };
1056
1057 if let Some(config) = &self.config.ack_frequency_config {
1059 let rtt = self
1060 .paths
1061 .values()
1062 .map(|p| p.data.rtt.get())
1063 .min()
1064 .expect("one path exists");
1065 self.spaces[SpaceId::Data].pending.ack_frequency = self
1066 .ack_frequency
1067 .should_send_ack_frequency(rtt, config, &self.peer_params)
1068 && self.highest_space == SpaceKind::Data
1069 && self.peer_supports_ack_frequency();
1070 }
1071
1072 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1073 while let Some(path_id) = next_path_id {
1074 if !connection_close_pending
1075 && let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id)
1076 {
1077 return Some(transmit);
1078 }
1079
1080 let info = self.scheduling_info(path_id);
1081 if let Some(transmit) = self.poll_transmit_on_path(
1082 now,
1083 buf,
1084 path_id,
1085 max_datagrams,
1086 &info,
1087 connection_close_pending,
1088 ) {
1089 return Some(transmit);
1090 }
1091
1092 debug_assert!(
1095 buf.is_empty(),
1096 "nothing to send on path but buffer not empty"
1097 );
1098
1099 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1100 }
1101
1102 debug_assert!(
1104 buf.is_empty(),
1105 "there was data in the buffer, but it was not sent"
1106 );
1107
1108 if self.state.is_established() {
1109 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1111 while let Some(path_id) = next_path_id {
1112 if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) {
1113 return Some(transmit);
1114 }
1115 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1116 }
1117 }
1118
1119 None
1120 }
1121
1122 fn scheduling_info(&self, path_id: PathId) -> PathSchedulingInfo {
1140 let have_validated_status_available_space = self.paths.iter().any(|(path_id, path)| {
1142 self.remote_cids.contains_key(path_id)
1143 && !self.abandoned_paths.contains(path_id)
1144 && path.data.validated
1145 && path.data.local_status() == PathStatus::Available
1146 });
1147
1148 let have_validated_space = self.paths.iter().any(|(path_id, path)| {
1150 self.remote_cids.contains_key(path_id)
1151 && !self.abandoned_paths.contains(path_id)
1152 && path.data.validated
1153 });
1154
1155 let is_handshaking = self.is_handshaking();
1156 let has_cids = self.remote_cids.contains_key(&path_id);
1157 let is_abandoned = self.abandoned_paths.contains(&path_id);
1158 let path_data = self.path_data(path_id);
1159 let validated = path_data.validated;
1160 let status = path_data.local_status();
1161
1162 let may_send_data = has_cids
1165 && !is_abandoned
1166 && if is_handshaking {
1167 true
1171 } else if !validated {
1172 false
1179 } else {
1180 match status {
1181 PathStatus::Available => {
1182 true
1184 }
1185 PathStatus::Backup => {
1186 !have_validated_status_available_space
1188 }
1189 }
1190 };
1191
1192 let may_send_close = has_cids
1197 && !is_abandoned
1198 && if !validated && have_validated_status_available_space {
1199 false
1201 } else {
1202 true
1204 };
1205
1206 let may_self_abandon = has_cids && validated && !have_validated_space;
1210
1211 PathSchedulingInfo {
1212 is_abandoned,
1213 may_send_data,
1214 may_send_close,
1215 may_self_abandon,
1216 }
1217 }
1218
1219 fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit {
1220 debug_assert!(
1221 !transmit.is_empty(),
1222 "must not be called with an empty transmit buffer"
1223 );
1224
1225 let network_path = self.path_data(path_id).network_path;
1226 trace!(
1227 segment_size = transmit.segment_size(),
1228 last_datagram_len = transmit.len() % transmit.segment_size(),
1229 %network_path,
1230 "sending {} bytes in {} datagrams",
1231 transmit.len(),
1232 transmit.num_datagrams()
1233 );
1234 self.path_data_mut(path_id)
1235 .inc_total_sent(transmit.len() as u64);
1236
1237 self.path_stats
1238 .for_path(path_id)
1239 .udp_tx
1240 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1241
1242 Transmit {
1243 destination: network_path.remote,
1244 size: transmit.len(),
1245 ecn: if self.path_data(path_id).sending_ecn {
1246 Some(EcnCodepoint::Ect0)
1247 } else {
1248 None
1249 },
1250 segment_size: match transmit.num_datagrams() {
1251 1 => None,
1252 _ => Some(transmit.segment_size()),
1253 },
1254 src_ip: network_path.local_ip,
1255 }
1256 }
1257
1258 fn poll_transmit_off_path(
1260 &mut self,
1261 now: Instant,
1262 buf: &mut Vec<u8>,
1263 path_id: PathId,
1264 ) -> Option<Transmit> {
1265 if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) {
1266 return Some(challenge);
1267 }
1268 if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
1269 return Some(response);
1270 }
1271 if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
1272 return Some(challenge);
1273 }
1274 None
1275 }
1276
    /// Builds the on-path transmit for `path_id`, if there is anything to send.
    ///
    /// Packet spaces are visited in `SpaceId::iter()` order so packets from
    /// several spaces can share a datagram; only path zero may use spaces other
    /// than Data. Returns `None` when the path has no remote CID or nothing was
    /// written. The path is marked application-limited when nothing was written
    /// and congestion control was not the reason.
    #[must_use]
    fn poll_transmit_on_path(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
        max_datagrams: NonZeroUsize,
        scheduling_info: &PathSchedulingInfo,
        connection_close_pending: bool,
    ) -> Option<Transmit> {
        let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else {
            // Missing CIDs are expected for abandoned paths, so only log otherwise.
            if !self.abandoned_paths.contains(&path_id) {
                debug!(%path_id, "no remote CIDs for path");
            }
            return None;
        };

        // Padding requirement carried from one space's packets to the next.
        let mut pad_datagram = PadDatagram::No;

        // Packet number of the last packet written, if any was written at all.
        let mut last_packet_number = None;

        // Whether any space reported that congestion control stopped it from sending.
        let mut congestion_blocked = false;

        let pmtu = self.path_data(path_id).current_mtu().into();
        let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu);

        for space_id in SpaceId::iter() {
            // Paths other than path zero only ever send in the Data space.
            if path_id != PathId::ZERO && space_id != SpaceId::Data {
                continue;
            }
            match self.poll_transmit_path_space(
                now,
                &mut transmit,
                path_id,
                space_id,
                remote_cid,
                scheduling_info,
                connection_close_pending,
                pad_datagram,
            ) {
                PollPathSpaceStatus::NothingToSend {
                    congestion_blocked: cb,
                } => {
                    congestion_blocked |= cb;
                }
                // A packet was written, but the next space may still add to the datagram.
                PollPathSpaceStatus::WrotePacket {
                    last_packet_number: pn,
                    pad_datagram: pad,
                } => {
                    debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                    last_packet_number = Some(pn);
                    pad_datagram = pad;
                    continue;
                }
                // The transmit is complete; later spaces are not consulted.
                PollPathSpaceStatus::Send {
                    last_packet_number: pn,
                } => {
                    debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                    last_packet_number = Some(pn);
                    break;
                }
            }
        }

        if last_packet_number.is_some() || congestion_blocked {
            self.qlog.emit_recovery_metrics(
                path_id,
                &mut self
                    .paths
                    .get_mut(&path_id)
                    .expect("path_id was iterated from self.paths above")
                    .data,
                now,
            );
        }

        self.path_data_mut(path_id).app_limited =
            last_packet_number.is_none() && !congestion_blocked;

        match last_packet_number {
            Some(last_packet_number) => {
                // Tell the congestion controller about everything written to the buffer.
                self.path_data_mut(path_id).congestion.on_sent(
                    now,
                    transmit.len() as u64,
                    last_packet_number,
                );
                Some(self.build_transmit(path_id, transmit))
            }
            None => None,
        }
    }
1396
1397 #[must_use]
1399 fn poll_transmit_path_space(
1400 &mut self,
1401 now: Instant,
1402 transmit: &mut TransmitBuf<'_>,
1403 path_id: PathId,
1404 space_id: SpaceId,
1405 remote_cid: ConnectionId,
1406 scheduling_info: &PathSchedulingInfo,
1407 connection_close_pending: bool,
1409 mut pad_datagram: PadDatagram,
1411 ) -> PollPathSpaceStatus {
1412 let mut last_packet_number = None;
1415
1416 loop {
1432 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1434 transmit.datagram_remaining_mut()
1436 } else {
1437 transmit.segment_size()
1439 };
1440 let can_send =
1441 self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending);
1442 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1443 let space_will_send = {
1444 if scheduling_info.is_abandoned {
1445 scheduling_info.may_self_abandon
1450 && self.spaces[space_id]
1451 .pending
1452 .path_abandon
1453 .contains_key(&path_id)
1454 } else if can_send.close && scheduling_info.may_send_close {
1455 true
1457 } else if needs_loss_probe || can_send.space_specific {
1458 true
1461 } else {
1462 !can_send.is_empty() && scheduling_info.may_send_data
1465 }
1466 };
1467
1468 if !space_will_send {
1469 return match last_packet_number {
1472 Some(pn) => PollPathSpaceStatus::WrotePacket {
1473 last_packet_number: pn,
1474 pad_datagram,
1475 },
1476 None => {
1477 if self.crypto_state.has_keys(space_id.encryption_level())
1479 || (space_id == SpaceId::Data
1480 && self.crypto_state.has_keys(EncryptionLevel::ZeroRtt))
1481 {
1482 trace!(?space_id, %path_id, "nothing to send in space");
1483 }
1484 PollPathSpaceStatus::NothingToSend {
1485 congestion_blocked: false,
1486 }
1487 }
1488 };
1489 }
1490
1491 if transmit.datagram_remaining_mut() == 0 {
1495 let congestion_blocked =
1496 self.path_congestion_check(space_id, path_id, transmit, &can_send, now);
1497 if congestion_blocked != PathBlocked::No {
1498 return match last_packet_number {
1500 Some(pn) => PollPathSpaceStatus::WrotePacket {
1501 last_packet_number: pn,
1502 pad_datagram,
1503 },
1504 None => {
1505 return PollPathSpaceStatus::NothingToSend {
1506 congestion_blocked: true,
1507 };
1508 }
1509 };
1510 }
1511
1512 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1515 return match last_packet_number {
1518 Some(pn) => PollPathSpaceStatus::WrotePacket {
1519 last_packet_number: pn,
1520 pad_datagram,
1521 },
1522 None => {
1523 return PollPathSpaceStatus::NothingToSend {
1524 congestion_blocked: false,
1525 };
1526 }
1527 };
1528 }
1529
1530 if needs_loss_probe {
1531 let request_immediate_ack =
1533 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1534 self.spaces[space_id].queue_tail_loss_probe(
1535 path_id,
1536 request_immediate_ack,
1537 &self.streams,
1538 );
1539
1540 self.spaces[space_id].for_path(path_id).loss_probes -= 1; transmit.start_new_datagram_with_size(std::cmp::min(
1546 usize::from(INITIAL_MTU),
1547 transmit.segment_size(),
1548 ));
1549 } else {
1550 transmit.start_new_datagram();
1551 }
1552 trace!(count = transmit.num_datagrams(), "new datagram started");
1553
1554 pad_datagram = PadDatagram::No;
1556 }
1557
1558 if transmit.datagram_start_offset() < transmit.len() {
1561 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1562 }
1563
1564 if self.crypto_state.has_keys(EncryptionLevel::Initial)
1569 && space_id == SpaceId::Handshake
1570 && self.side.is_client()
1571 {
1572 self.discard_space(now, SpaceKind::Initial);
1575 }
1576 if let Some(ref mut prev) = self.crypto_state.prev_crypto {
1577 prev.update_unacked = false;
1578 }
1579
1580 let Some(mut builder) =
1581 PacketBuilder::new(now, space_id, path_id, remote_cid, transmit, self)
1582 else {
1583 return PollPathSpaceStatus::NothingToSend {
1590 congestion_blocked: false,
1591 };
1592 };
1593 last_packet_number = Some(builder.packet_number);
1594
1595 if space_id == SpaceId::Initial
1596 && (self.side.is_client() || can_send.is_ack_eliciting())
1597 {
1598 pad_datagram |= PadDatagram::ToMinMtu;
1600 }
1601 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1602 pad_datagram |= PadDatagram::ToSegmentSize;
1603 }
1604
1605 if scheduling_info.may_send_close && can_send.close {
1606 trace!("sending CONNECTION_CLOSE");
1607 let is_multipath_negotiated = self.is_multipath_negotiated();
1612 for path_id in self.spaces[space_id]
1613 .number_spaces
1614 .iter()
1615 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1616 .map(|(&path_id, _)| path_id)
1617 .collect::<Vec<_>>()
1618 {
1619 Self::populate_acks(
1620 now,
1621 self.receiving_ecn,
1622 path_id,
1623 space_id,
1624 &mut self.spaces[space_id],
1625 is_multipath_negotiated,
1626 &mut builder,
1627 &mut self.path_stats.for_path(path_id).frame_tx,
1628 self.crypto_state.has_keys(space_id.encryption_level()),
1629 );
1630 }
1631
1632 debug_assert!(
1640 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1641 "ACKs should leave space for ConnectionClose"
1642 );
1643 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
1644 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1645 let max_frame_size = builder.frame_space_remaining();
1646 let close: Close = match self.state.as_type() {
1647 StateType::Closed => {
1648 let reason: Close =
1649 self.state.as_closed().expect("checked").clone().into();
1650 if space_id == SpaceId::Data || reason.is_transport_layer() {
1651 reason
1652 } else {
1653 TransportError::APPLICATION_ERROR("").into()
1654 }
1655 }
1656 StateType::Draining => TransportError::NO_ERROR("").into(),
1657 _ => unreachable!(
1658 "tried to make a close packet when the connection wasn't closed"
1659 ),
1660 };
1661 builder.write_frame(close.encoder(max_frame_size), stats);
1662 }
1663 let last_pn = builder.packet_number;
1664 builder.finish_and_track(now, self, path_id, pad_datagram);
1665 if space_id.kind() == self.highest_space {
1666 self.connection_close_pending = false;
1669 }
1670 return PollPathSpaceStatus::WrotePacket {
1683 last_packet_number: last_pn,
1684 pad_datagram,
1685 };
1686 }
1687
1688 self.populate_packet(now, space_id, path_id, scheduling_info, &mut builder);
1689
1690 debug_assert!(
1697 !(builder.sent_frames().is_ack_only(&self.streams)
1698 && !can_send.acks
1699 && (can_send.other || can_send.space_specific)
1700 && builder.buf.segment_size()
1701 == self.path_data(path_id).current_mtu() as usize
1702 && self.datagrams.outgoing.is_empty()),
1703 "SendableFrames was {can_send:?}, but only ACKs have been written"
1704 );
1705 if builder.sent_frames().requires_padding {
1706 pad_datagram |= PadDatagram::ToMinMtu;
1707 }
1708
1709 for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1710 self.spaces[space_id]
1711 .for_path(*path_id)
1712 .pending_acks
1713 .acks_sent();
1714 self.timers.stop(
1715 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1716 self.qlog.with_time(now),
1717 );
1718 }
1719
1720 if builder.can_coalesce && path_id == PathId::ZERO && {
1728 let max_packet_size = builder
1729 .buf
1730 .datagram_remaining_mut()
1731 .saturating_sub(builder.predict_packet_end());
1732 max_packet_size > MIN_PACKET_SPACE
1733 && self.has_pending_packet(space_id, max_packet_size, connection_close_pending)
1734 } {
1735 trace!("will coalesce with next packet");
1738 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1739 } else {
1740 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1746 const MAX_PADDING: usize = 32;
1754 if builder.buf.datagram_remaining_mut()
1755 > builder.predict_packet_end() + MAX_PADDING
1756 {
1757 trace!(
1758 "GSO truncated by demand for {} padding bytes",
1759 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1760 );
1761 let last_pn = builder.packet_number;
1762 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1763 return PollPathSpaceStatus::Send {
1764 last_packet_number: last_pn,
1765 };
1766 }
1767
1768 builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1771 } else {
1772 builder.finish_and_track(now, self, path_id, pad_datagram);
1773 }
1774
1775 if transmit.num_datagrams() == 1 {
1778 transmit.clip_segment_size();
1779 }
1780 }
1781 }
1782 }
1783
1784 fn poll_transmit_mtu_probe(
1785 &mut self,
1786 now: Instant,
1787 buf: &mut Vec<u8>,
1788 path_id: PathId,
1789 ) -> Option<Transmit> {
1790 let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?;
1791
1792 let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize);
1794 transmit.start_new_datagram_with_size(probe_size as usize);
1795
1796 let mut builder =
1797 PacketBuilder::new(now, SpaceId::Data, path_id, active_cid, &mut transmit, self)?;
1798
1799 trace!(?probe_size, "writing MTUD probe");
1801 builder.write_frame(frame::Ping, &mut self.path_stats.for_path(path_id).frame_tx);
1802
1803 if self.peer_supports_ack_frequency() {
1805 builder.write_frame(
1806 frame::ImmediateAck,
1807 &mut self.path_stats.for_path(path_id).frame_tx,
1808 );
1809 }
1810
1811 builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1812
1813 self.path_stats.for_path(path_id).sent_plpmtud_probes += 1;
1814
1815 Some(self.build_transmit(path_id, transmit))
1816 }
1817
1818 fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> {
1826 let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?;
1827 let is_eligible = self.path_data(path_id).validated
1828 && !self.path_data(path_id).is_validating_path()
1829 && !self.abandoned_paths.contains(&path_id);
1830
1831 if !is_eligible {
1832 return None;
1833 }
1834 let next_pn = self.spaces[SpaceId::Data]
1835 .for_path(path_id)
1836 .peek_tx_number();
1837 let probe_size = self
1838 .path_data_mut(path_id)
1839 .mtud
1840 .poll_transmit(now, next_pn)?;
1841
1842 Some((active_cid, probe_size))
1843 }
1844
1845 fn has_pending_packet(
1862 &mut self,
1863 current_space_id: SpaceId,
1864 max_packet_size: usize,
1865 connection_close_pending: bool,
1866 ) -> bool {
1867 let mut space_id = current_space_id;
1868 loop {
1869 let can_send = self.space_can_send(
1870 space_id,
1871 PathId::ZERO,
1872 max_packet_size,
1873 connection_close_pending,
1874 );
1875 if !can_send.is_empty() {
1876 return true;
1877 }
1878 match space_id.next() {
1879 Some(next_space_id) => space_id = next_space_id,
1880 None => break,
1881 }
1882 }
1883 false
1884 }
1885
1886 fn path_congestion_check(
1888 &mut self,
1889 space_id: SpaceId,
1890 path_id: PathId,
1891 transmit: &TransmitBuf<'_>,
1892 can_send: &SendableFrames,
1893 now: Instant,
1894 ) -> PathBlocked {
1895 if self.side().is_server()
1901 && self
1902 .path_data(path_id)
1903 .anti_amplification_blocked(transmit.len() as u64 + 1)
1904 {
1905 trace!(?space_id, %path_id, "blocked by anti-amplification");
1906 return PathBlocked::AntiAmplification;
1907 }
1908
1909 let bytes_to_send = transmit.segment_size() as u64;
1912 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1913
1914 if can_send.other && !need_loss_probe && !can_send.close {
1915 let path = self.path_data(path_id);
1916 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1917 trace!(
1918 ?space_id,
1919 %path_id,
1920 in_flight=%path.in_flight.bytes,
1921 congestion_window=%path.congestion.window(),
1922 "blocked by congestion control",
1923 );
1924 return PathBlocked::Congestion;
1925 }
1926 }
1927
1928 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1930 let resume_time = now + delay;
1931 self.timers.set(
1932 Timer::PerPath(path_id, PathTimer::Pacing),
1933 resume_time,
1934 self.qlog.with_time(now),
1935 );
1936 trace!(?space_id, %path_id, ?delay, "blocked by pacing");
1939 return PathBlocked::Pacing;
1940 }
1941
1942 PathBlocked::No
1943 }
1944
1945 fn send_prev_path_challenge(
1950 &mut self,
1951 now: Instant,
1952 buf: &mut Vec<u8>,
1953 path_id: PathId,
1954 ) -> Option<Transmit> {
1955 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1956 if !prev_path.pending_on_path_challenge {
1957 return None;
1958 };
1959 prev_path.pending_on_path_challenge = false;
1960 let token = self.rng.random();
1961 let network_path = prev_path.network_path;
1962 prev_path.record_path_challenge_sent(now, token, network_path);
1963
1964 debug_assert_eq!(
1965 self.highest_space,
1966 SpaceKind::Data,
1967 "PATH_CHALLENGE queued without 1-RTT keys"
1968 );
1969 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
1970 buf.start_new_datagram();
1971
1972 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, self)?;
1978 let challenge = frame::PathChallenge(token);
1979 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
1980 builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));
1981
1982 builder.pad_to(MIN_INITIAL_SIZE);
1987
1988 builder.finish(self, now);
1989 self.path_stats
1990 .for_path(path_id)
1991 .udp_tx
1992 .on_sent(1, buf.len());
1993
1994 trace!(
1995 dst = ?network_path.remote,
1996 src = ?network_path.local_ip,
1997 len = buf.len(),
1998 "sending prev_path off-path challenge",
1999 );
2000 Some(Transmit {
2001 destination: network_path.remote,
2002 size: buf.len(),
2003 ecn: None,
2004 segment_size: None,
2005 src_ip: network_path.local_ip,
2006 })
2007 }
2008
2009 fn send_off_path_path_response(
2010 &mut self,
2011 now: Instant,
2012 buf: &mut Vec<u8>,
2013 path_id: PathId,
2014 ) -> Option<Transmit> {
2015 let path = self.paths.get_mut(&path_id).map(|state| &mut state.data)?;
2016 let cid_queue = self.remote_cids.get_mut(&path_id)?;
2017 let (token, network_path) = path.path_responses.pop_off_path(path.network_path)?;
2018
2019 let cid = cid_queue.active();
2021
2022 let frame = frame::PathResponse(token);
2023
2024 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2025 buf.start_new_datagram();
2026
2027 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, self)?;
2028 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2029 builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
2030
2031 if self
2036 .find_validated_path_on_network_path(network_path)
2037 .is_none()
2038 && self.n0_nat_traversal.client_side().is_ok()
2039 {
2040 let token = self.rng.random();
2041 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2042 builder.write_frame(frame::PathChallenge(token), stats);
2043 let ip_port = (network_path.remote.ip(), network_path.remote.port());
2044 self.n0_nat_traversal.mark_probe_sent(ip_port, token);
2045 }
2046
2047 builder.pad_to(MIN_INITIAL_SIZE);
2050 builder.finish(self, now);
2051
2052 let size = buf.len();
2053 self.path_stats.for_path(path_id).udp_tx.on_sent(1, size);
2054
2055 trace!(
2056 dst = ?network_path.remote,
2057 src = ?network_path.local_ip,
2058 len = buf.len(),
2059 "sending off-path PATH_RESPONSE",
2060 );
2061 Some(Transmit {
2062 destination: network_path.remote,
2063 size,
2064 ecn: None,
2065 segment_size: None,
2066 src_ip: network_path.local_ip,
2067 })
2068 }
2069
2070 fn send_nat_traversal_path_challenge(
2072 &mut self,
2073 now: Instant,
2074 buf: &mut Vec<u8>,
2075 path_id: PathId,
2076 ) -> Option<Transmit> {
2077 let remote = self.n0_nat_traversal.next_probe_addr()?;
2078
2079 if !self.paths.get(&path_id)?.data.validated {
2080 return None;
2082 }
2083
2084 let Some(cid) = self
2089 .remote_cids
2090 .get(&path_id)
2091 .map(|cid_queue| cid_queue.active())
2092 else {
2093 trace!(%path_id, "Not sending NAT traversal probe for path with no CIDs");
2094 return None;
2095 };
2096 let token = self.rng.random();
2097
2098 let frame = frame::PathChallenge(token);
2099
2100 let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2101 buf.start_new_datagram();
2102
2103 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, self)?;
2104 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2105 builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
2106 builder.finish(self, now);
2109
2110 self.n0_nat_traversal.mark_probe_sent(remote, token);
2112
2113 let size = buf.len();
2114 self.path_stats.for_path(path_id).udp_tx.on_sent(1, size);
2115
2116 trace!(dst = ?remote, len = buf.len(), "sending off-path NAT probe");
2117 Some(Transmit {
2118 destination: remote.into(),
2119 size,
2120 ecn: None,
2121 segment_size: None,
2122 src_ip: None,
2123 })
2124 }
2125
2126 fn space_can_send(
2134 &mut self,
2135 space_id: SpaceId,
2136 path_id: PathId,
2137 packet_size: usize,
2138 connection_close_pending: bool,
2139 ) -> SendableFrames {
2140 let space = &mut self.spaces[space_id];
2141 let space_has_crypto = self.crypto_state.has_keys(space_id.encryption_level());
2142
2143 if !space_has_crypto
2144 && (space_id != SpaceId::Data
2145 || !self.crypto_state.has_keys(EncryptionLevel::ZeroRtt)
2146 || self.side.is_server())
2147 {
2148 return SendableFrames::empty();
2150 }
2151
2152 let mut can_send = space.can_send(path_id, &self.streams);
2153
2154 if space_id == SpaceId::Data {
2156 let pn = space.for_path(path_id).peek_tx_number();
2157 let frame_space_1rtt =
2163 packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
2164 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
2165 }
2166
2167 can_send.close = connection_close_pending && space_has_crypto;
2168
2169 can_send
2170 }
2171
/// Processes a `ConnectionEvent` delivered to this connection.
///
/// Two kinds of event are handled:
/// - `Datagram`: an incoming UDP datagram for `path_id`. Unless it is discarded early,
///   receive statistics are updated, the first packet and any coalesced remainder are
///   processed, and recovery metrics are emitted to qlog.
/// - `NewIdentifiers`: freshly issued local connection IDs, all for the same path. They
///   are either retired right away (abandoned path) or recorded and queued for
///   transmission in the Data space.
pub fn handle_event(&mut self, event: ConnectionEvent) {
    use ConnectionEventInner::*;
    match event.0 {
        Datagram(DatagramConnectionEvent {
            now,
            network_path,
            path_id,
            ecn,
            first_decode,
            remaining,
        }) => {
            let span = trace_span!("pkt", %path_id);
            let _guard = span.enter();

            if self.early_discard_packet(network_path, path_id) {
                return;
            }

            // Remember whether the path was anti-amplification blocked *before* this
            // datagram is accounted for. A path that does not exist yet counts as not
            // blocked.
            let was_anti_amplification_blocked = self
                .path(path_id)
                .map(|path| path.anti_amplification_blocked(1))
                .unwrap_or(false);

            let rx = &mut self.path_stats.for_path(path_id).udp_rx;
            rx.datagrams += 1;
            rx.bytes += first_decode.len() as u64;
            // Captured here because `first_decode` is moved into `handle_decode` below.
            let data_len = first_decode.len();

            self.handle_decode(now, network_path, path_id, ecn, first_decode);
            // Only credit the received bytes if the path exists after decoding.
            if let Some(path) = self.path_mut(path_id) {
                path.inc_total_recvd(data_len as u64);
            }

            // Any bytes following the first packet in the datagram are coalesced packets.
            if let Some(data) = remaining {
                self.path_stats.for_path(path_id).udp_rx.bytes += data.len() as u64;
                self.handle_coalesced(now, network_path, path_id, ecn, data);
            }

            if let Some(path) = self.paths.get_mut(&path_id) {
                self.qlog
                    .emit_recovery_metrics(path_id, &mut path.data, now);
            }

            // NOTE(review): presumably receiving data lifts the anti-amplification limit,
            // so the loss detection timer is re-evaluated — confirm against
            // `set_loss_detection_timer`.
            if was_anti_amplification_blocked {
                self.set_loss_detection_timer(now, path_id);
            }
        }
        NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
            // All issued CIDs are expected to belong to one path; an empty list falls
            // back to the default `PathId`.
            let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
            debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));

            if self.abandoned_paths.contains(&path_id) {
                // CIDs for an abandoned path are not used: ask the endpoint to retire
                // them again, unless the connection is already drained.
                if !self.state.is_drained() {
                    for issued in &ids {
                        self.endpoint_events
                            .push_back(EndpointEventInner::RetireConnectionId(
                                now,
                                path_id,
                                issued.sequence,
                                false,
                            ));
                    }
                }
                return;
            }

            // Create the per-path CID state on first use.
            let cid_state = self
                .local_cid_state
                .entry(path_id)
                .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
            cid_state.new_cids(&ids, now);

            // Queue the CIDs in reverse order onto the pending list of the Data space.
            ids.into_iter().rev().for_each(|frame| {
                self.spaces[SpaceId::Data].pending.new_cids.push(frame);
            });
            self.reset_cid_retirement(now);
        }
    }
}
2269
2270 fn early_discard_packet(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
2278 if self.is_handshaking() && path_id != PathId::ZERO {
2279 debug!(%network_path, %path_id, "discarding multipath packet during handshake");
2280 return true;
2281 }
2282
2283 let remote_may_migrate = self.remote_may_migrate();
2288
2289 let local_ip_may_migrate = self.local_ip_may_migrate();
2290
2291 if let Some(known_path) = self.path_mut(path_id) {
2295 if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
2296 trace!(
2297 %path_id,
2298 %network_path,
2299 %known_path.network_path,
2300 "discarding packet from unrecognized peer"
2301 );
2302 return true;
2303 }
2304
2305 if known_path.network_path.local_ip.is_some()
2306 && network_path.local_ip.is_some()
2307 && known_path.network_path.local_ip != network_path.local_ip
2308 && !local_ip_may_migrate
2309 {
2310 trace!(
2311 %path_id,
2312 %network_path,
2313 %known_path.network_path,
2314 "discarding packet sent to incorrect interface"
2315 );
2316 return true;
2317 }
2318 }
2319 false
2320 }
2321
2322 fn remote_may_migrate(&self) -> bool {
2336 match &self.side {
2337 ConnectionSide::Server { server_config } => {
2338 server_config.migration && self.is_handshake_confirmed()
2339 }
2340 ConnectionSide::Client { .. } => {
2341 if let Some(hs) = self.state.as_handshake() {
2342 hs.allow_server_migration
2343 } else {
2344 self.n0_nat_traversal.is_negotiated() && self.is_handshake_confirmed()
2345 }
2346 }
2347 }
2348 }
2349
2350 fn local_ip_may_migrate(&self) -> bool {
2363 (self.side.is_client() || self.n0_nat_traversal.is_negotiated())
2364 && self.is_handshake_confirmed()
2365 }
/// Processes all timers that `self.timers.expire_before(now, ..)` reports as expired.
///
/// Each expired timer is handled inside its own tracing span. Connection-wide timers
/// (`Timer::Conn`) and per-path timers (`Timer::PerPath`) are dispatched to the
/// corresponding arm below.
pub fn handle_timeout(&mut self, now: Instant) {
    while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
        let span = match timer {
            Timer::Conn(timer) => trace_span!("timeout", scope = "conn", ?timer),
            Timer::PerPath(path_id, timer) => {
                trace_span!("timer_fired", scope="path", %path_id, ?timer)
            }
        };
        let _guard = span.enter();
        trace!("timeout");
        match timer {
            Timer::Conn(timer) => match timer {
                ConnTimer::Close => {
                    // The close period is over: drain and tell the endpoint.
                    self.state.move_to_drained(None);
                    self.endpoint_events.push_back(EndpointEventInner::Drained);
                }
                ConnTimer::Idle => {
                    self.kill(ConnectionError::TimedOut);
                }
                ConnTimer::KeepAlive => {
                    self.ping();
                }
                ConnTimer::KeyDiscard => {
                    self.crypto_state.discard_temporary_keys();
                }
                ConnTimer::PushNewCid => {
                    // Handle every path whose next CID retirement time is not after
                    // `now`.
                    while let Some((path_id, when)) = self.next_cid_retirement() {
                        if when > now {
                            break;
                        }
                        match self.local_cid_state.get_mut(&path_id) {
                            None => error!(%path_id, "No local CID state for path"),
                            Some(cid_state) => {
                                let num_new_cid = cid_state.on_cid_timeout().into();
                                // Replacement CIDs are only requested while the
                                // connection is not closed.
                                if !self.state.is_closed() {
                                    trace!(
                                        "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                        cid_state.retire_prior_to()
                                    );
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::NeedIdentifiers(
                                            path_id,
                                            now,
                                            num_new_cid,
                                        ),
                                    );
                                }
                            }
                        }
                    }
                }
                ConnTimer::NoAvailablePath => {
                    if self.state.is_closed() || self.state.is_drained() {
                        error!("no viable path timer fired, but connection already closing");
                    } else {
                        // Close the connection with NO_VIABLE_PATH and queue a
                        // CONNECTION_CLOSE.
                        trace!("no viable path grace period expired, closing connection");
                        let err = TransportError::NO_VIABLE_PATH(
                            "last path abandoned, no new path opened",
                        );
                        self.close_common();
                        self.set_close_timer(now);
                        self.connection_close_pending = true;
                        self.state.move_to_closed(err);
                    }
                }
                ConnTimer::NatTraversalProbeRetry => {
                    self.n0_nat_traversal.queue_retries(self.is_ipv6());
                    // Re-arm the retry timer as long as the NAT traversal state reports
                    // a retry delay.
                    if let Some(delay) =
                        self.n0_nat_traversal.retry_delay(self.config.initial_rtt)
                    {
                        self.timers.set(
                            Timer::Conn(ConnTimer::NatTraversalProbeRetry),
                            now + delay,
                            self.qlog.with_time(now),
                        );
                        trace!("re-queued NAT probes");
                    } else {
                        trace!("no more NAT probes remaining");
                    }
                }
            },
            Timer::PerPath(path_id, timer) => {
                match timer {
                    PathTimer::PathIdle => {
                        if let Err(err) =
                            self.close_path_inner(now, path_id, PathAbandonReason::TimedOut)
                        {
                            warn!(?err, "failed closing path");
                        }
                    }

                    PathTimer::PathKeepAlive => {
                        // A failure to ping (closed path) is deliberately ignored.
                        self.ping_path(path_id).ok();
                    }
                    PathTimer::LossDetection => {
                        self.on_loss_detection_timeout(now, path_id);
                        // NOTE(review): this panics if the path no longer exists after
                        // `on_loss_detection_timeout`; the other arms skip missing
                        // paths instead — confirm the invariant holds.
                        self.qlog.emit_recovery_metrics(
                            path_id,
                            &mut self
                                .paths
                                .get_mut(&path_id)
                                .expect("loss-detection timer fires only on live paths")
                                .data,
                            now,
                        );
                    }
                    PathTimer::PathValidationFailed => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        self.timers.stop(
                            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                            self.qlog.with_time(now),
                        );
                        debug!("path migration validation failed");
                        // Fall back to the previous path data, if there is any.
                        if let Some((_, prev)) = path.prev.take() {
                            path.data = prev;
                        }
                        path.data.reset_on_path_challenges();
                    }
                    PathTimer::PathChallengeLost => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        trace!("path challenge deemed lost");
                        // Mark a challenge as pending again on this path.
                        path.data.pending_on_path_challenge = true;
                    }
                    PathTimer::AbandonFromValidation => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        path.data.reset_on_path_challenges();
                        self.timers.stop(
                            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                            self.qlog.with_time(now),
                        );
                        debug!("new path validation failed");
                        if let Err(err) = self.close_path_inner(
                            now,
                            path_id,
                            PathAbandonReason::ValidationFailed,
                        ) {
                            warn!(?err, "failed closing path");
                        }
                    }
                    // Nothing to do here when the pacing timer expires.
                    PathTimer::Pacing => {}
                    PathTimer::MaxAckDelay => {
                        self.spaces[SpaceId::Data]
                            .for_path(path_id)
                            .pending_acks
                            .on_max_ack_delay_timeout()
                    }
                    PathTimer::PathDrained => {
                        // Stop all remaining timers of this path, retire its local
                        // CIDs and discard the path.
                        self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                        if let Some(local_cid_state) = self.local_cid_state.remove(&path_id) {
                            debug_assert!(!self.state.is_drained());
                            let (min_seq, max_seq) = local_cid_state.active_seq();
                            for seq in min_seq..=max_seq {
                                self.endpoint_events.push_back(
                                    EndpointEventInner::RetireConnectionId(
                                        now, path_id, seq, false,
                                    ),
                                );
                            }
                        }
                        self.discard_path(path_id, now);
                    }
                }
            }
        }
    }
}
2559
2560 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2572 self.close_inner(
2573 now,
2574 Close::Application(frame::ApplicationClose { error_code, reason }),
2575 )
2576 }
2577
2578 fn close_inner(&mut self, now: Instant, reason: Close) {
2594 let was_closed = self.state.is_closed();
2595 if !was_closed {
2596 self.close_common();
2597 self.set_close_timer(now);
2598 self.connection_close_pending = true;
2599 self.state.move_to_closed_local(reason);
2600 }
2601 }
2602
2603 pub fn datagrams(&mut self) -> Datagrams<'_> {
2605 Datagrams { conn: self }
2606 }
2607
2608 pub fn stats(&mut self) -> ConnectionStats {
2610 let mut stats = self.partial_stats.clone();
2611
2612 for path_stats in self.path_stats.iter_stats() {
2613 stats += *path_stats;
2618 }
2619
2620 stats
2621 }
2622
2623 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2625 let path = self.paths.get(&path_id)?;
2626 let stats = self.path_stats.for_path(path_id);
2627 stats.rtt = path.data.rtt.get();
2628 stats.cwnd = path.data.congestion.window();
2629 stats.current_mtu = path.data.mtud.current_mtu();
2630 Some(*stats)
2631 }
2632
2633 pub fn ping(&mut self) {
2637 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2640 path_data.ping_pending = true;
2641 }
2642 }
2643
2644 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2648 let path_data = self.spaces[self.highest_space]
2649 .number_spaces
2650 .get_mut(&path)
2651 .ok_or(ClosedPath { _private: () })?;
2652 path_data.ping_pending = true;
2653 Ok(())
2654 }
2655
2656 pub fn force_key_update(&mut self) {
2660 if !self.state.is_established() {
2661 debug!("ignoring forced key update in illegal state");
2662 return;
2663 }
2664 if self.crypto_state.prev_crypto.is_some() {
2665 debug!("ignoring redundant forced key update");
2668 return;
2669 }
2670 self.crypto_state.update_keys(None, false);
2671 }
2672
2673 pub fn crypto_session(&self) -> &dyn crypto::Session {
2675 self.crypto_state.session.as_ref()
2676 }
2677
2678 pub fn is_handshaking(&self) -> bool {
2688 self.state.is_handshake()
2689 }
2690
2691 pub fn is_closed(&self) -> bool {
2702 self.state.is_closed()
2703 }
2704
2705 pub fn is_drained(&self) -> bool {
2710 self.state.is_drained()
2711 }
2712
2713 pub fn accepted_0rtt(&self) -> bool {
2717 self.crypto_state.accepted_0rtt
2718 }
2719
2720 pub fn has_0rtt(&self) -> bool {
2722 self.crypto_state.zero_rtt_enabled
2723 }
2724
2725 pub fn has_pending_retransmits(&self) -> bool {
2727 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2728 }
2729
2730 pub fn side(&self) -> Side {
2732 self.side.side()
2733 }
2734
2735 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2737 self.path(path_id)
2738 .map(|path_data| {
2739 path_data
2740 .last_observed_addr_report
2741 .as_ref()
2742 .map(|observed| observed.socket_addr())
2743 })
2744 .ok_or(ClosedPath { _private: () })
2745 }
2746
2747 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2749 self.path(path_id).map(|d| d.rtt.get())
2750 }
2751
2752 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2754 self.path(path_id).map(|d| d.congestion.as_ref())
2755 }
2756
2757 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2762 self.streams.set_max_concurrent(dir, count);
2763 let pending = &mut self.spaces[SpaceId::Data].pending;
2766 self.streams.queue_max_stream_id(pending);
2767 }
2768
2769 pub fn set_max_concurrent_paths(
2779 &mut self,
2780 now: Instant,
2781 count: NonZeroU32,
2782 ) -> Result<(), MultipathNotNegotiated> {
2783 if !self.is_multipath_negotiated() {
2784 return Err(MultipathNotNegotiated { _private: () });
2785 }
2786 self.max_concurrent_paths = count;
2787
2788 let in_use_count = self
2789 .local_max_path_id
2790 .next()
2791 .saturating_sub(self.abandoned_paths.len() as u32)
2792 .as_u32();
2793 let extra_needed = count.get().saturating_sub(in_use_count);
2794 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2795
2796 self.set_max_path_id(now, new_max_path_id);
2797
2798 Ok(())
2799 }
2800
2801 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2803 if max_path_id <= self.local_max_path_id {
2804 return;
2805 }
2806
2807 self.local_max_path_id = max_path_id;
2808 self.spaces[SpaceId::Data].pending.max_path_id = true;
2809
2810 self.issue_first_path_cids(now);
2811 }
2812
2813 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2819 self.streams.max_concurrent(dir)
2820 }
2821
2822 pub fn set_send_window(&mut self, send_window: u64) {
2824 self.streams.set_send_window(send_window);
2825 }
2826
2827 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2829 if self.streams.set_receive_window(receive_window) {
2830 self.spaces[SpaceId::Data].pending.max_data = true;
2831 }
2832 }
2833
2834 pub fn is_multipath_negotiated(&self) -> bool {
2839 !self.is_handshaking()
2840 && self.config.max_concurrent_multipath_paths.is_some()
2841 && self.peer_params.initial_max_path_id.is_some()
2842 }
2843
2844 fn on_ack_received(
2845 &mut self,
2846 now: Instant,
2847 space: SpaceId,
2848 ack: frame::Ack,
2849 ) -> Result<(), TransportError> {
2850 let path = PathId::ZERO;
2852 self.inner_on_ack_received(now, space, path, ack)
2853 }
2854
2855 fn on_path_ack_received(
2856 &mut self,
2857 now: Instant,
2858 space: SpaceId,
2859 path_ack: frame::PathAck,
2860 ) -> Result<(), TransportError> {
2861 let (ack, path) = path_ack.into_ack();
2862 self.inner_on_ack_received(now, space, path, ack)
2863 }
2864
/// Processes an acknowledgement for `path` in packet number space `space`.
///
/// Shared by the ACK and PATH_ACK handlers. In order, this validates the frame, updates
/// the largest acknowledged packet, checks for spurious loss, removes newly acknowledged
/// packets from the sent-packet tracking, updates congestion control and the RTT
/// estimate, runs loss detection, processes ECN counts and re-arms the loss detection
/// timer.
///
/// # Errors
///
/// Returns `PROTOCOL_VIOLATION` when the acknowledgement refers to a path ID that was
/// never used or to a packet number that has not been sent, and propagates any error
/// from `check_ack`.
fn inner_on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    path: PathId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    if !self.spaces[space].number_spaces.contains_key(&path) {
        if self.abandoned_paths.contains(&path) {
            // The path existed once but has since been discarded: not an error.
            trace!("silently ignoring PATH_ACK on discarded path");
            return Ok(());
        } else {
            return Err(TransportError::PROTOCOL_VIOLATION(
                "received PATH_ACK with path ID never used",
            ));
        }
    }
    if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // `Some(pn)` only if this frame raises the largest acknowledged packet number.
    let new_largest_pn = {
        let space = &mut self.spaces[space].for_path(path);
        if space
            .largest_acked_packet_pn
            .is_none_or(|pn| ack.largest > pn)
        {
            space.largest_acked_packet_pn = Some(ack.largest);
            // The send time is only updated while the packet is still tracked.
            if let Some(info) = space.sent_packets.get(ack.largest) {
                space.largest_acked_packet_send_time = info.time_sent;
            }
            Some(ack.largest)
        } else {
            None
        }
    };

    if self.detect_spurious_loss(&ack, space, path) {
        self.path_stats.for_path(path).spurious_congestion_events += 1;
        self.path_data_mut(path)
            .congestion
            .on_spurious_congestion_event();
    }

    // Collect the packet numbers that are covered by the frame and still tracked as
    // sent.
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.spaces[space].for_path(path).check_ack(range.clone())?;
        for (pn, _) in self.spaces[space]
            .for_path(path)
            .sent_packets
            .iter_range(range)
        {
            newly_acked.insert_one(pn);
        }
    }

    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].for_path(path).take(packet) {
            // The acknowledged packet itself carried acknowledgements; pending ACK
            // state below those packet numbers is dropped for each path that still
            // has a packet number space.
            for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                    pns.pending_acks.subtract_below(*acked_pn);
                }
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // Let MTU discovery see the acknowledgement, and tell the congestion
            // controller if the MTU changed as a result.
            let path_data = self.path_data_mut(path);
            let mtu_updated = path_data.mtud.on_acked(space.kind(), packet, info.size);
            if mtu_updated {
                path_data
                    .congestion
                    .on_mtu_update(path_data.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(path, packet);

            self.on_packet_acked(now, path, packet, info);
        }
    }

    let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet_pn;
    let path_data = self.path_data_mut(path);
    let app_limited = path_data.app_limited;
    let in_flight = path_data.in_flight.bytes;

    path_data
        .congestion
        .on_end_acks(now, in_flight, app_limited, largest_ackd);

    // An RTT sample is only taken when the frame raised the largest acknowledged
    // packet and acknowledged at least one ack-eliciting packet.
    if new_largest_pn.is_some() && ack_eliciting_acked {
        // Outside the Data space the ack delay is treated as zero; inside it the
        // reported delay is scaled by the peer's exponent and capped at the peer's
        // maximum ack delay.
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = now.saturating_duration_since(
            self.spaces[space]
                .for_path(path)
                .largest_acked_packet_send_time,
        );

        let next_pn = self.spaces[space].for_path(path).next_packet_number;
        let path_data = self.path_data_mut(path);
        path_data.rtt.update(ack_delay, rtt);
        // Record the next packet to be sent after the first RTT sample, once.
        if path_data.first_packet_after_rtt_sample.is_none() {
            path_data.first_packet_after_rtt_sample = Some((space.kind(), next_pn));
        }
    }

    self.detect_lost_packets(now, space, path, true);

    if self.peer_completed_handshake_address_validation() {
        self.path_data_mut(path).pto_count = 0;
    }

    if self.path_data(path).sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only examined for frames that raised the largest
            // acknowledged packet number.
            if let Some(largest_sent_pn) = new_largest_pn {
                let sent = self.spaces[space]
                    .for_path(path)
                    .largest_acked_packet_send_time;
                self.process_ecn(
                    now,
                    space,
                    path,
                    newly_acked.len() as u64,
                    ecn,
                    sent,
                    largest_sent_pn,
                );
            }
        } else {
            // An acknowledgement without ECN counts turns ECN off for this path.
            debug!("ECN not acknowledged by peer");
            self.path_data_mut(path).sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now, path);
    Ok(())
}
3044
3045 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
3046 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
3047
3048 if lost_packets.is_empty() {
3049 return false;
3050 }
3051
3052 for range in ack.iter() {
3053 let spurious_losses: Vec<u64> = lost_packets
3054 .iter_range(range.clone())
3055 .map(|(pn, _info)| pn)
3056 .collect();
3057
3058 for pn in spurious_losses {
3059 lost_packets.remove(pn);
3060 }
3061 }
3062
3063 lost_packets.is_empty()
3068 }
3069
3070 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
3075 let two_pto = 2 * self.path_data(path).rtt.pto_base();
3076
3077 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
3078 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
3079 }
3080
3081 fn process_ecn(
3083 &mut self,
3084 now: Instant,
3085 space: SpaceId,
3086 path: PathId,
3087 newly_acked_pn: u64,
3088 ecn: frame::EcnCounts,
3089 largest_sent_time: Instant,
3090 largest_sent_pn: u64,
3091 ) {
3092 match self.spaces[space]
3093 .for_path(path)
3094 .detect_ecn(newly_acked_pn, ecn)
3095 {
3096 Err(e) => {
3097 debug!("halting ECN due to verification failure: {}", e);
3098
3099 self.path_data_mut(path).sending_ecn = false;
3100 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
3103 }
3104 Ok(false) => {}
3105 Ok(true) => {
3106 self.path_stats.for_path(path).congestion_events += 1;
3107 self.path_data_mut(path).congestion.on_congestion_event(
3108 now,
3109 largest_sent_time,
3110 false,
3111 true,
3112 0,
3113 largest_sent_pn,
3114 );
3115 }
3116 }
3117 }
3118
3119 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, pn: u64, info: SentPacket) {
3122 let path = self.path_data_mut(path_id);
3123 let app_limited = path.app_limited;
3124 path.remove_in_flight(&info);
3125 if info.ack_eliciting && info.path_generation == path.generation() {
3126 let rtt = path.rtt;
3130 path.congestion
3131 .on_ack(now, info.time_sent, info.size.into(), pn, app_limited, &rtt);
3132 }
3133
3134 if let Some(retransmits) = info.retransmits.get() {
3136 for (id, _) in retransmits.reset_stream.iter() {
3137 self.streams.reset_acked(*id);
3138 }
3139 }
3140
3141 for frame in info.stream_frames {
3142 self.streams.received_ack_of(frame);
3143 }
3144 }
3145
3146 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceKind) {
3147 let start = if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) {
3148 now
3149 } else {
3150 self.crypto_state
3151 .prev_crypto
3152 .as_ref()
3153 .expect("no previous keys")
3154 .end_packet
3155 .as_ref()
3156 .expect("update not acknowledged yet")
3157 .1
3158 };
3159
3160 self.timers.set(
3162 Timer::Conn(ConnTimer::KeyDiscard),
3163 start + self.max_pto_for_space(space) * 3,
3164 self.qlog.with_time(now),
3165 );
3166 }
3167
3168 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
3181 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
3182 self.detect_lost_packets(now, pn_space, path_id, false);
3184 self.set_loss_detection_timer(now, path_id);
3185 return;
3186 }
3187
3188 let Some((_, space)) = self.pto_time_and_space(now, path_id) else {
3189 error!(%path_id, "PTO expired while unset");
3190 return;
3191 };
3192 trace!(
3193 in_flight = self.path_data(path_id).in_flight.bytes,
3194 count = self.path_data(path_id).pto_count,
3195 ?space,
3196 %path_id,
3197 "PTO fired"
3198 );
3199
3200 let count = match self.path_data(path_id).in_flight.ack_eliciting {
3201 0 => {
3204 debug_assert!(!self.peer_completed_handshake_address_validation());
3205 1
3206 }
3207 _ => 2,
3209 };
3210 let pns = self.spaces[space].for_path(path_id);
3211 pns.loss_probes = pns.loss_probes.saturating_add(count);
3212 let path_data = self.path_data_mut(path_id);
3213 path_data.pto_count = path_data.pto_count.saturating_add(1);
3214 self.set_loss_detection_timer(now, path_id);
3215 }
3216
/// Scans the packets sent on `path_id` in `pn_space` whose packet number is below the
/// largest acknowledged one, declares the qualifying ones lost, and hands the result to
/// `handle_lost_packets`.
///
/// A packet is declared lost when either
/// - at least `loss_delay` has elapsed between its send time and `now`, or
/// - the largest acknowledged packet number is at least `packet_threshold` ahead of it.
///
/// The number space's `loss_time` is cleared on entry and set again from the first scanned
/// packet that is not yet lost (its send time plus `loss_delay`).
///
/// `due_to_ack` only influences persistent-congestion tracking, which is evaluated solely
/// when the flag is set.
///
/// # Panics
///
/// Panics if no packet has been acknowledged yet on this path in this space.
fn detect_lost_packets(
    &mut self,
    now: Instant,
    pn_space: SpaceId,
    path_id: PathId,
    due_to_ack: bool,
) {
    let mut lost_packets = Vec::<u64>::new();
    let mut lost_mtu_probe = None;
    let mut in_persistent_congestion = false;
    let mut size_of_lost_packets = 0u64;
    // Cleared up front; re-armed below if a not-yet-lost packet is encountered.
    self.spaces[pn_space].for_path(path_id).loss_time = None;

    let path = self.path_data(path_id);
    let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
    // Time threshold: the conservative RTT scaled by the configured factor, but never
    // below the timer granularity.
    let loss_delay = path
        .rtt
        .conservative()
        .mul_f32(self.config.time_threshold)
        .max(TIMER_GRANULARITY);
    let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

    let largest_acked_packet_pn = self.spaces[pn_space]
        .for_path(path_id)
        .largest_acked_packet_pn
        .expect("detect_lost_packets only to be called if path received at least one ACK");
    let packet_threshold = self.config.packet_threshold as u64;

    // The persistent congestion period is always derived from the data-space PTO,
    // whichever space is being scanned.
    let congestion_period = self
        .pto(SpaceKind::Data, path_id)
        .saturating_mul(self.config.persistent_congestion_threshold);
    // Send time of the first packet of the current run of lost ack-eliciting packets.
    let mut persistent_congestion_start: Option<Instant> = None;
    let mut prev_packet = None;
    let space = self.spaces[pn_space].for_path(path_id);

    for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet_pn) {
        // A gap in the tracked packet numbers interrupts the current run (presumably the
        // missing packet was acknowledged in between).
        if prev_packet != Some(packet.wrapping_sub(1)) {
            persistent_congestion_start = None;
        }

        let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
        if packet_too_old || largest_acked_packet_pn >= packet + packet_threshold {
            if Some(packet) == in_flight_mtu_probe {
                // The MTU probe is reported separately and counted neither in
                // `lost_packets` nor in `size_of_lost_packets`.
                lost_mtu_probe = in_flight_mtu_probe;
            } else {
                lost_packets.push(packet);
                size_of_lost_packets += info.size as u64;
                if info.ack_eliciting && due_to_ack {
                    match persistent_congestion_start {
                        // The run of lost packets spans more than the congestion period.
                        Some(start) if info.time_sent - start > congestion_period => {
                            in_persistent_congestion = true;
                        }
                        // A run may only start at a packet ordered after the recorded
                        // first packet following an RTT sample.
                        None if first_packet_after_rtt_sample
                            .is_some_and(|x| x < (pn_space.kind(), packet)) =>
                        {
                            persistent_congestion_start = Some(info.time_sent);
                        }
                        _ => {}
                    }
                }
            }
        } else {
            // Not lost yet: only the first such packet of the scan arms the loss time.
            if space.loss_time.is_none() {
                space.loss_time = Some(info.time_sent + loss_delay);
            }
            // A surviving packet ends the current run of losses.
            persistent_congestion_start = None;
        }

        prev_packet = Some(packet);
    }

    self.handle_lost_packets(
        pn_space,
        path_id,
        now,
        lost_packets,
        lost_mtu_probe,
        loss_delay,
        in_persistent_congestion,
        size_of_lost_packets,
    );
}
3333
3334 fn discard_path(&mut self, path_id: PathId, now: Instant) {
3336 trace!(%path_id, "dropping path state");
3337 let path = self.path_data(path_id);
3338 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3339
3340 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
3342 .for_path(path_id)
3343 .sent_packets
3344 .iter()
3345 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
3346 .map(|(pn, info)| {
3347 size_of_lost_packets += info.size as u64;
3348 pn
3349 })
3350 .collect();
3351
3352 if !lost_pns.is_empty() {
3353 trace!(
3354 %path_id,
3355 count = lost_pns.len(),
3356 lost_bytes = size_of_lost_packets,
3357 "packets lost on path abandon"
3358 );
3359 self.handle_lost_packets(
3360 SpaceId::Data,
3361 path_id,
3362 now,
3363 lost_pns,
3364 in_flight_mtu_probe,
3365 Duration::ZERO,
3366 false,
3367 size_of_lost_packets,
3368 );
3369 }
3370 let path_stats = self.path_stats.discard(&path_id);
3373 self.partial_stats += path_stats;
3374 self.paths.remove(&path_id);
3375 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
3376
3377 self.events.push_back(
3378 PathEvent::Discarded {
3379 id: path_id,
3380 path_stats: Box::new(path_stats),
3381 }
3382 .into(),
3383 );
3384 }
3385
/// Applies the consequences of packets having been declared lost on `path_id` in
/// `pn_space`.
///
/// - `lost_packets`: packet numbers to treat as lost, in ascending order (checked by a
///   debug assertion).
/// - `lost_mtu_probe`: packet number of a lost MTU probe. It is handled on its own at the
///   end and never reaches the congestion controller or the retransmission logic.
/// - `loss_delay`: only forwarded to the qlog packet-lost event.
/// - `in_persistent_congestion`: forwarded to the congestion controller's
///   `on_congestion_event`.
/// - `size_of_lost_packets`: total size in bytes of `lost_packets`; added to the path
///   statistics and forwarded to `on_congestion_event`.
///
/// # Panics
///
/// Panics if the largest entry of `lost_packets` or `lost_mtu_probe` is not tracked as
/// sent any more, or if `path_id` is not present in `self.paths`.
fn handle_lost_packets(
    &mut self,
    pn_space: SpaceId,
    path_id: PathId,
    now: Instant,
    lost_packets: Vec<u64>,
    lost_mtu_probe: Option<u64>,
    loss_delay: Duration,
    in_persistent_congestion: bool,
    size_of_lost_packets: u64,
) {
    debug_assert!(lost_packets.is_sorted(), "lost_packets must be sorted");

    // Expire old lost-packet records before new ones are inserted below.
    self.drain_lost_packets(now, pn_space, path_id);

    if let Some(largest_lost) = lost_packets.last().cloned() {
        // Remembered so a change in the in-flight byte count can be detected afterwards.
        let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
        let largest_lost_sent = self.spaces[pn_space]
            .for_path(path_id)
            .sent_packets
            .get(largest_lost)
            .unwrap()
            .time_sent;
        let path_stats = self.path_stats.for_path(path_id);
        path_stats.lost_packets += lost_packets.len() as u64;
        path_stats.lost_bytes += size_of_lost_packets;
        trace!(
            %path_id,
            count = lost_packets.len(),
            lost_bytes = size_of_lost_packets,
            "packets lost",
        );

        for &packet in &lost_packets {
            // Packets no longer tracked as sent are skipped silently.
            let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                continue;
            };
            self.qlog
                .emit_packet_lost(packet, &info, loss_delay, pn_space.kind(), now);
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);

            // Queue the packet's contents for retransmission.
            for frame in info.stream_frames {
                self.streams.retransmit(frame);
            }
            self.spaces[pn_space].pending |= info.retransmits;
            let path = self.path_data_mut(path_id);
            path.mtud.on_non_probe_lost(packet, info.size);
            path.congestion.on_packet_lost(info.size, packet, now);

            // Keep a record of the loss, keyed by packet number, with its send time.
            self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                packet,
                LostPacket {
                    time_sent: info.time_sent,
                },
            );
        }

        let path = self.path_data_mut(path_id);
        if path.mtud.black_hole_detected(now) {
            // The MTU changed: tell the congestion controller about the new value.
            path.congestion.on_mtu_update(path.mtud.current_mtu());
            // NOTE(review): `drop_oversized` presumably reports whether queued datagrams
            // were dropped; if so, senders blocked on the queue are released — confirm.
            if let Some(max_datagram_size) = self.datagrams().max_size()
                && self.datagrams.drop_oversized(max_datagram_size)
                && self.datagrams.send_blocked
            {
                self.datagrams.send_blocked = false;
                self.events.push_back(Event::DatagramsUnblocked);
            }
            self.path_stats.for_path(path_id).black_holes_detected += 1;
        }

        // A congestion event is raised only if removing the lost packets changed the
        // number of bytes in flight.
        let lost_ack_eliciting =
            old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

        if lost_ack_eliciting {
            self.path_stats.for_path(path_id).congestion_events += 1;
            self.path_data_mut(path_id).congestion.on_congestion_event(
                now,
                largest_lost_sent,
                in_persistent_congestion,
                false,
                size_of_lost_packets,
                largest_lost,
            );
        }
    }

    if let Some(packet) = lost_mtu_probe {
        // The probe is always looked up in the data space, regardless of `pn_space`.
        let info = self.spaces[SpaceId::Data]
            .for_path(path_id)
            .take(packet)
            .unwrap();
        self.paths
            .get_mut(&path_id)
            .unwrap()
            .remove_in_flight(&info);
        self.path_data_mut(path_id).mtud.on_probe_lost();
        self.path_stats.for_path(path_id).lost_plpmtud_probes += 1;
    }
}
3492
3493 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
3499 SpaceId::iter()
3500 .filter_map(|id| {
3501 self.spaces[id]
3502 .number_spaces
3503 .get(&path_id)
3504 .and_then(|pns| pns.loss_time)
3505 .map(|time| (time, id))
3506 })
3507 .min_by_key(|&(time, _)| time)
3508 }
3509
/// Computes when the probe timeout (PTO) of `path_id` should fire and in which packet
/// number space.
///
/// Returns `None` if the path is unknown or if no space qualifies for a PTO.
fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
    let path = self.path(path_id)?;
    let pto_count = path.pto_count;

    // Upper bound applied to the PTO backoff; it depends on the path's RTT and on the
    // idle timeout (the path's own, falling back to the connection's).
    let max_interval = if path.rtt.get() > SLOW_RTT_THRESHOLD {
        (path.rtt.get() * 3) / 2
    } else if let Some(idle) = path.idle_timeout.or(self.idle_timeout)
        && idle <= MIN_IDLE_FOR_FAST_PTO
    {
        MAX_PTO_FAST_INTERVAL
    } else {
        MAX_PTO_INTERVAL
    };

    // On the first path, with nothing ack-eliciting in flight and the peer not yet known
    // to have validated our address, a PTO is armed relative to `now`.
    if path_id == PathId::ZERO
        && path.in_flight.ack_eliciting == 0
        && !self.peer_completed_handshake_address_validation()
    {
        let space = match self.highest_space {
            SpaceKind::Handshake => SpaceId::Handshake,
            _ => SpaceId::Initial,
        };

        // Exponential backoff on the base PTO, capped by `max_interval`.
        let backoff = 2u32.pow(path.pto_count.min(MAX_BACKOFF_EXPONENT));
        let duration = path.rtt.pto_base() * backoff;
        let duration = duration.min(max_interval);
        return Some((now + duration, space));
    }

    let mut result = None;
    for space in SpaceId::iter() {
        // Spaces without a packet number space for this path cannot have a PTO.
        let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
            continue;
        };

        // The data space is ignored until the handshake is confirmed.
        if space == SpaceId::Data && !self.is_handshake_confirmed() {
            continue;
        }

        if !pns.has_in_flight() {
            continue;
        }

        let duration = {
            // Only the data space adds the max ACK delay to the base PTO.
            let max_ack_delay = if space == SpaceId::Data {
                self.ack_frequency.max_ack_delay_for_pto()
            } else {
                Duration::ZERO
            };
            let pto_base = path.rtt.pto_base() + max_ack_delay;
            let mut duration = pto_base;
            // Replay the backoff once per PTO already fired: each step takes the
            // exponential value, but may exceed the previous step by at most
            // `max_interval`.
            for i in 1..=pto_count {
                let exponential_duration = pto_base * 2u32.pow(i.min(MAX_BACKOFF_EXPONENT));
                let max_duration = duration + max_interval;
                duration = exponential_duration.min(max_duration);
            }
            duration
        };

        // The PTO is measured from the last ack-eliciting packet sent in this space.
        let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
            continue;
        };
        let pto = last_ack_eliciting + duration;
        // Keep the earliest candidate across spaces.
        if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
            // A candidate is rejected while the path is anti-amplification blocked...
            if path.anti_amplification_blocked(1) {
                continue;
            }
            // ...or while nothing ack-eliciting is in flight on the path.
            if path.in_flight.ack_eliciting == 0 {
                continue;
            }
            result = Some((pto, space));
        }
    }
    result
}
3613
3614 fn peer_completed_handshake_address_validation(&self) -> bool {
3616 if self.side.is_server() || self.state.is_closed() {
3617 return true;
3618 }
3619 self.spaces[SpaceId::Handshake]
3623 .path_space(PathId::ZERO)
3624 .and_then(|pns| pns.largest_acked_packet_pn)
3625 .is_some()
3626 || self.spaces[SpaceId::Data]
3627 .path_space(PathId::ZERO)
3628 .and_then(|pns| pns.largest_acked_packet_pn)
3629 .is_some()
3630 || (self.crypto_state.has_keys(EncryptionLevel::OneRtt)
3631 && !self.crypto_state.has_keys(EncryptionLevel::Handshake))
3632 }
3633
3634 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3642 if self.state.is_closed() {
3643 return;
3647 }
3648
3649 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3650 self.timers.set(
3652 Timer::PerPath(path_id, PathTimer::LossDetection),
3653 loss_time,
3654 self.qlog.with_time(now),
3655 );
3656 return;
3657 }
3658
3659 if !self.abandoned_paths.contains(&path_id)
3662 && let Some((timeout, _)) = self.pto_time_and_space(now, path_id)
3663 {
3664 self.timers.set(
3665 Timer::PerPath(path_id, PathTimer::LossDetection),
3666 timeout,
3667 self.qlog.with_time(now),
3668 );
3669 } else {
3670 self.timers.stop(
3671 Timer::PerPath(path_id, PathTimer::LossDetection),
3672 self.qlog.with_time(now),
3673 );
3674 }
3675 }
3676
3677 fn max_pto_for_space(&self, space: SpaceKind) -> Duration {
3681 self.paths
3682 .keys()
3683 .map(|path_id| self.pto(space, *path_id))
3684 .max()
3685 .unwrap_or_else(|| {
3686 let rtt = self.config.initial_rtt;
3690 let max_ack_delay = match space {
3691 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3692 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3693 };
3694 rtt + cmp::max(4 * (rtt / 2), TIMER_GRANULARITY) + max_ack_delay
3695 })
3696 }
3697
3698 fn pto(&self, space: SpaceKind, path_id: PathId) -> Duration {
3703 let max_ack_delay = match space {
3704 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3705 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3706 };
3707 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3708 }
3709
/// Performs the bookkeeping that follows the successful authentication of a received
/// packet.
///
/// - `space_id`, `path_id`: packet number space and path the packet belongs to.
/// - `ecn`: ECN codepoint the packet arrived with, if any.
/// - `packet_number`: the packet's number; when `None`, only the timer, idle-reset and ECN
///   bookkeeping is performed.
/// - `spin`: spin bit value carried by the packet.
/// - `is_1rtt`: whether the packet is a 1-RTT packet.
/// - `remote`: network path the packet arrived on.
fn on_packet_authenticated(
    &mut self,
    now: Instant,
    space_id: SpaceKind,
    path_id: PathId,
    ecn: Option<EcnCodepoint>,
    packet_number: Option<u64>,
    spin: bool,
    is_1rtt: bool,
    remote: &FourTuple,
) {
    // Whether the packet arrived on what is probably the path's current network path.
    let is_on_path = self
        .path_data(path_id)
        .network_path
        .is_probably_same_path(remote);

    self.total_authed_packets += 1;
    self.reset_keep_alive(path_id, now);
    self.reset_idle_timeout(now, space_id, path_id);
    self.path_data_mut(path_id).permit_idle_reset = true;

    // ECN markings are only taken into account for packets received on the path itself.
    if is_on_path {
        self.receiving_ecn |= ecn.is_some();
        if let Some(x) = ecn {
            let space = &mut self.spaces[space_id];
            space.for_path(path_id).ecn_counters += x;

            // A congestion-experienced mark requests an immediate ACK.
            if x.is_ce() {
                space
                    .for_path(path_id)
                    .pending_acks
                    .set_immediate_ack_required();
            }
        }
    }

    let Some(packet_number) = packet_number else {
        return;
    };
    match &self.side {
        ConnectionSide::Client { .. } => {
            // After a handshake-space packet, server migration is no longer allowed
            // during the handshake.
            if space_id == SpaceKind::Handshake
                && let Some(hs) = self.state.as_handshake_mut()
            {
                hs.allow_server_migration = false;
            }
        }
        ConnectionSide::Server { .. } => {
            // A handshake-space packet makes the server discard the initial space, if
            // its keys are still held.
            if self.crypto_state.has_keys(EncryptionLevel::Initial)
                && space_id == SpaceKind::Handshake
            {
                self.discard_space(now, SpaceKind::Initial);
            }
            // A 1-RTT packet while 0-RTT keys are still held arms the key discard timer.
            if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) && is_1rtt {
                self.set_key_discard_timer(now, space_id)
            }
        }
    }
    let space = self.spaces[space_id].for_path(path_id);

    // Record the packet for acknowledgement.
    space.pending_acks.insert_one(packet_number, now);
    if packet_number >= space.largest_received_packet_number.unwrap_or_default() {
        space.largest_received_packet_number = Some(packet_number);

        // The spin bit is only updated from packets that are not older than the newest
        // one seen so far and arrived on the path; the client inverts the received value,
        // the server keeps it as is.
        if is_on_path {
            self.spin = self.side.is_client() ^ spin;
        }
    }
}
3793
3794 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceKind, path_id: PathId) {
3799 if let Some(timeout) = self.idle_timeout {
3801 if self.state.is_closed() {
3802 self.timers
3803 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3804 } else {
3805 let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space));
3806 self.timers.set(
3807 Timer::Conn(ConnTimer::Idle),
3808 now + dt,
3809 self.qlog.with_time(now),
3810 );
3811 }
3812 }
3813
3814 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3816 if self.state.is_closed() {
3817 self.timers.stop(
3818 Timer::PerPath(path_id, PathTimer::PathIdle),
3819 self.qlog.with_time(now),
3820 );
3821 } else {
3822 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3823 self.timers.set(
3824 Timer::PerPath(path_id, PathTimer::PathIdle),
3825 now + dt,
3826 self.qlog.with_time(now),
3827 );
3828 }
3829 }
3830 }
3831
3832 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3834 if !self.state.is_established() {
3835 return;
3836 }
3837
3838 if let Some(interval) = self.config.keep_alive_interval {
3839 self.timers.set(
3840 Timer::Conn(ConnTimer::KeepAlive),
3841 now + interval,
3842 self.qlog.with_time(now),
3843 );
3844 }
3845
3846 if let Some(interval) = self.path_data(path_id).keep_alive {
3847 self.timers.set(
3848 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3849 now + interval,
3850 self.qlog.with_time(now),
3851 );
3852 }
3853 }
3854
3855 fn reset_cid_retirement(&mut self, now: Instant) {
3857 if let Some((_path, t)) = self.next_cid_retirement() {
3858 self.timers.set(
3859 Timer::Conn(ConnTimer::PushNewCid),
3860 t,
3861 self.qlog.with_time(now),
3862 );
3863 }
3864 }
3865
3866 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3868 self.local_cid_state
3869 .iter()
3870 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3871 .min_by_key(|(_path_id, timeout)| *timeout)
3872 }
3873
3874 pub(crate) fn handle_first_packet(
3879 &mut self,
3880 now: Instant,
3881 network_path: FourTuple,
3882 ecn: Option<EcnCodepoint>,
3883 packet_number: u64,
3884 packet: InitialPacket,
3885 remaining: Option<BytesMut>,
3886 ) -> Result<(), ConnectionError> {
3887 let span = trace_span!("first recv");
3888 let _guard = span.enter();
3889 debug_assert!(self.side.is_server());
3890 let len = packet.header_data.len() + packet.payload.len();
3891 let path_id = PathId::ZERO;
3892 self.path_data_mut(path_id).total_recvd = len as u64;
3893
3894 if let Some(hs) = self.state.as_handshake_mut() {
3895 hs.expected_token = packet.header.token.clone();
3896 } else {
3897 unreachable!("first packet must be delivered in Handshake state");
3898 }
3899
3900 self.on_packet_authenticated(
3902 now,
3903 SpaceKind::Initial,
3904 path_id,
3905 ecn,
3906 Some(packet_number),
3907 false,
3908 false,
3909 &network_path,
3910 );
3911
3912 let packet: Packet = packet.into();
3913
3914 let mut qlog = QlogRecvPacket::new(len);
3915 qlog.header(&packet.header, Some(packet_number), path_id);
3916
3917 self.process_decrypted_packet(
3918 now,
3919 network_path,
3920 path_id,
3921 Some(packet_number),
3922 packet,
3923 &mut qlog,
3924 )?;
3925 self.qlog.emit_packet_received(qlog, now);
3926 if let Some(data) = remaining {
3927 self.handle_coalesced(now, network_path, path_id, ecn, data);
3928 }
3929
3930 self.qlog.emit_recovery_metrics(
3931 path_id,
3932 &mut self
3933 .paths
3934 .get_mut(&path_id)
3935 .expect("path_id was supplied by the caller for an active path")
3936 .data,
3937 now,
3938 );
3939
3940 Ok(())
3941 }
3942
3943 fn init_0rtt(&mut self, now: Instant) {
3944 let Some((header, packet)) = self.crypto_state.session.early_crypto() else {
3945 return;
3946 };
3947 if self.side.is_client() {
3948 match self.crypto_state.session.transport_parameters() {
3949 Ok(params) => {
3950 let params = params
3951 .expect("crypto layer didn't supply transport parameters with ticket");
3952 let params = TransportParameters {
3954 initial_src_cid: None,
3955 original_dst_cid: None,
3956 preferred_address: None,
3957 retry_src_cid: None,
3958 stateless_reset_token: None,
3959 min_ack_delay: None,
3960 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3961 max_ack_delay: TransportParameters::default().max_ack_delay,
3962 initial_max_path_id: None,
3963 ..params
3964 };
3965 self.set_peer_params(params);
3966 self.qlog.emit_peer_transport_params_restored(self, now);
3967 }
3968 Err(e) => {
3969 error!("session ticket has malformed transport parameters: {}", e);
3970 return;
3971 }
3972 }
3973 }
3974 trace!("0-RTT enabled");
3975 self.crypto_state.enable_zero_rtt(header, packet);
3976 }
3977
3978 fn read_crypto(
3979 &mut self,
3980 space: SpaceId,
3981 crypto: &frame::Crypto,
3982 payload_len: usize,
3983 ) -> Result<(), TransportError> {
3984 let expected = if !self.state.is_handshake() {
3985 SpaceId::Data
3986 } else if self.highest_space == SpaceKind::Initial {
3987 SpaceId::Initial
3988 } else {
3989 SpaceId::Handshake
3992 };
3993 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3997
3998 let end = crypto.offset + crypto.data.len() as u64;
3999 if space < expected
4000 && end
4001 > self.crypto_state.spaces[space.kind()]
4002 .crypto_stream
4003 .bytes_read()
4004 {
4005 warn!(
4006 "received new {:?} CRYPTO data when expecting {:?}",
4007 space, expected
4008 );
4009 return Err(TransportError::PROTOCOL_VIOLATION(
4010 "new data at unexpected encryption level",
4011 ));
4012 }
4013
4014 let crypto_space = &mut self.crypto_state.spaces[space.kind()];
4015 let max = end.saturating_sub(crypto_space.crypto_stream.bytes_read());
4016 if max > self.config.crypto_buffer_size as u64 {
4017 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
4018 }
4019
4020 crypto_space
4021 .crypto_stream
4022 .insert(crypto.offset, crypto.data.clone(), payload_len);
4023 while let Some(chunk) = crypto_space.crypto_stream.read(usize::MAX, true) {
4024 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
4025 if self.crypto_state.session.read_handshake(&chunk.bytes)? {
4026 self.events.push_back(Event::HandshakeDataReady);
4027 }
4028 }
4029
4030 Ok(())
4031 }
4032
4033 fn write_crypto(&mut self) {
4034 loop {
4035 let space = self.highest_space;
4036 let mut outgoing = Vec::new();
4037 if let Some(crypto) = self.crypto_state.session.write_handshake(&mut outgoing) {
4038 match space {
4039 SpaceKind::Initial => {
4040 self.upgrade_crypto(SpaceKind::Handshake, crypto);
4041 }
4042 SpaceKind::Handshake => {
4043 self.upgrade_crypto(SpaceKind::Data, crypto);
4044 }
4045 SpaceKind::Data => unreachable!("got updated secrets during 1-RTT"),
4046 }
4047 }
4048 if outgoing.is_empty() {
4049 if space == self.highest_space {
4050 break;
4051 } else {
4052 continue;
4054 }
4055 }
4056 let offset = self.crypto_state.spaces[space].crypto_offset;
4057 let outgoing = Bytes::from(outgoing);
4058 if let Some(hs) = self.state.as_handshake_mut()
4059 && space == SpaceKind::Initial
4060 && offset == 0
4061 && self.side.is_client()
4062 {
4063 hs.client_hello = Some(outgoing.clone());
4064 }
4065 self.crypto_state.spaces[space].crypto_offset += outgoing.len() as u64;
4066 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
4067 self.spaces[space].pending.crypto.push_back(frame::Crypto {
4068 offset,
4069 data: outgoing,
4070 });
4071 }
4072 }
4073
4074 fn upgrade_crypto(&mut self, space: SpaceKind, crypto: Keys) {
4076 debug_assert!(
4077 !self.crypto_state.has_keys(space.encryption_level()),
4078 "already reached packet space {space:?}"
4079 );
4080 trace!("{:?} keys ready", space);
4081 if space == SpaceKind::Data {
4082 self.crypto_state.next_crypto = Some(
4084 self.crypto_state
4085 .session
4086 .next_1rtt_keys()
4087 .expect("handshake should be complete"),
4088 );
4089 }
4090
4091 self.crypto_state.spaces[space].keys = Some(crypto);
4092 debug_assert!(space > self.highest_space);
4093 self.highest_space = space;
4094 if space == SpaceKind::Data && self.side.is_client() {
4095 self.crypto_state.discard_zero_rtt();
4097 }
4098 }
4099
4100 fn discard_space(&mut self, now: Instant, space: SpaceKind) {
4101 debug_assert!(space != SpaceKind::Data);
4102 trace!("discarding {:?} keys", space);
4103 if space == SpaceKind::Initial {
4104 if let ConnectionSide::Client { token, .. } = &mut self.side {
4106 *token = Bytes::new();
4107 }
4108 }
4109 self.crypto_state.spaces[space].keys = None;
4110 let space = &mut self.spaces[space];
4111 let pns = space.for_path(PathId::ZERO);
4112 pns.time_of_last_ack_eliciting_packet = None;
4113 pns.loss_time = None;
4114 pns.loss_probes = 0;
4115 let sent_packets = mem::take(&mut pns.sent_packets);
4116 let path = self
4117 .paths
4118 .get_mut(&PathId::ZERO)
4119 .expect("PathId::ZERO is alive while Initial/Handshake spaces exist");
4120 for (_, packet) in sent_packets.into_iter() {
4121 path.data.remove_in_flight(&packet);
4122 }
4123
4124 self.set_loss_detection_timer(now, PathId::ZERO)
4125 }
4126
4127 fn handle_coalesced(
4128 &mut self,
4129 now: Instant,
4130 network_path: FourTuple,
4131 path_id: PathId,
4132 ecn: Option<EcnCodepoint>,
4133 data: BytesMut,
4134 ) {
4135 self.path_data_mut(path_id)
4136 .inc_total_recvd(data.len() as u64);
4137 let mut remaining = Some(data);
4138 let cid_len = self
4139 .local_cid_state
4140 .values()
4141 .map(|cid_state| cid_state.cid_len())
4142 .next()
4143 .expect("one cid_state must exist");
4144 while let Some(data) = remaining {
4145 match PartialDecode::new(
4146 data,
4147 &FixedLengthConnectionIdParser::new(cid_len),
4148 &[self.version],
4149 self.endpoint_config.grease_quic_bit,
4150 ) {
4151 Ok((partial_decode, rest)) => {
4152 remaining = rest;
4153 self.handle_decode(now, network_path, path_id, ecn, partial_decode);
4154 }
4155 Err(e) => {
4156 trace!("malformed header: {}", e);
4157 return;
4158 }
4159 }
4160 }
4161 }
4162
4163 fn handle_decode(
4169 &mut self,
4170 now: Instant,
4171 network_path: FourTuple,
4172 path_id: PathId,
4173 ecn: Option<EcnCodepoint>,
4174 partial_decode: PartialDecode,
4175 ) {
4176 let qlog = QlogRecvPacket::new(partial_decode.len());
4177 if let Some(decoded) = self
4178 .crypto_state
4179 .unprotect_header(partial_decode, self.peer_params.stateless_reset_token)
4180 {
4181 self.handle_packet(
4182 now,
4183 network_path,
4184 path_id,
4185 ecn,
4186 decoded.packet,
4187 decoded.stateless_reset,
4188 qlog,
4189 );
4190 }
4191 }
4192
4193 fn handle_packet(
4200 &mut self,
4201 now: Instant,
4202 network_path: FourTuple,
4203 path_id: PathId,
4204 ecn: Option<EcnCodepoint>,
4205 packet: Option<Packet>,
4206 stateless_reset: bool,
4207 mut qlog: QlogRecvPacket,
4208 ) {
4209 self.path_stats.for_path(path_id).udp_rx.ios += 1;
4210
4211 if let Some(ref packet) = packet {
4212 trace!(
4213 "got {:?} packet ({} bytes) from {} using id {}",
4214 packet.header.space(),
4215 packet.payload.len() + packet.header_data.len(),
4216 network_path,
4217 packet.header.dst_cid(),
4218 );
4219 }
4220
4221 let was_closed = self.state.is_closed();
4222 let was_drained = self.state.is_drained();
4223
4224 let decrypted = match packet {
4226 None => Err(None),
4227 Some(mut packet) => self
4228 .decrypt_packet(now, path_id, &mut packet)
4229 .map(move |number| (packet, number)),
4230 };
4231 let result = match decrypted {
4232 _ if stateless_reset => {
4233 debug!("got stateless reset");
4234 Err(ConnectionError::Reset)
4235 }
4236 Err(Some(e)) => {
4237 warn!("illegal packet: {}", e);
4238 Err(e.into())
4239 }
4240 Err(None) => {
4241 debug!("failed to authenticate packet");
4242 self.authentication_failures += 1;
4243 let integrity_limit = self
4244 .crypto_state
4245 .integrity_limit(self.highest_space)
4246 .unwrap();
4247 if self.authentication_failures > integrity_limit {
4248 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
4249 } else {
4250 return;
4251 }
4252 }
4253 Ok((packet, pn)) => {
4254 qlog.header(&packet.header, pn, path_id);
4256 let span = match pn {
4257 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
4258 None => trace_span!("recv", space = ?packet.header.space()),
4259 };
4260 let _guard = span.enter();
4261
4262 if self.is_handshaking()
4270 && self
4271 .path(path_id)
4272 .map(|path_data| {
4273 !path_data.network_path.is_probably_same_path(&network_path)
4274 })
4275 .unwrap_or(false)
4276 {
4277 if let Some(hs) = self.state.as_handshake()
4278 && hs.allow_server_migration
4279 {
4280 trace!(
4281 %network_path,
4282 prev = %self.path_data(path_id).network_path,
4283 "server migrated to new remote",
4284 );
4285 self.path_data_mut(path_id).network_path = network_path;
4286 self.qlog.emit_tuple_assigned(path_id, network_path, now);
4287 } else {
4288 debug!(
4289 recv_path = %network_path,
4290 expected_path = %self.path_data_mut(path_id).network_path,
4291 "discarding packet with unexpected remote during handshake",
4292 );
4293 return;
4294 }
4295 }
4296
4297 let dedup = self.spaces[packet.header.space()]
4298 .path_space_mut(path_id)
4299 .map(|pns| &mut pns.dedup);
4300 if pn.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
4301 debug!("discarding possible duplicate packet");
4302 self.qlog.emit_packet_received(qlog, now);
4303 return;
4304 } else if self.state.is_handshake() && packet.header.is_short() {
4305 trace!("dropping short packet during handshake");
4307 self.qlog.emit_packet_received(qlog, now);
4308 return;
4309 } else {
4310 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header
4311 && let Some(hs) = self.state.as_handshake()
4312 && self.side.is_server()
4313 && token != &hs.expected_token
4314 {
4315 warn!("discarding Initial with invalid retry token");
4319 self.qlog.emit_packet_received(qlog, now);
4320 return;
4321 }
4322
4323 if !self.state.is_closed() {
4324 let spin = match packet.header {
4325 Header::Short { spin, .. } => spin,
4326 _ => false,
4327 };
4328
4329 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
4330 self.ensure_path(path_id, network_path, now, pn);
4332 }
4333 if self.paths.contains_key(&path_id) {
4334 self.on_packet_authenticated(
4335 now,
4336 packet.header.space(),
4337 path_id,
4338 ecn,
4339 pn,
4340 spin,
4341 packet.header.is_1rtt(),
4342 &network_path,
4343 );
4344 }
4345 }
4346
4347 let res = self.process_decrypted_packet(
4348 now,
4349 network_path,
4350 path_id,
4351 pn,
4352 packet,
4353 &mut qlog,
4354 );
4355
4356 self.qlog.emit_packet_received(qlog, now);
4357 res
4358 }
4359 }
4360 };
4361
4362 if let Err(conn_err) = result {
4364 match conn_err {
4365 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
4366 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
4367 ConnectionError::Reset
4368 | ConnectionError::TransportError(TransportError {
4369 code: TransportErrorCode::AEAD_LIMIT_REACHED,
4370 ..
4371 }) => {
4372 self.state.move_to_drained(Some(conn_err));
4373 }
4374 ConnectionError::TimedOut => {
4375 unreachable!("timeouts aren't generated by packet processing");
4376 }
4377 ConnectionError::TransportError(err) => {
4378 debug!("closing connection due to transport error: {}", err);
4379 self.state.move_to_closed(err);
4380 }
4381 ConnectionError::VersionMismatch => {
4382 self.state.move_to_draining(Some(conn_err));
4383 }
4384 ConnectionError::LocallyClosed => {
4385 unreachable!("LocallyClosed isn't generated by packet processing");
4386 }
4387 ConnectionError::CidsExhausted => {
4388 unreachable!("CidsExhausted isn't generated by packet processing");
4389 }
4390 };
4391 }
4392
4393 if !was_closed && self.state.is_closed() {
4394 self.close_common();
4395 if !self.state.is_drained() {
4396 self.set_close_timer(now);
4397 }
4398 }
4399 if !was_drained && self.state.is_drained() {
4400 self.endpoint_events.push_back(EndpointEventInner::Drained);
4401 self.timers
4404 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
4405 }
4406
4407 if matches!(self.state.as_type(), StateType::Closed) {
4414 if self
4432 .paths
4433 .get(&path_id)
4434 .map(|p| p.data.validated && p.data.network_path == network_path)
4435 .unwrap_or(false)
4436 {
4437 self.connection_close_pending = true;
4438 }
4439 }
4440 }
4441
    /// Dispatches an already-decrypted packet according to the current connection state.
    ///
    /// * Packets for a `path_id` that is not present in `self.paths` are dropped.
    /// * Established: data-space packets go to `process_payload`; other spaces go to
    ///   `process_early_payload` when the header reports that it carries frames.
    /// * Closed: frames are only scanned for a close frame, which moves the connection to
    ///   draining.
    /// * Draining / drained: the packet is ignored.
    /// * Handshake: the packet is handled per header type (Retry, Handshake, Initial, 0-RTT,
    ///   Version Negotiation).
    ///
    /// `number` is the packet number when the packet has one; it is unwrapped for data-space
    /// and 0-RTT packets. Every decoded frame handled here or in the callees is reported to
    /// `qlog`.
    ///
    /// Returns `Ok(())` both when the packet was processed and when it was deliberately
    /// discarded. Errors are returned as `ConnectionError` for the caller to act on.
    fn process_decrypted_packet(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        path_id: PathId,
        number: Option<u64>,
        packet: Packet,
        qlog: &mut QlogRecvPacket,
    ) -> Result<(), ConnectionError> {
        // There is nothing to update for a path we hold no state for.
        if !self.paths.contains_key(&path_id) {
            trace!(%path_id, ?number, "discarding packet for unknown path");
            return Ok(());
        }
        // Every state except `Handshake` is fully handled (and returns) inside this match;
        // only the handshake state falls through to the header dispatch below.
        let state = match self.state.as_type() {
            StateType::Established => {
                match packet.header.space() {
                    // NOTE(review): `number.unwrap()` assumes data-space packets always carry
                    // a packet number — confirm against the packet decoder.
                    SpaceKind::Data => self.process_payload(
                        now,
                        network_path,
                        path_id,
                        number.unwrap(),
                        packet,
                        qlog,
                    )?,
                    // Non-data packets that still carry frames are processed with the
                    // restricted early-payload rules.
                    _ if packet.header.has_frames() => {
                        self.process_early_payload(now, path_id, packet, qlog)?
                    }
                    _ => {
                        trace!("discarding unexpected pre-handshake packet");
                    }
                }
                return Ok(());
            }
            StateType::Closed => {
                for result in frame::Iter::new(packet.payload.freeze())? {
                    // Individual frame decoding failures are logged and skipped here instead
                    // of being propagated as errors.
                    let frame = match result {
                        Ok(frame) => frame,
                        Err(err) => {
                            debug!("frame decoding error: {err:?}");
                            continue;
                        }
                    };
                    qlog.frame(&frame);

                    // Padding is not counted in the per-path frame statistics.
                    if let Frame::Padding = frame {
                        continue;
                    };

                    self.path_stats
                        .for_path(path_id)
                        .frame_rx
                        .record(frame.ty());

                    // A close frame from the peer moves us to draining; any remaining frames
                    // in this packet are not looked at.
                    if let Frame::Close(_error) = frame {
                        self.state.move_to_draining(None);
                        break;
                    }
                }
                return Ok(());
            }
            StateType::Draining | StateType::Drained => return Ok(()),
            // `as_type()` just reported `Handshake`, so the handshake state must be available.
            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
        };

        match packet.header {
            Header::Retry {
                src_cid: remote_cid,
                ..
            } => {
                debug_assert_eq!(path_id, PathId::ZERO);
                // Only a client may receive a Retry.
                if self.side.is_server() {
                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
                }

                // Ask the crypto session to validate the Retry against the currently active
                // remote CID of this path. A missing CID queue counts as "not valid".
                let is_valid_retry = self
                    .remote_cids
                    .get(&path_id)
                    .map(|cids| cids.active())
                    .map(|orig_dst_cid| {
                        self.crypto_state.session.is_valid_retry(
                            orig_dst_cid,
                            &packet.header_data,
                            &packet.payload,
                        )
                    })
                    .unwrap_or_default();
                // Discard the Retry when more than one packet has already been authenticated,
                // when the payload is 16 bytes or shorter, or when validation failed. The
                // last 16 bytes of the payload are not part of the token (see `token_len`
                // below), so the length check also rejects an empty token.
                if self.total_authed_packets > 1
                    || packet.payload.len() <= 16
                    || !is_valid_retry
                {
                    trace!("discarding invalid Retry");
                    return Ok(());
                }

                trace!("retrying with CID {}", remote_cid);
                // NOTE(review): assumes the client hello is still stored in the handshake
                // state when a valid Retry arrives — confirm.
                let client_hello = state.client_hello.take().unwrap();
                self.retry_src_cid = Some(remote_cid);
                self.remote_cids
                    .get_mut(&path_id)
                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
                    .update_initial_cid(remote_cid);
                self.remote_handshake_cid = remote_cid;

                // If packet number 0 of the Initial space is still tracked as sent, account
                // for it as acknowledged before the space is thrown away.
                let space = &mut self.spaces[SpaceId::Initial];
                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
                    self.on_packet_acked(now, PathId::ZERO, 0, info);
                };

                self.discard_space(now, SpaceKind::Initial);
                // Install fresh Initial keys derived from the CID supplied in the Retry and
                // set the crypto offset to the length of the client hello.
                let crypto_space = &mut self.crypto_state.spaces[SpaceKind::Initial];
                crypto_space.keys = Some(
                    self.crypto_state
                        .session
                        .initial_keys(remote_cid, self.side.side()),
                );
                crypto_space.crypto_offset = client_hello.len() as u64;

                // Rebuild the Initial packet space from scratch, carrying over only the next
                // packet number, and queue the client hello again from offset 0.
                let next_pn = self.spaces[SpaceId::Initial]
                    .for_path(path_id)
                    .next_packet_number;
                self.spaces[SpaceId::Initial] = {
                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
                    space.for_path(path_id).next_packet_number = next_pn;
                    space.pending.crypto.push_back(frame::Crypto {
                        offset: 0,
                        data: client_hello,
                    });
                    space
                };

                // Take every packet recorded as sent in the data space on path zero
                // (presumably 0-RTT packets, going by the local's name), remove it from the
                // in-flight accounting and queue its contents for retransmission.
                let zero_rtt = mem::take(
                    &mut self.spaces[SpaceId::Data]
                        .for_path(PathId::ZERO)
                        .sent_packets,
                );
                for (_, info) in zero_rtt.into_iter() {
                    self.paths
                        .get_mut(&PathId::ZERO)
                        .unwrap()
                        .remove_in_flight(&info);
                    self.spaces[SpaceId::Data].pending |= info.retransmits;
                }
                self.streams.retransmit_all_for_0rtt();

                // Everything except the trailing 16 bytes of the payload is stored as the
                // client's token.
                let token_len = packet.payload.len() - 16;
                let ConnectionSide::Client { ref mut token, .. } = self.side else {
                    unreachable!("we already short-circuited if we're server");
                };
                *token = packet.payload.freeze().split_to(token_len);

                // Restart the handshake state from scratch.
                self.state = State::handshake(state::Handshake {
                    expected_token: Bytes::new(),
                    remote_cid_set: false,
                    client_hello: None,
                    allow_server_migration: true,
                });
                Ok(())
            }
            Header::Long {
                ty: LongType::Handshake,
                src_cid: remote_cid,
                dst_cid: local_cid,
                ..
            } => {
                debug_assert_eq!(path_id, PathId::ZERO);
                // Handshake packets must come from the CID the handshake was started with.
                if remote_cid != self.remote_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.remote_handshake_cid, remote_cid
                    );
                    return Ok(());
                }
                self.on_path_validated(path_id);

                self.process_early_payload(now, path_id, packet, qlog)?;
                // Processing the payload may have closed the connection.
                if self.state.is_closed() {
                    return Ok(());
                }

                // Nothing more to do until the crypto session reports handshake completion.
                if self.crypto_state.session.is_handshaking() {
                    trace!("handshake ongoing");
                    return Ok(());
                }

                if self.side.is_client() {
                    // Missing transport parameters are reported as crypto error 0x6d
                    // (presumably the TLS `missing_extension` alert — confirm).
                    let params = self
                        .crypto_state
                        .session
                        .transport_parameters()?
                        .ok_or_else(|| {
                            TransportError::new(
                                TransportErrorCode::crypto(0x6d),
                                "transport parameters missing".to_owned(),
                            )
                        })?;

                    if self.has_0rtt() {
                        if !self.crypto_state.session.early_data_accepted().unwrap() {
                            debug_assert!(self.side.is_client());
                            debug!("0-RTT rejected");
                            self.crypto_state.accepted_0rtt = false;
                            self.streams.zero_rtt_rejected();

                            // Drop everything queued for (re)transmission in the data space.
                            self.spaces[SpaceId::Data].pending = Retransmits::default();

                            // Forget the packets already sent in the data space and remove
                            // them from the in-flight accounting.
                            let sent_packets = mem::take(
                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
                            );
                            for (_, packet) in sent_packets.into_iter() {
                                self.paths
                                    .get_mut(&path_id)
                                    .unwrap()
                                    .remove_in_flight(&packet);
                            }
                        } else {
                            self.crypto_state.accepted_0rtt = true;
                            // The new parameters are checked against the previously stored
                            // peer parameters.
                            params.validate_resumption_from(&self.peer_params)?;
                        }
                    }
                    // Hand the peer's stateless reset token to the endpoint.
                    if let Some(token) = params.stateless_reset_token {
                        let remote = self.path_data(path_id).network_path.remote;
                        debug_assert!(!self.state.is_drained());
                        self.endpoint_events
                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
                    }
                    self.handle_peer_params(params, local_cid, remote_cid, now)?;
                    self.issue_first_cids(now);
                } else {
                    // Server side: queue HANDSHAKE_DONE, drop the Handshake space and report
                    // the handshake as confirmed.
                    self.spaces[SpaceId::Data].pending.handshake_done = true;
                    self.discard_space(now, SpaceKind::Handshake);
                    self.events.push_back(Event::HandshakeConfirmed);
                    trace!("handshake confirmed");
                }

                self.events.push_back(Event::Connected);
                self.state.move_to_established();
                trace!("established");

                self.issue_first_path_cids(now);
                Ok(())
            }
            Header::Initial(InitialHeader {
                src_cid: remote_cid,
                dst_cid: local_cid,
                ..
            }) => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if !state.remote_cid_set {
                    // First Initial seen: adopt the peer's source CID as the remote CID.
                    trace!("switching remote CID to {}", remote_cid);
                    // Work on a clone of the handshake state and store it back afterwards.
                    let mut state = state.clone();
                    self.remote_cids
                        .get_mut(&path_id)
                        .expect("PathId::ZERO not yet abandoned")
                        .update_initial_cid(remote_cid);
                    self.remote_handshake_cid = remote_cid;
                    self.original_remote_cid = remote_cid;
                    state.remote_cid_set = true;
                    self.state.move_to_handshake(state);
                } else if remote_cid != self.remote_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.remote_handshake_cid, remote_cid
                    );
                    return Ok(());
                }

                // Remember the highest space before processing so we can tell whether this
                // packet advanced the handshake past the Initial space.
                let starting_space = self.highest_space;
                self.process_early_payload(now, path_id, packet, qlog)?;

                // Server only: on the transition out of the Initial space, read the peer's
                // transport parameters, issue the first CIDs and set up 0-RTT.
                if self.side.is_server()
                    && starting_space == SpaceKind::Initial
                    && self.highest_space != SpaceKind::Initial
                {
                    let params = self
                        .crypto_state
                        .session
                        .transport_parameters()?
                        .ok_or_else(|| {
                            TransportError::new(
                                TransportErrorCode::crypto(0x6d),
                                "transport parameters missing".to_owned(),
                            )
                        })?;
                    self.handle_peer_params(params, local_cid, remote_cid, now)?;
                    self.issue_first_cids(now);
                    self.init_0rtt(now);
                }
                Ok(())
            }
            Header::Long {
                ty: LongType::ZeroRtt,
                ..
            } => {
                // NOTE(review): `number.unwrap()` assumes 0-RTT packets always carry a packet
                // number — confirm against the packet decoder.
                self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
                Ok(())
            }
            Header::VersionNegotiate { .. } => {
                // Ignored once more than one packet has been authenticated.
                if self.total_authed_packets > 1 {
                    return Ok(());
                }
                // The payload is read as a list of 4-byte big-endian versions. A trailing
                // chunk shorter than 4 bytes fails the conversion and cannot match.
                let supported = packet
                    .payload
                    .chunks(4)
                    .any(|x| match <[u8; 4]>::try_from(x) {
                        Ok(version) => self.version == u32::from_be_bytes(version),
                        Err(_) => false,
                    });
                // A list that contains our own version is ignored.
                if supported {
                    return Ok(());
                }
                debug!("remote doesn't support our version");
                Err(ConnectionError::VersionMismatch)
            }
            Header::Short { .. } => unreachable!(
                "short packets received during handshake are discarded in handle_packet"
            ),
        }
    }
4778
4779 fn process_early_payload(
4781 &mut self,
4782 now: Instant,
4783 path_id: PathId,
4784 packet: Packet,
4785 #[allow(unused)] qlog: &mut QlogRecvPacket,
4786 ) -> Result<(), TransportError> {
4787 debug_assert_ne!(packet.header.space(), SpaceKind::Data);
4788 debug_assert_eq!(path_id, PathId::ZERO);
4789 let payload_len = packet.payload.len();
4790 let mut ack_eliciting = false;
4791 for result in frame::Iter::new(packet.payload.freeze())? {
4792 let frame = result?;
4793 qlog.frame(&frame);
4794 let span = match frame {
4795 Frame::Padding => continue,
4796 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4797 };
4798
4799 self.path_stats
4800 .for_path(path_id)
4801 .frame_rx
4802 .record(frame.ty());
4803
4804 let _guard = span.as_ref().map(|x| x.enter());
4805 ack_eliciting |= frame.is_ack_eliciting();
4806
4807 if frame.is_1rtt() && packet.header.space() != SpaceKind::Data {
4809 return Err(TransportError::PROTOCOL_VIOLATION(
4810 "illegal frame type in handshake",
4811 ));
4812 }
4813
4814 match frame {
4815 Frame::Padding | Frame::Ping => {}
4816 Frame::Crypto(frame) => {
4817 self.read_crypto(packet.header.space().into(), &frame, payload_len)?;
4818 }
4819 Frame::Ack(ack) => {
4820 self.on_ack_received(now, packet.header.space().into(), ack)?;
4821 }
4822 Frame::PathAck(ack) => {
4823 span.as_ref()
4824 .map(|span| span.record("path", tracing::field::display(&ack.path_id)));
4825 self.on_path_ack_received(now, packet.header.space().into(), ack)?;
4826 }
4827 Frame::Close(reason) => {
4828 self.state.move_to_draining(Some(reason.into()));
4829 return Ok(());
4830 }
4831 _ => {
4832 let mut err =
4833 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4834 err.frame = frame::MaybeFrame::Known(frame.ty());
4835 return Err(err);
4836 }
4837 }
4838 }
4839
4840 if ack_eliciting {
4841 self.spaces[packet.header.space()]
4843 .for_path(path_id)
4844 .pending_acks
4845 .set_immediate_ack_required();
4846 }
4847
4848 self.write_crypto();
4849 Ok(())
4850 }
4851
4852 fn process_payload(
4854 &mut self,
4855 now: Instant,
4856 network_path: FourTuple,
4857 path_id: PathId,
4858 number: u64,
4859 packet: Packet,
4860 #[allow(unused)] qlog: &mut QlogRecvPacket,
4861 ) -> Result<(), TransportError> {
4862 let is_multipath_negotiated = self.is_multipath_negotiated();
4863 let payload = packet.payload.freeze();
4864 let mut is_probing_packet = true;
4865 let mut close = None;
4866 let payload_len = payload.len();
4867 let mut ack_eliciting = false;
4868 let mut migration_observed_addr = None;
4871 for result in frame::Iter::new(payload)? {
4872 let frame = result?;
4873 qlog.frame(&frame);
4874 let span = match frame {
4875 Frame::Padding => continue,
4876 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4877 };
4878
4879 self.path_stats
4880 .for_path(path_id)
4881 .frame_rx
4882 .record(frame.ty());
4883 match &frame {
4886 Frame::Crypto(f) => {
4887 trace!(offset = f.offset, len = f.data.len(), "got frame CRYPTO");
4888 }
4889 Frame::Stream(f) => {
4890 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got frame STREAM");
4891 }
4892 Frame::Datagram(f) => {
4893 trace!(len = f.data.len(), "got frame DATAGRAM");
4894 }
4895 f => {
4896 trace!("got frame {f}");
4897 }
4898 }
4899
4900 let _guard = span.enter();
4901 if packet.header.is_0rtt() {
4902 match frame {
4903 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4904 return Err(TransportError::PROTOCOL_VIOLATION(
4905 "illegal frame type in 0-RTT",
4906 ));
4907 }
4908 _ => {
4909 if frame.is_1rtt() {
4910 return Err(TransportError::PROTOCOL_VIOLATION(
4911 "illegal frame type in 0-RTT",
4912 ));
4913 }
4914 }
4915 }
4916 }
4917 ack_eliciting |= frame.is_ack_eliciting();
4918
4919 match frame {
4921 Frame::Padding
4922 | Frame::PathChallenge(_)
4923 | Frame::PathResponse(_)
4924 | Frame::NewConnectionId(_)
4925 | Frame::ObservedAddr(_) => {}
4926 _ => {
4927 is_probing_packet = false;
4928 }
4929 }
4930
4931 match frame {
4932 Frame::Crypto(frame) => {
4933 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4934 }
4935 Frame::Stream(frame) => {
4936 if self.streams.received(frame, payload_len)?.should_transmit() {
4937 self.spaces[SpaceId::Data].pending.max_data = true;
4938 }
4939 }
4940 Frame::Ack(ack) => {
4941 self.on_ack_received(now, SpaceId::Data, ack)?;
4942 }
4943 Frame::PathAck(ack) => {
4944 if !self.is_multipath_negotiated() {
4945 return Err(TransportError::PROTOCOL_VIOLATION(
4946 "received PATH_ACK frame when multipath was not negotiated",
4947 ));
4948 }
4949 span.record("path", tracing::field::display(&ack.path_id));
4950 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4951 }
4952 Frame::Padding | Frame::Ping => {}
4953 Frame::Close(reason) => {
4954 close = Some(reason);
4955 }
4956 Frame::PathChallenge(challenge) => {
4957 let path = &mut self
4958 .path_mut(path_id)
4959 .expect("payload is processed only after the path becomes known");
4960 path.path_responses.push(number, challenge.0, network_path);
4961 if network_path.remote == path.network_path.remote {
4965 match self.peer_supports_ack_frequency() {
4973 true => self.immediate_ack(path_id),
4974 false => {
4975 self.ping_path(path_id).ok();
4976 }
4977 }
4978 }
4979 }
4980 Frame::PathResponse(response) => {
4981 if self
4983 .n0_nat_traversal
4984 .handle_path_response(network_path, response.0)
4985 {
4986 self.open_nat_traversed_paths(now);
4987 } else {
4988 let path = self
4991 .paths
4992 .get_mut(&path_id)
4993 .expect("payload is processed only after the path becomes known");
4994
4995 use PathTimer::*;
4996 use paths::OnPathResponseReceived::*;
4997 match path
4998 .data
4999 .on_path_response_received(now, response.0, network_path)
5000 {
5001 OnPath { was_open } => {
5002 let qlog = self.qlog.with_time(now);
5003
5004 self.timers.stop(
5005 Timer::PerPath(path_id, PathValidationFailed),
5006 qlog.clone(),
5007 );
5008 self.timers.stop(
5009 Timer::PerPath(path_id, AbandonFromValidation),
5010 qlog.clone(),
5011 );
5012
5013 let next_challenge = path
5014 .data
5015 .earliest_on_path_expiring_challenge()
5016 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
5017 self.timers.set_or_stop(
5018 Timer::PerPath(path_id, PathChallengeLost),
5019 next_challenge,
5020 qlog,
5021 );
5022
5023 if !was_open {
5024 if is_multipath_negotiated {
5025 self.events.push_back(Event::Path(PathEvent::Opened {
5026 id: path_id,
5027 }));
5028 }
5029 if let Some(observed) =
5030 path.data.last_observed_addr_report.as_ref()
5031 {
5032 self.events.push_back(Event::Path(
5033 PathEvent::ObservedAddr {
5034 id: path_id,
5035 addr: observed.socket_addr(),
5036 },
5037 ));
5038 }
5039 }
5040 if let Some((_, ref mut prev)) = path.prev {
5041 prev.reset_on_path_challenges();
5046 }
5047 }
5048 Ignored {
5049 sent_on,
5050 current_path,
5051 } => {
5052 debug!(%sent_on, %current_path, %response, "ignoring valid PATH_RESPONSE")
5053 }
5054 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
5055 }
5056 }
5057 }
5058 Frame::MaxData(frame::MaxData(bytes)) => {
5059 self.streams.received_max_data(bytes);
5060 }
5061 Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
5062 self.streams.received_max_stream_data(id, offset)?;
5063 }
5064 Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
5065 self.streams.received_max_streams(dir, count)?;
5066 }
5067 Frame::ResetStream(frame) => {
5068 if self.streams.received_reset(frame)?.should_transmit() {
5069 self.spaces[SpaceId::Data].pending.max_data = true;
5070 }
5071 }
5072 Frame::DataBlocked(DataBlocked(offset)) => {
5073 debug!(offset, "peer claims to be blocked at connection level");
5074 }
5075 Frame::StreamDataBlocked(StreamDataBlocked { id, offset }) => {
5076 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
5077 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
5078 return Err(TransportError::STREAM_STATE_ERROR(
5079 "STREAM_DATA_BLOCKED on send-only stream",
5080 ));
5081 }
5082 debug!(
5083 stream = %id,
5084 offset, "peer claims to be blocked at stream level"
5085 );
5086 }
5087 Frame::StreamsBlocked(StreamsBlocked { dir, limit }) => {
5088 if limit > MAX_STREAM_COUNT {
5089 return Err(TransportError::FRAME_ENCODING_ERROR(
5090 "unrepresentable stream limit",
5091 ));
5092 }
5093 debug!(
5094 "peer claims to be blocked opening more than {} {} streams",
5095 limit, dir
5096 );
5097 }
5098 Frame::StopSending(frame::StopSending { id, error_code }) => {
5099 if id.initiator() != self.side.side() {
5100 if id.dir() == Dir::Uni {
5101 debug!("got STOP_SENDING on recv-only {}", id);
5102 return Err(TransportError::STREAM_STATE_ERROR(
5103 "STOP_SENDING on recv-only stream",
5104 ));
5105 }
5106 } else if self.streams.is_local_unopened(id) {
5107 return Err(TransportError::STREAM_STATE_ERROR(
5108 "STOP_SENDING on unopened stream",
5109 ));
5110 }
5111 self.streams.received_stop_sending(id, error_code);
5112 }
5113 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
5114 if let Some(ref path_id) = path_id {
5115 span.record("path", tracing::field::display(&path_id));
5116 }
5117 let path_id = path_id.unwrap_or_default();
5118 match self.local_cid_state.get_mut(&path_id) {
5119 None => debug!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
5120 Some(cid_state) => {
5121 let allow_more_cids = cid_state
5122 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
5123
5124 let has_path = !self.abandoned_paths.contains(&path_id);
5128 let allow_more_cids = allow_more_cids && has_path;
5129
5130 debug_assert!(!self.state.is_drained()); self.endpoint_events
5132 .push_back(EndpointEventInner::RetireConnectionId(
5133 now,
5134 path_id,
5135 sequence,
5136 allow_more_cids,
5137 ));
5138 }
5139 }
5140 }
5141 Frame::NewConnectionId(frame) => {
5142 let path_id = if let Some(path_id) = frame.path_id {
5143 if !self.is_multipath_negotiated() {
5144 return Err(TransportError::PROTOCOL_VIOLATION(
5145 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
5146 ));
5147 }
5148 if path_id > self.local_max_path_id {
5149 return Err(TransportError::PROTOCOL_VIOLATION(
5150 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
5151 ));
5152 }
5153 path_id
5154 } else {
5155 PathId::ZERO
5156 };
5157
5158 if let Some(ref path_id) = frame.path_id {
5159 span.record("path", tracing::field::display(&path_id));
5160 }
5161
5162 if self.abandoned_paths.contains(&path_id) {
5163 trace!("ignoring issued CID for abandoned path");
5164 continue;
5165 }
5166 let remote_cids = self
5167 .remote_cids
5168 .entry(path_id)
5169 .or_insert_with(|| CidQueue::new(frame.id));
5170 if remote_cids.active().is_empty() {
5171 return Err(TransportError::PROTOCOL_VIOLATION(
5172 "NEW_CONNECTION_ID when CIDs aren't in use",
5173 ));
5174 }
5175 if frame.retire_prior_to > frame.sequence {
5176 return Err(TransportError::PROTOCOL_VIOLATION(
5177 "NEW_CONNECTION_ID retiring unissued CIDs",
5178 ));
5179 }
5180
5181 use crate::cid_queue::InsertError;
5182 match remote_cids.insert(frame) {
5183 Ok(None) => {
5184 self.open_nat_traversed_paths(now);
5185 }
5186 Ok(Some((retired, reset_token))) => {
5187 let pending_retired =
5188 &mut self.spaces[SpaceId::Data].pending.retire_cids;
5189 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
5192 if (pending_retired.len() as u64)
5195 .saturating_add(retired.end.saturating_sub(retired.start))
5196 > MAX_PENDING_RETIRED_CIDS
5197 {
5198 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
5199 "queued too many retired CIDs",
5200 ));
5201 }
5202 pending_retired.extend(retired.map(|seq| (path_id, seq)));
5203 self.set_reset_token(path_id, network_path.remote, reset_token);
5204 self.open_nat_traversed_paths(now);
5205 }
5206 Err(InsertError::ExceedsLimit) => {
5207 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
5208 }
5209 Err(InsertError::Retired) => {
5210 trace!("discarding already-retired");
5211 self.spaces[SpaceId::Data]
5215 .pending
5216 .retire_cids
5217 .push((path_id, frame.sequence));
5218 continue;
5219 }
5220 };
5221
5222 if self.side.is_server()
5223 && path_id == PathId::ZERO
5224 && self
5225 .remote_cids
5226 .get(&PathId::ZERO)
5227 .map(|cids| cids.active_seq() == 0)
5228 .unwrap_or_default()
5229 {
5230 self.update_remote_cid(PathId::ZERO);
5233 }
5234 }
5235 Frame::NewToken(NewToken { token }) => {
5236 let ConnectionSide::Client {
5237 token_store,
5238 server_name,
5239 ..
5240 } = &self.side
5241 else {
5242 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
5243 };
5244 if token.is_empty() {
5245 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
5246 }
5247 trace!("got new token");
5248 token_store.insert(server_name, token);
5249 }
5250 Frame::Datagram(datagram) => {
5251 if self
5252 .datagrams
5253 .received(datagram, &self.config.datagram_receive_buffer_size)?
5254 {
5255 self.events.push_back(Event::DatagramReceived);
5256 }
5257 }
5258 Frame::AckFrequency(ack_frequency) => {
5259 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
5262 continue;
5265 }
5266
5267 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
5269 space.pending_acks.set_ack_frequency_params(&ack_frequency);
5270
5271 if !self.abandoned_paths.contains(path_id)
5275 && let Some(timeout) = space
5276 .pending_acks
5277 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
5278 {
5279 self.timers.set(
5280 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
5281 timeout,
5282 self.qlog.with_time(now),
5283 );
5284 }
5285 }
5286 }
5287 Frame::ImmediateAck => {
5288 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
5290 pns.pending_acks.set_immediate_ack_required();
5291 }
5292 }
5293 Frame::HandshakeDone => {
5294 if self.side.is_server() {
5295 return Err(TransportError::PROTOCOL_VIOLATION(
5296 "client sent HANDSHAKE_DONE",
5297 ));
5298 }
5299 if self.crypto_state.has_keys(EncryptionLevel::Handshake) {
5300 self.discard_space(now, SpaceKind::Handshake);
5301 self.events.push_back(Event::HandshakeConfirmed);
5302 trace!("handshake confirmed");
5303 }
5304 }
5305 Frame::ObservedAddr(observed) => {
5306 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
5308 if !self
5309 .peer_params
5310 .address_discovery_role
5311 .should_report(&self.config.address_discovery_role)
5312 {
5313 return Err(TransportError::PROTOCOL_VIOLATION(
5314 "received OBSERVED_ADDRESS frame when not negotiated",
5315 ));
5316 }
5317 if packet.header.space() != SpaceKind::Data {
5319 return Err(TransportError::PROTOCOL_VIOLATION(
5320 "OBSERVED_ADDRESS frame outside data space",
5321 ));
5322 }
5323
5324 let path = self.path_data_mut(path_id);
5325 if path.network_path.is_probably_same_path(&network_path) {
5326 if let Some(updated) = path.update_observed_addr_report(observed)
5327 && path.open_status == paths::OpenStatus::Informed
5328 {
5329 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5330 id: path_id,
5331 addr: updated,
5332 }));
5333 }
5335 } else {
5336 migration_observed_addr = Some(observed)
5338 }
5339 }
5340 Frame::PathAbandon(frame::PathAbandon {
5341 path_id,
5342 error_code,
5343 }) => {
5344 span.record("path", tracing::field::display(&path_id));
5345 match self.close_path_inner(
5346 now,
5347 path_id,
5348 PathAbandonReason::RemoteAbandoned {
5349 error_code: error_code.into(),
5350 },
5351 ) {
5352 Ok(()) => {
5353 trace!("peer abandoned path");
5354 }
5355 Err(ClosePathError::ClosedPath) => {
5356 trace!("peer abandoned already closed path");
5357 }
5358 Err(ClosePathError::MultipathNotNegotiated) => {
5359 return Err(TransportError::PROTOCOL_VIOLATION(
5360 "received PATH_ABANDON frame when multipath was not negotiated",
5361 ));
5362 }
5363 Err(ClosePathError::LastOpenPath) => {
5364 error!(
5367 "peer abandoned last path but close_path_inner returned LastOpenPath"
5368 );
5369 }
5370 };
5371
5372 if let Some(path) = self.paths.get_mut(&path_id)
5374 && !mem::replace(&mut path.data.draining, true)
5375 {
5376 let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5377 let pto = path.data.rtt.pto_base() + ack_delay;
5378 self.timers.set(
5379 Timer::PerPath(path_id, PathTimer::PathDrained),
5380 now + 3 * pto,
5381 self.qlog.with_time(now),
5382 );
5383
5384 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
5385 }
5386 }
5387 Frame::PathStatusAvailable(info) => {
5388 span.record("path", tracing::field::display(&info.path_id));
5389 if self.is_multipath_negotiated() {
5390 self.on_path_status(
5391 info.path_id,
5392 PathStatus::Available,
5393 info.status_seq_no,
5394 );
5395 } else {
5396 return Err(TransportError::PROTOCOL_VIOLATION(
5397 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
5398 ));
5399 }
5400 }
5401 Frame::PathStatusBackup(info) => {
5402 span.record("path", tracing::field::display(&info.path_id));
5403 if self.is_multipath_negotiated() {
5404 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
5405 } else {
5406 return Err(TransportError::PROTOCOL_VIOLATION(
5407 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
5408 ));
5409 }
5410 }
5411 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
5412 span.record("path", tracing::field::display(&path_id));
5413 if !self.is_multipath_negotiated() {
5414 return Err(TransportError::PROTOCOL_VIOLATION(
5415 "received MAX_PATH_ID frame when multipath was not negotiated",
5416 ));
5417 }
5418 if path_id > self.remote_max_path_id {
5420 self.remote_max_path_id = path_id;
5421 self.issue_first_path_cids(now);
5422 self.open_nat_traversed_paths(now);
5423 }
5424 }
5425 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
5426 if self.is_multipath_negotiated() {
5430 if max_path_id > self.local_max_path_id {
5431 return Err(TransportError::PROTOCOL_VIOLATION(
5432 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
5433 ));
5434 }
5435 debug!("received PATHS_BLOCKED({:?})", max_path_id);
5436 } else {
5438 return Err(TransportError::PROTOCOL_VIOLATION(
5439 "received PATHS_BLOCKED frame when not multipath was not negotiated",
5440 ));
5441 }
5442 }
5443 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
5444 if self.is_multipath_negotiated() {
5452 if path_id > self.local_max_path_id {
5453 return Err(TransportError::PROTOCOL_VIOLATION(
5454 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
5455 ));
5456 }
5457 if next_seq.0
5458 > self
5459 .local_cid_state
5460 .get(&path_id)
5461 .map(|cid_state| cid_state.active_seq().1 + 1)
5462 .unwrap_or_default()
5463 {
5464 return Err(TransportError::PROTOCOL_VIOLATION(
5465 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
5466 ));
5467 }
5468 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
5469 } else {
5470 return Err(TransportError::PROTOCOL_VIOLATION(
5471 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
5472 ));
5473 }
5474 }
5475 Frame::AddAddress(addr) => {
5476 let client_state = match self.n0_nat_traversal.client_side_mut() {
5477 Ok(state) => state,
5478 Err(err) => {
5479 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5480 "Nat traversal(ADD_ADDRESS): {err}"
5481 )));
5482 }
5483 };
5484
5485 if !client_state.check_remote_address(&addr) {
5486 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
5488 }
5489
5490 match client_state.add_remote_address(addr) {
5491 Ok(maybe_added) => {
5492 if let Some(added) = maybe_added {
5493 self.events.push_back(Event::NatTraversal(
5494 n0_nat_traversal::Event::AddressAdded(added),
5495 ));
5496 }
5497 }
5498 Err(e) => {
5499 warn!(%e, "failed to add remote address")
5500 }
5501 }
5502 }
5503 Frame::RemoveAddress(addr) => {
5504 let client_state = match self.n0_nat_traversal.client_side_mut() {
5505 Ok(state) => state,
5506 Err(err) => {
5507 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5508 "Nat traversal(REMOVE_ADDRESS): {err}"
5509 )));
5510 }
5511 };
5512 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
5513 self.events.push_back(Event::NatTraversal(
5514 n0_nat_traversal::Event::AddressRemoved(removed_addr),
5515 ));
5516 }
5517 }
5518 Frame::ReachOut(reach_out) => {
5519 let ipv6 = self.is_ipv6();
5520 let server_state = match self.n0_nat_traversal.server_side_mut() {
5521 Ok(state) => state,
5522 Err(err) => {
5523 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5524 "Nat traversal(REACH_OUT): {err}"
5525 )));
5526 }
5527 };
5528
5529 let round_before = server_state.current_round();
5530
5531 if let Err(err) = server_state.handle_reach_out(reach_out, ipv6) {
5532 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5533 "Nat traversal(REACH_OUT): {err}"
5534 )));
5535 }
5536
5537 if server_state.current_round() > round_before {
5538 if let Some(delay) =
5540 self.n0_nat_traversal.retry_delay(self.config.initial_rtt)
5541 {
5542 self.timers.set(
5543 Timer::Conn(ConnTimer::NatTraversalProbeRetry),
5544 now + delay,
5545 self.qlog.with_time(now),
5546 );
5547 }
5548 }
5549 }
5550 }
5551 }
5552
5553 let space = self.spaces[SpaceId::Data].for_path(path_id);
5554 if space
5555 .pending_acks
5556 .packet_received(now, number, ack_eliciting, &space.dedup)
5557 {
5558 if self.abandoned_paths.contains(&path_id) {
5559 space.pending_acks.set_immediate_ack_required();
5562 } else {
5563 self.timers.set(
5564 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
5565 now + self.ack_frequency.max_ack_delay,
5566 self.qlog.with_time(now),
5567 );
5568 }
5569 }
5570
5571 let pending = &mut self.spaces[SpaceId::Data].pending;
5576 self.streams.queue_max_stream_id(pending);
5577
5578 if let Some(reason) = close {
5579 self.state.move_to_draining(Some(reason.into()));
5580 self.connection_close_pending = true;
5581 }
5582
5583 let migrate_on_any_packet =
5586 self.is_multipath_negotiated() && !self.n0_nat_traversal.is_negotiated();
5587
5588 let is_largest_received_pn = Some(number)
5590 == self.spaces[SpaceId::Data]
5591 .for_path(path_id)
5592 .largest_received_packet_number;
5593
5594 if (migrate_on_any_packet || !is_probing_packet)
5599 && is_largest_received_pn
5600 && self.local_ip_may_migrate()
5601 && let Some(new_local_ip) = network_path.local_ip
5602 {
5603 let path_data = self.path_data_mut(path_id);
5604 if path_data
5605 .network_path
5606 .local_ip
5607 .is_some_and(|ip| ip != new_local_ip)
5608 {
5609 debug!(
5610 %path_id,
5611 new_4tuple = %network_path,
5612 prev_4tuple = %path_data.network_path,
5613 "local address passive migration"
5614 );
5615 }
5616 path_data.network_path.local_ip = Some(new_local_ip)
5617 }
5618
5619 if (migrate_on_any_packet || !is_probing_packet)
5621 && is_largest_received_pn
5622 && network_path.remote != self.path_data(path_id).network_path.remote
5623 && self.remote_may_migrate()
5624 {
5625 self.migrate(path_id, now, network_path, migration_observed_addr);
5626 self.update_remote_cid(path_id);
5628 self.spin = false;
5629 }
5630
5631 Ok(())
5632 }
5633
5634 fn open_nat_traversed_paths(&mut self, now: Instant) {
5636 while let Some(network_path) = self
5637 .n0_nat_traversal
5638 .client_side_mut()
5639 .ok()
5640 .and_then(|s| s.pop_pending_path_open())
5641 {
5642 match self.open_path_ensure(network_path, PathStatus::Backup, now) {
5643 Ok((path_id, already_existed)) => {
5644 debug!(
5645 %path_id,
5646 ?network_path,
5647 new_path = !already_existed,
5648 "Opened NAT traversal path",
5649 );
5650 }
5651 Err(err) => match err {
5652 PathError::MultipathNotNegotiated
5653 | PathError::ServerSideNotAllowed
5654 | PathError::ValidationFailed
5655 | PathError::InvalidRemoteAddress(_) => {
5656 error!(
5657 ?err,
5658 ?network_path,
5659 "Failed to open path for successful NAT traversal"
5660 );
5661 }
5662 PathError::MaxPathIdReached | PathError::RemoteCidsExhausted => {
5663 self.n0_nat_traversal
5665 .client_side_mut()
5666 .map(|s| s.push_pending_path_open(network_path))
5667 .ok();
5668 debug!(
5669 ?err,
5670 ?network_path,
5671 "Blocked opening NAT traversal path, enqueued"
5672 );
5673 return;
5674 }
5675 },
5676 }
5677 }
5678 }
5679
5680 fn migrate(
5685 &mut self,
5686 path_id: PathId,
5687 now: Instant,
5688 network_path: FourTuple,
5689 observed_addr: Option<ObservedAddr>,
5690 ) {
5691 trace!(
5692 new_4tuple = %network_path,
5693 prev_4tuple = %self.path_data(path_id).network_path,
5694 %path_id,
5695 "migration initiated",
5696 );
5697 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
5698 let prev_pto = self.pto(SpaceKind::Data, path_id);
5705 let path = self.paths.get_mut(&path_id).expect("known path");
5706 let mut new_path_data = if network_path.remote.is_ipv4()
5707 && network_path.remote.ip() == path.data.network_path.remote.ip()
5708 {
5709 PathData::from_previous(network_path, &path.data, self.path_generation_counter, now)
5710 } else {
5711 let peer_max_udp_payload_size =
5712 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
5713 .unwrap_or(u16::MAX);
5714 PathData::new(
5715 network_path,
5716 self.allow_mtud,
5717 Some(peer_max_udp_payload_size),
5718 self.path_generation_counter,
5719 now,
5720 &self.config,
5721 )
5722 };
5723 new_path_data.last_observed_addr_report = path.data.last_observed_addr_report.clone();
5724 if let Some(report) = observed_addr
5725 && let Some(updated) = new_path_data.update_observed_addr_report(report)
5726 {
5727 tracing::info!("adding observed addr event from migration");
5728 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5729 id: path_id,
5730 addr: updated,
5731 }));
5732 }
5733 new_path_data.pending_on_path_challenge = true;
5734
5735 let mut prev_path_data = mem::replace(&mut path.data, new_path_data);
5736
5737 if !prev_path_data.validated
5746 && let Some(cid) = self.remote_cids.get(&path_id).map(CidQueue::active)
5747 {
5748 prev_path_data.pending_on_path_challenge = true;
5749 path.prev = Some((cid, prev_path_data));
5752 }
5753
5754 self.qlog.emit_tuple_assigned(path_id, network_path, now);
5756
5757 self.timers.set(
5758 Timer::PerPath(path_id, PathTimer::PathValidationFailed),
5759 now + 3 * cmp::max(self.pto(SpaceKind::Data, path_id), prev_pto),
5760 self.qlog.with_time(now),
5761 );
5762 }
5763
/// Reacts to a change of the local network.
///
/// Does nothing if the connection is already drained. Before the data space is reached,
/// the local IP of every path is cleared, the remote CID of `PathId::ZERO` is advanced
/// and a ping is queued.
///
/// Otherwise every non-abandoned path is classified as recoverable or non-recoverable.
/// Without multipath every path is treated as recoverable. With multipath the `hint`
/// decides; when no hint is given, paths are treated as recoverable only on the server
/// side. Non-recoverable paths are closed and replaced by a newly opened path to the
/// same remote. Recoverable paths get a ping queued, their PTO count reset and, if a
/// further remote CID is available, are moved to that CID.
pub fn handle_network_change(&mut self, hint: Option<&dyn NetworkChangeHint>, now: Instant) {
    debug!("network changed");
    if self.state.is_drained() {
        return;
    }
    if self.highest_space < SpaceKind::Data {
        for path in self.paths.values_mut() {
            // Forget the local address of every path.
            path.data.network_path.local_ip = None;
        }

        self.update_remote_cid(PathId::ZERO);
        self.ping();

        return;
    }

    // Paths for which a new path is opened instead of attempting recovery.
    let mut non_recoverable_paths = Vec::default();
    let mut recoverable_paths = Vec::default();
    let mut open_paths = 0;

    let is_multipath_negotiated = self.is_multipath_negotiated();
    let is_client = self.side().is_client();
    let immediate_ack_allowed = self.peer_supports_ack_frequency();

    for (path_id, path) in self.paths.iter_mut() {
        if self.abandoned_paths.contains(path_id) {
            continue;
        }
        open_paths += 1;

        // Copy the network path before its local address is cleared, so the hint sees
        // the full previous 4-tuple.
        let network_path = path.data.network_path;

        path.data.network_path.local_ip = None;
        let remote = network_path.remote;

        let attempt_to_recover = if is_multipath_negotiated {
            // Defer to the hint if there is one; otherwise only the server side
            // attempts recovery.
            hint.map(|h| h.is_path_recoverable(*path_id, network_path))
                .unwrap_or(!is_client)
        } else {
            // Without multipath the existing path is always recovered.
            true
        };

        if attempt_to_recover {
            recoverable_paths.push((*path_id, remote));
        } else {
            non_recoverable_paths.push((*path_id, remote, path.data.local_status()))
        }
    }

    // If every open path is non-recoverable, the replacement path is opened before the
    // old one is closed; otherwise the old path is closed first.
    let open_first = open_paths == non_recoverable_paths.len();

    for (path_id, remote, status) in non_recoverable_paths.into_iter() {
        let network_path = FourTuple {
            remote,
            // The local IP is left unset for the replacement path.
            local_ip: None,
        };

        if open_first && let Err(e) = self.open_path(network_path, status, now) {
            if self.side().is_client() {
                debug!(%e, "Failed to open new path for network change");
            }
            // Could not open a replacement: fall back to recovering the old path.
            recoverable_paths.push((path_id, remote));
            continue;
        }

        if let Err(e) =
            self.close_path_inner(now, path_id, PathAbandonReason::UnusableAfterNetworkChange)
        {
            debug!(%e,"Failed to close unrecoverable path after network change");
            // The path stays open, so treat it as recoverable instead.
            recoverable_paths.push((path_id, remote));
            continue;
        }

        if !open_first && let Err(e) = self.open_path(network_path, status, now) {
            // The old path is already closed at this point; the failure is only logged.
            debug!(%e,"Failed to open new path for network change");
        }
    }

    for (path_id, remote) in recoverable_paths.into_iter() {
        // Queue a ping on the path, plus an immediate-ack request when the peer
        // supports ack frequency.
        if let Some(path_space) = self.spaces[SpaceId::Data].number_spaces.get_mut(&path_id) {
            path_space.ping_pending = true;

            if immediate_ack_allowed {
                path_space.immediate_ack_pending = true;
            }
        }

        // Reset the PTO count and re-arm loss detection for the path.
        if let Some(path) = self.paths.get_mut(&path_id) {
            path.data.pto_count = 0;
        }
        self.set_loss_detection_timer(now, path_id);

        // Move on to the next remote CID if the queue can provide one.
        let Some((reset_token, retired)) =
            self.remote_cids.get_mut(&path_id).and_then(CidQueue::next)
        else {
            continue;
        };

        // Queue retirement of the sequence numbers reported as retired by the queue.
        self.spaces[SpaceId::Data]
            .pending
            .retire_cids
            .extend(retired.map(|seq| (path_id, seq)));

        debug_assert!(!self.state.is_drained());
        self.endpoint_events
            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
    }
}
5922
5923 fn update_remote_cid(&mut self, path_id: PathId) {
5925 let Some((reset_token, retired)) = self
5926 .remote_cids
5927 .get_mut(&path_id)
5928 .and_then(|cids| cids.next())
5929 else {
5930 return;
5931 };
5932
5933 self.spaces[SpaceId::Data]
5935 .pending
5936 .retire_cids
5937 .extend(retired.map(|seq| (path_id, seq)));
5938 let remote = self.path_data(path_id).network_path.remote;
5939 self.set_reset_token(path_id, remote, reset_token);
5940 }
5941
5942 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5951 debug_assert!(!self.state.is_drained()); self.endpoint_events
5953 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5954
5955 if path_id == PathId::ZERO {
5961 self.peer_params.stateless_reset_token = Some(reset_token);
5962 }
5963 }
5964
5965 fn issue_first_cids(&mut self, now: Instant) {
5967 if self
5968 .local_cid_state
5969 .get(&PathId::ZERO)
5970 .expect("PathId::ZERO exists when the connection is created")
5971 .cid_len()
5972 == 0
5973 {
5974 return;
5975 }
5976
5977 let mut n = self.peer_params.issue_cids_limit() - 1;
5979 if let ConnectionSide::Server { server_config } = &self.side
5980 && server_config.has_preferred_address()
5981 {
5982 n -= 1;
5984 }
5985 debug_assert!(!self.state.is_drained()); self.endpoint_events
5987 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5988 }
5989
5990 fn issue_first_path_cids(&mut self, now: Instant) {
5994 if let Some(max_path_id) = self.max_path_id() {
5995 let mut path_id = self.max_path_id_with_cids.next();
5996 while path_id <= max_path_id {
5997 self.endpoint_events
5998 .push_back(EndpointEventInner::NeedIdentifiers(
5999 path_id,
6000 now,
6001 self.peer_params.issue_cids_limit(),
6002 ));
6003 path_id = path_id.next();
6004 }
6005 self.max_path_id_with_cids = max_path_id;
6006 }
6007 }
6008
/// Writes the frames pending for `space_id` on `path_id` into `builder`.
///
/// Frames are written in a fixed order, each kind guarded by its own conditions:
/// whether the path is abandoned, whether `scheduling_info` allows sending data,
/// whether the frame belongs in the data space, and whether enough room remains in
/// the packet. Frame statistics are recorded in the path's `frame_tx` counters.
///
/// # Panics
///
/// Panics if `path_id` is not a known path, or if a NEW_TOKEN is pending on a
/// client-side connection.
fn populate_packet<'a, 'b>(
    &mut self,
    now: Instant,
    space_id: SpaceId,
    path_id: PathId,
    scheduling_info: &PathSchedulingInfo,
    builder: &mut PacketBuilder<'a, 'b>,
) {
    let is_multipath_negotiated = self.is_multipath_negotiated();
    let space_has_keys = self.crypto_state.has_keys(space_id.encryption_level());
    // The data space without keys for its encryption level is treated as 0-RTT.
    let is_0rtt = space_id == SpaceId::Data && !space_has_keys;
    let stats = &mut self.path_stats.for_path(path_id).frame_tx;
    let space = &mut self.spaces[space_id];
    let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
    space
        .for_path(path_id)
        .pending_acks
        .maybe_ack_non_eliciting();

    // HandshakeDone. The pending flag is only cleared when all preceding conditions
    // hold, because `mem::replace` sits last in the short-circuiting chain.
    if !is_0rtt
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && mem::replace(&mut space.pending.handshake_done, false)
    {
        builder.write_frame(frame::HandshakeDone, stats);
    }

    // Ping, per path. Not gated on `may_send_data`.
    if !scheduling_info.is_abandoned
        && mem::replace(&mut space.for_path(path_id).ping_pending, false)
    {
        builder.write_frame(frame::Ping, stats);
    }

    // ImmediateAck, per path. Not gated on `may_send_data`.
    if !scheduling_info.is_abandoned
        && mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false)
    {
        debug_assert_eq!(
            space_id,
            SpaceId::Data,
            "immediate acks must be sent in the data space"
        );
        builder.write_frame(frame::ImmediateAck, stats);
    }

    // ACKs for every path whose number space reports it can send one. The path IDs are
    // collected first so `space` can be handed to `populate_acks` mutably.
    if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
        for path_id in space
            .number_spaces
            .iter_mut()
            .filter(|(_, pns)| pns.pending_acks.can_send())
            .map(|(&path_id, _)| path_id)
            .collect::<Vec<_>>()
        {
            Self::populate_acks(
                now,
                self.receiving_ecn,
                path_id,
                space_id,
                space,
                is_multipath_negotiated,
                builder,
                stats,
                space_has_keys,
            );
        }
    }

    // AckFrequency
    if !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && mem::replace(&mut space.pending.ack_frequency, false)
    {
        let sequence_number = self.ack_frequency.next_sequence_number();

        // NOTE(review): presumably `pending.ack_frequency` is only ever set when an
        // ack frequency config exists, otherwise this unwrap panics — confirm.
        let config = self.config.ack_frequency_config.as_ref().unwrap();

        let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
            path.rtt.get(),
            config,
            &self.peer_params,
        );

        let frame = frame::AckFrequency {
            sequence: sequence_number,
            ack_eliciting_threshold: config.ack_eliciting_threshold,
            // Saturates at `VarInt::MAX` if the microsecond value does not convert.
            request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
            reordering_threshold: config.reordering_threshold,
        };
        builder.write_frame(frame, stats);

        self.ack_frequency
            .ack_frequency_sent(path_id, builder.packet_number, max_ack_delay);
    }

    // PathChallenge. Skipped once the connection state reports closed.
    if !scheduling_info.is_abandoned
        && space_id == SpaceId::Data
        && path.pending_on_path_challenge
        && !self.state.is_closed()
        && builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
    {
        path.pending_on_path_challenge = false;

        // A fresh random token is generated for every challenge sent.
        let token = self.rng.random();
        path.record_path_challenge_sent(now, token, path.network_path);
        let challenge = frame::PathChallenge(token);
        builder.write_frame(challenge, stats);
        builder.require_padding();
        let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
        match path.open_status {
            paths::OpenStatus::Sent | paths::OpenStatus::Informed => {}
            paths::OpenStatus::Pending => {
                // First challenge for a path that is being opened: mark it as sent and
                // arm the `AbandonFromValidation` timer at three PTOs.
                path.open_status = paths::OpenStatus::Sent;
                self.timers.set(
                    Timer::PerPath(path_id, PathTimer::AbandonFromValidation),
                    now + 3 * pto,
                    self.qlog.with_time(now),
                );
            }
        }

        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
            now + pto,
            self.qlog.with_time(now),
        );

        // NOTE(review): `pending_on_path_challenge` was set to `false` at the top of
        // this branch and is not visibly set again before this check, so this
        // condition looks unreachable — confirm `record_path_challenge_sent` does not
        // re-arm the flag.
        if is_multipath_negotiated && !path.validated && path.pending_on_path_challenge {
            space.pending.path_status.insert(path_id);
        }

        // An ObservedAddr accompanies the challenge whenever the address discovery
        // roles say we should report, regardless of `observed_addr_sent`.
        if space_id == SpaceId::Data
            && self
                .config
                .address_discovery_role
                .should_report(&self.peer_params.address_discovery_role)
        {
            let frame = frame::ObservedAddr::new(
                path.network_path.remote,
                self.next_observed_addr_seq_no,
            );
            if builder.frame_space_remaining() > frame.size() {
                builder.write_frame(frame, stats);

                self.next_observed_addr_seq_no =
                    self.next_observed_addr_seq_no.saturating_add(1u8);
                path.observed_addr_sent = true;

                space.pending.observed_addr = false;
            }
        }
    }

    // PathResponse for a challenge received on this path's current network path.
    if !scheduling_info.is_abandoned
        && space_id == SpaceId::Data
        && builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
        && let Some(token) = path.path_responses.pop_on_path(path.network_path)
    {
        let response = frame::PathResponse(token);
        builder.write_frame(response, stats);
        builder.require_padding();

        // As with the challenge above, an ObservedAddr accompanies the response when
        // the address discovery roles say we should report.
        if space_id == SpaceId::Data
            && self
                .config
                .address_discovery_role
                .should_report(&self.peer_params.address_discovery_role)
        {
            let frame = frame::ObservedAddr::new(
                path.network_path.remote,
                self.next_observed_addr_seq_no,
            );
            if builder.frame_space_remaining() > frame.size() {
                builder.write_frame(frame, stats);

                self.next_observed_addr_seq_no =
                    self.next_observed_addr_seq_no.saturating_add(1u8);
                path.observed_addr_sent = true;

                space.pending.observed_addr = false;
            }
        }
    }

    // ReachOut frames, as long as the next one still fits.
    while !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && let Some(reach_out) = space
            .pending
            .reach_out
            .pop_if(|frame| builder.frame_space_remaining() >= frame.size())
    {
        builder.write_frame(reach_out, stats);
    }

    // PathAbandon for this very path, when it is abandoned and may announce that
    // itself.
    if space_id == SpaceId::Data
        && scheduling_info.is_abandoned
        && scheduling_info.may_self_abandon
        && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
        && let Some(error_code) = space.pending.path_abandon.remove(&path_id)
    {
        let frame = frame::PathAbandon {
            path_id,
            error_code,
        };
        builder.write_frame(frame, stats);

        // The remote CIDs of the abandoned path are dropped once the frame is written.
        self.remote_cids.remove(&path_id);
    }
    // PathAbandon for any other path with a pending abandon, sent on a path that may
    // send data.
    while space_id == SpaceId::Data
        && scheduling_info.may_send_data
        && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
        && let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
    {
        let frame = frame::PathAbandon {
            path_id: abandoned_path_id,
            error_code,
        };
        builder.write_frame(frame, stats);

        self.remote_cids.remove(&abandoned_path_id);
    }

    // ObservedAddr on its own: when none was sent on this path yet, or one is
    // explicitly pending.
    if !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space_id == SpaceId::Data
        && self
            .config
            .address_discovery_role
            .should_report(&self.peer_params.address_discovery_role)
        && (!path.observed_addr_sent || space.pending.observed_addr)
    {
        let frame =
            frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
        if builder.frame_space_remaining() > frame.size() {
            builder.write_frame(frame, stats);

            self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
            path.observed_addr_sent = true;

            space.pending.observed_addr = false;
        }
    }

    // Crypto frames, splitting the front entry when it does not fit entirely.
    while !is_0rtt
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
    {
        let Some(mut frame) = space.pending.crypto.pop_front() else {
            break;
        };

        // Room left for data after reserving one byte, the encoded size of the offset
        // and two further bytes.
        // NOTE(review): the unchecked VarInt construction assumes `frame.offset` is
        // within VarInt range — confirm.
        let max_crypto_data_size = builder.frame_space_remaining()
            - 1
            - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
            - 2;
        // The data length is additionally capped at 2^14 - 1 bytes.
        let len = frame
            .data
            .len()
            .min(2usize.pow(14) - 1)
            .min(max_crypto_data_size);

        let data = frame.data.split_to(len);
        let offset = frame.offset;
        let truncated = frame::Crypto { offset, data };
        builder.write_frame(truncated, stats);

        // Put the unsent remainder back at the front with an advanced offset.
        if !frame.data.is_empty() {
            frame.offset += len as u64;
            space.pending.crypto.push_front(frame);
        }
    }

    // PathStatusAvailable / PathStatusBackup for every path with a pending status.
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let Some(path_id) = space.pending.path_status.pop_first() else {
            break;
        };
        let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
            trace!(%path_id, "discarding queued path status for unknown path");
            continue;
        };

        let seq = path.status.seq();
        match path.local_status() {
            PathStatus::Available => {
                let frame = frame::PathStatusAvailable {
                    path_id,
                    status_seq_no: seq,
                };
                builder.write_frame(frame, stats);
            }
            PathStatus::Backup => {
                let frame = frame::PathStatusBackup {
                    path_id,
                    status_seq_no: seq,
                };
                builder.write_frame(frame, stats);
            }
        }
    }

    // MaxPathId
    if space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space.pending.max_path_id
        && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let frame = frame::MaxPathId(self.local_max_path_id);
        builder.write_frame(frame, stats);
        space.pending.max_path_id = false;
    }

    // PathsBlocked
    if space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space.pending.paths_blocked
        && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let frame = frame::PathsBlocked(self.remote_max_path_id);
        builder.write_frame(frame, stats);
        space.pending.paths_blocked = false;
    }

    // PathCidsBlocked for every path queued as blocked on remote CIDs.
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
            break;
        };
        // The next expected sequence number is one past the active one, or zero when
        // no CID queue exists for the path.
        let next_seq = match self.remote_cids.get(&path_id) {
            Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
            None => VarInt(0),
        };
        let frame = frame::PathCidsBlocked { path_id, next_seq };
        builder.write_frame(frame, stats);
    }

    // Stream-level control frames.
    if space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
    {
        self.streams
            .write_control_frames(builder, &mut space.pending, stats);
    }

    // NewConnectionId. The size bound uses the largest CID length across all local
    // CID states.
    let cid_len = self
        .local_cid_state
        .values()
        .map(|cid_state| cid_state.cid_len())
        .max()
        .expect("some local CID state must exist");
    let new_cid_size_bound =
        frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
    while !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > new_cid_size_bound
    {
        let Some(issued) = space.pending.new_cids.pop() else {
            break;
        };
        let Some(cid_state) = self.local_cid_state.get(&issued.path_id) else {
            // The CID state for the path is gone, so the queued CID is dropped.
            debug!(
                path = %issued.path_id, seq = issued.sequence,
                "dropping queued NEW_CONNECTION_ID for discarded path",
            );
            continue;
        };
        let retire_prior_to = cid_state.retire_prior_to();

        // The path ID is only carried in the frame when multipath is negotiated.
        let cid_path_id = match is_multipath_negotiated {
            true => Some(issued.path_id),
            false => {
                debug_assert_eq!(issued.path_id, PathId::ZERO);
                None
            }
        };
        let frame = frame::NewConnectionId {
            path_id: cid_path_id,
            sequence: issued.sequence,
            retire_prior_to,
            id: issued.id,
            reset_token: issued.reset_token,
        };
        builder.write_frame(frame, stats);
    }

    // RetireConnectionId
    let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
    while !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > retire_cid_bound
    {
        // Without multipath, path zero is encoded without a path ID.
        let (path_id, sequence) = match space.pending.retire_cids.pop() {
            Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
            Some((path_id, seq)) => (Some(path_id), seq),
            None => break,
        };
        let frame = frame::RetireConnectionId { path_id, sequence };
        builder.write_frame(frame, stats);
    }

    // Datagram frames, until the datagram state has nothing more to write.
    let mut sent_datagrams = false;
    while !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > Datagram::SIZE_BOUND
        && space_id == SpaceId::Data
    {
        match self.datagrams.write(builder, stats) {
            true => {
                sent_datagrams = true;
            }
            false => break,
        }
    }
    // Sending at least one datagram lifts a previously reported send block.
    if self.datagrams.send_blocked && sent_datagrams {
        self.events.push_back(Event::DatagramsUnblocked);
        self.datagrams.send_blocked = false;
    }

    let path = &mut self.paths.get_mut(&path_id).expect("known path").data;

    // NewToken, server side only.
    if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
        while let Some(network_path) = space.pending.new_tokens.pop() {
            debug_assert_eq!(space_id, SpaceId::Data);
            let ConnectionSide::Server { server_config } = &self.side else {
                panic!("NEW_TOKEN frames should not be enqueued by clients");
            };

            // Entries queued for a network path that is no longer this path's current
            // one are dropped.
            if !network_path.is_probably_same_path(&path.network_path) {
                continue;
            }

            let token = Token::new(
                TokenPayload::Validation {
                    ip: network_path.remote.ip(),
                    issued: server_config.time_source.now(),
                },
                &mut self.rng,
            );
            let new_token = NewToken {
                token: token.encode(&*server_config.token_key).into(),
            };

            // Does not fit: requeue the entry and stop.
            if builder.frame_space_remaining() < new_token.size() {
                space.pending.new_tokens.push(network_path);
                break;
            }

            builder.write_frame(new_token, stats);
            builder.retransmits_mut().new_tokens.push(network_path);
        }
    }

    // AddAddress
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
    {
        if let Some(added_address) = space.pending.add_address.pop_last() {
            builder.write_frame(added_address, stats);
        } else {
            break;
        }
    }

    // RemoveAddress
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
    {
        if let Some(removed_address) = space.pending.remove_address.pop_last() {
            builder.write_frame(removed_address, stats);
        } else {
            break;
        }
    }

    // Stream data frames come last.
    if !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space_id == SpaceId::Data
    {
        self.streams
            .write_stream_frames(builder, self.config.send_fairness, stats);
    }
}
6550
6551 fn populate_acks<'a, 'b>(
6553 now: Instant,
6554 receiving_ecn: bool,
6555 path_id: PathId,
6556 space_id: SpaceId,
6557 space: &mut PacketSpace,
6558 is_multipath_negotiated: bool,
6559 builder: &mut PacketBuilder<'a, 'b>,
6560 stats: &mut FrameStats,
6561 space_has_keys: bool,
6562 ) {
6563 debug_assert!(space_has_keys, "tried to send ACK in 0-RTT");
6565
6566 debug_assert!(
6567 is_multipath_negotiated || path_id == PathId::ZERO,
6568 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
6569 );
6570 if is_multipath_negotiated {
6571 debug_assert!(
6572 space_id == SpaceId::Data || path_id == PathId::ZERO,
6573 "path acks must be sent in 1RTT space (have {space_id:?})"
6574 );
6575 }
6576
6577 let pns = space.for_path(path_id);
6578 let ranges = pns.pending_acks.ranges();
6579 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
6580 let ecn = if receiving_ecn {
6581 Some(&pns.ecn_counters)
6582 } else {
6583 None
6584 };
6585
6586 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
6587 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
6589 let delay = delay_micros >> ack_delay_exp.into_inner();
6590
6591 if is_multipath_negotiated && space_id == SpaceId::Data {
6592 if !ranges.is_empty() {
6593 let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
6594 builder.write_frame(frame, stats);
6595 }
6596 } else {
6597 builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
6598 }
6599 }
6600
/// Shared bookkeeping for closing the connection: logs the close and resets all timers.
fn close_common(&mut self) {
    trace!("connection closed");
    self.timers.reset();
}
6605
6606 fn set_close_timer(&mut self, now: Instant) {
6607 let pto_max = self.max_pto_for_space(self.highest_space);
6610 self.timers.set(
6611 Timer::Conn(ConnTimer::Close),
6612 now + 3 * pto_max,
6613 self.qlog.with_time(now),
6614 );
6615 }
6616
6617 fn handle_peer_params(
6622 &mut self,
6623 params: TransportParameters,
6624 local_cid: ConnectionId,
6625 remote_cid: ConnectionId,
6626 now: Instant,
6627 ) -> Result<(), TransportError> {
6628 if Some(self.original_remote_cid) != params.initial_src_cid
6629 || (self.side.is_client()
6630 && (Some(self.initial_dst_cid) != params.original_dst_cid
6631 || self.retry_src_cid != params.retry_src_cid))
6632 {
6633 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
6634 "CID authentication failure",
6635 ));
6636 }
6637 if params.initial_max_path_id.is_some() && (local_cid.is_empty() || remote_cid.is_empty()) {
6638 return Err(TransportError::PROTOCOL_VIOLATION(
6639 "multipath must not use zero-length CIDs",
6640 ));
6641 }
6642
6643 self.set_peer_params(params);
6644 self.qlog.emit_peer_transport_params_received(self, now);
6645
6646 Ok(())
6647 }
6648
6649 fn set_peer_params(&mut self, params: TransportParameters) {
6650 self.streams.set_params(¶ms);
6651 self.idle_timeout =
6652 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
6653 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
6654
6655 if let Some(ref info) = params.preferred_address {
6656 self.remote_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
6658 path_id: None,
6659 sequence: 1,
6660 id: info.connection_id,
6661 reset_token: info.stateless_reset_token,
6662 retire_prior_to: 0,
6663 })
6664 .expect(
6665 "preferred address CID is the first received, and hence is guaranteed to be legal",
6666 );
6667 let remote = self.path_data(PathId::ZERO).network_path.remote;
6668 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
6669 }
6670 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
6671
6672 let mut multipath_enabled = false;
6673 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
6674 self.config.get_initial_max_path_id(),
6675 params.initial_max_path_id,
6676 ) {
6677 self.local_max_path_id = local_max_path_id;
6679 self.remote_max_path_id = remote_max_path_id;
6680 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
6681 debug!(%initial_max_path_id, "multipath negotiated");
6682 multipath_enabled = true;
6683 }
6684
6685 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
6686 self.config
6687 .max_remote_nat_traversal_addresses
6688 .zip(params.max_remote_nat_traversal_addresses)
6689 {
6690 if multipath_enabled {
6691 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
6692 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
6693 self.n0_nat_traversal = n0_nat_traversal::State::new(
6694 max_remote_addresses,
6695 max_local_addresses,
6696 self.side(),
6697 );
6698 debug!(
6699 %max_remote_addresses, %max_local_addresses,
6700 "n0's nat traversal negotiated"
6701 );
6702 } else {
6703 debug!("n0 nat traversal enabled for both endpoints, but multipath is missing")
6704 }
6705 }
6706
6707 self.peer_params = params;
6708 let peer_max_udp_payload_size =
6709 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
6710 self.path_data_mut(PathId::ZERO)
6711 .mtud
6712 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
6713 }
6714
6715 fn decrypt_packet(
6717 &mut self,
6718 now: Instant,
6719 path_id: PathId,
6720 packet: &mut Packet,
6721 ) -> Result<Option<u64>, Option<TransportError>> {
6722 let result = self
6723 .crypto_state
6724 .decrypt_packet_body(packet, path_id, &self.spaces)?;
6725
6726 let Some(result) = result else {
6727 return Ok(None);
6728 };
6729
6730 if result.outgoing_key_update_acked
6731 && let Some(prev) = self.crypto_state.prev_crypto.as_mut()
6732 {
6733 prev.end_packet = Some((result.packet_number, now));
6734 self.set_key_discard_timer(now, packet.header.space());
6735 }
6736
6737 if result.incoming_key_update {
6738 trace!("key update authenticated");
6739 self.crypto_state
6740 .update_keys(Some((result.packet_number, now)), true);
6741 self.set_key_discard_timer(now, packet.header.space());
6742 }
6743
6744 Ok(Some(result.packet_number))
6745 }
6746
/// Whether the peer supports ack frequency, i.e. sent a `min_ack_delay` transport
/// parameter.
fn peer_supports_ack_frequency(&self) -> bool {
    self.peer_params.min_ack_delay.is_some()
}
6750
6751 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
6756 debug_assert_eq!(
6757 self.highest_space,
6758 SpaceKind::Data,
6759 "immediate ack must be written in the data space"
6760 );
6761 self.spaces[SpaceId::Data]
6762 .for_path(path_id)
6763 .immediate_ack_pending = true;
6764 }
6765
6766 #[cfg(test)]
6768 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
6769 let ConnectionEventInner::Datagram(DatagramConnectionEvent {
6770 path_id,
6771 first_decode,
6772 remaining,
6773 ..
6774 }) = &event.0
6775 else {
6776 return None;
6777 };
6778
6779 if remaining.is_some() {
6780 panic!("Packets should never be coalesced in tests");
6781 }
6782
6783 let decrypted_header = self
6784 .crypto_state
6785 .unprotect_header(first_decode.clone(), self.peer_params.stateless_reset_token)?;
6786
6787 let mut packet = decrypted_header.packet?;
6788 self.crypto_state
6789 .decrypt_packet_body(&mut packet, *path_id, &self.spaces)
6790 .ok()?;
6791
6792 Some(packet.payload.to_vec())
6793 }
6794
    /// Test helper: the number of bytes currently counted as in flight on path zero.
    #[cfg(test)]
    pub(crate) fn bytes_in_flight(&self) -> u64 {
        self.path_data(PathId::ZERO).in_flight.bytes
    }
6802
6803 #[cfg(test)]
6805 pub(crate) fn congestion_window(&self) -> u64 {
6806 let path = self.path_data(PathId::ZERO);
6807 path.congestion
6808 .window()
6809 .saturating_sub(path.in_flight.bytes)
6810 }
6811
6812 #[cfg(test)]
6814 pub(crate) fn is_idle(&self) -> bool {
6815 let current_timers = self.timers.values();
6816 current_timers
6817 .into_iter()
6818 .filter(|(timer, _)| {
6819 !matches!(
6820 timer,
6821 Timer::Conn(ConnTimer::KeepAlive)
6822 | Timer::PerPath(_, PathTimer::PathKeepAlive)
6823 | Timer::Conn(ConnTimer::PushNewCid)
6824 | Timer::Conn(ConnTimer::KeyDiscard)
6825 )
6826 })
6827 .min_by_key(|(_, time)| *time)
6828 .is_none_or(|(timer, _)| {
6829 matches!(
6830 timer,
6831 Timer::Conn(ConnTimer::Idle) | Timer::PerPath(_, PathTimer::PathIdle)
6832 )
6833 })
6834 }
6835
    /// Test helper: the value of path zero's `sending_ecn` flag.
    #[cfg(test)]
    pub(crate) fn using_ecn(&self) -> bool {
        self.path_data(PathId::ZERO).sending_ecn
    }
6841
    /// Test helper: the value of path zero's `total_recvd` counter.
    #[cfg(test)]
    pub(crate) fn total_recvd(&self) -> u64 {
        self.path_data(PathId::ZERO).total_recvd
    }
6847
6848 #[cfg(test)]
6849 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6850 self.local_cid_state
6851 .get(&PathId::ZERO)
6852 .unwrap()
6853 .active_seq()
6854 }
6855
6856 #[cfg(test)]
6857 #[track_caller]
6858 pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6859 self.local_cid_state
6860 .get(&PathId(path_id))
6861 .unwrap()
6862 .active_seq()
6863 }
6864
6865 #[cfg(test)]
6868 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6869 let n = self
6870 .local_cid_state
6871 .get_mut(&PathId::ZERO)
6872 .unwrap()
6873 .assign_retire_seq(v);
6874 debug_assert!(!self.state.is_drained()); self.endpoint_events
6876 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6877 }
6878
    /// Test helper: the value returned by `active_seq()` for path zero's remote CIDs.
    ///
    /// Panics if no remote CIDs are tracked for path zero.
    #[cfg(test)]
    pub(crate) fn active_remote_cid_seq(&self) -> u64 {
        self.remote_cids.get(&PathId::ZERO).unwrap().active_seq()
    }
6884
    /// Test helper: the current MTU of the path `path_id`.
    #[cfg(test)]
    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
        self.path_data(path_id).current_mtu()
    }
6890
6891 #[cfg(test)]
6893 pub(crate) fn trigger_path_validation(&mut self) {
6894 for path in self.paths.values_mut() {
6895 path.data.pending_on_path_challenge = true;
6896 }
6897 }
6898
6899 #[cfg(test)]
6901 pub fn simulate_protocol_violation(&mut self, now: Instant) {
6902 if !self.state.is_closed() {
6903 self.state
6904 .move_to_closed(TransportError::PROTOCOL_VIOLATION("simulated violation"));
6905 self.close_common();
6906 if !self.state.is_drained() {
6907 self.set_close_timer(now);
6908 }
6909 self.connection_close_pending = true;
6910 }
6911 }
6912
6913 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6924 let space_specific = self.paths.get(&path_id).is_some_and(|path| {
6925 path.data.pending_on_path_challenge || !path.data.path_responses.is_empty()
6926 });
6927
6928 let other = self.streams.can_send_stream_data()
6930 || self
6931 .datagrams
6932 .outgoing
6933 .front()
6934 .is_some_and(|x| x.size(true) <= max_size);
6935
6936 SendableFrames {
6938 acks: false,
6939 close: false,
6940 space_specific,
6941 other,
6942 }
6943 }
6944
    /// Moves the connection straight to the drained state, recording `reason`.
    ///
    /// Runs the shared close handling first, then marks the state as drained and queues an
    /// `EndpointEventInner::Drained` event for the endpoint.
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.state.move_to_drained(Some(reason));
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
6953
6954 pub fn current_mtu(&self) -> u16 {
6961 self.paths
6962 .iter()
6963 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6964 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6965 .min()
6966 .unwrap_or(INITIAL_MTU)
6967 }
6968
6969 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6976 let pn_len = PacketNumber::new(
6977 pn,
6978 self.spaces[SpaceId::Data]
6979 .for_path(path)
6980 .largest_acked_packet_pn
6981 .unwrap_or(0),
6982 )
6983 .len();
6984
6985 1 + self
6987 .remote_cids
6988 .get(&path)
6989 .map(|cids| cids.active().len())
6990 .unwrap_or(20) + pn_len
6992 + self.tag_len_1rtt()
6993 }
6994
6995 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6996 let pn_len = 4;
6997
6998 let cid_len = self
6999 .remote_cids
7000 .values()
7001 .map(|cids| cids.active().len())
7002 .max()
7003 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
7007 }
7008
7009 fn tag_len_1rtt(&self) -> usize {
7010 let packet_crypto = self
7012 .crypto_state
7013 .encryption_keys(SpaceKind::Data, self.side.side())
7014 .map(|(_header, packet, _level)| packet);
7015 packet_crypto.map_or(16, |x| x.tag_len())
7019 }
7020
7021 fn on_path_validated(&mut self, path_id: PathId) {
7023 self.path_data_mut(path_id).validated = true;
7024 let ConnectionSide::Server { server_config } = &self.side else {
7025 return;
7026 };
7027 let network_path = self.path_data(path_id).network_path;
7028 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
7029 new_tokens.clear();
7030 for _ in 0..server_config.validation_token.sent {
7031 new_tokens.push(network_path);
7032 }
7033 }
7034
7035 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
7037 if let Some(path) = self.paths.get_mut(&path_id) {
7038 path.data.status.remote_update(status, status_seq_no);
7039 } else {
7040 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
7041 }
7042 self.events.push_back(
7043 PathEvent::RemoteStatus {
7044 id: path_id,
7045 status,
7046 }
7047 .into(),
7048 );
7049 }
7050
7051 fn max_path_id(&self) -> Option<PathId> {
7060 if self.is_multipath_negotiated() {
7061 Some(self.remote_max_path_id.min(self.local_max_path_id))
7062 } else {
7063 None
7064 }
7065 }
7066
7067 fn is_ipv6(&self) -> bool {
7072 self.paths
7073 .values()
7074 .any(|p| p.data.network_path.remote.is_ipv6())
7075 }
7076
7077 pub fn add_nat_traversal_address(
7079 &mut self,
7080 address: SocketAddr,
7081 ) -> Result<(), n0_nat_traversal::Error> {
7082 if let Some(added) = self.n0_nat_traversal.add_local_address(address)? {
7083 self.spaces[SpaceId::Data].pending.add_address.insert(added);
7084 };
7085 Ok(())
7086 }
7087
7088 pub fn remove_nat_traversal_address(
7092 &mut self,
7093 address: SocketAddr,
7094 ) -> Result<(), n0_nat_traversal::Error> {
7095 if let Some(removed) = self.n0_nat_traversal.remove_local_address(address)? {
7096 self.spaces[SpaceId::Data]
7097 .pending
7098 .remove_address
7099 .insert(removed);
7100 }
7101 Ok(())
7102 }
7103
    /// Returns the local NAT traversal addresses held by the NAT traversal state.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the NAT traversal state.
    pub fn get_local_nat_traversal_addresses(
        &self,
    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
        self.n0_nat_traversal.get_local_nat_traversal_addresses()
    }
7110
7111 pub fn get_remote_nat_traversal_addresses(
7113 &self,
7114 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
7115 Ok(self
7116 .n0_nat_traversal
7117 .client_side()?
7118 .get_remote_nat_traversal_addresses())
7119 }
7120
7121 pub fn initiate_nat_traversal_round(
7133 &mut self,
7134 now: Instant,
7135 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
7136 if self.state.is_closed() {
7137 return Err(n0_nat_traversal::Error::Closed);
7138 }
7139
7140 let ipv6 = self.is_ipv6();
7141 let client_state = self.n0_nat_traversal.client_side_mut()?;
7142 let (mut reach_out_frames, probed_addrs) =
7143 client_state.initiate_nat_traversal_round(ipv6)?;
7144 if let Some(delay) = self.n0_nat_traversal.retry_delay(self.config.initial_rtt) {
7145 self.timers.set(
7146 Timer::Conn(ConnTimer::NatTraversalProbeRetry),
7147 now + delay,
7148 self.qlog.with_time(now),
7149 );
7150 }
7151
7152 self.spaces[SpaceId::Data]
7153 .pending
7154 .reach_out
7155 .append(&mut reach_out_frames);
7156
7157 Ok(probed_addrs)
7158 }
7159
7160 fn is_handshake_confirmed(&self) -> bool {
7169 !self.is_handshaking() && !self.crypto_state.has_keys(EncryptionLevel::Handshake)
7170 }
7171}
7172
7173impl fmt::Debug for Connection {
7174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
7175 f.debug_struct("Connection")
7176 .field("handshake_cid", &self.handshake_cid)
7177 .finish()
7178 }
7179}
7180
/// Application-provided hint about whether a path can survive a network change.
///
/// NOTE(review): the call site is outside this excerpt; this description is inferred from the
/// trait and method names.
pub trait NetworkChangeHint: std::fmt::Debug + 'static {
    /// Returns whether the path `path_id`, running over `network_path`, is considered
    /// recoverable.
    fn is_path_recoverable(&self, path_id: PathId, network_path: FourTuple) -> bool;
}
7193
/// Outcome of polling one path and packet space for something to transmit.
///
/// NOTE(review): the producing code is outside this excerpt; variant descriptions follow the
/// variant and field names.
#[derive(Debug)]
enum PollPathSpaceStatus {
    /// Nothing was written.
    NothingToSend {
        /// Whether congestion control was what prevented sending.
        congestion_blocked: bool,
    },
    /// A packet was written.
    WrotePacket {
        /// Packet number of the last packet written.
        last_packet_number: u64,
        /// Padding request for the datagram that holds the packet.
        pad_datagram: PadDatagram,
    },
    /// The datagram should be sent.
    Send {
        /// Packet number of the last packet written.
        last_packet_number: u64,
    },
}
7232
/// Per-path flags consulted when deciding what may be sent on a path.
///
/// NOTE(review): the code that fills and reads these flags is outside this excerpt; field
/// descriptions follow the field names.
#[derive(Debug, Copy, Clone)]
struct PathSchedulingInfo {
    /// Whether the path has been abandoned.
    is_abandoned: bool,
    /// Whether data may be sent on the path.
    may_send_data: bool,
    /// Whether a close may be sent on the path.
    may_send_close: bool,
    /// Whether the path may send its own abandon.
    may_self_abandon: bool,
}
7272
/// Why sending on a path is blocked, if it is.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Blocked by the anti-amplification limit.
    AntiAmplification,
    /// Blocked by congestion control.
    Congestion,
    /// Blocked by pacing.
    Pacing,
}
7280
/// Side-specific state held by a connection.
enum ConnectionSide {
    /// State for the client side.
    Client {
        /// Token obtained from `token_store` for `server_name` when the state was built; empty
        /// if the store had none.
        token: Bytes,
        /// Store the token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server, used as the key into `token_store`.
        server_name: String,
    },
    /// State for the server side.
    Server {
        /// Server configuration shared with the endpoint.
        server_config: Arc<ServerConfig>,
    },
}
7293
7294impl ConnectionSide {
7295 fn is_client(&self) -> bool {
7296 self.side().is_client()
7297 }
7298
7299 fn is_server(&self) -> bool {
7300 self.side().is_server()
7301 }
7302
7303 fn side(&self) -> Side {
7304 match *self {
7305 Self::Client { .. } => Side::Client,
7306 Self::Server { .. } => Side::Server,
7307 }
7308 }
7309}
7310
7311impl From<SideArgs> for ConnectionSide {
7312 fn from(side: SideArgs) -> Self {
7313 match side {
7314 SideArgs::Client {
7315 token_store,
7316 server_name,
7317 } => Self::Client {
7318 token: token_store.take(&server_name).unwrap_or_default(),
7319 token_store,
7320 server_name,
7321 },
7322 SideArgs::Server {
7323 server_config,
7324 pref_addr_cid: _,
7325 path_validated: _,
7326 } => Self::Server { server_config },
7327 }
7328 }
7329}
7330
/// Side-specific arguments used to construct a connection.
pub(crate) enum SideArgs {
    /// Arguments for a client connection.
    Client {
        /// Store from which a token for `server_name` is taken.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server connection.
    Server {
        /// Server configuration.
        server_config: Arc<ServerConfig>,
        /// Connection ID for the preferred address, if any.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path is already validated.
        path_validated: bool,
    },
}
7343
7344impl SideArgs {
7345 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
7346 match *self {
7347 Self::Client { .. } => None,
7348 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
7349 }
7350 }
7351
7352 pub(crate) fn path_validated(&self) -> bool {
7353 match *self {
7354 Self::Client { .. } => true,
7355 Self::Server { path_validated, .. } => path_validated,
7356 }
7357 }
7358
7359 pub(crate) fn side(&self) -> Side {
7360 match *self {
7361 Self::Client { .. } => Side::Client,
7362 Self::Server { .. } => Side::Server,
7363 }
7364 }
7365}
7366
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed using the wrapped error's own message.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer aborted the connection with a connection-level close.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection with an application-level close.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer reset the connection.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed locally.
    #[error("closed")]
    LocallyClosed,
    /// No connection IDs were left.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
7401
7402impl From<Close> for ConnectionError {
7403 fn from(x: Close) -> Self {
7404 match x {
7405 Close::Connection(reason) => Self::ConnectionClosed(reason),
7406 Close::Application(reason) => Self::ApplicationClosed(reason),
7407 }
7408 }
7409}
7410
7411impl From<ConnectionError> for io::Error {
7413 fn from(x: ConnectionError) -> Self {
7414 use ConnectionError::*;
7415 let kind = match x {
7416 TimedOut => io::ErrorKind::TimedOut,
7417 Reset => io::ErrorKind::ConnectionReset,
7418 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
7419 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
7420 io::ErrorKind::Other
7421 }
7422 };
7423 Self::new(kind, x)
7424 }
7425}
7426
7427#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
7430pub enum PathError {
7431 #[error("multipath extension not negotiated")]
7433 MultipathNotNegotiated,
7434 #[error("the server side may not open a path")]
7436 ServerSideNotAllowed,
7437 #[error("maximum number of concurrent paths reached")]
7439 MaxPathIdReached,
7440 #[error("remoted CIDs exhausted")]
7442 RemoteCidsExhausted,
7443 #[error("path validation failed")]
7445 ValidationFailed,
7446 #[error("invalid remote address")]
7448 InvalidRemoteAddress(SocketAddr),
7449}
7450
/// Errors returned when closing a path.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The multipath extension was not negotiated with the peer.
    #[error("Multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// The path is already closed.
    #[error("closed path")]
    ClosedPath,
    /// The path is the last open path.
    #[error("last open path")]
    LastOpenPath,
}
7466
/// Error indicating that the multipath extension was not negotiated with the peer.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private field: the struct cannot be constructed outside this module.
    _private: (),
}
7473
/// Events of interest to the application.
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready.
    HandshakeDataReady,
    /// The connection was established.
    Connected,
    /// The handshake was confirmed.
    HandshakeConfirmed,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event.
    Stream(StreamEvent),
    /// An application datagram was received.
    DatagramReceived,
    /// Sending application datagrams is no longer blocked.
    DatagramsUnblocked,
    /// A path-related event.
    Path(PathEvent),
    /// A NAT traversal event.
    NatTraversal(n0_nat_traversal::Event),
}
7504
/// Wraps a [`PathEvent`] in [`Event::Path`].
impl From<PathEvent> for Event {
    fn from(source: PathEvent) -> Self {
        Self::Path(source)
    }
}
7510
7511fn get_max_ack_delay(params: &TransportParameters) -> Duration {
7512 Duration::from_micros(params.max_ack_delay.0 * 1000)
7513}
7514
/// Upper bound on the exponent used for exponential backoff.
///
/// NOTE(review): presumably applies to PTO backoff; the use site is outside this excerpt.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Upper bound on the interval between probe timeouts (2 seconds), judging by the name.
const MAX_PTO_INTERVAL: Duration = Duration::from_secs(2);

/// Minimum idle duration (25 seconds) associated with the fast PTO interval.
///
/// NOTE(review): exact semantics depend on the use site, which is outside this excerpt.
const MIN_IDLE_FOR_FAST_PTO: Duration = Duration::from_secs(25);

/// Upper bound on the fast PTO interval (1 second), judging by the name.
const MAX_PTO_FAST_INTERVAL: Duration = Duration::from_secs(1);

/// RTT threshold equal to two thirds of [`MAX_PTO_INTERVAL`], in whole milliseconds (1333 ms).
const SLOW_RTT_THRESHOLD: Duration =
    Duration::from_millis((MAX_PTO_INTERVAL.as_millis() as u64 * 2) / 3);

/// Minimum packet space: the largest Handshake/0-RTT header plus 32 further bytes.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest size a Handshake or 0-RTT packet header is expected to occupy.
///
/// The terms appear to follow the QUIC long header layout: first byte (1), version (4),
/// destination CID length (1) and CID, source CID length (1) and CID, a length field sized for
/// `u16::MAX`, and a 4-byte packet number. NOTE(review): confirm against the packet encoder.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
7555
/// Record of the frames written into a packet, filled in by `record_sent_frame`.
#[derive(Default)]
struct SentFrames {
    /// Frames that carry retransmittable state, stored by frame kind.
    retransmits: ThinRetransmits,
    /// For each path, the largest packet number covered by an ACK frame that was written.
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of the stream frames that were written.
    stream_frames: StreamMetaVec,
    /// Set when a frame was written that is not tracked in `retransmits` (PATH_RESPONSE,
    /// PING, IMMEDIATE_ACK, PATH_CHALLENGE or DATAGRAM).
    non_retransmits: bool,
    /// Whether the packet requires padding; not modified by `record_sent_frame`.
    requires_padding: bool,
}
7567
7568impl SentFrames {
7569 fn is_ack_only(&self, streams: &StreamsState) -> bool {
7571 !self.largest_acked.is_empty()
7572 && !self.non_retransmits
7573 && self.stream_frames.is_empty()
7574 && self.retransmits.is_empty(streams)
7575 }
7576
7577 fn retransmits_mut(&mut self) -> &mut Retransmits {
7578 self.retransmits.get_or_create()
7579 }
7580
7581 fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
7582 use frame::EncodableFrame::*;
7583 match frame {
7584 PathAck(path_ack_encoder) => {
7585 if let Some(max) = path_ack_encoder.ranges.max() {
7586 self.largest_acked.insert(path_ack_encoder.path_id, max);
7587 }
7588 }
7589 Ack(ack_encoder) => {
7590 if let Some(max) = ack_encoder.ranges.max() {
7591 self.largest_acked.insert(PathId::ZERO, max);
7592 }
7593 }
7594 Close(_) => { }
7595 PathResponse(_) => self.non_retransmits = true,
7596 HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
7597 ReachOut(frame) => self.retransmits_mut().reach_out.push(frame),
7598 ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
7599 Ping(_) => self.non_retransmits = true,
7600 ImmediateAck(_) => self.non_retransmits = true,
7601 AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
7602 PathChallenge(_) => self.non_retransmits = true,
7603 Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
7604 PathAbandon(path_abandon) => {
7605 self.retransmits_mut()
7606 .path_abandon
7607 .entry(path_abandon.path_id)
7608 .or_insert(path_abandon.error_code);
7609 }
7610 PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
7611 | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
7612 self.retransmits_mut().path_status.insert(path_id);
7613 }
7614 MaxPathId(_) => self.retransmits_mut().max_path_id = true,
7615 PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
7616 PathCidsBlocked(path_cids_blocked) => {
7617 self.retransmits_mut()
7618 .path_cids_blocked
7619 .insert(path_cids_blocked.path_id);
7620 }
7621 ResetStream(reset) => self
7622 .retransmits_mut()
7623 .reset_stream
7624 .push((reset.id, reset.error_code)),
7625 StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
7626 NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
7627 RetireConnectionId(retire_cid) => self
7628 .retransmits_mut()
7629 .retire_cids
7630 .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
7631 Datagram(_) => self.non_retransmits = true,
7632 NewToken(_) => {}
7633 AddAddress(add_address) => {
7634 self.retransmits_mut().add_address.insert(add_address);
7635 }
7636 RemoveAddress(remove_address) => {
7637 self.retransmits_mut().remove_address.insert(remove_address);
7638 }
7639 StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
7640 MaxData(_) => self.retransmits_mut().max_data = true,
7641 MaxStreamData(max) => {
7642 self.retransmits_mut().max_stream_data.insert(max.id);
7643 }
7644 MaxStreams(max_streams) => {
7645 self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
7646 }
7647 StreamsBlocked(streams_blocked) => {
7648 self.retransmits_mut().streams_blocked[streams_blocked.dir as usize] = true
7649 }
7650 }
7651 }
7652}
7653
7654fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
7662 match (x, y) {
7663 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
7664 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
7665 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
7666 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
7667 }
7668}
7669
#[cfg(test)]
mod tests {
    use super::*;

    /// The negotiated idle timeout must match the expected value regardless of argument order.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let ms = |value: u64| Some(Duration::from_millis(value));
        let cases: [(Option<VarInt>, Option<VarInt>, Option<Duration>); 6] = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), ms(2)),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (Some(VarInt(2)), Some(VarInt(0)), ms(2)),
            (Some(VarInt(1)), Some(VarInt(4)), ms(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}