1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::{NonZeroU32, NonZeroUsize},
8 sync::Arc,
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13
14use rand::{RngExt, SeedableRng, rngs::StdRng};
15use rustc_hash::{FxHashMap, FxHashSet};
16use thiserror::Error;
17use tracing::{debug, error, trace, trace_span, warn};
18
19use crate::{
20 Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
21 MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
22 TransportError, TransportErrorCode, VarInt,
23 cid_generator::ConnectionIdGenerator,
24 cid_queue::CidQueue,
25 config::{ServerConfig, TransportConfig},
26 congestion::Controller,
27 connection::{
28 qlog::{QlogRecvPacket, QlogSink},
29 spaces::LostPacket,
30 timer::{ConnTimer, PathTimer},
31 },
32 crypto::{self, Keys},
33 frame::{
34 self, Close, DataBlocked, Datagram, FrameStruct, NewToken, ObservedAddr, StreamDataBlocked,
35 StreamsBlocked,
36 },
37 n0_nat_traversal,
38 packet::{
39 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40 PacketNumber, PartialDecode, SpaceId,
41 },
42 range_set::ArrayRangeSet,
43 shared::{
44 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45 EndpointEvent, EndpointEventInner,
46 },
47 token::{ResetToken, Token, TokenPayload},
48 transport_parameters::TransportParameters,
49};
50
51mod ack_frequency;
52use ack_frequency::AckFrequencyState;
53
54mod assembler;
55pub use assembler::Chunk;
56
57mod cid_state;
58use cid_state::CidState;
59
60mod datagrams;
61use datagrams::DatagramState;
62pub use datagrams::{Datagrams, SendDatagramError};
63
64mod mtud;
65mod pacing;
66
67mod packet_builder;
68use packet_builder::{PacketBuilder, PadDatagram};
69
70mod packet_crypto;
71use packet_crypto::CryptoState;
72pub(crate) use packet_crypto::EncryptionLevel;
73
74mod paths;
75pub use paths::{
76 ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError,
77};
78use paths::{PathData, PathState};
79
80pub(crate) mod qlog;
81pub(crate) mod send_buffer;
82
83mod spaces;
84#[cfg(fuzzing)]
85pub use spaces::Retransmits;
86#[cfg(not(fuzzing))]
87use spaces::Retransmits;
88pub(crate) use spaces::SpaceKind;
89use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
90
91mod stats;
92pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
93
94mod streams;
95#[cfg(fuzzing)]
96pub use streams::StreamsState;
97#[cfg(not(fuzzing))]
98use streams::StreamsState;
99pub use streams::{
100 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
101 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
102};
103
104mod timer;
105use timer::{Timer, TimerTable};
106
107mod transmit_buf;
108use transmit_buf::TransmitBuf;
109
110mod state;
111
112#[cfg(not(fuzzing))]
113use state::State;
114#[cfg(fuzzing)]
115pub use state::State;
116use state::StateType;
117
/// Protocol state for a single connection.
///
/// Bundles the cryptographic state, the per-path state (keyed by [`PathId`]), the three
/// packet number spaces, stream and datagram state, timers and the queues of events
/// destined for the application and for the endpoint.
pub struct Connection {
/// Endpoint-wide configuration passed to [`Connection::new`].
158 endpoint_config: Arc<EndpointConfig>,
/// Transport configuration (spin bit, idle timeout, stream limits, segmentation offload, ...).
159 config: Arc<TransportConfig>,
/// Connection-local RNG, seeded from the `rng_seed` given to [`Connection::new`].
160 rng: StdRng,
/// Packet protection state.
161 crypto_state: CryptoState,
/// Initialised from the `local_cid` argument of [`Connection::new`].
163 handshake_cid: ConnectionId,
/// Initialised from the `remote_cid` argument of [`Connection::new`].
165 remote_handshake_cid: ConnectionId,
/// State of every known path, ordered by [`PathId`]; path zero is created by the constructor.
167 paths: BTreeMap<PathId, PathState>,
/// Incremented (wrapping) each time `ensure_path` creates a path; the new value is handed
/// to `PathData::new`.
173 path_generation_counter: u64,
/// Forwarded to `PathData::new` for every path that is created.
184 allow_mtud: bool,
/// Connection state machine; starts in the handshake state.
186 state: State,
/// Which side of the connection this is, plus side-specific data.
187 side: ConnectionSide,
/// Peer transport parameters; `TransportParameters::default()` until replaced.
188 peer_params: TransportParameters,
/// Initialised from the `remote_cid` argument of [`Connection::new`].
190 original_remote_cid: ConnectionId,
/// Initialised from the `init_cid` argument of [`Connection::new`].
192 initial_dst_cid: ConnectionId,
/// `None` at construction.
194 retry_src_cid: Option<ConnectionId>,
/// Application-facing events, drained front-first by `poll`.
197 events: VecDeque<Event>,
/// Endpoint-facing events, drained front-first by `poll_endpoint_events`.
199 endpoint_events: VecDeque<EndpointEventInner>,
/// Decided once at construction: requires `config.allow_spin` and a 7-in-8 random draw.
200 spin_enabled: bool,
/// Starts out `false`.
202 spin: bool,
/// Packet spaces in the order initial, handshake, data.
204 spaces: [PacketSpace; 3],
/// Starts at `SpaceKind::Initial`.
206 highest_space: SpaceKind,
/// Starts out `true`.
208 permit_idle_reset: bool,
/// Derived from `config.max_idle_timeout` (milliseconds); `None` when unset or zero.
210 idle_timeout: Option<Duration>,
/// Timer table queried by `poll_timeout`.
212 timers: TimerTable,
/// Starts at zero.
213 authentication_failures: u64,
215
/// While the state is draining or closed, `poll_transmit` only sends when this is set.
216 connection_close_pending: bool,
221
/// ACK frequency state, consulted by `poll_transmit` when `ack_frequency_config` is set.
222 ack_frequency: AckFrequencyState,
226
/// Passed to `populate_acks` when ACKs are written; starts out `false`.
227 receiving_ecn: bool,
/// Starts at zero.
232 total_authed_packets: u64,
/// Set by `poll_transmit`: `true` when it returns without being congestion blocked.
234 app_limited: bool,
237
/// Starts at zero.
238 next_observed_addr_seq_no: VarInt,
243
/// Stream state shared by the `Streams`, `RecvStream` and `SendStream` handles.
244 streams: StreamsState,
/// Remote connection IDs per path; path zero is seeded with the initial remote CID.
245 remote_cids: FxHashMap<PathId, CidQueue>,
/// Local connection ID state per path; path zero is created by the constructor.
251 local_cid_state: FxHashMap<PathId, CidState>,
/// Datagram state.
258 datagrams: DatagramState,
/// Connection-wide statistics.
260 stats: ConnectionStats,
/// Statistics per path, created on demand.
262 path_stats: FxHashMap<PathId, PathStats>,
/// Protocol version passed to [`Connection::new`].
264 version: u32,
266
/// Starts at `NonZeroU32::MIN`, i.e. one.
267 max_concurrent_paths: NonZeroU32,
/// Starts at `PathId::ZERO`.
276 local_max_path_id: PathId,
/// Starts at `PathId::ZERO`; `open_path` refuses path ids above this value.
291 remote_max_path_id: PathId,
/// Starts at `PathId::ZERO`.
297 max_path_id_with_cids: PathId,
/// Ids of paths that were abandoned through `close_path_inner`.
303 abandoned_paths: FxHashSet<PathId>,
311
/// NAT traversal state.
312 n0_nat_traversal: n0_nat_traversal::State,
/// Sink for qlog events.
314 qlog: QlogSink,
315}
316
317impl Connection {
/// Creates a connection in the handshake state with a single path, `PathId::ZERO`, on
/// `network_path`.
///
/// * `init_cid` is handed to the crypto state and stored as `initial_dst_cid`.
/// * `local_cid` becomes `handshake_cid`.
/// * `remote_cid` becomes `remote_handshake_cid` and `original_remote_cid`, and seeds the
///   remote CID queue of path zero.
/// * `rng_seed` seeds the connection RNG. The packet spaces, the crypto state and the spin
///   bit decision draw from it in exactly that order.
/// * `side_args` decides the side, whether path zero starts validated, and whether a
///   preferred-address CID exists (which changes the last argument to `CidState::new`).
///
/// On the client side the constructor calls `write_crypto` and `init_0rtt` before returning.
318 pub(crate) fn new(
319 endpoint_config: Arc<EndpointConfig>,
320 config: Arc<TransportConfig>,
321 init_cid: ConnectionId,
322 local_cid: ConnectionId,
323 remote_cid: ConnectionId,
324 network_path: FourTuple,
325 crypto: Box<dyn crypto::Session>,
326 cid_gen: &dyn ConnectionIdGenerator,
327 now: Instant,
328 version: u32,
329 allow_mtud: bool,
330 rng_seed: [u8; 32],
331 side_args: SideArgs,
332 qlog: QlogSink,
333 ) -> Self {
    // Read everything needed from `side_args` before it is consumed by the conversion.
334 let pref_addr_cid = side_args.pref_addr_cid();
335 let path_validated = side_args.path_validated();
336 let connection_side = ConnectionSide::from(side_args);
337 let side = connection_side.side();
338 let mut rng = StdRng::from_seed(rng_seed);
339 let initial_space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
340 let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
    // Test builds may opt into deterministic packet numbers for the data space.
341 #[cfg(test)]
342 let data_space = match config.deterministic_packet_numbers {
343 true => PacketSpace::new_deterministic(now, SpaceId::Data),
344 false => PacketSpace::new(now, SpaceId::Data, &mut rng),
345 };
346 #[cfg(not(test))]
347 let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
348 let state = State::handshake(state::Handshake {
349 remote_cid_set: side.is_server(),
350 expected_token: Bytes::new(),
351 client_hello: None,
352 allow_server_migration: side.is_client(),
353 });
354 let local_cid_state = FxHashMap::from_iter([(
355 PathId::ZERO,
356 CidState::new(
357 cid_gen.cid_len(),
358 cid_gen.cid_lifetime(),
359 now,
    // NOTE(review): presumably accounts for the CID already used by the preferred
    // address - confirm against `CidState::new`.
360 if pref_addr_cid.is_some() { 2 } else { 1 },
361 ),
362 )]);
363
    // Path zero: no peer max UDP payload size is known yet and its generation is 0.
364 let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
365 path.open_status = paths::OpenStatus::Informed;
366 let mut this = Self {
367 endpoint_config,
368 crypto_state: CryptoState::new(crypto, init_cid, side, &mut rng),
369 handshake_cid: local_cid,
370 remote_handshake_cid: remote_cid,
371 local_cid_state,
372 paths: BTreeMap::from_iter([(
373 PathId::ZERO,
374 PathState {
375 data: path,
376 prev: None,
377 },
378 )]),
379 path_generation_counter: 0,
380 allow_mtud,
381 state,
382 side: connection_side,
383 peer_params: TransportParameters::default(),
384 original_remote_cid: remote_cid,
385 initial_dst_cid: init_cid,
386 retry_src_cid: None,
387 events: VecDeque::new(),
388 endpoint_events: VecDeque::new(),
    // The spin bit is enabled only if configured, and then with probability 7/8.
389 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
390 spin: false,
391 spaces: [initial_space, handshake_space, data_space],
392 highest_space: SpaceKind::Initial,
393 permit_idle_reset: true,
    // An unset or zero `max_idle_timeout` disables the idle timeout; otherwise the
    // configured value is taken as milliseconds.
394 idle_timeout: match config.max_idle_timeout {
395 None | Some(VarInt(0)) => None,
396 Some(dur) => Some(Duration::from_millis(dur.0)),
397 },
398 timers: TimerTable::default(),
399 authentication_failures: 0,
400 connection_close_pending: false,
401
    // Peer parameters are unknown at this point, so the defaults supply the max ACK delay.
402 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
403 &TransportParameters::default(),
404 )),
405
406 app_limited: false,
407 receiving_ecn: false,
408 total_authed_packets: 0,
409
410 next_observed_addr_seq_no: 0u32.into(),
411
412 streams: StreamsState::new(
413 side,
414 config.max_concurrent_uni_streams,
415 config.max_concurrent_bidi_streams,
416 config.send_window,
417 config.receive_window,
418 config.stream_receive_window,
419 ),
420 datagrams: DatagramState::default(),
    // `config` and `rng` are moved in only after every field above has borrowed them.
421 config,
422 remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]),
423 rng,
424 stats: ConnectionStats::default(),
425 path_stats: Default::default(),
426 version,
427
428 max_concurrent_paths: NonZeroU32::MIN,
430 local_max_path_id: PathId::ZERO,
431 remote_max_path_id: PathId::ZERO,
432 max_path_id_with_cids: PathId::ZERO,
433 abandoned_paths: Default::default(),
434
435 n0_nat_traversal: Default::default(),
436 qlog,
437 };
438 if path_validated {
439 this.on_path_validated(PathId::ZERO);
440 }
    // Only the client starts the handshake from the constructor.
441 if side.is_client() {
442 this.write_crypto();
444 this.init_0rtt(now);
445 }
446 this.qlog
447 .emit_tuple_assigned(PathId::ZERO, network_path, now);
448 this
449 }
450
451 #[must_use]
459 pub fn poll_timeout(&mut self) -> Option<Instant> {
460 self.timers.peek()
461 }
462
463 #[must_use]
469 pub fn poll(&mut self) -> Option<Event> {
470 if let Some(x) = self.events.pop_front() {
471 return Some(x);
472 }
473
474 if let Some(event) = self.streams.poll() {
475 return Some(Event::Stream(event));
476 }
477
478 if let Some(reason) = self.state.take_error() {
479 return Some(Event::ConnectionLost { reason });
480 }
481
482 None
483 }
484
485 #[must_use]
487 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
488 self.endpoint_events.pop_front().map(EndpointEvent)
489 }
490
491 #[must_use]
493 pub fn streams(&mut self) -> Streams<'_> {
494 Streams {
495 state: &mut self.streams,
496 conn_state: &self.state,
497 }
498 }
499
500 #[must_use]
502 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
503 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
504 RecvStream {
505 id,
506 state: &mut self.streams,
507 pending: &mut self.spaces[SpaceId::Data].pending,
508 }
509 }
510
511 #[must_use]
513 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
514 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
515 SendStream {
516 id,
517 state: &mut self.streams,
518 pending: &mut self.spaces[SpaceId::Data].pending,
519 conn_state: &self.state,
520 }
521 }
522
523 pub fn open_path_ensure(
540 &mut self,
541 network_path: FourTuple,
542 initial_status: PathStatus,
543 now: Instant,
544 ) -> Result<(PathId, bool), PathError> {
545 let existing_open_path = self.paths.iter().find(|(id, path)| {
546 network_path.is_probably_same_path(&path.data.network_path)
547 && !self.abandoned_paths.contains(*id)
548 });
549 match existing_open_path {
550 Some((path_id, _state)) => Ok((*path_id, true)),
551 None => Ok((self.open_path(network_path, initial_status, now)?, false)),
552 }
553 }
554
555 pub fn open_path(
560 &mut self,
561 network_path: FourTuple,
562 initial_status: PathStatus,
563 now: Instant,
564 ) -> Result<PathId, PathError> {
565 if !self.is_multipath_negotiated() {
566 return Err(PathError::MultipathNotNegotiated);
567 }
568 if self.side().is_server() {
569 return Err(PathError::ServerSideNotAllowed);
570 }
571
572 let max_abandoned = self.abandoned_paths.iter().max().copied();
573 let max_used = self.paths.keys().last().copied();
574 let path_id = max_abandoned
575 .max(max_used)
576 .unwrap_or(PathId::ZERO)
577 .saturating_add(1u8);
578
579 if Some(path_id) > self.max_path_id() {
580 return Err(PathError::MaxPathIdReached);
581 }
582 if path_id > self.remote_max_path_id {
583 self.spaces[SpaceId::Data].pending.paths_blocked = true;
584 return Err(PathError::MaxPathIdReached);
585 }
586 if self
587 .remote_cids
588 .get(&path_id)
589 .map(CidQueue::active)
590 .is_none()
591 {
592 self.spaces[SpaceId::Data]
593 .pending
594 .path_cids_blocked
595 .insert(path_id);
596 return Err(PathError::RemoteCidsExhausted);
597 }
598
599 let path = self.ensure_path(path_id, network_path, now, None);
600 path.status.local_update(initial_status);
601
602 Ok(path_id)
603 }
604
605 pub fn close_path(
611 &mut self,
612 now: Instant,
613 path_id: PathId,
614 error_code: VarInt,
615 ) -> Result<(), ClosePathError> {
616 self.close_path_inner(
617 now,
618 path_id,
619 PathAbandonReason::ApplicationClosed { error_code },
620 )
621 }
622
/// Abandons `path_id` for the given `reason`.
///
/// Does nothing and returns `Ok(())` if the connection is already drained.
///
/// On success this:
/// * records the path and the reason's error code in `pending.path_abandon` of the data space,
/// * drops queued and in-flight retransmittable `new_cids`, `path_cids_blocked` and
///   `path_status` entries that refer to the path,
/// * queues `EndpointEventInner::RetireResetToken` for the path,
/// * adds the path to `abandoned_paths`,
/// * stops every per-path timer except `LossDetection` and `Pacing`,
/// * queues `PathEvent::Abandoned` for the application.
///
/// # Errors
///
/// * [`ClosePathError::MultipathNotNegotiated`] if multipath was not negotiated.
/// * [`ClosePathError::ClosedPath`] if the path is already abandoned, is above
///   `max_path_id()`, or is unknown.
/// * [`ClosePathError::LastOpenPath`] if no other usable path would remain. For a locally
///   initiated close the remaining path must also be validated.
623 fn close_path_inner(
628 &mut self,
629 now: Instant,
630 path_id: PathId,
631 reason: PathAbandonReason,
632 ) -> Result<(), ClosePathError> {
633 if self.state.is_drained() {
634 return Ok(());
635 }
636
637 if !self.is_multipath_negotiated() {
638 return Err(ClosePathError::MultipathNotNegotiated);
639 }
640 if self.abandoned_paths.contains(&path_id)
641 || Some(path_id) > self.max_path_id()
642 || !self.paths.contains_key(&path_id)
643 {
644 return Err(ClosePathError::ClosedPath);
645 }
646
    // A locally initiated close needs another validated, non-abandoned path to remain.
    // Otherwise any other non-abandoned path is enough.
647 if reason.is_locally_initiated() {
648 let has_remaining_validated_paths = self.paths.iter().any(|(id, path)| {
649 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
650 });
651 if !has_remaining_validated_paths {
652 return Err(ClosePathError::LastOpenPath);
653 }
654 } else {
655 let has_remaining_paths = self
659 .paths
660 .keys()
661 .any(|id| *id != path_id && !self.abandoned_paths.contains(id));
662 if !has_remaining_paths {
663 return Err(ClosePathError::LastOpenPath);
664 }
665 }
666
667 trace!(%path_id, ?reason, "closing path");
668
    // Record the abandonment, keyed by path id, in the data space's pending frames.
669 self.spaces[SpaceId::Data]
671 .pending
672 .path_abandon
673 .insert(path_id, reason.error_code());
674
    // Drop queued per-path frames that refer to the path being abandoned.
675 let pending_space = &mut self.spaces[SpaceId::Data].pending;
677 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
678 pending_space.path_cids_blocked.retain(|&id| id != path_id);
679 pending_space.path_status.retain(|&id| id != path_id);
680
    // Do the same for the retransmit data of packets still tracked as sent, on every path.
681 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
683 for sent_packet in space.sent_packets.values_mut() {
684 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
685 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
686 retransmits.path_cids_blocked.retain(|&id| id != path_id);
687 retransmits.path_status.retain(|&id| id != path_id);
688 }
689 }
690 }
691
    // The drained case returned early above, so the state cannot be drained here.
692 debug_assert!(!self.state.is_drained()); self.endpoint_events
694 .push_back(EndpointEventInner::RetireResetToken(path_id));
695
696 trace!(%path_id, "abandoning path");
697 self.abandoned_paths.insert(path_id);
698
    // Stop every per-path timer except loss detection and pacing. The match lists every
    // variant explicitly, with no catch-all arm.
699 for timer in timer::PathTimer::VALUES {
700 let keep_timer = match timer {
702 PathTimer::PathValidation | PathTimer::PathChallengeLost | PathTimer::PathOpen => {
706 false
707 }
708 PathTimer::PathKeepAlive | PathTimer::PathIdle => false,
711 PathTimer::MaxAckDelay => false,
714 PathTimer::DiscardPath => false,
717 PathTimer::LossDetection => true,
720 PathTimer::Pacing => true,
724 };
725
726 if !keep_timer {
727 let qlog = self.qlog.with_time(now);
728 self.timers.stop(Timer::PerPath(path_id, timer), qlog);
729 }
730 }
731
    // Tell the application that the path has been abandoned.
732 self.events.push_back(Event::Path(PathEvent::Abandoned {
734 id: path_id,
735 reason,
736 }));
737
738 Ok(())
739 }
740
741 #[track_caller]
745 fn path_data(&self, path_id: PathId) -> &PathData {
746 if let Some(data) = self.paths.get(&path_id) {
747 &data.data
748 } else {
749 panic!(
750 "unknown path: {path_id}, currently known paths: {:?}",
751 self.paths.keys().collect::<Vec<_>>()
752 );
753 }
754 }
755
756 fn path(&self, path_id: PathId) -> Option<&PathData> {
758 self.paths.get(&path_id).map(|path_state| &path_state.data)
759 }
760
761 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
763 self.paths
764 .get_mut(&path_id)
765 .map(|path_state| &mut path_state.data)
766 }
767
768 pub fn paths(&self) -> Vec<PathId> {
772 self.paths.keys().copied().collect()
773 }
774
775 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
777 self.path(path_id)
778 .map(PathData::local_status)
779 .ok_or(ClosedPath { _private: () })
780 }
781
782 pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
784 self.path(path_id)
785 .map(|path| path.network_path)
786 .ok_or(ClosedPath { _private: () })
787 }
788
789 pub fn set_path_status(
793 &mut self,
794 path_id: PathId,
795 status: PathStatus,
796 ) -> Result<PathStatus, SetPathStatusError> {
797 if !self.is_multipath_negotiated() {
798 return Err(SetPathStatusError::MultipathNotNegotiated);
799 }
800 let path = self
801 .path_mut(path_id)
802 .ok_or(SetPathStatusError::ClosedPath)?;
803 let prev = match path.status.local_update(status) {
804 Some(prev) => {
805 self.spaces[SpaceId::Data]
806 .pending
807 .path_status
808 .insert(path_id);
809 prev
810 }
811 None => path.local_status(),
812 };
813 Ok(prev)
814 }
815
816 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
821 self.path(path_id).and_then(|path| path.remote_status())
822 }
823
824 pub fn set_path_max_idle_timeout(
830 &mut self,
831 path_id: PathId,
832 timeout: Option<Duration>,
833 ) -> Result<Option<Duration>, ClosedPath> {
834 let path = self
835 .paths
836 .get_mut(&path_id)
837 .ok_or(ClosedPath { _private: () })?;
838 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
839 }
840
841 pub fn set_path_keep_alive_interval(
847 &mut self,
848 path_id: PathId,
849 interval: Option<Duration>,
850 ) -> Result<Option<Duration>, ClosedPath> {
851 let path = self
852 .paths
853 .get_mut(&path_id)
854 .ok_or(ClosedPath { _private: () })?;
855 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
856 }
857
858 #[track_caller]
862 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
863 &mut self.paths.get_mut(&path_id).expect("known path").data
864 }
865
866 fn find_validated_path_on_network_path(
870 &self,
871 network_path: FourTuple,
872 ) -> Option<(&PathId, &PathState)> {
873 self.paths.iter().find(|(path_id, path_state)| {
874 path_state.data.validated
875 && network_path.is_probably_same_path(&path_state.data.network_path)
877 && !self.abandoned_paths.contains(path_id)
878 })
879 }
883
/// Returns the [`PathData`] for `path_id`, creating the path first if it does not exist.
///
/// If the path already exists it is returned untouched; `network_path`, `now` and `pn` are
/// then ignored.
///
/// A newly created path:
/// * starts validated if some validated, non-abandoned path is probably on the same network
///   path, and then also takes that path's conservative RTT as its initial RTT,
/// * has `pending_on_path_challenge` set,
/// * gets a fresh packet number space in the data space; `pn`, if given, is inserted into
///   that space's `dedup` set,
/// * is reported to qlog,
/// * is recorded in `path_cids_blocked` if no remote CIDs exist for it yet.
884 fn ensure_path(
888 &mut self,
889 path_id: PathId,
890 network_path: FourTuple,
891 now: Instant,
892 pn: Option<u64>,
893 ) -> &mut PathData {
    // Extract plain values before the mutable entry lookup below, so no shared borrow of
    // `self.paths` is still alive at that point.
894 let valid_path = self.find_validated_path_on_network_path(network_path);
895 let validated = valid_path.is_some();
896 let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
897 let vacant_entry = match self.paths.entry(path_id) {
898 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
899 btree_map::Entry::Occupied(occupied_entry) => {
900 return &mut occupied_entry.into_mut().data;
901 }
902 };
903
904 debug!(%validated, %path_id, %network_path, "path added");
    // A peer value that does not fit into u16 is clamped to u16::MAX.
905 let peer_max_udp_payload_size =
906 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
907 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
908 let mut data = PathData::new(
909 network_path,
910 self.allow_mtud,
911 Some(peer_max_udp_payload_size),
912 self.path_generation_counter,
913 now,
914 &self.config,
915 );
916
917 data.validated = validated;
918 if let Some(initial_rtt) = initial_rtt {
919 data.rtt.reset_initial_rtt(initial_rtt);
920 }
921
    // NOTE(review): presumably makes the next transmit on this path carry a
    // PATH_CHALLENGE - confirm where `pending_on_path_challenge` is consumed.
922 data.pending_on_path_challenge = true;
925
926 let path = vacant_entry.insert(PathState { data, prev: None });
927
928 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
929 if let Some(pn) = pn {
930 pn_space.dedup.insert(pn);
931 }
932 self.spaces[SpaceId::Data]
933 .number_spaces
934 .insert(path_id, pn_space);
935 self.qlog.emit_tuple_assigned(path_id, network_path, now);
936
937 if !self.remote_cids.contains_key(&path_id) {
941 debug!(%path_id, "Remote opened path without issuing CIDs");
942 self.spaces[SpaceId::Data]
943 .pending
944 .path_cids_blocked
945 .insert(path_id);
946 }
949
950 &mut path.data
951 }
952
953 #[must_use]
963 pub fn poll_transmit(
964 &mut self,
965 now: Instant,
966 max_datagrams: NonZeroUsize,
967 buf: &mut Vec<u8>,
968 ) -> Option<Transmit> {
969 let max_datagrams = match self.config.enable_segmentation_offload {
970 false => NonZeroUsize::MIN,
971 true => max_datagrams,
972 };
973
974 let connection_close_pending = match self.state.as_type() {
980 StateType::Drained => {
981 self.app_limited = true;
982 return None;
983 }
984 StateType::Draining | StateType::Closed => {
985 if !self.connection_close_pending {
988 self.app_limited = true;
989 return None;
990 }
991 true
992 }
993 _ => false,
994 };
995
996 if let Some(config) = &self.config.ack_frequency_config {
998 let rtt = self
999 .paths
1000 .values()
1001 .map(|p| p.data.rtt.get())
1002 .min()
1003 .expect("one path exists");
1004 self.spaces[SpaceId::Data].pending.ack_frequency = self
1005 .ack_frequency
1006 .should_send_ack_frequency(rtt, config, &self.peer_params)
1007 && self.highest_space == SpaceKind::Data
1008 && self.peer_supports_ack_frequency();
1009 }
1010
1011 let mut congestion_blocked = false;
1014
1015 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1016 while let Some(path_id) = next_path_id {
1017 if !connection_close_pending
1018 && let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id)
1019 {
1020 return Some(transmit);
1021 }
1022
1023 let info = self.scheduling_info(path_id);
1024 match self.poll_transmit_on_path(
1025 now,
1026 buf,
1027 path_id,
1028 max_datagrams,
1029 &info,
1030 connection_close_pending,
1031 ) {
1032 PollPathStatus::Send(transmit) => {
1033 return Some(transmit);
1034 }
1035 PollPathStatus::NothingToSend {
1036 congestion_blocked: cb,
1037 } => {
1038 congestion_blocked |= cb;
1039 debug_assert!(
1042 buf.is_empty(),
1043 "nothing to send on path but buffer not empty"
1044 );
1045 }
1046 }
1047
1048 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1049 }
1050
1051 debug_assert!(
1053 buf.is_empty(),
1054 "there was data in the buffer, but it was not sent"
1055 );
1056
1057 self.app_limited = !congestion_blocked;
1058
1059 if self.state.is_established() {
1060 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1062 while let Some(path_id) = next_path_id {
1063 if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) {
1064 return Some(transmit);
1065 }
1066 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1067 }
1068 }
1069
1070 None
1071 }
1072
1073 fn scheduling_info(&self, path_id: PathId) -> PathSchedulingInfo {
1091 let have_validated_status_available_space = self.paths.iter().any(|(path_id, path)| {
1093 self.remote_cids.contains_key(path_id)
1094 && !self.abandoned_paths.contains(path_id)
1095 && path.data.validated
1096 && path.data.local_status() == PathStatus::Available
1097 });
1098
1099 let have_validated_space = self.paths.iter().any(|(path_id, path)| {
1101 self.remote_cids.contains_key(path_id)
1102 && !self.abandoned_paths.contains(path_id)
1103 && path.data.validated
1104 });
1105
1106 let is_handshaking = self.is_handshaking();
1107 let has_cids = self.remote_cids.contains_key(&path_id);
1108 let is_abandoned = self.abandoned_paths.contains(&path_id);
1109 let path_data = self.path_data(path_id);
1110 let validated = path_data.validated;
1111 let status = path_data.local_status();
1112
1113 let may_send_data = has_cids
1116 && !is_abandoned
1117 && if is_handshaking {
1118 true
1122 } else if !validated {
1123 false
1130 } else {
1131 match status {
1132 PathStatus::Available => {
1133 true
1135 }
1136 PathStatus::Backup => {
1137 !have_validated_status_available_space
1139 }
1140 }
1141 };
1142
1143 let may_send_close = has_cids
1148 && !is_abandoned
1149 && if !validated && have_validated_status_available_space {
1150 false
1152 } else {
1153 true
1155 };
1156
1157 let may_self_abandon = has_cids && validated && !have_validated_space;
1161
1162 PathSchedulingInfo {
1163 is_abandoned,
1164 may_send_data,
1165 may_send_close,
1166 may_self_abandon,
1167 }
1168 }
1169
1170 fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit {
1171 debug_assert!(
1172 !transmit.is_empty(),
1173 "must not be called with an empty transmit buffer"
1174 );
1175
1176 let network_path = self.path_data(path_id).network_path;
1177 trace!(
1178 segment_size = transmit.segment_size(),
1179 last_datagram_len = transmit.len() % transmit.segment_size(),
1180 %network_path,
1181 "sending {} bytes in {} datagrams",
1182 transmit.len(),
1183 transmit.num_datagrams()
1184 );
1185 self.path_data_mut(path_id)
1186 .inc_total_sent(transmit.len() as u64);
1187
1188 self.stats
1189 .udp_tx
1190 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1191 self.path_stats
1192 .entry(path_id)
1193 .or_default()
1194 .udp_tx
1195 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1196
1197 Transmit {
1198 destination: network_path.remote,
1199 size: transmit.len(),
1200 ecn: if self.path_data(path_id).sending_ecn {
1201 Some(EcnCodepoint::Ect0)
1202 } else {
1203 None
1204 },
1205 segment_size: match transmit.num_datagrams() {
1206 1 => None,
1207 _ => Some(transmit.segment_size()),
1208 },
1209 src_ip: network_path.local_ip,
1210 }
1211 }
1212
1213 fn poll_transmit_off_path(
1215 &mut self,
1216 now: Instant,
1217 buf: &mut Vec<u8>,
1218 path_id: PathId,
1219 ) -> Option<Transmit> {
1220 if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) {
1221 return Some(challenge);
1222 }
1223 if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
1224 return Some(response);
1225 }
1226 if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
1227 return Some(challenge);
1228 }
1229 None
1230 }
1231
/// Tries to build one transmit for `path_id` from the packet spaces, in `SpaceId::iter()`
/// order.
///
/// Returns `PollPathStatus::NothingToSend` with `congestion_blocked: false` when no remote
/// CID exists for the path. Otherwise each space may add packets to the same
/// [`TransmitBuf`]. Paths other than `PathId::ZERO` only use the data space.
///
/// If at least one packet was written, the path's congestion controller is told via
/// `on_sent` with the total transmit length and the last packet number, and
/// `PollPathStatus::Send` is returned. If nothing was written, the result reports whether
/// any space was congestion blocked.
///
/// Recovery metrics are emitted to qlog whenever a packet was written or the path was
/// congestion blocked.
1232 #[must_use]
1239 fn poll_transmit_on_path(
1240 &mut self,
1241 now: Instant,
1242 buf: &mut Vec<u8>,
1243 path_id: PathId,
1244 max_datagrams: NonZeroUsize,
1245 scheduling_info: &PathSchedulingInfo,
1246 connection_close_pending: bool,
1247 ) -> PollPathStatus {
1248 let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else {
    // Missing CIDs are only logged for paths that have not been abandoned.
1250 if !self.abandoned_paths.contains(&path_id) {
1251 debug!(%path_id, "no remote CIDs for path");
1252 }
1253 return PollPathStatus::NothingToSend {
1254 congestion_blocked: false,
1255 };
1256 };
1257
    // Padding requirement carried from one space to the next within the same datagram.
1258 let mut pad_datagram = PadDatagram::No;
1264
    // Number of the last packet written in this call, if any.
1265 let mut last_packet_number = None;
1269
    // Whether any space reported being blocked by congestion control.
1270 let mut congestion_blocked = false;
1273
1274 let pmtu = self.path_data(path_id).current_mtu().into();
1276 let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu);
1277
1278 for space_id in SpaceId::iter() {
    // Paths other than path zero only ever use the data space.
1280 if path_id != PathId::ZERO && space_id != SpaceId::Data {
1282 continue;
1283 }
1284 match self.poll_transmit_path_space(
1285 now,
1286 &mut transmit,
1287 path_id,
1288 space_id,
1289 remote_cid,
1290 scheduling_info,
1291 connection_close_pending,
1292 pad_datagram,
1293 ) {
1294 PollPathSpaceStatus::NothingToSend {
1295 congestion_blocked: cb,
1296 } => {
1297 congestion_blocked |= cb;
1298 }
    // A packet was written and later spaces may still add to the transmit.
1301 PollPathSpaceStatus::WrotePacket {
1302 last_packet_number: pn,
1303 pad_datagram: pad,
1304 } => {
1305 debug_assert!(!transmit.is_empty(), "transmit must contain packets");
1306 last_packet_number = Some(pn);
1307 pad_datagram = pad;
1308 continue;
1313 }
    // The transmit is complete; no further space is consulted.
1314 PollPathSpaceStatus::Send {
1315 last_packet_number: pn,
1316 } => {
1317 debug_assert!(!transmit.is_empty(), "transmit must contain packets");
1318 last_packet_number = Some(pn);
1319 break;
1320 }
1321 }
1322 }
1323
1324 if last_packet_number.is_some() || congestion_blocked {
1325 self.qlog.emit_recovery_metrics(
1326 path_id,
1327 &mut self.paths.get_mut(&path_id).unwrap().data,
1328 now,
1329 );
1330 }
1331
1332 match last_packet_number {
1333 Some(last_packet_number) => {
    // Report the whole transmit to the congestion controller in a single call.
1334 self.path_data_mut(path_id).congestion.on_sent(
1337 now,
1338 transmit.len() as u64,
1339 last_packet_number,
1340 );
1341 PollPathStatus::Send(self.build_transmit(path_id, transmit))
1342 }
1343 None => PollPathStatus::NothingToSend { congestion_blocked },
1344 }
1345 }
1346
1347 #[must_use]
1349 fn poll_transmit_path_space(
1350 &mut self,
1351 now: Instant,
1352 transmit: &mut TransmitBuf<'_>,
1353 path_id: PathId,
1354 space_id: SpaceId,
1355 remote_cid: ConnectionId,
1356 scheduling_info: &PathSchedulingInfo,
1357 connection_close_pending: bool,
1359 mut pad_datagram: PadDatagram,
1361 ) -> PollPathSpaceStatus {
1362 let mut last_packet_number = None;
1365
1366 loop {
1382 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1384 transmit.datagram_remaining_mut()
1386 } else {
1387 transmit.segment_size()
1389 };
1390 let can_send =
1391 self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending);
1392 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1393 let space_will_send = {
1394 if scheduling_info.is_abandoned {
1395 scheduling_info.may_self_abandon
1400 && self.spaces[space_id]
1401 .pending
1402 .path_abandon
1403 .contains_key(&path_id)
1404 } else if can_send.close && scheduling_info.may_send_close {
1405 true
1407 } else if needs_loss_probe || can_send.space_specific {
1408 true
1411 } else {
1412 !can_send.is_empty() && scheduling_info.may_send_data
1415 }
1416 };
1417
1418 if !space_will_send {
1419 return match last_packet_number {
1422 Some(pn) => PollPathSpaceStatus::WrotePacket {
1423 last_packet_number: pn,
1424 pad_datagram,
1425 },
1426 None => {
1427 if self.crypto_state.has_keys(space_id.encryption_level())
1429 || (space_id == SpaceId::Data
1430 && self.crypto_state.has_keys(EncryptionLevel::ZeroRtt))
1431 {
1432 trace!(?space_id, %path_id, "nothing to send in space");
1433 }
1434 PollPathSpaceStatus::NothingToSend {
1435 congestion_blocked: false,
1436 }
1437 }
1438 };
1439 }
1440
1441 if transmit.datagram_remaining_mut() == 0 {
1445 let congestion_blocked =
1446 self.path_congestion_check(space_id, path_id, transmit, &can_send, now);
1447 if congestion_blocked != PathBlocked::No {
1448 return match last_packet_number {
1450 Some(pn) => PollPathSpaceStatus::WrotePacket {
1451 last_packet_number: pn,
1452 pad_datagram,
1453 },
1454 None => {
1455 return PollPathSpaceStatus::NothingToSend {
1456 congestion_blocked: true,
1457 };
1458 }
1459 };
1460 }
1461 }
1462
1463 if transmit.datagram_remaining_mut() == 0 {
1466 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1467 return match last_packet_number {
1470 Some(pn) => PollPathSpaceStatus::WrotePacket {
1471 last_packet_number: pn,
1472 pad_datagram,
1473 },
1474 None => {
1475 return PollPathSpaceStatus::NothingToSend {
1476 congestion_blocked: false,
1477 };
1478 }
1479 };
1480 }
1481
1482 match self.spaces[space_id].for_path(path_id).loss_probes {
1483 0 => transmit.start_new_datagram(),
1484 _ => {
1485 let request_immediate_ack =
1487 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1488 self.spaces[space_id].maybe_queue_probe(
1489 path_id,
1490 request_immediate_ack,
1491 &self.streams,
1492 );
1493
1494 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1495
1496 transmit.start_new_datagram_with_size(std::cmp::min(
1500 usize::from(INITIAL_MTU),
1501 transmit.segment_size(),
1502 ));
1503 }
1504 }
1505 trace!(count = transmit.num_datagrams(), "new datagram started");
1506
1507 pad_datagram = PadDatagram::No;
1509 }
1510
1511 if transmit.datagram_start_offset() < transmit.len() {
1514 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1515 }
1516
1517 if self.crypto_state.has_keys(EncryptionLevel::Initial)
1522 && space_id == SpaceId::Handshake
1523 && self.side.is_client()
1524 {
1525 self.discard_space(now, SpaceKind::Initial);
1528 }
1529 if let Some(ref mut prev) = self.crypto_state.prev_crypto {
1530 prev.update_unacked = false;
1531 }
1532
1533 let Some(mut builder) = PacketBuilder::new(
1534 now,
1535 space_id,
1536 path_id,
1537 remote_cid,
1538 transmit,
1539 can_send.is_ack_eliciting(),
1540 self,
1541 ) else {
1542 return PollPathSpaceStatus::NothingToSend {
1549 congestion_blocked: false,
1550 };
1551 };
1552 last_packet_number = Some(builder.packet_number);
1553
1554 if space_id == SpaceId::Initial
1555 && (self.side.is_client() || can_send.is_ack_eliciting())
1556 {
1557 pad_datagram |= PadDatagram::ToMinMtu;
1559 }
1560 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1561 pad_datagram |= PadDatagram::ToSegmentSize;
1562 }
1563
1564 if scheduling_info.may_send_close && can_send.close {
1565 trace!("sending CONNECTION_CLOSE");
1566 let is_multipath_negotiated = self.is_multipath_negotiated();
1571 for path_id in self.spaces[space_id]
1572 .number_spaces
1573 .iter()
1574 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1575 .map(|(&path_id, _)| path_id)
1576 .collect::<Vec<_>>()
1577 {
1578 Self::populate_acks(
1579 now,
1580 self.receiving_ecn,
1581 path_id,
1582 space_id,
1583 &mut self.spaces[space_id],
1584 is_multipath_negotiated,
1585 &mut builder,
1586 &mut self.stats.frame_tx,
1587 self.crypto_state.has_keys(space_id.encryption_level()),
1588 );
1589 }
1590
1591 debug_assert!(
1599 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1600 "ACKs should leave space for ConnectionClose"
1601 );
1602 let stats = &mut self.stats.frame_tx;
1603 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1604 let max_frame_size = builder.frame_space_remaining();
1605 let close: Close = match self.state.as_type() {
1606 StateType::Closed => {
1607 let reason: Close =
1608 self.state.as_closed().expect("checked").clone().into();
1609 if space_id == SpaceId::Data || reason.is_transport_layer() {
1610 reason
1611 } else {
1612 TransportError::APPLICATION_ERROR("").into()
1613 }
1614 }
1615 StateType::Draining => TransportError::NO_ERROR("").into(),
1616 _ => unreachable!(
1617 "tried to make a close packet when the connection wasn't closed"
1618 ),
1619 };
1620 builder.write_frame(close.encoder(max_frame_size), stats);
1621 }
1622 let last_pn = builder.packet_number;
1623 builder.finish_and_track(now, self, path_id, pad_datagram);
1624 if space_id.kind() == self.highest_space {
1625 self.connection_close_pending = false;
1628 }
1629 return PollPathSpaceStatus::WrotePacket {
1642 last_packet_number: last_pn,
1643 pad_datagram,
1644 };
1645 }
1646
1647 self.populate_packet(now, space_id, path_id, scheduling_info, &mut builder);
1648
1649 debug_assert!(
1656 !(builder.sent_frames().is_ack_only(&self.streams)
1657 && !can_send.acks
1658 && (can_send.other || can_send.space_specific)
1659 && builder.buf.segment_size()
1660 == self.path_data(path_id).current_mtu() as usize
1661 && self.datagrams.outgoing.is_empty()),
1662 "SendableFrames was {can_send:?}, but only ACKs have been written"
1663 );
1664 if builder.sent_frames().requires_padding {
1665 pad_datagram |= PadDatagram::ToMinMtu;
1666 }
1667
1668 for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1669 self.spaces[space_id]
1670 .for_path(*path_id)
1671 .pending_acks
1672 .acks_sent();
1673 self.timers.stop(
1674 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1675 self.qlog.with_time(now),
1676 );
1677 }
1678
1679 if builder.can_coalesce && path_id == PathId::ZERO && {
1687 let max_packet_size = builder
1688 .buf
1689 .datagram_remaining_mut()
1690 .saturating_sub(builder.predict_packet_end());
1691 max_packet_size > MIN_PACKET_SPACE
1692 && self.has_pending_packet(space_id, max_packet_size, connection_close_pending)
1693 } {
1694 trace!("will coalesce with next packet");
1697 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1698 } else {
1699 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1705 const MAX_PADDING: usize = 32;
1713 if builder.buf.datagram_remaining_mut()
1714 > builder.predict_packet_end() + MAX_PADDING
1715 {
1716 trace!(
1717 "GSO truncated by demand for {} padding bytes",
1718 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1719 );
1720 let last_pn = builder.packet_number;
1721 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1722 return PollPathSpaceStatus::Send {
1723 last_packet_number: last_pn,
1724 };
1725 }
1726
1727 builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1730 } else {
1731 builder.finish_and_track(now, self, path_id, pad_datagram);
1732 }
1733
1734 if transmit.num_datagrams() == 1 {
1737 transmit.clip_segment_size();
1738 }
1739 }
1740 }
1741 }
1742
1743 fn poll_transmit_mtu_probe(
1744 &mut self,
1745 now: Instant,
1746 buf: &mut Vec<u8>,
1747 path_id: PathId,
1748 ) -> Option<Transmit> {
1749 let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?;
1750
1751 let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize);
1753 transmit.start_new_datagram_with_size(probe_size as usize);
1754
1755 let mut builder = PacketBuilder::new(
1756 now,
1757 SpaceId::Data,
1758 path_id,
1759 active_cid,
1760 &mut transmit,
1761 true,
1762 self,
1763 )?;
1764
1765 trace!(?probe_size, "writing MTUD probe");
1767 builder.write_frame(frame::Ping, &mut self.stats.frame_tx);
1768
1769 if self.peer_supports_ack_frequency() {
1771 builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx);
1772 }
1773
1774 builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1775
1776 self.path_stats
1777 .entry(path_id)
1778 .or_default()
1779 .sent_plpmtud_probes += 1;
1780
1781 Some(self.build_transmit(path_id, transmit))
1782 }
1783
1784 fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> {
1792 let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?;
1793 let is_eligible = self.path_data(path_id).validated
1794 && !self.path_data(path_id).is_validating_path()
1795 && !self.abandoned_paths.contains(&path_id);
1796
1797 if !is_eligible {
1798 return None;
1799 }
1800 let next_pn = self.spaces[SpaceId::Data]
1801 .for_path(path_id)
1802 .peek_tx_number();
1803 let probe_size = self
1804 .path_data_mut(path_id)
1805 .mtud
1806 .poll_transmit(now, next_pn)?;
1807
1808 Some((active_cid, probe_size))
1809 }
1810
1811 fn has_pending_packet(
1828 &mut self,
1829 current_space_id: SpaceId,
1830 max_packet_size: usize,
1831 connection_close_pending: bool,
1832 ) -> bool {
1833 let mut space_id = current_space_id;
1834 loop {
1835 let can_send = self.space_can_send(
1836 space_id,
1837 PathId::ZERO,
1838 max_packet_size,
1839 connection_close_pending,
1840 );
1841 if !can_send.is_empty() {
1842 return true;
1843 }
1844 match space_id.next() {
1845 Some(next_space_id) => space_id = next_space_id,
1846 None => break,
1847 }
1848 }
1849 false
1850 }
1851
1852 fn path_congestion_check(
1854 &mut self,
1855 space_id: SpaceId,
1856 path_id: PathId,
1857 transmit: &TransmitBuf<'_>,
1858 can_send: &SendableFrames,
1859 now: Instant,
1860 ) -> PathBlocked {
1861 if self.side().is_server()
1867 && self
1868 .path_data(path_id)
1869 .anti_amplification_blocked(transmit.len() as u64 + 1)
1870 {
1871 trace!(?space_id, %path_id, "blocked by anti-amplification");
1872 return PathBlocked::AntiAmplification;
1873 }
1874
1875 let bytes_to_send = transmit.segment_size() as u64;
1878 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1879
1880 if can_send.other && !need_loss_probe && !can_send.close {
1881 let path = self.path_data(path_id);
1882 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1883 trace!(
1884 ?space_id,
1885 %path_id,
1886 in_flight=%path.in_flight.bytes,
1887 congestion_window=%path.congestion.window(),
1888 "blocked by congestion control",
1889 );
1890 return PathBlocked::Congestion;
1891 }
1892 }
1893
1894 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1896 self.timers.set(
1897 Timer::PerPath(path_id, PathTimer::Pacing),
1898 delay,
1899 self.qlog.with_time(now),
1900 );
1901 trace!(?space_id, %path_id, "blocked by pacing");
1904 return PathBlocked::Pacing;
1905 }
1906
1907 PathBlocked::No
1908 }
1909
/// Sends a PATH_CHALLENGE on the *previous* network path of `path_id`, if one is pending.
///
/// Returns `None` when the path is unknown, has no previous path recorded, has no
/// challenge pending for the previous path, or when no packet could be started.
///
/// On success the returned [`Transmit`] is addressed to the previous path's remote and
/// uses its local IP as source. The datagram is written into `buf`.
fn send_prev_path_challenge(
    &mut self,
    now: Instant,
    buf: &mut Vec<u8>,
    path_id: PathId,
) -> Option<Transmit> {
    let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
    if !prev_path.pending_on_path_challenge {
        return None;
    };
    // The pending flag is cleared and the challenge recorded as sent *before* the packet
    // is built.
    // NOTE(review): if `PacketBuilder::new` below returns `None`, the challenge stays
    // recorded although nothing was written — confirm this is intended.
    prev_path.pending_on_path_challenge = false;
    let token = self.rng.random();
    let network_path = prev_path.network_path;
    prev_path.record_path_challenge_sent(now, token, network_path);

    debug_assert_eq!(
        self.highest_space,
        SpaceKind::Data,
        "PATH_CHALLENGE queued without 1-RTT keys"
    );
    // Single datagram, limited to the minimum initial size.
    let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
    buf.start_new_datagram();

    // The packet is addressed with the CID stored alongside the previous path, not with
    // the path's currently active CID.
    let mut builder =
        PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, false, self)?;
    let challenge = frame::PathChallenge(token);
    let stats = &mut self.stats.frame_tx;
    builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));

    // Datagrams carrying a PATH_CHALLENGE are expanded to the minimum initial size.
    builder.pad_to(MIN_INITIAL_SIZE);

    // Unlike the other probe senders in this file, this uses `finish` rather than
    // `finish_and_track`.
    // NOTE(review): presumably so the packet is not tracked against the current path's
    // recovery state — confirm.
    builder.finish(self, now);
    // Account for the datagram in both the connection-wide and the per-path UDP stats.
    self.stats.udp_tx.on_sent(1, buf.len());
    self.path_stats
        .entry(path_id)
        .or_default()
        .udp_tx
        .on_sent(1, buf.len());

    Some(Transmit {
        destination: network_path.remote,
        size: buf.len(),
        ecn: None,
        segment_size: None,
        src_ip: network_path.local_ip,
    })
}
1972
1973 fn send_off_path_path_response(
1974 &mut self,
1975 now: Instant,
1976 buf: &mut Vec<u8>,
1977 path_id: PathId,
1978 ) -> Option<Transmit> {
1979 let path = self.paths.get_mut(&path_id).map(|state| &mut state.data)?;
1980 let cid_queue = self.remote_cids.get_mut(&path_id)?;
1981 let (token, network_path) = path.path_responses.pop_off_path(path.network_path)?;
1982
1983 let cid = cid_queue
1984 .next_reserved()
1985 .unwrap_or_else(|| cid_queue.active());
1986 let frame = frame::PathResponse(token);
1990
1991 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
1992 buf.start_new_datagram();
1993
1994 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, false, self)?;
1995 let stats = &mut self.stats.frame_tx;
1996 builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
1997 builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
1998
1999 let size = buf.len();
2000
2001 self.stats.udp_tx.on_sent(1, size);
2002 self.path_stats
2003 .entry(path_id)
2004 .or_default()
2005 .udp_tx
2006 .on_sent(1, size);
2007 Some(Transmit {
2008 destination: network_path.remote,
2009 size,
2010 ecn: None,
2011 segment_size: None,
2012 src_ip: network_path.local_ip,
2013 })
2014 }
2015
/// Sends a PATH_CHALLENGE to the next NAT traversal probe target, using `path_id`'s
/// packet number space and remote CIDs.
///
/// Only does something when the server-side NAT traversal state is available
/// (`server_side_mut()` succeeds) and has a probe queued. Returns `None` when there is
/// no probe, the path is unknown or not validated, fewer than two remote CIDs remain,
/// no CID could be reserved, or no packet could be started.
///
/// The returned [`Transmit`] is addressed to the probe's remote with no source IP set.
fn send_nat_traversal_path_challenge(
    &mut self,
    now: Instant,
    buf: &mut Vec<u8>,
    path_id: PathId,
) -> Option<Transmit> {
    let server_side = self.n0_nat_traversal.server_side_mut().ok()?;
    let probe = server_side.next_probe()?;
    // Only validated paths are used to carry probes.
    if !self.paths.get(&path_id)?.data.validated {
        return None;
    }

    let remote_cids = self.remote_cids.get_mut(&path_id)?;

    // Require at least two remaining CIDs before reserving one for the probe.
    // NOTE(review): presumably one is kept spare for the path itself — confirm.
    if remote_cids.remaining() < 2 {
        return None;
    }

    let cid = remote_cids.next_reserved()?;
    let remote = probe.remote();
    let token = self.rng.random();
    // NOTE(review): the probe is marked as sent and the CID reserved before the packet
    // is built; if `PacketBuilder::new` returns `None` below, both stay consumed —
    // confirm this is acceptable.
    probe.mark_as_sent();

    let frame = frame::PathChallenge(token);

    // Single datagram, limited to the minimum initial size.
    let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
    buf.start_new_datagram();

    let mut builder =
        PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, false, self)?;
    let stats = &mut self.stats.frame_tx;
    builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
    builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);

    // The path was looked up successfully above, hence the `expect`.
    let path = &mut self.paths.get_mut(&path_id).expect("checked").data;
    // The challenge is recorded against the probed remote with an unspecified local IP.
    let network_path = FourTuple {
        remote,
        local_ip: None,
    };

    path.record_path_challenge_sent(now, token, network_path);

    let size = buf.len();

    // Account for the datagram in both the connection-wide and the per-path UDP stats.
    self.stats.udp_tx.on_sent(1, size);
    self.path_stats
        .entry(path_id)
        .or_default()
        .udp_tx
        .on_sent(1, size);

    Some(Transmit {
        destination: remote,
        size,
        ecn: None,
        segment_size: None,
        src_ip: None,
    })
}
2082
2083 fn space_can_send(
2091 &mut self,
2092 space_id: SpaceId,
2093 path_id: PathId,
2094 packet_size: usize,
2095 connection_close_pending: bool,
2096 ) -> SendableFrames {
2097 let space = &mut self.spaces[space_id];
2098 let space_has_crypto = self.crypto_state.has_keys(space_id.encryption_level());
2099
2100 if !space_has_crypto
2101 && (space_id != SpaceId::Data
2102 || !self.crypto_state.has_keys(EncryptionLevel::ZeroRtt)
2103 || self.side.is_server())
2104 {
2105 return SendableFrames::empty();
2107 }
2108
2109 let mut can_send = space.can_send(path_id, &self.streams);
2110
2111 if space_id == SpaceId::Data {
2113 let pn = space.for_path(path_id).peek_tx_number();
2114 let frame_space_1rtt =
2120 packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
2121 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
2122 }
2123
2124 can_send.close = connection_close_pending && space_has_crypto;
2125
2126 can_send
2127 }
2128
/// Processes a [`ConnectionEvent`] delivered to this connection.
///
/// Two kinds of events are handled:
///
/// - `Datagram`: an incoming UDP datagram. Its first packet has already been partially
///   decoded (`first_decode`); any further coalesced packets are in `remaining`.
/// - `NewIdentifiers`: newly issued local connection IDs to be announced to the peer.
pub fn handle_event(&mut self, event: ConnectionEvent) {
    use ConnectionEventInner::*;
    match event.0 {
        Datagram(DatagramConnectionEvent {
            now,
            network_path,
            path_id,
            ecn,
            first_decode,
            remaining,
        }) => {
            let span = trace_span!("pkt", %path_id);
            let _guard = span.enter();

            // `true` means the datagram must be dropped without further processing.
            if self.update_network_path_or_discard(network_path, path_id) {
                return;
            }

            // Remember whether the path was anti-amplification blocked *before* this
            // datagram is processed; an unknown path counts as not blocked.
            let was_anti_amplification_blocked = self
                .path(path_id)
                .map(|path| path.anti_amplification_blocked(1))
                .unwrap_or(false);

            // Connection-wide and per-path receive statistics for the first packet.
            self.stats.udp_rx.datagrams += 1;
            self.stats.udp_rx.bytes += first_decode.len() as u64;
            let rx = &mut self.path_stats.entry(path_id).or_default().udp_rx;
            rx.datagrams += 1;
            rx.bytes += first_decode.len() as u64;
            let data_len = first_decode.len();

            self.handle_decode(now, network_path, path_id, ecn, first_decode);
            // The path is looked up again only after decoding, so the received bytes are
            // credited to whatever path data exists for `path_id` at that point (if any).
            if let Some(path) = self.path_mut(path_id) {
                path.inc_total_recvd(data_len as u64);
            }

            // Coalesced packets only add to the byte counters, not the datagram counters.
            // NOTE(review): `inc_total_recvd` is not called here for these bytes —
            // presumably `handle_coalesced` accounts for them; confirm.
            if let Some(data) = remaining {
                self.stats.udp_rx.bytes += data.len() as u64;
                self.path_stats.entry(path_id).or_default().udp_rx.bytes += data.len() as u64;
                self.handle_coalesced(now, network_path, path_id, ecn, data);
            }

            if let Some(path) = self.paths.get_mut(&path_id) {
                self.qlog
                    .emit_recovery_metrics(path_id, &mut path.data, now);
            }

            // If the path was blocked before this datagram arrived, re-evaluate the loss
            // detection timer now that more data has been received.
            if was_anti_amplification_blocked {
                self.set_loss_detection_timer(now, path_id);
            }
        }
        NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
            // All issued CIDs are expected to belong to the same path; the path of the
            // first one is used (or the default path ID when `ids` is empty).
            let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
            debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
            let cid_state = self
                .local_cid_state
                .entry(path_id)
                .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
            cid_state.new_cids(&ids, now);

            // Queue the CIDs for announcement, pushing them in reverse order.
            ids.into_iter().rev().for_each(|frame| {
                self.spaces[SpaceId::Data].pending.new_cids.push(frame);
            });
            self.reset_cid_retirement(now);
        }
    }
}
2211
2212 fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
2217 let remote_may_migrate = self.side.remote_may_migrate(&self.state);
2218 let local_ip_may_migrate = self.side.is_client();
2219 if let Some(known_path) = self.path_mut(path_id) {
2223 if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
2224 trace!(
2225 %path_id,
2226 %network_path,
2227 %known_path.network_path,
2228 "discarding packet from unrecognized peer"
2229 );
2230 return true;
2231 }
2232
2233 if known_path.network_path.local_ip.is_some()
2234 && network_path.local_ip.is_some()
2235 && known_path.network_path.local_ip != network_path.local_ip
2236 && !local_ip_may_migrate
2237 {
2238 trace!(
2239 %path_id,
2240 %network_path,
2241 %known_path.network_path,
2242 "discarding packet sent to incorrect interface"
2243 );
2244 return true;
2245 }
2246 if let Some(local_ip) = network_path.local_ip {
2251 if known_path
2252 .network_path
2253 .local_ip
2254 .is_some_and(|ip| ip != local_ip)
2255 {
2256 debug!(
2257 %path_id,
2258 %network_path,
2259 %known_path.network_path,
2260 "path's local address seemingly migrated"
2261 );
2262 }
2263 known_path.network_path.local_ip = Some(local_ip);
2270 }
2271 }
2272 false
2273 }
2274
/// Processes every timer that has expired at `now`.
///
/// Timers are drained one at a time via `expire_before` until none is left, and each
/// is dispatched to its connection-level or per-path handling below.
pub fn handle_timeout(&mut self, now: Instant) {
    while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
        trace!(?timer, at=?now, "timeout");
        match timer {
            Timer::Conn(timer) => match timer {
                // The closing period is over: move to drained and tell the endpoint.
                ConnTimer::Close => {
                    self.state.move_to_drained(None);
                    self.endpoint_events.push_back(EndpointEventInner::Drained);
                }
                ConnTimer::Idle => {
                    self.kill(ConnectionError::TimedOut);
                }
                ConnTimer::KeepAlive => {
                    trace!("sending keep-alive");
                    self.ping();
                }
                ConnTimer::KeyDiscard => {
                    self.crypto_state.discard_temporary_keys();
                }
                ConnTimer::PushNewCid => {
                    // Handle every path whose next CID retirement is due by `now`.
                    while let Some((path_id, when)) = self.next_cid_retirement() {
                        if when > now {
                            break;
                        }
                        match self.local_cid_state.get_mut(&path_id) {
                            None => error!(%path_id, "No local CID state for path"),
                            Some(cid_state) => {
                                // `on_cid_timeout` yields how many replacement CIDs to
                                // request from the endpoint.
                                let num_new_cid = cid_state.on_cid_timeout().into();
                                // No new CIDs are requested once the connection is closed.
                                if !self.state.is_closed() {
                                    trace!(
                                        "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                        cid_state.retire_prior_to()
                                    );
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::NeedIdentifiers(
                                            path_id,
                                            now,
                                            num_new_cid,
                                        ),
                                    );
                                }
                            }
                        }
                    }
                }
            },
            Timer::PerPath(path_id, timer) => {
                let span = trace_span!("per-path timer fired", %path_id, ?timer);
                let _guard = span.enter();
                match timer {
                    // The path was idle for too long: abandon it.
                    PathTimer::PathIdle => {
                        if let Err(err) =
                            self.close_path_inner(now, path_id, PathAbandonReason::TimedOut)
                        {
                            warn!(?err, "failed closing path");
                        }
                    }

                    PathTimer::PathKeepAlive => {
                        trace!("sending keep-alive on path");
                        // A failure (path no longer present) is deliberately ignored.
                        self.ping_path(path_id).ok();
                    }
                    PathTimer::LossDetection => {
                        self.on_loss_detection_timeout(now, path_id);
                        // NOTE(review): `unwrap` assumes the path still exists whenever
                        // its loss detection timer fires — confirm.
                        self.qlog.emit_recovery_metrics(
                            path_id,
                            &mut self.paths.get_mut(&path_id).unwrap().data,
                            now,
                        );
                    }
                    // Validation of a changed network path did not complete in time.
                    PathTimer::PathValidation => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        self.timers.stop(
                            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                            self.qlog.with_time(now),
                        );
                        debug!("path validation failed");
                        // Fall back to the previous path data, if there is any.
                        if let Some((_, prev)) = path.prev.take() {
                            path.data = prev;
                        }
                        path.data.reset_on_path_challenges();
                    }
                    // Mark a challenge as pending again so that it gets re-sent.
                    PathTimer::PathChallengeLost => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        trace!("path challenge deemed lost");
                        path.data.pending_on_path_challenge = true;
                    }
                    // A newly opened path failed to validate in time: abandon it.
                    PathTimer::PathOpen => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        path.data.reset_on_path_challenges();
                        self.timers.stop(
                            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                            self.qlog.with_time(now),
                        );
                        debug!("new path validation failed");
                        if let Err(err) = self.close_path_inner(
                            now,
                            path_id,
                            PathAbandonReason::ValidationFailed,
                        ) {
                            warn!(?err, "failed closing path");
                        }
                    }
                    // Nothing to do here beyond logging.
                    PathTimer::Pacing => trace!("pacing timer expired"),
                    PathTimer::MaxAckDelay => {
                        trace!("max ack delay reached");
                        // Always handled in the data space.
                        self.spaces[SpaceId::Data]
                            .for_path(path_id)
                            .pending_acks
                            .on_max_ack_delay_timeout()
                    }
                    // Final cleanup of a path: stop its timers, retire all of its
                    // active local CIDs at the endpoint and drop its state.
                    PathTimer::DiscardPath => {
                        self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                        if let Some(local_cid_state) = self.local_cid_state.remove(&path_id) {
                            // Endpoint events must not be queued once drained.
                            debug_assert!(!self.state.is_drained());
                            let (min_seq, max_seq) = local_cid_state.active_seq();
                            for seq in min_seq..=max_seq {
                                self.endpoint_events.push_back(
                                    EndpointEventInner::RetireConnectionId(
                                        now, path_id, seq, false,
                                    ),
                                );
                            }
                        }
                        self.discard_path(path_id, now);
                    }
                }
            }
        }
    }
}
2429
/// Closes the connection on behalf of the application.
///
/// `error_code` and `reason` are wrapped in an application close frame and handed to
/// [`Self::close_inner`], which does nothing if the connection is already closed.
pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
    self.close_inner(
        now,
        Close::Application(frame::ApplicationClose { error_code, reason }),
    )
}
2447
2448 fn close_inner(&mut self, now: Instant, reason: Close) {
2464 let was_closed = self.state.is_closed();
2465 if !was_closed {
2466 self.close_common();
2467 self.set_close_timer(now);
2468 self.connection_close_pending = true;
2469 self.state.move_to_closed_local(reason);
2470 }
2471 }
2472
/// Returns a [`Datagrams`] handle that mutably borrows this connection.
pub fn datagrams(&mut self) -> Datagrams<'_> {
    Datagrams { conn: self }
}
2477
/// Returns a copy of the connection-wide statistics.
pub fn stats(&mut self) -> ConnectionStats {
    self.stats.clone()
}
2482
2483 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2485 let path = self.paths.get(&path_id)?;
2486 let stats = self.path_stats.entry(path_id).or_default();
2487 stats.rtt = path.data.rtt.get();
2488 stats.cwnd = path.data.congestion.window();
2489 stats.current_mtu = path.data.mtud.current_mtu();
2490 Some(*stats)
2491 }
2492
2493 pub fn ping(&mut self) {
2497 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2500 path_data.ping_pending = true;
2501 }
2502 }
2503
2504 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2508 let path_data = self.spaces[self.highest_space]
2509 .number_spaces
2510 .get_mut(&path)
2511 .ok_or(ClosedPath { _private: () })?;
2512 path_data.ping_pending = true;
2513 Ok(())
2514 }
2515
2516 pub fn force_key_update(&mut self) {
2520 if !self.state.is_established() {
2521 debug!("ignoring forced key update in illegal state");
2522 return;
2523 }
2524 if self.crypto_state.prev_crypto.is_some() {
2525 debug!("ignoring redundant forced key update");
2528 return;
2529 }
2530 self.crypto_state.update_keys(None, false);
2531 }
2532
/// Borrows the cryptographic session held by this connection's crypto state.
pub fn crypto_session(&self) -> &dyn crypto::Session {
    self.crypto_state.session.as_ref()
}
2537
/// Returns whether the connection state is currently the handshake state.
pub fn is_handshaking(&self) -> bool {
    self.state.is_handshake()
}
2545
/// Returns whether the connection state reports the connection as closed.
pub fn is_closed(&self) -> bool {
    self.state.is_closed()
}
2556
/// Returns whether the connection state reports the connection as drained.
pub fn is_drained(&self) -> bool {
    self.state.is_drained()
}
2564
/// Returns the `accepted_0rtt` flag tracked by the crypto state.
// NOTE(review): presumably only meaningful once the handshake has progressed far enough
// to know the outcome — confirm against the crypto state.
pub fn accepted_0rtt(&self) -> bool {
    self.crypto_state.accepted_0rtt
}
2571
/// Returns the `zero_rtt_enabled` flag tracked by the crypto state.
pub fn has_0rtt(&self) -> bool {
    self.crypto_state.zero_rtt_enabled
}
2576
/// Returns whether the data space has any pending items queued, taking the stream
/// state into account.
pub fn has_pending_retransmits(&self) -> bool {
    !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
}
2581
/// Returns which [`Side`] of the connection this is.
pub fn side(&self) -> Side {
    self.side.side()
}
2586
2587 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2589 self.path(path_id)
2590 .map(|path_data| {
2591 path_data
2592 .last_observed_addr_report
2593 .as_ref()
2594 .map(|observed| observed.socket_addr())
2595 })
2596 .ok_or(ClosedPath { _private: () })
2597 }
2598
/// Returns the current RTT value of `path_id`, or `None` if the path is unknown.
pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
    self.path(path_id).map(|d| d.rtt.get())
}
2603
/// Borrows the congestion controller of `path_id`, or returns `None` if the path is
/// unknown.
pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
    self.path(path_id).map(|d| d.congestion.as_ref())
}
2608
2609 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2614 self.streams.set_max_concurrent(dir, count);
2615 let pending = &mut self.spaces[SpaceId::Data].pending;
2618 self.streams.queue_max_stream_id(pending);
2619 }
2620
2621 pub fn set_max_concurrent_paths(
2631 &mut self,
2632 now: Instant,
2633 count: NonZeroU32,
2634 ) -> Result<(), MultipathNotNegotiated> {
2635 if !self.is_multipath_negotiated() {
2636 return Err(MultipathNotNegotiated { _private: () });
2637 }
2638 self.max_concurrent_paths = count;
2639
2640 let in_use_count = self
2641 .local_max_path_id
2642 .next()
2643 .saturating_sub(self.abandoned_paths.len() as u32)
2644 .as_u32();
2645 let extra_needed = count.get().saturating_sub(in_use_count);
2646 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2647
2648 self.set_max_path_id(now, new_max_path_id);
2649
2650 Ok(())
2651 }
2652
2653 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2655 if max_path_id <= self.local_max_path_id {
2656 return;
2657 }
2658
2659 self.local_max_path_id = max_path_id;
2660 self.spaces[SpaceId::Data].pending.max_path_id = true;
2661
2662 self.issue_first_path_cids(now);
2663 }
2664
/// Returns the current limit on concurrent streams of direction `dir`, as reported by
/// the stream state.
pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
    self.streams.max_concurrent(dir)
}
2673
/// Forwards a new send window of `send_window` to the stream state.
pub fn set_send_window(&mut self, send_window: u64) {
    self.streams.set_send_window(send_window);
}
2678
2679 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2681 if self.streams.set_receive_window(receive_window) {
2682 self.spaces[SpaceId::Data].pending.max_data = true;
2683 }
2684 }
2685
2686 pub fn is_multipath_negotiated(&self) -> bool {
2691 !self.is_handshaking()
2692 && self.config.max_concurrent_multipath_paths.is_some()
2693 && self.peer_params.initial_max_path_id.is_some()
2694 }
2695
2696 fn on_ack_received(
2697 &mut self,
2698 now: Instant,
2699 space: SpaceId,
2700 ack: frame::Ack,
2701 ) -> Result<(), TransportError> {
2702 let path = PathId::ZERO;
2704 self.inner_on_ack_received(now, space, path, ack)
2705 }
2706
2707 fn on_path_ack_received(
2708 &mut self,
2709 now: Instant,
2710 space: SpaceId,
2711 path_ack: frame::PathAck,
2712 ) -> Result<(), TransportError> {
2713 let (ack, path) = path_ack.into_ack();
2714 self.inner_on_ack_received(now, space, path, ack)
2715 }
2716
/// Processes an acknowledgement for packets sent in `space` on `path`.
///
/// Shared implementation behind ACK and PATH_ACK handling. In order, this:
///
/// 1. ignores acknowledgements for abandoned paths,
/// 2. rejects acknowledgements of packet numbers that were never sent,
/// 3. tracks the largest acknowledged packet and detects spurious losses,
/// 4. collects the newly acknowledged packets and processes each of them,
/// 5. takes an RTT sample when a new largest, ack-eliciting packet was acknowledged,
/// 6. runs loss detection, resets the PTO count where allowed, processes ECN feedback
///    and re-arms the loss detection timer.
///
/// # Errors
///
/// Returns a `PROTOCOL_VIOLATION` transport error when an unsent packet is
/// acknowledged, and propagates errors from `check_ack`.
fn inner_on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    path: PathId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    if self.abandoned_paths.contains(&path) {
        trace!("silently ignoring PATH_ACK on abandoned path");
        return Ok(());
    }
    if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // Whether this acknowledgement advances the largest acknowledged packet number.
    let new_largest = {
        let space = &mut self.spaces[space].for_path(path);
        if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
            space.largest_acked_packet = Some(ack.largest);
            // The send time is only updated when the packet is still tracked as sent.
            if let Some(info) = space.sent_packets.get(ack.largest) {
                space.largest_acked_packet_sent = info.time_sent;
            }
            true
        } else {
            false
        }
    };

    if self.detect_spurious_loss(&ack, space, path) {
        self.path_data_mut(path)
            .congestion
            .on_spurious_congestion_event();
    }

    // Only packets that are still tracked as sent are collected, so the work below is
    // bounded by what was actually sent rather than by the size of the ACK ranges.
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.spaces[space].for_path(path).check_ack(range.clone())?;
        for (pn, _) in self.spaces[space]
            .for_path(path)
            .sent_packets
            .iter_range(range)
        {
            newly_acked.insert_one(pn);
        }
    }

    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].for_path(path).take(packet) {
            // The acknowledged packet itself carried acknowledgements; everything below
            // the largest packet number it acknowledged per path no longer needs to be
            // acknowledged again.
            for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                    pns.pending_acks.subtract_below(*acked_pn);
                }
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // Let MTU discovery know; when it reports an MTU change, the congestion
            // controller is informed of the new MTU.
            let path_data = self.path_data_mut(path);
            let mtu_updated = path_data.mtud.on_acked(space.kind(), packet, info.size);
            if mtu_updated {
                path_data
                    .congestion
                    .on_mtu_update(path_data.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(path, packet);

            self.on_packet_acked(now, path, info);
        }
    }

    let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
    let app_limited = self.app_limited;
    let path_data = self.path_data_mut(path);
    let in_flight = path_data.in_flight.bytes;

    path_data
        .congestion
        .on_end_acks(now, in_flight, app_limited, largest_ackd);

    // An RTT sample is taken only when the largest acknowledged packet advanced and at
    // least one newly acknowledged packet was ack-eliciting.
    if new_largest && ack_eliciting_acked {
        // Outside the data space the reported ACK delay is not used; inside it, the
        // decoded delay is capped at the peer's maximum ACK delay.
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = now.saturating_duration_since(
            self.spaces[space].for_path(path).largest_acked_packet_sent,
        );

        let next_pn = self.spaces[space].for_path(path).next_packet_number;
        let path_data = self.path_data_mut(path);
        path_data.rtt.update(ack_delay, rtt);
        // Remember the first packet number to be sent after the first RTT sample.
        if path_data.first_packet_after_rtt_sample.is_none() {
            path_data.first_packet_after_rtt_sample = Some((space.kind(), next_pn));
        }
    }

    // Runs before the PTO count is reset below.
    self.detect_lost_packets(now, space, path, true);

    if self.peer_completed_address_validation(path) {
        self.path_data_mut(path).pto_count = 0;
    }

    if self.path_data(path).sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only examined on acknowledgements that advance the largest
            // acknowledged packet.
            if new_largest {
                let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
            }
        } else {
            // An acknowledgement without ECN counts turns ECN off for this path.
            debug!("ECN not acknowledged by peer");
            self.path_data_mut(path).sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now, path);
    Ok(())
}
2866
2867 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2868 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2869
2870 if lost_packets.is_empty() {
2871 return false;
2872 }
2873
2874 for range in ack.iter() {
2875 let spurious_losses: Vec<u64> = lost_packets
2876 .iter_range(range.clone())
2877 .map(|(pn, _info)| pn)
2878 .collect();
2879
2880 for pn in spurious_losses {
2881 lost_packets.remove(pn);
2882 }
2883 }
2884
2885 lost_packets.is_empty()
2890 }
2891
2892 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2897 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2898
2899 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2900 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2901 }
2902
2903 fn process_ecn(
2905 &mut self,
2906 now: Instant,
2907 space: SpaceId,
2908 path: PathId,
2909 newly_acked: u64,
2910 ecn: frame::EcnCounts,
2911 largest_sent_time: Instant,
2912 ) {
2913 match self.spaces[space]
2914 .for_path(path)
2915 .detect_ecn(newly_acked, ecn)
2916 {
2917 Err(e) => {
2918 debug!("halting ECN due to verification failure: {}", e);
2919
2920 self.path_data_mut(path).sending_ecn = false;
2921 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2924 }
2925 Ok(false) => {}
2926 Ok(true) => {
2927 self.path_stats.entry(path).or_default().congestion_events += 1;
2928 self.path_data_mut(path).congestion.on_congestion_event(
2929 now,
2930 largest_sent_time,
2931 false,
2932 true,
2933 0,
2934 );
2935 }
2936 }
2937 }
2938
2939 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2942 let app_limited = self.app_limited;
2943 let path = self.path_data_mut(path_id);
2944 path.remove_in_flight(&info);
2945 if info.ack_eliciting && info.path_generation == path.generation() {
2946 let rtt = path.rtt;
2950 path.congestion
2951 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2952 }
2953
2954 if let Some(retransmits) = info.retransmits.get() {
2956 for (id, _) in retransmits.reset_stream.iter() {
2957 self.streams.reset_acked(*id);
2958 }
2959 }
2960
2961 for frame in info.stream_frames {
2962 self.streams.received_ack_of(frame);
2963 }
2964 }
2965
2966 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceKind) {
2967 let start = if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) {
2968 now
2969 } else {
2970 self.crypto_state
2971 .prev_crypto
2972 .as_ref()
2973 .expect("no previous keys")
2974 .end_packet
2975 .as_ref()
2976 .expect("update not acknowledged yet")
2977 .1
2978 };
2979
2980 self.timers.set(
2982 Timer::Conn(ConnTimer::KeyDiscard),
2983 start + self.max_pto_for_space(space) * 3,
2984 self.qlog.with_time(now),
2985 );
2986 }
2987
2988 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
3001 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
3002 self.detect_lost_packets(now, pn_space, path_id, false);
3004 self.set_loss_detection_timer(now, path_id);
3005 return;
3006 }
3007
3008 let Some((_, space)) = self.pto_time_and_space(now, path_id) else {
3009 error!(%path_id, "PTO expired while unset");
3010 return;
3011 };
3012 trace!(
3013 in_flight = self.path_data(path_id).in_flight.bytes,
3014 count = self.path_data(path_id).pto_count,
3015 ?space,
3016 %path_id,
3017 "PTO fired"
3018 );
3019
3020 let count = match self.path_data(path_id).in_flight.ack_eliciting {
3021 0 => {
3024 debug_assert!(!self.peer_completed_address_validation(path_id));
3025 1
3026 }
3027 _ => 2,
3029 };
3030 let pns = self.spaces[space].for_path(path_id);
3031 pns.loss_probes = pns.loss_probes.saturating_add(count);
3032 let path_data = self.path_data_mut(path_id);
3033 path_data.pto_count = path_data.pto_count.saturating_add(1);
3034 self.set_loss_detection_timer(now, path_id);
3035 }
3036
3037 fn detect_lost_packets(
3054 &mut self,
3055 now: Instant,
3056 pn_space: SpaceId,
3057 path_id: PathId,
3058 due_to_ack: bool,
3059 ) {
3060 let mut lost_packets = Vec::<u64>::new();
3061 let mut lost_mtu_probe = None;
3062 let mut in_persistent_congestion = false;
3063 let mut size_of_lost_packets = 0u64;
3064 self.spaces[pn_space].for_path(path_id).loss_time = None;
3065
3066 let path = self.path_data(path_id);
3069 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3070 let loss_delay = path
3071 .rtt
3072 .conservative()
3073 .mul_f32(self.config.time_threshold)
3074 .max(TIMER_GRANULARITY);
3075 let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
3076
3077 let largest_acked_packet = self.spaces[pn_space]
3078 .for_path(path_id)
3079 .largest_acked_packet
3080 .expect("detect_lost_packets only to be called if path received at least one ACK");
3081 let packet_threshold = self.config.packet_threshold as u64;
3082
3083 let congestion_period = self
3087 .pto(SpaceKind::Data, path_id)
3088 .saturating_mul(self.config.persistent_congestion_threshold);
3089 let mut persistent_congestion_start: Option<Instant> = None;
3090 let mut prev_packet = None;
3091 let space = self.spaces[pn_space].for_path(path_id);
3092
3093 for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
3094 if prev_packet != Some(packet.wrapping_sub(1)) {
3095 persistent_congestion_start = None;
3097 }
3098
3099 let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
3103 if packet_too_old || largest_acked_packet >= packet + packet_threshold {
3104 if Some(packet) == in_flight_mtu_probe {
3106 lost_mtu_probe = in_flight_mtu_probe;
3109 } else {
3110 lost_packets.push(packet);
3111 size_of_lost_packets += info.size as u64;
3112 if info.ack_eliciting && due_to_ack {
3113 match persistent_congestion_start {
3114 Some(start) if info.time_sent - start > congestion_period => {
3117 in_persistent_congestion = true;
3118 }
3119 None if first_packet_after_rtt_sample
3121 .is_some_and(|x| x < (pn_space.kind(), packet)) =>
3122 {
3123 persistent_congestion_start = Some(info.time_sent);
3124 }
3125 _ => {}
3126 }
3127 }
3128 }
3129 } else {
3130 if space.loss_time.is_none() {
3132 space.loss_time = Some(info.time_sent + loss_delay);
3135 }
3136 persistent_congestion_start = None;
3137 }
3138
3139 prev_packet = Some(packet);
3140 }
3141
3142 self.handle_lost_packets(
3143 pn_space,
3144 path_id,
3145 now,
3146 lost_packets,
3147 lost_mtu_probe,
3148 loss_delay,
3149 in_persistent_congestion,
3150 size_of_lost_packets,
3151 );
3152 }
3153
3154 fn discard_path(&mut self, path_id: PathId, now: Instant) {
3156 trace!(%path_id, "dropping path state");
3157 let path = self.path_data(path_id);
3158 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3159
3160 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
3162 .for_path(path_id)
3163 .sent_packets
3164 .iter()
3165 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
3166 .map(|(pn, info)| {
3167 size_of_lost_packets += info.size as u64;
3168 pn
3169 })
3170 .collect();
3171
3172 if !lost_pns.is_empty() {
3173 trace!(
3174 %path_id,
3175 count = lost_pns.len(),
3176 lost_bytes = size_of_lost_packets,
3177 "packets lost on path abandon"
3178 );
3179 self.handle_lost_packets(
3180 SpaceId::Data,
3181 path_id,
3182 now,
3183 lost_pns,
3184 in_flight_mtu_probe,
3185 Duration::ZERO,
3186 false,
3187 size_of_lost_packets,
3188 );
3189 }
3190 let path_stats = self.path_stats(path_id).unwrap_or_default();
3193 self.path_stats.remove(&path_id);
3194 self.paths.remove(&path_id);
3195 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
3196
3197 self.events.push_back(
3198 PathEvent::Discarded {
3199 id: path_id,
3200 path_stats,
3201 }
3202 .into(),
3203 );
3204 }
3205
3206 fn handle_lost_packets(
3207 &mut self,
3208 pn_space: SpaceId,
3209 path_id: PathId,
3210 now: Instant,
3211 lost_packets: Vec<u64>,
3212 lost_mtu_probe: Option<u64>,
3213 loss_delay: Duration,
3214 in_persistent_congestion: bool,
3215 size_of_lost_packets: u64,
3216 ) {
3217 debug_assert!(
3218 {
3219 let mut sorted = lost_packets.clone();
3220 sorted.sort();
3221 sorted == lost_packets
3222 },
3223 "lost_packets must be sorted"
3224 );
3225
3226 self.drain_lost_packets(now, pn_space, path_id);
3227
3228 if let Some(largest_lost) = lost_packets.last().cloned() {
3230 let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
3231 let largest_lost_sent = self.spaces[pn_space]
3232 .for_path(path_id)
3233 .sent_packets
3234 .get(largest_lost)
3235 .unwrap()
3236 .time_sent;
3237 let path_stats = self.path_stats.entry(path_id).or_default();
3238 path_stats.lost_packets += lost_packets.len() as u64;
3239 path_stats.lost_bytes += size_of_lost_packets;
3240 trace!(
3241 %path_id,
3242 count = lost_packets.len(),
3243 lost_bytes = size_of_lost_packets,
3244 "packets lost",
3245 );
3246
3247 for &packet in &lost_packets {
3248 let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
3249 continue;
3250 };
3251 self.qlog
3252 .emit_packet_lost(packet, &info, loss_delay, pn_space.kind(), now);
3253 self.paths
3254 .get_mut(&path_id)
3255 .unwrap()
3256 .remove_in_flight(&info);
3257
3258 for frame in info.stream_frames {
3259 self.streams.retransmit(frame);
3260 }
3261 self.spaces[pn_space].pending |= info.retransmits;
3262 self.path_data_mut(path_id)
3263 .mtud
3264 .on_non_probe_lost(packet, info.size);
3265
3266 self.spaces[pn_space].for_path(path_id).lost_packets.insert(
3267 packet,
3268 LostPacket {
3269 time_sent: info.time_sent,
3270 },
3271 );
3272 }
3273
3274 let path = self.path_data_mut(path_id);
3275 if path.mtud.black_hole_detected(now) {
3276 path.congestion.on_mtu_update(path.mtud.current_mtu());
3277 if let Some(max_datagram_size) = self.datagrams().max_size()
3278 && self.datagrams.drop_oversized(max_datagram_size)
3279 && self.datagrams.send_blocked
3280 {
3281 self.datagrams.send_blocked = false;
3282 self.events.push_back(Event::DatagramsUnblocked);
3283 }
3284 self.path_stats
3285 .entry(path_id)
3286 .or_default()
3287 .black_holes_detected += 1;
3288 }
3289
3290 let lost_ack_eliciting =
3292 old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
3293
3294 if lost_ack_eliciting {
3295 self.path_stats
3296 .entry(path_id)
3297 .or_default()
3298 .congestion_events += 1;
3299 self.path_data_mut(path_id).congestion.on_congestion_event(
3300 now,
3301 largest_lost_sent,
3302 in_persistent_congestion,
3303 false,
3304 size_of_lost_packets,
3305 );
3306 }
3307 }
3308
3309 if let Some(packet) = lost_mtu_probe {
3311 let info = self.spaces[SpaceId::Data]
3312 .for_path(path_id)
3313 .take(packet)
3314 .unwrap(); self.paths
3317 .get_mut(&path_id)
3318 .unwrap()
3319 .remove_in_flight(&info);
3320 self.path_data_mut(path_id).mtud.on_probe_lost();
3321 self.path_stats
3322 .entry(path_id)
3323 .or_default()
3324 .lost_plpmtud_probes += 1;
3325 }
3326 }
3327
3328 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
3334 SpaceId::iter()
3335 .filter_map(|id| {
3336 self.spaces[id]
3337 .number_spaces
3338 .get(&path_id)
3339 .and_then(|pns| pns.loss_time)
3340 .map(|time| (time, id))
3341 })
3342 .min_by_key(|&(time, _)| time)
3343 }
3344
3345 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
3347 let path = self.path(path_id)?;
3348 let pto_count = path.pto_count;
3349 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
3350 let mut duration = path.rtt.pto_base() * backoff;
3351
3352 if path_id == PathId::ZERO
3353 && path.in_flight.ack_eliciting == 0
3354 && !self.peer_completed_address_validation(PathId::ZERO)
3355 {
3356 let space = match self.highest_space {
3362 SpaceKind::Handshake => SpaceId::Handshake,
3363 _ => SpaceId::Initial,
3364 };
3365
3366 return Some((now + duration, space));
3367 }
3368
3369 let mut result = None;
3370 for space in SpaceId::iter() {
3371 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
3372 continue;
3373 };
3374
3375 if !pns.has_in_flight() {
3376 continue;
3377 }
3378 if space == SpaceId::Data {
3379 if self.is_handshaking() {
3381 return result;
3382 }
3383 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
3385 }
3386 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3387 continue;
3388 };
3389 let pto = last_ack_eliciting + duration;
3390 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3391 if path.anti_amplification_blocked(1) {
3392 continue;
3394 }
3395 if path.in_flight.ack_eliciting == 0 {
3396 continue;
3398 }
3399 result = Some((pto, space));
3400 }
3401 }
3402 result
3403 }
3404
3405 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3406 if self.side.is_server() || self.state.is_closed() {
3408 return true;
3409 }
3410 self.spaces[SpaceId::Handshake]
3413 .path_space(PathId::ZERO)
3414 .and_then(|pns| pns.largest_acked_packet)
3415 .is_some()
3416 || self.spaces[SpaceId::Data]
3417 .path_space(path)
3418 .and_then(|pns| pns.largest_acked_packet)
3419 .is_some()
3420 || (self.crypto_state.has_keys(EncryptionLevel::OneRtt)
3421 && !self.crypto_state.has_keys(EncryptionLevel::Handshake))
3422 }
3423
3424 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3432 if self.state.is_closed() {
3433 return;
3437 }
3438
3439 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3440 self.timers.set(
3442 Timer::PerPath(path_id, PathTimer::LossDetection),
3443 loss_time,
3444 self.qlog.with_time(now),
3445 );
3446 return;
3447 }
3448
3449 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3452 self.timers.set(
3453 Timer::PerPath(path_id, PathTimer::LossDetection),
3454 timeout,
3455 self.qlog.with_time(now),
3456 );
3457 } else {
3458 self.timers.stop(
3459 Timer::PerPath(path_id, PathTimer::LossDetection),
3460 self.qlog.with_time(now),
3461 );
3462 }
3463 }
3464
3465 fn max_pto_for_space(&self, space: SpaceKind) -> Duration {
3469 self.paths
3470 .keys()
3471 .map(|path_id| self.pto(space, *path_id))
3472 .max()
3473 .expect("there should be at least one path")
3474 }
3475
3476 fn pto(&self, space: SpaceKind, path_id: PathId) -> Duration {
3481 let max_ack_delay = match space {
3482 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3483 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3484 };
3485 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3486 }
3487
3488 fn on_packet_authenticated(
3489 &mut self,
3490 now: Instant,
3491 space_id: SpaceKind,
3492 path_id: PathId,
3493 ecn: Option<EcnCodepoint>,
3494 packet: Option<u64>,
3495 spin: bool,
3496 is_1rtt: bool,
3497 ) {
3498 self.total_authed_packets += 1;
3499 self.reset_keep_alive(path_id, now);
3500 self.reset_idle_timeout(now, space_id, path_id);
3501 self.permit_idle_reset = true;
3502 self.receiving_ecn |= ecn.is_some();
3503 if let Some(x) = ecn {
3504 let space = &mut self.spaces[space_id];
3505 space.for_path(path_id).ecn_counters += x;
3506
3507 if x.is_ce() {
3508 space
3509 .for_path(path_id)
3510 .pending_acks
3511 .set_immediate_ack_required();
3512 }
3513 }
3514
3515 let Some(packet) = packet else {
3516 return;
3517 };
3518 match &self.side {
3519 ConnectionSide::Client { .. } => {
3520 if space_id == SpaceKind::Handshake
3524 && let Some(hs) = self.state.as_handshake_mut()
3525 {
3526 hs.allow_server_migration = false;
3527 }
3528 }
3529 ConnectionSide::Server { .. } => {
3530 if self.crypto_state.has_keys(EncryptionLevel::Initial)
3531 && space_id == SpaceKind::Handshake
3532 {
3533 self.discard_space(now, SpaceKind::Initial);
3535 }
3536 if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) && is_1rtt {
3537 self.set_key_discard_timer(now, space_id)
3539 }
3540 }
3541 }
3542 let space = self.spaces[space_id].for_path(path_id);
3543 space.pending_acks.insert_one(packet, now);
3544 if packet >= space.rx_packet.unwrap_or_default() {
3545 space.rx_packet = Some(packet);
3546 self.spin = self.side.is_client() ^ spin;
3548 }
3549 }
3550
3551 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceKind, path_id: PathId) {
3556 if let Some(timeout) = self.idle_timeout {
3558 if self.state.is_closed() {
3559 self.timers
3560 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3561 } else {
3562 let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space));
3563 self.timers.set(
3564 Timer::Conn(ConnTimer::Idle),
3565 now + dt,
3566 self.qlog.with_time(now),
3567 );
3568 }
3569 }
3570
3571 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3573 if self.state.is_closed() {
3574 self.timers.stop(
3575 Timer::PerPath(path_id, PathTimer::PathIdle),
3576 self.qlog.with_time(now),
3577 );
3578 } else {
3579 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3580 self.timers.set(
3581 Timer::PerPath(path_id, PathTimer::PathIdle),
3582 now + dt,
3583 self.qlog.with_time(now),
3584 );
3585 }
3586 }
3587 }
3588
3589 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3591 if !self.state.is_established() {
3592 return;
3593 }
3594
3595 if let Some(interval) = self.config.keep_alive_interval {
3596 self.timers.set(
3597 Timer::Conn(ConnTimer::KeepAlive),
3598 now + interval,
3599 self.qlog.with_time(now),
3600 );
3601 }
3602
3603 if let Some(interval) = self.path_data(path_id).keep_alive {
3604 self.timers.set(
3605 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3606 now + interval,
3607 self.qlog.with_time(now),
3608 );
3609 }
3610 }
3611
3612 fn reset_cid_retirement(&mut self, now: Instant) {
3614 if let Some((_path, t)) = self.next_cid_retirement() {
3615 self.timers.set(
3616 Timer::Conn(ConnTimer::PushNewCid),
3617 t,
3618 self.qlog.with_time(now),
3619 );
3620 }
3621 }
3622
3623 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3625 self.local_cid_state
3626 .iter()
3627 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3628 .min_by_key(|(_path_id, timeout)| *timeout)
3629 }
3630
3631 pub(crate) fn handle_first_packet(
3636 &mut self,
3637 now: Instant,
3638 network_path: FourTuple,
3639 ecn: Option<EcnCodepoint>,
3640 packet_number: u64,
3641 packet: InitialPacket,
3642 remaining: Option<BytesMut>,
3643 ) -> Result<(), ConnectionError> {
3644 let span = trace_span!("first recv");
3645 let _guard = span.enter();
3646 debug_assert!(self.side.is_server());
3647 let len = packet.header_data.len() + packet.payload.len();
3648 let path_id = PathId::ZERO;
3649 self.path_data_mut(path_id).total_recvd = len as u64;
3650
3651 if let Some(hs) = self.state.as_handshake_mut() {
3652 hs.expected_token = packet.header.token.clone();
3653 } else {
3654 unreachable!("first packet must be delivered in Handshake state");
3655 }
3656
3657 self.on_packet_authenticated(
3659 now,
3660 SpaceKind::Initial,
3661 path_id,
3662 ecn,
3663 Some(packet_number),
3664 false,
3665 false,
3666 );
3667
3668 let packet: Packet = packet.into();
3669
3670 let mut qlog = QlogRecvPacket::new(len);
3671 qlog.header(&packet.header, Some(packet_number), path_id);
3672
3673 self.process_decrypted_packet(
3674 now,
3675 network_path,
3676 path_id,
3677 Some(packet_number),
3678 packet,
3679 &mut qlog,
3680 )?;
3681 self.qlog.emit_packet_received(qlog, now);
3682 if let Some(data) = remaining {
3683 self.handle_coalesced(now, network_path, path_id, ecn, data);
3684 }
3685
3686 self.qlog.emit_recovery_metrics(
3687 path_id,
3688 &mut self.paths.get_mut(&path_id).unwrap().data,
3689 now,
3690 );
3691
3692 Ok(())
3693 }
3694
3695 fn init_0rtt(&mut self, now: Instant) {
3696 let Some((header, packet)) = self.crypto_state.session.early_crypto() else {
3697 return;
3698 };
3699 if self.side.is_client() {
3700 match self.crypto_state.session.transport_parameters() {
3701 Ok(params) => {
3702 let params = params
3703 .expect("crypto layer didn't supply transport parameters with ticket");
3704 let params = TransportParameters {
3706 initial_src_cid: None,
3707 original_dst_cid: None,
3708 preferred_address: None,
3709 retry_src_cid: None,
3710 stateless_reset_token: None,
3711 min_ack_delay: None,
3712 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3713 max_ack_delay: TransportParameters::default().max_ack_delay,
3714 initial_max_path_id: None,
3715 ..params
3716 };
3717 self.set_peer_params(params);
3718 self.qlog.emit_peer_transport_params_restored(self, now);
3719 }
3720 Err(e) => {
3721 error!("session ticket has malformed transport parameters: {}", e);
3722 return;
3723 }
3724 }
3725 }
3726 trace!("0-RTT enabled");
3727 self.crypto_state.enable_zero_rtt(header, packet);
3728 }
3729
3730 fn read_crypto(
3731 &mut self,
3732 space: SpaceId,
3733 crypto: &frame::Crypto,
3734 payload_len: usize,
3735 ) -> Result<(), TransportError> {
3736 let expected = if !self.state.is_handshake() {
3737 SpaceId::Data
3738 } else if self.highest_space == SpaceKind::Initial {
3739 SpaceId::Initial
3740 } else {
3741 SpaceId::Handshake
3744 };
3745 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3749
3750 let end = crypto.offset + crypto.data.len() as u64;
3751 if space < expected
3752 && end
3753 > self.crypto_state.spaces[space.kind()]
3754 .crypto_stream
3755 .bytes_read()
3756 {
3757 warn!(
3758 "received new {:?} CRYPTO data when expecting {:?}",
3759 space, expected
3760 );
3761 return Err(TransportError::PROTOCOL_VIOLATION(
3762 "new data at unexpected encryption level",
3763 ));
3764 }
3765
3766 let crypto_space = &mut self.crypto_state.spaces[space.kind()];
3767 let max = end.saturating_sub(crypto_space.crypto_stream.bytes_read());
3768 if max > self.config.crypto_buffer_size as u64 {
3769 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3770 }
3771
3772 crypto_space
3773 .crypto_stream
3774 .insert(crypto.offset, crypto.data.clone(), payload_len);
3775 while let Some(chunk) = crypto_space.crypto_stream.read(usize::MAX, true) {
3776 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3777 if self.crypto_state.session.read_handshake(&chunk.bytes)? {
3778 self.events.push_back(Event::HandshakeDataReady);
3779 }
3780 }
3781
3782 Ok(())
3783 }
3784
3785 fn write_crypto(&mut self) {
3786 loop {
3787 let space = self.highest_space;
3788 let mut outgoing = Vec::new();
3789 if let Some(crypto) = self.crypto_state.session.write_handshake(&mut outgoing) {
3790 match space {
3791 SpaceKind::Initial => {
3792 self.upgrade_crypto(SpaceKind::Handshake, crypto);
3793 }
3794 SpaceKind::Handshake => {
3795 self.upgrade_crypto(SpaceKind::Data, crypto);
3796 }
3797 SpaceKind::Data => unreachable!("got updated secrets during 1-RTT"),
3798 }
3799 }
3800 if outgoing.is_empty() {
3801 if space == self.highest_space {
3802 break;
3803 } else {
3804 continue;
3806 }
3807 }
3808 let offset = self.crypto_state.spaces[space].crypto_offset;
3809 let outgoing = Bytes::from(outgoing);
3810 if let Some(hs) = self.state.as_handshake_mut()
3811 && space == SpaceKind::Initial
3812 && offset == 0
3813 && self.side.is_client()
3814 {
3815 hs.client_hello = Some(outgoing.clone());
3816 }
3817 self.crypto_state.spaces[space].crypto_offset += outgoing.len() as u64;
3818 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3819 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3820 offset,
3821 data: outgoing,
3822 });
3823 }
3824 }
3825
3826 fn upgrade_crypto(&mut self, space: SpaceKind, crypto: Keys) {
3828 debug_assert!(
3829 !self.crypto_state.has_keys(space.encryption_level()),
3830 "already reached packet space {space:?}"
3831 );
3832 trace!("{:?} keys ready", space);
3833 if space == SpaceKind::Data {
3834 self.crypto_state.next_crypto = Some(
3836 self.crypto_state
3837 .session
3838 .next_1rtt_keys()
3839 .expect("handshake should be complete"),
3840 );
3841 }
3842
3843 self.crypto_state.spaces[space].keys = Some(crypto);
3844 debug_assert!(space > self.highest_space);
3845 self.highest_space = space;
3846 if space == SpaceKind::Data && self.side.is_client() {
3847 self.crypto_state.discard_zero_rtt();
3849 }
3850 }
3851
3852 fn discard_space(&mut self, now: Instant, space: SpaceKind) {
3853 debug_assert!(space != SpaceKind::Data);
3854 trace!("discarding {:?} keys", space);
3855 if space == SpaceKind::Initial {
3856 if let ConnectionSide::Client { token, .. } = &mut self.side {
3858 *token = Bytes::new();
3859 }
3860 }
3861 self.crypto_state.spaces[space].keys = None;
3862 let space = &mut self.spaces[space];
3863 let pns = space.for_path(PathId::ZERO);
3864 pns.time_of_last_ack_eliciting_packet = None;
3865 pns.loss_time = None;
3866 pns.loss_probes = 0;
3867 let sent_packets = mem::take(&mut pns.sent_packets);
3868 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3869 for (_, packet) in sent_packets.into_iter() {
3870 path.data.remove_in_flight(&packet);
3871 }
3872
3873 self.set_loss_detection_timer(now, PathId::ZERO)
3874 }
3875
3876 fn handle_coalesced(
3877 &mut self,
3878 now: Instant,
3879 network_path: FourTuple,
3880 path_id: PathId,
3881 ecn: Option<EcnCodepoint>,
3882 data: BytesMut,
3883 ) {
3884 self.path_data_mut(path_id)
3885 .inc_total_recvd(data.len() as u64);
3886 let mut remaining = Some(data);
3887 let cid_len = self
3888 .local_cid_state
3889 .values()
3890 .map(|cid_state| cid_state.cid_len())
3891 .next()
3892 .expect("one cid_state must exist");
3893 while let Some(data) = remaining {
3894 match PartialDecode::new(
3895 data,
3896 &FixedLengthConnectionIdParser::new(cid_len),
3897 &[self.version],
3898 self.endpoint_config.grease_quic_bit,
3899 ) {
3900 Ok((partial_decode, rest)) => {
3901 remaining = rest;
3902 self.handle_decode(now, network_path, path_id, ecn, partial_decode);
3903 }
3904 Err(e) => {
3905 trace!("malformed header: {}", e);
3906 return;
3907 }
3908 }
3909 }
3910 }
3911
3912 fn handle_decode(
3913 &mut self,
3914 now: Instant,
3915 network_path: FourTuple,
3916 path_id: PathId,
3917 ecn: Option<EcnCodepoint>,
3918 partial_decode: PartialDecode,
3919 ) {
3920 let qlog = QlogRecvPacket::new(partial_decode.len());
3921 if let Some(decoded) = self
3922 .crypto_state
3923 .unprotect_header(partial_decode, self.peer_params.stateless_reset_token)
3924 {
3925 self.handle_packet(
3926 now,
3927 network_path,
3928 path_id,
3929 ecn,
3930 decoded.packet,
3931 decoded.stateless_reset,
3932 qlog,
3933 );
3934 }
3935 }
3936
3937 fn handle_packet(
3938 &mut self,
3939 now: Instant,
3940 network_path: FourTuple,
3941 path_id: PathId,
3942 ecn: Option<EcnCodepoint>,
3943 packet: Option<Packet>,
3944 stateless_reset: bool,
3945 mut qlog: QlogRecvPacket,
3946 ) {
3947 self.stats.udp_rx.ios += 1;
3948 self.path_stats.entry(path_id).or_default().udp_rx.ios += 1;
3949
3950 if let Some(ref packet) = packet {
3951 trace!(
3952 "got {:?} packet ({} bytes) from {} using id {}",
3953 packet.header.space(),
3954 packet.payload.len() + packet.header_data.len(),
3955 network_path,
3956 packet.header.dst_cid(),
3957 );
3958 }
3959
3960 if self.is_handshaking() {
3961 if path_id != PathId::ZERO {
3962 debug!(%network_path, %path_id, "discarding multipath packet during handshake");
3963 return;
3964 }
3965 if network_path != self.path_data_mut(path_id).network_path {
3966 if let Some(hs) = self.state.as_handshake() {
3967 if hs.allow_server_migration {
3968 trace!(%network_path, prev = %self.path_data(path_id).network_path, "server migrated to new remote");
3969 self.path_data_mut(path_id).network_path = network_path;
3970 self.qlog.emit_tuple_assigned(path_id, network_path, now);
3971 } else {
3972 debug!("discarding packet with unexpected remote during handshake");
3973 return;
3974 }
3975 } else {
3976 debug!("discarding packet with unexpected remote during handshake");
3977 return;
3978 }
3979 }
3980 }
3981
3982 let was_closed = self.state.is_closed();
3983 let was_drained = self.state.is_drained();
3984
3985 let decrypted = match packet {
3986 None => Err(None),
3987 Some(mut packet) => self
3988 .decrypt_packet(now, path_id, &mut packet)
3989 .map(move |number| (packet, number)),
3990 };
3991 let result = match decrypted {
3992 _ if stateless_reset => {
3993 debug!("got stateless reset");
3994 Err(ConnectionError::Reset)
3995 }
3996 Err(Some(e)) => {
3997 warn!("illegal packet: {}", e);
3998 Err(e.into())
3999 }
4000 Err(None) => {
4001 debug!("failed to authenticate packet");
4002 self.authentication_failures += 1;
4003 let integrity_limit = self
4004 .crypto_state
4005 .integrity_limit(self.highest_space)
4006 .unwrap();
4007 if self.authentication_failures > integrity_limit {
4008 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
4009 } else {
4010 return;
4011 }
4012 }
4013 Ok((packet, number)) => {
4014 qlog.header(&packet.header, number, path_id);
4015 let span = match number {
4016 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
4017 None => trace_span!("recv", space = ?packet.header.space()),
4018 };
4019 let _guard = span.enter();
4020
4021 let dedup = self.spaces[packet.header.space()]
4022 .path_space_mut(path_id)
4023 .map(|pns| &mut pns.dedup);
4024 if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
4025 debug!("discarding possible duplicate packet");
4026 self.qlog.emit_packet_received(qlog, now);
4027 return;
4028 } else if self.state.is_handshake() && packet.header.is_short() {
4029 trace!("dropping short packet during handshake");
4031 self.qlog.emit_packet_received(qlog, now);
4032 return;
4033 } else {
4034 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header
4035 && let Some(hs) = self.state.as_handshake()
4036 && self.side.is_server()
4037 && token != &hs.expected_token
4038 {
4039 warn!("discarding Initial with invalid retry token");
4043 self.qlog.emit_packet_received(qlog, now);
4044 return;
4045 }
4046
4047 if !self.state.is_closed() {
4048 let spin = match packet.header {
4049 Header::Short { spin, .. } => spin,
4050 _ => false,
4051 };
4052
4053 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
4054 self.ensure_path(path_id, network_path, now, number);
4056 }
4057 if self.paths.contains_key(&path_id) {
4058 self.on_packet_authenticated(
4059 now,
4060 packet.header.space(),
4061 path_id,
4062 ecn,
4063 number,
4064 spin,
4065 packet.header.is_1rtt(),
4066 );
4067 }
4068 }
4069
4070 let res = self.process_decrypted_packet(
4071 now,
4072 network_path,
4073 path_id,
4074 number,
4075 packet,
4076 &mut qlog,
4077 );
4078
4079 self.qlog.emit_packet_received(qlog, now);
4080 res
4081 }
4082 }
4083 };
4084
4085 if let Err(conn_err) = result {
4087 match conn_err {
4088 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
4089 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
4090 ConnectionError::Reset
4091 | ConnectionError::TransportError(TransportError {
4092 code: TransportErrorCode::AEAD_LIMIT_REACHED,
4093 ..
4094 }) => {
4095 self.state.move_to_drained(Some(conn_err));
4096 }
4097 ConnectionError::TimedOut => {
4098 unreachable!("timeouts aren't generated by packet processing");
4099 }
4100 ConnectionError::TransportError(err) => {
4101 debug!("closing connection due to transport error: {}", err);
4102 self.state.move_to_closed(err);
4103 }
4104 ConnectionError::VersionMismatch => {
4105 self.state.move_to_draining(Some(conn_err));
4106 }
4107 ConnectionError::LocallyClosed => {
4108 unreachable!("LocallyClosed isn't generated by packet processing");
4109 }
4110 ConnectionError::CidsExhausted => {
4111 unreachable!("CidsExhausted isn't generated by packet processing");
4112 }
4113 };
4114 }
4115
4116 if !was_closed && self.state.is_closed() {
4117 self.close_common();
4118 if !self.state.is_drained() {
4119 self.set_close_timer(now);
4120 }
4121 }
4122 if !was_drained && self.state.is_drained() {
4123 self.endpoint_events.push_back(EndpointEventInner::Drained);
4124 self.timers
4127 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
4128 }
4129
4130 if matches!(self.state.as_type(), StateType::Closed) {
4137 if self
4155 .paths
4156 .get(&path_id)
4157 .map(|p| p.data.validated && p.data.network_path == network_path)
4158 .unwrap_or(false)
4159 {
4160 self.connection_close_pending = true;
4161 }
4162 }
4163 }
4164
4165 fn process_decrypted_packet(
4166 &mut self,
4167 now: Instant,
4168 network_path: FourTuple,
4169 path_id: PathId,
4170 number: Option<u64>,
4171 packet: Packet,
4172 qlog: &mut QlogRecvPacket,
4173 ) -> Result<(), ConnectionError> {
4174 if !self.paths.contains_key(&path_id) {
4175 trace!(%path_id, ?number, "discarding packet for unknown path");
4179 return Ok(());
4180 }
4181 let state = match self.state.as_type() {
4182 StateType::Established => {
4183 match packet.header.space() {
4184 SpaceKind::Data => self.process_payload(
4185 now,
4186 network_path,
4187 path_id,
4188 number.unwrap(),
4189 packet,
4190 qlog,
4191 )?,
4192 _ if packet.header.has_frames() => {
4193 self.process_early_payload(now, path_id, packet, qlog)?
4194 }
4195 _ => {
4196 trace!("discarding unexpected pre-handshake packet");
4197 }
4198 }
4199 return Ok(());
4200 }
4201 StateType::Closed => {
4202 for result in frame::Iter::new(packet.payload.freeze())? {
4203 let frame = match result {
4204 Ok(frame) => frame,
4205 Err(err) => {
4206 debug!("frame decoding error: {err:?}");
4207 continue;
4208 }
4209 };
4210 qlog.frame(&frame);
4211
4212 if let Frame::Padding = frame {
4213 continue;
4214 };
4215
4216 self.stats.frame_rx.record(frame.ty());
4217
4218 if let Frame::Close(_error) = frame {
4219 self.state.move_to_draining(None);
4220 break;
4221 }
4222 }
4223 return Ok(());
4224 }
4225 StateType::Draining | StateType::Drained => return Ok(()),
4226 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
4227 };
4228
4229 match packet.header {
4230 Header::Retry {
4231 src_cid: remote_cid,
4232 ..
4233 } => {
4234 debug_assert_eq!(path_id, PathId::ZERO);
4235 if self.side.is_server() {
4236 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
4237 }
4238
4239 let is_valid_retry = self
4240 .remote_cids
4241 .get(&path_id)
4242 .map(|cids| cids.active())
4243 .map(|orig_dst_cid| {
4244 self.crypto_state.session.is_valid_retry(
4245 orig_dst_cid,
4246 &packet.header_data,
4247 &packet.payload,
4248 )
4249 })
4250 .unwrap_or_default();
4251 if self.total_authed_packets > 1
4252 || packet.payload.len() <= 16 || !is_valid_retry
4254 {
4255 trace!("discarding invalid Retry");
4256 return Ok(());
4264 }
4265
4266 trace!("retrying with CID {}", remote_cid);
4267 let client_hello = state.client_hello.take().unwrap();
4268 self.retry_src_cid = Some(remote_cid);
4269 self.remote_cids
4270 .get_mut(&path_id)
4271 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
4272 .update_initial_cid(remote_cid);
4273 self.remote_handshake_cid = remote_cid;
4274
4275 let space = &mut self.spaces[SpaceId::Initial];
4276 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
4277 self.on_packet_acked(now, PathId::ZERO, info);
4278 };
4279
4280 self.discard_space(now, SpaceKind::Initial); let crypto_space = &mut self.crypto_state.spaces[SpaceKind::Initial];
4283 crypto_space.keys = Some(
4284 self.crypto_state
4285 .session
4286 .initial_keys(remote_cid, self.side.side()),
4287 );
4288 crypto_space.crypto_offset = client_hello.len() as u64;
4289
4290 let next_pn = self.spaces[SpaceId::Initial]
4291 .for_path(path_id)
4292 .next_packet_number;
4293 self.spaces[SpaceId::Initial] = {
4294 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
4295 space.for_path(path_id).next_packet_number = next_pn;
4296 space.pending.crypto.push_back(frame::Crypto {
4297 offset: 0,
4298 data: client_hello,
4299 });
4300 space
4301 };
4302
4303 let zero_rtt = mem::take(
4305 &mut self.spaces[SpaceId::Data]
4306 .for_path(PathId::ZERO)
4307 .sent_packets,
4308 );
4309 for (_, info) in zero_rtt.into_iter() {
4310 self.paths
4311 .get_mut(&PathId::ZERO)
4312 .unwrap()
4313 .remove_in_flight(&info);
4314 self.spaces[SpaceId::Data].pending |= info.retransmits;
4315 }
4316 self.streams.retransmit_all_for_0rtt();
4317
4318 let token_len = packet.payload.len() - 16;
4319 let ConnectionSide::Client { ref mut token, .. } = self.side else {
4320 unreachable!("we already short-circuited if we're server");
4321 };
4322 *token = packet.payload.freeze().split_to(token_len);
4323
4324 self.state = State::handshake(state::Handshake {
4325 expected_token: Bytes::new(),
4326 remote_cid_set: false,
4327 client_hello: None,
4328 allow_server_migration: true,
4329 });
4330 Ok(())
4331 }
4332 Header::Long {
4333 ty: LongType::Handshake,
4334 src_cid: remote_cid,
4335 dst_cid: local_cid,
4336 ..
4337 } => {
4338 debug_assert_eq!(path_id, PathId::ZERO);
4339 if remote_cid != self.remote_handshake_cid {
4340 debug!(
4341 "discarding packet with mismatched remote CID: {} != {}",
4342 self.remote_handshake_cid, remote_cid
4343 );
4344 return Ok(());
4345 }
4346 self.on_path_validated(path_id);
4347
4348 self.process_early_payload(now, path_id, packet, qlog)?;
4349 if self.state.is_closed() {
4350 return Ok(());
4351 }
4352
4353 if self.crypto_state.session.is_handshaking() {
4354 trace!("handshake ongoing");
4355 return Ok(());
4356 }
4357
4358 if self.side.is_client() {
4359 let params = self
4361 .crypto_state
4362 .session
4363 .transport_parameters()?
4364 .ok_or_else(|| {
4365 TransportError::new(
4366 TransportErrorCode::crypto(0x6d),
4367 "transport parameters missing".to_owned(),
4368 )
4369 })?;
4370
4371 if self.has_0rtt() {
4372 if !self.crypto_state.session.early_data_accepted().unwrap() {
4373 debug_assert!(self.side.is_client());
4374 debug!("0-RTT rejected");
4375 self.crypto_state.accepted_0rtt = false;
4376 self.streams.zero_rtt_rejected();
4377
4378 self.spaces[SpaceId::Data].pending = Retransmits::default();
4380
4381 let sent_packets = mem::take(
4383 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
4384 );
4385 for (_, packet) in sent_packets.into_iter() {
4386 self.paths
4387 .get_mut(&path_id)
4388 .unwrap()
4389 .remove_in_flight(&packet);
4390 }
4391 } else {
4392 self.crypto_state.accepted_0rtt = true;
4393 params.validate_resumption_from(&self.peer_params)?;
4394 }
4395 }
4396 if let Some(token) = params.stateless_reset_token {
4397 let remote = self.path_data(path_id).network_path.remote;
4398 debug_assert!(!self.state.is_drained()); self.endpoint_events
4400 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4401 }
4402 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4403 self.issue_first_cids(now);
4404 } else {
4405 self.spaces[SpaceId::Data].pending.handshake_done = true;
4407 self.discard_space(now, SpaceKind::Handshake);
4408 self.events.push_back(Event::HandshakeConfirmed);
4409 trace!("handshake confirmed");
4410 }
4411
4412 self.events.push_back(Event::Connected);
4413 self.state.move_to_established();
4414 trace!("established");
4415
4416 self.issue_first_path_cids(now);
4419 Ok(())
4420 }
4421 Header::Initial(InitialHeader {
4422 src_cid: remote_cid,
4423 dst_cid: local_cid,
4424 ..
4425 }) => {
4426 debug_assert_eq!(path_id, PathId::ZERO);
4427 if !state.remote_cid_set {
4428 trace!("switching remote CID to {}", remote_cid);
4429 let mut state = state.clone();
4430 self.remote_cids
4431 .get_mut(&path_id)
4432 .expect("PathId::ZERO not yet abandoned")
4433 .update_initial_cid(remote_cid);
4434 self.remote_handshake_cid = remote_cid;
4435 self.original_remote_cid = remote_cid;
4436 state.remote_cid_set = true;
4437 self.state.move_to_handshake(state);
4438 } else if remote_cid != self.remote_handshake_cid {
4439 debug!(
4440 "discarding packet with mismatched remote CID: {} != {}",
4441 self.remote_handshake_cid, remote_cid
4442 );
4443 return Ok(());
4444 }
4445
4446 let starting_space = self.highest_space;
4447 self.process_early_payload(now, path_id, packet, qlog)?;
4448
4449 if self.side.is_server()
4450 && starting_space == SpaceKind::Initial
4451 && self.highest_space != SpaceKind::Initial
4452 {
4453 let params = self
4454 .crypto_state
4455 .session
4456 .transport_parameters()?
4457 .ok_or_else(|| {
4458 TransportError::new(
4459 TransportErrorCode::crypto(0x6d),
4460 "transport parameters missing".to_owned(),
4461 )
4462 })?;
4463 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4464 self.issue_first_cids(now);
4465 self.init_0rtt(now);
4466 }
4467 Ok(())
4468 }
4469 Header::Long {
4470 ty: LongType::ZeroRtt,
4471 ..
4472 } => {
4473 self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4474 Ok(())
4475 }
4476 Header::VersionNegotiate { .. } => {
4477 if self.total_authed_packets > 1 {
4478 return Ok(());
4479 }
4480 let supported = packet
4481 .payload
4482 .chunks(4)
4483 .any(|x| match <[u8; 4]>::try_from(x) {
4484 Ok(version) => self.version == u32::from_be_bytes(version),
4485 Err(_) => false,
4486 });
4487 if supported {
4488 return Ok(());
4489 }
4490 debug!("remote doesn't support our version");
4491 Err(ConnectionError::VersionMismatch)
4492 }
4493 Header::Short { .. } => unreachable!(
4494 "short packets received during handshake are discarded in handle_packet"
4495 ),
4496 }
4497 }
4498
4499 fn process_early_payload(
4501 &mut self,
4502 now: Instant,
4503 path_id: PathId,
4504 packet: Packet,
4505 #[allow(unused)] qlog: &mut QlogRecvPacket,
4506 ) -> Result<(), TransportError> {
4507 debug_assert_ne!(packet.header.space(), SpaceKind::Data);
4508 debug_assert_eq!(path_id, PathId::ZERO);
4509 let payload_len = packet.payload.len();
4510 let mut ack_eliciting = false;
4511 for result in frame::Iter::new(packet.payload.freeze())? {
4512 let frame = result?;
4513 qlog.frame(&frame);
4514 let span = match frame {
4515 Frame::Padding => continue,
4516 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4517 };
4518
4519 self.stats.frame_rx.record(frame.ty());
4520
4521 let _guard = span.as_ref().map(|x| x.enter());
4522 ack_eliciting |= frame.is_ack_eliciting();
4523
4524 if frame.is_1rtt() && packet.header.space() != SpaceKind::Data {
4526 return Err(TransportError::PROTOCOL_VIOLATION(
4527 "illegal frame type in handshake",
4528 ));
4529 }
4530
4531 match frame {
4532 Frame::Padding | Frame::Ping => {}
4533 Frame::Crypto(frame) => {
4534 self.read_crypto(packet.header.space().into(), &frame, payload_len)?;
4535 }
4536 Frame::Ack(ack) => {
4537 self.on_ack_received(now, packet.header.space().into(), ack)?;
4538 }
4539 Frame::PathAck(ack) => {
4540 span.as_ref()
4541 .map(|span| span.record("path", tracing::field::display(&ack.path_id)));
4542 self.on_path_ack_received(now, packet.header.space().into(), ack)?;
4543 }
4544 Frame::Close(reason) => {
4545 self.state.move_to_draining(Some(reason.into()));
4546 return Ok(());
4547 }
4548 _ => {
4549 let mut err =
4550 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4551 err.frame = frame::MaybeFrame::Known(frame.ty());
4552 return Err(err);
4553 }
4554 }
4555 }
4556
4557 if ack_eliciting {
4558 self.spaces[packet.header.space()]
4560 .for_path(path_id)
4561 .pending_acks
4562 .set_immediate_ack_required();
4563 }
4564
4565 self.write_crypto();
4566 Ok(())
4567 }
4568
4569 fn process_payload(
4571 &mut self,
4572 now: Instant,
4573 network_path: FourTuple,
4574 path_id: PathId,
4575 number: u64,
4576 packet: Packet,
4577 #[allow(unused)] qlog: &mut QlogRecvPacket,
4578 ) -> Result<(), TransportError> {
4579 let is_multipath_negotiated = self.is_multipath_negotiated();
4580 let payload = packet.payload.freeze();
4581 let mut is_probing_packet = true;
4582 let mut close = None;
4583 let payload_len = payload.len();
4584 let mut ack_eliciting = false;
4585 let mut migration_observed_addr = None;
4588 for result in frame::Iter::new(payload)? {
4589 let frame = result?;
4590 qlog.frame(&frame);
4591 let span = match frame {
4592 Frame::Padding => continue,
4593 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4594 };
4595
4596 self.stats.frame_rx.record(frame.ty());
4597 match &frame {
4600 Frame::Crypto(f) => {
4601 trace!(offset = f.offset, len = f.data.len(), "got frame CRYPTO");
4602 }
4603 Frame::Stream(f) => {
4604 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got frame STREAM");
4605 }
4606 Frame::Datagram(f) => {
4607 trace!(len = f.data.len(), "got frame DATAGRAM");
4608 }
4609 f => {
4610 trace!("got frame {f}");
4611 }
4612 }
4613
4614 let _guard = span.enter();
4615 if packet.header.is_0rtt() {
4616 match frame {
4617 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4618 return Err(TransportError::PROTOCOL_VIOLATION(
4619 "illegal frame type in 0-RTT",
4620 ));
4621 }
4622 _ => {
4623 if frame.is_1rtt() {
4624 return Err(TransportError::PROTOCOL_VIOLATION(
4625 "illegal frame type in 0-RTT",
4626 ));
4627 }
4628 }
4629 }
4630 }
4631 ack_eliciting |= frame.is_ack_eliciting();
4632
4633 match frame {
4635 Frame::Padding
4636 | Frame::PathChallenge(_)
4637 | Frame::PathResponse(_)
4638 | Frame::NewConnectionId(_)
4639 | Frame::ObservedAddr(_) => {}
4640 _ => {
4641 is_probing_packet = false;
4642 }
4643 }
4644
4645 match frame {
4646 Frame::Crypto(frame) => {
4647 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4648 }
4649 Frame::Stream(frame) => {
4650 if self.streams.received(frame, payload_len)?.should_transmit() {
4651 self.spaces[SpaceId::Data].pending.max_data = true;
4652 }
4653 }
4654 Frame::Ack(ack) => {
4655 self.on_ack_received(now, SpaceId::Data, ack)?;
4656 }
4657 Frame::PathAck(ack) => {
4658 span.record("path", tracing::field::display(&ack.path_id));
4659 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4660 }
4661 Frame::Padding | Frame::Ping => {}
4662 Frame::Close(reason) => {
4663 close = Some(reason);
4664 }
4665 Frame::PathChallenge(challenge) => {
4666 let path = &mut self
4667 .path_mut(path_id)
4668 .expect("payload is processed only after the path becomes known");
4669 path.path_responses.push(number, challenge.0, network_path);
4670 if network_path == path.network_path {
4673 match self.peer_supports_ack_frequency() {
4683 true => self.immediate_ack(path_id),
4684 false => {
4685 self.ping_path(path_id).ok();
4686 }
4687 }
4688 }
4689 }
4690 Frame::PathResponse(response) => {
4691 let path = self
4692 .paths
4693 .get_mut(&path_id)
4694 .expect("payload is processed only after the path becomes known");
4695
4696 use PathTimer::*;
4697 use paths::OnPathResponseReceived::*;
4698 match path
4699 .data
4700 .on_path_response_received(now, response.0, network_path)
4701 {
4702 OnPath { was_open } => {
4703 let qlog = self.qlog.with_time(now);
4704
4705 self.timers
4706 .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4707 self.timers
4708 .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4709
4710 let next_challenge = path
4711 .data
4712 .earliest_on_path_expiring_challenge()
4713 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4714 self.timers.set_or_stop(
4715 Timer::PerPath(path_id, PathChallengeLost),
4716 next_challenge,
4717 qlog,
4718 );
4719
4720 if !was_open {
4721 if is_multipath_negotiated {
4722 self.events
4723 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4724 }
4725 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4726 {
4727 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4728 id: path_id,
4729 addr: observed.socket_addr(),
4730 }));
4731 }
4732 }
4733 if let Some((_, ref mut prev)) = path.prev {
4734 prev.reset_on_path_challenges();
4735 }
4736 }
4737 OffPath => {
4738 debug!(%response, "Valid response to off-path PATH_CHALLENGE");
4739 }
4740 Ignored {
4741 sent_on,
4742 current_path,
4743 } => {
4744 debug!(%sent_on, %current_path, %response, "ignoring valid PATH_RESPONSE")
4745 }
4746 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4747 }
4748 }
4749 Frame::MaxData(frame::MaxData(bytes)) => {
4750 self.streams.received_max_data(bytes);
4751 }
4752 Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
4753 self.streams.received_max_stream_data(id, offset)?;
4754 }
4755 Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
4756 self.streams.received_max_streams(dir, count)?;
4757 }
4758 Frame::ResetStream(frame) => {
4759 if self.streams.received_reset(frame)?.should_transmit() {
4760 self.spaces[SpaceId::Data].pending.max_data = true;
4761 }
4762 }
4763 Frame::DataBlocked(DataBlocked(offset)) => {
4764 debug!(offset, "peer claims to be blocked at connection level");
4765 }
4766 Frame::StreamDataBlocked(StreamDataBlocked { id, offset }) => {
4767 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4768 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4769 return Err(TransportError::STREAM_STATE_ERROR(
4770 "STREAM_DATA_BLOCKED on send-only stream",
4771 ));
4772 }
4773 debug!(
4774 stream = %id,
4775 offset, "peer claims to be blocked at stream level"
4776 );
4777 }
4778 Frame::StreamsBlocked(StreamsBlocked { dir, limit }) => {
4779 if limit > MAX_STREAM_COUNT {
4780 return Err(TransportError::FRAME_ENCODING_ERROR(
4781 "unrepresentable stream limit",
4782 ));
4783 }
4784 debug!(
4785 "peer claims to be blocked opening more than {} {} streams",
4786 limit, dir
4787 );
4788 }
4789 Frame::StopSending(frame::StopSending { id, error_code }) => {
4790 if id.initiator() != self.side.side() {
4791 if id.dir() == Dir::Uni {
4792 debug!("got STOP_SENDING on recv-only {}", id);
4793 return Err(TransportError::STREAM_STATE_ERROR(
4794 "STOP_SENDING on recv-only stream",
4795 ));
4796 }
4797 } else if self.streams.is_local_unopened(id) {
4798 return Err(TransportError::STREAM_STATE_ERROR(
4799 "STOP_SENDING on unopened stream",
4800 ));
4801 }
4802 self.streams.received_stop_sending(id, error_code);
4803 }
4804 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4805 if let Some(ref path_id) = path_id {
4806 span.record("path", tracing::field::display(&path_id));
4807 }
4808 let path_id = path_id.unwrap_or_default();
4809 match self.local_cid_state.get_mut(&path_id) {
4810 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4811 Some(cid_state) => {
4812 let allow_more_cids = cid_state
4813 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4814
4815 let has_path = !self.abandoned_paths.contains(&path_id);
4819 let allow_more_cids = allow_more_cids && has_path;
4820
4821 debug_assert!(!self.state.is_drained()); self.endpoint_events
4823 .push_back(EndpointEventInner::RetireConnectionId(
4824 now,
4825 path_id,
4826 sequence,
4827 allow_more_cids,
4828 ));
4829 }
4830 }
4831 }
4832 Frame::NewConnectionId(frame) => {
4833 let path_id = if let Some(path_id) = frame.path_id {
4834 if !self.is_multipath_negotiated() {
4835 return Err(TransportError::PROTOCOL_VIOLATION(
4836 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4837 ));
4838 }
4839 if path_id > self.local_max_path_id {
4840 return Err(TransportError::PROTOCOL_VIOLATION(
4841 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4842 ));
4843 }
4844 path_id
4845 } else {
4846 PathId::ZERO
4847 };
4848
4849 if let Some(ref path_id) = frame.path_id {
4850 span.record("path", tracing::field::display(&path_id));
4851 }
4852
4853 if self.abandoned_paths.contains(&path_id) {
4854 trace!("ignoring issued CID for abandoned path");
4855 continue;
4856 }
4857 let remote_cids = self
4858 .remote_cids
4859 .entry(path_id)
4860 .or_insert_with(|| CidQueue::new(frame.id));
4861 if remote_cids.active().is_empty() {
4862 return Err(TransportError::PROTOCOL_VIOLATION(
4863 "NEW_CONNECTION_ID when CIDs aren't in use",
4864 ));
4865 }
4866 if frame.retire_prior_to > frame.sequence {
4867 return Err(TransportError::PROTOCOL_VIOLATION(
4868 "NEW_CONNECTION_ID retiring unissued CIDs",
4869 ));
4870 }
4871
4872 use crate::cid_queue::InsertError;
4873 match remote_cids.insert(frame) {
4874 Ok(None) if self.path(path_id).is_none() => {
4875 self.continue_nat_traversal_round(now);
4878 }
4879 Ok(None) => {}
4880 Ok(Some((retired, reset_token))) => {
4881 let pending_retired =
4882 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4883 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4886 if (pending_retired.len() as u64)
4889 .saturating_add(retired.end.saturating_sub(retired.start))
4890 > MAX_PENDING_RETIRED_CIDS
4891 {
4892 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4893 "queued too many retired CIDs",
4894 ));
4895 }
4896 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4897 self.set_reset_token(path_id, network_path.remote, reset_token);
4898 }
4899 Err(InsertError::ExceedsLimit) => {
4900 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4901 }
4902 Err(InsertError::Retired) => {
4903 trace!("discarding already-retired");
4904 self.spaces[SpaceId::Data]
4908 .pending
4909 .retire_cids
4910 .push((path_id, frame.sequence));
4911 continue;
4912 }
4913 };
4914
4915 if self.side.is_server()
4916 && path_id == PathId::ZERO
4917 && self
4918 .remote_cids
4919 .get(&PathId::ZERO)
4920 .map(|cids| cids.active_seq() == 0)
4921 .unwrap_or_default()
4922 {
4923 self.update_remote_cid(PathId::ZERO);
4926 }
4927 }
4928 Frame::NewToken(NewToken { token }) => {
4929 let ConnectionSide::Client {
4930 token_store,
4931 server_name,
4932 ..
4933 } = &self.side
4934 else {
4935 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4936 };
4937 if token.is_empty() {
4938 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4939 }
4940 trace!("got new token");
4941 token_store.insert(server_name, token);
4942 }
4943 Frame::Datagram(datagram) => {
4944 if self
4945 .datagrams
4946 .received(datagram, &self.config.datagram_receive_buffer_size)?
4947 {
4948 self.events.push_back(Event::DatagramReceived);
4949 }
4950 }
4951 Frame::AckFrequency(ack_frequency) => {
4952 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4955 continue;
4958 }
4959
4960 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4962 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4963
4964 if let Some(timeout) = space
4967 .pending_acks
4968 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4969 {
4970 self.timers.set(
4971 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4972 timeout,
4973 self.qlog.with_time(now),
4974 );
4975 }
4976 }
4977 }
4978 Frame::ImmediateAck => {
4979 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4981 pns.pending_acks.set_immediate_ack_required();
4982 }
4983 }
4984 Frame::HandshakeDone => {
4985 if self.side.is_server() {
4986 return Err(TransportError::PROTOCOL_VIOLATION(
4987 "client sent HANDSHAKE_DONE",
4988 ));
4989 }
4990 if self.crypto_state.has_keys(EncryptionLevel::Handshake) {
4991 self.discard_space(now, SpaceKind::Handshake);
4992 }
4993 self.events.push_back(Event::HandshakeConfirmed);
4994 trace!("handshake confirmed");
4995 }
4996 Frame::ObservedAddr(observed) => {
4997 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4999 if !self
5000 .peer_params
5001 .address_discovery_role
5002 .should_report(&self.config.address_discovery_role)
5003 {
5004 return Err(TransportError::PROTOCOL_VIOLATION(
5005 "received OBSERVED_ADDRESS frame when not negotiated",
5006 ));
5007 }
5008 if packet.header.space() != SpaceKind::Data {
5010 return Err(TransportError::PROTOCOL_VIOLATION(
5011 "OBSERVED_ADDRESS frame outside data space",
5012 ));
5013 }
5014
5015 let path = self.path_data_mut(path_id);
5016 if network_path == path.network_path {
5017 if let Some(updated) = path.update_observed_addr_report(observed)
5018 && path.open_status == paths::OpenStatus::Informed
5019 {
5020 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5021 id: path_id,
5022 addr: updated,
5023 }));
5024 }
5026 } else {
5027 migration_observed_addr = Some(observed)
5029 }
5030 }
5031 Frame::PathAbandon(frame::PathAbandon {
5032 path_id,
5033 error_code,
5034 }) => {
5035 span.record("path", tracing::field::display(&path_id));
5036 match self.close_path_inner(
5037 now,
5038 path_id,
5039 PathAbandonReason::RemoteAbandoned {
5040 error_code: error_code.into(),
5041 },
5042 ) {
5043 Ok(()) => {
5044 trace!("peer abandoned path");
5045 }
5046 Err(ClosePathError::LastOpenPath) => {
5047 trace!("peer abandoned last path, closing connection");
5048 return Err(TransportError::NO_VIABLE_PATH(
5049 "last path abandoned by peer",
5050 ));
5051 }
5052 Err(ClosePathError::ClosedPath) => {
5053 trace!("peer abandoned already closed path");
5054 }
5055 Err(ClosePathError::MultipathNotNegotiated) => {
5056 return Err(TransportError::PROTOCOL_VIOLATION(
5057 "received PATH_ABANDON frame when multipath was not negotiated",
5058 ));
5059 }
5060 };
5061
5062 if let Some(path) = self.paths.get_mut(&path_id)
5064 && !mem::replace(&mut path.data.draining, true)
5065 {
5066 let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5067 let pto = path.data.rtt.pto_base() + ack_delay;
5068 self.timers.set(
5069 Timer::PerPath(path_id, PathTimer::DiscardPath),
5070 now + 3 * pto,
5071 self.qlog.with_time(now),
5072 );
5073
5074 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
5075 }
5076 }
5077 Frame::PathStatusAvailable(info) => {
5078 span.record("path", tracing::field::display(&info.path_id));
5079 if self.is_multipath_negotiated() {
5080 self.on_path_status(
5081 info.path_id,
5082 PathStatus::Available,
5083 info.status_seq_no,
5084 );
5085 } else {
5086 return Err(TransportError::PROTOCOL_VIOLATION(
5087 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
5088 ));
5089 }
5090 }
5091 Frame::PathStatusBackup(info) => {
5092 span.record("path", tracing::field::display(&info.path_id));
5093 if self.is_multipath_negotiated() {
5094 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
5095 } else {
5096 return Err(TransportError::PROTOCOL_VIOLATION(
5097 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
5098 ));
5099 }
5100 }
5101 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
5102 span.record("path", tracing::field::display(&path_id));
5103 if !self.is_multipath_negotiated() {
5104 return Err(TransportError::PROTOCOL_VIOLATION(
5105 "received MAX_PATH_ID frame when multipath was not negotiated",
5106 ));
5107 }
5108 if path_id > self.remote_max_path_id {
5110 self.remote_max_path_id = path_id;
5111 self.issue_first_path_cids(now);
5112 while let Some(true) = self.continue_nat_traversal_round(now) {}
5113 }
5114 }
5115 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
5116 if self.is_multipath_negotiated() {
5120 if max_path_id > self.local_max_path_id {
5121 return Err(TransportError::PROTOCOL_VIOLATION(
5122 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
5123 ));
5124 }
5125 debug!("received PATHS_BLOCKED({:?})", max_path_id);
5126 } else {
5128 return Err(TransportError::PROTOCOL_VIOLATION(
5129 "received PATHS_BLOCKED frame when not multipath was not negotiated",
5130 ));
5131 }
5132 }
5133 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
5134 if self.is_multipath_negotiated() {
5142 if path_id > self.local_max_path_id {
5143 return Err(TransportError::PROTOCOL_VIOLATION(
5144 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
5145 ));
5146 }
5147 if next_seq.0
5148 > self
5149 .local_cid_state
5150 .get(&path_id)
5151 .map(|cid_state| cid_state.active_seq().1 + 1)
5152 .unwrap_or_default()
5153 {
5154 return Err(TransportError::PROTOCOL_VIOLATION(
5155 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
5156 ));
5157 }
5158 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
5159 } else {
5160 return Err(TransportError::PROTOCOL_VIOLATION(
5161 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
5162 ));
5163 }
5164 }
5165 Frame::AddAddress(addr) => {
5166 let client_state = match self.n0_nat_traversal.client_side_mut() {
5167 Ok(state) => state,
5168 Err(err) => {
5169 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5170 "Nat traversal(ADD_ADDRESS): {err}"
5171 )));
5172 }
5173 };
5174
5175 if !client_state.check_remote_address(&addr) {
5176 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
5178 }
5179
5180 match client_state.add_remote_address(addr) {
5181 Ok(maybe_added) => {
5182 if let Some(added) = maybe_added {
5183 self.events.push_back(Event::NatTraversal(
5184 n0_nat_traversal::Event::AddressAdded(added),
5185 ));
5186 }
5187 }
5188 Err(e) => {
5189 warn!(%e, "failed to add remote address")
5190 }
5191 }
5192 }
5193 Frame::RemoveAddress(addr) => {
5194 let client_state = match self.n0_nat_traversal.client_side_mut() {
5195 Ok(state) => state,
5196 Err(err) => {
5197 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5198 "Nat traversal(REMOVE_ADDRESS): {err}"
5199 )));
5200 }
5201 };
5202 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
5203 self.events.push_back(Event::NatTraversal(
5204 n0_nat_traversal::Event::AddressRemoved(removed_addr),
5205 ));
5206 }
5207 }
5208 Frame::ReachOut(reach_out) => {
5209 let ipv6 = self.is_ipv6();
5210 let server_state = match self.n0_nat_traversal.server_side_mut() {
5211 Ok(state) => state,
5212 Err(err) => {
5213 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5214 "Nat traversal(REACH_OUT): {err}"
5215 )));
5216 }
5217 };
5218
5219 if let Err(err) = server_state.handle_reach_out(reach_out, ipv6) {
5220 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5221 "Nat traversal(REACH_OUT): {err}"
5222 )));
5223 }
5224 }
5225 }
5226 }
5227
5228 let space = self.spaces[SpaceId::Data].for_path(path_id);
5229 if space
5230 .pending_acks
5231 .packet_received(now, number, ack_eliciting, &space.dedup)
5232 {
5233 if self.abandoned_paths.contains(&path_id) {
5234 space.pending_acks.set_immediate_ack_required();
5237 } else {
5238 self.timers.set(
5239 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
5240 now + self.ack_frequency.max_ack_delay,
5241 self.qlog.with_time(now),
5242 );
5243 }
5244 }
5245
5246 let pending = &mut self.spaces[SpaceId::Data].pending;
5251 self.streams.queue_max_stream_id(pending);
5252
5253 if let Some(reason) = close {
5254 self.state.move_to_draining(Some(reason.into()));
5255 self.connection_close_pending = true;
5256 }
5257
5258 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
5259 && !is_probing_packet
5260 && network_path != self.path_data(path_id).network_path
5261 {
5262 let ConnectionSide::Server { ref server_config } = self.side else {
5263 panic!("packets from unknown remote should be dropped by clients");
5264 };
5265 debug_assert!(
5266 server_config.migration,
5267 "migration-initiating packets should have been dropped immediately"
5268 );
5269 self.migrate(path_id, now, network_path, migration_observed_addr);
5270 self.update_remote_cid(path_id);
5272 self.spin = false;
5273 }
5274
5275 Ok(())
5276 }
5277
5278 fn migrate(
5279 &mut self,
5280 path_id: PathId,
5281 now: Instant,
5282 network_path: FourTuple,
5283 observed_addr: Option<ObservedAddr>,
5284 ) {
5285 trace!(%network_path, %path_id, "migration initiated");
5286 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
5287 let prev_pto = self.pto(SpaceKind::Data, path_id);
5294 let path = self.paths.get_mut(&path_id).expect("known path");
5295 let mut new_path_data = if network_path.remote.is_ipv4()
5296 && network_path.remote.ip() == path.data.network_path.remote.ip()
5297 {
5298 PathData::from_previous(network_path, &path.data, self.path_generation_counter, now)
5299 } else {
5300 let peer_max_udp_payload_size =
5301 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
5302 .unwrap_or(u16::MAX);
5303 PathData::new(
5304 network_path,
5305 self.allow_mtud,
5306 Some(peer_max_udp_payload_size),
5307 self.path_generation_counter,
5308 now,
5309 &self.config,
5310 )
5311 };
5312 new_path_data.last_observed_addr_report = path.data.last_observed_addr_report.clone();
5313 if let Some(report) = observed_addr
5314 && let Some(updated) = new_path_data.update_observed_addr_report(report)
5315 {
5316 tracing::info!("adding observed addr event from migration");
5317 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5318 id: path_id,
5319 addr: updated,
5320 }));
5321 }
5322 new_path_data.pending_on_path_challenge = true;
5323
5324 let mut prev_path_data = mem::replace(&mut path.data, new_path_data);
5325
5326 if !prev_path_data.validated
5335 && let Some(cid) = self.remote_cids.get(&path_id).map(CidQueue::active)
5336 {
5337 prev_path_data.pending_on_path_challenge = true;
5338 path.prev = Some((cid, prev_path_data));
5341 }
5342
5343 self.qlog.emit_tuple_assigned(path_id, network_path, now);
5345
5346 self.timers.set(
5347 Timer::PerPath(path_id, PathTimer::PathValidation),
5348 now + 3 * cmp::max(self.pto(SpaceKind::Data, path_id), prev_pto),
5349 self.qlog.with_time(now),
5350 );
5351 }
5352
5353 pub fn handle_network_change(&mut self, hint: Option<&dyn NetworkChangeHint>, now: Instant) {
5370 debug!("network changed");
5371 if self.state.is_drained() {
5372 return;
5373 }
5374 if self.highest_space < SpaceKind::Data {
5375 for path in self.paths.values_mut() {
5376 path.data.network_path.local_ip = None;
5378 }
5379
5380 self.update_remote_cid(PathId::ZERO);
5381 self.ping();
5382
5383 return;
5384 }
5385
5386 let mut non_recoverable_paths = Vec::default();
5389 let mut recoverable_paths = Vec::default();
5390 let mut open_paths = 0;
5391
5392 let is_multipath_negotiated = self.is_multipath_negotiated();
5393 let is_client = self.side().is_client();
5394 let immediate_ack_allowed = self.peer_supports_ack_frequency();
5395
5396 for (path_id, path) in self.paths.iter_mut() {
5397 if self.abandoned_paths.contains(path_id) {
5398 continue;
5399 }
5400 open_paths += 1;
5401
5402 path.data.network_path.local_ip = None;
5405
5406 let network_path = path.data.network_path;
5407 let remote = network_path.remote;
5408
5409 let attempt_to_recover = if is_multipath_negotiated {
5413 if is_client {
5414 hint.map(|h| h.is_path_recoverable(*path_id, network_path))
5415 .unwrap_or(false)
5416 } else {
5417 true
5421 }
5422 } else {
5423 true
5425 };
5426
5427 if attempt_to_recover {
5428 recoverable_paths.push((*path_id, remote));
5429 } else {
5430 non_recoverable_paths.push((*path_id, remote, path.data.local_status()))
5431 }
5432 }
5433
5434 let open_first = open_paths == non_recoverable_paths.len();
5443
5444 for (path_id, remote, status) in non_recoverable_paths.into_iter() {
5445 let network_path = FourTuple {
5446 remote,
5447 local_ip: None, };
5449
5450 if open_first && let Err(e) = self.open_path(network_path, status, now) {
5451 debug!(%e,"Failed to open new path for network change");
5452 recoverable_paths.push((path_id, remote));
5454 continue;
5455 }
5456
5457 if let Err(e) =
5458 self.close_path_inner(now, path_id, PathAbandonReason::UnusableAfterNetworkChange)
5459 {
5460 debug!(%e,"Failed to close unrecoverable path after network change");
5461 recoverable_paths.push((path_id, remote));
5462 continue;
5463 }
5464
5465 if !open_first && let Err(e) = self.open_path(network_path, status, now) {
5466 debug!(%e,"Failed to open new path for network change");
5470 }
5471 }
5472
5473 for (path_id, remote) in recoverable_paths.into_iter() {
5476 let space = &mut self.spaces[SpaceId::Data];
5477
5478 if let Some(path_space) = space.number_spaces.get_mut(&path_id) {
5480 path_space.ping_pending = true;
5481
5482 if immediate_ack_allowed {
5483 path_space.immediate_ack_pending = true;
5484 }
5485 }
5486
5487 let Some((reset_token, retired)) =
5488 self.remote_cids.get_mut(&path_id).and_then(CidQueue::next)
5489 else {
5490 continue;
5491 };
5492
5493 space
5495 .pending
5496 .retire_cids
5497 .extend(retired.map(|seq| (path_id, seq)));
5498
5499 debug_assert!(!self.state.is_drained()); self.endpoint_events
5501 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5502 }
5503 }
5504
5505 fn update_remote_cid(&mut self, path_id: PathId) {
5507 let Some((reset_token, retired)) = self
5508 .remote_cids
5509 .get_mut(&path_id)
5510 .and_then(|cids| cids.next())
5511 else {
5512 return;
5513 };
5514
5515 self.spaces[SpaceId::Data]
5517 .pending
5518 .retire_cids
5519 .extend(retired.map(|seq| (path_id, seq)));
5520 let remote = self.path_data(path_id).network_path.remote;
5521 self.set_reset_token(path_id, remote, reset_token);
5522 }
5523
5524 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5533 debug_assert!(!self.state.is_drained()); self.endpoint_events
5535 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5536
5537 if path_id == PathId::ZERO {
5543 self.peer_params.stateless_reset_token = Some(reset_token);
5544 }
5545 }
5546
5547 fn issue_first_cids(&mut self, now: Instant) {
5549 if self
5550 .local_cid_state
5551 .get(&PathId::ZERO)
5552 .expect("PathId::ZERO exists when the connection is created")
5553 .cid_len()
5554 == 0
5555 {
5556 return;
5557 }
5558
5559 let mut n = self.peer_params.issue_cids_limit() - 1;
5561 if let ConnectionSide::Server { server_config } = &self.side
5562 && server_config.has_preferred_address()
5563 {
5564 n -= 1;
5566 }
5567 debug_assert!(!self.state.is_drained()); self.endpoint_events
5569 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5570 }
5571
5572 fn issue_first_path_cids(&mut self, now: Instant) {
5576 if let Some(max_path_id) = self.max_path_id() {
5577 let mut path_id = self.max_path_id_with_cids.next();
5578 while path_id <= max_path_id {
5579 self.endpoint_events
5580 .push_back(EndpointEventInner::NeedIdentifiers(
5581 path_id,
5582 now,
5583 self.peer_params.issue_cids_limit(),
5584 ));
5585 path_id = path_id.next();
5586 }
5587 self.max_path_id_with_cids = max_path_id;
5588 }
5589 }
5590
/// Fills `builder` with the frames that are pending and permitted for this
/// packet.
///
/// * `now` - timestamp used for timers, ACK delay and recorded challenges.
/// * `space_id` - packet number space the packet belongs to; most frame
///   kinds below are only written for `SpaceId::Data`.
/// * `path_id` - path the packet is sent on; must be a known path, otherwise
///   this panics.
/// * `scheduling_info` - per-path flags (`is_abandoned`, `may_send_data`,
///   `may_self_abandon`) that gate which frames may be written.
/// * `builder` - packet under construction; each frame is written only while
///   `builder.frame_space_remaining()` leaves room for it.
///
/// Frames are attempted in the fixed order in which they appear below. Pending
/// state is consumed as frames are written; state that does not fit stays
/// queued.
fn populate_packet<'a, 'b>(
    &mut self,
    now: Instant,
    space_id: SpaceId,
    path_id: PathId,
    scheduling_info: &PathSchedulingInfo,
    builder: &mut PacketBuilder<'a, 'b>,
) {
    let is_multipath_negotiated = self.is_multipath_negotiated();
    let space_has_keys = self.crypto_state.has_keys(space_id.encryption_level());
    // The data space without keys for its encryption level is treated as 0-RTT.
    let is_0rtt = space_id == SpaceId::Data && !space_has_keys;
    let stats = &mut self.stats.frame_tx;
    let space = &mut self.spaces[space_id];
    let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
    space
        .for_path(path_id)
        .pending_acks
        .maybe_ack_non_eliciting();

    // HandshakeDone. The pending flag is only consumed when the preceding
    // conditions hold, because `&&` short-circuits before `mem::replace`.
    if !is_0rtt
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && mem::replace(&mut space.pending.handshake_done, false)
    {
        builder.write_frame(frame::HandshakeDone, stats);
    }

    // Ping, tracked per path.
    if !scheduling_info.is_abandoned
        && mem::replace(&mut space.for_path(path_id).ping_pending, false)
    {
        builder.write_frame(frame::Ping, stats);
    }

    // ImmediateAck, tracked per path.
    if !scheduling_info.is_abandoned
        && mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false)
    {
        debug_assert_eq!(
            space_id,
            SpaceId::Data,
            "immediate acks must be sent in the data space"
        );
        builder.write_frame(frame::ImmediateAck, stats);
    }

    // ACKs for every path of this space that reports sendable ACKs, not just
    // for `path_id`. The ids are collected first so `space` can be handed to
    // `populate_acks` mutably.
    if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
        for path_id in space
            .number_spaces
            .iter_mut()
            .filter(|(_, pns)| pns.pending_acks.can_send())
            .map(|(&path_id, _)| path_id)
            .collect::<Vec<_>>()
        {
            Self::populate_acks(
                now,
                self.receiving_ecn,
                path_id,
                space_id,
                space,
                is_multipath_negotiated,
                builder,
                stats,
                space_has_keys,
            );
        }
    }

    // AckFrequency.
    if !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && mem::replace(&mut space.pending.ack_frequency, false)
    {
        let sequence_number = self.ack_frequency.next_sequence_number();

        // NOTE(review): panics if `ack_frequency_config` is `None`; presumably
        // the pending flag is only set when the config exists — confirm.
        let config = self.config.ack_frequency_config.as_ref().unwrap();

        let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
            path.rtt.get(),
            config,
            &self.peer_params,
        );

        let frame = frame::AckFrequency {
            sequence: sequence_number,
            ack_eliciting_threshold: config.ack_eliciting_threshold,
            // Falls back to `VarInt::MAX` when the microsecond count does not convert.
            request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
            reordering_threshold: config.reordering_threshold,
        };
        builder.write_frame(frame, stats);

        self.ack_frequency
            .ack_frequency_sent(path_id, builder.packet_number, max_ack_delay);
    }

    // PathChallenge for this path (data space only, not while closed).
    if !scheduling_info.is_abandoned
        && space_id == SpaceId::Data
        && path.pending_on_path_challenge
        && !self.state.is_closed()
        && builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
    {
        path.pending_on_path_challenge = false;

        // Fresh random token, recorded on the path before it is written.
        let token = self.rng.random();
        path.record_path_challenge_sent(now, token, path.network_path);
        let challenge = frame::PathChallenge(token);
        builder.write_frame(challenge, stats);
        builder.require_padding();
        let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
        // First challenge of a path that is being opened: mark it as sent and
        // arm the PathOpen timer at three PTOs.
        if path.open_status == paths::OpenStatus::Pending {
            path.open_status = paths::OpenStatus::Sent;
            self.timers.set(
                Timer::PerPath(path_id, PathTimer::PathOpen),
                now + 3 * pto,
                self.qlog.with_time(now),
            );
        }

        // The challenge is considered lost after one PTO.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
            now + pto,
            self.qlog.with_time(now),
        );

        // NOTE(review): `pending_on_path_challenge` was set to `false` at the
        // top of this branch, so this condition looks like it can only hold if
        // `record_path_challenge_sent` sets it again — confirm.
        if is_multipath_negotiated && !path.validated && path.pending_on_path_challenge {
            space.pending.path_status.insert(path_id);
        }

        // Send an ObservedAddr alongside the challenge when address discovery
        // says we should report and the frame still fits.
        if space_id == SpaceId::Data
            && self
                .config
                .address_discovery_role
                .should_report(&self.peer_params.address_discovery_role)
        {
            let frame = frame::ObservedAddr::new(
                path.network_path.remote,
                self.next_observed_addr_seq_no,
            );
            if builder.frame_space_remaining() > frame.size() {
                builder.write_frame(frame, stats);

                self.next_observed_addr_seq_no =
                    self.next_observed_addr_seq_no.saturating_add(1u8);
                path.observed_addr_sent = true;

                space.pending.observed_addr = false;
            }
        }
    }

    // PathResponse: at most one queued response for this network path per call.
    if !scheduling_info.is_abandoned
        && space_id == SpaceId::Data
        && builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
        && let Some(token) = path.path_responses.pop_on_path(path.network_path)
    {
        let response = frame::PathResponse(token);
        builder.write_frame(response, stats);
        builder.require_padding();

        // Same ObservedAddr handling as after a PathChallenge.
        if space_id == SpaceId::Data
            && self
                .config
                .address_discovery_role
                .should_report(&self.peer_params.address_discovery_role)
        {
            let frame = frame::ObservedAddr::new(
                path.network_path.remote,
                self.next_observed_addr_seq_no,
            );
            if builder.frame_space_remaining() > frame.size() {
                builder.write_frame(frame, stats);

                self.next_observed_addr_seq_no =
                    self.next_observed_addr_seq_no.saturating_add(1u8);
                path.observed_addr_sent = true;

                space.pending.observed_addr = false;
            }
        }
    }

    // ReachOut: drain the pending address set while frames fit.
    if !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && let Some((round, addresses)) = space.pending.reach_out.as_mut()
    {
        while let Some(local_addr) = addresses.iter().next().copied() {
            let local_addr = addresses.take(&local_addr).expect("found from iter");
            let reach_out = frame::ReachOut::new(*round, local_addr);
            if builder.frame_space_remaining() > reach_out.size() {
                builder.write_frame(reach_out, stats);
            } else {
                // Does not fit: put the address back for a later packet.
                addresses.insert(local_addr);
                break;
            }
        }
        if addresses.is_empty() {
            space.pending.reach_out = None;
        }
    }

    // PathAbandon for this path itself, when it is abandoned and allowed to
    // announce its own abandonment.
    if space_id == SpaceId::Data
        && scheduling_info.is_abandoned
        && scheduling_info.may_self_abandon
        && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
        && let Some(error_code) = space.pending.path_abandon.remove(&path_id)
    {
        let frame = frame::PathAbandon {
            path_id,
            error_code,
        };
        builder.write_frame(frame, stats);

        // The remote CIDs of the abandoned path are dropped once the frame is written.
        self.remote_cids.remove(&path_id);
    }
    // PathAbandon for any pending path while frames fit. This loop is not
    // gated on `is_abandoned`, only on `may_send_data`.
    while space_id == SpaceId::Data
        && scheduling_info.may_send_data
        && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
        && let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
    {
        let frame = frame::PathAbandon {
            path_id: abandoned_path_id,
            error_code,
        };
        builder.write_frame(frame, stats);

        self.remote_cids.remove(&abandoned_path_id);
    }

    // ObservedAddr on its own: sent when none was sent on this path yet or one
    // is explicitly pending.
    if !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space_id == SpaceId::Data
        && self
            .config
            .address_discovery_role
            .should_report(&self.peer_params.address_discovery_role)
        && (!path.observed_addr_sent || space.pending.observed_addr)
    {
        let frame =
            frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
        if builder.frame_space_remaining() > frame.size() {
            builder.write_frame(frame, stats);

            self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
            path.observed_addr_sent = true;

            space.pending.observed_addr = false;
        }
    }

    // Crypto: write queued crypto data, splitting a frame when it does not
    // fit entirely.
    while !is_0rtt
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
    {
        let Some(mut frame) = space.pending.crypto.pop_front() else {
            break;
        };

        // Space left for payload after the frame overhead: presumably one byte
        // of frame type, the varint-encoded offset and two bytes for the
        // length field — TODO confirm against the Crypto encoder.
        // NOTE(review): the `unsafe` call assumes `frame.offset` is within the
        // VarInt range; no check for that is visible here — confirm.
        let max_crypto_data_size = builder.frame_space_remaining()
            - 1
            - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
            - 2;
        // Cap the chunk at 2^14 - 1 bytes and at the space that is left.
        let len = frame
            .data
            .len()
            .min(2usize.pow(14) - 1)
            .min(max_crypto_data_size);

        let data = frame.data.split_to(len);
        let offset = frame.offset;
        let truncated = frame::Crypto { offset, data };
        builder.write_frame(truncated, stats);

        // Requeue the remainder at the front with its offset advanced.
        if !frame.data.is_empty() {
            frame.offset += len as u64;
            space.pending.crypto.push_front(frame);
        }
    }

    // PathStatusAvailable / PathStatusBackup for every queued path.
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let Some(path_id) = space.pending.path_status.pop_first() else {
            break;
        };
        // `path` is shadowed for the rest of this iteration only.
        let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
            trace!(%path_id, "discarding queued path status for unknown path");
            continue;
        };

        let seq = path.status.seq();
        match path.local_status() {
            PathStatus::Available => {
                let frame = frame::PathStatusAvailable {
                    path_id,
                    status_seq_no: seq,
                };
                builder.write_frame(frame, stats);
            }
            PathStatus::Backup => {
                let frame = frame::PathStatusBackup {
                    path_id,
                    status_seq_no: seq,
                };
                builder.write_frame(frame, stats);
            }
        }
    }

    // MaxPathId.
    if space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space.pending.max_path_id
        && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let frame = frame::MaxPathId(self.local_max_path_id);
        builder.write_frame(frame, stats);
        space.pending.max_path_id = false;
    }

    // PathsBlocked.
    if space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space.pending.paths_blocked
        && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let frame = frame::PathsBlocked(self.remote_max_path_id);
        builder.write_frame(frame, stats);
        space.pending.paths_blocked = false;
    }

    // PathCidsBlocked for every queued path.
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
            break;
        };
        // The sequence number after the active remote CID, or 0 when the path
        // has no remote CID queue.
        let next_seq = match self.remote_cids.get(&path_id) {
            Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
            None => VarInt(0),
        };
        let frame = frame::PathCidsBlocked { path_id, next_seq };
        builder.write_frame(frame, stats);
    }

    // Stream-level control frames are delegated to the streams state.
    if space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
    {
        self.streams
            .write_control_frames(builder, &mut space.pending, stats);
    }

    // NewConnectionId. The size bound uses the longest local CID length across
    // all paths.
    let cid_len = self
        .local_cid_state
        .values()
        .map(|cid_state| cid_state.cid_len())
        .max()
        .expect("some local CID state must exist");
    let new_cid_size_bound =
        frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
    while !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > new_cid_size_bound
    {
        let Some(issued) = space.pending.new_cids.pop() else {
            break;
        };
        let retire_prior_to = self
            .local_cid_state
            .get(&issued.path_id)
            .map(|cid_state| cid_state.retire_prior_to())
            .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));

        // The path id is only carried in the frame when multipath is negotiated.
        let cid_path_id = match is_multipath_negotiated {
            true => Some(issued.path_id),
            false => {
                debug_assert_eq!(issued.path_id, PathId::ZERO);
                None
            }
        };
        let frame = frame::NewConnectionId {
            path_id: cid_path_id,
            sequence: issued.sequence,
            retire_prior_to,
            id: issued.id,
            reset_token: issued.reset_token,
        };
        builder.write_frame(frame, stats);
    }

    // RetireConnectionId.
    let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
    while !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > retire_cid_bound
    {
        // Without multipath the frame for path zero carries no path id.
        let (path_id, sequence) = match space.pending.retire_cids.pop() {
            Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
            Some((path_id, seq)) => (Some(path_id), seq),
            None => break,
        };
        let frame = frame::RetireConnectionId { path_id, sequence };
        builder.write_frame(frame, stats);
    }

    // Datagram: written by the datagram state until it reports nothing was written.
    let mut sent_datagrams = false;
    while !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > Datagram::SIZE_BOUND
        && space_id == SpaceId::Data
    {
        match self.datagrams.write(builder, stats) {
            true => {
                sent_datagrams = true;
            }
            false => break,
        }
    }
    // Sending datagrams while `send_blocked` was set emits `DatagramsUnblocked`
    // and clears the flag.
    if self.datagrams.send_blocked && sent_datagrams {
        self.events.push_back(Event::DatagramsUnblocked);
        self.datagrams.send_blocked = false;
    }

    // Re-borrow the path data for `path_id`; `path` was shadowed in the path
    // status loop above.
    let path = &mut self.paths.get_mut(&path_id).expect("known path").data;

    // NewToken (only ever queued on servers).
    if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
        while let Some(network_path) = space.pending.new_tokens.pop() {
            debug_assert_eq!(space_id, SpaceId::Data);
            let ConnectionSide::Server { server_config } = &self.side else {
                panic!("NEW_TOKEN frames should not be enqueued by clients");
            };

            // Entries queued for a different network path than this packet's
            // are dropped, not requeued.
            if !network_path.is_probably_same_path(&path.network_path) {
                continue;
            }

            let token = Token::new(
                TokenPayload::Validation {
                    ip: network_path.remote.ip(),
                    issued: server_config.time_source.now(),
                },
                &mut self.rng,
            );
            let new_token = NewToken {
                token: token.encode(&*server_config.token_key).into(),
            };

            // Does not fit: requeue the entry and stop.
            if builder.frame_space_remaining() < new_token.size() {
                space.pending.new_tokens.push(network_path);
                break;
            }

            builder.write_frame(new_token, stats);
            // Remember the entry in the builder's retransmit state.
            builder.retransmits_mut().new_tokens.push(network_path);
        }
    }

    // Stream data frames are delegated to the streams state.
    if !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space_id == SpaceId::Data
    {
        self.streams
            .write_stream_frames(builder, self.config.send_fairness, stats);
    }

    // AddAddress.
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
    {
        if let Some(added_address) = space.pending.add_address.pop_last() {
            builder.write_frame(added_address, stats);
        } else {
            break;
        }
    }

    // RemoveAddress.
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
    {
        if let Some(removed_address) = space.pending.remove_address.pop_last() {
            builder.write_frame(removed_address, stats);
        } else {
            break;
        }
    }
}
6134
6135 fn populate_acks<'a, 'b>(
6137 now: Instant,
6138 receiving_ecn: bool,
6139 path_id: PathId,
6140 space_id: SpaceId,
6141 space: &mut PacketSpace,
6142 is_multipath_negotiated: bool,
6143 builder: &mut PacketBuilder<'a, 'b>,
6144 stats: &mut FrameStats,
6145 space_has_keys: bool,
6146 ) {
6147 debug_assert!(space_has_keys, "tried to send ACK in 0-RTT");
6149
6150 debug_assert!(
6151 is_multipath_negotiated || path_id == PathId::ZERO,
6152 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
6153 );
6154 if is_multipath_negotiated {
6155 debug_assert!(
6156 space_id == SpaceId::Data || path_id == PathId::ZERO,
6157 "path acks must be sent in 1RTT space (have {space_id:?})"
6158 );
6159 }
6160
6161 let pns = space.for_path(path_id);
6162 let ranges = pns.pending_acks.ranges();
6163 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
6164 let ecn = if receiving_ecn {
6165 Some(&pns.ecn_counters)
6166 } else {
6167 None
6168 };
6169
6170 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
6171 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
6173 let delay = delay_micros >> ack_delay_exp.into_inner();
6174
6175 if is_multipath_negotiated && space_id == SpaceId::Data {
6176 if !ranges.is_empty() {
6177 let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
6178 builder.write_frame(frame, stats);
6179 }
6180 } else {
6181 builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
6182 }
6183 }
6184
/// Shared teardown step for closing the connection: logs the closure and
/// resets all timers.
fn close_common(&mut self) {
    trace!("connection closed");
    self.timers.reset();
}
6189
6190 fn set_close_timer(&mut self, now: Instant) {
6191 let pto_max = self.max_pto_for_space(self.highest_space);
6194 self.timers.set(
6195 Timer::Conn(ConnTimer::Close),
6196 now + 3 * pto_max,
6197 self.qlog.with_time(now),
6198 );
6199 }
6200
6201 fn handle_peer_params(
6206 &mut self,
6207 params: TransportParameters,
6208 local_cid: ConnectionId,
6209 remote_cid: ConnectionId,
6210 now: Instant,
6211 ) -> Result<(), TransportError> {
6212 if Some(self.original_remote_cid) != params.initial_src_cid
6213 || (self.side.is_client()
6214 && (Some(self.initial_dst_cid) != params.original_dst_cid
6215 || self.retry_src_cid != params.retry_src_cid))
6216 {
6217 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
6218 "CID authentication failure",
6219 ));
6220 }
6221 if params.initial_max_path_id.is_some() && (local_cid.is_empty() || remote_cid.is_empty()) {
6222 return Err(TransportError::PROTOCOL_VIOLATION(
6223 "multipath must not use zero-length CIDs",
6224 ));
6225 }
6226
6227 self.set_peer_params(params);
6228 self.qlog.emit_peer_transport_params_received(self, now);
6229
6230 Ok(())
6231 }
6232
6233 fn set_peer_params(&mut self, params: TransportParameters) {
6234 self.streams.set_params(¶ms);
6235 self.idle_timeout =
6236 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
6237 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
6238
6239 if let Some(ref info) = params.preferred_address {
6240 self.remote_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
6242 path_id: None,
6243 sequence: 1,
6244 id: info.connection_id,
6245 reset_token: info.stateless_reset_token,
6246 retire_prior_to: 0,
6247 })
6248 .expect(
6249 "preferred address CID is the first received, and hence is guaranteed to be legal",
6250 );
6251 let remote = self.path_data(PathId::ZERO).network_path.remote;
6252 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
6253 }
6254 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
6255
6256 let mut multipath_enabled = None;
6257 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
6258 self.config.get_initial_max_path_id(),
6259 params.initial_max_path_id,
6260 ) {
6261 self.local_max_path_id = local_max_path_id;
6263 self.remote_max_path_id = remote_max_path_id;
6264 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
6265 debug!(%initial_max_path_id, "multipath negotiated");
6266 multipath_enabled = Some(initial_max_path_id);
6267 }
6268
6269 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
6270 self.config
6271 .max_remote_nat_traversal_addresses
6272 .zip(params.max_remote_nat_traversal_addresses)
6273 {
6274 if let Some(max_initial_paths) =
6275 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
6276 {
6277 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
6278 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
6279 self.n0_nat_traversal = n0_nat_traversal::State::new(
6280 max_remote_addresses,
6281 max_local_addresses,
6282 self.side(),
6283 );
6284 debug!(
6285 %max_remote_addresses, %max_local_addresses,
6286 "n0's nat traversal negotiated"
6287 );
6288
6289 match self.side() {
6290 Side::Client => {
6291 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
6292 debug!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
6295 } else if max_local_addresses as u64
6296 > params.active_connection_id_limit.into_inner()
6297 {
6298 debug!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
6302 }
6303 }
6304 Side::Server => {
6305 if (max_initial_paths.as_u32() as u64) < crate::LOCAL_CID_COUNT {
6306 debug!(%max_initial_paths, local_cid_limit=%crate::LOCAL_CID_COUNT, "local server configuration might cause nat traversal issues")
6307 }
6308 }
6309 }
6310 } else {
6311 debug!("n0 nat traversal enabled for both endpoints, but multipath is missing")
6312 }
6313 }
6314
6315 self.peer_params = params;
6316 let peer_max_udp_payload_size =
6317 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
6318 self.path_data_mut(PathId::ZERO)
6319 .mtud
6320 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
6321 }
6322
6323 fn decrypt_packet(
6325 &mut self,
6326 now: Instant,
6327 path_id: PathId,
6328 packet: &mut Packet,
6329 ) -> Result<Option<u64>, Option<TransportError>> {
6330 let result = self
6331 .crypto_state
6332 .decrypt_packet_body(packet, path_id, &self.spaces)?;
6333
6334 let Some(result) = result else {
6335 return Ok(None);
6336 };
6337
6338 if result.outgoing_key_update_acked
6339 && let Some(prev) = self.crypto_state.prev_crypto.as_mut()
6340 {
6341 prev.end_packet = Some((result.number, now));
6342 self.set_key_discard_timer(now, packet.header.space());
6343 }
6344
6345 if result.incoming_key_update {
6346 trace!("key update authenticated");
6347 self.crypto_state
6348 .update_keys(Some((result.number, now)), true);
6349 self.set_key_discard_timer(now, packet.header.space());
6350 }
6351
6352 Ok(Some(result.number))
6353 }
6354
/// Returns `true` when the peer's transport parameters contain `min_ack_delay`.
fn peer_supports_ack_frequency(&self) -> bool {
    self.peer_params.min_ack_delay.is_some()
}
6358
6359 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
6364 debug_assert_eq!(
6365 self.highest_space,
6366 SpaceKind::Data,
6367 "immediate ack must be written in the data space"
6368 );
6369 self.spaces[SpaceId::Data]
6370 .for_path(path_id)
6371 .immediate_ack_pending = true;
6372 }
6373
/// Test helper: decrypts the first packet of a datagram event and returns its
/// payload.
///
/// Returns `None` when `event` is not a datagram event, or when header
/// unprotection or body decryption does not yield a packet.
///
/// # Panics
///
/// Panics when the datagram carries coalesced packets (`remaining` is set).
#[cfg(test)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
    let datagram = match &event.0 {
        ConnectionEventInner::Datagram(datagram) => datagram,
        _ => return None,
    };

    assert!(
        datagram.remaining.is_none(),
        "Packets should never be coalesced in tests"
    );

    let reset_token = self.peer_params.stateless_reset_token;
    let unprotected = self
        .crypto_state
        .unprotect_header(datagram.first_decode.clone(), reset_token)?;

    let mut packet = unprotected.packet?;
    self.crypto_state
        .decrypt_packet_body(&mut packet, datagram.path_id, &self.spaces)
        .ok()?;

    Some(packet.payload.to_vec())
}
6402
/// Test helper: the in-flight byte count recorded on `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
    let path_zero = self.path_data(PathId::ZERO);
    path_zero.in_flight.bytes
}
6410
/// Test helper: the congestion window of `PathId::ZERO` minus its in-flight
/// bytes, saturating at zero.
#[cfg(test)]
pub(crate) fn congestion_window(&self) -> u64 {
    let path_zero = self.path_data(PathId::ZERO);
    let window = path_zero.congestion.window();
    window.saturating_sub(path_zero.in_flight.bytes)
}
6419
/// Test helper: whether the connection has nothing scheduled before its idle
/// timeout.
///
/// Keep-alive (connection and per-path), `PushNewCid` and `KeyDiscard` timers
/// are ignored. The connection counts as idle when no other timer is set, or
/// when the earliest remaining timer is the connection `Idle` timer.
#[cfg(test)]
pub(crate) fn is_idle(&self) -> bool {
    let is_background = |timer: &Timer| {
        matches!(
            timer,
            Timer::Conn(ConnTimer::KeepAlive)
                | Timer::PerPath(_, PathTimer::PathKeepAlive)
                | Timer::Conn(ConnTimer::PushNewCid)
                | Timer::Conn(ConnTimer::KeyDiscard)
        )
    };

    let earliest = self
        .timers
        .values()
        .into_iter()
        .filter(|(timer, _)| !is_background(timer))
        .min_by_key(|entry| entry.1);

    match earliest {
        Some((timer, _)) => timer == Timer::Conn(ConnTimer::Idle),
        None => true,
    }
}
6438
/// Test helper: the `sending_ecn` flag of `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn using_ecn(&self) -> bool {
    let path_zero = self.path_data(PathId::ZERO);
    path_zero.sending_ecn
}
6444
/// Test helper: the `total_recvd` counter of `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn total_recvd(&self) -> u64 {
    let path_zero = self.path_data(PathId::ZERO);
    path_zero.total_recvd
}
6450
/// Test helper: the pair reported by `active_seq()` of the local CID state for
/// `PathId::ZERO`.
///
/// # Panics
///
/// Panics when no local CID state exists for `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
    let zero_state = self.local_cid_state.get(&PathId::ZERO).unwrap();
    zero_state.active_seq()
}
6458
/// Test helper: the pair reported by `active_seq()` of the local CID state for
/// the path with the given numeric id.
///
/// # Panics
///
/// Panics when no local CID state exists for that path.
#[cfg(test)]
#[track_caller]
pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
    let wanted = PathId(path_id);
    let path_state = self.local_cid_state.get(&wanted).unwrap();
    path_state.active_seq()
}
6467
/// Test helper: assigns retire sequence `v` to the local CID state of
/// `PathId::ZERO` and requests as many new identifiers as that call returns.
///
/// # Panics
///
/// Panics when no local CID state exists for `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
    let zero_state = self.local_cid_state.get_mut(&PathId::ZERO).unwrap();
    let needed = zero_state.assign_retire_seq(v);

    debug_assert!(!self.state.is_drained());
    let request = EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, needed);
    self.endpoint_events.push_back(request);
}
6481
/// Test helper: the active sequence number of the remote CID queue for
/// `PathId::ZERO`.
///
/// # Panics
///
/// Panics when `PathId::ZERO` has no remote CID queue.
#[cfg(test)]
pub(crate) fn active_remote_cid_seq(&self) -> u64 {
    let zero_queue = self.remote_cids.get(&PathId::ZERO).unwrap();
    zero_queue.active_seq()
}
6487
/// Test helper: the current MTU of the given path.
#[cfg(test)]
pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
    let path = self.path_data(path_id);
    path.current_mtu()
}
6493
/// Test helper: flags every path as having an on-path challenge pending.
#[cfg(test)]
pub(crate) fn trigger_path_validation(&mut self) {
    self.paths
        .values_mut()
        .for_each(|path| path.data.pending_on_path_challenge = true);
}
6501
/// Test helper: closes the connection as if a protocol violation had occurred.
///
/// Does nothing when the connection is already closed. Otherwise the state
/// moves to closed with a `PROTOCOL_VIOLATION` error, timers are reset, the
/// close timer is armed unless the connection is drained, and a
/// CONNECTION_CLOSE is marked pending.
#[cfg(test)]
pub fn simulate_protocol_violation(&mut self, now: Instant) {
    if self.state.is_closed() {
        return;
    }

    let violation = TransportError::PROTOCOL_VIOLATION("simulated violation");
    self.state.move_to_closed(violation);
    self.close_common();
    if !self.state.is_drained() {
        self.set_close_timer(now);
    }
    self.connection_close_pending = true;
}
6515
6516 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6527 let space_specific = self.paths.get(&path_id).is_some_and(|path| {
6528 path.data.pending_on_path_challenge || !path.data.path_responses.is_empty()
6529 });
6530
6531 let other = self.streams.can_send_stream_data()
6533 || self
6534 .datagrams
6535 .outgoing
6536 .front()
6537 .is_some_and(|x| x.size(true) <= max_size);
6538
6539 SendableFrames {
6541 acks: false,
6542 close: false,
6543 space_specific,
6544 other,
6545 }
6546 }
6547
/// Terminates the connection immediately with `reason`.
///
/// Resets the timers via `close_common`, moves the state to drained with
/// `reason` attached, and queues a `Drained` event for the endpoint.
fn kill(&mut self, reason: ConnectionError) {
    self.close_common();
    self.state.move_to_drained(Some(reason));
    self.endpoint_events.push_back(EndpointEventInner::Drained);
}
6556
6557 pub fn current_mtu(&self) -> u16 {
6564 self.paths
6565 .iter()
6566 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6567 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6568 .min()
6569 .expect("There is always at least one available path")
6570 }
6571
6572 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6579 let pn_len = PacketNumber::new(
6580 pn,
6581 self.spaces[SpaceId::Data]
6582 .for_path(path)
6583 .largest_acked_packet
6584 .unwrap_or(0),
6585 )
6586 .len();
6587
6588 1 + self
6590 .remote_cids
6591 .get(&path)
6592 .map(|cids| cids.active().len())
6593 .unwrap_or(20) + pn_len
6595 + self.tag_len_1rtt()
6596 }
6597
6598 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6599 let pn_len = 4;
6600
6601 let cid_len = self
6602 .remote_cids
6603 .values()
6604 .map(|cids| cids.active().len())
6605 .max()
6606 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6610 }
6611
6612 fn tag_len_1rtt(&self) -> usize {
6613 let packet_crypto = self
6615 .crypto_state
6616 .encryption_keys(SpaceKind::Data, self.side.side())
6617 .map(|(_header, packet, _level)| packet);
6618 packet_crypto.map_or(16, |x| x.tag_len())
6622 }
6623
6624 fn on_path_validated(&mut self, path_id: PathId) {
6626 self.path_data_mut(path_id).validated = true;
6627 let ConnectionSide::Server { server_config } = &self.side else {
6628 return;
6629 };
6630 let network_path = self.path_data(path_id).network_path;
6631 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6632 new_tokens.clear();
6633 for _ in 0..server_config.validation_token.sent {
6634 new_tokens.push(network_path);
6635 }
6636 }
6637
6638 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6640 if let Some(path) = self.paths.get_mut(&path_id) {
6641 path.data.status.remote_update(status, status_seq_no);
6642 } else {
6643 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6644 }
6645 self.events.push_back(
6646 PathEvent::RemoteStatus {
6647 id: path_id,
6648 status,
6649 }
6650 .into(),
6651 );
6652 }
6653
6654 fn max_path_id(&self) -> Option<PathId> {
6663 if self.is_multipath_negotiated() {
6664 Some(self.remote_max_path_id.min(self.local_max_path_id))
6665 } else {
6666 None
6667 }
6668 }
6669
6670 fn is_ipv6(&self) -> bool {
6675 self.paths
6676 .values()
6677 .any(|p| p.data.network_path.remote.is_ipv6())
6678 }
6679
6680 pub fn add_nat_traversal_address(
6682 &mut self,
6683 address: SocketAddr,
6684 ) -> Result<(), n0_nat_traversal::Error> {
6685 if let Some(added) = self.n0_nat_traversal.add_local_address(address)? {
6686 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6687 };
6688 Ok(())
6689 }
6690
6691 pub fn remove_nat_traversal_address(
6695 &mut self,
6696 address: SocketAddr,
6697 ) -> Result<(), n0_nat_traversal::Error> {
6698 if let Some(removed) = self.n0_nat_traversal.remove_local_address(address)? {
6699 self.spaces[SpaceId::Data]
6700 .pending
6701 .remove_address
6702 .insert(removed);
6703 }
6704 Ok(())
6705 }
6706
6707 pub fn get_local_nat_traversal_addresses(
6709 &self,
6710 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6711 self.n0_nat_traversal.get_local_nat_traversal_addresses()
6712 }
6713
6714 pub fn get_remote_nat_traversal_addresses(
6716 &self,
6717 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6718 Ok(self
6719 .n0_nat_traversal
6720 .client_side()?
6721 .get_remote_nat_traversal_addresses())
6722 }
6723
6724 fn open_nat_traversal_path(
6728 &mut self,
6729 now: Instant,
6730 ip_port: (IpAddr, u16),
6731 ) -> Result<Option<(PathId, SocketAddr)>, PathError> {
6732 let remote = ip_port.into();
6733 let network_path = FourTuple {
6738 remote,
6739 local_ip: None,
6740 };
6741 match self.open_path_ensure(network_path, PathStatus::Backup, now) {
6742 Ok((path_id, path_was_known)) => {
6743 if path_was_known {
6744 trace!(%path_id, %remote, "nat traversal: path existed for remote");
6745 }
6746 Ok(Some((path_id, remote)))
6747 }
6748 Err(e) => {
6749 debug!(%remote, %e, "nat traversal: failed to probe remote");
6750 Err(e)
6751 }
6752 }
6753 }
6754
6755 pub fn initiate_nat_traversal_round(
6765 &mut self,
6766 now: Instant,
6767 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6768 if self.state.is_closed() {
6769 return Err(n0_nat_traversal::Error::Closed);
6770 }
6771
6772 let ipv6 = self.is_ipv6();
6773 let client_state = self.n0_nat_traversal.client_side_mut()?;
6774 let n0_nat_traversal::NatTraversalRound {
6775 new_round,
6776 reach_out_at,
6777 addresses_to_probe,
6778 prev_round_path_ids,
6779 } = client_state.initiate_nat_traversal_round(ipv6)?;
6780
6781 trace!(%new_round, reach_out=reach_out_at.len(), to_probe=addresses_to_probe.len(),
6782 "initiating nat traversal round");
6783
6784 self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6785
6786 for path_id in prev_round_path_ids {
6787 let Some(path) = self.path(path_id) else {
6788 continue;
6789 };
6790 let ip = path.network_path.remote.ip();
6791 let port = path.network_path.remote.port();
6792
6793 if !addresses_to_probe
6797 .iter()
6798 .any(|(_, probe)| *probe == (ip, port))
6799 && !path.validated
6800 && !self.abandoned_paths.contains(&path_id)
6801 {
6802 trace!(%path_id, "closing path from previous round");
6803 let _ =
6804 self.close_path_inner(now, path_id, PathAbandonReason::NatTraversalRoundEnded);
6805 }
6806 }
6807
6808 let mut err = None;
6809
6810 let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6811 let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6812
6813 for (id, address) in addresses_to_probe {
6814 match self.open_nat_traversal_path(now, address) {
6815 Ok(None) => {}
6816 Ok(Some((path_id, remote))) => {
6817 path_ids.push(path_id);
6818 probed_addresses.push(remote);
6819 }
6820 Err(e) => {
6821 self.n0_nat_traversal
6822 .client_side_mut()
6823 .expect("validated")
6824 .report_in_continuation(id, e);
6825 err.get_or_insert(e);
6826 }
6827 }
6828 }
6829
6830 if let Some(err) = err {
6831 if probed_addresses.is_empty() {
6833 return Err(n0_nat_traversal::Error::Multipath(err));
6834 }
6835 }
6836
6837 self.n0_nat_traversal
6838 .client_side_mut()
6839 .expect("connection side validated")
6840 .set_round_path_ids(path_ids);
6841
6842 Ok(probed_addresses)
6843 }
6844
6845 fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6850 let ipv6 = self.is_ipv6();
6851 let client_state = self.n0_nat_traversal.client_side_mut().ok()?;
6852 let (id, address) = client_state.continue_nat_traversal_round(ipv6)?;
6853 let open_result = self.open_nat_traversal_path(now, address);
6854 let client_state = self.n0_nat_traversal.client_side_mut().expect("validated");
6855 match open_result {
6856 Ok(None) => Some(true),
6857 Ok(Some((path_id, _remote))) => {
6858 client_state.add_round_path_id(path_id);
6859 Some(true)
6860 }
6861 Err(e) => {
6862 client_state.report_in_continuation(id, e);
6863 Some(false)
6864 }
6865 }
6866 }
6867}
6868
6869impl fmt::Debug for Connection {
6870 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
6871 f.debug_struct("Connection")
6872 .field("handshake_cid", &self.handshake_cid)
6873 .finish()
6874 }
6875}
6876
/// Hook that answers questions about individual paths; implementors must be `Debug` and
/// `'static`.
///
/// NOTE(review): judging by the name this is consulted when the local network changes; the
/// call site is not visible here — confirm against the caller.
pub trait NetworkChangeHint: std::fmt::Debug + 'static {
    /// Reports whether the path `path_id`, running over `network_path`, is recoverable.
    ///
    /// NOTE(review): what the caller does with a `false` answer is not visible from here.
    fn is_path_recoverable(&self, path_id: PathId, network_path: FourTuple) -> bool;
}
6889
/// Outcome of polling for something to transmit.
///
/// NOTE(review): presumably produced once per path by the transmit polling code — confirm.
#[derive(Debug)]
enum PollPathStatus {
    /// Nothing was produced.
    NothingToSend {
        /// NOTE(review): by its name, set when congestion control was what prevented
        /// sending — confirm against the producer.
        congestion_blocked: bool,
    },
    /// A transmit is ready.
    Send(Transmit),
}
6901
/// Outcome of polling for something to transmit in one packet space.
///
/// NOTE(review): the distinction between `WrotePacket` and `Send` is not visible from this
/// definition; presumably `WrotePacket` leaves the datagram open for more packets while
/// `Send` means it is complete — confirm against the producer.
#[derive(Debug)]
enum PollPathSpaceStatus {
    /// Nothing was produced.
    NothingToSend {
        /// NOTE(review): by its name, set when congestion control was what prevented
        /// sending — confirm against the producer.
        congestion_blocked: bool,
    },
    /// A packet was written.
    WrotePacket {
        /// Packet number of the last packet written.
        last_packet_number: u64,
        /// Padding request for the datagram that holds the packet.
        pad_datagram: PadDatagram,
    },
    /// NOTE(review): presumably signals that the datagram should now be sent — confirm.
    Send {
        /// Packet number of the last packet written.
        last_packet_number: u64,
    },
}
6940
/// Per-path flags describing what may be sent on a path.
///
/// NOTE(review): the field meanings below are read off the names; the code that fills this
/// struct is not visible here — confirm against it.
#[derive(Debug, Copy, Clone)]
struct PathSchedulingInfo {
    /// Whether the path has been abandoned.
    is_abandoned: bool,
    /// Whether data may be sent on the path.
    may_send_data: bool,
    /// Whether a close may be sent on the path.
    may_send_close: bool,
    /// NOTE(review): presumably whether the path may carry its own abandon notification —
    /// confirm.
    may_self_abandon: bool,
}
6980
/// Reason, if any, why a path is blocked.
///
/// NOTE(review): the variants are named after the mechanism doing the blocking; the code
/// that picks a variant is not visible here — confirm against it.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Blocked by the anti-amplification limit.
    AntiAmplification,
    /// Blocked by congestion control.
    Congestion,
    /// Blocked by pacing.
    Pacing,
}
6988
/// Side-specific state of a connection.
enum ConnectionSide {
    /// State held by the client side.
    Client {
        /// Token taken from `token_store` for `server_name` when this value was built from
        /// `SideArgs`; empty when the store had none.
        token: Bytes,
        /// Store the token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server, used as the key into `token_store`.
        server_name: String,
    },
    /// State held by the server side.
    Server {
        /// Server configuration.
        server_config: Arc<ServerConfig>,
    },
}
7001
7002impl ConnectionSide {
7003 fn remote_may_migrate(&self, state: &State) -> bool {
7004 match self {
7005 Self::Server { server_config } => server_config.migration,
7006 Self::Client { .. } => {
7007 if let Some(hs) = state.as_handshake() {
7008 hs.allow_server_migration
7009 } else {
7010 false
7011 }
7012 }
7013 }
7014 }
7015
7016 fn is_client(&self) -> bool {
7017 self.side().is_client()
7018 }
7019
7020 fn is_server(&self) -> bool {
7021 self.side().is_server()
7022 }
7023
7024 fn side(&self) -> Side {
7025 match *self {
7026 Self::Client { .. } => Side::Client,
7027 Self::Server { .. } => Side::Server,
7028 }
7029 }
7030}
7031
7032impl From<SideArgs> for ConnectionSide {
7033 fn from(side: SideArgs) -> Self {
7034 match side {
7035 SideArgs::Client {
7036 token_store,
7037 server_name,
7038 } => Self::Client {
7039 token: token_store.take(&server_name).unwrap_or_default(),
7040 token_store,
7041 server_name,
7042 },
7043 SideArgs::Server {
7044 server_config,
7045 pref_addr_cid: _,
7046 path_validated: _,
7047 } => Self::Server { server_config },
7048 }
7049 }
7050}
7051
/// Side-specific arguments used to construct a connection.
pub(crate) enum SideArgs {
    /// Arguments for a client connection.
    Client {
        /// Store from which a token for `server_name` is taken.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server connection.
    Server {
        /// Server configuration.
        server_config: Arc<ServerConfig>,
        /// NOTE(review): presumably the connection id tied to the preferred address, if
        /// any — confirm against the caller.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path is already validated.
        path_validated: bool,
    },
}
7064
7065impl SideArgs {
7066 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
7067 match *self {
7068 Self::Client { .. } => None,
7069 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
7070 }
7071 }
7072
7073 pub(crate) fn path_validated(&self) -> bool {
7074 match *self {
7075 Self::Client { .. } => true,
7076 Self::Server { path_validated, .. } => path_validated,
7077 }
7078 }
7079
7080 pub(crate) fn side(&self) -> Side {
7081 match *self {
7082 Self::Client { .. } => Side::Client,
7083 Self::Server { .. } => Side::Server,
7084 }
7085 }
7086}
7087
/// Reasons why a connection was lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer does not implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport error; displayed exactly as the wrapped [`TransportError`].
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer aborted the connection with a connection-level close.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection with an application-level close.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer reset the connection.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed locally.
    #[error("closed")]
    LocallyClosed,
    /// The connection ran out of connection ids.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
7122
7123impl From<Close> for ConnectionError {
7124 fn from(x: Close) -> Self {
7125 match x {
7126 Close::Connection(reason) => Self::ConnectionClosed(reason),
7127 Close::Application(reason) => Self::ApplicationClosed(reason),
7128 }
7129 }
7130}
7131
7132impl From<ConnectionError> for io::Error {
7134 fn from(x: ConnectionError) -> Self {
7135 use ConnectionError::*;
7136 let kind = match x {
7137 TimedOut => io::ErrorKind::TimedOut,
7138 Reset => io::ErrorKind::ConnectionReset,
7139 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
7140 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
7141 io::ErrorKind::Other
7142 }
7143 };
7144 Self::new(kind, x)
7145 }
7146}
7147
7148#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
7151pub enum PathError {
7152 #[error("multipath extension not negotiated")]
7154 MultipathNotNegotiated,
7155 #[error("the server side may not open a path")]
7157 ServerSideNotAllowed,
7158 #[error("maximum number of concurrent paths reached")]
7160 MaxPathIdReached,
7161 #[error("remoted CIDs exhausted")]
7163 RemoteCidsExhausted,
7164 #[error("path validation failed")]
7166 ValidationFailed,
7167 #[error("invalid remote address")]
7169 InvalidRemoteAddress(SocketAddr),
7170}
7171
/// Errors that can occur when closing a path.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The multipath extension was not negotiated.
    #[error("Multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// The path is already closed.
    #[error("closed path")]
    ClosedPath,
    /// The path is the last open path.
    #[error("last open path")]
    LastOpenPath,
}
7185
/// Error indicating that the multipath extension was not negotiated.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private unit field: code outside this module cannot build the struct with a literal.
    _private: (),
}
7192
/// Events a connection reports to the application.
///
/// NOTE(review): the variant descriptions below follow the variant names; the code emitting
/// them is mostly outside this view — confirm against it.
#[derive(Debug)]
pub enum Event {
    /// Handshake data is ready.
    HandshakeDataReady,
    /// The connection is established.
    Connected,
    /// The handshake has been confirmed.
    HandshakeConfirmed,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event.
    Stream(StreamEvent),
    /// A datagram was received.
    DatagramReceived,
    /// Datagram sending is no longer blocked.
    DatagramsUnblocked,
    /// A path-related event.
    Path(PathEvent),
    /// A NAT traversal event.
    NatTraversal(n0_nat_traversal::Event),
}
7220
7221impl From<PathEvent> for Event {
7222 fn from(source: PathEvent) -> Self {
7223 Self::Path(source)
7224 }
7225}
7226
7227fn get_max_ack_delay(params: &TransportParameters) -> Duration {
7228 Duration::from_micros(params.max_ack_delay.0 * 1000)
7229}
7230
/// Upper bound on a backoff exponent.
///
/// NOTE(review): the use site is not visible here; presumably this caps exponential timer
/// backoff so that the shift cannot overflow — confirm.
const MAX_BACKOFF_EXPONENT: u32 = 16;
7233
/// Minimum packet space: the largest Handshake/0-RTT header size plus 32 more bytes.
///
/// NOTE(review): presumably the smallest remaining room in a datagram that still justifies
/// starting another packet — confirm against the use site.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
7242
/// Largest size, in bytes, of a Handshake or 0-RTT packet header.
///
/// NOTE(review): the terms appear to be, in order: flags (1) + version (4) + DCID length (1)
/// + DCID + SCID length (1) + SCID + length field (varint able to hold `u16::MAX`) + packet
/// number (4) — confirm against the long header layout.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
7250
/// Record of the frames written into a packet, filled in by `record_sent_frame`.
#[derive(Default)]
struct SentFrames {
    /// Frames that were sent and are tracked for retransmission; created on first use.
    retransmits: ThinRetransmits,
    /// Largest packet number acknowledged by the ACK frames written, per path.
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of the stream frames written.
    stream_frames: StreamMetaVec,
    /// Whether a frame was written that is not tracked in `retransmits` (path responses and
    /// challenges, pings, immediate ACKs, datagrams).
    non_retransmits: bool,
    /// NOTE(review): not set anywhere in this part of the file; presumably marks a packet
    /// that must be padded — confirm against the writer.
    requires_padding: bool,
}
7262
7263impl SentFrames {
7264 fn is_ack_only(&self, streams: &StreamsState) -> bool {
7266 !self.largest_acked.is_empty()
7267 && !self.non_retransmits
7268 && self.stream_frames.is_empty()
7269 && self.retransmits.is_empty(streams)
7270 }
7271
7272 fn retransmits_mut(&mut self) -> &mut Retransmits {
7273 self.retransmits.get_or_create()
7274 }
7275
7276 fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
7277 use frame::EncodableFrame::*;
7278 match frame {
7279 PathAck(path_ack_encoder) => {
7280 if let Some(max) = path_ack_encoder.ranges.max() {
7281 self.largest_acked.insert(path_ack_encoder.path_id, max);
7282 }
7283 }
7284 Ack(ack_encoder) => {
7285 if let Some(max) = ack_encoder.ranges.max() {
7286 self.largest_acked.insert(PathId::ZERO, max);
7287 }
7288 }
7289 Close(_) => { }
7290 PathResponse(_) => self.non_retransmits = true,
7291 HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
7292 ReachOut(frame::ReachOut { round, ip, port }) => {
7293 let (recorded_round, reach_outs) = self
7294 .retransmits_mut()
7295 .reach_out
7296 .get_or_insert_with(|| (round, FxHashSet::default()));
7297 if *recorded_round == round {
7299 reach_outs.insert((ip, port));
7301 } else if *recorded_round < round {
7302 *recorded_round = round;
7304 reach_outs.drain();
7305 reach_outs.insert((ip, port));
7306 } else {
7307 }
7309 }
7310
7311 ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
7312 Ping(_) => self.non_retransmits = true,
7313 ImmediateAck(_) => self.non_retransmits = true,
7314 AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
7315 PathChallenge(_) => self.non_retransmits = true,
7316 Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
7317 PathAbandon(path_abandon) => {
7318 self.retransmits_mut()
7319 .path_abandon
7320 .entry(path_abandon.path_id)
7321 .or_insert(path_abandon.error_code);
7322 }
7323 PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
7324 | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
7325 self.retransmits_mut().path_status.insert(path_id);
7326 }
7327 MaxPathId(_) => self.retransmits_mut().max_path_id = true,
7328 PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
7329 PathCidsBlocked(path_cids_blocked) => {
7330 self.retransmits_mut()
7331 .path_cids_blocked
7332 .insert(path_cids_blocked.path_id);
7333 }
7334 ResetStream(reset) => self
7335 .retransmits_mut()
7336 .reset_stream
7337 .push((reset.id, reset.error_code)),
7338 StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
7339 NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
7340 RetireConnectionId(retire_cid) => self
7341 .retransmits_mut()
7342 .retire_cids
7343 .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
7344 Datagram(_) => self.non_retransmits = true,
7345 NewToken(_) => {}
7346 AddAddress(add_address) => {
7347 self.retransmits_mut().add_address.insert(add_address);
7348 }
7349 RemoveAddress(remove_address) => {
7350 self.retransmits_mut().remove_address.insert(remove_address);
7351 }
7352 StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
7353 MaxData(_) => self.retransmits_mut().max_data = true,
7354 MaxStreamData(max) => {
7355 self.retransmits_mut().max_stream_data.insert(max.id);
7356 }
7357 MaxStreams(max_streams) => {
7358 self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
7359 }
7360 }
7361 }
7362}
7363
7364fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
7372 match (x, y) {
7373 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
7374 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
7375 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
7376 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
7377 }
7378}
7379
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks every interesting combination of absent, zero and non-zero timeouts, in both
    /// argument orders.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let timeout = |raw| Some(VarInt(raw));
        let millis = |ms| Some(Duration::from_millis(ms));

        // (one side, other side, expected negotiated timeout)
        let cases = [
            (None, None, None),
            (None, timeout(0), None),
            (None, timeout(2), millis(2)),
            (timeout(0), timeout(0), None),
            (timeout(2), timeout(0), millis(2)),
            (timeout(1), timeout(4), millis(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}