1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::SocketAddr,
7 num::{NonZeroU32, NonZeroUsize},
8 sync::Arc,
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13
14use rand::{RngExt, SeedableRng, rngs::StdRng};
15use rustc_hash::FxHashMap;
16use thiserror::Error;
17use tracing::{debug, error, trace, trace_span, warn};
18
19use crate::{
20 Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
21 MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
22 TransportError, TransportErrorCode, VarInt,
23 cid_generator::ConnectionIdGenerator,
24 cid_queue::CidQueue,
25 config::{ServerConfig, TransportConfig},
26 congestion::Controller,
27 connection::{
28 paths::PathRetransmits,
29 qlog::{QlogRecvPacket, QlogSink},
30 spaces::LostPacket,
31 stats::PathStatsMap,
32 timer::{ConnTimer, PathTimer},
33 },
34 crypto::{self, Keys},
35 frame::{
36 self, Close, DataBlocked, Datagram, FrameStruct, NewToken, ObservedAddr, StreamDataBlocked,
37 StreamsBlocked,
38 },
39 n0_nat_traversal,
40 packet::{
41 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
42 PacketNumber, PartialDecode, SpaceId,
43 },
44 range_set::ArrayRangeSet,
45 shared::{
46 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
47 EndpointEvent, EndpointEventInner,
48 },
49 token::{ResetToken, Token, TokenPayload},
50 transport_parameters::TransportParameters,
51};
52
53mod ack_frequency;
54use ack_frequency::AckFrequencyState;
55
56mod assembler;
57pub use assembler::Chunk;
58
59mod cid_state;
60use cid_state::CidState;
61
62mod datagrams;
63use datagrams::DatagramState;
64pub use datagrams::{Datagrams, SendDatagramError};
65
66mod mtud;
67mod pacing;
68
69mod packet_builder;
70use packet_builder::{PacketBuilder, PadDatagram};
71
72mod packet_crypto;
73use packet_crypto::CryptoState;
74pub(crate) use packet_crypto::EncryptionLevel;
75
76mod paths;
77pub use paths::{
78 ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError,
79};
80use paths::{PathData, PathState};
81
82pub(crate) mod qlog;
83pub(crate) mod send_buffer;
84
85pub(crate) mod spaces;
86#[cfg(fuzzing)]
87pub use spaces::Retransmits;
88#[cfg(not(fuzzing))]
89use spaces::Retransmits;
90pub(crate) use spaces::SpaceKind;
91use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
92
93mod stats;
94pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
95
96mod streams;
97#[cfg(fuzzing)]
98pub use streams::StreamsState;
99#[cfg(not(fuzzing))]
100use streams::StreamsState;
101pub use streams::{
102 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
103 ShouldTransmit, StreamEvent, Streams, WriteError,
104};
105
106mod timer;
107use timer::{Timer, TimerTable};
108
109mod transmit_buf;
110use transmit_buf::TransmitBuf;
111
112mod state;
113
114#[cfg(not(fuzzing))]
115use state::State;
116#[cfg(fuzzing)]
117pub use state::State;
118use state::StateType;
119
/// State of a single connection, including every network path it uses.
///
/// Groups the configuration, crypto state, packet number spaces, timers, stream and
/// datagram state, statistics and multipath bookkeeping that the methods of this type
/// operate on.
pub struct Connection {
    /// Endpoint-wide configuration, shared through an `Arc`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration for this connection, shared through an `Arc`.
    config: Arc<TransportConfig>,
    /// Random number generator, seeded from the `rng_seed` given to `Connection::new`.
    rng: StdRng,
    /// Cryptographic state, built in `new` from the crypto session and the initial CID.
    crypto_state: CryptoState,
    /// Initialised from the `local_cid` argument of `new`.
    handshake_cid: ConnectionId,
    /// Initialised from the `remote_cid` argument of `new`.
    remote_handshake_cid: ConnectionId,
    /// Per-path state keyed by path id; `new` inserts the entry for `PathId::ZERO`.
    paths: BTreeMap<PathId, PathState>,
    /// Wrapping counter bumped each time `ensure_path` creates a path; the new value is
    /// handed to `PathData::new`.
    path_generation_counter: u64,
    /// Forwarded to `PathData::new` for every path.
    // NOTE(review): presumably "allow MTU discovery" — confirm against `PathData`.
    allow_mtud: bool,
    /// Connection state machine; starts in the handshake state.
    state: State,
    /// Which side of the connection this is, plus side-specific data.
    side: ConnectionSide,
    /// Transport parameters of the peer; starts out as `TransportParameters::default()`.
    peer_params: TransportParameters,
    /// Initialised from the `remote_cid` argument of `new`.
    original_remote_cid: ConnectionId,
    /// Initialised from the `init_cid` argument of `new`.
    initial_dst_cid: ConnectionId,
    /// Starts out as `None`.
    retry_src_cid: Option<ConnectionId>,
    /// Application-facing events, drained first by `poll`.
    events: VecDeque<Event>,
    /// Events for the endpoint, drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Set in `new` when the config allows spin and a 7-in-8 random draw succeeds.
    spin_enabled: bool,
    /// Starts out as `false`.
    spin: bool,
    /// The Initial, Handshake and Data packet spaces, in that order.
    spaces: [PacketSpace; 3],
    /// Starts out as `SpaceKind::Initial`.
    highest_space: SpaceKind,
    /// Derived from `config.max_idle_timeout` (read as milliseconds); `None` when that
    /// value is unset or zero.
    idle_timeout: Option<Duration>,
    /// Table of connection-level and per-path timers.
    timers: TimerTable,
    /// Counter that starts at zero.
    authentication_failures: u64,

    /// Consulted by `poll_transmit` while draining or closed: when unset, nothing is sent.
    connection_close_pending: bool,

    /// ACK frequency state, initialised from the max ACK delay of default parameters.
    ack_frequency: AckFrequencyState,

    /// Starts out as `false`; passed to `populate_acks` when building packets.
    receiving_ecn: bool,
    /// Counter that starts at zero.
    total_authed_packets: u64,

    /// Starts at zero.
    next_observed_addr_seq_no: VarInt,

    /// Stream state, sized from the stream and window limits in the transport config.
    streams: StreamsState,
    /// Per-path CID queues; the active entry is used as the remote CID when building
    /// packets for that path.
    remote_cids: FxHashMap<PathId, CidQueue>,
    /// Per-path state of locally issued CIDs; `new` creates the entry for `PathId::ZERO`.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// Datagram state.
    datagrams: DatagramState,
    /// Per-path statistics.
    path_stats: PathStatsMap,
    /// Connection statistics; starts out as the default value.
    partial_stats: ConnectionStats,
    /// Protocol version passed to `new`.
    version: u32,

    /// Starts out as `NonZeroU32::MIN`.
    max_concurrent_paths: NonZeroU32,
    /// Starts out as `PathId::ZERO`.
    local_max_path_id: PathId,
    /// Starts out as `PathId::ZERO`; `open_path` refuses ids above this value.
    remote_max_path_id: PathId,
    /// Starts out as `PathId::ZERO`.
    max_path_id_with_cids: PathId,
    /// Ids of paths that have been abandoned; `abandon_path` inserts into it.
    abandoned_paths: AbandonedPaths,

    /// NAT traversal state; starts out as the default value.
    n0_nat_traversal: n0_nat_traversal::State,
    /// Sink for qlog events.
    qlog: QlogSink,
}
315
316impl Connection {
    /// Builds the initial state of a connection whose first path, `PathId::ZERO`, runs
    /// over `network_path`.
    ///
    /// - `init_cid` seeds the crypto state and is stored as `initial_dst_cid`.
    /// - `local_cid` is stored as `handshake_cid`.
    /// - `remote_cid` is stored as `remote_handshake_cid` and `original_remote_cid`, and
    ///   seeds the remote CID queue of path zero.
    /// - `rng_seed` seeds the connection's random number generator.
    /// - `side_args` determines the side, whether path zero is treated as validated right
    ///   away, and whether a preferred-address CID exists.
    ///
    /// On the client side this also calls `write_crypto` and `init_0rtt` before returning.
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        local_cid: ConnectionId,
        remote_cid: ConnectionId,
        network_path: FourTuple,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        // Read everything needed from `side_args` before it is consumed by the conversion.
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        let initial_space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        // Test builds can ask for deterministic packet numbers in the Data space.
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            remote_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client() && config.server_handshake_migration,
        });
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                // NOTE(review): presumably the number of CIDs already issued (the handshake
                // CID plus an optional preferred-address CID) — confirm against `CidState::new`.
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        // Path zero is created without a peer max UDP payload size and with generation 0.
        let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
        path.open_status = paths::OpenStatus::Informed;
        // Field initialisers run in the order written: `rng` and `config` are still
        // borrowed by earlier initialisers before they are moved into the struct.
        let mut this = Self {
            endpoint_config,
            crypto_state: CryptoState::new(crypto, init_cid, side, &mut rng),
            handshake_cid: local_cid,
            remote_handshake_cid: remote_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_generation_counter: 0,
            allow_mtud,
            state,
            side: connection_side,
            peer_params: TransportParameters::default(),
            original_remote_cid: remote_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            // When allowed by the config, enabled for 7 out of 8 connections on average.
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceKind::Initial,
            // An unset or zero max idle timeout means no idle timeout at all.
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            connection_close_pending: false,

            // Uses default transport parameters, matching the initial `peer_params` above.
            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]),
            rng,
            path_stats: Default::default(),
            partial_stats: ConnectionStats::default(),
            version,

            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            n0_nat_traversal: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog
            .emit_tuple_assigned(PathId::ZERO, network_path, now);
        this
    }
447
    /// Returns the deadline reported by the timer table's `peek`, or `None` if it reports
    /// none.
    // NOTE(review): presumably the earliest instant at which a timer fires — confirm
    // against `TimerTable::peek`.
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.peek()
    }
459
460 #[must_use]
466 pub fn poll(&mut self) -> Option<Event> {
467 if let Some(x) = self.events.pop_front() {
468 return Some(x);
469 }
470
471 if let Some(event) = self.streams.poll() {
472 return Some(Event::Stream(event));
473 }
474
475 if let Some(reason) = self.state.take_error() {
476 return Some(Event::ConnectionLost { reason });
477 }
478
479 None
480 }
481
    /// Pops the oldest queued endpoint event, wrapped in `EndpointEvent`, or returns
    /// `None` when the queue is empty.
    #[must_use]
    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
        self.endpoint_events.pop_front().map(EndpointEvent)
    }
487
    /// Returns a `Streams` handle that borrows the stream state mutably and the
    /// connection state immutably.
    #[must_use]
    pub fn streams(&mut self) -> Streams<'_> {
        Streams {
            state: &mut self.streams,
            conn_state: &self.state,
        }
    }
496
    /// Returns a `RecvStream` handle for stream `id`.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not bidirectional and was initiated by this side of the
    /// connection.
    #[must_use]
    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
        RecvStream {
            id,
            state: &mut self.streams,
            // The handle gets access to the Data space's pending frames.
            pending: &mut self.spaces[SpaceId::Data].pending,
        }
    }
507
    /// Returns a `SendStream` handle for stream `id`.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not bidirectional and was not initiated by this side of the
    /// connection.
    #[must_use]
    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
        SendStream {
            id,
            state: &mut self.streams,
            // The handle gets access to the Data space's pending frames.
            pending: &mut self.spaces[SpaceId::Data].pending,
            conn_state: &self.state,
        }
    }
519
520 pub fn open_path_ensure(
537 &mut self,
538 network_path: FourTuple,
539 initial_status: PathStatus,
540 now: Instant,
541 ) -> Result<(PathId, bool), PathError> {
542 let existing_open_path = self.paths.iter().find(|(id, path)| {
543 network_path.is_probably_same_path(&path.data.network_path)
544 && !self.abandoned_paths.contains(id)
545 });
546 match existing_open_path {
547 Some((path_id, _state)) => Ok((*path_id, true)),
548 None => Ok((self.open_path(network_path, initial_status, now)?, false)),
549 }
550 }
551
552 pub fn open_path(
558 &mut self,
559 network_path: FourTuple,
560 initial_status: PathStatus,
561 now: Instant,
562 ) -> Result<PathId, PathError> {
563 if !self.is_multipath_negotiated() {
564 return Err(PathError::MultipathNotNegotiated);
565 }
566 if self.side().is_server() {
567 return Err(PathError::ServerSideNotAllowed);
568 }
569
570 let max_abandoned = self.abandoned_paths.max();
571 let max_used = self.paths.keys().last().copied();
572 let path_id = max_abandoned
573 .max(max_used)
574 .unwrap_or(PathId::ZERO)
575 .saturating_add(1u8);
576
577 if Some(path_id) > self.max_path_id() {
578 return Err(PathError::MaxPathIdReached);
579 }
580 if path_id > self.remote_max_path_id {
581 self.spaces[SpaceId::Data].pending.paths_blocked = true;
582 return Err(PathError::MaxPathIdReached);
583 }
584 if self
585 .remote_cids
586 .get(&path_id)
587 .map(CidQueue::active)
588 .is_none()
589 {
590 self.spaces[SpaceId::Data]
591 .pending
592 .path_cids_blocked
593 .insert(path_id);
594 return Err(PathError::RemoteCidsExhausted);
595 }
596
597 let path = self.ensure_path(path_id, network_path, now, None);
598 path.status.local_update(initial_status);
599
600 Ok(path_id)
601 }
602
603 pub fn close_path(
609 &mut self,
610 now: Instant,
611 path_id: PathId,
612 error_code: VarInt,
613 ) -> Result<(), ClosePathError> {
614 self.close_path_inner(
615 now,
616 path_id,
617 PathAbandonReason::ApplicationClosed { error_code },
618 )
619 }
620
621 pub(crate) fn close_path_inner(
626 &mut self,
627 now: Instant,
628 path_id: PathId,
629 reason: PathAbandonReason,
630 ) -> Result<(), ClosePathError> {
631 if self.state.is_drained() {
632 return Ok(());
633 }
634
635 if !self.is_multipath_negotiated() {
636 return Err(ClosePathError::MultipathNotNegotiated);
637 }
638 if self.abandoned_paths.contains(&path_id)
639 || Some(path_id) > self.max_path_id()
640 || !self.paths.contains_key(&path_id)
641 {
642 return Err(ClosePathError::ClosedPath);
643 }
644
645 let is_last_path = !self
646 .paths
647 .keys()
648 .any(|id| *id != path_id && !self.abandoned_paths.contains(id));
649
650 if is_last_path && !reason.is_remote() {
651 return Err(ClosePathError::LastOpenPath);
652 }
653
654 self.abandon_path(now, path_id, reason);
655
656 if is_last_path {
660 let rtt = RttEstimator::new(self.config.initial_rtt);
664 let pto = rtt.pto_base() + self.ack_frequency.max_ack_delay_for_pto();
665 let grace = pto * 3;
666 self.timers.set(
667 Timer::Conn(ConnTimer::NoAvailablePath),
668 now + grace,
669 self.qlog.with_time(now),
670 );
671 }
672
673 Ok(())
674 }
675
    /// Marks `path_id` as abandoned and cleans up the state tied to it.
    ///
    /// In order, this records a pending path-abandon entry carrying `reason.error_code()`,
    /// drops pending and retransmittable per-path items for the path, resets its loss
    /// probes, asks the endpoint to retire the path's reset token, records the path as
    /// abandoned, stops most per-path timers, re-arms loss detection and finally queues a
    /// `PathEvent::Abandoned` event.
    fn abandon_path(&mut self, now: Instant, path_id: PathId, reason: PathAbandonReason) {
        trace!(%path_id, ?reason, "abandoning path");

        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space
            .path_abandon
            .insert(path_id, reason.error_code());

        // Drop pending items that refer to the abandoned path.
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Do the same for items attached to already sent packets, on every path.
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        self.spaces[SpaceId::Data].for_path(path_id).loss_probes = 0;

        debug_assert!(!self.state.is_drained());
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        self.abandoned_paths.insert(path_id);

        // Only the loss detection and pacing timers of the path are kept; all other
        // per-path timers are stopped.
        for timer in timer::PathTimer::VALUES {
            let keep_timer = match timer {
                PathTimer::PathValidationFailed
                | PathTimer::PathChallengeLost
                | PathTimer::AbandonFromValidation => false,
                PathTimer::PathKeepAlive | PathTimer::PathIdle => false,
                PathTimer::MaxAckDelay => false,
                PathTimer::PathDrained => false,
                PathTimer::LossDetection => true,
                PathTimer::Pacing => true,
            };

            if !keep_timer {
                let qlog = self.qlog.with_time(now);
                self.timers.stop(Timer::PerPath(path_id, timer), qlog);
            }
        }

        self.set_loss_detection_timer(now, path_id);

        self.events.push_back(Event::Path(PathEvent::Abandoned {
            id: path_id,
            reason,
        }));
    }
765
766 #[track_caller]
770 fn path_data(&self, path_id: PathId) -> &PathData {
771 if let Some(data) = self.paths.get(&path_id) {
772 &data.data
773 } else {
774 panic!(
775 "unknown path: {path_id}, currently known paths: {:?}",
776 self.paths.keys().collect::<Vec<_>>()
777 );
778 }
779 }
780
781 #[track_caller]
785 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
786 &mut self.paths.get_mut(&path_id).expect("known path").data
787 }
788
789 fn path(&self, path_id: PathId) -> Option<&PathData> {
791 self.paths.get(&path_id).map(|path_state| &path_state.data)
792 }
793
794 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
796 self.paths
797 .get_mut(&path_id)
798 .map(|path_state| &mut path_state.data)
799 }
800
801 pub fn paths(&self) -> Vec<PathId> {
805 self.paths.keys().copied().collect()
806 }
807
808 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
810 self.path(path_id)
811 .map(PathData::local_status)
812 .ok_or(ClosedPath { _private: () })
813 }
814
815 pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
817 self.path(path_id)
818 .map(|path| path.network_path)
819 .ok_or(ClosedPath { _private: () })
820 }
821
822 pub fn set_path_status(
826 &mut self,
827 path_id: PathId,
828 status: PathStatus,
829 ) -> Result<PathStatus, SetPathStatusError> {
830 if !self.is_multipath_negotiated() {
831 return Err(SetPathStatusError::MultipathNotNegotiated);
832 }
833 let path = self
834 .path_mut(path_id)
835 .ok_or(SetPathStatusError::ClosedPath)?;
836 let prev = match path.status.local_update(status) {
837 Some(prev) => {
838 self.spaces[SpaceId::Data]
839 .pending
840 .path_status
841 .insert(path_id);
842 prev
843 }
844 None => path.local_status(),
845 };
846 Ok(prev)
847 }
848
849 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
854 self.path(path_id).and_then(|path| path.remote_status())
855 }
856
857 pub fn set_path_max_idle_timeout(
866 &mut self,
867 now: Instant,
868 path_id: PathId,
869 timeout: Option<Duration>,
870 ) -> Result<Option<Duration>, ClosedPath> {
871 let path = self
872 .paths
873 .get_mut(&path_id)
874 .ok_or(ClosedPath { _private: () })?;
875 let prev = std::mem::replace(&mut path.data.idle_timeout, timeout);
876
877 if !self.state.is_closed() {
879 if let Some(new_timeout) = timeout {
880 let timer = Timer::PerPath(path_id, PathTimer::PathIdle);
881 let deadline = match (prev, self.timers.get(timer)) {
882 (Some(old_timeout), Some(old_deadline)) => {
883 let last_activity = old_deadline.checked_sub(old_timeout).unwrap_or(now);
884 last_activity + new_timeout
885 }
886 _ => now + new_timeout,
887 };
888 self.timers.set(timer, deadline, self.qlog.with_time(now));
889 } else {
890 self.timers.stop(
891 Timer::PerPath(path_id, PathTimer::PathIdle),
892 self.qlog.with_time(now),
893 );
894 }
895 }
896
897 Ok(prev)
898 }
899
900 pub fn set_path_keep_alive_interval(
906 &mut self,
907 path_id: PathId,
908 interval: Option<Duration>,
909 ) -> Result<Option<Duration>, ClosedPath> {
910 let path = self
911 .paths
912 .get_mut(&path_id)
913 .ok_or(ClosedPath { _private: () })?;
914 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
915 }
916
917 fn find_validated_path_on_network_path(
921 &self,
922 network_path: FourTuple,
923 ) -> Option<(&PathId, &PathState)> {
924 self.paths.iter().find(|(path_id, path_state)| {
925 path_state.data.validated
926 && network_path.is_probably_same_path(&path_state.data.network_path)
928 && !self.abandoned_paths.contains(path_id)
929 })
930 }
934
    /// Returns the data of `path_id`, creating the path over `network_path` if it does
    /// not exist yet.
    ///
    /// An existing path is returned unchanged, whatever `network_path` is. A newly
    /// created path:
    /// - starts validated, with its initial RTT reset from the other path's conservative
    ///   RTT, when another validated, non-abandoned path already runs over what is
    ///   probably the same network path;
    /// - stops the `NoAvailablePath` timer;
    /// - gets a fresh Data packet number space, with `pn` (if given) pre-inserted into
    ///   its duplicate-detection set;
    /// - is recorded as CID-blocked when no remote CIDs exist for it.
    // NOTE(review): `pn` is presumably the number of the packet that caused the path to
    // be created — confirm against callers.
    fn ensure_path(
        &mut self,
        path_id: PathId,
        network_path: FourTuple,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        // Computed up front, before the map entry below borrows `self.paths` mutably.
        let valid_path = self.find_validated_path_on_network_path(network_path);
        let validated = valid_path.is_some();
        let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        debug!(%validated, %path_id, %network_path, "path added");

        self.timers.stop(
            Timer::Conn(ConnTimer::NoAvailablePath),
            self.qlog.with_time(now),
        );
        // Values that do not fit into a `u16` are clamped to `u16::MAX`.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
        let mut data = PathData::new(
            network_path,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_generation_counter,
            now,
            &self.config,
        );

        data.validated = validated;
        if let Some(initial_rtt) = initial_rtt {
            data.rtt.reset_initial_rtt(initial_rtt);
        }

        data.pending_on_path_challenge = true;
        data.pending.observed_address = self
            .config
            .address_discovery_role
            .should_report(&self.peer_params.address_discovery_role);

        let path = vacant_entry.insert(PathState { data, prev: None });

        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, network_path, now);

        if !self.remote_cids.contains_key(&path_id) {
            debug!(%path_id, "Remote opened path without issuing CIDs");
            self.spaces[SpaceId::Data]
                .pending
                .path_cids_blocked
                .insert(path_id);
        }

        &mut path.data
    }
1013
1014 #[must_use]
1024 pub fn poll_transmit(
1025 &mut self,
1026 now: Instant,
1027 max_datagrams: NonZeroUsize,
1028 buf: &mut Vec<u8>,
1029 ) -> Option<Transmit> {
1030 let max_datagrams = match self.config.enable_segmentation_offload {
1031 false => NonZeroUsize::MIN,
1032 true => max_datagrams,
1033 };
1034
1035 let connection_close_pending = match self.state.as_type() {
1041 StateType::Drained => {
1042 for path in self.paths.values_mut() {
1043 path.data.app_limited = true;
1044 }
1045 return None;
1046 }
1047 StateType::Draining | StateType::Closed => {
1048 if !self.connection_close_pending {
1051 for path in self.paths.values_mut() {
1052 path.data.app_limited = true;
1053 }
1054 return None;
1055 }
1056 true
1057 }
1058 _ => false,
1059 };
1060
1061 if let Some(config) = &self.config.ack_frequency_config {
1063 let rtt = self
1064 .paths
1065 .values()
1066 .map(|p| p.data.rtt.get())
1067 .min()
1068 .expect("one path exists");
1069 self.spaces[SpaceId::Data].pending.ack_frequency = self
1070 .ack_frequency
1071 .should_send_ack_frequency(rtt, config, &self.peer_params)
1072 && self.highest_space == SpaceKind::Data
1073 && self.peer_supports_ack_frequency();
1074 }
1075
1076 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1077 while let Some(path_id) = next_path_id {
1078 if !connection_close_pending
1079 && let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id)
1080 {
1081 return Some(transmit);
1082 }
1083
1084 let info = self.scheduling_info(path_id);
1085 if let Some(transmit) = self.poll_transmit_on_path(
1086 now,
1087 buf,
1088 path_id,
1089 max_datagrams,
1090 &info,
1091 connection_close_pending,
1092 ) {
1093 return Some(transmit);
1094 }
1095
1096 debug_assert!(
1099 buf.is_empty(),
1100 "nothing to send on path but buffer not empty"
1101 );
1102
1103 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1104 }
1105
1106 debug_assert!(
1108 buf.is_empty(),
1109 "there was data in the buffer, but it was not sent"
1110 );
1111
1112 if self.state.is_established() {
1113 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1115 while let Some(path_id) = next_path_id {
1116 if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) {
1117 return Some(transmit);
1118 }
1119 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1120 }
1121 }
1122
1123 None
1124 }
1125
1126 fn scheduling_info(&self, path_id: PathId) -> PathSchedulingInfo {
1144 let have_validated_status_available_space = self.paths.iter().any(|(path_id, path)| {
1146 self.remote_cids.contains_key(path_id)
1147 && !self.abandoned_paths.contains(path_id)
1148 && path.data.validated
1149 && path.data.local_status() == PathStatus::Available
1150 });
1151
1152 let have_validated_space = self.paths.iter().any(|(path_id, path)| {
1154 self.remote_cids.contains_key(path_id)
1155 && !self.abandoned_paths.contains(path_id)
1156 && path.data.validated
1157 });
1158
1159 let is_handshaking = self.is_handshaking();
1160 let has_cids = self.remote_cids.contains_key(&path_id);
1161 let is_abandoned = self.abandoned_paths.contains(&path_id);
1162 let path_data = self.path_data(path_id);
1163 let validated = path_data.validated;
1164 let status = path_data.local_status();
1165
1166 let may_send_data = has_cids
1169 && !is_abandoned
1170 && if is_handshaking {
1171 true
1175 } else if !validated {
1176 false
1183 } else {
1184 match status {
1185 PathStatus::Available => {
1186 true
1188 }
1189 PathStatus::Backup => {
1190 !have_validated_status_available_space
1192 }
1193 }
1194 };
1195
1196 let may_send_close = has_cids
1201 && !is_abandoned
1202 && if !validated && have_validated_status_available_space {
1203 false
1205 } else {
1206 true
1208 };
1209
1210 let may_self_abandon = has_cids && validated && !have_validated_space;
1214
1215 PathSchedulingInfo {
1216 is_abandoned,
1217 may_send_data,
1218 may_send_close,
1219 may_self_abandon,
1220 }
1221 }
1222
1223 fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit {
1224 debug_assert!(
1225 !transmit.is_empty(),
1226 "must not be called with an empty transmit buffer"
1227 );
1228
1229 let network_path = self.path_data(path_id).network_path;
1230 trace!(
1231 segment_size = transmit.segment_size(),
1232 last_datagram_len = transmit.len() % transmit.segment_size(),
1233 %network_path,
1234 "sending {} bytes in {} datagrams",
1235 transmit.len(),
1236 transmit.num_datagrams()
1237 );
1238 self.path_data_mut(path_id)
1239 .inc_total_sent(transmit.len() as u64);
1240
1241 self.path_stats
1242 .for_path(path_id)
1243 .udp_tx
1244 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1245
1246 Transmit {
1247 destination: network_path.remote,
1248 size: transmit.len(),
1249 ecn: if self.path_data(path_id).sending_ecn {
1250 Some(EcnCodepoint::Ect0)
1251 } else {
1252 None
1253 },
1254 segment_size: match transmit.num_datagrams() {
1255 1 => None,
1256 _ => Some(transmit.segment_size()),
1257 },
1258 src_ip: network_path.local_ip,
1259 }
1260 }
1261
1262 fn poll_transmit_off_path(
1264 &mut self,
1265 now: Instant,
1266 buf: &mut Vec<u8>,
1267 path_id: PathId,
1268 ) -> Option<Transmit> {
1269 if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) {
1270 return Some(challenge);
1271 }
1272 if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
1273 return Some(response);
1274 }
1275 if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
1276 return Some(challenge);
1277 }
1278 None
1279 }
1280
    /// Builds the next transmit for `path_id`, polling the packet spaces in the order
    /// given by `SpaceId::iter()`.
    ///
    /// Returns `None` when the path has no remote CIDs or when no packet was written.
    /// The path's `app_limited` flag is set exactly when nothing was written and no space
    /// reported being congestion blocked. When something was written, the path's
    /// congestion controller is told about the sent bytes before the `Transmit` is built.
    ///
    /// # Panics
    ///
    /// Panics when `path_id` is unknown.
    #[must_use]
    fn poll_transmit_on_path(
        &mut self,
        now: Instant,
        buf: &mut Vec<u8>,
        path_id: PathId,
        max_datagrams: NonZeroUsize,
        scheduling_info: &PathSchedulingInfo,
        connection_close_pending: bool,
    ) -> Option<Transmit> {
        let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else {
            // Missing CIDs are only worth a log line for paths that are not abandoned.
            if !self.abandoned_paths.contains(&path_id) {
                debug!(%path_id, "no remote CIDs for path");
            }
            return None;
        };

        // Padding request carried from one space to the next.
        let mut pad_datagram = PadDatagram::No;

        // Number of the last packet written, if any.
        let mut last_packet_number = None;

        // Whether any space reported being blocked by congestion control.
        let mut congestion_blocked = false;

        let pmtu = self.path_data(path_id).current_mtu().into();
        let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu);

        for space_id in SpaceId::iter() {
            // Only path zero uses spaces other than Data.
            if path_id != PathId::ZERO && space_id != SpaceId::Data {
                continue;
            }
            match self.poll_transmit_path_space(
                now,
                &mut transmit,
                path_id,
                space_id,
                remote_cid,
                scheduling_info,
                connection_close_pending,
                pad_datagram,
            ) {
                PollPathSpaceStatus::NothingToSend {
                    congestion_blocked: cb,
                } => {
                    congestion_blocked |= cb;
                }
                // A packet was written; keep going with the next space.
                PollPathSpaceStatus::WrotePacket {
                    last_packet_number: pn,
                    pad_datagram: pad,
                } => {
                    debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                    last_packet_number = Some(pn);
                    pad_datagram = pad;
                    continue;
                }
                // The transmit is complete; stop polling further spaces.
                PollPathSpaceStatus::Send {
                    last_packet_number: pn,
                } => {
                    debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                    last_packet_number = Some(pn);
                    break;
                }
            }
        }

        if last_packet_number.is_some() || congestion_blocked {
            self.qlog.emit_recovery_metrics(
                path_id,
                &mut self
                    .paths
                    .get_mut(&path_id)
                    .expect("path_id was iterated from self.paths above")
                    .data,
                now,
            );
        }

        self.path_data_mut(path_id).app_limited =
            last_packet_number.is_none() && !congestion_blocked;

        match last_packet_number {
            Some(last_packet_number) => {
                self.path_data_mut(path_id).congestion.on_sent(
                    now,
                    transmit.len() as u64,
                    last_packet_number,
                );
                Some(self.build_transmit(path_id, transmit))
            }
            None => None,
        }
    }
1400
1401 #[must_use]
1403 fn poll_transmit_path_space(
1404 &mut self,
1405 now: Instant,
1406 transmit: &mut TransmitBuf<'_>,
1407 path_id: PathId,
1408 space_id: SpaceId,
1409 remote_cid: ConnectionId,
1410 scheduling_info: &PathSchedulingInfo,
1411 connection_close_pending: bool,
1413 mut pad_datagram: PadDatagram,
1415 ) -> PollPathSpaceStatus {
1416 let mut last_packet_number = None;
1419
1420 loop {
1436 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1438 transmit.datagram_remaining_mut()
1440 } else {
1441 transmit.segment_size()
1443 };
1444 let can_send =
1445 self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending);
1446 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1447 let space_will_send = {
1448 if scheduling_info.is_abandoned {
1449 scheduling_info.may_self_abandon
1454 && self.spaces[space_id]
1455 .pending
1456 .path_abandon
1457 .contains_key(&path_id)
1458 } else if can_send.close && scheduling_info.may_send_close {
1459 true
1461 } else if needs_loss_probe || can_send.space_specific {
1462 true
1465 } else {
1466 !can_send.is_empty() && scheduling_info.may_send_data
1469 }
1470 };
1471
1472 if !space_will_send {
1473 return match last_packet_number {
1476 Some(pn) => PollPathSpaceStatus::WrotePacket {
1477 last_packet_number: pn,
1478 pad_datagram,
1479 },
1480 None => {
1481 if self.crypto_state.has_keys(space_id.encryption_level())
1483 || (space_id == SpaceId::Data
1484 && self.crypto_state.has_keys(EncryptionLevel::ZeroRtt))
1485 {
1486 trace!(?space_id, %path_id, "nothing to send in space");
1487 }
1488 PollPathSpaceStatus::NothingToSend {
1489 congestion_blocked: false,
1490 }
1491 }
1492 };
1493 }
1494
1495 if transmit.datagram_remaining_mut() == 0 {
1499 let congestion_blocked =
1500 self.path_congestion_check(space_id, path_id, transmit, &can_send, now);
1501 if congestion_blocked != PathBlocked::No {
1502 return match last_packet_number {
1504 Some(pn) => PollPathSpaceStatus::WrotePacket {
1505 last_packet_number: pn,
1506 pad_datagram,
1507 },
1508 None => {
1509 return PollPathSpaceStatus::NothingToSend {
1510 congestion_blocked: true,
1511 };
1512 }
1513 };
1514 }
1515
1516 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1519 return match last_packet_number {
1522 Some(pn) => PollPathSpaceStatus::WrotePacket {
1523 last_packet_number: pn,
1524 pad_datagram,
1525 },
1526 None => {
1527 return PollPathSpaceStatus::NothingToSend {
1528 congestion_blocked: false,
1529 };
1530 }
1531 };
1532 }
1533
1534 if needs_loss_probe {
1535 let request_immediate_ack =
1537 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1538 self.spaces[space_id].queue_tail_loss_probe(
1539 path_id,
1540 request_immediate_ack,
1541 &self.streams,
1542 );
1543
1544 self.spaces[space_id].for_path(path_id).loss_probes -= 1; transmit.start_new_datagram_with_size(std::cmp::min(
1550 usize::from(INITIAL_MTU),
1551 transmit.segment_size(),
1552 ));
1553 } else {
1554 transmit.start_new_datagram();
1555 }
1556 trace!(count = transmit.num_datagrams(), "new datagram started");
1557
1558 pad_datagram = PadDatagram::No;
1560 }
1561
1562 if transmit.datagram_start_offset() < transmit.len() {
1565 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1566 }
1567
1568 if self.crypto_state.has_keys(EncryptionLevel::Initial)
1573 && space_id == SpaceId::Handshake
1574 && self.side.is_client()
1575 {
1576 self.discard_space(now, SpaceKind::Initial);
1579 }
1580 if let Some(ref mut prev) = self.crypto_state.prev_crypto {
1581 prev.update_unacked = false;
1582 }
1583
1584 let Some(mut builder) =
1585 PacketBuilder::new(now, space_id, path_id, remote_cid, transmit, self)
1586 else {
1587 return PollPathSpaceStatus::NothingToSend {
1594 congestion_blocked: false,
1595 };
1596 };
1597 last_packet_number = Some(builder.packet_number);
1598
1599 if space_id == SpaceId::Initial
1600 && (self.side.is_client() || can_send.is_ack_eliciting() || needs_loss_probe)
1601 {
1602 pad_datagram |= PadDatagram::ToMinMtu;
1604 }
1605 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1606 pad_datagram |= PadDatagram::ToSegmentSize;
1607 }
1608
1609 if scheduling_info.may_send_close && can_send.close {
1610 trace!("sending CONNECTION_CLOSE");
1611 let is_multipath_negotiated = self.is_multipath_negotiated();
1616 for path_id in self.spaces[space_id]
1617 .number_spaces
1618 .iter()
1619 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1620 .map(|(&path_id, _)| path_id)
1621 .collect::<Vec<_>>()
1622 {
1623 Self::populate_acks(
1624 now,
1625 self.receiving_ecn,
1626 path_id,
1627 space_id,
1628 &mut self.spaces[space_id],
1629 is_multipath_negotiated,
1630 &mut builder,
1631 &mut self.path_stats.for_path(path_id).frame_tx,
1632 self.crypto_state.has_keys(space_id.encryption_level()),
1633 );
1634 }
1635
1636 debug_assert!(
1644 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1645 "ACKs should leave space for ConnectionClose"
1646 );
1647 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
1648 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1649 let max_frame_size = builder.frame_space_remaining();
1650 let close: Close = match self.state.as_type() {
1651 StateType::Closed => {
1652 let reason: Close =
1653 self.state.as_closed().expect("checked").clone().into();
1654 if space_id == SpaceId::Data || reason.is_transport_layer() {
1655 reason
1656 } else {
1657 TransportError::APPLICATION_ERROR("").into()
1658 }
1659 }
1660 StateType::Draining => TransportError::NO_ERROR("").into(),
1661 _ => unreachable!(
1662 "tried to make a close packet when the connection wasn't closed"
1663 ),
1664 };
1665 builder.write_frame(close.encoder(max_frame_size), stats);
1666 }
1667 let last_pn = builder.packet_number;
1668 builder.finish_and_track(now, self, path_id, pad_datagram);
1669 if space_id.kind() == self.highest_space {
1670 self.connection_close_pending = false;
1673 }
1674 return PollPathSpaceStatus::WrotePacket {
1687 last_packet_number: last_pn,
1688 pad_datagram,
1689 };
1690 }
1691
1692 self.populate_packet(now, space_id, path_id, scheduling_info, &mut builder);
1693
1694 debug_assert!(
1701 !(builder.sent_frames().is_ack_only(&self.streams)
1702 && !can_send.acks
1703 && (can_send.other || can_send.space_specific)
1704 && builder.buf.segment_size()
1705 == self.path_data(path_id).current_mtu() as usize
1706 && self.datagrams.outgoing.is_empty()),
1707 "SendableFrames was {can_send:?}, but only ACKs have been written"
1708 );
1709 if builder.sent_frames().requires_padding {
1710 pad_datagram |= PadDatagram::ToMinMtu;
1711 }
1712
1713 for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1714 self.spaces[space_id]
1715 .for_path(*path_id)
1716 .pending_acks
1717 .acks_sent();
1718 self.timers.stop(
1719 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1720 self.qlog.with_time(now),
1721 );
1722 }
1723
1724 if builder.can_coalesce && path_id == PathId::ZERO && {
1732 let max_packet_size = builder
1733 .buf
1734 .datagram_remaining_mut()
1735 .saturating_sub(builder.predict_packet_end());
1736 max_packet_size > MIN_PACKET_SPACE
1737 && self.has_pending_packet(space_id, max_packet_size, connection_close_pending)
1738 } {
1739 trace!("will coalesce with next packet");
1742 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1743 } else {
1744 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1750 const MAX_PADDING: usize = 32;
1758 if builder.buf.datagram_remaining_mut()
1759 > builder.predict_packet_end() + MAX_PADDING
1760 {
1761 trace!(
1762 "GSO truncated by demand for {} padding bytes",
1763 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1764 );
1765 let last_pn = builder.packet_number;
1766 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1767 return PollPathSpaceStatus::Send {
1768 last_packet_number: last_pn,
1769 };
1770 }
1771
1772 builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1775 } else {
1776 builder.finish_and_track(now, self, path_id, pad_datagram);
1777 }
1778
1779 if transmit.num_datagrams() == 1 {
1782 transmit.clip_segment_size();
1783 }
1784 }
1785 }
1786 }
1787
1788 fn poll_transmit_mtu_probe(
1789 &mut self,
1790 now: Instant,
1791 buf: &mut Vec<u8>,
1792 path_id: PathId,
1793 ) -> Option<Transmit> {
1794 let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?;
1795
1796 let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize);
1798 transmit.start_new_datagram_with_size(probe_size as usize);
1799
1800 let mut builder =
1801 PacketBuilder::new(now, SpaceId::Data, path_id, active_cid, &mut transmit, self)?;
1802
1803 trace!(?probe_size, "writing MTUD probe");
1805 builder.write_frame(frame::Ping, &mut self.path_stats.for_path(path_id).frame_tx);
1806
1807 if self.peer_supports_ack_frequency() {
1809 builder.write_frame(
1810 frame::ImmediateAck,
1811 &mut self.path_stats.for_path(path_id).frame_tx,
1812 );
1813 }
1814
1815 builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1816
1817 self.path_stats.for_path(path_id).sent_plpmtud_probes += 1;
1818
1819 Some(self.build_transmit(path_id, transmit))
1820 }
1821
1822 fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> {
1830 let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?;
1831 let is_eligible = self.path_data(path_id).validated
1832 && !self.path_data(path_id).is_validating_path()
1833 && !self.abandoned_paths.contains(&path_id);
1834
1835 if !is_eligible {
1836 return None;
1837 }
1838 let next_pn = self.spaces[SpaceId::Data]
1839 .for_path(path_id)
1840 .peek_tx_number();
1841 let probe_size = self
1842 .path_data_mut(path_id)
1843 .mtud
1844 .poll_transmit(now, next_pn)?;
1845
1846 Some((active_cid, probe_size))
1847 }
1848
1849 fn has_pending_packet(
1866 &mut self,
1867 current_space_id: SpaceId,
1868 max_packet_size: usize,
1869 connection_close_pending: bool,
1870 ) -> bool {
1871 let mut space_id = current_space_id;
1872 loop {
1873 let can_send = self.space_can_send(
1874 space_id,
1875 PathId::ZERO,
1876 max_packet_size,
1877 connection_close_pending,
1878 );
1879 if !can_send.is_empty() {
1880 return true;
1881 }
1882 match space_id.next() {
1883 Some(next_space_id) => space_id = next_space_id,
1884 None => break,
1885 }
1886 }
1887 false
1888 }
1889
1890 fn path_congestion_check(
1892 &mut self,
1893 space_id: SpaceId,
1894 path_id: PathId,
1895 transmit: &TransmitBuf<'_>,
1896 can_send: &SendableFrames,
1897 now: Instant,
1898 ) -> PathBlocked {
1899 if self.side().is_server()
1905 && self
1906 .path_data(path_id)
1907 .anti_amplification_blocked(transmit.len() as u64 + 1)
1908 {
1909 trace!(?space_id, %path_id, "blocked by anti-amplification");
1910 return PathBlocked::AntiAmplification;
1911 }
1912
1913 let bytes_to_send = transmit.segment_size() as u64;
1916 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1917
1918 if can_send.other && !need_loss_probe && !can_send.close {
1919 let path = self.path_data(path_id);
1920 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1921 trace!(
1922 ?space_id,
1923 %path_id,
1924 in_flight=%path.in_flight.bytes,
1925 congestion_window=%path.congestion.window(),
1926 "blocked by congestion control",
1927 );
1928 return PathBlocked::Congestion;
1929 }
1930 }
1931
1932 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1934 let resume_time = now + delay;
1935 self.timers.set(
1936 Timer::PerPath(path_id, PathTimer::Pacing),
1937 resume_time,
1938 self.qlog.with_time(now),
1939 );
1940 trace!(?space_id, %path_id, ?delay, "blocked by pacing");
1943 return PathBlocked::Pacing;
1944 }
1945
1946 PathBlocked::No
1947 }
1948
1949 fn send_prev_path_challenge(
1954 &mut self,
1955 now: Instant,
1956 buf: &mut Vec<u8>,
1957 path_id: PathId,
1958 ) -> Option<Transmit> {
1959 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1960 if !prev_path.pending_on_path_challenge {
1961 return None;
1962 };
1963 prev_path.pending_on_path_challenge = false;
1964 let token = self.rng.random();
1965 let network_path = prev_path.network_path;
1966 prev_path.record_path_challenge_sent(now, token, network_path);
1967
1968 debug_assert_eq!(
1969 self.highest_space,
1970 SpaceKind::Data,
1971 "PATH_CHALLENGE queued without 1-RTT keys"
1972 );
1973 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
1974 buf.start_new_datagram();
1975
1976 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, self)?;
1982 let challenge = frame::PathChallenge(token);
1983 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
1984 builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));
1985
1986 builder.pad_to(MIN_INITIAL_SIZE);
1991
1992 builder.finish(self, now);
1993 self.path_stats
1994 .for_path(path_id)
1995 .udp_tx
1996 .on_sent(1, buf.len());
1997
1998 trace!(
1999 dst = ?network_path.remote,
2000 src = ?network_path.local_ip,
2001 len = buf.len(),
2002 "sending prev_path off-path challenge",
2003 );
2004 Some(Transmit {
2005 destination: network_path.remote,
2006 size: buf.len(),
2007 ecn: None,
2008 segment_size: None,
2009 src_ip: network_path.local_ip,
2010 })
2011 }
2012
2013 fn send_off_path_path_response(
2014 &mut self,
2015 now: Instant,
2016 buf: &mut Vec<u8>,
2017 path_id: PathId,
2018 ) -> Option<Transmit> {
2019 let path = self.paths.get_mut(&path_id).map(|state| &mut state.data)?;
2020 let cid_queue = self.remote_cids.get_mut(&path_id)?;
2021 let (token, network_path) = path.path_responses.pop_off_path(path.network_path)?;
2022
2023 let cid = cid_queue.active();
2025
2026 let frame = frame::PathResponse(token);
2027
2028 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2029 buf.start_new_datagram();
2030
2031 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, self)?;
2032 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2033 builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
2034
2035 if self
2040 .find_validated_path_on_network_path(network_path)
2041 .is_none()
2042 && self.n0_nat_traversal.client_side().is_ok()
2043 {
2044 let token = self.rng.random();
2045 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2046 builder.write_frame(frame::PathChallenge(token), stats);
2047 let ip_port = (network_path.remote.ip(), network_path.remote.port());
2048 self.n0_nat_traversal.mark_probe_sent(ip_port, token);
2049 }
2050
2051 builder.pad_to(MIN_INITIAL_SIZE);
2054 builder.finish(self, now);
2055
2056 let size = buf.len();
2057 self.path_stats.for_path(path_id).udp_tx.on_sent(1, size);
2058
2059 trace!(
2060 dst = ?network_path.remote,
2061 src = ?network_path.local_ip,
2062 len = buf.len(),
2063 "sending off-path PATH_RESPONSE",
2064 );
2065 Some(Transmit {
2066 destination: network_path.remote,
2067 size,
2068 ecn: None,
2069 segment_size: None,
2070 src_ip: network_path.local_ip,
2071 })
2072 }
2073
2074 fn send_nat_traversal_path_challenge(
2076 &mut self,
2077 now: Instant,
2078 buf: &mut Vec<u8>,
2079 path_id: PathId,
2080 ) -> Option<Transmit> {
2081 let remote = self.n0_nat_traversal.next_probe_addr()?;
2082
2083 if !self.paths.get(&path_id)?.data.validated {
2084 return None;
2086 }
2087
2088 let Some(cid) = self
2093 .remote_cids
2094 .get(&path_id)
2095 .map(|cid_queue| cid_queue.active())
2096 else {
2097 trace!(%path_id, "Not sending NAT traversal probe for path with no CIDs");
2098 return None;
2099 };
2100 let token = self.rng.random();
2101
2102 let frame = frame::PathChallenge(token);
2103
2104 let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2105 buf.start_new_datagram();
2106
2107 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, self)?;
2108 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2109 builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
2110 builder.finish(self, now);
2113
2114 self.n0_nat_traversal.mark_probe_sent(remote, token);
2116
2117 let size = buf.len();
2118 self.path_stats.for_path(path_id).udp_tx.on_sent(1, size);
2119
2120 trace!(dst = ?remote, len = buf.len(), "sending off-path NAT probe");
2121 Some(Transmit {
2122 destination: remote.into(),
2123 size,
2124 ecn: None,
2125 segment_size: None,
2126 src_ip: None,
2127 })
2128 }
2129
2130 fn space_can_send(
2138 &mut self,
2139 space_id: SpaceId,
2140 path_id: PathId,
2141 packet_size: usize,
2142 connection_close_pending: bool,
2143 ) -> SendableFrames {
2144 let space = &mut self.spaces[space_id];
2145 let space_has_crypto = self.crypto_state.has_keys(space_id.encryption_level());
2146
2147 if !space_has_crypto
2148 && (space_id != SpaceId::Data
2149 || !self.crypto_state.has_keys(EncryptionLevel::ZeroRtt)
2150 || self.side.is_server())
2151 {
2152 return SendableFrames::empty();
2154 }
2155
2156 let mut can_send = space.can_send(path_id, &self.streams);
2157
2158 if space_id == SpaceId::Data {
2160 let pn = space.for_path(path_id).peek_tx_number();
2161 let frame_space_1rtt =
2167 packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
2168 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
2169 }
2170
2171 can_send.close = connection_close_pending && space_has_crypto;
2172
2173 can_send
2174 }
2175
    /// Processes a [`ConnectionEvent`] delivered to this connection.
    ///
    /// Two kinds of events are handled:
    /// - `Datagram`: an incoming UDP datagram. Its first packet is decoded via
    ///   `handle_decode`, and any remaining bytes are handed to `handle_coalesced`.
    /// - `NewIdentifiers`: newly issued local connection IDs for one path, which are
    ///   recorded and queued for announcement to the peer.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            Datagram(DatagramConnectionEvent {
                now,
                network_path,
                path_id,
                ecn,
                first_decode,
                remaining,
            }) => {
                let span = trace_span!("pkt", %path_id);
                let _guard = span.enter();

                // Drop the datagram before any accounting if it fails the cheap checks.
                if self.early_discard_packet(network_path, path_id) {
                    return;
                }

                // Sampled before processing so that we can tell afterwards whether this
                // datagram arrived while the path was anti-amplification blocked.
                let was_anti_amplification_blocked = self
                    .path(path_id)
                    .map(|path| path.anti_amplification_blocked(1))
                    .unwrap_or(false);

                let rx = &mut self.path_stats.for_path(path_id).udp_rx;
                rx.datagrams += 1;
                rx.bytes += first_decode.len() as u64;
                let data_len = first_decode.len();

                self.handle_decode(now, network_path, path_id, ecn, first_decode);
                // The path is looked up only after `handle_decode` has run, so the bytes
                // are credited to whatever path data is current at that point.
                if let Some(path) = self.path_mut(path_id) {
                    path.inc_total_recvd(data_len as u64);
                }

                // Coalesced packets following the first one in the same datagram.
                if let Some(data) = remaining {
                    self.path_stats.for_path(path_id).udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, network_path, path_id, ecn, data);
                }

                if let Some(path) = self.paths.get_mut(&path_id) {
                    self.qlog
                        .emit_recovery_metrics(path_id, &mut path.data, now);
                }

                // The path was blocked before this datagram arrived, so the loss
                // detection timer is recomputed now that more data has been received.
                if was_anti_amplification_blocked {
                    self.set_loss_detection_timer(now, path_id);
                }
            }
            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
                // All CIDs in one event are expected to belong to the same path.
                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));

                if self.abandoned_paths.contains(&path_id) {
                    // CIDs for an abandoned path are not used: ask the endpoint to retire
                    // them again right away, unless the connection is already drained.
                    if !self.state.is_drained() {
                        for issued in &ids {
                            self.endpoint_events
                                .push_back(EndpointEventInner::RetireConnectionId(
                                    now,
                                    path_id,
                                    issued.sequence,
                                    false,
                                ));
                        }
                    }
                    return;
                }

                // Create the per-path CID state on first use, then record the new CIDs.
                let cid_state = self
                    .local_cid_state
                    .entry(path_id)
                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
                cid_state.new_cids(&ids, now);

                // Queue the CIDs, in reverse order, on the Data space's pending frames.
                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                self.reset_cid_retirement(now);
            }
        }
    }
2273
2274 fn early_discard_packet(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
2282 if self.is_handshaking() && path_id != PathId::ZERO {
2283 debug!(%network_path, %path_id, "discarding multipath packet during handshake");
2284 return true;
2285 }
2286
2287 let peer_may_probe = self.peer_may_probe();
2288 let local_ip_may_migrate = self.local_ip_may_migrate();
2289
2290 if let Some(known_path) = self.path_mut(path_id) {
2294 if network_path.remote != known_path.network_path.remote && !peer_may_probe {
2295 trace!(
2296 %path_id,
2297 %network_path,
2298 %known_path.network_path,
2299 "discarding packet from unrecognized peer"
2300 );
2301 return true;
2302 }
2303
2304 if known_path.network_path.local_ip.is_some()
2305 && network_path.local_ip.is_some()
2306 && known_path.network_path.local_ip != network_path.local_ip
2307 && !local_ip_may_migrate
2308 {
2309 trace!(
2310 %path_id,
2311 %network_path,
2312 %known_path.network_path,
2313 "discarding packet sent to incorrect interface"
2314 );
2315 return true;
2316 }
2317 }
2318 false
2319 }
2320
2321 fn peer_may_probe(&self) -> bool {
2332 match &self.side {
2333 ConnectionSide::Client { .. } => {
2334 if let Some(hs) = self.state.as_handshake() {
2335 hs.allow_server_migration
2336 } else {
2337 self.n0_nat_traversal.is_negotiated() && self.is_handshake_confirmed()
2338 }
2339 }
2340 ConnectionSide::Server { server_config } => {
2341 self.is_handshake_confirmed()
2342 && (server_config.migration || self.n0_nat_traversal.is_negotiated())
2343 }
2344 }
2345 }
2346
2347 fn peer_may_migrate(&self) -> bool {
2359 match &self.side {
2360 ConnectionSide::Server { server_config } => {
2361 server_config.migration && self.is_handshake_confirmed()
2362 }
2363 ConnectionSide::Client { .. } => false,
2364 }
2365 }
2366
2367 fn local_ip_may_migrate(&self) -> bool {
2380 (self.side.is_client() || self.n0_nat_traversal.is_negotiated())
2381 && self.is_handshake_confirmed()
2382 }
    /// Processes every timer that has expired as of `now`.
    ///
    /// Expired timers are taken one at a time from `self.timers` and dispatched on their
    /// kind: connection-wide timers ([`ConnTimer`]) and per-path timers ([`PathTimer`]).
    /// Each timer is handled inside its own tracing span.
    pub fn handle_timeout(&mut self, now: Instant) {
        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
            let span = match timer {
                Timer::Conn(timer) => trace_span!("timeout", scope = "conn", ?timer),
                Timer::PerPath(path_id, timer) => {
                    trace_span!("timer_fired", scope="path", %path_id, ?timer)
                }
            };
            let _guard = span.enter();
            trace!("timeout");
            match timer {
                Timer::Conn(timer) => match timer {
                    ConnTimer::Close => {
                        // The close period is over: move to drained. A `Draining` event
                        // is emitted first unless `move_to_drained` reports that the
                        // connection was already draining.
                        let was_draining = self.state.move_to_drained(None);
                        if !was_draining {
                            self.endpoint_events.push_back(EndpointEventInner::Draining);
                        }
                        self.endpoint_events.push_back(EndpointEventInner::Drained);
                    }
                    ConnTimer::Idle => {
                        self.kill(ConnectionError::TimedOut);
                    }
                    ConnTimer::KeepAlive => {
                        self.ping();
                    }
                    ConnTimer::KeyDiscard => {
                        self.crypto_state.discard_temporary_keys();
                    }
                    ConnTimer::PushNewCid => {
                        // Handle every CID retirement that is due by `now`; stop at the
                        // first one that lies in the future.
                        while let Some((path_id, when)) = self.next_cid_retirement() {
                            if when > now {
                                break;
                            }
                            match self.local_cid_state.get_mut(&path_id) {
                                None => error!(%path_id, "No local CID state for path"),
                                Some(cid_state) => {
                                    let num_new_cid = cid_state.on_cid_timeout().into();
                                    // Replacement CIDs are only requested while the
                                    // connection is not closed.
                                    if !self.state.is_closed() {
                                        trace!(
                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                            cid_state.retire_prior_to()
                                        );
                                        self.endpoint_events.push_back(
                                            EndpointEventInner::NeedIdentifiers(
                                                path_id,
                                                now,
                                                num_new_cid,
                                            ),
                                        );
                                    }
                                }
                            }
                        }
                    }
                    ConnTimer::NoAvailablePath => {
                        if self.state.is_closed() || self.state.is_drained() {
                            // Nothing to do: the connection is already on its way out.
                            error!("no viable path timer fired, but connection already closing");
                        } else {
                            // Close the connection with NO_VIABLE_PATH, following the same
                            // steps as a locally initiated close.
                            trace!("no viable path grace period expired, closing connection");
                            let err = TransportError::NO_VIABLE_PATH(
                                "last path abandoned, no new path opened",
                            );
                            self.close_common();
                            self.set_close_timer(now);
                            self.connection_close_pending = true;
                            self.state.move_to_closed(err);
                        }
                    }
                    ConnTimer::NatTraversalProbeRetry => {
                        self.n0_nat_traversal.queue_retries(self.is_ipv6());
                        // Re-arm the timer only while the NAT traversal state still
                        // reports a retry delay.
                        if let Some(delay) =
                            self.n0_nat_traversal.retry_delay(self.config.initial_rtt)
                        {
                            self.timers.set(
                                Timer::Conn(ConnTimer::NatTraversalProbeRetry),
                                now + delay,
                                self.qlog.with_time(now),
                            );
                            trace!("re-queued NAT probes");
                        } else {
                            trace!("no more NAT probes remaining");
                        }
                    }
                },
                Timer::PerPath(path_id, timer) => {
                    match timer {
                        PathTimer::PathIdle => {
                            // An idle path is closed; a failure to close is only logged.
                            if let Err(err) =
                                self.close_path_inner(now, path_id, PathAbandonReason::TimedOut)
                            {
                                warn!(?err, "failed closing path");
                            }
                        }

                        PathTimer::PathKeepAlive => {
                            // Best effort: the error from pinging a closed path is ignored.
                            self.ping_path(path_id).ok();
                        }
                        PathTimer::LossDetection => {
                            self.on_loss_detection_timeout(now, path_id);
                            if let Some(path) = self.paths.get_mut(&path_id) {
                                self.qlog
                                    .emit_recovery_metrics(path_id, &mut path.data, now);
                            } else {
                                error!("LossDetection fired for unknown path");
                            }
                        }
                        PathTimer::PathValidationFailed => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("path migration validation failed");
                            // Fall back to the previous path data, if there is any.
                            if let Some((_, prev)) = path.prev.take() {
                                path.data = prev;
                            }
                            path.data.reset_on_path_challenges();
                        }
                        PathTimer::PathChallengeLost => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            trace!(?path.data.on_path_challenges_lost, "path challenge deemed lost");
                            // Queue another on-path challenge and count the loss.
                            path.data.pending_on_path_challenge = true;
                            path.data.on_path_challenges_lost += 1;
                        }
                        PathTimer::AbandonFromValidation => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            path.data.reset_on_path_challenges();
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("new path validation failed");
                            // Validation of a new path failed, so the path is closed.
                            if let Err(err) = self.close_path_inner(
                                now,
                                path_id,
                                PathAbandonReason::ValidationFailed,
                            ) {
                                warn!(?err, "failed closing path");
                            }
                        }
                        // No work to do here when the pacing timer fires.
                        PathTimer::Pacing => {}
                        PathTimer::MaxAckDelay => {
                            self.spaces[SpaceId::Data]
                                .for_path(path_id)
                                .pending_acks
                                .on_max_ack_delay_timeout()
                        }
                        PathTimer::PathDrained => {
                            // Stop all remaining timers of the path, retire every local
                            // CID still active on it, then discard the path.
                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                            if let Some(local_cid_state) = self.local_cid_state.remove(&path_id) {
                                debug_assert!(!self.state.is_drained());
                                // Retire the whole inclusive range of active sequence numbers.
                                let (min_seq, max_seq) = local_cid_state.active_seq();
                                for seq in min_seq..=max_seq {
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::RetireConnectionId(
                                            now, path_id, seq, false,
                                        ),
                                    );
                                }
                            }
                            self.discard_path(path_id, now);
                        }
                    }
                }
            }
        }
    }
2577
2578 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2590 self.close_inner(
2591 now,
2592 Close::Application(frame::ApplicationClose { error_code, reason }),
2593 )
2594 }
2595
2596 fn close_inner(&mut self, now: Instant, reason: Close) {
2612 let was_closed = self.state.is_closed();
2613 if !was_closed {
2614 self.close_common();
2615 self.set_close_timer(now);
2616 self.connection_close_pending = true;
2617 self.state.move_to_closed_local(reason);
2618 }
2619 }
2620
    /// Returns a [`Datagrams`] handle that borrows this connection mutably.
    pub fn datagrams(&mut self) -> Datagrams<'_> {
        Datagrams { conn: self }
    }
2625
2626 pub fn stats(&mut self) -> ConnectionStats {
2628 let mut stats = self.partial_stats.clone();
2629
2630 for path_stats in self.path_stats.iter_stats() {
2631 stats += *path_stats;
2636 }
2637
2638 stats
2639 }
2640
2641 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2643 let path = self.paths.get(&path_id)?;
2644 let stats = self.path_stats.for_path(path_id);
2645 stats.rtt = path.data.rtt.get();
2646 stats.cwnd = path.data.congestion.window();
2647 stats.current_mtu = path.data.mtud.current_mtu();
2648 Some(*stats)
2649 }
2650
2651 pub fn ping(&mut self) {
2655 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2658 path_data.ping_pending = true;
2659 }
2660 }
2661
2662 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2666 let path_data = self.spaces[self.highest_space]
2667 .number_spaces
2668 .get_mut(&path)
2669 .ok_or(ClosedPath { _private: () })?;
2670 path_data.ping_pending = true;
2671 Ok(())
2672 }
2673
2674 pub fn force_key_update(&mut self) {
2678 if !self.state.is_established() {
2679 debug!("ignoring forced key update in illegal state");
2680 return;
2681 }
2682 if self.crypto_state.prev_crypto.is_some() {
2683 debug!("ignoring redundant forced key update");
2686 return;
2687 }
2688 self.crypto_state.update_keys(None, false);
2689 }
2690
    /// Returns a reference to the connection's crypto session as a trait object.
    pub fn crypto_session(&self) -> &dyn crypto::Session {
        self.crypto_state.session.as_ref()
    }
2695
    /// Returns whether the connection state is currently the handshake state.
    pub fn is_handshaking(&self) -> bool {
        self.state.is_handshake()
    }
2708
    /// Returns whether the connection state reports itself as closed.
    pub fn is_closed(&self) -> bool {
        self.state.is_closed()
    }
2722
    /// Returns whether the connection state reports itself as drained.
    pub fn is_drained(&self) -> bool {
        self.state.is_drained()
    }
2730
    /// Returns the `accepted_0rtt` flag recorded in the crypto state.
    pub fn accepted_0rtt(&self) -> bool {
        self.crypto_state.accepted_0rtt
    }
2737
    /// Returns the `zero_rtt_enabled` flag recorded in the crypto state.
    pub fn has_0rtt(&self) -> bool {
        self.crypto_state.zero_rtt_enabled
    }
2742
2743 pub fn has_pending_retransmits(&self) -> bool {
2745 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2746 }
2747
    /// Returns which [`Side`] of the connection this is.
    pub fn side(&self) -> Side {
        self.side.side()
    }
2752
2753 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2755 self.path(path_id)
2756 .map(|path_data| {
2757 path_data
2758 .last_observed_addr_report
2759 .as_ref()
2760 .map(|observed| observed.socket_addr())
2761 })
2762 .ok_or(ClosedPath { _private: () })
2763 }
2764
2765 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2767 self.path(path_id).map(|d| d.rtt.get())
2768 }
2769
2770 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2772 self.path(path_id).map(|d| d.congestion.as_ref())
2773 }
2774
2775 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2780 self.streams.set_max_concurrent(dir, count);
2781 let pending = &mut self.spaces[SpaceId::Data].pending;
2784 self.streams.queue_max_stream_id(pending);
2785 }
2786
2787 pub fn set_max_concurrent_paths(
2797 &mut self,
2798 now: Instant,
2799 count: NonZeroU32,
2800 ) -> Result<(), MultipathNotNegotiated> {
2801 if !self.is_multipath_negotiated() {
2802 return Err(MultipathNotNegotiated { _private: () });
2803 }
2804 self.max_concurrent_paths = count;
2805
2806 let in_use_count = self
2807 .local_max_path_id
2808 .next()
2809 .saturating_sub(self.abandoned_paths.len())
2810 .as_u32();
2811 let extra_needed = count.get().saturating_sub(in_use_count);
2812 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2813
2814 self.set_max_path_id(now, new_max_path_id);
2815
2816 Ok(())
2817 }
2818
2819 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2821 if max_path_id <= self.local_max_path_id {
2822 return;
2823 }
2824
2825 self.local_max_path_id = max_path_id;
2826 self.spaces[SpaceId::Data].pending.max_path_id = true;
2827
2828 self.issue_first_path_cids(now);
2829 }
2830
    /// Returns the stream state's current concurrent-stream limit for direction `dir`.
    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
        self.streams.max_concurrent(dir)
    }
2839
    /// Forwards the new `send_window` value to the stream state.
    pub fn set_send_window(&mut self, send_window: u64) {
        self.streams.set_send_window(send_window);
    }
2844
2845 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2847 if self.streams.set_receive_window(receive_window) {
2848 self.spaces[SpaceId::Data].pending.max_data = true;
2849 }
2850 }
2851
2852 pub fn is_multipath_negotiated(&self) -> bool {
2857 !self.is_handshaking()
2858 && self.config.max_concurrent_multipath_paths.is_some()
2859 && self.peer_params.initial_max_path_id.is_some()
2860 }
2861
2862 fn on_ack_received(
2863 &mut self,
2864 now: Instant,
2865 space: SpaceId,
2866 ack: frame::Ack,
2867 ) -> Result<(), TransportError> {
2868 let path = PathId::ZERO;
2870 self.inner_on_ack_received(now, space, path, ack)
2871 }
2872
2873 fn on_path_ack_received(
2874 &mut self,
2875 now: Instant,
2876 space: SpaceId,
2877 path_ack: frame::PathAck,
2878 ) -> Result<(), TransportError> {
2879 let (ack, path) = path_ack.into_ack();
2880 self.inner_on_ack_received(now, space, path, ack)
2881 }
2882
    /// Processes an acknowledgement for packets sent in `space` on `path`.
    ///
    /// The steps run in this order:
    /// 1. validate that the path and the acknowledged packet numbers are plausible;
    /// 2. track the largest acknowledged packet number and its send time;
    /// 3. check for spurious loss detection;
    /// 4. collect the acknowledged packets that are still tracked as sent and process
    ///    each of them (pending ACK pruning, MTU discovery, ACK frequency, per-packet
    ///    handling);
    /// 5. notify the congestion controller, update the RTT estimate, detect lost
    ///    packets, handle ECN feedback and re-arm the loss detection timer.
    ///
    /// # Errors
    ///
    /// Returns `PROTOCOL_VIOLATION` when the path ID was never used, when a packet
    /// number that was never sent is acknowledged, or when `check_ack` rejects a range.
    fn inner_on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        // No packet number space for this path: either the path has been abandoned
        // (ignore the ACK) or the peer is acknowledging a path that never existed.
        if !self.spaces[space].number_spaces.contains_key(&path) {
            if self.abandoned_paths.contains(&path) {
                trace!("silently ignoring PATH_ACK on discarded path");
                return Ok(());
            } else {
                return Err(TransportError::PROTOCOL_VIOLATION(
                    "received PATH_ACK with path ID never used",
                ));
            }
        }
        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        // `Some(pn)` only when this ACK raises the largest acknowledged packet number.
        let new_largest_pn = {
            let space = &mut self.spaces[space].for_path(path);
            if space
                .largest_acked_packet_pn
                .is_none_or(|pn| ack.largest > pn)
            {
                space.largest_acked_packet_pn = Some(ack.largest);
                // The send time is only updated while the packet is still tracked.
                if let Some(info) = space.sent_packets.get(ack.largest) {
                    space.largest_acked_packet_send_time = info.time_sent;
                }
                Some(ack.largest)
            } else {
                None
            }
        };

        if self.detect_spurious_loss(&ack, space, path) {
            self.path_stats.for_path(path).spurious_congestion_events += 1;
            self.path_data_mut(path)
                .congestion
                .on_spurious_congestion_event();
        }

        // Collect the packet numbers first, so that the sent-packet map is not mutated
        // while it is being iterated.
        let mut newly_acked: ArrayRangeSet = ArrayRangeSet::new();
        for range in ack.iter() {
            self.spaces[space].for_path(path).check_ack(range.clone())?;
            for (pn, _) in self.spaces[space]
                .for_path(path)
                .sent_packets
                .iter_range(range)
            {
                newly_acked.insert_one(pn);
            }
        }

        // Nothing we still track was acknowledged.
        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
                // The acknowledged packet carried ACKs of its own. The peer has seen
                // those, so pending ACKs below the recorded packet numbers are pruned.
                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                        pns.pending_acks.subtract_below(*acked_pn);
                    }
                }
                ack_eliciting_acked |= info.ack_eliciting;

                // Let MTU discovery see the acknowledgement; a changed MTU is passed on
                // to the congestion controller.
                let path_data = self.path_data_mut(path);
                let mtu_updated = path_data.mtud.on_acked(space.kind(), packet, info.size);
                if mtu_updated {
                    path_data
                        .congestion
                        .on_mtu_update(path_data.mtud.current_mtu());
                }

                self.ack_frequency.on_acked(path, packet);

                self.on_packet_acked(now, path, packet, info);
            }
        }

        let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet_pn;
        let path_data = self.path_data_mut(path);
        let app_limited = path_data.app_limited;
        let in_flight = path_data.in_flight.bytes;

        path_data
            .congestion
            .on_end_acks(now, in_flight, app_limited, largest_ackd);

        // An RTT sample is taken only when the largest acknowledged packet advanced and
        // at least one newly acknowledged packet was ack-eliciting.
        if new_largest_pn.is_some() && ack_eliciting_acked {
            // Outside the Data space the ACK delay is taken as zero. In the Data space
            // it is the reported delay, scaled by the peer's exponent and capped at the
            // peer's maximum ACK delay.
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = now.saturating_duration_since(
                self.spaces[space]
                    .for_path(path)
                    .largest_acked_packet_send_time,
            );

            let next_pn = self.spaces[space].for_path(path).next_packet_number;
            let path_data = self.path_data_mut(path);
            path_data.rtt.update(ack_delay, rtt);
            // Remember the next packet number to be sent after this sample, unless one
            // is already recorded.
            if path_data.first_packet_after_rtt_sample.is_none() {
                path_data.first_packet_after_rtt_sample = Some((space.kind(), next_pn));
            }
        }

        self.detect_lost_packets(now, space, path, true);

        // The PTO count is reset only once the peer has completed address validation.
        if self.peer_completed_handshake_address_validation() {
            self.path_data_mut(path).pto_count = 0;
        }

        if self.path_data(path).sending_ecn {
            if let Some(ecn) = ack.ecn {
                // ECN counts are only processed for ACKs that advanced the largest
                // acknowledged packet number.
                if let Some(largest_sent_pn) = new_largest_pn {
                    let sent = self.spaces[space]
                        .for_path(path)
                        .largest_acked_packet_send_time;
                    // NOTE(review): this passes the number of ranges in `newly_acked`,
                    // not the number of packets — confirm this is what `process_ecn`
                    // expects.
                    self.process_ecn(
                        now,
                        space,
                        path,
                        newly_acked.range_count() as u64,
                        ecn,
                        sent,
                        largest_sent_pn,
                    );
                }
            } else {
                // The ACK carries no ECN counts although we are sending ECN: stop
                // sending ECN on this path.
                debug!("ECN not acknowledged by peer");
                self.path_data_mut(path).sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now, path);
        Ok(())
    }
3062
3063 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
3064 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
3065
3066 if lost_packets.is_empty() {
3067 return false;
3068 }
3069
3070 for range in ack.iter() {
3071 let spurious_losses: Vec<u64> = lost_packets
3072 .iter_range(range.clone())
3073 .map(|(pn, _info)| pn)
3074 .collect();
3075
3076 for pn in spurious_losses {
3077 lost_packets.remove(pn);
3078 }
3079 }
3080
3081 lost_packets.is_empty()
3086 }
3087
3088 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
3093 let two_pto = 2 * self.path_data(path).rtt.pto_base();
3094
3095 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
3096 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
3097 }
3098
3099 fn process_ecn(
3101 &mut self,
3102 now: Instant,
3103 space: SpaceId,
3104 path: PathId,
3105 newly_acked_pn: u64,
3106 ecn: frame::EcnCounts,
3107 largest_sent_time: Instant,
3108 largest_sent_pn: u64,
3109 ) {
3110 match self.spaces[space]
3111 .for_path(path)
3112 .detect_ecn(newly_acked_pn, ecn)
3113 {
3114 Err(e) => {
3115 debug!("halting ECN due to verification failure: {}", e);
3116
3117 self.path_data_mut(path).sending_ecn = false;
3118 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
3121 }
3122 Ok(false) => {}
3123 Ok(true) => {
3124 self.path_stats.for_path(path).congestion_events += 1;
3125 self.path_data_mut(path).congestion.on_congestion_event(
3126 now,
3127 largest_sent_time,
3128 false,
3129 true,
3130 0,
3131 largest_sent_pn,
3132 );
3133 }
3134 }
3135 }
3136
3137 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, pn: u64, info: SentPacket) {
3140 let path = self.path_data_mut(path_id);
3141 let app_limited = path.app_limited;
3142 path.remove_in_flight(&info);
3143 if info.ack_eliciting && info.path_generation == path.generation() {
3144 let rtt = path.rtt;
3148 path.congestion
3149 .on_ack(now, info.time_sent, info.size.into(), pn, app_limited, &rtt);
3150 }
3151
3152 if let Some(retransmits) = info.retransmits.get() {
3154 for (id, _) in retransmits.reset_stream.iter() {
3155 self.streams.reset_acked(*id);
3156 }
3157 }
3158
3159 for frame in info.stream_frames {
3160 self.streams.received_ack_of(frame);
3161 }
3162 }
3163
3164 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceKind) {
3165 let start = if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) {
3166 now
3167 } else {
3168 self.crypto_state
3169 .prev_crypto
3170 .as_ref()
3171 .expect("no previous keys")
3172 .end_packet
3173 .as_ref()
3174 .expect("update not acknowledged yet")
3175 .1
3176 };
3177
3178 self.timers.set(
3180 Timer::Conn(ConnTimer::KeyDiscard),
3181 start + self.max_pto_for_space(space) * 3,
3182 self.qlog.with_time(now),
3183 );
3184 }
3185
3186 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
3199 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
3200 self.detect_lost_packets(now, pn_space, path_id, false);
3202 self.set_loss_detection_timer(now, path_id);
3203 return;
3204 }
3205
3206 let Some((_, space)) = self.pto_time_and_space(now, path_id) else {
3207 error!(%path_id, "PTO expired while unset");
3208 return;
3209 };
3210 trace!(
3211 in_flight = self.path_data(path_id).in_flight.bytes,
3212 count = self.path_data(path_id).pto_count,
3213 ?space,
3214 %path_id,
3215 "PTO fired"
3216 );
3217
3218 let count = match self.path_data(path_id).in_flight.ack_eliciting {
3219 0 => {
3222 debug_assert!(!self.peer_completed_handshake_address_validation());
3223 1
3224 }
3225 _ => 2,
3227 };
3228 let pns = self.spaces[space].for_path(path_id);
3229 pns.loss_probes = pns.loss_probes.saturating_add(count);
3230 let path_data = self.path_data_mut(path_id);
3231 path_data.pto_count = path_data.pto_count.saturating_add(1);
3232 self.set_loss_detection_timer(now, path_id);
3233 }
3234
3235 fn detect_lost_packets(
3252 &mut self,
3253 now: Instant,
3254 pn_space: SpaceId,
3255 path_id: PathId,
3256 due_to_ack: bool,
3257 ) {
3258 let mut lost_packets = Vec::<u64>::new();
3259 let mut lost_mtu_probe = None;
3260 let mut in_persistent_congestion = false;
3261 let mut size_of_lost_packets = 0u64;
3262 self.spaces[pn_space].for_path(path_id).loss_time = None;
3263
3264 let path = self.path_data(path_id);
3267 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3268 let loss_delay = path
3269 .rtt
3270 .conservative()
3271 .mul_f32(self.config.time_threshold)
3272 .max(TIMER_GRANULARITY);
3273 let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
3274
3275 let largest_acked_packet_pn = self.spaces[pn_space]
3276 .for_path(path_id)
3277 .largest_acked_packet_pn
3278 .expect("detect_lost_packets only to be called if path received at least one ACK");
3279 let packet_threshold = self.config.packet_threshold as u64;
3280
3281 let congestion_period = self
3285 .pto(SpaceKind::Data, path_id)
3286 .saturating_mul(self.config.persistent_congestion_threshold);
3287 let mut persistent_congestion_start: Option<Instant> = None;
3288 let mut prev_packet = None;
3289 let space = self.spaces[pn_space].for_path(path_id);
3290
3291 for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet_pn) {
3292 if prev_packet != Some(packet.wrapping_sub(1)) {
3293 persistent_congestion_start = None;
3295 }
3296
3297 let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
3301 if packet_too_old || largest_acked_packet_pn >= packet + packet_threshold {
3302 if Some(packet) == in_flight_mtu_probe {
3304 lost_mtu_probe = in_flight_mtu_probe;
3307 } else {
3308 lost_packets.push(packet);
3309 size_of_lost_packets += info.size as u64;
3310 if info.ack_eliciting && due_to_ack {
3311 match persistent_congestion_start {
3312 Some(start) if info.time_sent - start > congestion_period => {
3315 in_persistent_congestion = true;
3316 }
3317 None if first_packet_after_rtt_sample
3319 .is_some_and(|x| x < (pn_space.kind(), packet)) =>
3320 {
3321 persistent_congestion_start = Some(info.time_sent);
3322 }
3323 _ => {}
3324 }
3325 }
3326 }
3327 } else {
3328 if space.loss_time.is_none() {
3330 space.loss_time = Some(info.time_sent + loss_delay);
3333 }
3334 persistent_congestion_start = None;
3335 }
3336
3337 prev_packet = Some(packet);
3338 }
3339
3340 self.handle_lost_packets(
3341 pn_space,
3342 path_id,
3343 now,
3344 lost_packets,
3345 lost_mtu_probe,
3346 loss_delay,
3347 in_persistent_congestion,
3348 size_of_lost_packets,
3349 );
3350 }
3351
3352 fn discard_path(&mut self, path_id: PathId, now: Instant) {
3354 trace!(%path_id, "dropping path state");
3355 let path = self.path_data(path_id);
3356 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3357
3358 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
3360 .for_path(path_id)
3361 .sent_packets
3362 .iter()
3363 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
3364 .map(|(pn, info)| {
3365 size_of_lost_packets += info.size as u64;
3366 pn
3367 })
3368 .collect();
3369
3370 if !lost_pns.is_empty() {
3371 trace!(
3372 %path_id,
3373 count = lost_pns.len(),
3374 lost_bytes = size_of_lost_packets,
3375 "packets lost on path abandon"
3376 );
3377 self.handle_lost_packets(
3378 SpaceId::Data,
3379 path_id,
3380 now,
3381 lost_pns,
3382 in_flight_mtu_probe,
3383 Duration::ZERO,
3384 false,
3385 size_of_lost_packets,
3386 );
3387 }
3388 let path_stats = self.path_stats.discard(&path_id);
3391 self.partial_stats += path_stats;
3392 self.paths.remove(&path_id);
3393 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
3394
3395 self.events.push_back(
3396 PathEvent::Discarded {
3397 id: path_id,
3398 path_stats: Box::new(path_stats),
3399 }
3400 .into(),
3401 );
3402 }
3403
3404 fn handle_lost_packets(
3405 &mut self,
3406 pn_space: SpaceId,
3407 path_id: PathId,
3408 now: Instant,
3409 lost_packets: Vec<u64>,
3410 lost_mtu_probe: Option<u64>,
3411 loss_delay: Duration,
3412 in_persistent_congestion: bool,
3413 size_of_lost_packets: u64,
3414 ) {
3415 debug_assert!(lost_packets.is_sorted(), "lost_packets must be sorted");
3416
3417 self.drain_lost_packets(now, pn_space, path_id);
3418
3419 if let Some(largest_lost) = lost_packets.last().cloned() {
3421 let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
3422 let largest_lost_sent = self.spaces[pn_space]
3423 .for_path(path_id)
3424 .sent_packets
3425 .get(largest_lost)
3426 .unwrap()
3427 .time_sent;
3428 let path_stats = self.path_stats.for_path(path_id);
3429 path_stats.lost_packets += lost_packets.len() as u64;
3430 path_stats.lost_bytes += size_of_lost_packets;
3431 trace!(
3432 %path_id,
3433 count = lost_packets.len(),
3434 lost_bytes = size_of_lost_packets,
3435 "packets lost",
3436 );
3437
3438 for &packet in &lost_packets {
3439 let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
3440 continue;
3441 };
3442 self.qlog
3443 .emit_packet_lost(packet, &info, loss_delay, pn_space.kind(), now);
3444 self.paths
3445 .get_mut(&path_id)
3446 .unwrap()
3447 .remove_in_flight(&info);
3448
3449 for frame in info.stream_frames {
3450 self.streams.retransmit(frame);
3451 }
3452 self.spaces[pn_space].pending |= info.retransmits;
3453 let path = self.path_data_mut(path_id);
3454 path.pending |= info.path_retransmits;
3455 path.mtud.on_non_probe_lost(packet, info.size);
3456 path.congestion.on_packet_lost(info.size, packet, now);
3457
3458 self.spaces[pn_space].for_path(path_id).lost_packets.insert(
3459 packet,
3460 LostPacket {
3461 time_sent: info.time_sent,
3462 },
3463 );
3464 }
3465
3466 let path = self.path_data_mut(path_id);
3467 if path.mtud.black_hole_detected(now) {
3468 path.congestion.on_mtu_update(path.mtud.current_mtu());
3469 if let Some(max_datagram_size) = self.datagrams().max_size()
3470 && self.datagrams.drop_oversized(max_datagram_size)
3471 && self.datagrams.send_blocked
3472 {
3473 self.datagrams.send_blocked = false;
3474 self.events.push_back(Event::DatagramsUnblocked);
3475 }
3476 self.path_stats.for_path(path_id).black_holes_detected += 1;
3477 }
3478
3479 let lost_ack_eliciting =
3481 old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
3482
3483 if lost_ack_eliciting {
3484 self.path_stats.for_path(path_id).congestion_events += 1;
3485 self.path_data_mut(path_id).congestion.on_congestion_event(
3486 now,
3487 largest_lost_sent,
3488 in_persistent_congestion,
3489 false,
3490 size_of_lost_packets,
3491 largest_lost,
3492 );
3493 }
3494 }
3495
3496 if let Some(packet) = lost_mtu_probe {
3498 let info = self.spaces[SpaceId::Data]
3499 .for_path(path_id)
3500 .take(packet)
3501 .unwrap(); self.paths
3504 .get_mut(&path_id)
3505 .unwrap()
3506 .remove_in_flight(&info);
3507 self.path_data_mut(path_id).mtud.on_probe_lost();
3508 self.path_stats.for_path(path_id).lost_plpmtud_probes += 1;
3509 }
3510 }
3511
3512 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
3518 SpaceId::iter()
3519 .filter_map(|id| {
3520 self.spaces[id]
3521 .number_spaces
3522 .get(&path_id)
3523 .and_then(|pns| pns.loss_time)
3524 .map(|time| (time, id))
3525 })
3526 .min_by_key(|&(time, _)| time)
3527 }
3528
3529 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
3537 let path = self.path(path_id)?;
3538 let pto_count = path.pto_count;
3539
3540 let max_interval = if path.rtt.get() > SLOW_RTT_THRESHOLD {
3542 (path.rtt.get() * 3) / 2
3544 } else if let Some(idle) = path.idle_timeout.or(self.idle_timeout)
3545 && idle <= MIN_IDLE_FOR_FAST_PTO
3546 {
3547 MAX_PTO_FAST_INTERVAL
3550 } else {
3551 MAX_PTO_INTERVAL
3553 };
3554
3555 if path_id == PathId::ZERO
3556 && path.in_flight.ack_eliciting == 0
3557 && !self.peer_completed_handshake_address_validation()
3558 {
3559 let space = match self.highest_space {
3565 SpaceKind::Handshake => SpaceId::Handshake,
3566 _ => SpaceId::Initial,
3567 };
3568
3569 let backoff = 2u32.pow(path.pto_count.min(MAX_BACKOFF_EXPONENT));
3570 let duration = path.rtt.pto_base() * backoff;
3571 let duration = duration.min(max_interval);
3572 return Some((now + duration, space));
3573 }
3574
3575 let mut result = None;
3576 for space in SpaceId::iter() {
3577 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
3578 continue;
3579 };
3580
3581 if space == SpaceId::Data && !self.is_handshake_confirmed() {
3582 continue;
3586 }
3587
3588 if !pns.has_in_flight() {
3589 continue;
3590 }
3591
3592 let duration = {
3597 let max_ack_delay = if space == SpaceId::Data {
3598 self.ack_frequency.max_ack_delay_for_pto()
3599 } else {
3600 Duration::ZERO
3601 };
3602 let pto_base = path.rtt.pto_base() + max_ack_delay;
3603 let mut duration = pto_base;
3604 for i in 1..=pto_count {
3605 let exponential_duration = pto_base * 2u32.pow(i.min(MAX_BACKOFF_EXPONENT));
3606 let max_duration = duration + max_interval;
3607 duration = exponential_duration.min(max_duration);
3608 }
3609 duration
3610 };
3611
3612 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3613 continue;
3614 };
3615 let pto = last_ack_eliciting + duration;
3618 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3619 if path.anti_amplification_blocked(1) {
3620 continue;
3622 }
3623 if path.in_flight.ack_eliciting == 0 {
3624 continue;
3626 }
3627 result = Some((pto, space));
3628 }
3629 }
3630 result
3631 }
3632
3633 fn peer_completed_handshake_address_validation(&self) -> bool {
3635 if self.side.is_server() || self.state.is_closed() {
3636 return true;
3637 }
3638 self.spaces[SpaceId::Handshake]
3642 .path_space(PathId::ZERO)
3643 .and_then(|pns| pns.largest_acked_packet_pn)
3644 .is_some()
3645 || self.spaces[SpaceId::Data]
3646 .path_space(PathId::ZERO)
3647 .and_then(|pns| pns.largest_acked_packet_pn)
3648 .is_some()
3649 || (self.crypto_state.has_keys(EncryptionLevel::OneRtt)
3650 && !self.crypto_state.has_keys(EncryptionLevel::Handshake))
3651 }
3652
3653 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3661 if self.state.is_closed() {
3662 return;
3666 }
3667
3668 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3669 self.timers.set(
3671 Timer::PerPath(path_id, PathTimer::LossDetection),
3672 loss_time,
3673 self.qlog.with_time(now),
3674 );
3675 return;
3676 }
3677
3678 if !self.abandoned_paths.contains(&path_id)
3681 && let Some((timeout, _)) = self.pto_time_and_space(now, path_id)
3682 {
3683 self.timers.set(
3684 Timer::PerPath(path_id, PathTimer::LossDetection),
3685 timeout,
3686 self.qlog.with_time(now),
3687 );
3688 } else {
3689 self.timers.stop(
3690 Timer::PerPath(path_id, PathTimer::LossDetection),
3691 self.qlog.with_time(now),
3692 );
3693 }
3694 }
3695
3696 fn max_pto_for_space(&self, space: SpaceKind) -> Duration {
3700 self.paths
3701 .keys()
3702 .map(|path_id| self.pto(space, *path_id))
3703 .max()
3704 .unwrap_or_else(|| {
3705 let rtt = self.config.initial_rtt;
3709 let max_ack_delay = match space {
3710 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3711 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3712 };
3713 rtt + cmp::max(4 * (rtt / 2), TIMER_GRANULARITY) + max_ack_delay
3714 })
3715 }
3716
3717 fn pto(&self, space: SpaceKind, path_id: PathId) -> Duration {
3722 let max_ack_delay = match space {
3723 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3724 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3725 };
3726 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3727 }
3728
3729 fn on_packet_authenticated(
3730 &mut self,
3731 now: Instant,
3732 space_id: SpaceKind,
3733 path_id: PathId,
3734 ecn: Option<EcnCodepoint>,
3735 packet_number: Option<u64>,
3736 spin: bool,
3737 is_1rtt: bool,
3738 remote: &FourTuple,
3739 ) {
3740 let is_on_path = self
3747 .path_data(path_id)
3748 .network_path
3749 .is_probably_same_path(remote);
3750
3751 self.total_authed_packets += 1;
3752 self.reset_keep_alive(path_id, now);
3753 self.reset_idle_timeout(now, space_id, path_id);
3754 self.path_data_mut(path_id).permit_idle_reset = true;
3755
3756 if is_on_path {
3759 self.receiving_ecn |= ecn.is_some();
3760 if let Some(x) = ecn {
3761 let space = &mut self.spaces[space_id];
3762 space.for_path(path_id).ecn_counters += x;
3763
3764 if x.is_ce() {
3765 space
3766 .for_path(path_id)
3767 .pending_acks
3768 .set_immediate_ack_required();
3769 }
3770 }
3771 }
3772
3773 let Some(packet_number) = packet_number else {
3774 return;
3775 };
3776 match &self.side {
3777 ConnectionSide::Client { .. } => {
3778 if space_id == SpaceKind::Handshake
3782 && let Some(hs) = self.state.as_handshake_mut()
3783 {
3784 hs.allow_server_migration = false;
3785 }
3786 }
3787 ConnectionSide::Server { .. } => {
3788 if self.crypto_state.has_keys(EncryptionLevel::Initial)
3789 && space_id == SpaceKind::Handshake
3790 {
3791 self.discard_space(now, SpaceKind::Initial);
3793 }
3794 if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) && is_1rtt {
3795 self.set_key_discard_timer(now, space_id)
3797 }
3798 }
3799 }
3800 let space = self.spaces[space_id].for_path(path_id);
3801
3802 space.pending_acks.insert_one(packet_number, now);
3803 if packet_number >= space.largest_received_packet_number.unwrap_or_default() {
3804 space.largest_received_packet_number = Some(packet_number);
3805
3806 if is_on_path {
3808 self.spin = self.side.is_client() ^ spin;
3809 }
3810 }
3811 }
3812
3813 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceKind, path_id: PathId) {
3818 if let Some(timeout) = self.idle_timeout {
3820 if self.state.is_closed() {
3821 self.timers
3822 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3823 } else {
3824 let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space));
3825 self.timers.set(
3826 Timer::Conn(ConnTimer::Idle),
3827 now + dt,
3828 self.qlog.with_time(now),
3829 );
3830 }
3831 }
3832
3833 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3835 if self.state.is_closed() {
3836 self.timers.stop(
3837 Timer::PerPath(path_id, PathTimer::PathIdle),
3838 self.qlog.with_time(now),
3839 );
3840 } else {
3841 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3842 self.timers.set(
3843 Timer::PerPath(path_id, PathTimer::PathIdle),
3844 now + dt,
3845 self.qlog.with_time(now),
3846 );
3847 }
3848 }
3849 }
3850
3851 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3853 if !self.state.is_established() {
3854 return;
3855 }
3856
3857 if let Some(interval) = self.config.keep_alive_interval {
3858 self.timers.set(
3859 Timer::Conn(ConnTimer::KeepAlive),
3860 now + interval,
3861 self.qlog.with_time(now),
3862 );
3863 }
3864
3865 if let Some(interval) = self.path_data(path_id).keep_alive {
3866 self.timers.set(
3867 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3868 now + interval,
3869 self.qlog.with_time(now),
3870 );
3871 }
3872 }
3873
3874 fn reset_cid_retirement(&mut self, now: Instant) {
3876 if let Some((_path, t)) = self.next_cid_retirement() {
3877 self.timers.set(
3878 Timer::Conn(ConnTimer::PushNewCid),
3879 t,
3880 self.qlog.with_time(now),
3881 );
3882 }
3883 }
3884
3885 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3887 self.local_cid_state
3888 .iter()
3889 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3890 .min_by_key(|(_path_id, timeout)| *timeout)
3891 }
3892
3893 pub(crate) fn handle_first_packet(
3898 &mut self,
3899 now: Instant,
3900 network_path: FourTuple,
3901 ecn: Option<EcnCodepoint>,
3902 packet_number: u64,
3903 packet: InitialPacket,
3904 remaining: Option<BytesMut>,
3905 ) -> Result<(), ConnectionError> {
3906 let span = trace_span!("first recv");
3907 let _guard = span.enter();
3908 debug_assert!(self.side.is_server());
3909 let len = packet.header_data.len() + packet.payload.len();
3910 let path_id = PathId::ZERO;
3911 self.path_data_mut(path_id).total_recvd = len as u64;
3912
3913 if let Some(hs) = self.state.as_handshake_mut() {
3914 hs.expected_token = packet.header.token.clone();
3915 } else {
3916 unreachable!("first packet must be delivered in Handshake state");
3917 }
3918
3919 self.on_packet_authenticated(
3921 now,
3922 SpaceKind::Initial,
3923 path_id,
3924 ecn,
3925 Some(packet_number),
3926 false,
3927 false,
3928 &network_path,
3929 );
3930
3931 let packet: Packet = packet.into();
3932
3933 let mut qlog = QlogRecvPacket::new(len);
3934 qlog.header(&packet.header, Some(packet_number), path_id);
3935
3936 self.process_decrypted_packet(
3937 now,
3938 network_path,
3939 path_id,
3940 Some(packet_number),
3941 packet,
3942 &mut qlog,
3943 )?;
3944 self.qlog.emit_packet_received(qlog, now);
3945 if let Some(data) = remaining {
3946 self.handle_coalesced(now, network_path, path_id, ecn, data);
3947 }
3948
3949 self.qlog.emit_recovery_metrics(
3950 path_id,
3951 &mut self
3952 .paths
3953 .get_mut(&path_id)
3954 .expect("path_id was supplied by the caller for an active path")
3955 .data,
3956 now,
3957 );
3958
3959 Ok(())
3960 }
3961
3962 fn init_0rtt(&mut self, now: Instant) {
3963 let Some((header, packet)) = self.crypto_state.session.early_crypto() else {
3964 return;
3965 };
3966 if self.side.is_client() {
3967 match self.crypto_state.session.transport_parameters() {
3968 Ok(params) => {
3969 let params = params
3970 .expect("crypto layer didn't supply transport parameters with ticket");
3971 let params = TransportParameters {
3973 initial_src_cid: None,
3974 original_dst_cid: None,
3975 preferred_address: None,
3976 retry_src_cid: None,
3977 stateless_reset_token: None,
3978 min_ack_delay: None,
3979 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3980 max_ack_delay: TransportParameters::default().max_ack_delay,
3981 initial_max_path_id: None,
3982 ..params
3983 };
3984 self.set_peer_params(params);
3985 self.qlog.emit_peer_transport_params_restored(self, now);
3986 }
3987 Err(e) => {
3988 error!("session ticket has malformed transport parameters: {}", e);
3989 return;
3990 }
3991 }
3992 }
3993 trace!("0-RTT enabled");
3994 self.crypto_state.enable_zero_rtt(header, packet);
3995 }
3996
3997 fn read_crypto(
3998 &mut self,
3999 space: SpaceId,
4000 crypto: &frame::Crypto,
4001 payload_len: usize,
4002 ) -> Result<(), TransportError> {
4003 let expected = if !self.state.is_handshake() {
4004 SpaceId::Data
4005 } else if self.highest_space == SpaceKind::Initial {
4006 SpaceId::Initial
4007 } else {
4008 SpaceId::Handshake
4011 };
4012 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
4016
4017 let end = crypto.offset + crypto.data.len() as u64;
4018 if space < expected
4019 && end
4020 > self.crypto_state.spaces[space.kind()]
4021 .crypto_stream
4022 .bytes_read()
4023 {
4024 warn!(
4025 "received new {:?} CRYPTO data when expecting {:?}",
4026 space, expected
4027 );
4028 return Err(TransportError::PROTOCOL_VIOLATION(
4029 "new data at unexpected encryption level",
4030 ));
4031 }
4032
4033 let crypto_space = &mut self.crypto_state.spaces[space.kind()];
4034 let max = end.saturating_sub(crypto_space.crypto_stream.bytes_read());
4035 if max > self.config.crypto_buffer_size as u64 {
4036 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
4037 }
4038
4039 crypto_space
4040 .crypto_stream
4041 .insert(crypto.offset, crypto.data.clone(), payload_len);
4042 while let Some(chunk) = crypto_space.crypto_stream.read(usize::MAX, true) {
4043 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
4044 if self.crypto_state.session.read_handshake(&chunk.bytes)? {
4045 self.events.push_back(Event::HandshakeDataReady);
4046 }
4047 }
4048
4049 Ok(())
4050 }
4051
4052 fn write_crypto(&mut self) {
4053 loop {
4054 let space = self.highest_space;
4055 let mut outgoing = Vec::new();
4056 if let Some(crypto) = self.crypto_state.session.write_handshake(&mut outgoing) {
4057 match space {
4058 SpaceKind::Initial => {
4059 self.upgrade_crypto(SpaceKind::Handshake, crypto);
4060 }
4061 SpaceKind::Handshake => {
4062 self.upgrade_crypto(SpaceKind::Data, crypto);
4063 }
4064 SpaceKind::Data => unreachable!("got updated secrets during 1-RTT"),
4065 }
4066 }
4067 if outgoing.is_empty() {
4068 if space == self.highest_space {
4069 break;
4070 } else {
4071 continue;
4073 }
4074 }
4075 let offset = self.crypto_state.spaces[space].crypto_offset;
4076 let outgoing = Bytes::from(outgoing);
4077 if let Some(hs) = self.state.as_handshake_mut()
4078 && space == SpaceKind::Initial
4079 && offset == 0
4080 && self.side.is_client()
4081 {
4082 hs.client_hello = Some(outgoing.clone());
4083 }
4084 self.crypto_state.spaces[space].crypto_offset += outgoing.len() as u64;
4085 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
4086 self.spaces[space].pending.crypto.push_back(frame::Crypto {
4087 offset,
4088 data: outgoing,
4089 });
4090 }
4091 }
4092
4093 fn upgrade_crypto(&mut self, space: SpaceKind, crypto: Keys) {
4095 debug_assert!(
4096 !self.crypto_state.has_keys(space.encryption_level()),
4097 "already reached packet space {space:?}"
4098 );
4099 trace!("{:?} keys ready", space);
4100 if space == SpaceKind::Data {
4101 self.crypto_state.next_crypto = Some(
4103 self.crypto_state
4104 .session
4105 .next_1rtt_keys()
4106 .expect("handshake should be complete"),
4107 );
4108 }
4109
4110 self.crypto_state.spaces[space].keys = Some(crypto);
4111 debug_assert!(space > self.highest_space);
4112 self.highest_space = space;
4113 if space == SpaceKind::Data && self.side.is_client() {
4114 self.crypto_state.discard_zero_rtt();
4116 }
4117 }
4118
4119 fn discard_space(&mut self, now: Instant, space: SpaceKind) {
4120 debug_assert!(space != SpaceKind::Data);
4121 trace!("discarding {:?} keys", space);
4122 if space == SpaceKind::Initial {
4123 if let ConnectionSide::Client { token, .. } = &mut self.side {
4125 *token = Bytes::new();
4126 }
4127 }
4128 self.crypto_state.spaces[space].keys = None;
4129 let space = &mut self.spaces[space];
4130 let pns = space.for_path(PathId::ZERO);
4131 pns.time_of_last_ack_eliciting_packet = None;
4132 pns.loss_time = None;
4133 pns.loss_probes = 0;
4134 let sent_packets = mem::take(&mut pns.sent_packets);
4135 let path = self
4136 .paths
4137 .get_mut(&PathId::ZERO)
4138 .expect("PathId::ZERO is alive while Initial/Handshake spaces exist");
4139 for (_, packet) in sent_packets.into_iter() {
4140 path.data.remove_in_flight(&packet);
4141 }
4142
4143 self.set_loss_detection_timer(now, PathId::ZERO)
4144 }
4145
4146 fn handle_coalesced(
4147 &mut self,
4148 now: Instant,
4149 network_path: FourTuple,
4150 path_id: PathId,
4151 ecn: Option<EcnCodepoint>,
4152 data: BytesMut,
4153 ) {
4154 self.path_data_mut(path_id)
4155 .inc_total_recvd(data.len() as u64);
4156 let mut remaining = Some(data);
4157 let cid_len = self
4158 .local_cid_state
4159 .values()
4160 .map(|cid_state| cid_state.cid_len())
4161 .next()
4162 .expect("one cid_state must exist");
4163 while let Some(data) = remaining {
4164 match PartialDecode::new(
4165 data,
4166 &FixedLengthConnectionIdParser::new(cid_len),
4167 &[self.version],
4168 self.endpoint_config.grease_quic_bit,
4169 ) {
4170 Ok((partial_decode, rest)) => {
4171 remaining = rest;
4172 self.handle_decode(now, network_path, path_id, ecn, partial_decode);
4173 }
4174 Err(e) => {
4175 trace!("malformed header: {}", e);
4176 return;
4177 }
4178 }
4179 }
4180 }
4181
4182 fn handle_decode(
4188 &mut self,
4189 now: Instant,
4190 network_path: FourTuple,
4191 path_id: PathId,
4192 ecn: Option<EcnCodepoint>,
4193 partial_decode: PartialDecode,
4194 ) {
4195 let qlog = QlogRecvPacket::new(partial_decode.len());
4196 if let Some(decoded) = self
4197 .crypto_state
4198 .unprotect_header(partial_decode, self.peer_params.stateless_reset_token)
4199 {
4200 self.handle_packet(
4201 now,
4202 network_path,
4203 path_id,
4204 ecn,
4205 decoded.packet,
4206 decoded.stateless_reset,
4207 qlog,
4208 );
4209 }
4210 }
4211
4212 fn handle_packet(
4219 &mut self,
4220 now: Instant,
4221 network_path: FourTuple,
4222 path_id: PathId,
4223 ecn: Option<EcnCodepoint>,
4224 packet: Option<Packet>,
4225 stateless_reset: bool,
4226 mut qlog: QlogRecvPacket,
4227 ) {
4228 self.path_stats.for_path(path_id).udp_rx.ios += 1;
4229
4230 if let Some(ref packet) = packet {
4231 trace!(
4232 "got {:?} packet ({} bytes) from {} using id {}",
4233 packet.header.space(),
4234 packet.payload.len() + packet.header_data.len(),
4235 network_path,
4236 packet.header.dst_cid(),
4237 );
4238 }
4239
4240 let was_closed = self.state.is_closed();
4241 let was_drained = self.state.is_drained();
4242
4243 let decrypted = match packet {
4245 None => Err(None),
4246 Some(mut packet) => self
4247 .decrypt_packet(now, path_id, &mut packet)
4248 .map(move |number| (packet, number)),
4249 };
4250 let result = match decrypted {
4251 _ if stateless_reset => {
4252 debug!("got stateless reset");
4253 Err(ConnectionError::Reset)
4254 }
4255 Err(Some(e)) => {
4256 warn!("illegal packet: {}", e);
4257 Err(e.into())
4258 }
4259 Err(None) => {
4260 debug!("failed to authenticate packet");
4261 self.authentication_failures += 1;
4262 let integrity_limit = self
4263 .crypto_state
4264 .integrity_limit(self.highest_space)
4265 .unwrap();
4266 if self.authentication_failures > integrity_limit {
4267 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
4268 } else {
4269 return;
4270 }
4271 }
4272 Ok((packet, pn)) => {
4273 qlog.header(&packet.header, pn, path_id);
4275 let span = match pn {
4276 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
4277 None => trace_span!("recv", space = ?packet.header.space()),
4278 };
4279 let _guard = span.enter();
4280
4281 if self.is_handshaking()
4289 && self
4290 .path(path_id)
4291 .map(|path_data| {
4292 !path_data.network_path.is_probably_same_path(&network_path)
4293 })
4294 .unwrap_or(false)
4295 {
4296 if let Some(hs) = self.state.as_handshake()
4297 && hs.allow_server_migration
4298 {
4299 trace!(
4300 %network_path,
4301 prev = %self.path_data(path_id).network_path,
4302 "server migrated to new remote",
4303 );
4304 self.path_data_mut(path_id).network_path = network_path;
4305 self.qlog.emit_tuple_assigned(path_id, network_path, now);
4306 } else {
4307 debug!(
4308 recv_path = %network_path,
4309 expected_path = %self.path_data_mut(path_id).network_path,
4310 "discarding packet with unexpected remote during handshake",
4311 );
4312 return;
4313 }
4314 }
4315
4316 let dedup = self.spaces[packet.header.space()]
4317 .path_space_mut(path_id)
4318 .map(|pns| &mut pns.dedup);
4319 if pn.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
4320 debug!("discarding possible duplicate packet");
4321 self.qlog.emit_packet_received(qlog, now);
4322 return;
4323 } else if self.state.is_handshake() && packet.header.is_short() {
4324 trace!("dropping short packet during handshake");
4326 self.qlog.emit_packet_received(qlog, now);
4327 return;
4328 } else {
4329 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header
4330 && let Some(hs) = self.state.as_handshake()
4331 && self.side.is_server()
4332 && token != &hs.expected_token
4333 {
4334 warn!("discarding Initial with invalid retry token");
4338 self.qlog.emit_packet_received(qlog, now);
4339 return;
4340 }
4341
4342 if !self.state.is_closed() {
4343 let spin = match packet.header {
4344 Header::Short { spin, .. } => spin,
4345 _ => false,
4346 };
4347
4348 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
4349 self.ensure_path(path_id, network_path, now, pn);
4351 }
4352 if self.paths.contains_key(&path_id) {
4353 self.on_packet_authenticated(
4354 now,
4355 packet.header.space(),
4356 path_id,
4357 ecn,
4358 pn,
4359 spin,
4360 packet.header.is_1rtt(),
4361 &network_path,
4362 );
4363 }
4364 }
4365
4366 let res = self.process_decrypted_packet(
4367 now,
4368 network_path,
4369 path_id,
4370 pn,
4371 packet,
4372 &mut qlog,
4373 );
4374
4375 self.qlog.emit_packet_received(qlog, now);
4376 res
4377 }
4378 }
4379 };
4380
4381 if let Err(conn_err) = result {
4383 match conn_err {
4384 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
4385 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
4386 ConnectionError::Reset
4387 | ConnectionError::TransportError(TransportError {
4388 code: TransportErrorCode::AEAD_LIMIT_REACHED,
4389 ..
4390 }) => {
4391 let was_draining = self.state.move_to_drained(Some(conn_err));
4392 if !was_draining {
4393 self.endpoint_events.push_back(EndpointEventInner::Draining);
4394 }
4395 }
4396 ConnectionError::TimedOut => {
4397 unreachable!("timeouts aren't generated by packet processing");
4398 }
4399 ConnectionError::TransportError(err) => {
4400 debug!("closing connection due to transport error: {}", err);
4401 self.state.move_to_closed(err);
4402 }
4403 ConnectionError::VersionMismatch => {
4404 self.state.move_to_draining(Some(conn_err));
4405 self.endpoint_events.push_back(EndpointEventInner::Draining);
4406 }
4407 ConnectionError::LocallyClosed => {
4408 unreachable!("LocallyClosed isn't generated by packet processing");
4409 }
4410 ConnectionError::CidsExhausted => {
4411 unreachable!("CidsExhausted isn't generated by packet processing");
4412 }
4413 };
4414 }
4415
4416 if !was_closed && self.state.is_closed() {
4417 self.close_common();
4418 if !self.state.is_drained() {
4419 self.set_close_timer(now);
4420 }
4421 }
4422 if !was_drained && self.state.is_drained() {
4423 self.endpoint_events.push_back(EndpointEventInner::Drained);
4424 self.timers
4427 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
4428 }
4429
4430 if matches!(self.state.as_type(), StateType::Closed) {
4437 if self
4455 .paths
4456 .get(&path_id)
4457 .map(|p| p.data.validated && p.data.network_path == network_path)
4458 .unwrap_or(false)
4459 {
4460 self.connection_close_pending = true;
4461 }
4462 }
4463 }
4464
4465 fn process_decrypted_packet(
4466 &mut self,
4467 now: Instant,
4468 network_path: FourTuple,
4469 path_id: PathId,
4470 number: Option<u64>,
4471 packet: Packet,
4472 qlog: &mut QlogRecvPacket,
4473 ) -> Result<(), ConnectionError> {
4474 if !self.paths.contains_key(&path_id) {
4475 trace!(%path_id, ?number, "discarding packet for unknown path");
4479 return Ok(());
4480 }
4481 let state = match self.state.as_type() {
4482 StateType::Established => {
4483 match packet.header.space() {
4484 SpaceKind::Data => self.process_payload(
4485 now,
4486 network_path,
4487 path_id,
4488 number.unwrap(),
4489 packet,
4490 qlog,
4491 )?,
4492 _ if packet.header.has_frames() => {
4493 self.process_early_payload(now, path_id, packet, qlog)?
4494 }
4495 _ => {
4496 trace!("discarding unexpected pre-handshake packet");
4497 }
4498 }
4499 return Ok(());
4500 }
4501 StateType::Closed => {
4502 for result in frame::Iter::new(packet.payload.freeze())? {
4503 let frame = match result {
4504 Ok(frame) => frame,
4505 Err(err) => {
4506 debug!("frame decoding error: {err:?}");
4507 continue;
4508 }
4509 };
4510 qlog.frame(&frame);
4511
4512 if let Frame::Padding = frame {
4513 continue;
4514 };
4515
4516 trace!(?frame, "processing frame in closed state");
4517
4518 self.path_stats
4519 .for_path(path_id)
4520 .frame_rx
4521 .record(frame.ty());
4522
4523 if let Frame::Close(_error) = frame {
4524 self.state.move_to_draining(None);
4525 self.endpoint_events.push_back(EndpointEventInner::Draining);
4526 break;
4527 }
4528 }
4529 return Ok(());
4530 }
4531 StateType::Draining | StateType::Drained => return Ok(()),
4532 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
4533 };
4534
4535 match packet.header {
4536 Header::Retry {
4537 src_cid: remote_cid,
4538 ..
4539 } => {
4540 debug_assert_eq!(path_id, PathId::ZERO);
4541 if self.side.is_server() {
4542 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
4543 }
4544
4545 let is_valid_retry = self
4546 .remote_cids
4547 .get(&path_id)
4548 .map(|cids| cids.active())
4549 .map(|orig_dst_cid| {
4550 self.crypto_state.session.is_valid_retry(
4551 orig_dst_cid,
4552 &packet.header_data,
4553 &packet.payload,
4554 )
4555 })
4556 .unwrap_or_default();
4557 if self.total_authed_packets > 1
4558 || packet.payload.len() <= 16 || !is_valid_retry
4560 {
4561 trace!("discarding invalid Retry");
4562 return Ok(());
4570 }
4571
4572 trace!("retrying with CID {}", remote_cid);
4573 let client_hello = state.client_hello.take().unwrap();
4574 self.retry_src_cid = Some(remote_cid);
4575 self.remote_cids
4576 .get_mut(&path_id)
4577 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
4578 .update_initial_cid(remote_cid);
4579 self.remote_handshake_cid = remote_cid;
4580
4581 let space = &mut self.spaces[SpaceId::Initial];
4582 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
4583 self.on_packet_acked(now, PathId::ZERO, 0, info);
4584 };
4585
4586 self.discard_space(now, SpaceKind::Initial); let crypto_space = &mut self.crypto_state.spaces[SpaceKind::Initial];
4589 crypto_space.keys = Some(
4590 self.crypto_state
4591 .session
4592 .initial_keys(remote_cid, self.side.side()),
4593 );
4594 crypto_space.crypto_offset = client_hello.len() as u64;
4595
4596 let next_pn = self.spaces[SpaceId::Initial]
4597 .for_path(path_id)
4598 .next_packet_number;
4599 self.spaces[SpaceId::Initial] = {
4600 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
4601 space.for_path(path_id).next_packet_number = next_pn;
4602 space.pending.crypto.push_back(frame::Crypto {
4603 offset: 0,
4604 data: client_hello,
4605 });
4606 space
4607 };
4608
4609 let zero_rtt = mem::take(
4611 &mut self.spaces[SpaceId::Data]
4612 .for_path(PathId::ZERO)
4613 .sent_packets,
4614 );
4615 for (_, info) in zero_rtt.into_iter() {
4616 self.paths
4617 .get_mut(&PathId::ZERO)
4618 .unwrap()
4619 .remove_in_flight(&info);
4620 self.spaces[SpaceId::Data].pending |= info.retransmits;
4621 }
4622 self.streams.retransmit_all_for_0rtt();
4623
4624 let token_len = packet.payload.len() - 16;
4625 let ConnectionSide::Client { ref mut token, .. } = self.side else {
4626 unreachable!("we already short-circuited if we're server");
4627 };
4628 *token = packet.payload.freeze().split_to(token_len);
4629
4630 self.state = State::handshake(state::Handshake {
4631 expected_token: Bytes::new(),
4632 remote_cid_set: false,
4633 client_hello: None,
4634 allow_server_migration: self.config.server_handshake_migration,
4635 });
4636 Ok(())
4637 }
4638 Header::Long {
4639 ty: LongType::Handshake,
4640 src_cid: remote_cid,
4641 dst_cid: local_cid,
4642 ..
4643 } => {
4644 debug_assert_eq!(path_id, PathId::ZERO);
4645 if remote_cid != self.remote_handshake_cid {
4646 debug!(
4647 "discarding packet with mismatched remote CID: {} != {}",
4648 self.remote_handshake_cid, remote_cid
4649 );
4650 return Ok(());
4651 }
4652 self.on_path_validated(path_id);
4653
4654 self.process_early_payload(now, path_id, packet, qlog)?;
4655 if self.state.is_closed() {
4656 return Ok(());
4657 }
4658
4659 if self.crypto_state.session.is_handshaking() {
4660 trace!("handshake ongoing");
4661 return Ok(());
4662 }
4663
4664 if self.side.is_client() {
4665 let params = self
4667 .crypto_state
4668 .session
4669 .transport_parameters()?
4670 .ok_or_else(|| {
4671 TransportError::new(
4672 TransportErrorCode::crypto(0x6d),
4673 "transport parameters missing".to_owned(),
4674 )
4675 })?;
4676
4677 if self.has_0rtt() {
4678 if !self.crypto_state.session.early_data_accepted().unwrap() {
4679 debug_assert!(self.side.is_client());
4680 debug!("0-RTT rejected");
4681 self.crypto_state.accepted_0rtt = false;
4682 self.streams.zero_rtt_rejected();
4683
4684 self.spaces[SpaceId::Data].pending = Retransmits::default();
4686
4687 let sent_packets = mem::take(
4689 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
4690 );
4691 for (_, packet) in sent_packets.into_iter() {
4692 self.paths
4693 .get_mut(&path_id)
4694 .unwrap()
4695 .remove_in_flight(&packet);
4696 }
4697 } else {
4698 self.crypto_state.accepted_0rtt = true;
4699 params.validate_resumption_from(&self.peer_params)?;
4700 }
4701 }
4702 if let Some(token) = params.stateless_reset_token {
4703 let remote = self.path_data(path_id).network_path.remote;
4704 debug_assert!(!self.state.is_drained()); self.endpoint_events
4706 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4707 }
4708 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4709 self.issue_first_cids(now);
4710 } else {
4711 self.spaces[SpaceId::Data].pending.handshake_done = true;
4713 self.discard_space(now, SpaceKind::Handshake);
4714 self.events.push_back(Event::HandshakeConfirmed);
4715 trace!("handshake confirmed");
4716 }
4717
4718 self.events.push_back(Event::Connected);
4719 self.state.move_to_established();
4720 trace!("established");
4721
4722 self.issue_first_path_cids(now);
4725 Ok(())
4726 }
4727 Header::Initial(InitialHeader {
4728 src_cid: remote_cid,
4729 dst_cid: local_cid,
4730 ..
4731 }) => {
4732 debug_assert_eq!(path_id, PathId::ZERO);
4733 if !state.remote_cid_set {
4734 trace!("switching remote CID to {}", remote_cid);
4735 let mut state = state.clone();
4736 self.remote_cids
4737 .get_mut(&path_id)
4738 .expect("PathId::ZERO not yet abandoned")
4739 .update_initial_cid(remote_cid);
4740 self.remote_handshake_cid = remote_cid;
4741 self.original_remote_cid = remote_cid;
4742 state.remote_cid_set = true;
4743 self.state.move_to_handshake(state);
4744 } else if remote_cid != self.remote_handshake_cid {
4745 debug!(
4746 "discarding packet with mismatched remote CID: {} != {}",
4747 self.remote_handshake_cid, remote_cid
4748 );
4749 return Ok(());
4750 }
4751
4752 let starting_space = self.highest_space;
4753 self.process_early_payload(now, path_id, packet, qlog)?;
4754
4755 if self.side.is_server()
4756 && starting_space == SpaceKind::Initial
4757 && self.highest_space != SpaceKind::Initial
4758 {
4759 let params = self
4760 .crypto_state
4761 .session
4762 .transport_parameters()?
4763 .ok_or_else(|| {
4764 TransportError::new(
4765 TransportErrorCode::crypto(0x6d),
4766 "transport parameters missing".to_owned(),
4767 )
4768 })?;
4769 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4770 self.issue_first_cids(now);
4771 self.init_0rtt(now);
4772 }
4773 Ok(())
4774 }
4775 Header::Long {
4776 ty: LongType::ZeroRtt,
4777 ..
4778 } => {
4779 self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4780 Ok(())
4781 }
4782 Header::VersionNegotiate { .. } => {
4783 if self.total_authed_packets > 1 {
4784 return Ok(());
4785 }
4786 let supported = packet
4787 .payload
4788 .chunks(4)
4789 .any(|x| match <[u8; 4]>::try_from(x) {
4790 Ok(version) => self.version == u32::from_be_bytes(version),
4791 Err(_) => false,
4792 });
4793 if supported {
4794 return Ok(());
4795 }
4796 debug!("remote doesn't support our version");
4797 Err(ConnectionError::VersionMismatch)
4798 }
4799 Header::Short { .. } => unreachable!(
4800 "short packets received during handshake are discarded in handle_packet"
4801 ),
4802 }
4803 }
4804
4805 fn process_early_payload(
4807 &mut self,
4808 now: Instant,
4809 path_id: PathId,
4810 packet: Packet,
4811 #[allow(unused)] qlog: &mut QlogRecvPacket,
4812 ) -> Result<(), TransportError> {
4813 debug_assert_ne!(packet.header.space(), SpaceKind::Data);
4814 debug_assert_eq!(path_id, PathId::ZERO);
4815 let payload_len = packet.payload.len();
4816 let mut ack_eliciting = false;
4817 for result in frame::Iter::new(packet.payload.freeze())? {
4818 let frame = result?;
4819 qlog.frame(&frame);
4820 let span = match frame {
4821 Frame::Padding => continue,
4822 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4823 };
4824
4825 self.path_stats
4826 .for_path(path_id)
4827 .frame_rx
4828 .record(frame.ty());
4829
4830 let _guard = span.as_ref().map(|x| x.enter());
4831 ack_eliciting |= frame.is_ack_eliciting();
4832
4833 if frame.is_1rtt() && packet.header.space() != SpaceKind::Data {
4835 return Err(TransportError::PROTOCOL_VIOLATION(
4836 "illegal frame type in handshake",
4837 ));
4838 }
4839
4840 match frame {
4841 Frame::Padding | Frame::Ping => {}
4842 Frame::Crypto(frame) => {
4843 self.read_crypto(packet.header.space().into(), &frame, payload_len)?;
4844 }
4845 Frame::Ack(ack) => {
4846 self.on_ack_received(now, packet.header.space().into(), ack)?;
4847 }
4848 Frame::PathAck(ack) => {
4849 span.as_ref()
4850 .map(|span| span.record("path", tracing::field::display(&ack.path_id)));
4851 self.on_path_ack_received(now, packet.header.space().into(), ack)?;
4852 }
4853 Frame::Close(reason) => {
4854 self.state.move_to_draining(Some(reason.into()));
4855 self.endpoint_events.push_back(EndpointEventInner::Draining);
4856 return Ok(());
4857 }
4858 _ => {
4859 let mut err =
4860 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4861 err.frame = frame::MaybeFrame::Known(frame.ty());
4862 return Err(err);
4863 }
4864 }
4865 }
4866
4867 if ack_eliciting {
4868 self.spaces[packet.header.space()]
4870 .for_path(path_id)
4871 .pending_acks
4872 .set_immediate_ack_required();
4873 }
4874
4875 self.write_crypto();
4876 Ok(())
4877 }
4878
4879 fn process_payload(
4881 &mut self,
4882 now: Instant,
4883 network_path: FourTuple,
4884 path_id: PathId,
4885 number: u64,
4886 packet: Packet,
4887 #[allow(unused)] qlog: &mut QlogRecvPacket,
4888 ) -> Result<(), TransportError> {
4889 let is_multipath_negotiated = self.is_multipath_negotiated();
4890 let payload = packet.payload.freeze();
4891 let mut is_probing_packet = true;
4892 let mut close = None;
4893 let payload_len = payload.len();
4894 let mut ack_eliciting = false;
4895 let mut migration_observed_addr = None;
4898 for result in frame::Iter::new(payload)? {
4899 let frame = result?;
4900 qlog.frame(&frame);
4901 let span = match frame {
4902 Frame::Padding => continue,
4903 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4904 };
4905
4906 self.path_stats
4907 .for_path(path_id)
4908 .frame_rx
4909 .record(frame.ty());
4910 match &frame {
4913 Frame::Crypto(f) => {
4914 trace!(offset = f.offset, len = f.data.len(), "got frame CRYPTO");
4915 }
4916 Frame::Stream(f) => {
4917 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got frame STREAM");
4918 }
4919 Frame::Datagram(f) => {
4920 trace!(len = f.data.len(), "got frame DATAGRAM");
4921 }
4922 f => {
4923 trace!("got frame {f}");
4924 }
4925 }
4926
4927 let _guard = span.enter();
4928 if packet.header.is_0rtt() {
4929 match frame {
4930 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4931 return Err(TransportError::PROTOCOL_VIOLATION(
4932 "illegal frame type in 0-RTT",
4933 ));
4934 }
4935 _ => {
4936 if frame.is_1rtt() {
4937 return Err(TransportError::PROTOCOL_VIOLATION(
4938 "illegal frame type in 0-RTT",
4939 ));
4940 }
4941 }
4942 }
4943 }
4944 ack_eliciting |= frame.is_ack_eliciting();
4945
4946 match frame {
4948 Frame::Padding
4949 | Frame::PathChallenge(_)
4950 | Frame::PathResponse(_)
4951 | Frame::NewConnectionId(_)
4952 | Frame::ObservedAddr(_) => {}
4953 _ => {
4954 is_probing_packet = false;
4955 }
4956 }
4957
4958 match frame {
4959 Frame::Crypto(frame) => {
4960 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4961 }
4962 Frame::Stream(frame) => {
4963 if self.streams.received(frame, payload_len)?.should_transmit() {
4964 self.spaces[SpaceId::Data].pending.max_data = true;
4965 }
4966 }
4967 Frame::Ack(ack) => {
4968 self.on_ack_received(now, SpaceId::Data, ack)?;
4969 }
4970 Frame::PathAck(ack) => {
4971 if !self.is_multipath_negotiated() {
4972 return Err(TransportError::PROTOCOL_VIOLATION(
4973 "received PATH_ACK frame when multipath was not negotiated",
4974 ));
4975 }
4976 span.record("path", tracing::field::display(&ack.path_id));
4977 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4978 }
4979 Frame::Padding | Frame::Ping => {}
4980 Frame::Close(reason) => {
4981 close = Some(reason);
4982 }
4983 Frame::PathChallenge(challenge) => {
4984 let path = &mut self
4985 .path_mut(path_id)
4986 .expect("payload is processed only after the path becomes known");
4987 path.path_responses.push(number, challenge.0, network_path);
4988 if network_path.remote == path.network_path.remote {
4992 match self.peer_supports_ack_frequency() {
5000 true => self.immediate_ack(path_id),
5001 false => {
5002 self.ping_path(path_id).ok();
5003 }
5004 }
5005 }
5006 }
5007 Frame::PathResponse(response) => {
5008 if self
5010 .n0_nat_traversal
5011 .handle_path_response(network_path, response.0)
5012 {
5013 self.open_nat_traversed_paths(now);
5014 } else {
5015 let path = self
5018 .paths
5019 .get_mut(&path_id)
5020 .expect("payload is processed only after the path becomes known");
5021
5022 use PathTimer::*;
5023 use paths::OnPathResponseReceived::*;
5024 match path
5025 .data
5026 .on_path_response_received(now, response.0, network_path)
5027 {
5028 OnPath { was_open } if !self.abandoned_paths.contains(&path_id) => {
5029 let qlog = self.qlog.with_time(now);
5030
5031 self.timers.stop(
5032 Timer::PerPath(path_id, PathValidationFailed),
5033 qlog.clone(),
5034 );
5035 self.timers.stop(
5036 Timer::PerPath(path_id, AbandonFromValidation),
5037 qlog.clone(),
5038 );
5039
5040 let next_challenge = path
5041 .data
5042 .earliest_on_path_expiring_challenge()
5043 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
5044 self.timers.set_or_stop(
5045 Timer::PerPath(path_id, PathChallengeLost),
5046 next_challenge,
5047 qlog,
5048 );
5049
5050 if !was_open {
5051 if is_multipath_negotiated {
5052 self.events.push_back(Event::Path(
5053 PathEvent::Established { id: path_id },
5054 ));
5055 }
5056 if let Some(observed) =
5057 path.data.last_observed_addr_report.as_ref()
5058 {
5059 self.events.push_back(Event::Path(
5060 PathEvent::ObservedAddr {
5061 id: path_id,
5062 addr: observed.socket_addr(),
5063 },
5064 ));
5065 }
5066 }
5067 if let Some((_, ref mut prev)) = path.prev {
5068 prev.reset_on_path_challenges();
5073 }
5074 }
5075 OnPath { .. } => {
5076 trace!(
5077 %response,
5078 "ignoring PATH_RESPONSE received after path is abandoned"
5079 );
5080 }
5081 Ignored {
5082 sent_on,
5083 current_path,
5084 } => {
5085 debug!(%sent_on, %current_path, %response, "ignoring valid PATH_RESPONSE")
5086 }
5087 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
5088 }
5089 }
5090 }
5091 Frame::MaxData(frame::MaxData(bytes)) => {
5092 self.streams.received_max_data(bytes);
5093 }
5094 Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
5095 self.streams.received_max_stream_data(id, offset)?;
5096 }
5097 Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
5098 self.streams.received_max_streams(dir, count)?;
5099 }
5100 Frame::ResetStream(frame) => {
5101 if self.streams.received_reset(frame)?.should_transmit() {
5102 self.spaces[SpaceId::Data].pending.max_data = true;
5103 }
5104 }
5105 Frame::DataBlocked(DataBlocked(offset)) => {
5106 debug!(offset, "peer claims to be blocked at connection level");
5107 }
5108 Frame::StreamDataBlocked(StreamDataBlocked { id, offset }) => {
5109 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
5110 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
5111 return Err(TransportError::STREAM_STATE_ERROR(
5112 "STREAM_DATA_BLOCKED on send-only stream",
5113 ));
5114 }
5115 debug!(
5116 stream = %id,
5117 offset, "peer claims to be blocked at stream level"
5118 );
5119 }
5120 Frame::StreamsBlocked(StreamsBlocked { dir, limit }) => {
5121 if limit > MAX_STREAM_COUNT {
5122 return Err(TransportError::FRAME_ENCODING_ERROR(
5123 "unrepresentable stream limit",
5124 ));
5125 }
5126 debug!(
5127 "peer claims to be blocked opening more than {} {} streams",
5128 limit, dir
5129 );
5130 }
5131 Frame::StopSending(frame::StopSending { id, error_code }) => {
5132 if id.initiator() != self.side.side() {
5133 if id.dir() == Dir::Uni {
5134 debug!("got STOP_SENDING on recv-only {}", id);
5135 return Err(TransportError::STREAM_STATE_ERROR(
5136 "STOP_SENDING on recv-only stream",
5137 ));
5138 }
5139 } else if self.streams.is_local_unopened(id) {
5140 return Err(TransportError::STREAM_STATE_ERROR(
5141 "STOP_SENDING on unopened stream",
5142 ));
5143 }
5144 self.streams.received_stop_sending(id, error_code);
5145 }
5146 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
5147 if let Some(ref path_id) = path_id {
5148 span.record("path", tracing::field::display(&path_id));
5149 }
5150 let path_id = path_id.unwrap_or_default();
5151 match self.local_cid_state.get_mut(&path_id) {
5152 None => debug!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
5153 Some(cid_state) => {
5154 let allow_more_cids = cid_state
5155 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
5156
5157 let has_path = !self.abandoned_paths.contains(&path_id);
5161 let allow_more_cids = allow_more_cids && has_path;
5162
5163 debug_assert!(!self.state.is_drained()); self.endpoint_events
5165 .push_back(EndpointEventInner::RetireConnectionId(
5166 now,
5167 path_id,
5168 sequence,
5169 allow_more_cids,
5170 ));
5171 }
5172 }
5173 }
5174 Frame::NewConnectionId(frame) => {
5175 let path_id = if let Some(path_id) = frame.path_id {
5176 if !self.is_multipath_negotiated() {
5177 return Err(TransportError::PROTOCOL_VIOLATION(
5178 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
5179 ));
5180 }
5181 if path_id > self.local_max_path_id {
5182 return Err(TransportError::PROTOCOL_VIOLATION(
5183 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
5184 ));
5185 }
5186 path_id
5187 } else {
5188 PathId::ZERO
5189 };
5190
5191 if let Some(ref path_id) = frame.path_id {
5192 span.record("path", tracing::field::display(&path_id));
5193 }
5194
5195 if self.abandoned_paths.contains(&path_id) {
5196 trace!("ignoring issued CID for abandoned path");
5197 continue;
5198 }
5199 let remote_cids = self
5200 .remote_cids
5201 .entry(path_id)
5202 .or_insert_with(|| CidQueue::new(frame.id));
5203 if remote_cids.active().is_empty() {
5204 return Err(TransportError::PROTOCOL_VIOLATION(
5205 "NEW_CONNECTION_ID when CIDs aren't in use",
5206 ));
5207 }
5208 if frame.retire_prior_to > frame.sequence {
5209 return Err(TransportError::PROTOCOL_VIOLATION(
5210 "NEW_CONNECTION_ID retiring unissued CIDs",
5211 ));
5212 }
5213
5214 use crate::cid_queue::InsertError;
5215 match remote_cids.insert(frame) {
5216 Ok(None) => {
5217 self.open_nat_traversed_paths(now);
5218 }
5219 Ok(Some((retired, reset_token))) => {
5220 let pending_retired =
5221 &mut self.spaces[SpaceId::Data].pending.retire_cids;
5222 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
5225 if (pending_retired.len() as u64)
5228 .saturating_add(retired.end.saturating_sub(retired.start))
5229 > MAX_PENDING_RETIRED_CIDS
5230 {
5231 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
5232 "queued too many retired CIDs",
5233 ));
5234 }
5235 pending_retired.extend(retired.map(|seq| (path_id, seq)));
5236 self.set_reset_token(path_id, network_path.remote, reset_token);
5237 self.open_nat_traversed_paths(now);
5238 }
5239 Err(InsertError::ExceedsLimit) => {
5240 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
5241 }
5242 Err(InsertError::Retired) => {
5243 trace!("discarding already-retired");
5244 self.spaces[SpaceId::Data]
5248 .pending
5249 .retire_cids
5250 .push((path_id, frame.sequence));
5251 continue;
5252 }
5253 };
5254
5255 if self.side.is_server()
5256 && path_id == PathId::ZERO
5257 && self
5258 .remote_cids
5259 .get(&PathId::ZERO)
5260 .map(|cids| cids.active_seq() == 0)
5261 .unwrap_or_default()
5262 {
5263 self.update_remote_cid(PathId::ZERO);
5266 }
5267 }
5268 Frame::NewToken(NewToken { token }) => {
5269 let ConnectionSide::Client {
5270 token_store,
5271 server_name,
5272 ..
5273 } = &self.side
5274 else {
5275 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
5276 };
5277 if token.is_empty() {
5278 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
5279 }
5280 trace!("got new token");
5281 token_store.insert(server_name, token);
5282 }
5283 Frame::Datagram(datagram) => {
5284 if self
5285 .datagrams
5286 .received(datagram, &self.config.datagram_receive_buffer_size)?
5287 {
5288 self.events.push_back(Event::DatagramReceived);
5289 }
5290 }
5291 Frame::AckFrequency(ack_frequency) => {
5292 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
5295 continue;
5298 }
5299
5300 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
5302 space.pending_acks.set_ack_frequency_params(&ack_frequency);
5303
5304 if !self.abandoned_paths.contains(path_id)
5308 && let Some(timeout) = space
5309 .pending_acks
5310 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
5311 {
5312 self.timers.set(
5313 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
5314 timeout,
5315 self.qlog.with_time(now),
5316 );
5317 }
5318 }
5319 }
5320 Frame::ImmediateAck => {
5321 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
5323 pns.pending_acks.set_immediate_ack_required();
5324 }
5325 }
5326 Frame::HandshakeDone => {
5327 if self.side.is_server() {
5328 return Err(TransportError::PROTOCOL_VIOLATION(
5329 "client sent HANDSHAKE_DONE",
5330 ));
5331 }
5332 if self.crypto_state.has_keys(EncryptionLevel::Handshake) {
5333 self.discard_space(now, SpaceKind::Handshake);
5334 self.events.push_back(Event::HandshakeConfirmed);
5335 trace!("handshake confirmed");
5336 }
5337 }
5338 Frame::ObservedAddr(observed) => {
5339 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
5341 if !self
5342 .peer_params
5343 .address_discovery_role
5344 .should_report(&self.config.address_discovery_role)
5345 {
5346 return Err(TransportError::PROTOCOL_VIOLATION(
5347 "received OBSERVED_ADDRESS frame when not negotiated",
5348 ));
5349 }
5350 if packet.header.space() != SpaceKind::Data {
5352 return Err(TransportError::PROTOCOL_VIOLATION(
5353 "OBSERVED_ADDRESS frame outside data space",
5354 ));
5355 }
5356
5357 let path = self.path_data_mut(path_id);
5358 if path.network_path.remote == network_path.remote {
5359 if let Some(updated) = path.update_observed_addr_report(observed)
5360 && path.open_status == paths::OpenStatus::Informed
5361 {
5362 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5363 id: path_id,
5364 addr: updated,
5365 }));
5366 }
5368 } else {
5369 migration_observed_addr = Some(observed)
5371 }
5372 }
5373 Frame::PathAbandon(frame::PathAbandon {
5374 path_id,
5375 error_code,
5376 }) => {
5377 span.record("path", tracing::field::display(&path_id));
5378 match self.close_path_inner(
5379 now,
5380 path_id,
5381 PathAbandonReason::RemoteAbandoned {
5382 error_code: error_code.into(),
5383 },
5384 ) {
5385 Ok(()) => {
5386 trace!("peer abandoned path");
5387 }
5388 Err(ClosePathError::ClosedPath) => {
5389 trace!("peer abandoned already closed path");
5390 }
5391 Err(ClosePathError::MultipathNotNegotiated) => {
5392 return Err(TransportError::PROTOCOL_VIOLATION(
5393 "received PATH_ABANDON frame when multipath was not negotiated",
5394 ));
5395 }
5396 Err(ClosePathError::LastOpenPath) => {
5397 error!(
5400 "peer abandoned last path but close_path_inner returned LastOpenPath"
5401 );
5402 }
5403 };
5404
5405 if let Some(path) = self.paths.get_mut(&path_id)
5407 && !mem::replace(&mut path.data.draining, true)
5408 {
5409 let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5410 let pto = path.data.rtt.pto_base() + ack_delay;
5411 self.timers.set(
5412 Timer::PerPath(path_id, PathTimer::PathDrained),
5413 now + 3 * pto,
5414 self.qlog.with_time(now),
5415 );
5416
5417 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
5418 }
5419 }
5420 Frame::PathStatusAvailable(info) => {
5421 span.record("path", tracing::field::display(&info.path_id));
5422 if self.is_multipath_negotiated() {
5423 self.on_path_status(
5424 info.path_id,
5425 PathStatus::Available,
5426 info.status_seq_no,
5427 );
5428 } else {
5429 return Err(TransportError::PROTOCOL_VIOLATION(
5430 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
5431 ));
5432 }
5433 }
5434 Frame::PathStatusBackup(info) => {
5435 span.record("path", tracing::field::display(&info.path_id));
5436 if self.is_multipath_negotiated() {
5437 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
5438 } else {
5439 return Err(TransportError::PROTOCOL_VIOLATION(
5440 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
5441 ));
5442 }
5443 }
5444 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
5445 span.record("path", tracing::field::display(&path_id));
5446 if !self.is_multipath_negotiated() {
5447 return Err(TransportError::PROTOCOL_VIOLATION(
5448 "received MAX_PATH_ID frame when multipath was not negotiated",
5449 ));
5450 }
5451 if path_id > self.remote_max_path_id {
5453 self.remote_max_path_id = path_id;
5454 self.issue_first_path_cids(now);
5455 self.open_nat_traversed_paths(now);
5456 }
5457 }
5458 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
5459 if self.is_multipath_negotiated() {
5463 if max_path_id > self.local_max_path_id {
5464 return Err(TransportError::PROTOCOL_VIOLATION(
5465 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
5466 ));
5467 }
5468 debug!("received PATHS_BLOCKED({:?})", max_path_id);
5469 } else {
5471 return Err(TransportError::PROTOCOL_VIOLATION(
5472 "received PATHS_BLOCKED frame when not multipath was not negotiated",
5473 ));
5474 }
5475 }
5476 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
5477 if self.is_multipath_negotiated() {
5485 if path_id > self.local_max_path_id {
5486 return Err(TransportError::PROTOCOL_VIOLATION(
5487 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
5488 ));
5489 }
5490 if next_seq.0
5491 > self
5492 .local_cid_state
5493 .get(&path_id)
5494 .map(|cid_state| cid_state.active_seq().1 + 1)
5495 .unwrap_or_default()
5496 {
5497 return Err(TransportError::PROTOCOL_VIOLATION(
5498 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
5499 ));
5500 }
5501 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
5502 } else {
5503 return Err(TransportError::PROTOCOL_VIOLATION(
5504 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
5505 ));
5506 }
5507 }
5508 Frame::AddAddress(addr) => {
5509 let client_state = match self.n0_nat_traversal.client_side_mut() {
5510 Ok(state) => state,
5511 Err(err) => {
5512 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5513 "Nat traversal(ADD_ADDRESS): {err}"
5514 )));
5515 }
5516 };
5517
5518 if !client_state.check_remote_address(&addr) {
5519 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
5521 }
5522
5523 match client_state.add_remote_address(addr) {
5524 Ok(maybe_added) => {
5525 if let Some(added) = maybe_added {
5526 self.events.push_back(Event::NatTraversal(
5527 n0_nat_traversal::Event::AddressAdded(added),
5528 ));
5529 }
5530 }
5531 Err(e) => {
5532 warn!(%e, "failed to add remote address")
5533 }
5534 }
5535 }
5536 Frame::RemoveAddress(addr) => {
5537 let client_state = match self.n0_nat_traversal.client_side_mut() {
5538 Ok(state) => state,
5539 Err(err) => {
5540 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5541 "Nat traversal(REMOVE_ADDRESS): {err}"
5542 )));
5543 }
5544 };
5545 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
5546 self.events.push_back(Event::NatTraversal(
5547 n0_nat_traversal::Event::AddressRemoved(removed_addr),
5548 ));
5549 }
5550 }
5551 Frame::ReachOut(reach_out) => {
5552 let ipv6 = self.is_ipv6();
5553 let server_state = match self.n0_nat_traversal.server_side_mut() {
5554 Ok(state) => state,
5555 Err(err) => {
5556 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5557 "Nat traversal(REACH_OUT): {err}"
5558 )));
5559 }
5560 };
5561
5562 let round_before = server_state.current_round();
5563
5564 if let Err(err) = server_state.handle_reach_out(reach_out, ipv6) {
5565 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5566 "Nat traversal(REACH_OUT): {err}"
5567 )));
5568 }
5569
5570 if server_state.current_round() > round_before {
5571 if let Some(delay) =
5573 self.n0_nat_traversal.retry_delay(self.config.initial_rtt)
5574 {
5575 self.timers.set(
5576 Timer::Conn(ConnTimer::NatTraversalProbeRetry),
5577 now + delay,
5578 self.qlog.with_time(now),
5579 );
5580 }
5581 }
5582 }
5583 }
5584 }
5585
5586 let space = self.spaces[SpaceId::Data].for_path(path_id);
5587 if space
5588 .pending_acks
5589 .packet_received(now, number, ack_eliciting, &space.dedup)
5590 {
5591 if self.abandoned_paths.contains(&path_id) {
5592 space.pending_acks.set_immediate_ack_required();
5595 } else {
5596 self.timers.set(
5597 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
5598 now + self.ack_frequency.max_ack_delay,
5599 self.qlog.with_time(now),
5600 );
5601 }
5602 }
5603
5604 let pending = &mut self.spaces[SpaceId::Data].pending;
5609 self.streams.queue_max_stream_id(pending);
5610
5611 if let Some(reason) = close {
5612 self.state.move_to_draining(Some(reason.into()));
5613 self.endpoint_events.push_back(EndpointEventInner::Draining);
5614 self.connection_close_pending = true;
5615 }
5616
5617 let migrate_on_any_packet =
5620 self.is_multipath_negotiated() && !self.n0_nat_traversal.is_negotiated();
5621
5622 let is_largest_received_pn = Some(number)
5624 == self.spaces[SpaceId::Data]
5625 .for_path(path_id)
5626 .largest_received_packet_number;
5627
5628 if (migrate_on_any_packet || !is_probing_packet)
5633 && is_largest_received_pn
5634 && self.local_ip_may_migrate()
5635 && let Some(new_local_ip) = network_path.local_ip
5636 {
5637 let path_data = self.path_data_mut(path_id);
5638 if path_data
5639 .network_path
5640 .local_ip
5641 .is_some_and(|ip| ip != new_local_ip)
5642 {
5643 debug!(
5644 %path_id,
5645 new_4tuple = %network_path,
5646 prev_4tuple = %path_data.network_path,
5647 "local address passive migration"
5648 );
5649 }
5650 path_data.network_path.local_ip = Some(new_local_ip)
5651 }
5652
5653 if self.peer_may_migrate()
5655 && (migrate_on_any_packet || !is_probing_packet)
5656 && is_largest_received_pn
5657 && network_path.remote != self.path_data(path_id).network_path.remote
5658 {
5659 self.migrate(path_id, now, network_path, migration_observed_addr);
5660 self.update_remote_cid(path_id);
5662 self.spin = false;
5663 }
5664
5665 Ok(())
5666 }
5667
5668 fn open_nat_traversed_paths(&mut self, now: Instant) {
5670 while let Some(network_path) = self
5671 .n0_nat_traversal
5672 .client_side_mut()
5673 .ok()
5674 .and_then(|s| s.pop_pending_path_open())
5675 {
5676 match self.open_path_ensure(network_path, PathStatus::Backup, now) {
5677 Ok((path_id, already_existed)) => {
5678 debug!(
5679 %path_id,
5680 ?network_path,
5681 new_path = !already_existed,
5682 "Opened NAT traversal path",
5683 );
5684 }
5685 Err(err) => match err {
5686 PathError::MultipathNotNegotiated
5687 | PathError::ServerSideNotAllowed
5688 | PathError::ValidationFailed
5689 | PathError::InvalidRemoteAddress(_) => {
5690 error!(
5691 ?err,
5692 ?network_path,
5693 "Failed to open path for successful NAT traversal"
5694 );
5695 }
5696 PathError::MaxPathIdReached | PathError::RemoteCidsExhausted => {
5697 self.n0_nat_traversal
5699 .client_side_mut()
5700 .map(|s| s.push_pending_path_open(network_path))
5701 .ok();
5702 debug!(
5703 ?err,
5704 ?network_path,
5705 "Blocked opening NAT traversal path, enqueued"
5706 );
5707 return;
5708 }
5709 },
5710 }
5711 }
5712 }
5713
5714 fn migrate(
5719 &mut self,
5720 path_id: PathId,
5721 now: Instant,
5722 network_path: FourTuple,
5723 observed_addr: Option<ObservedAddr>,
5724 ) {
5725 trace!(
5726 new_4tuple = %network_path,
5727 prev_4tuple = %self.path_data(path_id).network_path,
5728 %path_id,
5729 "migration initiated",
5730 );
5731 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
5732 let prev_pto = self.pto(SpaceKind::Data, path_id);
5739 let path = self.paths.get_mut(&path_id).expect("known path");
5740 let mut new_path_data = if network_path.remote.is_ipv4()
5741 && network_path.remote.ip() == path.data.network_path.remote.ip()
5742 {
5743 PathData::from_previous(network_path, &path.data, self.path_generation_counter, now)
5744 } else {
5745 let peer_max_udp_payload_size =
5746 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
5747 .unwrap_or(u16::MAX);
5748 PathData::new(
5749 network_path,
5750 self.allow_mtud,
5751 Some(peer_max_udp_payload_size),
5752 self.path_generation_counter,
5753 now,
5754 &self.config,
5755 )
5756 };
5757 new_path_data.last_observed_addr_report = path.data.last_observed_addr_report.clone();
5758 if let Some(report) = observed_addr
5759 && let Some(updated) = new_path_data.update_observed_addr_report(report)
5760 {
5761 tracing::info!("adding observed addr event from migration");
5762 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5763 id: path_id,
5764 addr: updated,
5765 }));
5766 }
5767 new_path_data.pending_on_path_challenge = true;
5768 new_path_data.pending.observed_address = self
5769 .config
5770 .address_discovery_role
5771 .should_report(&self.peer_params.address_discovery_role);
5772
5773 let mut prev_path_data = mem::replace(&mut path.data, new_path_data);
5774
5775 if !prev_path_data.validated
5784 && let Some(cid) = self.remote_cids.get(&path_id).map(CidQueue::active)
5785 {
5786 prev_path_data.pending_on_path_challenge = true;
5787 path.prev = Some((cid, prev_path_data));
5790 }
5791
5792 self.qlog.emit_tuple_assigned(path_id, network_path, now);
5794
5795 self.timers.set(
5796 Timer::PerPath(path_id, PathTimer::PathValidationFailed),
5797 now + 3 * cmp::max(self.pto(SpaceKind::Data, path_id), prev_pto),
5798 self.qlog.with_time(now),
5799 );
5800 }
5801
/// Reacts to a change of the local network.
///
/// Nothing happens once the connection is drained. Before the data space is reached, the
/// local IP of every path is cleared, the remote CID of path zero is rotated and a ping is
/// queued.
///
/// Otherwise every non-abandoned path is classified as recoverable or non-recoverable:
/// without multipath every path is recoverable; with multipath the `hint` decides, and
/// without a hint paths are recoverable on the server side only. Non-recoverable paths are
/// closed and replaced by a newly opened path to the same remote; recoverable paths get a
/// ping (plus an immediate-ack request if the peer supports ACK frequency), a reset PTO
/// count, and a fresh remote CID when one is available.
pub fn handle_network_change(&mut self, hint: Option<&dyn NetworkChangeHint>, now: Instant) {
    debug!("network changed");
    if self.state.is_drained() {
        return;
    }
    if self.highest_space < SpaceKind::Data {
        for path in self.paths.values_mut() {
            // Forget the local address; presumably it is learned again from later
            // traffic — TODO confirm.
            path.data.network_path.local_ip = None;
        }

        self.update_remote_cid(PathId::ZERO);
        self.ping();

        return;
    }

    // Paths for which a replacement path is opened instead of recovering them. Stays empty
    // when multipath is not negotiated, since every path is then treated as recoverable.
    let mut non_recoverable_paths = Vec::default();
    let mut recoverable_paths = Vec::default();
    let mut open_paths = 0;

    let is_multipath_negotiated = self.is_multipath_negotiated();
    let is_client = self.side().is_client();
    let immediate_ack_allowed = self.peer_supports_ack_frequency();

    for (path_id, path) in self.paths.iter_mut() {
        if self.abandoned_paths.contains(path_id) {
            continue;
        }
        open_paths += 1;

        // Copy the network path before clearing the local IP so the hint still sees it.
        let network_path = path.data.network_path;

        // The local address is cleared for every open path, recoverable or not.
        path.data.network_path.local_ip = None;
        let remote = network_path.remote;

        let attempt_to_recover = if is_multipath_negotiated {
            // Defer to the hint when present; otherwise recover on the server side only.
            hint.map(|h| h.is_path_recoverable(*path_id, network_path))
                .unwrap_or(!is_client)
        } else {
            // Without multipath the existing path is always recovered.
            true
        };

        if attempt_to_recover {
            recoverable_paths.push((*path_id, remote));
        } else {
            non_recoverable_paths.push((*path_id, remote, path.data.local_status()))
        }
    }

    // When *all* open paths are non-recoverable, the replacement is opened before the old
    // path is closed; otherwise the old path is closed first.
    let open_first = open_paths == non_recoverable_paths.len();

    for (path_id, remote, status) in non_recoverable_paths.into_iter() {
        let network_path = FourTuple {
            remote,
            local_ip: None,
        };

        if open_first && let Err(e) = self.open_path(network_path, status, now) {
            if self.side().is_client() {
                debug!(%e, "Failed to open new path for network change");
            }
            // Opening failed: fall back to recovering the existing path.
            recoverable_paths.push((path_id, remote));
            continue;
        }

        if let Err(e) =
            self.close_path_inner(now, path_id, PathAbandonReason::UnusableAfterNetworkChange)
        {
            debug!(%e,"Failed to close unrecoverable path after network change");
            recoverable_paths.push((path_id, remote));
            continue;
        }

        if !open_first && let Err(e) = self.open_path(network_path, status, now) {
            // The old path is already closed at this point, so only log the failure.
            debug!(%e,"Failed to open new path for network change");
        }
    }

    for (path_id, remote) in recoverable_paths.into_iter() {
        // Queue a PING (and an IMMEDIATE_ACK when allowed) on the path.
        if let Some(path_space) = self.spaces[SpaceId::Data].number_spaces.get_mut(&path_id) {
            path_space.ping_pending = true;

            if immediate_ack_allowed {
                path_space.immediate_ack_pending = true;
            }
        }

        if let Some(path) = self.paths.get_mut(&path_id) {
            // Restart PTO backoff from zero before re-arming loss detection.
            path.data.pto_count = 0;
        }
        self.set_loss_detection_timer(now, path_id);

        let Some((reset_token, retired)) =
            self.remote_cids.get_mut(&path_id).and_then(CidQueue::next)
        else {
            continue;
        };

        // Queue retirement of the sequence numbers the CID queue reported as retired.
        self.spaces[SpaceId::Data]
            .pending
            .retire_cids
            .extend(retired.map(|seq| (path_id, seq)));

        debug_assert!(!self.state.is_drained());
        self.endpoint_events
            .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
    }
}
5960
5961 fn update_remote_cid(&mut self, path_id: PathId) {
5963 let Some((reset_token, retired)) = self
5964 .remote_cids
5965 .get_mut(&path_id)
5966 .and_then(|cids| cids.next())
5967 else {
5968 return;
5969 };
5970
5971 self.spaces[SpaceId::Data]
5973 .pending
5974 .retire_cids
5975 .extend(retired.map(|seq| (path_id, seq)));
5976 let remote = self.path_data(path_id).network_path.remote;
5977 self.set_reset_token(path_id, remote, reset_token);
5978 }
5979
5980 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5989 debug_assert!(!self.state.is_drained()); self.endpoint_events
5991 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5992
5993 if path_id == PathId::ZERO {
5999 self.peer_params.stateless_reset_token = Some(reset_token);
6000 }
6001 }
6002
6003 fn issue_first_cids(&mut self, now: Instant) {
6005 if self
6006 .local_cid_state
6007 .get(&PathId::ZERO)
6008 .expect("PathId::ZERO exists when the connection is created")
6009 .cid_len()
6010 == 0
6011 {
6012 return;
6013 }
6014
6015 let mut n = self.peer_params.issue_cids_limit() - 1;
6017 if let ConnectionSide::Server { server_config } = &self.side
6018 && server_config.has_preferred_address()
6019 {
6020 n -= 1;
6022 }
6023 debug_assert!(!self.state.is_drained()); self.endpoint_events
6025 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6026 }
6027
6028 fn issue_first_path_cids(&mut self, now: Instant) {
6032 if let Some(max_path_id) = self.max_path_id() {
6033 let mut path_id = self.max_path_id_with_cids.next();
6034 while path_id <= max_path_id {
6035 self.endpoint_events
6036 .push_back(EndpointEventInner::NeedIdentifiers(
6037 path_id,
6038 now,
6039 self.peer_params.issue_cids_limit(),
6040 ));
6041 path_id = path_id.next();
6042 }
6043 self.max_path_id_with_cids = max_path_id;
6044 }
6045 }
6046
6047 fn populate_packet<'a, 'b>(
6055 &mut self,
6056 now: Instant,
6057 space_id: SpaceId,
6058 path_id: PathId,
6059 scheduling_info: &PathSchedulingInfo,
6060 builder: &mut PacketBuilder<'a, 'b>,
6061 ) {
6062 let is_multipath_negotiated = self.is_multipath_negotiated();
6063 let space_has_keys = self.crypto_state.has_keys(space_id.encryption_level());
6064 let is_0rtt = space_id == SpaceId::Data && !space_has_keys;
6065 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
6066 let space = &mut self.spaces[space_id];
6067 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
6068 space
6069 .for_path(path_id)
6070 .pending_acks
6071 .maybe_ack_non_eliciting();
6072
6073 if !is_0rtt
6075 && !scheduling_info.is_abandoned
6076 && scheduling_info.may_send_data
6077 && mem::replace(&mut space.pending.handshake_done, false)
6078 {
6079 builder.write_frame(frame::HandshakeDone, stats);
6080 }
6081
6082 if !scheduling_info.is_abandoned
6084 && mem::replace(&mut space.for_path(path_id).ping_pending, false)
6085 {
6086 builder.write_frame(frame::Ping, stats);
6087 }
6088
6089 if !scheduling_info.is_abandoned
6091 && mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false)
6092 {
6093 debug_assert_eq!(
6094 space_id,
6095 SpaceId::Data,
6096 "immediate acks must be sent in the data space"
6097 );
6098 builder.write_frame(frame::ImmediateAck, stats);
6099 }
6100
6101 if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
6103 for path_id in space
6104 .number_spaces
6105 .iter_mut()
6106 .filter(|(_, pns)| pns.pending_acks.can_send())
6107 .map(|(&path_id, _)| path_id)
6108 .collect::<Vec<_>>()
6109 {
6110 Self::populate_acks(
6111 now,
6112 self.receiving_ecn,
6113 path_id,
6114 space_id,
6115 space,
6116 is_multipath_negotiated,
6117 builder,
6118 stats,
6119 space_has_keys,
6120 );
6121 }
6122 }
6123
6124 if !scheduling_info.is_abandoned
6126 && scheduling_info.may_send_data
6127 && mem::replace(&mut space.pending.ack_frequency, false)
6128 {
6129 let sequence_number = self.ack_frequency.next_sequence_number();
6130
6131 let config = self.config.ack_frequency_config.as_ref().unwrap();
6133
6134 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
6136 path.rtt.get(),
6137 config,
6138 &self.peer_params,
6139 );
6140
6141 let frame = frame::AckFrequency {
6142 sequence: sequence_number,
6143 ack_eliciting_threshold: config.ack_eliciting_threshold,
6144 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
6145 reordering_threshold: config.reordering_threshold,
6146 };
6147 builder.write_frame(frame, stats);
6148
6149 self.ack_frequency
6150 .ack_frequency_sent(path_id, builder.packet_number, max_ack_delay);
6151 }
6152
6153 if !scheduling_info.is_abandoned
6155 && space_id == SpaceId::Data
6156 && path.pending_on_path_challenge
6157 && !self.state.is_closed()
6159 && builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
6160 && !(builder.buf.num_datagrams() > 1 && builder.buf.segment_size() < usize::from(MIN_INITIAL_SIZE))
6164 {
6165 path.pending_on_path_challenge = false;
6166
6167 let token = self.rng.random();
6168 path.record_path_challenge_sent(now, token, path.network_path);
6169 let challenge = frame::PathChallenge(token);
6171 builder.write_frame(challenge, stats);
6172 builder.require_padding();
6173 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
6174 match path.open_status {
6175 paths::OpenStatus::Sent | paths::OpenStatus::Informed => {}
6176 paths::OpenStatus::Pending => {
6177 path.open_status = paths::OpenStatus::Sent;
6178 self.timers.set(
6179 Timer::PerPath(path_id, PathTimer::AbandonFromValidation),
6180 now + 3 * pto,
6181 self.qlog.with_time(now),
6182 );
6183 }
6184 }
6185
6186 self.timers.set(
6187 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
6188 now + path.on_path_challenge_expiry(),
6189 self.qlog.with_time(now),
6190 );
6191
6192 if is_multipath_negotiated && !path.validated && path.pending_on_path_challenge {
6193 space.pending.path_status.insert(path_id);
6195 }
6196
6197 if space_id == SpaceId::Data
6200 && self
6201 .config
6202 .address_discovery_role
6203 .should_report(&self.peer_params.address_discovery_role)
6204 {
6205 let frame = frame::ObservedAddr::new(
6206 path.network_path.remote,
6207 self.next_observed_addr_seq_no,
6208 );
6209 if builder.frame_space_remaining() > frame.size() {
6210 builder.write_frame(frame, stats);
6211
6212 self.next_observed_addr_seq_no =
6213 self.next_observed_addr_seq_no.saturating_add(1u8);
6214 path.pending.observed_address = false;
6215 }
6216 }
6217 }
6218
6219 if !scheduling_info.is_abandoned
6221 && space_id == SpaceId::Data
6222 && builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
6223 && let Some(token) = path.path_responses.pop_on_path(path.network_path)
6224 && !(builder.buf.num_datagrams() > 1 && builder.buf.segment_size() < usize::from(MIN_INITIAL_SIZE))
6228 {
6229 let response = frame::PathResponse(token);
6230 builder.write_frame(response, stats);
6231 builder.require_padding();
6232
6233 if space_id == SpaceId::Data
6237 && self
6238 .config
6239 .address_discovery_role
6240 .should_report(&self.peer_params.address_discovery_role)
6241 {
6242 let frame = frame::ObservedAddr::new(
6243 path.network_path.remote,
6244 self.next_observed_addr_seq_no,
6245 );
6246 if builder.frame_space_remaining() > frame.size() {
6247 builder.write_frame(frame, stats);
6248
6249 self.next_observed_addr_seq_no =
6250 self.next_observed_addr_seq_no.saturating_add(1u8);
6251 path.pending.observed_address = false;
6252 }
6253 }
6254 }
6255
6256 while !scheduling_info.is_abandoned
6258 && scheduling_info.may_send_data
6259 && let Some(reach_out) = space
6260 .pending
6261 .reach_out
6262 .pop_if(|frame| builder.frame_space_remaining() >= frame.size())
6263 {
6264 builder.write_frame(reach_out, stats);
6265 }
6266
6267 if space_id == SpaceId::Data
6269 && scheduling_info.is_abandoned
6270 && scheduling_info.may_self_abandon
6271 && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
6272 && let Some(error_code) = space.pending.path_abandon.remove(&path_id)
6273 {
6274 let frame = frame::PathAbandon {
6275 path_id,
6276 error_code,
6277 };
6278 builder.write_frame(frame, stats);
6279
6280 self.remote_cids.remove(&path_id);
6283 }
6284 while space_id == SpaceId::Data
6285 && scheduling_info.may_send_data
6286 && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
6287 && let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
6288 {
6289 let frame = frame::PathAbandon {
6290 path_id: abandoned_path_id,
6291 error_code,
6292 };
6293 builder.write_frame(frame, stats);
6294
6295 self.remote_cids.remove(&abandoned_path_id);
6298 }
6299
6300 if !scheduling_info.is_abandoned
6302 && scheduling_info.may_send_data
6303 && space_id == SpaceId::Data
6304 && path.pending.observed_address
6305 {
6306 let frame =
6307 frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
6308 if builder.frame_space_remaining() > frame.size() {
6309 builder.write_frame(frame, stats);
6310
6311 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
6312 path.pending.observed_address = false;
6313 }
6314 }
6315
6316 while !is_0rtt
6318 && !scheduling_info.is_abandoned
6319 && scheduling_info.may_send_data
6320 && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
6321 {
6322 let Some(mut frame) = space.pending.crypto.pop_front() else {
6323 break;
6324 };
6325
6326 let max_crypto_data_size = builder.frame_space_remaining()
6331 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
6333 - 2; let len = frame
6336 .data
6337 .len()
6338 .min(2usize.pow(14) - 1)
6339 .min(max_crypto_data_size);
6340
6341 let data = frame.data.split_to(len);
6342 let offset = frame.offset;
6343 let truncated = frame::Crypto { offset, data };
6344 builder.write_frame(truncated, stats);
6345
6346 if !frame.data.is_empty() {
6347 frame.offset += len as u64;
6348 space.pending.crypto.push_front(frame);
6349 }
6350 }
6351
6352 while space_id == SpaceId::Data
6354 && !scheduling_info.is_abandoned
6355 && scheduling_info.may_send_data
6356 && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
6357 {
6358 let Some(path_id) = space.pending.path_status.pop_first() else {
6359 break;
6360 };
6361 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
6362 trace!(%path_id, "discarding queued path status for unknown path");
6363 continue;
6364 };
6365
6366 let seq = path.status.seq();
6367 match path.local_status() {
6368 PathStatus::Available => {
6369 let frame = frame::PathStatusAvailable {
6370 path_id,
6371 status_seq_no: seq,
6372 };
6373 builder.write_frame(frame, stats);
6374 }
6375 PathStatus::Backup => {
6376 let frame = frame::PathStatusBackup {
6377 path_id,
6378 status_seq_no: seq,
6379 };
6380 builder.write_frame(frame, stats);
6381 }
6382 }
6383 }
6384
6385 if space_id == SpaceId::Data
6387 && !scheduling_info.is_abandoned
6388 && scheduling_info.may_send_data
6389 && space.pending.max_path_id
6390 && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
6391 {
6392 let frame = frame::MaxPathId(self.local_max_path_id);
6393 builder.write_frame(frame, stats);
6394 space.pending.max_path_id = false;
6395 }
6396
6397 if space_id == SpaceId::Data
6399 && !scheduling_info.is_abandoned
6400 && scheduling_info.may_send_data
6401 && space.pending.paths_blocked
6402 && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
6403 {
6404 let frame = frame::PathsBlocked(self.remote_max_path_id);
6405 builder.write_frame(frame, stats);
6406 space.pending.paths_blocked = false;
6407 }
6408
6409 while space_id == SpaceId::Data
6411 && !scheduling_info.is_abandoned
6412 && scheduling_info.may_send_data
6413 && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
6414 {
6415 let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
6416 break;
6417 };
6418 let next_seq = match self.remote_cids.get(&path_id) {
6419 Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
6420 None => VarInt(0),
6421 };
6422 let frame = frame::PathCidsBlocked { path_id, next_seq };
6423 builder.write_frame(frame, stats);
6424 }
6425
6426 if space_id == SpaceId::Data
6428 && !scheduling_info.is_abandoned
6429 && scheduling_info.may_send_data
6430 {
6431 self.streams
6432 .write_control_frames(builder, &mut space.pending, stats);
6433 }
6434
6435 let cid_len = self
6437 .local_cid_state
6438 .values()
6439 .map(|cid_state| cid_state.cid_len())
6440 .max()
6441 .expect("some local CID state must exist");
6442 let new_cid_size_bound =
6443 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
6444 while !scheduling_info.is_abandoned
6445 && scheduling_info.may_send_data
6446 && builder.frame_space_remaining() > new_cid_size_bound
6447 {
6448 let Some(issued) = space.pending.new_cids.pop() else {
6449 break;
6450 };
6451 let Some(cid_state) = self.local_cid_state.get(&issued.path_id) else {
6453 debug!(
6454 path = %issued.path_id, seq = issued.sequence,
6455 "dropping queued NEW_CONNECTION_ID for discarded path",
6456 );
6457 continue;
6458 };
6459 let retire_prior_to = cid_state.retire_prior_to();
6460
6461 let cid_path_id = match is_multipath_negotiated {
6462 true => Some(issued.path_id),
6463 false => {
6464 debug_assert_eq!(issued.path_id, PathId::ZERO);
6465 None
6466 }
6467 };
6468 let frame = frame::NewConnectionId {
6469 path_id: cid_path_id,
6470 sequence: issued.sequence,
6471 retire_prior_to,
6472 id: issued.id,
6473 reset_token: issued.reset_token,
6474 };
6475 builder.write_frame(frame, stats);
6476 }
6477
6478 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
6480 while !scheduling_info.is_abandoned
6481 && scheduling_info.may_send_data
6482 && builder.frame_space_remaining() > retire_cid_bound
6483 {
6484 let (path_id, sequence) = match space.pending.retire_cids.pop() {
6485 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
6486 Some((path_id, seq)) => (Some(path_id), seq),
6487 None => break,
6488 };
6489 let frame = frame::RetireConnectionId { path_id, sequence };
6490 builder.write_frame(frame, stats);
6491 }
6492
6493 let mut sent_datagrams = false;
6495 while !scheduling_info.is_abandoned
6496 && scheduling_info.may_send_data
6497 && builder.frame_space_remaining() > Datagram::SIZE_BOUND
6498 && space_id == SpaceId::Data
6499 {
6500 match self.datagrams.write(builder, stats) {
6501 true => {
6502 sent_datagrams = true;
6503 }
6504 false => break,
6505 }
6506 }
6507 if self.datagrams.send_blocked && sent_datagrams {
6508 self.events.push_back(Event::DatagramsUnblocked);
6509 self.datagrams.send_blocked = false;
6510 }
6511
6512 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
6513
6514 if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
6516 while let Some(network_path) = space.pending.new_tokens.pop() {
6517 debug_assert_eq!(space_id, SpaceId::Data);
6518 let ConnectionSide::Server { server_config } = &self.side else {
6519 panic!("NEW_TOKEN frames should not be enqueued by clients");
6520 };
6521
6522 if !network_path.is_probably_same_path(&path.network_path) {
6523 continue;
6528 }
6529
6530 let token = Token::new(
6531 TokenPayload::Validation {
6532 ip: network_path.remote.ip(),
6533 issued: server_config.time_source.now(),
6534 },
6535 &mut self.rng,
6536 );
6537 let new_token = NewToken {
6538 token: token.encode(&*server_config.token_key).into(),
6539 };
6540
6541 if builder.frame_space_remaining() < new_token.size() {
6542 space.pending.new_tokens.push(network_path);
6543 break;
6544 }
6545
6546 builder.write_frame(new_token, stats);
6547 builder.retransmits_mut().new_tokens.push(network_path);
6548 }
6549 }
6550
6551 while space_id == SpaceId::Data
6553 && !scheduling_info.is_abandoned
6554 && scheduling_info.may_send_data
6555 && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
6556 {
6557 if let Some(added_address) = space.pending.add_address.pop_last() {
6558 builder.write_frame(added_address, stats);
6559 } else {
6560 break;
6561 }
6562 }
6563
6564 while space_id == SpaceId::Data
6566 && !scheduling_info.is_abandoned
6567 && scheduling_info.may_send_data
6568 && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
6569 {
6570 if let Some(removed_address) = space.pending.remove_address.pop_last() {
6571 builder.write_frame(removed_address, stats);
6572 } else {
6573 break;
6574 }
6575 }
6576
6577 if !scheduling_info.is_abandoned
6579 && scheduling_info.may_send_data
6580 && space_id == SpaceId::Data
6581 {
6582 self.streams
6583 .write_stream_frames(builder, self.config.send_fairness, stats);
6584 }
6585 }
6586
6587 fn populate_acks<'a, 'b>(
6589 now: Instant,
6590 receiving_ecn: bool,
6591 path_id: PathId,
6592 space_id: SpaceId,
6593 space: &mut PacketSpace,
6594 is_multipath_negotiated: bool,
6595 builder: &mut PacketBuilder<'a, 'b>,
6596 stats: &mut FrameStats,
6597 space_has_keys: bool,
6598 ) {
6599 debug_assert!(space_has_keys, "tried to send ACK in 0-RTT");
6601
6602 debug_assert!(
6603 is_multipath_negotiated || path_id == PathId::ZERO,
6604 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
6605 );
6606 if is_multipath_negotiated {
6607 debug_assert!(
6608 space_id == SpaceId::Data || path_id == PathId::ZERO,
6609 "path acks must be sent in 1RTT space (have {space_id:?})"
6610 );
6611 }
6612
6613 let pns = space.for_path(path_id);
6614 let ranges = pns.pending_acks.ranges();
6615 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
6616 let ecn = if receiving_ecn {
6617 Some(&pns.ecn_counters)
6618 } else {
6619 None
6620 };
6621
6622 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
6623 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
6625 let delay = delay_micros >> ack_delay_exp.into_inner();
6626
6627 if is_multipath_negotiated && space_id == SpaceId::Data {
6628 if !ranges.is_empty() {
6629 let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
6630 builder.write_frame(frame, stats);
6631 }
6632 } else {
6633 builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
6634 }
6635 }
6636
/// Logs that the connection was closed and resets the timers.
fn close_common(&mut self) {
    trace!("connection closed");
    self.timers.reset();
}
6641
6642 fn set_close_timer(&mut self, now: Instant) {
6643 let pto_max = self.max_pto_for_space(self.highest_space);
6646 self.timers.set(
6647 Timer::Conn(ConnTimer::Close),
6648 now + 3 * pto_max,
6649 self.qlog.with_time(now),
6650 );
6651 }
6652
6653 fn handle_peer_params(
6658 &mut self,
6659 params: TransportParameters,
6660 local_cid: ConnectionId,
6661 remote_cid: ConnectionId,
6662 now: Instant,
6663 ) -> Result<(), TransportError> {
6664 if Some(self.original_remote_cid) != params.initial_src_cid
6665 || (self.side.is_client()
6666 && (Some(self.initial_dst_cid) != params.original_dst_cid
6667 || self.retry_src_cid != params.retry_src_cid))
6668 {
6669 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
6670 "CID authentication failure",
6671 ));
6672 }
6673 if params.initial_max_path_id.is_some() && (local_cid.is_empty() || remote_cid.is_empty()) {
6674 return Err(TransportError::PROTOCOL_VIOLATION(
6675 "multipath must not use zero-length CIDs",
6676 ));
6677 }
6678
6679 self.set_peer_params(params);
6680 self.qlog.emit_peer_transport_params_received(self, now);
6681
6682 Ok(())
6683 }
6684
6685 fn set_peer_params(&mut self, params: TransportParameters) {
6686 self.streams.set_params(¶ms);
6687 self.idle_timeout =
6688 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
6689 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
6690
6691 if let Some(ref info) = params.preferred_address {
6692 self.remote_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
6694 path_id: None,
6695 sequence: 1,
6696 id: info.connection_id,
6697 reset_token: info.stateless_reset_token,
6698 retire_prior_to: 0,
6699 })
6700 .expect(
6701 "preferred address CID is the first received, and hence is guaranteed to be legal",
6702 );
6703 let remote = self.path_data(PathId::ZERO).network_path.remote;
6704 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
6705 }
6706 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
6707
6708 let mut multipath_enabled = false;
6709 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
6710 self.config.get_initial_max_path_id(),
6711 params.initial_max_path_id,
6712 ) {
6713 self.local_max_path_id = local_max_path_id;
6715 self.remote_max_path_id = remote_max_path_id;
6716 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
6717 debug!(%initial_max_path_id, "multipath negotiated");
6718 multipath_enabled = true;
6719 }
6720
6721 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
6722 self.config
6723 .max_remote_nat_traversal_addresses
6724 .zip(params.max_remote_nat_traversal_addresses)
6725 {
6726 if multipath_enabled {
6727 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
6728 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
6729 self.n0_nat_traversal = n0_nat_traversal::State::new(
6730 max_remote_addresses,
6731 max_local_addresses,
6732 self.side(),
6733 );
6734 debug!(
6735 %max_remote_addresses, %max_local_addresses,
6736 "n0's nat traversal negotiated"
6737 );
6738 } else {
6739 debug!("n0 nat traversal enabled for both endpoints, but multipath is missing")
6740 }
6741 }
6742
6743 self.peer_params = params;
6744 let peer_max_udp_payload_size =
6745 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
6746 let address_discovery_negotiated = self
6747 .config
6748 .address_discovery_role
6749 .should_report(&self.peer_params.address_discovery_role);
6750
6751 let path = self.path_data_mut(PathId::ZERO);
6752 path.pending.observed_address = address_discovery_negotiated;
6753 path.mtud
6754 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
6755 }
6756
6757 fn decrypt_packet(
6759 &mut self,
6760 now: Instant,
6761 path_id: PathId,
6762 packet: &mut Packet,
6763 ) -> Result<Option<u64>, Option<TransportError>> {
6764 let result = self
6765 .crypto_state
6766 .decrypt_packet_body(packet, path_id, &self.spaces)?;
6767
6768 let Some(result) = result else {
6769 return Ok(None);
6770 };
6771
6772 if result.outgoing_key_update_acked
6773 && let Some(prev) = self.crypto_state.prev_crypto.as_mut()
6774 {
6775 prev.end_packet = Some((result.packet_number, now));
6776 self.set_key_discard_timer(now, packet.header.space());
6777 }
6778
6779 if result.incoming_key_update {
6780 trace!("key update authenticated");
6781 self.crypto_state
6782 .update_keys(Some((result.packet_number, now)), true);
6783 self.set_key_discard_timer(now, packet.header.space());
6784 }
6785
6786 Ok(Some(result.packet_number))
6787 }
6788
    /// Whether the peer supports the ACK frequency extension.
    ///
    /// This is the case exactly when the peer's transport parameters contain a `min_ack_delay`.
    fn peer_supports_ack_frequency(&self) -> bool {
        self.peer_params.min_ack_delay.is_some()
    }
6792
    /// Flags an immediate ACK as pending for `path_id` in the data packet space.
    ///
    /// In debug builds this asserts that the connection's highest space is the data space.
    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
        debug_assert_eq!(
            self.highest_space,
            SpaceKind::Data,
            "immediate ack must be written in the data space"
        );
        self.spaces[SpaceId::Data]
            .for_path(path_id)
            .immediate_ack_pending = true;
    }
6807
6808 #[cfg(test)]
6810 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
6811 let ConnectionEventInner::Datagram(DatagramConnectionEvent {
6812 path_id,
6813 first_decode,
6814 remaining,
6815 ..
6816 }) = &event.0
6817 else {
6818 return None;
6819 };
6820
6821 if remaining.is_some() {
6822 panic!("Packets should never be coalesced in tests");
6823 }
6824
6825 let decrypted_header = self
6826 .crypto_state
6827 .unprotect_header(first_decode.clone(), self.peer_params.stateless_reset_token)?;
6828
6829 let mut packet = decrypted_header.packet?;
6830 self.crypto_state
6831 .decrypt_packet_body(&mut packet, *path_id, &self.spaces)
6832 .ok()?;
6833
6834 Some(packet.payload.to_vec())
6835 }
6836
    /// Test helper: the number of in-flight bytes recorded for path zero.
    #[cfg(test)]
    pub(crate) fn bytes_in_flight(&self) -> u64 {
        self.path_data(PathId::ZERO).in_flight.bytes
    }
6844
6845 #[cfg(test)]
6847 pub(crate) fn congestion_window(&self) -> u64 {
6848 let path = self.path_data(PathId::ZERO);
6849 path.congestion
6850 .window()
6851 .saturating_sub(path.in_flight.bytes)
6852 }
6853
    /// Test helper: whether the connection is idle, judged by its armed timers.
    ///
    /// Keep-alive, path keep-alive, push-new-CID and key-discard timers are ignored. Of the
    /// remaining timers the earliest one is inspected: the connection counts as idle if there
    /// is no such timer, or if that timer is the connection idle or a path idle timer.
    #[cfg(test)]
    pub(crate) fn is_idle(&self) -> bool {
        let current_timers = self.timers.values();
        current_timers
            .into_iter()
            // Drop the timers that are ignored when deciding idleness.
            .filter(|(timer, _)| {
                !matches!(
                    timer,
                    Timer::Conn(ConnTimer::KeepAlive)
                        | Timer::PerPath(_, PathTimer::PathKeepAlive)
                        | Timer::Conn(ConnTimer::PushNewCid)
                        | Timer::Conn(ConnTimer::KeyDiscard)
                )
            })
            // Pick the timer that fires first.
            .min_by_key(|(_, time)| *time)
            .is_none_or(|(timer, _)| {
                matches!(
                    timer,
                    Timer::Conn(ConnTimer::Idle) | Timer::PerPath(_, PathTimer::PathIdle)
                )
            })
    }
6877
    /// Test helper: the `sending_ecn` flag of path zero.
    #[cfg(test)]
    pub(crate) fn using_ecn(&self) -> bool {
        self.path_data(PathId::ZERO).sending_ecn
    }
6883
    /// Test helper: the `total_recvd` counter of path zero.
    #[cfg(test)]
    pub(crate) fn total_recvd(&self) -> u64 {
        self.path_data(PathId::ZERO).total_recvd
    }
6889
    /// Test helper: the active local CID sequence pair reported by path zero's CID state.
    ///
    /// # Panics
    ///
    /// Panics if there is no local CID state for path zero.
    #[cfg(test)]
    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
        self.local_cid_state
            .get(&PathId::ZERO)
            .unwrap()
            .active_seq()
    }
6897
    /// Test helper: the active local CID sequence pair reported by the CID state of `path_id`.
    ///
    /// # Panics
    ///
    /// Panics if there is no local CID state for the given path. Thanks to `#[track_caller]`
    /// the panic location points at the calling test.
    #[cfg(test)]
    #[track_caller]
    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
        self.local_cid_state
            .get(&PathId(path_id))
            .unwrap()
            .active_seq()
    }
6906
    /// Test helper: retires local CIDs on path zero and requests replacements.
    ///
    /// Passes `v` to `assign_retire_seq` on path zero's local CID state and queues a
    /// `NeedIdentifiers` endpoint event carrying the value it returned.
    ///
    /// # Panics
    ///
    /// Panics if there is no local CID state for path zero.
    #[cfg(test)]
    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
        let n = self
            .local_cid_state
            .get_mut(&PathId::ZERO)
            .unwrap()
            .assign_retire_seq(v);
        // Endpoint events are only expected while the connection is not drained.
        debug_assert!(!self.state.is_drained());
        self.endpoint_events
            .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
    }
6920
    /// Test helper: the active sequence number of the remote CIDs on path zero.
    ///
    /// # Panics
    ///
    /// Panics if there are no remote CIDs registered for path zero.
    #[cfg(test)]
    pub(crate) fn active_remote_cid_seq(&self) -> u64 {
        self.remote_cids.get(&PathId::ZERO).unwrap().active_seq()
    }
6926
    /// Test helper: the current MTU of the given path.
    #[cfg(test)]
    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
        self.path_data(path_id).current_mtu()
    }
6932
6933 #[cfg(test)]
6935 pub(crate) fn trigger_path_validation(&mut self) {
6936 for path in self.paths.values_mut() {
6937 path.data.pending_on_path_challenge = true;
6938 }
6939 }
6940
6941 #[cfg(test)]
6943 pub fn simulate_protocol_violation(&mut self, now: Instant) {
6944 if !self.state.is_closed() {
6945 self.state
6946 .move_to_closed(TransportError::PROTOCOL_VIOLATION("simulated violation"));
6947 self.close_common();
6948 if !self.state.is_drained() {
6949 self.set_close_timer(now);
6950 }
6951 self.connection_close_pending = true;
6952 }
6953 }
6954
6955 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6966 let space_specific = self.paths.get(&path_id).is_some_and(|path| {
6967 path.data.pending_on_path_challenge
6968 || !path.data.path_responses.is_empty()
6969 || !path.data.pending.is_empty()
6970 });
6971
6972 let other = self.streams.can_send_stream_data()
6974 || self
6975 .datagrams
6976 .outgoing
6977 .front()
6978 .is_some_and(|x| x.size(true) <= max_size);
6979
6980 SendableFrames {
6982 acks: false,
6983 close: false,
6984 space_specific,
6985 other,
6986 }
6987 }
6988
    /// Terminates the connection immediately, recording `reason`.
    ///
    /// Runs the common close handling, moves the state to drained and notifies the endpoint:
    /// a `Draining` event is queued first unless `move_to_drained` reports that the connection
    /// was already draining, followed by a `Drained` event.
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        let was_draining = self.state.move_to_drained(Some(reason));
        if !was_draining {
            self.endpoint_events.push_back(EndpointEventInner::Draining);
        }
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
7000
7001 pub fn current_mtu(&self) -> u16 {
7008 self.paths
7009 .iter()
7010 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
7011 .map(|(_path_id, path_state)| path_state.data.current_mtu())
7012 .min()
7013 .unwrap_or(INITIAL_MTU)
7014 }
7015
7016 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
7023 let pn_len = PacketNumber::new(
7024 pn,
7025 self.spaces[SpaceId::Data]
7026 .for_path(path)
7027 .largest_acked_packet_pn
7028 .unwrap_or(0),
7029 )
7030 .len();
7031
7032 1 + self
7034 .remote_cids
7035 .get(&path)
7036 .map(|cids| cids.active().len())
7037 .unwrap_or(20) + pn_len
7039 + self.tag_len_1rtt()
7040 }
7041
7042 fn predict_1rtt_overhead_no_pn(&self) -> usize {
7043 let pn_len = 4;
7044
7045 let cid_len = self
7046 .remote_cids
7047 .values()
7048 .map(|cids| cids.active().len())
7049 .max()
7050 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
7054 }
7055
7056 fn tag_len_1rtt(&self) -> usize {
7057 let packet_crypto = self
7059 .crypto_state
7060 .encryption_keys(SpaceKind::Data, self.side.side())
7061 .map(|(_header, packet, _level)| packet);
7062 packet_crypto.map_or(16, |x| x.tag_len())
7066 }
7067
7068 fn on_path_validated(&mut self, path_id: PathId) {
7070 self.path_data_mut(path_id).validated = true;
7071 let ConnectionSide::Server { server_config } = &self.side else {
7072 return;
7073 };
7074 let network_path = self.path_data(path_id).network_path;
7075 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
7076 new_tokens.clear();
7077 for _ in 0..server_config.validation_token.sent {
7078 new_tokens.push(network_path);
7079 }
7080 }
7081
7082 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
7084 if let Some(path) = self.paths.get_mut(&path_id) {
7085 path.data.status.remote_update(status, status_seq_no);
7086 } else {
7087 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
7088 }
7089 self.events.push_back(
7090 PathEvent::RemoteStatus {
7091 id: path_id,
7092 status,
7093 }
7094 .into(),
7095 );
7096 }
7097
7098 fn max_path_id(&self) -> Option<PathId> {
7107 if self.is_multipath_negotiated() {
7108 Some(self.remote_max_path_id.min(self.local_max_path_id))
7109 } else {
7110 None
7111 }
7112 }
7113
    /// Whether at least one path has an IPv6 remote address.
    pub(crate) fn is_ipv6(&self) -> bool {
        self.paths
            .values()
            .any(|p| p.data.network_path.remote.is_ipv6())
    }
7123
7124 pub fn add_nat_traversal_address(
7126 &mut self,
7127 address: SocketAddr,
7128 ) -> Result<(), n0_nat_traversal::Error> {
7129 if let Some(added) = self.n0_nat_traversal.add_local_address(address)? {
7130 self.spaces[SpaceId::Data].pending.add_address.insert(added);
7131 };
7132 Ok(())
7133 }
7134
7135 pub fn remove_nat_traversal_address(
7139 &mut self,
7140 address: SocketAddr,
7141 ) -> Result<(), n0_nat_traversal::Error> {
7142 if let Some(removed) = self.n0_nat_traversal.remove_local_address(address)? {
7143 self.spaces[SpaceId::Data]
7144 .pending
7145 .remove_address
7146 .insert(removed);
7147 }
7148 Ok(())
7149 }
7150
    /// Returns the local NAT traversal addresses known to the NAT traversal state.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the NAT traversal state.
    pub fn get_local_nat_traversal_addresses(
        &self,
    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
        self.n0_nat_traversal.get_local_nat_traversal_addresses()
    }
7157
7158 pub fn get_remote_nat_traversal_addresses(
7160 &self,
7161 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
7162 Ok(self
7163 .n0_nat_traversal
7164 .client_side()?
7165 .get_remote_nat_traversal_addresses())
7166 }
7167
    /// Starts a new NAT traversal round and returns the addresses that will be probed.
    ///
    /// The client-side NAT traversal state produces the reach-out frames and the probed
    /// addresses. The frames are appended to the data space's pending `reach_out` list, and the
    /// probe retry timer is armed when the NAT traversal state reports a retry delay.
    ///
    /// # Errors
    ///
    /// Returns [`n0_nat_traversal::Error::Closed`] if the connection is closed. Errors from
    /// obtaining the client-side state or from initiating the round are propagated.
    pub fn initiate_nat_traversal_round(
        &mut self,
        now: Instant,
    ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
        if self.state.is_closed() {
            return Err(n0_nat_traversal::Error::Closed);
        }

        // Computed up front, before the NAT traversal state is mutably borrowed.
        let ipv6 = self.is_ipv6();
        let client_state = self.n0_nat_traversal.client_side_mut()?;
        let (mut reach_out_frames, probed_addrs) =
            client_state.initiate_nat_traversal_round(ipv6)?;
        if let Some(delay) = self.n0_nat_traversal.retry_delay(self.config.initial_rtt) {
            self.timers.set(
                Timer::Conn(ConnTimer::NatTraversalProbeRetry),
                now + delay,
                self.qlog.with_time(now),
            );
        }

        self.spaces[SpaceId::Data]
            .pending
            .reach_out
            .append(&mut reach_out_frames);

        Ok(probed_addrs)
    }
7206
7207 fn is_handshake_confirmed(&self) -> bool {
7216 !self.is_handshaking() && !self.crypto_state.has_keys(EncryptionLevel::Handshake)
7217 }
7218}
7219
/// Compact debug representation that only prints the connection's `handshake_cid`.
impl fmt::Debug for Connection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Connection")
            .field("handshake_cid", &self.handshake_cid)
            .finish()
    }
}
7227
/// The set of path IDs that have been abandoned.
///
/// Path IDs are stored by their `u32` value in an [`ArrayRangeSet`].
#[derive(Debug, Default)]
struct AbandonedPaths(ArrayRangeSet<ABANDONED_PATH_INLINE_RANGES, u32>);
7235
/// Const parameter passed to the [`ArrayRangeSet`] backing [`AbandonedPaths`].
///
/// NOTE(review): presumably the number of ranges stored inline before spilling — confirm
/// against `ArrayRangeSet`.
const ABANDONED_PATH_INLINE_RANGES: usize = 16;
7241
impl AbandonedPaths {
    /// Number of path IDs in the set.
    fn len(&self) -> u32 {
        self.0.elts_count()
    }

    /// The largest path ID in the set, or `None` if the set is empty.
    fn max(&self) -> Option<PathId> {
        self.0.max().map(PathId::from)
    }

    /// Whether `val` is in the set.
    fn contains(&self, val: &PathId) -> bool {
        self.0.contains(val.as_u32())
    }

    /// Adds `val` to the set.
    fn insert(&mut self, val: PathId) {
        self.0.insert_one(val.as_u32());
    }
}
7263
/// Application-provided hint about whether paths can be recovered.
///
/// NOTE(review): judging by the name this is consulted after a network change; the call site
/// is outside this section — confirm.
pub trait NetworkChangeHint: std::fmt::Debug + 'static {
    /// Returns whether the path `path_id`, using `network_path`, is considered recoverable.
    fn is_path_recoverable(&self, path_id: PathId, network_path: FourTuple) -> bool;
}
7276
/// Outcome of polling a single packet space on a single path for something to transmit.
///
/// NOTE(review): the variant descriptions below are derived from the names only; the producing
/// and consuming code is outside this section — confirm.
#[derive(Debug)]
enum PollPathSpaceStatus {
    /// Nothing was written.
    NothingToSend {
        /// Whether sending was prevented by congestion control.
        congestion_blocked: bool,
    },
    /// A packet was written.
    WrotePacket {
        /// Packet number of the last packet written.
        last_packet_number: u64,
        /// Padding requirement for the datagram containing the packet.
        pad_datagram: PadDatagram,
    },
    /// The datagram should be sent.
    Send {
        /// Packet number of the last packet written.
        last_packet_number: u64,
    },
}
7315
/// Per-path flags describing what may be scheduled on a path.
///
/// NOTE(review): field descriptions are derived from the names only; the code computing and
/// using them is outside this section — confirm.
#[derive(Debug, Copy, Clone)]
struct PathSchedulingInfo {
    /// Whether the path has been abandoned.
    is_abandoned: bool,
    /// Whether data may be sent on this path.
    may_send_data: bool,
    /// Whether a close may be sent on this path.
    may_send_close: bool,
    /// Whether the path may abandon itself.
    may_self_abandon: bool,
}
7355
/// The reason, if any, why a path is currently blocked from sending.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Blocked by the anti-amplification limit.
    AntiAmplification,
    /// Blocked by congestion control.
    Congestion,
    /// Blocked by pacing.
    Pacing,
}
7363
/// Side-specific state of a connection.
enum ConnectionSide {
    /// State held by a client connection.
    Client {
        /// Token taken from `token_store` for `server_name` when the connection side was
        /// created; empty if the store had none.
        token: Bytes,
        /// Store from which `token` was taken.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server; used as the key into `token_store`.
        server_name: String,
    },
    /// State held by a server connection.
    Server {
        /// The server configuration.
        server_config: Arc<ServerConfig>,
    },
}
7376
impl ConnectionSide {
    /// Whether this is the client side of the connection.
    fn is_client(&self) -> bool {
        self.side().is_client()
    }

    /// Whether this is the server side of the connection.
    fn is_server(&self) -> bool {
        self.side().is_server()
    }

    /// The [`Side`] corresponding to this variant.
    fn side(&self) -> Side {
        match *self {
            Self::Client { .. } => Side::Client,
            Self::Server { .. } => Side::Server,
        }
    }
}
7393
7394impl From<SideArgs> for ConnectionSide {
7395 fn from(side: SideArgs) -> Self {
7396 match side {
7397 SideArgs::Client {
7398 token_store,
7399 server_name,
7400 } => Self::Client {
7401 token: token_store.take(&server_name).unwrap_or_default(),
7402 token_store,
7403 server_name,
7404 },
7405 SideArgs::Server {
7406 server_config,
7407 pref_addr_cid: _,
7408 path_validated: _,
7409 } => Self::Server { server_config },
7410 }
7411 }
7412}
7413
/// Side-specific arguments used when constructing a connection.
pub(crate) enum SideArgs {
    /// Arguments for a client connection.
    Client {
        /// Store from which a token for `server_name` is taken.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server connection.
    Server {
        /// The server configuration.
        server_config: Arc<ServerConfig>,
        /// Connection ID associated with the preferred address, if any.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the initial path is already validated.
        path_validated: bool,
    },
}
7426
7427impl SideArgs {
7428 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
7429 match *self {
7430 Self::Client { .. } => None,
7431 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
7432 }
7433 }
7434
7435 pub(crate) fn path_validated(&self) -> bool {
7436 match *self {
7437 Self::Client { .. } => true,
7438 Self::Server { path_validated, .. } => path_validated,
7439 }
7440 }
7441
7442 pub(crate) fn side(&self) -> Side {
7443 match *self {
7444 Self::Client { .. } => Side::Client,
7445 Self::Server { .. } => Side::Server,
7446 }
7447 }
7448}
7449
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed as the wrapped [`TransportError`].
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The connection was aborted by the peer with a connection-level close.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The connection was closed by the peer with an application-level close.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed locally.
    #[error("closed")]
    LocallyClosed,
    /// The available connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
7484
/// Maps a received close to the matching error variant: connection-level closes become
/// [`ConnectionError::ConnectionClosed`], application-level closes become
/// [`ConnectionError::ApplicationClosed`].
impl From<Close> for ConnectionError {
    fn from(x: Close) -> Self {
        match x {
            Close::Connection(reason) => Self::ConnectionClosed(reason),
            Close::Application(reason) => Self::ApplicationClosed(reason),
        }
    }
}
7493
7494impl From<ConnectionError> for io::Error {
7496 fn from(x: ConnectionError) -> Self {
7497 use ConnectionError::*;
7498 let kind = match x {
7499 TimedOut => io::ErrorKind::TimedOut,
7500 Reset => io::ErrorKind::ConnectionReset,
7501 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
7502 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
7503 io::ErrorKind::Other
7504 }
7505 };
7506 Self::new(kind, x)
7507 }
7508}
7509
7510#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
7513pub enum PathError {
7514 #[error("multipath extension not negotiated")]
7516 MultipathNotNegotiated,
7517 #[error("the server side may not open a path")]
7519 ServerSideNotAllowed,
7520 #[error("maximum number of concurrent paths reached")]
7522 MaxPathIdReached,
7523 #[error("remoted CIDs exhausted")]
7525 RemoteCidsExhausted,
7526 #[error("path validation failed")]
7528 ValidationFailed,
7529 #[error("invalid remote address")]
7531 InvalidRemoteAddress(SocketAddr),
7532}
7533
/// Errors that can occur when closing a path.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The multipath extension was not negotiated.
    #[error("Multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// The path is already closed.
    #[error("closed path")]
    ClosedPath,
    /// The path is the last open path.
    #[error("last open path")]
    LastOpenPath,
}
7549
/// Error indicating that the multipath extension was not negotiated.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    /// Private field that prevents construction outside of this module.
    _private: (),
}
7556
/// Events of interest to the application.
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready.
    HandshakeDataReady,
    /// The connection was established.
    Connected,
    /// The handshake was confirmed.
    HandshakeConfirmed,
    /// The connection was lost.
    ConnectionLost {
        /// Reason that the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event.
    Stream(StreamEvent),
    /// Datagram(s) have been received.
    DatagramReceived,
    /// Sending datagrams is no longer blocked.
    DatagramsUnblocked,
    /// A path-related event.
    Path(PathEvent),
    /// A NAT traversal event.
    NatTraversal(n0_nat_traversal::Event),
}
7587
/// Wraps a [`PathEvent`] in [`Event::Path`].
impl From<PathEvent> for Event {
    fn from(source: PathEvent) -> Self {
        Self::Path(source)
    }
}
7593
7594fn get_max_ack_delay(params: &TransportParameters) -> Duration {
7595 Duration::from_micros(params.max_ack_delay.0 * 1000)
7596}
7597
/// Upper bound for a backoff exponent.
///
/// NOTE(review): presumably caps the exponential PTO backoff; the use site is outside this
/// section — confirm.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Upper bound of 2 seconds for a PTO interval.
///
/// NOTE(review): the conditions under which this cap applies are outside this section — confirm.
const MAX_PTO_INTERVAL: Duration = Duration::from_secs(2);

/// Idle duration threshold of 25 seconds associated with "fast" PTOs.
///
/// NOTE(review): presumably the minimum idle timeout for which the fast PTO cap is used —
/// confirm against the use site.
const MIN_IDLE_FOR_FAST_PTO: Duration = Duration::from_secs(25);

/// Upper bound of 1 second for a "fast" PTO interval.
///
/// NOTE(review): see `MIN_IDLE_FOR_FAST_PTO`; confirm against the use site.
const MAX_PTO_FAST_INTERVAL: Duration = Duration::from_secs(1);

/// Two thirds of [`MAX_PTO_INTERVAL`], computed in whole milliseconds.
///
/// NOTE(review): presumably the RTT above which a path is treated as slow — confirm.
const SLOW_RTT_THRESHOLD: Duration =
    Duration::from_millis((MAX_PTO_INTERVAL.as_millis() as u64 * 2) / 3);

/// [`MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE`] plus 32 bytes.
///
/// NOTE(review): presumably the minimum remaining datagram space required to start another
/// packet — confirm against the use site.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest size of a Handshake or 0-RTT packet header.
///
/// NOTE(review): the terms appear to follow the long header layout of RFC 9000 §17.2 — first
/// byte (1), version (4), destination CID length (1) and CID, source CID length (1) and CID,
/// the Length field sized for `u16::MAX`, and a 4-byte packet number — confirm.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
7638
/// Record of the frames written into a packet.
///
/// Populated through `record_sent_frame`.
#[derive(Default)]
struct SentFrames {
    /// Connection-level frames that were sent and are tracked for retransmission.
    retransmits: ThinRetransmits,
    /// Path-level frames that were sent and are tracked for retransmission.
    path_retransmits: PathRetransmits,
    /// For each path acknowledged in this packet, the largest acknowledged packet number.
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of the stream frames that were sent.
    stream_frames: StreamMetaVec,
    /// Whether the packet contains frames that are not tracked for retransmission.
    non_retransmits: bool,
    /// Whether the packet requires padding.
    ///
    /// NOTE(review): set outside this section; exact semantics to be confirmed.
    requires_padding: bool,
}
7651
impl SentFrames {
    /// Whether the packet contains nothing but acknowledgements.
    ///
    /// True when at least one ACK was recorded and there are no non-retransmittable frames, no
    /// stream frames and no retransmittable frames.
    fn is_ack_only(&self, streams: &StreamsState) -> bool {
        !self.largest_acked.is_empty()
            && !self.non_retransmits
            && self.stream_frames.is_empty()
            && self.retransmits.is_empty(streams)
    }

    /// Mutable access to the retransmit record, creating it on first use.
    fn retransmits_mut(&mut self) -> &mut Retransmits {
        self.retransmits.get_or_create()
    }

    /// Records that `frame` was written into the packet.
    ///
    /// ACK frames update `largest_acked`, stream frames are appended to `stream_frames`,
    /// frames that are tracked for retransmission are noted in the retransmit records, and
    /// the remaining frames that need bookkeeping only set `non_retransmits`.
    fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
        use frame::EncodableFrame::*;
        match frame {
            PathAck(path_ack_encoder) => {
                if let Some(max) = path_ack_encoder.ranges.max() {
                    self.largest_acked.insert(path_ack_encoder.path_id, max);
                }
            }
            // A plain ACK is recorded under path zero.
            Ack(ack_encoder) => {
                if let Some(max) = ack_encoder.ranges.max() {
                    self.largest_acked.insert(PathId::ZERO, max);
                }
            }
            Close(_) => { /* no bookkeeping is recorded for close frames */ }
            PathResponse(_) => self.non_retransmits = true,
            HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
            ReachOut(frame) => self.retransmits_mut().reach_out.push(frame),
            ObservedAddr(_) => self.path_retransmits.observed_address = true,
            Ping(_) => self.non_retransmits = true,
            ImmediateAck(_) => self.non_retransmits = true,
            AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
            PathChallenge(_) => self.non_retransmits = true,
            Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
            PathAbandon(path_abandon) => {
                // An error code already recorded for this path is kept.
                self.retransmits_mut()
                    .path_abandon
                    .entry(path_abandon.path_id)
                    .or_insert(path_abandon.error_code);
            }
            PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
            | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
                self.retransmits_mut().path_status.insert(path_id);
            }
            MaxPathId(_) => self.retransmits_mut().max_path_id = true,
            PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
            PathCidsBlocked(path_cids_blocked) => {
                self.retransmits_mut()
                    .path_cids_blocked
                    .insert(path_cids_blocked.path_id);
            }
            ResetStream(reset) => self
                .retransmits_mut()
                .reset_stream
                .push((reset.id, reset.error_code)),
            StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
            NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
            // A RETIRE_CONNECTION_ID without a path ID is recorded under the default path ID.
            RetireConnectionId(retire_cid) => self
                .retransmits_mut()
                .retire_cids
                .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
            Datagram(_) => self.non_retransmits = true,
            NewToken(_) => {}
            AddAddress(add_address) => {
                self.retransmits_mut().add_address.insert(add_address);
            }
            RemoveAddress(remove_address) => {
                self.retransmits_mut().remove_address.insert(remove_address);
            }
            StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
            MaxData(_) => self.retransmits_mut().max_data = true,
            MaxStreamData(max) => {
                self.retransmits_mut().max_stream_data.insert(max.id);
            }
            MaxStreams(max_streams) => {
                self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
            }
            StreamsBlocked(streams_blocked) => {
                self.retransmits_mut().streams_blocked[streams_blocked.dir as usize] = true
            }
        }
    }
}
7737
7738fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
7746 match (x, y) {
7747 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
7748 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
7749 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
7750 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
7751 }
7752}
7753
#[cfg(test)]
mod tests {
    use super::*;

    /// Idle timeout negotiation must give the same result regardless of argument order.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        // (left, right, expected result)
        let test_params = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (
                Some(VarInt(2)),
                Some(VarInt(0)),
                Some(Duration::from_millis(2)),
            ),
            (
                Some(VarInt(1)),
                Some(VarInt(4)),
                Some(Duration::from_millis(1)),
            ),
        ];

        for (left, right, result) in test_params {
            assert_eq!(negotiate_max_idle_timeout(left, right), result);
            assert_eq!(negotiate_max_idle_timeout(right, left), result);
        }
    }

    /// Exercises `AbandonedPaths` membership, length, maximum and range merging.
    #[test]
    fn abandoned_paths() {
        let mut t = AbandonedPaths::default();

        // Two adjacent IDs form a single range.
        t.insert(PathId(0));
        t.insert(PathId(1));
        assert_eq!(t.len(), 2);
        assert_eq!(t.0.range_count(), 1);
        assert!(t.contains(&PathId(0)));
        assert!(t.contains(&PathId(1)));
        assert!(!t.contains(&PathId(2)));
        assert!(!t.contains(&PathId(3)));
        assert_eq!(t.max(), Some(PathId(1)));

        // A non-adjacent ID starts a second range.
        t.insert(PathId(3));
        assert_eq!(t.len(), 3);
        assert_eq!(t.0.range_count(), 2);
        assert!(t.contains(&PathId(0)));
        assert!(t.contains(&PathId(1)));
        assert!(!t.contains(&PathId(2)));
        assert!(t.contains(&PathId(3)));
        assert_eq!(t.max(), Some(PathId(3)));

        // Filling the gap merges both ranges into one.
        t.insert(PathId(2));
        assert_eq!(t.len(), 4);
        assert_eq!(t.0.range_count(), 1);
        assert!(t.contains(&PathId(0)));
        assert!(t.contains(&PathId(1)));
        assert!(t.contains(&PathId(2)));
        assert!(t.contains(&PathId(3)));
        assert_eq!(t.max(), Some(PathId(3)));
    }
}