1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::{NonZeroU32, NonZeroUsize},
8 sync::Arc,
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13
14use rand::{Rng, SeedableRng, rngs::StdRng};
15use rustc_hash::{FxHashMap, FxHashSet};
16use thiserror::Error;
17use tracing::{debug, error, trace, trace_span, warn};
18
19use crate::{
20 Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
21 MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
22 TransportError, TransportErrorCode, VarInt,
23 cid_generator::ConnectionIdGenerator,
24 cid_queue::CidQueue,
25 config::{ServerConfig, TransportConfig},
26 congestion::Controller,
27 connection::{
28 qlog::{QlogRecvPacket, QlogSink},
29 spaces::LostPacket,
30 timer::{ConnTimer, PathTimer},
31 },
32 crypto::{self, Keys},
33 frame::{
34 self, Close, DataBlocked, Datagram, FrameStruct, NewToken, ObservedAddr, StreamDataBlocked,
35 StreamsBlocked,
36 },
37 n0_nat_traversal,
38 packet::{
39 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40 PacketNumber, PartialDecode, SpaceId,
41 },
42 range_set::ArrayRangeSet,
43 shared::{
44 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45 EndpointEvent, EndpointEventInner,
46 },
47 token::{ResetToken, Token, TokenPayload},
48 transport_parameters::TransportParameters,
49};
50
51mod ack_frequency;
52use ack_frequency::AckFrequencyState;
53
54mod assembler;
55pub use assembler::Chunk;
56
57mod cid_state;
58use cid_state::CidState;
59
60mod datagrams;
61use datagrams::DatagramState;
62pub use datagrams::{Datagrams, SendDatagramError};
63
64mod mtud;
65mod pacing;
66
67mod packet_builder;
68use packet_builder::{PacketBuilder, PadDatagram};
69
70mod packet_crypto;
71use packet_crypto::CryptoState;
72pub(crate) use packet_crypto::EncryptionLevel;
73
74mod paths;
75pub use paths::{
76 ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError,
77};
78use paths::{PathData, PathState};
79
80pub(crate) mod qlog;
81pub(crate) mod send_buffer;
82
83mod spaces;
84#[cfg(fuzzing)]
85pub use spaces::Retransmits;
86#[cfg(not(fuzzing))]
87use spaces::Retransmits;
88pub(crate) use spaces::SpaceKind;
89use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
90
91mod stats;
92pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
93
94mod streams;
95#[cfg(fuzzing)]
96pub use streams::StreamsState;
97#[cfg(not(fuzzing))]
98use streams::StreamsState;
99pub use streams::{
100 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
101 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
102};
103
104mod timer;
105use timer::{Timer, TimerTable};
106
107mod transmit_buf;
108use transmit_buf::TransmitBuf;
109
110mod state;
111
112#[cfg(not(fuzzing))]
113use state::State;
114#[cfg(fuzzing)]
115pub use state::State;
116use state::StateType;
117
/// State for one connection.
///
/// Bundles the configuration handles, crypto state, per-path data, packet
/// spaces, timers, stream/datagram state and statistics that the methods of
/// `impl Connection` operate on.
pub struct Connection {
    /// Endpoint-wide configuration, stored as passed to `Connection::new`.
158 endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration; `new` derives `spin_enabled`, `idle_timeout` and the
    /// stream limits from it.
159 config: Arc<TransportConfig>,
    /// Random number generator, seeded from the `rng_seed` argument of `new`.
160 rng: StdRng,
    /// Crypto state created in `new` from the crypto session; queried through
    /// `has_keys` when deciding what can be sent.
161 crypto_state: CryptoState,
    /// Set from the `local_cid` argument of `new`.
163 handshake_cid: ConnectionId,
    /// Set from the `remote_cid` argument of `new`.
165 remote_handshake_cid: ConnectionId,
    /// Per-path state keyed by path ID. `new` inserts `PathId::ZERO`; `ensure_path`
    /// adds further entries.
167 paths: BTreeMap<PathId, PathState>,
    /// Bumped (wrapping) by `ensure_path` for every path it creates; the new value is
    /// handed to `PathData::new`.
173 path_generation_counter: u64,
    /// Forwarded to `PathData::new` for every path.
    /// NOTE(review): presumably "allow MTU discovery" — confirm against `PathData`.
184 allow_mtud: bool,
    /// Connection state; starts as a handshake state and is consulted by `poll` and
    /// `poll_transmit`.
186 state: State,
    /// Side-specific data built from the `SideArgs` given to `new`; `side.side()`
    /// yields the plain `Side`.
187 side: ConnectionSide,
    /// Starts as `TransportParameters::default()`.
    /// NOTE(review): presumably replaced once the peer's parameters arrive — not
    /// visible in this part of the file.
188 peer_params: TransportParameters,
    /// Copy of the `remote_cid` argument of `new`.
190 original_remote_cid: ConnectionId,
    /// Copy of the `init_cid` argument of `new`.
192 initial_dst_cid: ConnectionId,
    /// `None` at construction.
194 retry_src_cid: Option<ConnectionId>,
    /// Queue of application-facing events; `poll` drains this first.
197 events: VecDeque<Event>,
    /// Queue of events for the endpoint, drained by `poll_endpoint_events`.
199 endpoint_events: VecDeque<EndpointEventInner>,
    /// Decided once in `new`: `config.allow_spin` combined with a 7-in-8 random draw.
200 spin_enabled: bool,
    /// Starts `false`.
202 spin: bool,
    /// Packet spaces in the order initial, handshake, data; indexed with `SpaceId`.
204 spaces: [PacketSpace; 3],
    /// Starts at `SpaceKind::Initial`.
206 highest_space: SpaceKind,
    /// Starts `true`.
208 permit_idle_reset: bool,
    /// `config.max_idle_timeout` interpreted as milliseconds; `None` if unset or zero.
210 idle_timeout: Option<Duration>,
    /// Timer table; `poll_timeout` returns `timers.peek()`.
212 timers: TimerTable,
    /// Counter starting at zero.
213 authentication_failures: u64,
215
    /// While the connection is closed or draining, `poll_transmit` only produces
    /// packets if this is set; it is cleared after a close packet has been written
    /// in the highest space.
216 connection_close_pending: bool,
221
    /// ACK-frequency state, initialised from the max ACK delay of default transport
    /// parameters.
222 ack_frequency: AckFrequencyState,
226
    /// Starts `false`.
227 receiving_ecn: bool,
    /// Counter starting at zero.
232 total_authed_packets: u64,
    /// Updated by `poll_transmit`: set when it has nothing to send for a reason other
    /// than congestion blocking.
234 app_limited: bool,
237
    /// Starts at zero.
238 next_observed_addr_seq_no: VarInt,
243
    /// Stream state, exposed through `streams`, `recv_stream` and `send_stream`.
244 streams: StreamsState,
    /// CID queue per path, seeded in `new` with `remote_cid` for `PathId::ZERO`. A
    /// path's entry is removed when the path is abandoned, and a path without an
    /// entry is not polled for on-path transmits.
245 remote_cids: FxHashMap<PathId, CidQueue>,
    /// Per-path `CidState`; `new` creates the entry for `PathId::ZERO`.
251 local_cid_state: FxHashMap<PathId, CidState>,
    /// Datagram state, default-initialised.
258 datagrams: DatagramState,
    /// Connection-wide statistics.
260 stats: ConnectionStats,
    /// Per-path statistics; entries are created on demand (see `build_transmit`).
262 path_stats: FxHashMap<PathId, PathStats>,
    /// Protocol version passed to `new`.
264 version: u32,
266
    /// Starts at `NonZeroU32::MIN`.
267 max_concurrent_paths: NonZeroU32,
    /// Starts at `PathId::ZERO`.
276 local_max_path_id: PathId,
    /// Starts at `PathId::ZERO`; `open_path` refuses path IDs above it and flags
    /// `paths_blocked` in the data space's pending frames.
291 remote_max_path_id: PathId,
    /// Starts at `PathId::ZERO`.
297 max_path_id_with_cids: PathId,
    /// IDs of abandoned paths (see `close_path_inner`). Such IDs can still be present
    /// in `paths`, which is why lookups usually check both collections.
303 abandoned_paths: FxHashSet<PathId>,
311
    /// NAT traversal state, default-initialised.
312 n0_nat_traversal: n0_nat_traversal::State,
    /// qlog sink used to emit events such as tuple assignment and recovery metrics.
314 qlog: QlogSink,
315}
316
317impl Connection {
318 pub(crate) fn new(
319 endpoint_config: Arc<EndpointConfig>,
320 config: Arc<TransportConfig>,
321 init_cid: ConnectionId,
322 local_cid: ConnectionId,
323 remote_cid: ConnectionId,
324 network_path: FourTuple,
325 crypto: Box<dyn crypto::Session>,
326 cid_gen: &dyn ConnectionIdGenerator,
327 now: Instant,
328 version: u32,
329 allow_mtud: bool,
330 rng_seed: [u8; 32],
331 side_args: SideArgs,
332 qlog: QlogSink,
333 ) -> Self {
334 let pref_addr_cid = side_args.pref_addr_cid();
335 let path_validated = side_args.path_validated();
336 let connection_side = ConnectionSide::from(side_args);
337 let side = connection_side.side();
338 let mut rng = StdRng::from_seed(rng_seed);
339 let initial_space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
340 let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
341 #[cfg(test)]
342 let data_space = match config.deterministic_packet_numbers {
343 true => PacketSpace::new_deterministic(now, SpaceId::Data),
344 false => PacketSpace::new(now, SpaceId::Data, &mut rng),
345 };
346 #[cfg(not(test))]
347 let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
348 let state = State::handshake(state::Handshake {
349 remote_cid_set: side.is_server(),
350 expected_token: Bytes::new(),
351 client_hello: None,
352 allow_server_migration: side.is_client(),
353 });
354 let local_cid_state = FxHashMap::from_iter([(
355 PathId::ZERO,
356 CidState::new(
357 cid_gen.cid_len(),
358 cid_gen.cid_lifetime(),
359 now,
360 if pref_addr_cid.is_some() { 2 } else { 1 },
361 ),
362 )]);
363
364 let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
365 path.open_status = paths::OpenStatus::Informed;
366 let mut this = Self {
367 endpoint_config,
368 crypto_state: CryptoState::new(crypto, init_cid, side, &mut rng),
369 handshake_cid: local_cid,
370 remote_handshake_cid: remote_cid,
371 local_cid_state,
372 paths: BTreeMap::from_iter([(
373 PathId::ZERO,
374 PathState {
375 data: path,
376 prev: None,
377 },
378 )]),
379 path_generation_counter: 0,
380 allow_mtud,
381 state,
382 side: connection_side,
383 peer_params: TransportParameters::default(),
384 original_remote_cid: remote_cid,
385 initial_dst_cid: init_cid,
386 retry_src_cid: None,
387 events: VecDeque::new(),
388 endpoint_events: VecDeque::new(),
389 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
390 spin: false,
391 spaces: [initial_space, handshake_space, data_space],
392 highest_space: SpaceKind::Initial,
393 permit_idle_reset: true,
394 idle_timeout: match config.max_idle_timeout {
395 None | Some(VarInt(0)) => None,
396 Some(dur) => Some(Duration::from_millis(dur.0)),
397 },
398 timers: TimerTable::default(),
399 authentication_failures: 0,
400 connection_close_pending: false,
401
402 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
403 &TransportParameters::default(),
404 )),
405
406 app_limited: false,
407 receiving_ecn: false,
408 total_authed_packets: 0,
409
410 next_observed_addr_seq_no: 0u32.into(),
411
412 streams: StreamsState::new(
413 side,
414 config.max_concurrent_uni_streams,
415 config.max_concurrent_bidi_streams,
416 config.send_window,
417 config.receive_window,
418 config.stream_receive_window,
419 ),
420 datagrams: DatagramState::default(),
421 config,
422 remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]),
423 rng,
424 stats: ConnectionStats::default(),
425 path_stats: Default::default(),
426 version,
427
428 max_concurrent_paths: NonZeroU32::MIN,
430 local_max_path_id: PathId::ZERO,
431 remote_max_path_id: PathId::ZERO,
432 max_path_id_with_cids: PathId::ZERO,
433 abandoned_paths: Default::default(),
434
435 n0_nat_traversal: Default::default(),
436 qlog,
437 };
438 if path_validated {
439 this.on_path_validated(PathId::ZERO);
440 }
441 if side.is_client() {
442 this.write_crypto();
444 this.init_0rtt(now);
445 }
446 this.qlog
447 .emit_tuple_assigned(PathId::ZERO, network_path, now);
448 this
449 }
450
451 #[must_use]
459 pub fn poll_timeout(&mut self) -> Option<Instant> {
460 self.timers.peek()
461 }
462
463 #[must_use]
469 pub fn poll(&mut self) -> Option<Event> {
470 if let Some(x) = self.events.pop_front() {
471 return Some(x);
472 }
473
474 if let Some(event) = self.streams.poll() {
475 return Some(Event::Stream(event));
476 }
477
478 if let Some(reason) = self.state.take_error() {
479 return Some(Event::ConnectionLost { reason });
480 }
481
482 None
483 }
484
485 #[must_use]
487 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
488 self.endpoint_events.pop_front().map(EndpointEvent)
489 }
490
491 #[must_use]
493 pub fn streams(&mut self) -> Streams<'_> {
494 Streams {
495 state: &mut self.streams,
496 conn_state: &self.state,
497 }
498 }
499
500 #[must_use]
502 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
503 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
504 RecvStream {
505 id,
506 state: &mut self.streams,
507 pending: &mut self.spaces[SpaceId::Data].pending,
508 }
509 }
510
511 #[must_use]
513 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
514 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
515 SendStream {
516 id,
517 state: &mut self.streams,
518 pending: &mut self.spaces[SpaceId::Data].pending,
519 conn_state: &self.state,
520 }
521 }
522
523 pub fn open_path_ensure(
540 &mut self,
541 network_path: FourTuple,
542 initial_status: PathStatus,
543 now: Instant,
544 ) -> Result<(PathId, bool), PathError> {
545 let existing_open_path = self.paths.iter().find(|(id, path)| {
546 network_path.is_probably_same_path(&path.data.network_path)
547 && !self.abandoned_paths.contains(*id)
548 });
549 match existing_open_path {
550 Some((path_id, _state)) => Ok((*path_id, true)),
551 None => Ok((self.open_path(network_path, initial_status, now)?, false)),
552 }
553 }
554
555 pub fn open_path(
560 &mut self,
561 network_path: FourTuple,
562 initial_status: PathStatus,
563 now: Instant,
564 ) -> Result<PathId, PathError> {
565 if !self.is_multipath_negotiated() {
566 return Err(PathError::MultipathNotNegotiated);
567 }
568 if self.side().is_server() {
569 return Err(PathError::ServerSideNotAllowed);
570 }
571
572 let max_abandoned = self.abandoned_paths.iter().max().copied();
573 let max_used = self.paths.keys().last().copied();
574 let path_id = max_abandoned
575 .max(max_used)
576 .unwrap_or(PathId::ZERO)
577 .saturating_add(1u8);
578
579 if Some(path_id) > self.max_path_id() {
580 return Err(PathError::MaxPathIdReached);
581 }
582 if path_id > self.remote_max_path_id {
583 self.spaces[SpaceId::Data].pending.paths_blocked = true;
584 return Err(PathError::MaxPathIdReached);
585 }
586 if self
587 .remote_cids
588 .get(&path_id)
589 .map(CidQueue::active)
590 .is_none()
591 {
592 self.spaces[SpaceId::Data]
593 .pending
594 .path_cids_blocked
595 .insert(path_id);
596 return Err(PathError::RemoteCidsExhausted);
597 }
598
599 let path = self.ensure_path(path_id, network_path, now, None);
600 path.status.local_update(initial_status);
601
602 Ok(path_id)
603 }
604
605 pub fn close_path(
611 &mut self,
612 now: Instant,
613 path_id: PathId,
614 error_code: VarInt,
615 ) -> Result<(), ClosePathError> {
616 self.close_path_inner(
617 now,
618 path_id,
619 PathAbandonReason::ApplicationClosed { error_code },
620 )
621 }
622
623 fn close_path_inner(
628 &mut self,
629 now: Instant,
630 path_id: PathId,
631 reason: PathAbandonReason,
632 ) -> Result<(), ClosePathError> {
633 if self.state.is_drained() {
634 return Ok(());
635 }
636
637 if !self.is_multipath_negotiated() {
638 return Err(ClosePathError::MultipathNotNegotiated);
639 }
640 if self.abandoned_paths.contains(&path_id)
641 || Some(path_id) > self.max_path_id()
642 || !self.paths.contains_key(&path_id)
643 {
644 return Err(ClosePathError::ClosedPath);
645 }
646
647 if reason.is_locally_initiated() {
648 let has_remaining_validated_paths = self.paths.iter().any(|(id, path)| {
649 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
650 });
651 if !has_remaining_validated_paths {
652 return Err(ClosePathError::LastOpenPath);
653 }
654 } else {
655 let has_remaining_paths = self
659 .paths
660 .keys()
661 .any(|id| *id != path_id && !self.abandoned_paths.contains(id));
662 if !has_remaining_paths {
663 return Err(ClosePathError::LastOpenPath);
664 }
665 }
666
667 trace!(%path_id, ?reason, "closing path");
668
669 self.spaces[SpaceId::Data]
671 .pending
672 .path_abandon
673 .insert(path_id, reason.error_code());
674
675 let pending_space = &mut self.spaces[SpaceId::Data].pending;
677 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
678 pending_space.path_cids_blocked.retain(|&id| id != path_id);
679 pending_space.path_status.retain(|&id| id != path_id);
680
681 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
683 for sent_packet in space.sent_packets.values_mut() {
684 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
685 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
686 retransmits.path_cids_blocked.retain(|&id| id != path_id);
687 retransmits.path_status.retain(|&id| id != path_id);
688 }
689 }
690 }
691
692 self.remote_cids.remove(&path_id);
698 debug_assert!(!self.state.is_drained()); self.endpoint_events
700 .push_back(EndpointEventInner::RetireResetToken(path_id));
701
702 trace!(%path_id, "abandoning path");
703 self.abandoned_paths.insert(path_id);
704
705 for timer in timer::PathTimer::VALUES {
706 let keep_timer = match timer {
708 PathTimer::PathValidation | PathTimer::PathChallengeLost | PathTimer::PathOpen => {
712 false
713 }
714 PathTimer::PathKeepAlive | PathTimer::PathIdle => false,
717 PathTimer::MaxAckDelay => false,
720 PathTimer::DiscardPath => false,
723 PathTimer::LossDetection => true,
726 PathTimer::Pacing => true,
730 };
731
732 if !keep_timer {
733 let qlog = self.qlog.with_time(now);
734 self.timers.stop(Timer::PerPath(path_id, timer), qlog);
735 }
736 }
737
738 self.events.push_back(Event::Path(PathEvent::Abandoned {
740 id: path_id,
741 reason,
742 }));
743
744 Ok(())
745 }
746
747 #[track_caller]
751 fn path_data(&self, path_id: PathId) -> &PathData {
752 if let Some(data) = self.paths.get(&path_id) {
753 &data.data
754 } else {
755 panic!(
756 "unknown path: {path_id}, currently known paths: {:?}",
757 self.paths.keys().collect::<Vec<_>>()
758 );
759 }
760 }
761
762 fn path(&self, path_id: PathId) -> Option<&PathData> {
764 self.paths.get(&path_id).map(|path_state| &path_state.data)
765 }
766
767 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
769 self.paths
770 .get_mut(&path_id)
771 .map(|path_state| &mut path_state.data)
772 }
773
774 pub fn paths(&self) -> Vec<PathId> {
778 self.paths.keys().copied().collect()
779 }
780
781 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
783 self.path(path_id)
784 .map(PathData::local_status)
785 .ok_or(ClosedPath { _private: () })
786 }
787
788 pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
790 self.path(path_id)
791 .map(|path| path.network_path)
792 .ok_or(ClosedPath { _private: () })
793 }
794
795 pub fn set_path_status(
799 &mut self,
800 path_id: PathId,
801 status: PathStatus,
802 ) -> Result<PathStatus, SetPathStatusError> {
803 if !self.is_multipath_negotiated() {
804 return Err(SetPathStatusError::MultipathNotNegotiated);
805 }
806 let path = self
807 .path_mut(path_id)
808 .ok_or(SetPathStatusError::ClosedPath)?;
809 let prev = match path.status.local_update(status) {
810 Some(prev) => {
811 self.spaces[SpaceId::Data]
812 .pending
813 .path_status
814 .insert(path_id);
815 prev
816 }
817 None => path.local_status(),
818 };
819 Ok(prev)
820 }
821
822 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
827 self.path(path_id).and_then(|path| path.remote_status())
828 }
829
830 pub fn set_path_max_idle_timeout(
836 &mut self,
837 path_id: PathId,
838 timeout: Option<Duration>,
839 ) -> Result<Option<Duration>, ClosedPath> {
840 let path = self
841 .paths
842 .get_mut(&path_id)
843 .ok_or(ClosedPath { _private: () })?;
844 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
845 }
846
847 pub fn set_path_keep_alive_interval(
853 &mut self,
854 path_id: PathId,
855 interval: Option<Duration>,
856 ) -> Result<Option<Duration>, ClosedPath> {
857 let path = self
858 .paths
859 .get_mut(&path_id)
860 .ok_or(ClosedPath { _private: () })?;
861 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
862 }
863
864 #[track_caller]
868 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
869 &mut self.paths.get_mut(&path_id).expect("known path").data
870 }
871
872 fn find_validated_path_on_network_path(
876 &self,
877 network_path: FourTuple,
878 ) -> Option<(&PathId, &PathState)> {
879 self.paths.iter().find(|(path_id, path_state)| {
880 path_state.data.validated
881 && network_path.is_probably_same_path(&path_state.data.network_path)
883 && !self.abandoned_paths.contains(path_id)
884 })
885 }
889
890 fn ensure_path(
894 &mut self,
895 path_id: PathId,
896 network_path: FourTuple,
897 now: Instant,
898 pn: Option<u64>,
899 ) -> &mut PathData {
900 let valid_path = self.find_validated_path_on_network_path(network_path);
901 let validated = valid_path.is_some();
902 let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
903 let vacant_entry = match self.paths.entry(path_id) {
904 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
905 btree_map::Entry::Occupied(occupied_entry) => {
906 return &mut occupied_entry.into_mut().data;
907 }
908 };
909
910 debug!(%validated, %path_id, %network_path, "path added");
911 let peer_max_udp_payload_size =
912 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
913 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
914 let mut data = PathData::new(
915 network_path,
916 self.allow_mtud,
917 Some(peer_max_udp_payload_size),
918 self.path_generation_counter,
919 now,
920 &self.config,
921 );
922
923 data.validated = validated;
924 if let Some(initial_rtt) = initial_rtt {
925 data.rtt.reset_initial_rtt(initial_rtt);
926 }
927
928 data.pending_on_path_challenge = true;
931
932 let path = vacant_entry.insert(PathState { data, prev: None });
933
934 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
935 if let Some(pn) = pn {
936 pn_space.dedup.insert(pn);
937 }
938 self.spaces[SpaceId::Data]
939 .number_spaces
940 .insert(path_id, pn_space);
941 self.qlog.emit_tuple_assigned(path_id, network_path, now);
942
943 if !self.remote_cids.contains_key(&path_id) {
947 debug!(%path_id, "Remote opened path without issuing CIDs");
948 self.spaces[SpaceId::Data]
949 .pending
950 .path_cids_blocked
951 .insert(path_id);
952 }
955
956 &mut path.data
957 }
958
959 #[must_use]
969 pub fn poll_transmit(
970 &mut self,
971 now: Instant,
972 max_datagrams: NonZeroUsize,
973 buf: &mut Vec<u8>,
974 ) -> Option<Transmit> {
975 let max_datagrams = match self.config.enable_segmentation_offload {
976 false => NonZeroUsize::MIN,
977 true => max_datagrams,
978 };
979
980 let connection_close_pending = match self.state.as_type() {
986 StateType::Drained => {
987 self.app_limited = true;
988 return None;
989 }
990 StateType::Draining | StateType::Closed => {
991 if !self.connection_close_pending {
994 self.app_limited = true;
995 return None;
996 }
997 true
998 }
999 _ => false,
1000 };
1001
1002 if let Some(config) = &self.config.ack_frequency_config {
1004 let rtt = self
1005 .paths
1006 .values()
1007 .map(|p| p.data.rtt.get())
1008 .min()
1009 .expect("one path exists");
1010 self.spaces[SpaceId::Data].pending.ack_frequency = self
1011 .ack_frequency
1012 .should_send_ack_frequency(rtt, config, &self.peer_params)
1013 && self.highest_space == SpaceKind::Data
1014 && self.peer_supports_ack_frequency();
1015 }
1016
1017 let mut congestion_blocked = false;
1020
1021 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1022 while let Some(path_id) = next_path_id {
1023 if !connection_close_pending
1024 && let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id)
1025 {
1026 return Some(transmit);
1027 }
1028
1029 let info = self.scheduling_info(path_id);
1030 match self.poll_transmit_on_path(
1031 now,
1032 buf,
1033 path_id,
1034 max_datagrams,
1035 &info,
1036 connection_close_pending,
1037 ) {
1038 PollPathStatus::Send(transmit) => {
1039 return Some(transmit);
1040 }
1041 PollPathStatus::NothingToSend {
1042 congestion_blocked: cb,
1043 } => {
1044 congestion_blocked |= cb;
1045 debug_assert!(
1048 buf.is_empty(),
1049 "nothing to send on path but buffer not empty"
1050 );
1051 }
1052 }
1053
1054 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1055 }
1056
1057 debug_assert!(
1059 buf.is_empty(),
1060 "there was data in the buffer, but it was not sent"
1061 );
1062
1063 self.app_limited = !congestion_blocked;
1064
1065 if self.state.is_established() {
1066 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1068 while let Some(path_id) = next_path_id {
1069 if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) {
1070 return Some(transmit);
1071 }
1072 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1073 }
1074 }
1075
1076 None
1077 }
1078
1079 fn scheduling_info(&self, path_id: PathId) -> PathSchedulingInfo {
1097 let have_validated_status_available_space = self.paths.iter().any(|(path_id, path)| {
1098 self.remote_cids.contains_key(path_id)
1099 && !self.abandoned_paths.contains(path_id)
1100 && path.data.validated
1101 && path.data.local_status() == PathStatus::Available
1102 });
1103 let is_handshaking = self.is_handshaking();
1104 let has_cids = self.remote_cids.contains_key(&path_id);
1105 let abandoned = self.abandoned_paths.contains(&path_id);
1106 let path_data = self.path_data(path_id);
1107 let validated = path_data.validated;
1108 let status = path_data.local_status();
1109
1110 let may_send_data = has_cids
1113 && !abandoned
1114 && if is_handshaking {
1115 true
1119 } else if !validated {
1120 false
1127 } else {
1128 match status {
1129 PathStatus::Available => {
1130 true
1132 }
1133 PathStatus::Backup => {
1134 !have_validated_status_available_space
1136 }
1137 }
1138 };
1139
1140 let may_send_close = has_cids
1145 && !abandoned
1146 && if !validated && have_validated_status_available_space {
1147 false
1149 } else {
1150 true
1152 };
1153
1154 PathSchedulingInfo {
1155 abandoned,
1156 may_send_data,
1157 may_send_close,
1158 }
1159 }
1160
1161 fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit {
1162 debug_assert!(
1163 !transmit.is_empty(),
1164 "must not be called with an empty transmit buffer"
1165 );
1166
1167 let network_path = self.path_data(path_id).network_path;
1168 trace!(
1169 segment_size = transmit.segment_size(),
1170 last_datagram_len = transmit.len() % transmit.segment_size(),
1171 %network_path,
1172 "sending {} bytes in {} datagrams",
1173 transmit.len(),
1174 transmit.num_datagrams()
1175 );
1176 self.path_data_mut(path_id)
1177 .inc_total_sent(transmit.len() as u64);
1178
1179 self.stats
1180 .udp_tx
1181 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1182 self.path_stats
1183 .entry(path_id)
1184 .or_default()
1185 .udp_tx
1186 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1187
1188 Transmit {
1189 destination: network_path.remote,
1190 size: transmit.len(),
1191 ecn: if self.path_data(path_id).sending_ecn {
1192 Some(EcnCodepoint::Ect0)
1193 } else {
1194 None
1195 },
1196 segment_size: match transmit.num_datagrams() {
1197 1 => None,
1198 _ => Some(transmit.segment_size()),
1199 },
1200 src_ip: network_path.local_ip,
1201 }
1202 }
1203
1204 fn poll_transmit_off_path(
1206 &mut self,
1207 now: Instant,
1208 buf: &mut Vec<u8>,
1209 path_id: PathId,
1210 ) -> Option<Transmit> {
1211 if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) {
1212 return Some(challenge);
1213 }
1214 if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
1215 return Some(response);
1216 }
1217 if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
1218 return Some(challenge);
1219 }
1220 None
1221 }
1222
1223 #[must_use]
1230 fn poll_transmit_on_path(
1231 &mut self,
1232 now: Instant,
1233 buf: &mut Vec<u8>,
1234 path_id: PathId,
1235 max_datagrams: NonZeroUsize,
1236 scheduling_info: &PathSchedulingInfo,
1237 connection_close_pending: bool,
1238 ) -> PollPathStatus {
1239 let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else {
1241 if !self.abandoned_paths.contains(&path_id) {
1242 debug!(%path_id, "no remote CIDs for path");
1243 }
1244 return PollPathStatus::NothingToSend {
1245 congestion_blocked: false,
1246 };
1247 };
1248
1249 let mut pad_datagram = PadDatagram::No;
1255
1256 let mut last_packet_number = None;
1260
1261 let mut congestion_blocked = false;
1264
1265 let pmtu = self.path_data(path_id).current_mtu().into();
1267 let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu);
1268
1269 for space_id in SpaceId::iter() {
1271 if path_id != PathId::ZERO && space_id != SpaceId::Data {
1273 continue;
1274 }
1275 match self.poll_transmit_path_space(
1276 now,
1277 &mut transmit,
1278 path_id,
1279 space_id,
1280 remote_cid,
1281 scheduling_info,
1282 connection_close_pending,
1283 pad_datagram,
1284 ) {
1285 PollPathSpaceStatus::NothingToSend {
1286 congestion_blocked: cb,
1287 } => {
1288 congestion_blocked |= cb;
1289 }
1292 PollPathSpaceStatus::WrotePacket {
1293 last_packet_number: pn,
1294 pad_datagram: pad,
1295 } => {
1296 debug_assert!(!transmit.is_empty(), "transmit must contain packets");
1297 last_packet_number = Some(pn);
1298 pad_datagram = pad;
1299 continue;
1304 }
1305 PollPathSpaceStatus::Send {
1306 last_packet_number: pn,
1307 } => {
1308 debug_assert!(!transmit.is_empty(), "transmit must contain packets");
1309 last_packet_number = Some(pn);
1310 break;
1311 }
1312 }
1313 }
1314
1315 if last_packet_number.is_some() || congestion_blocked {
1316 self.qlog.emit_recovery_metrics(
1317 path_id,
1318 &mut self.paths.get_mut(&path_id).unwrap().data,
1319 now,
1320 );
1321 }
1322
1323 match last_packet_number {
1324 Some(last_packet_number) => {
1325 self.path_data_mut(path_id).congestion.on_sent(
1328 now,
1329 transmit.len() as u64,
1330 last_packet_number,
1331 );
1332 PollPathStatus::Send(self.build_transmit(path_id, transmit))
1333 }
1334 None => PollPathStatus::NothingToSend { congestion_blocked },
1335 }
1336 }
1337
1338 #[must_use]
1340 fn poll_transmit_path_space(
1341 &mut self,
1342 now: Instant,
1343 transmit: &mut TransmitBuf<'_>,
1344 path_id: PathId,
1345 space_id: SpaceId,
1346 remote_cid: ConnectionId,
1347 scheduling_info: &PathSchedulingInfo,
1348 connection_close_pending: bool,
1350 mut pad_datagram: PadDatagram,
1352 ) -> PollPathSpaceStatus {
1353 let mut last_packet_number = None;
1356
1357 loop {
1373 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1375 transmit.datagram_remaining_mut()
1377 } else {
1378 transmit.segment_size()
1380 };
1381 let can_send =
1382 self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending);
1383 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1384 let space_will_send = {
1385 if scheduling_info.abandoned {
1386 false
1389 } else if can_send.close && scheduling_info.may_send_close {
1390 true
1392 } else if needs_loss_probe || can_send.space_specific {
1393 true
1396 } else {
1397 !can_send.is_empty() && scheduling_info.may_send_data
1400 }
1401 };
1402
1403 if !space_will_send {
1404 return match last_packet_number {
1407 Some(pn) => PollPathSpaceStatus::WrotePacket {
1408 last_packet_number: pn,
1409 pad_datagram,
1410 },
1411 None => {
1412 if self.crypto_state.has_keys(space_id.encryption_level())
1414 || (space_id == SpaceId::Data
1415 && self.crypto_state.has_keys(EncryptionLevel::ZeroRtt))
1416 {
1417 trace!(?space_id, %path_id, "nothing to send in space");
1418 }
1419 PollPathSpaceStatus::NothingToSend {
1420 congestion_blocked: false,
1421 }
1422 }
1423 };
1424 }
1425
1426 if transmit.datagram_remaining_mut() == 0 {
1430 let congestion_blocked =
1431 self.path_congestion_check(space_id, path_id, transmit, &can_send, now);
1432 if congestion_blocked != PathBlocked::No {
1433 return match last_packet_number {
1435 Some(pn) => PollPathSpaceStatus::WrotePacket {
1436 last_packet_number: pn,
1437 pad_datagram,
1438 },
1439 None => {
1440 return PollPathSpaceStatus::NothingToSend {
1441 congestion_blocked: true,
1442 };
1443 }
1444 };
1445 }
1446 }
1447
1448 if transmit.datagram_remaining_mut() == 0 {
1451 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1452 return match last_packet_number {
1455 Some(pn) => PollPathSpaceStatus::WrotePacket {
1456 last_packet_number: pn,
1457 pad_datagram,
1458 },
1459 None => {
1460 return PollPathSpaceStatus::NothingToSend {
1461 congestion_blocked: false,
1462 };
1463 }
1464 };
1465 }
1466
1467 match self.spaces[space_id].for_path(path_id).loss_probes {
1468 0 => transmit.start_new_datagram(),
1469 _ => {
1470 let request_immediate_ack =
1472 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1473 self.spaces[space_id].maybe_queue_probe(
1474 path_id,
1475 request_immediate_ack,
1476 &self.streams,
1477 );
1478
1479 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1480
1481 transmit.start_new_datagram_with_size(std::cmp::min(
1485 usize::from(INITIAL_MTU),
1486 transmit.segment_size(),
1487 ));
1488 }
1489 }
1490 trace!(count = transmit.num_datagrams(), "new datagram started");
1491
1492 pad_datagram = PadDatagram::No;
1494 }
1495
1496 if transmit.datagram_start_offset() < transmit.len() {
1499 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1500 }
1501
1502 if self.crypto_state.has_keys(EncryptionLevel::Initial)
1507 && space_id == SpaceId::Handshake
1508 && self.side.is_client()
1509 {
1510 self.discard_space(now, SpaceKind::Initial);
1513 }
1514 if let Some(ref mut prev) = self.crypto_state.prev_crypto {
1515 prev.update_unacked = false;
1516 }
1517
1518 let Some(mut builder) = PacketBuilder::new(
1519 now,
1520 space_id,
1521 path_id,
1522 remote_cid,
1523 transmit,
1524 can_send.is_ack_eliciting(),
1525 self,
1526 ) else {
1527 return PollPathSpaceStatus::NothingToSend {
1534 congestion_blocked: false,
1535 };
1536 };
1537 last_packet_number = Some(builder.packet_number);
1538
1539 if space_id == SpaceId::Initial
1540 && (self.side.is_client() || can_send.is_ack_eliciting())
1541 {
1542 pad_datagram |= PadDatagram::ToMinMtu;
1544 }
1545 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1546 pad_datagram |= PadDatagram::ToSegmentSize;
1547 }
1548
1549 if can_send.close {
1550 trace!("sending CONNECTION_CLOSE");
1551 let is_multipath_negotiated = self.is_multipath_negotiated();
1556 for path_id in self.spaces[space_id]
1557 .number_spaces
1558 .iter()
1559 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1560 .map(|(&path_id, _)| path_id)
1561 .collect::<Vec<_>>()
1562 {
1563 Self::populate_acks(
1564 now,
1565 self.receiving_ecn,
1566 path_id,
1567 space_id,
1568 &mut self.spaces[space_id],
1569 is_multipath_negotiated,
1570 &mut builder,
1571 &mut self.stats.frame_tx,
1572 self.crypto_state.has_keys(space_id.encryption_level()),
1573 );
1574 }
1575
1576 debug_assert!(
1584 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1585 "ACKs should leave space for ConnectionClose"
1586 );
1587 let stats = &mut self.stats.frame_tx;
1588 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1589 let max_frame_size = builder.frame_space_remaining();
1590 let close: Close = match self.state.as_type() {
1591 StateType::Closed => {
1592 let reason: Close =
1593 self.state.as_closed().expect("checked").clone().into();
1594 if space_id == SpaceId::Data || reason.is_transport_layer() {
1595 reason
1596 } else {
1597 TransportError::APPLICATION_ERROR("").into()
1598 }
1599 }
1600 StateType::Draining => TransportError::NO_ERROR("").into(),
1601 _ => unreachable!(
1602 "tried to make a close packet when the connection wasn't closed"
1603 ),
1604 };
1605 builder.write_frame(close.encoder(max_frame_size), stats);
1606 }
1607 let last_pn = builder.packet_number;
1608 builder.finish_and_track(now, self, path_id, pad_datagram);
1609 if space_id.kind() == self.highest_space {
1610 self.connection_close_pending = false;
1613 }
1614 return PollPathSpaceStatus::WrotePacket {
1627 last_packet_number: last_pn,
1628 pad_datagram,
1629 };
1630 }
1631
1632 self.populate_packet(now, space_id, path_id, scheduling_info, &mut builder);
1633
1634 debug_assert!(
1641 !(builder.sent_frames().is_ack_only(&self.streams)
1642 && !can_send.acks
1643 && (can_send.other || can_send.space_specific)
1644 && builder.buf.segment_size()
1645 == self.path_data(path_id).current_mtu() as usize
1646 && self.datagrams.outgoing.is_empty()),
1647 "SendableFrames was {can_send:?}, but only ACKs have been written"
1648 );
1649 if builder.sent_frames().requires_padding {
1650 pad_datagram |= PadDatagram::ToMinMtu;
1651 }
1652
1653 for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1654 self.spaces[space_id]
1655 .for_path(*path_id)
1656 .pending_acks
1657 .acks_sent();
1658 self.timers.stop(
1659 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1660 self.qlog.with_time(now),
1661 );
1662 }
1663
1664 if builder.can_coalesce && path_id == PathId::ZERO && {
1672 let max_packet_size = builder
1673 .buf
1674 .datagram_remaining_mut()
1675 .saturating_sub(builder.predict_packet_end());
1676 max_packet_size > MIN_PACKET_SPACE
1677 && self.has_pending_packet(space_id, max_packet_size, connection_close_pending)
1678 } {
1679 trace!("will coalesce with next packet");
1682 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1683 } else {
1684 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1690 const MAX_PADDING: usize = 32;
1698 if builder.buf.datagram_remaining_mut()
1699 > builder.predict_packet_end() + MAX_PADDING
1700 {
1701 trace!(
1702 "GSO truncated by demand for {} padding bytes",
1703 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1704 );
1705 let last_pn = builder.packet_number;
1706 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1707 return PollPathSpaceStatus::Send {
1708 last_packet_number: last_pn,
1709 };
1710 }
1711
1712 builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1715 } else {
1716 builder.finish_and_track(now, self, path_id, pad_datagram);
1717 }
1718
1719 if transmit.num_datagrams() == 1 {
1722 transmit.clip_segment_size();
1723 }
1724 }
1725 }
1726 }
1727
1728 fn poll_transmit_mtu_probe(
1729 &mut self,
1730 now: Instant,
1731 buf: &mut Vec<u8>,
1732 path_id: PathId,
1733 ) -> Option<Transmit> {
1734 let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?;
1735
1736 let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize);
1738 transmit.start_new_datagram_with_size(probe_size as usize);
1739
1740 let mut builder = PacketBuilder::new(
1741 now,
1742 SpaceId::Data,
1743 path_id,
1744 active_cid,
1745 &mut transmit,
1746 true,
1747 self,
1748 )?;
1749
1750 trace!(?probe_size, "writing MTUD probe");
1752 builder.write_frame(frame::Ping, &mut self.stats.frame_tx);
1753
1754 if self.peer_supports_ack_frequency() {
1756 builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx);
1757 }
1758
1759 builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1760
1761 self.path_stats
1762 .entry(path_id)
1763 .or_default()
1764 .sent_plpmtud_probes += 1;
1765
1766 Some(self.build_transmit(path_id, transmit))
1767 }
1768
1769 fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> {
1777 let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?;
1778 let is_eligible = self.path_data(path_id).validated
1779 && !self.path_data(path_id).is_validating_path()
1780 && !self.abandoned_paths.contains(&path_id);
1781
1782 if !is_eligible {
1783 return None;
1784 }
1785 let next_pn = self.spaces[SpaceId::Data]
1786 .for_path(path_id)
1787 .peek_tx_number();
1788 let probe_size = self
1789 .path_data_mut(path_id)
1790 .mtud
1791 .poll_transmit(now, next_pn)?;
1792
1793 Some((active_cid, probe_size))
1794 }
1795
1796 fn has_pending_packet(
1813 &mut self,
1814 current_space_id: SpaceId,
1815 max_packet_size: usize,
1816 connection_close_pending: bool,
1817 ) -> bool {
1818 let mut space_id = current_space_id;
1819 loop {
1820 let can_send = self.space_can_send(
1821 space_id,
1822 PathId::ZERO,
1823 max_packet_size,
1824 connection_close_pending,
1825 );
1826 if !can_send.is_empty() {
1827 return true;
1828 }
1829 match space_id.next() {
1830 Some(next_space_id) => space_id = next_space_id,
1831 None => break,
1832 }
1833 }
1834 false
1835 }
1836
1837 fn path_congestion_check(
1839 &mut self,
1840 space_id: SpaceId,
1841 path_id: PathId,
1842 transmit: &TransmitBuf<'_>,
1843 can_send: &SendableFrames,
1844 now: Instant,
1845 ) -> PathBlocked {
1846 if self.side().is_server()
1852 && self
1853 .path_data(path_id)
1854 .anti_amplification_blocked(transmit.len() as u64 + 1)
1855 {
1856 trace!(?space_id, %path_id, "blocked by anti-amplification");
1857 return PathBlocked::AntiAmplification;
1858 }
1859
1860 let bytes_to_send = transmit.segment_size() as u64;
1863 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1864
1865 if can_send.other && !need_loss_probe && !can_send.close {
1866 let path = self.path_data(path_id);
1867 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1868 trace!(
1869 ?space_id,
1870 %path_id,
1871 in_flight=%path.in_flight.bytes,
1872 congestion_window=%path.congestion.window(),
1873 "blocked by congestion control",
1874 );
1875 return PathBlocked::Congestion;
1876 }
1877 }
1878
1879 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1881 self.timers.set(
1882 Timer::PerPath(path_id, PathTimer::Pacing),
1883 delay,
1884 self.qlog.with_time(now),
1885 );
1886 trace!(?space_id, %path_id, "blocked by pacing");
1889 return PathBlocked::Pacing;
1890 }
1891
1892 PathBlocked::No
1893 }
1894
1895 fn send_prev_path_challenge(
1900 &mut self,
1901 now: Instant,
1902 buf: &mut Vec<u8>,
1903 path_id: PathId,
1904 ) -> Option<Transmit> {
1905 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1906 if !prev_path.pending_on_path_challenge {
1909 return None;
1910 };
1911 prev_path.pending_on_path_challenge = false;
1912 let token = self.rng.random();
1913 let network_path = prev_path.network_path;
1914 prev_path.record_path_challenge_sent(now, token, network_path);
1915
1916 debug_assert_eq!(
1917 self.highest_space,
1918 SpaceKind::Data,
1919 "PATH_CHALLENGE queued without 1-RTT keys"
1920 );
1921 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
1922 buf.start_new_datagram();
1923
1924 let mut builder =
1930 PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, false, self)?;
1931 let challenge = frame::PathChallenge(token);
1932 let stats = &mut self.stats.frame_tx;
1933 builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));
1934
1935 builder.pad_to(MIN_INITIAL_SIZE);
1940
1941 builder.finish(self, now);
1942 self.stats.udp_tx.on_sent(1, buf.len());
1943 self.path_stats
1944 .entry(path_id)
1945 .or_default()
1946 .udp_tx
1947 .on_sent(1, buf.len());
1948
1949 Some(Transmit {
1950 destination: network_path.remote,
1951 size: buf.len(),
1952 ecn: None,
1953 segment_size: None,
1954 src_ip: network_path.local_ip,
1955 })
1956 }
1957
1958 fn send_off_path_path_response(
1959 &mut self,
1960 now: Instant,
1961 buf: &mut Vec<u8>,
1962 path_id: PathId,
1963 ) -> Option<Transmit> {
1964 let path = self.paths.get_mut(&path_id).map(|state| &mut state.data)?;
1965 let cid_queue = self.remote_cids.get_mut(&path_id)?;
1966 let (token, network_path) = path.path_responses.pop_off_path(path.network_path)?;
1967
1968 let cid = cid_queue
1969 .next_reserved()
1970 .unwrap_or_else(|| cid_queue.active());
1971 let frame = frame::PathResponse(token);
1975
1976 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
1977 buf.start_new_datagram();
1978
1979 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, false, self)?;
1980 let stats = &mut self.stats.frame_tx;
1981 builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
1982 builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
1983
1984 let size = buf.len();
1985
1986 self.stats.udp_tx.on_sent(1, size);
1987 self.path_stats
1988 .entry(path_id)
1989 .or_default()
1990 .udp_tx
1991 .on_sent(1, size);
1992 Some(Transmit {
1993 destination: network_path.remote,
1994 size,
1995 ecn: None,
1996 segment_size: None,
1997 src_ip: network_path.local_ip,
1998 })
1999 }
2000
2001 fn send_nat_traversal_path_challenge(
2006 &mut self,
2007 now: Instant,
2008 buf: &mut Vec<u8>,
2009 path_id: PathId,
2010 ) -> Option<Transmit> {
2011 let server_side = self.n0_nat_traversal.server_side_mut().ok()?;
2012 let probe = server_side.next_probe()?;
2013 if !self.paths.get(&path_id)?.data.validated {
2014 return None;
2016 }
2017
2018 let remote_cids = self.remote_cids.get_mut(&path_id)?;
2019
2020 if remote_cids.remaining() < 2 {
2023 return None;
2024 }
2025
2026 let cid = remote_cids.next_reserved()?;
2027 let remote = probe.remote();
2028 let token = self.rng.random();
2029 probe.mark_as_sent();
2030
2031 let frame = frame::PathChallenge(token);
2032
2033 let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2034 buf.start_new_datagram();
2035
2036 let mut builder =
2037 PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, false, self)?;
2038 let stats = &mut self.stats.frame_tx;
2039 builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
2040 builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
2041
2042 let path = &mut self.paths.get_mut(&path_id).expect("checked").data;
2043 let network_path = FourTuple {
2044 remote,
2045 local_ip: None,
2046 };
2047
2048 path.record_path_challenge_sent(now, token, network_path);
2049
2050 let size = buf.len();
2051
2052 self.stats.udp_tx.on_sent(1, size);
2053 self.path_stats
2054 .entry(path_id)
2055 .or_default()
2056 .udp_tx
2057 .on_sent(1, size);
2058
2059 Some(Transmit {
2060 destination: remote,
2061 size,
2062 ecn: None,
2063 segment_size: None,
2064 src_ip: None,
2065 })
2066 }
2067
2068 fn space_can_send(
2076 &mut self,
2077 space_id: SpaceId,
2078 path_id: PathId,
2079 packet_size: usize,
2080 connection_close_pending: bool,
2081 ) -> SendableFrames {
2082 let space = &mut self.spaces[space_id];
2083 let space_has_crypto = self.crypto_state.has_keys(space_id.encryption_level());
2084
2085 if !space_has_crypto
2086 && (space_id != SpaceId::Data
2087 || !self.crypto_state.has_keys(EncryptionLevel::ZeroRtt)
2088 || self.side.is_server())
2089 {
2090 return SendableFrames::empty();
2092 }
2093
2094 let mut can_send = space.can_send(path_id, &self.streams);
2095
2096 if space_id == SpaceId::Data {
2098 let pn = space.for_path(path_id).peek_tx_number();
2099 let frame_space_1rtt =
2105 packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
2106 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
2107 }
2108
2109 can_send.close = connection_close_pending && space_has_crypto;
2110
2111 can_send
2112 }
2113
/// Applies an event delivered to this connection.
///
/// * `Datagram`: checks the datagram's network path against the known path (dropping the
///   datagram when `update_network_path_or_discard` says so), updates receive statistics,
///   processes the first packet and then any coalesced remainder, emits recovery metrics
///   and, if the path was anti-amplification blocked beforehand, re-arms the loss
///   detection timer.
/// * `NewIdentifiers`: records newly issued local connection IDs for their path, pushes
///   them onto the data space's pending `new_cids` and resets CID retirement.
pub fn handle_event(&mut self, event: ConnectionEvent) {
    use ConnectionEventInner::*;
    match event.0 {
        Datagram(DatagramConnectionEvent {
            now,
            network_path,
            path_id,
            ecn,
            first_decode,
            remaining,
        }) => {
            let span = trace_span!("pkt", %path_id);
            let _guard = span.enter();

            if self.update_network_path_or_discard(network_path, path_id) {
                // The datagram was rejected; nothing is counted or processed.
                return;
            }

            // Sampled before the datagram is processed. An unknown path counts as not
            // blocked.
            let was_anti_amplification_blocked = self
                .path(path_id)
                .map(|path| path.anti_amplification_blocked(1))
                .unwrap_or(false);

            // Connection-wide and per-path receive counters for the first packet.
            self.stats.udp_rx.datagrams += 1;
            self.stats.udp_rx.bytes += first_decode.len() as u64;
            let rx = &mut self.path_stats.entry(path_id).or_default().udp_rx;
            rx.datagrams += 1;
            rx.bytes += first_decode.len() as u64;
            let data_len = first_decode.len();

            self.handle_decode(now, network_path, path_id, ecn, first_decode);
            // The path is looked up only after `handle_decode`.
            // NOTE(review): presumably because processing the packet can change the path
            // state — confirm.
            if let Some(path) = self.path_mut(path_id) {
                path.inc_total_recvd(data_len as u64);
            }

            if let Some(data) = remaining {
                // Only byte counters grow here; the datagram itself was counted above.
                self.stats.udp_rx.bytes += data.len() as u64;
                self.path_stats.entry(path_id).or_default().udp_rx.bytes += data.len() as u64;
                self.handle_coalesced(now, network_path, path_id, ecn, data);
            }

            if let Some(path) = self.paths.get_mut(&path_id) {
                self.qlog
                    .emit_recovery_metrics(path_id, &mut path.data, now);
            }

            if was_anti_amplification_blocked {
                self.set_loss_detection_timer(now, path_id);
            }
        }
        NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
            // All IDs of one event are expected to belong to the same path. An empty batch
            // falls back to the default `PathId`.
            let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
            debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
            let cid_state = self
                .local_cid_state
                .entry(path_id)
                .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
            cid_state.new_cids(&ids, now);

            // The IDs are pushed in reverse order.
            // NOTE(review): presumably so they are later taken off the end in issue
            // order — confirm.
            ids.into_iter().rev().for_each(|frame| {
                self.spaces[SpaceId::Data].pending.new_cids.push(frame);
            });
            self.reset_cid_retirement(now);
        }
    }
}
2196
2197 fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
2202 let remote_may_migrate = self.side.remote_may_migrate(&self.state);
2203 let local_ip_may_migrate = self.side.is_client();
2204 if let Some(known_path) = self.path_mut(path_id) {
2208 if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
2209 trace!(
2210 %path_id,
2211 %network_path,
2212 %known_path.network_path,
2213 "discarding packet from unrecognized peer"
2214 );
2215 return true;
2216 }
2217
2218 if known_path.network_path.local_ip.is_some()
2219 && network_path.local_ip.is_some()
2220 && known_path.network_path.local_ip != network_path.local_ip
2221 && !local_ip_may_migrate
2222 {
2223 trace!(
2224 %path_id,
2225 %network_path,
2226 %known_path.network_path,
2227 "discarding packet sent to incorrect interface"
2228 );
2229 return true;
2230 }
2231 if let Some(local_ip) = network_path.local_ip {
2236 if known_path
2237 .network_path
2238 .local_ip
2239 .is_some_and(|ip| ip != local_ip)
2240 {
2241 debug!(
2242 %path_id,
2243 %network_path,
2244 %known_path.network_path,
2245 "path's local address seemingly migrated"
2246 );
2247 }
2248 known_path.network_path.local_ip = Some(local_ip);
2255 }
2256 }
2257 false
2258 }
2259
/// Handles every timer that has expired by `now`.
///
/// Expired timers are taken from `self.timers` one at a time and dispatched by kind:
/// connection-wide timers (`Timer::Conn`) and per-path timers (`Timer::PerPath`).
pub fn handle_timeout(&mut self, now: Instant) {
    while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
        trace!(?timer, at=?now, "timeout");
        match timer {
            Timer::Conn(timer) => match timer {
                ConnTimer::Close => {
                    // The close period is over: move to drained and tell the endpoint.
                    self.state.move_to_drained(None);
                    self.endpoint_events.push_back(EndpointEventInner::Drained);
                }
                ConnTimer::Idle => {
                    self.kill(ConnectionError::TimedOut);
                }
                ConnTimer::KeepAlive => {
                    trace!("sending keep-alive");
                    self.ping();
                }
                ConnTimer::KeyDiscard => {
                    self.crypto_state.discard_temporary_keys();
                }
                ConnTimer::PushNewCid => {
                    // Walk the CID retirements that are due at or before `now`.
                    while let Some((path_id, when)) = self.next_cid_retirement() {
                        if when > now {
                            break;
                        }
                        match self.local_cid_state.get_mut(&path_id) {
                            None => error!(%path_id, "No local CID state for path"),
                            Some(cid_state) => {
                                let num_new_cid = cid_state.on_cid_timeout().into();
                                // Replacement CIDs are requested only while not closed.
                                if !self.state.is_closed() {
                                    trace!(
                                        "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                        cid_state.retire_prior_to()
                                    );
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::NeedIdentifiers(
                                            path_id,
                                            now,
                                            num_new_cid,
                                        ),
                                    );
                                }
                            }
                        }
                    }
                }
            },
            Timer::PerPath(path_id, timer) => {
                let span = trace_span!("per-path timer fired", %path_id, ?timer);
                let _guard = span.enter();
                match timer {
                    PathTimer::PathIdle => {
                        // A failure to close the path is only logged.
                        if let Err(err) =
                            self.close_path_inner(now, path_id, PathAbandonReason::TimedOut)
                        {
                            warn!(?err, "failed closing path");
                        }
                    }

                    PathTimer::PathKeepAlive => {
                        trace!("sending keep-alive on path");
                        // The result is deliberately ignored.
                        self.ping_path(path_id).ok();
                    }
                    PathTimer::LossDetection => {
                        self.on_loss_detection_timeout(now, path_id);
                        // `unwrap` panics if the path is gone.
                        // NOTE(review): presumably the path is guaranteed to exist while
                        // its loss detection timer is armed — confirm.
                        self.qlog.emit_recovery_metrics(
                            path_id,
                            &mut self.paths.get_mut(&path_id).unwrap().data,
                            now,
                        );
                    }
                    PathTimer::PathValidation => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        self.timers.stop(
                            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                            self.qlog.with_time(now),
                        );
                        debug!("path validation failed");
                        // Fall back to the previous path data, if any was kept.
                        if let Some((_, prev)) = path.prev.take() {
                            path.data = prev;
                        }
                        path.data.reset_on_path_challenges();
                    }
                    PathTimer::PathChallengeLost => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        trace!("path challenge deemed lost");
                        // Flag a new on-path challenge as pending.
                        path.data.pending_on_path_challenge = true;
                    }
                    PathTimer::PathOpen => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        path.data.reset_on_path_challenges();
                        self.timers.stop(
                            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                            self.qlog.with_time(now),
                        );
                        debug!("new path validation failed");
                        // A failure to close the path is only logged.
                        if let Err(err) = self.close_path_inner(
                            now,
                            path_id,
                            PathAbandonReason::ValidationFailed,
                        ) {
                            warn!(?err, "failed closing path");
                        }
                    }
                    // Nothing to do here besides logging.
                    PathTimer::Pacing => trace!("pacing timer expired"),
                    PathTimer::MaxAckDelay => {
                        trace!("max ack delay reached");
                        // Only the data space's pending ACKs are notified.
                        self.spaces[SpaceId::Data]
                            .for_path(path_id)
                            .pending_acks
                            .on_max_ack_delay_timeout()
                    }
                    PathTimer::DiscardPath => {
                        // Stop all remaining timers of this path first.
                        self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                        if let Some(local_cid_state) = self.local_cid_state.remove(&path_id) {
                            debug_assert!(!self.state.is_drained());
                            // Ask the endpoint to retire every sequence number in the
                            // active range.
                            let (min_seq, max_seq) = local_cid_state.active_seq();
                            for seq in min_seq..=max_seq {
                                self.endpoint_events.push_back(
                                    EndpointEventInner::RetireConnectionId(
                                        now, path_id, seq, false,
                                    ),
                                );
                            }
                        }
                        self.discard_path(path_id, now);
                    }
                }
            }
        }
    }
}
2414
2415 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2427 self.close_inner(
2428 now,
2429 Close::Application(frame::ApplicationClose { error_code, reason }),
2430 )
2431 }
2432
2433 fn close_inner(&mut self, now: Instant, reason: Close) {
2449 let was_closed = self.state.is_closed();
2450 if !was_closed {
2451 self.close_common();
2452 self.set_close_timer(now);
2453 self.connection_close_pending = true;
2454 self.state.move_to_closed_local(reason);
2455 }
2456 }
2457
2458 pub fn datagrams(&mut self) -> Datagrams<'_> {
2460 Datagrams { conn: self }
2461 }
2462
2463 pub fn stats(&mut self) -> ConnectionStats {
2465 self.stats.clone()
2466 }
2467
2468 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2470 let path = self.paths.get(&path_id)?;
2471 let stats = self.path_stats.entry(path_id).or_default();
2472 stats.rtt = path.data.rtt.get();
2473 stats.cwnd = path.data.congestion.window();
2474 stats.current_mtu = path.data.mtud.current_mtu();
2475 Some(*stats)
2476 }
2477
2478 pub fn ping(&mut self) {
2482 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2485 path_data.ping_pending = true;
2486 }
2487 }
2488
2489 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2493 let path_data = self.spaces[self.highest_space]
2494 .number_spaces
2495 .get_mut(&path)
2496 .ok_or(ClosedPath { _private: () })?;
2497 path_data.ping_pending = true;
2498 Ok(())
2499 }
2500
2501 pub fn force_key_update(&mut self) {
2505 if !self.state.is_established() {
2506 debug!("ignoring forced key update in illegal state");
2507 return;
2508 }
2509 if self.crypto_state.prev_crypto.is_some() {
2510 debug!("ignoring redundant forced key update");
2513 return;
2514 }
2515 self.crypto_state.update_keys(None, false);
2516 }
2517
2518 pub fn crypto_session(&self) -> &dyn crypto::Session {
2520 self.crypto_state.session.as_ref()
2521 }
2522
2523 pub fn is_handshaking(&self) -> bool {
2528 self.state.is_handshake()
2529 }
2530
2531 pub fn is_closed(&self) -> bool {
2539 self.state.is_closed()
2540 }
2541
2542 pub fn is_drained(&self) -> bool {
2547 self.state.is_drained()
2548 }
2549
2550 pub fn accepted_0rtt(&self) -> bool {
2554 self.crypto_state.accepted_0rtt
2555 }
2556
2557 pub fn has_0rtt(&self) -> bool {
2559 self.crypto_state.zero_rtt_enabled
2560 }
2561
2562 pub fn has_pending_retransmits(&self) -> bool {
2564 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2565 }
2566
2567 pub fn side(&self) -> Side {
2569 self.side.side()
2570 }
2571
2572 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2574 self.path(path_id)
2575 .map(|path_data| {
2576 path_data
2577 .last_observed_addr_report
2578 .as_ref()
2579 .map(|observed| observed.socket_addr())
2580 })
2581 .ok_or(ClosedPath { _private: () })
2582 }
2583
2584 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2586 self.path(path_id).map(|d| d.rtt.get())
2587 }
2588
2589 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2591 self.path(path_id).map(|d| d.congestion.as_ref())
2592 }
2593
2594 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2599 self.streams.set_max_concurrent(dir, count);
2600 let pending = &mut self.spaces[SpaceId::Data].pending;
2603 self.streams.queue_max_stream_id(pending);
2604 }
2605
2606 pub fn set_max_concurrent_paths(
2616 &mut self,
2617 now: Instant,
2618 count: NonZeroU32,
2619 ) -> Result<(), MultipathNotNegotiated> {
2620 if !self.is_multipath_negotiated() {
2621 return Err(MultipathNotNegotiated { _private: () });
2622 }
2623 self.max_concurrent_paths = count;
2624
2625 let in_use_count = self
2626 .local_max_path_id
2627 .next()
2628 .saturating_sub(self.abandoned_paths.len() as u32)
2629 .as_u32();
2630 let extra_needed = count.get().saturating_sub(in_use_count);
2631 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2632
2633 self.set_max_path_id(now, new_max_path_id);
2634
2635 Ok(())
2636 }
2637
2638 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2640 if max_path_id <= self.local_max_path_id {
2641 return;
2642 }
2643
2644 self.local_max_path_id = max_path_id;
2645 self.spaces[SpaceId::Data].pending.max_path_id = true;
2646
2647 self.issue_first_path_cids(now);
2648 }
2649
2650 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2656 self.streams.max_concurrent(dir)
2657 }
2658
2659 pub fn set_send_window(&mut self, send_window: u64) {
2661 self.streams.set_send_window(send_window);
2662 }
2663
2664 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2666 if self.streams.set_receive_window(receive_window) {
2667 self.spaces[SpaceId::Data].pending.max_data = true;
2668 }
2669 }
2670
2671 pub fn is_multipath_negotiated(&self) -> bool {
2676 !self.is_handshaking()
2677 && self.config.max_concurrent_multipath_paths.is_some()
2678 && self.peer_params.initial_max_path_id.is_some()
2679 }
2680
2681 fn on_ack_received(
2682 &mut self,
2683 now: Instant,
2684 space: SpaceId,
2685 ack: frame::Ack,
2686 ) -> Result<(), TransportError> {
2687 let path = PathId::ZERO;
2689 self.inner_on_ack_received(now, space, path, ack)
2690 }
2691
2692 fn on_path_ack_received(
2693 &mut self,
2694 now: Instant,
2695 space: SpaceId,
2696 path_ack: frame::PathAck,
2697 ) -> Result<(), TransportError> {
2698 let (ack, path) = path_ack.into_ack();
2699 self.inner_on_ack_received(now, space, path, ack)
2700 }
2701
/// Processes an acknowledgement for packets sent in `space` on `path`.
///
/// ACKs for abandoned paths are ignored. Otherwise this updates the largest acknowledged
/// packet, checks for spurious losses, removes newly acknowledged packets from the sent
/// set and informs MTU discovery, ack frequency and congestion control. It may then take
/// an RTT sample, runs loss detection, handles ECN feedback and finally re-arms the loss
/// detection timer.
///
/// # Errors
///
/// Returns `PROTOCOL_VIOLATION` when the ACK covers a packet number that has not been sent
/// yet, and propagates any error from `check_ack`.
fn inner_on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    path: PathId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    if self.abandoned_paths.contains(&path) {
        trace!("silently ignoring PATH_ACK on abandoned path");
        return Ok(());
    }
    if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // Whether this ACK raises the largest acknowledged packet number of the path.
    let new_largest = {
        let space = &mut self.spaces[space].for_path(path);
        if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
            space.largest_acked_packet = Some(ack.largest);
            // The send time is only updated when the packet is still in the sent set.
            if let Some(info) = space.sent_packets.get(ack.largest) {
                space.largest_acked_packet_sent = info.time_sent;
            }
            true
        } else {
            false
        }
    };

    if self.detect_spurious_loss(&ack, space, path) {
        self.path_data_mut(path)
            .congestion
            .on_spurious_congestion_event();
    }

    // Collect only packet numbers that are both covered by the ACK and still in the sent
    // set.
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.spaces[space].for_path(path).check_ack(range.clone())?;
        for (pn, _) in self.spaces[space]
            .for_path(path)
            .sent_packets
            .iter_range(range)
        {
            newly_acked.insert_one(pn);
        }
    }

    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].for_path(path).take(packet) {
            // The acknowledged packet may itself have carried ACKs. Drop pending ACK state
            // below what it reported, for every path space that still exists.
            for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                    pns.pending_acks.subtract_below(*acked_pn);
                }
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // MTU discovery reports whether this acknowledgement changed the MTU.
            let path_data = self.path_data_mut(path);
            let mtu_updated = path_data.mtud.on_acked(space.kind(), packet, info.size);
            if mtu_updated {
                path_data
                    .congestion
                    .on_mtu_update(path_data.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(path, packet);

            self.on_packet_acked(now, path, info);
        }
    }

    let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
    let app_limited = self.app_limited;
    let path_data = self.path_data_mut(path);
    let in_flight = path_data.in_flight.bytes;

    path_data
        .congestion
        .on_end_acks(now, in_flight, app_limited, largest_ackd);

    // An RTT sample is taken only when the largest acknowledged packet advanced and at
    // least one newly acknowledged packet was ack-eliciting.
    if new_largest && ack_eliciting_acked {
        // Outside the data space the ack delay is zero. Inside it, the reported delay is
        // scaled by the peer's ack delay exponent and capped at the peer's max ack delay.
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = now.saturating_duration_since(
            self.spaces[space].for_path(path).largest_acked_packet_sent,
        );

        let next_pn = self.spaces[space].for_path(path).next_packet_number;
        let path_data = self.path_data_mut(path);
        path_data.rtt.update(ack_delay, rtt);
        // Only set while unset; the next packet number after this sample is recorded.
        if path_data.first_packet_after_rtt_sample.is_none() {
            path_data.first_packet_after_rtt_sample = Some((space.kind(), next_pn));
        }
    }

    self.detect_lost_packets(now, space, path, true);

    if self.peer_completed_address_validation(path) {
        self.path_data_mut(path).pto_count = 0;
    }

    if self.path_data(path).sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only examined for ACKs that advanced the largest
            // acknowledged packet.
            if new_largest {
                let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
            }
        } else {
            // An ACK without ECN counts turns ECN off for this path.
            debug!("ECN not acknowledged by peer");
            self.path_data_mut(path).sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now, path);
    Ok(())
}
2851
2852 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2853 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2854
2855 if lost_packets.is_empty() {
2856 return false;
2857 }
2858
2859 for range in ack.iter() {
2860 let spurious_losses: Vec<u64> = lost_packets
2861 .iter_range(range.clone())
2862 .map(|(pn, _info)| pn)
2863 .collect();
2864
2865 for pn in spurious_losses {
2866 lost_packets.remove(pn);
2867 }
2868 }
2869
2870 lost_packets.is_empty()
2875 }
2876
2877 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2882 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2883
2884 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2885 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2886 }
2887
2888 fn process_ecn(
2890 &mut self,
2891 now: Instant,
2892 space: SpaceId,
2893 path: PathId,
2894 newly_acked: u64,
2895 ecn: frame::EcnCounts,
2896 largest_sent_time: Instant,
2897 ) {
2898 match self.spaces[space]
2899 .for_path(path)
2900 .detect_ecn(newly_acked, ecn)
2901 {
2902 Err(e) => {
2903 debug!("halting ECN due to verification failure: {}", e);
2904
2905 self.path_data_mut(path).sending_ecn = false;
2906 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2909 }
2910 Ok(false) => {}
2911 Ok(true) => {
2912 self.path_stats.entry(path).or_default().congestion_events += 1;
2913 self.path_data_mut(path).congestion.on_congestion_event(
2914 now,
2915 largest_sent_time,
2916 false,
2917 true,
2918 0,
2919 );
2920 }
2921 }
2922 }
2923
2924 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2927 self.paths
2928 .get_mut(&path_id)
2929 .expect("known path")
2930 .remove_in_flight(&info);
2931 let app_limited = self.app_limited;
2932 let path = self.path_data_mut(path_id);
2933 if info.ack_eliciting && !path.is_validating_path() {
2934 let rtt = path.rtt;
2937 path.congestion
2938 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2939 }
2940
2941 if let Some(retransmits) = info.retransmits.get() {
2943 for (id, _) in retransmits.reset_stream.iter() {
2944 self.streams.reset_acked(*id);
2945 }
2946 }
2947
2948 for frame in info.stream_frames {
2949 self.streams.received_ack_of(frame);
2950 }
2951 }
2952
2953 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceKind) {
2954 let start = if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) {
2955 now
2956 } else {
2957 self.crypto_state
2958 .prev_crypto
2959 .as_ref()
2960 .expect("no previous keys")
2961 .end_packet
2962 .as_ref()
2963 .expect("update not acknowledged yet")
2964 .1
2965 };
2966
2967 self.timers.set(
2969 Timer::Conn(ConnTimer::KeyDiscard),
2970 start + self.max_pto_for_space(space) * 3,
2971 self.qlog.with_time(now),
2972 );
2973 }
2974
2975 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2988 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2989 self.detect_lost_packets(now, pn_space, path_id, false);
2991 self.set_loss_detection_timer(now, path_id);
2992 return;
2993 }
2994
2995 let Some((_, space)) = self.pto_time_and_space(now, path_id) else {
2996 error!(%path_id, "PTO expired while unset");
2997 return;
2998 };
2999 trace!(
3000 in_flight = self.path_data(path_id).in_flight.bytes,
3001 count = self.path_data(path_id).pto_count,
3002 ?space,
3003 %path_id,
3004 "PTO fired"
3005 );
3006
3007 let count = match self.path_data(path_id).in_flight.ack_eliciting {
3008 0 => {
3011 debug_assert!(!self.peer_completed_address_validation(path_id));
3012 1
3013 }
3014 _ => 2,
3016 };
3017 let pns = self.spaces[space].for_path(path_id);
3018 pns.loss_probes = pns.loss_probes.saturating_add(count);
3019 let path_data = self.path_data_mut(path_id);
3020 path_data.pto_count = path_data.pto_count.saturating_add(1);
3021 self.set_loss_detection_timer(now, path_id);
3022 }
3023
    /// Scans the packets sent on `path_id` in `pn_space` with a packet number below the largest
    /// acknowledged one and declares lost those that exceed the time or packet threshold.
    ///
    /// A packet is considered lost when it was sent at least `loss_delay` ago, or when the
    /// largest acknowledged packet number is at least `packet_threshold` ahead of it. Packets
    /// that are not lost yet set the space's `loss_time`. An in-flight MTU probe that is lost is
    /// reported separately and is not counted among the lost packets. The result is handed to
    /// [`Self::handle_lost_packets`].
    ///
    /// `due_to_ack` gates the persistent congestion check: it is only evaluated when `true`.
    ///
    /// # Panics
    ///
    /// Panics if no packet has been acknowledged yet on this path in `pn_space`.
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        // Start from a clean slate; the loop below re-arms `loss_time` if needed.
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        // Time threshold: the conservative RTT scaled by the configured factor, but never below
        // the timer granularity.
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // Persistent congestion is declared when two lost ack-eliciting packets are sent more
        // than this period apart. The period is always derived from the Data-space PTO,
        // regardless of `pn_space`.
        let congestion_period = self
            .pto(SpaceKind::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
            if prev_packet != Some(packet.wrapping_sub(1)) {
                // Gap in the packet numbers still tracked as sent (presumably an intervening
                // packet was acknowledged): the run of consecutive losses is broken.
                persistent_congestion_start = None;
            }

            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
                if Some(packet) == in_flight_mtu_probe {
                    // A lost MTU probe is kept out of `lost_packets` and reported separately.
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // Two lost ack-eliciting packets sent more than
                            // `congestion_period` apart within one uninterrupted run.
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // A run may only start at a packet ordered after
                            // `first_packet_after_rtt_sample`.
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space.kind(), packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // Not lost yet. Only the first such packet sets `loss_time`.
                // NOTE(review): this presumably relies on iteration being in send order, so that
                // the first packet is the one with the earliest deadline; confirm.
                if space.loss_time.is_none() {
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }
3140
3141 fn discard_path(&mut self, path_id: PathId, now: Instant) {
3143 trace!(%path_id, "dropping path state");
3144 let path = self.path_data(path_id);
3145 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3146
3147 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
3149 .for_path(path_id)
3150 .sent_packets
3151 .iter()
3152 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
3153 .map(|(pn, info)| {
3154 size_of_lost_packets += info.size as u64;
3155 pn
3156 })
3157 .collect();
3158
3159 if !lost_pns.is_empty() {
3160 trace!(
3161 %path_id,
3162 count = lost_pns.len(),
3163 lost_bytes = size_of_lost_packets,
3164 "packets lost on path abandon"
3165 );
3166 self.handle_lost_packets(
3167 SpaceId::Data,
3168 path_id,
3169 now,
3170 lost_pns,
3171 in_flight_mtu_probe,
3172 Duration::ZERO,
3173 false,
3174 size_of_lost_packets,
3175 );
3176 }
3177 let path_stats = self.path_stats(path_id).unwrap_or_default();
3180 self.path_stats.remove(&path_id);
3181 self.paths.remove(&path_id);
3182 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
3183
3184 self.events.push_back(
3185 PathEvent::Discarded {
3186 id: path_id,
3187 path_stats,
3188 }
3189 .into(),
3190 );
3191 }
3192
    /// Applies the consequences of a set of packets having been declared lost on `path_id`.
    ///
    /// For every packet number in `lost_packets` that is still tracked as sent, the packet is
    /// removed from the in-flight accounting, its stream frames and other retransmittable
    /// content are queued again, MTU discovery is informed, and the packet is remembered in the
    /// space's `lost_packets` map. Path statistics are updated and, if the bytes in flight
    /// changed as a result, the congestion controller is notified. A lost MTU probe
    /// (`lost_mtu_probe`) is handled separately and does not trigger a congestion event.
    ///
    /// * `lost_packets` - packet numbers declared lost; must be sorted in ascending order.
    /// * `lost_mtu_probe` - packet number of a lost MTU probe, always looked up in the Data space.
    /// * `loss_delay` - only forwarded to the qlog output.
    /// * `in_persistent_congestion` - forwarded to the congestion controller.
    /// * `size_of_lost_packets` - total size of `lost_packets`, used for statistics and forwarded
    ///   to the congestion controller.
    ///
    /// # Panics
    ///
    /// Panics if the largest entry of `lost_packets` is not tracked as sent, if `path_id` is
    /// unknown, or if `lost_mtu_probe` is not tracked as sent in the Data space.
    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(
            {
                let mut sorted = lost_packets.clone();
                sorted.sort();
                sorted == lost_packets
            },
            "lost_packets must be sorted"
        );

        self.drain_lost_packets(now, pn_space, path_id);

        // Because `lost_packets` is sorted, its last element is the largest packet number.
        if let Some(largest_lost) = lost_packets.last().cloned() {
            // Snapshot to later detect whether anything counted as in flight was lost.
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.entry(path_id).or_default();
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                // Packets that are no longer tracked as sent are silently skipped.
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space.kind(), now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                // Queue the lost content for retransmission.
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                // Remember when the lost packet was sent.
                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                // Pass the MTU currently reported by MTU discovery to the congestion controller.
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                // If dropping oversized datagrams unblocked sending, tell the application.
                if let Some(max_datagram_size) = self.datagrams().max_size()
                    && self.datagrams.drop_oversized(max_datagram_size)
                    && self.datagrams.send_blocked
                {
                    self.datagrams.send_blocked = false;
                    self.events.push_back(Event::DatagramsUnblocked);
                }
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .black_holes_detected += 1;
            }

            // Only signal congestion if the losses changed the bytes in flight.
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        // A lost MTU probe is accounted for separately, without a congestion event.
        if let Some(packet) = lost_mtu_probe {
            // NOTE(review): the `unwrap` assumes callers keep the probe out of `lost_packets`,
            // so it has not been taken above; confirm for any new caller.
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap();
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats
                .entry(path_id)
                .or_default()
                .lost_plpmtud_probes += 1;
        }
    }
3314
3315 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
3321 SpaceId::iter()
3322 .filter_map(|id| {
3323 self.spaces[id]
3324 .number_spaces
3325 .get(&path_id)
3326 .and_then(|pns| pns.loss_time)
3327 .map(|time| (time, id))
3328 })
3329 .min_by_key(|&(time, _)| time)
3330 }
3331
    /// Computes when the probe timeout (PTO) of `path_id` should fire and in which packet number
    /// space the probes should be sent.
    ///
    /// Returns `None` if the path is unknown or if no packet number space yields a PTO, in which
    /// case there is nothing to arm the timer for.
    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
        let path = self.path(path_id)?;
        let pto_count = path.pto_count;
        // Exponential backoff: doubles with every PTO, with the exponent capped.
        let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
        let mut duration = path.rtt.pto_base() * backoff;

        // Special case for path zero: nothing ack-eliciting is in flight and the peer has not
        // completed address validation yet. The PTO is then armed relative to `now`.
        if path_id == PathId::ZERO
            && path.in_flight.ack_eliciting == 0
            && !self.peer_completed_address_validation(PathId::ZERO)
        {
            // Probe in the Handshake space once it has been reached, otherwise in Initial.
            let space = match self.highest_space {
                SpaceKind::Handshake => SpaceId::Handshake,
                _ => SpaceId::Initial,
            };

            return Some((now + duration, space));
        }

        let mut result = None;
        for space in SpaceId::iter() {
            // Spaces without state for this path cannot contribute a PTO.
            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
                continue;
            };

            if !pns.has_in_flight() {
                continue;
            }
            if space == SpaceId::Data {
                // The Data space is not considered while the handshake is in progress.
                if self.is_handshaking() {
                    return result;
                }
                // The Data space additionally accounts for the max ACK delay, scaled by the
                // backoff. NOTE(review): `duration` is modified in place, so this presumably
                // relies on Data being the last space yielded by `SpaceId::iter()`; confirm.
                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
            }
            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
                continue;
            };
            let pto = last_ack_eliciting + duration;
            // Keep the earliest PTO across spaces.
            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
                // A candidate is dropped when the path is anti-amplification blocked...
                if path.anti_amplification_blocked(1) {
                    continue;
                }
                // ...or when nothing ack-eliciting is in flight on the path.
                if path.in_flight.ack_eliciting == 0 {
                    continue;
                }
                result = Some((pto, space));
            }
        }
        result
    }
3391
3392 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3393 if self.side.is_server() || self.state.is_closed() {
3395 return true;
3396 }
3397 self.spaces[SpaceId::Handshake]
3400 .path_space(PathId::ZERO)
3401 .and_then(|pns| pns.largest_acked_packet)
3402 .is_some()
3403 || self.spaces[SpaceId::Data]
3404 .path_space(path)
3405 .and_then(|pns| pns.largest_acked_packet)
3406 .is_some()
3407 || (self.crypto_state.has_keys(EncryptionLevel::OneRtt)
3408 && !self.crypto_state.has_keys(EncryptionLevel::Handshake))
3409 }
3410
3411 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3419 if self.state.is_closed() {
3420 return;
3424 }
3425
3426 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3427 self.timers.set(
3429 Timer::PerPath(path_id, PathTimer::LossDetection),
3430 loss_time,
3431 self.qlog.with_time(now),
3432 );
3433 return;
3434 }
3435
3436 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3439 self.timers.set(
3440 Timer::PerPath(path_id, PathTimer::LossDetection),
3441 timeout,
3442 self.qlog.with_time(now),
3443 );
3444 } else {
3445 self.timers.stop(
3446 Timer::PerPath(path_id, PathTimer::LossDetection),
3447 self.qlog.with_time(now),
3448 );
3449 }
3450 }
3451
3452 fn max_pto_for_space(&self, space: SpaceKind) -> Duration {
3456 self.paths
3457 .keys()
3458 .map(|path_id| self.pto(space, *path_id))
3459 .max()
3460 .expect("there should be at least one path")
3461 }
3462
3463 fn pto(&self, space: SpaceKind, path_id: PathId) -> Duration {
3468 let max_ack_delay = match space {
3469 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3470 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3471 };
3472 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3473 }
3474
3475 fn on_packet_authenticated(
3476 &mut self,
3477 now: Instant,
3478 space_id: SpaceKind,
3479 path_id: PathId,
3480 ecn: Option<EcnCodepoint>,
3481 packet: Option<u64>,
3482 spin: bool,
3483 is_1rtt: bool,
3484 ) {
3485 self.total_authed_packets += 1;
3486 self.reset_keep_alive(path_id, now);
3487 self.reset_idle_timeout(now, space_id, path_id);
3488 self.permit_idle_reset = true;
3489 self.receiving_ecn |= ecn.is_some();
3490 if let Some(x) = ecn {
3491 let space = &mut self.spaces[space_id];
3492 space.for_path(path_id).ecn_counters += x;
3493
3494 if x.is_ce() {
3495 space
3496 .for_path(path_id)
3497 .pending_acks
3498 .set_immediate_ack_required();
3499 }
3500 }
3501
3502 let Some(packet) = packet else {
3503 return;
3504 };
3505 match &self.side {
3506 ConnectionSide::Client { .. } => {
3507 if space_id == SpaceKind::Handshake
3511 && let Some(hs) = self.state.as_handshake_mut()
3512 {
3513 hs.allow_server_migration = false;
3514 }
3515 }
3516 ConnectionSide::Server { .. } => {
3517 if self.crypto_state.has_keys(EncryptionLevel::Initial)
3518 && space_id == SpaceKind::Handshake
3519 {
3520 self.discard_space(now, SpaceKind::Initial);
3522 }
3523 if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) && is_1rtt {
3524 self.set_key_discard_timer(now, space_id)
3526 }
3527 }
3528 }
3529 let space = self.spaces[space_id].for_path(path_id);
3530 space.pending_acks.insert_one(packet, now);
3531 if packet >= space.rx_packet.unwrap_or_default() {
3532 space.rx_packet = Some(packet);
3533 self.spin = self.side.is_client() ^ spin;
3535 }
3536 }
3537
3538 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceKind, path_id: PathId) {
3543 if let Some(timeout) = self.idle_timeout {
3545 if self.state.is_closed() {
3546 self.timers
3547 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3548 } else {
3549 let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space));
3550 self.timers.set(
3551 Timer::Conn(ConnTimer::Idle),
3552 now + dt,
3553 self.qlog.with_time(now),
3554 );
3555 }
3556 }
3557
3558 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3560 if self.state.is_closed() {
3561 self.timers.stop(
3562 Timer::PerPath(path_id, PathTimer::PathIdle),
3563 self.qlog.with_time(now),
3564 );
3565 } else {
3566 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3567 self.timers.set(
3568 Timer::PerPath(path_id, PathTimer::PathIdle),
3569 now + dt,
3570 self.qlog.with_time(now),
3571 );
3572 }
3573 }
3574 }
3575
3576 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3578 if !self.state.is_established() {
3579 return;
3580 }
3581
3582 if let Some(interval) = self.config.keep_alive_interval {
3583 self.timers.set(
3584 Timer::Conn(ConnTimer::KeepAlive),
3585 now + interval,
3586 self.qlog.with_time(now),
3587 );
3588 }
3589
3590 if let Some(interval) = self.path_data(path_id).keep_alive {
3591 self.timers.set(
3592 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3593 now + interval,
3594 self.qlog.with_time(now),
3595 );
3596 }
3597 }
3598
3599 fn reset_cid_retirement(&mut self, now: Instant) {
3601 if let Some((_path, t)) = self.next_cid_retirement() {
3602 self.timers.set(
3603 Timer::Conn(ConnTimer::PushNewCid),
3604 t,
3605 self.qlog.with_time(now),
3606 );
3607 }
3608 }
3609
3610 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3612 self.local_cid_state
3613 .iter()
3614 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3615 .min_by_key(|(_path_id, timeout)| *timeout)
3616 }
3617
3618 pub(crate) fn handle_first_packet(
3623 &mut self,
3624 now: Instant,
3625 network_path: FourTuple,
3626 ecn: Option<EcnCodepoint>,
3627 packet_number: u64,
3628 packet: InitialPacket,
3629 remaining: Option<BytesMut>,
3630 ) -> Result<(), ConnectionError> {
3631 let span = trace_span!("first recv");
3632 let _guard = span.enter();
3633 debug_assert!(self.side.is_server());
3634 let len = packet.header_data.len() + packet.payload.len();
3635 let path_id = PathId::ZERO;
3636 self.path_data_mut(path_id).total_recvd = len as u64;
3637
3638 if let Some(hs) = self.state.as_handshake_mut() {
3639 hs.expected_token = packet.header.token.clone();
3640 } else {
3641 unreachable!("first packet must be delivered in Handshake state");
3642 }
3643
3644 self.on_packet_authenticated(
3646 now,
3647 SpaceKind::Initial,
3648 path_id,
3649 ecn,
3650 Some(packet_number),
3651 false,
3652 false,
3653 );
3654
3655 let packet: Packet = packet.into();
3656
3657 let mut qlog = QlogRecvPacket::new(len);
3658 qlog.header(&packet.header, Some(packet_number), path_id);
3659
3660 self.process_decrypted_packet(
3661 now,
3662 network_path,
3663 path_id,
3664 Some(packet_number),
3665 packet,
3666 &mut qlog,
3667 )?;
3668 self.qlog.emit_packet_received(qlog, now);
3669 if let Some(data) = remaining {
3670 self.handle_coalesced(now, network_path, path_id, ecn, data);
3671 }
3672
3673 self.qlog.emit_recovery_metrics(
3674 path_id,
3675 &mut self.paths.get_mut(&path_id).unwrap().data,
3676 now,
3677 );
3678
3679 Ok(())
3680 }
3681
3682 fn init_0rtt(&mut self, now: Instant) {
3683 let Some((header, packet)) = self.crypto_state.session.early_crypto() else {
3684 return;
3685 };
3686 if self.side.is_client() {
3687 match self.crypto_state.session.transport_parameters() {
3688 Ok(params) => {
3689 let params = params
3690 .expect("crypto layer didn't supply transport parameters with ticket");
3691 let params = TransportParameters {
3693 initial_src_cid: None,
3694 original_dst_cid: None,
3695 preferred_address: None,
3696 retry_src_cid: None,
3697 stateless_reset_token: None,
3698 min_ack_delay: None,
3699 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3700 max_ack_delay: TransportParameters::default().max_ack_delay,
3701 initial_max_path_id: None,
3702 ..params
3703 };
3704 self.set_peer_params(params);
3705 self.qlog.emit_peer_transport_params_restored(self, now);
3706 }
3707 Err(e) => {
3708 error!("session ticket has malformed transport parameters: {}", e);
3709 return;
3710 }
3711 }
3712 }
3713 trace!("0-RTT enabled");
3714 self.crypto_state.enable_zero_rtt(header, packet);
3715 }
3716
3717 fn read_crypto(
3718 &mut self,
3719 space: SpaceId,
3720 crypto: &frame::Crypto,
3721 payload_len: usize,
3722 ) -> Result<(), TransportError> {
3723 let expected = if !self.state.is_handshake() {
3724 SpaceId::Data
3725 } else if self.highest_space == SpaceKind::Initial {
3726 SpaceId::Initial
3727 } else {
3728 SpaceId::Handshake
3731 };
3732 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3736
3737 let end = crypto.offset + crypto.data.len() as u64;
3738 if space < expected
3739 && end
3740 > self.crypto_state.spaces[space.kind()]
3741 .crypto_stream
3742 .bytes_read()
3743 {
3744 warn!(
3745 "received new {:?} CRYPTO data when expecting {:?}",
3746 space, expected
3747 );
3748 return Err(TransportError::PROTOCOL_VIOLATION(
3749 "new data at unexpected encryption level",
3750 ));
3751 }
3752
3753 let crypto_space = &mut self.crypto_state.spaces[space.kind()];
3754 let max = end.saturating_sub(crypto_space.crypto_stream.bytes_read());
3755 if max > self.config.crypto_buffer_size as u64 {
3756 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3757 }
3758
3759 crypto_space
3760 .crypto_stream
3761 .insert(crypto.offset, crypto.data.clone(), payload_len);
3762 while let Some(chunk) = crypto_space.crypto_stream.read(usize::MAX, true) {
3763 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3764 if self.crypto_state.session.read_handshake(&chunk.bytes)? {
3765 self.events.push_back(Event::HandshakeDataReady);
3766 }
3767 }
3768
3769 Ok(())
3770 }
3771
3772 fn write_crypto(&mut self) {
3773 loop {
3774 let space = self.highest_space;
3775 let mut outgoing = Vec::new();
3776 if let Some(crypto) = self.crypto_state.session.write_handshake(&mut outgoing) {
3777 match space {
3778 SpaceKind::Initial => {
3779 self.upgrade_crypto(SpaceKind::Handshake, crypto);
3780 }
3781 SpaceKind::Handshake => {
3782 self.upgrade_crypto(SpaceKind::Data, crypto);
3783 }
3784 SpaceKind::Data => unreachable!("got updated secrets during 1-RTT"),
3785 }
3786 }
3787 if outgoing.is_empty() {
3788 if space == self.highest_space {
3789 break;
3790 } else {
3791 continue;
3793 }
3794 }
3795 let offset = self.crypto_state.spaces[space].crypto_offset;
3796 let outgoing = Bytes::from(outgoing);
3797 if let Some(hs) = self.state.as_handshake_mut()
3798 && space == SpaceKind::Initial
3799 && offset == 0
3800 && self.side.is_client()
3801 {
3802 hs.client_hello = Some(outgoing.clone());
3803 }
3804 self.crypto_state.spaces[space].crypto_offset += outgoing.len() as u64;
3805 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3806 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3807 offset,
3808 data: outgoing,
3809 });
3810 }
3811 }
3812
3813 fn upgrade_crypto(&mut self, space: SpaceKind, crypto: Keys) {
3815 debug_assert!(
3816 !self.crypto_state.has_keys(space.encryption_level()),
3817 "already reached packet space {space:?}"
3818 );
3819 trace!("{:?} keys ready", space);
3820 if space == SpaceKind::Data {
3821 self.crypto_state.next_crypto = Some(
3823 self.crypto_state
3824 .session
3825 .next_1rtt_keys()
3826 .expect("handshake should be complete"),
3827 );
3828 }
3829
3830 self.crypto_state.spaces[space].keys = Some(crypto);
3831 debug_assert!(space > self.highest_space);
3832 self.highest_space = space;
3833 if space == SpaceKind::Data && self.side.is_client() {
3834 self.crypto_state.discard_zero_rtt();
3836 }
3837 }
3838
3839 fn discard_space(&mut self, now: Instant, space: SpaceKind) {
3840 debug_assert!(space != SpaceKind::Data);
3841 trace!("discarding {:?} keys", space);
3842 if space == SpaceKind::Initial {
3843 if let ConnectionSide::Client { token, .. } = &mut self.side {
3845 *token = Bytes::new();
3846 }
3847 }
3848 self.crypto_state.spaces[space].keys = None;
3849 let space = &mut self.spaces[space];
3850 let pns = space.for_path(PathId::ZERO);
3851 pns.time_of_last_ack_eliciting_packet = None;
3852 pns.loss_time = None;
3853 pns.loss_probes = 0;
3854 let sent_packets = mem::take(&mut pns.sent_packets);
3855 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3856 for (_, packet) in sent_packets.into_iter() {
3857 path.data.remove_in_flight(&packet);
3858 }
3859
3860 self.set_loss_detection_timer(now, PathId::ZERO)
3861 }
3862
3863 fn handle_coalesced(
3864 &mut self,
3865 now: Instant,
3866 network_path: FourTuple,
3867 path_id: PathId,
3868 ecn: Option<EcnCodepoint>,
3869 data: BytesMut,
3870 ) {
3871 self.path_data_mut(path_id)
3872 .inc_total_recvd(data.len() as u64);
3873 let mut remaining = Some(data);
3874 let cid_len = self
3875 .local_cid_state
3876 .values()
3877 .map(|cid_state| cid_state.cid_len())
3878 .next()
3879 .expect("one cid_state must exist");
3880 while let Some(data) = remaining {
3881 match PartialDecode::new(
3882 data,
3883 &FixedLengthConnectionIdParser::new(cid_len),
3884 &[self.version],
3885 self.endpoint_config.grease_quic_bit,
3886 ) {
3887 Ok((partial_decode, rest)) => {
3888 remaining = rest;
3889 self.handle_decode(now, network_path, path_id, ecn, partial_decode);
3890 }
3891 Err(e) => {
3892 trace!("malformed header: {}", e);
3893 return;
3894 }
3895 }
3896 }
3897 }
3898
3899 fn handle_decode(
3900 &mut self,
3901 now: Instant,
3902 network_path: FourTuple,
3903 path_id: PathId,
3904 ecn: Option<EcnCodepoint>,
3905 partial_decode: PartialDecode,
3906 ) {
3907 let qlog = QlogRecvPacket::new(partial_decode.len());
3908 if let Some(decoded) = self
3909 .crypto_state
3910 .unprotect_header(partial_decode, self.peer_params.stateless_reset_token)
3911 {
3912 self.handle_packet(
3913 now,
3914 network_path,
3915 path_id,
3916 ecn,
3917 decoded.packet,
3918 decoded.stateless_reset,
3919 qlog,
3920 );
3921 }
3922 }
3923
    /// Handles a single packet whose header protection has already been removed.
    ///
    /// The packet is decrypted, checked for duplicates and other reasons to drop it, and then
    /// processed. Errors produced along the way are turned into the matching connection state
    /// (closed, draining or drained), followed by the bookkeeping those transitions require.
    ///
    /// * `packet` - the packet, or `None` if none could be decoded; `None` is treated as an
    ///   authentication failure unless `stateless_reset` is set.
    /// * `stateless_reset` - whether the datagram was identified as a stateless reset.
    /// * `qlog` - qlog record for this packet; emitted once a decrypted packet is dropped or
    ///   has been processed.
    fn handle_packet(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<Packet>,
        stateless_reset: bool,
        mut qlog: QlogRecvPacket,
    ) {
        self.stats.udp_rx.ios += 1;
        self.path_stats.entry(path_id).or_default().udp_rx.ios += 1;

        if let Some(ref packet) = packet {
            trace!(
                "got {:?} packet ({} bytes) from {} using id {}",
                packet.header.space(),
                packet.payload.len() + packet.header_data.len(),
                network_path,
                packet.header.dst_cid(),
            );
        }

        if self.is_handshaking() {
            // During the handshake only path zero is accepted.
            if path_id != PathId::ZERO {
                debug!(%network_path, %path_id, "discarding multipath packet during handshake");
                return;
            }
            // A different network path is only accepted while the handshake state still
            // allows the server to migrate.
            if network_path != self.path_data_mut(path_id).network_path {
                if let Some(hs) = self.state.as_handshake() {
                    if hs.allow_server_migration {
                        trace!(%network_path, prev = %self.path_data(path_id).network_path, "server migrated to new remote");
                        self.path_data_mut(path_id).network_path = network_path;
                        self.qlog.emit_tuple_assigned(path_id, network_path, now);
                    } else {
                        debug!("discarding packet with unexpected remote during handshake");
                        return;
                    }
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            }
        }

        // Remembered to detect the state transitions caused by this packet further below.
        let was_closed = self.state.is_closed();
        let was_drained = self.state.is_drained();

        // `Err(None)` stands for "could not be authenticated", `Err(Some(_))` carries an error.
        let decrypted = match packet {
            None => Err(None),
            Some(mut packet) => self
                .decrypt_packet(now, path_id, &mut packet)
                .map(move |number| (packet, number)),
        };
        let result = match decrypted {
            // A stateless reset wins over whatever decryption produced.
            _ if stateless_reset => {
                debug!("got stateless reset");
                Err(ConnectionError::Reset)
            }
            Err(Some(e)) => {
                warn!("illegal packet: {}", e);
                Err(e.into())
            }
            Err(None) => {
                debug!("failed to authenticate packet");
                self.authentication_failures += 1;
                let integrity_limit = self
                    .crypto_state
                    .integrity_limit(self.highest_space)
                    .unwrap();
                // Below the integrity limit the packet is simply dropped.
                if self.authentication_failures > integrity_limit {
                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
                } else {
                    return;
                }
            }
            Ok((packet, number)) => {
                qlog.header(&packet.header, number, path_id);
                let span = match number {
                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                    None => trace_span!("recv", space = ?packet.header.space()),
                };
                let _guard = span.enter();

                // The duplicate check only applies if the packet has a number and the space
                // holds state for this path.
                let dedup = self.spaces[packet.header.space()]
                    .path_space_mut(path_id)
                    .map(|pns| &mut pns.dedup);
                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                    debug!("discarding possible duplicate packet");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else if self.state.is_handshake() && packet.header.is_short() {
                    trace!("dropping short packet during handshake");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else {
                    // On a handshaking server, an Initial packet must carry the expected token.
                    // A mismatch drops the packet without closing the connection.
                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header
                        && let Some(hs) = self.state.as_handshake()
                        && self.side.is_server()
                        && token != &hs.expected_token
                    {
                        warn!("discarding Initial with invalid retry token");
                        self.qlog.emit_packet_received(qlog, now);
                        return;
                    }

                    if !self.state.is_closed() {
                        // Only short header packets carry a spin bit.
                        let spin = match packet.header {
                            Header::Short { spin, .. } => spin,
                            _ => false,
                        };

                        // On the server, make sure path state exists for paths that have not
                        // been abandoned.
                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                            self.ensure_path(path_id, network_path, now, number);
                        }
                        if self.paths.contains_key(&path_id) {
                            self.on_packet_authenticated(
                                now,
                                packet.header.space(),
                                path_id,
                                ecn,
                                number,
                                spin,
                                packet.header.is_1rtt(),
                            );
                        }
                    }

                    let res = self.process_decrypted_packet(
                        now,
                        network_path,
                        path_id,
                        number,
                        packet,
                        &mut qlog,
                    );

                    self.qlog.emit_packet_received(qlog, now);
                    res
                }
            }
        };

        // Map errors from packet processing to the matching connection state.
        if let Err(conn_err) = result {
            match conn_err {
                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
                // A stateless reset and an exceeded AEAD limit go straight to drained.
                ConnectionError::Reset
                | ConnectionError::TransportError(TransportError {
                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
                    ..
                }) => {
                    self.state.move_to_drained(Some(conn_err));
                }
                ConnectionError::TimedOut => {
                    unreachable!("timeouts aren't generated by packet processing");
                }
                ConnectionError::TransportError(err) => {
                    debug!("closing connection due to transport error: {}", err);
                    self.state.move_to_closed(err);
                }
                ConnectionError::VersionMismatch => {
                    self.state.move_to_draining(Some(conn_err));
                }
                ConnectionError::LocallyClosed => {
                    unreachable!("LocallyClosed isn't generated by packet processing");
                }
                ConnectionError::CidsExhausted => {
                    unreachable!("CidsExhausted isn't generated by packet processing");
                }
            };
        }

        // The connection became closed while handling this packet.
        if !was_closed && self.state.is_closed() {
            self.close_common();
            if !self.state.is_drained() {
                self.set_close_timer(now);
            }
        }
        // The connection became drained while handling this packet: notify the endpoint and
        // stop the close timer.
        if !was_drained && self.state.is_drained() {
            self.endpoint_events.push_back(EndpointEventInner::Drained);
            self.timers
                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
        }

        // In the closed state, mark a connection close as pending, but only if the packet
        // arrived on a known, validated path from that path's current network path.
        if matches!(self.state.as_type(), StateType::Closed) {
            if self
                .paths
                .get(&path_id)
                .map(|p| p.data.validated && p.data.network_path == network_path)
                .unwrap_or(false)
            {
                self.connection_close_pending = true;
            }
        }
    }
4151
4152 fn process_decrypted_packet(
4153 &mut self,
4154 now: Instant,
4155 network_path: FourTuple,
4156 path_id: PathId,
4157 number: Option<u64>,
4158 packet: Packet,
4159 qlog: &mut QlogRecvPacket,
4160 ) -> Result<(), ConnectionError> {
4161 if !self.paths.contains_key(&path_id) {
4162 trace!(%path_id, ?number, "discarding packet for unknown path");
4166 return Ok(());
4167 }
4168 let state = match self.state.as_type() {
4169 StateType::Established => {
4170 match packet.header.space() {
4171 SpaceKind::Data => self.process_payload(
4172 now,
4173 network_path,
4174 path_id,
4175 number.unwrap(),
4176 packet,
4177 qlog,
4178 )?,
4179 _ if packet.header.has_frames() => {
4180 self.process_early_payload(now, path_id, packet, qlog)?
4181 }
4182 _ => {
4183 trace!("discarding unexpected pre-handshake packet");
4184 }
4185 }
4186 return Ok(());
4187 }
4188 StateType::Closed => {
4189 for result in frame::Iter::new(packet.payload.freeze())? {
4190 let frame = match result {
4191 Ok(frame) => frame,
4192 Err(err) => {
4193 debug!("frame decoding error: {err:?}");
4194 continue;
4195 }
4196 };
4197 qlog.frame(&frame);
4198
4199 if let Frame::Padding = frame {
4200 continue;
4201 };
4202
4203 self.stats.frame_rx.record(frame.ty());
4204
4205 if let Frame::Close(_error) = frame {
4206 self.state.move_to_draining(None);
4207 break;
4208 }
4209 }
4210 return Ok(());
4211 }
4212 StateType::Draining | StateType::Drained => return Ok(()),
4213 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
4214 };
4215
4216 match packet.header {
4217 Header::Retry {
4218 src_cid: remote_cid,
4219 ..
4220 } => {
4221 debug_assert_eq!(path_id, PathId::ZERO);
4222 if self.side.is_server() {
4223 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
4224 }
4225
4226 let is_valid_retry = self
4227 .remote_cids
4228 .get(&path_id)
4229 .map(|cids| cids.active())
4230 .map(|orig_dst_cid| {
4231 self.crypto_state.session.is_valid_retry(
4232 orig_dst_cid,
4233 &packet.header_data,
4234 &packet.payload,
4235 )
4236 })
4237 .unwrap_or_default();
4238 if self.total_authed_packets > 1
4239 || packet.payload.len() <= 16 || !is_valid_retry
4241 {
4242 trace!("discarding invalid Retry");
4243 return Ok(());
4251 }
4252
4253 trace!("retrying with CID {}", remote_cid);
4254 let client_hello = state.client_hello.take().unwrap();
4255 self.retry_src_cid = Some(remote_cid);
4256 self.remote_cids
4257 .get_mut(&path_id)
4258 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
4259 .update_initial_cid(remote_cid);
4260 self.remote_handshake_cid = remote_cid;
4261
4262 let space = &mut self.spaces[SpaceId::Initial];
4263 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
4264 self.on_packet_acked(now, PathId::ZERO, info);
4265 };
4266
4267 self.discard_space(now, SpaceKind::Initial); let crypto_space = &mut self.crypto_state.spaces[SpaceKind::Initial];
4270 crypto_space.keys = Some(
4271 self.crypto_state
4272 .session
4273 .initial_keys(remote_cid, self.side.side()),
4274 );
4275 crypto_space.crypto_offset = client_hello.len() as u64;
4276
4277 let next_pn = self.spaces[SpaceId::Initial]
4278 .for_path(path_id)
4279 .next_packet_number;
4280 self.spaces[SpaceId::Initial] = {
4281 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
4282 space.for_path(path_id).next_packet_number = next_pn;
4283 space.pending.crypto.push_back(frame::Crypto {
4284 offset: 0,
4285 data: client_hello,
4286 });
4287 space
4288 };
4289
4290 let zero_rtt = mem::take(
4292 &mut self.spaces[SpaceId::Data]
4293 .for_path(PathId::ZERO)
4294 .sent_packets,
4295 );
4296 for (_, info) in zero_rtt.into_iter() {
4297 self.paths
4298 .get_mut(&PathId::ZERO)
4299 .unwrap()
4300 .remove_in_flight(&info);
4301 self.spaces[SpaceId::Data].pending |= info.retransmits;
4302 }
4303 self.streams.retransmit_all_for_0rtt();
4304
4305 let token_len = packet.payload.len() - 16;
4306 let ConnectionSide::Client { ref mut token, .. } = self.side else {
4307 unreachable!("we already short-circuited if we're server");
4308 };
4309 *token = packet.payload.freeze().split_to(token_len);
4310
4311 self.state = State::handshake(state::Handshake {
4312 expected_token: Bytes::new(),
4313 remote_cid_set: false,
4314 client_hello: None,
4315 allow_server_migration: true,
4316 });
4317 Ok(())
4318 }
4319 Header::Long {
4320 ty: LongType::Handshake,
4321 src_cid: remote_cid,
4322 dst_cid: local_cid,
4323 ..
4324 } => {
4325 debug_assert_eq!(path_id, PathId::ZERO);
4326 if remote_cid != self.remote_handshake_cid {
4327 debug!(
4328 "discarding packet with mismatched remote CID: {} != {}",
4329 self.remote_handshake_cid, remote_cid
4330 );
4331 return Ok(());
4332 }
4333 self.on_path_validated(path_id);
4334
4335 self.process_early_payload(now, path_id, packet, qlog)?;
4336 if self.state.is_closed() {
4337 return Ok(());
4338 }
4339
4340 if self.crypto_state.session.is_handshaking() {
4341 trace!("handshake ongoing");
4342 return Ok(());
4343 }
4344
4345 if self.side.is_client() {
4346 let params = self
4348 .crypto_state
4349 .session
4350 .transport_parameters()?
4351 .ok_or_else(|| {
4352 TransportError::new(
4353 TransportErrorCode::crypto(0x6d),
4354 "transport parameters missing".to_owned(),
4355 )
4356 })?;
4357
4358 if self.has_0rtt() {
4359 if !self.crypto_state.session.early_data_accepted().unwrap() {
4360 debug_assert!(self.side.is_client());
4361 debug!("0-RTT rejected");
4362 self.crypto_state.accepted_0rtt = false;
4363 self.streams.zero_rtt_rejected();
4364
4365 self.spaces[SpaceId::Data].pending = Retransmits::default();
4367
4368 let sent_packets = mem::take(
4370 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
4371 );
4372 for (_, packet) in sent_packets.into_iter() {
4373 self.paths
4374 .get_mut(&path_id)
4375 .unwrap()
4376 .remove_in_flight(&packet);
4377 }
4378 } else {
4379 self.crypto_state.accepted_0rtt = true;
4380 params.validate_resumption_from(&self.peer_params)?;
4381 }
4382 }
4383 if let Some(token) = params.stateless_reset_token {
4384 let remote = self.path_data(path_id).network_path.remote;
4385 debug_assert!(!self.state.is_drained()); self.endpoint_events
4387 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4388 }
4389 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4390 self.issue_first_cids(now);
4391 } else {
4392 self.spaces[SpaceId::Data].pending.handshake_done = true;
4394 self.discard_space(now, SpaceKind::Handshake);
4395 self.events.push_back(Event::HandshakeConfirmed);
4396 trace!("handshake confirmed");
4397 }
4398
4399 self.events.push_back(Event::Connected);
4400 self.state.move_to_established();
4401 trace!("established");
4402
4403 self.issue_first_path_cids(now);
4406 Ok(())
4407 }
4408 Header::Initial(InitialHeader {
4409 src_cid: remote_cid,
4410 dst_cid: local_cid,
4411 ..
4412 }) => {
4413 debug_assert_eq!(path_id, PathId::ZERO);
4414 if !state.remote_cid_set {
4415 trace!("switching remote CID to {}", remote_cid);
4416 let mut state = state.clone();
4417 self.remote_cids
4418 .get_mut(&path_id)
4419 .expect("PathId::ZERO not yet abandoned")
4420 .update_initial_cid(remote_cid);
4421 self.remote_handshake_cid = remote_cid;
4422 self.original_remote_cid = remote_cid;
4423 state.remote_cid_set = true;
4424 self.state.move_to_handshake(state);
4425 } else if remote_cid != self.remote_handshake_cid {
4426 debug!(
4427 "discarding packet with mismatched remote CID: {} != {}",
4428 self.remote_handshake_cid, remote_cid
4429 );
4430 return Ok(());
4431 }
4432
4433 let starting_space = self.highest_space;
4434 self.process_early_payload(now, path_id, packet, qlog)?;
4435
4436 if self.side.is_server()
4437 && starting_space == SpaceKind::Initial
4438 && self.highest_space != SpaceKind::Initial
4439 {
4440 let params = self
4441 .crypto_state
4442 .session
4443 .transport_parameters()?
4444 .ok_or_else(|| {
4445 TransportError::new(
4446 TransportErrorCode::crypto(0x6d),
4447 "transport parameters missing".to_owned(),
4448 )
4449 })?;
4450 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4451 self.issue_first_cids(now);
4452 self.init_0rtt(now);
4453 }
4454 Ok(())
4455 }
4456 Header::Long {
4457 ty: LongType::ZeroRtt,
4458 ..
4459 } => {
4460 self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4461 Ok(())
4462 }
4463 Header::VersionNegotiate { .. } => {
4464 if self.total_authed_packets > 1 {
4465 return Ok(());
4466 }
4467 let supported = packet
4468 .payload
4469 .chunks(4)
4470 .any(|x| match <[u8; 4]>::try_from(x) {
4471 Ok(version) => self.version == u32::from_be_bytes(version),
4472 Err(_) => false,
4473 });
4474 if supported {
4475 return Ok(());
4476 }
4477 debug!("remote doesn't support our version");
4478 Err(ConnectionError::VersionMismatch)
4479 }
4480 Header::Short { .. } => unreachable!(
4481 "short packets received during handshake are discarded in handle_packet"
4482 ),
4483 }
4484 }
4485
4486 fn process_early_payload(
4488 &mut self,
4489 now: Instant,
4490 path_id: PathId,
4491 packet: Packet,
4492 #[allow(unused)] qlog: &mut QlogRecvPacket,
4493 ) -> Result<(), TransportError> {
4494 debug_assert_ne!(packet.header.space(), SpaceKind::Data);
4495 debug_assert_eq!(path_id, PathId::ZERO);
4496 let payload_len = packet.payload.len();
4497 let mut ack_eliciting = false;
4498 for result in frame::Iter::new(packet.payload.freeze())? {
4499 let frame = result?;
4500 qlog.frame(&frame);
4501 let span = match frame {
4502 Frame::Padding => continue,
4503 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4504 };
4505
4506 self.stats.frame_rx.record(frame.ty());
4507
4508 let _guard = span.as_ref().map(|x| x.enter());
4509 ack_eliciting |= frame.is_ack_eliciting();
4510
4511 if frame.is_1rtt() && packet.header.space() != SpaceKind::Data {
4513 return Err(TransportError::PROTOCOL_VIOLATION(
4514 "illegal frame type in handshake",
4515 ));
4516 }
4517
4518 match frame {
4519 Frame::Padding | Frame::Ping => {}
4520 Frame::Crypto(frame) => {
4521 self.read_crypto(packet.header.space().into(), &frame, payload_len)?;
4522 }
4523 Frame::Ack(ack) => {
4524 self.on_ack_received(now, packet.header.space().into(), ack)?;
4525 }
4526 Frame::PathAck(ack) => {
4527 span.as_ref()
4528 .map(|span| span.record("path", tracing::field::display(&ack.path_id)));
4529 self.on_path_ack_received(now, packet.header.space().into(), ack)?;
4530 }
4531 Frame::Close(reason) => {
4532 self.state.move_to_draining(Some(reason.into()));
4533 return Ok(());
4534 }
4535 _ => {
4536 let mut err =
4537 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4538 err.frame = frame::MaybeFrame::Known(frame.ty());
4539 return Err(err);
4540 }
4541 }
4542 }
4543
4544 if ack_eliciting {
4545 self.spaces[packet.header.space()]
4547 .for_path(path_id)
4548 .pending_acks
4549 .set_immediate_ack_required();
4550 }
4551
4552 self.write_crypto();
4553 Ok(())
4554 }
4555
/// Processes every frame of a packet that is handled in the data packet space.
///
/// `network_path` is the four-tuple the packet arrived on, `path_id` the path it was
/// attributed to and `number` its packet number. Packets whose header reports 0-RTT are
/// subject to additional frame restrictions (see the check at the top of the frame loop).
///
/// After all frames were dispatched this method:
/// 1. registers the packet with the path's pending ACK state and, depending on the result,
///    either requests an immediate ACK (abandoned path) or arms the `MaxAckDelay` timer,
/// 2. queues stream-ID credit via `streams.queue_max_stream_id`,
/// 3. moves to the draining state if a CONNECTION_CLOSE frame was seen,
/// 4. initiates a migration when the packet number equals the path's `rx_packet`, the packet
///    was not a probing packet, and it arrived on a network path different from the one
///    currently stored for `path_id`.
///
/// # Errors
///
/// Returns a [`TransportError`] when the payload or a frame cannot be decoded, when a frame
/// is not allowed in the current context (0-RTT, un-negotiated extension, wrong side), or
/// when one of the per-frame handlers fails.
///
/// # Panics
///
/// Panics if `path_id` is not a known path when a PATH_CHALLENGE or PATH_RESPONSE frame is
/// processed, and if a migration would be triggered while this endpoint is not a server.
fn process_payload(
    &mut self,
    now: Instant,
    network_path: FourTuple,
    path_id: PathId,
    number: u64,
    packet: Packet,
    #[allow(unused)] qlog: &mut QlogRecvPacket,
) -> Result<(), TransportError> {
    let is_multipath_negotiated = self.is_multipath_negotiated();
    let payload = packet.payload.freeze();
    // Remains `true` only if every frame is one of the types listed in the probing check
    // inside the loop.
    let mut is_probing_packet = true;
    // A received CONNECTION_CLOSE is recorded here and acted upon after the frame loop.
    let mut close = None;
    let payload_len = payload.len();
    let mut ack_eliciting = false;
    // OBSERVED_ADDRESS report that arrived on a network path other than the path's current
    // one; it is handed to `migrate` if this packet triggers a migration.
    let mut migration_observed_addr = None;
    for result in frame::Iter::new(payload)? {
        let frame = result?;
        qlog.frame(&frame);
        // PADDING is reported to qlog but otherwise skipped entirely.
        let span = match frame {
            Frame::Padding => continue,
            _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
        };

        self.stats.frame_rx.record(frame.ty());
        // Trace output: frames carrying bulk data log their metadata only, everything else
        // is logged through its `Display` implementation.
        match &frame {
            Frame::Crypto(f) => {
                trace!(offset = f.offset, len = f.data.len(), "got frame CRYPTO");
            }
            Frame::Stream(f) => {
                trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got frame STREAM");
            }
            Frame::Datagram(f) => {
                trace!(len = f.data.len(), "got frame DATAGRAM");
            }
            f => {
                trace!("got frame {f}");
            }
        }

        let _guard = span.enter();
        // 0-RTT packets must not carry CRYPTO frames, application CONNECTION_CLOSE frames,
        // or any frame for which `is_1rtt()` is true.
        if packet.header.is_0rtt() {
            match frame {
                Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "illegal frame type in 0-RTT",
                    ));
                }
                _ => {
                    if frame.is_1rtt() {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "illegal frame type in 0-RTT",
                        ));
                    }
                }
            }
        }
        ack_eliciting |= frame.is_ack_eliciting();

        // Probing check: any frame outside this list makes the packet non-probing.
        match frame {
            Frame::Padding
            | Frame::PathChallenge(_)
            | Frame::PathResponse(_)
            | Frame::NewConnectionId(_)
            | Frame::ObservedAddr(_) => {}
            _ => {
                is_probing_packet = false;
            }
        }

        match frame {
            Frame::Crypto(frame) => {
                self.read_crypto(SpaceId::Data, &frame, payload_len)?;
            }
            Frame::Stream(frame) => {
                // Flag MAX_DATA as pending when the stream state asks for a transmit.
                if self.streams.received(frame, payload_len)?.should_transmit() {
                    self.spaces[SpaceId::Data].pending.max_data = true;
                }
            }
            Frame::Ack(ack) => {
                self.on_ack_received(now, SpaceId::Data, ack)?;
            }
            Frame::PathAck(ack) => {
                span.record("path", tracing::field::display(&ack.path_id));
                self.on_path_ack_received(now, SpaceId::Data, ack)?;
            }
            Frame::Padding | Frame::Ping => {}
            Frame::Close(reason) => {
                // Deferred: handled after the remaining frames were processed.
                close = Some(reason);
            }
            Frame::PathChallenge(challenge) => {
                let path = &mut self
                    .path_mut(path_id)
                    .expect("payload is processed only after the path becomes known");
                // Queue the PATH_RESPONSE for the network path the challenge arrived on.
                path.path_responses.push(number, challenge.0, network_path);
                // A challenge on the path's current network path additionally triggers an
                // IMMEDIATE_ACK (if the peer supports ACK frequency) or a PING on that path.
                if network_path == path.network_path {
                    match self.peer_supports_ack_frequency() {
                        true => self.immediate_ack(path_id),
                        false => {
                            // Best effort: a failure to queue the ping is ignored.
                            self.ping_path(path_id).ok();
                        }
                    }
                }
            }
            Frame::PathResponse(response) => {
                let path = self
                    .paths
                    .get_mut(&path_id)
                    .expect("payload is processed only after the path becomes known");

                use PathTimer::*;
                use paths::OnPathResponseReceived::*;
                match path
                    .data
                    .on_path_response_received(now, response.0, network_path)
                {
                    OnPath { was_open } => {
                        let qlog = self.qlog.with_time(now);

                        // The response settles validation/opening of this path.
                        self.timers
                            .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
                        self.timers
                            .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());

                        // Re-arm (or stop) the challenge-lost timer for whatever on-path
                        // challenge expires next.
                        let next_challenge = path
                            .data
                            .earliest_on_path_expiring_challenge()
                            .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
                        self.timers.set_or_stop(
                            Timer::PerPath(path_id, PathChallengeLost),
                            next_challenge,
                            qlog,
                        );

                        // First successful response on this path: announce it (multipath
                        // only) and surface any observed address already reported for it.
                        if !was_open {
                            if is_multipath_negotiated {
                                self.events
                                    .push_back(Event::Path(PathEvent::Opened { id: path_id }));
                            }
                            if let Some(observed) = path.data.last_observed_addr_report.as_ref()
                            {
                                self.events.push_back(Event::Path(PathEvent::ObservedAddr {
                                    id: path_id,
                                    addr: observed.socket_addr(),
                                }));
                            }
                        }
                        if let Some((_, ref mut prev)) = path.prev {
                            prev.reset_on_path_challenges();
                        }
                    }
                    OffPath => {
                        debug!(%response, "Valid response to off-path PATH_CHALLENGE");
                    }
                    Ignored {
                        sent_on,
                        current_path,
                    } => {
                        debug!(%sent_on, %current_path, %response, "ignoring valid PATH_RESPONSE")
                    }
                    Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
                }
            }
            Frame::MaxData(frame::MaxData(bytes)) => {
                self.streams.received_max_data(bytes);
            }
            Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
                self.streams.received_max_stream_data(id, offset)?;
            }
            Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
                self.streams.received_max_streams(dir, count)?;
            }
            Frame::ResetStream(frame) => {
                if self.streams.received_reset(frame)?.should_transmit() {
                    self.spaces[SpaceId::Data].pending.max_data = true;
                }
            }
            Frame::DataBlocked(DataBlocked(offset)) => {
                // Informational only.
                debug!(offset, "peer claims to be blocked at connection level");
            }
            Frame::StreamDataBlocked(StreamDataBlocked { id, offset }) => {
                // A unidirectional stream we initiated is send-only for us, so the peer
                // cannot be blocked sending on it.
                if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
                    debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
                    return Err(TransportError::STREAM_STATE_ERROR(
                        "STREAM_DATA_BLOCKED on send-only stream",
                    ));
                }
                debug!(
                    stream = %id,
                    offset, "peer claims to be blocked at stream level"
                );
            }
            Frame::StreamsBlocked(StreamsBlocked { dir, limit }) => {
                if limit > MAX_STREAM_COUNT {
                    return Err(TransportError::FRAME_ENCODING_ERROR(
                        "unrepresentable stream limit",
                    ));
                }
                debug!(
                    "peer claims to be blocked opening more than {} {} streams",
                    limit, dir
                );
            }
            Frame::StopSending(frame::StopSending { id, error_code }) => {
                if id.initiator() != self.side.side() {
                    // Peer-initiated unidirectional streams are receive-only for us.
                    if id.dir() == Dir::Uni {
                        debug!("got STOP_SENDING on recv-only {}", id);
                        return Err(TransportError::STREAM_STATE_ERROR(
                            "STOP_SENDING on recv-only stream",
                        ));
                    }
                } else if self.streams.is_local_unopened(id) {
                    return Err(TransportError::STREAM_STATE_ERROR(
                        "STOP_SENDING on unopened stream",
                    ));
                }
                self.streams.received_stop_sending(id, error_code);
            }
            Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
                if let Some(ref path_id) = path_id {
                    span.record("path", tracing::field::display(&path_id));
                }
                // Frames without an explicit path id apply to the default `PathId`.
                let path_id = path_id.unwrap_or_default();
                match self.local_cid_state.get_mut(&path_id) {
                    None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
                    Some(cid_state) => {
                        let allow_more_cids = cid_state
                            .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;

                        // Never request replacement CIDs for a path that was abandoned.
                        let has_path = !self.abandoned_paths.contains(&path_id);
                        let allow_more_cids = allow_more_cids && has_path;

                        debug_assert!(!self.state.is_drained());
                        self.endpoint_events
                            .push_back(EndpointEventInner::RetireConnectionId(
                                now,
                                path_id,
                                sequence,
                                allow_more_cids,
                            ));
                    }
                }
            }
            Frame::NewConnectionId(frame) => {
                // A frame carrying a path id is the multipath variant: it requires
                // negotiated multipath and a path id within our advertised maximum.
                let path_id = if let Some(path_id) = frame.path_id {
                    if !self.is_multipath_negotiated() {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
                        ));
                    }
                    if path_id > self.local_max_path_id {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
                        ));
                    }
                    path_id
                } else {
                    PathId::ZERO
                };

                if let Some(ref path_id) = frame.path_id {
                    span.record("path", tracing::field::display(&path_id));
                }

                if self.abandoned_paths.contains(&path_id) {
                    trace!("ignoring issued CID for abandoned path");
                    continue;
                }
                // Create the CID queue for this path on first use, seeded with this CID.
                let remote_cids = self
                    .remote_cids
                    .entry(path_id)
                    .or_insert_with(|| CidQueue::new(frame.id));
                if remote_cids.active().is_empty() {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "NEW_CONNECTION_ID when CIDs aren't in use",
                    ));
                }
                if frame.retire_prior_to > frame.sequence {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "NEW_CONNECTION_ID retiring unissued CIDs",
                    ));
                }

                use crate::cid_queue::InsertError;
                match remote_cids.insert(frame) {
                    // CID stored for a path id that has no path yet.
                    Ok(None) if self.path(path_id).is_none() => {
                        self.continue_nat_traversal_round(now);
                    }
                    Ok(None) => {}
                    Ok(Some((retired, reset_token))) => {
                        let pending_retired =
                            &mut self.spaces[SpaceId::Data].pending.retire_cids;
                        // Upper bound on queued RETIRE_CONNECTION_ID entries; exceeding it
                        // closes the connection instead of growing the queue further.
                        const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
                        if (pending_retired.len() as u64)
                            .saturating_add(retired.end.saturating_sub(retired.start))
                            > MAX_PENDING_RETIRED_CIDS
                        {
                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
                                "queued too many retired CIDs",
                            ));
                        }
                        pending_retired.extend(retired.map(|seq| (path_id, seq)));
                        self.set_reset_token(path_id, network_path.remote, reset_token);
                    }
                    Err(InsertError::ExceedsLimit) => {
                        return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
                    }
                    Err(InsertError::Retired) => {
                        trace!("discarding already-retired");
                        // Still queue a retirement for the sequence number just received.
                        self.spaces[SpaceId::Data]
                            .pending
                            .retire_cids
                            .push((path_id, frame.sequence));
                        continue;
                    }
                };

                // NOTE(review): presumably this moves a server off the CID used during the
                // handshake (sequence 0) as soon as the client issued another one - confirm.
                if self.side.is_server()
                    && path_id == PathId::ZERO
                    && self
                        .remote_cids
                        .get(&PathId::ZERO)
                        .map(|cids| cids.active_seq() == 0)
                        .unwrap_or_default()
                {
                    self.update_remote_cid(PathId::ZERO);
                }
            }
            Frame::NewToken(NewToken { token }) => {
                // Only the client side holds a token store; on any other side receiving
                // this frame is a protocol violation.
                let ConnectionSide::Client {
                    token_store,
                    server_name,
                    ..
                } = &self.side
                else {
                    return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
                };
                if token.is_empty() {
                    return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
                }
                trace!("got new token");
                token_store.insert(server_name, token);
            }
            Frame::Datagram(datagram) => {
                if self
                    .datagrams
                    .received(datagram, &self.config.datagram_receive_buffer_size)?
                {
                    self.events.push_back(Event::DatagramReceived);
                }
            }
            Frame::AckFrequency(ack_frequency) => {
                // `false` means the frame does not take effect; nothing more to do for it.
                if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
                    continue;
                }

                // Apply the new parameters to every path's ACK state and re-arm its
                // max-ack-delay timer where a timeout is reported.
                for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
                    space.pending_acks.set_ack_frequency_params(&ack_frequency);

                    if let Some(timeout) = space
                        .pending_acks
                        .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
                    {
                        self.timers.set(
                            Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
                            timeout,
                            self.qlog.with_time(now),
                        );
                    }
                }
            }
            Frame::ImmediateAck => {
                // Applies to all paths of the data space.
                for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
                    pns.pending_acks.set_immediate_ack_required();
                }
            }
            Frame::HandshakeDone => {
                if self.side.is_server() {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "client sent HANDSHAKE_DONE",
                    ));
                }
                // Handshake keys may already be gone; only discard the space once.
                if self.crypto_state.has_keys(EncryptionLevel::Handshake) {
                    self.discard_space(now, SpaceKind::Handshake);
                }
                self.events.push_back(Event::HandshakeConfirmed);
                trace!("handshake confirmed");
            }
            Frame::ObservedAddr(observed) => {
                trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
                if !self
                    .peer_params
                    .address_discovery_role
                    .should_report(&self.config.address_discovery_role)
                {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "received OBSERVED_ADDRESS frame when not negotiated",
                    ));
                }
                if packet.header.space() != SpaceKind::Data {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "OBSERVED_ADDRESS frame outside data space",
                    ));
                }

                let path = self.path_data_mut(path_id);
                if network_path == path.network_path {
                    // Report on the current network path: store it, and emit an event if
                    // it changed something and the path's open status is `Informed`.
                    if let Some(updated) = path.update_observed_addr_report(observed)
                        && path.open_status == paths::OpenStatus::Informed
                    {
                        self.events.push_back(Event::Path(PathEvent::ObservedAddr {
                            id: path_id,
                            addr: updated,
                        }));
                    }
                } else {
                    // Report from a different network path: keep it for a possible
                    // migration at the end of this method.
                    migration_observed_addr = Some(observed)
                }
            }
            Frame::PathAbandon(frame::PathAbandon {
                path_id,
                error_code,
            }) => {
                span.record("path", tracing::field::display(&path_id));
                match self.close_path_inner(
                    now,
                    path_id,
                    PathAbandonReason::RemoteAbandoned {
                        error_code: error_code.into(),
                    },
                ) {
                    Ok(()) => {
                        trace!("peer abandoned path");
                    }
                    Err(ClosePathError::LastOpenPath) => {
                        trace!("peer abandoned last path, closing connection");
                        return Err(TransportError::NO_VIABLE_PATH(
                            "last path abandoned by peer",
                        ));
                    }
                    Err(ClosePathError::ClosedPath) => {
                        trace!("peer abandoned already closed path");
                    }
                    Err(ClosePathError::MultipathNotNegotiated) => {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "received PATH_ABANDON frame when multipath was not negotiated",
                        ));
                    }
                };

                // Start draining the path if it still exists and was not draining already
                // (`mem::replace` sets the flag and returns its previous value).
                if let Some(path) = self.paths.get_mut(&path_id)
                    && !mem::replace(&mut path.data.draining, true)
                {
                    let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
                    let pto = path.data.rtt.pto_base() + ack_delay;
                    self.timers.set(
                        Timer::PerPath(path_id, PathTimer::DiscardPath),
                        now + 3 * pto,
                        self.qlog.with_time(now),
                    );

                    // Raise our maximum path id by one alongside the abandonment.
                    self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
                }
            }
            Frame::PathStatusAvailable(info) => {
                span.record("path", tracing::field::display(&info.path_id));
                if self.is_multipath_negotiated() {
                    self.on_path_status(
                        info.path_id,
                        PathStatus::Available,
                        info.status_seq_no,
                    );
                } else {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
                    ));
                }
            }
            Frame::PathStatusBackup(info) => {
                span.record("path", tracing::field::display(&info.path_id));
                if self.is_multipath_negotiated() {
                    self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
                } else {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
                    ));
                }
            }
            Frame::MaxPathId(frame::MaxPathId(path_id)) => {
                span.record("path", tracing::field::display(&path_id));
                if !self.is_multipath_negotiated() {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "received MAX_PATH_ID frame when multipath was not negotiated",
                    ));
                }
                // Only an increase has an effect; lower or equal values are ignored.
                if path_id > self.remote_max_path_id {
                    self.remote_max_path_id = path_id;
                    self.issue_first_path_cids(now);
                    // Keep going for as long as a round reports `Some(true)`.
                    while let Some(true) = self.continue_nat_traversal_round(now) {}
                }
            }
            Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
                if self.is_multipath_negotiated() {
                    if max_path_id > self.local_max_path_id {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "PATHS_BLOCKED maximum path identifier was larger than local maximum",
                        ));
                    }
                    debug!("received PATHS_BLOCKED({:?})", max_path_id);
                } else {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "received PATHS_BLOCKED frame when not multipath was not negotiated",
                    ));
                }
            }
            Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
                if self.is_multipath_negotiated() {
                    if path_id > self.local_max_path_id {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
                        ));
                    }
                    // The peer cannot be waiting for a sequence number beyond the one
                    // following the second component of `active_seq()`; an unknown path
                    // yields a limit of 0.
                    if next_seq.0
                        > self
                            .local_cid_state
                            .get(&path_id)
                            .map(|cid_state| cid_state.active_seq().1 + 1)
                            .unwrap_or_default()
                    {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "PATH_CIDS_BLOCKED next sequence number larger than in local state",
                        ));
                    }
                    debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
                } else {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
                    ));
                }
            }
            Frame::AddAddress(addr) => {
                // Requires the client-side NAT traversal state; otherwise the frame is a
                // protocol violation.
                let client_state = match self.n0_nat_traversal.client_side_mut() {
                    Ok(state) => state,
                    Err(err) => {
                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
                            "Nat traversal(ADD_ADDRESS): {err}"
                        )));
                    }
                };

                // An address that fails the check is only logged; it is still added below.
                if !client_state.check_remote_address(&addr) {
                    warn!(?addr, "server sent illegal ADD_ADDRESS frame");
                }

                match client_state.add_remote_address(addr) {
                    Ok(maybe_added) => {
                        if let Some(added) = maybe_added {
                            self.events.push_back(Event::NatTraversal(
                                n0_nat_traversal::Event::AddressAdded(added),
                            ));
                        }
                    }
                    Err(e) => {
                        warn!(%e, "failed to add remote address")
                    }
                }
            }
            Frame::RemoveAddress(addr) => {
                let client_state = match self.n0_nat_traversal.client_side_mut() {
                    Ok(state) => state,
                    Err(err) => {
                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
                            "Nat traversal(REMOVE_ADDRESS): {err}"
                        )));
                    }
                };
                if let Some(removed_addr) = client_state.remove_remote_address(addr) {
                    self.events.push_back(Event::NatTraversal(
                        n0_nat_traversal::Event::AddressRemoved(removed_addr),
                    ));
                }
            }
            Frame::ReachOut(reach_out) => {
                // Requires the server-side NAT traversal state.
                let ipv6 = self.is_ipv6();
                let server_state = match self.n0_nat_traversal.server_side_mut() {
                    Ok(state) => state,
                    Err(err) => {
                        return Err(TransportError::PROTOCOL_VIOLATION(format!(
                            "Nat traversal(REACH_OUT): {err}"
                        )));
                    }
                };

                if let Err(err) = server_state.handle_reach_out(reach_out, ipv6) {
                    return Err(TransportError::PROTOCOL_VIOLATION(format!(
                        "Nat traversal(REACH_OUT): {err}"
                    )));
                }
            }
        }
    }

    // Register the packet with the path's ACK state. When `packet_received` returns
    // `true`, an abandoned path gets an immediate ACK while any other path has its
    // max-ack-delay timer armed.
    let space = self.spaces[SpaceId::Data].for_path(path_id);
    if space
        .pending_acks
        .packet_received(now, number, ack_eliciting, &space.dedup)
    {
        if self.abandoned_paths.contains(&path_id) {
            space.pending_acks.set_immediate_ack_required();
        } else {
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::MaxAckDelay),
                now + self.ack_frequency.max_ack_delay,
                self.qlog.with_time(now),
            );
        }
    }

    // Let the stream state queue any stream-ID credit into the pending data-space frames.
    let pending = &mut self.spaces[SpaceId::Data].pending;
    self.streams.queue_max_stream_id(pending);

    if let Some(reason) = close {
        self.state.move_to_draining(Some(reason.into()));
        self.connection_close_pending = true;
    }

    // Migration: the packet number equals the path's `rx_packet`, the packet contained a
    // non-probing frame, and it arrived on a network path other than the stored one.
    if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
        && !is_probing_packet
        && network_path != self.path_data(path_id).network_path
    {
        let ConnectionSide::Server { ref server_config } = self.side else {
            panic!("packets from unknown remote should be dropped by clients");
        };
        debug_assert!(
            server_config.migration,
            "migration-initiating packets should have been dropped immediately"
        );
        self.migrate(path_id, now, network_path, migration_observed_addr);
        // Switch to a fresh remote CID for the new network path and reset the spin bit.
        self.update_remote_cid(path_id);
        self.spin = false;
    }

    Ok(())
}
5264
5265 fn migrate(
5266 &mut self,
5267 path_id: PathId,
5268 now: Instant,
5269 network_path: FourTuple,
5270 observed_addr: Option<ObservedAddr>,
5271 ) {
5272 trace!(%network_path, %path_id, "migration initiated");
5273 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
5274 let prev_pto = self.pto(SpaceKind::Data, path_id);
5281 let path = self.paths.get_mut(&path_id).expect("known path");
5282 let mut new_path_data = if network_path.remote.is_ipv4()
5283 && network_path.remote.ip() == path.data.network_path.remote.ip()
5284 {
5285 PathData::from_previous(network_path, &path.data, self.path_generation_counter, now)
5286 } else {
5287 let peer_max_udp_payload_size =
5288 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
5289 .unwrap_or(u16::MAX);
5290 PathData::new(
5291 network_path,
5292 self.allow_mtud,
5293 Some(peer_max_udp_payload_size),
5294 self.path_generation_counter,
5295 now,
5296 &self.config,
5297 )
5298 };
5299 new_path_data.last_observed_addr_report = path.data.last_observed_addr_report.clone();
5300 if let Some(report) = observed_addr
5301 && let Some(updated) = new_path_data.update_observed_addr_report(report)
5302 {
5303 tracing::info!("adding observed addr event from migration");
5304 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5305 id: path_id,
5306 addr: updated,
5307 }));
5308 }
5309 new_path_data.pending_on_path_challenge = true;
5310
5311 let mut prev_path_data = mem::replace(&mut path.data, new_path_data);
5312
5313 if !prev_path_data.validated
5322 && let Some(cid) = self.remote_cids.get(&path_id).map(CidQueue::active)
5323 {
5324 prev_path_data.pending_on_path_challenge = true;
5325 path.prev = Some((cid, prev_path_data));
5328 }
5329
5330 self.qlog.emit_tuple_assigned(path_id, network_path, now);
5332
5333 self.timers.set(
5334 Timer::PerPath(path_id, PathTimer::PathValidation),
5335 now + 3 * cmp::max(self.pto(SpaceKind::Data, path_id), prev_pto),
5336 self.qlog.with_time(now),
5337 );
5338 }
5339
5340 pub fn handle_network_change(&mut self, hint: Option<&dyn NetworkChangeHint>, now: Instant) {
5357 debug!("network changed");
5358 if self.state.is_drained() {
5359 return;
5360 }
5361 if self.highest_space < SpaceKind::Data {
5362 for path in self.paths.values_mut() {
5363 path.data.network_path.local_ip = None;
5365 }
5366
5367 self.update_remote_cid(PathId::ZERO);
5368 self.ping();
5369
5370 return;
5371 }
5372
5373 let mut non_recoverable_paths = Vec::default();
5376 let mut recoverable_paths = Vec::default();
5377 let mut open_paths = 0;
5378
5379 let is_multipath_negotiated = self.is_multipath_negotiated();
5380 let is_client = self.side().is_client();
5381 let immediate_ack_allowed = self.peer_supports_ack_frequency();
5382
5383 for (path_id, path) in self.paths.iter_mut() {
5384 if self.abandoned_paths.contains(path_id) {
5385 continue;
5386 }
5387 open_paths += 1;
5388
5389 path.data.network_path.local_ip = None;
5392
5393 let network_path = path.data.network_path;
5394 let remote = network_path.remote;
5395
5396 let attempt_to_recover = if is_multipath_negotiated {
5400 if is_client {
5401 hint.map(|h| h.is_path_recoverable(*path_id, network_path))
5402 .unwrap_or(false)
5403 } else {
5404 true
5408 }
5409 } else {
5410 true
5412 };
5413
5414 if attempt_to_recover {
5415 recoverable_paths.push((*path_id, remote));
5416 } else {
5417 non_recoverable_paths.push((*path_id, remote, path.data.local_status()))
5418 }
5419 }
5420
5421 let open_first = open_paths == non_recoverable_paths.len();
5430
5431 for (path_id, remote, status) in non_recoverable_paths.into_iter() {
5432 let network_path = FourTuple {
5433 remote,
5434 local_ip: None, };
5436
5437 if open_first && let Err(e) = self.open_path(network_path, status, now) {
5438 debug!(%e,"Failed to open new path for network change");
5439 recoverable_paths.push((path_id, remote));
5441 continue;
5442 }
5443
5444 if let Err(e) =
5445 self.close_path_inner(now, path_id, PathAbandonReason::UnusableAfterNetworkChange)
5446 {
5447 debug!(%e,"Failed to close unrecoverable path after network change");
5448 recoverable_paths.push((path_id, remote));
5449 continue;
5450 }
5451
5452 if !open_first && let Err(e) = self.open_path(network_path, status, now) {
5453 debug!(%e,"Failed to open new path for network change");
5457 }
5458 }
5459
5460 for (path_id, remote) in recoverable_paths.into_iter() {
5463 let space = &mut self.spaces[SpaceId::Data];
5464
5465 if let Some(path_space) = space.number_spaces.get_mut(&path_id) {
5467 path_space.ping_pending = true;
5468
5469 if immediate_ack_allowed {
5470 path_space.immediate_ack_pending = true;
5471 }
5472 }
5473
5474 let Some((reset_token, retired)) =
5475 self.remote_cids.get_mut(&path_id).and_then(CidQueue::next)
5476 else {
5477 continue;
5478 };
5479
5480 space
5482 .pending
5483 .retire_cids
5484 .extend(retired.map(|seq| (path_id, seq)));
5485
5486 debug_assert!(!self.state.is_drained()); self.endpoint_events
5488 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5489 }
5490 }
5491
5492 fn update_remote_cid(&mut self, path_id: PathId) {
5494 let Some((reset_token, retired)) = self
5495 .remote_cids
5496 .get_mut(&path_id)
5497 .and_then(|cids| cids.next())
5498 else {
5499 return;
5500 };
5501
5502 self.spaces[SpaceId::Data]
5504 .pending
5505 .retire_cids
5506 .extend(retired.map(|seq| (path_id, seq)));
5507 let remote = self.path_data(path_id).network_path.remote;
5508 self.set_reset_token(path_id, remote, reset_token);
5509 }
5510
5511 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5520 debug_assert!(!self.state.is_drained()); self.endpoint_events
5522 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5523
5524 if path_id == PathId::ZERO {
5530 self.peer_params.stateless_reset_token = Some(reset_token);
5531 }
5532 }
5533
5534 fn issue_first_cids(&mut self, now: Instant) {
5536 if self
5537 .local_cid_state
5538 .get(&PathId::ZERO)
5539 .expect("PathId::ZERO exists when the connection is created")
5540 .cid_len()
5541 == 0
5542 {
5543 return;
5544 }
5545
5546 let mut n = self.peer_params.issue_cids_limit() - 1;
5548 if let ConnectionSide::Server { server_config } = &self.side
5549 && server_config.has_preferred_address()
5550 {
5551 n -= 1;
5553 }
5554 debug_assert!(!self.state.is_drained()); self.endpoint_events
5556 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5557 }
5558
5559 fn issue_first_path_cids(&mut self, now: Instant) {
5563 if let Some(max_path_id) = self.max_path_id() {
5564 let mut path_id = self.max_path_id_with_cids.next();
5565 while path_id <= max_path_id {
5566 self.endpoint_events
5567 .push_back(EndpointEventInner::NeedIdentifiers(
5568 path_id,
5569 now,
5570 self.peer_params.issue_cids_limit(),
5571 ));
5572 path_id = path_id.next();
5573 }
5574 self.max_path_id_with_cids = max_path_id;
5575 }
5576 }
5577
5578 fn populate_packet<'a, 'b>(
5586 &mut self,
5587 now: Instant,
5588 space_id: SpaceId,
5589 path_id: PathId,
5590 scheduling_info: &PathSchedulingInfo,
5591 builder: &mut PacketBuilder<'a, 'b>,
5592 ) {
5593 let is_multipath_negotiated = self.is_multipath_negotiated();
5594 let space_has_keys = self.crypto_state.has_keys(space_id.encryption_level());
5595 let is_0rtt = space_id == SpaceId::Data && !space_has_keys;
5596 let stats = &mut self.stats.frame_tx;
5597 let space = &mut self.spaces[space_id];
5598 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5599 space
5600 .for_path(path_id)
5601 .pending_acks
5602 .maybe_ack_non_eliciting();
5603
5604 if !is_0rtt
5606 && scheduling_info.may_send_data
5607 && mem::replace(&mut space.pending.handshake_done, false)
5608 {
5609 builder.write_frame(frame::HandshakeDone, stats);
5610 }
5611
5612 if let Some((round, addresses)) = space.pending.reach_out.as_mut()
5614 && scheduling_info.may_send_data
5615 {
5616 while let Some(local_addr) = addresses.iter().next().copied() {
5617 let local_addr = addresses.take(&local_addr).expect("found from iter");
5618 let reach_out = frame::ReachOut::new(*round, local_addr);
5619 if builder.frame_space_remaining() > reach_out.size() {
5620 builder.write_frame(reach_out, stats);
5621 } else {
5622 addresses.insert(local_addr);
5623 break;
5624 }
5625 }
5626 if addresses.is_empty() {
5627 space.pending.reach_out = None;
5628 }
5629 }
5630
5631 if scheduling_info.may_send_data
5633 && space_id == SpaceId::Data
5634 && self
5635 .config
5636 .address_discovery_role
5637 .should_report(&self.peer_params.address_discovery_role)
5638 && (!path.observed_addr_sent || space.pending.observed_addr)
5639 {
5640 let frame =
5641 frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
5642 if builder.frame_space_remaining() > frame.size() {
5643 builder.write_frame(frame, stats);
5644
5645 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5646 path.observed_addr_sent = true;
5647
5648 space.pending.observed_addr = false;
5649 }
5650 }
5651
5652 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5654 builder.write_frame(frame::Ping, stats);
5655 }
5656
5657 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5659 debug_assert_eq!(
5660 space_id,
5661 SpaceId::Data,
5662 "immediate acks must be sent in the data space"
5663 );
5664 builder.write_frame(frame::ImmediateAck, stats);
5665 }
5666
5667 if scheduling_info.may_send_data {
5669 for path_id in space
5670 .number_spaces
5671 .iter_mut()
5672 .filter(|(_, pns)| pns.pending_acks.can_send())
5673 .map(|(&path_id, _)| path_id)
5674 .collect::<Vec<_>>()
5675 {
5676 Self::populate_acks(
5677 now,
5678 self.receiving_ecn,
5679 path_id,
5680 space_id,
5681 space,
5682 is_multipath_negotiated,
5683 builder,
5684 stats,
5685 space_has_keys,
5686 );
5687 }
5688 }
5689
5690 if scheduling_info.may_send_data && mem::replace(&mut space.pending.ack_frequency, false) {
5692 let sequence_number = self.ack_frequency.next_sequence_number();
5693
5694 let config = self.config.ack_frequency_config.as_ref().unwrap();
5696
5697 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5699 path.rtt.get(),
5700 config,
5701 &self.peer_params,
5702 );
5703
5704 let frame = frame::AckFrequency {
5705 sequence: sequence_number,
5706 ack_eliciting_threshold: config.ack_eliciting_threshold,
5707 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5708 reordering_threshold: config.reordering_threshold,
5709 };
5710 builder.write_frame(frame, stats);
5711
5712 self.ack_frequency
5713 .ack_frequency_sent(path_id, builder.packet_number, max_ack_delay);
5714 }
5715
5716 if builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
5718 && space_id == SpaceId::Data
5719 && path.pending_on_path_challenge
5720 && !self.state.is_closed()
5721 {
5723 path.pending_on_path_challenge = false;
5724
5725 let token = self.rng.random();
5726 path.record_path_challenge_sent(now, token, path.network_path);
5727 let challenge = frame::PathChallenge(token);
5729 builder.write_frame(challenge, stats);
5730 builder.require_padding();
5731 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5732 if path.open_status == paths::OpenStatus::Pending {
5733 path.open_status = paths::OpenStatus::Sent;
5734 self.timers.set(
5735 Timer::PerPath(path_id, PathTimer::PathOpen),
5736 now + 3 * pto,
5737 self.qlog.with_time(now),
5738 );
5739 }
5740
5741 self.timers.set(
5742 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5743 now + pto,
5744 self.qlog.with_time(now),
5745 );
5746
5747 if is_multipath_negotiated && !path.validated && path.pending_on_path_challenge {
5748 space.pending.path_status.insert(path_id);
5750 }
5751
5752 if space_id == SpaceId::Data
5755 && self
5756 .config
5757 .address_discovery_role
5758 .should_report(&self.peer_params.address_discovery_role)
5759 {
5760 let frame = frame::ObservedAddr::new(
5761 path.network_path.remote,
5762 self.next_observed_addr_seq_no,
5763 );
5764 if builder.frame_space_remaining() > frame.size() {
5765 builder.write_frame(frame, stats);
5766
5767 self.next_observed_addr_seq_no =
5768 self.next_observed_addr_seq_no.saturating_add(1u8);
5769 path.observed_addr_sent = true;
5770
5771 space.pending.observed_addr = false;
5772 }
5773 }
5774 }
5775
5776 if builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
5778 && space_id == SpaceId::Data
5779 && let Some(token) = path.path_responses.pop_on_path(path.network_path)
5780 {
5781 let response = frame::PathResponse(token);
5782 builder.write_frame(response, stats);
5783 builder.require_padding();
5784
5785 if space_id == SpaceId::Data
5789 && self
5790 .config
5791 .address_discovery_role
5792 .should_report(&self.peer_params.address_discovery_role)
5793 {
5794 let frame = frame::ObservedAddr::new(
5795 path.network_path.remote,
5796 self.next_observed_addr_seq_no,
5797 );
5798 if builder.frame_space_remaining() > frame.size() {
5799 builder.write_frame(frame, stats);
5800
5801 self.next_observed_addr_seq_no =
5802 self.next_observed_addr_seq_no.saturating_add(1u8);
5803 path.observed_addr_sent = true;
5804
5805 space.pending.observed_addr = false;
5806 }
5807 }
5808 }
5809
5810 while !is_0rtt
5812 && scheduling_info.may_send_data
5813 && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
5814 {
5815 let Some(mut frame) = space.pending.crypto.pop_front() else {
5816 break;
5817 };
5818
5819 let max_crypto_data_size = builder.frame_space_remaining()
5824 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5826 - 2; let len = frame
5829 .data
5830 .len()
5831 .min(2usize.pow(14) - 1)
5832 .min(max_crypto_data_size);
5833
5834 let data = frame.data.split_to(len);
5835 let offset = frame.offset;
5836 let truncated = frame::Crypto { offset, data };
5837 builder.write_frame(truncated, stats);
5838
5839 if !frame.data.is_empty() {
5840 frame.offset += len as u64;
5841 space.pending.crypto.push_front(frame);
5842 }
5843 }
5844
5845 while space_id == SpaceId::Data
5848 && scheduling_info.may_send_data
5849 && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
5850 {
5851 let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
5852 else {
5853 break;
5854 };
5855 let frame = frame::PathAbandon {
5856 path_id: abandoned_path_id,
5857 error_code,
5858 };
5859 builder.write_frame(frame, stats);
5860 }
5861
5862 while space_id == SpaceId::Data
5864 && scheduling_info.may_send_data
5865 && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
5866 {
5867 let Some(path_id) = space.pending.path_status.pop_first() else {
5868 break;
5869 };
5870 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5871 trace!(%path_id, "discarding queued path status for unknown path");
5872 continue;
5873 };
5874
5875 let seq = path.status.seq();
5876 match path.local_status() {
5877 PathStatus::Available => {
5878 let frame = frame::PathStatusAvailable {
5879 path_id,
5880 status_seq_no: seq,
5881 };
5882 builder.write_frame(frame, stats);
5883 }
5884 PathStatus::Backup => {
5885 let frame = frame::PathStatusBackup {
5886 path_id,
5887 status_seq_no: seq,
5888 };
5889 builder.write_frame(frame, stats);
5890 }
5891 }
5892 }
5893
5894 if space_id == SpaceId::Data
5896 && scheduling_info.may_send_data
5897 && space.pending.max_path_id
5898 && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
5899 {
5900 let frame = frame::MaxPathId(self.local_max_path_id);
5901 builder.write_frame(frame, stats);
5902 space.pending.max_path_id = false;
5903 }
5904
5905 if space_id == SpaceId::Data
5907 && scheduling_info.may_send_data
5908 && space.pending.paths_blocked
5909 && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5910 {
5911 let frame = frame::PathsBlocked(self.remote_max_path_id);
5912 builder.write_frame(frame, stats);
5913 space.pending.paths_blocked = false;
5914 }
5915
5916 while space_id == SpaceId::Data
5918 && scheduling_info.may_send_data
5919 && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5920 {
5921 let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
5922 break;
5923 };
5924 let next_seq = match self.remote_cids.get(&path_id) {
5925 Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
5926 None => VarInt(0),
5927 };
5928 let frame = frame::PathCidsBlocked { path_id, next_seq };
5929 builder.write_frame(frame, stats);
5930 }
5931
5932 if space_id == SpaceId::Data && scheduling_info.may_send_data {
5934 self.streams
5935 .write_control_frames(builder, &mut space.pending, stats);
5936 }
5937
5938 let cid_len = self
5940 .local_cid_state
5941 .values()
5942 .map(|cid_state| cid_state.cid_len())
5943 .max()
5944 .expect("some local CID state must exist");
5945 let new_cid_size_bound =
5946 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5947 while scheduling_info.may_send_data && builder.frame_space_remaining() > new_cid_size_bound
5948 {
5949 let Some(issued) = space.pending.new_cids.pop() else {
5950 break;
5951 };
5952 let retire_prior_to = self
5953 .local_cid_state
5954 .get(&issued.path_id)
5955 .map(|cid_state| cid_state.retire_prior_to())
5956 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5957
5958 let cid_path_id = match is_multipath_negotiated {
5959 true => Some(issued.path_id),
5960 false => {
5961 debug_assert_eq!(issued.path_id, PathId::ZERO);
5962 None
5963 }
5964 };
5965 let frame = frame::NewConnectionId {
5966 path_id: cid_path_id,
5967 sequence: issued.sequence,
5968 retire_prior_to,
5969 id: issued.id,
5970 reset_token: issued.reset_token,
5971 };
5972 builder.write_frame(frame, stats);
5973 }
5974
5975 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5977 while scheduling_info.may_send_data && builder.frame_space_remaining() > retire_cid_bound {
5978 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5979 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
5980 Some((path_id, seq)) => (Some(path_id), seq),
5981 None => break,
5982 };
5983 let frame = frame::RetireConnectionId { path_id, sequence };
5984 builder.write_frame(frame, stats);
5985 }
5986
5987 let mut sent_datagrams = false;
5989 while scheduling_info.may_send_data
5990 && builder.frame_space_remaining() > Datagram::SIZE_BOUND
5991 && space_id == SpaceId::Data
5992 {
5993 match self.datagrams.write(builder, stats) {
5994 true => {
5995 sent_datagrams = true;
5996 }
5997 false => break,
5998 }
5999 }
6000 if self.datagrams.send_blocked && sent_datagrams {
6001 self.events.push_back(Event::DatagramsUnblocked);
6002 self.datagrams.send_blocked = false;
6003 }
6004
6005 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
6006
6007 if scheduling_info.may_send_data {
6009 while let Some(network_path) = space.pending.new_tokens.pop() {
6010 debug_assert_eq!(space_id, SpaceId::Data);
6011 let ConnectionSide::Server { server_config } = &self.side else {
6012 panic!("NEW_TOKEN frames should not be enqueued by clients");
6013 };
6014
6015 if !network_path.is_probably_same_path(&path.network_path) {
6016 continue;
6021 }
6022
6023 let token = Token::new(
6024 TokenPayload::Validation {
6025 ip: network_path.remote.ip(),
6026 issued: server_config.time_source.now(),
6027 },
6028 &mut self.rng,
6029 );
6030 let new_token = NewToken {
6031 token: token.encode(&*server_config.token_key).into(),
6032 };
6033
6034 if builder.frame_space_remaining() < new_token.size() {
6035 space.pending.new_tokens.push(network_path);
6036 break;
6037 }
6038
6039 builder.write_frame(new_token, stats);
6040 builder.retransmits_mut().new_tokens.push(network_path);
6041 }
6042 }
6043
6044 if scheduling_info.may_send_data && space_id == SpaceId::Data {
6046 self.streams
6047 .write_stream_frames(builder, self.config.send_fairness, stats);
6048 }
6049
6050 while space_id == SpaceId::Data
6052 && scheduling_info.may_send_data
6053 && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
6054 {
6055 if let Some(added_address) = space.pending.add_address.pop_last() {
6056 builder.write_frame(added_address, stats);
6057 } else {
6058 break;
6059 }
6060 }
6061
6062 while space_id == SpaceId::Data
6064 && scheduling_info.may_send_data
6065 && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
6066 {
6067 if let Some(removed_address) = space.pending.remove_address.pop_last() {
6068 builder.write_frame(removed_address, stats);
6069 } else {
6070 break;
6071 }
6072 }
6073 }
6074
6075 fn populate_acks<'a, 'b>(
6077 now: Instant,
6078 receiving_ecn: bool,
6079 path_id: PathId,
6080 space_id: SpaceId,
6081 space: &mut PacketSpace,
6082 is_multipath_negotiated: bool,
6083 builder: &mut PacketBuilder<'a, 'b>,
6084 stats: &mut FrameStats,
6085 space_has_keys: bool,
6086 ) {
6087 debug_assert!(space_has_keys, "tried to send ACK in 0-RTT");
6089
6090 debug_assert!(
6091 is_multipath_negotiated || path_id == PathId::ZERO,
6092 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
6093 );
6094 if is_multipath_negotiated {
6095 debug_assert!(
6096 space_id == SpaceId::Data || path_id == PathId::ZERO,
6097 "path acks must be sent in 1RTT space (have {space_id:?})"
6098 );
6099 }
6100
6101 let pns = space.for_path(path_id);
6102 let ranges = pns.pending_acks.ranges();
6103 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
6104 let ecn = if receiving_ecn {
6105 Some(&pns.ecn_counters)
6106 } else {
6107 None
6108 };
6109
6110 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
6111 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
6113 let delay = delay_micros >> ack_delay_exp.into_inner();
6114
6115 if is_multipath_negotiated && space_id == SpaceId::Data {
6116 if !ranges.is_empty() {
6117 let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
6118 builder.write_frame(frame, stats);
6119 }
6120 } else {
6121 builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
6122 }
6123 }
6124
6125 fn close_common(&mut self) {
6126 trace!("connection closed");
6127 self.timers.reset();
6128 }
6129
6130 fn set_close_timer(&mut self, now: Instant) {
6131 let pto_max = self.max_pto_for_space(self.highest_space);
6134 self.timers.set(
6135 Timer::Conn(ConnTimer::Close),
6136 now + 3 * pto_max,
6137 self.qlog.with_time(now),
6138 );
6139 }
6140
6141 fn handle_peer_params(
6146 &mut self,
6147 params: TransportParameters,
6148 local_cid: ConnectionId,
6149 remote_cid: ConnectionId,
6150 now: Instant,
6151 ) -> Result<(), TransportError> {
6152 if Some(self.original_remote_cid) != params.initial_src_cid
6153 || (self.side.is_client()
6154 && (Some(self.initial_dst_cid) != params.original_dst_cid
6155 || self.retry_src_cid != params.retry_src_cid))
6156 {
6157 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
6158 "CID authentication failure",
6159 ));
6160 }
6161 if params.initial_max_path_id.is_some() && (local_cid.is_empty() || remote_cid.is_empty()) {
6162 return Err(TransportError::PROTOCOL_VIOLATION(
6163 "multipath must not use zero-length CIDs",
6164 ));
6165 }
6166
6167 self.set_peer_params(params);
6168 self.qlog.emit_peer_transport_params_received(self, now);
6169
6170 Ok(())
6171 }
6172
6173 fn set_peer_params(&mut self, params: TransportParameters) {
6174 self.streams.set_params(¶ms);
6175 self.idle_timeout =
6176 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
6177 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
6178
6179 if let Some(ref info) = params.preferred_address {
6180 self.remote_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
6182 path_id: None,
6183 sequence: 1,
6184 id: info.connection_id,
6185 reset_token: info.stateless_reset_token,
6186 retire_prior_to: 0,
6187 })
6188 .expect(
6189 "preferred address CID is the first received, and hence is guaranteed to be legal",
6190 );
6191 let remote = self.path_data(PathId::ZERO).network_path.remote;
6192 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
6193 }
6194 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
6195
6196 let mut multipath_enabled = None;
6197 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
6198 self.config.get_initial_max_path_id(),
6199 params.initial_max_path_id,
6200 ) {
6201 self.local_max_path_id = local_max_path_id;
6203 self.remote_max_path_id = remote_max_path_id;
6204 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
6205 debug!(%initial_max_path_id, "multipath negotiated");
6206 multipath_enabled = Some(initial_max_path_id);
6207 }
6208
6209 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
6210 self.config
6211 .max_remote_nat_traversal_addresses
6212 .zip(params.max_remote_nat_traversal_addresses)
6213 {
6214 if let Some(max_initial_paths) =
6215 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
6216 {
6217 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
6218 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
6219 self.n0_nat_traversal = n0_nat_traversal::State::new(
6220 max_remote_addresses,
6221 max_local_addresses,
6222 self.side(),
6223 );
6224 debug!(
6225 %max_remote_addresses, %max_local_addresses,
6226 "n0's nat traversal negotiated"
6227 );
6228
6229 match self.side() {
6230 Side::Client => {
6231 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
6232 debug!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
6235 } else if max_local_addresses as u64
6236 > params.active_connection_id_limit.into_inner()
6237 {
6238 debug!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
6242 }
6243 }
6244 Side::Server => {
6245 if (max_initial_paths.as_u32() as u64) < crate::LOCAL_CID_COUNT {
6246 debug!(%max_initial_paths, local_cid_limit=%crate::LOCAL_CID_COUNT, "local server configuration might cause nat traversal issues")
6247 }
6248 }
6249 }
6250 } else {
6251 debug!("n0 nat traversal enabled for both endpoints, but multipath is missing")
6252 }
6253 }
6254
6255 self.peer_params = params;
6256 let peer_max_udp_payload_size =
6257 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
6258 self.path_data_mut(PathId::ZERO)
6259 .mtud
6260 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
6261 }
6262
6263 fn decrypt_packet(
6265 &mut self,
6266 now: Instant,
6267 path_id: PathId,
6268 packet: &mut Packet,
6269 ) -> Result<Option<u64>, Option<TransportError>> {
6270 let result = self
6271 .crypto_state
6272 .decrypt_packet_body(packet, path_id, &self.spaces)?;
6273
6274 let Some(result) = result else {
6275 return Ok(None);
6276 };
6277
6278 if result.outgoing_key_update_acked
6279 && let Some(prev) = self.crypto_state.prev_crypto.as_mut()
6280 {
6281 prev.end_packet = Some((result.number, now));
6282 self.set_key_discard_timer(now, packet.header.space());
6283 }
6284
6285 if result.incoming_key_update {
6286 trace!("key update authenticated");
6287 self.crypto_state
6288 .update_keys(Some((result.number, now)), true);
6289 self.set_key_discard_timer(now, packet.header.space());
6290 }
6291
6292 Ok(Some(result.number))
6293 }
6294
6295 fn peer_supports_ack_frequency(&self) -> bool {
6296 self.peer_params.min_ack_delay.is_some()
6297 }
6298
6299 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
6304 debug_assert_eq!(
6305 self.highest_space,
6306 SpaceKind::Data,
6307 "immediate ack must be written in the data space"
6308 );
6309 self.spaces[SpaceId::Data]
6310 .for_path(path_id)
6311 .immediate_ack_pending = true;
6312 }
6313
/// Test helper: decrypts the datagram carried by `event` and returns its payload.
///
/// Returns `None` when the event is not a datagram, or when header unprotection or body
/// decryption fails. Panics when the datagram contains coalesced packets.
#[cfg(test)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
    let datagram = match &event.0 {
        ConnectionEventInner::Datagram(datagram) => datagram,
        _ => return None,
    };
    let DatagramConnectionEvent {
        path_id,
        first_decode,
        remaining,
        ..
    } = datagram;

    assert!(
        remaining.is_none(),
        "Packets should never be coalesced in tests"
    );

    let reset_token = self.peer_params.stateless_reset_token;
    let decrypted_header = self
        .crypto_state
        .unprotect_header(first_decode.clone(), reset_token)?;

    let mut packet = decrypted_header.packet?;
    self.crypto_state
        .decrypt_packet_body(&mut packet, *path_id, &self.spaces)
        .ok()?;

    Some(packet.payload.to_vec())
}
6342
/// Test helper: bytes currently in flight on `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
    let path_zero = self.path_data(PathId::ZERO);
    path_zero.in_flight.bytes
}
6350
/// Test helper: remaining congestion window on `PathId::ZERO`, i.e. the controller's
/// window minus the bytes in flight, saturating at zero.
#[cfg(test)]
pub(crate) fn congestion_window(&self) -> u64 {
    let path_zero = self.path_data(PathId::ZERO);
    let window = path_zero.congestion.window();
    window.saturating_sub(path_zero.in_flight.bytes)
}
6359
/// Test helper: whether the connection has nothing scheduled besides going idle.
///
/// Keep-alive, path keep-alive, CID push and key discard timers are ignored. Of the
/// remaining timers, the earliest must be the idle timer, or there must be none at all.
#[cfg(test)]
pub(crate) fn is_idle(&self) -> bool {
    let earliest = self
        .timers
        .values()
        .into_iter()
        .filter(|(timer, _)| {
            !matches!(
                timer,
                Timer::Conn(ConnTimer::KeepAlive)
                    | Timer::PerPath(_, PathTimer::PathKeepAlive)
                    | Timer::Conn(ConnTimer::PushNewCid)
                    | Timer::Conn(ConnTimer::KeyDiscard)
            )
        })
        .min_by_key(|(_, time)| *time);

    match earliest {
        Some((timer, _)) => timer == Timer::Conn(ConnTimer::Idle),
        None => true,
    }
}
6378
/// Test helper: whether `PathId::ZERO` is currently sending ECN.
#[cfg(test)]
pub(crate) fn using_ecn(&self) -> bool {
    let path_zero = self.path_data(PathId::ZERO);
    path_zero.sending_ecn
}
6384
/// Test helper: the `total_recvd` counter of `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn total_recvd(&self) -> u64 {
    let path_zero = self.path_data(PathId::ZERO);
    path_zero.total_recvd
}
6390
/// Test helper: the active local CID sequence pair of `PathId::ZERO`, as reported by its
/// CID state. Panics when path zero has no local CID state.
#[cfg(test)]
pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
    let cid_state = self.local_cid_state.get(&PathId::ZERO).unwrap();
    cid_state.active_seq()
}
6398
/// Test helper: the active local CID sequence pair of the path with the given raw ID.
/// Panics when that path has no local CID state.
#[cfg(test)]
#[track_caller]
pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
    let cid_state = self.local_cid_state.get(&PathId(path_id)).unwrap();
    cid_state.active_seq()
}
6407
/// Test helper: assigns the retire sequence `v` on `PathId::ZERO` and asks the endpoint
/// for the number of new identifiers that assignment reports.
#[cfg(test)]
pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
    let cid_state = self.local_cid_state.get_mut(&PathId::ZERO).unwrap();
    let n = cid_state.assign_retire_seq(v);

    debug_assert!(!self.state.is_drained());
    let request = EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n);
    self.endpoint_events.push_back(request);
}
6421
/// Test helper: sequence number of the active remote CID on `PathId::ZERO`. Panics when
/// path zero has no remote CID queue.
#[cfg(test)]
pub(crate) fn active_remote_cid_seq(&self) -> u64 {
    let cid_queue = self.remote_cids.get(&PathId::ZERO).unwrap();
    cid_queue.active_seq()
}
6427
/// Test helper: the current MTU of `path_id`.
#[cfg(test)]
pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
    let path = self.path_data(path_id);
    path.current_mtu()
}
6433
/// Test helper: marks every path as having an on-path challenge pending.
#[cfg(test)]
pub(crate) fn trigger_path_validation(&mut self) {
    self.paths
        .values_mut()
        .for_each(|path| path.data.pending_on_path_challenge = true);
}
6441
/// Test helper: closes the connection as if a protocol violation had been detected.
///
/// Does nothing when the connection is already closed. Otherwise the state moves to
/// closed, the timers are reset, the close timer is armed (unless the connection ended up
/// drained) and a CONNECTION_CLOSE is marked pending.
#[cfg(test)]
pub fn simulate_protocol_violation(&mut self, now: Instant) {
    if self.state.is_closed() {
        return;
    }

    let violation = TransportError::PROTOCOL_VIOLATION("simulated violation");
    self.state.move_to_closed(violation);
    self.close_common();
    if !self.state.is_drained() {
        self.set_close_timer(now);
    }
    self.connection_close_pending = true;
}
6455
6456 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6467 let space_specific = self.paths.get(&path_id).is_some_and(|path| {
6468 path.data.pending_on_path_challenge || !path.data.path_responses.is_empty()
6469 });
6470
6471 let other = self.streams.can_send_stream_data()
6473 || self
6474 .datagrams
6475 .outgoing
6476 .front()
6477 .is_some_and(|x| x.size(true) <= max_size);
6478
6479 SendableFrames {
6481 acks: false,
6482 close: false,
6483 space_specific,
6484 other,
6485 }
6486 }
6487
6488 fn kill(&mut self, reason: ConnectionError) {
6490 self.close_common();
6491 self.state.move_to_drained(Some(reason));
6492 self.endpoint_events.push_back(EndpointEventInner::Drained);
6495 }
6496
6497 pub fn current_mtu(&self) -> u16 {
6504 self.paths
6505 .iter()
6506 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6507 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6508 .min()
6509 .expect("There is always at least one available path")
6510 }
6511
6512 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6519 let pn_len = PacketNumber::new(
6520 pn,
6521 self.spaces[SpaceId::Data]
6522 .for_path(path)
6523 .largest_acked_packet
6524 .unwrap_or(0),
6525 )
6526 .len();
6527
6528 1 + self
6530 .remote_cids
6531 .get(&path)
6532 .map(|cids| cids.active().len())
6533 .unwrap_or(20) + pn_len
6535 + self.tag_len_1rtt()
6536 }
6537
6538 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6539 let pn_len = 4;
6540
6541 let cid_len = self
6542 .remote_cids
6543 .values()
6544 .map(|cids| cids.active().len())
6545 .max()
6546 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6550 }
6551
6552 fn tag_len_1rtt(&self) -> usize {
6553 let packet_crypto = self
6555 .crypto_state
6556 .encryption_keys(SpaceKind::Data, self.side.side())
6557 .map(|(_header, packet, _level)| packet);
6558 packet_crypto.map_or(16, |x| x.tag_len())
6562 }
6563
6564 fn on_path_validated(&mut self, path_id: PathId) {
6566 self.path_data_mut(path_id).validated = true;
6567 let ConnectionSide::Server { server_config } = &self.side else {
6568 return;
6569 };
6570 let network_path = self.path_data(path_id).network_path;
6571 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6572 new_tokens.clear();
6573 for _ in 0..server_config.validation_token.sent {
6574 new_tokens.push(network_path);
6575 }
6576 }
6577
6578 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6580 if let Some(path) = self.paths.get_mut(&path_id) {
6581 path.data.status.remote_update(status, status_seq_no);
6582 } else {
6583 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6584 }
6585 self.events.push_back(
6586 PathEvent::RemoteStatus {
6587 id: path_id,
6588 status,
6589 }
6590 .into(),
6591 );
6592 }
6593
6594 fn max_path_id(&self) -> Option<PathId> {
6603 if self.is_multipath_negotiated() {
6604 Some(self.remote_max_path_id.min(self.local_max_path_id))
6605 } else {
6606 None
6607 }
6608 }
6609
6610 fn is_ipv6(&self) -> bool {
6615 self.paths
6616 .values()
6617 .any(|p| p.data.network_path.remote.is_ipv6())
6618 }
6619
6620 pub fn add_nat_traversal_address(
6622 &mut self,
6623 address: SocketAddr,
6624 ) -> Result<(), n0_nat_traversal::Error> {
6625 if let Some(added) = self.n0_nat_traversal.add_local_address(address)? {
6626 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6627 };
6628 Ok(())
6629 }
6630
6631 pub fn remove_nat_traversal_address(
6635 &mut self,
6636 address: SocketAddr,
6637 ) -> Result<(), n0_nat_traversal::Error> {
6638 if let Some(removed) = self.n0_nat_traversal.remove_local_address(address)? {
6639 self.spaces[SpaceId::Data]
6640 .pending
6641 .remove_address
6642 .insert(removed);
6643 }
6644 Ok(())
6645 }
6646
6647 pub fn get_local_nat_traversal_addresses(
6649 &self,
6650 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6651 self.n0_nat_traversal.get_local_nat_traversal_addresses()
6652 }
6653
6654 pub fn get_remote_nat_traversal_addresses(
6656 &self,
6657 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6658 Ok(self
6659 .n0_nat_traversal
6660 .client_side()?
6661 .get_remote_nat_traversal_addresses())
6662 }
6663
6664 fn open_nat_traversal_path(
6668 &mut self,
6669 now: Instant,
6670 ip_port: (IpAddr, u16),
6671 ) -> Result<Option<(PathId, SocketAddr)>, PathError> {
6672 let remote = ip_port.into();
6673 let network_path = FourTuple {
6678 remote,
6679 local_ip: None,
6680 };
6681 match self.open_path_ensure(network_path, PathStatus::Backup, now) {
6682 Ok((path_id, path_was_known)) => {
6683 if path_was_known {
6684 trace!(%path_id, %remote, "nat traversal: path existed for remote");
6685 }
6686 Ok(Some((path_id, remote)))
6687 }
6688 Err(e) => {
6689 debug!(%remote, %e, "nat traversal: failed to probe remote");
6690 Err(e)
6691 }
6692 }
6693 }
6694
/// Starts a new NAT traversal round.
///
/// Asks the client-side NAT traversal state for a new round, queues the round's reach-out
/// data in the Data space, closes selected paths left over from the previous round and then
/// opens a path for every address the new round wants probed.
///
/// Returns the remote addresses for which `open_nat_traversal_path` succeeded.
///
/// # Errors
///
/// - [`n0_nat_traversal::Error::Closed`] if the connection state is closed.
/// - Any error from `client_side_mut()` or from the state's own
///   `initiate_nat_traversal_round`.
/// - [`n0_nat_traversal::Error::Multipath`] carrying the first path error when at least one
///   path failed to open and no address could be probed at all.
pub fn initiate_nat_traversal_round(
    &mut self,
    now: Instant,
) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
    if self.state.is_closed() {
        return Err(n0_nat_traversal::Error::Closed);
    }

    // Computed up front: `client_state` below holds a mutable borrow of part of `self`.
    let ipv6 = self.is_ipv6();
    let client_state = self.n0_nat_traversal.client_side_mut()?;
    let n0_nat_traversal::NatTraversalRound {
        new_round,
        reach_out_at,
        addresses_to_probe,
        prev_round_path_ids,
    } = client_state.initiate_nat_traversal_round(ipv6)?;

    trace!(%new_round, reach_out=reach_out_at.len(), to_probe=addresses_to_probe.len(),
        "initiating nat traversal round");

    // Replaces any previously pending reach-out data with the new round's.
    self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));

    for path_id in prev_round_path_ids {
        // Paths that no longer exist need no cleanup.
        let Some(path) = self.path(path_id) else {
            continue;
        };
        let ip = path.network_path.remote.ip();
        let port = path.network_path.remote.port();

        // Close a previous-round path only if this round does not probe the same remote,
        // the path is not validated, and it has not already been abandoned.
        if !addresses_to_probe
            .iter()
            .any(|(_, probe)| *probe == (ip, port))
            && !path.validated
            && !self.abandoned_paths.contains(&path_id)
        {
            trace!(%path_id, "closing path from previous round");
            // The result of closing is deliberately ignored.
            let _ =
                self.close_path_inner(now, path_id, PathAbandonReason::NatTraversalRoundEnded);
        }
    }

    // First path-opening error seen in the loop below, if any.
    let mut err = None;

    let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
    let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());

    for (id, address) in addresses_to_probe {
        match self.open_nat_traversal_path(now, address) {
            Ok(None) => {}
            Ok(Some((path_id, remote))) => {
                path_ids.push(path_id);
                probed_addresses.push(remote);
            }
            Err(e) => {
                // `client_side_mut()` already succeeded earlier in this function.
                self.n0_nat_traversal
                    .client_side_mut()
                    .expect("validated")
                    .report_in_continuation(id, e);
                err.get_or_insert(e);
            }
        }
    }

    // Only fail the round when nothing at all could be probed; partial success is `Ok`.
    if let Some(err) = err {
        if probed_addresses.is_empty() {
            return Err(n0_nat_traversal::Error::Multipath(err));
        }
    }

    self.n0_nat_traversal
        .client_side_mut()
        .expect("connection side validated")
        .set_round_path_ids(path_ids);

    Ok(probed_addresses)
}
6784
6785 fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6790 let ipv6 = self.is_ipv6();
6791 let client_state = self.n0_nat_traversal.client_side_mut().ok()?;
6792 let (id, address) = client_state.continue_nat_traversal_round(ipv6)?;
6793 let open_result = self.open_nat_traversal_path(now, address);
6794 let client_state = self.n0_nat_traversal.client_side_mut().expect("validated");
6795 match open_result {
6796 Ok(None) => Some(true),
6797 Ok(Some((path_id, _remote))) => {
6798 client_state.add_round_path_id(path_id);
6799 Some(true)
6800 }
6801 Err(e) => {
6802 client_state.report_in_continuation(id, e);
6803 Some(false)
6804 }
6805 }
6806 }
6807}
6808
6809impl fmt::Debug for Connection {
6810 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
6811 f.debug_struct("Connection")
6812 .field("handshake_cid", &self.handshake_cid)
6813 .finish()
6814 }
6815}
6816
/// Hook for deciding whether a path is recoverable.
///
/// NOTE(review): the call site is outside this chunk; judging by the name this is consulted
/// when the local network changes — confirm against the caller.
pub trait NetworkChangeHint: std::fmt::Debug + 'static {
    /// Returns whether the path identified by `path_id`, running over `network_path`,
    /// should be considered recoverable.
    fn is_path_recoverable(&self, path_id: PathId, network_path: FourTuple) -> bool;
}
6829
/// Outcome of polling a path for data to transmit.
///
/// NOTE(review): producers and consumers are outside this chunk; the field description is
/// inferred from its name — confirm.
#[derive(Debug)]
enum PollPathStatus {
    /// Nothing was produced for sending.
    NothingToSend {
        /// Presumably `true` when sending was prevented by congestion control — TODO confirm.
        congestion_blocked: bool,
    },
    /// A [`Transmit`] is ready to be sent.
    Send(Transmit),
}
6841
/// Outcome of polling a path for data to transmit in one packet number space.
///
/// NOTE(review): producers and consumers are outside this chunk; variant semantics beyond
/// the field types are inferred from names — confirm.
#[derive(Debug)]
enum PollPathSpaceStatus {
    /// Nothing was produced for sending.
    NothingToSend {
        /// Presumably `true` when sending was prevented by congestion control — TODO confirm.
        congestion_blocked: bool,
    },
    /// A packet was written; presumably the datagram is not yet finished — TODO confirm.
    WrotePacket {
        /// Packet number of the last packet written.
        last_packet_number: u64,
        /// Padding to apply to the datagram.
        pad_datagram: PadDatagram,
    },
    /// Presumably signals that the datagram should be sent now — TODO confirm.
    Send {
        /// Packet number of the last packet written.
        last_packet_number: u64,
    },
}
6880
/// Per-path flags describing what may be scheduled on a path.
///
/// NOTE(review): the code computing and consuming these flags is outside this chunk; the
/// field descriptions follow the field names — confirm.
#[derive(Debug, Copy, Clone)]
struct PathSchedulingInfo {
    /// Whether the path has been abandoned.
    abandoned: bool,
    /// Whether data may be sent on this path.
    may_send_data: bool,
    /// Whether a close may be sent on this path.
    may_send_close: bool,
}
6919
/// Reason, if any, why a path is blocked.
///
/// NOTE(review): inferred from the variant names; the code assigning these values is
/// outside this chunk.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Blocked by the anti-amplification limit.
    AntiAmplification,
    /// Blocked by congestion control.
    Congestion,
    /// Blocked by pacing.
    Pacing,
}
6927
/// Side-specific state of a connection.
enum ConnectionSide {
    /// State held when this endpoint is the client.
    Client {
        /// Token taken from `token_store` for `server_name` when the connection was created
        /// (empty if the store had none).
        token: Bytes,
        /// Store the token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server, used as the key into `token_store`.
        server_name: String,
    },
    /// State held when this endpoint is the server.
    Server {
        /// Server configuration; its `migration` flag decides whether the remote may migrate.
        server_config: Arc<ServerConfig>,
    },
}
6940
6941impl ConnectionSide {
6942 fn remote_may_migrate(&self, state: &State) -> bool {
6943 match self {
6944 Self::Server { server_config } => server_config.migration,
6945 Self::Client { .. } => {
6946 if let Some(hs) = state.as_handshake() {
6947 hs.allow_server_migration
6948 } else {
6949 false
6950 }
6951 }
6952 }
6953 }
6954
6955 fn is_client(&self) -> bool {
6956 self.side().is_client()
6957 }
6958
6959 fn is_server(&self) -> bool {
6960 self.side().is_server()
6961 }
6962
6963 fn side(&self) -> Side {
6964 match *self {
6965 Self::Client { .. } => Side::Client,
6966 Self::Server { .. } => Side::Server,
6967 }
6968 }
6969}
6970
6971impl From<SideArgs> for ConnectionSide {
6972 fn from(side: SideArgs) -> Self {
6973 match side {
6974 SideArgs::Client {
6975 token_store,
6976 server_name,
6977 } => Self::Client {
6978 token: token_store.take(&server_name).unwrap_or_default(),
6979 token_store,
6980 server_name,
6981 },
6982 SideArgs::Server {
6983 server_config,
6984 pref_addr_cid: _,
6985 path_validated: _,
6986 } => Self::Server { server_config },
6987 }
6988 }
6989}
6990
/// Side-specific arguments used to construct a [`ConnectionSide`].
pub(crate) enum SideArgs {
    /// Arguments for a client-side connection.
    Client {
        /// Store from which a token for `server_name` is taken.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server-side connection.
    Server {
        /// Server configuration.
        server_config: Arc<ServerConfig>,
        /// Connection ID associated with the preferred address, if any
        /// (presumably — TODO confirm against the endpoint code).
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path is already considered validated.
        path_validated: bool,
    },
}
7003
7004impl SideArgs {
7005 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
7006 match *self {
7007 Self::Client { .. } => None,
7008 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
7009 }
7010 }
7011
7012 pub(crate) fn path_validated(&self) -> bool {
7013 match *self {
7014 Self::Client { .. } => true,
7015 Self::Server { path_validated, .. } => path_validated,
7016 }
7017 }
7018
7019 pub(crate) fn side(&self) -> Side {
7020 match *self {
7021 Self::Client { .. } => Side::Client,
7022 Self::Server { .. } => Side::Server,
7023 }
7024 }
7025}
7026
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed transparently as the wrapped [`TransportError`].
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer aborted the connection with a connection-level close frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection with an application-level close frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed locally.
    #[error("closed")]
    LocallyClosed,
    /// Connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
7061
7062impl From<Close> for ConnectionError {
7063 fn from(x: Close) -> Self {
7064 match x {
7065 Close::Connection(reason) => Self::ConnectionClosed(reason),
7066 Close::Application(reason) => Self::ApplicationClosed(reason),
7067 }
7068 }
7069}
7070
7071impl From<ConnectionError> for io::Error {
7073 fn from(x: ConnectionError) -> Self {
7074 use ConnectionError::*;
7075 let kind = match x {
7076 TimedOut => io::ErrorKind::TimedOut,
7077 Reset => io::ErrorKind::ConnectionReset,
7078 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
7079 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
7080 io::ErrorKind::Other
7081 }
7082 };
7083 Self::new(kind, x)
7084 }
7085}
7086
7087#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
7090pub enum PathError {
7091 #[error("multipath extension not negotiated")]
7093 MultipathNotNegotiated,
7094 #[error("the server side may not open a path")]
7096 ServerSideNotAllowed,
7097 #[error("maximum number of concurrent paths reached")]
7099 MaxPathIdReached,
7100 #[error("remoted CIDs exhausted")]
7102 RemoteCidsExhausted,
7103 #[error("path validation failed")]
7105 ValidationFailed,
7106 #[error("invalid remote address")]
7108 InvalidRemoteAddress(SocketAddr),
7109}
7110
/// Errors that can occur when closing a path.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The multipath extension was not negotiated with the peer.
    #[error("Multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// The path is already closed.
    #[error("closed path")]
    ClosedPath,
    /// The path is the last open path.
    #[error("last open path")]
    LastOpenPath,
}
7124
/// Error indicating that the multipath extension was not negotiated.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private unit field: prevents construction with a struct literal outside this module.
    _private: (),
}
7131
/// Events of interest to the application.
///
/// NOTE(review): most emit sites are outside this chunk; the variant descriptions follow
/// the variant names and payload types — confirm against the emitting code.
#[derive(Debug)]
pub enum Event {
    /// Handshake data is ready.
    HandshakeDataReady,
    /// The connection was established.
    Connected,
    /// The handshake was confirmed.
    HandshakeConfirmed,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event.
    Stream(StreamEvent),
    /// A datagram was received.
    DatagramReceived,
    /// Datagram sending is no longer blocked.
    DatagramsUnblocked,
    /// A path-related event.
    Path(PathEvent),
    /// A NAT-traversal-related event.
    NatTraversal(n0_nat_traversal::Event),
}
7159
7160impl From<PathEvent> for Event {
7161 fn from(source: PathEvent) -> Self {
7162 Self::Path(source)
7163 }
7164}
7165
7166fn get_max_ack_delay(params: &TransportParameters) -> Duration {
7167 Duration::from_micros(params.max_ack_delay.0 * 1000)
7168}
7169
/// Upper bound for a backoff exponent.
///
/// NOTE(review): the users of this constant are outside this chunk; presumably it caps the
/// exponential backoff of a retransmission timer — confirm.
const MAX_BACKOFF_EXPONENT: u32 = 16;
7172
/// Minimum packet space: the largest Handshake/0-RTT header plus 32 bytes.
///
/// NOTE(review): presumably the minimum free datagram space required before starting
/// another packet; what the extra 32 bytes cover is not visible here — confirm.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
7181
/// Largest possible size of a Handshake or 0-RTT packet header.
///
/// NOTE(review): the summands appear to follow the QUIC long-header layout — one first
/// byte, a 4-byte version, a length byte plus up to `MAX_CID_SIZE` bytes for each of the
/// two connection IDs, a length field sized as the varint encoding of `u16::MAX`, and a
/// 4-byte packet number. Confirm against the packet encoder.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
7189
/// Bookkeeping of the frames recorded via `record_sent_frame`.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable state recorded for frames such as HANDSHAKE_DONE, CRYPTO or
    /// MAX_DATA; created lazily on first use.
    retransmits: ThinRetransmits,
    /// Per path, the largest packet number covered by a recorded ACK / PATH_ACK frame.
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of the recorded stream frames.
    stream_frames: StreamMetaVec,
    /// Set when a frame was recorded that is not tracked in `retransmits`
    /// (e.g. PING, PATH_CHALLENGE, PATH_RESPONSE, IMMEDIATE_ACK, DATAGRAM).
    non_retransmits: bool,
    /// NOTE(review): not written in the visible code; presumably requests padding of the
    /// containing packet — confirm.
    requires_padding: bool,
}
7201
7202impl SentFrames {
7203 fn is_ack_only(&self, streams: &StreamsState) -> bool {
7205 !self.largest_acked.is_empty()
7206 && !self.non_retransmits
7207 && self.stream_frames.is_empty()
7208 && self.retransmits.is_empty(streams)
7209 }
7210
7211 fn retransmits_mut(&mut self) -> &mut Retransmits {
7212 self.retransmits.get_or_create()
7213 }
7214
7215 fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
7216 use frame::EncodableFrame::*;
7217 match frame {
7218 PathAck(path_ack_encoder) => {
7219 if let Some(max) = path_ack_encoder.ranges.max() {
7220 self.largest_acked.insert(path_ack_encoder.path_id, max);
7221 }
7222 }
7223 Ack(ack_encoder) => {
7224 if let Some(max) = ack_encoder.ranges.max() {
7225 self.largest_acked.insert(PathId::ZERO, max);
7226 }
7227 }
7228 Close(_) => { }
7229 PathResponse(_) => self.non_retransmits = true,
7230 HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
7231 ReachOut(frame::ReachOut { round, ip, port }) => {
7232 let (recorded_round, reach_outs) = self
7233 .retransmits_mut()
7234 .reach_out
7235 .get_or_insert_with(|| (round, FxHashSet::default()));
7236 if *recorded_round == round {
7238 reach_outs.insert((ip, port));
7240 } else if *recorded_round < round {
7241 *recorded_round = round;
7243 reach_outs.drain();
7244 reach_outs.insert((ip, port));
7245 } else {
7246 }
7248 }
7249
7250 ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
7251 Ping(_) => self.non_retransmits = true,
7252 ImmediateAck(_) => self.non_retransmits = true,
7253 AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
7254 PathChallenge(_) => self.non_retransmits = true,
7255 Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
7256 PathAbandon(path_abandon) => {
7257 self.retransmits_mut()
7258 .path_abandon
7259 .entry(path_abandon.path_id)
7260 .or_insert(path_abandon.error_code);
7261 }
7262 PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
7263 | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
7264 self.retransmits_mut().path_status.insert(path_id);
7265 }
7266 MaxPathId(_) => self.retransmits_mut().max_path_id = true,
7267 PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
7268 PathCidsBlocked(path_cids_blocked) => {
7269 self.retransmits_mut()
7270 .path_cids_blocked
7271 .insert(path_cids_blocked.path_id);
7272 }
7273 ResetStream(reset) => self
7274 .retransmits_mut()
7275 .reset_stream
7276 .push((reset.id, reset.error_code)),
7277 StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
7278 NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
7279 RetireConnectionId(retire_cid) => self
7280 .retransmits_mut()
7281 .retire_cids
7282 .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
7283 Datagram(_) => self.non_retransmits = true,
7284 NewToken(_) => {}
7285 AddAddress(add_address) => {
7286 self.retransmits_mut().add_address.insert(add_address);
7287 }
7288 RemoveAddress(remove_address) => {
7289 self.retransmits_mut().remove_address.insert(remove_address);
7290 }
7291 StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
7292 MaxData(_) => self.retransmits_mut().max_data = true,
7293 MaxStreamData(max) => {
7294 self.retransmits_mut().max_stream_data.insert(max.id);
7295 }
7296 MaxStreams(max_streams) => {
7297 self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
7298 }
7299 }
7300 }
7301}
7302
7303fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
7311 match (x, y) {
7312 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
7313 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
7314 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
7315 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
7316 }
7317}
7318
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `negotiate_max_idle_timeout` against a table of cases, calling it with the
    /// arguments in both orders to verify that the result does not depend on the order.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let millis = |value: u64| Some(Duration::from_millis(value));

        let cases: [(Option<VarInt>, Option<VarInt>, Option<Duration>); 6] = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), millis(2)),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (Some(VarInt(2)), Some(VarInt(0)), millis(2)),
            (Some(VarInt(1)), Some(VarInt(4)), millis(1)),
        ];

        for (first, second, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(first, second), expected);
            assert_eq!(negotiate_max_idle_timeout(second, first), expected);
        }
    }
}