1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::{NonZeroU32, NonZeroUsize},
8 sync::Arc,
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13
14use rand::{Rng, SeedableRng, rngs::StdRng};
15use rustc_hash::{FxHashMap, FxHashSet};
16use thiserror::Error;
17use tracing::{debug, error, trace, trace_span, warn};
18
19use crate::{
20 Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
21 MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
22 TransportError, TransportErrorCode, VarInt,
23 cid_generator::ConnectionIdGenerator,
24 cid_queue::CidQueue,
25 config::{ServerConfig, TransportConfig},
26 congestion::Controller,
27 connection::{
28 qlog::{QlogRecvPacket, QlogSink},
29 spaces::LostPacket,
30 timer::{ConnTimer, PathTimer},
31 },
32 crypto::{self, Keys},
33 frame::{
34 self, Close, DataBlocked, Datagram, FrameStruct, NewToken, ObservedAddr, StreamDataBlocked,
35 StreamsBlocked,
36 },
37 n0_nat_traversal,
38 packet::{
39 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40 PacketNumber, PartialDecode, SpaceId,
41 },
42 range_set::ArrayRangeSet,
43 shared::{
44 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45 EndpointEvent, EndpointEventInner,
46 },
47 token::{ResetToken, Token, TokenPayload},
48 transport_parameters::TransportParameters,
49};
50
51mod ack_frequency;
52use ack_frequency::AckFrequencyState;
53
54mod assembler;
55pub use assembler::Chunk;
56
57mod cid_state;
58use cid_state::CidState;
59
60mod datagrams;
61use datagrams::DatagramState;
62pub use datagrams::{Datagrams, SendDatagramError};
63
64mod mtud;
65mod pacing;
66
67mod packet_builder;
68use packet_builder::{PacketBuilder, PadDatagram};
69
70mod packet_crypto;
71use packet_crypto::CryptoState;
72pub(crate) use packet_crypto::EncryptionLevel;
73
74mod paths;
75pub use paths::{
76 ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError,
77};
78use paths::{PathData, PathState};
79
80pub(crate) mod qlog;
81pub(crate) mod send_buffer;
82
83mod spaces;
84#[cfg(fuzzing)]
85pub use spaces::Retransmits;
86#[cfg(not(fuzzing))]
87use spaces::Retransmits;
88pub(crate) use spaces::SpaceKind;
89use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
90
91mod stats;
92pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
93
94mod streams;
95#[cfg(fuzzing)]
96pub use streams::StreamsState;
97#[cfg(not(fuzzing))]
98use streams::StreamsState;
99pub use streams::{
100 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
101 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
102};
103
104mod timer;
105use timer::{Timer, TimerTable};
106
107mod transmit_buf;
108use transmit_buf::TransmitBuf;
109
110mod state;
111
112#[cfg(not(fuzzing))]
113use state::State;
114#[cfg(fuzzing)]
115pub use state::State;
116use state::StateType;
117
118pub struct Connection {
158 endpoint_config: Arc<EndpointConfig>,
159 config: Arc<TransportConfig>,
160 rng: StdRng,
161 crypto_state: CryptoState,
163 handshake_cid: ConnectionId,
165 remote_handshake_cid: ConnectionId,
167 paths: BTreeMap<PathId, PathState>,
173 path_generation_counter: u64,
184 allow_mtud: bool,
186 state: State,
187 side: ConnectionSide,
188 peer_params: TransportParameters,
190 original_remote_cid: ConnectionId,
192 initial_dst_cid: ConnectionId,
194 retry_src_cid: Option<ConnectionId>,
197 events: VecDeque<Event>,
199 endpoint_events: VecDeque<EndpointEventInner>,
200 spin_enabled: bool,
202 spin: bool,
204 spaces: [PacketSpace; 3],
206 highest_space: SpaceKind,
208 permit_idle_reset: bool,
210 idle_timeout: Option<Duration>,
212 timers: TimerTable,
213 authentication_failures: u64,
215
216 connection_close_pending: bool,
221
222 ack_frequency: AckFrequencyState,
226
227 receiving_ecn: bool,
232 total_authed_packets: u64,
234 app_limited: bool,
237
238 next_observed_addr_seq_no: VarInt,
243
244 streams: StreamsState,
245 remote_cids: FxHashMap<PathId, CidQueue>,
251 local_cid_state: FxHashMap<PathId, CidState>,
258 datagrams: DatagramState,
260 stats: ConnectionStats,
262 path_stats: FxHashMap<PathId, PathStats>,
264 version: u32,
266
267 max_concurrent_paths: NonZeroU32,
276 local_max_path_id: PathId,
291 remote_max_path_id: PathId,
297 max_path_id_with_cids: PathId,
303 abandoned_paths: FxHashSet<PathId>,
311
312 n0_nat_traversal: n0_nat_traversal::State,
314 qlog: QlogSink,
315}
316
317impl Connection {
318 pub(crate) fn new(
319 endpoint_config: Arc<EndpointConfig>,
320 config: Arc<TransportConfig>,
321 init_cid: ConnectionId,
322 local_cid: ConnectionId,
323 remote_cid: ConnectionId,
324 network_path: FourTuple,
325 crypto: Box<dyn crypto::Session>,
326 cid_gen: &dyn ConnectionIdGenerator,
327 now: Instant,
328 version: u32,
329 allow_mtud: bool,
330 rng_seed: [u8; 32],
331 side_args: SideArgs,
332 qlog: QlogSink,
333 ) -> Self {
334 let pref_addr_cid = side_args.pref_addr_cid();
335 let path_validated = side_args.path_validated();
336 let connection_side = ConnectionSide::from(side_args);
337 let side = connection_side.side();
338 let mut rng = StdRng::from_seed(rng_seed);
339 let initial_space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
340 let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
341 #[cfg(test)]
342 let data_space = match config.deterministic_packet_numbers {
343 true => PacketSpace::new_deterministic(now, SpaceId::Data),
344 false => PacketSpace::new(now, SpaceId::Data, &mut rng),
345 };
346 #[cfg(not(test))]
347 let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
348 let state = State::handshake(state::Handshake {
349 remote_cid_set: side.is_server(),
350 expected_token: Bytes::new(),
351 client_hello: None,
352 allow_server_migration: side.is_client(),
353 });
354 let local_cid_state = FxHashMap::from_iter([(
355 PathId::ZERO,
356 CidState::new(
357 cid_gen.cid_len(),
358 cid_gen.cid_lifetime(),
359 now,
360 if pref_addr_cid.is_some() { 2 } else { 1 },
361 ),
362 )]);
363
364 let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
365 path.open_status = paths::OpenStatus::Informed;
366 let mut this = Self {
367 endpoint_config,
368 crypto_state: CryptoState::new(crypto, init_cid, side, &mut rng),
369 handshake_cid: local_cid,
370 remote_handshake_cid: remote_cid,
371 local_cid_state,
372 paths: BTreeMap::from_iter([(
373 PathId::ZERO,
374 PathState {
375 data: path,
376 prev: None,
377 },
378 )]),
379 path_generation_counter: 0,
380 allow_mtud,
381 state,
382 side: connection_side,
383 peer_params: TransportParameters::default(),
384 original_remote_cid: remote_cid,
385 initial_dst_cid: init_cid,
386 retry_src_cid: None,
387 events: VecDeque::new(),
388 endpoint_events: VecDeque::new(),
389 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
390 spin: false,
391 spaces: [initial_space, handshake_space, data_space],
392 highest_space: SpaceKind::Initial,
393 permit_idle_reset: true,
394 idle_timeout: match config.max_idle_timeout {
395 None | Some(VarInt(0)) => None,
396 Some(dur) => Some(Duration::from_millis(dur.0)),
397 },
398 timers: TimerTable::default(),
399 authentication_failures: 0,
400 connection_close_pending: false,
401
402 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
403 &TransportParameters::default(),
404 )),
405
406 app_limited: false,
407 receiving_ecn: false,
408 total_authed_packets: 0,
409
410 next_observed_addr_seq_no: 0u32.into(),
411
412 streams: StreamsState::new(
413 side,
414 config.max_concurrent_uni_streams,
415 config.max_concurrent_bidi_streams,
416 config.send_window,
417 config.receive_window,
418 config.stream_receive_window,
419 ),
420 datagrams: DatagramState::default(),
421 config,
422 remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]),
423 rng,
424 stats: ConnectionStats::default(),
425 path_stats: Default::default(),
426 version,
427
428 max_concurrent_paths: NonZeroU32::MIN,
430 local_max_path_id: PathId::ZERO,
431 remote_max_path_id: PathId::ZERO,
432 max_path_id_with_cids: PathId::ZERO,
433 abandoned_paths: Default::default(),
434
435 n0_nat_traversal: Default::default(),
436 qlog,
437 };
438 if path_validated {
439 this.on_path_validated(PathId::ZERO);
440 }
441 if side.is_client() {
442 this.write_crypto();
444 this.init_0rtt(now);
445 }
446 this.qlog
447 .emit_tuple_assigned(PathId::ZERO, network_path, now);
448 this
449 }
450
451 #[must_use]
459 pub fn poll_timeout(&mut self) -> Option<Instant> {
460 self.timers.peek()
461 }
462
463 #[must_use]
469 pub fn poll(&mut self) -> Option<Event> {
470 if let Some(x) = self.events.pop_front() {
471 return Some(x);
472 }
473
474 if let Some(event) = self.streams.poll() {
475 return Some(Event::Stream(event));
476 }
477
478 if let Some(reason) = self.state.take_error() {
479 return Some(Event::ConnectionLost { reason });
480 }
481
482 None
483 }
484
485 #[must_use]
487 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
488 self.endpoint_events.pop_front().map(EndpointEvent)
489 }
490
491 #[must_use]
493 pub fn streams(&mut self) -> Streams<'_> {
494 Streams {
495 state: &mut self.streams,
496 conn_state: &self.state,
497 }
498 }
499
500 #[must_use]
502 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
503 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
504 RecvStream {
505 id,
506 state: &mut self.streams,
507 pending: &mut self.spaces[SpaceId::Data].pending,
508 }
509 }
510
511 #[must_use]
513 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
514 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
515 SendStream {
516 id,
517 state: &mut self.streams,
518 pending: &mut self.spaces[SpaceId::Data].pending,
519 conn_state: &self.state,
520 }
521 }
522
523 pub fn open_path_ensure(
540 &mut self,
541 network_path: FourTuple,
542 initial_status: PathStatus,
543 now: Instant,
544 ) -> Result<(PathId, bool), PathError> {
545 let existing_open_path = self.paths.iter().find(|(id, path)| {
546 network_path.is_probably_same_path(&path.data.network_path)
547 && !self.abandoned_paths.contains(*id)
548 });
549 match existing_open_path {
550 Some((path_id, _state)) => Ok((*path_id, true)),
551 None => Ok((self.open_path(network_path, initial_status, now)?, false)),
552 }
553 }
554
555 pub fn open_path(
560 &mut self,
561 network_path: FourTuple,
562 initial_status: PathStatus,
563 now: Instant,
564 ) -> Result<PathId, PathError> {
565 if !self.is_multipath_negotiated() {
566 return Err(PathError::MultipathNotNegotiated);
567 }
568 if self.side().is_server() {
569 return Err(PathError::ServerSideNotAllowed);
570 }
571
572 let max_abandoned = self.abandoned_paths.iter().max().copied();
573 let max_used = self.paths.keys().last().copied();
574 let path_id = max_abandoned
575 .max(max_used)
576 .unwrap_or(PathId::ZERO)
577 .saturating_add(1u8);
578
579 if Some(path_id) > self.max_path_id() {
580 return Err(PathError::MaxPathIdReached);
581 }
582 if path_id > self.remote_max_path_id {
583 self.spaces[SpaceId::Data].pending.paths_blocked = true;
584 return Err(PathError::MaxPathIdReached);
585 }
586 if self
587 .remote_cids
588 .get(&path_id)
589 .map(CidQueue::active)
590 .is_none()
591 {
592 self.spaces[SpaceId::Data]
593 .pending
594 .path_cids_blocked
595 .insert(path_id);
596 return Err(PathError::RemoteCidsExhausted);
597 }
598
599 let path = self.ensure_path(path_id, network_path, now, None);
600 path.status.local_update(initial_status);
601
602 Ok(path_id)
603 }
604
605 pub fn close_path(
611 &mut self,
612 now: Instant,
613 path_id: PathId,
614 error_code: VarInt,
615 ) -> Result<(), ClosePathError> {
616 self.close_path_inner(
617 now,
618 path_id,
619 PathAbandonReason::ApplicationClosed { error_code },
620 )
621 }
622
623 fn close_path_inner(
628 &mut self,
629 now: Instant,
630 path_id: PathId,
631 reason: PathAbandonReason,
632 ) -> Result<(), ClosePathError> {
633 if self.state.is_drained() {
634 return Ok(());
635 }
636
637 if !self.is_multipath_negotiated() {
638 return Err(ClosePathError::MultipathNotNegotiated);
639 }
640 if self.abandoned_paths.contains(&path_id)
641 || Some(path_id) > self.max_path_id()
642 || !self.paths.contains_key(&path_id)
643 {
644 return Err(ClosePathError::ClosedPath);
645 }
646
647 if reason.is_locally_initiated() {
648 let has_remaining_validated_paths = self.paths.iter().any(|(id, path)| {
649 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
650 });
651 if !has_remaining_validated_paths {
652 return Err(ClosePathError::LastOpenPath);
653 }
654 } else {
655 let has_remaining_paths = self
659 .paths
660 .keys()
661 .any(|id| *id != path_id && !self.abandoned_paths.contains(id));
662 if !has_remaining_paths {
663 return Err(ClosePathError::LastOpenPath);
664 }
665 }
666
667 self.spaces[SpaceId::Data]
669 .pending
670 .path_abandon
671 .insert(path_id, reason.error_code());
672
673 let pending_space = &mut self.spaces[SpaceId::Data].pending;
675 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
676 pending_space.path_cids_blocked.retain(|&id| id != path_id);
677 pending_space.path_status.retain(|&id| id != path_id);
678
679 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
681 for sent_packet in space.sent_packets.values_mut() {
682 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
683 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
684 retransmits.path_cids_blocked.retain(|&id| id != path_id);
685 retransmits.path_status.retain(|&id| id != path_id);
686 }
687 }
688 }
689
690 self.remote_cids.remove(&path_id);
696 debug_assert!(!self.state.is_drained()); self.endpoint_events
698 .push_back(EndpointEventInner::RetireResetToken(path_id));
699
700 trace!(%path_id, "abandoning path");
701 self.abandoned_paths.insert(path_id);
702
703 for timer in timer::PathTimer::VALUES {
704 let keep_timer = match timer {
706 PathTimer::PathValidation | PathTimer::PathChallengeLost | PathTimer::PathOpen => {
710 false
711 }
712 PathTimer::PathKeepAlive | PathTimer::PathIdle => false,
715 PathTimer::MaxAckDelay => false,
718 PathTimer::DiscardPath => false,
721 PathTimer::LossDetection => true,
724 PathTimer::Pacing => true,
728 };
729
730 if !keep_timer {
731 let qlog = self.qlog.with_time(now);
732 self.timers.stop(Timer::PerPath(path_id, timer), qlog);
733 }
734 }
735
736 self.events.push_back(Event::Path(PathEvent::Abandoned {
738 id: path_id,
739 reason,
740 }));
741
742 Ok(())
743 }
744
745 #[track_caller]
749 fn path_data(&self, path_id: PathId) -> &PathData {
750 if let Some(data) = self.paths.get(&path_id) {
751 &data.data
752 } else {
753 panic!(
754 "unknown path: {path_id}, currently known paths: {:?}",
755 self.paths.keys().collect::<Vec<_>>()
756 );
757 }
758 }
759
760 fn path(&self, path_id: PathId) -> Option<&PathData> {
762 self.paths.get(&path_id).map(|path_state| &path_state.data)
763 }
764
765 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
767 self.paths
768 .get_mut(&path_id)
769 .map(|path_state| &mut path_state.data)
770 }
771
772 pub fn paths(&self) -> Vec<PathId> {
776 self.paths.keys().copied().collect()
777 }
778
779 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
781 self.path(path_id)
782 .map(PathData::local_status)
783 .ok_or(ClosedPath { _private: () })
784 }
785
786 pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
788 self.path(path_id)
789 .map(|path| path.network_path)
790 .ok_or(ClosedPath { _private: () })
791 }
792
793 pub fn set_path_status(
797 &mut self,
798 path_id: PathId,
799 status: PathStatus,
800 ) -> Result<PathStatus, SetPathStatusError> {
801 if !self.is_multipath_negotiated() {
802 return Err(SetPathStatusError::MultipathNotNegotiated);
803 }
804 let path = self
805 .path_mut(path_id)
806 .ok_or(SetPathStatusError::ClosedPath)?;
807 let prev = match path.status.local_update(status) {
808 Some(prev) => {
809 self.spaces[SpaceId::Data]
810 .pending
811 .path_status
812 .insert(path_id);
813 prev
814 }
815 None => path.local_status(),
816 };
817 Ok(prev)
818 }
819
820 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
825 self.path(path_id).and_then(|path| path.remote_status())
826 }
827
828 pub fn set_path_max_idle_timeout(
834 &mut self,
835 path_id: PathId,
836 timeout: Option<Duration>,
837 ) -> Result<Option<Duration>, ClosedPath> {
838 let path = self
839 .paths
840 .get_mut(&path_id)
841 .ok_or(ClosedPath { _private: () })?;
842 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
843 }
844
845 pub fn set_path_keep_alive_interval(
851 &mut self,
852 path_id: PathId,
853 interval: Option<Duration>,
854 ) -> Result<Option<Duration>, ClosedPath> {
855 let path = self
856 .paths
857 .get_mut(&path_id)
858 .ok_or(ClosedPath { _private: () })?;
859 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
860 }
861
862 #[track_caller]
866 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
867 &mut self.paths.get_mut(&path_id).expect("known path").data
868 }
869
870 fn find_validated_path_on_network_path(
874 &self,
875 network_path: FourTuple,
876 ) -> Option<(&PathId, &PathState)> {
877 self.paths.iter().find(|(path_id, path_state)| {
878 path_state.data.validated
879 && network_path.is_probably_same_path(&path_state.data.network_path)
881 && !self.abandoned_paths.contains(path_id)
882 })
883 }
887
888 fn ensure_path(
892 &mut self,
893 path_id: PathId,
894 network_path: FourTuple,
895 now: Instant,
896 pn: Option<u64>,
897 ) -> &mut PathData {
898 let valid_path = self.find_validated_path_on_network_path(network_path);
899 let validated = valid_path.is_some();
900 let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
901 let vacant_entry = match self.paths.entry(path_id) {
902 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
903 btree_map::Entry::Occupied(occupied_entry) => {
904 return &mut occupied_entry.into_mut().data;
905 }
906 };
907
908 debug!(%validated, %path_id, %network_path, "path added");
909 let peer_max_udp_payload_size =
910 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
911 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
912 let mut data = PathData::new(
913 network_path,
914 self.allow_mtud,
915 Some(peer_max_udp_payload_size),
916 self.path_generation_counter,
917 now,
918 &self.config,
919 );
920
921 data.validated = validated;
922 if let Some(initial_rtt) = initial_rtt {
923 data.rtt.reset_initial_rtt(initial_rtt);
924 }
925
926 data.pending_on_path_challenge = true;
929
930 let path = vacant_entry.insert(PathState { data, prev: None });
931
932 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
933 if let Some(pn) = pn {
934 pn_space.dedup.insert(pn);
935 }
936 self.spaces[SpaceId::Data]
937 .number_spaces
938 .insert(path_id, pn_space);
939 self.qlog.emit_tuple_assigned(path_id, network_path, now);
940
941 if !self.remote_cids.contains_key(&path_id) {
945 debug!(%path_id, "Remote opened path without issuing CIDs");
946 self.spaces[SpaceId::Data]
947 .pending
948 .path_cids_blocked
949 .insert(path_id);
950 }
953
954 &mut path.data
955 }
956
957 #[must_use]
967 pub fn poll_transmit(
968 &mut self,
969 now: Instant,
970 max_datagrams: NonZeroUsize,
971 buf: &mut Vec<u8>,
972 ) -> Option<Transmit> {
973 let max_datagrams = match self.config.enable_segmentation_offload {
974 false => NonZeroUsize::MIN,
975 true => max_datagrams,
976 };
977
978 let connection_close_pending = match self.state.as_type() {
984 StateType::Drained => {
985 self.app_limited = true;
986 return None;
987 }
988 StateType::Draining | StateType::Closed => {
989 if !self.connection_close_pending {
992 self.app_limited = true;
993 return None;
994 }
995 true
996 }
997 _ => false,
998 };
999
1000 if let Some(config) = &self.config.ack_frequency_config {
1002 let rtt = self
1003 .paths
1004 .values()
1005 .map(|p| p.data.rtt.get())
1006 .min()
1007 .expect("one path exists");
1008 self.spaces[SpaceId::Data].pending.ack_frequency = self
1009 .ack_frequency
1010 .should_send_ack_frequency(rtt, config, &self.peer_params)
1011 && self.highest_space == SpaceKind::Data
1012 && self.peer_supports_ack_frequency();
1013 }
1014
1015 let scheduling_info: BTreeMap<PathId, PathSchedulingInfo> = {
1017 let have_validated_status_available_space = self.paths.iter().any(|(path_id, path)| {
1018 self.remote_cids.contains_key(path_id)
1019 && !self.abandoned_paths.contains(path_id)
1020 && path.data.validated
1021 && path.data.local_status() == PathStatus::Available
1022 });
1023 let is_handshaking = self.is_handshaking();
1024 tracing::warn!(?is_handshaking);
1025 self.paths
1026 .iter()
1027 .map(|(path_id, path)| {
1028 let has_cids = self.remote_cids.contains_key(path_id);
1029 let validated = path.data.validated;
1030 let abandoned = self.abandoned_paths.contains(path_id);
1031 let status = path.data.local_status();
1032
1033 let may_send_data = has_cids
1036 && !abandoned
1037 && if is_handshaking {
1038 true
1042 } else if !validated {
1043 false
1050 } else {
1051 match status {
1052 PathStatus::Available => {
1053 true
1055 }
1056 PathStatus::Backup => {
1057 !have_validated_status_available_space
1059 }
1060 }
1061 };
1062 let may_send_close = has_cids
1067 && !abandoned
1068 && if !validated && have_validated_status_available_space {
1069 false
1071 } else {
1072 true
1074 };
1075 (
1076 *path_id,
1077 PathSchedulingInfo {
1078 abandoned,
1079 may_send_data,
1080 may_send_close,
1081 },
1082 )
1083 })
1084 .collect()
1085 };
1086
1087 let path_ids: Vec<_> = self.paths.keys().copied().collect();
1090
1091 let mut congestion_blocked = false;
1094
1095 for (&path_id, info) in scheduling_info.iter() {
1096 if !connection_close_pending
1097 && let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id)
1098 {
1099 return Some(transmit);
1100 }
1101
1102 match self.poll_transmit_on_path(
1104 now,
1105 buf,
1106 path_id,
1107 max_datagrams,
1108 info,
1109 connection_close_pending,
1110 ) {
1111 PollPathStatus::Send(transmit) => {
1112 return Some(transmit);
1113 }
1114 PollPathStatus::NothingToSend {
1115 congestion_blocked: cb,
1116 } => {
1117 congestion_blocked |= cb;
1118 debug_assert!(
1121 buf.is_empty(),
1122 "nothing to send on path but buffer not empty"
1123 );
1124 }
1125 }
1126 }
1127
1128 debug_assert!(
1130 buf.is_empty(),
1131 "there was data in the buffer, but it was not sent"
1132 );
1133
1134 self.app_limited = !congestion_blocked;
1135
1136 if self.state.is_established() {
1137 for path_id in path_ids {
1139 if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) {
1140 return Some(transmit);
1141 }
1142 }
1143 }
1144
1145 None
1146 }
1147
1148 fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit {
1149 debug_assert!(
1150 !transmit.is_empty(),
1151 "must not be called with an empty transmit buffer"
1152 );
1153
1154 let network_path = self.path_data(path_id).network_path;
1155 trace!(
1156 segment_size = transmit.segment_size(),
1157 last_datagram_len = transmit.len() % transmit.segment_size(),
1158 %network_path,
1159 "sending {} bytes in {} datagrams",
1160 transmit.len(),
1161 transmit.num_datagrams()
1162 );
1163 self.path_data_mut(path_id)
1164 .inc_total_sent(transmit.len() as u64);
1165
1166 self.stats
1167 .udp_tx
1168 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1169 self.path_stats
1170 .entry(path_id)
1171 .or_default()
1172 .udp_tx
1173 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1174
1175 Transmit {
1176 destination: network_path.remote,
1177 size: transmit.len(),
1178 ecn: if self.path_data(path_id).sending_ecn {
1179 Some(EcnCodepoint::Ect0)
1180 } else {
1181 None
1182 },
1183 segment_size: match transmit.num_datagrams() {
1184 1 => None,
1185 _ => Some(transmit.segment_size()),
1186 },
1187 src_ip: network_path.local_ip,
1188 }
1189 }
1190
1191 fn poll_transmit_off_path(
1193 &mut self,
1194 now: Instant,
1195 buf: &mut Vec<u8>,
1196 path_id: PathId,
1197 ) -> Option<Transmit> {
1198 if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) {
1199 return Some(challenge);
1200 }
1201 if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
1202 return Some(response);
1203 }
1204 if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
1205 return Some(challenge);
1206 }
1207 None
1208 }
1209
1210 #[must_use]
1217 fn poll_transmit_on_path(
1218 &mut self,
1219 now: Instant,
1220 buf: &mut Vec<u8>,
1221 path_id: PathId,
1222 max_datagrams: NonZeroUsize,
1223 scheduling_info: &PathSchedulingInfo,
1224 connection_close_pending: bool,
1225 ) -> PollPathStatus {
1226 let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else {
1228 if !self.abandoned_paths.contains(&path_id) {
1229 debug!(%path_id, "no remote CIDs for path");
1230 }
1231 return PollPathStatus::NothingToSend {
1232 congestion_blocked: false,
1233 };
1234 };
1235
1236 let mut pad_datagram = PadDatagram::No;
1242
1243 let mut last_packet_number = None;
1247
1248 let mut congestion_blocked = false;
1251
1252 let pmtu = self.path_data(path_id).current_mtu().into();
1254 let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu);
1255
1256 for space_id in SpaceId::iter() {
1258 if path_id != PathId::ZERO && space_id != SpaceId::Data {
1260 continue;
1261 }
1262 match self.poll_transmit_path_space(
1263 now,
1264 &mut transmit,
1265 path_id,
1266 space_id,
1267 remote_cid,
1268 scheduling_info,
1269 connection_close_pending,
1270 pad_datagram,
1271 ) {
1272 PollPathSpaceStatus::NothingToSend {
1273 congestion_blocked: cb,
1274 } => {
1275 congestion_blocked |= cb;
1276 }
1279 PollPathSpaceStatus::WrotePacket {
1280 last_packet_number: pn,
1281 pad_datagram: pad,
1282 } => {
1283 debug_assert!(!transmit.is_empty(), "transmit must contain packets");
1284 last_packet_number = Some(pn);
1285 pad_datagram = pad;
1286 continue;
1291 }
1292 PollPathSpaceStatus::Send {
1293 last_packet_number: pn,
1294 } => {
1295 debug_assert!(!transmit.is_empty(), "transmit must contain packets");
1296 last_packet_number = Some(pn);
1297 break;
1298 }
1299 }
1300 }
1301
1302 if last_packet_number.is_some() || congestion_blocked {
1303 self.qlog.emit_recovery_metrics(
1304 path_id,
1305 &mut self.paths.get_mut(&path_id).unwrap().data,
1306 now,
1307 );
1308 }
1309
1310 match last_packet_number {
1311 Some(last_packet_number) => {
1312 self.path_data_mut(path_id).congestion.on_sent(
1315 now,
1316 transmit.len() as u64,
1317 last_packet_number,
1318 );
1319 PollPathStatus::Send(self.build_transmit(path_id, transmit))
1320 }
1321 None => PollPathStatus::NothingToSend { congestion_blocked },
1322 }
1323 }
1324
1325 #[must_use]
1327 fn poll_transmit_path_space(
1328 &mut self,
1329 now: Instant,
1330 transmit: &mut TransmitBuf<'_>,
1331 path_id: PathId,
1332 space_id: SpaceId,
1333 remote_cid: ConnectionId,
1334 scheduling_info: &PathSchedulingInfo,
1335 connection_close_pending: bool,
1337 mut pad_datagram: PadDatagram,
1339 ) -> PollPathSpaceStatus {
1340 let mut last_packet_number = None;
1343
1344 loop {
1360 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1362 transmit.datagram_remaining_mut()
1364 } else {
1365 transmit.segment_size()
1367 };
1368 let can_send =
1369 self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending);
1370 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1371 let space_will_send = {
1372 if scheduling_info.abandoned {
1373 false
1376 } else if can_send.close && scheduling_info.may_send_close {
1377 true
1379 } else if needs_loss_probe {
1380 true
1382 } else if can_send.space_id_only {
1383 true
1385 } else {
1386 !can_send.is_empty() && scheduling_info.may_send_data
1389 }
1390 };
1391 tracing::warn!(?can_send, ?scheduling_info, ?space_will_send, "checking");
1392
1393 if !space_will_send {
1394 return match last_packet_number {
1397 Some(pn) => PollPathSpaceStatus::WrotePacket {
1398 last_packet_number: pn,
1399 pad_datagram,
1400 },
1401 None => {
1402 if self.crypto_state.has_keys(space_id.encryption_level())
1404 || (space_id == SpaceId::Data
1405 && self.crypto_state.has_keys(EncryptionLevel::ZeroRtt))
1406 {
1407 trace!(?space_id, %path_id, "nothing to send in space");
1408 }
1409 PollPathSpaceStatus::NothingToSend {
1410 congestion_blocked: false,
1411 }
1412 }
1413 };
1414 }
1415
1416 if transmit.datagram_remaining_mut() == 0 {
1420 let congestion_blocked =
1421 self.path_congestion_check(space_id, path_id, transmit, &can_send, now);
1422 if congestion_blocked != PathBlocked::No {
1423 return match last_packet_number {
1425 Some(pn) => PollPathSpaceStatus::WrotePacket {
1426 last_packet_number: pn,
1427 pad_datagram,
1428 },
1429 None => {
1430 return PollPathSpaceStatus::NothingToSend {
1431 congestion_blocked: true,
1432 };
1433 }
1434 };
1435 }
1436 }
1437
1438 if transmit.datagram_remaining_mut() == 0 {
1441 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1442 return match last_packet_number {
1445 Some(pn) => PollPathSpaceStatus::WrotePacket {
1446 last_packet_number: pn,
1447 pad_datagram,
1448 },
1449 None => {
1450 return PollPathSpaceStatus::NothingToSend {
1451 congestion_blocked: false,
1452 };
1453 }
1454 };
1455 }
1456
1457 match self.spaces[space_id].for_path(path_id).loss_probes {
1458 0 => transmit.start_new_datagram(),
1459 _ => {
1460 let request_immediate_ack =
1462 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1463 self.spaces[space_id].maybe_queue_probe(
1465 path_id,
1466 request_immediate_ack,
1467 &self.streams,
1468 );
1469
1470 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1471
1472 transmit.start_new_datagram_with_size(std::cmp::min(
1476 usize::from(INITIAL_MTU),
1477 transmit.segment_size(),
1478 ));
1479 }
1480 }
1481 trace!(count = transmit.num_datagrams(), "new datagram started");
1482
1483 pad_datagram = PadDatagram::No;
1485 }
1486
1487 if transmit.datagram_start_offset() < transmit.len() {
1490 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1491 }
1492
1493 if self.crypto_state.has_keys(EncryptionLevel::Initial)
1498 && space_id == SpaceId::Handshake
1499 && self.side.is_client()
1500 {
1501 self.discard_space(now, SpaceKind::Initial);
1504 }
1505 if let Some(ref mut prev) = self.crypto_state.prev_crypto {
1506 prev.update_unacked = false;
1507 }
1508
1509 let Some(mut builder) = PacketBuilder::new(
1510 now,
1511 space_id,
1512 path_id,
1513 remote_cid,
1514 transmit,
1515 can_send.is_ack_eliciting(),
1516 self,
1517 ) else {
1518 return PollPathSpaceStatus::NothingToSend {
1525 congestion_blocked: false,
1526 };
1527 };
1528 last_packet_number = Some(builder.packet_number);
1529
1530 if space_id == SpaceId::Initial
1531 && (self.side.is_client() || can_send.is_ack_eliciting())
1532 {
1533 pad_datagram |= PadDatagram::ToMinMtu;
1535 }
1536 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1537 pad_datagram |= PadDatagram::ToSegmentSize;
1538 }
1539
1540 if can_send.close {
1541 trace!("sending CONNECTION_CLOSE");
1542 let is_multipath_negotiated = self.is_multipath_negotiated();
1547 for path_id in self.spaces[space_id]
1548 .number_spaces
1549 .iter()
1550 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1551 .map(|(&path_id, _)| path_id)
1552 .collect::<Vec<_>>()
1553 {
1554 Self::populate_acks(
1555 now,
1556 self.receiving_ecn,
1557 path_id,
1558 space_id,
1559 &mut self.spaces[space_id],
1560 is_multipath_negotiated,
1561 &mut builder,
1562 &mut self.stats.frame_tx,
1563 self.crypto_state.has_keys(space_id.encryption_level()),
1564 );
1565 }
1566
1567 debug_assert!(
1575 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1576 "ACKs should leave space for ConnectionClose"
1577 );
1578 let stats = &mut self.stats.frame_tx;
1579 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1580 let max_frame_size = builder.frame_space_remaining();
1581 let close: Close = match self.state.as_type() {
1582 StateType::Closed => {
1583 let reason: Close =
1584 self.state.as_closed().expect("checked").clone().into();
1585 if space_id == SpaceId::Data || reason.is_transport_layer() {
1586 reason
1587 } else {
1588 TransportError::APPLICATION_ERROR("").into()
1589 }
1590 }
1591 StateType::Draining => TransportError::NO_ERROR("").into(),
1592 _ => unreachable!(
1593 "tried to make a close packet when the connection wasn't closed"
1594 ),
1595 };
1596 builder.write_frame(close.encoder(max_frame_size), stats);
1597 }
1598 let last_pn = builder.packet_number;
1599 builder.finish_and_track(now, self, path_id, pad_datagram);
1600 if space_id.kind() == self.highest_space {
1601 self.connection_close_pending = false;
1604 }
1605 return PollPathSpaceStatus::WrotePacket {
1618 last_packet_number: last_pn,
1619 pad_datagram,
1620 };
1621 }
1622
1623 self.populate_packet(now, space_id, path_id, scheduling_info, &mut builder);
1624
1625 debug_assert!(
1632 !(builder.sent_frames().is_ack_only(&self.streams)
1633 && !can_send.acks
1634 && (can_send.other || can_send.space_id_only)
1635 && builder.buf.segment_size()
1636 == self.path_data(path_id).current_mtu() as usize
1637 && self.datagrams.outgoing.is_empty()),
1638 "SendableFrames was {can_send:?}, but only ACKs have been written"
1639 );
1640 if builder.sent_frames().requires_padding {
1641 pad_datagram |= PadDatagram::ToMinMtu;
1642 }
1643
1644 for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1645 self.spaces[space_id]
1646 .for_path(*path_id)
1647 .pending_acks
1648 .acks_sent();
1649 self.timers.stop(
1650 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1651 self.qlog.with_time(now),
1652 );
1653 }
1654
1655 if builder.can_coalesce && path_id == PathId::ZERO && {
1663 let max_packet_size = builder
1664 .buf
1665 .datagram_remaining_mut()
1666 .saturating_sub(builder.predict_packet_end());
1667 max_packet_size > MIN_PACKET_SPACE
1668 && self.has_pending_packet(space_id, max_packet_size, connection_close_pending)
1669 } {
1670 trace!("will coalesce with next packet");
1673 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1674 } else {
1675 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1681 const MAX_PADDING: usize = 32;
1689 if builder.buf.datagram_remaining_mut()
1690 > builder.predict_packet_end() + MAX_PADDING
1691 {
1692 trace!(
1693 "GSO truncated by demand for {} padding bytes",
1694 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1695 );
1696 let last_pn = builder.packet_number;
1697 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1698 return PollPathSpaceStatus::Send {
1699 last_packet_number: last_pn,
1700 };
1701 }
1702
1703 builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1706 } else {
1707 builder.finish_and_track(now, self, path_id, pad_datagram);
1708 }
1709
1710 if transmit.num_datagrams() == 1 {
1713 transmit.clip_segment_size();
1714 }
1715 }
1716 }
1717 }
1718
1719 fn poll_transmit_mtu_probe(
1720 &mut self,
1721 now: Instant,
1722 buf: &mut Vec<u8>,
1723 path_id: PathId,
1724 ) -> Option<Transmit> {
1725 let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?;
1726
1727 let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize);
1729 transmit.start_new_datagram_with_size(probe_size as usize);
1730
1731 let mut builder = PacketBuilder::new(
1732 now,
1733 SpaceId::Data,
1734 path_id,
1735 active_cid,
1736 &mut transmit,
1737 true,
1738 self,
1739 )?;
1740
1741 trace!(?probe_size, "writing MTUD probe");
1743 builder.write_frame(frame::Ping, &mut self.stats.frame_tx);
1744
1745 if self.peer_supports_ack_frequency() {
1747 builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx);
1748 }
1749
1750 builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1751
1752 self.path_stats
1753 .entry(path_id)
1754 .or_default()
1755 .sent_plpmtud_probes += 1;
1756
1757 Some(self.build_transmit(path_id, transmit))
1758 }
1759
1760 fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> {
1768 let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?;
1769 let is_eligible = self.path_data(path_id).validated
1770 && !self.path_data(path_id).is_validating_path()
1771 && !self.abandoned_paths.contains(&path_id);
1772
1773 if !is_eligible {
1774 return None;
1775 }
1776 let next_pn = self.spaces[SpaceId::Data]
1777 .for_path(path_id)
1778 .peek_tx_number();
1779 let probe_size = self
1780 .path_data_mut(path_id)
1781 .mtud
1782 .poll_transmit(now, next_pn)?;
1783
1784 Some((active_cid, probe_size))
1785 }
1786
1787 fn has_pending_packet(
1804 &mut self,
1805 current_space_id: SpaceId,
1806 max_packet_size: usize,
1807 connection_close_pending: bool,
1808 ) -> bool {
1809 let mut space_id = current_space_id;
1810 loop {
1811 let can_send = self.space_can_send(
1812 space_id,
1813 PathId::ZERO, max_packet_size,
1815 connection_close_pending,
1816 );
1817 if !can_send.is_empty() {
1818 return true;
1819 }
1820 match space_id.next() {
1821 Some(next_space_id) => space_id = next_space_id,
1822 None => break,
1823 }
1824 }
1825 false
1826 }
1827
1828 fn path_congestion_check(
1830 &mut self,
1831 space_id: SpaceId,
1832 path_id: PathId,
1833 transmit: &TransmitBuf<'_>,
1834 can_send: &SendableFrames,
1835 now: Instant,
1836 ) -> PathBlocked {
1837 if self.side().is_server()
1843 && self
1844 .path_data(path_id)
1845 .anti_amplification_blocked(transmit.len() as u64 + 1)
1846 {
1847 trace!(?space_id, %path_id, "blocked by anti-amplification");
1848 return PathBlocked::AntiAmplification;
1849 }
1850
1851 let bytes_to_send = transmit.segment_size() as u64;
1854 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1855
1856 if can_send.other && !need_loss_probe && !can_send.close {
1857 let path = self.path_data(path_id);
1858 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1859 trace!(?space_id, %path_id, "blocked by congestion control");
1860 return PathBlocked::Congestion;
1861 }
1862 }
1863
1864 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1866 self.timers.set(
1867 Timer::PerPath(path_id, PathTimer::Pacing),
1868 delay,
1869 self.qlog.with_time(now),
1870 );
1871 trace!(?space_id, %path_id, "blocked by pacing");
1874 return PathBlocked::Pacing;
1875 }
1876
1877 PathBlocked::No
1878 }
1879
1880 fn send_prev_path_challenge(
1885 &mut self,
1886 now: Instant,
1887 buf: &mut Vec<u8>,
1888 path_id: PathId,
1889 ) -> Option<Transmit> {
1890 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1891 if !prev_path.pending_on_path_challenge {
1894 return None;
1895 };
1896 prev_path.pending_on_path_challenge = false;
1897 let token = self.rng.random();
1898 let network_path = prev_path.network_path;
1899 prev_path.record_path_challenge_sent(now, token, network_path);
1900
1901 debug_assert_eq!(
1902 self.highest_space,
1903 SpaceKind::Data,
1904 "PATH_CHALLENGE queued without 1-RTT keys"
1905 );
1906 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
1907 buf.start_new_datagram();
1908
1909 let mut builder =
1915 PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, false, self)?;
1916 let challenge = frame::PathChallenge(token);
1917 let stats = &mut self.stats.frame_tx;
1918 builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));
1919
1920 builder.pad_to(MIN_INITIAL_SIZE);
1925
1926 builder.finish(self, now);
1927 self.stats.udp_tx.on_sent(1, buf.len());
1928 self.path_stats
1929 .entry(path_id)
1930 .or_default()
1931 .udp_tx
1932 .on_sent(1, buf.len());
1933
1934 Some(Transmit {
1935 destination: network_path.remote,
1936 size: buf.len(),
1937 ecn: None,
1938 segment_size: None,
1939 src_ip: network_path.local_ip,
1940 })
1941 }
1942
1943 fn send_off_path_path_response(
1944 &mut self,
1945 now: Instant,
1946 buf: &mut Vec<u8>,
1947 path_id: PathId,
1948 ) -> Option<Transmit> {
1949 let path = self.paths.get_mut(&path_id).map(|state| &mut state.data)?;
1950 let cid_queue = self.remote_cids.get_mut(&path_id)?;
1951 let (token, network_path) = path.path_responses.pop_off_path(path.network_path)?;
1952
1953 let cid = cid_queue
1954 .next_reserved()
1955 .unwrap_or_else(|| cid_queue.active());
1956 let frame = frame::PathResponse(token);
1960
1961 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
1962 buf.start_new_datagram();
1963
1964 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, false, self)?;
1965 let stats = &mut self.stats.frame_tx;
1966 builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
1967 builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
1968
1969 let size = buf.len();
1970
1971 self.stats.udp_tx.on_sent(1, size);
1972 self.path_stats
1973 .entry(path_id)
1974 .or_default()
1975 .udp_tx
1976 .on_sent(1, size);
1977 Some(Transmit {
1978 destination: network_path.remote,
1979 size,
1980 ecn: None,
1981 segment_size: None,
1982 src_ip: network_path.local_ip,
1983 })
1984 }
1985
1986 fn send_nat_traversal_path_challenge(
1991 &mut self,
1992 now: Instant,
1993 buf: &mut Vec<u8>,
1994 path_id: PathId,
1995 ) -> Option<Transmit> {
1996 let server_side = self.n0_nat_traversal.server_side_mut().ok()?;
1997 let probe = server_side.next_probe()?;
1998 if !self.paths.get(&path_id)?.data.validated {
1999 return None;
2001 }
2002
2003 let remote_cids = self.remote_cids.get_mut(&path_id)?;
2004
2005 if remote_cids.remaining() < 2 {
2008 return None;
2009 }
2010
2011 let cid = remote_cids.next_reserved()?;
2012 let remote = probe.remote();
2013 let token = self.rng.random();
2014 probe.mark_as_sent();
2015
2016 let frame = frame::PathChallenge(token);
2017
2018 let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2019 buf.start_new_datagram();
2020
2021 let mut builder =
2022 PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, false, self)?;
2023 let stats = &mut self.stats.frame_tx;
2024 builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
2025 builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
2026
2027 let path = &mut self.paths.get_mut(&path_id).expect("checked").data;
2028 let network_path = FourTuple {
2029 remote,
2030 local_ip: None,
2031 };
2032
2033 path.record_path_challenge_sent(now, token, network_path);
2034
2035 let size = buf.len();
2036
2037 self.stats.udp_tx.on_sent(1, size);
2038 self.path_stats
2039 .entry(path_id)
2040 .or_default()
2041 .udp_tx
2042 .on_sent(1, size);
2043
2044 Some(Transmit {
2045 destination: remote,
2046 size,
2047 ecn: None,
2048 segment_size: None,
2049 src_ip: None,
2050 })
2051 }
2052
2053 fn space_can_send(
2061 &mut self,
2062 space_id: SpaceId,
2063 path_id: PathId,
2064 packet_size: usize,
2065 connection_close_pending: bool,
2066 ) -> SendableFrames {
2067 let space = &mut self.spaces[space_id];
2068 let space_has_crypto = self.crypto_state.has_keys(space_id.encryption_level());
2069
2070 if !space_has_crypto
2071 && (space_id != SpaceId::Data
2072 || !self.crypto_state.has_keys(EncryptionLevel::ZeroRtt)
2073 || self.side.is_server())
2074 {
2075 return SendableFrames::empty();
2077 }
2078
2079 let mut can_send = space.can_send(path_id, &self.streams);
2080
2081 if space_id == SpaceId::Data {
2083 let pn = space.for_path(path_id).peek_tx_number();
2084 let frame_space_1rtt =
2090 packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
2091 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
2092 }
2093
2094 can_send.close = connection_close_pending && space_has_crypto;
2095
2096 can_send
2097 }
2098
2099 pub fn handle_event(&mut self, event: ConnectionEvent) {
2105 use ConnectionEventInner::*;
2106 match event.0 {
2107 Datagram(DatagramConnectionEvent {
2108 now,
2109 network_path,
2110 path_id,
2111 ecn,
2112 first_decode,
2113 remaining,
2114 }) => {
2115 let span = trace_span!("pkt", %path_id);
2116 let _guard = span.enter();
2117
2118 if self.update_network_path_or_discard(network_path, path_id) {
2119 return;
2121 }
2122
2123 let was_anti_amplification_blocked = self
2124 .path(path_id)
2125 .map(|path| path.anti_amplification_blocked(1))
2126 .unwrap_or(false);
2129
2130 self.stats.udp_rx.datagrams += 1;
2131 self.stats.udp_rx.bytes += first_decode.len() as u64;
2132 let rx = &mut self.path_stats.entry(path_id).or_default().udp_rx;
2133 rx.datagrams += 1;
2134 rx.bytes += first_decode.len() as u64;
2135 let data_len = first_decode.len();
2136
2137 self.handle_decode(now, network_path, path_id, ecn, first_decode);
2138 if let Some(path) = self.path_mut(path_id) {
2143 path.inc_total_recvd(data_len as u64);
2144 }
2145
2146 if let Some(data) = remaining {
2147 self.stats.udp_rx.bytes += data.len() as u64;
2148 self.path_stats.entry(path_id).or_default().udp_rx.bytes += data.len() as u64;
2149 self.handle_coalesced(now, network_path, path_id, ecn, data);
2150 }
2151
2152 if let Some(path) = self.paths.get_mut(&path_id) {
2153 self.qlog
2154 .emit_recovery_metrics(path_id, &mut path.data, now);
2155 }
2156
2157 if was_anti_amplification_blocked {
2158 self.set_loss_detection_timer(now, path_id);
2162 }
2163 }
2164 NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
2165 let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
2166 debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
2167 let cid_state = self
2168 .local_cid_state
2169 .entry(path_id)
2170 .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
2171 cid_state.new_cids(&ids, now);
2172
2173 ids.into_iter().rev().for_each(|frame| {
2174 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
2175 });
2176 self.reset_cid_retirement(now);
2178 }
2179 }
2180 }
2181
2182 fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
2187 let remote_may_migrate = self.side.remote_may_migrate(&self.state);
2188 let local_ip_may_migrate = self.side.is_client();
2189 if let Some(known_path) = self.path_mut(path_id) {
2193 if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
2194 trace!(
2195 %path_id,
2196 %network_path,
2197 %known_path.network_path,
2198 "discarding packet from unrecognized peer"
2199 );
2200 return true;
2201 }
2202
2203 if known_path.network_path.local_ip.is_some()
2204 && network_path.local_ip.is_some()
2205 && known_path.network_path.local_ip != network_path.local_ip
2206 && !local_ip_may_migrate
2207 {
2208 trace!(
2209 %path_id,
2210 %network_path,
2211 %known_path.network_path,
2212 "discarding packet sent to incorrect interface"
2213 );
2214 return true;
2215 }
2216 if let Some(local_ip) = network_path.local_ip {
2221 if known_path
2222 .network_path
2223 .local_ip
2224 .is_some_and(|ip| ip != local_ip)
2225 {
2226 debug!(
2227 %path_id,
2228 %network_path,
2229 %known_path.network_path,
2230 "path's local address seemingly migrated"
2231 );
2232 }
2233 known_path.network_path.local_ip = Some(local_ip);
2240 }
2241 }
2242 false
2243 }
2244
2245 pub fn handle_timeout(&mut self, now: Instant) {
2255 while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
2256 trace!(?timer, at=?now, "timeout");
2258 match timer {
2259 Timer::Conn(timer) => match timer {
2260 ConnTimer::Close => {
2261 self.state.move_to_drained(None);
2262 self.endpoint_events.push_back(EndpointEventInner::Drained);
2265 }
2266 ConnTimer::Idle => {
2267 self.kill(ConnectionError::TimedOut);
2268 }
2269 ConnTimer::KeepAlive => {
2270 trace!("sending keep-alive");
2271 self.ping();
2272 }
2273 ConnTimer::KeyDiscard => {
2274 self.crypto_state.discard_temporary_keys();
2275 }
2276 ConnTimer::PushNewCid => {
2277 while let Some((path_id, when)) = self.next_cid_retirement() {
2278 if when > now {
2279 break;
2280 }
2281 match self.local_cid_state.get_mut(&path_id) {
2282 None => error!(%path_id, "No local CID state for path"),
2283 Some(cid_state) => {
2284 let num_new_cid = cid_state.on_cid_timeout().into();
2286 if !self.state.is_closed() {
2287 trace!(
2288 "push a new CID to peer RETIRE_PRIOR_TO field {}",
2289 cid_state.retire_prior_to()
2290 );
2291 self.endpoint_events.push_back(
2292 EndpointEventInner::NeedIdentifiers(
2293 path_id,
2294 now,
2295 num_new_cid,
2296 ),
2297 );
2298 }
2299 }
2300 }
2301 }
2302 }
2303 },
2304 Timer::PerPath(path_id, timer) => {
2306 let span = trace_span!("per-path timer fired", %path_id, ?timer);
2307 let _guard = span.enter();
2308 match timer {
2309 PathTimer::PathIdle => {
2310 if let Err(err) =
2311 self.close_path_inner(now, path_id, PathAbandonReason::TimedOut)
2312 {
2313 warn!(?err, "failed closing path");
2314 }
2315 }
2316
2317 PathTimer::PathKeepAlive => {
2318 trace!("sending keep-alive on path");
2319 self.ping_path(path_id).ok();
2320 }
2321 PathTimer::LossDetection => {
2322 self.on_loss_detection_timeout(now, path_id);
2323 self.qlog.emit_recovery_metrics(
2324 path_id,
2325 &mut self.paths.get_mut(&path_id).unwrap().data,
2326 now,
2327 );
2328 }
2329 PathTimer::PathValidation => {
2330 let Some(path) = self.paths.get_mut(&path_id) else {
2331 continue;
2332 };
2333 self.timers.stop(
2334 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
2335 self.qlog.with_time(now),
2336 );
2337 debug!("path validation failed");
2338 if let Some((_, prev)) = path.prev.take() {
2339 path.data = prev;
2340 }
2341 path.data.reset_on_path_challenges();
2342 }
2343 PathTimer::PathChallengeLost => {
2344 let Some(path) = self.paths.get_mut(&path_id) else {
2345 continue;
2346 };
2347 trace!("path challenge deemed lost");
2348 path.data.pending_on_path_challenge = true;
2349 }
2350 PathTimer::PathOpen => {
2351 let Some(path) = self.paths.get_mut(&path_id) else {
2352 continue;
2353 };
2354 path.data.reset_on_path_challenges();
2355 self.timers.stop(
2356 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
2357 self.qlog.with_time(now),
2358 );
2359 debug!("new path validation failed");
2360 if let Err(err) = self.close_path_inner(
2361 now,
2362 path_id,
2363 PathAbandonReason::ValidationFailed,
2364 ) {
2365 warn!(?err, "failed closing path");
2366 }
2367 }
2368 PathTimer::Pacing => trace!("pacing timer expired"),
2369 PathTimer::MaxAckDelay => {
2370 trace!("max ack delay reached");
2371 self.spaces[SpaceId::Data]
2373 .for_path(path_id)
2374 .pending_acks
2375 .on_max_ack_delay_timeout()
2376 }
2377 PathTimer::DiscardPath => {
2378 self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2381 if let Some(local_cid_state) = self.local_cid_state.remove(&path_id) {
2382 debug_assert!(!self.state.is_drained()); let (min_seq, max_seq) = local_cid_state.active_seq();
2384 for seq in min_seq..=max_seq {
2385 self.endpoint_events.push_back(
2386 EndpointEventInner::RetireConnectionId(
2387 now, path_id, seq, false,
2388 ),
2389 );
2390 }
2391 }
2392 self.discard_path(path_id, now);
2393 }
2394 }
2395 }
2396 }
2397 }
2398 }
2399
2400 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2412 self.close_inner(
2413 now,
2414 Close::Application(frame::ApplicationClose { error_code, reason }),
2415 )
2416 }
2417
2418 fn close_inner(&mut self, now: Instant, reason: Close) {
2434 let was_closed = self.state.is_closed();
2435 if !was_closed {
2436 self.close_common();
2437 self.set_close_timer(now);
2438 self.connection_close_pending = true;
2439 self.state.move_to_closed_local(reason);
2440 }
2441 }
2442
2443 pub fn datagrams(&mut self) -> Datagrams<'_> {
2445 Datagrams { conn: self }
2446 }
2447
2448 pub fn stats(&mut self) -> ConnectionStats {
2450 self.stats.clone()
2451 }
2452
2453 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2455 let path = self.paths.get(&path_id)?;
2456 let stats = self.path_stats.entry(path_id).or_default();
2457 stats.rtt = path.data.rtt.get();
2458 stats.cwnd = path.data.congestion.window();
2459 stats.current_mtu = path.data.mtud.current_mtu();
2460 Some(*stats)
2461 }
2462
2463 pub fn ping(&mut self) {
2467 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2470 path_data.ping_pending = true;
2471 }
2472 }
2473
2474 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2478 let path_data = self.spaces[self.highest_space]
2479 .number_spaces
2480 .get_mut(&path)
2481 .ok_or(ClosedPath { _private: () })?;
2482 path_data.ping_pending = true;
2483 Ok(())
2484 }
2485
2486 pub fn force_key_update(&mut self) {
2490 if !self.state.is_established() {
2491 debug!("ignoring forced key update in illegal state");
2492 return;
2493 }
2494 if self.crypto_state.prev_crypto.is_some() {
2495 debug!("ignoring redundant forced key update");
2498 return;
2499 }
2500 self.crypto_state.update_keys(None, false);
2501 }
2502
2503 pub fn crypto_session(&self) -> &dyn crypto::Session {
2505 self.crypto_state.session.as_ref()
2506 }
2507
2508 pub fn is_handshaking(&self) -> bool {
2513 self.state.is_handshake()
2514 }
2515
2516 pub fn is_closed(&self) -> bool {
2524 self.state.is_closed()
2525 }
2526
2527 pub fn is_drained(&self) -> bool {
2532 self.state.is_drained()
2533 }
2534
2535 pub fn accepted_0rtt(&self) -> bool {
2539 self.crypto_state.accepted_0rtt
2540 }
2541
2542 pub fn has_0rtt(&self) -> bool {
2544 self.crypto_state.zero_rtt_enabled
2545 }
2546
2547 pub fn has_pending_retransmits(&self) -> bool {
2549 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2550 }
2551
2552 pub fn side(&self) -> Side {
2554 self.side.side()
2555 }
2556
2557 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2559 self.path(path_id)
2560 .map(|path_data| {
2561 path_data
2562 .last_observed_addr_report
2563 .as_ref()
2564 .map(|observed| observed.socket_addr())
2565 })
2566 .ok_or(ClosedPath { _private: () })
2567 }
2568
2569 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2571 self.path(path_id).map(|d| d.rtt.get())
2572 }
2573
2574 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2576 self.path(path_id).map(|d| d.congestion.as_ref())
2577 }
2578
2579 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2584 self.streams.set_max_concurrent(dir, count);
2585 let pending = &mut self.spaces[SpaceId::Data].pending;
2588 self.streams.queue_max_stream_id(pending);
2589 }
2590
2591 pub fn set_max_concurrent_paths(
2601 &mut self,
2602 now: Instant,
2603 count: NonZeroU32,
2604 ) -> Result<(), MultipathNotNegotiated> {
2605 if !self.is_multipath_negotiated() {
2606 return Err(MultipathNotNegotiated { _private: () });
2607 }
2608 self.max_concurrent_paths = count;
2609
2610 let in_use_count = self
2611 .local_max_path_id
2612 .next()
2613 .saturating_sub(self.abandoned_paths.len() as u32)
2614 .as_u32();
2615 let extra_needed = count.get().saturating_sub(in_use_count);
2616 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2617
2618 self.set_max_path_id(now, new_max_path_id);
2619
2620 Ok(())
2621 }
2622
2623 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2625 if max_path_id <= self.local_max_path_id {
2626 return;
2627 }
2628
2629 self.local_max_path_id = max_path_id;
2630 self.spaces[SpaceId::Data].pending.max_path_id = true;
2631
2632 self.issue_first_path_cids(now);
2633 }
2634
2635 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2641 self.streams.max_concurrent(dir)
2642 }
2643
2644 pub fn set_send_window(&mut self, send_window: u64) {
2646 self.streams.set_send_window(send_window);
2647 }
2648
2649 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2651 if self.streams.set_receive_window(receive_window) {
2652 self.spaces[SpaceId::Data].pending.max_data = true;
2653 }
2654 }
2655
2656 pub fn is_multipath_negotiated(&self) -> bool {
2661 !self.is_handshaking()
2662 && self.config.max_concurrent_multipath_paths.is_some()
2663 && self.peer_params.initial_max_path_id.is_some()
2664 }
2665
2666 fn on_ack_received(
2667 &mut self,
2668 now: Instant,
2669 space: SpaceId,
2670 ack: frame::Ack,
2671 ) -> Result<(), TransportError> {
2672 let path = PathId::ZERO;
2674 self.inner_on_ack_received(now, space, path, ack)
2675 }
2676
2677 fn on_path_ack_received(
2678 &mut self,
2679 now: Instant,
2680 space: SpaceId,
2681 path_ack: frame::PathAck,
2682 ) -> Result<(), TransportError> {
2683 let (ack, path) = path_ack.into_ack();
2684 self.inner_on_ack_received(now, space, path, ack)
2685 }
2686
2687 fn inner_on_ack_received(
2689 &mut self,
2690 now: Instant,
2691 space: SpaceId,
2692 path: PathId,
2693 ack: frame::Ack,
2694 ) -> Result<(), TransportError> {
2695 if self.abandoned_paths.contains(&path) {
2696 trace!("silently ignoring PATH_ACK on abandoned path");
2699 return Ok(());
2700 }
2701 if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2702 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2703 }
2704 let new_largest = {
2705 let space = &mut self.spaces[space].for_path(path);
2706 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
2707 space.largest_acked_packet = Some(ack.largest);
2708 if let Some(info) = space.sent_packets.get(ack.largest) {
2709 space.largest_acked_packet_sent = info.time_sent;
2713 }
2714 true
2715 } else {
2716 false
2717 }
2718 };
2719
2720 if self.detect_spurious_loss(&ack, space, path) {
2721 self.path_data_mut(path)
2722 .congestion
2723 .on_spurious_congestion_event();
2724 }
2725
2726 let mut newly_acked = ArrayRangeSet::new();
2728 for range in ack.iter() {
2729 self.spaces[space].for_path(path).check_ack(range.clone())?;
2730 for (pn, _) in self.spaces[space]
2731 .for_path(path)
2732 .sent_packets
2733 .iter_range(range)
2734 {
2735 newly_acked.insert_one(pn);
2736 }
2737 }
2738
2739 if newly_acked.is_empty() {
2740 return Ok(());
2741 }
2742
2743 let mut ack_eliciting_acked = false;
2744 for packet in newly_acked.elts() {
2745 if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2746 for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2747 if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2753 pns.pending_acks.subtract_below(*acked_pn);
2754 }
2755 }
2756 ack_eliciting_acked |= info.ack_eliciting;
2757
2758 let path_data = self.path_data_mut(path);
2760 let mtu_updated = path_data.mtud.on_acked(space.kind(), packet, info.size);
2761 if mtu_updated {
2762 path_data
2763 .congestion
2764 .on_mtu_update(path_data.mtud.current_mtu());
2765 }
2766
2767 self.ack_frequency.on_acked(path, packet);
2769
2770 self.on_packet_acked(now, path, info);
2771 }
2772 }
2773
2774 let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
2775 let app_limited = self.app_limited;
2776 let path_data = self.path_data_mut(path);
2777 let in_flight = path_data.in_flight.bytes;
2778
2779 path_data
2780 .congestion
2781 .on_end_acks(now, in_flight, app_limited, largest_ackd);
2782
2783 if new_largest && ack_eliciting_acked {
2784 let ack_delay = if space != SpaceId::Data {
2785 Duration::from_micros(0)
2786 } else {
2787 cmp::min(
2788 self.ack_frequency.peer_max_ack_delay,
2789 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2790 )
2791 };
2792 let rtt = now.saturating_duration_since(
2793 self.spaces[space].for_path(path).largest_acked_packet_sent,
2794 );
2795
2796 let next_pn = self.spaces[space].for_path(path).next_packet_number;
2797 let path_data = self.path_data_mut(path);
2798 path_data.rtt.update(ack_delay, rtt);
2800 if path_data.first_packet_after_rtt_sample.is_none() {
2801 path_data.first_packet_after_rtt_sample = Some((space.kind(), next_pn));
2802 }
2803 }
2804
2805 self.detect_lost_packets(now, space, path, true);
2807
2808 if self.peer_completed_address_validation(path) {
2809 self.path_data_mut(path).pto_count = 0;
2810 }
2811
2812 if self.path_data(path).sending_ecn {
2817 if let Some(ecn) = ack.ecn {
2818 if new_largest {
2823 let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
2824 self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
2825 }
2826 } else {
2827 debug!("ECN not acknowledged by peer");
2829 self.path_data_mut(path).sending_ecn = false;
2830 }
2831 }
2832
2833 self.set_loss_detection_timer(now, path);
2834 Ok(())
2835 }
2836
2837 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2838 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2839
2840 if lost_packets.is_empty() {
2841 return false;
2842 }
2843
2844 for range in ack.iter() {
2845 let spurious_losses: Vec<u64> = lost_packets
2846 .iter_range(range.clone())
2847 .map(|(pn, _info)| pn)
2848 .collect();
2849
2850 for pn in spurious_losses {
2851 lost_packets.remove(pn);
2852 }
2853 }
2854
2855 lost_packets.is_empty()
2860 }
2861
2862 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2867 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2868
2869 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2870 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2871 }
2872
2873 fn process_ecn(
2875 &mut self,
2876 now: Instant,
2877 space: SpaceId,
2878 path: PathId,
2879 newly_acked: u64,
2880 ecn: frame::EcnCounts,
2881 largest_sent_time: Instant,
2882 ) {
2883 match self.spaces[space]
2884 .for_path(path)
2885 .detect_ecn(newly_acked, ecn)
2886 {
2887 Err(e) => {
2888 debug!("halting ECN due to verification failure: {}", e);
2889
2890 self.path_data_mut(path).sending_ecn = false;
2891 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2894 }
2895 Ok(false) => {}
2896 Ok(true) => {
2897 self.path_stats.entry(path).or_default().congestion_events += 1;
2898 self.path_data_mut(path).congestion.on_congestion_event(
2899 now,
2900 largest_sent_time,
2901 false,
2902 true,
2903 0,
2904 );
2905 }
2906 }
2907 }
2908
2909 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2912 self.paths
2913 .get_mut(&path_id)
2914 .expect("known path")
2915 .remove_in_flight(&info);
2916 let app_limited = self.app_limited;
2917 let path = self.path_data_mut(path_id);
2918 if info.ack_eliciting && !path.is_validating_path() {
2919 let rtt = path.rtt;
2922 path.congestion
2923 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2924 }
2925
2926 if let Some(retransmits) = info.retransmits.get() {
2928 for (id, _) in retransmits.reset_stream.iter() {
2929 self.streams.reset_acked(*id);
2930 }
2931 }
2932
2933 for frame in info.stream_frames {
2934 self.streams.received_ack_of(frame);
2935 }
2936 }
2937
2938 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceKind) {
2939 let start = if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) {
2940 now
2941 } else {
2942 self.crypto_state
2943 .prev_crypto
2944 .as_ref()
2945 .expect("no previous keys")
2946 .end_packet
2947 .as_ref()
2948 .expect("update not acknowledged yet")
2949 .1
2950 };
2951
2952 self.timers.set(
2954 Timer::Conn(ConnTimer::KeyDiscard),
2955 start + self.max_pto_for_space(space) * 3,
2956 self.qlog.with_time(now),
2957 );
2958 }
2959
2960 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2973 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2974 self.detect_lost_packets(now, pn_space, path_id, false);
2976 self.set_loss_detection_timer(now, path_id);
2977 return;
2978 }
2979
2980 let (_, space) = match self.pto_time_and_space(now, path_id) {
2981 Some(x) => x,
2982 None => {
2983 error!(%path_id, "PTO expired while unset");
2984 return;
2985 }
2986 };
2987 trace!(
2988 in_flight = self.path_data(path_id).in_flight.bytes,
2989 count = self.path_data(path_id).pto_count,
2990 ?space,
2991 %path_id,
2992 "PTO fired"
2993 );
2994
2995 let count = match self.path_data(path_id).in_flight.ack_eliciting {
2996 0 => {
2999 debug_assert!(!self.peer_completed_address_validation(path_id));
3000 1
3001 }
3002 _ => 2,
3004 };
3005 let pns = self.spaces[space].for_path(path_id);
3006 pns.loss_probes = pns.loss_probes.saturating_add(count);
3007 let path_data = self.path_data_mut(path_id);
3008 path_data.pto_count = path_data.pto_count.saturating_add(1);
3009 self.set_loss_detection_timer(now, path_id);
3010 }
3011
3012 fn detect_lost_packets(
3029 &mut self,
3030 now: Instant,
3031 pn_space: SpaceId,
3032 path_id: PathId,
3033 due_to_ack: bool,
3034 ) {
3035 let mut lost_packets = Vec::<u64>::new();
3036 let mut lost_mtu_probe = None;
3037 let mut in_persistent_congestion = false;
3038 let mut size_of_lost_packets = 0u64;
3039 self.spaces[pn_space].for_path(path_id).loss_time = None;
3040
3041 let path = self.path_data(path_id);
3044 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3045 let loss_delay = path
3046 .rtt
3047 .conservative()
3048 .mul_f32(self.config.time_threshold)
3049 .max(TIMER_GRANULARITY);
3050 let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
3051
3052 let largest_acked_packet = self.spaces[pn_space]
3053 .for_path(path_id)
3054 .largest_acked_packet
3055 .expect("detect_lost_packets only to be called if path received at least one ACK");
3056 let packet_threshold = self.config.packet_threshold as u64;
3057
3058 let congestion_period = self
3062 .pto(SpaceKind::Data, path_id)
3063 .saturating_mul(self.config.persistent_congestion_threshold);
3064 let mut persistent_congestion_start: Option<Instant> = None;
3065 let mut prev_packet = None;
3066 let space = self.spaces[pn_space].for_path(path_id);
3067
3068 for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
3069 if prev_packet != Some(packet.wrapping_sub(1)) {
3070 persistent_congestion_start = None;
3072 }
3073
3074 let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
3078 if packet_too_old || largest_acked_packet >= packet + packet_threshold {
3079 if Some(packet) == in_flight_mtu_probe {
3081 lost_mtu_probe = in_flight_mtu_probe;
3084 } else {
3085 lost_packets.push(packet);
3086 size_of_lost_packets += info.size as u64;
3087 if info.ack_eliciting && due_to_ack {
3088 match persistent_congestion_start {
3089 Some(start) if info.time_sent - start > congestion_period => {
3092 in_persistent_congestion = true;
3093 }
3094 None if first_packet_after_rtt_sample
3096 .is_some_and(|x| x < (pn_space.kind(), packet)) =>
3097 {
3098 persistent_congestion_start = Some(info.time_sent);
3099 }
3100 _ => {}
3101 }
3102 }
3103 }
3104 } else {
3105 if space.loss_time.is_none() {
3107 space.loss_time = Some(info.time_sent + loss_delay);
3110 }
3111 persistent_congestion_start = None;
3112 }
3113
3114 prev_packet = Some(packet);
3115 }
3116
3117 self.handle_lost_packets(
3118 pn_space,
3119 path_id,
3120 now,
3121 lost_packets,
3122 lost_mtu_probe,
3123 loss_delay,
3124 in_persistent_congestion,
3125 size_of_lost_packets,
3126 );
3127 }
3128
3129 fn discard_path(&mut self, path_id: PathId, now: Instant) {
3131 trace!(%path_id, "dropping path state");
3132 let path = self.path_data(path_id);
3133 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3134
3135 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
3137 .for_path(path_id)
3138 .sent_packets
3139 .iter()
3140 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
3141 .map(|(pn, info)| {
3142 size_of_lost_packets += info.size as u64;
3143 pn
3144 })
3145 .collect();
3146
3147 if !lost_pns.is_empty() {
3148 trace!(
3149 %path_id,
3150 count = lost_pns.len(),
3151 lost_bytes = size_of_lost_packets,
3152 "packets lost on path abandon"
3153 );
3154 self.handle_lost_packets(
3155 SpaceId::Data,
3156 path_id,
3157 now,
3158 lost_pns,
3159 in_flight_mtu_probe,
3160 Duration::ZERO,
3161 false,
3162 size_of_lost_packets,
3163 );
3164 }
3165 let path_stats = self.path_stats(path_id).unwrap_or_default();
3168 self.path_stats.remove(&path_id);
3169 self.paths.remove(&path_id);
3170 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
3171
3172 self.events.push_back(
3173 PathEvent::Discarded {
3174 id: path_id,
3175 path_stats,
3176 }
3177 .into(),
3178 );
3179 }
3180
3181 fn handle_lost_packets(
3182 &mut self,
3183 pn_space: SpaceId,
3184 path_id: PathId,
3185 now: Instant,
3186 lost_packets: Vec<u64>,
3187 lost_mtu_probe: Option<u64>,
3188 loss_delay: Duration,
3189 in_persistent_congestion: bool,
3190 size_of_lost_packets: u64,
3191 ) {
3192 debug_assert!(
3193 {
3194 let mut sorted = lost_packets.clone();
3195 sorted.sort();
3196 sorted == lost_packets
3197 },
3198 "lost_packets must be sorted"
3199 );
3200
3201 self.drain_lost_packets(now, pn_space, path_id);
3202
3203 if let Some(largest_lost) = lost_packets.last().cloned() {
3205 let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
3206 let largest_lost_sent = self.spaces[pn_space]
3207 .for_path(path_id)
3208 .sent_packets
3209 .get(largest_lost)
3210 .unwrap()
3211 .time_sent;
3212 let path_stats = self.path_stats.entry(path_id).or_default();
3213 path_stats.lost_packets += lost_packets.len() as u64;
3214 path_stats.lost_bytes += size_of_lost_packets;
3215 trace!(
3216 %path_id,
3217 count = lost_packets.len(),
3218 lost_bytes = size_of_lost_packets,
3219 "packets lost",
3220 );
3221
3222 for &packet in &lost_packets {
3223 let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
3224 continue;
3225 };
3226 self.qlog
3227 .emit_packet_lost(packet, &info, loss_delay, pn_space.kind(), now);
3228 self.paths
3229 .get_mut(&path_id)
3230 .unwrap()
3231 .remove_in_flight(&info);
3232
3233 for frame in info.stream_frames {
3234 self.streams.retransmit(frame);
3235 }
3236 self.spaces[pn_space].pending |= info.retransmits;
3237 self.path_data_mut(path_id)
3238 .mtud
3239 .on_non_probe_lost(packet, info.size);
3240
3241 self.spaces[pn_space].for_path(path_id).lost_packets.insert(
3242 packet,
3243 LostPacket {
3244 time_sent: info.time_sent,
3245 },
3246 );
3247 }
3248
3249 let path = self.path_data_mut(path_id);
3250 if path.mtud.black_hole_detected(now) {
3251 path.congestion.on_mtu_update(path.mtud.current_mtu());
3252 if let Some(max_datagram_size) = self.datagrams().max_size()
3253 && self.datagrams.drop_oversized(max_datagram_size)
3254 && self.datagrams.send_blocked
3255 {
3256 self.datagrams.send_blocked = false;
3257 self.events.push_back(Event::DatagramsUnblocked);
3258 }
3259 self.path_stats
3260 .entry(path_id)
3261 .or_default()
3262 .black_holes_detected += 1;
3263 }
3264
3265 let lost_ack_eliciting =
3267 old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
3268
3269 if lost_ack_eliciting {
3270 self.path_stats
3271 .entry(path_id)
3272 .or_default()
3273 .congestion_events += 1;
3274 self.path_data_mut(path_id).congestion.on_congestion_event(
3275 now,
3276 largest_lost_sent,
3277 in_persistent_congestion,
3278 false,
3279 size_of_lost_packets,
3280 );
3281 }
3282 }
3283
3284 if let Some(packet) = lost_mtu_probe {
3286 let info = self.spaces[SpaceId::Data]
3287 .for_path(path_id)
3288 .take(packet)
3289 .unwrap(); self.paths
3292 .get_mut(&path_id)
3293 .unwrap()
3294 .remove_in_flight(&info);
3295 self.path_data_mut(path_id).mtud.on_probe_lost();
3296 self.path_stats
3297 .entry(path_id)
3298 .or_default()
3299 .lost_plpmtud_probes += 1;
3300 }
3301 }
3302
3303 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
3309 SpaceId::iter()
3310 .filter_map(|id| {
3311 self.spaces[id]
3312 .number_spaces
3313 .get(&path_id)
3314 .and_then(|pns| pns.loss_time)
3315 .map(|time| (time, id))
3316 })
3317 .min_by_key(|&(time, _)| time)
3318 }
3319
3320 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
3322 let path = self.path(path_id)?;
3323 let pto_count = path.pto_count;
3324 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
3325 let mut duration = path.rtt.pto_base() * backoff;
3326
3327 if path_id == PathId::ZERO
3328 && path.in_flight.ack_eliciting == 0
3329 && !self.peer_completed_address_validation(PathId::ZERO)
3330 {
3331 let space = match self.highest_space {
3337 SpaceKind::Handshake => SpaceId::Handshake,
3338 _ => SpaceId::Initial,
3339 };
3340
3341 return Some((now + duration, space));
3342 }
3343
3344 let mut result = None;
3345 for space in SpaceId::iter() {
3346 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
3347 continue;
3348 };
3349
3350 if !pns.has_in_flight() {
3351 continue;
3352 }
3353 if space == SpaceId::Data {
3354 if self.is_handshaking() {
3356 return result;
3357 }
3358 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
3360 }
3361 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3362 continue;
3363 };
3364 let pto = last_ack_eliciting + duration;
3365 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3366 if path.anti_amplification_blocked(1) {
3367 continue;
3369 }
3370 if path.in_flight.ack_eliciting == 0 {
3371 continue;
3373 }
3374 result = Some((pto, space));
3375 }
3376 }
3377 result
3378 }
3379
3380 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3381 if self.side.is_server() || self.state.is_closed() {
3383 return true;
3384 }
3385 self.spaces[SpaceId::Handshake]
3388 .path_space(PathId::ZERO)
3389 .and_then(|pns| pns.largest_acked_packet)
3390 .is_some()
3391 || self.spaces[SpaceId::Data]
3392 .path_space(path)
3393 .and_then(|pns| pns.largest_acked_packet)
3394 .is_some()
3395 || (self.crypto_state.has_keys(EncryptionLevel::OneRtt)
3396 && !self.crypto_state.has_keys(EncryptionLevel::Handshake))
3397 }
3398
3399 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3407 if self.state.is_closed() {
3408 return;
3412 }
3413
3414 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3415 self.timers.set(
3417 Timer::PerPath(path_id, PathTimer::LossDetection),
3418 loss_time,
3419 self.qlog.with_time(now),
3420 );
3421 return;
3422 }
3423
3424 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3427 self.timers.set(
3428 Timer::PerPath(path_id, PathTimer::LossDetection),
3429 timeout,
3430 self.qlog.with_time(now),
3431 );
3432 } else {
3433 self.timers.stop(
3434 Timer::PerPath(path_id, PathTimer::LossDetection),
3435 self.qlog.with_time(now),
3436 );
3437 }
3438 }
3439
3440 fn max_pto_for_space(&self, space: SpaceKind) -> Duration {
3444 self.paths
3445 .keys()
3446 .map(|path_id| self.pto(space, *path_id))
3447 .max()
3448 .expect("there should be at least one path")
3449 }
3450
3451 fn pto(&self, space: SpaceKind, path_id: PathId) -> Duration {
3456 let max_ack_delay = match space {
3457 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3458 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3459 };
3460 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3461 }
3462
3463 fn on_packet_authenticated(
3464 &mut self,
3465 now: Instant,
3466 space_id: SpaceKind,
3467 path_id: PathId,
3468 ecn: Option<EcnCodepoint>,
3469 packet: Option<u64>,
3470 spin: bool,
3471 is_1rtt: bool,
3472 ) {
3473 self.total_authed_packets += 1;
3474 self.reset_keep_alive(path_id, now);
3475 self.reset_idle_timeout(now, space_id, path_id);
3476 self.permit_idle_reset = true;
3477 self.receiving_ecn |= ecn.is_some();
3478 if let Some(x) = ecn {
3479 let space = &mut self.spaces[space_id];
3480 space.for_path(path_id).ecn_counters += x;
3481
3482 if x.is_ce() {
3483 space
3484 .for_path(path_id)
3485 .pending_acks
3486 .set_immediate_ack_required();
3487 }
3488 }
3489
3490 let packet = match packet {
3491 Some(x) => x,
3492 None => return,
3493 };
3494 match &self.side {
3495 ConnectionSide::Client { .. } => {
3496 if space_id == SpaceKind::Handshake
3500 && let Some(hs) = self.state.as_handshake_mut()
3501 {
3502 hs.allow_server_migration = false;
3503 }
3504 }
3505 ConnectionSide::Server { .. } => {
3506 if self.crypto_state.has_keys(EncryptionLevel::Initial)
3507 && space_id == SpaceKind::Handshake
3508 {
3509 self.discard_space(now, SpaceKind::Initial);
3511 }
3512 if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) && is_1rtt {
3513 self.set_key_discard_timer(now, space_id)
3515 }
3516 }
3517 }
3518 let space = self.spaces[space_id].for_path(path_id);
3519 space.pending_acks.insert_one(packet, now);
3520 if packet >= space.rx_packet.unwrap_or_default() {
3521 space.rx_packet = Some(packet);
3522 self.spin = self.side.is_client() ^ spin;
3524 }
3525 }
3526
3527 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceKind, path_id: PathId) {
3532 if let Some(timeout) = self.idle_timeout {
3534 if self.state.is_closed() {
3535 self.timers
3536 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3537 } else {
3538 let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space));
3539 self.timers.set(
3540 Timer::Conn(ConnTimer::Idle),
3541 now + dt,
3542 self.qlog.with_time(now),
3543 );
3544 }
3545 }
3546
3547 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3549 if self.state.is_closed() {
3550 self.timers.stop(
3551 Timer::PerPath(path_id, PathTimer::PathIdle),
3552 self.qlog.with_time(now),
3553 );
3554 } else {
3555 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3556 self.timers.set(
3557 Timer::PerPath(path_id, PathTimer::PathIdle),
3558 now + dt,
3559 self.qlog.with_time(now),
3560 );
3561 }
3562 }
3563 }
3564
3565 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3567 if !self.state.is_established() {
3568 return;
3569 }
3570
3571 if let Some(interval) = self.config.keep_alive_interval {
3572 self.timers.set(
3573 Timer::Conn(ConnTimer::KeepAlive),
3574 now + interval,
3575 self.qlog.with_time(now),
3576 );
3577 }
3578
3579 if let Some(interval) = self.path_data(path_id).keep_alive {
3580 self.timers.set(
3581 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3582 now + interval,
3583 self.qlog.with_time(now),
3584 );
3585 }
3586 }
3587
3588 fn reset_cid_retirement(&mut self, now: Instant) {
3590 if let Some((_path, t)) = self.next_cid_retirement() {
3591 self.timers.set(
3592 Timer::Conn(ConnTimer::PushNewCid),
3593 t,
3594 self.qlog.with_time(now),
3595 );
3596 }
3597 }
3598
3599 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3601 self.local_cid_state
3602 .iter()
3603 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3604 .min_by_key(|(_path_id, timeout)| *timeout)
3605 }
3606
3607 pub(crate) fn handle_first_packet(
3612 &mut self,
3613 now: Instant,
3614 network_path: FourTuple,
3615 ecn: Option<EcnCodepoint>,
3616 packet_number: u64,
3617 packet: InitialPacket,
3618 remaining: Option<BytesMut>,
3619 ) -> Result<(), ConnectionError> {
3620 let span = trace_span!("first recv");
3621 let _guard = span.enter();
3622 debug_assert!(self.side.is_server());
3623 let len = packet.header_data.len() + packet.payload.len();
3624 let path_id = PathId::ZERO;
3625 self.path_data_mut(path_id).total_recvd = len as u64;
3626
3627 if let Some(hs) = self.state.as_handshake_mut() {
3628 hs.expected_token = packet.header.token.clone();
3629 } else {
3630 unreachable!("first packet must be delivered in Handshake state");
3631 }
3632
3633 self.on_packet_authenticated(
3635 now,
3636 SpaceKind::Initial,
3637 path_id,
3638 ecn,
3639 Some(packet_number),
3640 false,
3641 false,
3642 );
3643
3644 let packet: Packet = packet.into();
3645
3646 let mut qlog = QlogRecvPacket::new(len);
3647 qlog.header(&packet.header, Some(packet_number), path_id);
3648
3649 self.process_decrypted_packet(
3650 now,
3651 network_path,
3652 path_id,
3653 Some(packet_number),
3654 packet,
3655 &mut qlog,
3656 )?;
3657 self.qlog.emit_packet_received(qlog, now);
3658 if let Some(data) = remaining {
3659 self.handle_coalesced(now, network_path, path_id, ecn, data);
3660 }
3661
3662 self.qlog.emit_recovery_metrics(
3663 path_id,
3664 &mut self.paths.get_mut(&path_id).unwrap().data,
3665 now,
3666 );
3667
3668 Ok(())
3669 }
3670
3671 fn init_0rtt(&mut self, now: Instant) {
3672 let (header, packet) = match self.crypto_state.session.early_crypto() {
3673 Some(x) => x,
3674 None => return,
3675 };
3676 if self.side.is_client() {
3677 match self.crypto_state.session.transport_parameters() {
3678 Ok(params) => {
3679 let params = params
3680 .expect("crypto layer didn't supply transport parameters with ticket");
3681 let params = TransportParameters {
3683 initial_src_cid: None,
3684 original_dst_cid: None,
3685 preferred_address: None,
3686 retry_src_cid: None,
3687 stateless_reset_token: None,
3688 min_ack_delay: None,
3689 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3690 max_ack_delay: TransportParameters::default().max_ack_delay,
3691 initial_max_path_id: None,
3692 ..params
3693 };
3694 self.set_peer_params(params);
3695 self.qlog.emit_peer_transport_params_restored(self, now);
3696 }
3697 Err(e) => {
3698 error!("session ticket has malformed transport parameters: {}", e);
3699 return;
3700 }
3701 }
3702 }
3703 trace!("0-RTT enabled");
3704 self.crypto_state.enable_zero_rtt(header, packet);
3705 }
3706
3707 fn read_crypto(
3708 &mut self,
3709 space: SpaceId,
3710 crypto: &frame::Crypto,
3711 payload_len: usize,
3712 ) -> Result<(), TransportError> {
3713 let expected = if !self.state.is_handshake() {
3714 SpaceId::Data
3715 } else if self.highest_space == SpaceKind::Initial {
3716 SpaceId::Initial
3717 } else {
3718 SpaceId::Handshake
3721 };
3722 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3726
3727 let end = crypto.offset + crypto.data.len() as u64;
3728 if space < expected
3729 && end
3730 > self.crypto_state.spaces[space.kind()]
3731 .crypto_stream
3732 .bytes_read()
3733 {
3734 warn!(
3735 "received new {:?} CRYPTO data when expecting {:?}",
3736 space, expected
3737 );
3738 return Err(TransportError::PROTOCOL_VIOLATION(
3739 "new data at unexpected encryption level",
3740 ));
3741 }
3742
3743 let crypto_space = &mut self.crypto_state.spaces[space.kind()];
3744 let max = end.saturating_sub(crypto_space.crypto_stream.bytes_read());
3745 if max > self.config.crypto_buffer_size as u64 {
3746 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3747 }
3748
3749 crypto_space
3750 .crypto_stream
3751 .insert(crypto.offset, crypto.data.clone(), payload_len);
3752 while let Some(chunk) = crypto_space.crypto_stream.read(usize::MAX, true) {
3753 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3754 if self.crypto_state.session.read_handshake(&chunk.bytes)? {
3755 self.events.push_back(Event::HandshakeDataReady);
3756 }
3757 }
3758
3759 Ok(())
3760 }
3761
3762 fn write_crypto(&mut self) {
3763 loop {
3764 let space = self.highest_space;
3765 let mut outgoing = Vec::new();
3766 if let Some(crypto) = self.crypto_state.session.write_handshake(&mut outgoing) {
3767 match space {
3768 SpaceKind::Initial => {
3769 self.upgrade_crypto(SpaceKind::Handshake, crypto);
3770 }
3771 SpaceKind::Handshake => {
3772 self.upgrade_crypto(SpaceKind::Data, crypto);
3773 }
3774 SpaceKind::Data => unreachable!("got updated secrets during 1-RTT"),
3775 }
3776 }
3777 if outgoing.is_empty() {
3778 if space == self.highest_space {
3779 break;
3780 } else {
3781 continue;
3783 }
3784 }
3785 let offset = self.crypto_state.spaces[space].crypto_offset;
3786 let outgoing = Bytes::from(outgoing);
3787 if let Some(hs) = self.state.as_handshake_mut()
3788 && space == SpaceKind::Initial
3789 && offset == 0
3790 && self.side.is_client()
3791 {
3792 hs.client_hello = Some(outgoing.clone());
3793 }
3794 self.crypto_state.spaces[space].crypto_offset += outgoing.len() as u64;
3795 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3796 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3797 offset,
3798 data: outgoing,
3799 });
3800 }
3801 }
3802
3803 fn upgrade_crypto(&mut self, space: SpaceKind, crypto: Keys) {
3805 debug_assert!(
3806 !self.crypto_state.has_keys(space.encryption_level()),
3807 "already reached packet space {space:?}"
3808 );
3809 trace!("{:?} keys ready", space);
3810 if space == SpaceKind::Data {
3811 self.crypto_state.next_crypto = Some(
3813 self.crypto_state
3814 .session
3815 .next_1rtt_keys()
3816 .expect("handshake should be complete"),
3817 );
3818 }
3819
3820 self.crypto_state.spaces[space].keys = Some(crypto);
3821 debug_assert!(space > self.highest_space);
3822 self.highest_space = space;
3823 if space == SpaceKind::Data && self.side.is_client() {
3824 self.crypto_state.discard_zero_rtt();
3826 }
3827 }
3828
3829 fn discard_space(&mut self, now: Instant, space: SpaceKind) {
3830 debug_assert!(space != SpaceKind::Data);
3831 trace!("discarding {:?} keys", space);
3832 if space == SpaceKind::Initial {
3833 if let ConnectionSide::Client { token, .. } = &mut self.side {
3835 *token = Bytes::new();
3836 }
3837 }
3838 self.crypto_state.spaces[space].keys = None;
3839 let space = &mut self.spaces[space];
3840 let pns = space.for_path(PathId::ZERO);
3841 pns.time_of_last_ack_eliciting_packet = None;
3842 pns.loss_time = None;
3843 pns.loss_probes = 0;
3844 let sent_packets = mem::take(&mut pns.sent_packets);
3845 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3846 for (_, packet) in sent_packets.into_iter() {
3847 path.data.remove_in_flight(&packet);
3848 }
3849
3850 self.set_loss_detection_timer(now, PathId::ZERO)
3851 }
3852
3853 fn handle_coalesced(
3854 &mut self,
3855 now: Instant,
3856 network_path: FourTuple,
3857 path_id: PathId,
3858 ecn: Option<EcnCodepoint>,
3859 data: BytesMut,
3860 ) {
3861 self.path_data_mut(path_id)
3862 .inc_total_recvd(data.len() as u64);
3863 let mut remaining = Some(data);
3864 let cid_len = self
3865 .local_cid_state
3866 .values()
3867 .map(|cid_state| cid_state.cid_len())
3868 .next()
3869 .expect("one cid_state must exist");
3870 while let Some(data) = remaining {
3871 match PartialDecode::new(
3872 data,
3873 &FixedLengthConnectionIdParser::new(cid_len),
3874 &[self.version],
3875 self.endpoint_config.grease_quic_bit,
3876 ) {
3877 Ok((partial_decode, rest)) => {
3878 remaining = rest;
3879 self.handle_decode(now, network_path, path_id, ecn, partial_decode);
3880 }
3881 Err(e) => {
3882 trace!("malformed header: {}", e);
3883 return;
3884 }
3885 }
3886 }
3887 }
3888
3889 fn handle_decode(
3890 &mut self,
3891 now: Instant,
3892 network_path: FourTuple,
3893 path_id: PathId,
3894 ecn: Option<EcnCodepoint>,
3895 partial_decode: PartialDecode,
3896 ) {
3897 let qlog = QlogRecvPacket::new(partial_decode.len());
3898 if let Some(decoded) = self
3899 .crypto_state
3900 .unprotect_header(partial_decode, self.peer_params.stateless_reset_token)
3901 {
3902 self.handle_packet(
3903 now,
3904 network_path,
3905 path_id,
3906 ecn,
3907 decoded.packet,
3908 decoded.stateless_reset,
3909 qlog,
3910 );
3911 }
3912 }
3913
3914 fn handle_packet(
3915 &mut self,
3916 now: Instant,
3917 network_path: FourTuple,
3918 path_id: PathId,
3919 ecn: Option<EcnCodepoint>,
3920 packet: Option<Packet>,
3921 stateless_reset: bool,
3922 mut qlog: QlogRecvPacket,
3923 ) {
3924 self.stats.udp_rx.ios += 1;
3925 self.path_stats.entry(path_id).or_default().udp_rx.ios += 1;
3926
3927 if let Some(ref packet) = packet {
3928 trace!(
3929 "got {:?} packet ({} bytes) from {} using id {}",
3930 packet.header.space(),
3931 packet.payload.len() + packet.header_data.len(),
3932 network_path,
3933 packet.header.dst_cid(),
3934 );
3935 }
3936
3937 if self.is_handshaking() {
3938 if path_id != PathId::ZERO {
3939 debug!(%network_path, %path_id, "discarding multipath packet during handshake");
3940 return;
3941 }
3942 if network_path != self.path_data_mut(path_id).network_path {
3943 if let Some(hs) = self.state.as_handshake() {
3944 if hs.allow_server_migration {
3945 trace!(%network_path, prev = %self.path_data(path_id).network_path, "server migrated to new remote");
3946 self.path_data_mut(path_id).network_path = network_path;
3947 self.qlog.emit_tuple_assigned(path_id, network_path, now);
3948 } else {
3949 debug!("discarding packet with unexpected remote during handshake");
3950 return;
3951 }
3952 } else {
3953 debug!("discarding packet with unexpected remote during handshake");
3954 return;
3955 }
3956 }
3957 }
3958
3959 let was_closed = self.state.is_closed();
3960 let was_drained = self.state.is_drained();
3961
3962 let decrypted = match packet {
3963 None => Err(None),
3964 Some(mut packet) => self
3965 .decrypt_packet(now, path_id, &mut packet)
3966 .map(move |number| (packet, number)),
3967 };
3968 let result = match decrypted {
3969 _ if stateless_reset => {
3970 debug!("got stateless reset");
3971 Err(ConnectionError::Reset)
3972 }
3973 Err(Some(e)) => {
3974 warn!("illegal packet: {}", e);
3975 Err(e.into())
3976 }
3977 Err(None) => {
3978 debug!("failed to authenticate packet");
3979 self.authentication_failures += 1;
3980 let integrity_limit = self
3981 .crypto_state
3982 .integrity_limit(self.highest_space)
3983 .unwrap();
3984 if self.authentication_failures > integrity_limit {
3985 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3986 } else {
3987 return;
3988 }
3989 }
3990 Ok((packet, number)) => {
3991 qlog.header(&packet.header, number, path_id);
3992 let span = match number {
3993 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3994 None => trace_span!("recv", space = ?packet.header.space()),
3995 };
3996 let _guard = span.enter();
3997
3998 let dedup = self.spaces[packet.header.space()]
3999 .path_space_mut(path_id)
4000 .map(|pns| &mut pns.dedup);
4001 if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
4002 debug!("discarding possible duplicate packet");
4003 self.qlog.emit_packet_received(qlog, now);
4004 return;
4005 } else if self.state.is_handshake() && packet.header.is_short() {
4006 trace!("dropping short packet during handshake");
4008 self.qlog.emit_packet_received(qlog, now);
4009 return;
4010 } else {
4011 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header
4012 && let Some(hs) = self.state.as_handshake()
4013 && self.side.is_server()
4014 && token != &hs.expected_token
4015 {
4016 warn!("discarding Initial with invalid retry token");
4020 self.qlog.emit_packet_received(qlog, now);
4021 return;
4022 }
4023
4024 if !self.state.is_closed() {
4025 let spin = match packet.header {
4026 Header::Short { spin, .. } => spin,
4027 _ => false,
4028 };
4029
4030 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
4031 self.ensure_path(path_id, network_path, now, number);
4033 }
4034 if self.paths.contains_key(&path_id) {
4035 self.on_packet_authenticated(
4036 now,
4037 packet.header.space(),
4038 path_id,
4039 ecn,
4040 number,
4041 spin,
4042 packet.header.is_1rtt(),
4043 );
4044 }
4045 }
4046
4047 let res = self.process_decrypted_packet(
4048 now,
4049 network_path,
4050 path_id,
4051 number,
4052 packet,
4053 &mut qlog,
4054 );
4055
4056 self.qlog.emit_packet_received(qlog, now);
4057 res
4058 }
4059 }
4060 };
4061
4062 if let Err(conn_err) = result {
4064 match conn_err {
4065 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
4066 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
4067 ConnectionError::Reset
4068 | ConnectionError::TransportError(TransportError {
4069 code: TransportErrorCode::AEAD_LIMIT_REACHED,
4070 ..
4071 }) => {
4072 self.state.move_to_drained(Some(conn_err));
4073 }
4074 ConnectionError::TimedOut => {
4075 unreachable!("timeouts aren't generated by packet processing");
4076 }
4077 ConnectionError::TransportError(err) => {
4078 debug!("closing connection due to transport error: {}", err);
4079 self.state.move_to_closed(err);
4080 }
4081 ConnectionError::VersionMismatch => {
4082 self.state.move_to_draining(Some(conn_err));
4083 }
4084 ConnectionError::LocallyClosed => {
4085 unreachable!("LocallyClosed isn't generated by packet processing");
4086 }
4087 ConnectionError::CidsExhausted => {
4088 unreachable!("CidsExhausted isn't generated by packet processing");
4089 }
4090 };
4091 }
4092
4093 if !was_closed && self.state.is_closed() {
4094 self.close_common();
4095 if !self.state.is_drained() {
4096 self.set_close_timer(now);
4097 }
4098 }
4099 if !was_drained && self.state.is_drained() {
4100 self.endpoint_events.push_back(EndpointEventInner::Drained);
4101 self.timers
4104 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
4105 }
4106
4107 if matches!(self.state.as_type(), StateType::Closed) {
4114 if self
4132 .paths
4133 .get(&path_id)
4134 .map(|p| p.data.validated && p.data.network_path == network_path)
4135 .unwrap_or(false)
4136 {
4137 self.connection_close_pending = true;
4138 }
4139 }
4140 }
4141
4142 fn process_decrypted_packet(
4143 &mut self,
4144 now: Instant,
4145 network_path: FourTuple,
4146 path_id: PathId,
4147 number: Option<u64>,
4148 packet: Packet,
4149 qlog: &mut QlogRecvPacket,
4150 ) -> Result<(), ConnectionError> {
4151 if !self.paths.contains_key(&path_id) {
4152 trace!(%path_id, ?number, "discarding packet for unknown path");
4156 return Ok(());
4157 }
4158 let state = match self.state.as_type() {
4159 StateType::Established => {
4160 match packet.header.space() {
4161 SpaceKind::Data => self.process_payload(
4162 now,
4163 network_path,
4164 path_id,
4165 number.unwrap(),
4166 packet,
4167 qlog,
4168 )?,
4169 _ if packet.header.has_frames() => {
4170 self.process_early_payload(now, path_id, packet, qlog)?
4171 }
4172 _ => {
4173 trace!("discarding unexpected pre-handshake packet");
4174 }
4175 }
4176 return Ok(());
4177 }
4178 StateType::Closed => {
4179 for result in frame::Iter::new(packet.payload.freeze())? {
4180 let frame = match result {
4181 Ok(frame) => frame,
4182 Err(err) => {
4183 debug!("frame decoding error: {err:?}");
4184 continue;
4185 }
4186 };
4187 qlog.frame(&frame);
4188
4189 if let Frame::Padding = frame {
4190 continue;
4191 };
4192
4193 self.stats.frame_rx.record(frame.ty());
4194
4195 if let Frame::Close(_error) = frame {
4196 self.state.move_to_draining(None);
4197 break;
4198 }
4199 }
4200 return Ok(());
4201 }
4202 StateType::Draining | StateType::Drained => return Ok(()),
4203 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
4204 };
4205
4206 match packet.header {
4207 Header::Retry {
4208 src_cid: remote_cid,
4209 ..
4210 } => {
4211 debug_assert_eq!(path_id, PathId::ZERO);
4212 if self.side.is_server() {
4213 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
4214 }
4215
4216 let is_valid_retry = self
4217 .remote_cids
4218 .get(&path_id)
4219 .map(|cids| cids.active())
4220 .map(|orig_dst_cid| {
4221 self.crypto_state.session.is_valid_retry(
4222 orig_dst_cid,
4223 &packet.header_data,
4224 &packet.payload,
4225 )
4226 })
4227 .unwrap_or_default();
4228 if self.total_authed_packets > 1
4229 || packet.payload.len() <= 16 || !is_valid_retry
4231 {
4232 trace!("discarding invalid Retry");
4233 return Ok(());
4241 }
4242
4243 trace!("retrying with CID {}", remote_cid);
4244 let client_hello = state.client_hello.take().unwrap();
4245 self.retry_src_cid = Some(remote_cid);
4246 self.remote_cids
4247 .get_mut(&path_id)
4248 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
4249 .update_initial_cid(remote_cid);
4250 self.remote_handshake_cid = remote_cid;
4251
4252 let space = &mut self.spaces[SpaceId::Initial];
4253 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
4254 self.on_packet_acked(now, PathId::ZERO, info);
4255 };
4256
4257 self.discard_space(now, SpaceKind::Initial); let crypto_space = &mut self.crypto_state.spaces[SpaceKind::Initial];
4260 crypto_space.keys = Some(
4261 self.crypto_state
4262 .session
4263 .initial_keys(remote_cid, self.side.side()),
4264 );
4265 crypto_space.crypto_offset = client_hello.len() as u64;
4266
4267 let next_pn = self.spaces[SpaceId::Initial]
4268 .for_path(path_id)
4269 .next_packet_number;
4270 self.spaces[SpaceId::Initial] = {
4271 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
4272 space.for_path(path_id).next_packet_number = next_pn;
4273 space.pending.crypto.push_back(frame::Crypto {
4274 offset: 0,
4275 data: client_hello,
4276 });
4277 space
4278 };
4279
4280 let zero_rtt = mem::take(
4282 &mut self.spaces[SpaceId::Data]
4283 .for_path(PathId::ZERO)
4284 .sent_packets,
4285 );
4286 for (_, info) in zero_rtt.into_iter() {
4287 self.paths
4288 .get_mut(&PathId::ZERO)
4289 .unwrap()
4290 .remove_in_flight(&info);
4291 self.spaces[SpaceId::Data].pending |= info.retransmits;
4292 }
4293 self.streams.retransmit_all_for_0rtt();
4294
4295 let token_len = packet.payload.len() - 16;
4296 let ConnectionSide::Client { ref mut token, .. } = self.side else {
4297 unreachable!("we already short-circuited if we're server");
4298 };
4299 *token = packet.payload.freeze().split_to(token_len);
4300
4301 self.state = State::handshake(state::Handshake {
4302 expected_token: Bytes::new(),
4303 remote_cid_set: false,
4304 client_hello: None,
4305 allow_server_migration: true,
4306 });
4307 Ok(())
4308 }
4309 Header::Long {
4310 ty: LongType::Handshake,
4311 src_cid: remote_cid,
4312 dst_cid: local_cid,
4313 ..
4314 } => {
4315 debug_assert_eq!(path_id, PathId::ZERO);
4316 if remote_cid != self.remote_handshake_cid {
4317 debug!(
4318 "discarding packet with mismatched remote CID: {} != {}",
4319 self.remote_handshake_cid, remote_cid
4320 );
4321 return Ok(());
4322 }
4323 self.on_path_validated(path_id);
4324
4325 self.process_early_payload(now, path_id, packet, qlog)?;
4326 if self.state.is_closed() {
4327 return Ok(());
4328 }
4329
4330 if self.crypto_state.session.is_handshaking() {
4331 trace!("handshake ongoing");
4332 return Ok(());
4333 }
4334
4335 if self.side.is_client() {
4336 let params = self
4338 .crypto_state
4339 .session
4340 .transport_parameters()?
4341 .ok_or_else(|| {
4342 TransportError::new(
4343 TransportErrorCode::crypto(0x6d),
4344 "transport parameters missing".to_owned(),
4345 )
4346 })?;
4347
4348 if self.has_0rtt() {
4349 if !self.crypto_state.session.early_data_accepted().unwrap() {
4350 debug_assert!(self.side.is_client());
4351 debug!("0-RTT rejected");
4352 self.crypto_state.accepted_0rtt = false;
4353 self.streams.zero_rtt_rejected();
4354
4355 self.spaces[SpaceId::Data].pending = Retransmits::default();
4357
4358 let sent_packets = mem::take(
4360 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
4361 );
4362 for (_, packet) in sent_packets.into_iter() {
4363 self.paths
4364 .get_mut(&path_id)
4365 .unwrap()
4366 .remove_in_flight(&packet);
4367 }
4368 } else {
4369 self.crypto_state.accepted_0rtt = true;
4370 params.validate_resumption_from(&self.peer_params)?;
4371 }
4372 }
4373 if let Some(token) = params.stateless_reset_token {
4374 let remote = self.path_data(path_id).network_path.remote;
4375 debug_assert!(!self.state.is_drained()); self.endpoint_events
4377 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4378 }
4379 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4380 self.issue_first_cids(now);
4381 } else {
4382 self.spaces[SpaceId::Data].pending.handshake_done = true;
4384 self.discard_space(now, SpaceKind::Handshake);
4385 self.events.push_back(Event::HandshakeConfirmed);
4386 trace!("handshake confirmed");
4387 }
4388
4389 self.events.push_back(Event::Connected);
4390 self.state.move_to_established();
4391 trace!("established");
4392
4393 self.issue_first_path_cids(now);
4396 Ok(())
4397 }
4398 Header::Initial(InitialHeader {
4399 src_cid: remote_cid,
4400 dst_cid: local_cid,
4401 ..
4402 }) => {
4403 debug_assert_eq!(path_id, PathId::ZERO);
4404 if !state.remote_cid_set {
4405 trace!("switching remote CID to {}", remote_cid);
4406 let mut state = state.clone();
4407 self.remote_cids
4408 .get_mut(&path_id)
4409 .expect("PathId::ZERO not yet abandoned")
4410 .update_initial_cid(remote_cid);
4411 self.remote_handshake_cid = remote_cid;
4412 self.original_remote_cid = remote_cid;
4413 state.remote_cid_set = true;
4414 self.state.move_to_handshake(state);
4415 } else if remote_cid != self.remote_handshake_cid {
4416 debug!(
4417 "discarding packet with mismatched remote CID: {} != {}",
4418 self.remote_handshake_cid, remote_cid
4419 );
4420 return Ok(());
4421 }
4422
4423 let starting_space = self.highest_space;
4424 self.process_early_payload(now, path_id, packet, qlog)?;
4425
4426 if self.side.is_server()
4427 && starting_space == SpaceKind::Initial
4428 && self.highest_space != SpaceKind::Initial
4429 {
4430 let params = self
4431 .crypto_state
4432 .session
4433 .transport_parameters()?
4434 .ok_or_else(|| {
4435 TransportError::new(
4436 TransportErrorCode::crypto(0x6d),
4437 "transport parameters missing".to_owned(),
4438 )
4439 })?;
4440 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4441 self.issue_first_cids(now);
4442 self.init_0rtt(now);
4443 }
4444 Ok(())
4445 }
4446 Header::Long {
4447 ty: LongType::ZeroRtt,
4448 ..
4449 } => {
4450 self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4451 Ok(())
4452 }
4453 Header::VersionNegotiate { .. } => {
4454 if self.total_authed_packets > 1 {
4455 return Ok(());
4456 }
4457 let supported = packet
4458 .payload
4459 .chunks(4)
4460 .any(|x| match <[u8; 4]>::try_from(x) {
4461 Ok(version) => self.version == u32::from_be_bytes(version),
4462 Err(_) => false,
4463 });
4464 if supported {
4465 return Ok(());
4466 }
4467 debug!("remote doesn't support our version");
4468 Err(ConnectionError::VersionMismatch)
4469 }
4470 Header::Short { .. } => unreachable!(
4471 "short packets received during handshake are discarded in handle_packet"
4472 ),
4473 }
4474 }
4475
4476 fn process_early_payload(
4478 &mut self,
4479 now: Instant,
4480 path_id: PathId,
4481 packet: Packet,
4482 #[allow(unused)] qlog: &mut QlogRecvPacket,
4483 ) -> Result<(), TransportError> {
4484 debug_assert_ne!(packet.header.space(), SpaceKind::Data);
4485 debug_assert_eq!(path_id, PathId::ZERO);
4486 let payload_len = packet.payload.len();
4487 let mut ack_eliciting = false;
4488 for result in frame::Iter::new(packet.payload.freeze())? {
4489 let frame = result?;
4490 qlog.frame(&frame);
4491 let span = match frame {
4492 Frame::Padding => continue,
4493 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4494 };
4495
4496 self.stats.frame_rx.record(frame.ty());
4497
4498 let _guard = span.as_ref().map(|x| x.enter());
4499 ack_eliciting |= frame.is_ack_eliciting();
4500
4501 if frame.is_1rtt() && packet.header.space() != SpaceKind::Data {
4503 return Err(TransportError::PROTOCOL_VIOLATION(
4504 "illegal frame type in handshake",
4505 ));
4506 }
4507
4508 match frame {
4509 Frame::Padding | Frame::Ping => {}
4510 Frame::Crypto(frame) => {
4511 self.read_crypto(packet.header.space().into(), &frame, payload_len)?;
4512 }
4513 Frame::Ack(ack) => {
4514 self.on_ack_received(now, packet.header.space().into(), ack)?;
4515 }
4516 Frame::PathAck(ack) => {
4517 span.as_ref()
4518 .map(|span| span.record("path", tracing::field::display(&ack.path_id)));
4519 self.on_path_ack_received(now, packet.header.space().into(), ack)?;
4520 }
4521 Frame::Close(reason) => {
4522 self.state.move_to_draining(Some(reason.into()));
4523 return Ok(());
4524 }
4525 _ => {
4526 let mut err =
4527 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4528 err.frame = frame::MaybeFrame::Known(frame.ty());
4529 return Err(err);
4530 }
4531 }
4532 }
4533
4534 if ack_eliciting {
4535 self.spaces[packet.header.space()]
4537 .for_path(path_id)
4538 .pending_acks
4539 .set_immediate_ack_required();
4540 }
4541
4542 self.write_crypto();
4543 Ok(())
4544 }
4545
4546 fn process_payload(
4548 &mut self,
4549 now: Instant,
4550 network_path: FourTuple,
4551 path_id: PathId,
4552 number: u64,
4553 packet: Packet,
4554 #[allow(unused)] qlog: &mut QlogRecvPacket,
4555 ) -> Result<(), TransportError> {
4556 let is_multipath_negotiated = self.is_multipath_negotiated();
4557 let payload = packet.payload.freeze();
4558 let mut is_probing_packet = true;
4559 let mut close = None;
4560 let payload_len = payload.len();
4561 let mut ack_eliciting = false;
4562 let mut migration_observed_addr = None;
4565 for result in frame::Iter::new(payload)? {
4566 let frame = result?;
4567 qlog.frame(&frame);
4568 let span = match frame {
4569 Frame::Padding => continue,
4570 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4571 };
4572
4573 self.stats.frame_rx.record(frame.ty());
4574 match &frame {
4577 Frame::Crypto(f) => {
4578 trace!(offset = f.offset, len = f.data.len(), "got frame CRYPTO");
4579 }
4580 Frame::Stream(f) => {
4581 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got frame STREAM");
4582 }
4583 Frame::Datagram(f) => {
4584 trace!(len = f.data.len(), "got frame DATAGRAM");
4585 }
4586 f => {
4587 trace!("got frame {f}");
4588 }
4589 }
4590
4591 let _guard = span.enter();
4592 if packet.header.is_0rtt() {
4593 match frame {
4594 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4595 return Err(TransportError::PROTOCOL_VIOLATION(
4596 "illegal frame type in 0-RTT",
4597 ));
4598 }
4599 _ => {
4600 if frame.is_1rtt() {
4601 return Err(TransportError::PROTOCOL_VIOLATION(
4602 "illegal frame type in 0-RTT",
4603 ));
4604 }
4605 }
4606 }
4607 }
4608 ack_eliciting |= frame.is_ack_eliciting();
4609
4610 match frame {
4612 Frame::Padding
4613 | Frame::PathChallenge(_)
4614 | Frame::PathResponse(_)
4615 | Frame::NewConnectionId(_)
4616 | Frame::ObservedAddr(_) => {}
4617 _ => {
4618 is_probing_packet = false;
4619 }
4620 }
4621
4622 match frame {
4623 Frame::Crypto(frame) => {
4624 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4625 }
4626 Frame::Stream(frame) => {
4627 if self.streams.received(frame, payload_len)?.should_transmit() {
4628 self.spaces[SpaceId::Data].pending.max_data = true;
4629 }
4630 }
4631 Frame::Ack(ack) => {
4632 self.on_ack_received(now, SpaceId::Data, ack)?;
4633 }
4634 Frame::PathAck(ack) => {
4635 span.record("path", tracing::field::display(&ack.path_id));
4636 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4637 }
4638 Frame::Padding | Frame::Ping => {}
4639 Frame::Close(reason) => {
4640 close = Some(reason);
4641 }
4642 Frame::PathChallenge(challenge) => {
4643 let path = &mut self
4644 .path_mut(path_id)
4645 .expect("payload is processed only after the path becomes known");
4646 path.path_responses.push(number, challenge.0, network_path);
4647 if network_path == path.network_path {
4650 match self.peer_supports_ack_frequency() {
4660 true => self.immediate_ack(path_id),
4661 false => {
4662 self.ping_path(path_id).ok();
4663 }
4664 }
4665 }
4666 }
4667 Frame::PathResponse(response) => {
4668 let path = self
4669 .paths
4670 .get_mut(&path_id)
4671 .expect("payload is processed only after the path becomes known");
4672
4673 use PathTimer::*;
4674 use paths::OnPathResponseReceived::*;
4675 match path
4676 .data
4677 .on_path_response_received(now, response.0, network_path)
4678 {
4679 OnPath { was_open } => {
4680 let qlog = self.qlog.with_time(now);
4681
4682 self.timers
4683 .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4684 self.timers
4685 .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4686
4687 let next_challenge = path
4688 .data
4689 .earliest_on_path_expiring_challenge()
4690 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4691 self.timers.set_or_stop(
4692 Timer::PerPath(path_id, PathChallengeLost),
4693 next_challenge,
4694 qlog,
4695 );
4696
4697 if !was_open {
4698 if is_multipath_negotiated {
4699 self.events
4700 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4701 }
4702 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4703 {
4704 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4705 id: path_id,
4706 addr: observed.socket_addr(),
4707 }));
4708 }
4709 }
4710 if let Some((_, ref mut prev)) = path.prev {
4711 prev.reset_on_path_challenges();
4712 }
4713 }
4714 OffPath => {
4715 debug!(%response, "Valid response to off-path PATH_CHALLENGE");
4716 }
4717 Ignored {
4718 sent_on,
4719 current_path,
4720 } => {
4721 debug!(%sent_on, %current_path, %response, "ignoring valid PATH_RESPONSE")
4722 }
4723 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4724 }
4725 }
4726 Frame::MaxData(frame::MaxData(bytes)) => {
4727 self.streams.received_max_data(bytes);
4728 }
4729 Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
4730 self.streams.received_max_stream_data(id, offset)?;
4731 }
4732 Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
4733 self.streams.received_max_streams(dir, count)?;
4734 }
4735 Frame::ResetStream(frame) => {
4736 if self.streams.received_reset(frame)?.should_transmit() {
4737 self.spaces[SpaceId::Data].pending.max_data = true;
4738 }
4739 }
4740 Frame::DataBlocked(DataBlocked(offset)) => {
4741 debug!(offset, "peer claims to be blocked at connection level");
4742 }
4743 Frame::StreamDataBlocked(StreamDataBlocked { id, offset }) => {
4744 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4745 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4746 return Err(TransportError::STREAM_STATE_ERROR(
4747 "STREAM_DATA_BLOCKED on send-only stream",
4748 ));
4749 }
4750 debug!(
4751 stream = %id,
4752 offset, "peer claims to be blocked at stream level"
4753 );
4754 }
4755 Frame::StreamsBlocked(StreamsBlocked { dir, limit }) => {
4756 if limit > MAX_STREAM_COUNT {
4757 return Err(TransportError::FRAME_ENCODING_ERROR(
4758 "unrepresentable stream limit",
4759 ));
4760 }
4761 debug!(
4762 "peer claims to be blocked opening more than {} {} streams",
4763 limit, dir
4764 );
4765 }
4766 Frame::StopSending(frame::StopSending { id, error_code }) => {
4767 if id.initiator() != self.side.side() {
4768 if id.dir() == Dir::Uni {
4769 debug!("got STOP_SENDING on recv-only {}", id);
4770 return Err(TransportError::STREAM_STATE_ERROR(
4771 "STOP_SENDING on recv-only stream",
4772 ));
4773 }
4774 } else if self.streams.is_local_unopened(id) {
4775 return Err(TransportError::STREAM_STATE_ERROR(
4776 "STOP_SENDING on unopened stream",
4777 ));
4778 }
4779 self.streams.received_stop_sending(id, error_code);
4780 }
4781 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4782 if let Some(ref path_id) = path_id {
4783 span.record("path", tracing::field::display(&path_id));
4784 }
4785 let path_id = path_id.unwrap_or_default();
4786 match self.local_cid_state.get_mut(&path_id) {
4787 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4788 Some(cid_state) => {
4789 let allow_more_cids = cid_state
4790 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4791
4792 let has_path = !self.abandoned_paths.contains(&path_id);
4796 let allow_more_cids = allow_more_cids && has_path;
4797
4798 debug_assert!(!self.state.is_drained()); self.endpoint_events
4800 .push_back(EndpointEventInner::RetireConnectionId(
4801 now,
4802 path_id,
4803 sequence,
4804 allow_more_cids,
4805 ));
4806 }
4807 }
4808 }
4809 Frame::NewConnectionId(frame) => {
4810 let path_id = if let Some(path_id) = frame.path_id {
4811 if !self.is_multipath_negotiated() {
4812 return Err(TransportError::PROTOCOL_VIOLATION(
4813 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4814 ));
4815 }
4816 if path_id > self.local_max_path_id {
4817 return Err(TransportError::PROTOCOL_VIOLATION(
4818 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4819 ));
4820 }
4821 path_id
4822 } else {
4823 PathId::ZERO
4824 };
4825
4826 if let Some(ref path_id) = frame.path_id {
4827 span.record("path", tracing::field::display(&path_id));
4828 }
4829
4830 if self.abandoned_paths.contains(&path_id) {
4831 trace!("ignoring issued CID for abandoned path");
4832 continue;
4833 }
4834 let remote_cids = self
4835 .remote_cids
4836 .entry(path_id)
4837 .or_insert_with(|| CidQueue::new(frame.id));
4838 if remote_cids.active().is_empty() {
4839 return Err(TransportError::PROTOCOL_VIOLATION(
4840 "NEW_CONNECTION_ID when CIDs aren't in use",
4841 ));
4842 }
4843 if frame.retire_prior_to > frame.sequence {
4844 return Err(TransportError::PROTOCOL_VIOLATION(
4845 "NEW_CONNECTION_ID retiring unissued CIDs",
4846 ));
4847 }
4848
4849 use crate::cid_queue::InsertError;
4850 match remote_cids.insert(frame) {
4851 Ok(None) if self.path(path_id).is_none() => {
4852 self.continue_nat_traversal_round(now);
4855 }
4856 Ok(None) => {}
4857 Ok(Some((retired, reset_token))) => {
4858 let pending_retired =
4859 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4860 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4863 if (pending_retired.len() as u64)
4866 .saturating_add(retired.end.saturating_sub(retired.start))
4867 > MAX_PENDING_RETIRED_CIDS
4868 {
4869 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4870 "queued too many retired CIDs",
4871 ));
4872 }
4873 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4874 self.set_reset_token(path_id, network_path.remote, reset_token);
4875 }
4876 Err(InsertError::ExceedsLimit) => {
4877 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4878 }
4879 Err(InsertError::Retired) => {
4880 trace!("discarding already-retired");
4881 self.spaces[SpaceId::Data]
4885 .pending
4886 .retire_cids
4887 .push((path_id, frame.sequence));
4888 continue;
4889 }
4890 };
4891
4892 if self.side.is_server()
4893 && path_id == PathId::ZERO
4894 && self
4895 .remote_cids
4896 .get(&PathId::ZERO)
4897 .map(|cids| cids.active_seq() == 0)
4898 .unwrap_or_default()
4899 {
4900 self.update_remote_cid(PathId::ZERO);
4903 }
4904 }
4905 Frame::NewToken(NewToken { token }) => {
4906 let ConnectionSide::Client {
4907 token_store,
4908 server_name,
4909 ..
4910 } = &self.side
4911 else {
4912 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4913 };
4914 if token.is_empty() {
4915 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4916 }
4917 trace!("got new token");
4918 token_store.insert(server_name, token);
4919 }
4920 Frame::Datagram(datagram) => {
4921 if self
4922 .datagrams
4923 .received(datagram, &self.config.datagram_receive_buffer_size)?
4924 {
4925 self.events.push_back(Event::DatagramReceived);
4926 }
4927 }
4928 Frame::AckFrequency(ack_frequency) => {
4929 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4932 continue;
4935 }
4936
4937 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4939 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4940
4941 if let Some(timeout) = space
4944 .pending_acks
4945 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4946 {
4947 self.timers.set(
4948 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4949 timeout,
4950 self.qlog.with_time(now),
4951 );
4952 }
4953 }
4954 }
4955 Frame::ImmediateAck => {
4956 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4958 pns.pending_acks.set_immediate_ack_required();
4959 }
4960 }
4961 Frame::HandshakeDone => {
4962 if self.side.is_server() {
4963 return Err(TransportError::PROTOCOL_VIOLATION(
4964 "client sent HANDSHAKE_DONE",
4965 ));
4966 }
4967 if self.crypto_state.has_keys(EncryptionLevel::Handshake) {
4968 self.discard_space(now, SpaceKind::Handshake);
4969 }
4970 self.events.push_back(Event::HandshakeConfirmed);
4971 trace!("handshake confirmed");
4972 }
4973 Frame::ObservedAddr(observed) => {
4974 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4976 if !self
4977 .peer_params
4978 .address_discovery_role
4979 .should_report(&self.config.address_discovery_role)
4980 {
4981 return Err(TransportError::PROTOCOL_VIOLATION(
4982 "received OBSERVED_ADDRESS frame when not negotiated",
4983 ));
4984 }
4985 if packet.header.space() != SpaceKind::Data {
4987 return Err(TransportError::PROTOCOL_VIOLATION(
4988 "OBSERVED_ADDRESS frame outside data space",
4989 ));
4990 }
4991
4992 let path = self.path_data_mut(path_id);
4993 if network_path == path.network_path {
4994 if let Some(updated) = path.update_observed_addr_report(observed)
4995 && path.open_status == paths::OpenStatus::Informed
4996 {
4997 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4998 id: path_id,
4999 addr: updated,
5000 }));
5001 }
5003 } else {
5004 migration_observed_addr = Some(observed)
5006 }
5007 }
5008 Frame::PathAbandon(frame::PathAbandon {
5009 path_id,
5010 error_code,
5011 }) => {
5012 span.record("path", tracing::field::display(&path_id));
5013 match self.close_path_inner(
5014 now,
5015 path_id,
5016 PathAbandonReason::RemoteAbandoned {
5017 error_code: error_code.into(),
5018 },
5019 ) {
5020 Ok(()) => {
5021 trace!("peer abandoned path");
5022 }
5023 Err(ClosePathError::LastOpenPath) => {
5024 trace!("peer abandoned last path, closing connection");
5025 return Err(TransportError::NO_VIABLE_PATH(
5026 "last path abandoned by peer",
5027 ));
5028 }
5029 Err(ClosePathError::ClosedPath) => {
5030 trace!("peer abandoned already closed path");
5031 }
5032 Err(ClosePathError::MultipathNotNegotiated) => {
5033 return Err(TransportError::PROTOCOL_VIOLATION(
5034 "received PATH_ABANDON frame when multipath was not negotiated",
5035 ));
5036 }
5037 };
5038
5039 if let Some(path) = self.paths.get_mut(&path_id)
5041 && !mem::replace(&mut path.data.draining, true)
5042 {
5043 let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5044 let pto = path.data.rtt.pto_base() + ack_delay;
5045 self.timers.set(
5046 Timer::PerPath(path_id, PathTimer::DiscardPath),
5047 now + 3 * pto,
5048 self.qlog.with_time(now),
5049 );
5050
5051 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
5052 }
5053 }
5054 Frame::PathStatusAvailable(info) => {
5055 span.record("path", tracing::field::display(&info.path_id));
5056 if self.is_multipath_negotiated() {
5057 self.on_path_status(
5058 info.path_id,
5059 PathStatus::Available,
5060 info.status_seq_no,
5061 );
5062 } else {
5063 return Err(TransportError::PROTOCOL_VIOLATION(
5064 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
5065 ));
5066 }
5067 }
5068 Frame::PathStatusBackup(info) => {
5069 span.record("path", tracing::field::display(&info.path_id));
5070 if self.is_multipath_negotiated() {
5071 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
5072 } else {
5073 return Err(TransportError::PROTOCOL_VIOLATION(
5074 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
5075 ));
5076 }
5077 }
5078 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
5079 span.record("path", tracing::field::display(&path_id));
5080 if !self.is_multipath_negotiated() {
5081 return Err(TransportError::PROTOCOL_VIOLATION(
5082 "received MAX_PATH_ID frame when multipath was not negotiated",
5083 ));
5084 }
5085 if path_id > self.remote_max_path_id {
5087 self.remote_max_path_id = path_id;
5088 self.issue_first_path_cids(now);
5089 while let Some(true) = self.continue_nat_traversal_round(now) {}
5090 }
5091 }
5092 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
5093 if self.is_multipath_negotiated() {
5097 if max_path_id > self.local_max_path_id {
5098 return Err(TransportError::PROTOCOL_VIOLATION(
5099 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
5100 ));
5101 }
5102 debug!("received PATHS_BLOCKED({:?})", max_path_id);
5103 } else {
5105 return Err(TransportError::PROTOCOL_VIOLATION(
5106 "received PATHS_BLOCKED frame when not multipath was not negotiated",
5107 ));
5108 }
5109 }
5110 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
5111 if self.is_multipath_negotiated() {
5119 if path_id > self.local_max_path_id {
5120 return Err(TransportError::PROTOCOL_VIOLATION(
5121 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
5122 ));
5123 }
5124 if next_seq.0
5125 > self
5126 .local_cid_state
5127 .get(&path_id)
5128 .map(|cid_state| cid_state.active_seq().1 + 1)
5129 .unwrap_or_default()
5130 {
5131 return Err(TransportError::PROTOCOL_VIOLATION(
5132 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
5133 ));
5134 }
5135 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
5136 } else {
5137 return Err(TransportError::PROTOCOL_VIOLATION(
5138 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
5139 ));
5140 }
5141 }
5142 Frame::AddAddress(addr) => {
5143 let client_state = match self.n0_nat_traversal.client_side_mut() {
5144 Ok(state) => state,
5145 Err(err) => {
5146 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5147 "Nat traversal(ADD_ADDRESS): {err}"
5148 )));
5149 }
5150 };
5151
5152 if !client_state.check_remote_address(&addr) {
5153 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
5155 }
5156
5157 match client_state.add_remote_address(addr) {
5158 Ok(maybe_added) => {
5159 if let Some(added) = maybe_added {
5160 self.events.push_back(Event::NatTraversal(
5161 n0_nat_traversal::Event::AddressAdded(added),
5162 ));
5163 }
5164 }
5165 Err(e) => {
5166 warn!(%e, "failed to add remote address")
5167 }
5168 }
5169 }
5170 Frame::RemoveAddress(addr) => {
5171 let client_state = match self.n0_nat_traversal.client_side_mut() {
5172 Ok(state) => state,
5173 Err(err) => {
5174 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5175 "Nat traversal(REMOVE_ADDRESS): {err}"
5176 )));
5177 }
5178 };
5179 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
5180 self.events.push_back(Event::NatTraversal(
5181 n0_nat_traversal::Event::AddressRemoved(removed_addr),
5182 ));
5183 }
5184 }
5185 Frame::ReachOut(reach_out) => {
5186 let ipv6 = self.is_ipv6();
5187 let server_state = match self.n0_nat_traversal.server_side_mut() {
5188 Ok(state) => state,
5189 Err(err) => {
5190 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5191 "Nat traversal(REACH_OUT): {err}"
5192 )));
5193 }
5194 };
5195
5196 if let Err(err) = server_state.handle_reach_out(reach_out, ipv6) {
5197 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5198 "Nat traversal(REACH_OUT): {err}"
5199 )));
5200 }
5201 }
5202 }
5203 }
5204
5205 let space = self.spaces[SpaceId::Data].for_path(path_id);
5206 if space
5207 .pending_acks
5208 .packet_received(now, number, ack_eliciting, &space.dedup)
5209 {
5210 if self.abandoned_paths.contains(&path_id) {
5211 space.pending_acks.set_immediate_ack_required();
5214 } else {
5215 self.timers.set(
5216 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
5217 now + self.ack_frequency.max_ack_delay,
5218 self.qlog.with_time(now),
5219 );
5220 }
5221 }
5222
5223 let pending = &mut self.spaces[SpaceId::Data].pending;
5228 self.streams.queue_max_stream_id(pending);
5229
5230 if let Some(reason) = close {
5231 self.state.move_to_draining(Some(reason.into()));
5232 self.connection_close_pending = true;
5233 }
5234
5235 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
5236 && !is_probing_packet
5237 && network_path != self.path_data(path_id).network_path
5238 {
5239 let ConnectionSide::Server { ref server_config } = self.side else {
5240 panic!("packets from unknown remote should be dropped by clients");
5241 };
5242 debug_assert!(
5243 server_config.migration,
5244 "migration-initiating packets should have been dropped immediately"
5245 );
5246 self.migrate(path_id, now, network_path, migration_observed_addr);
5247 self.update_remote_cid(path_id);
5249 self.spin = false;
5250 }
5251
5252 Ok(())
5253 }
5254
5255 fn migrate(
5256 &mut self,
5257 path_id: PathId,
5258 now: Instant,
5259 network_path: FourTuple,
5260 observed_addr: Option<ObservedAddr>,
5261 ) {
5262 trace!(%network_path, %path_id, "migration initiated");
5263 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
5264 let prev_pto = self.pto(SpaceKind::Data, path_id);
5271 let path = self.paths.get_mut(&path_id).expect("known path");
5272 let mut new_path_data = if network_path.remote.is_ipv4()
5273 && network_path.remote.ip() == path.data.network_path.remote.ip()
5274 {
5275 PathData::from_previous(network_path, &path.data, self.path_generation_counter, now)
5276 } else {
5277 let peer_max_udp_payload_size =
5278 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
5279 .unwrap_or(u16::MAX);
5280 PathData::new(
5281 network_path,
5282 self.allow_mtud,
5283 Some(peer_max_udp_payload_size),
5284 self.path_generation_counter,
5285 now,
5286 &self.config,
5287 )
5288 };
5289 new_path_data.last_observed_addr_report = path.data.last_observed_addr_report.clone();
5290 if let Some(report) = observed_addr
5291 && let Some(updated) = new_path_data.update_observed_addr_report(report)
5292 {
5293 tracing::info!("adding observed addr event from migration");
5294 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5295 id: path_id,
5296 addr: updated,
5297 }));
5298 }
5299 new_path_data.pending_on_path_challenge = true;
5300
5301 let mut prev_path_data = mem::replace(&mut path.data, new_path_data);
5302
5303 if !prev_path_data.validated
5312 && let Some(cid) = self.remote_cids.get(&path_id).map(CidQueue::active)
5313 {
5314 prev_path_data.pending_on_path_challenge = true;
5315 path.prev = Some((cid, prev_path_data));
5318 }
5319
5320 self.qlog.emit_tuple_assigned(path_id, network_path, now);
5322
5323 self.timers.set(
5324 Timer::PerPath(path_id, PathTimer::PathValidation),
5325 now + 3 * cmp::max(self.pto(SpaceKind::Data, path_id), prev_pto),
5326 self.qlog.with_time(now),
5327 );
5328 }
5329
5330 pub fn handle_network_change(&mut self, hint: Option<&dyn NetworkChangeHint>, now: Instant) {
5347 debug!("network changed");
5348 if self.state.is_drained() {
5349 return;
5350 }
5351 if self.highest_space < SpaceKind::Data {
5352 for path in self.paths.values_mut() {
5353 path.data.network_path.local_ip = None;
5355 }
5356
5357 self.update_remote_cid(PathId::ZERO);
5358 self.ping();
5359
5360 return;
5361 }
5362
5363 let mut non_recoverable_paths = Vec::default();
5366 let mut recoverable_paths = Vec::default();
5367 let mut open_paths = 0;
5368
5369 let is_multipath_negotiated = self.is_multipath_negotiated();
5370 let is_client = self.side().is_client();
5371 let immediate_ack_allowed = self.peer_supports_ack_frequency();
5372
5373 for (path_id, path) in self.paths.iter_mut() {
5374 if self.abandoned_paths.contains(path_id) {
5375 continue;
5376 }
5377 open_paths += 1;
5378
5379 path.data.network_path.local_ip = None;
5382
5383 let network_path = path.data.network_path;
5384 let remote = network_path.remote;
5385
5386 let attempt_to_recover = if is_multipath_negotiated {
5390 if is_client {
5391 hint.map(|h| h.is_path_recoverable(*path_id, network_path))
5392 .unwrap_or(false)
5393 } else {
5394 true
5398 }
5399 } else {
5400 true
5402 };
5403
5404 if attempt_to_recover {
5405 recoverable_paths.push((*path_id, remote));
5406 } else {
5407 non_recoverable_paths.push((*path_id, remote, path.data.local_status()))
5408 }
5409 }
5410
5411 let open_first = open_paths == non_recoverable_paths.len();
5420
5421 for (path_id, remote, status) in non_recoverable_paths.into_iter() {
5422 let network_path = FourTuple {
5423 remote,
5424 local_ip: None, };
5426
5427 if open_first && let Err(e) = self.open_path(network_path, status, now) {
5428 debug!(%e,"Failed to open new path for network change");
5429 recoverable_paths.push((path_id, remote));
5431 continue;
5432 }
5433
5434 if let Err(e) =
5435 self.close_path_inner(now, path_id, PathAbandonReason::UnusableAfterNetworkChange)
5436 {
5437 debug!(%e,"Failed to close unrecoverable path after network change");
5438 recoverable_paths.push((path_id, remote));
5439 continue;
5440 }
5441
5442 if !open_first && let Err(e) = self.open_path(network_path, status, now) {
5443 debug!(%e,"Failed to open new path for network change");
5447 }
5448 }
5449
5450 for (path_id, remote) in recoverable_paths.into_iter() {
5453 let space = &mut self.spaces[SpaceId::Data];
5454
5455 if let Some(path_space) = space.number_spaces.get_mut(&path_id) {
5457 path_space.ping_pending = true;
5458
5459 if immediate_ack_allowed {
5460 path_space.immediate_ack_pending = true;
5461 }
5462 }
5463
5464 let Some((reset_token, retired)) =
5465 self.remote_cids.get_mut(&path_id).and_then(CidQueue::next)
5466 else {
5467 continue;
5468 };
5469
5470 space
5472 .pending
5473 .retire_cids
5474 .extend(retired.map(|seq| (path_id, seq)));
5475
5476 debug_assert!(!self.state.is_drained()); self.endpoint_events
5478 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5479 }
5480 }
5481
5482 fn update_remote_cid(&mut self, path_id: PathId) {
5484 let Some((reset_token, retired)) = self
5485 .remote_cids
5486 .get_mut(&path_id)
5487 .and_then(|cids| cids.next())
5488 else {
5489 return;
5490 };
5491
5492 self.spaces[SpaceId::Data]
5494 .pending
5495 .retire_cids
5496 .extend(retired.map(|seq| (path_id, seq)));
5497 let remote = self.path_data(path_id).network_path.remote;
5498 self.set_reset_token(path_id, remote, reset_token);
5499 }
5500
5501 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5510 debug_assert!(!self.state.is_drained()); self.endpoint_events
5512 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5513
5514 if path_id == PathId::ZERO {
5520 self.peer_params.stateless_reset_token = Some(reset_token);
5521 }
5522 }
5523
5524 fn issue_first_cids(&mut self, now: Instant) {
5526 if self
5527 .local_cid_state
5528 .get(&PathId::ZERO)
5529 .expect("PathId::ZERO exists when the connection is created")
5530 .cid_len()
5531 == 0
5532 {
5533 return;
5534 }
5535
5536 let mut n = self.peer_params.issue_cids_limit() - 1;
5538 if let ConnectionSide::Server { server_config } = &self.side
5539 && server_config.has_preferred_address()
5540 {
5541 n -= 1;
5543 }
5544 debug_assert!(!self.state.is_drained()); self.endpoint_events
5546 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5547 }
5548
5549 fn issue_first_path_cids(&mut self, now: Instant) {
5553 if let Some(max_path_id) = self.max_path_id() {
5554 let mut path_id = self.max_path_id_with_cids.next();
5555 while path_id <= max_path_id {
5556 self.endpoint_events
5557 .push_back(EndpointEventInner::NeedIdentifiers(
5558 path_id,
5559 now,
5560 self.peer_params.issue_cids_limit(),
5561 ));
5562 path_id = path_id.next();
5563 }
5564 self.max_path_id_with_cids = max_path_id;
5565 }
5566 }
5567
5568 fn populate_packet<'a, 'b>(
5576 &mut self,
5577 now: Instant,
5578 space_id: SpaceId,
5579 path_id: PathId,
5580 scheduling_info: &PathSchedulingInfo,
5581 builder: &mut PacketBuilder<'a, 'b>,
5582 ) {
5583 let is_multipath_negotiated = self.is_multipath_negotiated();
5584 let space_has_keys = self.crypto_state.has_keys(space_id.encryption_level());
5585 let is_0rtt = space_id == SpaceId::Data && !space_has_keys;
5586 let stats = &mut self.stats.frame_tx;
5587 let space = &mut self.spaces[space_id];
5588 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5589 space
5590 .for_path(path_id)
5591 .pending_acks
5592 .maybe_ack_non_eliciting();
5593
5594 if !is_0rtt
5596 && scheduling_info.may_send_data
5597 && mem::replace(&mut space.pending.handshake_done, false)
5598 {
5599 builder.write_frame(frame::HandshakeDone, stats);
5600 }
5601
5602 if let Some((round, addresses)) = space.pending.reach_out.as_mut()
5604 && scheduling_info.may_send_data
5605 {
5606 while let Some(local_addr) = addresses.iter().next().copied() {
5607 let local_addr = addresses.take(&local_addr).expect("found from iter");
5608 let reach_out = frame::ReachOut::new(*round, local_addr);
5609 if builder.frame_space_remaining() > reach_out.size() {
5610 builder.write_frame(reach_out, stats);
5611 } else {
5612 addresses.insert(local_addr);
5613 break;
5614 }
5615 }
5616 if addresses.is_empty() {
5617 space.pending.reach_out = None;
5618 }
5619 }
5620
5621 if scheduling_info.may_send_data
5623 && space_id == SpaceId::Data
5624 && self
5625 .config
5626 .address_discovery_role
5627 .should_report(&self.peer_params.address_discovery_role)
5628 && (!path.observed_addr_sent || space.pending.observed_addr)
5629 {
5630 let frame =
5631 frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
5632 if builder.frame_space_remaining() > frame.size() {
5633 builder.write_frame(frame, stats);
5634
5635 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5636 path.observed_addr_sent = true;
5637
5638 space.pending.observed_addr = false;
5639 }
5640 }
5641
5642 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5644 builder.write_frame(frame::Ping, stats);
5645 }
5646
5647 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5649 debug_assert_eq!(
5650 space_id,
5651 SpaceId::Data,
5652 "immediate acks must be sent in the data space"
5653 );
5654 builder.write_frame(frame::ImmediateAck, stats);
5655 }
5656
5657 if scheduling_info.may_send_data {
5659 for path_id in space
5660 .number_spaces
5661 .iter_mut()
5662 .filter(|(_, pns)| pns.pending_acks.can_send())
5663 .map(|(&path_id, _)| path_id)
5664 .collect::<Vec<_>>()
5665 {
5666 Self::populate_acks(
5667 now,
5668 self.receiving_ecn,
5669 path_id,
5670 space_id,
5671 space,
5672 is_multipath_negotiated,
5673 builder,
5674 stats,
5675 space_has_keys,
5676 );
5677 }
5678 }
5679
5680 if scheduling_info.may_send_data && mem::replace(&mut space.pending.ack_frequency, false) {
5682 let sequence_number = self.ack_frequency.next_sequence_number();
5683
5684 let config = self.config.ack_frequency_config.as_ref().unwrap();
5686
5687 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5689 path.rtt.get(),
5690 config,
5691 &self.peer_params,
5692 );
5693
5694 let frame = frame::AckFrequency {
5695 sequence: sequence_number,
5696 ack_eliciting_threshold: config.ack_eliciting_threshold,
5697 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5698 reordering_threshold: config.reordering_threshold,
5699 };
5700 builder.write_frame(frame, stats);
5701
5702 self.ack_frequency
5703 .ack_frequency_sent(path_id, builder.packet_number, max_ack_delay);
5704 }
5705
5706 if builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
5708 && space_id == SpaceId::Data
5709 && path.pending_on_path_challenge
5710 && !self.state.is_closed()
5711 {
5713 path.pending_on_path_challenge = false;
5714
5715 let token = self.rng.random();
5716 path.record_path_challenge_sent(now, token, path.network_path);
5717 let challenge = frame::PathChallenge(token);
5719 builder.write_frame(challenge, stats);
5720 builder.require_padding();
5721 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5722 if path.open_status == paths::OpenStatus::Pending {
5723 path.open_status = paths::OpenStatus::Sent;
5724 self.timers.set(
5725 Timer::PerPath(path_id, PathTimer::PathOpen),
5726 now + 3 * pto,
5727 self.qlog.with_time(now),
5728 );
5729 }
5730
5731 self.timers.set(
5732 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5733 now + pto,
5734 self.qlog.with_time(now),
5735 );
5736
5737 if is_multipath_negotiated && !path.validated && path.pending_on_path_challenge {
5738 space.pending.path_status.insert(path_id);
5740 }
5741
5742 if space_id == SpaceId::Data
5745 && self
5746 .config
5747 .address_discovery_role
5748 .should_report(&self.peer_params.address_discovery_role)
5749 {
5750 let frame = frame::ObservedAddr::new(
5751 path.network_path.remote,
5752 self.next_observed_addr_seq_no,
5753 );
5754 if builder.frame_space_remaining() > frame.size() {
5755 builder.write_frame(frame, stats);
5756
5757 self.next_observed_addr_seq_no =
5758 self.next_observed_addr_seq_no.saturating_add(1u8);
5759 path.observed_addr_sent = true;
5760
5761 space.pending.observed_addr = false;
5762 }
5763 }
5764 }
5765
5766 if builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
5768 && space_id == SpaceId::Data
5769 && let Some(token) = path.path_responses.pop_on_path(path.network_path)
5770 {
5771 let response = frame::PathResponse(token);
5772 trace!(frame = %response);
5773 builder.write_frame(response, stats);
5774 builder.require_padding();
5775
5776 if space_id == SpaceId::Data
5780 && self
5781 .config
5782 .address_discovery_role
5783 .should_report(&self.peer_params.address_discovery_role)
5784 {
5785 let frame = frame::ObservedAddr::new(
5786 path.network_path.remote,
5787 self.next_observed_addr_seq_no,
5788 );
5789 if builder.frame_space_remaining() > frame.size() {
5790 builder.write_frame(frame, stats);
5791
5792 self.next_observed_addr_seq_no =
5793 self.next_observed_addr_seq_no.saturating_add(1u8);
5794 path.observed_addr_sent = true;
5795
5796 space.pending.observed_addr = false;
5797 }
5798 }
5799 }
5800
5801 while !is_0rtt
5803 && scheduling_info.may_send_data
5804 && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
5805 {
5806 let mut frame = match space.pending.crypto.pop_front() {
5807 Some(x) => x,
5808 None => break,
5809 };
5810
5811 let max_crypto_data_size = builder.frame_space_remaining()
5816 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5818 - 2; let len = frame
5821 .data
5822 .len()
5823 .min(2usize.pow(14) - 1)
5824 .min(max_crypto_data_size);
5825
5826 let data = frame.data.split_to(len);
5827 let offset = frame.offset;
5828 let truncated = frame::Crypto { offset, data };
5829 builder.write_frame(truncated, stats);
5830
5831 if !frame.data.is_empty() {
5832 frame.offset += len as u64;
5833 space.pending.crypto.push_front(frame);
5834 }
5835 }
5836
5837 while space_id == SpaceId::Data
5840 && scheduling_info.may_send_data
5841 && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
5842 {
5843 let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
5844 else {
5845 break;
5846 };
5847 let frame = frame::PathAbandon {
5848 path_id: abandoned_path_id,
5849 error_code,
5850 };
5851 builder.write_frame(frame, stats);
5852 }
5853
5854 while space_id == SpaceId::Data
5856 && scheduling_info.may_send_data
5857 && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
5858 {
5859 let Some(path_id) = space.pending.path_status.pop_first() else {
5860 break;
5861 };
5862 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5863 trace!(%path_id, "discarding queued path status for unknown path");
5864 continue;
5865 };
5866
5867 let seq = path.status.seq();
5868 match path.local_status() {
5869 PathStatus::Available => {
5870 let frame = frame::PathStatusAvailable {
5871 path_id,
5872 status_seq_no: seq,
5873 };
5874 builder.write_frame(frame, stats);
5875 }
5876 PathStatus::Backup => {
5877 let frame = frame::PathStatusBackup {
5878 path_id,
5879 status_seq_no: seq,
5880 };
5881 builder.write_frame(frame, stats);
5882 }
5883 }
5884 }
5885
5886 if space_id == SpaceId::Data
5888 && scheduling_info.may_send_data
5889 && space.pending.max_path_id
5890 && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
5891 {
5892 let frame = frame::MaxPathId(self.local_max_path_id);
5893 builder.write_frame(frame, stats);
5894 space.pending.max_path_id = false;
5895 }
5896
5897 if space_id == SpaceId::Data
5899 && scheduling_info.may_send_data
5900 && space.pending.paths_blocked
5901 && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5902 {
5903 let frame = frame::PathsBlocked(self.remote_max_path_id);
5904 builder.write_frame(frame, stats);
5905 space.pending.paths_blocked = false;
5906 }
5907
5908 while space_id == SpaceId::Data
5910 && scheduling_info.may_send_data
5911 && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5912 {
5913 let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
5914 break;
5915 };
5916 let next_seq = match self.remote_cids.get(&path_id) {
5917 Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
5918 None => VarInt(0),
5919 };
5920 let frame = frame::PathCidsBlocked { path_id, next_seq };
5921 builder.write_frame(frame, stats);
5922 }
5923
5924 if space_id == SpaceId::Data && scheduling_info.may_send_data {
5926 self.streams
5927 .write_control_frames(builder, &mut space.pending, stats);
5928 }
5929
5930 let cid_len = self
5932 .local_cid_state
5933 .values()
5934 .map(|cid_state| cid_state.cid_len())
5935 .max()
5936 .expect("some local CID state must exist");
5937 let new_cid_size_bound =
5938 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5939 while scheduling_info.may_send_data && builder.frame_space_remaining() > new_cid_size_bound
5940 {
5941 let issued = match space.pending.new_cids.pop() {
5942 Some(x) => x,
5943 None => break,
5944 };
5945 let retire_prior_to = self
5946 .local_cid_state
5947 .get(&issued.path_id)
5948 .map(|cid_state| cid_state.retire_prior_to())
5949 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5950
5951 let cid_path_id = match is_multipath_negotiated {
5952 true => Some(issued.path_id),
5953 false => {
5954 debug_assert_eq!(issued.path_id, PathId::ZERO);
5955 None
5956 }
5957 };
5958 let frame = frame::NewConnectionId {
5959 path_id: cid_path_id,
5960 sequence: issued.sequence,
5961 retire_prior_to,
5962 id: issued.id,
5963 reset_token: issued.reset_token,
5964 };
5965 builder.write_frame(frame, stats);
5966 }
5967
5968 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5970 while scheduling_info.may_send_data && builder.frame_space_remaining() > retire_cid_bound {
5971 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5972 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
5973 Some((path_id, seq)) => (Some(path_id), seq),
5974 None => break,
5975 };
5976 let frame = frame::RetireConnectionId { path_id, sequence };
5977 builder.write_frame(frame, stats);
5978 }
5979
5980 let mut sent_datagrams = false;
5982 while scheduling_info.may_send_data
5983 && builder.frame_space_remaining() > Datagram::SIZE_BOUND
5984 && space_id == SpaceId::Data
5985 {
5986 match self.datagrams.write(builder, stats) {
5987 true => {
5988 sent_datagrams = true;
5989 }
5990 false => break,
5991 }
5992 }
5993 if self.datagrams.send_blocked && sent_datagrams {
5994 self.events.push_back(Event::DatagramsUnblocked);
5995 self.datagrams.send_blocked = false;
5996 }
5997
5998 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5999
6000 if scheduling_info.may_send_data {
6002 while let Some(network_path) = space.pending.new_tokens.pop() {
6003 debug_assert_eq!(space_id, SpaceId::Data);
6004 let ConnectionSide::Server { server_config } = &self.side else {
6005 panic!("NEW_TOKEN frames should not be enqueued by clients");
6006 };
6007
6008 if !network_path.is_probably_same_path(&path.network_path) {
6009 continue;
6014 }
6015
6016 let token = Token::new(
6017 TokenPayload::Validation {
6018 ip: network_path.remote.ip(),
6019 issued: server_config.time_source.now(),
6020 },
6021 &mut self.rng,
6022 );
6023 let new_token = NewToken {
6024 token: token.encode(&*server_config.token_key).into(),
6025 };
6026
6027 if builder.frame_space_remaining() < new_token.size() {
6028 space.pending.new_tokens.push(network_path);
6029 break;
6030 }
6031
6032 builder.write_frame(new_token, stats);
6033 builder.retransmits_mut().new_tokens.push(network_path);
6034 }
6035 }
6036
6037 if scheduling_info.may_send_data && space_id == SpaceId::Data {
6039 self.streams
6040 .write_stream_frames(builder, self.config.send_fairness, stats);
6041 }
6042
6043 while space_id == SpaceId::Data
6045 && scheduling_info.may_send_data
6046 && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
6047 {
6048 if let Some(added_address) = space.pending.add_address.pop_last() {
6049 builder.write_frame(added_address, stats);
6050 } else {
6051 break;
6052 }
6053 }
6054
6055 while space_id == SpaceId::Data
6057 && scheduling_info.may_send_data
6058 && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
6059 {
6060 if let Some(removed_address) = space.pending.remove_address.pop_last() {
6061 builder.write_frame(removed_address, stats);
6062 } else {
6063 break;
6064 }
6065 }
6066 }
6067
6068 fn populate_acks<'a, 'b>(
6070 now: Instant,
6071 receiving_ecn: bool,
6072 path_id: PathId,
6073 space_id: SpaceId,
6074 space: &mut PacketSpace,
6075 is_multipath_negotiated: bool,
6076 builder: &mut PacketBuilder<'a, 'b>,
6077 stats: &mut FrameStats,
6078 space_has_keys: bool,
6079 ) {
6080 debug_assert!(space_has_keys, "tried to send ACK in 0-RTT");
6082
6083 debug_assert!(
6084 is_multipath_negotiated || path_id == PathId::ZERO,
6085 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
6086 );
6087 if is_multipath_negotiated {
6088 debug_assert!(
6089 space_id == SpaceId::Data || path_id == PathId::ZERO,
6090 "path acks must be sent in 1RTT space (have {space_id:?})"
6091 );
6092 }
6093
6094 let pns = space.for_path(path_id);
6095 let ranges = pns.pending_acks.ranges();
6096 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
6097 let ecn = if receiving_ecn {
6098 Some(&pns.ecn_counters)
6099 } else {
6100 None
6101 };
6102
6103 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
6104 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
6106 let delay = delay_micros >> ack_delay_exp.into_inner();
6107
6108 if is_multipath_negotiated && space_id == SpaceId::Data {
6109 if !ranges.is_empty() {
6110 let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
6111 builder.write_frame(frame, stats);
6112 }
6113 } else {
6114 builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
6115 }
6116 }
6117
6118 fn close_common(&mut self) {
6119 trace!("connection closed");
6120 self.timers.reset();
6121 }
6122
6123 fn set_close_timer(&mut self, now: Instant) {
6124 let pto_max = self.max_pto_for_space(self.highest_space);
6127 self.timers.set(
6128 Timer::Conn(ConnTimer::Close),
6129 now + 3 * pto_max,
6130 self.qlog.with_time(now),
6131 );
6132 }
6133
6134 fn handle_peer_params(
6139 &mut self,
6140 params: TransportParameters,
6141 local_cid: ConnectionId,
6142 remote_cid: ConnectionId,
6143 now: Instant,
6144 ) -> Result<(), TransportError> {
6145 if Some(self.original_remote_cid) != params.initial_src_cid
6146 || (self.side.is_client()
6147 && (Some(self.initial_dst_cid) != params.original_dst_cid
6148 || self.retry_src_cid != params.retry_src_cid))
6149 {
6150 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
6151 "CID authentication failure",
6152 ));
6153 }
6154 if params.initial_max_path_id.is_some() && (local_cid.is_empty() || remote_cid.is_empty()) {
6155 return Err(TransportError::PROTOCOL_VIOLATION(
6156 "multipath must not use zero-length CIDs",
6157 ));
6158 }
6159
6160 self.set_peer_params(params);
6161 self.qlog.emit_peer_transport_params_received(self, now);
6162
6163 Ok(())
6164 }
6165
6166 fn set_peer_params(&mut self, params: TransportParameters) {
6167 self.streams.set_params(¶ms);
6168 self.idle_timeout =
6169 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
6170 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
6171
6172 if let Some(ref info) = params.preferred_address {
6173 self.remote_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
6175 path_id: None,
6176 sequence: 1,
6177 id: info.connection_id,
6178 reset_token: info.stateless_reset_token,
6179 retire_prior_to: 0,
6180 })
6181 .expect(
6182 "preferred address CID is the first received, and hence is guaranteed to be legal",
6183 );
6184 let remote = self.path_data(PathId::ZERO).network_path.remote;
6185 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
6186 }
6187 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
6188
6189 let mut multipath_enabled = None;
6190 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
6191 self.config.get_initial_max_path_id(),
6192 params.initial_max_path_id,
6193 ) {
6194 self.local_max_path_id = local_max_path_id;
6196 self.remote_max_path_id = remote_max_path_id;
6197 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
6198 debug!(%initial_max_path_id, "multipath negotiated");
6199 multipath_enabled = Some(initial_max_path_id);
6200 }
6201
6202 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
6203 self.config
6204 .max_remote_nat_traversal_addresses
6205 .zip(params.max_remote_nat_traversal_addresses)
6206 {
6207 if let Some(max_initial_paths) =
6208 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
6209 {
6210 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
6211 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
6212 self.n0_nat_traversal = n0_nat_traversal::State::new(
6213 max_remote_addresses,
6214 max_local_addresses,
6215 self.side(),
6216 );
6217 debug!(
6218 %max_remote_addresses, %max_local_addresses,
6219 "n0's nat traversal negotiated"
6220 );
6221
6222 match self.side() {
6223 Side::Client => {
6224 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
6225 debug!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
6228 } else if max_local_addresses as u64
6229 > params.active_connection_id_limit.into_inner()
6230 {
6231 debug!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
6235 }
6236 }
6237 Side::Server => {
6238 if (max_initial_paths.as_u32() as u64) < crate::LOCAL_CID_COUNT {
6239 debug!(%max_initial_paths, local_cid_limit=%crate::LOCAL_CID_COUNT, "local server configuration might cause nat traversal issues")
6240 }
6241 }
6242 }
6243 } else {
6244 debug!("n0 nat traversal enabled for both endpoints, but multipath is missing")
6245 }
6246 }
6247
6248 self.peer_params = params;
6249 let peer_max_udp_payload_size =
6250 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
6251 self.path_data_mut(PathId::ZERO)
6252 .mtud
6253 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
6254 }
6255
6256 fn decrypt_packet(
6258 &mut self,
6259 now: Instant,
6260 path_id: PathId,
6261 packet: &mut Packet,
6262 ) -> Result<Option<u64>, Option<TransportError>> {
6263 let result = self
6264 .crypto_state
6265 .decrypt_packet_body(packet, path_id, &self.spaces)?;
6266
6267 let result = match result {
6268 Some(r) => r,
6269 None => return Ok(None),
6270 };
6271
6272 if result.outgoing_key_update_acked
6273 && let Some(prev) = self.crypto_state.prev_crypto.as_mut()
6274 {
6275 prev.end_packet = Some((result.number, now));
6276 self.set_key_discard_timer(now, packet.header.space());
6277 }
6278
6279 if result.incoming_key_update {
6280 trace!("key update authenticated");
6281 self.crypto_state
6282 .update_keys(Some((result.number, now)), true);
6283 self.set_key_discard_timer(now, packet.header.space());
6284 }
6285
6286 Ok(Some(result.number))
6287 }
6288
6289 fn peer_supports_ack_frequency(&self) -> bool {
6290 self.peer_params.min_ack_delay.is_some()
6291 }
6292
6293 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
6298 debug_assert_eq!(
6299 self.highest_space,
6300 SpaceKind::Data,
6301 "immediate ack must be written in the data space"
6302 );
6303 self.spaces[SpaceId::Data]
6304 .for_path(path_id)
6305 .immediate_ack_pending = true;
6306 }
6307
6308 #[cfg(test)]
6310 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
6311 let (path_id, first_decode, remaining) = match &event.0 {
6312 ConnectionEventInner::Datagram(DatagramConnectionEvent {
6313 path_id,
6314 first_decode,
6315 remaining,
6316 ..
6317 }) => (path_id, first_decode, remaining),
6318 _ => return None,
6319 };
6320
6321 if remaining.is_some() {
6322 panic!("Packets should never be coalesced in tests");
6323 }
6324
6325 let decrypted_header = self
6326 .crypto_state
6327 .unprotect_header(first_decode.clone(), self.peer_params.stateless_reset_token)?;
6328
6329 let mut packet = decrypted_header.packet?;
6330 self.crypto_state
6331 .decrypt_packet_body(&mut packet, *path_id, &self.spaces)
6332 .ok()?;
6333
6334 Some(packet.payload.to_vec())
6335 }
6336
6337 #[cfg(test)]
6340 pub(crate) fn bytes_in_flight(&self) -> u64 {
6341 self.path_data(PathId::ZERO).in_flight.bytes
6343 }
6344
6345 #[cfg(test)]
6347 pub(crate) fn congestion_window(&self) -> u64 {
6348 let path = self.path_data(PathId::ZERO);
6349 path.congestion
6350 .window()
6351 .saturating_sub(path.in_flight.bytes)
6352 }
6353
6354 #[cfg(test)]
6356 pub(crate) fn is_idle(&self) -> bool {
6357 let current_timers = self.timers.values();
6358 current_timers
6359 .into_iter()
6360 .filter(|(timer, _)| {
6361 !matches!(
6362 timer,
6363 Timer::Conn(ConnTimer::KeepAlive)
6364 | Timer::PerPath(_, PathTimer::PathKeepAlive)
6365 | Timer::Conn(ConnTimer::PushNewCid)
6366 | Timer::Conn(ConnTimer::KeyDiscard)
6367 )
6368 })
6369 .min_by_key(|(_, time)| *time)
6370 .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6371 }
6372
6373 #[cfg(test)]
6375 pub(crate) fn using_ecn(&self) -> bool {
6376 self.path_data(PathId::ZERO).sending_ecn
6377 }
6378
6379 #[cfg(test)]
6381 pub(crate) fn total_recvd(&self) -> u64 {
6382 self.path_data(PathId::ZERO).total_recvd
6383 }
6384
6385 #[cfg(test)]
6386 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6387 self.local_cid_state
6388 .get(&PathId::ZERO)
6389 .unwrap()
6390 .active_seq()
6391 }
6392
6393 #[cfg(test)]
6394 #[track_caller]
6395 pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6396 self.local_cid_state
6397 .get(&PathId(path_id))
6398 .unwrap()
6399 .active_seq()
6400 }
6401
6402 #[cfg(test)]
6405 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6406 let n = self
6407 .local_cid_state
6408 .get_mut(&PathId::ZERO)
6409 .unwrap()
6410 .assign_retire_seq(v);
6411 debug_assert!(!self.state.is_drained()); self.endpoint_events
6413 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6414 }
6415
6416 #[cfg(test)]
6418 pub(crate) fn active_remote_cid_seq(&self) -> u64 {
6419 self.remote_cids.get(&PathId::ZERO).unwrap().active_seq()
6420 }
6421
6422 #[cfg(test)]
6424 pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6425 self.path_data(path_id).current_mtu()
6426 }
6427
6428 #[cfg(test)]
6430 pub(crate) fn trigger_path_validation(&mut self) {
6431 for path in self.paths.values_mut() {
6432 path.data.pending_on_path_challenge = true;
6433 }
6434 }
6435
6436 #[cfg(test)]
6438 pub fn simulate_protocol_violation(&mut self, now: Instant) {
6439 if !self.state.is_closed() {
6440 self.state
6441 .move_to_closed(TransportError::PROTOCOL_VIOLATION("simulated violation"));
6442 self.close_common();
6443 if !self.state.is_drained() {
6444 self.set_close_timer(now);
6445 }
6446 self.connection_close_pending = true;
6447 }
6448 }
6449
6450 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6461 let space_id_only = self.paths.get(&path_id).is_some_and(|path| {
6462 path.data.pending_on_path_challenge || !path.data.path_responses.is_empty()
6463 });
6464
6465 let other = self.streams.can_send_stream_data()
6467 || self
6468 .datagrams
6469 .outgoing
6470 .front()
6471 .is_some_and(|x| x.size(true) <= max_size);
6472
6473 SendableFrames {
6475 acks: false,
6476 close: false,
6477 space_id_only,
6478 other,
6479 }
6480 }
6481
6482 fn kill(&mut self, reason: ConnectionError) {
6484 self.close_common();
6485 self.state.move_to_drained(Some(reason));
6486 self.endpoint_events.push_back(EndpointEventInner::Drained);
6489 }
6490
6491 pub fn current_mtu(&self) -> u16 {
6498 self.paths
6499 .iter()
6500 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6501 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6502 .min()
6503 .expect("There is always at least one available path")
6504 }
6505
6506 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6513 let pn_len = PacketNumber::new(
6514 pn,
6515 self.spaces[SpaceId::Data]
6516 .for_path(path)
6517 .largest_acked_packet
6518 .unwrap_or(0),
6519 )
6520 .len();
6521
6522 1 + self
6524 .remote_cids
6525 .get(&path)
6526 .map(|cids| cids.active().len())
6527 .unwrap_or(20) + pn_len
6529 + self.tag_len_1rtt()
6530 }
6531
6532 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6533 let pn_len = 4;
6534
6535 let cid_len = self
6536 .remote_cids
6537 .values()
6538 .map(|cids| cids.active().len())
6539 .max()
6540 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6544 }
6545
6546 fn tag_len_1rtt(&self) -> usize {
6547 let packet_crypto = self
6549 .crypto_state
6550 .encryption_keys(SpaceKind::Data, self.side.side())
6551 .map(|(_header, packet, _level)| packet);
6552 packet_crypto.map_or(16, |x| x.tag_len())
6556 }
6557
6558 fn on_path_validated(&mut self, path_id: PathId) {
6560 self.path_data_mut(path_id).validated = true;
6561 let ConnectionSide::Server { server_config } = &self.side else {
6562 return;
6563 };
6564 let network_path = self.path_data(path_id).network_path;
6565 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6566 new_tokens.clear();
6567 for _ in 0..server_config.validation_token.sent {
6568 new_tokens.push(network_path);
6569 }
6570 }
6571
6572 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6574 if let Some(path) = self.paths.get_mut(&path_id) {
6575 path.data.status.remote_update(status, status_seq_no);
6576 } else {
6577 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6578 }
6579 self.events.push_back(
6580 PathEvent::RemoteStatus {
6581 id: path_id,
6582 status,
6583 }
6584 .into(),
6585 );
6586 }
6587
6588 fn max_path_id(&self) -> Option<PathId> {
6597 if self.is_multipath_negotiated() {
6598 Some(self.remote_max_path_id.min(self.local_max_path_id))
6599 } else {
6600 None
6601 }
6602 }
6603
6604 fn is_ipv6(&self) -> bool {
6609 self.paths
6610 .values()
6611 .any(|p| p.data.network_path.remote.is_ipv6())
6612 }
6613
6614 pub fn add_nat_traversal_address(
6616 &mut self,
6617 address: SocketAddr,
6618 ) -> Result<(), n0_nat_traversal::Error> {
6619 if let Some(added) = self.n0_nat_traversal.add_local_address(address)? {
6620 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6621 };
6622 Ok(())
6623 }
6624
6625 pub fn remove_nat_traversal_address(
6629 &mut self,
6630 address: SocketAddr,
6631 ) -> Result<(), n0_nat_traversal::Error> {
6632 if let Some(removed) = self.n0_nat_traversal.remove_local_address(address)? {
6633 self.spaces[SpaceId::Data]
6634 .pending
6635 .remove_address
6636 .insert(removed);
6637 }
6638 Ok(())
6639 }
6640
6641 pub fn get_local_nat_traversal_addresses(
6643 &self,
6644 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6645 self.n0_nat_traversal.get_local_nat_traversal_addresses()
6646 }
6647
6648 pub fn get_remote_nat_traversal_addresses(
6650 &self,
6651 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6652 Ok(self
6653 .n0_nat_traversal
6654 .client_side()?
6655 .get_remote_nat_traversal_addresses())
6656 }
6657
6658 fn open_nat_traversal_path(
6662 &mut self,
6663 now: Instant,
6664 ip_port: (IpAddr, u16),
6665 ) -> Result<Option<(PathId, SocketAddr)>, PathError> {
6666 let remote = ip_port.into();
6667 let network_path = FourTuple {
6672 remote,
6673 local_ip: None,
6674 };
6675 match self.open_path_ensure(network_path, PathStatus::Backup, now) {
6676 Ok((path_id, path_was_known)) => {
6677 if path_was_known {
6678 trace!(%path_id, %remote, "nat traversal: path existed for remote");
6679 }
6680 Ok(Some((path_id, remote)))
6681 }
6682 Err(e) => {
6683 debug!(%remote, %e, "nat traversal: failed to probe remote");
6684 Err(e)
6685 }
6686 }
6687 }
6688
6689 pub fn initiate_nat_traversal_round(
6699 &mut self,
6700 now: Instant,
6701 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6702 if self.state.is_closed() {
6703 return Err(n0_nat_traversal::Error::Closed);
6704 }
6705
6706 let ipv6 = self.is_ipv6();
6707 let client_state = self.n0_nat_traversal.client_side_mut()?;
6708 let n0_nat_traversal::NatTraversalRound {
6709 new_round,
6710 reach_out_at,
6711 addresses_to_probe,
6712 prev_round_path_ids,
6713 } = client_state.initiate_nat_traversal_round(ipv6)?;
6714
6715 trace!(%new_round, reach_out=reach_out_at.len(), to_probe=addresses_to_probe.len(),
6716 "initiating nat traversal round");
6717
6718 self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6719
6720 for path_id in prev_round_path_ids {
6721 let Some(path) = self.path(path_id) else {
6722 continue;
6723 };
6724 let ip = path.network_path.remote.ip();
6725 let port = path.network_path.remote.port();
6726
6727 if !addresses_to_probe
6731 .iter()
6732 .any(|(_, probe)| *probe == (ip, port))
6733 && !path.validated
6734 && !self.abandoned_paths.contains(&path_id)
6735 {
6736 trace!(%path_id, "closing path from previous round");
6737 let _ =
6738 self.close_path_inner(now, path_id, PathAbandonReason::NatTraversalRoundEnded);
6739 }
6740 }
6741
6742 let mut err = None;
6743
6744 let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6745 let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6746
6747 for (id, address) in addresses_to_probe {
6748 match self.open_nat_traversal_path(now, address) {
6749 Ok(None) => {}
6750 Ok(Some((path_id, remote))) => {
6751 path_ids.push(path_id);
6752 probed_addresses.push(remote);
6753 }
6754 Err(e) => {
6755 self.n0_nat_traversal
6756 .client_side_mut()
6757 .expect("validated")
6758 .report_in_continuation(id, e);
6759 err.get_or_insert(e);
6760 }
6761 }
6762 }
6763
6764 if let Some(err) = err {
6765 if probed_addresses.is_empty() {
6767 return Err(n0_nat_traversal::Error::Multipath(err));
6768 }
6769 }
6770
6771 self.n0_nat_traversal
6772 .client_side_mut()
6773 .expect("connection side validated")
6774 .set_round_path_ids(path_ids);
6775
6776 Ok(probed_addresses)
6777 }
6778
6779 fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6784 let ipv6 = self.is_ipv6();
6785 let client_state = self.n0_nat_traversal.client_side_mut().ok()?;
6786 let (id, address) = client_state.continue_nat_traversal_round(ipv6)?;
6787 let open_result = self.open_nat_traversal_path(now, address);
6788 let client_state = self.n0_nat_traversal.client_side_mut().expect("validated");
6789 match open_result {
6790 Ok(None) => Some(true),
6791 Ok(Some((path_id, _remote))) => {
6792 client_state.add_round_path_id(path_id);
6793 Some(true)
6794 }
6795 Err(e) => {
6796 client_state.report_in_continuation(id, e);
6797 Some(false)
6798 }
6799 }
6800 }
6801}
6802
6803impl fmt::Debug for Connection {
6804 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
6805 f.debug_struct("Connection")
6806 .field("handshake_cid", &self.handshake_cid)
6807 .finish()
6808 }
6809}
6810
6811pub trait NetworkChangeHint: std::fmt::Debug + 'static {
6813 fn is_path_recoverable(&self, path_id: PathId, network_path: FourTuple) -> bool;
6822}
6823
6824#[derive(Debug)]
6826enum PollPathStatus {
6827 NothingToSend {
6829 congestion_blocked: bool,
6831 },
6832 Send(Transmit),
6834}
6835
6836#[derive(Debug)]
6838enum PollPathSpaceStatus {
6839 NothingToSend {
6841 congestion_blocked: bool,
6843 },
6844 WrotePacket {
6846 last_packet_number: u64,
6848 pad_datagram: PadDatagram,
6862 },
6863 Send {
6870 last_packet_number: u64,
6872 },
6873}
6874
6875#[derive(Debug, Copy, Clone)]
6881struct PathSchedulingInfo {
6882 abandoned: bool,
6888 may_send_data: bool,
6903 may_send_close: bool,
6909}
6910
6911#[derive(Debug, Copy, Clone, PartialEq, Eq)]
6912enum PathBlocked {
6913 No,
6914 AntiAmplification,
6915 Congestion,
6916 Pacing,
6917}
6918
6919enum ConnectionSide {
6921 Client {
6922 token: Bytes,
6924 token_store: Arc<dyn TokenStore>,
6925 server_name: String,
6926 },
6927 Server {
6928 server_config: Arc<ServerConfig>,
6929 },
6930}
6931
6932impl ConnectionSide {
6933 fn remote_may_migrate(&self, state: &State) -> bool {
6934 match self {
6935 Self::Server { server_config } => server_config.migration,
6936 Self::Client { .. } => {
6937 if let Some(hs) = state.as_handshake() {
6938 hs.allow_server_migration
6939 } else {
6940 false
6941 }
6942 }
6943 }
6944 }
6945
6946 fn is_client(&self) -> bool {
6947 self.side().is_client()
6948 }
6949
6950 fn is_server(&self) -> bool {
6951 self.side().is_server()
6952 }
6953
6954 fn side(&self) -> Side {
6955 match *self {
6956 Self::Client { .. } => Side::Client,
6957 Self::Server { .. } => Side::Server,
6958 }
6959 }
6960}
6961
6962impl From<SideArgs> for ConnectionSide {
6963 fn from(side: SideArgs) -> Self {
6964 match side {
6965 SideArgs::Client {
6966 token_store,
6967 server_name,
6968 } => Self::Client {
6969 token: token_store.take(&server_name).unwrap_or_default(),
6970 token_store,
6971 server_name,
6972 },
6973 SideArgs::Server {
6974 server_config,
6975 pref_addr_cid: _,
6976 path_validated: _,
6977 } => Self::Server { server_config },
6978 }
6979 }
6980}
6981
6982pub(crate) enum SideArgs {
6984 Client {
6985 token_store: Arc<dyn TokenStore>,
6986 server_name: String,
6987 },
6988 Server {
6989 server_config: Arc<ServerConfig>,
6990 pref_addr_cid: Option<ConnectionId>,
6991 path_validated: bool,
6992 },
6993}
6994
6995impl SideArgs {
6996 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6997 match *self {
6998 Self::Client { .. } => None,
6999 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
7000 }
7001 }
7002
7003 pub(crate) fn path_validated(&self) -> bool {
7004 match *self {
7005 Self::Client { .. } => true,
7006 Self::Server { path_validated, .. } => path_validated,
7007 }
7008 }
7009
7010 pub(crate) fn side(&self) -> Side {
7011 match *self {
7012 Self::Client { .. } => Side::Client,
7013 Self::Server { .. } => Side::Server,
7014 }
7015 }
7016}
7017
7018#[derive(Debug, Error, Clone, PartialEq, Eq)]
7020pub enum ConnectionError {
7021 #[error("peer doesn't implement any supported version")]
7023 VersionMismatch,
7024 #[error(transparent)]
7026 TransportError(#[from] TransportError),
7027 #[error("aborted by peer: {0}")]
7029 ConnectionClosed(frame::ConnectionClose),
7030 #[error("closed by peer: {0}")]
7032 ApplicationClosed(frame::ApplicationClose),
7033 #[error("reset by peer")]
7035 Reset,
7036 #[error("timed out")]
7042 TimedOut,
7043 #[error("closed")]
7045 LocallyClosed,
7046 #[error("CIDs exhausted")]
7050 CidsExhausted,
7051}
7052
7053impl From<Close> for ConnectionError {
7054 fn from(x: Close) -> Self {
7055 match x {
7056 Close::Connection(reason) => Self::ConnectionClosed(reason),
7057 Close::Application(reason) => Self::ApplicationClosed(reason),
7058 }
7059 }
7060}
7061
7062impl From<ConnectionError> for io::Error {
7064 fn from(x: ConnectionError) -> Self {
7065 use ConnectionError::*;
7066 let kind = match x {
7067 TimedOut => io::ErrorKind::TimedOut,
7068 Reset => io::ErrorKind::ConnectionReset,
7069 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
7070 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
7071 io::ErrorKind::Other
7072 }
7073 };
7074 Self::new(kind, x)
7075 }
7076}
7077
7078#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
7081pub enum PathError {
7082 #[error("multipath extension not negotiated")]
7084 MultipathNotNegotiated,
7085 #[error("the server side may not open a path")]
7087 ServerSideNotAllowed,
7088 #[error("maximum number of concurrent paths reached")]
7090 MaxPathIdReached,
7091 #[error("remoted CIDs exhausted")]
7093 RemoteCidsExhausted,
7094 #[error("path validation failed")]
7096 ValidationFailed,
7097 #[error("invalid remote address")]
7099 InvalidRemoteAddress(SocketAddr),
7100}
7101
7102#[derive(Debug, Error, Clone, Eq, PartialEq)]
7104pub enum ClosePathError {
7105 #[error("Multipath extension not negotiated")]
7107 MultipathNotNegotiated,
7108 #[error("closed path")]
7110 ClosedPath,
7111 #[error("last open path")]
7113 LastOpenPath,
7114}
7115
7116#[derive(Debug, Error, Clone, Copy)]
7118#[error("Multipath extension not negotiated")]
7119pub struct MultipathNotNegotiated {
7120 _private: (),
7121}
7122
7123#[derive(Debug)]
7125pub enum Event {
7126 HandshakeDataReady,
7128 Connected,
7130 HandshakeConfirmed,
7132 ConnectionLost {
7136 reason: ConnectionError,
7138 },
7139 Stream(StreamEvent),
7141 DatagramReceived,
7143 DatagramsUnblocked,
7145 Path(PathEvent),
7147 NatTraversal(n0_nat_traversal::Event),
7149}
7150
7151impl From<PathEvent> for Event {
7152 fn from(source: PathEvent) -> Self {
7153 Self::Path(source)
7154 }
7155}
7156
7157fn get_max_ack_delay(params: &TransportParameters) -> Duration {
7158 Duration::from_micros(params.max_ack_delay.0 * 1000)
7159}
7160
7161const MAX_BACKOFF_EXPONENT: u32 = 16;
7163
7164const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
7172
7173const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
7179 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
7180
7181#[derive(Default)]
7182struct SentFrames {
7183 retransmits: ThinRetransmits,
7184 largest_acked: FxHashMap<PathId, u64>,
7186 stream_frames: StreamMetaVec,
7187 non_retransmits: bool,
7189 requires_padding: bool,
7191}
7192
7193impl SentFrames {
7194 fn is_ack_only(&self, streams: &StreamsState) -> bool {
7196 !self.largest_acked.is_empty()
7197 && !self.non_retransmits
7198 && self.stream_frames.is_empty()
7199 && self.retransmits.is_empty(streams)
7200 }
7201
7202 fn retransmits_mut(&mut self) -> &mut Retransmits {
7203 self.retransmits.get_or_create()
7204 }
7205
7206 fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
7207 use frame::EncodableFrame::*;
7208 match frame {
7209 PathAck(path_ack_encoder) => {
7210 if let Some(max) = path_ack_encoder.ranges.max() {
7211 self.largest_acked.insert(path_ack_encoder.path_id, max);
7212 }
7213 }
7214 Ack(ack_encoder) => {
7215 if let Some(max) = ack_encoder.ranges.max() {
7216 self.largest_acked.insert(PathId::ZERO, max);
7217 }
7218 }
7219 Close(_) => { }
7220 PathResponse(_) => self.non_retransmits = true,
7221 HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
7222 ReachOut(frame::ReachOut { round, ip, port }) => {
7223 let (recorded_round, reach_outs) = self
7224 .retransmits_mut()
7225 .reach_out
7226 .get_or_insert_with(|| (round, FxHashSet::default()));
7227 if *recorded_round == round {
7229 reach_outs.insert((ip, port));
7231 } else if *recorded_round < round {
7232 *recorded_round = round;
7234 reach_outs.drain();
7235 reach_outs.insert((ip, port));
7236 } else {
7237 }
7239 }
7240
7241 ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
7242 Ping(_) => self.non_retransmits = true,
7243 ImmediateAck(_) => self.non_retransmits = true,
7244 AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
7245 PathChallenge(_) => self.non_retransmits = true,
7246 Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
7247 PathAbandon(path_abandon) => {
7248 self.retransmits_mut()
7249 .path_abandon
7250 .entry(path_abandon.path_id)
7251 .or_insert(path_abandon.error_code);
7252 }
7253 PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
7254 | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
7255 self.retransmits_mut().path_status.insert(path_id);
7256 }
7257 MaxPathId(_) => self.retransmits_mut().max_path_id = true,
7258 PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
7259 PathCidsBlocked(path_cids_blocked) => {
7260 self.retransmits_mut()
7261 .path_cids_blocked
7262 .insert(path_cids_blocked.path_id);
7263 }
7264 ResetStream(reset) => self
7265 .retransmits_mut()
7266 .reset_stream
7267 .push((reset.id, reset.error_code)),
7268 StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
7269 NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
7270 RetireConnectionId(retire_cid) => self
7271 .retransmits_mut()
7272 .retire_cids
7273 .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
7274 Datagram(_) => self.non_retransmits = true,
7275 NewToken(_) => {}
7276 AddAddress(add_address) => {
7277 self.retransmits_mut().add_address.insert(add_address);
7278 }
7279 RemoveAddress(remove_address) => {
7280 self.retransmits_mut().remove_address.insert(remove_address);
7281 }
7282 StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
7283 MaxData(_) => self.retransmits_mut().max_data = true,
7284 MaxStreamData(max) => {
7285 self.retransmits_mut().max_stream_data.insert(max.id);
7286 }
7287 MaxStreams(max_streams) => {
7288 self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
7289 }
7290 }
7291 }
7292}
7293
7294fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
7302 match (x, y) {
7303 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
7304 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
7305 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
7306 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
7307 }
7308}
7309
7310#[cfg(test)]
7311mod tests {
7312 use super::*;
7313
7314 #[test]
7315 fn negotiate_max_idle_timeout_commutative() {
7316 let test_params = [
7317 (None, None, None),
7318 (None, Some(VarInt(0)), None),
7319 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
7320 (Some(VarInt(0)), Some(VarInt(0)), None),
7321 (
7322 Some(VarInt(2)),
7323 Some(VarInt(0)),
7324 Some(Duration::from_millis(2)),
7325 ),
7326 (
7327 Some(VarInt(1)),
7328 Some(VarInt(4)),
7329 Some(Duration::from_millis(1)),
7330 ),
7331 ];
7332
7333 for (left, right, result) in test_params {
7334 assert_eq!(negotiate_max_idle_timeout(left, right), result);
7335 assert_eq!(negotiate_max_idle_timeout(right, left), result);
7336 }
7337 }
7338}